From 779d6adaea1c1b46a7ee015c5b7db55b4d821372 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Fri, 24 Dec 2021 10:21:19 -0400 Subject: [PATCH] Moving to a poll method for workers, and daggyd-preserved capacities --- daggyr/daggyr/daggyr.cpp | 6 +- daggyr/libdaggyr/include/daggyr/Server.hpp | 12 +- daggyr/libdaggyr/src/Server.cpp | 100 +++++----- libdaggy/include/daggy/Utilities.hpp | 3 +- .../task/DaggyRunnerTaskExecutor.hpp | 10 +- libdaggy/src/Utilities.cpp | 20 +- .../task/DaggyRunnerTaskExecutor.cpp | 176 ++++++++++++------ 7 files changed, 199 insertions(+), 128 deletions(-) diff --git a/daggyr/daggyr/daggyr.cpp b/daggyr/daggyr/daggyr.cpp index 3770d21..de04de7 100644 --- a/daggyr/daggyr/daggyr.cpp +++ b/daggyr/daggyr/daggyr.cpp @@ -119,7 +119,7 @@ int main(int argc, char **argv) args.add_argument("-d", "--daemon").default_value(false).implicit_value(true); args.add_argument("--config").default_value(std::string{}); args.add_argument("--ip").default_value(std::string{"127.0.0.1"}); - args.add_argument("--port").default_value(int{2504}); + args.add_argument("--port").default_value(2504u); try { args.parse_args(argc, argv); @@ -138,7 +138,7 @@ int main(int argc, char **argv) bool asDaemon = args.get("--daemon"); auto configFile = args.get("--config"); std::string listenIP = args.get("--ip"); - int listenPort = args.get("--port"); + int listenPort = args.get("--port"); size_t webThreads = 50; ssize_t maxCores = std::max(1U, std::thread::hardware_concurrency() - 2); ssize_t maxMemoryMB = @@ -164,7 +164,7 @@ int main(int argc, char **argv) if (co.HasMember("cores")) maxCores = co["cores"].GetInt64(); if (co.HasMember("memoryMB")) - maxCores = co["memoryMB"].GetInt64(); + maxMemoryMB = co["memoryMB"].GetInt64(); } } diff --git a/daggyr/libdaggyr/include/daggyr/Server.hpp b/daggyr/libdaggyr/include/daggyr/Server.hpp index 6cd335a..e7b95ef 100644 --- a/daggyr/libdaggyr/include/daggyr/Server.hpp +++ b/daggyr/libdaggyr/include/daggyr/Server.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #define DAGGY_REST_HANDLER(func) \ void func(const Pistache::Rest::Request &request, \ @@ -47,7 +48,7 @@ namespace daggy::daggyr { DAGGY_REST_HANDLER(handleReady); DAGGY_REST_HANDLER(handleGetCapacity); DAGGY_REST_HANDLER(handleRunTask); - DAGGY_REST_HANDLER(handleGetTask); + DAGGY_REST_HANDLER(handlePollTasks); DAGGY_REST_HANDLER(handleStopTask); DAGGY_REST_HANDLER(handleValidateTask); @@ -71,14 +72,13 @@ namespace daggy::daggyr { struct PendingJob { + DAGRunID runID; + std::string taskName; std::future fut; Capacity resourcesUsed; + bool resolved; }; - std::unordered_map, PendingJob> pending_; - - std::mutex resultsGuard_; - std::unordered_map, AttemptRecord> - results_; + std::list pending_; }; } // namespace daggy::daggyr diff --git a/daggyr/libdaggyr/src/Server.cpp b/daggyr/libdaggyr/src/Server.cpp index e167775..99d8c60 100644 --- a/daggyr/libdaggyr/src/Server.cpp +++ b/daggyr/libdaggyr/src/Server.cpp @@ -96,11 +96,12 @@ namespace daggy::daggyr { .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Run a task"); - versionPath.route(desc_.get("/task/:runID/:taskName")) - .bind(&Server::handleGetTask, this) + versionPath.route(desc_.get("/poll")) + .bind(&Server::handlePollTasks, this) .produces(MIME(Application, Json)) - .response(Http::Code::Ok, - "Get the state and potentially the AttemptRecord of a task"); + .response( + Http::Code::Ok, + "Poll all running tasks, getting completed attempts and state"); versionPath.route(desc_.del("/task/:runID/:taskName")) .bind(&Server::handleStopTask, this) @@ -153,70 +154,65 @@ namespace daggy::daggyr { { std::lock_guard lock(pendingGuard_); - pending_.emplace( - std::make_pair(runID, taskName), - PendingJob{.fut = executor_.execute(runID, taskName, task), + pending_.push_back( + PendingJob{.runID = runID, + .taskName = taskName, + .fut = executor_.execute(runID, taskName, task), .resourcesUsed = resourcesUsed}); } + std::cout << "Enqueuing " << runID << " / " << taskName << std::endl; + response.send(Pistache::Http::Code::Ok, ""); } - void Server::handleGetTask(const Pistache::Rest::Request &request, - Pistache::Http::ResponseWriter response) + void Server::handlePollTasks(const Pistache::Rest::Request &request, + Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; - auto runID = request.param(":runID").as(); - auto taskName = request.param(":taskName").as(); - - auto taskID = std::make_pair(runID, taskName); - - std::string payload; + std::stringstream payload; + payload << "["; + bool first = true; // Check to see if it's pending - bool found = false; - { - std::lock_guard lock(pendingGuard_); - auto it = pending_.find(taskID); - if (it != pending_.end()) { - // poll it - if (it->second.fut.valid() and - it->second.fut.wait_for(1ms) == std::future_status::ready) { - auto attempt = it->second.fut.get(); - { - std::lock_guard rlock(resultsGuard_); - results_.emplace(taskID, attempt); - } - { - std::lock_guard rlock(capacityGuard_); - curCapacity_.cores += it->second.resourcesUsed.cores; - curCapacity_.memoryMB += it->second.resourcesUsed.memoryMB; - } - std::cout << "Resolved " << it->first.first << " / " - << it->first.second << std::endl; - pending_.extract(it); - } - else { - payload = R"({ "state": "RUNNING" })"; - found = true; - } + std::lock_guard lock(pendingGuard_); + auto it = pending_.begin(); + while (it != pending_.end()) { + if (first) { + first = false; } - } - - if (!found) { - std::lock_guard lock(resultsGuard_); - auto it = results_.find(taskID); - if (it == results_.end()) { - REQ_RESPONSE(Not_Found, "No such task"); + else { + payload << ", "; } - payload = R"({ "state": "COMPLETED", "attempt": )" + - attemptRecordToJSON(it->second) + "}"; - } + payload << R"({ "runID": )" << it->runID << R"(, "taskName": )" + << std::quoted(it->taskName) << ", "; - response.send(Pistache::Http::Code::Ok, payload); + // poll it + if (it->fut.valid() and + it->fut.wait_for(1ms) == std::future_status::ready) { + auto attempt = it->fut.get(); + + payload << R"("state": "COMPLETED", "attempt":)" + << attemptRecordToJSON(attempt); + { + std::lock_guard rlock(capacityGuard_); + curCapacity_.cores += it->resourcesUsed.cores; + curCapacity_.memoryMB += it->resourcesUsed.memoryMB; + } + std::cout << "Resolved " << it->runID << " / " << it->taskName + << std::endl; + } + else { + payload << R"("state": "PENDING")"; + } + payload << "}"; + } + payload << "]"; + + response.send(Pistache::Http::Code::Ok, payload.str()); } void Server::handleStopTask(const Pistache::Rest::Request &request, diff --git a/libdaggy/include/daggy/Utilities.hpp b/libdaggy/include/daggy/Utilities.hpp index f3447dd..47a4af2 100644 --- a/libdaggy/include/daggy/Utilities.hpp +++ b/libdaggy/include/daggy/Utilities.hpp @@ -49,7 +49,8 @@ namespace daggy { { Ok = 200, Not_Found = 404, - Not_Acceptable = 406 + Not_Acceptable = 406, + Server_Error = 500 }; struct HTTPResponse diff --git a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp index 5bd8aba..453e447 100644 --- a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp @@ -58,13 +58,21 @@ namespace daggy::executors::task { std::string taskName; std::string runnerURL; uint32_t retries; + daggy_runner::Capacity resources; }; // Resolves jobs through polling std::atomic running_; std::thread monitorWorker_; - std::unordered_set runners_; + struct RunnerCapacity + { + daggy_runner::Capacity current; + daggy_runner::Capacity total; + }; + std::mutex runnersGuard_; + std::unordered_map runners_; + std::mutex rtGuard_; std::unordered_map, RunningTask> runningTasks_; diff --git a/libdaggy/src/Utilities.cpp b/libdaggy/src/Utilities.cpp index 52b10a9..bba4668 100644 --- a/libdaggy/src/Utilities.cpp +++ b/libdaggy/src/Utilities.cpp @@ -234,7 +234,7 @@ namespace daggy { curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10); if (trace) { curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace); @@ -254,8 +254,9 @@ namespace daggy { if (res != CURLE_OK) { curl_easy_cleanup(curl); - throw std::runtime_error(std::string{"CURL Failed: "} + - curl_easy_strerror(res)); + response.code = HTTPCode::Server_Error; + response.body = std::string{"CURL Failed: "} + curl_easy_strerror(res); + return response; } curl_easy_cleanup(curl); @@ -275,7 +276,18 @@ namespace daggy { auto response = HTTP_REQUEST(url, payload, method); rj::Document doc; - checkRJParse(doc.Parse(response.body.c_str())); + if (response.code == HTTPCode::Server_Error) { + doc.SetObject(); + auto &alloc = doc.GetAllocator(); + doc.AddMember("error", + rj::Value().SetString(response.body.c_str(), + response.body.size(), alloc), + alloc); + } + else { + checkRJParse(doc.Parse(response.body.c_str())); + } + return std::make_pair(response.code, std::move(doc)); } diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp index 3ba314f..fcaf048 100644 --- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp +++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp @@ -135,44 +135,45 @@ std::future DaggyRunnerTaskExecutor::execute( // Capacities for a runner can be negative, meaning that they're currently // oversubscribed. std::vector> impacts; + std::string runner; - for (const auto &runner : runners_) { - try { - const auto &[code, doc] = JSON_HTTP_REQUEST(runner + "/v1/capacity"); - if (code != HTTPCode::Ok) { + { + std::lock_guard lock(runnersGuard_); + for (const auto &[runner, caps] : runners_) { + const auto result = HTTP_REQUEST(runner + "/ready"); + if (result.code != 200) continue; - } - auto curCap = capacityFromJSON(doc["current"]); - auto totCap = capacityFromJSON(doc["total"]); - - double cores = (curCap.cores - taskUsed.cores); - double memoryMB = (curCap.memoryMB - taskUsed.memoryMB); + double cores = (caps.current.cores - taskUsed.cores); + double memoryMB = (caps.current.memoryMB - taskUsed.memoryMB); double impact = - std::min(cores / totCap.cores, memoryMB / totCap.memoryMB); + std::min(cores / caps.total.cores, memoryMB / caps.total.memoryMB); + std::cout << runner << ": " << impact << std::endl; impacts.emplace_back(runner, impact); } - catch (const std::exception &_) { - continue; + + if (impacts.empty()) { + std::promise prom; + auto fut = prom.get_future(); + AttemptRecord record{.rc = -1, + .executorLog = "No runners available for execution"}; + prom.set_value(std::move(record)); + return fut; } + + std::sort(impacts.begin(), impacts.end()); + + runner = impacts.back().first; + auto &caps = runners_.at(runner); + caps.current.cores -= taskUsed.cores; + caps.current.memoryMB -= taskUsed.memoryMB; } - if (impacts.empty()) { - std::promise prom; - auto fut = prom.get_future(); - AttemptRecord record{.rc = -1, - .executorLog = "No runners available for execution"}; - prom.set_value(std::move(record)); - return fut; - } - - std::sort(impacts.begin(), impacts.end()); - - auto runner = impacts.back(); + std::cout << "Queuing on runner: " << runner << std::endl; std::stringstream ss; - ss << runner.first << "/v1/task/" << runID << "/" << taskName; + ss << runner << "/v1/task/" << runID << "/" << taskName; auto url = ss.str(); const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST"); @@ -182,8 +183,9 @@ std::future DaggyRunnerTaskExecutor::execute( RunningTask rt{.prom{}, .runID = runID, .taskName = taskName, - .runnerURL = runner.first, - .retries = 3}; + .runnerURL = runner, + .retries = 3, + .resources = taskUsed}; auto fut = rt.prom.get_future(); @@ -200,51 +202,103 @@ bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName) void DaggyRunnerTaskExecutor::addRunner(const std::string &url) { - runners_.insert(url); + // Try and get the capacity + const auto &[code, doc] = JSON_HTTP_REQUEST(url + "/v1/capacity"); + if (code != HTTPCode::Ok) { + std::cerr << "Failed to add runner " << url << ": " + << doc["error"].GetString() << std::endl; + return; + } + RunnerCapacity caps{.current = capacityFromJSON(doc["current"]), + .total = capacityFromJSON(doc["total"])}; + std::lock_guard lock(runnersGuard_); + runners_.emplace(url, caps); } void DaggyRunnerTaskExecutor::monitor() { while (running_) { { - std::vector> resolvedJobs; + std::unordered_map, + std::optional> + resolvedJobs; + + std::unordered_map, Capacity> + taskResources; { std::lock_guard lock(rtGuard_); - for (auto &[taskID, task] : runningTasks_) { - try { - const auto &[code, json] = JSON_HTTP_REQUEST( - task.runnerURL + "/v1/task/" + std::to_string(taskID.first) + - "/" + taskID.second); - if (code != HTTPCode::Ok) { - --task.retries; - - if (task.retries == 0) { - AttemptRecord record{ - .rc = -1, - .executorLog = "Unable to query runner for progress"}; - task.prom.set_value(std::move(record)); - resolvedJobs.emplace_back(taskID); - } - continue; - } - - if (json["state"] == "COMPLETED") { - auto attempt = attemptRecordFromJSON(json["attempt"]); - task.prom.set_value(std::move(attempt)); - resolvedJobs.emplace_back(taskID); - } - } - catch (std::runtime_error &e) { - continue; - } - } - for (const auto &tid : resolvedJobs) { - runningTasks_.extract(tid); + for (const auto &[tid, info] : runningTasks_) { + taskResources.emplace(tid, info.resources); } } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); + { + std::lock_guard lock(runnersGuard_); + for (auto &[runnerURL, caps] : runners_) { + try { + const auto &[code, json] = + JSON_HTTP_REQUEST(runnerURL + "/v1/poll"); + if (code != HTTPCode::Ok) + continue; + + const auto tasks = json.GetArray(); + for (size_t idx = 0; idx < tasks.Size(); ++idx) { + const auto &task = tasks[idx]; + if (task["state"] == "PENDING") { + resolvedJobs.emplace( + std::make_pair(task["runID"].GetInt64(), + task["taskName"].GetString()), + std::nullopt); + } + else { + auto tid = std::make_pair(task["runID"].GetInt64(), + task["taskName"].GetString()); + const auto &res = taskResources.at(tid); + caps.current.cores += res.cores; + caps.current.memoryMB += res.memoryMB; + + resolvedJobs.emplace(tid, + attemptRecordFromJSON(task["attempt"])); + } + } + } + catch (std::exception &e) { + std::cout << "Curl timeout failed for runner " << runnerURL << ": " + << e.what() << std::endl; + } + } + } + + std::vector> completedTasks; + { + std::lock_guard lock(rtGuard_); + for (auto &[taskID, task] : runningTasks_) { + auto it = resolvedJobs.find(taskID); + if (it == resolvedJobs.end()) { + --task.retries; + + if (task.retries == 0) { + AttemptRecord record{ + .rc = -1, + .executorLog = "Unable to query runner for progress"}; + task.prom.set_value(std::move(record)); + completedTasks.emplace_back(taskID); + } + continue; + } + else if (it->second.has_value()) { + // Task has completed + task.prom.set_value(it->second.value()); + completedTasks.emplace_back(taskID); + } + } + } + for (const auto &tid : completedTasks) { + runningTasks_.extract(tid); + } } + + std::this_thread::sleep_for(std::chrono::seconds(1)); } }