diff --git a/CMakeLists.txt b/CMakeLists.txt index fcd049b..a77a216 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,8 +10,10 @@ find_package (Threads REQUIRED) include(cmake/rapidjson.cmake) include(cmake/Pistache.cmake) +include(cmake/MagicEnum.cmake) include_directories(${RAPIDJSON_INCLUDE_DIR}) +include_directories(${MAGIC_ENUM_INCLUDE_DIR}) add_subdirectory(daggy) add_subdirectory(tests) diff --git a/cmake/MagicEnum.cmake b/cmake/MagicEnum.cmake new file mode 100644 index 0000000..d63a488 --- /dev/null +++ b/cmake/MagicEnum.cmake @@ -0,0 +1,17 @@ +include(ExternalProject) +# Download RapidJSON +ExternalProject_Add( + magic-enum + PREFIX "third_party/magic-enum" + GIT_REPOSITORY "https://github.com/Neargye/magic_enum" + GIT_TAG "v0.7.3" + TIMEOUT 10 + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + UPDATE_COMMAND "" +) + +# Magic Enums is a header-only +ExternalProject_Get_Property(magic-enum source_dir) +set(MAGIC_ENUM_INCLUDE_DIR ${source_dir}/include) \ No newline at end of file diff --git a/daggy/CMakeLists.txt b/daggy/CMakeLists.txt index 74e51e2..ddee947 100644 --- a/daggy/CMakeLists.txt +++ b/daggy/CMakeLists.txt @@ -7,4 +7,4 @@ add_library(${PROJECT_NAME} STATIC ${SOURCES}) include_directories(${PISTACHE_INCLUDE_DIR}) target_include_directories(${PROJECT_NAME} PUBLIC include) target_link_libraries(${PROJECT_NAME} pistache pthread) -add_dependencies(${PROJECT_NAME} PistacheDownload rapidjson) \ No newline at end of file +add_dependencies(${PROJECT_NAME} PistacheDownload rapidjson magic-enum) \ No newline at end of file diff --git a/daggy/include/daggy/AttemptRecord.hpp b/daggy/include/daggy/AttemptRecord.hpp index 68acb3d..54ee29b 100644 --- a/daggy/include/daggy/AttemptRecord.hpp +++ b/daggy/include/daggy/AttemptRecord.hpp @@ -10,7 +10,7 @@ namespace daggy { TimePoint startTime; TimePoint stopTime; int rc; // RC from the task - std::string metaLog; // Logs from the executor + std::string metaLog; // Logs from the dag_executor std::string output; // stdout from command std::string error; // stderr from command }; diff --git a/daggy/include/daggy/DAGLogger.hpp b/daggy/include/daggy/DAGLogger.hpp deleted file mode 100644 index e67ce72..0000000 --- a/daggy/include/daggy/DAGLogger.hpp +++ /dev/null @@ -1,68 +0,0 @@ -#pragma once - -#include - -#include "DAGRun.hpp" - -/* - DAGLogger represents the interface to store all the state information - for daggy to run. Abstracted in case other back-end solutions need to - be supported. -*/ - -namespace daggy { - using DAGDefID = int16_t; - using DAGRunID = size_t; - - enum class RunState : uint32_t { - QUEUED = 0, - RUNNING = 1, - ERRORED = 1 << 1, - KILLED = 1 << 2, - COMPLETED = 1 << 3 - }; - - struct TaskUpdateRecord { - TimePoint time; - size_t taskID; - RunState newState; - }; - - struct DAGUpdateRecord { - TimePoint time; - RunState newState; - }; - - // Pretty heavy weight, but - struct DAGRunRecord { - std::string name; - std::vector tasks; - std::vector runStates; - std::vector> taskAttempts; - std::vector taskStateChanges; - std::vector dagStateChanges; - }; - - struct DAGRunSummary { - DAGRunID runID; - std::string name; - RunState runState; - TimePoint startTime; - TimePoint lastUpdate; - std::unordered_map taskStates; - }; - - class DAGLogger { - public: - // Execution - virtual DAGRunID startDAGRun(std::string name, const std::vector & tasks) = 0; - virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0; - virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) = 0; - virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) = 0; - virtual void updateTaskState(DAGRunID dagRunId, RunState state) = 0; - - // Querying - virtual std::vector getDAGs(uint32_t stateMask) = 0; - virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0; - }; -} diff --git a/daggy/include/daggy/Defines.hpp b/daggy/include/daggy/Defines.hpp index b180aef..be1c5dd 100644 --- a/daggy/include/daggy/Defines.hpp +++ b/daggy/include/daggy/Defines.hpp @@ -19,4 +19,5 @@ namespace daggy { // DAG Runs using DAGDefID = int16_t; using DAGRunID = size_t; + using TaskID = size_t; } diff --git a/daggy/include/daggy/TaskExecutor.hpp b/daggy/include/daggy/TaskExecutor.hpp deleted file mode 100644 index e817827..0000000 --- a/daggy/include/daggy/TaskExecutor.hpp +++ /dev/null @@ -1,30 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - -#include "Task.hpp" -#include "AttemptRecord.hpp" -#include "ThreadPool.hpp" - -/* - Executors run Tasks, returning a future with the results. - If there are many retries, logs are returned for each attempt. -*/ - -namespace daggy { - class TaskExecutor { - public: - TaskExecutor(size_t nThreads) : threadPool(nThreads) {}; - - virtual const std::string getName() const = 0; - - // This will block if the executor is full - virtual AttemptRecord runCommand(std::vector cmd) = 0; - - ThreadPool threadPool; - }; -} diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index ebe515d..fbc24b9 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -7,19 +7,29 @@ #include -#include "DAGLogger.hpp" -#include "TaskExecutor.hpp" +#include "daggy/loggers/dag_run/DAGLoggerBase.hpp" +#include "daggy/executors/task/TaskExecutor.hpp" #include "Task.hpp" #include "Defines.hpp" +#include "DAG.hpp" namespace daggy { std::vector expandCommands(const std::vector & command, const ParameterValues & parameters); + DAG buildDAGFromTasks(const std::vector & tasks); + // Blocking call + std::vector + runTask(DAGRunID runID, + TaskID taskID, + const Task & task, + executors::task::TaskExecutor & executor, + loggers::dag_run::DAGLoggerBase & logger); + void runDAG(DAGRunID runID, std::vector tasks, - TaskExecutor & executor, - DAGLogger & logger, + executors::task::TaskExecutor & executor, + loggers::dag_run::DAGLoggerBase & logger, DAG dag); } diff --git a/daggy/include/daggy/dagloggers/FileSystemLogger.hpp b/daggy/include/daggy/dagloggers/FileSystemLogger.hpp deleted file mode 100644 index a166d7e..0000000 --- a/daggy/include/daggy/dagloggers/FileSystemLogger.hpp +++ /dev/null @@ -1,59 +0,0 @@ -#pragma once - -#include - -#include -#include "../DAGLogger.hpp" - -namespace fs = std::filesystem; -namespace rj = rapidjson; - -namespace daggy { - /* - * This logger should only be used for debug purposes. It's not really optimized for querying, and will - * use a ton of inodes to track state. - * - * On the plus side, it's trivial to look at without using the API. - * - * Filesystem logger creates the following structure: - * {root}/ - * current/ - * {DAGRunID}.{STATE} -- A file for each DAG not in a COMPLETE state for faster lookups - * runs/ - * {runID}/ - * meta.json --- Contains the DAG name, task definitions - * {taskID}/ - * states --- State changes - * {attempt}/ - * meta.json --- timestamps and rc - * stdout - * stderr - * execlog - */ - class FileSystemLogger : DAGLogger { - public: - FileSystemLogger(fs::path root); - - // Execution - virtual DAGRunID startDAGRun(std::string name, const std::vector & tasks) override; - virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override; - virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) override; - virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) override; - virtual void updateTaskState(DAGRunID dagRunId, RunState state) override; - - // Querying - virtual std::vector getDAGs(uint32_t stateMask) override; - virtual DAGRunRecord getDAGRun(DAGRunID dagRunId); - - private: - fs::path root_; - std::atomic nextRunID_; - std::mutex lock_; - - std::unordered_map runLocks; - - inline const fs::path getCurrentPath() const; - inline const fs::path getRunsRoot() const; - inline const fs::path getRunRoot(DAGRunID runID) const; - }; -} diff --git a/daggy/include/daggy/executors/ForkingTaskExecutor.hpp b/daggy/include/daggy/executors/ForkingTaskExecutor.hpp deleted file mode 100644 index 9283c23..0000000 --- a/daggy/include/daggy/executors/ForkingTaskExecutor.hpp +++ /dev/null @@ -1,18 +0,0 @@ -#pragma once - -#include "../TaskExecutor.hpp" - -namespace daggy { - namespace executor { - class ForkingTaskExecutor : public TaskExecutor { - public: - ForkingTaskExecutor(size_t nThreads) - : TaskExecutor(nThreads) - {} - - const std::string getName() const override { return "ForkingTaskExecutor"; } - - AttemptRecord runCommand(std::vector cmd) override; - }; - } -} diff --git a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp new file mode 100644 index 0000000..d9fdfc8 --- /dev/null +++ b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp @@ -0,0 +1,20 @@ +#pragma once + +#include "TaskExecutor.hpp" + +namespace daggy { + namespace executors { + namespace task { + class ForkingTaskExecutor : public TaskExecutor { + public: + ForkingTaskExecutor(size_t nThreads) + : TaskExecutor(nThreads) + {} + + const std::string getName() const override { return "ForkingTaskExecutor"; } + + AttemptRecord runCommand(std::vector cmd) override; + }; + } + } +} \ No newline at end of file diff --git a/daggy/include/daggy/executors/task/TaskExecutor.hpp b/daggy/include/daggy/executors/task/TaskExecutor.hpp new file mode 100644 index 0000000..ecfad02 --- /dev/null +++ b/daggy/include/daggy/executors/task/TaskExecutor.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "daggy/Task.hpp" +#include "daggy/AttemptRecord.hpp" +#include "daggy/ThreadPool.hpp" + +/* + Executors run Tasks, returning a future with the results. + If there are many retries, logs are returned for each attempt. +*/ + +namespace daggy { + namespace executors { + namespace task { + class TaskExecutor { + public: + TaskExecutor(size_t nThreads) : threadPool(nThreads) {}; + + virtual const std::string getName() const = 0; + + // This will block if the dag_executor is full + virtual AttemptRecord runCommand(std::vector cmd) = 0; + + ThreadPool threadPool; + }; + } + } +} diff --git a/daggy/include/daggy/loggers/dag_run/DAGLoggerBase.hpp b/daggy/include/daggy/loggers/dag_run/DAGLoggerBase.hpp new file mode 100644 index 0000000..5f233fb --- /dev/null +++ b/daggy/include/daggy/loggers/dag_run/DAGLoggerBase.hpp @@ -0,0 +1,69 @@ +#pragma once + +#include + +#include "daggy/DAGRun.hpp" + +/* + DAGLoggerBase represents the interface to store all the state information + for daggy to run. Abstracted in case other back-end solutions need to + be supported. +*/ + +namespace daggy { + namespace loggers { + namespace dag_run { + enum class RunState : uint32_t { + QUEUED = 0, + RUNNING = 1, + RETRY = 1 << 1, + ERRORED = 1 << 2, + KILLED = 1 << 3, + COMPLETED = 1 << 4 + }; + + struct TaskUpdateRecord { + TimePoint time; + TaskID taskID; + RunState newState; + }; + + struct DAGUpdateRecord { + TimePoint time; + RunState newState; + }; + + // Pretty heavy weight, but + struct DAGRunRecord { + std::string name; + std::vector tasks; + std::vector runStates; + std::vector> taskAttempts; + std::vector taskStateChanges; + std::vector dagStateChanges; + }; + + struct DAGRunSummary { + DAGRunID runID; + std::string name; + RunState runState; + TimePoint startTime; + TimePoint lastUpdate; + std::unordered_map taskStateCounts; + }; + + class DAGLoggerBase { + public: + // Execution + virtual DAGRunID startDAGRun(std::string name, const std::vector & tasks) = 0; + virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0; + virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord & attempt) = 0; + virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) = 0; + + // Querying + virtual std::vector getDAGs(uint32_t stateMask) = 0; + virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0; + }; + } + } +} diff --git a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp new file mode 100644 index 0000000..7d4c5f6 --- /dev/null +++ b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp @@ -0,0 +1,64 @@ +#pragma once + +#include +#include +#include + +#include +#include "DAGLoggerBase.hpp" + +namespace fs = std::filesystem; +namespace rj = rapidjson; + +namespace daggy { + namespace loggers { + namespace dag_run { + /* + * This logger should only be used for debug purposes. It's not really optimized for querying, and will + * use a ton of inodes to track state. + * + * On the plus side, it's trivial to look at without using the API. + * + * Filesystem logger creates the following structure: + * {root}/ + * current/ + * {DAGRunID}.{STATE} -- A file for each DAG not in a COMPLETE state for faster lookups + * runs/ + * {runID}/ + * meta.json --- Contains the DAG name, task definitions + * {taskID}/ + * states --- State changes + * {attempt}/ + * meta.json --- timestamps and rc + * stdout + * stderr + * execlog + */ + class FileSystemLogger : DAGLoggerBase { + public: + FileSystemLogger(fs::path root); + + // Execution + virtual DAGRunID startDAGRun(std::string name, const std::vector & tasks) override; + virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override; + virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord & attempt) override; + virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override; + + // Querying + virtual std::vector getDAGs(uint32_t stateMask) override; + virtual DAGRunRecord getDAGRun(DAGRunID dagRunId); + + private: + fs::path root_; + std::atomic nextRunID_; + std::mutex lock_; + + // std::unordered_map runLocks; + + inline const fs::path getCurrentPath() const; + inline const fs::path getRunsRoot() const; + inline const fs::path getRunRoot(DAGRunID runID) const; + }; + } + } +} diff --git a/daggy/include/daggy/loggers/dag_run/StdOutLogger.hpp b/daggy/include/daggy/loggers/dag_run/StdOutLogger.hpp new file mode 100644 index 0000000..b25b3c2 --- /dev/null +++ b/daggy/include/daggy/loggers/dag_run/StdOutLogger.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include +#include + +#include "DAGLoggerBase.hpp" + +namespace daggy { + namespace loggers { + namespace dag_run { + /* + * This logger should only be used for debug purposes. It doesn't actually log anything, just prints stuff + * to stdout. + */ + class StdOutLogger : public DAGLoggerBase { + public: + StdOutLogger(); + + // Execution + virtual DAGRunID startDAGRun(std::string name, const std::vector & tasks) override; + virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override; + virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord & attempt) override; + virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override; + + // Querying + virtual std::vector getDAGs(uint32_t stateMask) override; + virtual DAGRunRecord getDAGRun(DAGRunID dagRunId); + + private: + DAGRunID nextRunID_; + std::mutex guard_; + }; + } + } +} \ No newline at end of file diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index d31d73b..cfc5a29 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -30,11 +30,49 @@ namespace daggy { return commands; } + DAG buildDAGFromTasks(const std::vector & tasks) { + DAG dag; + std::unordered_map taskIDs; + + // Add all the vertices + for (const auto &task : tasks) { + taskIDs[task.name] = dag.addVertex(); + } + + // Add edges + for (size_t i = 0; i < tasks.size(); ++i) { + for (const auto &c : tasks[i].children) { + dag.addEdge(i, taskIDs[c]); + } + } + dag.reset(); + return dag; + } + + std::vector runTask(DAGRunID runID, + TaskID taskID, + const Task & task, + executors::task::TaskExecutor & executor, + loggers::dag_run::DAGLoggerBase & logger) + { + std::vector attempts; + logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RUNNING ); + + while (attempts.size() < task.maxRetries + 1) { + attempts.push_back(executor.runCommand(task.command)); + logger.logTaskAttempt(runID, taskID, attempts.back()); + if (attempts.back().rc == 0) break; + logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RETRY ); + } + return attempts; + } + void runDAG(DAGRunID runID, std::vector tasks, - TaskExecutor & executor, - DAGLogger & logger, + executors::task::TaskExecutor & executor, + loggers::dag_run::DAGLoggerBase & logger, DAG dag) { + logger.updateDAGRunState(runID, loggers::dag_run::RunState::RUNNING); struct TaskState { size_t tid; @@ -51,10 +89,17 @@ namespace daggy { if (taskState.fut.valid()) { auto attemptRecords = taskState.fut.get(); - if (attemptRecords.back().rc == 0) { - dag.completeVisit(taskState.tid); + if (attemptRecords.empty()) { + logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED ); + continue; + } + if (attemptRecords.back().rc == 0) { + logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::COMPLETED ); + dag.completeVisit(taskState.tid); + taskState.complete = true; + } else { + logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED ); } - taskState.complete = true; } } @@ -66,21 +111,11 @@ namespace daggy { auto tid = t.value(); TaskState tsk{ .tid = tid, - .fut = tq->addTask( - [tid, &tasks, &executor]() { - std::vector attempts; - - while (attempts.size() < tasks[tid].maxRetries) { - attempts.push_back(executor.runCommand(tasks[tid].command)); - if (attempts.back().rc == 0) break; - } - return attempts; - }) - , .complete = false + .fut = tq->addTask([tid, runID, &tasks, &executor, &logger]() {return runTask(runID, tid, tasks[tid], executor, logger);}), + .complete = false }; taskStates.push_back(std::move(tsk)); - // auto nextTask = dag.visitNext(); if (not nextTask.has_value()) break; t.emplace(nextTask.value()); diff --git a/daggy/src/dagloggers/FileSystemLogger.cpp b/daggy/src/dagloggers/FileSystemLogger.cpp index d37315d..f19f7db 100644 --- a/daggy/src/dagloggers/FileSystemLogger.cpp +++ b/daggy/src/dagloggers/FileSystemLogger.cpp @@ -1,28 +1,32 @@ -#include +#include namespace fs = std::filesystem; +using namespace daggy::loggers::dag_run; + namespace daggy { inline const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; } inline const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; } - inline const fs::path getRunRoot(DAGRunID runID) const { return getRunsRoot() / std::to_string(runID); } + inline const fs::path FileSystemLogger::getRunRoot(DAGRunID runID) const { return getRunsRoot() / std::to_string(runID); } - FileSystemLogger(fs::path root) + FileSystemLogger::FileSystemLogger(fs::path root) : root_(root) , nextRunID_(0) { - const std::vector reqPaths{ root_, getCurrentPath(), getRunsRoot()}; + const std::vector reqPaths{ root_, getCurrentPath(), getRunsRoot()}; for (const auto & path : reqPaths) { if (! fs::exists(path)) { fs::create_directory(path); } } // Get the next run ID size_t runID = 0; - for (auto & dir : fs::std::filesystem::directory_iterator(getRunsRoot())) { + for (auto & dir : fs::directory_iterator(getRunsRoot())) { try { - runID = std::stoull(dir.stem()); + runID = std::stoull(dir.path().stem()); if (runID > nextRunID_) nextRunID_ = runID + 1; - } catch {} + } catch (std::exception & e) { + continue; + } } } @@ -32,14 +36,13 @@ namespace daggy { // TODO make this threadsafe fs::path runDir = getRunRoot(runID); - std::lock_guard guard(runLocks[runDir]); + // std::lock_guard guard(runLocks[runDir]); // Init the directory } - void FileSystemLogger::updateDAGRunState(DAGRunID dagRunId, RunState state){ } + void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state){ } void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt){ } - void FileSystemLogger::markTaskComplete(DAGRunID dagRun, size_t taskID){ } - void FileSystemLogger::updateTaskState(DAGRunID dagRunId, RunState state){ } + void FileSystemLogger::updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state){ } // Querying std::vector FileSystemLogger::getDAGs(uint32_t stateMask){ } diff --git a/daggy/src/dagloggers/StdOutLogger.cpp b/daggy/src/dagloggers/StdOutLogger.cpp new file mode 100644 index 0000000..3701de3 --- /dev/null +++ b/daggy/src/dagloggers/StdOutLogger.cpp @@ -0,0 +1,39 @@ +#include + +#include + +namespace daggy { + namespace loggers { + namespace dag_run { + StdOutLogger::StdOutLogger() : nextRunID_(0) { } + + // Execution + DAGRunID StdOutLogger::startDAGRun(std::string name, const std::vector & tasks) { + std::lock_guard lock(guard_); + size_t runID = nextRunID_++; + std::cout << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size() << " tasks" << std::endl; + return runID; + } + + void StdOutLogger::updateDAGRunState(DAGRunID dagRunID, RunState state){ + std::lock_guard lock(guard_); + std::cout << "DAG State Change(" << dagRunID << "): " << magic_enum::enum_name(state) << std::endl; + } + + void StdOutLogger::logTaskAttempt(DAGRunID dagRunID, size_t taskID, const AttemptRecord & attempt){ + std::lock_guard lock(guard_); + const std::string & msg = attempt.rc == 0 ? attempt.output : attempt.error; + std::cout << "Task Attempt (" << dagRunID << '/' << taskID << "): Ran with RC " << attempt.rc << ": " << msg << std::endl; + } + + void StdOutLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) { + std::lock_guard lock(guard_); + std::cout << "Task State Change (" << dagRunID << '/' << taskID << "): " << magic_enum::enum_name(state) << std::endl; + } + + // Querying + std::vector StdOutLogger::getDAGs(uint32_t stateMask){ return {}; } + DAGRunRecord StdOutLogger::getDAGRun(DAGRunID dagRunId) { return {}; } + } + } +} diff --git a/daggy/src/executors/ForkingTaskExecutor.cpp b/daggy/src/executors/ForkingTaskExecutor.cpp index 9d612d2..2fde945 100644 --- a/daggy/src/executors/ForkingTaskExecutor.cpp +++ b/daggy/src/executors/ForkingTaskExecutor.cpp @@ -1,14 +1,11 @@ -#include - -#include -#include +#include #include #include #include #include -using namespace daggy::executor; +using namespace daggy::executors::task; std::string slurp(int fd) { std::string result; diff --git a/tests/unit_executor_forkingexecutor.cpp b/tests/unit_executor_forkingexecutor.cpp index acf4e33..7ddcd7e 100644 --- a/tests/unit_executor_forkingexecutor.cpp +++ b/tests/unit_executor_forkingexecutor.cpp @@ -1,12 +1,12 @@ #include #include -#include "daggy/executors/ForkingTaskExecutor.hpp" +#include "daggy/executors/task/ForkingTaskExecutor.hpp" #include "catch.hpp" TEST_CASE("Basic Execution", "[forking_executor]") { - daggy::executor::ForkingTaskExecutor ex(10); + daggy::executors::task::ForkingTaskExecutor ex(10); SECTION("Simple Run") { std::vector cmd{"/usr/bin/echo", "abc", "123"}; diff --git a/tests/unit_utilities.cpp b/tests/unit_utilities.cpp index b7fc424..9948ea4 100644 --- a/tests/unit_utilities.cpp +++ b/tests/unit_utilities.cpp @@ -6,9 +6,11 @@ #include "daggy/Utilities.hpp" #include "daggy/Serialization.hpp" +#include "daggy/executors/task/ForkingTaskExecutor.hpp" +#include "daggy/loggers/dag_run/StdOutLogger.hpp" TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") { - SECTION("Basic Parse") { + SECTION("Basic expansion") { std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"}; auto params = daggy::parametersFromJSON(testParams); std::vector cmd{"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}", "{{TYPE}}"}; @@ -26,4 +28,16 @@ TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") { // TYPE isn't used, so it's just |DATE| * |SOURCE| REQUIRE(allCommands.size() == 2); } +} + +TEST_CASE("DAG Runner", "[utilities_dag_runner]") { + daggy::executors::task::ForkingTaskExecutor ex(10); + daggy::loggers::dag_run::StdOutLogger logger; + + std::string taskJSON = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["C"]}, {"name": "B", "command": ["/bin/echo", "B"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])"; + auto tasks = daggy::tasksFromJSON(taskJSON); + auto dag = daggy::buildDAGFromTasks(tasks); + + auto runID = logger.startDAGRun("test_run", tasks); + daggy::runDAG(runID, tasks, ex, logger, dag); } \ No newline at end of file