From 30aea0818c1594e366a0d7bbedc3b275df115972 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Mon, 9 Aug 2021 15:07:16 -0300 Subject: [PATCH] - Running project through clang-tidy formatter. --- daggy/include/daggy/Serialization.hpp | 10 +-- daggy/include/daggy/Utilities.hpp | 22 +++---- .../executors/task/ForkingTaskExecutor.hpp | 3 +- .../daggy/loggers/dag_run/DAGLoggerBase.hpp | 12 ++-- .../loggers/dag_run/FileSystemLogger.hpp | 10 ++- .../daggy/loggers/dag_run/StdOutLogger.hpp | 8 ++- daggy/src/Serialization.cpp | 62 ++++++++++--------- daggy/src/Utilities.cpp | 45 +++++++------- .../src/loggers/dag_run/FileSystemLogger.cpp | 34 +++++----- daggy/src/loggers/dag_run/StdOutLogger.cpp | 22 ++++--- 10 files changed, 129 insertions(+), 99 deletions(-) diff --git a/daggy/include/daggy/Serialization.hpp b/daggy/include/daggy/Serialization.hpp index 65b8843..588cd53 100644 --- a/daggy/include/daggy/Serialization.hpp +++ b/daggy/include/daggy/Serialization.hpp @@ -14,10 +14,12 @@ namespace rj = rapidjson; namespace daggy { // Parameters - ParameterValues parametersFromJSON(const std::string & jsonSpec); - ParameterValues parametersFromJSON(const rj::Document & spec); + ParameterValues parametersFromJSON(const std::string &jsonSpec); + + ParameterValues parametersFromJSON(const rj::Document &spec); // Tasks - std::vector tasksFromJSON(const std::string & jsonSpec, const ParameterValues & parameters = {}); - std::vector tasksFromJSON(const rj::Document & spec, const ParameterValues & parameters = {}); + std::vector tasksFromJSON(const std::string &jsonSpec, const ParameterValues ¶meters = {}); + + std::vector tasksFromJSON(const rj::Document &spec, const ParameterValues ¶meters = {}); } diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index fbc24b9..747f9a7 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -14,22 +14,22 @@ #include "DAG.hpp" namespace daggy { - std::vector expandCommands(const std::vector & command, const ParameterValues & parameters); + std::vector expandCommands(const std::vector &command, const ParameterValues ¶meters); - DAG buildDAGFromTasks(const std::vector & tasks); + DAG buildDAGFromTasks(const std::vector &tasks); // Blocking call std::vector - runTask(DAGRunID runID, - TaskID taskID, - const Task & task, - executors::task::TaskExecutor & executor, - loggers::dag_run::DAGLoggerBase & logger); + runTask(DAGRunID runID, + TaskID taskID, + const Task &task, + executors::task::TaskExecutor &executor, + loggers::dag_run::DAGLoggerBase &logger); void runDAG(DAGRunID runID, - std::vector tasks, - executors::task::TaskExecutor & executor, - loggers::dag_run::DAGLoggerBase & logger, - DAG dag); + std::vector tasks, + executors::task::TaskExecutor &executor, + loggers::dag_run::DAGLoggerBase &logger, + DAG dag); } diff --git a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp index d9fdfc8..99abade 100644 --- a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp +++ b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp @@ -8,8 +8,7 @@ namespace daggy { class ForkingTaskExecutor : public TaskExecutor { public: ForkingTaskExecutor(size_t nThreads) - : TaskExecutor(nThreads) - {} + : TaskExecutor(nThreads) {} const std::string getName() const override { return "ForkingTaskExecutor"; } diff --git a/daggy/include/daggy/loggers/dag_run/DAGLoggerBase.hpp b/daggy/include/daggy/loggers/dag_run/DAGLoggerBase.hpp index 5f233fb..17decd4 100644 --- a/daggy/include/daggy/loggers/dag_run/DAGLoggerBase.hpp +++ b/daggy/include/daggy/loggers/dag_run/DAGLoggerBase.hpp @@ -16,7 +16,7 @@ namespace daggy { enum class RunState : uint32_t { QUEUED = 0, RUNNING = 1, - RETRY = 1 << 1, + RETRY = 1 << 1, ERRORED = 1 << 2, KILLED = 1 << 3, COMPLETED = 1 << 4 @@ -46,7 +46,7 @@ namespace daggy { struct DAGRunSummary { DAGRunID runID; std::string name; - RunState runState; + RunState runState; TimePoint startTime; TimePoint lastUpdate; std::unordered_map taskStateCounts; @@ -55,13 +55,17 @@ namespace daggy { class DAGLoggerBase { public: // Execution - virtual DAGRunID startDAGRun(std::string name, const std::vector & tasks) = 0; + virtual DAGRunID startDAGRun(std::string name, const std::vector &tasks) = 0; + virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0; - virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord & attempt) = 0; + + virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) = 0; + virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) = 0; // Querying virtual std::vector getDAGs(uint32_t stateMask) = 0; + virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0; }; } diff --git a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp index 7d4c5f6..0884b71 100644 --- a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp @@ -39,13 +39,17 @@ namespace daggy { FileSystemLogger(fs::path root); // Execution - virtual DAGRunID startDAGRun(std::string name, const std::vector & tasks) override; + virtual DAGRunID startDAGRun(std::string name, const std::vector &tasks) override; + virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override; - virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord & attempt) override; + + virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override; + virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override; // Querying virtual std::vector getDAGs(uint32_t stateMask) override; + virtual DAGRunRecord getDAGRun(DAGRunID dagRunId); private: @@ -56,7 +60,9 @@ namespace daggy { // std::unordered_map runLocks; inline const fs::path getCurrentPath() const; + inline const fs::path getRunsRoot() const; + inline const fs::path getRunRoot(DAGRunID runID) const; }; } diff --git a/daggy/include/daggy/loggers/dag_run/StdOutLogger.hpp b/daggy/include/daggy/loggers/dag_run/StdOutLogger.hpp index b25b3c2..a0b6815 100644 --- a/daggy/include/daggy/loggers/dag_run/StdOutLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/StdOutLogger.hpp @@ -17,13 +17,17 @@ namespace daggy { StdOutLogger(); // Execution - virtual DAGRunID startDAGRun(std::string name, const std::vector & tasks) override; + virtual DAGRunID startDAGRun(std::string name, const std::vector &tasks) override; + virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override; - virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord & attempt) override; + + virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override; + virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override; // Querying virtual std::vector getDAGs(uint32_t stateMask) override; + virtual DAGRunRecord getDAGRun(DAGRunID dagRunId); private: diff --git a/daggy/src/Serialization.cpp b/daggy/src/Serialization.cpp index 56a73d2..5732bc1 100644 --- a/daggy/src/Serialization.cpp +++ b/daggy/src/Serialization.cpp @@ -3,28 +3,30 @@ namespace daggy { - ParameterValues parametersFromJSON(const std::string & jsonSpec) { + ParameterValues parametersFromJSON(const std::string &jsonSpec) { rj::Document doc; rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str()); - if (! parseResult) { + if (!parseResult) { throw std::runtime_error("Parameters spec is not valid JSON"); } return parametersFromJSON(doc); } - ParameterValues parametersFromJSON(const rj::Document & spec) { + ParameterValues parametersFromJSON(const rj::Document &spec) { std::unordered_map parameters; if (!spec.IsObject()) { throw std::runtime_error("Parameters in spec is not a JSON dictionary"); } for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) { - if (! it->name.IsString()) { + if (!it->name.IsString()) { throw std::runtime_error("All keys must be strings."); } std::string name = std::string{"{{"} + it->name.GetString() + "}}"; if (it->value.IsArray()) { std::vector values; for (size_t i = 0; i < it->value.Size(); ++i) { - if (! it->value[i].IsString()) { - throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " item " + std::to_string(i) + " is not a string."); + if (!it->value[i].IsString()) { + throw std::runtime_error( + "Attribute for " + std::string{it->name.GetString()} + " item " + std::to_string(i) + + " is not a string."); } values.emplace_back(it->value[i].GetString()); } @@ -32,22 +34,23 @@ namespace daggy { } else if (it->value.IsString()) { parameters[name] = it->value.GetString(); } else { - throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " is not a string or an array."); + throw std::runtime_error( + "Attribute for " + std::string{it->name.GetString()} + " is not a string or an array."); } } return parameters; } - std::vector tasksFromJSON(const std::string & jsonSpec, const ParameterValues & parameters) { + std::vector tasksFromJSON(const std::string &jsonSpec, const ParameterValues ¶meters) { rj::Document doc; rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str()); - if (! parseResult) { + if (!parseResult) { throw std::runtime_error("Unable to parse spec: "); } return tasksFromJSON(doc, parameters); } - std::vector tasksFromJSON(const rj::Document & spec, const ParameterValues & parameters) { + std::vector tasksFromJSON(const rj::Document &spec, const ParameterValues ¶meters) { std::vector tasks; if (!spec.IsArray()) { throw std::runtime_error("Tasks is not an array"); } @@ -59,13 +62,13 @@ namespace daggy { // Tasks for (size_t i = 0; i < spec.Size(); ++i) { - if (! spec[i].IsObject()) { + if (!spec[i].IsObject()) { throw std::runtime_error("Task " + std::to_string(i) + " is not a dictionary."); } - const auto & taskSpec = spec[i].GetObject(); + const auto &taskSpec = spec[i].GetObject(); - for (const auto & reqField : reqFields) { - if (! taskSpec.HasMember(reqField.c_str())) { + for (const auto &reqField : reqFields) { + if (!taskSpec.HasMember(reqField.c_str())) { throw std::runtime_error("Task " + std::to_string(i) + " is missing required field " + reqField); } } @@ -77,18 +80,19 @@ namespace daggy { uint8_t maxRetries = 0; if (taskSpec.HasMember("maxRetries")) { maxRetries = taskSpec["maxRetries"].GetInt(); } uint8_t retryIntervalSeconds = 0; - if (taskSpec.HasMember("retryIntervalSeconds")) { retryIntervalSeconds = taskSpec["retryIntervalSeconds"].GetInt(); } + if (taskSpec.HasMember( + "retryIntervalSeconds")) { retryIntervalSeconds = taskSpec["retryIntervalSeconds"].GetInt(); } // Children / parents std::vector children; if (taskSpec.HasMember("children")) { - const auto & specChildren = taskSpec["children"].GetArray(); + const auto &specChildren = taskSpec["children"].GetArray(); for (size_t c = 0; c < specChildren.Size(); ++c) { children.emplace_back(specChildren[c].GetString()); } } if (taskSpec.HasMember("parents")) { - const auto & specParents = taskSpec["parents"].GetArray(); + const auto &specParents = taskSpec["parents"].GetArray(); for (size_t c = 0; c < specParents.Size(); ++c) { parentMap[name].emplace_back(specParents[c].GetString()); } @@ -102,35 +106,35 @@ namespace daggy { auto commands = expandCommands(command, parameters); // Create the tasks - auto & taskNames = childrenMap[name]; + auto &taskNames = childrenMap[name]; for (size_t tid = 0; tid < commands.size(); ++tid) { std::string taskName = name + "_" + std::to_string(tid); taskNames.push_back(taskName); - tasks.emplace_back(Task { - .name = name + "_" + std::to_string(tid), - .command = commands[tid], - .maxRetries = maxRetries, - .retryIntervalSeconds = retryIntervalSeconds, - .children = children + tasks.emplace_back(Task{ + .name = name + "_" + std::to_string(tid), + .command = commands[tid], + .maxRetries = maxRetries, + .retryIntervalSeconds = retryIntervalSeconds, + .children = children }); } } // Update any missing child -> parent relationship - for (auto & task : tasks) { + for (auto &task : tasks) { auto pit = parentMap.find(task.name); if (pit == parentMap.end()) { continue; } - for (const auto & parent : pit->second) { + for (const auto &parent : pit->second) { tasks[taskIndex[parent]].children.emplace_back(task.name); } } // At the end, replace the names of the children with all the expanded versions - for (auto & task : tasks) { + for (auto &task : tasks) { std::vector children; - for (const auto & child : task.children) { - auto & newChildren = childrenMap[child]; + for (const auto &child : task.children) { + auto &newChildren = childrenMap[child]; std::copy(newChildren.begin(), newChildren.end(), std::back_inserter(children)); } task.children.swap(children); diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index cfc5a29..b65a8d4 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -2,16 +2,16 @@ namespace daggy { std::vector> - expandCommands(const std::vector & command, const ParameterValues & parameters) { - std::vector> commands{ {} }; + expandCommands(const std::vector &command, const ParameterValues ¶meters) { + std::vector> commands{{}}; - for (const auto & part : command) { + for (const auto &part : command) { // this isn't an interpolated value if (parameters.find(part) == parameters.end()) { for (auto &cmd : commands) cmd.push_back(part); continue; } - auto & inVal = parameters.at(part); + auto &inVal = parameters.at(part); if (std::holds_alternative(inVal)) { for (auto &cmd : commands) cmd.push_back(std::get(inVal)); continue; @@ -19,7 +19,7 @@ namespace daggy { // Ends up being expensive, as it's a cartesian product std::vector> newCommands; - for (const auto & val : std::get>(inVal)) { + for (const auto &val : std::get>(inVal)) { for (auto cmd : commands) { cmd.push_back(val); newCommands.push_back(cmd); @@ -30,7 +30,7 @@ namespace daggy { return commands; } - DAG buildDAGFromTasks(const std::vector & tasks) { + DAG buildDAGFromTasks(const std::vector &tasks) { DAG dag; std::unordered_map taskIDs; @@ -50,27 +50,26 @@ namespace daggy { } std::vector runTask(DAGRunID runID, - TaskID taskID, - const Task & task, - executors::task::TaskExecutor & executor, - loggers::dag_run::DAGLoggerBase & logger) - { + TaskID taskID, + const Task &task, + executors::task::TaskExecutor &executor, + loggers::dag_run::DAGLoggerBase &logger) { std::vector attempts; - logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RUNNING ); + logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RUNNING); while (attempts.size() < task.maxRetries + 1) { attempts.push_back(executor.runCommand(task.command)); logger.logTaskAttempt(runID, taskID, attempts.back()); if (attempts.back().rc == 0) break; - logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RETRY ); + logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RETRY); } return attempts; } void runDAG(DAGRunID runID, std::vector tasks, - executors::task::TaskExecutor & executor, - loggers::dag_run::DAGLoggerBase & logger, + executors::task::TaskExecutor &executor, + loggers::dag_run::DAGLoggerBase &logger, DAG dag) { logger.updateDAGRunState(runID, loggers::dag_run::RunState::RUNNING); @@ -90,15 +89,15 @@ namespace daggy { if (taskState.fut.valid()) { auto attemptRecords = taskState.fut.get(); if (attemptRecords.empty()) { - logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED ); + logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED); continue; } if (attemptRecords.back().rc == 0) { - logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::COMPLETED ); + logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::COMPLETED); dag.completeVisit(taskState.tid); taskState.complete = true; } else { - logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED ); + logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED); } } } @@ -110,9 +109,11 @@ namespace daggy { // Schedule the task to run auto tid = t.value(); TaskState tsk{ - .tid = tid, - .fut = tq->addTask([tid, runID, &tasks, &executor, &logger]() {return runTask(runID, tid, tasks[tid], executor, logger);}), - .complete = false + .tid = tid, + .fut = tq->addTask([tid, runID, &tasks, &executor, &logger]() { + return runTask(runID, tid, tasks[tid], executor, logger); + }), + .complete = false }; taskStates.push_back(std::move(tsk)); @@ -120,7 +121,7 @@ namespace daggy { if (not nextTask.has_value()) break; t.emplace(nextTask.value()); } - if (! tq->empty()) { + if (!tq->empty()) { executor.threadPool.addTasks(tq); } std::this_thread::sleep_for(250ms); diff --git a/daggy/src/loggers/dag_run/FileSystemLogger.cpp b/daggy/src/loggers/dag_run/FileSystemLogger.cpp index f19f7db..56c8540 100644 --- a/daggy/src/loggers/dag_run/FileSystemLogger.cpp +++ b/daggy/src/loggers/dag_run/FileSystemLogger.cpp @@ -6,32 +6,34 @@ using namespace daggy::loggers::dag_run; namespace daggy { inline const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; } + inline const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; } - inline const fs::path FileSystemLogger::getRunRoot(DAGRunID runID) const { return getRunsRoot() / std::to_string(runID); } + + inline const fs::path FileSystemLogger::getRunRoot(DAGRunID runID) const { + return getRunsRoot() / std::to_string(runID); + } FileSystemLogger::FileSystemLogger(fs::path root) - : root_(root) - , nextRunID_(0) - { - const std::vector reqPaths{ root_, getCurrentPath(), getRunsRoot()}; - for (const auto & path : reqPaths) { - if (! fs::exists(path)) { fs::create_directory(path); } + : root_(root), nextRunID_(0) { + const std::vector reqPaths{root_, getCurrentPath(), getRunsRoot()}; + for (const auto &path : reqPaths) { + if (!fs::exists(path)) { fs::create_directory(path); } } // Get the next run ID size_t runID = 0; - for (auto & dir : fs::directory_iterator(getRunsRoot())) { + for (auto &dir : fs::directory_iterator(getRunsRoot())) { try { runID = std::stoull(dir.path().stem()); if (runID > nextRunID_) nextRunID_ = runID + 1; - } catch (std::exception & e) { + } catch (std::exception &e) { continue; } } } // Execution - DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector & tasks){ + DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector &tasks) { DAGRunID runID = nextRunID_++; // TODO make this threadsafe @@ -40,11 +42,15 @@ namespace daggy { // Init the directory } - void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state){ } - void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt){ } - void FileSystemLogger::updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state){ } + + void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {} + + void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord &attempt) {} + + void FileSystemLogger::updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) {} // Querying - std::vector FileSystemLogger::getDAGs(uint32_t stateMask){ } + std::vector FileSystemLogger::getDAGs(uint32_t stateMask) {} + DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunId) {} } diff --git a/daggy/src/loggers/dag_run/StdOutLogger.cpp b/daggy/src/loggers/dag_run/StdOutLogger.cpp index 3701de3..4cbb986 100644 --- a/daggy/src/loggers/dag_run/StdOutLogger.cpp +++ b/daggy/src/loggers/dag_run/StdOutLogger.cpp @@ -5,34 +5,38 @@ namespace daggy { namespace loggers { namespace dag_run { - StdOutLogger::StdOutLogger() : nextRunID_(0) { } + StdOutLogger::StdOutLogger() : nextRunID_(0) {} // Execution - DAGRunID StdOutLogger::startDAGRun(std::string name, const std::vector & tasks) { + DAGRunID StdOutLogger::startDAGRun(std::string name, const std::vector &tasks) { std::lock_guard lock(guard_); size_t runID = nextRunID_++; - std::cout << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size() << " tasks" << std::endl; + std::cout << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size() + << " tasks" << std::endl; return runID; } - void StdOutLogger::updateDAGRunState(DAGRunID dagRunID, RunState state){ + void StdOutLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) { std::lock_guard lock(guard_); std::cout << "DAG State Change(" << dagRunID << "): " << magic_enum::enum_name(state) << std::endl; } - void StdOutLogger::logTaskAttempt(DAGRunID dagRunID, size_t taskID, const AttemptRecord & attempt){ + void StdOutLogger::logTaskAttempt(DAGRunID dagRunID, size_t taskID, const AttemptRecord &attempt) { std::lock_guard lock(guard_); - const std::string & msg = attempt.rc == 0 ? attempt.output : attempt.error; - std::cout << "Task Attempt (" << dagRunID << '/' << taskID << "): Ran with RC " << attempt.rc << ": " << msg << std::endl; + const std::string &msg = attempt.rc == 0 ? attempt.output : attempt.error; + std::cout << "Task Attempt (" << dagRunID << '/' << taskID << "): Ran with RC " << attempt.rc << ": " + << msg << std::endl; } void StdOutLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) { std::lock_guard lock(guard_); - std::cout << "Task State Change (" << dagRunID << '/' << taskID << "): " << magic_enum::enum_name(state) << std::endl; + std::cout << "Task State Change (" << dagRunID << '/' << taskID << "): " << magic_enum::enum_name(state) + << std::endl; } // Querying - std::vector StdOutLogger::getDAGs(uint32_t stateMask){ return {}; } + std::vector StdOutLogger::getDAGs(uint32_t stateMask) { return {}; } + DAGRunRecord StdOutLogger::getDAGRun(DAGRunID dagRunId) { return {}; } } }