- Adding support for state to OStreamLogger to make it more useful for test cases

- Making runDAG return the end DAG
- Adding much more robust test for DAG execution for basic tests.
This commit is contained in:
Ian Roddis
2021-08-20 12:43:01 -03:00
parent 0f1f00362c
commit dc8ea4c369
5 changed files with 70 additions and 20 deletions

View File

@@ -28,11 +28,11 @@ namespace daggy {
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger);
void runDAG(DAGRunID runID,
std::vector<Task> tasks,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger,
DAG dag);
DAG runDAG(DAGRunID runID,
std::vector<Task> tasks,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger,
DAG dag);
std::ostream &operator<<(std::ostream &os, const TimePoint &tp);
}

View File

@@ -32,9 +32,9 @@ namespace daggy {
DAGRunRecord getDAGRun(DAGRunID dagRunID) override;
private:
DAGRunID nextRunID_;
std::mutex guard_;
std::ostream &os_;
std::vector<DAGRunRecord> dagRuns_;
};
}
}

View File

@@ -88,11 +88,11 @@ namespace daggy {
return attempts;
}
void runDAG(DAGRunID runID,
std::vector<Task> tasks,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger,
DAG dag) {
DAG runDAG(DAGRunID runID,
std::vector<Task> tasks,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger,
DAG dag) {
logger.updateDAGRunState(runID, loggers::dag_run::RunState::RUNNING);
struct TaskState {
@@ -149,6 +149,7 @@ namespace daggy {
}
std::this_thread::sleep_for(250ms);
}
return dag;
}
std::ostream &operator<<(std::ostream &os, const TimePoint &tp) {

View File

@@ -1,4 +1,5 @@
#include <iterator>
#include <algorithm>
#include <magic_enum.hpp>
@@ -7,17 +8,26 @@
namespace daggy {
namespace loggers {
namespace dag_run {
OStreamLogger::OStreamLogger(std::ostream &os) : nextRunID_(0), os_(os) {}
OStreamLogger::OStreamLogger(std::ostream &os) : os_(os) {}
// Execution
DAGRunID OStreamLogger::startDAGRun(std::string name, const std::vector<Task> &tasks) {
std::lock_guard<std::mutex> lock(guard_);
size_t runID = nextRunID_++;
size_t runID = dagRuns_.size();
dagRuns_.push_back({
.name = name,
.tasks = tasks,
//.runStates = std::vector<RunState>(tasks.size(), RunState::QUEUED),
.runStates{tasks.size(), RunState::QUEUED},
.taskAttempts = std::vector<std::vector<AttemptRecord>>(tasks.size())
});
os_ << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size()
<< " tasks" << std::endl;
for (const auto &task : tasks) {
os_ << "TASK (" << task.name << "): ";
std::copy(task.command.begin(), task.command.end(), std::ostream_iterator<std::string>(os_, " "));
std::copy(task.command.begin(), task.command.end(),
std::ostream_iterator<std::string>(os_, " "));
os_ << std::endl;
}
return runID;
@@ -26,6 +36,7 @@ namespace daggy {
void OStreamLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {
std::lock_guard<std::mutex> lock(guard_);
os_ << "DAG State Change(" << dagRunID << "): " << magic_enum::enum_name(state) << std::endl;
dagRuns_[dagRunID].dagStateChanges.push_back({Clock::now(), state});
}
void OStreamLogger::logTaskAttempt(DAGRunID dagRunID, const std::string &taskName,
@@ -34,18 +45,32 @@ namespace daggy {
const std::string &msg = attempt.rc == 0 ? attempt.outputLog : attempt.errorLog;
os_ << "Task Attempt (" << dagRunID << '/' << taskName << "): Ran with RC " << attempt.rc << ": "
<< msg << std::endl;
const auto &tasks = dagRuns_[dagRunID].tasks;
auto it = std::find_if(tasks.begin(), tasks.end(),
[&taskName](const Task &a) { return a.name == taskName; });
if (it == tasks.end()) throw std::runtime_error("No such task: " + taskName);
size_t taskID = it - tasks.begin();
dagRuns_[dagRunID].taskAttempts[taskID].push_back(attempt);
}
void OStreamLogger::updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) {
std::lock_guard<std::mutex> lock(guard_);
os_ << "Task State Change (" << dagRunID << '/' << taskName << "): " << magic_enum::enum_name(state)
<< std::endl;
const auto &tasks = dagRuns_[dagRunID].tasks;
auto it = std::find_if(tasks.begin(), tasks.end(),
[&taskName](const Task &a) { return a.name == taskName; });
if (it == tasks.end()) throw std::runtime_error("No such task: " + taskName);
size_t taskID = it - tasks.begin();
dagRuns_[dagRunID].taskStateChanges.push_back({Clock::now(), taskID, state});
}
// Querying
std::vector<DAGRunSummary> OStreamLogger::getDAGs(uint32_t stateMask) { return {}; }
DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunID) { return {}; }
DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunID) { return dagRuns_[dagRunID]; }
}
}
}

View File

@@ -13,6 +13,8 @@
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
namespace fs = std::filesystem;
TEST_CASE("String Utilities", "[utilities_string]") {
std::string test = "/this/is/{{A}}/test/{{A}}";
auto res = daggy::globalSub(test, "{{A}}", "hello");
@@ -58,10 +60,32 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
std::string taskJSON = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["C"]}, {"name": "B", "command": ["/bin/echo", "B"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])";
auto tasks = daggy::tasksFromJSON(taskJSON);
auto dag = daggy::buildDAGFromTasks(tasks);
SECTION("Simple execution") {
std::string prefix = "/tmp/asdlk_";
std::string taskJSON = R"([{"name": "A", "command": ["/usr/bin/touch", ")"
+ prefix + R"(A"], "children": ["C"]}, {"name": "B", "command": ["/usr/bin/touch", ")"
+ prefix + R"(B"], "children": ["C"]}, {"name": "C", "command": ["/usr/bin/touch", ")"
+ prefix + R"(C"]}])";
auto tasks = daggy::tasksFromJSON(taskJSON);
auto dag = daggy::buildDAGFromTasks(tasks);
auto runID = logger.startDAGRun("test_run", tasks);
daggy::runDAG(runID, tasks, ex, logger, dag);
auto runID = logger.startDAGRun("test_run", tasks);
auto endDAG = daggy::runDAG(runID, tasks, ex, logger, dag);
REQUIRE(endDAG.allVisited());
std::vector<std::string> letters{"A", "B", "C"};
for (const auto &letter : letters) {
fs::path file{prefix + letter};
REQUIRE(fs::exists(file));
fs::remove(file);
}
// Get the DAG Run Attempts
auto record = logger.getDAGRun(runID);
for (const auto &attempts : record.taskAttempts) {
REQUIRE(attempts.size() == 1);
REQUIRE(attempts.front().rc == 0);
}
}
}