Refactoring RunState, fixing logic error in when to end errored DAG runs, added convenience functions to retry failed DAGs.

This commit is contained in:
Ian Roddis
2021-08-20 21:44:12 -03:00
parent dc8ea4c369
commit 2083b1c3f1
8 changed files with 124 additions and 42 deletions

4
.gitignore vendored
View File

@@ -1,4 +1,4 @@
build
.cache
cmake-build-*
.idea
cmake-build-debug/
.idea

View File

@@ -9,6 +9,8 @@
#include <functional>
#include <optional>
#include "Defines.hpp"
/*
The DAG structure in daggy is just to ensure that tasks are run
in the correct dependent order.
@@ -16,14 +18,8 @@
namespace daggy {
enum class VertexState : uint32_t {
UNVISITED = 0,
VISITING,
VISITED
};
struct Vertex {
VertexState state;
RunState state;
uint32_t depCount;
std::unordered_set<size_t> children;
};
@@ -51,10 +47,15 @@ namespace daggy {
bool empty() const;
// Traversal
// Reset the DAG to completely unvisited
void reset();
VertexState getVertexState(const size_t id) const;
// Reset any vertex with RUNNING state to QUEUED
void resetRunning();
RunState getVertexState(const size_t id) const;
void setVertexState(const size_t id, RunState state);
bool allVisited() const;

View File

@@ -21,4 +21,12 @@ namespace daggy {
using DAGRunID = size_t;
using TaskID = size_t;
enum class RunState : uint32_t {
QUEUED = 0,
RUNNING = 1,
RETRY = 1 << 1,
ERRORED = 1 << 2,
KILLED = 1 << 3,
COMPLETED = 1 << 4
};
}

View File

@@ -18,7 +18,8 @@ namespace daggy {
std::vector<Command> expandCommands(const std::vector<std::string> &command, const ParameterValues &parameters);
DAG buildDAGFromTasks(const std::vector<Task> &tasks);
DAG buildDAGFromTasks(const std::vector<Task> &tasks,
const std::vector<loggers::dag_run::TaskUpdateRecord> &updates = {});
// Blocking call
std::vector<AttemptRecord>

View File

@@ -3,15 +3,6 @@
namespace daggy {
namespace loggers {
namespace dag_run {
enum class RunState : uint32_t {
QUEUED = 0,
RUNNING = 1,
RETRY = 1 << 1,
ERRORED = 1 << 2,
KILLED = 1 << 3,
COMPLETED = 1 << 4
};
struct TaskUpdateRecord {
TimePoint time;
TaskID taskID;
@@ -26,11 +17,11 @@ namespace daggy {
// Pretty heavy weight, but
struct DAGRunRecord {
std::string name;
std::vector <Task> tasks;
std::vector <RunState> runStates;
std::vector <std::vector<AttemptRecord>> taskAttempts;
std::vector <TaskUpdateRecord> taskStateChanges;
std::vector <DAGUpdateRecord> dagStateChanges;
std::vector<Task> tasks;
std::vector<RunState> runStates;
std::vector<std::vector<AttemptRecord>> taskAttempts;
std::vector<TaskUpdateRecord> taskStateChanges;
std::vector<DAGUpdateRecord> dagStateChanges;
};
struct DAGRunSummary {
@@ -39,7 +30,7 @@ namespace daggy {
RunState runState;
TimePoint startTime;
TimePoint lastUpdate;
std::unordered_map <RunState, size_t> taskStateCounts;
std::unordered_map<RunState, size_t> taskStateCounts;
};
}
}

View File

@@ -7,7 +7,7 @@ namespace daggy {
bool DAG::empty() const { return vertices_.empty(); }
size_t DAG::addVertex() {
vertices_.push_back(Vertex{.state = VertexState::UNVISITED, .depCount = 0});
vertices_.push_back(Vertex{.state = RunState::QUEUED, .depCount = 0});
return vertices_.size() - 1;
}
@@ -39,7 +39,7 @@ namespace daggy {
void DAG::reset() {
// Reset the state of all vertices
for (auto &v : vertices_) {
v.state = VertexState::UNVISITED;
v.state = RunState::QUEUED;
v.depCount = 0;
}
@@ -51,9 +51,20 @@ namespace daggy {
}
}
void DAG::resetRunning() {
for (auto &v : vertices_) {
if (v.state != RunState::RUNNING) continue;
v.state = RunState::QUEUED;
}
}
void DAG::setVertexState(const size_t id, RunState state) {
vertices_[id].state = state;
}
bool DAG::allVisited() const {
for (const auto &v : vertices_) {
if (v.state != VertexState::VISITED) return false;
if (v.state != RunState::COMPLETED) return false;
}
return true;
}
@@ -62,9 +73,9 @@ namespace daggy {
for (size_t i = 0; i < vertices_.size(); ++i) {
auto &v = vertices_[i];
if (v.state != VertexState::UNVISITED) continue;
if (v.state != RunState::QUEUED) continue;
if (v.depCount != 0) continue;
v.state = VertexState::VISITING;
v.state = RunState::RUNNING;
return i;
}
return {};
@@ -72,7 +83,7 @@ namespace daggy {
void DAG::completeVisit(const size_t id) {
auto &v = vertices_[id];
v.state = VertexState::VISITED;
v.state = RunState::COMPLETED;
for (auto c : v.children) {
--vertices_[c].depCount;
}

View File

@@ -52,7 +52,8 @@ namespace daggy {
return commands;
}
DAG buildDAGFromTasks(const std::vector<Task> &tasks) {
DAG buildDAGFromTasks(const std::vector<Task> &tasks,
const std::vector<loggers::dag_run::TaskUpdateRecord> &updates) {
DAG dag;
std::unordered_map<std::string, size_t> taskIDs;
@@ -68,6 +69,20 @@ namespace daggy {
}
}
dag.reset();
// Replay any updates
for (const auto &update : updates) {
switch (update.newState) {
case RunState::RUNNING:
case RunState::RETRY:
case RunState::ERRORED:
case RunState::KILLED:
dag.setVertexState(update.taskID, RunState::RUNNING);
dag.setVertexState(update.taskID, RunState::COMPLETED);
break;
}
}
return dag;
}
@@ -77,13 +92,13 @@ namespace daggy {
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger) {
std::vector<AttemptRecord> attempts;
logger.updateTaskState(runID, task.name, loggers::dag_run::RunState::RUNNING);
logger.updateTaskState(runID, task.name, RunState::RUNNING);
while (attempts.size() < task.maxRetries + 1) {
attempts.push_back(executor.runCommand(task.command));
logger.logTaskAttempt(runID, task.name, attempts.back());
if (attempts.back().rc == 0) break;
logger.updateTaskState(runID, task.name, loggers::dag_run::RunState::RETRY);
logger.updateTaskState(runID, task.name, RunState::RETRY);
}
return attempts;
}
@@ -93,7 +108,7 @@ namespace daggy {
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger,
DAG dag) {
logger.updateDAGRunState(runID, loggers::dag_run::RunState::RUNNING);
logger.updateDAGRunState(runID, RunState::RUNNING);
struct TaskState {
size_t tid;
@@ -103,6 +118,9 @@ namespace daggy {
std::vector<TaskState> taskStates;
// TODO Handle case where everything is wedged due to errors
size_t running = 0;
size_t errored = 0;
while (!dag.allVisited()) {
// Check for any completed tasks
for (auto &taskState : taskStates) {
@@ -112,16 +130,18 @@ namespace daggy {
auto attemptRecords = taskState.fut.get();
const auto &taskName = tasks[taskState.tid].name;
if (attemptRecords.empty()) {
logger.updateTaskState(runID, taskName, loggers::dag_run::RunState::ERRORED);
continue;
logger.updateTaskState(runID, taskName, RunState::ERRORED);
++errored;
}
if (attemptRecords.back().rc == 0) {
logger.updateTaskState(runID, taskName, loggers::dag_run::RunState::COMPLETED);
logger.updateTaskState(runID, taskName, RunState::COMPLETED);
dag.completeVisit(taskState.tid);
taskState.complete = true;
--running;
} else {
logger.updateTaskState(runID, taskName, loggers::dag_run::RunState::ERRORED);
logger.updateTaskState(runID, taskName, RunState::ERRORED);
++errored;
}
taskState.complete = true;
}
}
@@ -139,6 +159,7 @@ namespace daggy {
.complete = false
};
taskStates.push_back(std::move(tsk));
++running;
auto nextTask = dag.visitNext();
if (not nextTask.has_value()) break;
@@ -147,6 +168,10 @@ namespace daggy {
if (!tq->empty()) {
executor.threadPool.addTasks(tq);
}
if (running > 0 and errored == running) {
logger.updateDAGRunState(runID, RunState::ERRORED);
break;
}
std::this_thread::sleep_for(250ms);
}
return dag;

View File

@@ -88,4 +88,49 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
REQUIRE(attempts.front().rc == 0);
}
}
SECTION("Recovery from Error") {
auto cleanup = []() {
// Cleanup
std::vector<fs::path> paths{"/tmp/rec_error_A", "/tmp/noexist" };
for (const auto & pth : paths) {
if (fs::exists(pth)) fs::remove_all(pth);
}
};
cleanup();
// daggy::loggers::dag_run::OStreamLogger logger(std::cout);
std::string goodPrefix = "/tmp/rec_error_";
std::string badPrefix = "/tmp/noexist/rec_error_";
std::string taskJSON = R"([{"name": "A", "command": ["/usr/bin/touch", ")"
+ goodPrefix + R"(A"], "children": ["C"]}, {"name": "B", "command": ["/usr/bin/touch", ")"
+ badPrefix + R"(B"], "children": ["C"]}, {"name": "C", "command": ["/usr/bin/touch", ")"
+ badPrefix + R"(C"]}])";
auto tasks = daggy::tasksFromJSON(taskJSON);
auto dag = daggy::buildDAGFromTasks(tasks);
auto runID = logger.startDAGRun("test_run", tasks);
auto tryDAG = daggy::runDAG(runID, tasks, ex, logger, dag);
REQUIRE(!tryDAG.allVisited());
// Create the missing dir, then continue to run the DAG
fs::create_directory("/tmp/noexist");
tryDAG.resetRunning();
auto endDAG = daggy::runDAG(runID, tasks, ex, logger, tryDAG);
REQUIRE(endDAG.allVisited());
// Get the DAG Run Attempts
auto record = logger.getDAGRun(runID);
REQUIRE(record.taskAttempts[0].size() == 1); // A ran fine
REQUIRE(record.taskAttempts[1].size() == 2); // B errored and had to be retried
REQUIRE(record.taskAttempts[2].size() == 1); // C wasn't run because B errored
cleanup();
}
}