From f9076be081ab6f0618bd3e9b75a5064d145f4218 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Fri, 7 Jan 2022 16:00:04 -0400 Subject: [PATCH] Adding fix for race condition in task resource management --- .../task/DaggyRunnerTaskExecutor.cpp | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp index 4924b21..52e4a39 100644 --- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp +++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp @@ -268,34 +268,39 @@ void DaggyRunnerTaskExecutor::monitor() { std::lock_guard lock(runnersGuard_); for (auto &[runnerURL, caps] : runners_) { + rj::Document doc; try { - const auto &[code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll"); + auto [code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll"); if (code != HTTPCode::Ok) continue; + doc.Swap(json); + } + catch (std::exception &e) { + std::cout << "Curl failed for runner " << runnerURL << ": " + << e.what() << std::endl; + } - const auto tasks = json.GetArray(); - for (size_t idx = 0; idx < tasks.Size(); ++idx) { - const auto &task = tasks[idx]; - if (task["state"] == "PENDING") { - resolvedJobs.emplace(std::make_pair(task["runID"].GetInt64(), - task["taskName"].GetString()), - std::nullopt); - } - else { - auto tid = std::make_pair(task["runID"].GetInt64(), - task["taskName"].GetString()); + const auto tasks = doc.GetArray(); + for (size_t idx = 0; idx < tasks.Size(); ++idx) { + const auto &task = tasks[idx]; + if (task["state"] == "PENDING") { + resolvedJobs.emplace(std::make_pair(task["runID"].GetInt64(), + task["taskName"].GetString()), + std::nullopt); + } + else { + auto tid = std::make_pair(task["runID"].GetInt64(), + task["taskName"].GetString()); + auto it = taskResources.find(tid); + if (it != taskResources.end()) { const auto &res = taskResources.at(tid); caps.current.cores += res.cores; caps.current.memoryMB += res.memoryMB; - - resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"])); } + + resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"])); } } - catch (std::exception &e) { - std::cout << "Curl timeout failed for runner " << runnerURL << ": " - << e.what() << std::endl; - } } }