#include #include #include #include #include using namespace daggy::executors::task; using namespace daggy::executors::task::daggy_runner; using namespace daggy; namespace daggy::executors::task::daggy_runner { std::string capacityToJSON(const Capacity &cap) { return R"({ "cores": )" + std::to_string(cap.cores) + R"(, "memoryMB": )" + std::to_string(cap.memoryMB) + "}"; } Capacity capacityFromJSON(const rj::Value &spec) { Capacity cap{.cores = 0, .memoryMB = 0}; if (!spec.IsObject()) { throw std::runtime_error("Capacity is not an object"); } if (spec.HasMember("cores")) { if (!spec["cores"].IsNumber()) { throw std::runtime_error("cores member of Capacity is not an integer"); } cap.cores = spec["cores"].GetInt64(); } if (spec.HasMember("memoryMB")) { if (!spec["memoryMB"].IsNumber()) { throw std::runtime_error( "memoryMB member of Capacity is not an integer"); } cap.memoryMB = spec["memoryMB"].GetInt64(); } return cap; } Capacity capacityFromTask(const Task &task) { Capacity cap{.cores = 0, .memoryMB = 0}; cap.cores = std::stoll(std::get(task.job.at("cores"))); cap.memoryMB = std::stoll(std::get(task.job.at("memoryMB"))); return cap; } void validateTaskParameters(const daggy::ConfigValues &job) { forking_executor::validateTaskParameters(job); const std::array fields{"cores", "memoryMB"}; for (const auto &field : fields) { if (job.count(field) == 0) throw std::runtime_error("Missing required job parameter " + field); const auto &val = job.at(field); if (!std::holds_alternative(val)) throw std::runtime_error(field + " in capacity is not a string"); try { std::stoll(std::get(val)); } catch (std::exception &e) { throw std::runtime_error(field + " in capacity is not an integer"); } } } } // namespace daggy::executors::task::daggy_runner DaggyRunnerTaskExecutor::DaggyRunnerTaskExecutor() : running_(true) , monitorWorker_(&DaggyRunnerTaskExecutor::monitor, this) { } DaggyRunnerTaskExecutor::~DaggyRunnerTaskExecutor() { running_ = false; monitorWorker_.join(); } // Validates the job to ensure that all required values are set and are of // the right type, bool DaggyRunnerTaskExecutor::validateTaskParameters(const ConfigValues &job) { daggy_runner::validateTaskParameters(job); return true; } std::vector DaggyRunnerTaskExecutor::expandTaskParameters( const ConfigValues &job, const ConfigValues &expansionValues) { std::vector newValues; auto command = (job.count("command") == 0 ? Command{} : std::get(job.at("command"))); auto environment = (job.count("environment") == 0 ? Command{} : std::get(job.at("environment"))); Command both(command); std::copy(environment.begin(), environment.end(), std::back_inserter(both)); for (const auto &parts : interpolateValues(both, expansionValues)) { ConfigValues newCommand{job}; newCommand["command"] = Command(parts.begin(), parts.begin() + command.size()); newCommand["environment"] = Command(parts.begin() + command.size(), parts.end()); newValues.emplace_back(newCommand); } return newValues; } // Runs the task std::future DaggyRunnerTaskExecutor::execute( DAGRunID runID, const std::string &taskName, const Task &task) { auto taskUsed = capacityFromTask(task); // Get the capacities for all the runners // Capacities for a runner can be negative, meaning that they're currently // oversubscribed. std::vector> impacts; for (const auto &runner : runners_) { try { const auto &[code, doc] = JSON_HTTP_REQUEST(runner + "/v1/capacity"); if (code != HTTPCode::Ok) { continue; } auto curCap = capacityFromJSON(doc["current"]); auto totCap = capacityFromJSON(doc["total"]); ssize_t cores = curCap.cores < 0 ? totCap.cores : curCap.cores; ssize_t memoryMB = curCap.memoryMB < 0 ? totCap.memoryMB : curCap.memoryMB; double impact = std::max(taskUsed.cores / cores, taskUsed.memoryMB / memoryMB); impacts.emplace_back(runner, impact); } catch (const std::exception &_) { continue; } } if (impacts.empty()) throw std::runtime_error("No runners available for execution"); auto cit = impacts.begin(); for (auto it = impacts.begin(); it != impacts.end(); ++it) { if (it->second < cit->second) cit = it; } RunningTask rt{ .prom{}, .runID = runID, .taskName = taskName, .runnerURL = cit->first}; auto fut = rt.prom.get_future(); std::lock_guard lock(rtGuard_); runningTasks_.emplace(std::make_pair(runID, taskName), std::move(rt)); return fut; } bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName) { return true; } void DaggyRunnerTaskExecutor::addRunner(const std::string &url) { runners_.insert(url); } void DaggyRunnerTaskExecutor::monitor() { while (running_) { { std::vector> resolvedJobs; std::lock_guard lock(rtGuard_); for (auto &[taskID, task] : runningTasks_) { try { const auto &[code, json] = JSON_HTTP_REQUEST( task.runnerURL + "/v1/task/" + std::to_string(taskID.first) + "/" + taskID.second); if (code != HTTPCode::Ok) { AttemptRecord record{ .rc = -1, .executorLog = "Unable to query runner for progress"}; task.prom.set_value(std::move(record)); resolvedJobs.emplace_back(taskID); continue; } if (json["state"] == "COMPLETED") { task.prom.set_value(attemptRecordFromJSON(json["attempt"])); resolvedJobs.emplace_back(taskID); } } catch (std::runtime_error &e) { continue; } for (const auto &tid : resolvedJobs) { runningTasks_.extract(tid); } } std::this_thread::sleep_for(std::chrono::seconds(1)); } } }