From 3c6966a9aca76b7c5685275e68b23bfc2ed33176 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Fri, 24 Dec 2021 10:52:06 -0400 Subject: [PATCH] Adding discovery of capacities after the fact. --- .../task/DaggyRunnerTaskExecutor.hpp | 5 +++++ .../task/DaggyRunnerTaskExecutor.cpp | 22 ++++++++++++++++--- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp index 453e447..2fb5fdc 100644 --- a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp @@ -15,6 +15,11 @@ namespace daggy::executors::task { { ssize_t cores; ssize_t memoryMB; + void operator==(const Capacity &other) + { + cores = other.cores; + memoryMB = other.memoryMB; + } }; std::string capacityToJSON(const Capacity &cap); diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp index 55fe0a8..d89e771 100644 --- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp +++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp @@ -139,11 +139,23 @@ std::future DaggyRunnerTaskExecutor::execute( { std::lock_guard lock(runnersGuard_); - for (const auto &[runner, caps] : runners_) { + for (auto &[runner, caps] : runners_) { const auto result = HTTP_REQUEST(runner + "/ready"); if (result.code != 200) continue; + // Set capacities if they haven't been discovered yet + if (caps.total.cores == 0) { + const auto &[code, json] = JSON_HTTP_REQUEST(runner + "/v1/capacity"); + if (code != HTTPCode::Ok) { + std::cerr << "Runner " << runner + << " appears to be up, but cannot retrieve capacity"; + continue; + } + caps.current = capacityFromJSON(json["current"]); + caps.total = capacityFromJSON(json["total"]); + } + double cores = (caps.current.cores - taskUsed.cores); double memoryMB = (caps.current.memoryMB - taskUsed.memoryMB); @@ -203,10 +215,14 @@ void DaggyRunnerTaskExecutor::addRunner(const std::string &url) // Try and get the capacity const auto &[code, doc] = JSON_HTTP_REQUEST(url + "/v1/capacity"); if (code != HTTPCode::Ok) { - std::cerr << "Failed to add runner " << url << ": " - << doc["error"].GetString() << std::endl; + std::cerr << "Failed to contact runner " << url << ": " + << doc["error"].GetString() + << ", will attempt to set capacities later" << std::endl; + + runners_.emplace(url, RunnerCapacity{}); return; } + RunnerCapacity caps{.current = capacityFromJSON(doc["current"]), .total = capacityFromJSON(doc["total"])}; std::lock_guard lock(runnersGuard_);