From d15580f47f431b3aeb9824db918054bf01c1f8b6 Mon Sep 17 00:00:00 2001
From: Ian Roddis
Date: Fri, 3 Sep 2021 09:10:38 -0300
Subject: [PATCH] Massive re-org to allow per-executor job specification
 formats and executor-specific task validation and expansion.

A few different renames to try and keep things more consistent.
---
 README.md                                          | 166 +++++++++++-------
 TODO.md                                            |  51 +++---
 daggy/include/daggy/Defines.hpp                    |  15 +-
 daggy/include/daggy/Serialization.hpp              |  20 ++-
 daggy/include/daggy/Utilities.hpp                  |  14 +-
 .../executors/task/ForkingTaskExecutor.hpp         |  28 +--
 .../daggy/executors/task/TaskExecutor.hpp          |  27 +--
 .../daggy/loggers/dag_run/DAGRunLogger.hpp         |   2 +-
 .../include/daggy/loggers/dag_run/Defines.hpp      |   2 +-
 .../loggers/dag_run/FileSystemLogger.hpp           |   2 +-
 .../daggy/loggers/dag_run/OStreamLogger.hpp        |   2 +-
 daggy/src/Serialization.cpp                        | 151 +++++++++-------
 daggy/src/Server.cpp                               |  26 ++-
 daggy/src/Utilities.cpp                            |  58 ++++--
 .../executors/task/ForkingTaskExecutor.cpp         |  35 +++-
 .../src/loggers/dag_run/FileSystemLogger.cpp       |   2 +-
 daggy/src/loggers/dag_run/OStreamLogger.cpp        |   7 +-
 tests/unit_dagrun_loggers.cpp                      |  10 +-
 tests/unit_executor_forkingexecutor.cpp            |  51 +++++-
 tests/unit_serialization.cpp                       |  59 ++++---
 tests/unit_server.cpp                              |  22 ++-
 tests/unit_utilities.cpp                           |  59 ++++---
 22 files changed, 509 insertions(+), 300 deletions(-)

diff --git a/README.md b/README.md
index 41a5df7..ecf58be 100644
--- a/README.md
+++ b/README.md
@@ -58,7 +58,7 @@ DAG Run Definition
 daggy works as a standalone library, but generally runs as a service with a REST interface. This documentation is
 specifically for submitting DAGs to the REST server for execution (a DAG run).
 
-DAGs are defined in JSON as a set of `tasks`, along with optional `taskParameters` and `executionParameters` (future).
+DAGs are defined in JSON as a set of `tasks`, along with optional `parameters`, `jobDefaults`, and `executionParameters` (future).
 
 Basic Definition
 --
@@ -84,18 +84,22 @@ Below is an example DAG Run submission:
 {
   "tasks": {
     "task_one": {
-      "command": [
-        "/usr/bin/touch",
-        "/tmp/somefile"
-      ],
+      "job": {
+        "command": [
+          "/usr/bin/touch",
+          "/tmp/somefile"
+        ]
+      },
       "maxRetries": 3,
       "retryIntervalSeconds": 30
     },
     "task_two": {
-      "command": [
-        "/usr/bin/touch",
-        "/tmp/someotherfile"
-      ],
+      "job": {
+        "command": [
+          "/usr/bin/touch",
+          "/tmp/someotherfile"
+        ]
+      },
       "maxRetries": 3,
       "retryIntervalSeconds": 30,
       "parents": [
@@ -109,24 +113,25 @@
 Task Parameters
 --
 
-Task commands can be parameterized by passing in an optional `taskParameters` member. Each parameter consists of a name
-and either a string value, or an array of string values. Task commands will be regenerated based on the values of the
-parameters.
+Task commands can be parameterized by passing in an optional `parameters` member. Each parameter consists of a name and
+either a string value, or an array of string values. Tasks will be regenerated based on the values of the parameters.
 
 For instance:
 
 ```json
 {
-  "taskParameters": {
+  "parameters": {
     "DIRECTORY": "/var/tmp",
     "FILE": "somefile"
   },
   "tasks": {
     "task_one": {
-      "command": [
-        "/usr/bin/touch",
-        "{{DIRECTORY}}/{{FILE}}"
-      ],
+      "job": {
+        "command": [
+          "/usr/bin/touch",
+          "{{DIRECTORY}}/{{FILE}}"
+        ]
+      },
       "maxRetries": 3,
       "retryIntervalSeconds": 30
     }
@@ -135,7 +140,7 @@ For instance:
 ```
 
 `task_one`'s command, when run, will touch `/var/tmp/somefile`, since the values of `DIRECTORY` and `FILE` will be
-populated from the `taskParameters` values.
+populated from the `parameters` values.
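+
+Interpolation is a textual substitution, so after expansion `task_one` behaves as though its job had been written with
+the literal command:
+
+```json
+["/usr/bin/touch", "/var/tmp/somefile"]
+```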
 In the case where a parameter has an array of values, any tasks referencing that value will be duplicated with the
 cartesian product of all relevant values.
 
@@ -144,7 +149,7 @@ Example:
 
 ```json
 {
-  "taskParameters": {
+  "parameters": {
     "DIRECTORY": "/var/tmp",
     "FILE": "somefile",
     "DATE": [
@@ -155,22 +160,28 @@
   },
   "tasks": {
     "populate_inputs": {
-      "command": [
-        "/usr/bin/touch",
-        "{{DIRECTORY}}/{{FILE}}"
-      ]
+      "job": {
+        "command": [
+          "/usr/bin/touch",
+          "{{DIRECTORY}}/{{FILE}}"
+        ]
+      }
     },
     "calc_date": {
-      "command": [
-        "/path/to/calculator",
-        "{{DIRECTORY}}/{{FILE}}",
-        "{{DATE}}"
-      ]
+      "job": {
+        "command": [
+          "/path/to/calculator",
+          "{{DIRECTORY}}/{{FILE}}",
+          "{{DATE}}"
+        ]
+      }
     },
     "generate_report": {
-      "command": [
-        "/path/to/generator"
-      ]
+      "job": {
+        "command": [
+          "/path/to/generator"
+        ]
+      }
     }
   }
 }
@@ -200,37 +211,48 @@ graph LR
 - `calc_date_2` will have the command `/path/to/calculator /var/tmp/somefile 2021-02-01`
 - `calc_date_3` will have the command `/path/to/calculator /var/tmp/somefile 2021-03-01`
 
+**NB**: When a task template resolves to multiple task instances, all of those new instances are still referred to by
+the original name for the purposes of creating dependencies. For example, to add a dependency dynamically (see the next
+section), you must refer to `"children": [ "calc_date" ]`, not to the individual `calc_date_1`.
+
 Tasks Generating Tasks
 ----------------------
 
-Some DAG structures cannot be known ahead of time, but only at runtime. For instance, if a job pulls multiple files
-from a source, each of which can be processed independently, it would be nice if the DAG could modify itself on the fly
-to accomodate that request.
+Some DAG structures can only be fully known at runtime. For instance, if a job pulls multiple files from a source, each
+of which can be processed independently, it would be nice if the DAG could modify itself on the fly to accommodate that
+request.
 
-Enter the `generator` task. If a task is defined with `"isGenerator": true`, the output of the task is assumed to be
-a JSON dictionary containing new tasks to run. The new tasks will go through parameter expansion as described above,
-and can freely define their dependencies the same way.
+Enter the `generator` task. If a task is defined with `"isGenerator": true`, the output of the task is assumed to be a
+JSON dictionary containing new tasks to run. The new tasks will go through parameter expansion as described above, using
+the same parameter list as the original DAG. New tasks can define their own dependencies.
 
 **NB:** Generated tasks won't have any children dependencies unless you define them. If there are parameterized
 dependencies, you must use the name of the original task (e.g. use `calc_date`, not `calc_date_1`) to add a dependency.
 
-**NB:** If you add a child dependency to a task that has already completed, weird things will happen. Don't do it.
+**NB:** If you add a child dependency to a task that has already completed, that task won't restart. Best practice is to
+create a dependency from the generator task to the task the new tasks will depend on.
 
 ```json
 {
   "tasks": {
     "pull_files": {
-      "command": [
-        "/path/to/puller/script",
-        "{{DATE}}"
-      ],
+      "job": {
+        "command": [
+          "/path/to/puller/script",
+          "{{DATE}}"
+        ]
+      },
       "isGenerator": true,
-      children: [ "generate_report" ]
+      "children": [
+        "generate_report"
+      ]
     },
     "generate_report": {
-      "command": [
-        "/path/to/generator"
-      ]
+      "job": {
+        "command": [
+          "/path/to/generator"
+        ]
+      }
     }
   }
 }
@@ -245,20 +267,29 @@ The output of the puller task might be:
 
 ```json
 {
-  "calc_date_a": {
-    "command": [
-      "/path/to/calculator",
-      "/path/to/data/file/a"
-    ],
-    "children": ["generate_report"]
-  },
-  "calc_date_b": {
-    "command": [
-      "/path/to/calculator",
-      "/path/to/data/file/b"
-    ],
-    "children": ["generate_report"]
-  }
+  "calc_date_a": {
+    "job": {
+      "command": [
+        "/path/to/calculator",
+        "/path/to/data/file/a"
+      ]
+    },
+    "children": [
+      "generate_report"
+    ]
+  },
+  "calc_date_b": {
+    "job": {
+      "command": [
+        "/path/to/calculator",
+        "/path/to/data/file/b"
+      ]
+    },
+    "children": [
+      "generate_report"
+    ]
+  }
 }
 ```
@@ -272,8 +303,9 @@ graph LR
     calc_file_a-->generate_report
     calc_file_b-->generate_report
 ```
-Note that it was important that `generate_report` depend on `pull_files`, otherwise the two task would
-run concurrently, and the `generate_report` wouldn't have any files to report on.
+
+Note that it was important that `generate_report` depend on `pull_files`; otherwise the two tasks would run
+concurrently, and `generate_report` wouldn't have any files to report on.
 
 Execution Parameters
 --
@@ -288,3 +320,15 @@ jobs on slurm with a specific set of restrictions, or allow for local execution
 |-----------|-------------|
 | pool | Names the executor the DAG should run on |
 | poolParameters | Any parameters the executor accepts that might modify how a task is run |
+
+Executors
+=========
+
+Different executors require different structures for the `job` task member.
+
+Default Job Values
+------------------
+
+A DAG can be submitted with the extra section `jobDefaults`. These values will be used to fill in default values for all
+tasks if they aren't overridden. This can be useful for cases like Slurm execution, where tasks will share default
+memory and runtime requirements.
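+
+For example (illustrative only; `memory` and `runtime` are not a fixed schema, since each executor defines which job
+keys it understands):
+
+```json
+{
+  "jobDefaults": {
+    "memory": "300M",
+    "runtime": "60"
+  },
+  "tasks": {
+    "task_one": {
+      "job": {
+        "command": ["/usr/bin/touch", "/tmp/somefile"],
+        "memory": "1G"
+      }
+    }
+  }
+}
+```
+
+Here `task_one` inherits `runtime` from `jobDefaults`, while its explicit `memory` of `1G` overrides the `300M` default.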
diff --git a/TODO.md b/TODO.md
index b6fb763..d7e6289 100644
--- a/TODO.md
+++ b/TODO.md
@@ -1,27 +1,30 @@
 Tasks
 ==
-- Open
-  - REST Server
-    - [ ] Add in authorization scheme (maybe PAM auth endpoint with JWT?)
-  - Core Functionality
-    - Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma
-      separated list
-  - Executors
-    - [ ] Slurm Executor
-  - Loggers
-    - [ ] FileSystemLogger
-      - [ ] Add unit tests
-      - [ ] Add more error checking
-    - [ ] General logger
-    - [ ] Redis DAGRunLogger
-  - Server
-    - [ ] Multiple execution pools
-    - [ ] per-Executor parameters
-  - Utilities
-    - daggyd
-      - [ ] Add config file support
-      - [ ] Support for all the different executors / state loggers
-    - daggyc
-      - [ ] Submission
-      - [ ] Querying
+- REST Server
+  - [ ] Add in authorization scheme (maybe PAM auth endpoint with JWT?)
+- Core Functionality
+  - rename TaskList to TaskSet
+  - Add support for Task.runSpec
+    - Executors can validate runspecs
+    - Executors can expand runspecs
+  - Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single,
+    comma-separated list
+  - Executors
+    - [ ] Slurm Executor
+  - Loggers
+    - [ ] FileSystemLogger
+      - [ ] Add unit tests
+      - [ ] Add more error checking
+    - [ ] General logger
+    - [ ] Redis DAGRunLogger
+  - Server
+    - [ ] Multiple execution pools
+    - [ ] per-Executor parameters
+- Utilities
+  - daggyd
+    - [ ] Add config file support
+    - [ ] Support for all the different executors / state loggers
+  - daggyc
+    - [ ] Submission
+    - [ ] Querying
diff --git a/daggy/include/daggy/Defines.hpp b/daggy/include/daggy/Defines.hpp
index 2569f06..8d6587e 100644
--- a/daggy/include/daggy/Defines.hpp
+++ b/daggy/include/daggy/Defines.hpp
@@ -11,8 +11,8 @@
 namespace daggy {
 
     // Commands and parameters
-    using ParameterValue = std::variant<std::string, std::vector<std::string>>;
-    using ParameterValues = std::unordered_map<std::string, ParameterValue>;
+    using ConfigValue = std::variant<std::string, std::vector<std::string>>;
+    using ConfigValues = std::unordered_map<std::string, ConfigValue>;
     using Command = std::vector<std::string>;
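+    // For illustration (hypothetical values): a ConfigValue holds either a single string or a
+    // list of strings, so one map can mix both kinds:
+    //   ConfigValues values{{"SOURCE", "name"},
+    //                       {"DATE", std::vector<std::string>{"2021-05-06", "2021-05-07"}}};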
 
     // Time
@@ -21,10 +21,9 @@ namespace daggy {
 
     // DAG Runs
     using DAGRunID = size_t;
-    using TaskID = size_t;
 
     BETTER_ENUM(RunState, uint32_t,
-                QUEUED = 1,
+                QUEUED = 1 << 0,
                 RUNNING = 1 << 1,
                 RETRY = 1 << 2,
                 ERRORED = 1 << 3,
@@ -34,25 +33,25 @@ namespace daggy {
 
     struct Task {
         std::string definedName;
-        std::vector<std::string> command;
+        bool isGenerator;  // True if the output of this task is a JSON set of tasks to complete
         uint32_t maxRetries;
         uint32_t retryIntervalSeconds; // Time to wait between retries
+        ConfigValues job;  // It's up to the individual executors to convert values from strings /
+                           // arrays of strings
         std::unordered_set<std::string> children;
         std::unordered_set<std::string> parents;
-        bool isGenerator; // True if the output of this task is a JSON set of tasks to complete
 
         bool operator==(const Task &other) const {
             return (definedName == other.definedName)
                    and (maxRetries == other.maxRetries)
                    and (retryIntervalSeconds == other.retryIntervalSeconds)
-                   and (command == other.command)
+                   and (job == other.job)
                    and (children == other.children)
                    and (parents == other.parents)
                    and (isGenerator == other.isGenerator);
         }
     };
 
-    using TaskList = std::unordered_map<std::string, Task>;
+    using TaskSet = std::unordered_map<std::string, Task>;
 
     struct AttemptRecord {
         TimePoint startTime;
diff --git a/daggy/include/daggy/Serialization.hpp b/daggy/include/daggy/Serialization.hpp
index 66c343b..cdea8fe 100644
--- a/daggy/include/daggy/Serialization.hpp
+++ b/daggy/include/daggy/Serialization.hpp
@@ -12,22 +12,26 @@ namespace rj = rapidjson;
 
 namespace daggy {
-    // Parameters
-    ParameterValues parametersFromJSON(const std::string &jsonSpec);
+    void checkRJParse(const rj::ParseResult &result, const std::string &prefix = "");
 
-    ParameterValues parametersFromJSON(const rj::Value &spec);
+    // Parameters
+    ConfigValues configFromJSON(const std::string &jsonSpec);
+
+    ConfigValues configFromJSON(const rj::Value &spec);
+
+    std::string configToJSON(const ConfigValues &config);
 
     // Tasks
-    TaskList
-    taskFromJSON(const std::string &name, const rj::Value &spec, const ParameterValues &parameters = {});
+    Task
+    taskFromJSON(const std::string &name, const rj::Value &spec, const ConfigValues &jobDefaults = {});
 
-    TaskList tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters = {});
+    TaskSet tasksFromJSON(const std::string &jsonSpec, const ConfigValues &jobDefaults = {});
 
-    TaskList tasksFromJSON(const rj::Value &spec, const ParameterValues &parameters = {});
+    TaskSet tasksFromJSON(const rj::Value &spec, const ConfigValues &jobDefaults = {});
 
     std::string taskToJSON(const Task &task);
 
-    std::string tasksToJSON(const TaskList &tasks);
+    std::string tasksToJSON(const TaskSet &tasks);
 
     // Attempt Records
     std::string attemptRecordToJSON(const AttemptRecord &attemptRecord);
diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp
index 26469f4..345922c 100644
--- a/daggy/include/daggy/Utilities.hpp
+++ b/daggy/include/daggy/Utilities.hpp
@@ -17,13 +17,19 @@ namespace daggy {
     std::string globalSub(std::string string, const std::string &pattern, const std::string &replacement);
 
-    std::vector<std::vector<std::string>> expandCommands(const std::vector<std::string> &command, const ParameterValues &parameters);
+    std::vector<std::vector<std::string>> interpolateValues(const std::vector<std::string> &raw, const ConfigValues &values);
+
+    TaskSet
+    expandTaskSet(const TaskSet &tasks,
+                  executors::task::TaskExecutor &executor,
+                  const ConfigValues &interpolatedValues = {});
 
     TaskDAG
-    buildDAGFromTasks(TaskList &tasks,
+    buildDAGFromTasks(TaskSet &tasks,
                       const std::vector<std::string> &updates = {});
 
-    void updateDAGFromTasks(TaskDAG &dag, TaskList &tasks);
+    void updateDAGFromTasks(TaskDAG &dag, TaskSet &tasks);
 
     // Blocking call
     std::vector<AttemptRecord>
@@ -37,7 +43,7 @@ namespace daggy {
            executors::task::TaskExecutor &executor,
            loggers::dag_run::DAGRunLogger &logger,
            TaskDAG dag,
-           const ParameterValues taskParameters = {});
+           const ConfigValues parameters = {});
 
     std::ostream &operator<<(std::ostream &os, const TimePoint &tp);
 }
diff --git a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp
index 3b0bd4b..bc0e60e 100644
--- a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp
+++ b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp
@@ -2,18 +2,22 @@
 
 #include "TaskExecutor.hpp"
 
-namespace daggy {
-    namespace executors {
-        namespace task {
-            class ForkingTaskExecutor : public TaskExecutor {
-            public:
-                ForkingTaskExecutor(size_t nThreads)
-                        : TaskExecutor(nThreads) {}
+namespace daggy::executors::task {
+    class ForkingTaskExecutor : public TaskExecutor {
+    public:
+        using Command = std::vector<std::string>;
 
-                const std::string getName() const override { return "ForkingTaskExecutor"; }
+        ForkingTaskExecutor(size_t nThreads)
+                : TaskExecutor(nThreads) {}
 
-                AttemptRecord runCommand(const Task &task) override;
-            };
-        }
-    }
+        // Validates the job to ensure that all required values are set and are of the right type.
+        bool validateTaskParameters(const ConfigValues &job) override;
+
+        std::vector<ConfigValues>
+        expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) override;
+
+        // Runs the task
+        AttemptRecord execute(const Task &task) override;
+
+    };
 }
diff --git a/daggy/include/daggy/executors/task/TaskExecutor.hpp b/daggy/include/daggy/executors/task/TaskExecutor.hpp
index 0b7c116..80948c9 100644
--- a/daggy/include/daggy/executors/task/TaskExecutor.hpp
+++ b/daggy/include/daggy/executors/task/TaskExecutor.hpp
@@ -14,20 +14,21 @@ If there are many retries, logs are returned for each attempt.
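+
+A minimal sketch of the intended call sequence (illustrative; the real call site is expandTaskSet
+in Utilities.cpp):
+
+    executor.validateTaskParameters(task.job);    // throws on a malformed job
+    auto jobs = executor.expandTaskParameters(task.job, parameterValues);
+    // one Task per returned job; each is eventually passed to executor.execute(task)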
 */
-namespace daggy {
-    namespace executors {
-        namespace task {
-            class TaskExecutor {
-            public:
-                TaskExecutor(size_t nThreads) : threadPool(nThreads) {};
+namespace daggy::executors::task {
+    class TaskExecutor {
+    public:
+        TaskExecutor(size_t nThreads) : threadPool(nThreads) {};
 
-                virtual const std::string getName() const = 0;
+        // Validates the job to ensure that all required values are set and are of the right type.
+        virtual bool validateTaskParameters(const ConfigValues &job) = 0;
 
-                // This will block if the dag_executor is full
-                virtual AttemptRecord runCommand(const Task &task) = 0;
+        // Will use the expansion values to return the fully expanded tasks.
+        virtual std::vector<ConfigValues>
+        expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) = 0;
 
-                ThreadPool threadPool;
-            };
-        }
-    }
+        // Blocking execution of a task
+        virtual AttemptRecord execute(const Task &task) = 0;
+
+        ThreadPool threadPool;
+    };
 }
diff --git a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp
index c81957f..815d48e 100644
--- a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp
+++ b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp
@@ -17,7 +17,7 @@ namespace daggy {
         class DAGRunLogger {
         public:
             // Execution
-            virtual DAGRunID startDAGRun(std::string name, const TaskList &tasks) = 0;
+            virtual DAGRunID startDAGRun(std::string name, const TaskSet &tasks) = 0;
 
             virtual void addTask(DAGRunID dagRunID, const std::string taskName, const Task &task) = 0;
 
diff --git a/daggy/include/daggy/loggers/dag_run/Defines.hpp b/daggy/include/daggy/loggers/dag_run/Defines.hpp
index ef2f674..a18fc94 100644
--- a/daggy/include/daggy/loggers/dag_run/Defines.hpp
+++ b/daggy/include/daggy/loggers/dag_run/Defines.hpp
@@ -23,7 +23,7 @@ namespace daggy::loggers::dag_run {
     // Pretty heavy weight, but
     struct DAGRunRecord {
         std::string name;
-        TaskList tasks;
+        TaskSet tasks;
         std::unordered_map<std::string, RunState> taskRunStates;
         std::unordered_map<std::string, std::vector<AttemptRecord>> taskAttempts;
         std::vector taskStateChanges;
diff --git a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp
index 5c0f0b4..a34c5a7 100644
--- a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp
+++ b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp
@@ -37,7 +37,7 @@ namespace daggy::loggers::dag_run {
         FileSystemLogger(fs::path root);
 
         // Execution
-        DAGRunID startDAGRun(std::string name, const TaskList &tasks) override;
+        DAGRunID startDAGRun(std::string name, const TaskSet &tasks) override;
 
         void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
 
diff --git a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp
index 1f43839..13c06c0 100644
--- a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp
+++ b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp
@@ -18,7 +18,7 @@ namespace daggy {
             OStreamLogger(std::ostream &os);
 
             // Execution
-            DAGRunID startDAGRun(std::string name, const TaskList &tasks) override;
+            DAGRunID startDAGRun(std::string name, const TaskSet &tasks) override;
 
             void addTask(DAGRunID dagRunID, const std::string taskName, const Task &task) override;
 
diff --git a/daggy/src/Serialization.cpp b/daggy/src/Serialization.cpp
index 9c73a70..e22e9cc 100644
--- a/daggy/src/Serialization.cpp
+++ b/daggy/src/Serialization.cpp
@@ -1,22 +1,30 @@
 #include <daggy/Serialization.hpp>
 #include <daggy/Utilities.hpp>
 
+#include <rapidjson/error/en.h>
+
 #include <sstream>
 #include <iomanip>
 
 namespace daggy {
-
-    ParameterValues parametersFromJSON(const std::string &jsonSpec) {
-        rj::Document doc;
-        rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
-        if (!parseResult) {
-            throw std::runtime_error("Parameters spec is not valid JSON");
+    void checkRJParse(const rj::ParseResult &result, const std::string &prefix) {
+        if (!result) {
+            std::stringstream ss;
+            ss << (prefix.empty() ? "" : prefix + ':')
+               << "Error parsing JSON: " << rj::GetParseError_En(result.Code())
+               << " at byte offset " << result.Offset();
+            throw std::runtime_error(ss.str());
         }
-        return parametersFromJSON(doc);
     }
 
-    ParameterValues parametersFromJSON(const rj::Value &spec) {
-        std::unordered_map<std::string, ParameterValue> parameters;
+    ConfigValues configFromJSON(const std::string &jsonSpec) {
+        rj::Document doc;
+        checkRJParse(doc.Parse(jsonSpec.c_str()), "Parsing config");
+        return configFromJSON(doc);
+    }
+
+    ConfigValues configFromJSON(const rj::Value &spec) {
+        std::unordered_map<std::string, ConfigValue> parameters;
 
         if (!spec.IsObject()) {
             throw std::runtime_error("Parameters in spec is not a JSON dictionary");
         }
 
         for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) {
             if (!it->name.IsString()) {
@@ -44,86 +52,105 @@
         return parameters;
     }
 
-    TaskList
-    taskFromJSON(const std::string &name, const rj::Value &spec, const ParameterValues &parameters) {
-        TaskList tasks;
+    std::string configToJSON(const ConfigValues &config) {
+        std::stringstream ss;
+        ss << '{';
+        bool first = true;
+        for (const auto &[k, v]: config) {
+            if (first) { first = false; } else { ss << ", "; }
+            ss << std::quoted(k) << ": ";
+            if (std::holds_alternative<std::string>(v)) {
+                ss << std::quoted(std::get<std::string>(v));
+            } else {
+                ss << '[';
+                const auto &vals = std::get<std::vector<std::string>>(v);
+                bool firstVal = true;
+                for (const auto &val: vals) {
+                    if (firstVal) { firstVal = false; } else { ss << ", "; }
+                    ss << std::quoted(val);
+                }
+                ss << ']';
+            }
+        }
+        ss << '}';
+        return ss.str();
+    }
+
+    Task
+    taskFromJSON(const std::string &name, const rj::Value &spec, const ConfigValues &jobDefaults) {
+        Task task{
+                .definedName = name,
+                .isGenerator = false,
+                .maxRetries = 0,
+                .retryIntervalSeconds = 0,
+                .job = jobDefaults
+        };
 
         if (!spec.IsObject()) {
             throw std::runtime_error("Tasks is not an object");
         }
-        if (!spec.HasMember("command")) {
-            throw std::runtime_error("Task " + name + " is missing required 'command' field");
-        }
 
         // Grab the standard fields with defaults;
-        bool isGenerator = false;
         if (spec.HasMember("isGenerator")) {
-            isGenerator = spec["isGenerator"].GetBool();
+            task.isGenerator = spec["isGenerator"].GetBool();
         }
 
-        uint8_t maxRetries = 0;
-        if (spec.HasMember("maxRetries")) { maxRetries = spec["maxRetries"].GetInt(); }
-        uint8_t retryIntervalSeconds = 0;
-        if (spec.HasMember(
-                "retryIntervalSeconds")) { retryIntervalSeconds = spec["retryIntervalSeconds"].GetInt(); }
+        if (spec.HasMember("maxRetries")) {
+            task.maxRetries = spec["maxRetries"].GetInt();
+        }
+
+        if (spec.HasMember("retryIntervalSeconds")) {
+            task.retryIntervalSeconds = spec["retryIntervalSeconds"].GetInt();
+        }
 
         // Children / parents
-        std::unordered_set<std::string> children;
         if (spec.HasMember("children")) {
             const auto &specChildren = spec["children"].GetArray();
             for (size_t c = 0; c < specChildren.Size(); ++c) {
-                children.insert(specChildren[c].GetString());
+                task.children.insert(specChildren[c].GetString());
             }
         }
 
-        std::unordered_set<std::string> parents;
         if (spec.HasMember("parents")) {
             const auto &specParents = spec["parents"].GetArray();
             for (size_t c = 0; c < specParents.Size(); ++c) {
-                parents.insert(specParents[c].GetString());
+                task.parents.insert(specParents[c].GetString());
             }
         }
 
-        // Build out the commands
-        std::vector<std::string> command;
-        for (size_t cmd = 0; cmd < spec["command"].Size(); ++cmd) {
-            command.emplace_back(spec["command"][cmd].GetString());
+        if (spec.HasMember("job")) {
+            const auto &params = spec["job"];
+            if (!params.IsObject()) throw std::runtime_error("job is not a dictionary.");
+            for (auto it = params.MemberBegin(); it != params.MemberEnd(); ++it) {
+                if (!it->name.IsString()) throw std::runtime_error("job key must be a string.");
+                if (it->value.IsArray()) {
+                    std::vector<std::string> values;
+                    for (size_t i = 0; i < it->value.Size(); ++i) {
+                        values.emplace_back(it->value[i].GetString());
+                    }
+                    task.job.insert_or_assign(it->name.GetString(), values);
+                } else {
+                    task.job.insert_or_assign(it->name.GetString(), it->value.GetString());
+                }
+            }
         }
 
-        auto commands = expandCommands(command, parameters);
-
-        // Create the tasks
-        for (size_t tid = 0; tid < commands.size(); ++tid) {
-            std::string taskName = (commands.size() == 1 ? name : name + "_" + std::to_string(tid));
-            tasks.emplace(taskName, Task{
-                    .definedName = name,
-                    .command = commands[tid],
-                    .maxRetries = maxRetries,
-                    .retryIntervalSeconds = retryIntervalSeconds,
-                    .children = children,
-                    .parents = parents,
-                    .isGenerator = isGenerator
-            });
-        }
-        return tasks;
+        return task;
     }
 
-    TaskList tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters) {
+    TaskSet tasksFromJSON(const std::string &jsonSpec, const ConfigValues &jobDefaults) {
         rj::Document doc;
-        rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
-        if (!parseResult) {
-            throw std::runtime_error("Unable to parse spec: ");
-        }
-        return tasksFromJSON(doc, parameters);
+        checkRJParse(doc.Parse(jsonSpec.c_str()));
+        return tasksFromJSON(doc, jobDefaults);
     }
 
-    TaskList tasksFromJSON(const rj::Value &spec, const ParameterValues &parameters) {
-        TaskList tasks;
+    TaskSet tasksFromJSON(const rj::Value &spec, const ConfigValues &jobDefaults) {
+        TaskSet tasks;
 
         if (!spec.IsObject()) {
             throw std::runtime_error("Tasks is not an object");
         }
 
         // Tasks
         for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) {
             if (!it->name.IsString()) throw std::runtime_error("Task names must be a string.");
             if (!it->value.IsObject()) throw std::runtime_error("Task definitions must be an object.");
-            auto subTasks = taskFromJSON(it->name.GetString(), it->value, parameters);
-            tasks.merge(subTasks);
+            const auto &taskName = it->name.GetString();
+            tasks.emplace(taskName, taskFromJSON(taskName, it->value, jobDefaults));
         }
 
         return tasks;
     }
@@ -138,15 +165,7 @@
            << R"("maxRetries": )" << task.maxRetries << ','
            << R"("retryIntervalSeconds": )" << task.retryIntervalSeconds << ',';
 
-        // Commands
-        ss << R"("command": [)";
-        first = true;
-        for (const auto &part: task.command) {
-            if (!first) ss << ',';
-            ss << std::quoted(part);
-            first = false;
-        }
-        ss << "],";
+        ss << R"("job": )" << configToJSON(task.job) << ',';
 
         ss << R"("children": [)";
         first = true;
@@ -163,7 +182,7 @@
         return ss.str();
     }
 
-    std::string tasksToJSON(const TaskList &tasks) {
+    std::string tasksToJSON(const TaskSet &tasks) {
         std::stringstream ss;
 
         ss << "{";
diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp
index 541c81f..8d847b4 100644
--- a/daggy/src/Server.cpp
+++ b/daggy/src/Server.cpp
@@ -85,7 +85,7 @@ namespace daggy {
     /*
      * {
      *   "name": "DAG Run Name"
-     *   "taskParameters": {...}
+     *   "parameters": {...}
+     *   "jobDefaults": {...}
      *   "tasks": {...}
      */
     void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
@@ -105,21 +105,33 @@
= doc["name"].GetString(); // Get parameters if there are any - ParameterValues parameters; - if (doc.HasMember("taskParameters")) { + ConfigValues parameters; + if (doc.HasMember("parameters")) { try { - auto parsedParams = parametersFromJSON(doc["taskParameters"].GetObject()); + auto parsedParams = configFromJSON(doc["parameters"].GetObject()); parameters.swap(parsedParams); } catch (std::exception &e) { REQ_ERROR(Bad_Request, e.what()); } } + // Job Defaults + ConfigValues jobDefaults; + if (doc.HasMember("jobDefaults")) { + try { + auto parsedJobDefaults = configFromJSON(doc["jobDefaults"].GetObject()); + jobDefaults.swap(parsedJobDefaults); + } catch (std::exception &e) { + REQ_ERROR(Bad_Request, e.what()); + } + } + // Get the tasks - TaskList tasks; + TaskSet tasks; try { - auto parsedTasks = tasksFromJSON(doc["tasks"], parameters); - tasks.swap(parsedTasks); + auto taskTemplates = tasksFromJSON(doc["tasks"], jobDefaults); + auto expandedTasks = expandTaskSet(taskTemplates, executor_, parameters); + tasks.swap(expandedTasks); } catch (std::exception &e) { REQ_ERROR(Bad_Request, e.what()); } diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index d183273..66711ab 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -14,14 +14,14 @@ namespace daggy { } std::vector> - expandCommands(const std::vector &command, const ParameterValues ¶meters) { - std::vector> commands{{}}; + interpolateValues(const std::vector &raw, const ConfigValues &values) { + std::vector> cooked{{}}; - for (const auto &part: command) { + for (const auto &part: raw) { std::vector expandedPart{part}; // Find all values of parameters, and expand them - for (const auto &[paramRaw, paramValue]: parameters) { + for (const auto &[paramRaw, paramValue]: values) { std::string param = "{{" + paramRaw + "}}"; auto pos = part.find(param); if (pos == std::string::npos) continue; @@ -44,17 +44,42 @@ namespace daggy { std::vector> newCommands; for (const auto &newPart: expandedPart) { - for (auto cmd: commands) { + for (auto cmd: cooked) { cmd.push_back(newPart); newCommands.emplace_back(cmd); } } - commands.swap(newCommands); + cooked.swap(newCommands); } - return commands; + return cooked; } - void updateDAGFromTasks(TaskDAG &dag, TaskList &tasks) { + TaskSet + expandTaskSet(const TaskSet &tasks, + executors::task::TaskExecutor &executor, + const ConfigValues &interpolatedValues) { + // Expand the tasks first + TaskSet newTaskSet; + for (const auto &[baseName, task]: tasks) { + executor.validateTaskParameters(task.job); + const auto newJobs = executor.expandTaskParameters(task.job, interpolatedValues); + if (newJobs.size() == 1) { + newTaskSet.emplace(baseName, task); + } else { + size_t i = 0; + for (const auto &newJob: newJobs) { + Task newTask{task}; + newTask.job = newJob; + newTaskSet.emplace(baseName + "_" + std::to_string(i), newTask); + ++i; + } + } + } + return newTaskSet; + } + + + void updateDAGFromTasks(TaskDAG &dag, TaskSet &tasks) { // Add all the vertices std::unordered_map> definedSets; for (const auto &[name, task]: tasks) { @@ -79,10 +104,9 @@ namespace daggy { } } - TaskDAG buildDAGFromTasks(TaskList &tasks, + TaskDAG buildDAGFromTasks(TaskSet &tasks, const std::vector &updates) { TaskDAG dag; - updateDAGFromTasks(dag, tasks); dag.reset(); @@ -111,7 +135,7 @@ namespace daggy { logger.updateTaskState(runID, taskName, RunState::RUNNING); while (attempts.size() < task.maxRetries + 1) { - attempts.push_back(executor.runCommand(task)); + attempts.push_back(executor.execute(task)); 
+
+
-    void updateDAGFromTasks(TaskDAG &dag, TaskList &tasks) {
+    void updateDAGFromTasks(TaskDAG &dag, TaskSet &tasks) {
         // Add all the vertices
         std::unordered_map<std::string, std::unordered_set<std::string>> definedSets;
         for (const auto &[name, task]: tasks) {
@@ -79,10 +104,9 @@ namespace daggy {
         }
     }
 
-    TaskDAG buildDAGFromTasks(TaskList &tasks,
+    TaskDAG buildDAGFromTasks(TaskSet &tasks,
                               const std::vector<std::string> &updates) {
         TaskDAG dag;
-
         updateDAGFromTasks(dag, tasks);
 
         dag.reset();
@@ -111,7 +135,7 @@ namespace daggy {
             logger.updateTaskState(runID, taskName, RunState::RUNNING);
 
             while (attempts.size() < task.maxRetries + 1) {
-                attempts.push_back(executor.runCommand(task));
+                attempts.push_back(executor.execute(task));
                 logger.logTaskAttempt(runID, taskName, attempts.back());
                 if (attempts.back().rc == 0) break;
                 logger.updateTaskState(runID, taskName, RunState::RETRY);
@@ -123,7 +147,7 @@ namespace daggy {
                    executors::task::TaskExecutor &executor,
                    loggers::dag_run::DAGRunLogger &logger,
                    TaskDAG dag,
-                   const ParameterValues taskParameters
+                   const ConfigValues parameters
     ) {
         logger.updateDAGRunState(runID, RunState::RUNNING);
@@ -146,9 +170,11 @@ namespace daggy {
                 auto &task = vert.data;
                 if (task.isGenerator) {
                     // Parse the output and update the DAGs
-                    // TODO: Let the logger know about the new tasks
                     try {
-                        auto newTasks = tasksFromJSON(attemptRecords.back().outputLog, taskParameters);
+                        auto newTasks = expandTaskSet(tasksFromJSON(attemptRecords.back().outputLog),
+                                                      executor,
+                                                      parameters
+                        );
                         updateDAGFromTasks(dag, newTasks);
 
                         for (const auto &[ntName, ntTask]: newTasks) {
@@ -158,6 +184,10 @@ namespace daggy {
                         }
                         logger.updateTask(runID, taskName, task);
                     } catch (std::exception &e) {
+                        logger.logTaskAttempt(runID, taskName,
+                                              AttemptRecord{.executorLog =
+                                                      std::string{"Failed to parse JSON output: "} +
+                                                      e.what()});
                         logger.updateTaskState(runID, taskName, RunState::ERRORED);
                         ++errored;
                     }
diff --git a/daggy/src/executors/task/ForkingTaskExecutor.cpp b/daggy/src/executors/task/ForkingTaskExecutor.cpp
index f1b3eeb..3f6184d 100644
--- a/daggy/src/executors/task/ForkingTaskExecutor.cpp
+++ b/daggy/src/executors/task/ForkingTaskExecutor.cpp
@@ -1,4 +1,5 @@
 #include <daggy/executors/task/ForkingTaskExecutor.hpp>
+#include <daggy/Utilities.hpp>
 
 #include <fcntl.h>
 #include <unistd.h>
@@ -31,23 +32,26 @@ std::string slurp(int fd) {
 }
 
 daggy::AttemptRecord
-ForkingTaskExecutor::runCommand(const Task &task) {
+ForkingTaskExecutor::execute(const Task &task) {
     AttemptRecord rec;
     rec.startTime = Clock::now();
 
     // Need to convert the strings
     std::vector<char *> argv;
-    for (const auto &s : task.command) {
+    const auto command = std::get<Command>(task.job.at("command"));
+    for (const auto &s: command) {
         argv.push_back(const_cast<char *>(s.c_str()));
     }
     argv.push_back(nullptr);
 
     // Create the pipe
     int stdoutPipe[2];
-    pipe2(stdoutPipe, O_DIRECT);
+    int pipeRC = pipe2(stdoutPipe, O_DIRECT);
+    if (pipeRC != 0) throw std::runtime_error("Unable to create pipe for stdout");
 
     int stderrPipe[2];
-    pipe2(stderrPipe, O_DIRECT);
+    pipeRC = pipe2(stderrPipe, O_DIRECT);
+    if (pipeRC != 0) throw std::runtime_error("Unable to create pipe for stderr");
 
     pid_t child = fork();
     if (child < 0) {
@@ -84,3 +88,26 @@
 
     return rec;
 }
+
+bool ForkingTaskExecutor::validateTaskParameters(const ConfigValues &job) {
+    auto it = job.find("command");
+    if (it == job.end())
+        throw std::runtime_error(R"(job does not have a "command" argument)");
+    if (!std::holds_alternative<Command>(it->second))
+        throw std::runtime_error(R"(job's "command" must be an array of strings)");
+    return true;
+}
+
+std::vector<ConfigValues>
+ForkingTaskExecutor::expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) {
+    std::vector<ConfigValues> newValues;
+
+    const auto command = std::get<Command>(job.at("command"));
+    for (const auto &expandedCommand: interpolateValues(command, expansionValues)) {
+        ConfigValues newCommand{job};
+        newCommand.at("command") = expandedCommand;
+        newValues.emplace_back(newCommand);
+    }
+
+    return newValues;
+}
diff --git a/daggy/src/loggers/dag_run/FileSystemLogger.cpp b/daggy/src/loggers/dag_run/FileSystemLogger.cpp
index 4aea9dd..253cabf 100644
--- a/daggy/src/loggers/dag_run/FileSystemLogger.cpp
+++ b/daggy/src/loggers/dag_run/FileSystemLogger.cpp
@@ -39,7 +39,7 @@ namespace daggy {
     }
 
     // Execution
-    DAGRunID FileSystemLogger::startDAGRun(std::string name, const TaskList &tasks) {
+    DAGRunID FileSystemLogger::startDAGRun(std::string name, const TaskSet &tasks) {
         DAGRunID runID = nextRunID_++;
 
         // TODO make this threadsafe
diff --git a/daggy/src/loggers/dag_run/OStreamLogger.cpp b/daggy/src/loggers/dag_run/OStreamLogger.cpp
index d826362..3700404 100644
--- a/daggy/src/loggers/dag_run/OStreamLogger.cpp
+++ b/daggy/src/loggers/dag_run/OStreamLogger.cpp
@@ -4,6 +4,7 @@
 
 #include <algorithm>
 #include <iterator>
+#include <daggy/Serialization.hpp>
 
 namespace daggy {
     namespace loggers {
@@ -11,7 +12,7 @@ namespace daggy {
             OStreamLogger::OStreamLogger(std::ostream &os) : os_(os) {}
 
             // Execution
-            DAGRunID OStreamLogger::startDAGRun(std::string name, const TaskList &tasks) {
+            DAGRunID OStreamLogger::startDAGRun(std::string name, const TaskSet &tasks) {
                 std::lock_guard lock(guard_);
                 size_t runID = dagRuns_.size();
                 dagRuns_.push_back({
@@ -26,9 +27,7 @@ namespace daggy {
                 os_ << "Starting new DAGRun named " << name << " with ID " << runID
                     << " and " << tasks.size() << " tasks" << std::endl;
                 for (const auto &[name, task]: tasks) {
-                    os_ << "TASK (" << name << "): ";
-                    std::copy(task.command.begin(), task.command.end(),
-                              std::ostream_iterator<std::string>(os_, " "));
+                    os_ << "TASK (" << name << "): " << configToJSON(task.job);
                     os_ << std::endl;
                 }
                 return runID;
diff --git a/tests/unit_dagrun_loggers.cpp b/tests/unit_dagrun_loggers.cpp
index e22d36b..1f826a9 100644
--- a/tests/unit_dagrun_loggers.cpp
+++ b/tests/unit_dagrun_loggers.cpp
@@ -12,13 +12,13 @@ namespace fs = std::filesystem;
 using namespace daggy;
 using namespace daggy::loggers::dag_run;
 
-const TaskList SAMPLE_TASKS{
-        {"work_a", Task{.command{"/bin/echo", "a"}, .children{"c"}}},
-        {"work_b", Task{.command{"/bin/echo", "b"}, .children{"c"}}},
-        {"work_c", Task{.command{"/bin/echo", "c"}}}
+const TaskSet SAMPLE_TASKS{
+        {"work_a", Task{.job{{"command", std::vector<std::string>{"/bin/echo", "a"}}}, .children{"c"}}},
+        {"work_b", Task{.job{{"command", std::vector<std::string>{"/bin/echo", "b"}}}, .children{"c"}}},
+        {"work_c", Task{.job{{"command", std::vector<std::string>{"/bin/echo", "c"}}}}}
 };
 
-inline DAGRunID testDAGRunInit(DAGRunLogger &logger, const std::string &name, const TaskList &tasks) {
+inline DAGRunID testDAGRunInit(DAGRunLogger &logger, const std::string &name, const TaskSet &tasks) {
     auto runID = logger.startDAGRun(name, tasks);
 
     auto dagRun = logger.getDAGRun(runID);
diff --git a/tests/unit_executor_forkingexecutor.cpp b/tests/unit_executor_forkingexecutor.cpp
index 576fc09..d225f20 100644
--- a/tests/unit_executor_forkingexecutor.cpp
+++ b/tests/unit_executor_forkingexecutor.cpp
@@ -2,6 +2,8 @@
 #include <catch2/catch.hpp>
 
 #include "daggy/executors/task/ForkingTaskExecutor.hpp"
+#include "daggy/Serialization.hpp"
+#include "daggy/Utilities.hpp"
 
 #include <filesystem>
 
@@ -9,9 +11,12 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
     daggy::executors::task::ForkingTaskExecutor ex(10);
 
     SECTION("Simple Run") {
-        daggy::Task task{.command{"/usr/bin/echo", "abc", "123"}};
+        daggy::Task task{.job{
+                {"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/echo", "abc", "123"}}}};
 
+        REQUIRE(ex.validateTaskParameters(task.job));
+
-        auto rec = ex.runCommand(task);
+        auto rec = ex.execute(task);
 
         REQUIRE(rec.rc == 0);
         REQUIRE(rec.outputLog.size() >= 6);
@@ -19,9 +24,10 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
     }
 
     SECTION("Error Run") {
-        daggy::Task task{.command{"/usr/bin/expr", "1", "+", "+"}};
+        daggy::Task task{.job{
+                {"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/expr", "1", "+", "+"}}}};
 
-        auto rec = ex.runCommand(task);
+        auto rec = ex.execute(task);
 
         REQUIRE(rec.rc == 2);
         REQUIRE(rec.errorLog.size() >= 20);
@@ -33,16 +39,45 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
                 "/usr/share/dict/linux.words", "/usr/share/dict/cracklib-small", "/etc/ssh/moduli"
         };
 
-        for (const auto &bigFile : BIG_FILES) {
+        for (const auto &bigFile: BIG_FILES) {
             if (!std::filesystem::exists(bigFile)) continue;
 
-            daggy::Task task{.command{"/usr/bin/cat", bigFile}};
+            daggy::Task task{.job{
+                    {"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/cat", bigFile}}}};
 
-            auto rec = ex.runCommand(task);
+            auto rec = ex.execute(task);
 
             REQUIRE(rec.rc == 0);
             REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile));
             REQUIRE(rec.errorLog.empty());
         }
     }
-}
+
+    SECTION("Parameter Expansion") {
+        std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"};
+        auto params = daggy::configFromJSON(testParams);
+
+        std::string taskJSON = R"({"B": {"job": {"command": ["/usr/bin/echo", "{{DATE}}"]}, "children": ["C"]}})";
+        auto tasks = daggy::tasksFromJSON(taskJSON);
+
+        auto result = daggy::expandTaskSet(tasks, ex, params);
+        REQUIRE(result.size() == 2);
+    }
+
+    SECTION("Build with expansion") {
+        std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
+        auto params = daggy::configFromJSON(testParams);
+        std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}, "children": ["B"]}, "B": {"job": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"]}, "children": ["C"]}, "C": {"job": {"command": ["/bin/echo", "C"]}}})";
+        auto tasks = daggy::expandTaskSet(daggy::tasksFromJSON(testTasks), ex, params);
+        REQUIRE(tasks.size() == 4);
+    }
+
+    SECTION("Build with expansion using parents instead of children") {
+        std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
+        auto params = daggy::configFromJSON(testParams);
+        std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}}, "B": {"job": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"]}, "parents": ["A"]}, "C": {"job": {"command": ["/bin/echo", "C"]}, "parents": ["A"]}})";
+        auto tasks = daggy::expandTaskSet(daggy::tasksFromJSON(testTasks), ex, params);
+
+        REQUIRE(tasks.size() == 4);
+    }
+}
\ No newline at end of file
diff --git a/tests/unit_serialization.cpp b/tests/unit_serialization.cpp
index dd11fb0..74a3671 100644
--- a/tests/unit_serialization.cpp
+++ b/tests/unit_serialization.cpp
@@ -11,49 +11,68 @@ namespace fs = std::filesystem;
 TEST_CASE("Deserialize Parameters", "[deserialize_parameters]") {
     SECTION("Basic Parse") {
         std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
-        auto params = daggy::parametersFromJSON(testParams);
+        auto params = daggy::configFromJSON(testParams);
 
         REQUIRE(params.size() == 2);
         REQUIRE(std::holds_alternative<std::vector<std::string>>(params["DATE"]));
         REQUIRE(std::holds_alternative<std::string>(params["SOURCE"]));
     }SECTION("Invalid JSON") {
         std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name")"};
-        REQUIRE_THROWS(daggy::parametersFromJSON(testParams));
+        REQUIRE_THROWS(daggy::configFromJSON(testParams));
     }SECTION("Non-string Keys") {
         std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], 6: "name"})"};
-        REQUIRE_THROWS(daggy::parametersFromJSON(testParams));
+        REQUIRE_THROWS(daggy::configFromJSON(testParams));
     }SECTION("Non-array/Non-string values") {
         std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": {"name": "kevin"}})"};
-        REQUIRE_THROWS(daggy::parametersFromJSON(testParams));
+        REQUIRE_THROWS(daggy::configFromJSON(testParams));
     }
 }
 
 TEST_CASE("Task Deserialization", "[deserialize_task]") {
     SECTION("Build with no expansion") {
-        std::string testTasks = R"({ "A": {"command": ["/bin/echo", "A"], "children": ["C"]}, "B": {"command": ["/bin/echo", "B"], "children": ["C"]}, "C": {"command": ["/bin/echo", "C"]}})";
+        std::string testTasks = R"({
+            "A": {
+                "job": { "command": ["/bin/echo", "A"] },
+                "children": ["C"]
+            },
+            "B": {
+                "job": {"command": ["/bin/echo", "B"]},
+                "children": ["C"]
+            },
+            "C": {
+                "job": {"command": ["/bin/echo", "C"]}
+            }
+        })";
         auto tasks = daggy::tasksFromJSON(testTasks);
         REQUIRE(tasks.size() == 3);
     }
 
-    SECTION("Build with expansion") {
-        std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
-        auto params = daggy::parametersFromJSON(testParams);
-        std::string testTasks = R"({"A": {"command": ["/bin/echo", "A"], "children": ["B"]}, "B": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "children": ["C"]}, "C": {"command": ["/bin/echo", "C"]}})";
-        auto tasks = daggy::tasksFromJSON(testTasks, params);
-        REQUIRE(tasks.size() == 4);
-    }
-
-    SECTION("Build with expansion using parents instead of children") {
-        std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
-        auto params = daggy::parametersFromJSON(testParams);
-        std::string testTasks = R"({"A": {"command": ["/bin/echo", "A"]}, "B": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "parents": ["A"]}, "C": {"command": ["/bin/echo", "C"], "parents": ["A"]}})";
-        auto tasks = daggy::tasksFromJSON(testTasks, params);
-        REQUIRE(tasks.size() == 4);
+    SECTION("Build with job defaults") {
+        std::string testTasks = R"({
+            "A": {
+                "job": { "command": ["/bin/echo", "A"] },
+                "children": ["B"]
+            },
+            "B": {
+                "job": {
+                    "command": ["/bin/echo", "C"],
+                    "memory": "1G"
+                }
+            }
+        })";
+        daggy::ConfigValues jobDefaults{{"runtime", "60"},
+                                        {"memory",  "300M"}};
+        auto tasks = daggy::tasksFromJSON(testTasks, jobDefaults);
+        REQUIRE(tasks.size() == 2);
+        REQUIRE(std::get<std::string>(tasks["A"].job["runtime"]) == "60");
+        REQUIRE(std::get<std::string>(tasks["A"].job["memory"]) == "300M");
+        REQUIRE(std::get<std::string>(tasks["B"].job["runtime"]) == "60");
+        REQUIRE(std::get<std::string>(tasks["B"].job["memory"]) == "1G");
     }
 }
 
 TEST_CASE("Task Serialization", "[serialize_tasks]") {
     SECTION("Build with no expansion") {
-        std::string testTasks = R"({"A": {"command": ["/bin/echo", "A"], "children": ["C"]}, "B": {"command": ["/bin/echo", "B"], "children": ["C"]}, "C": {"command": ["/bin/echo", "C"]}})";
+        std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}, "children": ["C"]}, "B": {"job": {"command": ["/bin/echo", "B"]}, "children": ["C"]}, "C": {"job": {"command": ["/bin/echo", "C"]}}})";
         auto tasks = daggy::tasksFromJSON(testTasks);
 
         auto genJSON = daggy::tasksToJSON(tasks);
diff --git a/tests/unit_server.cpp b/tests/unit_server.cpp
index 8dd64db..581b210 100644
--- a/tests/unit_server.cpp
+++ b/tests/unit_server.cpp
@@ -6,9 +6,10 @@
 #include <string>
 #include <thread>
 
-#include "daggy/Server.hpp"
-#include "daggy/executors/task/ForkingTaskExecutor.hpp"
-#include "daggy/loggers/dag_run/OStreamLogger.hpp"
+#include <daggy/Server.hpp>
+#include <daggy/Serialization.hpp>
+#include <daggy/executors/task/ForkingTaskExecutor.hpp>
+#include <daggy/loggers/dag_run/OStreamLogger.hpp>
 
 namespace rj = rapidjson;
 
@@ -73,10 +74,10 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") {
     SECTION("Simple DAGRun Submission") {
         std::string dagRun = R"({
           "name": "unit_server",
-          "taskParameters": { "FILE": [ "A", "B" ] },
+          "parameters": { "FILE": [ "A", "B" ] },
"tasks": { - "touch": { "command": [ "/usr/bin/touch", "dagrun_{{FILE}}" ] }, - "cat": { "command": [ "/usr/bin/cat", "dagrun_A", "dagrun_B" ], + "touch": { "job": { "command": [ "/usr/bin/touch", "dagrun_{{FILE}}" ]} }, + "cat": { "job": { "command": [ "/usr/bin/cat", "dagrun_A", "dagrun_B" ]}, "parents": [ "touch" ] } } @@ -90,8 +91,7 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") { REQUIRE(response.code() == Pistache::Http::Code::Ok); rj::Document doc; - rj::ParseResult parseResult = doc.Parse(response.body().c_str()); - REQUIRE(parseResult); + daggy::checkRJParse(doc.Parse(response.body().c_str())); REQUIRE(doc.IsObject()); REQUIRE(doc.HasMember("runID")); @@ -104,8 +104,7 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") { REQUIRE(response.code() == Pistache::Http::Code::Ok); rj::Document doc; - rj::ParseResult parseResult = doc.Parse(response.body().c_str()); - REQUIRE(parseResult); + daggy::checkRJParse(doc.Parse(response.body().c_str())); REQUIRE(doc.IsArray()); REQUIRE(doc.Size() >= 1); @@ -134,8 +133,7 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") { auto response = REQUEST(baseURL + "/v1/dagrun/" + std::to_string(runID)); REQUIRE(response.code() == Pistache::Http::Code::Ok); rj::Document doc; - rj::ParseResult parseResult = doc.Parse(response.body().c_str()); - REQUIRE(parseResult); + daggy::checkRJParse(doc.Parse(response.body().c_str())); REQUIRE(doc.IsObject()); REQUIRE(doc.HasMember("taskStates")); diff --git a/tests/unit_utilities.cpp b/tests/unit_utilities.cpp index 5d941da..82a5b7c 100644 --- a/tests/unit_utilities.cpp +++ b/tests/unit_utilities.cpp @@ -24,18 +24,18 @@ TEST_CASE("String Utilities", "[utilities_string]") { TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") { SECTION("Basic expansion") { std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"}; - auto params = daggy::parametersFromJSON(testParams); + auto params = daggy::configFromJSON(testParams); std::vector cmd{"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}", "{{TYPE}}"}; - auto allCommands = daggy::expandCommands(cmd, params); + auto allCommands = daggy::interpolateValues(cmd, params); REQUIRE(allCommands.size() == 6); } SECTION("Skip over unused parameters") { std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"}; - auto params = daggy::parametersFromJSON(testParams); + auto params = daggy::configFromJSON(testParams); std::vector cmd{"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}"}; - auto allCommands = daggy::expandCommands(cmd, params); + auto allCommands = daggy::interpolateValues(cmd, params); // TYPE isn't used, so it's just |DATE| * |SOURCE| REQUIRE(allCommands.size() == 2); @@ -44,9 +44,9 @@ TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") { SECTION("Expand within a command part") { std::string testParams{ R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": ["A", "B"], "TYPE": ["a", "b", "c"]})"}; - auto params = daggy::parametersFromJSON(testParams); + auto params = daggy::configFromJSON(testParams); std::vector cmd{"/usr/bin/touch", "{{DATE}}_{{SOURCE}}"}; - auto result = daggy::expandCommands(cmd, params); + auto result = daggy::interpolateValues(cmd, params); // TYPE isn't used, so it's just |DATE| * |SOURCE| REQUIRE(result.size() == 4); @@ -62,11 +62,11 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") { SECTION("Simple execution") { std::string prefix = "asdlk_"; - std::string taskJSON = R"({"A": {"command": 
["/usr/bin/touch", ")" - + prefix + R"(A"], "children": ["C"]}, "B": {"command": ["/usr/bin/touch", ")" - + prefix + R"(B"], "children": ["C"]}, "C": {"command": ["/usr/bin/touch", ")" - + prefix + R"(C"]}})"; - auto tasks = daggy::tasksFromJSON(taskJSON); + std::string taskJSON = R"({"A": {"job": {"command": ["/usr/bin/touch", ")" + + prefix + R"(A"]}, "children": ["C"]}, "B": {"job": {"command": ["/usr/bin/touch", ")" + + prefix + R"(B"]}, "children": ["C"]}, "C": {"job": {"command": ["/usr/bin/touch", ")" + + prefix + R"(C"]}}})"; + auto tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex); auto dag = daggy::buildDAGFromTasks(tasks); auto runID = logger.startDAGRun("test_run", tasks); @@ -102,12 +102,13 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") { std::string goodPrefix = "rec_error_"; std::string badPrefix = "noexist/rec_error_"; - std::string taskJSON = R"({"A": {"command": ["/usr/bin/touch", ")" + std::string taskJSON = R"({"A": {"job": {"command": ["/usr/bin/touch", ")" + goodPrefix + - R"(A"], "children": ["C"]}, "B": {"command": ["/usr/bin/touch", ")" - + badPrefix + R"(B"], "children": ["C"]}, "C": {"command": ["/usr/bin/touch", ")" - + badPrefix + R"(C"]}})"; - auto tasks = daggy::tasksFromJSON(taskJSON); + R"(A"]}, "children": ["C"]}, "B": {"job": {"command": ["/usr/bin/touch", ")" + + badPrefix + + R"(B"]}, "children": ["C"]}, "C": {"job": {"command": ["/usr/bin/touch", ")" + + badPrefix + R"(C"]}}})"; + auto tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex); auto dag = daggy::buildDAGFromTasks(tasks); auto runID = logger.startDAGRun("test_run", tasks); @@ -134,22 +135,31 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") { SECTION("Generator tasks") { std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"}; - auto params = daggy::parametersFromJSON(testParams); + auto params = daggy::configFromJSON(testParams); + + std::string generatorOutput = R"({"B": {"job": {"command": ["/usr/bin/echo", "-e", "{{DATE}}"]}, "children": ["C"]}})"; + fs::path ofn = fs::current_path() / "generator_test_output.json"; + std::ofstream ofh{ofn}; + ofh << generatorOutput << std::endl; + ofh.close(); - std::string generatorOutput = R"({"B": {"command": ["/usr/bin/echo", "{{DATE}}"], "children": ["C"]}})"; std::stringstream jsonTasks; - jsonTasks << R"({ "A": { "command": [ "/usr/bin/echo", )" << std::quoted(generatorOutput) - << R"(], "children": ["C"], "isGenerator": true},)" - << R"("C": { "command": [ "/usr/bin/echo", "hello!"] } })"; + jsonTasks << R"({ "A": { "job": {"command": [ "/usr/bin/cat", )" << std::quoted(ofn.string()) + << R"(]}, "children": ["C"], "isGenerator": true},)" + << R"("C": { "job": {"command": [ "/usr/bin/echo", "hello!"]} } })"; - auto tasks = daggy::tasksFromJSON(jsonTasks.str()); + auto baseTasks = daggy::tasksFromJSON(jsonTasks.str()); + REQUIRE(baseTasks.size() == 2); + auto tasks = daggy::expandTaskSet(baseTasks, ex, params); + REQUIRE(tasks.size() == 2); auto dag = daggy::buildDAGFromTasks(tasks); REQUIRE(dag.size() == 2); auto runID = logger.startDAGRun("generator_run", tasks); auto finalDAG = daggy::runDAG(runID, ex, logger, dag, params); + REQUIRE(finalDAG.allVisited()); REQUIRE(finalDAG.size() == 4); // Check the logger @@ -157,16 +167,15 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") { REQUIRE(record.tasks.size() == 4); REQUIRE(record.taskRunStates.size() == 4); - for (const auto & [taskName, attempts] : record.taskAttempts) { + for (const auto &[taskName, attempts]: record.taskAttempts) { REQUIRE(attempts.size() == 
             REQUIRE(attempts.size() == 1);
             REQUIRE(attempts.back().rc == 0);
         }
 
         // Ensure that children were updated properly
-        REQUIRE(record.tasks["A"].children == std::unordered_set<std::string>{"B_0", "B_1", "C"});
+        REQUIRE(record.tasks["A"].children == std::unordered_set<std::string>{"B_0", "B_1", "C"});
         REQUIRE(record.tasks["B_0"].children == std::unordered_set<std::string>{"C"});
         REQUIRE(record.tasks["B_1"].children == std::unordered_set<std::string>{"C"});
         REQUIRE(record.tasks["C"].children.empty());
-
     }
 }