Converting underlying DAG implementation to use vectors for storage

Reduces hashing required and speeds up traversals
This commit is contained in:
Ian Roddis
2021-09-22 14:19:20 -03:00
parent 33942e2c63
commit 510aa236c4
5 changed files with 114 additions and 59 deletions

View File

@@ -20,13 +20,13 @@
namespace daggy { namespace daggy {
template <typename K, typename V> template <typename T>
struct Vertex struct Vertex
{ {
RunState state = RunState::QUEUED; RunState state = RunState::QUEUED;
uint32_t depCount = 0; uint32_t depCount = 0;
V data; T data;
std::unordered_set<K> children; std::unordered_set<size_t> children;
}; };
template <typename K, typename V> template <typename K, typename V>
@@ -42,7 +42,7 @@ namespace daggy {
void addEdge(const K &from, const K &to); void addEdge(const K &from, const K &to);
void addEdgeIf(const K &src, void addEdgeIf(const K &src,
std::function<bool(const Vertex<K, V> &v)> predicate); std::function<bool(const Vertex<V> &v)> predicate);
[[nodiscard]] bool isValid() const; [[nodiscard]] bool isValid() const;
@@ -61,19 +61,21 @@ namespace daggy {
void setVertexState(const K &id, RunState state); void setVertexState(const K &id, RunState state);
void forEach( void forEach(std::function<void(const Vertex<V> &)> fun) const;
std::function<void(const std::pair<K, Vertex<K, V>> &)> fun) const;
[[nodiscard]] bool allVisited() const; [[nodiscard]] bool allVisited() const;
std::optional<std::pair<K, V>> visitNext(); std::optional<std::pair<K, V>> visitNext();
Vertex<K, V> &getVertex(const K &id); // WARNING: reference potentially invalidated on insertions.
Vertex<V> &getVertex(const K &id);
void completeVisit(const K &id); void completeVisit(const K &id);
private: private:
std::unordered_map<K, Vertex<K, V>> vertices_; std::unordered_map<K, size_t> keyMap_;
std::vector<K> vertexName_;
std::vector<Vertex<V>> vertices_;
}; };
} // namespace daggy } // namespace daggy

View File

@@ -14,20 +14,20 @@ namespace daggy {
template <typename K, typename V> template <typename K, typename V>
bool DAG<K, V>::hasVertex(const K &id) bool DAG<K, V>::hasVertex(const K &id)
{ {
return vertices_.count(id) != 0; return keyMap_.count(id) != 0;
} }
template <typename K, typename V> template <typename K, typename V>
Vertex<K, V> &DAG<K, V>::getVertex(const K &id) Vertex<V> &DAG<K, V>::getVertex(const K &id)
{ {
return vertices_.at(id); return vertices_[keyMap_.at(id)];
} }
template <typename K, typename V> template <typename K, typename V>
std::unordered_set<K> DAG<K, V>::getVertices() const std::unordered_set<K> DAG<K, V>::getVertices() const
{ {
std::unordered_set<K> keys; std::unordered_set<K> keys;
for (const auto it : vertices_) { for (const auto it : keyMap_) {
keys.insert(it.first); keys.insert(it.first);
} }
return keys; return keys;
@@ -36,58 +36,59 @@ namespace daggy {
template <typename K, typename V> template <typename K, typename V>
void DAG<K, V>::addVertex(K id, V data) void DAG<K, V>::addVertex(K id, V data)
{ {
if (vertices_.count(id) != 0) { if (keyMap_.count(id) != 0) {
std::stringstream ss; std::stringstream ss;
ss << "A vertex with ID " << id << " already exists in the DAG"; ss << "A vertex with ID " << id << " already exists in the DAG";
throw std::runtime_error(ss.str()); throw std::runtime_error(ss.str());
} }
vertices_.emplace( size_t idx = vertices_.size();
id, vertexName_.emplace_back(id);
Vertex<K, V>{.state = RunState::QUEUED, .depCount = 0, .data = data}); vertices_.emplace_back(
Vertex<V>{.state = RunState::QUEUED, .depCount = 0, .data = data});
keyMap_.emplace(id, idx);
} }
template <typename K, typename V> template <typename K, typename V>
void DAG<K, V>::addEdge(const K &from, const K &to) void DAG<K, V>::addEdge(const K &from, const K &to)
{ {
if (vertices_.find(from) == vertices_.end()) size_t src = keyMap_.at(from);
throw std::runtime_error("No such vertex"); size_t dst = keyMap_.at(to);
if (vertices_.find(to) == vertices_.end()) vertices_[src].children.insert(dst);
throw std::runtime_error("No such vertex"); vertices_[dst].depCount++;
vertices_.at(from).children.insert(to);
vertices_.at(to).depCount++;
} }
template <typename K, typename V> template <typename K, typename V>
void DAG<K, V>::addEdgeIf( void DAG<K, V>::addEdgeIf(const K &src,
const K &src, std::function<bool(const Vertex<K, V> &v)> predicate) std::function<bool(const Vertex<V> &v)> predicate)
{ {
auto &parent = vertices_.at(src); size_t parentIdx = keyMap_.at(src);
for (auto &[name, vertex] : vertices_) { auto &parent = vertices_[parentIdx];
if (!predicate(vertex)) for (size_t i = 0; i < vertices_.size(); ++i) {
if (!predicate(vertices_[i]))
continue; continue;
if (name == src) if (i == parentIdx)
continue; continue;
parent.children.insert(name); parent.children.insert(i);
vertex.depCount++; vertices_[i].depCount++;
} }
} }
template <typename K, typename V> template <typename K, typename V>
bool DAG<K, V>::isValid() const bool DAG<K, V>::isValid() const
{ {
std::unordered_map<K, size_t> depCounts; std::vector<size_t> depCounts(vertices_.size(), 0);
std::queue<K> ready; std::queue<size_t> ready;
size_t processed = 0; size_t processed = 0;
for (const auto &[k, v] : vertices_) { for (size_t i = 0; i < vertices_.size(); ++i) {
depCounts[k] = v.depCount; depCounts[i] = vertices_[i].depCount;
if (v.depCount == 0) if (depCounts[i] == 0)
ready.push(k); ready.push(i);
} }
while (!ready.empty()) { while (!ready.empty()) {
const auto &k = ready.front(); const auto &k = ready.front();
for (const auto &child : vertices_.at(k).children) { for (const auto &child : vertices_[k].children) {
auto dc = --depCounts[child]; auto dc = --depCounts[child];
if (dc == 0) if (dc == 0)
ready.push(child); ready.push(child);
@@ -103,15 +104,15 @@ namespace daggy {
void DAG<K, V>::reset() void DAG<K, V>::reset()
{ {
// Reset the state of all vertices // Reset the state of all vertices
for (auto &[_, v] : vertices_) { for (auto &v : vertices_) {
v.state = RunState::QUEUED; v.state = RunState::QUEUED;
v.depCount = 0; v.depCount = 0;
} }
// Calculate the upstream count // Calculate the upstream count
for (auto &[_, v] : vertices_) { for (auto &v : vertices_) {
for (auto c : v.children) { for (auto c : v.children) {
vertices_.at(c).depCount++; vertices_[c].depCount++;
} }
} }
} }
@@ -119,7 +120,7 @@ namespace daggy {
template <typename K, typename V> template <typename K, typename V>
void DAG<K, V>::resetRunning() void DAG<K, V>::resetRunning()
{ {
for (auto &[k, v] : vertices_) { for (auto &v : vertices_) {
if (v.state != +RunState::RUNNING) if (v.state != +RunState::RUNNING)
continue; continue;
v.state = RunState::QUEUED; v.state = RunState::QUEUED;
@@ -129,29 +130,28 @@ namespace daggy {
template <typename K, typename V> template <typename K, typename V>
void DAG<K, V>::setVertexState(const K &id, RunState state) void DAG<K, V>::setVertexState(const K &id, RunState state)
{ {
vertices_.at(id).state = state; vertices_[keyMap_.at(id)].state = state;
} }
template <typename K, typename V> template <typename K, typename V>
bool DAG<K, V>::allVisited() const bool DAG<K, V>::allVisited() const
{ {
for (const auto &[_, v] : vertices_) { return not std::any_of(
if (v.state != +RunState::COMPLETED) vertices_.begin(), vertices_.end(),
return false; [](const auto &v) { return v.state != +RunState::COMPLETED; });
}
return true;
} }
template <typename K, typename V> template <typename K, typename V>
std::optional<std::pair<K, V>> DAG<K, V>::visitNext() std::optional<std::pair<K, V>> DAG<K, V>::visitNext()
{ {
for (auto &[k, v] : vertices_) { for (size_t i = 0; i < vertices_.size(); ++i) {
auto &v = vertices_[i];
if (v.state != +RunState::QUEUED) if (v.state != +RunState::QUEUED)
continue; continue;
if (v.depCount != 0) if (v.depCount != 0)
continue; continue;
v.state = RunState::RUNNING; v.state = RunState::RUNNING;
return std::make_pair(k, v.data); return std::make_pair(vertexName_[i], v.data);
} }
return {}; return {};
} }
@@ -159,16 +159,15 @@ namespace daggy {
template <typename K, typename V> template <typename K, typename V>
void DAG<K, V>::completeVisit(const K &id) void DAG<K, V>::completeVisit(const K &id)
{ {
auto &v = vertices_.at(id); auto &v = vertices_[keyMap_.at(id)];
v.state = RunState::COMPLETED; v.state = RunState::COMPLETED;
for (auto c : v.children) { for (auto c : v.children) {
--vertices_.at(c).depCount; --vertices_[c].depCount;
} }
} }
template <typename K, typename V> template <typename K, typename V>
void DAG<K, V>::forEach( void DAG<K, V>::forEach(std::function<void(const Vertex<V> &)> fun) const
std::function<void(const std::pair<K, Vertex<K, V>> &)> fun) const
{ {
for (auto it = vertices_.begin(); it != vertices_.end(); ++it) { for (auto it = vertices_.begin(); it != vertices_.end(); ++it) {
fun(*it); fun(*it);

View File

@@ -146,22 +146,36 @@ namespace daggy {
if (fut.valid()) { if (fut.valid()) {
auto attempt = fut.get(); auto attempt = fut.get();
logger.logTaskAttempt(runID, taskName, attempt); logger.logTaskAttempt(runID, taskName, attempt);
auto &vert = dag.getVertex(taskName);
// Not a reference, since adding tasks will invalidate references
auto vert = dag.getVertex(taskName);
auto &task = vert.data; auto &task = vert.data;
if (attempt.rc == 0) { if (attempt.rc == 0) {
logger.updateTaskState(runID, taskName, RunState::COMPLETED); logger.updateTaskState(runID, taskName, RunState::COMPLETED);
if (task.isGenerator) { if (task.isGenerator) {
// Parse the output and update the DAGs // Parse the output and update the DAGs
try { try {
auto newTasks = expandTaskSet(tasksFromJSON(attempt.outputLog), auto parsedTasks = tasksFromJSON(attempt.outputLog);
executor, parameters); auto newTasks =
expandTaskSet(parsedTasks, executor, parameters);
updateDAGFromTasks(dag, newTasks); updateDAGFromTasks(dag, newTasks);
// Add in dependencies from current task to new tasks
for (const auto &[ntName, ntTask] : newTasks) { for (const auto &[ntName, ntTask] : newTasks) {
logger.addTask(runID, ntName, ntTask); logger.addTask(runID, ntName, ntTask);
dag.addEdge(taskName, ntName);
task.children.insert(ntName); task.children.insert(ntName);
} }
// Efficiently add new edges from generator task
// to children
std::unordered_set<std::string> baseNames;
for (const auto &[k, v] : parsedTasks) {
baseNames.insert(v.definedName);
}
dag.addEdgeIf(taskName, [&](const auto &v) {
return baseNames.count(v.data.definedName) > 0;
});
logger.updateTask(runID, taskName, task); logger.updateTask(runID, taskName, task);
} }
catch (std::exception &e) { catch (std::exception &e) {

View File

@@ -63,7 +63,9 @@ TEST_CASE("dag_traversal", "[dag]")
std::vector<size_t> visitOrder(N_VERTICES); std::vector<size_t> visitOrder(N_VERTICES);
size_t i = 0; size_t i = 0;
while (!dag.allVisited()) { while (!dag.allVisited()) {
const auto v = dag.visitNext().value(); auto o = dag.visitNext();
REQUIRE(o.has_value());
const auto v = o.value();
dag.completeVisit(v.first); dag.completeVisit(v.first);
visitOrder[v.first] = i; visitOrder[v.first] = i;
++i; ++i;

View File

@@ -150,6 +150,13 @@ TEST_CASE("dag_runner", "[utilities_dag_runner]")
REQUIRE(attempts.front().rc == 0); REQUIRE(attempts.front().rc == 0);
} }
} }
}
TEST_CASE("runDAG_recovery", "[runDAG]")
{
daggy::executors::task::ForkingTaskExecutor ex(10);
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
SECTION("Recovery from Error") SECTION("Recovery from Error")
{ {
@@ -198,6 +205,13 @@ TEST_CASE("dag_runner", "[utilities_dag_runner]")
cleanup(); cleanup();
} }
}
TEST_CASE("runDAG_generator", "[runDAG_generator]")
{
daggy::executors::task::ForkingTaskExecutor ex(10);
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
SECTION("Generator tasks") SECTION("Generator tasks")
{ {
@@ -211,8 +225,8 @@ TEST_CASE("dag_runner", "[utilities_dag_runner]")
ofh << generatorOutput << std::endl; ofh << generatorOutput << std::endl;
ofh.close(); ofh.close();
daggy::TimePoint globalStartTime = daggy::Clock::now();
std::stringstream jsonTasks; std::stringstream jsonTasks;
jsonTasks jsonTasks
<< R"({ "A": { "job": {"command": [ "/usr/bin/cat", )" << R"({ "A": { "job": {"command": [ "/usr/bin/cat", )"
<< std::quoted(ofn.string()) << std::quoted(ofn.string())
@@ -221,8 +235,10 @@ TEST_CASE("dag_runner", "[utilities_dag_runner]")
auto baseTasks = daggy::tasksFromJSON(jsonTasks.str()); auto baseTasks = daggy::tasksFromJSON(jsonTasks.str());
REQUIRE(baseTasks.size() == 2); REQUIRE(baseTasks.size() == 2);
REQUIRE(baseTasks["A"].children == std::unordered_set<std::string>{"C"});
auto tasks = daggy::expandTaskSet(baseTasks, ex, params); auto tasks = daggy::expandTaskSet(baseTasks, ex, params);
REQUIRE(tasks.size() == 2); REQUIRE(tasks.size() == 2);
REQUIRE(tasks["A_0"].children == std::unordered_set<std::string>{"C"});
auto dag = daggy::buildDAGFromTasks(tasks); auto dag = daggy::buildDAGFromTasks(tasks);
REQUIRE(dag.size() == 2); REQUIRE(dag.size() == 2);
@@ -250,5 +266,27 @@ TEST_CASE("dag_runner", "[utilities_dag_runner]")
REQUIRE(record.tasks["B_1"].children == REQUIRE(record.tasks["B_1"].children ==
std::unordered_set<std::string>{"C"}); std::unordered_set<std::string>{"C"});
REQUIRE(record.tasks["C_0"].children.empty()); REQUIRE(record.tasks["C_0"].children.empty());
// Ensure they were run in the right order
// All A's get run before B's, which run before C's
daggy::TimePoint globalStopTime = daggy::Clock::now();
std::array<daggy::TimePoint, 3> minTimes;
minTimes.fill(globalStartTime);
std::array<daggy::TimePoint, 3> maxTimes;
maxTimes.fill(globalStopTime);
for (const auto &[k, v] : record.taskAttempts) {
size_t idx = k[0] - 65;
auto &startTime = minTimes[idx];
auto &stopTime = maxTimes[idx];
startTime = std::max(startTime, v.front().startTime);
stopTime = std::min(stopTime, v.back().stopTime);
}
for (size_t i = 0; i < 3; ++i) {
for (size_t j = i + 1; j < 2; ++j) {
REQUIRE(maxTimes[i] < minTimes[j]);
}
}
} }
} }