daggy/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp
Ian Roddis 57e93b5045 Simplifying daggyr server, and returning to a task submit / task poll model.

Squashed commit of the following:

commit 0ef57f095d15f0402915de54f83c1671120bd228
Author: Ian Roddis <tech@kinesin.ca>
Date:   Wed Feb 2 08:18:03 2022 -0400

    Simplifying task polling and reducing lock scopes

commit d77ef02021cc728849c7d1fb0185dd1a861b4a3d
Author: Ian Roddis <tech@kinesin.ca>
Date:   Wed Feb 2 08:02:47 2022 -0400

    Simplifying check

commit c1acf34440162abb890a959f3685c2d184242ed5
Author: Ian Roddis <tech@kinesin.ca>
Date:   Wed Feb 2 08:01:13 2022 -0400

    Removing capacity tracking from runner, since it is maintained in daggyd

commit 9401246f92113ab140143c1895978b9de8bd9972
Author: Ian Roddis <tech@kinesin.ca>
Date:   Wed Feb 2 07:47:28 2022 -0400

    Adding retry for submission

commit 398aa04a320347bb35f23f3f101d91ab4df25652
Author: Ian Roddis <tech@kinesin.ca>
Date:   Tue Feb 1 14:54:20 2022 -0400

    Adding in execution note, as well as requeuing the result if the peer disconnects

commit 637b14af6d5b53f25b9c38d4c8a7ed8532af5599
Author: Ian Roddis <tech@kinesin.ca>
Date:   Tue Feb 1 14:13:59 2022 -0400

    Fixing locking issues

commit 4d6716dfda8aa7f51e0abbdab833aff618915ba0
Author: Ian Roddis <tech@kinesin.ca>
Date:   Tue Feb 1 13:33:14 2022 -0400

    Single task daggyr working

commit bd48a5452a92817faf25ee44a6115aaa2f6c30d1
Author: Ian Roddis <tech@kinesin.ca>
Date:   Tue Feb 1 12:22:04 2022 -0400

    Checkpointing work
2022-02-02 21:12:05 -04:00


#include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp>
#include <daggy/executors/task/DaggyRunnerTaskExecutor.hpp>
#include <daggy/executors/task/ForkingTaskExecutor.hpp>

#include <array>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <thread>

using namespace daggy::executors::task;
using namespace daggy::executors::task::daggy_runner;
using namespace daggy;
using namespace std::chrono_literals;

namespace daggy::executors::task::daggy_runner {
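
// Serializes a Capacity into a compact JSON object, e.g. a Capacity with
// 4 cores and 2048 MB becomes { "cores": 4, "memoryMB": 2048} -- the same
// shape capacityFromJSON accepts.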
std::string capacityToJSON(const Capacity &cap)
{
    return R"({ "cores": )" + std::to_string(cap.cores) + R"(, "memoryMB": )" +
           std::to_string(cap.memoryMB) + "}";
}
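
// Parses a Capacity from a JSON object. Missing fields default to 0;
// a non-object value or non-numeric fields raise a runtime_error.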
Capacity capacityFromJSON(const rj::Value &spec)
{
    Capacity cap{.cores = 0, .memoryMB = 0};

    if (!spec.IsObject()) {
        throw std::runtime_error("Capacity is not an object");
    }

    if (spec.HasMember("cores")) {
        if (!spec["cores"].IsNumber()) {
            throw std::runtime_error("cores member of Capacity is not an integer");
        }
        cap.cores = spec["cores"].GetInt64();
    }

    if (spec.HasMember("memoryMB")) {
        if (!spec["memoryMB"].IsNumber()) {
            throw std::runtime_error(
                "memoryMB member of Capacity is not an integer");
        }
        cap.memoryMB = spec["memoryMB"].GetInt64();
    }

    return cap;
}
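
// Derives the capacity a task will consume from its "cores" and "memoryMB"
// job parameters, which are stored as strings (see validateTaskParameters).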
Capacity capacityFromTask(const Task &task)
{
    Capacity cap{.cores = 0, .memoryMB = 0};
    cap.cores = std::stoll(std::get<std::string>(task.job.at("cores")));
    cap.memoryMB = std::stoll(std::get<std::string>(task.job.at("memoryMB")));
    return cap;
}
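
// Validates the forking-executor parameters, then the runner-specific "cores"
// and "memoryMB" fields, which must be strings holding integer values.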
void validateTaskParameters(const daggy::ConfigValues &job)
{
    forking_executor::validateTaskParameters(job);

    const std::array<std::string, 2> fields{"cores", "memoryMB"};
    for (const auto &field : fields) {
        if (job.count(field) == 0)
            throw std::runtime_error("Missing required job parameter " + field);

        const auto &val = job.at(field);
        if (!std::holds_alternative<std::string>(val))
            throw std::runtime_error(field + " in capacity is not a string");

        try {
            std::stoll(std::get<std::string>(val));
        }
        catch (std::exception &e) {
            throw std::runtime_error(field + " in capacity is not an integer");
        }
    }
}

} // namespace daggy::executors::task::daggy_runner
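
// The constructor starts the background monitor thread immediately; the
// destructor signals it to stop via running_ and joins it.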
DaggyRunnerTaskExecutor::DaggyRunnerTaskExecutor()
    : running_(true)
    , monitorWorker_(&DaggyRunnerTaskExecutor::monitor, this)
{
}

DaggyRunnerTaskExecutor::~DaggyRunnerTaskExecutor()
{
    running_ = false;
    monitorWorker_.join();
}
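
// Human-readable summary listing the registered runner URLs. Note that it
// reads runners_ without taking runnersGuard_, so it can race with addRunner().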
std::string DaggyRunnerTaskExecutor::description() const
{
    std::stringstream ss;
    ss << "DaggyRunnerTaskExecutor running with " << runners_.size()
       << " runners: [";
    bool first = true;
    for (const auto &[runner, _] : runners_) {
        if (first) {
            first = false;
        }
        else {
            ss << ", ";
        }
        ss << runner;
    }
    ss << "]";
    return ss.str();
}

// Validates the job to ensure that all required values are set and are of
// the right type.
bool DaggyRunnerTaskExecutor::validateTaskParameters(const ConfigValues &job)
{
    daggy_runner::validateTaskParameters(job);
    return true;
}
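
// Expands a parameterized job into one ConfigValues per interpolation: the
// "command" and "environment" lists are interpolated together against
// expansionValues, then split back apart at the original command length.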
std::vector<ConfigValues> DaggyRunnerTaskExecutor::expandTaskParameters(
    const ConfigValues &job, const ConfigValues &expansionValues)
{
    std::vector<ConfigValues> newValues;

    auto command =
        (job.count("command") == 0 ? Command{}
                                   : std::get<Command>(job.at("command")));
    auto environment = (job.count("environment") == 0
                            ? Command{}
                            : std::get<Command>(job.at("environment")));

    Command both(command);
    std::copy(environment.begin(), environment.end(), std::back_inserter(both));

    for (const auto &parts : interpolateValues(both, expansionValues)) {
        ConfigValues newCommand{job};
        newCommand["command"] =
            Command(parts.begin(), parts.begin() + command.size());
        newCommand["environment"] =
            Command(parts.begin() + command.size(), parts.end());
        newValues.emplace_back(newCommand);
    }

    return newValues;
}

// Runs the task: blocks until a registered runner has enough free capacity,
// reserves that capacity, submits the task to the runner (retrying until it
// is accepted), and hands back a future that the monitor thread will resolve.
TaskFuture DaggyRunnerTaskExecutor::execute(DAGRunID runID,
                                            const std::string &taskName,
                                            const Task &task)
{
    auto taskUsed = capacityFromTask(task);

    std::string exe_runner;
    Capacity *exe_capacity = nullptr;

    // Block until a host is found
    std::unique_lock<std::mutex> lock(runnersGuard_);

    // Wait for a host to be available
    runnersCV_.wait(lock, [&] {
        for (auto &[runner, capacity] : runners_) {
            if (capacity.cores >= taskUsed.cores and
                capacity.memoryMB >= taskUsed.memoryMB) {
                exe_runner = runner;
                exe_capacity = &capacity;
                return true;
            }
        }
        return false;
    });

    exe_capacity->cores -= taskUsed.cores;
    exe_capacity->memoryMB -= taskUsed.memoryMB;

    std::stringstream ss;
    ss << exe_runner << "/v1/task/" << runID << "/" << taskName;
    auto url = ss.str();

    // TODO catching this failure state doesn't allow for runners
    // dying.
    while (true) {
        auto response = HTTP_REQUEST(url, taskToJSON(task), "POST");
        if (response.code == 200)
            break;
        std::cout << "Submitting " << taskName << " expected code 200, got "
                  << response.code << '[' << response.body << "]\n";
        std::this_thread::sleep_for(250ms);
    }

    RunningTask rt{.fut = std::make_shared<Future<AttemptRecord>>(),
                   .runID = runID,
                   .taskName = taskName,
                   .runnerURL = exe_runner,
                   .resources = taskUsed};
    auto fut = rt.fut;

    {
        std::lock_guard<std::mutex> lock(rtGuard_);
        runningTasks_.emplace(std::make_pair(runID, taskName), std::move(rt));
    }

    return fut;
}
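
// Stopping a remote task is not implemented yet; this currently reports
// success without contacting the runner.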
bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
{
    return true;
}
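
// Asks a runner for its advertised capacity via its /v1/capacity endpoint.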
daggy_runner::Capacity DaggyRunnerTaskExecutor::getRunnerCapacity(
    const std::string &runnerURL)
{
    // Try and get the capacity
    const auto &[code, doc] = JSON_HTTP_REQUEST(runnerURL + "/v1/capacity");
    if (code != HTTPCode::Ok) {
        throw std::runtime_error("Unable to get capacity from runner " + runnerURL);
    }

    return capacityFromJSON(doc);
}
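
// Registers a runner URL and records the capacity it reports; tasks submitted
// through execute() can then be placed on it.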
void DaggyRunnerTaskExecutor::addRunner(const std::string &url)
{
    std::lock_guard<std::mutex> lock(runnersGuard_);
    runners_.emplace(url, getRunnerCapacity(url));
}
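
// Background polling loop. Every two seconds it snapshots the running tasks,
// polls each task's runner for a finished attempt record, credits the reserved
// capacity back to the runner, resolves the task's future, and finally drops
// the task from runningTasks_, waking execute() calls blocked on capacity.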
void DaggyRunnerTaskExecutor::monitor()
{
    std::vector<TaskID> resolvedTasks;
    std::vector<std::tuple<TaskID, std::string, TaskFuture, Capacity>>
        runningTasks;
    std::unordered_map<std::string, Capacity> returnedResources;

    while (running_) {
        std::this_thread::sleep_for(2s);

        resolvedTasks.clear();
        runningTasks.clear();
        returnedResources.clear();

        // Copy the running tasks to prevent holding the lock too long
        {
            std::lock_guard<std::mutex> lock(rtGuard_);
            for (const auto &[tid, info] : runningTasks_) {
                runningTasks.emplace_back(
                    std::make_tuple(tid, info.runnerURL, info.fut, info.resources));
            }
        }

        for (const auto &[tid, runner, fut, resources] : runningTasks) {
            rj::Document doc;
            try {
                std::string url =
                    runner + "/v1/task/" + std::to_string(tid.first) + "/" + tid.second;
                auto [code, json] = JSON_HTTP_REQUEST(url);
                if (code != HTTPCode::Ok) {
                    continue;
                }
                doc.Swap(json);
            }
            catch (std::exception &e) {
                continue;
            }

            auto &cap = returnedResources[runner];
            cap.cores += resources.cores;
            cap.memoryMB += resources.memoryMB;

            auto attempt = attemptRecordFromJSON(doc);
            attempt.executorLog += "\nExecuted on " + runner;
            fut->set(attempt);
            resolvedTasks.push_back(tid);
        }

        if (!returnedResources.empty()) {
            std::lock_guard<std::mutex> rLock(runnersGuard_);
            for (const auto &[runner, res] : returnedResources) {
                auto &caps = runners_[runner];
                caps.cores += res.cores;
                caps.memoryMB += res.memoryMB;
            }
        }

        if (!resolvedTasks.empty()) {
            std::lock_guard<std::mutex> lock(rtGuard_);
            for (const auto &tid : resolvedTasks) {
                runningTasks_.extract(tid);
                runnersCV_.notify_one();
            }
        }
    }
}
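
// Rough usage sketch (illustrative only; the caller, runner URL, and IDs
// below are hypothetical, not part of this file):
//
//   DaggyRunnerTaskExecutor exec;
//   exec.addRunner("http://runner-host:8080");      // queries /v1/capacity
//   auto fut = exec.execute(runID, "taskA", task);  // blocks until a runner
//                                                   // has free capacity
//   // fut is resolved by the monitor thread once the runner reports the
//   // attempt record for runID / "taskA".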