diff --git a/daggy/include/daggy/DAG.hpp b/daggy/include/daggy/DAG.hpp
index 53645f8..beace9b 100644
--- a/daggy/include/daggy/DAG.hpp
+++ b/daggy/include/daggy/DAG.hpp
@@ -23,8 +23,8 @@ namespace daggy {
     template
     struct Vertex {
-        RunState state;
-        uint32_t depCount;
+        RunState state = RunState::QUEUED;
+        uint32_t depCount = 0;
         V data;
         std::unordered_set children;
     };
@@ -32,8 +32,6 @@ namespace daggy {
     template
     class DAG
     {
-        using Edge = std::pair;
-
    public:
        // Vertices
        void addVertex(K id, V data);
@@ -41,21 +39,19 @@ namespace daggy {
        std::unordered_set getVertices() const;

        // Edges
-        void addEdge(const K &src, const K &dst);
+        void addEdge(const K &from, const K &to);

        void addEdgeIf(const K &src, std::function &v)> predicate);

-        bool isValid() const;
+        [[nodiscard]] bool isValid() const;

-        bool hasVertex(const K &from);
-
-        const std::vector &getEdges();
+        bool hasVertex(const K &id);

        // Attributes
-        size_t size() const;
+        [[nodiscard]] size_t size() const;

-        bool empty() const;
+        [[nodiscard]] bool empty() const;

        // Reset the DAG to completely unvisited
        void reset();
@@ -63,14 +59,12 @@ namespace daggy {
        // Reset any vertex with RUNNING state to QUEUED
        void resetRunning();

-        RunState getVertexState(const K &id) const;
-
        void setVertexState(const K &id, RunState state);

        void forEach(
            std::function> &)> fun) const;

-        bool allVisited() const;
+        [[nodiscard]] bool allVisited() const;

        std::optional> visitNext();
diff --git a/daggy/include/daggy/DAG.impl.hxx b/daggy/include/daggy/DAG.impl.hxx
index 79da631..f347143 100644
--- a/daggy/include/daggy/DAG.impl.hxx
+++ b/daggy/include/daggy/DAG.impl.hxx
@@ -167,16 +167,10 @@ namespace daggy {
    }

    template
-    void DAG::forEach(std::function> &)
-
-    >
-    fun) const
+    void DAG::forEach(
+        std::function> &)> fun) const
    {
-        for (auto it = vertices_.begin(); it != vertices_.
-
-            end();
-
-         ++it) {
+        for (auto it = vertices_.begin(); it != vertices_.end(); ++it) {
            fun(*it);
        }
    }
diff --git a/daggy/include/daggy/Server.hpp b/daggy/include/daggy/Server.hpp
index 9e36503..f4f4231 100644
--- a/daggy/include/daggy/Server.hpp
+++ b/daggy/include/daggy/Server.hpp
@@ -27,11 +27,9 @@ namespace daggy {
        {
        }

-        Server &setWebHandlerThreads(size_t nThreads);
-
        Server &setSSLCertificates(const fs::path &cert, const fs::path &key);

-        void init(int threads = 1);
+        void init(size_t threads = 1);

        void start();
diff --git a/daggy/include/daggy/ThreadPool.hpp b/daggy/include/daggy/ThreadPool.hpp
index b6ade2a..4208d47 100644
--- a/daggy/include/daggy/ThreadPool.hpp
+++ b/daggy/include/daggy/ThreadPool.hpp
@@ -130,7 +130,7 @@ namespace daggy {
                    }
                    if (tqit_ == taskQueues_.end())
                        tqit_ = taskQueues_.begin();
-                    task = std::move((*tqit_)->pop());
+                    task = (*tqit_)->pop();
                    if ((*tqit_)->empty()) {
                        tqit_ = taskQueues_.erase(tqit_);
                    }
@@ -160,7 +160,7 @@ namespace daggy {
            return fut;
        }

-        void addTasks(std::shared_ptr tq)
+        void addTasks(std::shared_ptr &tq)
        {
            if (drain_)
                throw std::runtime_error("Unable to add task to draining pool");
@@ -170,7 +170,7 @@ namespace daggy {
        }

    private:
-        // need to keep track of threads so we can join them
+        // need to keep track of threads, so we can join them
        std::vector workers_;
        // the task queue
        std::list> taskQueues_;
diff --git a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp
index 2272f98..860fe04 100644
--- a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp
+++ b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp
@@ -10,7 +10,7 @@ namespace daggy::executors::task {
    public:
        using Command = std::vector;

-        ForkingTaskExecutor(size_t nThreads)
+        explicit ForkingTaskExecutor(size_t nThreads)
            : tp_(nThreads)
        {
        }
diff --git a/daggy/include/daggy/executors/task/SlurmTaskExecutor.hpp b/daggy/include/daggy/executors/task/SlurmTaskExecutor.hpp
index db3cbdf..f8f159e 100644
--- a/daggy/include/daggy/executors/task/SlurmTaskExecutor.hpp
+++ b/daggy/include/daggy/executors/task/SlurmTaskExecutor.hpp
@@ -9,7 +9,7 @@ namespace daggy::executors::task {
        using Command = std::vector;

        SlurmTaskExecutor();
-        ~SlurmTaskExecutor();
+        ~SlurmTaskExecutor() override;

        // Validates the job to ensure that all required values are set and are of
        // the right type,
diff --git a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp
index a1b211e..0e97036 100644
--- a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp
+++ b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp
@@ -11,7 +11,7 @@ be supported.
 */

-namespace daggy { namespace loggers { namespace dag_run {
+namespace daggy::loggers::dag_run {
    class DAGRunLogger
    {
    public:
@@ -20,10 +20,10 @@ namespace daggy { namespace loggers { namespace dag_run {
        // Execution
        virtual DAGRunID startDAGRun(std::string name, const TaskSet &tasks) = 0;

-        virtual void addTask(DAGRunID dagRunID, const std::string taskName,
+        virtual void addTask(DAGRunID dagRunID, const std::string &taskName,
                             const Task &task) = 0;

-        virtual void updateTask(DAGRunID dagRunID, const std::string taskName,
+        virtual void updateTask(DAGRunID dagRunID, const std::string &taskName,
                                const Task &task) = 0;

        virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) = 0;
@@ -39,4 +39,4 @@ namespace daggy { namespace loggers { namespace dag_run {
        virtual DAGRunRecord getDAGRun(DAGRunID dagRunID) = 0;
    };
-}}} // namespace daggy::loggers::dag_run
+} // namespace daggy::loggers::dag_run
diff --git a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp
deleted file mode 100644
index 66e7795..0000000
--- a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp
+++ /dev/null
@@ -1,70 +0,0 @@
-#pragma once
-
-#include
-
-#include
-#include
-#include
-
-#include "DAGRunLogger.hpp"
-#include "Defines.hpp"
-
-namespace fs = std::filesystem;
-namespace rj = rapidjson;
-
-namespace daggy::loggers::dag_run {
-    /*
-     * This logger should only be used for debug purposes. It's not really
-     * optimized for querying, and will use a ton of inodes to track state.
-     *
-     * On the plus side, it's trivial to look at without using the API.
-     *
-     * Filesystem logger creates the following structure:
-     * {root}/
-     *   runs/
-     *     {runID}/
-     *       meta.json   --- Contains the DAG name, task definitions
-     *       states.csv  --- DAG state changes
-     *       {taskName}/
-     *         states.csv --- TASK state changes
-     *         {attempt}/
-     *           metadata.json --- timestamps and rc
-     *           output.log
-     *           error.log
-     *           executor.log
-     */
-    class FileSystemLogger : public DAGRunLogger
-    {
-    public:
-        FileSystemLogger(fs::path root);
-
-        // Execution
-        DAGRunID startDAGRun(std::string name, const TaskSet &tasks) override;
-
-        void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
-
-        void logTaskAttempt(DAGRunID, const std::string &taskName,
-                            const AttemptRecord &attempt) override;
-
-        void updateTaskState(DAGRunID dagRunID, const std::string &taskName,
-                             RunState state) override;
-
-        // Querying
-        std::vector getDAGs(uint32_t stateMask) override;
-
-        DAGRunRecord getDAGRun(DAGRunID dagRunID) override;
-
-    private:
-        fs::path root_;
-        std::atomic nextRunID_;
-        std::mutex lock_;
-
-        // std::unordered_map runLocks;
-
-        inline const fs::path getCurrentPath() const;
-
-        inline const fs::path getRunsRoot() const;
-
-        inline const fs::path getRunRoot(DAGRunID runID) const;
-    };
-} // namespace daggy::loggers::dag_run
diff --git a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp
index 0948b71..ad0979c 100644
--- a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp
+++ b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp
@@ -6,7 +6,7 @@
 #include "DAGRunLogger.hpp"
 #include "Defines.hpp"

-namespace daggy { namespace loggers { namespace dag_run {
+namespace daggy::loggers::dag_run {
    /*
     * This logger should only be used for debug purposes. It doesn't actually log
     * anything, just prints stuff to stdout.
@@ -14,15 +14,15 @@ namespace daggy { namespace loggers { namespace dag_run {
    class OStreamLogger : public DAGRunLogger
    {
    public:
-        OStreamLogger(std::ostream &os);
+        explicit OStreamLogger(std::ostream &os);

        // Execution
        DAGRunID startDAGRun(std::string name, const TaskSet &tasks) override;

-        void addTask(DAGRunID dagRunID, const std::string taskName,
+        void addTask(DAGRunID dagRunID, const std::string &taskName,
                     const Task &task) override;

-        void updateTask(DAGRunID dagRunID, const std::string taskName,
+        void updateTask(DAGRunID dagRunID, const std::string &taskName,
                        const Task &task) override;

        void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
@@ -48,4 +48,4 @@ namespace daggy { namespace loggers { namespace dag_run {
        void _updateDAGRunState(DAGRunID dagRunID, RunState state);
    };
-}}} // namespace daggy::loggers::dag_run
+} // namespace daggy::loggers::dag_run
diff --git a/daggy/src/Serialization.cpp b/daggy/src/Serialization.cpp
index 8f5eac3..7540eaa 100644
--- a/daggy/src/Serialization.cpp
+++ b/daggy/src/Serialization.cpp
@@ -77,9 +77,9 @@ namespace daggy {
        }
        else {
            ss << '[';
-            const auto &vals = std::get>(v);
-            bool firstVal = true;
-            for (const auto &val : vals) {
+            const auto &values = std::get>(v);
+            bool firstVal = true;
+            for (const auto &val : values) {
                if (firstVal) {
                    firstVal = false;
                }
@@ -200,7 +200,7 @@ namespace daggy {
    std::string taskToJSON(const Task &task)
    {
        std::stringstream ss;
-        bool first = false;
+        bool first;
        ss << "{"
           << R"("maxRetries": )" << task.maxRetries << ','
@@ -283,7 +283,7 @@ namespace daggy {
    TimePoint stringToTimePoint(const std::string &timeString)
    {
-        std::tm dt;
+        std::tm dt{};
        std::stringstream ss{timeString};
        ss >> std::get_time(&dt, "%Y-%m-%d %H:%M:%S %Z");
        return Clock::from_time_t(mktime(&dt));
diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp
index 6049722..81cacf8 100644
--- a/daggy/src/Server.cpp
+++ b/daggy/src/Server.cpp
@@ -13,7 +13,7 @@ namespace rj = rapidjson;
 using namespace Pistache;

 namespace daggy {
-    void Server::init(int threads)
+    void Server::init(size_t threads)
    {
        auto opts = Http::Endpoint::options()
                        .threads(threads)
@@ -33,6 +33,12 @@ namespace daggy {
        endpoint_.serveThreaded();
    }

+    Server &Server::setSSLCertificates(const fs::path &cert, const fs::path &key)
+    {
+        endpoint_.useSSL(cert, key);
+        return *this;
+    }
+
    void Server::shutdown()
    {
        endpoint_.shutdown();
@@ -47,8 +53,9 @@ namespace daggy {
    {
        desc_.info().license("MIT", "https://opensource.org/licenses/MIT");

-        auto backendErrorResponse = desc_.response(
-            Http::Code::Internal_Server_Error, "An error occured with the backend");
+        auto backendErrorResponse =
+            desc_.response(Http::Code::Internal_Server_Error,
+                           "An error occurred with the backend");

        desc_.schemes(Rest::Scheme::Http)
            .basePath("/v1")
@@ -212,8 +219,8 @@ namespace daggy {
        if (!request.hasParam(":runID")) {
            REQ_ERROR(Not_Found, "No runID provided in URL");
        }
-        DAGRunID runID = request.param(":runID").as();
-        auto run = logger_.getDAGRun(runID);
+        auto runID = request.param(":runID").as();
+        auto run = logger_.getDAGRun(runID);

        bool first = true;
        std::stringstream ss;
diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp
index 48cf17c..12c6b0d 100644
--- a/daggy/src/Utilities.cpp
+++ b/daggy/src/Utilities.cpp
@@ -42,7 +42,7 @@ namespace daggy {
        else {
            for (const auto &val : std::get>(paramValue)) {
-                for (auto cmd : expandedPart) {
+                for (const auto &cmd : expandedPart) {
                    newExpandedPart.push_back(globalSub(cmd, param, val));
                }
            }
diff --git a/daggy/src/executors/task/CMakeLists.txt b/daggy/src/executors/task/CMakeLists.txt
index 5fda838..0b480d2 100644
--- a/daggy/src/executors/task/CMakeLists.txt
+++ b/daggy/src/executors/task/CMakeLists.txt
@@ -1,5 +1,5 @@
 target_sources(${PROJECT_NAME} PRIVATE
-    ForkingTaskExecutor.cpp
-    SlurmTaskExecutor.cpp
-    NoopTaskExecutor.cpp
-)
+    SlurmTaskExecutor.cpp
+    NoopTaskExecutor.cpp
+    ForkingTaskExecutor.cpp
+    )
diff --git a/daggy/src/executors/task/SlurmTaskExecutor.cpp b/daggy/src/executors/task/SlurmTaskExecutor.cpp
index 5c2cc38..e53e121 100644
--- a/daggy/src/executors/task/SlurmTaskExecutor.cpp
+++ b/daggy/src/executors/task/SlurmTaskExecutor.cpp
@@ -2,12 +2,11 @@
 #include
 #ifdef DAGGY_ENABLE_SLURM
 #include
-#include
 #include
 #include
-#include
 #include
+#include
 #include
 #include
 #include
@@ -223,7 +222,6 @@ namespace daggy::executors::task {
                case JOB_SUSPENDED:
                case JOB_RUNNING:
                    continue;
-                    break;
                // Job has finished
                case JOB_COMPLETE: /* completed execution successfully */
                case JOB_FAILED: /* completed execution unsuccessfully */
diff --git a/daggy/src/loggers/dag_run/CMakeLists.txt b/daggy/src/loggers/dag_run/CMakeLists.txt
index d283f25..c3fee3e 100644
--- a/daggy/src/loggers/dag_run/CMakeLists.txt
+++ b/daggy/src/loggers/dag_run/CMakeLists.txt
@@ -1,4 +1,3 @@
 target_sources(${PROJECT_NAME} PRIVATE
-    FileSystemLogger.cpp
-    OStreamLogger.cpp
-)
+    OStreamLogger.cpp
+    )
diff --git a/daggy/src/loggers/dag_run/FileSystemLogger.cpp b/daggy/src/loggers/dag_run/FileSystemLogger.cpp
deleted file mode 100644
index 3e8eb22..0000000
--- a/daggy/src/loggers/dag_run/FileSystemLogger.cpp
+++ /dev/null
@@ -1,212 +0,0 @@
-#include
-
-#include
-#include
-#include
-#include
-#include
-
-namespace fs = std::filesystem;
-
-using namespace daggy::loggers::dag_run;
-
-namespace daggy {
-    inline const fs::path FileSystemLogger::getCurrentPath() const
-    {
-        return root_ / "current";
-    }
-
-    inline const fs::path FileSystemLogger::getRunsRoot() const
-    {
-        return root_ / "runs";
-    }
-
-    inline const fs::path FileSystemLogger::getRunRoot(DAGRunID runID) const
-    {
-        return getRunsRoot() / std::to_string(runID);
-    }
-
-    FileSystemLogger::FileSystemLogger(fs::path root)
-        : root_(root)
-        , nextRunID_(0)
-    {
-        const std::vector reqPaths{root_, getCurrentPath(),
-                                   getRunsRoot()};
-        for (const auto &path : reqPaths) {
-            if (!fs::exists(path)) {
-                fs::create_directories(path);
-            }
-        }
-
-        // Get the next run ID
-        for (auto &dir : fs::directory_iterator(getRunsRoot())) {
-            try {
-                size_t runID = std::stoull(dir.path().stem());
-                if (runID > nextRunID_)
-                    nextRunID_ = runID + 1;
-            }
-            catch (std::exception &e) {
-                continue;
-            }
-        }
-    }
-
-    // Execution
-    DAGRunID FileSystemLogger::startDAGRun(std::string name, const TaskSet &tasks)
-    {
-        DAGRunID runID = nextRunID_++;
-
-        // TODO make this threadsafe
-        fs::path runDir = getRunRoot(runID);
-        // std::lock_guard guard(runLocks[runDir]);
-
-        // Init the directory
-        fs::path runRoot = getRunsRoot() / std::to_string(runID);
-        fs::create_directories(runRoot);
-
-        // Create meta.json with DAGRun Name and task definitions
-        std::ofstream ofh(runRoot / "metadata.json",
-                          std::ios::trunc | std::ios::binary);
-        ofh << R"({ "name": )" << std::quoted(name) << R"(, "tasks": )"
-            << tasksToJSON(tasks) << "}\n";
-        ofh.close();
-
-        // Task directories
-        for (const auto &[name, task] : tasks) {
-            auto taskDir = runRoot / name;
-            fs::create_directories(taskDir);
-            std::ofstream ofh(taskDir / "states.csv");
-        }
-
-        return runID;
-    }
-
-    void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state)
-    {
-        std::ofstream ofh(getRunRoot(dagRunID) / "states.csv",
-                          std::ios::binary | std::ios::app);
-        ofh << std::quoted(timePointToString(Clock::now())) << ','
-            << state._to_string() << '\n';
-        ofh.flush();
-        ofh.close();
-    }
-
-    void FileSystemLogger::logTaskAttempt(DAGRunID dagRunID,
-                                          const std::string &taskName,
-                                          const AttemptRecord &attempt)
-    {
-        auto taskRoot = getRunRoot(dagRunID) / taskName;
-        size_t i = 1;
-        while (fs::exists(taskRoot / std::to_string(i))) {
-            ++i;
-        }
-
-        auto attemptDir = taskRoot / std::to_string(i);
-        fs::create_directories(attemptDir);
-
-        std::ofstream ofh;
-
-        // Metadata
-        ofh.open(attemptDir / "metadata.json");
-        ofh << "{\n"
-            << R"("startTime": )"
-            << std::quoted(timePointToString(attempt.startTime)) << ",\n"
-            << R"("stopTime": )" << std::quoted(timePointToString(attempt.stopTime))
-            << ",\n"
-            << R"("rc": )" << attempt.rc << '\n'
-            << '}';
-
-        // output
-        ofh.open(attemptDir / "executor.log");
-        ofh << attempt.executorLog << std::flush;
-        ofh.close();
-
-        // Output
-        ofh.open(attemptDir / "output.log");
-        ofh << attempt.outputLog << std::flush;
-        ofh.close();
-
-        // Error
-        ofh.open(attemptDir / "error.log");
-        ofh << attempt.errorLog << std::flush;
-        ofh.close();
-    }
-
-    void FileSystemLogger::updateTaskState(DAGRunID dagRunID,
-                                           const std::string &taskName,
-                                           RunState state)
-    {
-        std::ofstream ofh(getRunRoot(dagRunID) / taskName / "states.csv",
-                          std::ios::binary | std::ios::app);
-        ofh << std::quoted(timePointToString(Clock::now())) << ','
-            << state._to_string() << '\n';
-        ofh.flush();
-        ofh.close();
-    }
-
-    // Querying
-    std::vector FileSystemLogger::getDAGs(uint32_t stateMask)
-    {
-        return {};
-    }
-
-    DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunID)
-    {
-        DAGRunRecord record;
-        auto runRoot = getRunRoot(dagRunID);
-        if (!fs::exists(runRoot)) {
-            throw std::runtime_error("No DAGRun with that ID exists");
-        }
-
-        std::ifstream ifh(runRoot / "metadata.json", std::ios::binary);
-        std::string metaData;
-        std::getline(ifh, metaData, '\0');
-        ifh.close();
-
-        rj::Document doc;
-        doc.Parse(metaData.c_str());
-
-        record.name = doc["name"].GetString();
-        record.tasks = tasksFromJSON(doc["tasks"]);
-
-        // DAG State Changes
-        std::string line;
-        std::string token;
-        auto dagStateFile = runRoot / "states.csv";
-        ifh.open(dagStateFile);
-        while (std::getline(ifh, line)) {
-            std::stringstream ss{line};
-            std::string time;
-            std::string state;
-            std::getline(ss, time, ',');
-            std::getline(ss, state);
-
-            record.dagStateChanges.emplace_back(
-                DAGUpdateRecord{.time = stringToTimePoint(time),
-                                .newState = RunState::_from_string(state.c_str())});
-        }
-        ifh.close();
-
-        // Task states
-        for (const auto &[taskName, task] : record.tasks) {
-            auto taskStateFile = runRoot / taskName / "states.csv";
-            if (!fs::exists(taskStateFile)) {
-                record.taskRunStates.emplace(taskName, RunState::QUEUED);
-                continue;
-            }
-
-            ifh.open(taskStateFile);
-            while (std::getline(ifh, line)) {
-                continue;
-            }
-            std::stringstream ss{line};
-            while (std::getline(ss, token, ',')) {
-                continue;
-            }
-            RunState taskState = RunState::_from_string(token.c_str());
-            record.taskRunStates.emplace(taskName, taskState);
-            ifh.close();
-        }
-        return record;
-    }
-} // namespace daggy
diff --git a/daggy/src/loggers/dag_run/OStreamLogger.cpp b/daggy/src/loggers/dag_run/OStreamLogger.cpp
index a56d29c..f21a09a 100644
--- a/daggy/src/loggers/dag_run/OStreamLogger.cpp
+++ b/daggy/src/loggers/dag_run/OStreamLogger.cpp
@@ -31,7 +31,7 @@ namespace daggy { namespace loggers { namespace dag_run {
        return runID;
    }

-    void OStreamLogger::addTask(DAGRunID dagRunID, const std::string taskName,
+    void OStreamLogger::addTask(DAGRunID dagRunID, const std::string &taskName,
                                const Task &task)
    {
        std::lock_guard lock(guard_);
@@ -40,7 +40,7 @@ namespace daggy { namespace loggers { namespace dag_run {
        _updateTaskState(dagRunID, taskName, RunState::QUEUED);
    }

-    void OStreamLogger::updateTask(DAGRunID dagRunID, const std::string taskName,
+    void OStreamLogger::updateTask(DAGRunID dagRunID, const std::string &taskName,
                                   const Task &task)
    {
        std::lock_guard lock(guard_);
diff --git a/tests/int_basic.cpp b/tests/int_basic.cpp
index 5088e5a..0c16e19 100644
--- a/tests/int_basic.cpp
+++ b/tests/int_basic.cpp
@@ -1,8 +1,6 @@
 #include
 #include

-#include "daggy/DAG.hpp"
-
 TEST_CASE("General tests", "[general]")
 {
     REQUIRE(1 == 1);
diff --git a/tests/unit_dag.cpp b/tests/unit_dag.cpp
index b2e0392..c0eb1d1 100644
--- a/tests/unit_dag.cpp
+++ b/tests/unit_dag.cpp
@@ -53,14 +53,14 @@ TEST_CASE("dag_traversal", "[dag]")
    std::vector> edges{{0, 6}, {1, 5}, {5, 6}, {6, 7}, {2, 3},
                       {3, 5}, {4, 7}, {7, 8}, {7, 9}};

-    for (auto const [from, to] : edges) {
+    for (const auto &[from, to] : edges) {
        dag.addEdge(from, to);
    }

    SECTION("Basic Traversal")
    {
        dag.reset();
-        std::vector visitOrder(N_VERTICES);
+        std::vector visitOrder(N_VERTICES);
        size_t i = 0;
        while (!dag.allVisited()) {
            const auto v = dag.visitNext().value();
@@ -70,7 +70,7 @@ TEST_CASE("dag_traversal", "[dag]")
    }

    // Ensure visit order is preserved
-    for (auto const [from, to] : edges) {
+    for (const auto &[from, to] : edges) {
        REQUIRE(visitOrder[from] <= visitOrder[to]);
    }
 }
diff --git a/tests/unit_dagrun_loggers.cpp b/tests/unit_dagrun_loggers.cpp
index dc9a72a..c8bf95b 100644
--- a/tests/unit_dagrun_loggers.cpp
+++ b/tests/unit_dagrun_loggers.cpp
@@ -3,7 +3,6 @@
 #include
 #include

-#include "daggy/loggers/dag_run/FileSystemLogger.hpp"
 #include "daggy/loggers/dag_run/OStreamLogger.hpp"

 namespace fs = std::filesystem;
@@ -40,26 +39,6 @@ inline DAGRunID testDAGRunInit(DAGRunLogger &logger, const std::string &name,
    return runID;
 }

-/*
-TEST_CASE("Filesystem Logger", "[filesystem_logger]") {
-    const fs::path logRoot{"fs_logger_unit"};
-    auto cleanup = [&]() {
-        if (fs::exists(logRoot)) {
-            fs::remove_all(logRoot);
-        }
-    };
-
-    //cleanup();
-    daggy::loggers::dag_run::FileSystemLogger logger(logRoot);
-
-    SECTION("DAGRun Starts") {
-        testDAGRunInit(logger, "init_test", SAMPLE_TASKS);
-    }
-
-    // cleanup();
-}
-*/
-
 TEST_CASE("ostream_logger", "[ostream_logger]")
 {
    // cleanup();
diff --git a/tests/unit_executor_slurmexecutor.cpp b/tests/unit_executor_slurmexecutor.cpp
index 3695f89..2b63a72 100644
--- a/tests/unit_executor_slurmexecutor.cpp
+++ b/tests/unit_executor_slurmexecutor.cpp
@@ -1,4 +1,3 @@
-#include
 #include
 #include

diff --git a/tests/unit_server.cpp b/tests/unit_server.cpp
index 98a1eaf..bb7f444 100644
--- a/tests/unit_server.cpp
+++ b/tests/unit_server.cpp
@@ -7,12 +7,12 @@
 #include
 #include
 #include
-#include
 #include

 namespace rj = rapidjson;

-Pistache::Http::Response REQUEST(std::string url, std::string payload = "")
+Pistache::Http::Response REQUEST(const std::string &url,
+                                 const std::string &payload = "")
 {
    Pistache::Http::Experimental::Client client;
    client.init();
@@ -28,12 +28,12 @@ Pistache::Http::Response REQUEST(std::string url, std::string payload = "")
    request.then(
        [&](Pistache::Http::Response rsp) {
            ok = true;
-            response = rsp;
+            response = std::move(rsp);
        },
        [&](std::exception_ptr ptr) {
            error = true;
            try {
-                std::rethrow_exception(ptr);
+                std::rethrow_exception(std::move(ptr));
            }
            catch (std::exception &e) {
                msg = e.what();
diff --git a/tests/unit_threadpool.cpp b/tests/unit_threadpool.cpp
index 507b673..054a25b 100644
--- a/tests/unit_threadpool.cpp
+++ b/tests/unit_threadpool.cpp
@@ -18,10 +18,10 @@ TEST_CASE("threadpool", "[threadpool]")
    auto tq = std::make_shared();
    std::vector> res;
    for (size_t i = 0; i < 100; ++i)
-        res.emplace_back(std::move(tq->addTask([&cnt]() {
+        res.emplace_back(tq->addTask([&cnt]() {
            cnt++;
            return cnt.load();
-        })));
+        }));
    tp.addTasks(tq);
    for (auto &r : res)
        r.get();
diff --git a/tests/unit_utilities.cpp b/tests/unit_utilities.cpp
index 73fd36d..1471f27 100644
--- a/tests/unit_utilities.cpp
+++ b/tests/unit_utilities.cpp
@@ -5,7 +5,6 @@
 #include
 #include
 #include
-#include

 #include "daggy/Serialization.hpp"
 #include "daggy/Utilities.hpp"
@@ -67,7 +66,7 @@ TEST_CASE("dag_runner_order", "[dagrun_order]")
    std::stringstream ss;
    daggy::loggers::dag_run::OStreamLogger logger(ss);

-    daggy::TimePoint startTime = daggy::Clock::now();
+    daggy::TimePoint globalStartTime = daggy::Clock::now();

    std::string testParams{
        R"({"DATE": ["2021-05-06", "2021-05-07", "2021-05-08", "2021-05-09" ]})"};
@@ -94,11 +93,11 @@ TEST_CASE("dag_runner_order", "[dagrun_order]")

    // Ensure the run order
    auto rec = logger.getDAGRun(runID);
-    daggy::TimePoint stopTime = daggy::Clock::now();
+    daggy::TimePoint globalStopTime = daggy::Clock::now();
    std::array minTimes;
-    minTimes.fill(startTime);
+    minTimes.fill(globalStartTime);
    std::array maxTimes;
-    maxTimes.fill(stopTime);
+    maxTimes.fill(globalStopTime);

    for (const auto &[k, v] : rec.taskAttempts) {
        size_t idx = k[0] - 65;
diff --git a/utils/daggyd/daggyd.cpp b/utils/daggyd/daggyd.cpp
index c7bb6c6..495e00e 100644
--- a/utils/daggyd/daggyd.cpp
+++ b/utils/daggyd/daggyd.cpp
@@ -1,8 +1,8 @@
-#include
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -38,6 +38,8 @@ void signalHandler(int signal)
        case SIGTERM:
            running = false;
            break;
+        default:
+            break;
    }
 }

@@ -61,7 +63,7 @@ void daemonize()
    sigaddset(&newSigSet, SIGTTOU); /* ignore Tty background writes */
    sigaddset(&newSigSet, SIGTTIN); /* ignore Tty background reads */
    sigprocmask(SIG_BLOCK, &newSigSet,
-                NULL); /* Block the above specified signals */
+                nullptr); /* Block the above specified signals */

    /* Set up a signal handler */
    newSigAction.sa_handler = signalHandler;
@@ -69,9 +71,9 @@ void daemonize()
    newSigAction.sa_flags = 0;

    /* Signals to handle */
-    sigaction(SIGHUP, &newSigAction, NULL); /* catch hangup signal */
-    sigaction(SIGTERM, &newSigAction, NULL); /* catch term signal */
-    sigaction(SIGINT, &newSigAction, NULL); /* catch interrupt signal */
+    sigaction(SIGHUP, &newSigAction, nullptr); /* catch hangup signal */
+    sigaction(SIGTERM, &newSigAction, nullptr); /* catch term signal */
+    sigaction(SIGINT, &newSigAction, nullptr); /* catch interrupt signal */

    // Fork once
    pid = fork();
@@ -107,7 +109,7 @@ void daemonize()
    (void)rc;

    /* Close all open file descriptors */
-    for (auto x = sysconf(_SC_OPEN_MAX); x >= 0; x--) {
+    for (int x = sysconf(_SC_OPEN_MAX); x >= 0; x--) {
        close(x);
    }
 }
@@ -152,14 +154,14 @@ int main(int argc, char **argv)
        exit(1);
    }

-    bool verbose = args.get("--verbose");
-    bool asDaemon = args.get("--daemon");
-    std::string logFileName = args.get("--log-file");
-    std::string listenIP = args.get("--ip");
-    uint16_t listenPort = args.get("--port");
-    size_t executorThreads = args.get("--executor-threads");
-    size_t webThreads = args.get("--web-threads");
-    size_t dagThreads = args.get("--dag-threads");
+    bool verbose = args.get("--verbose");
+    bool asDaemon = args.get("--daemon");
+    auto logFileName = args.get("--log-file");
+    auto listenIP = args.get("--ip");
+    auto listenPort = args.get("--port");
+    auto executorThreads = args.get("--executor-threads");
+    auto webThreads = args.get("--web-threads");
+    auto dagThreads = args.get("--dag-threads");

    if (logFileName == "-") {
        if (asDaemon) {