Adding in queuing in the daggyd process for more even distribution
This commit is contained in:
@@ -2,7 +2,8 @@
|
||||
|
||||
#include <rapidjson/document.h>
|
||||
|
||||
#include <random>
|
||||
#include <condition_variable>
|
||||
#include <deque>
|
||||
|
||||
#include "TaskExecutor.hpp"
|
||||
|
||||
@@ -56,6 +57,7 @@ namespace daggy::executors::task {
|
||||
|
||||
private:
|
||||
void monitor();
|
||||
void dispatchQueuedTasks();
|
||||
|
||||
struct RunningTask
|
||||
{
|
||||
@@ -63,19 +65,32 @@ namespace daggy::executors::task {
|
||||
DAGRunID runID;
|
||||
std::string taskName;
|
||||
std::string runnerURL;
|
||||
uint32_t retries;
|
||||
daggy_runner::Capacity resources;
|
||||
};
|
||||
|
||||
struct QueuedTask
|
||||
{
|
||||
Task task;
|
||||
RunningTask rt;
|
||||
};
|
||||
|
||||
std::mutex queuedGuard_;
|
||||
std::condition_variable queuedCV_;
|
||||
std::deque<QueuedTask> queuedTasks_;
|
||||
|
||||
// Resolves jobs through polling
|
||||
std::atomic<bool> running_;
|
||||
bool promptTask_;
|
||||
std::thread monitorWorker_;
|
||||
std::thread dispatchWorker_;
|
||||
|
||||
struct RunnerCapacity
|
||||
{
|
||||
daggy_runner::Capacity current;
|
||||
daggy_runner::Capacity total;
|
||||
};
|
||||
RunnerCapacity getRunnerCapacity(const std::string &runnerURL);
|
||||
|
||||
std::mutex runnersGuard_;
|
||||
std::unordered_map<std::string, RunnerCapacity> runners_;
|
||||
|
||||
|
||||
@@ -78,7 +78,9 @@ namespace daggy::executors::task::daggy_runner {
|
||||
|
||||
DaggyRunnerTaskExecutor::DaggyRunnerTaskExecutor()
|
||||
: running_(true)
|
||||
, promptTask_(false)
|
||||
, monitorWorker_(&DaggyRunnerTaskExecutor::monitor, this)
|
||||
, dispatchWorker_(&DaggyRunnerTaskExecutor::dispatchQueuedTasks, this)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -150,91 +152,79 @@ TaskFuture DaggyRunnerTaskExecutor::execute(DAGRunID runID,
|
||||
const Task &task)
|
||||
{
|
||||
auto taskUsed = capacityFromTask(task);
|
||||
|
||||
// Get the capacities for all the runners
|
||||
// Capacities for a runner can be negative, meaning that they're currently
|
||||
// oversubscribed.
|
||||
std::vector<std::pair<std::string, double>> impacts;
|
||||
QueuedTask qt{.task = task,
|
||||
.rt{.fut = std::make_shared<Future<AttemptRecord>>(),
|
||||
.runID = runID,
|
||||
.taskName = taskName,
|
||||
.resources = taskUsed}};
|
||||
auto fut = qt.rt.fut;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(runnersGuard_);
|
||||
for (auto &[runner, caps] : runners_) {
|
||||
const auto result = HTTP_REQUEST(runner + "/ready");
|
||||
if (result.code != 200) {
|
||||
std::lock_guard<std::mutex> lock(queuedGuard_);
|
||||
queuedTasks_.emplace_back(std::move(qt));
|
||||
}
|
||||
promptTask_ = true;
|
||||
queuedCV_.notify_one();
|
||||
return fut;
|
||||
}
|
||||
|
||||
void DaggyRunnerTaskExecutor::dispatchQueuedTasks()
|
||||
{
|
||||
while (running_) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
std::vector<std::string> runners;
|
||||
std::optional<QueuedTask> oqt;
|
||||
{
|
||||
// Wait for either a new task, or an existing task to finish
|
||||
std::unique_lock<std::mutex> lock(queuedGuard_);
|
||||
queuedCV_.wait(lock, [&] { return !running_ or !queuedTasks_.empty(); });
|
||||
promptTask_ = false;
|
||||
// Check to see if there's a worker available
|
||||
if (queuedTasks_.empty())
|
||||
continue;
|
||||
const auto &fqt = queuedTasks_.front();
|
||||
std::lock_guard<std::mutex> rlock(runnersGuard_);
|
||||
for (auto &[runner, caps] : runners_) {
|
||||
if (caps.total.cores == 0) {
|
||||
caps = getRunnerCapacity(runner);
|
||||
}
|
||||
if (fqt.rt.resources.cores <= caps.current.cores and
|
||||
fqt.rt.resources.memoryMB <= caps.current.memoryMB) {
|
||||
runners.push_back(runner);
|
||||
}
|
||||
}
|
||||
|
||||
if (runners.empty())
|
||||
continue;
|
||||
|
||||
oqt.emplace(std::move(queuedTasks_.front()));
|
||||
queuedTasks_.pop_front();
|
||||
}
|
||||
|
||||
auto &qt = oqt.value();
|
||||
|
||||
for (const auto &runner : runners) {
|
||||
std::stringstream ss;
|
||||
ss << runner << "/v1/task/" << qt.rt.runID << "/" << qt.rt.taskName;
|
||||
auto url = ss.str();
|
||||
|
||||
const auto response = HTTP_REQUEST(url, taskToJSON(qt.task), "POST");
|
||||
if (response.code != HTTPCode::Ok) {
|
||||
std::cout << response.code << " : " << response.body << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Set capacities if they haven't been discovered yet
|
||||
if (caps.total.cores == 0) {
|
||||
const auto &[code, json] = JSON_HTTP_REQUEST(runner + "/v1/capacity");
|
||||
if (code != HTTPCode::Ok) {
|
||||
std::cerr << "Runner " << runner
|
||||
<< " appears to be up, but cannot retrieve capacity";
|
||||
continue;
|
||||
}
|
||||
caps.current = capacityFromJSON(json["current"]);
|
||||
caps.total = capacityFromJSON(json["total"]);
|
||||
}
|
||||
|
||||
double cores = (caps.current.cores - taskUsed.cores);
|
||||
double memoryMB = (caps.current.memoryMB - taskUsed.memoryMB);
|
||||
|
||||
double impact =
|
||||
std::min(cores / caps.total.cores, memoryMB / caps.total.memoryMB);
|
||||
impacts.emplace_back(runner, impact);
|
||||
// Subtract the capacity from the runner
|
||||
std::lock_guard<std::mutex> rlock(runnersGuard_);
|
||||
auto &cur = runners_.at(runner).current;
|
||||
cur.cores -= qt.rt.resources.cores;
|
||||
cur.memoryMB -= qt.rt.resources.memoryMB;
|
||||
break;
|
||||
}
|
||||
|
||||
if (impacts.empty()) {
|
||||
auto fut = std::make_shared<Future<AttemptRecord>>();
|
||||
fut->set(AttemptRecord{
|
||||
.rc = -1, .executorLog = "No runners available for execution"});
|
||||
return fut;
|
||||
}
|
||||
std::lock_guard<std::mutex> lock(rtGuard_);
|
||||
runningTasks_.emplace(std::make_pair(qt.rt.runID, qt.rt.taskName),
|
||||
std::move(qt.rt));
|
||||
}
|
||||
|
||||
std::sort(impacts.begin(), impacts.end(),
|
||||
[](const auto &a, const auto &b) { return a.second > b.second; });
|
||||
|
||||
std::string submitted_runner;
|
||||
for (const auto &[runner, _] : impacts) {
|
||||
auto &caps = runners_.at(runner);
|
||||
caps.current.cores -= taskUsed.cores;
|
||||
caps.current.memoryMB -= taskUsed.memoryMB;
|
||||
|
||||
std::stringstream ss;
|
||||
ss << runner << "/v1/task/" << runID << "/" << taskName;
|
||||
auto url = ss.str();
|
||||
|
||||
const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST");
|
||||
if (response.code != HTTPCode::Ok) {
|
||||
std::cout << response.code << " : " << response.body << std::endl;
|
||||
continue;
|
||||
// throw std::runtime_error("Unable to submit task: " + response.body);
|
||||
}
|
||||
|
||||
submitted_runner = runner;
|
||||
break;
|
||||
}
|
||||
|
||||
if (submitted_runner.empty()) {
|
||||
auto fut = std::make_shared<Future<AttemptRecord>>();
|
||||
fut->set(AttemptRecord{
|
||||
.rc = -1, .executorLog = "No runners available for execution"});
|
||||
return fut;
|
||||
}
|
||||
|
||||
RunningTask rt{.fut = std::make_shared<Future<AttemptRecord>>(),
|
||||
.runID = runID,
|
||||
.taskName = taskName,
|
||||
.runnerURL = submitted_runner,
|
||||
.retries = 3,
|
||||
.resources = taskUsed};
|
||||
|
||||
TaskFuture fut = rt.fut;
|
||||
|
||||
std::lock_guard<std::mutex> lock(rtGuard_);
|
||||
runningTasks_.emplace(std::make_pair(runID, taskName), std::move(rt));
|
||||
|
||||
return fut;
|
||||
}
|
||||
|
||||
bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
|
||||
@@ -242,23 +232,24 @@ bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
|
||||
return true;
|
||||
}
|
||||
|
||||
void DaggyRunnerTaskExecutor::addRunner(const std::string &url)
|
||||
DaggyRunnerTaskExecutor::RunnerCapacity
|
||||
DaggyRunnerTaskExecutor::getRunnerCapacity(const std::string &runnerURL)
|
||||
{
|
||||
// Try and get the capacity
|
||||
const auto &[code, doc] = JSON_HTTP_REQUEST(url + "/v1/capacity");
|
||||
const auto &[code, doc] = JSON_HTTP_REQUEST(runnerURL + "/v1/capacity");
|
||||
if (code != HTTPCode::Ok) {
|
||||
std::cerr << "Failed to contact runner " << url << ": "
|
||||
<< doc["error"].GetString()
|
||||
<< ", will attempt to set capacities later" << std::endl;
|
||||
|
||||
runners_.emplace(url, RunnerCapacity{});
|
||||
return;
|
||||
return RunnerCapacity{};
|
||||
}
|
||||
|
||||
RunnerCapacity caps{.current = capacityFromJSON(doc["current"]),
|
||||
.total = capacityFromJSON(doc["total"])};
|
||||
return DaggyRunnerTaskExecutor::RunnerCapacity{
|
||||
.current = capacityFromJSON(doc["current"]),
|
||||
.total = capacityFromJSON(doc["total"])};
|
||||
}
|
||||
|
||||
void DaggyRunnerTaskExecutor::addRunner(const std::string &url)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(runnersGuard_);
|
||||
runners_.emplace(url, caps);
|
||||
runners_.emplace(url, getRunnerCapacity(url));
|
||||
}
|
||||
|
||||
void DaggyRunnerTaskExecutor::monitor()
|
||||
@@ -266,7 +257,7 @@ void DaggyRunnerTaskExecutor::monitor()
|
||||
std::unordered_map<std::string, RunnerCapacity> runners;
|
||||
|
||||
while (running_) {
|
||||
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(250));
|
||||
std::unordered_map<std::pair<DAGRunID, std::string>,
|
||||
std::optional<AttemptRecord>>
|
||||
resolvedJobs;
|
||||
@@ -284,44 +275,44 @@ void DaggyRunnerTaskExecutor::monitor()
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(runnersGuard_);
|
||||
runners = runners_;
|
||||
}
|
||||
for (auto &[runnerURL, caps] : runners_) {
|
||||
rj::Document doc;
|
||||
try {
|
||||
auto [code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll");
|
||||
if (code != HTTPCode::Ok) {
|
||||
std::cout << "Unable to poll: " << code << ": " << dumpJSON(json)
|
||||
<< std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
for (auto &[runnerURL, caps] : runners) {
|
||||
rj::Document doc;
|
||||
try {
|
||||
auto [code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll");
|
||||
if (code != HTTPCode::Ok) {
|
||||
std::cout << "Unable to poll: " << code << ": " << dumpJSON(json)
|
||||
<< std::endl;
|
||||
doc.Swap(json);
|
||||
}
|
||||
catch (std::exception &e) {
|
||||
std::cout << "Unable to poll: " << e.what() << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
doc.Swap(json);
|
||||
}
|
||||
catch (std::exception &e) {
|
||||
std::cout << "Unable to poll: " << e.what() << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!doc.IsArray()) {
|
||||
std::cout << "Got nonsense from poll: " << dumpJSON(doc) << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto tasks = doc.GetArray();
|
||||
for (size_t idx = 0; idx < tasks.Size(); ++idx) {
|
||||
const auto &task = tasks[idx];
|
||||
auto tid = std::make_pair(task["runID"].GetInt64(),
|
||||
task["taskName"].GetString());
|
||||
auto it = taskResources.find(tid);
|
||||
if (it != taskResources.end()) {
|
||||
caps.current.cores += it->second.cores;
|
||||
caps.current.memoryMB += it->second.memoryMB;
|
||||
if (!doc.IsArray()) {
|
||||
std::cout << "Got nonsense from poll: " << dumpJSON(doc) << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
auto attempt = attemptRecordFromJSON(task["attempt"]);
|
||||
resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"]));
|
||||
const auto tasks = doc.GetArray();
|
||||
for (size_t idx = 0; idx < tasks.Size(); ++idx) {
|
||||
const auto &task = tasks[idx];
|
||||
auto tid = std::make_pair(task["runID"].GetInt64(),
|
||||
task["taskName"].GetString());
|
||||
auto it = taskResources.find(tid);
|
||||
if (it != taskResources.end()) {
|
||||
caps.current.cores += it->second.cores;
|
||||
caps.current.memoryMB += it->second.memoryMB;
|
||||
}
|
||||
|
||||
auto attempt = attemptRecordFromJSON(task["attempt"]);
|
||||
resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"]));
|
||||
promptTask_ = true;
|
||||
queuedCV_.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user