From 9f90f54b67195f28e614aac35357caf0e218f5cd Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Fri, 13 Aug 2021 10:23:55 -0300 Subject: [PATCH] - More work on DAGLoggers - Still need unit tests for the FilesystemLogger --- daggy/include/daggy/AttemptRecord.hpp | 6 +- daggy/include/daggy/Defines.hpp | 1 + daggy/include/daggy/Serialization.hpp | 12 +- daggy/include/daggy/Utilities.hpp | 1 + .../daggy/loggers/dag_run/DAGRunLogger.hpp | 5 +- .../loggers/dag_run/FileSystemLogger.hpp | 28 ++-- .../daggy/loggers/dag_run/OStreamLogger.hpp | 13 +- daggy/src/Serialization.cpp | 32 ++++- daggy/src/Utilities.cpp | 21 ++- .../executors/task/ForkingTaskExecutor.cpp | 4 +- .../src/loggers/dag_run/FileSystemLogger.cpp | 129 +++++++++++++++++- daggy/src/loggers/dag_run/OStreamLogger.cpp | 11 +- tests/unit_executor_forkingexecutor.cpp | 12 +- 13 files changed, 221 insertions(+), 54 deletions(-) diff --git a/daggy/include/daggy/AttemptRecord.hpp b/daggy/include/daggy/AttemptRecord.hpp index 54ee29b..5b3515e 100644 --- a/daggy/include/daggy/AttemptRecord.hpp +++ b/daggy/include/daggy/AttemptRecord.hpp @@ -10,8 +10,8 @@ namespace daggy { TimePoint startTime; TimePoint stopTime; int rc; // RC from the task - std::string metaLog; // Logs from the dag_executor - std::string output; // stdout from command - std::string error; // stderr from command + std::string executorLog; // Logs from the dag_executor + std::string outputLog; // stdout from command + std::string errorLog; // stderr from command }; } diff --git a/daggy/include/daggy/Defines.hpp b/daggy/include/daggy/Defines.hpp index be1c5dd..2cca419 100644 --- a/daggy/include/daggy/Defines.hpp +++ b/daggy/include/daggy/Defines.hpp @@ -20,4 +20,5 @@ namespace daggy { using DAGDefID = int16_t; using DAGRunID = size_t; using TaskID = size_t; + } diff --git a/daggy/include/daggy/Serialization.hpp b/daggy/include/daggy/Serialization.hpp index 5324979..c908c07 100644 --- a/daggy/include/daggy/Serialization.hpp +++ b/daggy/include/daggy/Serialization.hpp @@ -9,6 +9,7 @@ #include "Defines.hpp" #include "Task.hpp" +#include "AttemptRecord.hpp" namespace rj = rapidjson; @@ -16,17 +17,24 @@ namespace daggy { // Parameters ParameterValues parametersFromJSON(const std::string &jsonSpec); - ParameterValues parametersFromJSON(const rj::Document &spec); + ParameterValues parametersFromJSON(const rj::Value &spec); // Tasks std::vector tasksFromJSON(const std::string &jsonSpec, const ParameterValues ¶meters = {}); - std::vector tasksFromJSON(const rj::Document &spec, const ParameterValues ¶meters = {}); + std::vector tasksFromJSON(const rj::Value &spec, const ParameterValues ¶meters = {}); std::string taskToJSON(const Task &task); std::string tasksToJSON(const std::vector &tasks); + // Attempt Records + std::string attemptRecordToJSON(const AttemptRecord &attemptRecord); + // default serialization std::ostream &operator<<(std::ostream &os, const Task &task); + + std::string timePointToString(const TimePoint &tp); + + TimePoint stringToTimePoint(const std::string &timeStr); } diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index 4800319..9d8af8f 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -32,4 +32,5 @@ namespace daggy { loggers::dag_run::DAGRunLogger &logger, DAG dag); + std::ostream &operator<<(std::ostream &os, const TimePoint &tp); } diff --git a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp index 1b6c4c3..d9852fb 100644 --- a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp @@ -23,9 +23,10 @@ namespace daggy { virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) = 0; - virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) = 0; + virtual void + logTaskAttempt(DAGRunID dagRunID, const std::string &taskName, const AttemptRecord &attempt) = 0; - virtual void updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) = 0; + virtual void updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) = 0; // Querying virtual std::vector getDAGs(uint32_t stateMask) = 0; diff --git a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp index 815ee82..ac3689b 100644 --- a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp @@ -21,36 +21,36 @@ namespace daggy { * * Filesystem logger creates the following structure: * {root}/ - * current/ - * {DAGRunID}.{STATE} -- A file for each DAG not in a COMPLETE state for faster lookups * runs/ * {runID}/ * meta.json --- Contains the DAG name, task definitions - * {taskID}/ - * states --- State changes + * states.csv --- DAG state changes + * {taskName}/ + * states.csv --- TASK state changes * {attempt}/ - * meta.json --- timestamps and rc - * stdout - * stderr - * execlog + * metadata.json --- timestamps and rc + * output.log + * error.log + * executor.log */ class FileSystemLogger : public DAGRunLogger { public: FileSystemLogger(fs::path root); // Execution - virtual DAGRunID startDAGRun(std::string name, const std::vector &tasks) override; + DAGRunID startDAGRun(std::string name, const std::vector &tasks) override; - virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) override; + void updateDAGRunState(DAGRunID dagRunID, RunState state) override; - virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override; + void + logTaskAttempt(DAGRunID, const std::string &taskName, const AttemptRecord &attempt) override; - virtual void updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) override; + void updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) override; // Querying - virtual std::vector getDAGs(uint32_t stateMask) override; + std::vector getDAGs(uint32_t stateMask) override; - virtual DAGRunRecord getDAGRun(DAGRunID dagRunID); + DAGRunRecord getDAGRun(DAGRunID dagRunID) override; private: fs::path root_; diff --git a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp index 3e047db..c3cc8d9 100644 --- a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp @@ -17,18 +17,19 @@ namespace daggy { OStreamLogger(std::ostream &os); // Execution - virtual DAGRunID startDAGRun(std::string name, const std::vector &tasks) override; + DAGRunID startDAGRun(std::string name, const std::vector &tasks) override; - virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) override; + void updateDAGRunState(DAGRunID dagRunID, RunState state) override; - virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override; + void + logTaskAttempt(DAGRunID, const std::string &taskName, const AttemptRecord &attempt) override; - virtual void updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) override; + void updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) override; // Querying - virtual std::vector getDAGs(uint32_t stateMask) override; + std::vector getDAGs(uint32_t stateMask) override; - virtual DAGRunRecord getDAGRun(DAGRunID dagRunID); + DAGRunRecord getDAGRun(DAGRunID dagRunID) override; private: DAGRunID nextRunID_; diff --git a/daggy/src/Serialization.cpp b/daggy/src/Serialization.cpp index e82dd77..31acf36 100644 --- a/daggy/src/Serialization.cpp +++ b/daggy/src/Serialization.cpp @@ -15,7 +15,7 @@ namespace daggy { return parametersFromJSON(doc); } - ParameterValues parametersFromJSON(const rj::Document &spec) { + ParameterValues parametersFromJSON(const rj::Value &spec) { std::unordered_map parameters; if (!spec.IsObject()) { throw std::runtime_error("Parameters in spec is not a JSON dictionary"); } for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) { @@ -53,7 +53,7 @@ namespace daggy { return tasksFromJSON(doc, parameters); } - std::vector tasksFromJSON(const rj::Document &spec, const ParameterValues ¶meters) { + std::vector tasksFromJSON(const rj::Value &spec, const ParameterValues ¶meters) { std::vector tasks; if (!spec.IsArray()) { throw std::runtime_error("Tasks is not an array"); } @@ -200,4 +200,32 @@ namespace daggy { os << taskToJSON(task); return os; } + + std::string attemptRecordToJSON(const AttemptRecord &record) { + std::stringstream ss; + + ss << "{" + << R"("startTime": )" << std::quoted(timePointToString(record.startTime)) << ',' + << R"("stopTime": )" << std::quoted(timePointToString(record.stopTime)) << ',' + << R"("rc": )" << std::to_string(record.rc) << ',' + << R"("executorLog": )" << std::quoted(record.executorLog) << ',' + << R"("outputLog": )" << std::quoted(record.outputLog) << ',' + << R"("errorLog": )" << std::quoted(record.errorLog) + << '}'; + + return ss.str(); + } + + std::string timePointToString(const TimePoint &tp) { + std::stringstream ss; + ss << tp; + return ss.str(); + } + + TimePoint stringToTimePoint(const std::string &timeString) { + std::tm dt; + std::stringstream ss{timeString}; + ss >> std::get_time(&dt, "%Y-%m-%d %H:%M:%S %Z"); + return Clock::from_time_t(mktime(&dt)); + } } \ No newline at end of file diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index e137c2c..62a8919 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -1,3 +1,5 @@ +#include + #include namespace daggy { @@ -55,13 +57,13 @@ namespace daggy { executors::task::TaskExecutor &executor, loggers::dag_run::DAGRunLogger &logger) { std::vector attempts; - logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RUNNING); + logger.updateTaskState(runID, task.name, loggers::dag_run::RunState::RUNNING); while (attempts.size() < task.maxRetries + 1) { attempts.push_back(executor.runCommand(task.command)); - logger.logTaskAttempt(runID, taskID, attempts.back()); + logger.logTaskAttempt(runID, task.name, attempts.back()); if (attempts.back().rc == 0) break; - logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RETRY); + logger.updateTaskState(runID, task.name, loggers::dag_run::RunState::RETRY); } return attempts; } @@ -88,16 +90,17 @@ namespace daggy { if (taskState.fut.valid()) { auto attemptRecords = taskState.fut.get(); + const auto &taskName = tasks[taskState.tid].name; if (attemptRecords.empty()) { - logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED); + logger.updateTaskState(runID, taskName, loggers::dag_run::RunState::ERRORED); continue; } if (attemptRecords.back().rc == 0) { - logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::COMPLETED); + logger.updateTaskState(runID, taskName, loggers::dag_run::RunState::COMPLETED); dag.completeVisit(taskState.tid); taskState.complete = true; } else { - logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED); + logger.updateTaskState(runID, taskName, loggers::dag_run::RunState::ERRORED); } } } @@ -127,4 +130,10 @@ namespace daggy { std::this_thread::sleep_for(250ms); } } + + std::ostream &operator<<(std::ostream &os, const TimePoint &tp) { + auto t_c = Clock::to_time_t(tp); + os << std::put_time(std::localtime(&t_c), "%Y-%m-%d %H:%M:%S %Z"); + return os; + } } \ No newline at end of file diff --git a/daggy/src/executors/task/ForkingTaskExecutor.cpp b/daggy/src/executors/task/ForkingTaskExecutor.cpp index 2fde945..fc0b60b 100644 --- a/daggy/src/executors/task/ForkingTaskExecutor.cpp +++ b/daggy/src/executors/task/ForkingTaskExecutor.cpp @@ -62,8 +62,8 @@ ForkingTaskExecutor::runCommand(std::vector cmd) { } std::atomic running = true; - std::thread stdoutReader([&]() { while (running) rec.output.append(slurp(stdoutPipe[0])); }); - std::thread stderrReader([&]() { while (running) rec.error.append(slurp(stderrPipe[0])); }); + std::thread stdoutReader([&]() { while (running) rec.outputLog.append(slurp(stdoutPipe[0])); }); + std::thread stderrReader([&]() { while (running) rec.errorLog.append(slurp(stderrPipe[0])); }); int rc = 0; waitpid(child, &rc, 0); diff --git a/daggy/src/loggers/dag_run/FileSystemLogger.cpp b/daggy/src/loggers/dag_run/FileSystemLogger.cpp index c4e6e77..2b5c6e8 100644 --- a/daggy/src/loggers/dag_run/FileSystemLogger.cpp +++ b/daggy/src/loggers/dag_run/FileSystemLogger.cpp @@ -1,4 +1,10 @@ +#include + +#include + #include +#include +#include namespace fs = std::filesystem; @@ -44,17 +50,128 @@ namespace daggy { fs::create_directories(runRoot); // Create meta.json with DAGRun Name and task definitions + std::ofstream ofh(runRoot / "metadata.json", std::ios::trunc | std::ios::binary); + ofh << R"({ "name": )" << std::quoted(name) << R"(, "tasks": )" << tasksToJSON(tasks) << "}\n"; + ofh.close(); + + // Task directories + for (const auto &task : tasks) { + auto taskDir = runRoot / task.name; + fs::create_directories(taskDir); + std::ofstream ofh(taskDir / "states.csv"); + } + return runID; } - void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {} + void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) { + std::ofstream ofh(getRunRoot(dagRunID) / "states.csv", std::ios::binary | std::ios::app); + ofh << std::quoted(timePointToString(Clock::now())) << ',' << magic_enum::enum_name(state) << '\n'; + ofh.flush(); + ofh.close(); + } - void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord &attempt) {} + void + FileSystemLogger::logTaskAttempt(DAGRunID dagRunID, const std::string &taskName, + const AttemptRecord &attempt) { + auto taskRoot = getRunRoot(dagRunID) / taskName; + size_t i = 1; + while (fs::exists(taskRoot / std::to_string(i))) { ++i; } - void FileSystemLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) {} + auto attemptDir = taskRoot / std::to_string(i); + fs::create_directories(attemptDir); + + std::ofstream ofh; + + // Metadata + ofh.open(attemptDir / "metadata.json"); + ofh << "{\n" + << R"("startTime": )" << std::quoted(timePointToString(attempt.startTime)) << ",\n" + << R"("stopTime": )" << std::quoted(timePointToString(attempt.stopTime)) << ",\n" + << R"("rc": )" << attempt.rc << '\n' + << '}'; + + // output + ofh.open(attemptDir / "executor.log"); + ofh << attempt.executorLog << std::flush; + ofh.close(); + + // Output + ofh.open(attemptDir / "output.log"); + ofh << attempt.outputLog << std::flush; + ofh.close(); + + // Error + ofh.open(attemptDir / "error.log"); + ofh << attempt.errorLog << std::flush; + ofh.close(); + } + + void FileSystemLogger::updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) { + std::ofstream ofh(getRunRoot(dagRunID) / taskName / "states.csv", std::ios::binary | std::ios::app); + ofh << std::quoted(timePointToString(Clock::now())) << ',' << magic_enum::enum_name(state) << '\n'; + ofh.flush(); + ofh.close(); + } // Querying - std::vector FileSystemLogger::getDAGs(uint32_t stateMask) { return {}; } + std::vector FileSystemLogger::getDAGs(uint32_t stateMask) { + return {}; + } - DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunID) { return {}; } -} + DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunID) { + DAGRunRecord record; + auto runRoot = getRunRoot(dagRunID); + if (!fs::exists(runRoot)) { + throw std::runtime_error("No DAGRun with that ID exists"); + } + + std::ifstream ifh(runRoot / "metadata.json", std::ios::binary); + std::string metaData; + std::getline(ifh, metaData, '\0'); + ifh.close(); + + rj::Document doc; + doc.Parse(metaData.c_str()); + + record.name = doc["name"].GetString(); + record.tasks = tasksFromJSON(doc["tasks"].GetObject()); + + // DAG State Changes + std::string line; + std::string token; + auto dagStateFile = runRoot / "states.csv"; + ifh.open(dagStateFile); + while (std::getline(ifh, line)) { + std::stringstream ss{line}; + std::string time; + std::string state; + std::getline(ss, time, ','); + std::getline(ss, state); + + record.dagStateChanges.emplace_back(DAGUpdateRecord{ + .time = stringToTimePoint(time), + .newState = magic_enum::enum_cast(state).value() + }); + } + ifh.close(); + + // Task states + for (const auto &task : record.tasks) { + auto taskStateFile = runRoot / task.name / "states.csv"; + if (!fs::exists(taskStateFile)) { + record.runStates.push_back(RunState::QUEUED); + continue; + } + + ifh.open(taskStateFile); + while (std::getline(ifh, line)) { continue; } + std::stringstream ss{line}; + while (std::getline(ss, token, ',')) { continue; } + RunState taskState = magic_enum::enum_cast(token).value(); + record.runStates.emplace_back(taskState); + ifh.close(); + } + return record; + } +} \ No newline at end of file diff --git a/daggy/src/loggers/dag_run/OStreamLogger.cpp b/daggy/src/loggers/dag_run/OStreamLogger.cpp index 0f537cd..3b0a425 100644 --- a/daggy/src/loggers/dag_run/OStreamLogger.cpp +++ b/daggy/src/loggers/dag_run/OStreamLogger.cpp @@ -21,16 +21,17 @@ namespace daggy { os_ << "DAG State Change(" << dagRunID << "): " << magic_enum::enum_name(state) << std::endl; } - void OStreamLogger::logTaskAttempt(DAGRunID dagRunID, size_t taskID, const AttemptRecord &attempt) { + void OStreamLogger::logTaskAttempt(DAGRunID dagRunID, const std::string &taskName, + const AttemptRecord &attempt) { std::lock_guard lock(guard_); - const std::string &msg = attempt.rc == 0 ? attempt.output : attempt.error; - os_ << "Task Attempt (" << dagRunID << '/' << taskID << "): Ran with RC " << attempt.rc << ": " + const std::string &msg = attempt.rc == 0 ? attempt.outputLog : attempt.errorLog; + os_ << "Task Attempt (" << dagRunID << '/' << taskName << "): Ran with RC " << attempt.rc << ": " << msg << std::endl; } - void OStreamLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) { + void OStreamLogger::updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) { std::lock_guard lock(guard_); - os_ << "Task State Change (" << dagRunID << '/' << taskID << "): " << magic_enum::enum_name(state) + os_ << "Task State Change (" << dagRunID << '/' << taskName << "): " << magic_enum::enum_name(state) << std::endl; } diff --git a/tests/unit_executor_forkingexecutor.cpp b/tests/unit_executor_forkingexecutor.cpp index e4a69ce..61e61c0 100644 --- a/tests/unit_executor_forkingexecutor.cpp +++ b/tests/unit_executor_forkingexecutor.cpp @@ -14,8 +14,8 @@ TEST_CASE("Basic Execution", "[forking_executor]") { auto rec = ex.runCommand(cmd); REQUIRE(rec.rc == 0); - REQUIRE(rec.output == "abc 123\n"); - REQUIRE(rec.error.empty()); + REQUIRE(rec.outputLog == "abc 123\n"); + REQUIRE(rec.errorLog.empty()); } SECTION("Error Run") { @@ -24,8 +24,8 @@ TEST_CASE("Basic Execution", "[forking_executor]") { auto rec = ex.runCommand(cmd); REQUIRE(rec.rc == 2); - REQUIRE(rec.error == "/usr/bin/expr: syntax error: missing argument after ‘+’\n"); - REQUIRE(rec.output.empty()); + REQUIRE(rec.errorLog == "/usr/bin/expr: syntax error: missing argument after ‘+’\n"); + REQUIRE(rec.outputLog.empty()); } SECTION("Large Output") { @@ -41,8 +41,8 @@ TEST_CASE("Basic Execution", "[forking_executor]") { auto rec = ex.runCommand(cmd); REQUIRE(rec.rc == 0); - REQUIRE(rec.output.size() == std::filesystem::file_size(bigFile)); - REQUIRE(rec.error.empty()); + REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile)); + REQUIRE(rec.errorLog.empty()); } } }