From 4da78c9dd6ba0f29f614490d66a4c0e86018a379 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Fri, 7 Jan 2022 14:37:06 -0400 Subject: [PATCH] Adding executor description for verbose output, adding output directory for build artifacts --- CMakeLists.txt | 4 ++++ daggyd/daggyd/daggyd.cpp | 7 ++++--- libdaggy/include/daggy/ThreadPool.hpp | 5 +++++ .../task/DaggyRunnerTaskExecutor.hpp | 2 ++ .../executors/task/ForkingTaskExecutor.hpp | 2 ++ .../daggy/executors/task/NoopTaskExecutor.hpp | 2 ++ .../executors/task/SlurmTaskExecutor.hpp | 2 ++ .../daggy/executors/task/TaskExecutor.hpp | 3 +++ .../task/DaggyRunnerTaskExecutor.cpp | 19 +++++++++++++++++++ .../executors/task/ForkingTaskExecutor.cpp | 7 +++++++ .../src/executors/task/NoopTaskExecutor.cpp | 6 ++++++ .../src/executors/task/SlurmTaskExecutor.cpp | 5 +++++ 12 files changed, 61 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 3fcfb9b..a594844 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,6 +10,10 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(CMAKE_EXPORT_COMPILE_COMMANDS True) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall -Werror") +set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) +set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) + if(CMAKE_BUILD_TYPE MATCHES "Debug") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer") # set(TSAN_OPTIONS "suppressions=${CMAKE_CURRENT_DIR}/tests/tsan.supp") diff --git a/daggyd/daggyd/daggyd.cpp b/daggyd/daggyd/daggyd.cpp index 1a3486e..1b39260 100644 --- a/daggyd/daggyd/daggyd.cpp +++ b/daggyd/daggyd/daggyd.cpp @@ -270,11 +270,15 @@ int main(int argc, char **argv) doc.SetObject(); } + auto logger = loggerFactory(doc); + auto executor = executorFactory(doc); + if (verbose) { std::cout << "Server running at http://" << listenIP << ':' << listenPort << std::endl << "Max DAG Processing: " << dagThreads << std::endl << "Max Web Clients: " << webThreads << std::endl + << "Executor: " << executor->description() << std::endl << std::endl << "Ctrl-C to exit" << std::endl; } @@ -283,9 +287,6 @@ int main(int argc, char **argv) daemonize(); } - auto logger = loggerFactory(doc); - auto executor = executorFactory(doc); - Pistache::Address listenSpec(listenIP, listenPort); daggy::daggyd::Server server(listenSpec, *logger, *executor, dagThreads); diff --git a/libdaggy/include/daggy/ThreadPool.hpp b/libdaggy/include/daggy/ThreadPool.hpp index 4208d47..a4212aa 100644 --- a/libdaggy/include/daggy/ThreadPool.hpp +++ b/libdaggy/include/daggy/ThreadPool.hpp @@ -169,6 +169,11 @@ namespace daggy { cv_.notify_one(); } + size_t size() const + { + return workers_.size(); + } + private: // need to keep track of threads, so we can join them std::vector workers_; diff --git a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp index 2fb5fdc..65be9b1 100644 --- a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp @@ -51,6 +51,8 @@ namespace daggy::executors::task { bool stop(DAGRunID runID, const std::string &taskName) override; + std::string description() const; + void addRunner(const std::string &url); private: diff --git a/libdaggy/include/daggy/executors/task/ForkingTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/ForkingTaskExecutor.hpp index a142796..f5b1be5 100644 --- a/libdaggy/include/daggy/executors/task/ForkingTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/ForkingTaskExecutor.hpp @@ -31,6 +31,8 @@ namespace daggy::executors::task { bool stop(DAGRunID runID, const std::string &taskName) override; + std::string description() const override; + private: ThreadPool tp_; std::mutex taskControlsGuard_; diff --git a/libdaggy/include/daggy/executors/task/NoopTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/NoopTaskExecutor.hpp index 92730cc..fa104ab 100644 --- a/libdaggy/include/daggy/executors/task/NoopTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/NoopTaskExecutor.hpp @@ -21,5 +21,7 @@ namespace daggy::executors::task { const Task &task) override; bool stop(DAGRunID runID, const std::string &taskName) override; + + std::string description() const; }; } // namespace daggy::executors::task diff --git a/libdaggy/include/daggy/executors/task/SlurmTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/SlurmTaskExecutor.hpp index 90e3e2e..1f2751a 100644 --- a/libdaggy/include/daggy/executors/task/SlurmTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/SlurmTaskExecutor.hpp @@ -25,6 +25,8 @@ namespace daggy::executors::task { bool stop(DAGRunID runID, const std::string &taskName) override; + std::string description() const override; + private: struct Job { diff --git a/libdaggy/include/daggy/executors/task/TaskExecutor.hpp b/libdaggy/include/daggy/executors/task/TaskExecutor.hpp index f2c02ec..69165f5 100644 --- a/libdaggy/include/daggy/executors/task/TaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/TaskExecutor.hpp @@ -33,5 +33,8 @@ namespace daggy::executors::task { // Kill a currently executing task. This will resolve the future. virtual bool stop(DAGRunID runID, const std::string &taskName) = 0; + + // Report a description of the executor + virtual std::string description() const = 0; }; } // namespace daggy::executors::task diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp index d89e771..4924b21 100644 --- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp +++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp @@ -88,6 +88,25 @@ DaggyRunnerTaskExecutor::~DaggyRunnerTaskExecutor() monitorWorker_.join(); } +std::string DaggyRunnerTaskExecutor::description() const +{ + std::stringstream ss; + ss << "DaggyRunnerTaskExecutor running with " << runners_.size() + << " runners: ["; + bool first = true; + for (const auto &[runner, _] : runners_) { + if (first) { + first = false; + } + else { + ss << ", "; + } + ss << runner; + } + ss << "]"; + return ss.str(); +} + // Validates the job to ensure that all required values are set and are of // the right type, bool DaggyRunnerTaskExecutor::validateTaskParameters(const ConfigValues &job) diff --git a/libdaggy/src/executors/task/ForkingTaskExecutor.cpp b/libdaggy/src/executors/task/ForkingTaskExecutor.cpp index 3c232e5..ca39994 100644 --- a/libdaggy/src/executors/task/ForkingTaskExecutor.cpp +++ b/libdaggy/src/executors/task/ForkingTaskExecutor.cpp @@ -72,6 +72,13 @@ ForkingTaskExecutor::~ForkingTaskExecutor() taskControls_.clear(); } +std::string ForkingTaskExecutor::description() const +{ + std::stringstream ss; + ss << "ForkingTaskExecutor with " << tp_.size() << " threads"; + return ss.str(); +} + bool ForkingTaskExecutor::stop(DAGRunID runID, const std::string &taskName) { std::string key = std::to_string(runID) + "_" + taskName; diff --git a/libdaggy/src/executors/task/NoopTaskExecutor.cpp b/libdaggy/src/executors/task/NoopTaskExecutor.cpp index 5d9d434..08e1514 100644 --- a/libdaggy/src/executors/task/NoopTaskExecutor.cpp +++ b/libdaggy/src/executors/task/NoopTaskExecutor.cpp @@ -2,6 +2,12 @@ #include namespace daggy::executors::task { + + std::string NoopTaskExecutor::description() const + { + return "NoopTaskExecutor"; + } + std::future NoopTaskExecutor::execute( DAGRunID runID, const std::string &taskName, const Task &task) { diff --git a/libdaggy/src/executors/task/SlurmTaskExecutor.cpp b/libdaggy/src/executors/task/SlurmTaskExecutor.cpp index 26b44a5..b792df2 100644 --- a/libdaggy/src/executors/task/SlurmTaskExecutor.cpp +++ b/libdaggy/src/executors/task/SlurmTaskExecutor.cpp @@ -93,6 +93,11 @@ namespace daggy::executors::task { runningJobs_.clear(); } + std::string SlurmTaskExecutor::description() const + { + return "SlurmTaskExecutor"; + } + // Validates the job to ensure that all required values are set and are of // the right type, bool SlurmTaskExecutor::validateTaskParameters(const ConfigValues &job)