Files
daggy/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp
Ian Roddis 53308c063d Fixing a number of scaling issues:
- Missed closing of file descriptor made ForkingTaskExecutor
  silently die after running out of FDs
- Tightened up scope for locks to prevent http timeout
- Simplified threadpool
2022-01-10 13:02:10 -04:00

354 lines
10 KiB
C++

#include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp>
#include <daggy/executors/task/DaggyRunnerTaskExecutor.hpp>
#include <daggy/executors/task/ForkingTaskExecutor.hpp>
#include <iomanip>
using namespace daggy::executors::task;
using namespace daggy::executors::task::daggy_runner;
using namespace daggy;
namespace daggy::executors::task::daggy_runner {
  // Serializes a Capacity as a JSON object with the integer members "cores"
  // and "memoryMB".
  std::string capacityToJSON(const Capacity &cap)
  {
    return R"({ "cores": )" + std::to_string(cap.cores) + R"(, "memoryMB": )" +
           std::to_string(cap.memoryMB) + "}";
  }

  // Parses a Capacity out of a JSON object.
  //
  // Members that are absent keep their default of 0. Throws
  // std::runtime_error if `spec` is not an object, or if a member that is
  // present does not hold an integer representable as a 64-bit signed value.
  Capacity capacityFromJSON(const rj::Value &spec)
  {
    if (!spec.IsObject()) {
      throw std::runtime_error("Capacity is not an object");
    }

    Capacity cap{.cores = 0, .memoryMB = 0};

    if (spec.HasMember("cores")) {
      const auto &cores = spec["cores"];
      // IsInt64 rather than IsNumber: GetInt64() is only valid on integer
      // values, so a floating point value must be rejected up front.
      if (!cores.IsInt64()) {
        throw std::runtime_error("cores member of Capacity is not an integer");
      }
      cap.cores = cores.GetInt64();
    }

    if (spec.HasMember("memoryMB")) {
      const auto &memoryMB = spec["memoryMB"];
      if (!memoryMB.IsInt64()) {
        throw std::runtime_error(
            "memoryMB member of Capacity is not an integer");
      }
      cap.memoryMB = memoryMB.GetInt64();
    }

    return cap;
  }

  // Reads the resources a task asks for from its "cores" and "memoryMB" job
  // parameters, both of which are stored as strings.
  //
  // Performs no validation of its own: a missing key, a non-string value or
  // an unparsable string surfaces as the exception thrown by at(), std::get
  // or std::stoll respectively. validateTaskParameters() performs the
  // corresponding checks.
  Capacity capacityFromTask(const Task &task)
  {
    Capacity cap{.cores = 0, .memoryMB = 0};
    cap.cores = std::stoll(std::get<std::string>(task.job.at("cores")));
    cap.memoryMB = std::stoll(std::get<std::string>(task.job.at("memoryMB")));
    return cap;
  }

  // Checks that a job carries everything this executor needs.
  //
  // Runs the forking executor's validation first, then requires "cores" and
  // "memoryMB" to be present, to be strings, and to consist entirely of an
  // integer. Throws std::runtime_error describing the first problem found.
  void validateTaskParameters(const daggy::ConfigValues &job)
  {
    forking_executor::validateTaskParameters(job);

    const std::array<std::string, 2> fields{"cores", "memoryMB"};
    for (const auto &field : fields) {
      if (job.count(field) == 0)
        throw std::runtime_error("Missing required job parameter " + field);

      const auto &val = job.at(field);
      if (!std::holds_alternative<std::string>(val))
        throw std::runtime_error(field + " in capacity is not a string");

      const auto &text = std::get<std::string>(val);
      std::size_t consumed = 0;
      bool parsed = true;
      try {
        (void)std::stoll(text, &consumed);
      }
      catch (const std::exception &) {
        parsed = false;
      }

      // std::stoll stops at the first character it cannot use, so "4abc"
      // would otherwise be accepted as 4. Require the whole string to be
      // consumed.
      if (!parsed || consumed != text.size())
        throw std::runtime_error(field + " in capacity is not an integer");
    }
  }
} // namespace daggy::executors::task::daggy_runner
// Sets running_ to true and constructs monitorWorker_ from the monitor()
// member function bound to this instance. monitor() loops while running_ is
// true, and the destructor clears the flag and joins monitorWorker_.
// NOTE(review): members are initialized in declaration order, not in the
// order listed here - confirm running_ is declared before monitorWorker_ so
// the flag is set before monitor() can read it.
DaggyRunnerTaskExecutor::DaggyRunnerTaskExecutor()
: running_(true)
, monitorWorker_(&DaggyRunnerTaskExecutor::monitor, this)
{
}
// Asks the monitor loop to stop and waits for it to finish.
//
// monitor() sleeps for a second between polls, so this can block for about
// that long plus the duration of any poll that is in flight.
DaggyRunnerTaskExecutor::~DaggyRunnerTaskExecutor()
{
  running_ = false;

  // join() throws if the worker is not joinable, and an exception escaping a
  // destructor terminates the program, so only join when it is valid to.
  if (monitorWorker_.joinable()) {
    monitorWorker_.join();
  }
}
// Returns a one-line summary of the executor: the number of registered
// runners followed by their URLs as a comma separated list in brackets.
std::string DaggyRunnerTaskExecutor::description() const
{
  std::string summary = "DaggyRunnerTaskExecutor running with " +
                        std::to_string(runners_.size()) + " runners: [";

  // The separator is empty for the first entry and ", " for every later one.
  const char *separator = "";
  for (const auto &entry : runners_) {
    summary += separator;
    summary += entry.first;
    separator = ", ";
  }

  summary += "]";
  return summary;
}
// Validates the job to ensure that all required values are set and are of
// the right type.
//
// Delegates to daggy_runner::validateTaskParameters() and returns true when
// that call returns normally; this function never returns false. Any
// exception thrown by the delegate propagates to the caller.
bool DaggyRunnerTaskExecutor::validateTaskParameters(const ConfigValues &job)
{
daggy_runner::validateTaskParameters(job);
return true;
}
std::vector<ConfigValues> DaggyRunnerTaskExecutor::expandTaskParameters(
const ConfigValues &job, const ConfigValues &expansionValues)
{
std::vector<ConfigValues> newValues;
auto command =
(job.count("command") == 0 ? Command{}
: std::get<Command>(job.at("command")));
auto environment = (job.count("environment") == 0
? Command{}
: std::get<Command>(job.at("environment")));
Command both(command);
std::copy(environment.begin(), environment.end(), std::back_inserter(both));
for (const auto &parts : interpolateValues(both, expansionValues)) {
ConfigValues newCommand{job};
newCommand["command"] =
Command(parts.begin(), parts.begin() + command.size());
newCommand["environment"] =
Command(parts.begin() + command.size(), parts.end());
newValues.emplace_back(newCommand);
}
return newValues;
}
// Runs the task on one of the registered runners.
//
// Every runner that answers /ready is scored by the smaller of its remaining
// core and memory fractions after the task would be placed on it. Runners are
// then tried from the highest score down until one accepts the task.
//
// Returns a future for the task's AttemptRecord. On successful submission the
// task is recorded in runningTasks_ and monitor() resolves the future. If no
// runner is reachable, or none accepts the task, the returned future is
// already resolved with rc = -1.
//
// Throws whatever capacityFromTask() / capacityFromJSON() throw on malformed
// input.
std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
    DAGRunID runID, const std::string &taskName, const Task &task)
{
  // Builds an already resolved future reporting that no runner took the task.
  const auto noRunnerAvailable = [] {
    std::promise<AttemptRecord> prom;
    auto fut = prom.get_future();
    AttemptRecord record{.rc = -1,
                         .executorLog = "No runners available for execution"};
    prom.set_value(std::move(record));
    return fut;
  };

  const auto taskUsed = capacityFromTask(task);

  // Get the capacities for all the runners
  // Capacities for a runner can be negative, meaning that they're currently
  // oversubscribed.
  std::vector<std::pair<std::string, double>> impacts;
  {
    std::lock_guard<std::mutex> lock(runnersGuard_);
    for (auto &[runner, caps] : runners_) {
      const auto result = HTTP_REQUEST(runner + "/ready");
      if (result.code != HTTPCode::Ok)
        continue;

      // Set capacities if they haven't been discovered yet
      if (caps.total.cores == 0) {
        const auto &[code, json] = JSON_HTTP_REQUEST(runner + "/v1/capacity");
        if (code != HTTPCode::Ok) {
          std::cerr << "Runner " << runner
                    << " appears to be up, but cannot retrieve capacity"
                    << std::endl;
          continue;
        }
        caps.current = capacityFromJSON(json["current"]);
        caps.total = capacityFromJSON(json["total"]);
      }

      // A runner reporting no total capacity cannot be scored: dividing by
      // zero can yield NaN, which would break the ordering used by the sort
      // below.
      if (caps.total.cores <= 0 || caps.total.memoryMB <= 0)
        continue;

      const double cores = (caps.current.cores - taskUsed.cores);
      const double memoryMB = (caps.current.memoryMB - taskUsed.memoryMB);
      const double impact =
          std::min(cores / caps.total.cores, memoryMB / caps.total.memoryMB);
      impacts.emplace_back(runner, impact);
    }
  }

  if (impacts.empty()) {
    return noRunnerAvailable();
  }

  // Most remaining headroom first.
  std::sort(impacts.begin(), impacts.end(),
            [](const auto &a, const auto &b) { return a.second > b.second; });

  std::string submitted_runner;
  for (const auto &[runner, _] : impacts) {
    std::stringstream ss;
    ss << runner << "/v1/task/" << runID << "/" << taskName;
    const auto url = ss.str();

    // The request is made without holding runnersGuard_ so that a slow runner
    // does not stall other users of the runner table.
    const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST");
    if (response.code != HTTPCode::Ok) {
      continue;
    }

    // Only the runner that actually accepted the task is charged for it.
    {
      std::lock_guard<std::mutex> lock(runnersGuard_);
      auto it = runners_.find(runner);
      if (it != runners_.end()) {
        it->second.current.cores -= taskUsed.cores;
        it->second.current.memoryMB -= taskUsed.memoryMB;
      }
    }

    submitted_runner = runner;
    break; // one runner is enough; do not submit the task anywhere else
  }

  if (submitted_runner.empty()) {
    return noRunnerAvailable();
  }

  RunningTask rt{.prom{},
                 .runID = runID,
                 .taskName = taskName,
                 .runnerURL = submitted_runner,
                 .retries = 3,
                 .resources = taskUsed};
  auto fut = rt.prom.get_future();

  std::lock_guard<std::mutex> lock(rtGuard_);
  runningTasks_.emplace(std::make_pair(runID, taskName), std::move(rt));
  return fut;
}
// Stub: does nothing with runID or taskName and unconditionally returns
// true. No request is sent to any runner and runningTasks_ is not touched.
bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
{
return true;
}
void DaggyRunnerTaskExecutor::addRunner(const std::string &url)
{
// Try and get the capacity
const auto &[code, doc] = JSON_HTTP_REQUEST(url + "/v1/capacity");
if (code != HTTPCode::Ok) {
std::cerr << "Failed to contact runner " << url << ": "
<< doc["error"].GetString()
<< ", will attempt to set capacities later" << std::endl;
runners_.emplace(url, RunnerCapacity{});
return;
}
RunnerCapacity caps{.current = capacityFromJSON(doc["current"]),
.total = capacityFromJSON(doc["total"])};
std::lock_guard<std::mutex> lock(runnersGuard_);
runners_.emplace(url, caps);
}
void DaggyRunnerTaskExecutor::monitor()
{
std::unordered_map<std::string, RunnerCapacity> runners;
while (running_) {
std::unordered_map<std::pair<DAGRunID, std::string>,
std::optional<AttemptRecord>>
resolvedJobs;
std::unordered_map<std::pair<DAGRunID, std::string>, Capacity>
taskResources;
// Cache what's running now
{
std::lock_guard<std::mutex> lock(rtGuard_);
for (const auto &[tid, info] : runningTasks_) {
taskResources.emplace(tid, info.resources);
}
}
{
std::lock_guard<std::mutex> lock(runnersGuard_);
runners = runners_;
}
for (auto &[runnerURL, caps] : runners) {
rj::Document doc;
try {
auto [code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll");
if (code != HTTPCode::Ok)
continue;
doc.Swap(json);
}
catch (std::exception &e) {
continue;
}
const auto tasks = doc.GetArray();
for (size_t idx = 0; idx < tasks.Size(); ++idx) {
const auto &task = tasks[idx];
auto tid = std::make_pair(task["runID"].GetInt64(),
task["taskName"].GetString());
if (task["state"] == "PENDING") {
resolvedJobs.emplace(tid, std::nullopt);
}
else {
auto it = taskResources.find(tid);
if (it != taskResources.end()) {
const auto &res = taskResources.at(tid);
caps.current.cores += res.cores;
caps.current.memoryMB += res.memoryMB;
}
auto attempt = attemptRecordFromJSON(task["attempt"]);
resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"]));
}
}
}
std::vector<std::pair<DAGRunID, std::string>> completedTasks;
{
std::lock_guard<std::mutex> lock(rtGuard_);
for (auto &[taskID, task] : runningTasks_) {
auto it = resolvedJobs.find(taskID);
if (it == resolvedJobs.end()) {
--task.retries;
if (task.retries == 0) {
AttemptRecord record{
.rc = -1, .executorLog = "Unable to query runner for progress"};
task.prom.set_value(std::move(record));
completedTasks.emplace_back(taskID);
}
continue;
}
else if (it->second.has_value()) {
// Task has completed
task.prom.set_value(it->second.value());
completedTasks.emplace_back(taskID);
}
}
for (const auto &tid : completedTasks) {
runningTasks_.extract(tid);
}
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}