From 57e93b5045d0764daa70d5bcbc75a7f9a7da40c8 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Wed, 2 Feb 2022 21:12:05 -0400 Subject: [PATCH] Simplifying daggyr server, and returning to a task submit / task poll model. Squashed commit of the following: commit 0ef57f095d15f0402915de54f83c1671120bd228 Author: Ian Roddis Date: Wed Feb 2 08:18:03 2022 -0400 Simplifying task polling and reducing lock scopes commit d77ef02021cc728849c7d1fb0185dd1a861b4a3d Author: Ian Roddis Date: Wed Feb 2 08:02:47 2022 -0400 Simplifying check commit c1acf34440162abb890a959f3685c2d184242ed5 Author: Ian Roddis Date: Wed Feb 2 08:01:13 2022 -0400 Removing capacity tracking from runner, since it is maintained in daggyd commit 9401246f92113ab140143c1895978b9de8bd9972 Author: Ian Roddis Date: Wed Feb 2 07:47:28 2022 -0400 Adding retry for submission commit 398aa04a320347bb35f23f3f101d91ab4df25652 Author: Ian Roddis Date: Tue Feb 1 14:54:20 2022 -0400 Adding in execution note, as well as requeuing the result if the peer disconnects commit 637b14af6d5b53f25b9c38d4c8a7ed8532af5599 Author: Ian Roddis Date: Tue Feb 1 14:13:59 2022 -0400 Fixing locking issues commit 4d6716dfda8aa7f51e0abbdab833aff618915ba0 Author: Ian Roddis Date: Tue Feb 1 13:33:14 2022 -0400 Single task daggyr working commit bd48a5452a92817faf25ee44a6115aaa2f6c30d1 Author: Ian Roddis Date: Tue Feb 1 12:22:04 2022 -0400 Checkpointing work --- daggyr/libdaggyr/include/daggyr/Server.hpp | 32 +---- daggyr/libdaggyr/src/Server.cpp | 126 ++++++------------ .../task/DaggyRunnerTaskExecutor.hpp | 5 +- libdaggy/src/DAGRunner.cpp | 2 +- libdaggy/src/Utilities.cpp | 26 ++-- .../task/DaggyRunnerTaskExecutor.cpp | 117 ++++++++-------- 6 files changed, 116 insertions(+), 192 deletions(-) diff --git a/daggyr/libdaggyr/include/daggyr/Server.hpp b/daggyr/libdaggyr/include/daggyr/Server.hpp index fd49247..fba0861 100644 --- a/daggyr/libdaggyr/include/daggyr/Server.hpp +++ b/daggyr/libdaggyr/include/daggyr/Server.hpp @@ -48,7 +48,7 @@ namespace daggy::daggyr { DAGGY_REST_HANDLER(handleReady); DAGGY_REST_HANDLER(handleGetCapacity); DAGGY_REST_HANDLER(handleRunTask); - DAGGY_REST_HANDLER(handlePollTasks); + DAGGY_REST_HANDLER(handlePollTask); DAGGY_REST_HANDLER(handleStopTask); DAGGY_REST_HANDLER(handleValidateTask); @@ -59,33 +59,9 @@ namespace daggy::daggyr { executors::task::ForkingTaskExecutor executor_; using TaskID = std::pair; - - struct TaskRecord - { - RunState state; - AttemptRecord attempt; - }; - - std::mutex capacityGuard_; Capacity maxCapacity_; - Capacity curCapacity_; - - struct PendingJob - { - daggy::executors::task::TaskFuture fut; - Capacity resourcesUsed; - bool resolved; - }; - - std::mutex resolvedGuard_; - std::string resolved_; - size_t nResolved_; - - void monitor(); - std::atomic running_; - std::thread monitorWorker_; - - std::mutex pendingGuard_; - std::unordered_map pending_; + std::mutex rtGuard_; + std::unordered_map + runningTasks_; }; } // namespace daggy::daggyr diff --git a/daggyr/libdaggyr/src/Server.cpp b/daggyr/libdaggyr/src/Server.cpp index 869d2cb..a134e48 100644 --- a/daggyr/libdaggyr/src/Server.cpp +++ b/daggyr/libdaggyr/src/Server.cpp @@ -37,11 +37,6 @@ namespace daggy::daggyr { , desc_("Daggy Runner API", "0.1") , executor_(maxCores) , maxCapacity_{maxCores, maxMemoryMB} - , curCapacity_{maxCores, maxMemoryMB} - , resolved_("[") - , nResolved_(0) - , running_(true) - , monitorWorker_(&Server::monitor, this) { } @@ -67,8 +62,6 @@ namespace daggy::daggyr { void Server::shutdown() { endpoint_.shutdown(); - running_ = false; - monitorWorker_.join(); } uint16_t Server::getPort() const @@ -102,8 +95,8 @@ namespace daggy::daggyr { .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Run a task"); - versionPath.route(desc_.get("/poll")) - .bind(&Server::handlePollTasks, this) + versionPath.route(desc_.get("/task/:runID/:taskName")) + .bind(&Server::handlePollTask, this) .produces(MIME(Application, Json)) .response( Http::Code::Ok, @@ -152,87 +145,53 @@ namespace daggy::daggyr { REQ_RESPONSE(Not_Acceptable, e.what()); } - { - std::lock_guard lock(capacityGuard_); - curCapacity_.cores -= resourcesUsed.cores; - curCapacity_.memoryMB -= resourcesUsed.memoryMB; - } + auto tid = std::make_pair(runID, taskName); + auto fut = executor_.execute(runID, taskName, task); { - std::lock_guard lock(pendingGuard_); - pending_.emplace(std::make_pair(runID, taskName), - PendingJob{ - .fut = executor_.execute(runID, taskName, task), - .resourcesUsed = resourcesUsed, - }); + std::lock_guard lock(rtGuard_); + runningTasks_.emplace(std::move(tid), std::move(fut)); } response.send(Pistache::Http::Code::Ok, ""); } - void Server::monitor() - { - std::unordered_map resolved; - while (running_) { - resolved.clear(); - std::vector resolvedIDs; - { - std::lock_guard lock(pendingGuard_); - for (const auto &[tid, job] : pending_) { - if (job.fut->ready()) { - resolved.emplace(tid, job.fut->get()); - resolvedIDs.push_back(tid); - } - } - - for (const auto &tid : resolvedIDs) { - pending_.extract(tid); - } - } - - std::unordered_map payloads; - for (const auto &[tid, attempt] : resolved) { - std::stringstream ss; - ss << R"({ "runID": )" << tid.first << R"(, "taskName": )" - << std::quoted(tid.second) << ", " - << R"("state": "COMPLETED", "attempt":)" - << attemptRecordToJSON(attempt) << "}"; - payloads.emplace(tid, ss.str()); - } - - { - std::lock_guard lock(resolvedGuard_); - for (const auto &[_, item] : payloads) { - if (resolved_.empty()) { - resolved_ = "["; - } - - if (nResolved_ > 0) - resolved_ += ','; - resolved_ += item; - ++nResolved_; - } - } - - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - } - - void Server::handlePollTasks(const Pistache::Rest::Request &request, - Pistache::Http::ResponseWriter response) + void Server::handlePollTask(const Pistache::Rest::Request &request, + Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; - std::string payload = "["; - payload.reserve(65536); - { - std::lock_guard lock(resolvedGuard_); - payload.swap(resolved_); - nResolved_ = 0; - } - payload += "]"; - response.send(Pistache::Http::Code::Ok, payload); + auto runID = request.param(":runID").as(); + auto taskName = request.param(":taskName").as(); + + auto taskID = std::make_pair(runID, taskName); + std::unordered_map::node_type + node; + bool notFound = false; + { + std::lock_guard lock(rtGuard_); + auto it = runningTasks_.find(taskID); + if (it == runningTasks_.end() || !it->second->ready()) { + notFound = true; + } + else { + node = runningTasks_.extract(taskID); + } + } + + if (notFound) { + response.send(Pistache::Http::Code::Not_Found, ""); + return; + } + + auto prom = response.send(Pistache::Http::Code::Ok, + attemptRecordToJSON(node.mapped()->get())); + // If the promise fails, then reinsert the result for later polling + if (prom.isRejected()) { + std::lock_guard lock(rtGuard_); + runningTasks_.insert(std::move(node)); + } } void Server::handleStopTask(const Pistache::Rest::Request &request, @@ -252,14 +211,7 @@ namespace daggy::daggyr { void Server::handleGetCapacity(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { - std::string payload; - { - std::lock_guard lock(capacityGuard_); - payload = R"({ "current": )" + capacityToJSON(curCapacity_) + - R"(, "total": )" + capacityToJSON(maxCapacity_) + "}"; - } - - response.send(Pistache::Http::Code::Ok, payload); + response.send(Pistache::Http::Code::Ok, capacityToJSON(maxCapacity_)); } void Server::handleReady(const Pistache::Rest::Request &request, diff --git a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp index 9da818a..e9772cf 100644 --- a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp @@ -57,6 +57,7 @@ namespace daggy::executors::task { private: void monitor(); + using TaskID = std::pair; struct RunningTask { @@ -69,7 +70,6 @@ namespace daggy::executors::task { // Resolves jobs through polling std::atomic running_; - bool promptTask_; std::thread monitorWorker_; daggy_runner::Capacity getRunnerCapacity(const std::string &runnerURL); @@ -79,7 +79,6 @@ namespace daggy::executors::task { std::unordered_map runners_; std::mutex rtGuard_; - std::unordered_map, RunningTask> - runningTasks_; + std::unordered_map runningTasks_; }; } // namespace daggy::executors::task diff --git a/libdaggy/src/DAGRunner.cpp b/libdaggy/src/DAGRunner.cpp index 5e1b056..fe64cfd 100644 --- a/libdaggy/src/DAGRunner.cpp +++ b/libdaggy/src/DAGRunner.cpp @@ -95,7 +95,7 @@ namespace daggy { if (!running_) return; - const size_t MAX_SUBMITS = 25; + const size_t MAX_SUBMITS = 100; size_t n_submitted = 0; /* diff --git a/libdaggy/src/Utilities.cpp b/libdaggy/src/Utilities.cpp index 757233c..d1c4263 100644 --- a/libdaggy/src/Utilities.cpp +++ b/libdaggy/src/Utilities.cpp @@ -234,7 +234,7 @@ namespace daggy { curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, 20); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3); if (trace) { curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace); @@ -276,16 +276,20 @@ namespace daggy { auto response = HTTP_REQUEST(url, payload, method); rj::Document doc; - try { - checkRJParse(doc.Parse(response.body.c_str())); - } - catch (std::exception &e) { - doc.SetObject(); - auto &alloc = doc.GetAllocator(); - std::string message = (response.body.empty() ? e.what() : response.body); - doc.AddMember( - "error", - rj::Value().SetString(message.c_str(), message.size(), alloc), alloc); + if (!response.body.empty()) { + try { + checkRJParse(doc.Parse(response.body.c_str())); + } + catch (std::exception &e) { + doc.SetObject(); + auto &alloc = doc.GetAllocator(); + std::string message = + (response.body.empty() ? e.what() : response.body); + doc.AddMember( + "error", + rj::Value().SetString(message.c_str(), message.size(), alloc), + alloc); + } } return std::make_pair(response.code, std::move(doc)); diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp index e30129c..0957543 100644 --- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp +++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp @@ -78,7 +78,6 @@ namespace daggy::executors::task::daggy_runner { DaggyRunnerTaskExecutor::DaggyRunnerTaskExecutor() : running_(true) - , promptTask_(false) , monitorWorker_(&DaggyRunnerTaskExecutor::monitor, this) { } @@ -176,11 +175,21 @@ TaskFuture DaggyRunnerTaskExecutor::execute(DAGRunID runID, ss << exe_runner << "/v1/task/" << runID << "/" << taskName; auto url = ss.str(); - const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST"); + // TODO catching this failure state doesn't allow for runners + // dying. + while (true) { + auto response = HTTP_REQUEST(url, taskToJSON(task), "POST"); + if (response.code == 200) + break; + std::cout << "Submitting " << taskName << " expected code 200, got " + << response.code << '[' << response.body << "]\n"; + std::this_thread::sleep_for(250ms); + } RunningTask rt{.fut = std::make_shared>(), .runID = runID, .taskName = taskName, + .runnerURL = exe_runner, .resources = taskUsed}; auto fut = rt.fut; @@ -202,10 +211,10 @@ daggy_runner::Capacity DaggyRunnerTaskExecutor::getRunnerCapacity( // Try and get the capacity const auto &[code, doc] = JSON_HTTP_REQUEST(runnerURL + "/v1/capacity"); if (code != HTTPCode::Ok) { - return Capacity{}; + throw std::runtime_error("Unable to get capacity from runner " + runnerURL); } - return capacityFromJSON(doc["total"]); + return capacityFromJSON(doc); } void DaggyRunnerTaskExecutor::addRunner(const std::string &url) @@ -216,83 +225,67 @@ void DaggyRunnerTaskExecutor::addRunner(const std::string &url) void DaggyRunnerTaskExecutor::monitor() { - std::unordered_map runners; - + std::vector resolvedTasks; + std::vector> + runningTasks; + std::unordered_map returnedResources; while (running_) { - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - std::unordered_map, - std::optional> - resolvedJobs; + std::this_thread::sleep_for(2s); + resolvedTasks.clear(); + runningTasks.clear(); + returnedResources.clear(); - std::unordered_map, Capacity> - taskResources; - - // Cache what's running now + // Copy the running tasks to prevent holding the lock too long { std::lock_guard lock(rtGuard_); for (const auto &[tid, info] : runningTasks_) { - taskResources.emplace(tid, info.resources); + runningTasks.emplace_back( + std::make_tuple(tid, info.runnerURL, info.fut, info.resources)); } } - { - std::lock_guard lock(runnersGuard_); - for (auto &[runnerURL, caps] : runners_) { - rj::Document doc; - try { - auto [code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll"); - if (code != HTTPCode::Ok) { - std::cout << "Unable to poll: " << code << ": " << dumpJSON(json) - << std::endl; - continue; - } - - doc.Swap(json); - } - catch (std::exception &e) { - std::cout << "Unable to poll: " << e.what() << std::endl; + for (const auto &[tid, runner, fut, resources] : runningTasks) { + rj::Document doc; + try { + std::string url = + runner + "/v1/task/" + std::to_string(tid.first) + "/" + tid.second; + auto [code, json] = JSON_HTTP_REQUEST(url); + if (code != HTTPCode::Ok) { continue; } - if (!doc.IsArray()) { - std::cout << "Got nonsense from poll: " << dumpJSON(doc) << std::endl; - continue; - } + doc.Swap(json); + } + catch (std::exception &e) { + continue; + } - const auto tasks = doc.GetArray(); - for (size_t idx = 0; idx < tasks.Size(); ++idx) { - const auto &task = tasks[idx]; - auto tid = std::make_pair(task["runID"].GetInt64(), - task["taskName"].GetString()); - auto it = taskResources.find(tid); - if (it != taskResources.end()) { - caps.cores += it->second.cores; - caps.memoryMB += it->second.memoryMB; - } + auto &cap = returnedResources[runner]; + cap.cores += resources.cores; + cap.memoryMB += resources.memoryMB; - auto attempt = attemptRecordFromJSON(task["attempt"]); - resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"])); - promptTask_ = true; - runnersCV_.notify_one(); + auto attempt = attemptRecordFromJSON(doc); + attempt.executorLog += "\nExecuted on " + runner; + fut->set(attempt); + resolvedTasks.push_back(tid); + } + + if (!returnedResources.empty()) { + { + std::lock_guard rLock(runnersGuard_); + for (const auto &[runner, res] : returnedResources) { + auto &caps = runners_[runner]; + caps.cores += res.cores; + caps.memoryMB += res.memoryMB; } } } - std::vector> completedTasks; - { + if (!resolvedTasks.empty()) { std::lock_guard lock(rtGuard_); - for (auto &[taskID, task] : runningTasks_) { - auto it = resolvedJobs.find(taskID); - if (it == resolvedJobs.end()) - continue; - if (it->second.has_value()) { - // Task has completed - task.fut->set(std::move(it->second.value())); - completedTasks.emplace_back(taskID); - } - } - for (const auto &tid : completedTasks) { + for (const auto &tid : resolvedTasks) { runningTasks_.extract(tid); + runnersCV_.notify_one(); } } }