- Adding task serialization

- Renaming DAGLogger to DAGRunLogger
- Adding more functionality to FileSystemLogger
This commit is contained in:
Ian Roddis
2021-08-10 11:44:56 -03:00
parent 1af9ba2124
commit 621467dd5a
11 changed files with 89 additions and 27 deletions

View File

@@ -22,4 +22,8 @@ namespace daggy {
std::vector<Task> tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters = {}); std::vector<Task> tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters = {});
std::vector<Task> tasksFromJSON(const rj::Document &spec, const ParameterValues &parameters = {}); std::vector<Task> tasksFromJSON(const rj::Document &spec, const ParameterValues &parameters = {});
std::string taskToJSON(const Task &task);
std::string tasksToJSON(const std::vector<Task> &tasks);
} }

View File

@@ -8,7 +8,7 @@ namespace daggy {
struct Task { struct Task {
std::string name; std::string name;
std::vector<std::string> command; std::vector<std::string> command;
uint8_t maxRetries; uint32_t maxRetries;
uint32_t retryIntervalSeconds; // Time to wait between retries uint32_t retryIntervalSeconds; // Time to wait between retries
std::vector<std::string> children; std::vector<std::string> children;
}; };

View File

@@ -7,7 +7,7 @@
#include <rapidjson/document.h> #include <rapidjson/document.h>
#include "daggy/loggers/dag_run/DAGLogger.hpp" #include "daggy/loggers/dag_run/DAGRunLogger.hpp"
#include "daggy/executors/task/TaskExecutor.hpp" #include "daggy/executors/task/TaskExecutor.hpp"
#include "Task.hpp" #include "Task.hpp"
#include "Defines.hpp" #include "Defines.hpp"
@@ -24,12 +24,12 @@ namespace daggy {
TaskID taskID, TaskID taskID,
const Task &task, const Task &task,
executors::task::TaskExecutor &executor, executors::task::TaskExecutor &executor,
loggers::dag_run::DAGLogger &logger); loggers::dag_run::DAGRunLogger &logger);
void runDAG(DAGRunID runID, void runDAG(DAGRunID runID,
std::vector<Task> tasks, std::vector<Task> tasks,
executors::task::TaskExecutor &executor, executors::task::TaskExecutor &executor,
loggers::dag_run::DAGLogger &logger, loggers::dag_run::DAGRunLogger &logger,
DAG dag); DAG dag);
} }

View File

@@ -8,7 +8,7 @@
#include "Defines.hpp" #include "Defines.hpp"
/* /*
DAGLogger represents the interface to store all the state information DAGRunLogger represents the interface to store all the state information
for daggy to run. Abstracted in case other back-end solutions need to for daggy to run. Abstracted in case other back-end solutions need to
be supported. be supported.
*/ */
@@ -16,21 +16,21 @@
namespace daggy { namespace daggy {
namespace loggers { namespace loggers {
namespace dag_run { namespace dag_run {
class DAGLogger { class DAGRunLogger {
public: public:
// Execution // Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) = 0; virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) = 0;
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0; virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) = 0;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) = 0; virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) = 0;
virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) = 0; virtual void updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) = 0;
// Querying // Querying
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) = 0; virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) = 0;
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0; virtual DAGRunRecord getDAGRun(DAGRunID dagRunID) = 0;
}; };
} }
} }

View File

@@ -5,7 +5,7 @@
#include <mutex> #include <mutex>
#include <rapidjson/document.h> #include <rapidjson/document.h>
#include "DAGLogger.hpp" #include "DAGRunLogger.hpp"
namespace fs = std::filesystem; namespace fs = std::filesystem;
namespace rj = rapidjson; namespace rj = rapidjson;
@@ -34,23 +34,23 @@ namespace daggy {
* stderr * stderr
* execlog * execlog
*/ */
class FileSystemLogger : public DAGLogger { class FileSystemLogger : public DAGRunLogger {
public: public:
FileSystemLogger(fs::path root); FileSystemLogger(fs::path root);
// Execution // Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override; virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override;
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override; virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override; virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override;
virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override; virtual void updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) override;
// Querying // Querying
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override; virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId); virtual DAGRunRecord getDAGRun(DAGRunID dagRunID);
private: private:
fs::path root_; fs::path root_;

View File

@@ -3,7 +3,7 @@
#include <iostream> #include <iostream>
#include <mutex> #include <mutex>
#include "DAGLogger.hpp" #include "DAGRunLogger.hpp"
namespace daggy { namespace daggy {
namespace loggers { namespace loggers {
@@ -12,23 +12,23 @@ namespace daggy {
* This logger should only be used for debug purposes. It doesn't actually log anything, just prints stuff * This logger should only be used for debug purposes. It doesn't actually log anything, just prints stuff
* to stdout. * to stdout.
*/ */
class OStreamLogger : public DAGLogger { class OStreamLogger : public DAGRunLogger {
public: public:
OStreamLogger(std::ostream &os); OStreamLogger(std::ostream &os);
// Execution // Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override; virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override;
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override; virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override; virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override;
virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override; virtual void updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) override;
// Querying // Querying
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override; virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId); virtual DAGRunRecord getDAGRun(DAGRunID dagRunID);
private: private:
DAGRunID nextRunID_; DAGRunID nextRunID_;

View File

@@ -1,3 +1,6 @@
#include <sstream>
#include <iomanip>
#include <daggy/Serialization.hpp> #include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp> #include <daggy/Utilities.hpp>
@@ -142,4 +145,46 @@ namespace daggy {
return tasks; return tasks;
} }
// I really want to do this with rapidjson, but damn they make it ugly and difficult.
// So we'll shortcut and generate the JSON directly.
std::string taskToJSON(const Task &task) {
std::stringstream ss;
ss << "{"
<< R"("name": )" << std::quoted(task.name) << ','
<< R"("maxRetries": )" << task.maxRetries << ','
<< R"("retryIntervalSeconds": )" << task.retryIntervalSeconds << ',';
// Commands
ss << R"("command": [)";
for (auto it = task.command.begin(); it != task.command.end(); ++it) {
ss << std::quoted(*it);
if (it != task.command.end() - 1) ss << ", ";
}
ss << "],";
ss << R"("children": [)";
for (auto it = task.children.begin(); it != task.children.end(); ++it) {
ss << std::quoted(*it);
if (it != task.children.end() - 1) ss << ", ";
}
ss << "]";
ss << '}';
return ss.str();
}
std::string tasksToJSON(const std::vector<Task> &tasks) {
std::stringstream ss;
ss << "[";
for (auto it = tasks.begin(); it != tasks.end(); ++it) {
ss << taskToJSON(*it);
if (it != tasks.end() - 1) ss << ", ";
}
ss << "]";
return ss.str();
}
} }

View File

@@ -53,7 +53,7 @@ namespace daggy {
TaskID taskID, TaskID taskID,
const Task &task, const Task &task,
executors::task::TaskExecutor &executor, executors::task::TaskExecutor &executor,
loggers::dag_run::DAGLogger &logger) { loggers::dag_run::DAGRunLogger &logger) {
std::vector<AttemptRecord> attempts; std::vector<AttemptRecord> attempts;
logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RUNNING); logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RUNNING);
@@ -69,7 +69,7 @@ namespace daggy {
void runDAG(DAGRunID runID, void runDAG(DAGRunID runID,
std::vector<Task> tasks, std::vector<Task> tasks,
executors::task::TaskExecutor &executor, executors::task::TaskExecutor &executor,
loggers::dag_run::DAGLogger &logger, loggers::dag_run::DAGRunLogger &logger,
DAG dag) { DAG dag) {
logger.updateDAGRunState(runID, loggers::dag_run::RunState::RUNNING); logger.updateDAGRunState(runID, loggers::dag_run::RunState::RUNNING);

View File

@@ -17,14 +17,13 @@ namespace daggy {
: root_(root), nextRunID_(0) { : root_(root), nextRunID_(0) {
const std::vector<fs::path> reqPaths{root_, getCurrentPath(), getRunsRoot()}; const std::vector<fs::path> reqPaths{root_, getCurrentPath(), getRunsRoot()};
for (const auto &path : reqPaths) { for (const auto &path : reqPaths) {
if (!fs::exists(path)) { fs::create_directory(path); } if (!fs::exists(path)) { fs::create_directories(path); }
} }
// Get the next run ID // Get the next run ID
size_t runID = 0;
for (auto &dir : fs::directory_iterator(getRunsRoot())) { for (auto &dir : fs::directory_iterator(getRunsRoot())) {
try { try {
runID = std::stoull(dir.path().stem()); size_t runID = std::stoull(dir.path().stem());
if (runID > nextRunID_) nextRunID_ = runID + 1; if (runID > nextRunID_) nextRunID_ = runID + 1;
} catch (std::exception &e) { } catch (std::exception &e) {
continue; continue;
@@ -41,16 +40,20 @@ namespace daggy {
// std::lock_guard<std::mutex> guard(runLocks[runDir]); // std::lock_guard<std::mutex> guard(runLocks[runDir]);
// Init the directory // Init the directory
fs::path runRoot = getRunsRoot() / std::to_string(runID);
fs::create_directories(runRoot);
// Create meta.json with DAGRun Name and task definitions
} }
void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {} void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {}
void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord &attempt) {} void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord &attempt) {}
void FileSystemLogger::updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) {} void FileSystemLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) {}
// Querying // Querying
std::vector<DAGRunSummary> FileSystemLogger::getDAGs(uint32_t stateMask) {} std::vector<DAGRunSummary> FileSystemLogger::getDAGs(uint32_t stateMask) {}
DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunId) {} DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunID) {}
} }

View File

@@ -37,7 +37,7 @@ namespace daggy {
// Querying // Querying
std::vector<DAGRunSummary> OStreamLogger::getDAGs(uint32_t stateMask) { return {}; } std::vector<DAGRunSummary> OStreamLogger::getDAGs(uint32_t stateMask) { return {}; }
DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunId) { return {}; } DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunID) { return {}; }
} }
} }
} }

View File

@@ -49,4 +49,14 @@ TEST_CASE("Task Deserialization", "[deserialize_task]") {
auto tasks = daggy::tasksFromJSON(testTasks, params); auto tasks = daggy::tasksFromJSON(testTasks, params);
REQUIRE(tasks.size() == 4); REQUIRE(tasks.size() == 4);
} }
}
TEST_CASE("Task Serialization", "[serialize_tasks]") {
SECTION("Build with no expansion") {
std::string testTasks = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["C"]}, {"name": "B", "command": ["/bin/echo", "B"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])";
auto tasks = daggy::tasksFromJSON(testTasks);
auto genJSON = daggy::tasksToJSON(tasks);
std::cout << genJSON << std::endl;
REQUIRE_NOTHROW(daggy::tasksFromJSON(genJSON));
}
} }