Squashed commit of the following:

commit 73994327de890590eede353c8131f3f7c1e8aaa3
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Wed Aug 25 13:38:29 2021 -0300

    - Fixing up checks for individual dag runs

commit f20e3a3dec8c063111cf60f2bec2b8f84c8a4100
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Wed Aug 25 10:49:43 2021 -0300

    - Finishing serialization of DAGRun
    - Checkpointing work.

commit b490abadf93e3085e4204003de7eaa8183b4e1d5
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Wed Aug 25 10:34:08 2021 -0300

    - Consolidating struct definitions into Defines.hpp
    - Renaming DAGRunRecord member runStates to taskRunStates

commit 050346ec1fd10d1091f261905c6175ffe0bcf001
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Wed Aug 25 09:27:05 2021 -0300

    - Adding additional tests for server endpoints
This commit is contained in:
Ian Roddis
2021-08-25 13:40:05 -03:00
parent 6ed57806d0
commit 212bd80df2
20 changed files with 344 additions and 177 deletions

View File

@@ -37,6 +37,12 @@ and [postgres](https://postgresql.org).
Building
==
**Requirements:**
- git
- cmake >= 3.14
- gcc >= 9
```
git clone https://gitlab.com/iroddis/daggy
cd daggy

10
TODO.md
View File

@@ -6,8 +6,10 @@ Tasks
- [ ] Add in authorization scheme (maybe PAM auth endpoint with JWT?)
- [ ] Flesh out server and interface
- Core Functionality
- [ ] Handle return on errored DAG / Task
- [ ] Clearing a DAG Task
- Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma
separated list
- Allow for tasks to define next tasks
- Add execution gates
- Executors
- [ ] Slurm Executor
- Loggers
@@ -22,4 +24,6 @@ Tasks
- [X] Add ability to define child -> parent relationships
- [X] Rip apart scheduler and re-implement runDag as a utility function, taking an execution pool, task list,
and logger
- [X] Resume a failed DAG
- [X] Resume a failed DAG
- [X] Handle return on errored DAG / Task
- [X] Clearing a DAG Task

View File

@@ -1,17 +0,0 @@
#pragma once
#include <chrono>
#include <string>
#include "Defines.hpp"
namespace daggy {
struct AttemptRecord {
TimePoint startTime;
TimePoint stopTime;
int rc; // RC from the task
std::string executorLog; // Logs from the dag_executor
std::string outputLog; // stdout from command
std::string errorLog; // stderr from command
};
}

View File

@@ -1,21 +0,0 @@
#pragma once
#include <string>
#include <unordered_map>
#include <variant>
#include "DAG.hpp"
#include "Task.hpp"
#include "AttemptRecord.hpp"
namespace daggy {
using ParameterValue = std::variant<std::string, std::vector<std::string>>;
using TaskRun = std::vector<AttemptRecord>;
struct DAGRun {
std::vector<Task> tasks;
std::unordered_map<std::string, ParameterValue> parameters;
DAG dag;
std::vector<TaskRun> taskRuns;
};
}

View File

@@ -3,6 +3,7 @@
#include <chrono>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <variant>
#include <vector>
@@ -29,4 +30,29 @@ namespace daggy {
KILLED = 1 << 3,
COMPLETED = 1 << 4
};
struct Task {
std::string name;
std::vector<std::string> command;
uint32_t maxRetries;
uint32_t retryIntervalSeconds; // Time to wait between retries
std::unordered_set<std::string> children;
bool operator==(const Task &other) const {
return (name == other.name)
and (maxRetries == other.maxRetries)
and (retryIntervalSeconds == other.retryIntervalSeconds)
and (command == other.command)
and (children == other.children);
}
};
struct AttemptRecord {
TimePoint startTime;
TimePoint stopTime;
int rc; // RC from the task
std::string executorLog; // Logs from the dag_executor
std::string outputLog; // stdout from command
std::string errorLog; // stderr from command
};
}

View File

@@ -8,8 +8,6 @@
#include <rapidjson/document.h>
#include "Defines.hpp"
#include "Task.hpp"
#include "AttemptRecord.hpp"
namespace rj = rapidjson;

View File

@@ -1,24 +0,0 @@
#pragma once
#include <cstdint>
#include <string>
#include <vector>
#include <unordered_set>
namespace daggy {
struct Task {
std::string name;
std::vector<std::string> command;
uint32_t maxRetries;
uint32_t retryIntervalSeconds; // Time to wait between retries
std::unordered_set<std::string> children;
bool operator==(const Task &other) const {
return (name == other.name)
and (maxRetries == other.maxRetries)
and (retryIntervalSeconds == other.retryIntervalSeconds)
and (command == other.command)
and (children == other.children);
}
};
}

View File

@@ -9,7 +9,6 @@
#include "daggy/loggers/dag_run/DAGRunLogger.hpp"
#include "daggy/executors/task/TaskExecutor.hpp"
#include "Task.hpp"
#include "Defines.hpp"
#include "DAG.hpp"

View File

@@ -16,4 +16,4 @@ namespace daggy {
};
}
}
}
}

View File

@@ -6,8 +6,7 @@
#include <thread>
#include <vector>
#include "daggy/Task.hpp"
#include "daggy/AttemptRecord.hpp"
#include "daggy/Defines.hpp"
#include "daggy/ThreadPool.hpp"
/*

View File

@@ -2,8 +2,6 @@
#include <string>
#include "../../Task.hpp"
#include "../../AttemptRecord.hpp"
#include "../../Defines.hpp"
#include "Defines.hpp"

View File

@@ -1,37 +1,39 @@
#pragma once
namespace daggy {
namespace loggers {
namespace dag_run {
struct TaskUpdateRecord {
TimePoint time;
TaskID taskID;
RunState newState;
};
#include <cstdint>
#include <string>
#include <vector>
#include <unordered_set>
struct DAGUpdateRecord {
TimePoint time;
RunState newState;
};
// Pretty heavy weight, but
struct DAGRunRecord {
std::string name;
std::vector<Task> tasks;
std::vector<RunState> runStates;
std::vector<std::vector<AttemptRecord>> taskAttempts;
std::vector<TaskUpdateRecord> taskStateChanges;
std::vector<DAGUpdateRecord> dagStateChanges;
};
namespace daggy::loggers::dag_run {
struct TaskUpdateRecord {
TimePoint time;
TaskID taskID;
RunState newState;
};
struct DAGRunSummary {
DAGRunID runID;
std::string name;
RunState runState;
TimePoint startTime;
TimePoint lastUpdate;
std::unordered_map<RunState, size_t> taskStateCounts;
};
}
}
}
struct DAGUpdateRecord {
TimePoint time;
RunState newState;
};
// Pretty heavy weight, but
struct DAGRunRecord {
std::string name;
std::vector<Task> tasks;
std::vector<RunState> taskRunStates;
std::vector<std::vector<AttemptRecord>> taskAttempts;
std::vector<TaskUpdateRecord> taskStateChanges;
std::vector<DAGUpdateRecord> dagStateChanges;
};
struct DAGRunSummary {
DAGRunID runID;
std::string name;
RunState runState;
TimePoint startTime;
TimePoint lastUpdate;
std::unordered_map<RunState, size_t> taskStateCounts;
};
}

View File

@@ -6,65 +6,62 @@
#include <rapidjson/document.h>
#include "DAGRunLogger.hpp"
#include "Defines.hpp"
namespace fs = std::filesystem;
namespace rj = rapidjson;
namespace daggy {
namespace loggers {
namespace dag_run {
/*
* This logger should only be used for debug purposes. It's not really optimized for querying, and will
* use a ton of inodes to track state.
*
* On the plus side, it's trivial to look at without using the API.
*
* Filesystem logger creates the following structure:
* {root}/
* runs/
* {runID}/
* meta.json --- Contains the DAG name, task definitions
* states.csv --- DAG state changes
* {taskName}/
* states.csv --- TASK state changes
* {attempt}/
* metadata.json --- timestamps and rc
* output.log
* error.log
* executor.log
*/
class FileSystemLogger : public DAGRunLogger {
public:
FileSystemLogger(fs::path root);
namespace daggy::loggers::dag_run {
/*
* This logger should only be used for debug purposes. It's not really optimized for querying, and will
* use a ton of inodes to track state.
*
* On the plus side, it's trivial to look at without using the API.
*
* Filesystem logger creates the following structure:
* {root}/
* runs/
* {runID}/
* meta.json --- Contains the DAG name, task definitions
* states.csv --- DAG state changes
* {taskName}/
* states.csv --- TASK state changes
* {attempt}/
* metadata.json --- timestamps and rc
* output.log
* error.log
* executor.log
*/
class FileSystemLogger : public DAGRunLogger {
public:
FileSystemLogger(fs::path root);
// Execution
DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override;
// Execution
DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override;
void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
void
logTaskAttempt(DAGRunID, const std::string &taskName, const AttemptRecord &attempt) override;
void
logTaskAttempt(DAGRunID, const std::string &taskName, const AttemptRecord &attempt) override;
void updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) override;
void updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) override;
// Querying
std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
// Querying
std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
DAGRunRecord getDAGRun(DAGRunID dagRunID) override;
DAGRunRecord getDAGRun(DAGRunID dagRunID) override;
private:
fs::path root_;
std::atomic<DAGRunID> nextRunID_;
std::mutex lock_;
private:
fs::path root_;
std::atomic<DAGRunID> nextRunID_;
std::mutex lock_;
// std::unordered_map<fs::path, std::mutex> runLocks;
// std::unordered_map<fs::path, std::mutex> runLocks;
inline const fs::path getCurrentPath() const;
inline const fs::path getCurrentPath() const;
inline const fs::path getRunsRoot() const;
inline const fs::path getRunsRoot() const;
inline const fs::path getRunRoot(DAGRunID runID) const;
};
}
}
inline const fs::path getRunRoot(DAGRunID runID) const;
};
}

View File

@@ -4,6 +4,7 @@
#include <mutex>
#include "DAGRunLogger.hpp"
#include "Defines.hpp"
namespace daggy {
namespace loggers {
@@ -38,4 +39,4 @@ namespace daggy {
};
}
}
}
}

View File

@@ -228,4 +228,4 @@ namespace daggy {
ss >> std::get_time(&dt, "%Y-%m-%d %H:%M:%S %Z");
return Clock::from_time_t(mktime(&dt));
}
}
}

View File

@@ -1,3 +1,5 @@
#include <magic_enum.hpp>
#include <daggy/Server.hpp>
#include <daggy/Serialization.hpp>
@@ -64,6 +66,12 @@ namespace daggy {
.bind(&Server::handleRunDAG, this)
.produces(MIME(Application, Json), MIME(Application, Xml))
.response(Http::Code::Ok, "Run a DAG");
// List detailed DAG run
dagPath
.route(desc_.get("/:runID"))
.bind(&Server::handleGetDAGRun, this)
.produces(MIME(Application, Json), MIME(Application, Xml))
.response(Http::Code::Ok, "Details of a specific DAG run");
// List all DAG runs
dagPath
@@ -72,12 +80,7 @@ namespace daggy {
.produces(MIME(Application, Json), MIME(Application, Xml))
.response(Http::Code::Ok, "The list of all known DAG Runs");
// List detailed DAG run
dagPath
.route(desc_.get("/{id}"))
.bind(&Server::handleGetDAGRun, this)
.produces(MIME(Application, Json), MIME(Application, Xml))
.response(Http::Code::Ok, "Details of a specific DAG run");
}
/*
@@ -127,7 +130,7 @@ namespace daggy {
auto runID = logger_.startDAGRun(runName, tasks);
auto dag = buildDAGFromTasks(tasks);
auto fut = runnerPool_.addTask(
runnerPool_.addTask(
[this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); });
response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}");
@@ -135,10 +138,99 @@ namespace daggy {
void Server::handleGetDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
if (!handleAuth(request, response)) return;
auto dagRuns = logger_.getDAGs(0);
std::stringstream ss;
ss << '[';
bool first = true;
for (const auto &run : dagRuns) {
if (first) {
first = false;
} else {
ss << ", ";
}
ss << " {"
<< R"("runID": )" << run.runID << ','
<< R"("name": )" << std::quoted(run.name) << ","
<< R"("startTime": )" << std::quoted(timePointToString(run.startTime)) << ','
<< R"("lastUpdate": )" << std::quoted(timePointToString(run.lastUpdate)) << ','
<< R"("taskCounts": {)";
bool firstState = true;
for (const auto &[state, count] : run.taskStateCounts) {
if (firstState) {
firstState = false;
} else {
ss << ", ";
}
ss << std::quoted(magic_enum::enum_name(state)) << ':' << count;
}
ss << '}' // end of taskCounts
<< '}'; // end of item
}
ss << ']';
response.send(Pistache::Http::Code::Ok, ss.str());
}
void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
if (!handleAuth(request, response)) return;
if (!request.hasParam(":runID")) { REQ_ERROR(Not_Found, "No runID provided in URL"); }
DAGRunID runID = request.param(":runID").as<size_t>();
auto run = logger_.getDAGRun(runID);
bool first = true;
std::stringstream ss;
ss << "{"
<< R"("runID": )" << runID << ','
<< R"("name": )" << std::quoted(run.name) << ','
<< R"("tasks": )" << tasksToJSON(run.tasks) << ',';
// task run states
ss << R"("taskStates": [ )";
first = true;
for (const auto &state : run.taskRunStates) {
if (first) { first = false; } else { ss << ','; }
ss << std::quoted(magic_enum::enum_name(state));
}
ss << "],";
// Attempt records
first = true;
ss << R"("taskAttempts": [ )";
for (const auto &attempts : run.taskAttempts) {
if (first) { first = false; } else { ss << ','; }
ss << '[';
bool firstAttempt = true;
for (const auto &attempt : attempts) {
if (firstAttempt) { firstAttempt = false; } else { ss << ','; }
ss << '{'
<< R"("startTime":)" << std::quoted(timePointToString(attempt.startTime)) << ','
<< R"("stopTime":)" << std::quoted(timePointToString(attempt.stopTime)) << ','
<< R"("rc":)" << attempt.rc << ','
<< R"("outputLog":)" << std::quoted(attempt.outputLog) << ','
<< R"("errorLog":)" << std::quoted(attempt.errorLog) << ','
<< R"("executorLog":)" << std::quoted(attempt.executorLog)
<< '}';
}
ss << ']';
}
ss << "],";
// DAG state changes
first = true;
ss << R"("dagStateChanges": [ )";
for (const auto &change : run.dagStateChanges) {
if (first) { first = false; } else { ss << ','; }
ss << '{'
<< R"("newState": )" << std::quoted(magic_enum::enum_name(change.newState)) << ','
<< R"("time": )" << std::quoted(timePointToString(change.time))
<< '}';
}
ss << "]";
ss << '}';
response.send(Pistache::Http::Code::Ok, ss.str());
}
void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {

View File

@@ -174,6 +174,11 @@ namespace daggy {
}
std::this_thread::sleep_for(250ms);
}
if (dag.allVisited()) {
logger.updateDAGRunState(runID, RunState::COMPLETED);
}
return dag;
}

View File

@@ -160,7 +160,7 @@ namespace daggy {
for (const auto &task : record.tasks) {
auto taskStateFile = runRoot / task.name / "states.csv";
if (!fs::exists(taskStateFile)) {
record.runStates.push_back(RunState::QUEUED);
record.taskRunStates.push_back(RunState::QUEUED);
continue;
}
@@ -169,9 +169,9 @@ namespace daggy {
std::stringstream ss{line};
while (std::getline(ss, token, ',')) { continue; }
RunState taskState = magic_enum::enum_cast<RunState>(token).value();
record.runStates.emplace_back(taskState);
record.taskRunStates.emplace_back(taskState);
ifh.close();
}
return record;
}
}
}

View File

@@ -17,8 +17,7 @@ namespace daggy {
dagRuns_.push_back({
.name = name,
.tasks = tasks,
//.runStates = std::vector<RunState>(tasks.size(), RunState::QUEUED),
.runStates{tasks.size(), RunState::QUEUED},
.taskRunStates{tasks.size(), RunState::QUEUED},
.taskAttempts = std::vector<std::vector<AttemptRecord>>(tasks.size())
});
@@ -56,21 +55,53 @@ namespace daggy {
void OStreamLogger::updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) {
std::lock_guard<std::mutex> lock(guard_);
os_ << "Task State Change (" << dagRunID << '/' << taskName << "): " << magic_enum::enum_name(state)
<< std::endl;
const auto &tasks = dagRuns_[dagRunID].tasks;
auto &dagRun = dagRuns_[dagRunID];
const auto &tasks = dagRun.tasks;
auto it = std::find_if(tasks.begin(), tasks.end(),
[&taskName](const Task &a) { return a.name == taskName; });
if (it == tasks.end()) throw std::runtime_error("No such task: " + taskName);
size_t taskID = it - tasks.begin();
dagRuns_[dagRunID].taskStateChanges.push_back({Clock::now(), taskID, state});
dagRun.taskStateChanges.push_back({Clock::now(), taskID, state});
dagRun.taskRunStates[taskID] = state;
os_ << "Task State Change (" << dagRunID << '/' << taskName << " [task_id: " << taskID << "]): "
<< magic_enum::enum_name(state)
<< std::endl;
}
// Querying
std::vector<DAGRunSummary> OStreamLogger::getDAGs(uint32_t stateMask) { return {}; }
std::vector<DAGRunSummary> OStreamLogger::getDAGs(uint32_t stateMask) {
std::vector<DAGRunSummary> summaries;
std::lock_guard<std::mutex> lock(guard_);
size_t i = 0;
for (const auto &run : dagRuns_) {
DAGRunSummary summary{
.runID = i,
.name = run.name,
.runState = run.dagStateChanges.back().newState,
.startTime = run.dagStateChanges.front().time,
.lastUpdate = std::max<TimePoint>(run.taskStateChanges.back().time,
run.dagStateChanges.back().time)
};
DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunID) { return dagRuns_[dagRunID]; }
std::vector<RunState> states(run.tasks.size());
for (const auto &taskUpdate : run.taskStateChanges) {
states[taskUpdate.taskID] = taskUpdate.newState;
}
for (const auto &taskState : states) {
summary.taskStateCounts[taskState]++;
}
summaries.emplace_back(summary);
}
return summaries;
}
DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunID) {
std::lock_guard<std::mutex> lock(guard_);
return dagRuns_[dagRunID];
}
}
}
}

View File

@@ -4,11 +4,14 @@
#include <catch2/catch.hpp>
#include <pistache/client.h>
#include <rapidjson/document.h>
#include "daggy/Server.hpp"
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
namespace rj = rapidjson;
Pistache::Http::Response
REQUEST(std::string url, std::string payload = "") {
Pistache::Http::Experimental::Client client;
@@ -81,12 +84,80 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") {
"parents": [ "touch" ]
}
]
})";
})";
// Submit, and get the runID
daggy::DAGRunID runID = 0;
{
auto response = REQUEST(baseURL + "/v1/dagrun/", dagRun);
REQUIRE(response.code() == Pistache::Http::Code::Ok);
rj::Document doc;
rj::ParseResult parseResult = doc.Parse(response.body().c_str());
REQUIRE(parseResult);
REQUIRE(doc.IsObject());
REQUIRE(doc.HasMember("runID"));
runID = doc["runID"].GetUint64();
}
// Ensure our runID shows up in the list of running DAGs
{
auto response = REQUEST(baseURL + "/v1/dagrun/");
REQUIRE(response.code() == Pistache::Http::Code::Ok);
rj::Document doc;
rj::ParseResult parseResult = doc.Parse(response.body().c_str());
REQUIRE(parseResult);
REQUIRE(doc.IsArray());
REQUIRE(doc.Size() >= 1);
// Ensure that our DAG is in the list and matches our given DAGRunID
bool found = false;
const auto &runs = doc.GetArray();
for (size_t i = 0; i < runs.Size(); ++i) {
const auto &run = runs[i];
REQUIRE(run.IsObject());
REQUIRE(run.HasMember("name"));
REQUIRE(run.HasMember("runID"));
std::string runName = run["name"].GetString();
if (runName == "unit_server") {
REQUIRE(run["runID"].GetUint64() == runID);
found = true;
break;
}
}
REQUIRE(found);
}
// Wait until our DAG is complete
bool complete = false;
while (!complete) {
complete = true;
auto response = REQUEST(baseURL + "/v1/dagrun/" + std::to_string(runID));
REQUIRE(response.code() == Pistache::Http::Code::Ok);
rj::Document doc;
rj::ParseResult parseResult = doc.Parse(response.body().c_str());
REQUIRE(parseResult);
REQUIRE(doc.IsObject());
REQUIRE(doc.HasMember("taskStates"));
const auto &taskStates = doc["taskStates"].GetArray();
REQUIRE(taskStates.Size() == 3);
for (size_t i = 0; i < taskStates.Size(); ++i) {
std::string state = taskStates[i].GetString();
if (state != "COMPLETED") {
complete = false;
break;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
auto response = REQUEST(baseURL + "/v1/dagrun/", dagRun);
REQUIRE(response.code() == Pistache::Http::Code::Ok);
std::this_thread::sleep_for(std::chrono::seconds(2));
for (const auto &pth : std::vector<fs::path>{"dagrun_A", "dagrun_B"}) {
REQUIRE(fs::exists(pth));
fs::remove(pth);
@@ -94,4 +165,4 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") {
}
server.shutdown();
}
}