diff --git a/daggy/include/daggy/DAG.hpp b/daggy/include/daggy/DAG.hpp index beace9b..365b303 100644 --- a/daggy/include/daggy/DAG.hpp +++ b/daggy/include/daggy/DAG.hpp @@ -20,13 +20,13 @@ namespace daggy { - template + template struct Vertex { RunState state = RunState::QUEUED; uint32_t depCount = 0; - V data; - std::unordered_set children; + T data; + std::unordered_set children; }; template @@ -42,7 +42,7 @@ namespace daggy { void addEdge(const K &from, const K &to); void addEdgeIf(const K &src, - std::function &v)> predicate); + std::function &v)> predicate); [[nodiscard]] bool isValid() const; @@ -61,19 +61,21 @@ namespace daggy { void setVertexState(const K &id, RunState state); - void forEach( - std::function> &)> fun) const; + void forEach(std::function &)> fun) const; [[nodiscard]] bool allVisited() const; std::optional> visitNext(); - Vertex &getVertex(const K &id); + // WARNING: reference potentially invalidated on insertions. + Vertex &getVertex(const K &id); void completeVisit(const K &id); private: - std::unordered_map> vertices_; + std::unordered_map keyMap_; + std::vector vertexName_; + std::vector> vertices_; }; } // namespace daggy diff --git a/daggy/include/daggy/DAG.impl.hxx b/daggy/include/daggy/DAG.impl.hxx index f347143..2a9e33c 100644 --- a/daggy/include/daggy/DAG.impl.hxx +++ b/daggy/include/daggy/DAG.impl.hxx @@ -14,20 +14,20 @@ namespace daggy { template bool DAG::hasVertex(const K &id) { - return vertices_.count(id) != 0; + return keyMap_.count(id) != 0; } template - Vertex &DAG::getVertex(const K &id) + Vertex &DAG::getVertex(const K &id) { - return vertices_.at(id); + return vertices_[keyMap_.at(id)]; } template std::unordered_set DAG::getVertices() const { std::unordered_set keys; - for (const auto it : vertices_) { + for (const auto it : keyMap_) { keys.insert(it.first); } return keys; @@ -36,58 +36,59 @@ namespace daggy { template void DAG::addVertex(K id, V data) { - if (vertices_.count(id) != 0) { + if (keyMap_.count(id) != 0) { std::stringstream ss; ss << "A vertex with ID " << id << " already exists in the DAG"; throw std::runtime_error(ss.str()); } - vertices_.emplace( - id, - Vertex{.state = RunState::QUEUED, .depCount = 0, .data = data}); + size_t idx = vertices_.size(); + vertexName_.emplace_back(id); + vertices_.emplace_back( + Vertex{.state = RunState::QUEUED, .depCount = 0, .data = data}); + keyMap_.emplace(id, idx); } template void DAG::addEdge(const K &from, const K &to) { - if (vertices_.find(from) == vertices_.end()) - throw std::runtime_error("No such vertex"); - if (vertices_.find(to) == vertices_.end()) - throw std::runtime_error("No such vertex"); - vertices_.at(from).children.insert(to); - vertices_.at(to).depCount++; + size_t src = keyMap_.at(from); + size_t dst = keyMap_.at(to); + vertices_[src].children.insert(dst); + vertices_[dst].depCount++; } template - void DAG::addEdgeIf( - const K &src, std::function &v)> predicate) + void DAG::addEdgeIf(const K &src, + std::function &v)> predicate) { - auto &parent = vertices_.at(src); - for (auto &[name, vertex] : vertices_) { - if (!predicate(vertex)) + size_t parentIdx = keyMap_.at(src); + auto &parent = vertices_[parentIdx]; + for (size_t i = 0; i < vertices_.size(); ++i) { + if (!predicate(vertices_[i])) continue; - if (name == src) + if (i == parentIdx) continue; - parent.children.insert(name); - vertex.depCount++; + parent.children.insert(i); + vertices_[i].depCount++; } } template bool DAG::isValid() const { - std::unordered_map depCounts; - std::queue ready; + std::vector depCounts(vertices_.size(), 0); + std::queue ready; size_t processed = 0; - for (const auto &[k, v] : vertices_) { - depCounts[k] = v.depCount; - if (v.depCount == 0) - ready.push(k); + for (size_t i = 0; i < vertices_.size(); ++i) { + depCounts[i] = vertices_[i].depCount; + if (depCounts[i] == 0) + ready.push(i); } while (!ready.empty()) { const auto &k = ready.front(); - for (const auto &child : vertices_.at(k).children) { + for (const auto &child : vertices_[k].children) { auto dc = --depCounts[child]; if (dc == 0) ready.push(child); @@ -103,15 +104,15 @@ namespace daggy { void DAG::reset() { // Reset the state of all vertices - for (auto &[_, v] : vertices_) { + for (auto &v : vertices_) { v.state = RunState::QUEUED; v.depCount = 0; } // Calculate the upstream count - for (auto &[_, v] : vertices_) { + for (auto &v : vertices_) { for (auto c : v.children) { - vertices_.at(c).depCount++; + vertices_[c].depCount++; } } } @@ -119,7 +120,7 @@ namespace daggy { template void DAG::resetRunning() { - for (auto &[k, v] : vertices_) { + for (auto &v : vertices_) { if (v.state != +RunState::RUNNING) continue; v.state = RunState::QUEUED; @@ -129,29 +130,28 @@ namespace daggy { template void DAG::setVertexState(const K &id, RunState state) { - vertices_.at(id).state = state; + vertices_[keyMap_.at(id)].state = state; } template bool DAG::allVisited() const { - for (const auto &[_, v] : vertices_) { - if (v.state != +RunState::COMPLETED) - return false; - } - return true; + return not std::any_of( + vertices_.begin(), vertices_.end(), + [](const auto &v) { return v.state != +RunState::COMPLETED; }); } template std::optional> DAG::visitNext() { - for (auto &[k, v] : vertices_) { + for (size_t i = 0; i < vertices_.size(); ++i) { + auto &v = vertices_[i]; if (v.state != +RunState::QUEUED) continue; if (v.depCount != 0) continue; v.state = RunState::RUNNING; - return std::make_pair(k, v.data); + return std::make_pair(vertexName_[i], v.data); } return {}; } @@ -159,16 +159,15 @@ namespace daggy { template void DAG::completeVisit(const K &id) { - auto &v = vertices_.at(id); + auto &v = vertices_[keyMap_.at(id)]; v.state = RunState::COMPLETED; for (auto c : v.children) { - --vertices_.at(c).depCount; + --vertices_[c].depCount; } } template - void DAG::forEach( - std::function> &)> fun) const + void DAG::forEach(std::function &)> fun) const { for (auto it = vertices_.begin(); it != vertices_.end(); ++it) { fun(*it); diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index 12c6b0d..29ecfff 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -146,22 +146,36 @@ namespace daggy { if (fut.valid()) { auto attempt = fut.get(); logger.logTaskAttempt(runID, taskName, attempt); - auto &vert = dag.getVertex(taskName); + + // Not a reference, since adding tasks will invalidate references + auto vert = dag.getVertex(taskName); auto &task = vert.data; if (attempt.rc == 0) { logger.updateTaskState(runID, taskName, RunState::COMPLETED); if (task.isGenerator) { // Parse the output and update the DAGs try { - auto newTasks = expandTaskSet(tasksFromJSON(attempt.outputLog), - executor, parameters); + auto parsedTasks = tasksFromJSON(attempt.outputLog); + auto newTasks = + expandTaskSet(parsedTasks, executor, parameters); updateDAGFromTasks(dag, newTasks); + // Add in dependencies from current task to new tasks for (const auto &[ntName, ntTask] : newTasks) { logger.addTask(runID, ntName, ntTask); - dag.addEdge(taskName, ntName); task.children.insert(ntName); } + + // Efficiently add new edges from generator task + // to children + std::unordered_set baseNames; + for (const auto &[k, v] : parsedTasks) { + baseNames.insert(v.definedName); + } + dag.addEdgeIf(taskName, [&](const auto &v) { + return baseNames.count(v.data.definedName) > 0; + }); + logger.updateTask(runID, taskName, task); } catch (std::exception &e) { diff --git a/tests/unit_dag.cpp b/tests/unit_dag.cpp index c0eb1d1..61764a7 100644 --- a/tests/unit_dag.cpp +++ b/tests/unit_dag.cpp @@ -63,7 +63,9 @@ TEST_CASE("dag_traversal", "[dag]") std::vector visitOrder(N_VERTICES); size_t i = 0; while (!dag.allVisited()) { - const auto v = dag.visitNext().value(); + auto o = dag.visitNext(); + REQUIRE(o.has_value()); + const auto v = o.value(); dag.completeVisit(v.first); visitOrder[v.first] = i; ++i; diff --git a/tests/unit_utilities.cpp b/tests/unit_utilities.cpp index 1471f27..1c6a6f0 100644 --- a/tests/unit_utilities.cpp +++ b/tests/unit_utilities.cpp @@ -150,6 +150,13 @@ TEST_CASE("dag_runner", "[utilities_dag_runner]") REQUIRE(attempts.front().rc == 0); } } +} + +TEST_CASE("runDAG_recovery", "[runDAG]") +{ + daggy::executors::task::ForkingTaskExecutor ex(10); + std::stringstream ss; + daggy::loggers::dag_run::OStreamLogger logger(ss); SECTION("Recovery from Error") { @@ -198,6 +205,13 @@ TEST_CASE("dag_runner", "[utilities_dag_runner]") cleanup(); } +} + +TEST_CASE("runDAG_generator", "[runDAG_generator]") +{ + daggy::executors::task::ForkingTaskExecutor ex(10); + std::stringstream ss; + daggy::loggers::dag_run::OStreamLogger logger(ss); SECTION("Generator tasks") { @@ -211,8 +225,8 @@ TEST_CASE("dag_runner", "[utilities_dag_runner]") ofh << generatorOutput << std::endl; ofh.close(); + daggy::TimePoint globalStartTime = daggy::Clock::now(); std::stringstream jsonTasks; - jsonTasks << R"({ "A": { "job": {"command": [ "/usr/bin/cat", )" << std::quoted(ofn.string()) @@ -221,8 +235,10 @@ TEST_CASE("dag_runner", "[utilities_dag_runner]") auto baseTasks = daggy::tasksFromJSON(jsonTasks.str()); REQUIRE(baseTasks.size() == 2); + REQUIRE(baseTasks["A"].children == std::unordered_set{"C"}); auto tasks = daggy::expandTaskSet(baseTasks, ex, params); REQUIRE(tasks.size() == 2); + REQUIRE(tasks["A_0"].children == std::unordered_set{"C"}); auto dag = daggy::buildDAGFromTasks(tasks); REQUIRE(dag.size() == 2); @@ -250,5 +266,27 @@ TEST_CASE("dag_runner", "[utilities_dag_runner]") REQUIRE(record.tasks["B_1"].children == std::unordered_set{"C"}); REQUIRE(record.tasks["C_0"].children.empty()); + + // Ensure they were run in the right order + // All A's get run before B's, which run before C's + daggy::TimePoint globalStopTime = daggy::Clock::now(); + std::array minTimes; + minTimes.fill(globalStartTime); + std::array maxTimes; + maxTimes.fill(globalStopTime); + + for (const auto &[k, v] : record.taskAttempts) { + size_t idx = k[0] - 65; + auto &startTime = minTimes[idx]; + auto &stopTime = maxTimes[idx]; + startTime = std::max(startTime, v.front().startTime); + stopTime = std::min(stopTime, v.back().stopTime); + } + + for (size_t i = 0; i < 3; ++i) { + for (size_t j = i + 1; j < 2; ++j) { + REQUIRE(maxTimes[i] < minTimes[j]); + } + } } }