From 2c00001e0bf2ecadc01e3c8aca3f8e987e13274b Mon Sep 17 00:00:00 2001
From: Ian Roddis
Date: Mon, 30 Aug 2021 22:05:37 -0300
Subject: [PATCH] Adding support for isGenerator tasks

- Changing how the DAG is represented, both in code and in how DAGs are
  defined in JSON.
- Removing the std::vector representation in favour of a map that will
  enforce unique task names.
- Tasks now have a name (generated) and a definedName.
- Adding support to loggers to add tasks after a DAGRun has been
  initialized.
---
 README.md                                     | 132 ++++++++++---
 TODO.md                                       |   5 +
 daggy/include/daggy/DAG.hpp                   |  38 ++--
 daggy/include/daggy/DAG.impl.hxx              | 119 ++++++++++++
 daggy/include/daggy/Defines.hpp               |  12 +-
 daggy/include/daggy/Serialization.hpp         |   9 +-
 daggy/include/daggy/Utilities.hpp             |  22 ++-
 .../daggy/loggers/dag_run/DAGRunLogger.hpp    |   6 +-
 .../include/daggy/loggers/dag_run/Defines.hpp |  10 +-
 .../loggers/dag_run/FileSystemLogger.hpp      |   2 +-
 .../daggy/loggers/dag_run/OStreamLogger.hpp   |  10 +-
 daggy/src/DAG.cpp                             |  91 ---------
 daggy/src/Serialization.cpp                   | 182 ++++++++----------
 daggy/src/Server.cpp                          |  17 +-
 daggy/src/Utilities.cpp                       | 130 ++++++++-----
 .../src/loggers/dag_run/FileSystemLogger.cpp  |  14 +-
 daggy/src/loggers/dag_run/OStreamLogger.cpp   |  65 ++++---
 tests/unit_dag.cpp                            |  31 +--
 tests/unit_dagrun_loggers.cpp                 |  67 +++++++
 tests/unit_serialization.cpp                  |  17 +-
 tests/unit_server.cpp                         |  14 +-
 tests/unit_utilities.cpp                      |  75 ++++++--
 22 files changed, 672 insertions(+), 396 deletions(-)
 create mode 100644 daggy/include/daggy/DAG.impl.hxx
 delete mode 100644 daggy/src/DAG.cpp
 create mode 100644 tests/unit_dagrun_loggers.cpp

diff --git a/README.md b/README.md
index e34ee81..41a5df7 100644
--- a/README.md
+++ b/README.md
@@ -17,12 +17,12 @@ graph LR
     Pull_A-->Transform_A;
     Pull_B-->Transform_B;
     Pull_C-->Transform_C;
-    
+
     Transform_A-->Derive_Data_AB;
     Transform_B-->Derive_Data_AB;
     Derive_Data_AB-->Derive_Data_ABC;
     Transform_C-->Derive_Data_ABC;
-    
+
     Derive_Data_ABC-->Report;
 ```
 
@@ -65,14 +65,15 @@ Basic Definition
 
 A DAG Run definition consists of a dictionary that defines a set of tasks. Each task has the following attributes:
 
-| Attribute  | Required   | Description                                            |
-|------------|------------|--------------------------------------------------------|
-| name       | Yes        | Name of this task. Must be unique.                     |
-| command    | Yes        | The command to execute                                 |
-| maxRetries | No         | If a task fails, how many times to retry (default: 0)  |
-| retry      | No         | How many seconds to wait between retries.              |
-| children   | No         | List of names of tasks that depend on this task        |
-| parents    | No         | List of names of tasks that this task depends on       |
+| Attribute            | Required     | Description                                                    |
+|----------------------|--------------|----------------------------------------------------------------|
+| name                 | Yes          | Name of this task. Must be unique.                             |
+| command              | Yes          | The command to execute                                         |
+| maxRetries           | No           | If a task fails, how many times to retry (default: 0)          |
+| retryIntervalSeconds | No           | How many seconds to wait between retries                       |
+| children             | No           | List of names of tasks that depend on this task                |
+| parents              | No           | List of names of tasks that this task depends on               |
+| isGenerator          | No           | The output of this task generates additional task definitions  |
 
 Defining both `parents` and `children` is not required; one or the other is sufficient. Both are supported to allow you to define your task dependencies in the way that is most natural to how you think.
@@ -81,9 +82,8 @@ Below is an example DAG Run submission:
 
 ```json
 {
-  "tasks": [
-    {
-      "name": "task_one",
+  "tasks": {
+    "task_one": {
       "command": [
         "/usr/bin/touch",
         "/tmp/somefile"
@@ -91,8 +91,7 @@ Below is an example DAG Run submission:
       "maxRetries": 3,
       "retryIntervalSeconds": 30
     },
-    {
-      "name": "task_two",
+    "task_two": {
       "command": [
         "/usr/bin/touch",
         "/tmp/someotherfile"
@@ -103,7 +102,7 @@ Below is an example DAG Run submission:
         "task_one"
       ]
     }
-  ]
+  }
 }
 ```
 
@@ -122,9 +121,8 @@ For instance:
     "DIRECTORY": "/var/tmp",
     "FILE": "somefile"
   },
-  "tasks": [
-    {
-      "name": "task_one",
+  "tasks": {
+    "task_one": {
       "command": [
         "/usr/bin/touch",
         "{{DIRECTORY}}/{{FILE}}"
@@ -132,9 +130,9 @@ For instance:
       "maxRetries": 3,
       "retryIntervalSeconds": 30
     }
-  ]
+  }
 }
-``` 
+```
 
 `task_one`'s command, when run, will touch `/var/tmp/somefile`, since the values of `DIRECTORY` and `FILE` will be populated from the `taskParameters` values.
 
@@ -155,31 +153,28 @@ Example:
       "2021-03-01"
     ]
   },
-  "tasks": [
-    {
-      "name": "populate_inputs",
+  "tasks": {
+    "populate_inputs": {
       "command": [
         "/usr/bin/touch",
         "{{DIRECTORY}}/{{FILE}}"
       ]
     },
-    {
-      "name": "calc_date",
+    "calc_date": {
       "command": [
         "/path/to/calculator",
         "{{DIRECTORY}}/{{FILE}}",
         "{{DATE}}"
      ]
     },
-    {
-      "name": "generate_report",
+    "generate_report": {
       "command": [
         "/path/to/generator"
       ]
     }
-  ]
+  }
 }
-``` 
+```
 
 Conceptually, this DAG looks like this:
 
@@ -205,6 +200,81 @@ graph LR
 - `calc_date_1` will have the command `/path/to/calculator /var/tmp/somefile 2021-01-01`
 - `calc_date_2` will have the command `/path/to/calculator /var/tmp/somefile 2021-02-01`
 - `calc_date_3` will have the command `/path/to/calculator /var/tmp/somefile 2021-03-01`
 
+Tasks Generating Tasks
+----------------------
+
+Some DAG structures cannot be known ahead of time, but only at runtime. For instance, if a job pulls multiple files
+from a source, each of which can be processed independently, it would be nice if the DAG could modify itself on the fly
+to accommodate that request.
+
+Enter the `generator` task. If a task is defined with `"isGenerator": true`, the output of the task is assumed to be
+a JSON dictionary containing new tasks to run. The new tasks will go through parameter expansion as described above,
+and can freely define their dependencies the same way.
+
+**NB:** Generated tasks won't have any child dependencies unless you define them. If there are parameterized
+dependencies, you must use the name of the original task (e.g. use `calc_date`, not `calc_date_1`) to add a dependency.
+
+**NB:** If you add a child dependency to a task that has already completed, weird things will happen. Don't do it.
+
+```json
+{
+  "tasks": {
+    "pull_files": {
+      "command": [
+        "/path/to/puller/script",
+        "{{DATE}}"
+      ],
+      "isGenerator": true,
+      "children": [ "generate_report" ]
+    },
+    "generate_report": {
+      "command": [
+        "/path/to/generator"
+      ]
+    }
+  }
+}
+```
+
+```mermaid
+graph LR
+    pull_files-->generate_report
+```
+
+The output of the puller task might be:
+
+```json
+{
+  "calc_date_a": {
+    "command": [
+      "/path/to/calculator",
+      "/path/to/data/file/a"
+    ],
+    "children": ["generate_report"]
+  },
+  "calc_date_b": {
+    "command": [
+      "/path/to/calculator",
+      "/path/to/data/file/b"
+    ],
+    "children": ["generate_report"]
+  }
+}
+```
+
+Once the first task runs, its output is parsed as additional tasks to run.
+The new DAG will look like this:
+
+```mermaid
+graph LR
+    pull_files-->generate_report
+    pull_files-->calc_date_a
+    pull_files-->calc_date_b
+    calc_date_a-->generate_report
+    calc_date_b-->generate_report
+```
+Note that it was important that `generate_report` depend on `pull_files`; otherwise the two tasks would
+run concurrently, and `generate_report` wouldn't have any files to report on.
+
 Execution Parameters
 -- (future work)
@@ -217,4 +287,4 @@ jobs on slurm with a specific set of restrictions, or allow for local execution
 | Attribute | Description |
 |-----------|-------------|
 | pool      | Names the executor the DAG should run on |
-| poolParameters | Any parameters the executor accepts that might modify how a task is run |
\ No newline at end of file
+| poolParameters | Any parameters the executor accepts that might modify how a task is run |
diff --git a/TODO.md b/TODO.md
index ffca01e..72a8a3b 100644
--- a/TODO.md
+++ b/TODO.md
@@ -9,6 +9,11 @@ Tasks
 - Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma separated list
 - Allow for tasks to define next tasks
+  - Refactor [de]serialization so that a task can be parsed by itself
+  - Add notation of parameterValues
+  - Tasks are now referred to by two names:
+    - baseName is the original name in the spec
+    - name is the name of the individual task
 - Add execution gates
 - Executors
   - [ ] Slurm Executor
diff --git a/daggy/include/daggy/DAG.hpp b/daggy/include/daggy/DAG.hpp
index bec335b..9f7aacb 100644
--- a/daggy/include/daggy/DAG.hpp
+++ b/daggy/include/daggy/DAG.hpp
@@ -8,6 +8,7 @@
 #include 
 #include 
 #include 
+#include <functional>
 
 #include "Defines.hpp"
 
@@ -18,27 +19,33 @@
 namespace daggy {
 
+    template<typename K, typename V>
     struct Vertex {
         RunState state;
         uint32_t depCount;
-        std::unordered_set<size_t> children;
+        K key;
+        V data;
+        std::unordered_set<K> children;
     };
 
-    using Edge = std::pair<size_t, size_t>;
-
+    template<typename K, typename V>
     class DAG {
+        using Edge = std::pair<K, K>;
     public:
         // Vertices
-        size_t addVertex();
+        void addVertex(K id, V data);
 
-        const std::vector<Vertex> &getVertices();
+        const std::vector<Vertex<K, V>> &getVertices();
 
         // Edges
-        void addEdge(const size_t src, const size_t dst);
+        void addEdge(const K &src, const K &dst);
 
-        void dropEdge(const size_t src, const size_t dst);
+        void addEdgeIf(const K &src, std::function<bool(const Vertex<K, V> &v)> predicate);
 
-        bool hasPath(const size_t from, const size_t to) const;
+        bool hasPath(const K &from, const K &to) const;
+
+        bool hasVertex(const K &from);
 
         const std::vector<Edge> &getEdges();
 
@@ -53,17 +60,24 @@
         // Reset any vertex with RUNNING state to QUEUED
         void resetRunning();
 
-        RunState getVertexState(const size_t id) const;
+        RunState getVertexState(const K &id) const;
 
-        void setVertexState(const size_t id, RunState state);
+        void setVertexState(const K &id, RunState state);
+
+        void forEach(std::function<void(const Vertex<K, V> &)> fun) const;
 
         bool allVisited() const;
 
-        std::optional<size_t> visitNext();
+        std::optional<Vertex<K, V>> visitNext();
 
-        void completeVisit(const size_t id);
+        Vertex<K, V> &getVertex(const K &id);
+
+        void completeVisit(const K &id);
 
     private:
-        std::vector<Vertex> vertices_;
+        std::unordered_map<K, Vertex<K, V>> vertices_;
+        std::unordered_set<K> readyVertices_;
     };
 }
+
+#include "DAG.impl.hxx"
\ No newline at end of file
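The reworked `DAG` container is keyed by caller-supplied IDs instead of positional indices. Below is a minimal usage sketch of the new interface (mirroring `tests/unit_dag.cpp`; the include path and the toy string/int payload are illustrative assumptions, not part of the patch):

```cpp
#include <string>

#include "daggy/DAG.hpp" // illustrative include path

int main() {
    daggy::DAG<std::string, int> dag;
    dag.addVertex("fetch", 1);       // key plus an arbitrary payload
    dag.addVertex("report", 2);
    dag.addEdge("fetch", "report");  // throws if the edge would create a cycle
    dag.reset();                     // recompute dependency counts

    while (!dag.allVisited()) {
        auto v = dag.visitNext();    // next QUEUED vertex with no pending deps
        if (!v) break;               // nothing is ready right now
        // ... do work with v->data ...
        dag.completeVisit(v->key);   // mark COMPLETED and unblock children
    }
}
```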
diff --git a/daggy/include/daggy/DAG.impl.hxx b/daggy/include/daggy/DAG.impl.hxx
new file mode 100644
index 0000000..d14857c
--- /dev/null
+++ b/daggy/include/daggy/DAG.impl.hxx
@@ -0,0 +1,119 @@
+namespace daggy {
+    template<typename K, typename V>
+    size_t DAG<K, V>::size() const { return vertices_.size(); }
+
+    template<typename K, typename V>
+    bool DAG<K, V>::empty() const { return vertices_.empty(); }
+
+    template<typename K, typename V>
+    bool DAG<K, V>::hasVertex(const K &id) { return vertices_.count(id) != 0; }
+
+    template<typename K, typename V>
+    Vertex<K, V> &DAG<K, V>::getVertex(const K &id) { return vertices_.at(id); }
+
+    template<typename K, typename V>
+    void DAG<K, V>::addVertex(K id, V data) {
+        if (vertices_.count(id) != 0) {
+            std::stringstream ss;
+            ss << "A vertex with ID " << id << " already exists in the DAG";
+            throw std::runtime_error(ss.str());
+        }
+        vertices_.emplace(id, Vertex<K, V>{.state = RunState::QUEUED, .depCount = 0, .key = id, .data = data});
+    }
+
+    template<typename K, typename V>
+    void DAG<K, V>::addEdge(const K &from, const K &to) {
+        if (vertices_.find(from) == vertices_.end()) throw std::runtime_error("No such vertex");
+        if (vertices_.find(to) == vertices_.end()) throw std::runtime_error("No such vertex");
+        if (hasPath(to, from))
+            throw std::runtime_error("Adding edge would result in a cycle");
+        vertices_.at(from).children.insert(to);
+        vertices_.at(to).depCount++;
+    }
+
+    template<typename K, typename V>
+    void DAG<K, V>::addEdgeIf(const K &src, std::function<bool(const Vertex<K, V> &v)> predicate) {
+        for (const auto &[name, vertex]: vertices_) {
+            if (name == src) continue;
+            if (predicate(vertex)) addEdge(src, name);
+        }
+    }
+
+    template<typename K, typename V>
+    bool DAG<K, V>::hasPath(const K &from, const K &to) const {
+        if (vertices_.find(from) == vertices_.end()) throw std::runtime_error("No such vertex");
+        if (vertices_.find(to) == vertices_.end()) throw std::runtime_error("No such vertex");
+        for (const auto &child: vertices_.at(from).children) {
+            if (child == to) return true;
+            if (hasPath(child, to)) return true;
+        }
+
+        return false;
+    }
+
+    template<typename K, typename V>
+    void DAG<K, V>::reset() {
+        // Reset the state of all vertices
+        for (auto &[_, v]: vertices_) {
+            v.state = RunState::QUEUED;
+            v.depCount = 0;
+        }
+
+        // Calculate the upstream count
+        for (auto &[_, v]: vertices_) {
+            for (auto c: v.children) {
+                vertices_.at(c).depCount++;
+            }
+        }
+    }
+
+    template<typename K, typename V>
+    void DAG<K, V>::resetRunning() {
+        for (auto &[k, v]: vertices_) {
+            if (v.state != +RunState::RUNNING) continue;
+            v.state = RunState::QUEUED;
+        }
+    }
+
+    template<typename K, typename V>
+    void DAG<K, V>::setVertexState(const K &id, RunState state) {
+        vertices_.at(id).state = state;
+    }
+
+    template<typename K, typename V>
+    bool DAG<K, V>::allVisited() const {
+        for (const auto &[_, v]: vertices_) {
+            if (v.state != +RunState::COMPLETED) return false;
+        }
+        return true;
+    }
+
+    template<typename K, typename V>
+    std::optional<Vertex<K, V>> DAG<K, V>::visitNext() {
+        for (auto &[k, v]: vertices_) {
+            if (v.state != +RunState::QUEUED) continue;
+            if (v.depCount != 0) continue;
+            v.state = RunState::RUNNING;
+            return v;
+        }
+        return {};
+    }
+
+    template<typename K, typename V>
+    void DAG<K, V>::completeVisit(const K &id) {
+        auto &v = vertices_.at(id);
+        v.state = RunState::COMPLETED;
+        for (auto c: v.children) {
+            --vertices_.at(c).depCount;
+        }
+    }
+
+    template<typename K, typename V>
+    void DAG<K, V>::forEach(std::function<void(const Vertex<K, V> &)> fun) const {
+        for (const auto &[_, v]: vertices_) {
+            fun(v);
+        }
+    }
+
+}
diff --git a/daggy/include/daggy/Defines.hpp b/daggy/include/daggy/Defines.hpp
index 780783a..dd598ee 100644
--- a/daggy/include/daggy/Defines.hpp
+++ b/daggy/include/daggy/Defines.hpp
@@ -34,20 +34,28 @@ namespace daggy {
 
     struct Task {
         std::string name;
+        // definedName is the name from the original DAGDefinition.
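+        // (For example, a definedName of "calc_date" can expand into tasks named
+        // "calc_date_0", "calc_date_1", ...; a task with a single command keeps its defined name.)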
+        std::string definedName;
         std::vector<std::string> command;
         uint32_t maxRetries;
         uint32_t retryIntervalSeconds; // Time to wait between retries
         std::unordered_set<std::string> children;
+        std::unordered_set<std::string> parents;
+        bool isGenerator; // True if the output of this task is a JSON set of tasks to complete
 
         bool operator==(const Task &other) const {
             return (name == other.name)
                    and (maxRetries == other.maxRetries)
                    and (retryIntervalSeconds == other.retryIntervalSeconds)
                    and (command == other.command)
-                   and (children == other.children);
+                   and (children == other.children)
+                   and (parents == other.parents)
+                   and (isGenerator == other.isGenerator);
         }
     };
 
+    using TaskList = std::unordered_map<std::string, Task>;
+
     struct AttemptRecord {
         TimePoint startTime;
         TimePoint stopTime;
@@ -58,4 +66,4 @@ namespace daggy {
     };
 }
 
-BETTER_ENUMS_DECLARE_STD_HASH(daggy::RunState)
\ No newline at end of file
+BETTER_ENUMS_DECLARE_STD_HASH(daggy::RunState)
diff --git a/daggy/include/daggy/Serialization.hpp b/daggy/include/daggy/Serialization.hpp
index 43a1068..66c343b 100644
--- a/daggy/include/daggy/Serialization.hpp
+++ b/daggy/include/daggy/Serialization.hpp
@@ -18,13 +18,16 @@ namespace daggy {
     ParameterValues parametersFromJSON(const rj::Value &spec);
 
     // Tasks
-    std::vector<Task> tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters = {});
+    TaskList
+    taskFromJSON(const std::string &name, const rj::Value &spec, const ParameterValues &parameters = {});
 
-    std::vector<Task> tasksFromJSON(const rj::Value &spec, const ParameterValues &parameters = {});
+    TaskList tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters = {});
+
+    TaskList tasksFromJSON(const rj::Value &spec, const ParameterValues &parameters = {});
 
     std::string taskToJSON(const Task &task);
 
-    std::string tasksToJSON(const std::vector<Task> &tasks);
+    std::string tasksToJSON(const TaskList &tasks);
 
     // Attempt Records
     std::string attemptRecordToJSON(const AttemptRecord &attemptRecord);
diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp
index 0b7e203..79236fa 100644
--- a/daggy/include/daggy/Utilities.hpp
+++ b/daggy/include/daggy/Utilities.hpp
@@ -13,12 +13,20 @@
 #include "DAG.hpp"
 
 namespace daggy {
+    using TaskDAG = DAG<std::string, Task>;
+
     std::string globalSub(std::string string, const std::string &pattern, const std::string &replacement);
 
     std::vector<std::vector<std::string>>
     expandCommands(const std::vector<std::string> &command, const ParameterValues &parameters);
 
-    DAG buildDAGFromTasks(const std::vector<Task> &tasks,
-                          const std::vector<loggers::dag_run::TaskUpdateRecord> &updates = {});
+    std::unordered_set<std::string>
+    findDerivedVertices(TaskDAG &dag, const std::string &definedName);
+
+    TaskDAG
+    buildDAGFromTasks(TaskList &tasks,
+                      const std::vector<loggers::dag_run::TaskUpdateRecord> &updates = {});
+
+    void updateDAGFromTasks(TaskDAG &dag, TaskList &tasks);
 
     // Blocking call
     std::vector<AttemptRecord>
@@ -28,11 +36,11 @@ namespace daggy {
                    executors::task::TaskExecutor &executor,
                    loggers::dag_run::DAGRunLogger &logger);
 
-    DAG runDAG(DAGRunID runID,
-               std::vector<Task> tasks,
-               executors::task::TaskExecutor &executor,
-               loggers::dag_run::DAGRunLogger &logger,
-               DAG dag);
+    TaskDAG runDAG(DAGRunID runID,
+                   executors::task::TaskExecutor &executor,
+                   loggers::dag_run::DAGRunLogger &logger,
+                   TaskDAG dag,
+                   const ParameterValues taskParameters = {});
 
     std::ostream &operator<<(std::ostream &os, const TimePoint &tp);
 }
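Taken together with `Serialization.hpp`, the utilities above give the new end-to-end flow. A minimal sketch (modelled on `tests/unit_utilities.cpp`; the helper name `runSpec`, the run name, and the include paths are illustrative assumptions — the executor and logger are supplied by the caller):

```cpp
#include <string>

#include "daggy/Serialization.hpp" // illustrative include paths
#include "daggy/Utilities.hpp"

daggy::TaskDAG runSpec(const std::string &jsonSpec,
                       const daggy::ParameterValues &params,
                       daggy::executors::task::TaskExecutor &executor,
                       daggy::loggers::dag_run::DAGRunLogger &logger) {
    auto tasks = daggy::tasksFromJSON(jsonSpec, params); // TaskList: name -> Task
    auto dag = daggy::buildDAGFromTasks(tasks);          // TaskDAG = DAG<std::string, Task>
    auto runID = logger.startDAGRun("example_run", tasks);
    // The parameters are passed through so generator output is expanded too.
    return daggy::runDAG(runID, executor, logger, dag, params);
}
```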
diff --git a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp
index e5594b8..c81957f 100644
--- a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp
+++ b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp
@@ -17,7 +17,11 @@ namespace daggy {
         class DAGRunLogger {
         public:
             // Execution
-            virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) = 0;
+            virtual DAGRunID startDAGRun(std::string name, const TaskList &tasks) = 0;
+
+            virtual void addTask(DAGRunID dagRunID, const std::string taskName, const Task &task) = 0;
+
+            virtual void updateTask(DAGRunID dagRunID, const std::string taskName, const Task &task) = 0;
 
             virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) = 0;
 
diff --git a/daggy/include/daggy/loggers/dag_run/Defines.hpp b/daggy/include/daggy/loggers/dag_run/Defines.hpp
index fde2b8e..ef2f674 100644
--- a/daggy/include/daggy/loggers/dag_run/Defines.hpp
+++ b/daggy/include/daggy/loggers/dag_run/Defines.hpp
@@ -4,12 +4,14 @@
 #include 
 #include 
 #include 
+#include <unordered_map>
+#include "../../Defines.hpp"
 
 namespace daggy::loggers::dag_run {
     struct TaskUpdateRecord {
         TimePoint time;
-        TaskID taskID;
+        std::string taskName;
         RunState newState;
     };
 
@@ -21,9 +23,9 @@ namespace daggy::loggers::dag_run {
     // Pretty heavy weight, but
     struct DAGRunRecord {
         std::string name;
-        std::vector<Task> tasks;
-        std::vector<RunState> taskRunStates;
-        std::vector<std::vector<AttemptRecord>> taskAttempts;
+        TaskList tasks;
+        std::unordered_map<std::string, RunState> taskRunStates;
+        std::unordered_map<std::string, std::vector<AttemptRecord>> taskAttempts;
         std::vector<TaskUpdateRecord> taskStateChanges;
         std::vector<DAGUpdateRecord> dagStateChanges;
     };
diff --git a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp
index 84d1756..5c0f0b4 100644
--- a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp
+++ b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp
@@ -37,7 +37,7 @@ namespace daggy::loggers::dag_run {
             FileSystemLogger(fs::path root);
 
             // Execution
-            DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override;
+            DAGRunID startDAGRun(std::string name, const TaskList &tasks) override;
 
             void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
 
diff --git a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp
index 3bbc5dd..1f43839 100644
--- a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp
+++ b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp
@@ -18,7 +18,11 @@ namespace daggy {
             OStreamLogger(std::ostream &os);
 
             // Execution
-            DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override;
+            DAGRunID startDAGRun(std::string name, const TaskList &tasks) override;
+
+            void addTask(DAGRunID dagRunID, const std::string taskName, const Task &task) override;
+
+            void updateTask(DAGRunID dagRunID, const std::string taskName, const Task &task) override;
 
             void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
 
@@ -36,6 +40,10 @@ namespace daggy {
             std::mutex guard_;
             std::ostream &os_;
             std::vector<DAGRunRecord> dagRuns_;
+
+            void _updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state);
+
+            void _updateDAGRunState(DAGRunID dagRunID, RunState state);
         };
     }
 }
diff --git a/daggy/src/DAG.cpp b/daggy/src/DAG.cpp
deleted file mode 100644
index 08ec1d6..0000000
--- a/daggy/src/DAG.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-#include 
-#include 
-
-namespace daggy {
-    size_t DAG::size() const { return vertices_.size(); }
-
-    bool DAG::empty() const { return vertices_.empty(); }
-
-    size_t DAG::addVertex() {
-        vertices_.push_back(Vertex{.state = RunState::QUEUED, .depCount = 0});
-        return vertices_.size() - 1;
-    }
-
-    void DAG::dropEdge(const size_t from, const size_t to) {
-        if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from));
throw std::runtime_error("No such vertex " + std::to_string(to)); - vertices_[from].children.extract(to); - } - - void DAG::addEdge(const size_t from, const size_t to) { - if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from)); - if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to)); - if (hasPath(to, from)) - throw std::runtime_error("Adding edge would result in a cycle"); - vertices_[from].children.insert(to); - } - - bool DAG::hasPath(const size_t from, const size_t to) const { - if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from)); - if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to)); - for (const auto &child: vertices_[from].children) { - if (child == to) return true; - if (hasPath(child, to)) return true; - } - - return false; - } - - void DAG::reset() { - // Reset the state of all vertices - for (auto &v: vertices_) { - v.state = RunState::QUEUED; - v.depCount = 0; - } - - // Calculate the upstream count - for (auto &v: vertices_) { - for (auto c: v.children) { - ++vertices_[c].depCount; - } - } - } - - void DAG::resetRunning() { - for (auto &v: vertices_) { - if (v.state != +RunState::RUNNING) continue; - v.state = RunState::QUEUED; - } - } - - void DAG::setVertexState(const size_t id, RunState state) { - vertices_[id].state = state; - } - - bool DAG::allVisited() const { - for (const auto &v: vertices_) { - if (v.state != +RunState::COMPLETED) return false; - } - return true; - } - - std::optional DAG::visitNext() { - for (size_t i = 0; i < vertices_.size(); ++i) { - auto &v = vertices_[i]; - - if (v.state != +RunState::QUEUED) continue; - if (v.depCount != 0) continue; - v.state = RunState::RUNNING; - return i; - } - return {}; - } - - void DAG::completeVisit(const size_t id) { - auto &v = vertices_[id]; - v.state = RunState::COMPLETED; - for (auto c: v.children) { - --vertices_[c].depCount; - } - } -} diff --git a/daggy/src/Serialization.cpp b/daggy/src/Serialization.cpp index bed95d7..a7e110c 100644 --- a/daggy/src/Serialization.cpp +++ b/daggy/src/Serialization.cpp @@ -44,7 +44,69 @@ namespace daggy { return parameters; } - std::vector tasksFromJSON(const std::string &jsonSpec, const ParameterValues ¶meters) { + TaskList + taskFromJSON(const std::string &name, const rj::Value &spec, const ParameterValues ¶meters) { + TaskList tasks; + if (!spec.IsObject()) { throw std::runtime_error("Tasks is not an object"); } + + if (!spec.HasMember("command")) { + throw std::runtime_error("Task " + name + " is missing required 'command' field"); + } + + // Grab the standard fields with defaults; + bool isGenerator = false; + if (spec.HasMember("isGenerator")) { + isGenerator = spec["isGenerator"].GetBool(); + } + + uint8_t maxRetries = 0; + if (spec.HasMember("maxRetries")) { maxRetries = spec["maxRetries"].GetInt(); } + uint8_t retryIntervalSeconds = 0; + if (spec.HasMember( + "retryIntervalSeconds")) { retryIntervalSeconds = spec["retryIntervalSeconds"].GetInt(); } + + // Children / parents + std::unordered_set children; + if (spec.HasMember("children")) { + const auto &specChildren = spec["children"].GetArray(); + for (size_t c = 0; c < specChildren.Size(); ++c) { + children.insert(specChildren[c].GetString()); + } + } + + std::unordered_set parents; + if (spec.HasMember("parents")) { + const auto &specParents = spec["parents"].GetArray(); + for (size_t c = 0; c < specParents.Size(); ++c) { + 
+                parents.insert(specParents[c].GetString());
+            }
+        }
+
+        // Build out the commands
+        std::vector<std::string> command;
+        for (size_t cmd = 0; cmd < spec["command"].Size(); ++cmd) {
+            command.emplace_back(spec["command"][cmd].GetString());
+        }
+        auto commands = expandCommands(command, parameters);
+
+        // Create the tasks
+        for (size_t tid = 0; tid < commands.size(); ++tid) {
+            std::string taskName = (commands.size() == 1 ? name : name + "_" + std::to_string(tid));
+            tasks.emplace(taskName, Task{
+                    .name = taskName,
+                    .definedName = name,
+                    .command = commands[tid],
+                    .maxRetries = maxRetries,
+                    .retryIntervalSeconds = retryIntervalSeconds,
+                    .children = children,
+                    .parents = parents,
+                    .isGenerator = isGenerator
+            });
+        }
+        return tasks;
+    }
+
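+    // NB: taskFromJSON returns a TaskList rather than a single Task because parameter
+    // expansion can fan one definition out into several tasks; the expanded tasks share
+    // the same definedName but get unique, suffixed names.
+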
name : name + "_" + std::to_string(tid)); - taskNames.push_back(taskName); - tasks.emplace_back(Task{ - .name = taskName, - .command = commands[tid], - .maxRetries = maxRetries, - .retryIntervalSeconds = retryIntervalSeconds, - .children = children - }); - } + for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) { + if (!it->name.IsString()) throw std::runtime_error("Task names must be a string."); + if (!it->value.IsObject()) throw std::runtime_error("Task definitions must be an object."); + auto subTasks = taskFromJSON(it->name.GetString(), it->value, parameters); + tasks.merge(subTasks); } - - // Update any missing child -> parent relationship - for (auto &task : tasks) { - auto pit = parentMap.find(task.name); - if (pit == parentMap.end()) { continue; } - - for (const auto &parent : pit->second) { - tasks[taskIndex[parent]].children.insert(task.name); - } - } - - // At the end, replace the names of the children with all the expanded versions - for (auto &task : tasks) { - std::unordered_set children; - for (const auto &child : task.children) { - auto &newChildren = childrenMap[child]; - std::copy(newChildren.begin(), newChildren.end(), std::inserter(children, children.end())); - } - task.children.swap(children); - } - return tasks; } - // I really want to do this with rapidjson, but damn they make it ugly and difficult. - // So we'll shortcut and generate the JSON directly. +// I really want to do this with rapidjson, but damn they make it ugly and difficult. +// So we'll shortcut and generate the JSON directly. std::string taskToJSON(const Task &task) { std::stringstream ss; bool first = false; @@ -160,7 +143,7 @@ namespace daggy { // Commands ss << R"("command": [)"; first = true; - for (const auto &part : task.command) { + for (const auto &part: task.command) { if (!first) ss << ','; ss << std::quoted(part); first = false; @@ -169,29 +152,31 @@ namespace daggy { ss << R"("children": [)"; first = true; - for (const auto &child : task.children) { + for (const auto &child: task.children) { if (!first) ss << ','; ss << std::quoted(child); first = false; } - ss << "]"; + ss << "],"; + + ss << R"("isGenerator": )" << (task.isGenerator ? 
"true" : "false"); ss << '}'; return ss.str(); } - std::string tasksToJSON(const std::vector &tasks) { + std::string tasksToJSON(const TaskList &tasks) { std::stringstream ss; - ss << "["; + ss << "{"; bool first = true; - for (const auto &task : tasks) { + for (const auto &[name, task]: tasks) { if (!first) ss << ','; - ss << taskToJSON(task); + ss << std::quoted(name) << ": " << taskToJSON(task); first = false; } - ss << "]"; + ss << "}"; return ss.str(); } @@ -228,4 +213,5 @@ namespace daggy { ss >> std::get_time(&dt, "%Y-%m-%d %H:%M:%S %Z"); return Clock::from_time_t(mktime(&dt)); } + } diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp index 356321f..d4b504d 100644 --- a/daggy/src/Server.cpp +++ b/daggy/src/Server.cpp @@ -118,21 +118,20 @@ namespace daggy { } // Get the tasks - std::vector tasks; + TaskList tasks; try { - auto parsedTasks = tasksFromJSON(doc["tasks"].GetArray(), parameters); + auto parsedTasks = tasksFromJSON(doc["tasks"], parameters); tasks.swap(parsedTasks); } catch (std::exception &e) { REQ_ERROR(Bad_Request, e.what()); } - // Get a run ID auto runID = logger_.startDAGRun(runName, tasks); auto dag = buildDAGFromTasks(tasks); runnerPool_.addTask( - [this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); }); + [this, parameters, runID, dag]() { runDAG(runID, this->executor_, this->logger_, dag, parameters); }); response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}"); } @@ -190,7 +189,7 @@ namespace daggy { // task run states ss << R"("taskStates": [ )"; first = true; - for (const auto &state: run.taskRunStates) { + for (const auto &[_, state]: run.taskRunStates) { if (first) { first = false; } else { ss << ','; } ss << std::quoted(state._to_string()); } @@ -198,10 +197,10 @@ namespace daggy { // Attempt records first = true; - ss << R"("taskAttempts": [ )"; - for (const auto &attempts: run.taskAttempts) { + ss << R"("taskAttempts": { )"; + for (const auto &[taskName, attempts]: run.taskAttempts) { if (first) { first = false; } else { ss << ','; } - ss << '['; + ss << std::quoted(taskName) << ": ["; bool firstAttempt = true; for (const auto &attempt: attempts) { if (firstAttempt) { firstAttempt = false; } else { ss << ','; } @@ -216,7 +215,7 @@ namespace daggy { } ss << ']'; } - ss << "],"; + ss << "},"; // DAG state changes first = true; diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index 7d4a725..e0e2abd 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -1,6 +1,7 @@ #include #include +#include namespace daggy { std::string globalSub(std::string string, const std::string &pattern, const std::string &replacement) { @@ -16,22 +17,22 @@ namespace daggy { expandCommands(const std::vector &command, const ParameterValues ¶meters) { std::vector> commands{{}}; - for (const auto &part : command) { + for (const auto &part: command) { std::vector expandedPart{part}; // Find all values of parameters, and expand them - for (const auto &[param, paramValue] : parameters) { + for (const auto &[param, paramValue]: parameters) { auto pos = part.find(param); if (pos == std::string::npos) continue; std::vector newExpandedPart; if (std::holds_alternative(paramValue)) { - for (auto &cmd : expandedPart) { + for (auto &cmd: expandedPart) { newExpandedPart.push_back(globalSub(cmd, param, std::get(paramValue))); } } else { - for (const auto &val : std::get>(paramValue)) { - for (auto cmd : expandedPart) { + for (const auto &val: std::get>(paramValue)) { + for (auto cmd: expandedPart) { 
-        for (const auto &part : command) {
+        for (const auto &part: command) {
             std::vector<std::string> expandedPart{part};
 
             // Find all values of parameters, and expand them
-            for (const auto &[param, paramValue] : parameters) {
+            for (const auto &[param, paramValue]: parameters) {
                 auto pos = part.find(param);
                 if (pos == std::string::npos) continue;
 
                 std::vector<std::string> newExpandedPart;
                 if (std::holds_alternative<std::string>(paramValue)) {
-                    for (auto &cmd : expandedPart) {
+                    for (auto &cmd: expandedPart) {
                         newExpandedPart.push_back(globalSub(cmd, param, std::get<std::string>(paramValue)));
                     }
                 } else {
-                    for (const auto &val : std::get<std::vector<std::string>>(paramValue)) {
-                        for (auto cmd : expandedPart) {
+                    for (const auto &val: std::get<std::vector<std::string>>(paramValue)) {
+                        for (auto cmd: expandedPart) {
                             newExpandedPart.push_back(globalSub(cmd, param, val));
                         }
                     }
@@ -41,8 +42,8 @@
             }
 
             std::vector<std::vector<std::string>> newCommands;
-            for (const auto &newPart : expandedPart) {
-                for (auto cmd : commands) {
+            for (const auto &newPart: expandedPart) {
+                for (auto cmd: commands) {
                     cmd.push_back(newPart);
                     newCommands.emplace_back(cmd);
                 }
@@ -52,33 +53,58 @@
         return commands;
     }
 
-    DAG buildDAGFromTasks(const std::vector<Task> &tasks,
-                          const std::vector<loggers::dag_run::TaskUpdateRecord> &updates) {
-        DAG dag;
-        std::unordered_map<std::string, size_t> taskIDs;
+    std::unordered_set<std::string>
+    findDerivedVertices(TaskDAG &dag, const std::string &definedName) {
+        std::unordered_set<std::string> vertices;
+        dag.forEach([&](const auto &v) {
+            if (v.data.definedName == definedName) {
+                vertices.insert(v.data.name);
+            }
+        });
+        return vertices;
+    }
 
+    void updateDAGFromTasks(TaskDAG &dag, TaskList &tasks) {
         // Add all the vertices
-        for (const auto &task : tasks) {
-            taskIDs[task.name] = dag.addVertex();
+        std::unordered_map<std::string, std::unordered_set<std::string>> definedSets;
+        for (const auto &[name, task]: tasks) {
+            dag.addVertex(name, task);
+            definedSets[task.definedName].insert(name);
         }
 
         // Add edges
-        for (size_t i = 0; i < tasks.size(); ++i) {
-            for (const auto &c : tasks[i].children) {
-                dag.addEdge(i, taskIDs[c]);
+        for (const auto &[name, task]: tasks) {
+            for (const auto &defChild: task.children) {
+                for (const auto &child: definedSets[defChild]) {
+                    dag.addEdge(name, child);
+                }
+            }
+
+            for (const auto &defParent: task.parents) {
+                for (const auto &parent: definedSets[defParent]) {
+                    dag.addEdge(parent, name);
+                    tasks.at(parent).children.insert(name);
+                }
             }
         }
+    }
+
+    TaskDAG buildDAGFromTasks(TaskList &tasks,
+                              const std::vector<loggers::dag_run::TaskUpdateRecord> &updates) {
+        TaskDAG dag;
+
+        updateDAGFromTasks(dag, tasks);
 
         dag.reset();
 
         // Replay any updates
-        for (const auto &update : updates) {
+        for (const auto &update: updates) {
             switch (update.newState) {
                 case RunState::RUNNING:
                 case RunState::RETRY:
                 case RunState::ERRORED:
                 case RunState::KILLED:
-                    dag.setVertexState(update.taskID, RunState::RUNNING);
-                    dag.setVertexState(update.taskID, RunState::COMPLETED);
+                    dag.setVertexState(update.taskName, RunState::RUNNING);
+                    dag.setVertexState(update.taskName, RunState::COMPLETED);
                     break;
             }
         }
@@ -87,7 +113,6 @@ namespace daggy {
     }
 
     std::vector<AttemptRecord> runTask(DAGRunID runID,
-                                       TaskID taskID,
                                        const Task &task,
                                        executors::task::TaskExecutor &executor,
                                        loggers::dag_run::DAGRunLogger &logger) {
@@ -103,45 +128,55 @@ namespace daggy {
         return attempts;
     }
 
-    DAG runDAG(DAGRunID runID,
-               std::vector<Task> tasks,
-               executors::task::TaskExecutor &executor,
-               loggers::dag_run::DAGRunLogger &logger,
-               DAG dag) {
+    TaskDAG runDAG(DAGRunID runID,
+                   executors::task::TaskExecutor &executor,
+                   loggers::dag_run::DAGRunLogger &logger,
+                   TaskDAG dag,
+                   const ParameterValues taskParameters
+    ) {
         logger.updateDAGRunState(runID, RunState::RUNNING);
 
-        struct TaskState {
-            size_t tid;
-            std::future<std::vector<AttemptRecord>> fut;
-            bool complete;
-        };
+        std::unordered_map<std::string, std::future<std::vector<AttemptRecord>>> runningTasks;
 
-        std::vector<TaskState> taskStates;
-
-        // TODO Handle case where everything is wedged due to errors
         size_t running = 0;
         size_t errored = 0;
         while (!dag.allVisited()) {
             // Check for any completed tasks
-            for (auto &taskState : taskStates) {
-                if (taskState.complete) continue;
-
-                if (taskState.fut.valid()) {
-                    auto attemptRecords = taskState.fut.get();
-                    const auto &taskName = tasks[taskState.tid].name;
+            for (auto &[taskName, fut]: runningTasks) {
+                if (fut.valid()) {
+                    auto attemptRecords = fut.get();
                     if (attemptRecords.empty()) {
                         logger.updateTaskState(runID, taskName, RunState::ERRORED);
                         ++errored;
                     }
-                    if (attemptRecords.back().rc == 0) {
+                    else if (attemptRecords.back().rc == 0) {
                         logger.updateTaskState(runID, taskName, RunState::COMPLETED);
-                        dag.completeVisit(taskState.tid);
+                        auto &vert = dag.getVertex(taskName);
+                        auto &task = vert.data;
+                        if (task.isGenerator) {
+                            // Parse the output as new tasks and splice them into the DAG
+                            // TODO: Let the logger know about the new tasks
+                            try {
+                                auto newTasks = tasksFromJSON(attemptRecords.back().outputLog, taskParameters);
+                                updateDAGFromTasks(dag, newTasks);
+
+                                for (const auto &[ntName, ntTask]: newTasks) {
+                                    logger.addTask(runID, ntName, ntTask);
+                                    dag.addEdge(taskName, ntName);
+                                    task.children.insert(ntName);
+                                }
+                                logger.updateTask(runID, taskName, task);
+                            } catch (std::exception &e) {
+                                logger.updateTaskState(runID, task.name, RunState::ERRORED);
+                                ++errored;
+                            }
+                        }
+                        dag.completeVisit(taskName);
                         --running;
                     } else {
                         logger.updateTaskState(runID, taskName, RunState::ERRORED);
                         ++errored;
                     }
-                    taskState.complete = true;
                 }
             }
 
@@ -150,15 +185,10 @@ namespace daggy {
             auto t = dag.visitNext();
             while (t.has_value()) {
                 // Schedule the task to run
-                auto tid = t.value();
-                TaskState tsk{
-                        .tid = tid,
-                        .fut = tq->addTask([tid, runID, &tasks, &executor, &logger]() {
-                            return runTask(runID, tid, tasks[tid], executor, logger);
-                        }),
-                        .complete = false
-                };
-                taskStates.push_back(std::move(tsk));
+                auto vertex = t.value();
+                runningTasks.emplace(vertex.data.name, tq->addTask([runID, vertex, &executor, &logger]() {
+                    return runTask(runID, vertex.data, executor, logger);
+                }));
                 ++running;
 
                 auto nextTask = dag.visitNext();
diff --git a/daggy/src/loggers/dag_run/FileSystemLogger.cpp b/daggy/src/loggers/dag_run/FileSystemLogger.cpp
index 2a93667..2e47724 100644
--- a/daggy/src/loggers/dag_run/FileSystemLogger.cpp
+++ b/daggy/src/loggers/dag_run/FileSystemLogger.cpp
@@ -39,7 +39,7 @@ namespace daggy {
     }
 
     // Execution
-    DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector<Task> &tasks) {
+    DAGRunID FileSystemLogger::startDAGRun(std::string name, const TaskList &tasks) {
         DAGRunID runID = nextRunID_++;
 
         // TODO make this threadsafe
@@ -56,8 +56,8 @@ namespace daggy {
         ofh.close();
 
         // Task directories
-        for (const auto &task: tasks) {
-            auto taskDir = runRoot / task.name;
+        for (const auto &[name, task]: tasks) {
+            auto taskDir = runRoot / name;
             fs::create_directories(taskDir);
             std::ofstream ofh(taskDir / "states.csv");
         }
@@ -136,7 +136,7 @@ namespace daggy {
         doc.Parse(metaData.c_str());
 
         record.name = doc["name"].GetString();
-        record.tasks = tasksFromJSON(doc["tasks"].GetObject());
+        record.tasks = tasksFromJSON(doc["tasks"]);
 
         // DAG State Changes
         std::string line;
@@ -158,10 +158,10 @@ namespace daggy {
         ifh.close();
 
         // Task states
-        for (const auto &task: record.tasks) {
+        for (const auto &[_, task]: record.tasks) {
             auto taskStateFile = runRoot / task.name / "states.csv";
             if (!fs::exists(taskStateFile)) {
-                record.taskRunStates.push_back(RunState::QUEUED);
+                record.taskRunStates.emplace(task.name, RunState::QUEUED);
                 continue;
             }
 
@@ -170,7 +170,7 @@ namespace daggy {
             std::stringstream ss{line};
             while (std::getline(ss, token, ',')) { continue; }
             RunState taskState = RunState::_from_string(token.c_str());
-            record.taskRunStates.emplace_back(taskState);
+            record.taskRunStates.emplace(task.name, taskState);
             ifh.close();
         }
         return record;
diff --git a/daggy/src/loggers/dag_run/OStreamLogger.cpp b/daggy/src/loggers/dag_run/OStreamLogger.cpp
index 42dcfff..57f4ca8 100644
--- a/daggy/src/loggers/dag_run/OStreamLogger.cpp
+++ b/daggy/src/loggers/dag_run/OStreamLogger.cpp
@@ -11,19 +11,21 @@ namespace daggy {
         OStreamLogger::OStreamLogger(std::ostream &os) : os_(os) {}
 
         // Execution
-        DAGRunID OStreamLogger::startDAGRun(std::string name, const std::vector<Task> &tasks) {
+        DAGRunID OStreamLogger::startDAGRun(std::string name, const TaskList &tasks) {
             std::lock_guard<std::mutex> lock(guard_);
             size_t runID = dagRuns_.size();
             dagRuns_.push_back({
                                        .name = name,
-                                       .tasks = tasks,
-                                       .taskRunStates{tasks.size(), RunState::QUEUED},
-                                       .taskAttempts = std::vector<std::vector<AttemptRecord>>(tasks.size())
+                                       .tasks = tasks
                                });
+            for (const auto &[name, _]: tasks) {
+                _updateTaskState(runID, name, RunState::QUEUED);
+            }
+            _updateDAGRunState(runID, RunState::QUEUED);
 
             os_ << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size()
                 << " tasks" << std::endl;
-            for (const auto &task: tasks) {
+            for (const auto &[_, task]: tasks) {
                 os_ << "TASK (" << task.name << "): ";
                 std::copy(task.command.begin(), task.command.end(),
                           std::ostream_iterator<std::string>(os_, " "));
@@ -32,8 +34,25 @@ namespace daggy {
             return runID;
         }
 
+        void OStreamLogger::addTask(DAGRunID dagRunID, const std::string taskName, const Task &task) {
+            std::lock_guard<std::mutex> lock(guard_);
+            auto &dagRun = dagRuns_[dagRunID];
+            dagRun.tasks[taskName] = task;
+            _updateTaskState(dagRunID, taskName, RunState::QUEUED);
+        }
+
+        void OStreamLogger::updateTask(DAGRunID dagRunID, const std::string taskName, const Task &task) {
+            std::lock_guard<std::mutex> lock(guard_);
+            auto &dagRun = dagRuns_[dagRunID];
+            dagRun.tasks[taskName] = task;
+        }
+
         void OStreamLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {
             std::lock_guard<std::mutex> lock(guard_);
+            _updateDAGRunState(dagRunID, state);
+        }
+
+        void OStreamLogger::_updateDAGRunState(DAGRunID dagRunID, RunState state) {
             os_ << "DAG State Change(" << dagRunID << "): " << state._to_string() << std::endl;
             dagRuns_[dagRunID].dagStateChanges.push_back({Clock::now(), state});
         }
@@ -45,26 +64,25 @@ namespace daggy {
             os_ << "Task Attempt (" << dagRunID << '/' << taskName << "): Ran with RC " << attempt.rc << ": "
                 << msg << std::endl;
 
-            const auto &tasks = dagRuns_[dagRunID].tasks;
-            auto it = std::find_if(tasks.begin(), tasks.end(),
-                                   [&taskName](const Task &a) { return a.name == taskName; });
-            if (it == tasks.end()) throw std::runtime_error("No such task: " + taskName);
-            size_t taskID = it - tasks.begin();
-            dagRuns_[dagRunID].taskAttempts[taskID].push_back(attempt);
+            dagRuns_[dagRunID].taskAttempts[taskName].push_back(attempt);
         }
 
         void OStreamLogger::updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) {
             std::lock_guard<std::mutex> lock(guard_);
-            auto &dagRun = dagRuns_[dagRunID];
-            const auto &tasks = dagRun.tasks;
-            auto it = std::find_if(tasks.begin(), tasks.end(),
-                                   [&taskName](const Task &a) { return a.name == taskName; });
-            if (it == tasks.end()) throw std::runtime_error("No such task: " + taskName);
-            size_t taskID = it - tasks.begin();
-            dagRun.taskStateChanges.push_back({Clock::now(), taskID, state});
-            dagRun.taskRunStates[taskID] = state;
+            _updateTaskState(dagRunID, taskName, state);
+        }
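+
+        // The _-prefixed helpers below assume guard_ is already held by the public caller.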
<< "): " << state._to_string() << std::endl; } @@ -84,12 +102,7 @@ namespace daggy { run.dagStateChanges.back().time) }; - std::vector states(run.tasks.size(), RunState::QUEUED); - for (const auto &taskUpdate: run.taskStateChanges) { - states[taskUpdate.taskID] = taskUpdate.newState; - } - - for (const auto &taskState: states) { + for (const auto &[_, taskState]: run.taskRunStates) { summary.taskStateCounts[taskState]++; } diff --git a/tests/unit_dag.cpp b/tests/unit_dag.cpp index 807cef0..c84e367 100644 --- a/tests/unit_dag.cpp +++ b/tests/unit_dag.cpp @@ -5,14 +5,16 @@ #include TEST_CASE("DAG Construction Tests", "[dag]") { - daggy::DAG dag; + daggy::DAG dag; REQUIRE(dag.size() == 0); REQUIRE(dag.empty()); - REQUIRE_NOTHROW(dag.addVertex()); + REQUIRE_NOTHROW(dag.addVertex(0, 0)); for (int i = 1; i < 10; ++i) { - dag.addVertex(); + dag.addVertex(i, i); + REQUIRE(dag.hasVertex(i)); + REQUIRE(dag.getVertex(i).data == i); dag.addEdge(i - 1, i); } @@ -26,9 +28,6 @@ TEST_CASE("DAG Construction Tests", "[dag]") { SECTION("addEdge Bounds Checking") { REQUIRE_THROWS(dag.addEdge(20, 0)); REQUIRE_THROWS(dag.addEdge(0, 20)); - }SECTION("dropEdge Bounds Checking") { - REQUIRE_THROWS(dag.dropEdge(20, 0)); - REQUIRE_THROWS(dag.dropEdge(0, 20)); }SECTION("hasPath Bounds Checking") { REQUIRE_THROWS(dag.hasPath(20, 0)); REQUIRE_THROWS(dag.hasPath(0, 20)); @@ -36,11 +35,11 @@ TEST_CASE("DAG Construction Tests", "[dag]") { } TEST_CASE("DAG Traversal Tests", "[dag]") { - daggy::DAG dag; + daggy::DAG dag; const int N_VERTICES = 10; - for (int i = 0; i < N_VERTICES; ++i) { dag.addVertex(); } + for (int i = 0; i < N_VERTICES; ++i) { dag.addVertex(i, i); } /* 0 ---------------------\ @@ -61,24 +60,30 @@ TEST_CASE("DAG Traversal Tests", "[dag]") { {7, 9} }; - for (auto const[from, to] : edges) { + for (auto const[from, to]: edges) { dag.addEdge(from, to); } - SECTION("Baisc Traversal") { + SECTION("Basic Traversal") { dag.reset(); std::vector visitOrder(N_VERTICES); size_t i = 0; while (!dag.allVisited()) { const auto &v = dag.visitNext().value(); - dag.completeVisit(v); - visitOrder[v] = i; + dag.completeVisit(v.key); + visitOrder[v.key] = i; ++i; } // Ensure visit order is preserved - for (auto const[from, to] : edges) { + for (auto const[from, to]: edges) { REQUIRE(visitOrder[from] <= visitOrder[to]); } } + + SECTION("Iteration") { + size_t nVisited = 0; + dag.forEach([&](const daggy::Vertex &) { ++nVisited; }); + REQUIRE(nVisited == dag.size()); + } } diff --git a/tests/unit_dagrun_loggers.cpp b/tests/unit_dagrun_loggers.cpp new file mode 100644 index 0000000..e22d36b --- /dev/null +++ b/tests/unit_dagrun_loggers.cpp @@ -0,0 +1,67 @@ +#include +#include +#include + +#include + +#include "daggy/loggers/dag_run/FileSystemLogger.hpp" +#include "daggy/loggers/dag_run/OStreamLogger.hpp" + +namespace fs = std::filesystem; + +using namespace daggy; +using namespace daggy::loggers::dag_run; + +const TaskList SAMPLE_TASKS{ + {"work_a", Task{.command{"/bin/echo", "a"}, .children{"c"}}}, + {"work_b", Task{.command{"/bin/echo", "b"}, .children{"c"}}}, + {"work_c", Task{.command{"/bin/echo", "c"}}} +}; + +inline DAGRunID testDAGRunInit(DAGRunLogger &logger, const std::string &name, const TaskList &tasks) { + auto runID = logger.startDAGRun(name, tasks); + auto dagRun = logger.getDAGRun(runID); + + REQUIRE(dagRun.tasks == tasks); + + REQUIRE(dagRun.taskRunStates.size() == tasks.size()); + auto nonQueuedTask = std::find_if(dagRun.taskRunStates.begin(), dagRun.taskRunStates.end(), + [](const auto &a) { return a.second != 
+    REQUIRE(nonQueuedTask == dagRun.taskRunStates.end());
+
+    REQUIRE(dagRun.dagStateChanges.size() == 1);
+    REQUIRE(dagRun.dagStateChanges.back().newState == +RunState::QUEUED);
+    return runID;
+}
+
+/*
+TEST_CASE("Filesystem Logger", "[filesystem_logger]") {
+    const fs::path logRoot{"fs_logger_unit"};
+    auto cleanup = [&]() {
+        if (fs::exists(logRoot)) {
+            fs::remove_all(logRoot);
+        }
+    };
+
+    cleanup();
+    daggy::loggers::dag_run::FileSystemLogger logger(logRoot);
+
+    SECTION("DAGRun Starts") {
+        testDAGRunInit(logger, "init_test", SAMPLE_TASKS);
+    }
+
+    cleanup();
+}
+*/
+
+TEST_CASE("ostream Logger", "[ostream_logger]") {
+    std::stringstream ss;
+    daggy::loggers::dag_run::OStreamLogger logger(ss);
+
+    SECTION("DAGRun Starts") {
+        testDAGRunInit(logger, "init_test", SAMPLE_TASKS);
+    }
+}
diff --git a/tests/unit_serialization.cpp b/tests/unit_serialization.cpp
index 4590095..ab14631 100644
--- a/tests/unit_serialization.cpp
+++ b/tests/unit_serialization.cpp
@@ -29,7 +29,7 @@ TEST_CASE("Deserialize Parameters", "[deserialize_parameters]") {
 
 TEST_CASE("Task Deserialization", "[deserialize_task]") {
     SECTION("Build with no expansion") {
-        std::string testTasks = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["C"]}, {"name": "B", "command": ["/bin/echo", "B"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])";
+        std::string testTasks = R"({ "A": {"command": ["/bin/echo", "A"], "children": ["C"]}, "B": {"command": ["/bin/echo", "B"], "children": ["C"]}, "C": {"command": ["/bin/echo", "C"]}})";
         auto tasks = daggy::tasksFromJSON(testTasks);
         REQUIRE(tasks.size() == 3);
     }
@@ -37,7 +37,7 @@ TEST_CASE("Task Deserialization", "[deserialize_task]") {
     SECTION("Build with expansion") {
         std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
         auto params = daggy::parametersFromJSON(testParams);
-        std::string testTasks = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["B"]}, {"name": "B", "command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])";
+        std::string testTasks = R"({"A": {"command": ["/bin/echo", "A"], "children": ["B"]}, "B": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "children": ["C"]}, "C": {"command": ["/bin/echo", "C"]}})";
         auto tasks = daggy::tasksFromJSON(testTasks, params);
         REQUIRE(tasks.size() == 4);
     }
@@ -45,7 +45,7 @@ TEST_CASE("Task Deserialization", "[deserialize_task]") {
     SECTION("Build with expansion using parents instead of children") {
         std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
         auto params = daggy::parametersFromJSON(testParams);
-        std::string testTasks = R"([{"name": "A", "command": ["/bin/echo", "A"]}, {"name": "B", "command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "parents": ["A"]},{"name": "C", "command": ["/bin/echo", "C"], "parents": ["A"]}])";
+        std::string testTasks = R"({"A": {"command": ["/bin/echo", "A"]}, "B": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "parents": ["A"]}, "C": {"command": ["/bin/echo", "C"], "parents": ["A"]}})";
         auto tasks = daggy::tasksFromJSON(testTasks, params);
         REQUIRE(tasks.size() == 4);
     }
@@ -53,21 +53,16 @@ TEST_CASE("Task Deserialization", "[deserialize_task]") {
 
 TEST_CASE("Task Serialization", "[serialize_tasks]") {
     SECTION("Build with no expansion") {
"command": ["/bin/echo", "B"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])"; + std::string testTasks = R"({"A": {"command": ["/bin/echo", "A"], "children": ["C"]}, "B": {"command": ["/bin/echo", "B"], "children": ["C"]}, "C": {"command": ["/bin/echo", "C"]}})"; auto tasks = daggy::tasksFromJSON(testTasks); - std::unordered_map taskMap; - for (size_t i = 0; i < tasks.size(); ++i) { - taskMap[tasks[i].name] = i; - } - auto genJSON = daggy::tasksToJSON(tasks); auto regenTasks = daggy::tasksFromJSON(genJSON); REQUIRE(regenTasks.size() == tasks.size()); - for (const auto &task : regenTasks) { - const auto &other = tasks[taskMap[task.name]]; + for (const auto &[name, task]: regenTasks) { + const auto &other = tasks[name]; REQUIRE(task == other); } } diff --git a/tests/unit_server.cpp b/tests/unit_server.cpp index 15521b4..8dd64db 100644 --- a/tests/unit_server.cpp +++ b/tests/unit_server.cpp @@ -74,16 +74,12 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") { std::string dagRun = R"({ "name": "unit_server", "taskParameters": { "FILE": [ "A", "B" ] }, - "tasks": [ - { "name": "touch", - "command": [ "/usr/bin/touch", "dagrun_{{FILE}}" ] - }, - { - "name": "cat", - "command": [ "/usr/bin/cat", "dagrun_A", "dagrun_B" ], + "tasks": { + "touch": { "command": [ "/usr/bin/touch", "dagrun_{{FILE}}" ] }, + "cat": { "command": [ "/usr/bin/cat", "dagrun_A", "dagrun_B" ], "parents": [ "touch" ] } - ] + } })"; @@ -160,7 +156,7 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") { REQUIRE(complete); std::this_thread::sleep_for(std::chrono::seconds(2)); - for (const auto &pth : std::vector{"dagrun_A", "dagrun_B"}) { + for (const auto &pth: std::vector{"dagrun_A", "dagrun_B"}) { REQUIRE(fs::exists(pth)); fs::remove(pth); } diff --git a/tests/unit_utilities.cpp b/tests/unit_utilities.cpp index 17737ac..5d941da 100644 --- a/tests/unit_utilities.cpp +++ b/tests/unit_utilities.cpp @@ -62,20 +62,20 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") { SECTION("Simple execution") { std::string prefix = "asdlk_"; - std::string taskJSON = R"([{"name": "A", "command": ["/usr/bin/touch", ")" - + prefix + R"(A"], "children": ["C"]}, {"name": "B", "command": ["/usr/bin/touch", ")" - + prefix + R"(B"], "children": ["C"]}, {"name": "C", "command": ["/usr/bin/touch", ")" - + prefix + R"(C"]}])"; + std::string taskJSON = R"({"A": {"command": ["/usr/bin/touch", ")" + + prefix + R"(A"], "children": ["C"]}, "B": {"command": ["/usr/bin/touch", ")" + + prefix + R"(B"], "children": ["C"]}, "C": {"command": ["/usr/bin/touch", ")" + + prefix + R"(C"]}})"; auto tasks = daggy::tasksFromJSON(taskJSON); auto dag = daggy::buildDAGFromTasks(tasks); auto runID = logger.startDAGRun("test_run", tasks); - auto endDAG = daggy::runDAG(runID, tasks, ex, logger, dag); + auto endDAG = daggy::runDAG(runID, ex, logger, dag); REQUIRE(endDAG.allVisited()); std::vector letters{"A", "B", "C"}; - for (const auto &letter : letters) { + for (const auto &letter: letters) { fs::path file{prefix + letter}; REQUIRE(fs::exists(file)); fs::remove(file); @@ -83,7 +83,7 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") { // Get the DAG Run Attempts auto record = logger.getDAGRun(runID); - for (const auto &attempts : record.taskAttempts) { + for (const auto &[_, attempts]: record.taskAttempts) { REQUIRE(attempts.size() == 1); REQUIRE(attempts.front().rc == 0); } @@ -93,45 +93,80 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") { auto cleanup = []() { // Cleanup std::vector paths{"rec_error_A", "noexist"}; - for (const auto 
&pth : paths) { + for (const auto &pth: paths) { if (fs::exists(pth)) fs::remove_all(pth); } }; cleanup(); - - // daggy::loggers::dag_run::OStreamLogger logger(std::cout); - std::string goodPrefix = "rec_error_"; std::string badPrefix = "noexist/rec_error_"; - std::string taskJSON = R"([{"name": "A", "command": ["/usr/bin/touch", ")" + std::string taskJSON = R"({"A": {"command": ["/usr/bin/touch", ")" + goodPrefix + - R"(A"], "children": ["C"]}, {"name": "B", "command": ["/usr/bin/touch", ")" - + badPrefix + R"(B"], "children": ["C"]}, {"name": "C", "command": ["/usr/bin/touch", ")" - + badPrefix + R"(C"]}])"; + R"(A"], "children": ["C"]}, "B": {"command": ["/usr/bin/touch", ")" + + badPrefix + R"(B"], "children": ["C"]}, "C": {"command": ["/usr/bin/touch", ")" + + badPrefix + R"(C"]}})"; auto tasks = daggy::tasksFromJSON(taskJSON); auto dag = daggy::buildDAGFromTasks(tasks); auto runID = logger.startDAGRun("test_run", tasks); - auto tryDAG = daggy::runDAG(runID, tasks, ex, logger, dag); + auto tryDAG = daggy::runDAG(runID, ex, logger, dag); REQUIRE(!tryDAG.allVisited()); // Create the missing dir, then continue to run the DAG fs::create_directory("noexist"); tryDAG.resetRunning(); - auto endDAG = daggy::runDAG(runID, tasks, ex, logger, tryDAG); + auto endDAG = daggy::runDAG(runID, ex, logger, tryDAG); REQUIRE(endDAG.allVisited()); // Get the DAG Run Attempts auto record = logger.getDAGRun(runID); - REQUIRE(record.taskAttempts[0].size() == 1); // A ran fine - REQUIRE(record.taskAttempts[1].size() == 2); // B errored and had to be retried - REQUIRE(record.taskAttempts[2].size() == 1); // C wasn't run because B errored + REQUIRE(record.taskAttempts["A"].size() == 1); // A ran fine + REQUIRE(record.taskAttempts["B"].size() == 2); // B errored and had to be retried + REQUIRE(record.taskAttempts["C"].size() == 1); // C wasn't run because B errored cleanup(); } + + SECTION("Generator tasks") { + std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"}; + auto params = daggy::parametersFromJSON(testParams); + + std::string generatorOutput = R"({"B": {"command": ["/usr/bin/echo", "{{DATE}}"], "children": ["C"]}})"; + std::stringstream jsonTasks; + + jsonTasks << R"({ "A": { "command": [ "/usr/bin/echo", )" << std::quoted(generatorOutput) + << R"(], "children": ["C"], "isGenerator": true},)" + << R"("C": { "command": [ "/usr/bin/echo", "hello!"] } })"; + + auto tasks = daggy::tasksFromJSON(jsonTasks.str()); + auto dag = daggy::buildDAGFromTasks(tasks); + REQUIRE(dag.size() == 2); + + auto runID = logger.startDAGRun("generator_run", tasks); + auto finalDAG = daggy::runDAG(runID, ex, logger, dag, params); + + REQUIRE(finalDAG.size() == 4); + + // Check the logger + auto record = logger.getDAGRun(runID); + + REQUIRE(record.tasks.size() == 4); + REQUIRE(record.taskRunStates.size() == 4); + for (const auto & [taskName, attempts] : record.taskAttempts) { + REQUIRE(attempts.size() == 1); + REQUIRE(attempts.back().rc == 0); + } + + // Ensure that children were updated properly + REQUIRE(record.tasks["A"].children == std::unordered_set{"B_0", "B_1", "C"}); + REQUIRE(record.tasks["B_0"].children == std::unordered_set{"C"}); + REQUIRE(record.tasks["B_1"].children == std::unordered_set{"C"}); + REQUIRE(record.tasks["C"].children.empty()); + + } }