From 8ca5cdafe28ac8afe55b9f8b430e375a21a2e3d0 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Fri, 24 Dec 2021 10:44:19 -0400 Subject: [PATCH] Fixing issue with resolving --- daggyr/libdaggyr/src/Server.cpp | 2 + libdaggy/src/Utilities.cpp | 2 +- .../task/DaggyRunnerTaskExecutor.cpp | 137 ++++++++---------- 3 files changed, 66 insertions(+), 75 deletions(-) diff --git a/daggyr/libdaggyr/src/Server.cpp b/daggyr/libdaggyr/src/Server.cpp index 99d8c60..2b72ef9 100644 --- a/daggyr/libdaggyr/src/Server.cpp +++ b/daggyr/libdaggyr/src/Server.cpp @@ -204,9 +204,11 @@ namespace daggy::daggyr { } std::cout << "Resolved " << it->runID << " / " << it->taskName << std::endl; + it = pending_.erase(it); } else { payload << R"("state": "PENDING")"; + ++it; } payload << "}"; } diff --git a/libdaggy/src/Utilities.cpp b/libdaggy/src/Utilities.cpp index bba4668..47127c8 100644 --- a/libdaggy/src/Utilities.cpp +++ b/libdaggy/src/Utilities.cpp @@ -234,7 +234,7 @@ namespace daggy { curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2); if (trace) { curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace); diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp index 0514127..55fe0a8 100644 --- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp +++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp @@ -149,7 +149,6 @@ std::future DaggyRunnerTaskExecutor::execute( double impact = std::min(cores / caps.total.cores, memoryMB / caps.total.memoryMB); - std::cout << runner << ": " << impact << std::endl; impacts.emplace_back(runner, impact); } @@ -162,10 +161,8 @@ std::future DaggyRunnerTaskExecutor::execute( return fut; } - std::sort(impacts.begin(), impacts.end()); - for (const auto &[runner, impact] : impacts) { - std::cout << "\t" << runner << ": " << impact << std::endl; - } + std::sort(impacts.begin(), impacts.end(), + [](const auto &a, const auto &b) { return a.second < b.second; }); runner = impacts.back().first; auto &caps = runners_.at(runner); @@ -173,8 +170,6 @@ std::future DaggyRunnerTaskExecutor::execute( caps.current.memoryMB -= taskUsed.memoryMB; } - std::cout << "Queuing on runner: " << runner << std::endl; - std::stringstream ss; ss << runner << "/v1/task/" << runID << "/" << taskName; auto url = ss.str(); @@ -221,80 +216,74 @@ void DaggyRunnerTaskExecutor::addRunner(const std::string &url) void DaggyRunnerTaskExecutor::monitor() { while (running_) { + std::unordered_map, + std::optional> + resolvedJobs; + + std::unordered_map, Capacity> + taskResources; + { - std::unordered_map, - std::optional> - resolvedJobs; - - std::unordered_map, Capacity> - taskResources; - - { - std::lock_guard lock(rtGuard_); - for (const auto &[tid, info] : runningTasks_) { - taskResources.emplace(tid, info.resources); - } + std::lock_guard lock(rtGuard_); + for (const auto &[tid, info] : runningTasks_) { + taskResources.emplace(tid, info.resources); } + } - { - std::lock_guard lock(runnersGuard_); - for (auto &[runnerURL, caps] : runners_) { - try { - const auto &[code, json] = - JSON_HTTP_REQUEST(runnerURL + "/v1/poll"); - if (code != HTTPCode::Ok) - continue; - - const auto tasks = json.GetArray(); - for (size_t idx = 0; idx < tasks.Size(); ++idx) { - const auto &task = tasks[idx]; - if (task["state"] == "PENDING") { - resolvedJobs.emplace( - std::make_pair(task["runID"].GetInt64(), - task["taskName"].GetString()), - std::nullopt); - } - else { - auto tid = std::make_pair(task["runID"].GetInt64(), - task["taskName"].GetString()); - const auto &res = taskResources.at(tid); - caps.current.cores += res.cores; - caps.current.memoryMB += res.memoryMB; - - resolvedJobs.emplace(tid, - attemptRecordFromJSON(task["attempt"])); - } - } - } - catch (std::exception &e) { - std::cout << "Curl timeout failed for runner " << runnerURL << ": " - << e.what() << std::endl; - } - } - } - - std::vector> completedTasks; - { - std::lock_guard lock(rtGuard_); - for (auto &[taskID, task] : runningTasks_) { - auto it = resolvedJobs.find(taskID); - if (it == resolvedJobs.end()) { - --task.retries; - - if (task.retries == 0) { - AttemptRecord record{ - .rc = -1, - .executorLog = "Unable to query runner for progress"}; - task.prom.set_value(std::move(record)); - completedTasks.emplace_back(taskID); - } + { + std::lock_guard lock(runnersGuard_); + for (auto &[runnerURL, caps] : runners_) { + try { + const auto &[code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll"); + if (code != HTTPCode::Ok) continue; + + const auto tasks = json.GetArray(); + for (size_t idx = 0; idx < tasks.Size(); ++idx) { + const auto &task = tasks[idx]; + if (task["state"] == "PENDING") { + resolvedJobs.emplace(std::make_pair(task["runID"].GetInt64(), + task["taskName"].GetString()), + std::nullopt); + } + else { + auto tid = std::make_pair(task["runID"].GetInt64(), + task["taskName"].GetString()); + const auto &res = taskResources.at(tid); + caps.current.cores += res.cores; + caps.current.memoryMB += res.memoryMB; + + resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"])); + } } - else if (it->second.has_value()) { - // Task has completed - task.prom.set_value(it->second.value()); + } + catch (std::exception &e) { + std::cout << "Curl timeout failed for runner " << runnerURL << ": " + << e.what() << std::endl; + } + } + } + + std::vector> completedTasks; + { + std::lock_guard lock(rtGuard_); + for (auto &[taskID, task] : runningTasks_) { + auto it = resolvedJobs.find(taskID); + if (it == resolvedJobs.end()) { + --task.retries; + + if (task.retries == 0) { + AttemptRecord record{ + .rc = -1, .executorLog = "Unable to query runner for progress"}; + task.prom.set_value(std::move(record)); completedTasks.emplace_back(taskID); } + continue; + } + else if (it->second.has_value()) { + // Task has completed + task.prom.set_value(it->second.value()); + completedTasks.emplace_back(taskID); } } for (const auto &tid : completedTasks) {