From 5af8deabec071de72d124858cdd2b654049113c4 Mon Sep 17 00:00:00 2001
From: Ian Roddis
Date: Fri, 28 Jan 2022 14:18:01 -0400
Subject: [PATCH] Make DaggyRunnerTaskExecutor block until a runner is available

---
 daggyr/libdaggyr/src/Server.cpp               |   3 +
 .../task/DaggyRunnerTaskExecutor.hpp          |  22 +--
 .../task/DaggyRunnerTaskExecutor.cpp          | 125 ++++++------------
 .../src/executors/task/SSHTaskExecutor.cpp    |   7 -
 4 files changed, 49 insertions(+), 108 deletions(-)

diff --git a/daggyr/libdaggyr/src/Server.cpp b/daggyr/libdaggyr/src/Server.cpp
index 869d2cb..9364c4b 100644
--- a/daggyr/libdaggyr/src/Server.cpp
+++ b/daggyr/libdaggyr/src/Server.cpp
@@ -142,6 +142,9 @@ namespace daggy::daggyr {
     auto runID = request.param(":runID").as();
     auto taskName = request.param(":taskName").as();
 
+    std::cout << "Received request for " << runID << " / " << taskName
+              << std::endl;
+
     Capacity resourcesUsed;
     Task task;
     try {
diff --git a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp
index 4859543..9da818a 100644
--- a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp
+++ b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp
@@ -57,7 +57,6 @@ namespace daggy::executors::task {
 
   private:
     void monitor();
-    void dispatchQueuedTasks();
 
     struct RunningTask
     {
@@ -68,31 +67,16 @@ namespace daggy::executors::task {
       daggy_runner::Capacity resources;
     };
 
-    struct QueuedTask
-    {
-      Task task;
-      RunningTask rt;
-    };
-
-    std::mutex queuedGuard_;
-    std::condition_variable queuedCV_;
-    std::deque<QueuedTask> queuedTasks_;
-
     // Resolves jobs through polling
    std::atomic<bool> running_;
     bool promptTask_;
     std::thread monitorWorker_;
-    std::thread dispatchWorker_;
 
-    struct RunnerCapacity
-    {
-      daggy_runner::Capacity current;
-      daggy_runner::Capacity total;
-    };
-    RunnerCapacity getRunnerCapacity(const std::string &runnerURL);
+    daggy_runner::Capacity getRunnerCapacity(const std::string &runnerURL);
 
     std::mutex runnersGuard_;
-    std::unordered_map<std::string, RunnerCapacity> runners_;
+    std::condition_variable runnersCV_;
+    std::unordered_map<std::string, daggy_runner::Capacity> runners_;
 
     std::mutex rtGuard_;
     std::unordered_map<std::pair<DAGRunID, std::string>, RunningTask>
diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp
index 7db6a00..e30129c 100644
--- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp
+++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp
@@ -80,7 +80,6 @@ DaggyRunnerTaskExecutor::DaggyRunnerTaskExecutor()
   : running_(true)
   , promptTask_(false)
   , monitorWorker_(&DaggyRunnerTaskExecutor::monitor, this)
-  , dispatchWorker_(&DaggyRunnerTaskExecutor::dispatchQueuedTasks, this)
 {
 }
 
@@ -88,7 +87,6 @@ DaggyRunnerTaskExecutor::~DaggyRunnerTaskExecutor()
 {
   running_ = false;
   monitorWorker_.join();
-  dispatchWorker_.join();
 }
 
 std::string DaggyRunnerTaskExecutor::description() const
@@ -153,79 +151,44 @@ TaskFuture DaggyRunnerTaskExecutor::execute(DAGRunID runID, const Task &task)
 {
   auto taskUsed = capacityFromTask(task);
-  QueuedTask qt{.task = task,
-                .rt{.fut = std::make_shared>(),
-                    .runID = runID,
-                    .taskName = taskName,
-                    .resources = taskUsed}};
-  auto fut = qt.rt.fut;
+
+  std::string exe_runner;
+  Capacity *exe_capacity;
+
+  // Block until a host is found
+  std::unique_lock lock(runnersGuard_);
+  // Wait for a host to be available
+  runnersCV_.wait(lock, [&] {
+    for (auto &[runner, capacity] : runners_) {
+      if (capacity.cores >= taskUsed.cores and
+          capacity.memoryMB >= taskUsed.memoryMB) {
+        exe_runner = runner;
+        exe_capacity = &capacity;
+        return true;
+      }
+    }
+    return false;
+  });
+  exe_capacity->cores -= taskUsed.cores;
+  exe_capacity->memoryMB -= taskUsed.memoryMB;
+
+  std::stringstream ss;
+  ss << exe_runner << "/v1/task/" << runID << "/" << taskName;
+  auto url = ss.str();
+
+  const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST");
+
+  RunningTask rt{.fut = std::make_shared>(),
+                 .runID = runID,
+                 .taskName = taskName,
+                 .resources = taskUsed};
+
+  auto fut = rt.fut;
   {
-    std::lock_guard lock(queuedGuard_);
-    queuedTasks_.emplace_back(std::move(qt));
-  }
-  promptTask_ = true;
-  queuedCV_.notify_one();
-  return fut;
-}
-
-void DaggyRunnerTaskExecutor::dispatchQueuedTasks()
-{
-  while (running_) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(50));
-    std::vector<std::string> runners;
-    std::optional<QueuedTask> oqt;
-    {
-      // Wait for either a new task, or an existing task to finish
-      std::unique_lock lock(queuedGuard_);
-      queuedCV_.wait(lock, [&] { return !running_ or !queuedTasks_.empty(); });
-      promptTask_ = false;
-      // Check to see if there's a worker available
-      if (queuedTasks_.empty())
-        continue;
-      const auto &fqt = queuedTasks_.front();
-      std::lock_guard rlock(runnersGuard_);
-      for (auto &[runner, caps] : runners_) {
-        if (caps.total.cores == 0) {
-          caps = getRunnerCapacity(runner);
-        }
-        if (fqt.rt.resources.cores <= caps.current.cores and
-            fqt.rt.resources.memoryMB <= caps.current.memoryMB) {
-          runners.push_back(runner);
-        }
-      }
-
-      if (runners.empty())
-        continue;
-
-      oqt.emplace(std::move(queuedTasks_.front()));
-      queuedTasks_.pop_front();
-    }
-
-    auto &qt = oqt.value();
-
-    for (const auto &runner : runners) {
-      std::stringstream ss;
-      ss << runner << "/v1/task/" << qt.rt.runID << "/" << qt.rt.taskName;
-      auto url = ss.str();
-
-      const auto response = HTTP_REQUEST(url, taskToJSON(qt.task), "POST");
-      if (response.code != HTTPCode::Ok) {
-        std::cout << response.code << " : " << response.body << std::endl;
-        continue;
-      }
-
-      // Subtract the capacity from the runner
-      std::lock_guard rlock(runnersGuard_);
-      auto &cur = runners_.at(runner).current;
-      cur.cores -= qt.rt.resources.cores;
-      cur.memoryMB -= qt.rt.resources.memoryMB;
-      break;
-    }
     std::lock_guard lock(rtGuard_);
-    runningTasks_.emplace(std::make_pair(qt.rt.runID, qt.rt.taskName),
-                          std::move(qt.rt));
+    runningTasks_.emplace(std::make_pair(runID, taskName), std::move(rt));
   }
+  return fut;
 }
 
 bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
@@ -233,18 +196,16 @@ bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
 {
   return true;
 }
 
-DaggyRunnerTaskExecutor::RunnerCapacity
-DaggyRunnerTaskExecutor::getRunnerCapacity(const std::string &runnerURL)
+daggy_runner::Capacity DaggyRunnerTaskExecutor::getRunnerCapacity(
+    const std::string &runnerURL)
 {
   // Try and get the capacity
   const auto &[code, doc] = JSON_HTTP_REQUEST(runnerURL + "/v1/capacity");
   if (code != HTTPCode::Ok) {
-    return RunnerCapacity{};
+    return Capacity{};
   }
-  return DaggyRunnerTaskExecutor::RunnerCapacity{
-      .current = capacityFromJSON(doc["current"]),
-      .total = capacityFromJSON(doc["total"])};
+  return capacityFromJSON(doc["total"]);
 }
 
 void DaggyRunnerTaskExecutor::addRunner(const std::string &url)
@@ -255,7 +216,7 @@
 
 void DaggyRunnerTaskExecutor::monitor()
 {
-  std::unordered_map<std::string, RunnerCapacity> runners;
+  std::unordered_map<std::string, Capacity> runners;
 
   while (running_) {
     std::this_thread::sleep_for(std::chrono::milliseconds(250));
@@ -305,14 +266,14 @@ void DaggyRunnerTaskExecutor::monitor()
                               task["taskName"].GetString());
           auto it = taskResources.find(tid);
           if (it != taskResources.end()) {
-            caps.current.cores += it->second.cores;
-            caps.current.memoryMB += it->second.memoryMB;
+            caps.cores += it->second.cores;
+            caps.memoryMB += it->second.memoryMB;
           }
 
           auto attempt = attemptRecordFromJSON(task["attempt"]);
           resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"]));
           promptTask_ = true;
-          queuedCV_.notify_one();
+          runnersCV_.notify_one();
         }
       }
     }
diff --git a/libdaggy/src/executors/task/SSHTaskExecutor.cpp b/libdaggy/src/executors/task/SSHTaskExecutor.cpp
index 56777db..d75f677 100644
--- a/libdaggy/src/executors/task/SSHTaskExecutor.cpp
+++ b/libdaggy/src/executors/task/SSHTaskExecutor.cpp
@@ -119,13 +119,6 @@ void SSHTaskExecutor::monitor()
       if (attempt.rc == 255) {
         --rt.sshRetries;
         if (rt.sshRetries > 0) {
-          /*
-          std::cout << "Resubmitting: " << rt.sshRetries;
-          for (const auto &i : std::get<std::vector<std::string>>(
-                   rt.task.job.at("command")))
-            std::cout << " " << i;
-          std::cout << std::endl;
-          */
           rt.feFuture = fe_.execute(rt.runID, rt.taskName, rt.task);
           continue;
         }
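
Reviewer note (not part of the patch): the new execute() path blocks on runnersCV_ until some registered runner has enough spare cores and memoryMB, subtracts the reservation while still holding runnersGuard_, and counts on the monitor loop to put the cores/memoryMB back and call runnersCV_.notify_one() once a task resolves. The self-contained sketch below shows the same acquire/release shape in isolation; RunnerPool, acquire(), release(), and the values used in main() are illustrative only and are not daggy APIs.

// Standalone sketch of the blocking-dispatch pattern used above, assuming a
// Capacity of {cores, memoryMB} as in the patch. RunnerPool, acquire() and
// release() are illustrative names, not part of the daggy codebase.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>

struct Capacity {
  int cores = 0;
  int memoryMB = 0;
};

class RunnerPool {
public:
  void addRunner(const std::string &url, Capacity total) {
    std::lock_guard lock(mutex_);
    runners_[url] = total;
    cv_.notify_all();  // a new runner may unblock waiting callers
  }

  // Block until some runner can hold `needed`, reserve it, return its URL.
  std::string acquire(const Capacity &needed) {
    std::unique_lock lock(mutex_);
    std::string chosen;
    cv_.wait(lock, [&] {
      for (auto &[url, cap] : runners_) {
        if (cap.cores >= needed.cores && cap.memoryMB >= needed.memoryMB) {
          chosen = url;
          return true;
        }
      }
      return false;
    });
    auto &cap = runners_[chosen];
    cap.cores -= needed.cores;  // reserve while still holding the lock
    cap.memoryMB -= needed.memoryMB;
    return chosen;
  }

  // Return the capacity (the monitor loop would do this when a task resolves).
  void release(const std::string &url, const Capacity &used) {
    {
      std::lock_guard lock(mutex_);
      auto &cap = runners_[url];
      cap.cores += used.cores;
      cap.memoryMB += used.memoryMB;
    }
    cv_.notify_one();  // wake one blocked dispatcher, as the patch does
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::unordered_map<std::string, Capacity> runners_;
};

int main() {
  RunnerPool pool;
  pool.addRunner("http://runner-a:8080", Capacity{2, 4096});

  // Two tasks that each need the whole runner: whichever acquires second
  // blocks until release() hands the capacity back.
  std::thread other([&] {
    auto url = pool.acquire(Capacity{2, 4096});
    std::cout << "task 1 dispatched to " << url << "\n";
    pool.release(url, Capacity{2, 4096});
  });
  auto url = pool.acquire(Capacity{2, 4096});
  std::cout << "task 2 dispatched to " << url << "\n";
  pool.release(url, Capacity{2, 4096});
  other.join();
  return 0;
}

One condition variable plus a predicate that rescans the runner map keeps the dispatcher thread-free, at the cost of re-scanning on every wake-up; that is the same trade-off the patch makes by removing dispatchQueuedTasks() and its worker thread.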