From 39d5ae08be436d742ef4a03770aff1b6d9cc4afe Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Mon, 20 Sep 2021 19:05:56 -0300 Subject: [PATCH] Adding a No-op task executor for testing Fixing DFS implementation of DAG validation to be much faster Adding in additional tests to ensure the run order of expanded tasks is preserved Adding additional compile-time checks, resolving issues that came up as a result --- CMakeLists.txt | 1 + daggy/include/daggy/DAG.hpp | 5 +- daggy/include/daggy/DAG.impl.hxx | 60 +++++----- daggy/include/daggy/Defines.hpp | 2 +- daggy/include/daggy/Utilities.hpp | 2 +- .../daggy/executors/task/NoopTaskExecutor.hpp | 20 ++++ daggy/src/Serialization.cpp | 9 ++ daggy/src/Utilities.cpp | 32 ++---- daggy/src/executors/task/CMakeLists.txt | 1 + daggy/src/executors/task/NoopTaskExecutor.cpp | 42 +++++++ tests/unit_dag.cpp | 4 +- tests/unit_server.cpp | 15 ++- tests/unit_utilities.cpp | 104 +++++++++--------- utils/daggyd/daggyd.cpp | 3 +- 14 files changed, 187 insertions(+), 113 deletions(-) create mode 100644 daggy/include/daggy/executors/task/NoopTaskExecutor.hpp create mode 100644 daggy/src/executors/task/NoopTaskExecutor.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 59c282a..c7a4ed3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,6 +3,7 @@ project(overall) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED True) set(CMAKE_EXPORT_COMPILE_COMMANDS True) +SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall -Werror") set(THIRD_PARTY_DIR ${CMAKE_BINARY_DIR}/third_party) diff --git a/daggy/include/daggy/DAG.hpp b/daggy/include/daggy/DAG.hpp index 18b3e5c..b43886b 100644 --- a/daggy/include/daggy/DAG.hpp +++ b/daggy/include/daggy/DAG.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include "Defines.hpp" @@ -35,7 +36,7 @@ namespace daggy { // Vertices void addVertex(K id, V data); - const std::vector> &getVertices(); + std::unordered_set getVertices() const; // Edges void addEdge(const K &src, const K &dst); @@ -75,8 +76,6 @@ namespace daggy { private: std::unordered_map> vertices_; - - std::optional findCycle_(const K & node, std::unordered_set & seen) const; }; } diff --git a/daggy/include/daggy/DAG.impl.hxx b/daggy/include/daggy/DAG.impl.hxx index 7237e97..527ac4b 100644 --- a/daggy/include/daggy/DAG.impl.hxx +++ b/daggy/include/daggy/DAG.impl.hxx @@ -11,6 +11,15 @@ namespace daggy { template Vertex &DAG::getVertex(const K &id) { return vertices_.at(id); } + template + std::unordered_set DAG::getVertices() const { + std::unordered_set keys; + for (const auto it : vertices_) { + keys.insert(it.first); + } + return keys; + } + template void DAG::addVertex(K id, V data) { if (vertices_.count(id) != 0) { @@ -31,42 +40,37 @@ namespace daggy { template void DAG::addEdgeIf(const K &src, std::function &v)> predicate) { - for (const auto &[name, vertex]: vertices_) { + auto & parent = vertices_.at(src); + for (auto &[name, vertex]: vertices_) { + if (! predicate(vertex)) continue; if (name == src) continue; - if (predicate(vertex)) addEdge(src, name); + parent.children.insert(name); + vertex.depCount++; } } - template - std::optional DAG::findCycle_(const K & node, std::unordered_set & seen) const { - if (seen.count(node) > 0) return node; - seen.insert(node); - std::optional ret; - for (const auto & child : vertices_.at(node).children) { - auto res = findCycle_(child, seen); - if (res.has_value()) { - ret.swap(res); - break; - } - } - seen.extract(node); - return ret; - } - template bool DAG::isValid() const { - std::unordered_set seen; + std::unordered_map depCounts; + std::queue ready; + size_t processed = 0; + for (const auto & [k, v] : vertices_) { - seen.clear(); - if (v.depCount != 0) continue; - auto res = findCycle_(k, seen); - if (res.has_value()) { - std::stringstream ss; - ss << "DAG contains a cycle between " << k << " and " << res.value() << std::endl; - throw std::runtime_error(ss.str()); - } + depCounts[k] = v.depCount; + if (v.depCount == 0) ready.push(k); } - return true; + + while (! ready.empty()) { + const auto & k = ready.front(); + for (const auto & child : vertices_.at(k).children) { + auto dc = --depCounts[child]; + if (dc == 0) ready.push(child); + } + processed++; + ready.pop(); + } + + return processed == vertices_.size(); } template diff --git a/daggy/include/daggy/Defines.hpp b/daggy/include/daggy/Defines.hpp index 8d6587e..ba7fb4a 100644 --- a/daggy/include/daggy/Defines.hpp +++ b/daggy/include/daggy/Defines.hpp @@ -16,7 +16,7 @@ namespace daggy { using Command = std::vector; // Time - using Clock = std::chrono::system_clock; + using Clock = std::chrono::high_resolution_clock; using TimePoint = std::chrono::time_point; // DAG Runs diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index 22b8d4f..7e70626 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -29,7 +29,7 @@ namespace daggy { buildDAGFromTasks(TaskSet &tasks, const std::vector &updates = {}); - void updateDAGFromTasks(TaskDAG &dag, TaskSet &tasks); + void updateDAGFromTasks(TaskDAG &dag, const TaskSet &tasks); TaskDAG runDAG(DAGRunID runID, executors::task::TaskExecutor &executor, diff --git a/daggy/include/daggy/executors/task/NoopTaskExecutor.hpp b/daggy/include/daggy/executors/task/NoopTaskExecutor.hpp new file mode 100644 index 0000000..4455dfd --- /dev/null +++ b/daggy/include/daggy/executors/task/NoopTaskExecutor.hpp @@ -0,0 +1,20 @@ +#pragma once + +#include "TaskExecutor.hpp" + +namespace daggy::executors::task { + class NoopTaskExecutor : public TaskExecutor { + public: + using Command = std::vector; + + // Validates the job to ensure that all required values are set and are of the right type, + bool validateTaskParameters(const ConfigValues &job) override; + + std::vector + expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) override; + + // Runs the task + std::future execute(const std::string &taskName, const Task &task) override; + }; +} + diff --git a/daggy/src/Serialization.cpp b/daggy/src/Serialization.cpp index c80f435..5078bba 100644 --- a/daggy/src/Serialization.cpp +++ b/daggy/src/Serialization.cpp @@ -152,6 +152,15 @@ namespace daggy { const auto &taskName = it->name.GetString(); tasks.emplace(taskName, taskFromJSON(taskName, it->value, jobDefaults)); } + + // Normalize tasks so all the children are populated + for (auto &[k, v] : tasks) { + for (const auto & p : v.parents) { + tasks[p].children.insert(k); + } + v.parents.clear(); + } + return tasks; } diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index c0a680f..be4bddd 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -78,28 +78,19 @@ namespace daggy { } - void updateDAGFromTasks(TaskDAG &dag, TaskSet &tasks) { - // Add all the vertices - std::unordered_map> definedSets; + void updateDAGFromTasks(TaskDAG &dag, const TaskSet &tasks) { + // Add the missing vertices for (const auto &[name, task]: tasks) { dag.addVertex(name, task); - definedSets[task.definedName].insert(name); } // Add edges for (const auto &[name, task]: tasks) { - for (const auto &defChild: task.children) { - for (const auto &child: definedSets[defChild]) { - dag.addEdge(name, child); - } - } + dag.addEdgeIf(name, [&](const auto &v) { return task.children.count(v.data.definedName) > 0; }); + } - for (const auto &defParent: task.parents) { - for (const auto &parent: definedSets[defParent]) { - dag.addEdge(parent, name); - tasks.at(parent).children.insert(name); - } - } + if (! dag.isValid()) { + throw std::runtime_error("DAG contains a cycle"); } } @@ -107,8 +98,6 @@ namespace daggy { const std::vector &updates) { TaskDAG dag; updateDAGFromTasks(dag, tasks); - dag.reset(); - dag.isValid(); // Replay any updates for (const auto &update: updates) { @@ -120,6 +109,9 @@ namespace daggy { dag.setVertexState(update.taskName, RunState::RUNNING); dag.setVertexState(update.taskName, RunState::COMPLETED); break; + case RunState::COMPLETED: + case RunState::QUEUED: + break; } } @@ -145,10 +137,10 @@ namespace daggy { if (fut.valid()) { auto attempt = fut.get(); logger.logTaskAttempt(runID, taskName, attempt); + auto &vert = dag.getVertex(taskName); + auto &task = vert.data; if (attempt.rc == 0) { logger.updateTaskState(runID, taskName, RunState::COMPLETED); - auto &vert = dag.getVertex(taskName); - auto &task = vert.data; if (task.isGenerator) { // Parse the output and update the DAGs try { @@ -177,7 +169,6 @@ namespace daggy { --running; } else { // RC isn't 0 - const auto & task = dag.getVertex(taskName).data; if (taskAttemptCounts[taskName] <= task.maxRetries) { logger.updateTaskState(runID, taskName, RunState::RETRY); runningTasks[taskName] = executor.execute(taskName, task); @@ -198,6 +189,7 @@ namespace daggy { auto &task = t.value().second; taskAttemptCounts[taskName] = 1; + logger.updateTaskState(runID, taskName, RunState::RUNNING); runningTasks.emplace(taskName, executor.execute(taskName, task)); ++running; diff --git a/daggy/src/executors/task/CMakeLists.txt b/daggy/src/executors/task/CMakeLists.txt index 061c9c9..5fda838 100644 --- a/daggy/src/executors/task/CMakeLists.txt +++ b/daggy/src/executors/task/CMakeLists.txt @@ -1,4 +1,5 @@ target_sources(${PROJECT_NAME} PRIVATE ForkingTaskExecutor.cpp SlurmTaskExecutor.cpp + NoopTaskExecutor.cpp ) diff --git a/daggy/src/executors/task/NoopTaskExecutor.cpp b/daggy/src/executors/task/NoopTaskExecutor.cpp new file mode 100644 index 0000000..c56f0ee --- /dev/null +++ b/daggy/src/executors/task/NoopTaskExecutor.cpp @@ -0,0 +1,42 @@ +#include +#include + +namespace daggy::executors::task { + std::future + NoopTaskExecutor::execute(const std::string &taskName, const Task &task) { + std::promise promise; + auto ts = Clock::now(); + promise.set_value(AttemptRecord{ + .startTime = ts, + .stopTime = ts, + .rc = 0, + .executorLog = taskName, + .outputLog = taskName, + .errorLog = taskName + }); + return promise.get_future(); + } + + bool NoopTaskExecutor::validateTaskParameters(const ConfigValues &job) { + auto it = job.find("command"); + if (it == job.end()) + throw std::runtime_error(R"(job does not have a "command" argument)"); + if (!std::holds_alternative(it->second)) + throw std::runtime_error(R"(taskParameter's "command" must be an array of strings)"); + return true; + } + + std::vector + NoopTaskExecutor::expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) { + std::vector newValues; + + const auto command = std::get(job.at("command")); + for (const auto &expandedCommand: interpolateValues(command, expansionValues)) { + ConfigValues newCommand{job}; + newCommand.at("command") = expandedCommand; + newValues.emplace_back(newCommand); + } + + return newValues; + } +} diff --git a/tests/unit_dag.cpp b/tests/unit_dag.cpp index 6a3c9df..ed29bd9 100644 --- a/tests/unit_dag.cpp +++ b/tests/unit_dag.cpp @@ -11,7 +11,7 @@ TEST_CASE("dag_construction", "[dag]") { REQUIRE(dag.empty()); REQUIRE_NOTHROW(dag.addVertex(0, 0)); - for (int i = 1; i < 10; ++i) { + for (size_t i = 1; i < 10; ++i) { dag.addVertex(i, i); REQUIRE(dag.hasVertex(i)); REQUIRE(dag.getVertex(i).data == i); @@ -23,7 +23,7 @@ TEST_CASE("dag_construction", "[dag]") { // Cannot add an edge that would result in a cycle dag.addEdge(9, 5); - REQUIRE_THROWS(dag.isValid()); + REQUIRE(!dag.isValid()); // Bounds checking SECTION("addEdge Bounds Checking") { diff --git a/tests/unit_server.cpp b/tests/unit_server.cpp index ff70a68..09de8b6 100644 --- a/tests/unit_server.cpp +++ b/tests/unit_server.cpp @@ -142,19 +142,24 @@ TEST_CASE("rest_endpoint", "[server_basic]") { REQUIRE(doc.IsObject()); REQUIRE(doc.HasMember("taskStates")); - const auto &taskStates = doc["taskStates"].GetArray(); - REQUIRE(taskStates.Size() == 3); + const auto &taskStates = doc["taskStates"].GetObject(); + + size_t nStates = 0; + for (auto it = taskStates.MemberBegin(); it != taskStates.MemberEnd(); ++it) { + nStates++; + } + REQUIRE(nStates == 3); complete = true; - for (size_t i = 0; i < taskStates.Size(); ++i) { - std::string state = taskStates[i].GetString(); + for (auto it = taskStates.MemberBegin(); it != taskStates.MemberEnd(); ++it) { + std::string state = it->value.GetString(); if (state != "COMPLETED") { complete = false; break; } } if (complete) break; - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::this_thread::sleep_for(std::chrono::seconds(1)); } REQUIRE(complete); diff --git a/tests/unit_utilities.cpp b/tests/unit_utilities.cpp index 64944f5..8b0009f 100644 --- a/tests/unit_utilities.cpp +++ b/tests/unit_utilities.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -12,6 +13,7 @@ #include "daggy/Utilities.hpp" #include "daggy/Serialization.hpp" #include "daggy/executors/task/ForkingTaskExecutor.hpp" +#include "daggy/executors/task/NoopTaskExecutor.hpp" #include "daggy/loggers/dag_run/OStreamLogger.hpp" namespace fs = std::filesystem; @@ -56,6 +58,56 @@ TEST_CASE("string_expansion", "[utilities_parameter_expansion]") { } } +TEST_CASE("dag_runner_order", "[dagrun_order]") { + daggy::executors::task::NoopTaskExecutor ex; + std::stringstream ss; + daggy::loggers::dag_run::OStreamLogger logger(ss); + + daggy::TimePoint startTime = daggy::Clock::now(); + + std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07", "2021-05-08", "2021-05-09" ]})"}; + auto params = daggy::configFromJSON(testParams); + + std::string taskJSON = R"({ + "A": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "B","D" ]}, + "B": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "C","D","E" ]}, + "C": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "D"]}, + "D": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "E"]}, + "E": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}} + })"; + + auto tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex, params); + + REQUIRE(tasks.size() == 20); + + auto dag = daggy::buildDAGFromTasks(tasks); + auto runID = logger.startDAGRun("test_run", tasks); + auto endDAG = daggy::runDAG(runID, ex, logger, dag); + + REQUIRE(endDAG.allVisited()); + + // Ensure the run order + auto rec = logger.getDAGRun(runID); + + daggy::TimePoint stopTime = daggy::Clock::now(); + std::array minTimes; minTimes.fill(startTime); + std::array maxTimes; maxTimes.fill(stopTime); + + for (const auto &[k, v] : rec.taskAttempts) { + size_t idx = k[0] - 65; + auto & startTime = minTimes[idx]; + auto & stopTime = maxTimes[idx]; + startTime = std::max(startTime, v.front().startTime); + stopTime = std::min(stopTime, v.back().stopTime); + } + + for (size_t i = 0; i < 5; ++i) { + for (size_t j = i+1; j < 4; ++j) { + REQUIRE(maxTimes[i] < minTimes[j]); + } + } +} + TEST_CASE("dag_runner", "[utilities_dag_runner]") { daggy::executors::task::ForkingTaskExecutor ex(10); std::stringstream ss; @@ -181,55 +233,3 @@ TEST_CASE("dag_runner", "[utilities_dag_runner]") { REQUIRE(record.tasks["C_0"].children.empty()); } } - -TEST_CASE("dag_runner_stress", "[utilities_dag_runner_stress]") { - daggy::executors::task::ForkingTaskExecutor ex(10); - std::stringstream ss; - daggy::loggers::dag_run::OStreamLogger logger(ss); - - - SECTION("Stress-test") { - static std::random_device dev; - static std::mt19937 rng(dev()); - std::uniform_int_distribution nDepDist(0, 10); - - const size_t N_NODES = 100; - daggy::TaskSet tasks; - std::vector fileNames; - std::vector taskNames; - - for (size_t i = 0; i < N_NODES; ++i) { - std::string taskName = std::to_string(i); - std::uniform_int_distribution depDist(i+1, N_NODES-1); - std::unordered_set deps; - size_t nChildren = nDepDist(rng); - for (size_t c = 0; c < nChildren; ++c) { - deps.insert(std::to_string(depDist(rng))); - } - tasks.emplace(taskName, daggy::Task{ - .definedName = taskName, - .job = { { "command", std::vector{"/usr/bin/echo", taskName}}}, - .children = deps - }); - } - - auto dag = daggy::buildDAGFromTasks(tasks); - - /** - - auto runID = logger.startDAGRun("test_run", tasks); - - auto tryDAG = daggy::runDAG(runID, ex, logger, dag); - - REQUIRE(tryDAG.allVisited()); - - // Get the DAG Run Attempts - auto record = logger.getDAGRun(runID); - for (const auto & [k, attempts] : record.taskAttempts) { - REQUIRE(attempts.size() == 1); - } - */ - } - - -} diff --git a/utils/daggyd/daggyd.cpp b/utils/daggyd/daggyd.cpp index 7e67b15..d2d6fb3 100644 --- a/utils/daggyd/daggyd.cpp +++ b/utils/daggyd/daggyd.cpp @@ -95,7 +95,8 @@ void daemonize() { /* Change the working directory to the root directory */ /* or another appropriated directory */ - chdir("/"); + auto rc = chdir("/"); + (void)rc; /* Close all open file descriptors */ for (auto x = sysconf(_SC_OPEN_MAX); x >= 0; x--) { close(x); }