diff --git a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp
index 70ec0b8..4859543 100644
--- a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp
+++ b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp
@@ -2,7 +2,8 @@
 
 #include 
 
-#include 
+#include 
+#include 
 
 #include "TaskExecutor.hpp"
 
@@ -56,6 +57,7 @@ namespace daggy::executors::task {
 
 private:
   void monitor();
+  void dispatchQueuedTasks();
 
   struct RunningTask
   {
@@ -63,19 +65,32 @@ namespace daggy::executors::task {
     DAGRunID runID;
     std::string taskName;
     std::string runnerURL;
-    uint32_t retries;
     daggy_runner::Capacity resources;
   };
 
+  struct QueuedTask
+  {
+    Task task;
+    RunningTask rt;
+  };
+
+  std::mutex queuedGuard_;
+  std::condition_variable queuedCV_;
+  std::deque<QueuedTask> queuedTasks_;
+
   // Resolves jobs through polling
   std::atomic<bool> running_;
+  bool promptTask_;
   std::thread monitorWorker_;
+  std::thread dispatchWorker_;
 
   struct RunnerCapacity
   {
     daggy_runner::Capacity current;
     daggy_runner::Capacity total;
   };
 
+  RunnerCapacity getRunnerCapacity(const std::string &runnerURL);
+
   std::mutex runnersGuard_;
   std::unordered_map<std::string, RunnerCapacity> runners_;
diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp
index 22d16d1..a830264 100644
--- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp
+++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp
@@ -78,7 +78,9 @@ namespace daggy::executors::task::daggy_runner {
 
 DaggyRunnerTaskExecutor::DaggyRunnerTaskExecutor()
   : running_(true)
+  , promptTask_(false)
   , monitorWorker_(&DaggyRunnerTaskExecutor::monitor, this)
+  , dispatchWorker_(&DaggyRunnerTaskExecutor::dispatchQueuedTasks, this)
 {
 }
 
@@ -150,91 +152,79 @@ TaskFuture DaggyRunnerTaskExecutor::execute(DAGRunID runID, const Task &task)
 {
   auto taskUsed = capacityFromTask(task);
-
-  // Get the capacities for all the runners
-  // Capacities for a runner can be negative, meaning that they're currently
-  // oversubscribed.
-  std::vector<std::pair<std::string, double>> impacts;
+  QueuedTask qt{.task = task,
+                .rt{.fut = std::make_shared>(),
+                    .runID = runID,
+                    .taskName = taskName,
+                    .resources = taskUsed}};
+  auto fut = qt.rt.fut;
   {
-    std::lock_guard lock(runnersGuard_);
-    for (auto &[runner, caps] : runners_) {
-      const auto result = HTTP_REQUEST(runner + "/ready");
-      if (result.code != 200) {
+    std::lock_guard lock(queuedGuard_);
+    queuedTasks_.emplace_back(std::move(qt));
+  }
+  promptTask_ = true;
+  queuedCV_.notify_one();
+  return fut;
+}
+
+void DaggyRunnerTaskExecutor::dispatchQueuedTasks()
+{
+  while (running_) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    std::vector<std::string> runners;
+    std::optional<QueuedTask> oqt;
+    {
+      // Wait for either a new task, or an existing task to finish
+      std::unique_lock lock(queuedGuard_);
+      queuedCV_.wait(lock, [&] { return !running_ or !queuedTasks_.empty(); });
+      promptTask_ = false;
+      // Check to see if there's a worker available
+      if (queuedTasks_.empty())
+        continue;
+      const auto &fqt = queuedTasks_.front();
+      std::lock_guard rlock(runnersGuard_);
+      for (auto &[runner, caps] : runners_) {
+        if (caps.total.cores == 0) {
+          caps = getRunnerCapacity(runner);
+        }
+        if (fqt.rt.resources.cores <= caps.current.cores and
+            fqt.rt.resources.memoryMB <= caps.current.memoryMB) {
+          runners.push_back(runner);
+        }
+      }
+
+      if (runners.empty())
+        continue;
+
+      oqt.emplace(std::move(queuedTasks_.front()));
+      queuedTasks_.pop_front();
+    }
+
+    auto &qt = oqt.value();
+
+    for (const auto &runner : runners) {
+      std::stringstream ss;
+      ss << runner << "/v1/task/" << qt.rt.runID << "/" << qt.rt.taskName;
+      auto url = ss.str();
+
+      const auto response = HTTP_REQUEST(url, taskToJSON(qt.task), "POST");
+      if (response.code != HTTPCode::Ok) {
+        std::cout << response.code << " : " << response.body << std::endl;
         continue;
       }
-      // Set capacities if they haven't been discovered yet
-      if (caps.total.cores == 0) {
-        const auto &[code, json] = JSON_HTTP_REQUEST(runner + "/v1/capacity");
-        if (code != HTTPCode::Ok) {
-          std::cerr << "Runner " << runner
-                    << " appears to be up, but cannot retrieve capacity";
-          continue;
-        }
-        caps.current = capacityFromJSON(json["current"]);
-        caps.total = capacityFromJSON(json["total"]);
-      }
-
-      double cores = (caps.current.cores - taskUsed.cores);
-      double memoryMB = (caps.current.memoryMB - taskUsed.memoryMB);
-
-      double impact =
-          std::min(cores / caps.total.cores, memoryMB / caps.total.memoryMB);
-      impacts.emplace_back(runner, impact);
+      // Subtract the capacity from the runner
+      std::lock_guard rlock(runnersGuard_);
+      auto &cur = runners_.at(runner).current;
+      cur.cores -= qt.rt.resources.cores;
+      cur.memoryMB -= qt.rt.resources.memoryMB;
+      break;
     }
-    if (impacts.empty()) {
-      auto fut = std::make_shared>();
-      fut->set(AttemptRecord{
-          .rc = -1, .executorLog = "No runners available for execution"});
-      return fut;
-    }
+    std::lock_guard lock(rtGuard_);
+    runningTasks_.emplace(std::make_pair(qt.rt.runID, qt.rt.taskName),
+                          std::move(qt.rt));
   }
-
-  std::sort(impacts.begin(), impacts.end(),
-            [](const auto &a, const auto &b) { return a.second > b.second; });
-
-  std::string submitted_runner;
-  for (const auto &[runner, _] : impacts) {
-    auto &caps = runners_.at(runner);
-    caps.current.cores -= taskUsed.cores;
-    caps.current.memoryMB -= taskUsed.memoryMB;
-
-    std::stringstream ss;
-    ss << runner << "/v1/task/" << runID << "/" << taskName;
-    auto url = ss.str();
-
-    const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST");
-    if (response.code != HTTPCode::Ok) {
-      std::cout << response.code << " : " << response.body << std::endl;
-      continue;
-      // throw std::runtime_error("Unable to submit task: " + response.body);
-    }
-
-    submitted_runner = runner;
-    break;
-  }
-
-  if (submitted_runner.empty()) {
-    auto fut = std::make_shared>();
-    fut->set(AttemptRecord{
-        .rc = -1, .executorLog = "No runners available for execution"});
-    return fut;
-  }
-
-  RunningTask rt{.fut = std::make_shared>(),
-                 .runID = runID,
-                 .taskName = taskName,
-                 .runnerURL = submitted_runner,
-                 .retries = 3,
-                 .resources = taskUsed};
-
-  TaskFuture fut = rt.fut;
-
-  std::lock_guard lock(rtGuard_);
-  runningTasks_.emplace(std::make_pair(runID, taskName), std::move(rt));
-
-  return fut;
 }
 
 bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
@@ -242,23 +232,24 @@ bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
 {
   return true;
 }
 
-void DaggyRunnerTaskExecutor::addRunner(const std::string &url)
+DaggyRunnerTaskExecutor::RunnerCapacity
+DaggyRunnerTaskExecutor::getRunnerCapacity(const std::string &runnerURL)
 {
   // Try and get the capacity
-  const auto &[code, doc] = JSON_HTTP_REQUEST(url + "/v1/capacity");
+  const auto &[code, doc] = JSON_HTTP_REQUEST(runnerURL + "/v1/capacity");
   if (code != HTTPCode::Ok) {
-    std::cerr << "Failed to contact runner " << url << ": "
-              << doc["error"].GetString()
-              << ", will attempt to set capacities later" << std::endl;
-
-    runners_.emplace(url, RunnerCapacity{});
-    return;
+    return RunnerCapacity{};
   }
-  RunnerCapacity caps{.current = capacityFromJSON(doc["current"]),
-                      .total = capacityFromJSON(doc["total"])};
+  return DaggyRunnerTaskExecutor::RunnerCapacity{
+      .current = capacityFromJSON(doc["current"]),
+      .total = capacityFromJSON(doc["total"])};
+}
+
+void DaggyRunnerTaskExecutor::addRunner(const std::string &url)
+{
   std::lock_guard lock(runnersGuard_);
-  runners_.emplace(url, caps);
+  runners_.emplace(url, getRunnerCapacity(url));
 }
 
 void DaggyRunnerTaskExecutor::monitor()
@@ -266,7 +257,7 @@
 {
   std::unordered_map<std::string, RunnerCapacity> runners;
 
   while (running_) {
-    std::this_thread::sleep_for(std::chrono::seconds(2));
+    std::this_thread::sleep_for(std::chrono::milliseconds(250));
 
     std::unordered_map<std::pair<DAGRunID, std::string>, std::optional<AttemptRecord>> resolvedJobs;
@@ -284,44 +275,44 @@
     {
       std::lock_guard lock(runnersGuard_);
-      runners = runners_;
-    }
+      for (auto &[runnerURL, caps] : runners_) {
+        rj::Document doc;
+        try {
+          auto [code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll");
+          if (code != HTTPCode::Ok) {
+            std::cout << "Unable to poll: " << code << ": " << dumpJSON(json)
+                      << std::endl;
+            continue;
+          }
 
-    for (auto &[runnerURL, caps] : runners) {
-      rj::Document doc;
-      try {
-        auto [code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll");
-        if (code != HTTPCode::Ok) {
-          std::cout << "Unable to poll: " << code << ": " << dumpJSON(json)
-                    << std::endl;
+          doc.Swap(json);
+        }
+        catch (std::exception &e) {
+          std::cout << "Unable to poll: " << e.what() << std::endl;
          continue;
         }
-        doc.Swap(json);
-      }
-      catch (std::exception &e) {
-        std::cout << "Unable to poll: " << e.what() << std::endl;
-        continue;
-      }
-
-      if (!doc.IsArray()) {
-        std::cout << "Got nonsense from poll: " << dumpJSON(doc) << std::endl;
-        continue;
-      }
-
-      const auto tasks = doc.GetArray();
-      for (size_t idx = 0; idx < tasks.Size(); ++idx) {
-        const auto &task = tasks[idx];
-        auto tid = std::make_pair(task["runID"].GetInt64(),
-                                  task["taskName"].GetString());
-        auto it = taskResources.find(tid);
-        if (it != taskResources.end()) {
-          caps.current.cores += it->second.cores;
-          caps.current.memoryMB += it->second.memoryMB;
+        if (!doc.IsArray()) {
+          std::cout << "Got nonsense from poll: " << dumpJSON(doc) << std::endl;
+          continue;
         }
 
-        auto attempt = attemptRecordFromJSON(task["attempt"]);
-        resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"]));
+        const auto tasks = doc.GetArray();
+        for (size_t idx = 0; idx < tasks.Size(); ++idx) {
+          const auto &task = tasks[idx];
+          auto tid = std::make_pair(task["runID"].GetInt64(),
+                                    task["taskName"].GetString());
+          auto it = taskResources.find(tid);
+          if (it != taskResources.end()) {
+            caps.current.cores += it->second.cores;
+            caps.current.memoryMB += it->second.memoryMB;
+          }
+
+          auto attempt = attemptRecordFromJSON(task["attempt"]);
+          resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"]));
+          promptTask_ = true;
+          queuedCV_.notify_one();
+        }
       }
     }
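
The core of this patch is a producer/consumer handoff: execute() now only enqueues a QueuedTask under a mutex and notifies a condition variable, while a dedicated dispatchWorker_ thread pops tasks and submits them to a runner with spare capacity. The following standalone program is a minimal sketch of that pattern (std::mutex + std::condition_variable + std::deque plus a dispatch thread), assuming nothing about the daggy codebase; the names SketchTask, SketchDispatcher, submit, and freeCores_ are illustrative only and not part of libdaggy, and the HTTP submission is replaced by a print statement.

#include <chrono>
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

struct SketchTask {
  std::string name;
  double cores;
};

class SketchDispatcher {
public:
  SketchDispatcher() : worker_(&SketchDispatcher::loop, this) {}

  ~SketchDispatcher()
  {
    {
      std::lock_guard lock(guard_);
      running_ = false;
    }
    cv_.notify_one();
    worker_.join();
  }

  // Producer side (analogous to execute()): enqueue under the lock,
  // then wake the dispatcher thread.
  void submit(SketchTask t)
  {
    {
      std::lock_guard lock(guard_);
      queued_.push_back(std::move(t));
    }
    cv_.notify_one();
  }

private:
  // Consumer side (analogous to dispatchQueuedTasks()): wait until there is
  // queued work or shutdown, dispatch only when capacity allows.
  void loop()
  {
    std::unique_lock lock(guard_);
    while (running_) {
      cv_.wait(lock, [&] { return !running_ || !queued_.empty(); });
      if (!running_)
        break;
      if (queued_.front().cores > freeCores_) {
        // Head-of-line task does not fit yet; block until the next
        // notification (a completion would release capacity and notify).
        cv_.wait(lock);
        continue;
      }
      SketchTask t = std::move(queued_.front());
      queued_.pop_front();
      freeCores_ -= t.cores;  // reserve capacity, as the patch does per runner
      lock.unlock();          // do the (slow) dispatch outside the lock
      std::cout << "dispatching " << t.name << ", free cores left: "
                << freeCores_ << std::endl;
      lock.lock();
    }
  }

  double freeCores_ = 8.0;
  bool running_ = true;
  std::mutex guard_;
  std::condition_variable cv_;
  std::deque<SketchTask> queued_;
  std::thread worker_;
};

int main()
{
  SketchDispatcher dispatcher;
  dispatcher.submit({"taskA", 2.0});
  dispatcher.submit({"taskB", 4.0});
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  return 0;
}

The sketch keeps the slow submission step outside the lock so callers of submit() never block behind it, which mirrors the intent of moving runner submission off the caller's thread and into dispatchQueuedTasks() in this change.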