From dc8ea4c369a092d887d0b7b76a19edd8b3647edc Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Fri, 20 Aug 2021 12:43:01 -0300 Subject: [PATCH] - Adding support for state to OStreamLogger to make it more useful for test cases - Making runDAG return the end DAG - Adding much more robust test for DAG execution for basic tests. --- daggy/include/daggy/Utilities.hpp | 10 +++--- .../daggy/loggers/dag_run/OStreamLogger.hpp | 2 +- daggy/src/Utilities.cpp | 11 +++--- daggy/src/loggers/dag_run/OStreamLogger.cpp | 33 +++++++++++++++--- tests/unit_utilities.cpp | 34 ++++++++++++++++--- 5 files changed, 70 insertions(+), 20 deletions(-) diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index aba7bef..e0182fb 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -28,11 +28,11 @@ namespace daggy { executors::task::TaskExecutor &executor, loggers::dag_run::DAGRunLogger &logger); - void runDAG(DAGRunID runID, - std::vector tasks, - executors::task::TaskExecutor &executor, - loggers::dag_run::DAGRunLogger &logger, - DAG dag); + DAG runDAG(DAGRunID runID, + std::vector tasks, + executors::task::TaskExecutor &executor, + loggers::dag_run::DAGRunLogger &logger, + DAG dag); std::ostream &operator<<(std::ostream &os, const TimePoint &tp); } diff --git a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp index c3cc8d9..990c4fc 100644 --- a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp @@ -32,9 +32,9 @@ namespace daggy { DAGRunRecord getDAGRun(DAGRunID dagRunID) override; private: - DAGRunID nextRunID_; std::mutex guard_; std::ostream &os_; + std::vector dagRuns_; }; } } diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index 0deff84..ed9657e 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -88,11 +88,11 @@ namespace daggy { return attempts; } - void runDAG(DAGRunID runID, - std::vector tasks, - executors::task::TaskExecutor &executor, - loggers::dag_run::DAGRunLogger &logger, - DAG dag) { + DAG runDAG(DAGRunID runID, + std::vector tasks, + executors::task::TaskExecutor &executor, + loggers::dag_run::DAGRunLogger &logger, + DAG dag) { logger.updateDAGRunState(runID, loggers::dag_run::RunState::RUNNING); struct TaskState { @@ -149,6 +149,7 @@ namespace daggy { } std::this_thread::sleep_for(250ms); } + return dag; } std::ostream &operator<<(std::ostream &os, const TimePoint &tp) { diff --git a/daggy/src/loggers/dag_run/OStreamLogger.cpp b/daggy/src/loggers/dag_run/OStreamLogger.cpp index a6ff5d0..35a034e 100644 --- a/daggy/src/loggers/dag_run/OStreamLogger.cpp +++ b/daggy/src/loggers/dag_run/OStreamLogger.cpp @@ -1,4 +1,5 @@ #include +#include #include @@ -7,17 +8,26 @@ namespace daggy { namespace loggers { namespace dag_run { - OStreamLogger::OStreamLogger(std::ostream &os) : nextRunID_(0), os_(os) {} + OStreamLogger::OStreamLogger(std::ostream &os) : os_(os) {} // Execution DAGRunID OStreamLogger::startDAGRun(std::string name, const std::vector &tasks) { std::lock_guard lock(guard_); - size_t runID = nextRunID_++; + size_t runID = dagRuns_.size(); + dagRuns_.push_back({ + .name = name, + .tasks = tasks, + //.runStates = std::vector(tasks.size(), RunState::QUEUED), + .runStates{tasks.size(), RunState::QUEUED}, + .taskAttempts = std::vector>(tasks.size()) + }); + os_ << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size() << " tasks" << std::endl; for (const auto &task : tasks) { os_ << "TASK (" << task.name << "): "; - std::copy(task.command.begin(), task.command.end(), std::ostream_iterator(os_, " ")); + std::copy(task.command.begin(), task.command.end(), + std::ostream_iterator(os_, " ")); os_ << std::endl; } return runID; @@ -26,6 +36,7 @@ namespace daggy { void OStreamLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) { std::lock_guard lock(guard_); os_ << "DAG State Change(" << dagRunID << "): " << magic_enum::enum_name(state) << std::endl; + dagRuns_[dagRunID].dagStateChanges.push_back({Clock::now(), state}); } void OStreamLogger::logTaskAttempt(DAGRunID dagRunID, const std::string &taskName, @@ -34,18 +45,32 @@ namespace daggy { const std::string &msg = attempt.rc == 0 ? attempt.outputLog : attempt.errorLog; os_ << "Task Attempt (" << dagRunID << '/' << taskName << "): Ran with RC " << attempt.rc << ": " << msg << std::endl; + + const auto &tasks = dagRuns_[dagRunID].tasks; + auto it = std::find_if(tasks.begin(), tasks.end(), + [&taskName](const Task &a) { return a.name == taskName; }); + if (it == tasks.end()) throw std::runtime_error("No such task: " + taskName); + size_t taskID = it - tasks.begin(); + dagRuns_[dagRunID].taskAttempts[taskID].push_back(attempt); } void OStreamLogger::updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) { std::lock_guard lock(guard_); os_ << "Task State Change (" << dagRunID << '/' << taskName << "): " << magic_enum::enum_name(state) << std::endl; + + const auto &tasks = dagRuns_[dagRunID].tasks; + auto it = std::find_if(tasks.begin(), tasks.end(), + [&taskName](const Task &a) { return a.name == taskName; }); + if (it == tasks.end()) throw std::runtime_error("No such task: " + taskName); + size_t taskID = it - tasks.begin(); + dagRuns_[dagRunID].taskStateChanges.push_back({Clock::now(), taskID, state}); } // Querying std::vector OStreamLogger::getDAGs(uint32_t stateMask) { return {}; } - DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunID) { return {}; } + DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunID) { return dagRuns_[dagRunID]; } } } } diff --git a/tests/unit_utilities.cpp b/tests/unit_utilities.cpp index 8456681..b850f22 100644 --- a/tests/unit_utilities.cpp +++ b/tests/unit_utilities.cpp @@ -13,6 +13,8 @@ #include "daggy/executors/task/ForkingTaskExecutor.hpp" #include "daggy/loggers/dag_run/OStreamLogger.hpp" +namespace fs = std::filesystem; + TEST_CASE("String Utilities", "[utilities_string]") { std::string test = "/this/is/{{A}}/test/{{A}}"; auto res = daggy::globalSub(test, "{{A}}", "hello"); @@ -58,10 +60,32 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") { std::stringstream ss; daggy::loggers::dag_run::OStreamLogger logger(ss); - std::string taskJSON = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["C"]}, {"name": "B", "command": ["/bin/echo", "B"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])"; - auto tasks = daggy::tasksFromJSON(taskJSON); - auto dag = daggy::buildDAGFromTasks(tasks); + SECTION("Simple execution") { + std::string prefix = "/tmp/asdlk_"; + std::string taskJSON = R"([{"name": "A", "command": ["/usr/bin/touch", ")" + + prefix + R"(A"], "children": ["C"]}, {"name": "B", "command": ["/usr/bin/touch", ")" + + prefix + R"(B"], "children": ["C"]}, {"name": "C", "command": ["/usr/bin/touch", ")" + + prefix + R"(C"]}])"; + auto tasks = daggy::tasksFromJSON(taskJSON); + auto dag = daggy::buildDAGFromTasks(tasks); - auto runID = logger.startDAGRun("test_run", tasks); - daggy::runDAG(runID, tasks, ex, logger, dag); + auto runID = logger.startDAGRun("test_run", tasks); + auto endDAG = daggy::runDAG(runID, tasks, ex, logger, dag); + + REQUIRE(endDAG.allVisited()); + + std::vector letters{"A", "B", "C"}; + for (const auto &letter : letters) { + fs::path file{prefix + letter}; + REQUIRE(fs::exists(file)); + fs::remove(file); + } + + // Get the DAG Run Attempts + auto record = logger.getDAGRun(runID); + for (const auto &attempts : record.taskAttempts) { + REQUIRE(attempts.size() == 1); + REQUIRE(attempts.front().rc == 0); + } + } }