From dd473ab8f015e02b665a20dd8d74831d11c33de2 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Thu, 23 Dec 2021 11:47:55 -0400 Subject: [PATCH] Adjusting capacity impact calculation to yield a more even distribution of jobs --- daggyr/libdaggyr/src/Server.cpp | 4 +--- .../task/DaggyRunnerTaskExecutor.hpp | 2 ++ .../task/DaggyRunnerTaskExecutor.cpp | 19 ++++++++----------- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/daggyr/libdaggyr/src/Server.cpp b/daggyr/libdaggyr/src/Server.cpp index 6f9f72e..e167775 100644 --- a/daggyr/libdaggyr/src/Server.cpp +++ b/daggyr/libdaggyr/src/Server.cpp @@ -159,8 +159,6 @@ namespace daggy::daggyr { .resourcesUsed = resourcesUsed}); } - std::cout << "Enqueued " << runID << " / " << taskName << std::endl; - response.send(Pistache::Http::Code::Ok, ""); } @@ -196,9 +194,9 @@ namespace daggy::daggyr { curCapacity_.cores += it->second.resourcesUsed.cores; curCapacity_.memoryMB += it->second.resourcesUsed.memoryMB; } - pending_.extract(it); std::cout << "Resolved " << it->first.first << " / " << it->first.second << std::endl; + pending_.extract(it); } else { payload = R"({ "state": "RUNNING" })"; diff --git a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp index 89b07a7..5bd8aba 100644 --- a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp @@ -2,6 +2,8 @@ #include +#include + #include "TaskExecutor.hpp" namespace rj = rapidjson; diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp index fb2d9f8..3ba314f 100644 --- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp +++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp @@ -146,11 +146,11 @@ std::future DaggyRunnerTaskExecutor::execute( auto curCap = capacityFromJSON(doc["current"]); auto totCap = capacityFromJSON(doc["total"]); - double cores = curCap.cores < 0 ? totCap.cores : curCap.cores; - double memoryMB = curCap.memoryMB < 0 ? totCap.memoryMB : curCap.memoryMB; + double cores = (curCap.cores - taskUsed.cores); + double memoryMB = (curCap.memoryMB - taskUsed.memoryMB); double impact = - std::max(taskUsed.cores / cores, taskUsed.memoryMB / memoryMB); + std::min(cores / totCap.cores, memoryMB / totCap.memoryMB); impacts.emplace_back(runner, impact); } catch (const std::exception &_) { @@ -167,15 +167,12 @@ std::future DaggyRunnerTaskExecutor::execute( return fut; } - auto cit = impacts.begin(); - for (auto it = impacts.begin(); it != impacts.end(); ++it) { - std::cout << it->first << " impact is " << it->second << std::endl; - if (it->second < cit->second) - cit = it; - } + std::sort(impacts.begin(), impacts.end()); + + auto runner = impacts.back(); std::stringstream ss; - ss << cit->first << "/v1/task/" << runID << "/" << taskName; + ss << runner.first << "/v1/task/" << runID << "/" << taskName; auto url = ss.str(); const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST"); @@ -185,7 +182,7 @@ std::future DaggyRunnerTaskExecutor::execute( RunningTask rt{.prom{}, .runID = runID, .taskName = taskName, - .runnerURL = cit->first, + .runnerURL = runner.first, .retries = 3}; auto fut = rt.prom.get_future();