#include #include #include #include #include using namespace daggy::executors::task; using namespace daggy::executors::task::daggy_runner; using namespace daggy; namespace daggy::executors::task::daggy_runner { std::string capacityToJSON(const Capacity &cap) { return R"({ "cores": )" + std::to_string(cap.cores) + R"(, "memoryMB": )" + std::to_string(cap.memoryMB) + "}"; } Capacity capacityFromJSON(const rj::Value &spec) { Capacity cap{.cores = 0, .memoryMB = 0}; if (!spec.IsObject()) { throw std::runtime_error("Capacity is not an object"); } if (spec.HasMember("cores")) { if (!spec["cores"].IsNumber()) { throw std::runtime_error("cores member of Capacity is not an integer"); } cap.cores = spec["cores"].GetInt64(); } if (spec.HasMember("memoryMB")) { if (!spec["memoryMB"].IsNumber()) { throw std::runtime_error( "memoryMB member of Capacity is not an integer"); } cap.memoryMB = spec["memoryMB"].GetInt64(); } return cap; } Capacity capacityFromTask(const Task &task) { Capacity cap{.cores = 0, .memoryMB = 0}; cap.cores = std::stoll(std::get(task.job.at("cores"))); cap.memoryMB = std::stoll(std::get(task.job.at("memoryMB"))); return cap; } void validateTaskParameters(const daggy::ConfigValues &job) { forking_executor::validateTaskParameters(job); const std::array fields{"cores", "memoryMB"}; for (const auto &field : fields) { if (job.count(field) == 0) throw std::runtime_error("Missing required job parameter " + field); const auto &val = job.at(field); if (!std::holds_alternative(val)) throw std::runtime_error(field + " in capacity is not a string"); try { std::stoll(std::get(val)); } catch (std::exception &e) { throw std::runtime_error(field + " in capacity is not an integer"); } } } } // namespace daggy::executors::task::daggy_runner DaggyRunnerTaskExecutor::DaggyRunnerTaskExecutor() : running_(true) , promptTask_(false) , monitorWorker_(&DaggyRunnerTaskExecutor::monitor, this) { } DaggyRunnerTaskExecutor::~DaggyRunnerTaskExecutor() { running_ = false; monitorWorker_.join(); } std::string DaggyRunnerTaskExecutor::description() const { std::stringstream ss; ss << "DaggyRunnerTaskExecutor running with " << runners_.size() << " runners: ["; bool first = true; for (const auto &[runner, _] : runners_) { if (first) { first = false; } else { ss << ", "; } ss << runner; } ss << "]"; return ss.str(); } // Validates the job to ensure that all required values are set and are of // the right type, bool DaggyRunnerTaskExecutor::validateTaskParameters(const ConfigValues &job) { daggy_runner::validateTaskParameters(job); return true; } std::vector DaggyRunnerTaskExecutor::expandTaskParameters( const ConfigValues &job, const ConfigValues &expansionValues) { std::vector newValues; auto command = (job.count("command") == 0 ? Command{} : std::get(job.at("command"))); auto environment = (job.count("environment") == 0 ? Command{} : std::get(job.at("environment"))); Command both(command); std::copy(environment.begin(), environment.end(), std::back_inserter(both)); for (const auto &parts : interpolateValues(both, expansionValues)) { ConfigValues newCommand{job}; newCommand["command"] = Command(parts.begin(), parts.begin() + command.size()); newCommand["environment"] = Command(parts.begin() + command.size(), parts.end()); newValues.emplace_back(newCommand); } return newValues; } // Runs the task TaskFuture DaggyRunnerTaskExecutor::execute(DAGRunID runID, const std::string &taskName, const Task &task) { auto taskUsed = capacityFromTask(task); std::string exe_runner; Capacity *exe_capacity; // Block until a host is found std::unique_lock lock(runnersGuard_); // Wait for a host to be available runnersCV_.wait(lock, [&] { for (auto &[runner, capacity] : runners_) { if (capacity.cores >= taskUsed.cores and capacity.memoryMB >= taskUsed.memoryMB) { exe_runner = runner; exe_capacity = &capacity; return true; } } return false; }); exe_capacity->cores -= taskUsed.cores; exe_capacity->memoryMB -= taskUsed.memoryMB; std::stringstream ss; ss << exe_runner << "/v1/task/" << runID << "/" << taskName; auto url = ss.str(); const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST"); RunningTask rt{.fut = std::make_shared>(), .runID = runID, .taskName = taskName, .resources = taskUsed}; auto fut = rt.fut; { std::lock_guard lock(rtGuard_); runningTasks_.emplace(std::make_pair(runID, taskName), std::move(rt)); } return fut; } bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName) { return true; } daggy_runner::Capacity DaggyRunnerTaskExecutor::getRunnerCapacity( const std::string &runnerURL) { // Try and get the capacity const auto &[code, doc] = JSON_HTTP_REQUEST(runnerURL + "/v1/capacity"); if (code != HTTPCode::Ok) { return Capacity{}; } return capacityFromJSON(doc["total"]); } void DaggyRunnerTaskExecutor::addRunner(const std::string &url) { std::lock_guard lock(runnersGuard_); runners_.emplace(url, getRunnerCapacity(url)); } void DaggyRunnerTaskExecutor::monitor() { std::unordered_map runners; while (running_) { std::this_thread::sleep_for(std::chrono::milliseconds(250)); std::unordered_map, std::optional> resolvedJobs; std::unordered_map, Capacity> taskResources; // Cache what's running now { std::lock_guard lock(rtGuard_); for (const auto &[tid, info] : runningTasks_) { taskResources.emplace(tid, info.resources); } } { std::lock_guard lock(runnersGuard_); for (auto &[runnerURL, caps] : runners_) { rj::Document doc; try { auto [code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll"); if (code != HTTPCode::Ok) { std::cout << "Unable to poll: " << code << ": " << dumpJSON(json) << std::endl; continue; } doc.Swap(json); } catch (std::exception &e) { std::cout << "Unable to poll: " << e.what() << std::endl; continue; } if (!doc.IsArray()) { std::cout << "Got nonsense from poll: " << dumpJSON(doc) << std::endl; continue; } const auto tasks = doc.GetArray(); for (size_t idx = 0; idx < tasks.Size(); ++idx) { const auto &task = tasks[idx]; auto tid = std::make_pair(task["runID"].GetInt64(), task["taskName"].GetString()); auto it = taskResources.find(tid); if (it != taskResources.end()) { caps.cores += it->second.cores; caps.memoryMB += it->second.memoryMB; } auto attempt = attemptRecordFromJSON(task["attempt"]); resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"])); promptTask_ = true; runnersCV_.notify_one(); } } } std::vector> completedTasks; { std::lock_guard lock(rtGuard_); for (auto &[taskID, task] : runningTasks_) { auto it = resolvedJobs.find(taskID); if (it == resolvedJobs.end()) continue; if (it->second.has_value()) { // Task has completed task.fut->set(std::move(it->second.value())); completedTasks.emplace_back(taskID); } } for (const auto &tid : completedTasks) { runningTasks_.extract(tid); } } } }