diff --git a/CMakeLists.txt b/CMakeLists.txt index 59c282a..c7a4ed3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,6 +3,7 @@ project(overall) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED True) set(CMAKE_EXPORT_COMPILE_COMMANDS True) +SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall -Werror") set(THIRD_PARTY_DIR ${CMAKE_BINARY_DIR}/third_party) diff --git a/daggy/include/daggy/DAG.hpp b/daggy/include/daggy/DAG.hpp index 18b3e5c..b43886b 100644 --- a/daggy/include/daggy/DAG.hpp +++ b/daggy/include/daggy/DAG.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include "Defines.hpp" @@ -35,7 +36,7 @@ namespace daggy { // Vertices void addVertex(K id, V data); - const std::vector> &getVertices(); + std::unordered_set getVertices() const; // Edges void addEdge(const K &src, const K &dst); @@ -75,8 +76,6 @@ namespace daggy { private: std::unordered_map> vertices_; - - std::optional findCycle_(const K & node, std::unordered_set & seen) const; }; } diff --git a/daggy/include/daggy/DAG.impl.hxx b/daggy/include/daggy/DAG.impl.hxx index 7237e97..527ac4b 100644 --- a/daggy/include/daggy/DAG.impl.hxx +++ b/daggy/include/daggy/DAG.impl.hxx @@ -11,6 +11,15 @@ namespace daggy { template Vertex &DAG::getVertex(const K &id) { return vertices_.at(id); } + template + std::unordered_set DAG::getVertices() const { + std::unordered_set keys; + for (const auto it : vertices_) { + keys.insert(it.first); + } + return keys; + } + template void DAG::addVertex(K id, V data) { if (vertices_.count(id) != 0) { @@ -31,42 +40,37 @@ namespace daggy { template void DAG::addEdgeIf(const K &src, std::function &v)> predicate) { - for (const auto &[name, vertex]: vertices_) { + auto & parent = vertices_.at(src); + for (auto &[name, vertex]: vertices_) { + if (! predicate(vertex)) continue; if (name == src) continue; - if (predicate(vertex)) addEdge(src, name); + parent.children.insert(name); + vertex.depCount++; } } - template - std::optional DAG::findCycle_(const K & node, std::unordered_set & seen) const { - if (seen.count(node) > 0) return node; - seen.insert(node); - std::optional ret; - for (const auto & child : vertices_.at(node).children) { - auto res = findCycle_(child, seen); - if (res.has_value()) { - ret.swap(res); - break; - } - } - seen.extract(node); - return ret; - } - template bool DAG::isValid() const { - std::unordered_set seen; + std::unordered_map depCounts; + std::queue ready; + size_t processed = 0; + for (const auto & [k, v] : vertices_) { - seen.clear(); - if (v.depCount != 0) continue; - auto res = findCycle_(k, seen); - if (res.has_value()) { - std::stringstream ss; - ss << "DAG contains a cycle between " << k << " and " << res.value() << std::endl; - throw std::runtime_error(ss.str()); - } + depCounts[k] = v.depCount; + if (v.depCount == 0) ready.push(k); } - return true; + + while (! ready.empty()) { + const auto & k = ready.front(); + for (const auto & child : vertices_.at(k).children) { + auto dc = --depCounts[child]; + if (dc == 0) ready.push(child); + } + processed++; + ready.pop(); + } + + return processed == vertices_.size(); } template diff --git a/daggy/include/daggy/Defines.hpp b/daggy/include/daggy/Defines.hpp index 8d6587e..ba7fb4a 100644 --- a/daggy/include/daggy/Defines.hpp +++ b/daggy/include/daggy/Defines.hpp @@ -16,7 +16,7 @@ namespace daggy { using Command = std::vector; // Time - using Clock = std::chrono::system_clock; + using Clock = std::chrono::high_resolution_clock; using TimePoint = std::chrono::time_point; // DAG Runs diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index 22b8d4f..7e70626 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -29,7 +29,7 @@ namespace daggy { buildDAGFromTasks(TaskSet &tasks, const std::vector &updates = {}); - void updateDAGFromTasks(TaskDAG &dag, TaskSet &tasks); + void updateDAGFromTasks(TaskDAG &dag, const TaskSet &tasks); TaskDAG runDAG(DAGRunID runID, executors::task::TaskExecutor &executor, diff --git a/daggy/include/daggy/executors/task/NoopTaskExecutor.hpp b/daggy/include/daggy/executors/task/NoopTaskExecutor.hpp new file mode 100644 index 0000000..4455dfd --- /dev/null +++ b/daggy/include/daggy/executors/task/NoopTaskExecutor.hpp @@ -0,0 +1,20 @@ +#pragma once + +#include "TaskExecutor.hpp" + +namespace daggy::executors::task { + class NoopTaskExecutor : public TaskExecutor { + public: + using Command = std::vector; + + // Validates the job to ensure that all required values are set and are of the right type, + bool validateTaskParameters(const ConfigValues &job) override; + + std::vector + expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) override; + + // Runs the task + std::future execute(const std::string &taskName, const Task &task) override; + }; +} + diff --git a/daggy/src/Serialization.cpp b/daggy/src/Serialization.cpp index c80f435..5078bba 100644 --- a/daggy/src/Serialization.cpp +++ b/daggy/src/Serialization.cpp @@ -152,6 +152,15 @@ namespace daggy { const auto &taskName = it->name.GetString(); tasks.emplace(taskName, taskFromJSON(taskName, it->value, jobDefaults)); } + + // Normalize tasks so all the children are populated + for (auto &[k, v] : tasks) { + for (const auto & p : v.parents) { + tasks[p].children.insert(k); + } + v.parents.clear(); + } + return tasks; } diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index c0a680f..be4bddd 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -78,28 +78,19 @@ namespace daggy { } - void updateDAGFromTasks(TaskDAG &dag, TaskSet &tasks) { - // Add all the vertices - std::unordered_map> definedSets; + void updateDAGFromTasks(TaskDAG &dag, const TaskSet &tasks) { + // Add the missing vertices for (const auto &[name, task]: tasks) { dag.addVertex(name, task); - definedSets[task.definedName].insert(name); } // Add edges for (const auto &[name, task]: tasks) { - for (const auto &defChild: task.children) { - for (const auto &child: definedSets[defChild]) { - dag.addEdge(name, child); - } - } + dag.addEdgeIf(name, [&](const auto &v) { return task.children.count(v.data.definedName) > 0; }); + } - for (const auto &defParent: task.parents) { - for (const auto &parent: definedSets[defParent]) { - dag.addEdge(parent, name); - tasks.at(parent).children.insert(name); - } - } + if (! dag.isValid()) { + throw std::runtime_error("DAG contains a cycle"); } } @@ -107,8 +98,6 @@ namespace daggy { const std::vector &updates) { TaskDAG dag; updateDAGFromTasks(dag, tasks); - dag.reset(); - dag.isValid(); // Replay any updates for (const auto &update: updates) { @@ -120,6 +109,9 @@ namespace daggy { dag.setVertexState(update.taskName, RunState::RUNNING); dag.setVertexState(update.taskName, RunState::COMPLETED); break; + case RunState::COMPLETED: + case RunState::QUEUED: + break; } } @@ -145,10 +137,10 @@ namespace daggy { if (fut.valid()) { auto attempt = fut.get(); logger.logTaskAttempt(runID, taskName, attempt); + auto &vert = dag.getVertex(taskName); + auto &task = vert.data; if (attempt.rc == 0) { logger.updateTaskState(runID, taskName, RunState::COMPLETED); - auto &vert = dag.getVertex(taskName); - auto &task = vert.data; if (task.isGenerator) { // Parse the output and update the DAGs try { @@ -177,7 +169,6 @@ namespace daggy { --running; } else { // RC isn't 0 - const auto & task = dag.getVertex(taskName).data; if (taskAttemptCounts[taskName] <= task.maxRetries) { logger.updateTaskState(runID, taskName, RunState::RETRY); runningTasks[taskName] = executor.execute(taskName, task); @@ -198,6 +189,7 @@ namespace daggy { auto &task = t.value().second; taskAttemptCounts[taskName] = 1; + logger.updateTaskState(runID, taskName, RunState::RUNNING); runningTasks.emplace(taskName, executor.execute(taskName, task)); ++running; diff --git a/daggy/src/executors/task/CMakeLists.txt b/daggy/src/executors/task/CMakeLists.txt index 061c9c9..5fda838 100644 --- a/daggy/src/executors/task/CMakeLists.txt +++ b/daggy/src/executors/task/CMakeLists.txt @@ -1,4 +1,5 @@ target_sources(${PROJECT_NAME} PRIVATE ForkingTaskExecutor.cpp SlurmTaskExecutor.cpp + NoopTaskExecutor.cpp ) diff --git a/daggy/src/executors/task/NoopTaskExecutor.cpp b/daggy/src/executors/task/NoopTaskExecutor.cpp new file mode 100644 index 0000000..c56f0ee --- /dev/null +++ b/daggy/src/executors/task/NoopTaskExecutor.cpp @@ -0,0 +1,42 @@ +#include +#include + +namespace daggy::executors::task { + std::future + NoopTaskExecutor::execute(const std::string &taskName, const Task &task) { + std::promise promise; + auto ts = Clock::now(); + promise.set_value(AttemptRecord{ + .startTime = ts, + .stopTime = ts, + .rc = 0, + .executorLog = taskName, + .outputLog = taskName, + .errorLog = taskName + }); + return promise.get_future(); + } + + bool NoopTaskExecutor::validateTaskParameters(const ConfigValues &job) { + auto it = job.find("command"); + if (it == job.end()) + throw std::runtime_error(R"(job does not have a "command" argument)"); + if (!std::holds_alternative(it->second)) + throw std::runtime_error(R"(taskParameter's "command" must be an array of strings)"); + return true; + } + + std::vector + NoopTaskExecutor::expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) { + std::vector newValues; + + const auto command = std::get(job.at("command")); + for (const auto &expandedCommand: interpolateValues(command, expansionValues)) { + ConfigValues newCommand{job}; + newCommand.at("command") = expandedCommand; + newValues.emplace_back(newCommand); + } + + return newValues; + } +} diff --git a/tests/unit_dag.cpp b/tests/unit_dag.cpp index 6a3c9df..ed29bd9 100644 --- a/tests/unit_dag.cpp +++ b/tests/unit_dag.cpp @@ -11,7 +11,7 @@ TEST_CASE("dag_construction", "[dag]") { REQUIRE(dag.empty()); REQUIRE_NOTHROW(dag.addVertex(0, 0)); - for (int i = 1; i < 10; ++i) { + for (size_t i = 1; i < 10; ++i) { dag.addVertex(i, i); REQUIRE(dag.hasVertex(i)); REQUIRE(dag.getVertex(i).data == i); @@ -23,7 +23,7 @@ TEST_CASE("dag_construction", "[dag]") { // Cannot add an edge that would result in a cycle dag.addEdge(9, 5); - REQUIRE_THROWS(dag.isValid()); + REQUIRE(!dag.isValid()); // Bounds checking SECTION("addEdge Bounds Checking") { diff --git a/tests/unit_server.cpp b/tests/unit_server.cpp index ff70a68..09de8b6 100644 --- a/tests/unit_server.cpp +++ b/tests/unit_server.cpp @@ -142,19 +142,24 @@ TEST_CASE("rest_endpoint", "[server_basic]") { REQUIRE(doc.IsObject()); REQUIRE(doc.HasMember("taskStates")); - const auto &taskStates = doc["taskStates"].GetArray(); - REQUIRE(taskStates.Size() == 3); + const auto &taskStates = doc["taskStates"].GetObject(); + + size_t nStates = 0; + for (auto it = taskStates.MemberBegin(); it != taskStates.MemberEnd(); ++it) { + nStates++; + } + REQUIRE(nStates == 3); complete = true; - for (size_t i = 0; i < taskStates.Size(); ++i) { - std::string state = taskStates[i].GetString(); + for (auto it = taskStates.MemberBegin(); it != taskStates.MemberEnd(); ++it) { + std::string state = it->value.GetString(); if (state != "COMPLETED") { complete = false; break; } } if (complete) break; - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::this_thread::sleep_for(std::chrono::seconds(1)); } REQUIRE(complete); diff --git a/tests/unit_utilities.cpp b/tests/unit_utilities.cpp index 64944f5..8b0009f 100644 --- a/tests/unit_utilities.cpp +++ b/tests/unit_utilities.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -12,6 +13,7 @@ #include "daggy/Utilities.hpp" #include "daggy/Serialization.hpp" #include "daggy/executors/task/ForkingTaskExecutor.hpp" +#include "daggy/executors/task/NoopTaskExecutor.hpp" #include "daggy/loggers/dag_run/OStreamLogger.hpp" namespace fs = std::filesystem; @@ -56,6 +58,56 @@ TEST_CASE("string_expansion", "[utilities_parameter_expansion]") { } } +TEST_CASE("dag_runner_order", "[dagrun_order]") { + daggy::executors::task::NoopTaskExecutor ex; + std::stringstream ss; + daggy::loggers::dag_run::OStreamLogger logger(ss); + + daggy::TimePoint startTime = daggy::Clock::now(); + + std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07", "2021-05-08", "2021-05-09" ]})"}; + auto params = daggy::configFromJSON(testParams); + + std::string taskJSON = R"({ + "A": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "B","D" ]}, + "B": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "C","D","E" ]}, + "C": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "D"]}, + "D": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "E"]}, + "E": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}} + })"; + + auto tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex, params); + + REQUIRE(tasks.size() == 20); + + auto dag = daggy::buildDAGFromTasks(tasks); + auto runID = logger.startDAGRun("test_run", tasks); + auto endDAG = daggy::runDAG(runID, ex, logger, dag); + + REQUIRE(endDAG.allVisited()); + + // Ensure the run order + auto rec = logger.getDAGRun(runID); + + daggy::TimePoint stopTime = daggy::Clock::now(); + std::array minTimes; minTimes.fill(startTime); + std::array maxTimes; maxTimes.fill(stopTime); + + for (const auto &[k, v] : rec.taskAttempts) { + size_t idx = k[0] - 65; + auto & startTime = minTimes[idx]; + auto & stopTime = maxTimes[idx]; + startTime = std::max(startTime, v.front().startTime); + stopTime = std::min(stopTime, v.back().stopTime); + } + + for (size_t i = 0; i < 5; ++i) { + for (size_t j = i+1; j < 4; ++j) { + REQUIRE(maxTimes[i] < minTimes[j]); + } + } +} + TEST_CASE("dag_runner", "[utilities_dag_runner]") { daggy::executors::task::ForkingTaskExecutor ex(10); std::stringstream ss; @@ -181,55 +233,3 @@ TEST_CASE("dag_runner", "[utilities_dag_runner]") { REQUIRE(record.tasks["C_0"].children.empty()); } } - -TEST_CASE("dag_runner_stress", "[utilities_dag_runner_stress]") { - daggy::executors::task::ForkingTaskExecutor ex(10); - std::stringstream ss; - daggy::loggers::dag_run::OStreamLogger logger(ss); - - - SECTION("Stress-test") { - static std::random_device dev; - static std::mt19937 rng(dev()); - std::uniform_int_distribution nDepDist(0, 10); - - const size_t N_NODES = 100; - daggy::TaskSet tasks; - std::vector fileNames; - std::vector taskNames; - - for (size_t i = 0; i < N_NODES; ++i) { - std::string taskName = std::to_string(i); - std::uniform_int_distribution depDist(i+1, N_NODES-1); - std::unordered_set deps; - size_t nChildren = nDepDist(rng); - for (size_t c = 0; c < nChildren; ++c) { - deps.insert(std::to_string(depDist(rng))); - } - tasks.emplace(taskName, daggy::Task{ - .definedName = taskName, - .job = { { "command", std::vector{"/usr/bin/echo", taskName}}}, - .children = deps - }); - } - - auto dag = daggy::buildDAGFromTasks(tasks); - - /** - - auto runID = logger.startDAGRun("test_run", tasks); - - auto tryDAG = daggy::runDAG(runID, ex, logger, dag); - - REQUIRE(tryDAG.allVisited()); - - // Get the DAG Run Attempts - auto record = logger.getDAGRun(runID); - for (const auto & [k, attempts] : record.taskAttempts) { - REQUIRE(attempts.size() == 1); - } - */ - } - - -} diff --git a/utils/daggyd/daggyd.cpp b/utils/daggyd/daggyd.cpp index 7e67b15..d2d6fb3 100644 --- a/utils/daggyd/daggyd.cpp +++ b/utils/daggyd/daggyd.cpp @@ -95,7 +95,8 @@ void daemonize() { /* Change the working directory to the root directory */ /* or another appropriated directory */ - chdir("/"); + auto rc = chdir("/"); + (void)rc; /* Close all open file descriptors */ for (auto x = sysconf(_SC_OPEN_MAX); x >= 0; x--) { close(x); }