From 1a467428a2efaef744592f5df1d0f0ef5ba7c8d7 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Sun, 8 Aug 2021 12:22:39 -0300 Subject: [PATCH] - Roughing out the DAGLogger interface - Adding spec for FileSystem Logger --- TODO.md | 10 +-- daggy/include/daggy/DAGLogger.hpp | 68 +++++++++++++++++++ daggy/include/daggy/Logger.hpp | 33 --------- daggy/include/daggy/Utilities.hpp | 2 +- .../daggy/dagloggers/FileSystemLogger.hpp | 55 +++++++++++++++ .../daggy/executors/ForkingTaskExecutor.hpp | 1 - daggy/src/dagloggers/FileSystemLogger.cpp | 28 ++++++++ 7 files changed, 157 insertions(+), 40 deletions(-) create mode 100644 daggy/include/daggy/DAGLogger.hpp delete mode 100644 daggy/include/daggy/Logger.hpp create mode 100644 daggy/include/daggy/dagloggers/FileSystemLogger.hpp create mode 100644 daggy/src/dagloggers/FileSystemLogger.cpp diff --git a/TODO.md b/TODO.md index 08774f0..6324cd6 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,5 @@ -- Rip apart scheduler and re-implement runDag as a utility function, taking an execution pool, task list, and logger -- Separate concerns for DAG logger vs DAG definition storage -- Add in authorization scheme (maybe JWT?) -- Flesh out server and interface -- Add ability to define child -> parent relationships \ No newline at end of file +- [ ] Rip apart scheduler and re-implement runDag as a utility function, taking an execution pool, task list, and logger +- [ ] Separate concerns for DAG logger vs DAG definition storage +- [ ] Add in authorization scheme (maybe JWT?) +- [ ] Flesh out server and interface +- [X] Add ability to define child -> parent relationships diff --git a/daggy/include/daggy/DAGLogger.hpp b/daggy/include/daggy/DAGLogger.hpp new file mode 100644 index 0000000..e67ce72 --- /dev/null +++ b/daggy/include/daggy/DAGLogger.hpp @@ -0,0 +1,68 @@ +#pragma once + +#include + +#include "DAGRun.hpp" + +/* + DAGLogger represents the interface to store all the state information + for daggy to run. Abstracted in case other back-end solutions need to + be supported. +*/ + +namespace daggy { + using DAGDefID = int16_t; + using DAGRunID = size_t; + + enum class RunState : uint32_t { + QUEUED = 0, + RUNNING = 1, + ERRORED = 1 << 1, + KILLED = 1 << 2, + COMPLETED = 1 << 3 + }; + + struct TaskUpdateRecord { + TimePoint time; + size_t taskID; + RunState newState; + }; + + struct DAGUpdateRecord { + TimePoint time; + RunState newState; + }; + + // Pretty heavy weight, but + struct DAGRunRecord { + std::string name; + std::vector tasks; + std::vector runStates; + std::vector> taskAttempts; + std::vector taskStateChanges; + std::vector dagStateChanges; + }; + + struct DAGRunSummary { + DAGRunID runID; + std::string name; + RunState runState; + TimePoint startTime; + TimePoint lastUpdate; + std::unordered_map taskStates; + }; + + class DAGLogger { + public: + // Execution + virtual DAGRunID startDAGRun(std::string name, const std::vector & tasks) = 0; + virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0; + virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) = 0; + virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) = 0; + virtual void updateTaskState(DAGRunID dagRunId, RunState state) = 0; + + // Querying + virtual std::vector getDAGs(uint32_t stateMask) = 0; + virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0; + }; +} diff --git a/daggy/include/daggy/Logger.hpp b/daggy/include/daggy/Logger.hpp deleted file mode 100644 index 105b944..0000000 --- a/daggy/include/daggy/Logger.hpp +++ /dev/null @@ -1,33 +0,0 @@ -#pragma once - -#include - -#include "DAGRun.hpp" - -/* - MetaStore represents the interface to store all the state information - for daggy to run. Abstracted in case other back-end solutions need to - be supported. -*/ - -namespace daggy { - using DAGDefID = int16_t; - using DAGRunID = size_t; - - enum class DAGRunState : uint32_t { - QUEUED = 0, - RUNNING, - ERRORED, - KILLED, - COMPLETED - }; - - class DAGLogger { - public: - // Execution - virtual DAGRunID startDAGRun(std::string name, const std::vector & tasks) = 0; - virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) = 0; - virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) = 0; - virtual void updateDAGRunState(DAGRunID dagRunId, DAGRunState state) = 0; - }; -} diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index 241dcdd..ebe515d 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -7,7 +7,7 @@ #include -#include "Logger.hpp" +#include "DAGLogger.hpp" #include "TaskExecutor.hpp" #include "Task.hpp" #include "Defines.hpp" diff --git a/daggy/include/daggy/dagloggers/FileSystemLogger.hpp b/daggy/include/daggy/dagloggers/FileSystemLogger.hpp new file mode 100644 index 0000000..75d9d23 --- /dev/null +++ b/daggy/include/daggy/dagloggers/FileSystemLogger.hpp @@ -0,0 +1,55 @@ +#pragma once + +#include + +#include +#include "../DAGLogger.hpp" + +namespace fs = std::filesystem; +namespace rj = rapidjson; + +namespace daggy { + /* + * This logger should only be used for debug purposes. It's not really optimized for querying, and will + * use a ton of inodes to track state. + * + * On the plus side, it's trivial to look at without using the API. + * + * Filesystem logger creates the following structure: + * {root}/ + * current/ + * {DAGRunID}.{STATE} -- A file for each DAG not in a COMPLETE state for faster lookups + * runs/ + * {runID}/ + * meta.json --- Contains the DAG name, task definitions + * {taskID}/ + * states --- State changes + * {attempt}/ + * meta.json --- timestamps and rc + * stdout + * stderr + * execlog + */ + class FileSystemLogger : DAGLogger { + public: + FileSystemLogger(fs::path root); + + // Execution + virtual DAGRunID startDAGRun(std::string name, const std::vector & tasks) override; + virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override; + virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) override; + virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) override; + virtual void updateTaskState(DAGRunID dagRunId, RunState state) override; + + // Querying + virtual std::vector getDAGs(uint32_t stateMask) override; + virtual DAGRunRecord getDAGRun(DAGRunID dagRunId); + + private: + fs::path root_; + std::mutex lock_; + + const fs::path getCurrentPath() const; + const fs::path getRunsRoot() const; + }; +} diff --git a/daggy/include/daggy/executors/ForkingTaskExecutor.hpp b/daggy/include/daggy/executors/ForkingTaskExecutor.hpp index 66a23db..9283c23 100644 --- a/daggy/include/daggy/executors/ForkingTaskExecutor.hpp +++ b/daggy/include/daggy/executors/ForkingTaskExecutor.hpp @@ -1,6 +1,5 @@ #pragma once -#include #include "../TaskExecutor.hpp" namespace daggy { diff --git a/daggy/src/dagloggers/FileSystemLogger.cpp b/daggy/src/dagloggers/FileSystemLogger.cpp new file mode 100644 index 0000000..390f615 --- /dev/null +++ b/daggy/src/dagloggers/FileSystemLogger.cpp @@ -0,0 +1,28 @@ +#include + +namespace fs = std::filesystem; + +namespace daggy { + const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; } + const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; } + + FileSystemLogger(fs::path root) + : root_(root) + { + std::vector reqPaths{ root_, getCurrentPath(), getRunsRoot()}; + for (const path : reqPaths) { + if (! fs::exists(path)) { fs::create_directory(path); } + } + } + + // Execution + DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector & tasks){ } + void FileSystemLogger::updateDAGRunState(DAGRunID dagRunId, RunState state){ } + void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt){ } + void FileSystemLogger::markTaskComplete(DAGRunID dagRun, size_t taskID){ } + void FileSystemLogger::updateTaskState(DAGRunID dagRunId, RunState state){ } + + // Querying + std::vector FileSystemLogger::getDAGs(uint32_t stateMask){ } + DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunId) {} +}