diff --git a/.gitignore b/.gitignore index 08f2b60..27ebc3b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ build .cache cmake-build-* +.idea \ No newline at end of file diff --git a/TODO.md b/TODO.md index d27fcb8..08774f0 100644 --- a/TODO.md +++ b/TODO.md @@ -1,4 +1,5 @@ - Rip apart scheduler and re-implement runDag as a utility function, taking an execution pool, task list, and logger - Separate concerns for DAG logger vs DAG definition storage - Add in authorization scheme (maybe JWT?) -- Flesh out server and interface \ No newline at end of file +- Flesh out server and interface +- Add ability to define child -> parent relationships \ No newline at end of file diff --git a/daggy/include/daggy/Logger.hpp b/daggy/include/daggy/Logger.hpp new file mode 100644 index 0000000..105b944 --- /dev/null +++ b/daggy/include/daggy/Logger.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include + +#include "DAGRun.hpp" + +/* + MetaStore represents the interface to store all the state information + for daggy to run. Abstracted in case other back-end solutions need to + be supported. +*/ + +namespace daggy { + using DAGDefID = int16_t; + using DAGRunID = size_t; + + enum class DAGRunState : uint32_t { + QUEUED = 0, + RUNNING, + ERRORED, + KILLED, + COMPLETED + }; + + class DAGLogger { + public: + // Execution + virtual DAGRunID startDAGRun(std::string name, const std::vector & tasks) = 0; + virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) = 0; + virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) = 0; + virtual void updateDAGRunState(DAGRunID dagRunId, DAGRunState state) = 0; + }; +} diff --git a/daggy/include/daggy/MetaStore.hpp b/daggy/include/daggy/MetaStore.hpp deleted file mode 100644 index 3764bab..0000000 --- a/daggy/include/daggy/MetaStore.hpp +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once - -#include - -#include "DAGRun.hpp" - -/* - MetaStore represents the interface to store all the state information - for daggy to run. Abstracted in case other back-end solutions need to - be supported. -*/ - -namespace daggy { - using DAGDefID = int16_t; - using DAGRunID = size_t; - - class MetaStore { - // Basic storage + retrieval of DAG Definitions - virtual DAGDefID storeDAGDefinition(std::string name, std::string definition) = 0; - - virtual DAGDefID getCurrentDAGVersion(std::string name) = 0; - - virtual std::string getDAGDefinition(std::string name, DAGDefID version = -1) = 0; - - // DAG Run State - - /* - * startDAGRun // DAG starts up, returns a DAGID for future updates - * updateDAGRun // DAG State transitions - * updateTaskState // Task state updates - */ - virtual DAGRunID startDAGRun(std::string dagName, DAGDefID version, DAGRun dagRun - ) = 0; - - virtual void updateTask(DAGRunID rid, std::string taskName, VertexState state) = 0; - - virtual void updateDAGRun(DAGRunID rid, DAGState state) = 0; - - // Retrievals - virtual DAGRun & getDAGRun(DAGRunID) = 0; - }; -} diff --git a/daggy/include/daggy/TaskExecutor.hpp b/daggy/include/daggy/TaskExecutor.hpp index 565be6b..e817827 100644 --- a/daggy/include/daggy/TaskExecutor.hpp +++ b/daggy/include/daggy/TaskExecutor.hpp @@ -8,6 +8,7 @@ #include "Task.hpp" #include "AttemptRecord.hpp" +#include "ThreadPool.hpp" /* Executors run Tasks, returning a future with the results. @@ -17,11 +18,13 @@ namespace daggy { class TaskExecutor { public: - TaskExecutor() = default; + TaskExecutor(size_t nThreads) : threadPool(nThreads) {}; virtual const std::string getName() const = 0; // This will block if the executor is full virtual AttemptRecord runCommand(std::vector cmd) = 0; + + ThreadPool threadPool; }; } diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index 8169898..e315883 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -7,7 +7,10 @@ #include +#include "Logger.hpp" +#include "TaskExecutor.hpp" #include "Task.hpp" +#include "ThreadPool.hpp" namespace rj = rapidjson; @@ -16,10 +19,20 @@ namespace daggy { using ParameterValues = std::unordered_map; using Command = std::vector; + // Dealing with JSON ParameterValues parseParameters(const std::string & jsonSpec); ParameterValues parseParameters(const rj::Document & spec); std::vector buildTasks(const std::string & jsonSpec, const ParameterValues & parameters = {}); std::vector buildTasks(const rj::Document & spec, const ParameterValues & parameters = {}); - std::vector expandCommands(const std::vector & command, const ParameterValues & parameters); + + // DAG execution + // DAG vertex IDs should correspond to the position of tasks in vector. e.g. Vertex ID 0 corresponds to tasks[0] + // I'm not crazy about this loose coupling, but + void runDAG(DAGRunID runID, + std::vector tasks, + TaskExecutor & executor, + DAGLogger & logger, + DAG dag); + } diff --git a/daggy/include/daggy/executors/ForkingTaskExecutor.hpp b/daggy/include/daggy/executors/ForkingTaskExecutor.hpp index f17e5cf..66a23db 100644 --- a/daggy/include/daggy/executors/ForkingTaskExecutor.hpp +++ b/daggy/include/daggy/executors/ForkingTaskExecutor.hpp @@ -7,7 +7,9 @@ namespace daggy { namespace executor { class ForkingTaskExecutor : public TaskExecutor { public: - ForkingTaskExecutor() = default; + ForkingTaskExecutor(size_t nThreads) + : TaskExecutor(nThreads) + {} const std::string getName() const override { return "ForkingTaskExecutor"; } diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index 4b251da..4c813f6 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -71,6 +71,9 @@ namespace daggy { const std::vector reqFields{"name", "command"}; std::unordered_map> childrenMap; + // Maps child -> parent + std::unordered_map> parentMap; + std::unordered_map taskIndex; // Tasks for (size_t i = 0; i < spec.Size(); ++i) { @@ -79,7 +82,7 @@ namespace daggy { } const auto & taskSpec = spec[i].GetObject(); - for (const auto reqField : reqFields) { + for (const auto & reqField : reqFields) { if (! taskSpec.HasMember(reqField.c_str())) { throw std::runtime_error("Task " + std::to_string(i) + " is missing required field " + reqField); } @@ -87,10 +90,14 @@ namespace daggy { // Grab the standard fields with defaults; std::string name = taskSpec["name"].GetString(); + taskIndex[name] = i; + uint8_t maxRetries = 0; if (taskSpec.HasMember("maxRetries")) { maxRetries = taskSpec["maxRetries"].GetInt(); } uint8_t retryIntervalSeconds = 0; if (taskSpec.HasMember("retryIntervalSeconds")) { retryIntervalSeconds = taskSpec["retryIntervalSeconds"].GetInt(); } + + // Children / parents std::vector children; if (taskSpec.HasMember("children")) { const auto & specChildren = taskSpec["children"].GetArray(); @@ -98,17 +105,22 @@ namespace daggy { children.emplace_back(specChildren[c].GetString()); } } + if (taskSpec.HasMember("parents")) { + const auto & specParents = taskSpec["parents"].GetArray(); + for (size_t c = 0; c < specParents.Size(); ++c) { + parentMap[name].emplace_back(specParents[c].GetString()); + } + } // Build out the commands std::vector command; for (size_t cmd = 0; cmd < taskSpec["command"].Size(); ++cmd) { - command.push_back(taskSpec["command"][cmd].GetString()); + command.emplace_back(taskSpec["command"][cmd].GetString()); } auto commands = expandCommands(command, parameters); // Create the tasks auto & taskNames = childrenMap[name]; - size_t tid = 0; for (size_t tid = 0; tid < commands.size(); ++tid) { std::string taskName = name + "_" + std::to_string(tid); taskNames.push_back(taskName); @@ -122,6 +134,16 @@ namespace daggy { } } + // Update any missing child -> parent relationship + for (auto & task : tasks) { + auto pit = parentMap.find(task.name); + if (pit == parentMap.end()) { continue; } + + for (const auto & parent : pit->second) { + tasks[taskIndex[parent]].children.emplace_back(task.name); + } + } + // At the end, replace the names of the children with all the expanded versions for (auto & task : tasks) { std::vector children; @@ -143,4 +165,66 @@ namespace daggy { } return buildTasks(doc, parameters); } -} + + void runDAG(DAGRunID runID, + std::vector tasks, + TaskExecutor & executor, + DAGLogger & logger, + DAG dag) { + + struct TaskState { + size_t tid; + std::future> fut; + bool complete; + }; + + std::vector taskStates; + + while (!dag.allVisited()) { + // Check for any completed tasks + for (auto &taskState : taskStates) { + if (taskState.complete) continue; + + if (taskState.fut.valid()) { + auto attemptRecords = taskState.fut.get(); + if (attemptRecords.back().rc == 0) { + dag.completeVisit(taskState.tid); + } + taskState.complete = true; + } + } + + // Add all remaining tasks in a task queue to avoid dominating the thread pool + auto tq = std::make_shared(); + auto t = dag.visitNext(); + while (t.has_value()) { + // Schedule the task to run + auto tid = t.value(); + TaskState tsk{ + .tid = tid, + .fut = tq->addTask( + [tid, &tasks, &executor]() { + std::vector attempts; + + while (attempts.size() < tasks[tid].maxRetries) { + attempts.push_back(executor.runCommand(tasks[tid].command)); + if (attempts.back().rc == 0) break; + } + return attempts; + }) + , .complete = false + }; + taskStates.push_back(std::move(tsk)); + + // + auto nextTask = dag.visitNext(); + if (not nextTask.has_value()) break; + t.emplace(nextTask.value()); + } + if (! tq->empty()) { + executor.threadPool.addTasks(tq); + } + std::this_thread::sleep_for(250ms); + } + } +} \ No newline at end of file diff --git a/tests/unit_executor_forkingexecutor.cpp b/tests/unit_executor_forkingexecutor.cpp index 1fe3b42..acf4e33 100644 --- a/tests/unit_executor_forkingexecutor.cpp +++ b/tests/unit_executor_forkingexecutor.cpp @@ -6,7 +6,7 @@ #include "catch.hpp" TEST_CASE("Basic Execution", "[forking_executor]") { - daggy::executor::ForkingTaskExecutor ex; + daggy::executor::ForkingTaskExecutor ex(10); SECTION("Simple Run") { std::vector cmd{"/usr/bin/echo", "abc", "123"}; diff --git a/tests/unit_scheduler.cpp b/tests/unit_scheduler.cpp index 25cd03a..cbc582f 100644 --- a/tests/unit_scheduler.cpp +++ b/tests/unit_scheduler.cpp @@ -7,7 +7,7 @@ #include "catch.hpp" TEST_CASE("Basic Scheduler Execution", "[scheduler]") { - daggy::executor::ForkingTaskExecutor ex; + daggy::executor::ForkingTaskExecutor ex(10); daggy::Scheduler sched(ex); std::vector tasks { diff --git a/tests/unit_utilities.cpp b/tests/unit_utilities.cpp index 9a77163..5b4b0f9 100644 --- a/tests/unit_utilities.cpp +++ b/tests/unit_utilities.cpp @@ -63,17 +63,14 @@ TEST_CASE("Building Tasks", "[utilities_build_tasks]") { auto params = daggy::parseParameters(testParams); std::string testTasks = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["B"]}, {"name": "B", "command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])"; auto tasks = daggy::buildTasks(testTasks, params); - - /* - for (const auto & task : tasks) { - std::cout << task.name << ": "; - for (const auto & part : task.children) { - std::cout << part << " "; - } - std::cout << std::endl; - } - */ REQUIRE(tasks.size() == 4); } + SECTION("Build with expansion using parents instead of children") { + std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"}; + auto params = daggy::parseParameters(testParams); + std::string testTasks = R"([{"name": "A", "command": ["/bin/echo", "A"]}, {"name": "B", "command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "parents": ["A"]},{"name": "C", "command": ["/bin/echo", "C"], "parents": ["A"]}])"; + auto tasks = daggy::buildTasks(testTasks, params); + REQUIRE(tasks.size() == 4); + } } \ No newline at end of file