// Project headers providing the DaggyRunnerTaskExecutor declaration, the
// HTTP_REQUEST / JSON_HTTP_REQUEST helpers, and the RapidJSON (rj) aliases are
// assumed to be included here; the standard headers below cover what this
// translation unit uses directly.
#include <algorithm>
#include <array>
#include <chrono>
#include <iostream>
#include <iterator>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>

using namespace daggy::executors::task;
using namespace daggy::executors::task::daggy_runner;
using namespace daggy;
using namespace std::chrono_literals;

namespace daggy::executors::task::daggy_runner {

// Serializes a Capacity to a small JSON object.
std::string capacityToJSON(const Capacity &cap)
{
    return R"({ "cores": )" + std::to_string(cap.cores) + R"(, "memoryMB": )" +
           std::to_string(cap.memoryMB) + "}";
}

// Parses a Capacity from a JSON object, throwing if the structure is missing
// or a field is not numeric.
Capacity capacityFromJSON(const rj::Value &spec)
{
    Capacity cap{.cores = 0, .memoryMB = 0};
    if (!spec.IsObject()) {
        throw std::runtime_error("Capacity is not an object");
    }
    if (spec.HasMember("cores")) {
        if (!spec["cores"].IsNumber()) {
            throw std::runtime_error("cores member of Capacity is not an integer");
        }
        cap.cores = spec["cores"].GetInt64();
    }
    if (spec.HasMember("memoryMB")) {
        if (!spec["memoryMB"].IsNumber()) {
            throw std::runtime_error(
                "memoryMB member of Capacity is not an integer");
        }
        cap.memoryMB = spec["memoryMB"].GetInt64();
    }
    return cap;
}

// Reads the capacity a task requires from its "cores" and "memoryMB" job
// parameters.
Capacity capacityFromTask(const Task &task)
{
    Capacity cap{.cores = 0, .memoryMB = 0};
    cap.cores = std::stoll(std::get<std::string>(task.job.at("cores")));
    cap.memoryMB = std::stoll(std::get<std::string>(task.job.at("memoryMB")));
    return cap;
}

// Checks that the job carries the forking-executor parameters plus integer
// "cores" and "memoryMB" values, throwing a descriptive error otherwise.
void validateTaskParameters(const daggy::ConfigValues &job)
{
    forking_executor::validateTaskParameters(job);

    const std::array<std::string, 2> fields{"cores", "memoryMB"};
    for (const auto &field : fields) {
        if (job.count(field) == 0)
            throw std::runtime_error("Missing required job parameter " + field);
        const auto &val = job.at(field);
        if (!std::holds_alternative<std::string>(val))
            throw std::runtime_error(field + " in capacity is not a string");
        try {
            std::stoll(std::get<std::string>(val));
        } catch (const std::exception &) {
            throw std::runtime_error(field + " in capacity is not an integer");
        }
    }
}

} // namespace daggy::executors::task::daggy_runner

DaggyRunnerTaskExecutor::DaggyRunnerTaskExecutor()
    : running_(true)
    , monitorWorker_(&DaggyRunnerTaskExecutor::monitor, this)
{
}

DaggyRunnerTaskExecutor::~DaggyRunnerTaskExecutor()
{
    running_ = false;
    monitorWorker_.join();
}

std::string DaggyRunnerTaskExecutor::description() const
{
    std::stringstream ss;
    ss << "DaggyRunnerTaskExecutor running with " << runners_.size()
       << " runners: [";
    bool first = true;
    for (const auto &[runner, _] : runners_) {
        if (first) {
            first = false;
        } else {
            ss << ", ";
        }
        ss << runner;
    }
    ss << "]";
    return ss.str();
}

// Validates the job to ensure that all required values are set and are of
// the right type.
bool DaggyRunnerTaskExecutor::validateTaskParameters(const ConfigValues &job)
{
    daggy_runner::validateTaskParameters(job);
    return true;
}

// Expands the command and environment against the expansion values, producing
// one concrete job per interpolation.
std::vector<ConfigValues> DaggyRunnerTaskExecutor::expandTaskParameters(
    const ConfigValues &job, const ConfigValues &expansionValues)
{
    std::vector<ConfigValues> newValues;
    auto command = (job.count("command") == 0
                        ? Command{}
                        : std::get<Command>(job.at("command")));
    auto environment = (job.count("environment") == 0
                            ? Command{}
                            : std::get<Command>(job.at("environment")));

    // Interpolate command and environment together, then split them back apart.
    Command both(command);
    std::copy(environment.begin(), environment.end(), std::back_inserter(both));
    for (const auto &parts : interpolateValues(both, expansionValues)) {
        ConfigValues newCommand{job};
        newCommand["command"] =
            Command(parts.begin(), parts.begin() + command.size());
        newCommand["environment"] =
            Command(parts.begin() + command.size(), parts.end());
        newValues.emplace_back(newCommand);
    }
    return newValues;
}

// Runs the task on the first runner with enough free capacity.
TaskFuture DaggyRunnerTaskExecutor::execute(DAGRunID runID,
                                            const std::string &taskName,
                                            const Task &task)
{
    auto taskUsed = capacityFromTask(task);
    std::string exe_runner;

    // Block until a runner with enough free capacity is available, reserving
    // the task's share of that capacity under the lock.
    std::unique_lock lock(runnersGuard_);
    runnersCV_.wait(lock, [&] {
        for (auto &[runner, capacity] : runners_) {
            if (capacity.cores >= taskUsed.cores and
                capacity.memoryMB >= taskUsed.memoryMB) {
                exe_runner = runner;
                capacity.cores -= taskUsed.cores;
                capacity.memoryMB -= taskUsed.memoryMB;
                return true;
            }
        }
        return false;
    });
    // Capacity has already been reserved, so the lock can be released.
    lock.unlock();

    std::stringstream ss;
    ss << exe_runner << "/v1/task/" << runID << "/" << taskName;
    auto url = ss.str();

    // TODO: catching this failure state doesn't allow for runners dying.
    while (true) {
        auto response = HTTP_REQUEST(url, taskToJSON(task), "POST");
        if (response.code == 200)
            break;
        std::cout << "Submitting " << taskName << " expected code 200, got "
                  << response.code << '[' << response.body << "]\n";
        std::this_thread::sleep_for(250ms);
    }

    // TaskFuture is assumed to be a shared_ptr alias, so element_type names the
    // promise type it wraps.
    RunningTask rt{.fut = std::make_shared<TaskFuture::element_type>(),
                   .runID = runID,
                   .taskName = taskName,
                   .runnerURL = exe_runner,
                   .resources = taskUsed};
    auto fut = rt.fut;
    {
        std::lock_guard rtLock(rtGuard_);
        runningTasks_.emplace(std::make_pair(runID, taskName), std::move(rt));
    }
    return fut;
}

// Stopping a task on a remote runner is not implemented; report success.
bool DaggyRunnerTaskExecutor::stop(DAGRunID /*runID*/,
                                   const std::string & /*taskName*/)
{
    return true;
}

daggy_runner::Capacity DaggyRunnerTaskExecutor::getRunnerCapacity(
    const std::string &runnerURL)
{
    // Ask the runner for its capacity.
    const auto &[code, doc] = JSON_HTTP_REQUEST(runnerURL + "/v1/capacity");
    if (code != HTTPCode::Ok) {
        throw std::runtime_error("Unable to get capacity from runner " +
                                 runnerURL);
    }
    return capacityFromJSON(doc);
}

void DaggyRunnerTaskExecutor::addRunner(const std::string &url)
{
    std::lock_guard lock(runnersGuard_);
    runners_.emplace(url, getRunnerCapacity(url));
}

// Polls the runners for finished tasks, resolves their futures, and returns
// the reserved capacity to the owning runners.
void DaggyRunnerTaskExecutor::monitor()
{
    std::vector<std::pair<DAGRunID, std::string>> resolvedTasks;
    // Snapshot of (task id, runner URL, future, reserved resources) for each
    // in-flight task; RunningTask::fut is assumed to be a TaskFuture.
    std::vector<std::tuple<std::pair<DAGRunID, std::string>, std::string,
                           TaskFuture, Capacity>>
        runningTasks;
    std::unordered_map<std::string, Capacity> returnedResources;

    while (running_) {
        std::this_thread::sleep_for(2s);
        resolvedTasks.clear();
        runningTasks.clear();
        returnedResources.clear();

        // Copy the running tasks to prevent holding the lock too long.
        {
            std::lock_guard lock(rtGuard_);
            for (const auto &[tid, info] : runningTasks_) {
                runningTasks.emplace_back(std::make_tuple(
                    tid, info.runnerURL, info.fut, info.resources));
            }
        }

        for (const auto &[tid, runner, fut, resources] : runningTasks) {
            rj::Document doc;
            try {
                std::string url = runner + "/v1/task/" +
                                  std::to_string(tid.first) + "/" + tid.second;
                auto [code, json] = JSON_HTTP_REQUEST(url);
                // Not finished yet (or the runner is unreachable); try again on
                // the next pass.
                if (code != HTTPCode::Ok) {
                    continue;
                }
                doc.Swap(json);
            } catch (const std::exception &) {
                continue;
            }

            auto &cap = returnedResources[runner];
            cap.cores += resources.cores;
            cap.memoryMB += resources.memoryMB;

            auto attempt = attemptRecordFromJSON(doc);
            attempt.executorLog += "\nExecuted on " + runner;
            fut->set(attempt);
            resolvedTasks.push_back(tid);
        }

        // Give the reserved capacity back to the runners that finished tasks.
        if (!returnedResources.empty()) {
            std::lock_guard rLock(runnersGuard_);
            for (const auto &[runner, res] : returnedResources) {
                auto &caps = runners_[runner];
                caps.cores += res.cores;
                caps.memoryMB += res.memoryMB;
            }
        }

        // Drop resolved tasks and wake any execute() calls waiting for capacity.
        if (!resolvedTasks.empty()) {
            std::lock_guard lock(rtGuard_);
            for (const auto &tid : resolvedTasks) {
                runningTasks_.extract(tid);
                runnersCV_.notify_one();
            }
        }
    }
}
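
// A minimal usage sketch (disabled; not part of the executor). The runner
// URLs, run ID, task name, and job values below are hypothetical, and it is
// assumed that Task exposes a ConfigValues `job` member that can be populated
// as shown.
#if 0
int main()
{
    daggy::executors::task::DaggyRunnerTaskExecutor executor;

    // Register remote runners; each one is queried for its capacity up front.
    executor.addRunner("http://runner-a:8080");
    executor.addRunner("http://runner-b:8080");

    // A job must carry "cores" and "memoryMB" as integer-valued strings,
    // which is what validateTaskParameters() enforces.
    daggy::Task task;
    task.job["command"] = daggy::Command{"echo", "hello"};
    task.job["cores"] = std::string("2");
    task.job["memoryMB"] = std::string("512");

    // execute() blocks until a runner with enough free capacity exists,
    // POSTs the task to it, and returns a future that monitor() resolves.
    auto fut = executor.execute(/*runID=*/1, "hello-task", task);
    return 0;
}
#endif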