#include #include #include #include #include using namespace daggy::executors::task; using namespace daggy::executors::task::daggy_runner; using namespace daggy; namespace daggy::executors::task::daggy_runner { std::string capacityToJSON(const Capacity &cap) { return R"({ "cores": )" + std::to_string(cap.cores) + R"(, "memoryMB": )" + std::to_string(cap.memoryMB) + "}"; } Capacity capacityFromJSON(const rj::Value &spec) { Capacity cap{.cores = 0, .memoryMB = 0}; if (!spec.IsObject()) { throw std::runtime_error("Capacity is not an object"); } if (spec.HasMember("cores")) { if (!spec["cores"].IsNumber()) { throw std::runtime_error("cores member of Capacity is not an integer"); } cap.cores = spec["cores"].GetInt64(); } if (spec.HasMember("memoryMB")) { if (!spec["memoryMB"].IsNumber()) { throw std::runtime_error( "memoryMB member of Capacity is not an integer"); } cap.memoryMB = spec["memoryMB"].GetInt64(); } return cap; } Capacity capacityFromTask(const Task &task) { Capacity cap{.cores = 0, .memoryMB = 0}; cap.cores = std::stoll(std::get(task.job.at("cores"))); cap.memoryMB = std::stoll(std::get(task.job.at("memoryMB"))); return cap; } void validateTaskParameters(const daggy::ConfigValues &job) { forking_executor::validateTaskParameters(job); const std::array fields{"cores", "memoryMB"}; for (const auto &field : fields) { if (job.count(field) == 0) throw std::runtime_error("Missing required job parameter " + field); const auto &val = job.at(field); if (!std::holds_alternative(val)) throw std::runtime_error(field + " in capacity is not a string"); try { std::stoll(std::get(val)); } catch (std::exception &e) { throw std::runtime_error(field + " in capacity is not an integer"); } } } } // namespace daggy::executors::task::daggy_runner DaggyRunnerTaskExecutor::DaggyRunnerTaskExecutor() : running_(true) , monitorWorker_(&DaggyRunnerTaskExecutor::monitor, this) { } DaggyRunnerTaskExecutor::~DaggyRunnerTaskExecutor() { running_ = false; monitorWorker_.join(); } std::string DaggyRunnerTaskExecutor::description() const { std::stringstream ss; ss << "DaggyRunnerTaskExecutor running with " << runners_.size() << " runners: ["; bool first = true; for (const auto &[runner, _] : runners_) { if (first) { first = false; } else { ss << ", "; } ss << runner; } ss << "]"; return ss.str(); } // Validates the job to ensure that all required values are set and are of // the right type, bool DaggyRunnerTaskExecutor::validateTaskParameters(const ConfigValues &job) { daggy_runner::validateTaskParameters(job); return true; } std::vector DaggyRunnerTaskExecutor::expandTaskParameters( const ConfigValues &job, const ConfigValues &expansionValues) { std::vector newValues; auto command = (job.count("command") == 0 ? Command{} : std::get(job.at("command"))); auto environment = (job.count("environment") == 0 ? Command{} : std::get(job.at("environment"))); Command both(command); std::copy(environment.begin(), environment.end(), std::back_inserter(both)); for (const auto &parts : interpolateValues(both, expansionValues)) { ConfigValues newCommand{job}; newCommand["command"] = Command(parts.begin(), parts.begin() + command.size()); newCommand["environment"] = Command(parts.begin() + command.size(), parts.end()); newValues.emplace_back(newCommand); } return newValues; } // Runs the task std::future DaggyRunnerTaskExecutor::execute( DAGRunID runID, const std::string &taskName, const Task &task) { auto taskUsed = capacityFromTask(task); // Get the capacities for all the runners // Capacities for a runner can be negative, meaning that they're currently // oversubscribed. std::vector> impacts; std::string runner; { std::lock_guard lock(runnersGuard_); for (auto &[runner, caps] : runners_) { const auto result = HTTP_REQUEST(runner + "/ready"); if (result.code != 200) continue; // Set capacities if they haven't been discovered yet if (caps.total.cores == 0) { const auto &[code, json] = JSON_HTTP_REQUEST(runner + "/v1/capacity"); if (code != HTTPCode::Ok) { std::cerr << "Runner " << runner << " appears to be up, but cannot retrieve capacity"; continue; } caps.current = capacityFromJSON(json["current"]); caps.total = capacityFromJSON(json["total"]); } double cores = (caps.current.cores - taskUsed.cores); double memoryMB = (caps.current.memoryMB - taskUsed.memoryMB); double impact = std::min(cores / caps.total.cores, memoryMB / caps.total.memoryMB); impacts.emplace_back(runner, impact); } if (impacts.empty()) { std::promise prom; auto fut = prom.get_future(); AttemptRecord record{.rc = -1, .executorLog = "No runners available for execution"}; prom.set_value(std::move(record)); return fut; } std::sort(impacts.begin(), impacts.end(), [](const auto &a, const auto &b) { return a.second < b.second; }); runner = impacts.back().first; auto &caps = runners_.at(runner); caps.current.cores -= taskUsed.cores; caps.current.memoryMB -= taskUsed.memoryMB; } std::stringstream ss; ss << runner << "/v1/task/" << runID << "/" << taskName; auto url = ss.str(); const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST"); if (response.code != HTTPCode::Ok) throw std::runtime_error("Unable to submit task: " + response.body); RunningTask rt{.prom{}, .runID = runID, .taskName = taskName, .runnerURL = runner, .retries = 3, .resources = taskUsed}; auto fut = rt.prom.get_future(); std::lock_guard lock(rtGuard_); runningTasks_.emplace(std::make_pair(runID, taskName), std::move(rt)); return fut; } bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName) { return true; } void DaggyRunnerTaskExecutor::addRunner(const std::string &url) { // Try and get the capacity const auto &[code, doc] = JSON_HTTP_REQUEST(url + "/v1/capacity"); if (code != HTTPCode::Ok) { std::cerr << "Failed to contact runner " << url << ": " << doc["error"].GetString() << ", will attempt to set capacities later" << std::endl; runners_.emplace(url, RunnerCapacity{}); return; } RunnerCapacity caps{.current = capacityFromJSON(doc["current"]), .total = capacityFromJSON(doc["total"])}; std::lock_guard lock(runnersGuard_); runners_.emplace(url, caps); } void DaggyRunnerTaskExecutor::monitor() { while (running_) { std::unordered_map, std::optional> resolvedJobs; std::unordered_map, Capacity> taskResources; { std::lock_guard lock(rtGuard_); for (const auto &[tid, info] : runningTasks_) { taskResources.emplace(tid, info.resources); } } { std::lock_guard lock(runnersGuard_); for (auto &[runnerURL, caps] : runners_) { rj::Document doc; try { auto [code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll"); if (code != HTTPCode::Ok) continue; doc.Swap(json); } catch (std::exception &e) { std::cout << "Curl failed for runner " << runnerURL << ": " << e.what() << std::endl; } const auto tasks = doc.GetArray(); for (size_t idx = 0; idx < tasks.Size(); ++idx) { const auto &task = tasks[idx]; if (task["state"] == "PENDING") { resolvedJobs.emplace(std::make_pair(task["runID"].GetInt64(), task["taskName"].GetString()), std::nullopt); } else { auto tid = std::make_pair(task["runID"].GetInt64(), task["taskName"].GetString()); auto it = taskResources.find(tid); if (it != taskResources.end()) { const auto &res = taskResources.at(tid); caps.current.cores += res.cores; caps.current.memoryMB += res.memoryMB; } resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"])); } } } } std::vector> completedTasks; { std::lock_guard lock(rtGuard_); for (auto &[taskID, task] : runningTasks_) { auto it = resolvedJobs.find(taskID); if (it == resolvedJobs.end()) { --task.retries; if (task.retries == 0) { AttemptRecord record{ .rc = -1, .executorLog = "Unable to query runner for progress"}; task.prom.set_value(std::move(record)); completedTasks.emplace_back(taskID); } continue; } else if (it->second.has_value()) { // Task has completed task.prom.set_value(it->second.value()); completedTasks.emplace_back(taskID); } } for (const auto &tid : completedTasks) { runningTasks_.extract(tid); } } std::this_thread::sleep_for(std::chrono::seconds(1)); } }