Adjusting capacity impact calculation to yield a more even distribution of jobs

This commit is contained in:
Ian Roddis
2021-12-23 11:47:55 -04:00
parent 85d252f43c
commit dd473ab8f0
3 changed files with 11 additions and 14 deletions

View File

@@ -159,8 +159,6 @@ namespace daggy::daggyr {
                .resourcesUsed = resourcesUsed});
          }
-         std::cout << "Enqueued " << runID << " / " << taskName << std::endl;
          response.send(Pistache::Http::Code::Ok, "");
        }
@@ -196,9 +194,9 @@ namespace daggy::daggyr {
            curCapacity_.cores += it->second.resourcesUsed.cores;
            curCapacity_.memoryMB += it->second.resourcesUsed.memoryMB;
          }
-         pending_.extract(it);
          std::cout << "Resolved " << it->first.first << " / "
                    << it->first.second << std::endl;
+         pending_.extract(it);
        }
        else {
          payload = R"({ "state": "RUNNING" })";

View File

@@ -2,6 +2,8 @@
 #include <rapidjson/document.h>
+#include <random>
 #include "TaskExecutor.hpp"
 namespace rj = rapidjson;

View File

@@ -146,11 +146,11 @@ std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
      auto curCap = capacityFromJSON(doc["current"]);
      auto totCap = capacityFromJSON(doc["total"]);
-     double cores = curCap.cores < 0 ? totCap.cores : curCap.cores;
-     double memoryMB = curCap.memoryMB < 0 ? totCap.memoryMB : curCap.memoryMB;
+     double cores = (curCap.cores - taskUsed.cores);
+     double memoryMB = (curCap.memoryMB - taskUsed.memoryMB);
      double impact =
-         std::max(taskUsed.cores / cores, taskUsed.memoryMB / memoryMB);
+         std::min(cores / totCap.cores, memoryMB / totCap.memoryMB);
      impacts.emplace_back(runner, impact);
    }
    catch (const std::exception &_) {
catch (const std::exception &_) { catch (const std::exception &_) {
@@ -167,15 +167,12 @@ std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
      return fut;
    }
-   auto cit = impacts.begin();
-   for (auto it = impacts.begin(); it != impacts.end(); ++it) {
-     std::cout << it->first << " impact is " << it->second << std::endl;
-     if (it->second < cit->second)
-       cit = it;
-   }
+   std::sort(impacts.begin(), impacts.end());
+   auto runner = impacts.back();
    std::stringstream ss;
-   ss << cit->first << "/v1/task/" << runID << "/" << taskName;
+   ss << runner.first << "/v1/task/" << runID << "/" << taskName;
    auto url = ss.str();
    const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST");
@@ -185,7 +182,7 @@ std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
    RunningTask rt{.prom{},
                   .runID = runID,
                   .taskName = taskName,
-                  .runnerURL = cit->first,
+                  .runnerURL = runner.first,
                   .retries = 3};
    auto fut = rt.prom.get_future();