diff --git a/daggy/include/daggy/Serialization.hpp b/daggy/include/daggy/Serialization.hpp index 588cd53..f75f6f9 100644 --- a/daggy/include/daggy/Serialization.hpp +++ b/daggy/include/daggy/Serialization.hpp @@ -22,4 +22,8 @@ namespace daggy { std::vector tasksFromJSON(const std::string &jsonSpec, const ParameterValues ¶meters = {}); std::vector tasksFromJSON(const rj::Document &spec, const ParameterValues ¶meters = {}); + + std::string taskToJSON(const Task &task); + + std::string tasksToJSON(const std::vector &tasks); } diff --git a/daggy/include/daggy/Task.hpp b/daggy/include/daggy/Task.hpp index e012cac..da6323b 100644 --- a/daggy/include/daggy/Task.hpp +++ b/daggy/include/daggy/Task.hpp @@ -8,7 +8,7 @@ namespace daggy { struct Task { std::string name; std::vector command; - uint8_t maxRetries; + uint32_t maxRetries; uint32_t retryIntervalSeconds; // Time to wait between retries std::vector children; }; diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index 0acd89b..4800319 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -7,7 +7,7 @@ #include -#include "daggy/loggers/dag_run/DAGLogger.hpp" +#include "daggy/loggers/dag_run/DAGRunLogger.hpp" #include "daggy/executors/task/TaskExecutor.hpp" #include "Task.hpp" #include "Defines.hpp" @@ -24,12 +24,12 @@ namespace daggy { TaskID taskID, const Task &task, executors::task::TaskExecutor &executor, - loggers::dag_run::DAGLogger &logger); + loggers::dag_run::DAGRunLogger &logger); void runDAG(DAGRunID runID, std::vector tasks, executors::task::TaskExecutor &executor, - loggers::dag_run::DAGLogger &logger, + loggers::dag_run::DAGRunLogger &logger, DAG dag); } diff --git a/daggy/include/daggy/loggers/dag_run/DAGLogger.hpp b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp similarity index 72% rename from daggy/include/daggy/loggers/dag_run/DAGLogger.hpp rename to daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp index 2029bbc..1b6c4c3 100644 --- a/daggy/include/daggy/loggers/dag_run/DAGLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp @@ -8,7 +8,7 @@ #include "Defines.hpp" /* - DAGLogger represents the interface to store all the state information + DAGRunLogger represents the interface to store all the state information for daggy to run. Abstracted in case other back-end solutions need to be supported. */ @@ -16,21 +16,21 @@ namespace daggy { namespace loggers { namespace dag_run { - class DAGLogger { + class DAGRunLogger { public: // Execution virtual DAGRunID startDAGRun(std::string name, const std::vector &tasks) = 0; - virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0; + virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) = 0; virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) = 0; - virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) = 0; + virtual void updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) = 0; // Querying virtual std::vector getDAGs(uint32_t stateMask) = 0; - virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0; + virtual DAGRunRecord getDAGRun(DAGRunID dagRunID) = 0; }; } } diff --git a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp index 955b821..815ee82 100644 --- a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp @@ -5,7 +5,7 @@ #include #include -#include "DAGLogger.hpp" +#include "DAGRunLogger.hpp" namespace fs = std::filesystem; namespace rj = rapidjson; @@ -34,23 +34,23 @@ namespace daggy { * stderr * execlog */ - class FileSystemLogger : public DAGLogger { + class FileSystemLogger : public DAGRunLogger { public: FileSystemLogger(fs::path root); // Execution virtual DAGRunID startDAGRun(std::string name, const std::vector &tasks) override; - virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override; + virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) override; virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override; - virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override; + virtual void updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) override; // Querying virtual std::vector getDAGs(uint32_t stateMask) override; - virtual DAGRunRecord getDAGRun(DAGRunID dagRunId); + virtual DAGRunRecord getDAGRun(DAGRunID dagRunID); private: fs::path root_; diff --git a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp index 894e8d4..3e047db 100644 --- a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp @@ -3,7 +3,7 @@ #include #include -#include "DAGLogger.hpp" +#include "DAGRunLogger.hpp" namespace daggy { namespace loggers { @@ -12,23 +12,23 @@ namespace daggy { * This logger should only be used for debug purposes. It doesn't actually log anything, just prints stuff * to stdout. */ - class OStreamLogger : public DAGLogger { + class OStreamLogger : public DAGRunLogger { public: OStreamLogger(std::ostream &os); // Execution virtual DAGRunID startDAGRun(std::string name, const std::vector &tasks) override; - virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override; + virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) override; virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override; - virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override; + virtual void updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) override; // Querying virtual std::vector getDAGs(uint32_t stateMask) override; - virtual DAGRunRecord getDAGRun(DAGRunID dagRunId); + virtual DAGRunRecord getDAGRun(DAGRunID dagRunID); private: DAGRunID nextRunID_; diff --git a/daggy/src/Serialization.cpp b/daggy/src/Serialization.cpp index 5732bc1..6b3a0d1 100644 --- a/daggy/src/Serialization.cpp +++ b/daggy/src/Serialization.cpp @@ -1,3 +1,6 @@ +#include +#include + #include #include @@ -142,4 +145,46 @@ namespace daggy { return tasks; } + + // I really want to do this with rapidjson, but damn they make it ugly and difficult. + // So we'll shortcut and generate the JSON directly. + std::string taskToJSON(const Task &task) { + std::stringstream ss; + ss << "{" + << R"("name": )" << std::quoted(task.name) << ',' + << R"("maxRetries": )" << task.maxRetries << ',' + << R"("retryIntervalSeconds": )" << task.retryIntervalSeconds << ','; + + // Commands + ss << R"("command": [)"; + for (auto it = task.command.begin(); it != task.command.end(); ++it) { + ss << std::quoted(*it); + if (it != task.command.end() - 1) ss << ", "; + } + ss << "],"; + + ss << R"("children": [)"; + for (auto it = task.children.begin(); it != task.children.end(); ++it) { + ss << std::quoted(*it); + if (it != task.children.end() - 1) ss << ", "; + } + ss << "]"; + + ss << '}'; + return ss.str(); + } + + std::string tasksToJSON(const std::vector &tasks) { + std::stringstream ss; + + ss << "["; + + for (auto it = tasks.begin(); it != tasks.end(); ++it) { + ss << taskToJSON(*it); + if (it != tasks.end() - 1) ss << ", "; + } + ss << "]"; + + return ss.str(); + } } \ No newline at end of file diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index 72ffb4c..e137c2c 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -53,7 +53,7 @@ namespace daggy { TaskID taskID, const Task &task, executors::task::TaskExecutor &executor, - loggers::dag_run::DAGLogger &logger) { + loggers::dag_run::DAGRunLogger &logger) { std::vector attempts; logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RUNNING); @@ -69,7 +69,7 @@ namespace daggy { void runDAG(DAGRunID runID, std::vector tasks, executors::task::TaskExecutor &executor, - loggers::dag_run::DAGLogger &logger, + loggers::dag_run::DAGRunLogger &logger, DAG dag) { logger.updateDAGRunState(runID, loggers::dag_run::RunState::RUNNING); diff --git a/daggy/src/loggers/dag_run/FileSystemLogger.cpp b/daggy/src/loggers/dag_run/FileSystemLogger.cpp index 56c8540..9ee4065 100644 --- a/daggy/src/loggers/dag_run/FileSystemLogger.cpp +++ b/daggy/src/loggers/dag_run/FileSystemLogger.cpp @@ -17,14 +17,13 @@ namespace daggy { : root_(root), nextRunID_(0) { const std::vector reqPaths{root_, getCurrentPath(), getRunsRoot()}; for (const auto &path : reqPaths) { - if (!fs::exists(path)) { fs::create_directory(path); } + if (!fs::exists(path)) { fs::create_directories(path); } } // Get the next run ID - size_t runID = 0; for (auto &dir : fs::directory_iterator(getRunsRoot())) { try { - runID = std::stoull(dir.path().stem()); + size_t runID = std::stoull(dir.path().stem()); if (runID > nextRunID_) nextRunID_ = runID + 1; } catch (std::exception &e) { continue; @@ -41,16 +40,20 @@ namespace daggy { // std::lock_guard guard(runLocks[runDir]); // Init the directory + fs::path runRoot = getRunsRoot() / std::to_string(runID); + fs::create_directories(runRoot); + + // Create meta.json with DAGRun Name and task definitions } void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {} void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord &attempt) {} - void FileSystemLogger::updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) {} + void FileSystemLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) {} // Querying std::vector FileSystemLogger::getDAGs(uint32_t stateMask) {} - DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunId) {} + DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunID) {} } diff --git a/daggy/src/loggers/dag_run/OStreamLogger.cpp b/daggy/src/loggers/dag_run/OStreamLogger.cpp index 9edd0c1..0f537cd 100644 --- a/daggy/src/loggers/dag_run/OStreamLogger.cpp +++ b/daggy/src/loggers/dag_run/OStreamLogger.cpp @@ -37,7 +37,7 @@ namespace daggy { // Querying std::vector OStreamLogger::getDAGs(uint32_t stateMask) { return {}; } - DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunId) { return {}; } + DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunID) { return {}; } } } } diff --git a/tests/unit_serialization.cpp b/tests/unit_serialization.cpp index 4338043..c9bf47a 100644 --- a/tests/unit_serialization.cpp +++ b/tests/unit_serialization.cpp @@ -49,4 +49,14 @@ TEST_CASE("Task Deserialization", "[deserialize_task]") { auto tasks = daggy::tasksFromJSON(testTasks, params); REQUIRE(tasks.size() == 4); } +} + +TEST_CASE("Task Serialization", "[serialize_tasks]") { + SECTION("Build with no expansion") { + std::string testTasks = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["C"]}, {"name": "B", "command": ["/bin/echo", "B"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])"; + auto tasks = daggy::tasksFromJSON(testTasks); + auto genJSON = daggy::tasksToJSON(tasks); + std::cout << genJSON << std::endl; + REQUIRE_NOTHROW(daggy::tasksFromJSON(genJSON)); + } } \ No newline at end of file