diff --git a/daggyd/libdaggyd/src/Server.cpp b/daggyd/libdaggyd/src/Server.cpp index 23a930b..d57f526 100644 --- a/daggyd/libdaggyd/src/Server.cpp +++ b/daggyd/libdaggyd/src/Server.cpp @@ -197,7 +197,7 @@ namespace daggy::daggyd { queueDAG_(runID, dag, dagSpec.taskConfig); response.send(Pistache::Http::Code::Ok, - R"({"runID": )" + std::to_string(runID) + "}"); + R"({"runID": )" + std::to_string(runID) + "}\n"); } void Server::handleValidateDAG(const Pistache::Rest::Request &request, @@ -205,12 +205,13 @@ namespace daggy::daggyd { { try { dagFromJSON(request.body()); - response.send(Pistache::Http::Code::Ok, R"({"valid": true})"); + response.send(Pistache::Http::Code::Ok, R"({"valid": true}\n)"); } catch (std::exception &e) { std::string error = e.what(); - response.send(Pistache::Http::Code::Ok, - std::string{R"({"valid": true, "error": })"} + error + "}"); + response.send( + Pistache::Http::Code::Ok, + std::string{R"({"valid": true, "error": })"} + error + "}\n"); } } @@ -275,7 +276,7 @@ namespace daggy::daggyd { ss << '}' // end of taskCounts << '}'; // end of item } - ss << ']'; + ss << "]\n"; } else { // HTML @@ -311,7 +312,7 @@ namespace daggy::daggyd { } ss << ""; } - ss << ""; + ss << "\n"; } response.send(Pistache::Http::Code::Ok, ss.str()); } @@ -388,7 +389,7 @@ namespace daggy::daggyd { ss << stateUpdateRecordToJSON(change); } ss << "]"; - ss << '}'; + ss << "}\n"; } else { std::unordered_map stateCounts; @@ -591,7 +592,7 @@ namespace daggy::daggyd { << attempt.executorLog << ""; } - ss << ""; + ss << "\n"; } response.send(Pistache::Http::Code::Ok, ss.str()); } @@ -610,7 +611,7 @@ namespace daggy::daggyd { std::stringstream ss; ss << R"({ "runID": )" << runID << R"(, "taskName": )" << std::quoted(taskName) << R"(, "state": )" - << std::quoted(state._to_string()) << '}'; + << std::quoted(state._to_string()) << "}\n"; response.send(Pistache::Http::Code::Ok, ss.str()); } catch (std::exception &e) { @@ -642,7 +643,7 @@ namespace daggy::daggyd { void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { - response.send(Pistache::Http::Code::Ok, R"({ "msg": "Ya like DAGs?"})"); + response.send(Pistache::Http::Code::Ok, R"({ "msg": "Ya like DAGs?"}\n)"); } /* diff --git a/daggyr/libdaggyr/include/daggyr/Server.hpp b/daggyr/libdaggyr/include/daggyr/Server.hpp index e7b95ef..4457a52 100644 --- a/daggyr/libdaggyr/include/daggyr/Server.hpp +++ b/daggyr/libdaggyr/include/daggyr/Server.hpp @@ -9,8 +9,8 @@ #include #include #include +#include #include -#include #define DAGGY_REST_HANDLER(func) \ void func(const Pistache::Rest::Request &request, \ @@ -58,6 +58,8 @@ namespace daggy::daggyr { executors::task::ForkingTaskExecutor executor_; + using TaskID = std::pair; + struct TaskRecord { RunState state; @@ -68,17 +70,21 @@ namespace daggy::daggyr { Capacity maxCapacity_; Capacity curCapacity_; - std::mutex pendingGuard_; - struct PendingJob { - DAGRunID runID; - std::string taskName; - std::future fut; + daggy::executors::task::TaskFuture fut; Capacity resourcesUsed; bool resolved; }; - std::list pending_; + void monitor(); + std::atomic running_; + std::thread monitorWorker_; + + std::mutex pendingGuard_; + std::unordered_map pending_; + + std::mutex resolvedGuard_; + std::deque resolved_; }; } // namespace daggy::daggyr diff --git a/daggyr/libdaggyr/src/Server.cpp b/daggyr/libdaggyr/src/Server.cpp index f0ee0df..c41808f 100644 --- a/daggyr/libdaggyr/src/Server.cpp +++ b/daggyr/libdaggyr/src/Server.cpp @@ -38,6 +38,8 @@ namespace daggy::daggyr { , executor_(maxCores) , maxCapacity_{maxCores, maxMemoryMB} , curCapacity_{maxCores, maxMemoryMB} + , running_(true) + , monitorWorker_(&Server::monitor, this) { } @@ -63,6 +65,8 @@ namespace daggy::daggyr { void Server::shutdown() { endpoint_.shutdown(); + running_ = false; + monitorWorker_.join(); } uint16_t Server::getPort() const @@ -154,63 +158,92 @@ namespace daggy::daggyr { { std::lock_guard lock(pendingGuard_); - pending_.push_back( - PendingJob{.runID = runID, - .taskName = taskName, - .fut = executor_.execute(runID, taskName, task), - .resourcesUsed = resourcesUsed}); + pending_.emplace(std::make_pair(runID, taskName), + PendingJob{ + .fut = executor_.execute(runID, taskName, task), + .resourcesUsed = resourcesUsed, + }); } response.send(Pistache::Http::Code::Ok, ""); } + void Server::monitor() + { + std::unordered_map resolved; + while (running_) { + resolved.clear(); + std::vector resolvedIDs; + { + std::lock_guard lock(pendingGuard_); + for (const auto &[tid, job] : pending_) { + if (job.fut->ready()) { + resolved.emplace(tid, job.fut->get()); + resolvedIDs.push_back(tid); + } + } + + for (const auto &tid : resolvedIDs) { + pending_.extract(tid); + } + } + + std::unordered_map payloads; + for (const auto &[tid, attempt] : resolved) { + std::stringstream ss; + ss << R"({ "runID": )" << tid.first << R"(, "taskName": )" + << std::quoted(tid.second) << ", " + << R"("state": "COMPLETED", "attempt":)" + << attemptRecordToJSON(attempt) << "}"; + payloads.emplace(tid, ss.str()); + } + + { + std::lock_guard lock(resolvedGuard_); + for (const auto &[_, item] : payloads) + resolved_.push_back(item); + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + } + void Server::handlePollTasks(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; + auto ss = Clock::now(); std::stringstream payload; payload << "["; bool first = true; - - // Check to see if it's pending - std::lock_guard lock(pendingGuard_); - auto it = pending_.begin(); - while (it != pending_.end()) { - if (first) { - first = false; - } - else { - payload << ", "; - } - - payload << R"({ "runID": )" << it->runID << R"(, "taskName": )" - << std::quoted(it->taskName) << ", "; - - // poll it - if (it->fut.valid() and - it->fut.wait_for(1ms) == std::future_status::ready) { - auto attempt = it->fut.get(); - - payload << R"("state": "COMPLETED", "attempt":)" - << attemptRecordToJSON(attempt); - { - std::lock_guard rlock(capacityGuard_); - curCapacity_.cores += it->resourcesUsed.cores; - curCapacity_.memoryMB += it->resourcesUsed.memoryMB; + size_t cnt = 0; + { + std::lock_guard lock(resolvedGuard_); + cnt = resolved_.size(); + for (const auto &item : resolved_) { + if (first) { + first = false; } - it = pending_.erase(it); + else { + payload << ", "; + } + payload << item; } - else { - payload << R"("state": "PENDING")"; - ++it; - } - payload << "}"; + resolved_.clear(); } payload << "]"; - response.send(Pistache::Http::Code::Ok, payload.str()); + auto payloadStr = payload.str(); + response.send(Pistache::Http::Code::Ok, payloadStr); + auto ee = Clock::now(); + + std::cout + << "Completed request: with " << cnt << " updates in" + << " total (" + << std::chrono::duration_cast(ee - ss).count() + << " ns)\n"; } void Server::handleStopTask(const Pistache::Rest::Request &request, diff --git a/libdaggy/include/daggy/DAGRunner.hpp b/libdaggy/include/daggy/DAGRunner.hpp index f821de2..caaa8ab 100644 --- a/libdaggy/include/daggy/DAGRunner.hpp +++ b/libdaggy/include/daggy/DAGRunner.hpp @@ -47,7 +47,8 @@ namespace daggy { ssize_t nRunningTasks_; ssize_t nErroredTasks_; - std::unordered_map> runningTasks_; + std::unordered_map + runningTasks_; std::unordered_map taskAttemptCounts_; std::mutex runGuard_; diff --git a/libdaggy/include/daggy/Defines.hpp b/libdaggy/include/daggy/Defines.hpp index 4e7e1ee..942eb7b 100644 --- a/libdaggy/include/daggy/Defines.hpp +++ b/libdaggy/include/daggy/Defines.hpp @@ -9,6 +9,8 @@ #include #include +#include "Future.hpp" + namespace daggy { // Commands and parameters using ConfigValue = std::variant>; @@ -72,6 +74,7 @@ namespace daggy { std::string outputLog; // stdout from command std::string errorLog; // stderr from command }; + } // namespace daggy BETTER_ENUMS_DECLARE_STD_HASH(daggy::RunState) diff --git a/libdaggy/include/daggy/Future.hpp b/libdaggy/include/daggy/Future.hpp new file mode 100644 index 0000000..771e536 --- /dev/null +++ b/libdaggy/include/daggy/Future.hpp @@ -0,0 +1,113 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace daggy { + + enum class FutureState : uint8_t + { + NOT_READY, + OK, + ERROR, + }; + + template + class Future + { + public: + Future() + : state_{FutureState::NOT_READY} + , val_(std::nullopt) + { + } + + FutureState state() + { + return state_; + } + + void set(const T val) + { + if (val_) { + std::cout << "Future already has a value!" << std::endl; + throw std::runtime_error("Future already has a value"); + } + val_.emplace(val); + state_ = FutureState::OK; + } + + bool ready() const + { + return state_.load() != FutureState::NOT_READY; + } + + void setException(const std::exception &e) + { + exp_ = e; + state_ = FutureState::ERROR; + } + + T get() + { + while (!ready()) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + if (state_ == FutureState::ERROR) + throw exp_; + return *val_; + } + + private: + std::atomic state_; + std::optional val_; + std::exception exp_; + }; + + template <> + struct Future + { + public: + Future() + : state_{FutureState::NOT_READY} + { + } + + FutureState state() + { + return state_; + } + + bool ready() const + { + return state_ != FutureState::NOT_READY; + } + + void set() + { + state_ = FutureState::OK; + } + + void setException(const std::exception &e) + { + exp_ = e; + state_ = FutureState::ERROR; + } + + void get() + { + if (state_ == FutureState::NOT_READY) + throw std::runtime_error("Value is not ready"); + if (state_ == FutureState::ERROR) + throw exp_; + } + + private: + std::atomic state_; + std::exception exp_; + }; +} // namespace daggy diff --git a/libdaggy/include/daggy/ThreadPool.hpp b/libdaggy/include/daggy/ThreadPool.hpp index 5774d10..f829ac2 100644 --- a/libdaggy/include/daggy/ThreadPool.hpp +++ b/libdaggy/include/daggy/ThreadPool.hpp @@ -3,17 +3,17 @@ #include #include #include -#include -#include -#include #include #include #include #include +#include "Future.hpp" + using namespace std::chrono_literals; namespace daggy { + class ThreadPool { public: @@ -65,7 +65,7 @@ namespace daggy { for (size_t i = 0; i < nWorkers; ++i) workers_.emplace_back([&] { - std::packaged_task task; + std::function task; while (true) { { std::unique_lock lock(mtx_); @@ -88,15 +88,30 @@ namespace daggy { { if (drain_) throw std::runtime_error("Unable to add task to draining pool"); + using return_type = std::invoke_result_t; - std::packaged_task task( - std::bind(std::forward(f), std::forward(args)...)); + auto callable = + std::bind(std::forward(f), std::forward(args)...); + auto res = std::make_shared>(); - std::future res = task.get_future(); { std::lock_guard guard(mtx_); - tasks_.emplace(std::move(task)); + tasks_.emplace([res, task = std::move(callable)]() -> void { + try { + if constexpr ((std::is_same::value)) { + task(); + res->set(); + } + else { + return_type val = task(); + res->set(val); + } + } + catch (std::exception &e) { + res->setException(e); + } + }); } cv_.notify_one(); return res; @@ -117,7 +132,7 @@ namespace daggy { // need to keep track of threads, so we can join them std::vector workers_; // the task queue - std::queue> tasks_; + std::queue> tasks_; // synchronization std::mutex mtx_; diff --git a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp index 1a9d268..70ec0b8 100644 --- a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp @@ -45,9 +45,8 @@ namespace daggy::executors::task { const ConfigValues &job, const ConfigValues &expansionValues) override; // Runs the task - std::future execute(DAGRunID runID, - const std::string &taskName, - const Task &task) override; + TaskFuture execute(DAGRunID runID, const std::string &taskName, + const Task &task) override; bool stop(DAGRunID runID, const std::string &taskName) override; @@ -60,7 +59,7 @@ namespace daggy::executors::task { struct RunningTask { - std::promise prom; + TaskFuture fut; DAGRunID runID; std::string taskName; std::string runnerURL; diff --git a/libdaggy/include/daggy/executors/task/ForkingTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/ForkingTaskExecutor.hpp index f5b1be5..29b996f 100644 --- a/libdaggy/include/daggy/executors/task/ForkingTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/ForkingTaskExecutor.hpp @@ -25,9 +25,8 @@ namespace daggy::executors::task { const ConfigValues &job, const ConfigValues &expansionValues) override; // Runs the task - std::future execute(DAGRunID runID, - const std::string &taskName, - const Task &task) override; + TaskFuture execute(DAGRunID runID, const std::string &taskName, + const Task &task) override; bool stop(DAGRunID runID, const std::string &taskName) override; diff --git a/libdaggy/include/daggy/executors/task/NoopTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/NoopTaskExecutor.hpp index fa104ab..b4c3bf2 100644 --- a/libdaggy/include/daggy/executors/task/NoopTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/NoopTaskExecutor.hpp @@ -16,12 +16,11 @@ namespace daggy::executors::task { const ConfigValues &job, const ConfigValues &expansionValues) override; // Runs the task - std::future execute(DAGRunID runID, - const std::string &taskName, - const Task &task) override; + TaskFuture execute(DAGRunID runID, const std::string &taskName, + const Task &task) override; bool stop(DAGRunID runID, const std::string &taskName) override; - std::string description() const; + std::string description() const override; }; } // namespace daggy::executors::task diff --git a/libdaggy/include/daggy/executors/task/SlurmTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/SlurmTaskExecutor.hpp index 1f2751a..07b105c 100644 --- a/libdaggy/include/daggy/executors/task/SlurmTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/SlurmTaskExecutor.hpp @@ -19,9 +19,8 @@ namespace daggy::executors::task { const ConfigValues &job, const ConfigValues &expansionValues) override; // Runs the task - std::future execute(DAGRunID runID, - const std::string &taskName, - const Task &task) override; + TaskFuture execute(DAGRunID runID, const std::string &taskName, + const Task &task) override; bool stop(DAGRunID runID, const std::string &taskName) override; @@ -30,7 +29,7 @@ namespace daggy::executors::task { private: struct Job { - std::promise prom; + TaskFuture fut; std::string stdoutFile; std::string stderrFile; DAGRunID runID; diff --git a/libdaggy/include/daggy/executors/task/TaskExecutor.hpp b/libdaggy/include/daggy/executors/task/TaskExecutor.hpp index 69165f5..9db7dae 100644 --- a/libdaggy/include/daggy/executors/task/TaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/TaskExecutor.hpp @@ -13,6 +13,8 @@ */ namespace daggy::executors::task { + using TaskFuture = std::shared_ptr>; + class TaskExecutor { public: @@ -27,9 +29,8 @@ namespace daggy::executors::task { const ConfigValues &job, const ConfigValues &expansionValues) = 0; // Blocking execution of a task - virtual std::future execute(DAGRunID runID, - const std::string &taskName, - const Task &task) = 0; + virtual TaskFuture execute(DAGRunID runID, const std::string &taskName, + const Task &task) = 0; // Kill a currently executing task. This will resolve the future. virtual bool stop(DAGRunID runID, const std::string &taskName) = 0; diff --git a/libdaggy/src/DAGRunner.cpp b/libdaggy/src/DAGRunner.cpp index 342d17e..deed5f4 100644 --- a/libdaggy/src/DAGRunner.cpp +++ b/libdaggy/src/DAGRunner.cpp @@ -102,12 +102,12 @@ namespace daggy { while (t.has_value()) { // Schedule the task to run auto &taskName = t.value().first; - auto &task = t.value().second; taskAttemptCounts_[taskName] = 1; logger_.updateTaskState(runID_, taskName, RunState::RUNNING); try { - auto fut = executor_.execute(runID_, taskName, task); + auto &task = t.value().second; + auto fut = executor_.execute(runID_, taskName, task); runningTasks_.emplace(taskName, std::move(fut)); } catch (std::exception &e) { @@ -125,8 +125,8 @@ namespace daggy { void DAGRunner::collectFinished() { for (auto &[taskName, fut] : runningTasks_) { - if (fut.valid() and fut.wait_for(1ms) == std::future_status::ready) { - auto attempt = fut.get(); + if (fut->ready()) { + auto attempt = fut->get(); logger_.logTaskAttempt(runID_, taskName, attempt); // Not a reference, since adding tasks will invalidate references diff --git a/libdaggy/src/Utilities.cpp b/libdaggy/src/Utilities.cpp index 0958cae..7e2a349 100644 --- a/libdaggy/src/Utilities.cpp +++ b/libdaggy/src/Utilities.cpp @@ -234,7 +234,7 @@ namespace daggy { curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2); + // curl_easy_setopt(curl, CURLOPT_TIMEOUT, 30); if (trace) { curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace); diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp index c7c3413..17479c8 100644 --- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp +++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp @@ -145,8 +145,9 @@ std::vector DaggyRunnerTaskExecutor::expandTaskParameters( } // Runs the task -std::future DaggyRunnerTaskExecutor::execute( - DAGRunID runID, const std::string &taskName, const Task &task) +TaskFuture DaggyRunnerTaskExecutor::execute(DAGRunID runID, + const std::string &taskName, + const Task &task) { auto taskUsed = capacityFromTask(task); @@ -183,11 +184,9 @@ std::future DaggyRunnerTaskExecutor::execute( } if (impacts.empty()) { - std::promise prom; - auto fut = prom.get_future(); - AttemptRecord record{.rc = -1, - .executorLog = "No runners available for execution"}; - prom.set_value(std::move(record)); + auto fut = std::make_shared>(); + fut->set(AttemptRecord{ + .rc = -1, .executorLog = "No runners available for execution"}); return fut; } } @@ -217,22 +216,20 @@ std::future DaggyRunnerTaskExecutor::execute( } if (submitted_runner.empty()) { - std::promise prom; - auto fut = prom.get_future(); - AttemptRecord record{.rc = -1, - .executorLog = "No runners available for execution"}; - prom.set_value(std::move(record)); + auto fut = std::make_shared>(); + fut->set(AttemptRecord{ + .rc = -1, .executorLog = "No runners available for execution"}); return fut; } - RunningTask rt{.prom{}, + RunningTask rt{.fut = std::make_shared>(), .runID = runID, .taskName = taskName, .runnerURL = submitted_runner, .retries = 3, .resources = taskUsed}; - auto fut = rt.prom.get_future(); + TaskFuture fut = rt.fut; std::lock_guard lock(rtGuard_); runningTasks_.emplace(std::make_pair(runID, taskName), std::move(rt)); @@ -293,34 +290,34 @@ void DaggyRunnerTaskExecutor::monitor() rj::Document doc; try { auto [code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll"); - if (code != HTTPCode::Ok) + if (code != HTTPCode::Ok) { + std::cout << "Unable to poll: " << code << ": " << dumpJSON(json) + << std::endl; continue; + } + doc.Swap(json); } catch (std::exception &e) { + std::cout << "Unable to poll: " << e.what() << std::endl; continue; } + std::cout << "Doc is now: " << doc.Size() << std::endl; const auto tasks = doc.GetArray(); for (size_t idx = 0; idx < tasks.Size(); ++idx) { const auto &task = tasks[idx]; auto tid = std::make_pair(task["runID"].GetInt64(), task["taskName"].GetString()); - - if (task["state"] == "PENDING") { - resolvedJobs.emplace(tid, std::nullopt); + auto it = taskResources.find(tid); + if (it != taskResources.end()) { + const auto &res = taskResources.at(tid); + caps.current.cores += res.cores; + caps.current.memoryMB += res.memoryMB; } - else { - auto it = taskResources.find(tid); - if (it != taskResources.end()) { - const auto &res = taskResources.at(tid); - caps.current.cores += res.cores; - caps.current.memoryMB += res.memoryMB; - } - auto attempt = attemptRecordFromJSON(task["attempt"]); - resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"])); - } + auto attempt = attemptRecordFromJSON(task["attempt"]); + resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"])); } } @@ -329,20 +326,11 @@ void DaggyRunnerTaskExecutor::monitor() std::lock_guard lock(rtGuard_); for (auto &[taskID, task] : runningTasks_) { auto it = resolvedJobs.find(taskID); - if (it == resolvedJobs.end()) { - --task.retries; - - if (task.retries == 0) { - AttemptRecord record{ - .rc = -1, .executorLog = "Unable to query runner for progress"}; - task.prom.set_value(std::move(record)); - completedTasks.emplace_back(taskID); - } + if (it == resolvedJobs.end()) continue; - } - else if (it->second.has_value()) { + if (it->second.has_value()) { // Task has completed - task.prom.set_value(it->second.value()); + task.fut->set(std::move(it->second.value())); completedTasks.emplace_back(taskID); } } @@ -351,6 +339,6 @@ void DaggyRunnerTaskExecutor::monitor() } } - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::seconds(10)); } } diff --git a/libdaggy/src/executors/task/ForkingTaskExecutor.cpp b/libdaggy/src/executors/task/ForkingTaskExecutor.cpp index 0182b13..469693b 100644 --- a/libdaggy/src/executors/task/ForkingTaskExecutor.cpp +++ b/libdaggy/src/executors/task/ForkingTaskExecutor.cpp @@ -90,8 +90,9 @@ bool ForkingTaskExecutor::stop(DAGRunID runID, const std::string &taskName) return true; } -std::future ForkingTaskExecutor::execute( - DAGRunID runID, const std::string &taskName, const Task &task) +TaskFuture ForkingTaskExecutor::execute(DAGRunID runID, + const std::string &taskName, + const Task &task) { std::string key = std::to_string(runID) + "_" + taskName; std::lock_guard lock(taskControlsGuard_); diff --git a/libdaggy/src/executors/task/NoopTaskExecutor.cpp b/libdaggy/src/executors/task/NoopTaskExecutor.cpp index 08e1514..f4ac61b 100644 --- a/libdaggy/src/executors/task/NoopTaskExecutor.cpp +++ b/libdaggy/src/executors/task/NoopTaskExecutor.cpp @@ -8,18 +8,20 @@ namespace daggy::executors::task { return "NoopTaskExecutor"; } - std::future NoopTaskExecutor::execute( - DAGRunID runID, const std::string &taskName, const Task &task) + TaskFuture NoopTaskExecutor::execute(DAGRunID runID, + const std::string &taskName, + const Task &task) { std::promise promise; - auto ts = Clock::now(); - promise.set_value(AttemptRecord{.startTime = ts, - .stopTime = ts, - .rc = 0, - .executorLog = taskName, - .outputLog = taskName, - .errorLog = taskName}); - return promise.get_future(); + auto ts = Clock::now(); + auto fut = std::make_shared>(); + fut->set(AttemptRecord{.startTime = ts, + .stopTime = ts, + .rc = 0, + .executorLog = taskName, + .outputLog = taskName, + .errorLog = taskName}); + return fut; } bool NoopTaskExecutor::validateTaskParameters(const ConfigValues &job) diff --git a/libdaggy/src/executors/task/SlurmTaskExecutor.cpp b/libdaggy/src/executors/task/SlurmTaskExecutor.cpp index b792df2..094d9c2 100644 --- a/libdaggy/src/executors/task/SlurmTaskExecutor.cpp +++ b/libdaggy/src/executors/task/SlurmTaskExecutor.cpp @@ -87,8 +87,7 @@ namespace daggy::executors::task { // Resolve the remaining futures std::lock_guard lock(promiseGuard_); for (auto &[jobID, job] : runningJobs_) { - job.prom.set_value( - AttemptRecord{.rc = -1, .executorLog = "executor killed"}); + job.fut->set(AttemptRecord{.rc = -1, .executorLog = "executor killed"}); } runningJobs_.clear(); } @@ -153,8 +152,9 @@ namespace daggy::executors::task { return newValues; } - std::future SlurmTaskExecutor::execute( - DAGRunID runID, const std::string &taskName, const Task &task) + TaskFuture SlurmTaskExecutor::execute(DAGRunID runID, + const std::string &taskName, + const Task &task) { std::stringstream executorLog; @@ -247,12 +247,12 @@ namespace daggy::executors::task { slurm_free_submit_response_response_msg(resp_msg); std::lock_guard lock(promiseGuard_); - Job newJob{.prom{}, + Job newJob{.fut = std::make_shared>(), .stdoutFile = stdoutFile, .stderrFile = stderrFile, .runID = runID, .taskName = taskName}; - auto fut = newJob.prom.get_future(); + auto fut = newJob.fut; runningJobs_.emplace(jobID, std::move(newJob)); return fut; @@ -348,7 +348,7 @@ namespace daggy::executors::task { readAndClean(job.stdoutFile, record.outputLog); readAndClean(job.stderrFile, record.errorLog); - job.prom.set_value(std::move(record)); + job.fut->set(std::move(record)); resolvedJobs.insert(jobID); } diff --git a/libdaggy/tests/unit_executor_forkingexecutor.cpp b/libdaggy/tests/unit_executor_forkingexecutor.cpp index 9738d3a..bcc82ee 100644 --- a/libdaggy/tests/unit_executor_forkingexecutor.cpp +++ b/libdaggy/tests/unit_executor_forkingexecutor.cpp @@ -23,7 +23,7 @@ TEST_CASE("forking_executor", "[forking_executor]") REQUIRE(ex.validateTaskParameters(task.job)); auto recFuture = ex.execute(0, "command", task); - auto rec = recFuture.get(); + auto rec = recFuture->get(); REQUIRE(rec.rc == 0); REQUIRE(rec.outputLog.size() >= 6); @@ -37,7 +37,7 @@ TEST_CASE("forking_executor", "[forking_executor]") REQUIRE(ex.validateTaskParameters(task.job)); auto recFuture = ex.execute(0, "command", task); - auto rec = recFuture.get(); + auto rec = recFuture->get(); REQUIRE(rec.rc == 0); REQUIRE(rec.outputLog.size() >= 6); @@ -71,7 +71,7 @@ TEST_CASE("forking_executor", "[forking_executor]") REQUIRE(ex.validateTaskParameters(task.job)); auto recFuture = ex.execute(0, "command", task); - auto rec = recFuture.get(); + auto rec = recFuture->get(); REQUIRE(rec.rc == 0); REQUIRE(rec.outputLog.size() >= 6); @@ -89,7 +89,7 @@ TEST_CASE("forking_executor", "[forking_executor]") "/usr/bin/expr", "1", "+", "+"}}}}; auto recFuture = ex.execute(0, "command", task); - auto rec = recFuture.get(); + auto rec = recFuture->get(); REQUIRE(rec.rc == 2); REQUIRE(rec.errorLog.size() >= 20); @@ -106,7 +106,7 @@ TEST_CASE("forking_executor", "[forking_executor]") auto recFuture = ex.execute(0, "command", task); std::this_thread::sleep_for(1s); ex.stop(0, "command"); - auto rec = recFuture.get(); + auto rec = recFuture->get(); auto stop = daggy::Clock::now(); REQUIRE(rec.rc == 9); @@ -133,7 +133,7 @@ TEST_CASE("forking_executor", "[forking_executor]") "/usr/bin/cat", bigFile}}}}; auto recFuture = ex.execute(0, "command", task); - auto rec = recFuture.get(); + auto rec = recFuture->get(); REQUIRE(rec.rc == 0); REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile)); diff --git a/libdaggy/tests/unit_executor_noopexecutor.cpp b/libdaggy/tests/unit_executor_noopexecutor.cpp index 220fb08..767669c 100644 --- a/libdaggy/tests/unit_executor_noopexecutor.cpp +++ b/libdaggy/tests/unit_executor_noopexecutor.cpp @@ -23,7 +23,7 @@ TEST_CASE("noop_executor", "[noop_executor]") REQUIRE(ex.validateTaskParameters(task.job)); auto recFuture = ex.execute(0, "command", task); - auto rec = recFuture.get(); + auto rec = recFuture->get(); REQUIRE(rec.rc == 0); } diff --git a/libdaggy/tests/unit_threadpool.cpp b/libdaggy/tests/unit_threadpool.cpp index 852ba98..a150fde 100644 --- a/libdaggy/tests/unit_threadpool.cpp +++ b/libdaggy/tests/unit_threadpool.cpp @@ -11,24 +11,24 @@ TEST_CASE("threadpool", "[threadpool]") std::atomic cnt(0); ThreadPool tp(10); - std::vector> rets; + std::vector>> rets; SECTION("Adding large tasks queues with return values") { - std::vector> res; + std::vector>> res; for (size_t i = 0; i < 100; ++i) res.emplace_back(tp.addTask([&cnt]() { cnt++; return cnt.load(); })); for (auto &r : res) - r.get(); + r->get(); REQUIRE(cnt == 100); } SECTION("Slow runs") { - std::vector> res; + std::vector>> res; using namespace std::chrono_literals; for (size_t i = 0; i < 100; ++i) res.push_back(tp.addTask([&cnt]() { @@ -37,7 +37,7 @@ TEST_CASE("threadpool", "[threadpool]") return; })); for (auto &r : res) - r.get(); + r->get(); REQUIRE(cnt == 100); } }