Enhancing the performance of dag building by delaying validation until the end
This commit is contained in:
@@ -42,7 +42,7 @@ namespace daggy {
|
||||
|
||||
void addEdgeIf(const K &src, std::function<bool(const Vertex<K, V> &v)> predicate);
|
||||
|
||||
bool hasPath(const K &from, const K &to) const;
|
||||
bool isValid() const;
|
||||
|
||||
bool hasVertex(const K &from);
|
||||
|
||||
@@ -76,7 +76,9 @@ namespace daggy {
|
||||
private:
|
||||
std::unordered_map<K, Vertex<K, V>> vertices_;
|
||||
std::unordered_set<K> readyVertices_;
|
||||
|
||||
std::optional<K> findCycle_(const K & node, std::unordered_set<K> & seen) const;
|
||||
};
|
||||
}
|
||||
|
||||
#include "DAG.impl.hxx"
|
||||
#include "DAG.impl.hxx"
|
||||
|
||||
@@ -25,8 +25,6 @@ namespace daggy {
|
||||
void DAG<K, V>::addEdge(const K &from, const K &to) {
|
||||
if (vertices_.find(from) == vertices_.end()) throw std::runtime_error("No such vertex");
|
||||
if (vertices_.find(to) == vertices_.end()) throw std::runtime_error("No such vertex");
|
||||
if (hasPath(to, from))
|
||||
throw std::runtime_error("Adding edge would result in a cycle");
|
||||
vertices_.at(from).children.insert(to);
|
||||
vertices_.at(to).depCount++;
|
||||
}
|
||||
@@ -40,15 +38,35 @@ namespace daggy {
|
||||
}
|
||||
|
||||
template<typename K, typename V>
|
||||
bool DAG<K, V>::hasPath(const K &from, const K &to) const {
|
||||
if (vertices_.find(from) == vertices_.end()) throw std::runtime_error("No such vertex");
|
||||
if (vertices_.find(to) == vertices_.end()) throw std::runtime_error("No such vertex");
|
||||
for (const auto &child: vertices_.at(from).children) {
|
||||
if (child == to) return true;
|
||||
if (hasPath(child, to)) return true;
|
||||
std::optional<K> DAG<K, V>::findCycle_(const K & node, std::unordered_set<K> & seen) const {
|
||||
if (seen.count(node) > 0) return node;
|
||||
seen.insert(node);
|
||||
std::optional<K> ret;
|
||||
for (const auto & child : vertices_.at(node).children) {
|
||||
auto res = findCycle_(child, seen);
|
||||
if (res.has_value()) {
|
||||
ret.swap(res);
|
||||
break;
|
||||
}
|
||||
}
|
||||
seen.extract(node);
|
||||
return ret;
|
||||
}
|
||||
|
||||
return false;
|
||||
template<typename K, typename V>
|
||||
bool DAG<K, V>::isValid() const {
|
||||
std::unordered_set<K> seen;
|
||||
for (const auto & [k, v] : vertices_) {
|
||||
seen.clear();
|
||||
if (v.depCount != 0) continue;
|
||||
auto res = findCycle_(k, seen);
|
||||
if (res.has_value()) {
|
||||
std::stringstream ss;
|
||||
ss << "DAG contains a cycle between " << k << " and " << res.value() << std::endl;
|
||||
throw std::runtime_error(ss.str());
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
template<typename K, typename V>
|
||||
|
||||
@@ -108,6 +108,7 @@ namespace daggy {
|
||||
TaskDAG dag;
|
||||
updateDAGFromTasks(dag, tasks);
|
||||
dag.reset();
|
||||
dag.isValid();
|
||||
|
||||
// Replay any updates
|
||||
for (const auto &update: updates) {
|
||||
|
||||
@@ -22,15 +22,13 @@ TEST_CASE("dag_construction", "[dag]") {
|
||||
REQUIRE(!dag.empty());
|
||||
|
||||
// Cannot add an edge that would result in a cycle
|
||||
REQUIRE_THROWS(dag.addEdge(9, 5));
|
||||
dag.addEdge(9, 5);
|
||||
REQUIRE_THROWS(dag.isValid());
|
||||
|
||||
// Bounds checking
|
||||
SECTION("addEdge Bounds Checking") {
|
||||
REQUIRE_THROWS(dag.addEdge(20, 0));
|
||||
REQUIRE_THROWS(dag.addEdge(0, 20));
|
||||
}SECTION("hasPath Bounds Checking") {
|
||||
REQUIRE_THROWS(dag.hasPath(20, 0));
|
||||
REQUIRE_THROWS(dag.hasPath(0, 20));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#include <iomanip>
|
||||
#include <algorithm>
|
||||
#include <iostream>
|
||||
#include <random>
|
||||
|
||||
#include <catch2/catch.hpp>
|
||||
|
||||
@@ -180,3 +181,52 @@ TEST_CASE("dag_runner", "[utilities_dag_runner]") {
|
||||
REQUIRE(record.tasks["C_0"].children.empty());
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("dag_runner_stress", "[utilities_dag_runner_stress]") {
|
||||
daggy::executors::task::ForkingTaskExecutor ex(10);
|
||||
std::stringstream ss;
|
||||
daggy::loggers::dag_run::OStreamLogger logger(ss);
|
||||
|
||||
|
||||
SECTION("Stress-test") {
|
||||
static std::random_device dev;
|
||||
static std::mt19937 rng(dev());
|
||||
std::uniform_int_distribution<size_t> nDepDist(0, 10);
|
||||
|
||||
const size_t N_NODES = 100;
|
||||
daggy::TaskSet tasks;
|
||||
std::vector<fs::path> fileNames;
|
||||
std::vector<std::string> taskNames;
|
||||
|
||||
for (size_t i = 0; i < N_NODES; ++i) {
|
||||
std::string taskName = std::to_string(i);
|
||||
std::uniform_int_distribution<size_t> depDist(i+1, N_NODES-1);
|
||||
std::unordered_set<std::string> deps;
|
||||
size_t nChildren = nDepDist(rng);
|
||||
for (size_t c = 0; c < nChildren; ++c) {
|
||||
deps.insert(std::to_string(depDist(rng)));
|
||||
}
|
||||
tasks.emplace(taskName, daggy::Task{
|
||||
.definedName = taskName,
|
||||
.job = { { "command", std::vector<std::string>{"/usr/bin/echo", taskName}}},
|
||||
.children = deps
|
||||
});
|
||||
}
|
||||
|
||||
auto dag = daggy::buildDAGFromTasks(tasks);
|
||||
|
||||
auto runID = logger.startDAGRun("test_run", tasks);
|
||||
|
||||
auto tryDAG = daggy::runDAG(runID, ex, logger, dag);
|
||||
|
||||
REQUIRE(tryDAG.allVisited());
|
||||
|
||||
// Get the DAG Run Attempts
|
||||
auto record = logger.getDAGRun(runID);
|
||||
for (const auto & [k, attempts] : record.taskAttempts) {
|
||||
REQUIRE(attempts.size() == 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user