Moving to a poll method for workers, and daggyd-preserved capacities

This commit is contained in:
Ian Roddis
2021-12-24 10:21:19 -04:00
parent 0914ede8fb
commit 779d6adaea
7 changed files with 199 additions and 128 deletions

View File

@@ -119,7 +119,7 @@ int main(int argc, char **argv)
args.add_argument("-d", "--daemon").default_value(false).implicit_value(true); args.add_argument("-d", "--daemon").default_value(false).implicit_value(true);
args.add_argument("--config").default_value(std::string{}); args.add_argument("--config").default_value(std::string{});
args.add_argument("--ip").default_value(std::string{"127.0.0.1"}); args.add_argument("--ip").default_value(std::string{"127.0.0.1"});
args.add_argument("--port").default_value(int{2504}); args.add_argument("--port").default_value(2504u);
try { try {
args.parse_args(argc, argv); args.parse_args(argc, argv);
@@ -138,7 +138,7 @@ int main(int argc, char **argv)
bool asDaemon = args.get<bool>("--daemon"); bool asDaemon = args.get<bool>("--daemon");
auto configFile = args.get<std::string>("--config"); auto configFile = args.get<std::string>("--config");
std::string listenIP = args.get<std::string>("--ip"); std::string listenIP = args.get<std::string>("--ip");
int listenPort = args.get<int>("--port"); int listenPort = args.get<uint32_t>("--port");
size_t webThreads = 50; size_t webThreads = 50;
ssize_t maxCores = std::max(1U, std::thread::hardware_concurrency() - 2); ssize_t maxCores = std::max(1U, std::thread::hardware_concurrency() - 2);
ssize_t maxMemoryMB = ssize_t maxMemoryMB =
@@ -164,7 +164,7 @@ int main(int argc, char **argv)
if (co.HasMember("cores")) if (co.HasMember("cores"))
maxCores = co["cores"].GetInt64(); maxCores = co["cores"].GetInt64();
if (co.HasMember("memoryMB")) if (co.HasMember("memoryMB"))
maxCores = co["memoryMB"].GetInt64(); maxMemoryMB = co["memoryMB"].GetInt64();
} }
} }

View File

@@ -10,6 +10,7 @@
#include <daggy/executors/task/ForkingTaskExecutor.hpp> #include <daggy/executors/task/ForkingTaskExecutor.hpp>
#include <daggy/loggers/dag_run/DAGRunLogger.hpp> #include <daggy/loggers/dag_run/DAGRunLogger.hpp>
#include <filesystem> #include <filesystem>
#include <list>
#define DAGGY_REST_HANDLER(func) \ #define DAGGY_REST_HANDLER(func) \
void func(const Pistache::Rest::Request &request, \ void func(const Pistache::Rest::Request &request, \
@@ -47,7 +48,7 @@ namespace daggy::daggyr {
DAGGY_REST_HANDLER(handleReady); DAGGY_REST_HANDLER(handleReady);
DAGGY_REST_HANDLER(handleGetCapacity); DAGGY_REST_HANDLER(handleGetCapacity);
DAGGY_REST_HANDLER(handleRunTask); DAGGY_REST_HANDLER(handleRunTask);
DAGGY_REST_HANDLER(handleGetTask); DAGGY_REST_HANDLER(handlePollTasks);
DAGGY_REST_HANDLER(handleStopTask); DAGGY_REST_HANDLER(handleStopTask);
DAGGY_REST_HANDLER(handleValidateTask); DAGGY_REST_HANDLER(handleValidateTask);
@@ -71,14 +72,13 @@ namespace daggy::daggyr {
struct PendingJob struct PendingJob
{ {
DAGRunID runID;
std::string taskName;
std::future<AttemptRecord> fut; std::future<AttemptRecord> fut;
Capacity resourcesUsed; Capacity resourcesUsed;
bool resolved;
}; };
std::unordered_map<std::pair<DAGRunID, std::string>, PendingJob> pending_; std::list<PendingJob> pending_;
std::mutex resultsGuard_;
std::unordered_map<std::pair<DAGRunID, std::string>, AttemptRecord>
results_;
}; };
} // namespace daggy::daggyr } // namespace daggy::daggyr

View File

@@ -96,11 +96,12 @@ namespace daggy::daggyr {
.produces(MIME(Application, Json)) .produces(MIME(Application, Json))
.response(Http::Code::Ok, "Run a task"); .response(Http::Code::Ok, "Run a task");
versionPath.route(desc_.get("/task/:runID/:taskName")) versionPath.route(desc_.get("/poll"))
.bind(&Server::handleGetTask, this) .bind(&Server::handlePollTasks, this)
.produces(MIME(Application, Json)) .produces(MIME(Application, Json))
.response(Http::Code::Ok, .response(
"Get the state and potentially the AttemptRecord of a task"); Http::Code::Ok,
"Poll all running tasks, getting completed attempts and state");
versionPath.route(desc_.del("/task/:runID/:taskName")) versionPath.route(desc_.del("/task/:runID/:taskName"))
.bind(&Server::handleStopTask, this) .bind(&Server::handleStopTask, this)
@@ -153,70 +154,65 @@ namespace daggy::daggyr {
{ {
std::lock_guard<std::mutex> lock(pendingGuard_); std::lock_guard<std::mutex> lock(pendingGuard_);
pending_.emplace( pending_.push_back(
std::make_pair(runID, taskName), PendingJob{.runID = runID,
PendingJob{.fut = executor_.execute(runID, taskName, task), .taskName = taskName,
.fut = executor_.execute(runID, taskName, task),
.resourcesUsed = resourcesUsed}); .resourcesUsed = resourcesUsed});
} }
std::cout << "Enqueuing " << runID << " / " << taskName << std::endl;
response.send(Pistache::Http::Code::Ok, ""); response.send(Pistache::Http::Code::Ok, "");
} }
void Server::handleGetTask(const Pistache::Rest::Request &request, void Server::handlePollTasks(const Pistache::Rest::Request &request,
Pistache::Http::ResponseWriter response) Pistache::Http::ResponseWriter response)
{ {
if (!handleAuth(request)) if (!handleAuth(request))
return; return;
auto runID = request.param(":runID").as<DAGRunID>(); std::stringstream payload;
auto taskName = request.param(":taskName").as<std::string>(); payload << "[";
bool first = true;
auto taskID = std::make_pair(runID, taskName);
std::string payload;
// Check to see if it's pending // Check to see if it's pending
bool found = false; std::lock_guard<std::mutex> lock(pendingGuard_);
{ auto it = pending_.begin();
std::lock_guard<std::mutex> lock(pendingGuard_); while (it != pending_.end()) {
auto it = pending_.find(taskID); if (first) {
if (it != pending_.end()) { first = false;
// poll it
if (it->second.fut.valid() and
it->second.fut.wait_for(1ms) == std::future_status::ready) {
auto attempt = it->second.fut.get();
{
std::lock_guard<std::mutex> rlock(resultsGuard_);
results_.emplace(taskID, attempt);
}
{
std::lock_guard<std::mutex> rlock(capacityGuard_);
curCapacity_.cores += it->second.resourcesUsed.cores;
curCapacity_.memoryMB += it->second.resourcesUsed.memoryMB;
}
std::cout << "Resolved " << it->first.first << " / "
<< it->first.second << std::endl;
pending_.extract(it);
}
else {
payload = R"({ "state": "RUNNING" })";
found = true;
}
} }
} else {
payload << ", ";
if (!found) {
std::lock_guard<std::mutex> lock(resultsGuard_);
auto it = results_.find(taskID);
if (it == results_.end()) {
REQ_RESPONSE(Not_Found, "No such task");
} }
payload = R"({ "state": "COMPLETED", "attempt": )" + payload << R"({ "runID": )" << it->runID << R"(, "taskName": )"
attemptRecordToJSON(it->second) + "}"; << std::quoted(it->taskName) << ", ";
}
response.send(Pistache::Http::Code::Ok, payload); // poll it
if (it->fut.valid() and
it->fut.wait_for(1ms) == std::future_status::ready) {
auto attempt = it->fut.get();
payload << R"("state": "COMPLETED", "attempt":)"
<< attemptRecordToJSON(attempt);
{
std::lock_guard<std::mutex> rlock(capacityGuard_);
curCapacity_.cores += it->resourcesUsed.cores;
curCapacity_.memoryMB += it->resourcesUsed.memoryMB;
}
std::cout << "Resolved " << it->runID << " / " << it->taskName
<< std::endl;
}
else {
payload << R"("state": "PENDING")";
}
payload << "}";
}
payload << "]";
response.send(Pistache::Http::Code::Ok, payload.str());
} }
void Server::handleStopTask(const Pistache::Rest::Request &request, void Server::handleStopTask(const Pistache::Rest::Request &request,

View File

@@ -49,7 +49,8 @@ namespace daggy {
{ {
Ok = 200, Ok = 200,
Not_Found = 404, Not_Found = 404,
Not_Acceptable = 406 Not_Acceptable = 406,
Server_Error = 500
}; };
struct HTTPResponse struct HTTPResponse

View File

@@ -58,13 +58,21 @@ namespace daggy::executors::task {
std::string taskName; std::string taskName;
std::string runnerURL; std::string runnerURL;
uint32_t retries; uint32_t retries;
daggy_runner::Capacity resources;
}; };
// Resolves jobs through polling // Resolves jobs through polling
std::atomic<bool> running_; std::atomic<bool> running_;
std::thread monitorWorker_; std::thread monitorWorker_;
std::unordered_set<std::string> runners_; struct RunnerCapacity
{
daggy_runner::Capacity current;
daggy_runner::Capacity total;
};
std::mutex runnersGuard_;
std::unordered_map<std::string, RunnerCapacity> runners_;
std::mutex rtGuard_; std::mutex rtGuard_;
std::unordered_map<std::pair<DAGRunID, std::string>, RunningTask> std::unordered_map<std::pair<DAGRunID, std::string>, RunningTask>
runningTasks_; runningTasks_;

View File

@@ -234,7 +234,7 @@ namespace daggy {
curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2); curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10);
if (trace) { if (trace) {
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace); curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace);
@@ -254,8 +254,9 @@ namespace daggy {
if (res != CURLE_OK) { if (res != CURLE_OK) {
curl_easy_cleanup(curl); curl_easy_cleanup(curl);
throw std::runtime_error(std::string{"CURL Failed: "} + response.code = HTTPCode::Server_Error;
curl_easy_strerror(res)); response.body = std::string{"CURL Failed: "} + curl_easy_strerror(res);
return response;
} }
curl_easy_cleanup(curl); curl_easy_cleanup(curl);
@@ -275,7 +276,18 @@ namespace daggy {
auto response = HTTP_REQUEST(url, payload, method); auto response = HTTP_REQUEST(url, payload, method);
rj::Document doc; rj::Document doc;
checkRJParse(doc.Parse(response.body.c_str())); if (response.code == HTTPCode::Server_Error) {
doc.SetObject();
auto &alloc = doc.GetAllocator();
doc.AddMember("error",
rj::Value().SetString(response.body.c_str(),
response.body.size(), alloc),
alloc);
}
else {
checkRJParse(doc.Parse(response.body.c_str()));
}
return std::make_pair(response.code, std::move(doc)); return std::make_pair(response.code, std::move(doc));
} }

View File

@@ -135,44 +135,45 @@ std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
// Capacities for a runner can be negative, meaning that they're currently // Capacities for a runner can be negative, meaning that they're currently
// oversubscribed. // oversubscribed.
std::vector<std::pair<std::string, double>> impacts; std::vector<std::pair<std::string, double>> impacts;
std::string runner;
for (const auto &runner : runners_) { {
try { std::lock_guard<std::mutex> lock(runnersGuard_);
const auto &[code, doc] = JSON_HTTP_REQUEST(runner + "/v1/capacity"); for (const auto &[runner, caps] : runners_) {
if (code != HTTPCode::Ok) { const auto result = HTTP_REQUEST(runner + "/ready");
if (result.code != 200)
continue; continue;
}
auto curCap = capacityFromJSON(doc["current"]); double cores = (caps.current.cores - taskUsed.cores);
auto totCap = capacityFromJSON(doc["total"]); double memoryMB = (caps.current.memoryMB - taskUsed.memoryMB);
double cores = (curCap.cores - taskUsed.cores);
double memoryMB = (curCap.memoryMB - taskUsed.memoryMB);
double impact = double impact =
std::min(cores / totCap.cores, memoryMB / totCap.memoryMB); std::min(cores / caps.total.cores, memoryMB / caps.total.memoryMB);
std::cout << runner << ": " << impact << std::endl;
impacts.emplace_back(runner, impact); impacts.emplace_back(runner, impact);
} }
catch (const std::exception &_) {
continue; if (impacts.empty()) {
std::promise<AttemptRecord> prom;
auto fut = prom.get_future();
AttemptRecord record{.rc = -1,
.executorLog = "No runners available for execution"};
prom.set_value(std::move(record));
return fut;
} }
std::sort(impacts.begin(), impacts.end());
runner = impacts.back().first;
auto &caps = runners_.at(runner);
caps.current.cores -= taskUsed.cores;
caps.current.memoryMB -= taskUsed.memoryMB;
} }
if (impacts.empty()) { std::cout << "Queuing on runner: " << runner << std::endl;
std::promise<AttemptRecord> prom;
auto fut = prom.get_future();
AttemptRecord record{.rc = -1,
.executorLog = "No runners available for execution"};
prom.set_value(std::move(record));
return fut;
}
std::sort(impacts.begin(), impacts.end());
auto runner = impacts.back();
std::stringstream ss; std::stringstream ss;
ss << runner.first << "/v1/task/" << runID << "/" << taskName; ss << runner << "/v1/task/" << runID << "/" << taskName;
auto url = ss.str(); auto url = ss.str();
const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST"); const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST");
@@ -182,8 +183,9 @@ std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
RunningTask rt{.prom{}, RunningTask rt{.prom{},
.runID = runID, .runID = runID,
.taskName = taskName, .taskName = taskName,
.runnerURL = runner.first, .runnerURL = runner,
.retries = 3}; .retries = 3,
.resources = taskUsed};
auto fut = rt.prom.get_future(); auto fut = rt.prom.get_future();
@@ -200,51 +202,103 @@ bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
void DaggyRunnerTaskExecutor::addRunner(const std::string &url) void DaggyRunnerTaskExecutor::addRunner(const std::string &url)
{ {
runners_.insert(url); // Try and get the capacity
const auto &[code, doc] = JSON_HTTP_REQUEST(url + "/v1/capacity");
if (code != HTTPCode::Ok) {
std::cerr << "Failed to add runner " << url << ": "
<< doc["error"].GetString() << std::endl;
return;
}
RunnerCapacity caps{.current = capacityFromJSON(doc["current"]),
.total = capacityFromJSON(doc["total"])};
std::lock_guard<std::mutex> lock(runnersGuard_);
runners_.emplace(url, caps);
} }
void DaggyRunnerTaskExecutor::monitor() void DaggyRunnerTaskExecutor::monitor()
{ {
while (running_) { while (running_) {
{ {
std::vector<std::pair<DAGRunID, std::string>> resolvedJobs; std::unordered_map<std::pair<DAGRunID, std::string>,
std::optional<AttemptRecord>>
resolvedJobs;
std::unordered_map<std::pair<DAGRunID, std::string>, Capacity>
taskResources;
{ {
std::lock_guard<std::mutex> lock(rtGuard_); std::lock_guard<std::mutex> lock(rtGuard_);
for (auto &[taskID, task] : runningTasks_) { for (const auto &[tid, info] : runningTasks_) {
try { taskResources.emplace(tid, info.resources);
const auto &[code, json] = JSON_HTTP_REQUEST(
task.runnerURL + "/v1/task/" + std::to_string(taskID.first) +
"/" + taskID.second);
if (code != HTTPCode::Ok) {
--task.retries;
if (task.retries == 0) {
AttemptRecord record{
.rc = -1,
.executorLog = "Unable to query runner for progress"};
task.prom.set_value(std::move(record));
resolvedJobs.emplace_back(taskID);
}
continue;
}
if (json["state"] == "COMPLETED") {
auto attempt = attemptRecordFromJSON(json["attempt"]);
task.prom.set_value(std::move(attempt));
resolvedJobs.emplace_back(taskID);
}
}
catch (std::runtime_error &e) {
continue;
}
}
for (const auto &tid : resolvedJobs) {
runningTasks_.extract(tid);
} }
} }
std::this_thread::sleep_for(std::chrono::milliseconds(250)); {
std::lock_guard<std::mutex> lock(runnersGuard_);
for (auto &[runnerURL, caps] : runners_) {
try {
const auto &[code, json] =
JSON_HTTP_REQUEST(runnerURL + "/v1/poll");
if (code != HTTPCode::Ok)
continue;
const auto tasks = json.GetArray();
for (size_t idx = 0; idx < tasks.Size(); ++idx) {
const auto &task = tasks[idx];
if (task["state"] == "PENDING") {
resolvedJobs.emplace(
std::make_pair(task["runID"].GetInt64(),
task["taskName"].GetString()),
std::nullopt);
}
else {
auto tid = std::make_pair(task["runID"].GetInt64(),
task["taskName"].GetString());
const auto &res = taskResources.at(tid);
caps.current.cores += res.cores;
caps.current.memoryMB += res.memoryMB;
resolvedJobs.emplace(tid,
attemptRecordFromJSON(task["attempt"]));
}
}
}
catch (std::exception &e) {
std::cout << "Curl timeout failed for runner " << runnerURL << ": "
<< e.what() << std::endl;
}
}
}
std::vector<std::pair<DAGRunID, std::string>> completedTasks;
{
std::lock_guard<std::mutex> lock(rtGuard_);
for (auto &[taskID, task] : runningTasks_) {
auto it = resolvedJobs.find(taskID);
if (it == resolvedJobs.end()) {
--task.retries;
if (task.retries == 0) {
AttemptRecord record{
.rc = -1,
.executorLog = "Unable to query runner for progress"};
task.prom.set_value(std::move(record));
completedTasks.emplace_back(taskID);
}
continue;
}
else if (it->second.has_value()) {
// Task has completed
task.prom.set_value(it->second.value());
completedTasks.emplace_back(taskID);
}
}
}
for (const auto &tid : completedTasks) {
runningTasks_.extract(tid);
}
} }
std::this_thread::sleep_for(std::chrono::seconds(1));
} }
} }