diff --git a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp index e9772cf..d4f98ba 100644 --- a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp @@ -74,6 +74,8 @@ namespace daggy::executors::task { daggy_runner::Capacity getRunnerCapacity(const std::string &runnerURL); + std::vector runnersMaxCapacities_; + std::mutex runnersGuard_; std::condition_variable runnersCV_; std::unordered_map runners_; diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp index e6083fa..f1ee57a 100644 --- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp +++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp @@ -113,6 +113,20 @@ bool DaggyRunnerTaskExecutor::validateTaskParameters(const ConfigValues &job) { daggy_runner::validateTaskParameters(job); + // Ensure the job is actually runable + auto cores = std::stoll(std::get(job.at("cores"))); + auto memoryMB = std::stoll(std::get(job.at("memoryMB"))); + + auto it = + std::find_if(runnersMaxCapacities_.begin(), runnersMaxCapacities_.end(), + [&](const auto &cap) { + return (cores <= cap.cores and memoryMB <= cap.memoryMB); + }); + if (it == runnersMaxCapacities_.end()) + throw std::runtime_error("Task requires " + std::to_string(cores) + + " cores and " + std::to_string(memoryMB) + + " MB memory, but no runner " + "can satisty those requirements"); return true; } @@ -219,7 +233,10 @@ daggy_runner::Capacity DaggyRunnerTaskExecutor::getRunnerCapacity( void DaggyRunnerTaskExecutor::addRunner(const std::string &url) { std::lock_guard lock(runnersGuard_); - runners_.emplace(url, getRunnerCapacity(url)); + auto maxCapacity = getRunnerCapacity(url); + + runnersMaxCapacities_.push_back(maxCapacity); + runners_.emplace(url, maxCapacity); } void DaggyRunnerTaskExecutor::monitor()