From 7b07380e1640b06f8b9bb99992f35e8ce50c9546 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Tue, 31 Aug 2021 12:21:34 -0300 Subject: [PATCH] - Removing duplicate information (taskName stored in 3 places) --- TODO.md | 6 ----- daggy/include/daggy/DAG.hpp | 5 ++-- daggy/include/daggy/DAG.impl.hxx | 25 ++++++++++------- daggy/include/daggy/Defines.hpp | 4 +-- daggy/include/daggy/Utilities.hpp | 5 +--- daggy/src/Serialization.cpp | 2 -- daggy/src/Utilities.cpp | 27 +++++++------------ .../src/loggers/dag_run/FileSystemLogger.cpp | 8 +++--- daggy/src/loggers/dag_run/OStreamLogger.cpp | 4 +-- tests/unit_dag.cpp | 9 ++++--- 10 files changed, 41 insertions(+), 54 deletions(-) diff --git a/TODO.md b/TODO.md index 72a8a3b..dfe5618 100644 --- a/TODO.md +++ b/TODO.md @@ -8,12 +8,6 @@ Tasks - Core Functionality - Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma separated list - - Allow for tasks to define next tasks - - Refactor [de]serialization so that a task can be parsed by itself - - Add notation of parameterValues - - Tasks are now refered by two names: - - baseName is the original name in the spec - - name is the individual tasks - Add execution gates - Executors - [ ] Slurm Executor diff --git a/daggy/include/daggy/DAG.hpp b/daggy/include/daggy/DAG.hpp index 9f7aacb..1a84621 100644 --- a/daggy/include/daggy/DAG.hpp +++ b/daggy/include/daggy/DAG.hpp @@ -23,7 +23,6 @@ namespace daggy { struct Vertex { RunState state; uint32_t depCount; - K key; V data; std::unordered_set children; }; @@ -64,11 +63,11 @@ namespace daggy { void setVertexState(const K &id, RunState state); - void forEach(std::function &)> fun) const; + void forEach(std::function> &)> fun) const; bool allVisited() const; - std::optional> visitNext(); + std::optional> visitNext(); Vertex &getVertex(const K &id); diff --git a/daggy/include/daggy/DAG.impl.hxx b/daggy/include/daggy/DAG.impl.hxx index d14857c..423b6ed 100644 --- a/daggy/include/daggy/DAG.impl.hxx +++ b/daggy/include/daggy/DAG.impl.hxx @@ -18,8 +18,7 @@ namespace daggy { ss << "A vertex with ID " << id << " already exists in the DAG"; throw std::runtime_error(ss.str()); } - vertices_.emplace(id, Vertex{.state = RunState::QUEUED, .depCount = 0, .key = id, .data = data - }); + vertices_.emplace(id, Vertex{.state = RunState::QUEUED, .depCount = 0, .data = data}); } template @@ -90,12 +89,13 @@ namespace daggy { } template - std::optional> DAG::visitNext() { + std::optional> + DAG::visitNext() { for (auto &[k, v]: vertices_) { if (v.state != +RunState::QUEUED) continue; if (v.depCount != 0) continue; v.state = RunState::RUNNING; - return v; + return std::make_pair(k, v.data); } return {}; } @@ -110,10 +110,17 @@ namespace daggy { } template - void DAG::forEach(std::function &)> fun) const { - for (const auto &[_, v]: vertices_) { - fun(v); - } - } + void DAG::forEach(std::function> &) + > fun) const { + for ( + auto it = vertices_.begin(); + it != vertices_. + + end(); + + ++it) { + fun(*it); +} +} } diff --git a/daggy/include/daggy/Defines.hpp b/daggy/include/daggy/Defines.hpp index dd598ee..2569f06 100644 --- a/daggy/include/daggy/Defines.hpp +++ b/daggy/include/daggy/Defines.hpp @@ -33,8 +33,6 @@ namespace daggy { ); struct Task { - std::string name; - // definedName is the name from the original DAGDefinition. std::string definedName; std::vector command; uint32_t maxRetries; @@ -44,7 +42,7 @@ namespace daggy { bool isGenerator; // True if the output of this task is a JSON set of tasks to complete bool operator==(const Task &other) const { - return (name == other.name) + return (definedName == other.definedName) and (maxRetries == other.maxRetries) and (retryIntervalSeconds == other.retryIntervalSeconds) and (command == other.command) diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index 79236fa..26469f4 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -19,9 +19,6 @@ namespace daggy { std::vector expandCommands(const std::vector &command, const ParameterValues ¶meters); - std::unordered_set - findDerivedVertices(TaskDAG &dag, const std::string &definedName); - TaskDAG buildDAGFromTasks(TaskList &tasks, const std::vector &updates = {}); @@ -31,7 +28,7 @@ namespace daggy { // Blocking call std::vector runTask(DAGRunID runID, - TaskID taskID, + const std::string &taskName, const Task &task, executors::task::TaskExecutor &executor, loggers::dag_run::DAGRunLogger &logger); diff --git a/daggy/src/Serialization.cpp b/daggy/src/Serialization.cpp index a7e110c..63dcfe9 100644 --- a/daggy/src/Serialization.cpp +++ b/daggy/src/Serialization.cpp @@ -93,7 +93,6 @@ namespace daggy { for (size_t tid = 0; tid < commands.size(); ++tid) { std::string taskName = (commands.size() == 1 ? name : name + "_" + std::to_string(tid)); tasks.emplace(taskName, Task{ - .name = taskName, .definedName = name, .command = commands[tid], .maxRetries = maxRetries, @@ -136,7 +135,6 @@ namespace daggy { bool first = false; ss << "{" - << R"("name": )" << std::quoted(task.name) << ',' << R"("maxRetries": )" << task.maxRetries << ',' << R"("retryIntervalSeconds": )" << task.retryIntervalSeconds << ','; diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index e0e2abd..ae96bad 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -53,17 +53,6 @@ namespace daggy { return commands; } - std::unordered_set - findDerivedVertices(TaskDAG &dag, const std::string &definedName) { - std::unordered_set vertices; - dag.forEach([&](const auto &v) { - if (v.data.definedName == definedName) { - vertices.insert(v.data.name); - } - }); - return vertices; - } - void updateDAGFromTasks(TaskDAG &dag, TaskList &tasks) { // Add all the vertices std::unordered_map> definedSets; @@ -113,17 +102,18 @@ namespace daggy { } std::vector runTask(DAGRunID runID, + const std::string &taskName, const Task &task, executors::task::TaskExecutor &executor, loggers::dag_run::DAGRunLogger &logger) { std::vector attempts; - logger.updateTaskState(runID, task.name, RunState::RUNNING); + logger.updateTaskState(runID, taskName, RunState::RUNNING); while (attempts.size() < task.maxRetries + 1) { attempts.push_back(executor.runCommand(task)); - logger.logTaskAttempt(runID, task.name, attempts.back()); + logger.logTaskAttempt(runID, taskName, attempts.back()); if (attempts.back().rc == 0) break; - logger.updateTaskState(runID, task.name, RunState::RETRY); + logger.updateTaskState(runID, taskName, RunState::RETRY); } return attempts; } @@ -167,7 +157,7 @@ namespace daggy { } logger.updateTask(runID, taskName, task); } catch (std::exception &e) { - logger.updateTaskState(runID, task.name, RunState::ERRORED); + logger.updateTaskState(runID, taskName, RunState::ERRORED); ++errored; } } @@ -185,9 +175,10 @@ namespace daggy { auto t = dag.visitNext(); while (t.has_value()) { // Schedule the task to run - auto vertex = t.value(); - runningTasks.emplace(vertex.data.name, tq->addTask([runID, vertex, &executor, &logger]() { - return runTask(runID, vertex.data, executor, logger); + auto &taskName = t.value().first; + auto &task = t.value().second; + runningTasks.emplace(taskName, tq->addTask([runID, taskName, task, &executor, &logger]() { + return runTask(runID, taskName, task, executor, logger); })); ++running; diff --git a/daggy/src/loggers/dag_run/FileSystemLogger.cpp b/daggy/src/loggers/dag_run/FileSystemLogger.cpp index 2e47724..4aea9dd 100644 --- a/daggy/src/loggers/dag_run/FileSystemLogger.cpp +++ b/daggy/src/loggers/dag_run/FileSystemLogger.cpp @@ -158,10 +158,10 @@ namespace daggy { ifh.close(); // Task states - for (const auto &[_, task]: record.tasks) { - auto taskStateFile = runRoot / task.name / "states.csv"; + for (const auto &[taskName, task]: record.tasks) { + auto taskStateFile = runRoot / taskName / "states.csv"; if (!fs::exists(taskStateFile)) { - record.taskRunStates.emplace(task.name, RunState::QUEUED); + record.taskRunStates.emplace(taskName, RunState::QUEUED); continue; } @@ -170,7 +170,7 @@ namespace daggy { std::stringstream ss{line}; while (std::getline(ss, token, ',')) { continue; } RunState taskState = RunState::_from_string(token.c_str()); - record.taskRunStates.emplace(task.name, taskState); + record.taskRunStates.emplace(taskName, taskState); ifh.close(); } return record; diff --git a/daggy/src/loggers/dag_run/OStreamLogger.cpp b/daggy/src/loggers/dag_run/OStreamLogger.cpp index 57f4ca8..d826362 100644 --- a/daggy/src/loggers/dag_run/OStreamLogger.cpp +++ b/daggy/src/loggers/dag_run/OStreamLogger.cpp @@ -25,8 +25,8 @@ namespace daggy { os_ << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size() << " tasks" << std::endl; - for (const auto &[_, task]: tasks) { - os_ << "TASK (" << task.name << "): "; + for (const auto &[name, task]: tasks) { + os_ << "TASK (" << name << "): "; std::copy(task.command.begin(), task.command.end(), std::ostream_iterator(os_, " ")); os_ << std::endl; diff --git a/tests/unit_dag.cpp b/tests/unit_dag.cpp index c84e367..88965fd 100644 --- a/tests/unit_dag.cpp +++ b/tests/unit_dag.cpp @@ -70,8 +70,8 @@ TEST_CASE("DAG Traversal Tests", "[dag]") { size_t i = 0; while (!dag.allVisited()) { const auto &v = dag.visitNext().value(); - dag.completeVisit(v.key); - visitOrder[v.key] = i; + dag.completeVisit(v.first); + visitOrder[v.first] = i; ++i; } @@ -83,7 +83,10 @@ TEST_CASE("DAG Traversal Tests", "[dag]") { SECTION("Iteration") { size_t nVisited = 0; - dag.forEach([&](const daggy::Vertex &) { ++nVisited; }); + dag.forEach([&](auto &k) { + (void) k; + ++nVisited; + }); REQUIRE(nVisited == dag.size()); } }