diff --git a/daggy/include/daggy/Server.hpp b/daggy/include/daggy/Server.hpp index aa4d771..4d044a0 100644 --- a/daggy/include/daggy/Server.hpp +++ b/daggy/include/daggy/Server.hpp @@ -6,6 +6,7 @@ #include #include +#include "ThreadPool.hpp" #include "loggers/dag_run/DAGRunLogger.hpp" #include "executors/task/TaskExecutor.hpp" diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index 345922c..22b8d4f 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -31,14 +31,6 @@ namespace daggy { void updateDAGFromTasks(TaskDAG &dag, TaskSet &tasks); - // Blocking call - std::vector - runTask(DAGRunID runID, - const std::string &taskName, - const Task &task, - executors::task::TaskExecutor &executor, - loggers::dag_run::DAGRunLogger &logger); - TaskDAG runDAG(DAGRunID runID, executors::task::TaskExecutor &executor, loggers::dag_run::DAGRunLogger &logger, diff --git a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp index 298b57b..9b99040 100644 --- a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp +++ b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp @@ -1,6 +1,7 @@ #pragma once #include "TaskExecutor.hpp" +#include namespace daggy::executors::task { class ForkingTaskExecutor : public TaskExecutor { @@ -8,7 +9,7 @@ namespace daggy::executors::task { using Command = std::vector; ForkingTaskExecutor(size_t nThreads) - : TaskExecutor(nThreads) {} + : tp_(nThreads) {} // Validates the job to ensure that all required values are set and are of the right type, bool validateTaskParameters(const ConfigValues &job) override; @@ -17,7 +18,10 @@ namespace daggy::executors::task { expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) override; // Runs the task - AttemptRecord execute(const std::string &taskName, const Task &task) override; + std::future execute(const std::string &taskName, const Task &task) override; + private: + ThreadPool tp_; + AttemptRecord runTask(const Task &task); }; } diff --git a/daggy/include/daggy/executors/task/SlurmTaskExecutor.hpp b/daggy/include/daggy/executors/task/SlurmTaskExecutor.hpp index 5dc3009..09145e1 100644 --- a/daggy/include/daggy/executors/task/SlurmTaskExecutor.hpp +++ b/daggy/include/daggy/executors/task/SlurmTaskExecutor.hpp @@ -7,7 +7,8 @@ namespace daggy::executors::task { public: using Command = std::vector; - SlurmTaskExecutor(size_t nThreads); + SlurmTaskExecutor(); + ~SlurmTaskExecutor(); // Validates the job to ensure that all required values are set and are of the right type, bool validateTaskParameters(const ConfigValues &job) override; @@ -16,6 +17,21 @@ namespace daggy::executors::task { expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) override; // Runs the task - AttemptRecord execute(const std::string &taskName, const Task &task) override; + std::future execute(const std::string &taskName, const Task &task) override; + + private: + struct Job { + std::promise prom; + std::string stdoutFile; + std::string stderrFile; + }; + + std::mutex promiseGuard_; + std::unordered_map runningJobs_; + std::atomic running_; + + // Monitors jobs and resolves promises + std::thread monitorWorker_; + void monitor(); }; } diff --git a/daggy/include/daggy/executors/task/TaskExecutor.hpp b/daggy/include/daggy/executors/task/TaskExecutor.hpp index 14fb508..afed3d9 100644 --- a/daggy/include/daggy/executors/task/TaskExecutor.hpp +++ b/daggy/include/daggy/executors/task/TaskExecutor.hpp @@ -6,8 +6,7 @@ #include #include -#include "daggy/Defines.hpp" -#include "daggy/ThreadPool.hpp" +#include /* Executors run Tasks, returning a future with the results. @@ -17,7 +16,7 @@ namespace daggy::executors::task { class TaskExecutor { public: - TaskExecutor(size_t nThreads) : threadPool(nThreads) {}; + virtual ~TaskExecutor() = default; // Validates the job to ensure that all required values are set and are of the right type, virtual bool validateTaskParameters(const ConfigValues &job) = 0; @@ -27,8 +26,6 @@ namespace daggy::executors::task { expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) = 0; // Blocking execution of a task - virtual AttemptRecord execute(const std::string &taskName, const Task &task) = 0; - - ThreadPool threadPool; + virtual std::future execute(const std::string &taskName, const Task &task) = 0; }; } diff --git a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp index 815d48e..d8ed8c7 100644 --- a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp @@ -16,6 +16,8 @@ namespace daggy { namespace dag_run { class DAGRunLogger { public: + virtual ~DAGRunLogger() = default; + // Execution virtual DAGRunID startDAGRun(std::string name, const TaskSet &tasks) = 0; diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index b133778..198e071 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -1,8 +1,11 @@ +#include #include #include #include +using namespace std::chrono_literals; + namespace daggy { std::string globalSub(std::string string, const std::string &pattern, const std::string &replacement) { size_t pos = string.find(pattern); @@ -64,7 +67,9 @@ namespace daggy { executor.validateTaskParameters(task.job); const auto newJobs = executor.expandTaskParameters(task.job, interpolatedValues); if (newJobs.size() == 1) { - newTaskSet.emplace(baseName, task); + Task newTask{task}; + newTask.job = newJobs.front(); + newTaskSet.emplace(baseName, newTask); } else { size_t i = 0; for (const auto &newJob: newJobs) { @@ -126,23 +131,6 @@ namespace daggy { return dag; } - std::vector runTask(DAGRunID runID, - const std::string &taskName, - const Task &task, - executors::task::TaskExecutor &executor, - loggers::dag_run::DAGRunLogger &logger) { - std::vector attempts; - logger.updateTaskState(runID, taskName, RunState::RUNNING); - - while (attempts.size() < task.maxRetries + 1) { - attempts.push_back(executor.execute(taskName, task)); - logger.logTaskAttempt(runID, taskName, attempts.back()); - if (attempts.back().rc == 0) break; - logger.updateTaskState(runID, taskName, RunState::RETRY); - } - return attempts; - } - TaskDAG runDAG(DAGRunID runID, executors::task::TaskExecutor &executor, loggers::dag_run::DAGRunLogger &logger, @@ -151,7 +139,8 @@ namespace daggy { ) { logger.updateDAGRunState(runID, RunState::RUNNING); - std::unordered_map>> runningTasks; + std::unordered_map> runningTasks; + std::unordered_map taskAttemptCounts; size_t running = 0; size_t errored = 0; @@ -159,19 +148,16 @@ namespace daggy { // Check for any completed tasks for (auto &[taskName, fut]: runningTasks) { if (fut.valid()) { - auto attemptRecords = fut.get(); - if (attemptRecords.empty()) { - logger.updateTaskState(runID, taskName, RunState::ERRORED); - ++errored; - } - if (attemptRecords.back().rc == 0) { + auto attempt = fut.get(); + logger.logTaskAttempt(runID, taskName, attempt); + if (attempt.rc == 0) { logger.updateTaskState(runID, taskName, RunState::COMPLETED); auto &vert = dag.getVertex(taskName); auto &task = vert.data; if (task.isGenerator) { // Parse the output and update the DAGs try { - auto newTasks = expandTaskSet(tasksFromJSON(attemptRecords.back().outputLog), + auto newTasks = expandTaskSet(tasksFromJSON(attempt.outputLog), executor, parameters ); @@ -195,31 +181,36 @@ namespace daggy { dag.completeVisit(taskName); --running; } else { - logger.updateTaskState(runID, taskName, RunState::ERRORED); - ++errored; + // RC isn't 0 + const auto & task = dag.getVertex(taskName).data; + if (taskAttemptCounts[taskName] <= task.maxRetries) { + logger.updateTaskState(runID, taskName, RunState::RETRY); + runningTasks.extract(taskName); + runningTasks.emplace(taskName, executor.execute(taskName, task)); + ++taskAttemptCounts[taskName]; + } else { + logger.updateTaskState(runID, taskName, RunState::ERRORED); + ++errored; + } } } } // Add all remaining tasks in a task queue to avoid dominating the thread pool - auto tq = std::make_shared(); auto t = dag.visitNext(); while (t.has_value()) { // Schedule the task to run auto &taskName = t.value().first; auto &task = t.value().second; - runningTasks.emplace(taskName, tq->addTask([runID, taskName, task, &executor, &logger]() { - return runTask(runID, taskName, task, executor, logger); - })); + taskAttemptCounts[taskName] = 1; + + runningTasks.emplace(taskName, executor.execute(taskName, task)); ++running; auto nextTask = dag.visitNext(); if (not nextTask.has_value()) break; t.emplace(nextTask.value()); } - if (!tq->empty()) { - executor.threadPool.addTasks(tq); - } if (running > 0 and errored == running) { logger.updateDAGRunState(runID, RunState::ERRORED); break; diff --git a/daggy/src/executors/task/ForkingTaskExecutor.cpp b/daggy/src/executors/task/ForkingTaskExecutor.cpp index 40c793b..521eea2 100644 --- a/daggy/src/executors/task/ForkingTaskExecutor.cpp +++ b/daggy/src/executors/task/ForkingTaskExecutor.cpp @@ -31,8 +31,14 @@ std::string slurp(int fd) { return result; } + +std::future +ForkingTaskExecutor::execute(const std::string &taskName, const Task &task) { + return tp_.addTask([&](){return runTask(task);}); +} + daggy::AttemptRecord -ForkingTaskExecutor::execute(const std::string &, const Task &task) { +ForkingTaskExecutor::runTask(const Task &task) { AttemptRecord rec; rec.startTime = Clock::now(); diff --git a/daggy/src/executors/task/SlurmTaskExecutor.cpp b/daggy/src/executors/task/SlurmTaskExecutor.cpp index fcc4a55..c9914aa 100644 --- a/daggy/src/executors/task/SlurmTaskExecutor.cpp +++ b/daggy/src/executors/task/SlurmTaskExecutor.cpp @@ -1,3 +1,5 @@ +#include +#include #ifdef DAGGY_ENABLE_SLURM #include #include @@ -44,8 +46,10 @@ namespace daggy::executors::task { dest.swap(contents); } - SlurmTaskExecutor::SlurmTaskExecutor(size_t nThreads) - : TaskExecutor(nThreads) { + SlurmTaskExecutor::SlurmTaskExecutor() + : running_(true) + , monitorWorker_(&SlurmTaskExecutor::monitor, this) + { std::string priority = "SLURM_PRIO_PROCESS=" + std::to_string(getpriority(PRIO_PROCESS, 0)); std::string submitDir = "SLURM_SUBMIT_DIR=" + fs::current_path().string(); @@ -71,6 +75,11 @@ namespace daggy::executors::task { putenv(const_cast(ss.str().c_str())); } + SlurmTaskExecutor::~SlurmTaskExecutor() { + running_ = false; + monitorWorker_.join(); + } + // Validates the job to ensure that all required values are set and are of the right type, bool SlurmTaskExecutor::validateTaskParameters(const ConfigValues &job) { const std::unordered_set requiredFields{ @@ -107,8 +116,8 @@ namespace daggy::executors::task { return newValues; } - AttemptRecord SlurmTaskExecutor::execute(const std::string &taskName, const Task &task) { - AttemptRecord record; + std::future + SlurmTaskExecutor::execute(const std::string &taskName, const Task &task) { std::stringstream executorLog; const auto &job = task.job; @@ -122,9 +131,12 @@ namespace daggy::executors::task { // Convert command to argc / argv std::vector argv{nullptr}; const auto command = std::get>(task.job.at("command")); - for (const auto &s: command) { - argv.push_back(const_cast(s.c_str())); - } + std::transform(command.begin(), + command.end(), + std::back_inserter(argv), + [](const std::string & s) { + return const_cast(s.c_str()); + }); char empty[] = ""; char *env[1]; @@ -170,78 +182,93 @@ namespace daggy::executors::task { error_code = slurm_submit_batch_job(&jd, &resp_msg); if (error_code) { - record.rc = error_code; - executorLog << "Unable to submit slurm job: " + std::stringstream ss; + ss << "Unable to submit slurm job: " << slurm_strerror(error_code); - record.executorLog = executorLog.str(); - return record; + throw std::runtime_error(ss.str()); } uint32_t jobID = resp_msg->job_id; executorLog << "Job " << resp_msg->job_submit_user_msg << '\n'; slurm_free_submit_response_response_msg(resp_msg); - // Pool until done - job_info_msg_t * jobStatus; - bool waiting = true; - while (waiting) { - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - error_code = slurm_load_job(&jobStatus, jobID, SHOW_ALL | SHOW_DETAIL); - if (error_code == SLURM_SUCCESS) { - uint32_t idx = jobStatus->record_count; - if (idx > 0) { + std::lock_guard lock(promiseGuard_); + Job newJob{ + .prom{}, + .stdoutFile = stdoutFile, + .stderrFile = stderrFile + }; + auto fut = newJob.prom.get_future(); + runningJobs_.emplace(jobID, std::move(newJob)); + + return fut; + } + + void SlurmTaskExecutor::monitor() { + std::unordered_set resolvedJobs; + while (running_) { + { + std::lock_guard lock(promiseGuard_); + for (auto & [jobID, job] : runningJobs_) { + job_info_msg_t * jobStatus; + int error_code = slurm_load_job(&jobStatus, jobID, SHOW_ALL | SHOW_DETAIL); + if (error_code != SLURM_SUCCESS) continue; + + uint32_t idx = jobStatus->record_count; + if (idx == 0) continue; idx--; const slurm_job_info_t & jobInfo = jobStatus->job_array[idx]; + AttemptRecord record; switch(jobInfo.job_state) { case JOB_PENDING: case JOB_SUSPENDED: case JOB_RUNNING: continue; break; - // Job has finished + // Job has finished case JOB_COMPLETE: /* completed execution successfully */ case JOB_FAILED: /* completed execution unsuccessfully */ - record.rc = jobInfo.exit_code; - executorLog << "Script errored.\n"; + record.executorLog = "Script errored.\n"; break; case JOB_CANCELLED: /* cancelled by user */ - executorLog << "Job cancelled by user.\n"; + record.executorLog = "Job cancelled by user.\n"; break; case JOB_TIMEOUT: /* terminated on reaching time limit */ - executorLog << "Job exceeded time limit.\n"; + record.executorLog = "Job exceeded time limit.\n"; break; case JOB_NODE_FAIL: /* terminated on node failure */ - executorLog << "Node failed during execution\n"; + record.executorLog = "Node failed during execution\n"; break; case JOB_PREEMPTED: /* terminated due to preemption */ - executorLog << "Job terminated due to pre-emption.\n"; + record.executorLog = "Job terminated due to pre-emption.\n"; break; case JOB_BOOT_FAIL: /* terminated due to node boot failure */ - executorLog << "Job failed to run due to failure of compute node to boot.\n"; + record.executorLog = "Job failed to run due to failure of compute node to boot.\n"; break; case JOB_DEADLINE: /* terminated on deadline */ - executorLog << "Job terminated due to deadline.\n"; + record.executorLog = "Job terminated due to deadline.\n"; break; case JOB_OOM: /* experienced out of memory error */ - executorLog << "Job terminated due to out-of-memory.\n"; + record.executorLog = "Job terminated due to out-of-memory.\n"; break; } - waiting = false; record.rc = jobInfo.exit_code; + slurm_free_job_info_msg(jobStatus); + + readAndClean(job.stdoutFile, record.outputLog); + readAndClean(job.stderrFile, record.errorLog); + + job.prom.set_value(std::move(record)); + resolvedJobs.insert(jobID); } - slurm_free_job_info_msg(jobStatus); - readAndClean(stdoutFile, record.outputLog); - readAndClean(stderrFile, record.errorLog); - record.executorLog = executorLog.str(); - } else { - executorLog << "Failed to poll for job: " - << slurm_strerror(error_code); + for (const auto &jobID : resolvedJobs) { + runningJobs_.extract(jobID); + } } + + std::this_thread::sleep_for(std::chrono::milliseconds(250)); } - - - return record; } } #endif diff --git a/tests/unit_executor_forkingexecutor.cpp b/tests/unit_executor_forkingexecutor.cpp index 98d80bc..5e6195d 100644 --- a/tests/unit_executor_forkingexecutor.cpp +++ b/tests/unit_executor_forkingexecutor.cpp @@ -16,7 +16,8 @@ TEST_CASE("forking_executor", "[forking_executor]") { REQUIRE(ex.validateTaskParameters(task.job)); - auto rec = ex.execute("command", task); + auto recFuture = ex.execute("command", task); + auto rec = recFuture.get(); REQUIRE(rec.rc == 0); REQUIRE(rec.outputLog.size() >= 6); @@ -27,7 +28,8 @@ TEST_CASE("forking_executor", "[forking_executor]") { daggy::Task task{.job{ {"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/expr", "1", "+", "+"}}}}; - auto rec = ex.execute("command", task); + auto recFuture = ex.execute("command", task); + auto rec = recFuture.get(); REQUIRE(rec.rc == 2); REQUIRE(rec.errorLog.size() >= 20); @@ -45,7 +47,8 @@ TEST_CASE("forking_executor", "[forking_executor]") { daggy::Task task{.job{ {"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/cat", bigFile}}}}; - auto rec = ex.execute("command", task); + auto recFuture = ex.execute("command", task); + auto rec = recFuture.get(); REQUIRE(rec.rc == 0); REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile)); @@ -80,4 +83,4 @@ TEST_CASE("forking_executor", "[forking_executor]") { REQUIRE(tasks.size() == 4); } -} \ No newline at end of file +} diff --git a/tests/unit_executor_slurmexecutor.cpp b/tests/unit_executor_slurmexecutor.cpp index c74f617..20151e8 100644 --- a/tests/unit_executor_slurmexecutor.cpp +++ b/tests/unit_executor_slurmexecutor.cpp @@ -15,7 +15,7 @@ namespace fs = std::filesystem; #ifdef DAGGY_ENABLE_SLURM TEST_CASE("slurm_execution", "[slurm_executor]") { - daggy::executors::task::SlurmTaskExecutor ex(10); + daggy::executors::task::SlurmTaskExecutor ex; daggy::ConfigValues defaultJobValues{ {"minCPUs", "1"}, @@ -37,7 +37,8 @@ TEST_CASE("slurm_execution", "[slurm_executor]") { REQUIRE(ex.validateTaskParameters(task.job)); - auto rec = ex.execute("command", task); + auto recFuture = ex.execute("command", task); + auto rec = recFuture.get(); REQUIRE(rec.rc == 0); REQUIRE(rec.outputLog.size() >= 6); @@ -49,7 +50,8 @@ TEST_CASE("slurm_execution", "[slurm_executor]") { {"command", daggy::executors::task::SlurmTaskExecutor::Command{"/usr/bin/expr", "1", "+", "+"}}}}; task.job.merge(defaultJobValues); - auto rec = ex.execute("command", task); + auto recFuture = ex.execute("command", task); + auto rec = recFuture.get(); REQUIRE(rec.rc != 0); REQUIRE(rec.errorLog.size() >= 20); @@ -68,7 +70,8 @@ TEST_CASE("slurm_execution", "[slurm_executor]") { {"command", daggy::executors::task::SlurmTaskExecutor::Command{"/usr/bin/cat", bigFile}}}}; task.job.merge(defaultJobValues); - auto rec = ex.execute("command", task); + auto recFuture = ex.execute("command", task); + auto rec = recFuture.get(); REQUIRE(rec.rc == 0); REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile)); diff --git a/tests/unit_threadpool.cpp b/tests/unit_threadpool.cpp index 9191a9a..8492af6 100644 --- a/tests/unit_threadpool.cpp +++ b/tests/unit_threadpool.cpp @@ -38,4 +38,25 @@ TEST_CASE("threadpool", "[threadpool]") { for (auto &r: res) r.get(); REQUIRE(cnt == 100); } + + SECTION("parallel") { + std::vector> res; + using namespace std::chrono_literals; + std::atomic maxCnt{0}; + for (size_t i = 0; i < 100; ++i) + res.push_back(tp.addTask([&cnt,&maxCnt, i]() { + auto delay = 20ms; + uint32_t current = cnt.fetch_add(1); + delay += i * 1ms; + std::this_thread::sleep_for(delay); + if (current > maxCnt) { + maxCnt = current; + } + cnt--; + return; + })); + for (auto &r: res) r.get(); + REQUIRE(maxCnt > 1); + } + } diff --git a/utils/daggyd/daggyd.cpp b/utils/daggyd/daggyd.cpp index e22d918..7e67b15 100644 --- a/utils/daggyd/daggyd.cpp +++ b/utils/daggyd/daggyd.cpp @@ -185,7 +185,7 @@ int main(int argc, char **argv) { } #ifdef DAGGY_ENABLE_SLURM - daggy::executors::task::SlurmTaskExecutor executor(executorThreads); + daggy::executors::task::SlurmTaskExecutor executor; #else daggy::executors::task::ForkingTaskExecutor executor(executorThreads); #endif