From 2083b1c3f1508dc56d3bebae3486faa78078bbc7 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Fri, 20 Aug 2021 21:44:12 -0300 Subject: [PATCH] Refactoring RunState, fixing logic error in when to end errored DAG runs, added convenience functions to retry failed DAGs. --- .gitignore | 4 +- daggy/include/daggy/DAG.hpp | 19 ++++---- daggy/include/daggy/Defines.hpp | 8 ++++ daggy/include/daggy/Utilities.hpp | 3 +- .../include/daggy/loggers/dag_run/Defines.hpp | 21 +++------ daggy/src/DAG.cpp | 23 +++++++--- daggy/src/Utilities.cpp | 43 ++++++++++++++---- tests/unit_utilities.cpp | 45 +++++++++++++++++++ 8 files changed, 124 insertions(+), 42 deletions(-) diff --git a/.gitignore b/.gitignore index 27ebc3b..2ac2e63 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ build .cache -cmake-build-* -.idea \ No newline at end of file +cmake-build-debug/ +.idea diff --git a/daggy/include/daggy/DAG.hpp b/daggy/include/daggy/DAG.hpp index 37c2fee..bec335b 100644 --- a/daggy/include/daggy/DAG.hpp +++ b/daggy/include/daggy/DAG.hpp @@ -9,6 +9,8 @@ #include #include +#include "Defines.hpp" + /* The DAG structure in daggy is just to ensure that tasks are run in the correct dependent order. @@ -16,14 +18,8 @@ namespace daggy { - enum class VertexState : uint32_t { - UNVISITED = 0, - VISITING, - VISITED - }; - struct Vertex { - VertexState state; + RunState state; uint32_t depCount; std::unordered_set children; }; @@ -51,10 +47,15 @@ namespace daggy { bool empty() const; - // Traversal + // Reset the DAG to completely unvisited void reset(); - VertexState getVertexState(const size_t id) const; + // Reset any vertex with RUNNING state to QUEUED + void resetRunning(); + + RunState getVertexState(const size_t id) const; + + void setVertexState(const size_t id, RunState state); bool allVisited() const; diff --git a/daggy/include/daggy/Defines.hpp b/daggy/include/daggy/Defines.hpp index 2cca419..3d5291d 100644 --- a/daggy/include/daggy/Defines.hpp +++ b/daggy/include/daggy/Defines.hpp @@ -21,4 +21,12 @@ namespace daggy { using DAGRunID = size_t; using TaskID = size_t; + enum class RunState : uint32_t { + QUEUED = 0, + RUNNING = 1, + RETRY = 1 << 1, + ERRORED = 1 << 2, + KILLED = 1 << 3, + COMPLETED = 1 << 4 + }; } diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index e0182fb..e25f335 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -18,7 +18,8 @@ namespace daggy { std::vector expandCommands(const std::vector &command, const ParameterValues ¶meters); - DAG buildDAGFromTasks(const std::vector &tasks); + DAG buildDAGFromTasks(const std::vector &tasks, + const std::vector &updates = {}); // Blocking call std::vector diff --git a/daggy/include/daggy/loggers/dag_run/Defines.hpp b/daggy/include/daggy/loggers/dag_run/Defines.hpp index 0b9112e..ca5292d 100644 --- a/daggy/include/daggy/loggers/dag_run/Defines.hpp +++ b/daggy/include/daggy/loggers/dag_run/Defines.hpp @@ -3,15 +3,6 @@ namespace daggy { namespace loggers { namespace dag_run { - enum class RunState : uint32_t { - QUEUED = 0, - RUNNING = 1, - RETRY = 1 << 1, - ERRORED = 1 << 2, - KILLED = 1 << 3, - COMPLETED = 1 << 4 - }; - struct TaskUpdateRecord { TimePoint time; TaskID taskID; @@ -26,11 +17,11 @@ namespace daggy { // Pretty heavy weight, but struct DAGRunRecord { std::string name; - std::vector tasks; - std::vector runStates; - std::vector > taskAttempts; - std::vector taskStateChanges; - std::vector dagStateChanges; + std::vector tasks; + std::vector runStates; + std::vector> taskAttempts; + std::vector taskStateChanges; + std::vector dagStateChanges; }; struct DAGRunSummary { @@ -39,7 +30,7 @@ namespace daggy { RunState runState; TimePoint startTime; TimePoint lastUpdate; - std::unordered_map taskStateCounts; + std::unordered_map taskStateCounts; }; } } diff --git a/daggy/src/DAG.cpp b/daggy/src/DAG.cpp index c81d87a..aa9ed07 100644 --- a/daggy/src/DAG.cpp +++ b/daggy/src/DAG.cpp @@ -7,7 +7,7 @@ namespace daggy { bool DAG::empty() const { return vertices_.empty(); } size_t DAG::addVertex() { - vertices_.push_back(Vertex{.state = VertexState::UNVISITED, .depCount = 0}); + vertices_.push_back(Vertex{.state = RunState::QUEUED, .depCount = 0}); return vertices_.size() - 1; } @@ -39,7 +39,7 @@ namespace daggy { void DAG::reset() { // Reset the state of all vertices for (auto &v : vertices_) { - v.state = VertexState::UNVISITED; + v.state = RunState::QUEUED; v.depCount = 0; } @@ -51,9 +51,20 @@ namespace daggy { } } + void DAG::resetRunning() { + for (auto &v : vertices_) { + if (v.state != RunState::RUNNING) continue; + v.state = RunState::QUEUED; + } + } + + void DAG::setVertexState(const size_t id, RunState state) { + vertices_[id].state = state; + } + bool DAG::allVisited() const { for (const auto &v : vertices_) { - if (v.state != VertexState::VISITED) return false; + if (v.state != RunState::COMPLETED) return false; } return true; } @@ -62,9 +73,9 @@ namespace daggy { for (size_t i = 0; i < vertices_.size(); ++i) { auto &v = vertices_[i]; - if (v.state != VertexState::UNVISITED) continue; + if (v.state != RunState::QUEUED) continue; if (v.depCount != 0) continue; - v.state = VertexState::VISITING; + v.state = RunState::RUNNING; return i; } return {}; @@ -72,7 +83,7 @@ namespace daggy { void DAG::completeVisit(const size_t id) { auto &v = vertices_[id]; - v.state = VertexState::VISITED; + v.state = RunState::COMPLETED; for (auto c : v.children) { --vertices_[c].depCount; } diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index ed9657e..b2557aa 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -52,7 +52,8 @@ namespace daggy { return commands; } - DAG buildDAGFromTasks(const std::vector &tasks) { + DAG buildDAGFromTasks(const std::vector &tasks, + const std::vector &updates) { DAG dag; std::unordered_map taskIDs; @@ -68,6 +69,20 @@ namespace daggy { } } dag.reset(); + + // Replay any updates + for (const auto &update : updates) { + switch (update.newState) { + case RunState::RUNNING: + case RunState::RETRY: + case RunState::ERRORED: + case RunState::KILLED: + dag.setVertexState(update.taskID, RunState::RUNNING); + dag.setVertexState(update.taskID, RunState::COMPLETED); + break; + } + } + return dag; } @@ -77,13 +92,13 @@ namespace daggy { executors::task::TaskExecutor &executor, loggers::dag_run::DAGRunLogger &logger) { std::vector attempts; - logger.updateTaskState(runID, task.name, loggers::dag_run::RunState::RUNNING); + logger.updateTaskState(runID, task.name, RunState::RUNNING); while (attempts.size() < task.maxRetries + 1) { attempts.push_back(executor.runCommand(task.command)); logger.logTaskAttempt(runID, task.name, attempts.back()); if (attempts.back().rc == 0) break; - logger.updateTaskState(runID, task.name, loggers::dag_run::RunState::RETRY); + logger.updateTaskState(runID, task.name, RunState::RETRY); } return attempts; } @@ -93,7 +108,7 @@ namespace daggy { executors::task::TaskExecutor &executor, loggers::dag_run::DAGRunLogger &logger, DAG dag) { - logger.updateDAGRunState(runID, loggers::dag_run::RunState::RUNNING); + logger.updateDAGRunState(runID, RunState::RUNNING); struct TaskState { size_t tid; @@ -103,6 +118,9 @@ namespace daggy { std::vector taskStates; + // TODO Handle case where everything is wedged due to errors + size_t running = 0; + size_t errored = 0; while (!dag.allVisited()) { // Check for any completed tasks for (auto &taskState : taskStates) { @@ -112,16 +130,18 @@ namespace daggy { auto attemptRecords = taskState.fut.get(); const auto &taskName = tasks[taskState.tid].name; if (attemptRecords.empty()) { - logger.updateTaskState(runID, taskName, loggers::dag_run::RunState::ERRORED); - continue; + logger.updateTaskState(runID, taskName, RunState::ERRORED); + ++errored; } if (attemptRecords.back().rc == 0) { - logger.updateTaskState(runID, taskName, loggers::dag_run::RunState::COMPLETED); + logger.updateTaskState(runID, taskName, RunState::COMPLETED); dag.completeVisit(taskState.tid); - taskState.complete = true; + --running; } else { - logger.updateTaskState(runID, taskName, loggers::dag_run::RunState::ERRORED); + logger.updateTaskState(runID, taskName, RunState::ERRORED); + ++errored; } + taskState.complete = true; } } @@ -139,6 +159,7 @@ namespace daggy { .complete = false }; taskStates.push_back(std::move(tsk)); + ++running; auto nextTask = dag.visitNext(); if (not nextTask.has_value()) break; @@ -147,6 +168,10 @@ namespace daggy { if (!tq->empty()) { executor.threadPool.addTasks(tq); } + if (running > 0 and errored == running) { + logger.updateDAGRunState(runID, RunState::ERRORED); + break; + } std::this_thread::sleep_for(250ms); } return dag; diff --git a/tests/unit_utilities.cpp b/tests/unit_utilities.cpp index b850f22..30df212 100644 --- a/tests/unit_utilities.cpp +++ b/tests/unit_utilities.cpp @@ -88,4 +88,49 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") { REQUIRE(attempts.front().rc == 0); } } + + SECTION("Recovery from Error") { + auto cleanup = []() { + // Cleanup + std::vector paths{"/tmp/rec_error_A", "/tmp/noexist" }; + for (const auto & pth : paths) { + if (fs::exists(pth)) fs::remove_all(pth); + } + }; + + cleanup(); + + + // daggy::loggers::dag_run::OStreamLogger logger(std::cout); + + std::string goodPrefix = "/tmp/rec_error_"; + std::string badPrefix = "/tmp/noexist/rec_error_"; + std::string taskJSON = R"([{"name": "A", "command": ["/usr/bin/touch", ")" + + goodPrefix + R"(A"], "children": ["C"]}, {"name": "B", "command": ["/usr/bin/touch", ")" + + badPrefix + R"(B"], "children": ["C"]}, {"name": "C", "command": ["/usr/bin/touch", ")" + + badPrefix + R"(C"]}])"; + auto tasks = daggy::tasksFromJSON(taskJSON); + auto dag = daggy::buildDAGFromTasks(tasks); + + auto runID = logger.startDAGRun("test_run", tasks); + + auto tryDAG = daggy::runDAG(runID, tasks, ex, logger, dag); + + REQUIRE(!tryDAG.allVisited()); + + // Create the missing dir, then continue to run the DAG + fs::create_directory("/tmp/noexist"); + tryDAG.resetRunning(); + auto endDAG = daggy::runDAG(runID, tasks, ex, logger, tryDAG); + + REQUIRE(endDAG.allVisited()); + + // Get the DAG Run Attempts + auto record = logger.getDAGRun(runID); + REQUIRE(record.taskAttempts[0].size() == 1); // A ran fine + REQUIRE(record.taskAttempts[1].size() == 2); // B errored and had to be retried + REQUIRE(record.taskAttempts[2].size() == 1); // C wasn't run because B errored + + cleanup(); + } }