diff --git a/daggyd/daggyd/daggyd.cpp b/daggyd/daggyd/daggyd.cpp index 5398ad5..69c4374 100644 --- a/daggyd/daggyd/daggyd.cpp +++ b/daggyd/daggyd/daggyd.cpp @@ -119,101 +119,99 @@ namespace dl = daggy::loggers::dag_run; std::unique_ptr loggerFactory(const rj::Value &config) { - if (config.HasMember("logger")) { - const auto &logConf = config["logger"]; - if (!logConf.IsObject()) - throw std::runtime_error("logger config is not an object"); - if (!logConf.HasMember("name")) - throw std::runtime_error("logger config is missing logger name"); - if (!logConf.HasMember("config")) - throw std::runtime_error("logger config is missing logger config"); + if (!config.HasMember("logger")) + return std::make_unique(std::cout); - std::string name = logConf["name"].GetString(); - const auto &logConfig = logConf["config"]; - if (name == "OStreamLogger") { - if (logConfig.HasMember("file")) { - std::string fn = logConfig["file"].GetString(); - if (fn == "-") - return std::make_unique(std::cout); + const auto &logConf = config["logger"]; + if (!logConf.IsObject()) + throw std::runtime_error("logger config is not an object"); + if (!logConf.HasMember("name")) + throw std::runtime_error("logger config is missing logger name"); + if (!logConf.HasMember("config")) + throw std::runtime_error("logger config is missing logger config"); - std::ofstream ofh(logConfig["file"].GetString()); - return std::make_unique(ofh); - } + std::string name = logConf["name"].GetString(); + const auto &logConfig = logConf["config"]; + if (name == "OStreamLogger") { + if (logConfig.HasMember("file")) { + std::string fn = logConfig["file"].GetString(); + if (fn == "-") + return std::make_unique(std::cout); + + std::ofstream ofh(logConfig["file"].GetString()); + return std::make_unique(ofh); } -#ifdef DAGGY_ENABLE_REDIS - else if (name == "RedisLogger") { - std::string host = "localhost"; - uint16_t port = 6379; - std::string prefix = "daggy"; - - if (logConfig.HasMember("prefix")) - prefix = logConfig["prefix"].GetString(); - if (logConfig.HasMember("host")) - host = logConfig["host"].GetString(); - if (logConfig.HasMember("port")) - port = logConfig["port"].GetInt(); - - return std::make_unique(prefix, host, port); - } -#endif - else - throw std::runtime_error("Unknown logger type: " + name); } - return std::make_unique(std::cout); +#ifdef DAGGY_ENABLE_REDIS + else if (name == "RedisLogger") { + std::string host = "localhost"; + uint16_t port = 6379; + std::string prefix = "daggy"; + + if (logConfig.HasMember("prefix")) + prefix = logConfig["prefix"].GetString(); + if (logConfig.HasMember("host")) + host = logConfig["host"].GetString(); + if (logConfig.HasMember("port")) + port = logConfig["port"].GetInt(); + + return std::make_unique(prefix, host, port); + } +#endif + throw std::runtime_error("Unknown logger type: " + name); } namespace de = daggy::executors::task; std::unique_ptr executorFactory(const rj::Value &config) { - if (config.HasMember("executor")) { - const auto &execConf = config["executor"]; - if (!execConf.IsObject()) - throw std::runtime_error("Executor config is not an object"); - if (!execConf.HasMember("name")) - throw std::runtime_error("Executor config is missing name"); - if (!execConf.HasMember("config")) - throw std::runtime_error("Executor config is missing config"); - std::string name = execConf["name"].GetString(); - const auto &execConfig = execConf["config"]; + if (!config.HasMember("executor")) + return std::make_unique(10); - if (name == "ForkingTaskExecutor") { - size_t threads = 10; - if (execConfig.HasMember("threads")) - threads = execConfig["threads"].GetInt64(); - return std::make_unique(threads); - } + const auto &execConf = config["executor"]; + if (!execConf.IsObject()) + throw std::runtime_error("Executor config is not an object"); + if (!execConf.HasMember("name")) + throw std::runtime_error("Executor config is missing name"); + if (!execConf.HasMember("config")) + throw std::runtime_error("Executor config is missing config"); + std::string name = execConf["name"].GetString(); + const auto &execConfig = execConf["config"]; + + if (name == "ForkingTaskExecutor") { + size_t threads = 10; + if (execConfig.HasMember("threads")) + threads = execConfig["threads"].GetInt64(); + return std::make_unique(threads); + } #ifdef DAGGY_ENABLE_SLURM - else if (name == "SlurmTaskExecutor") { - return std::make_unique(); - } + else if (name == "SlurmTaskExecutor") { + return std::make_unique(); + } #endif - else if (name == "DaggyRunnerTaskExecutor") { - if (!execConfig.HasMember("runners")) - throw std::runtime_error( - "DaggyRunnerExecutor config needs at least one remote runner"); + else if (name == "DaggyRunnerTaskExecutor") { + if (!execConfig.HasMember("runners")) + throw std::runtime_error( + "DaggyRunnerExecutor config needs at least one remote runner"); - auto exe = std::make_unique(); + auto exe = std::make_unique(); - const auto &runners = execConfig["runners"]; - if (!runners.IsArray()) { + const auto &runners = execConfig["runners"]; + if (!runners.IsArray()) + throw std::runtime_error( + "DaggyRunnerExecutor runners must be an array of urls"); + + for (size_t i = 0; i < runners.Size(); ++i) { + if (!runners[i].IsString()) throw std::runtime_error( "DaggyRunnerExecutor runners must be an array of urls"); - - for (size_t i = 0; i < runners.Size(); ++i) { - if (!runners[i].IsString()) - throw std::runtime_error( - "DaggyRunnerExecutor runners must be an array of urls"); - exe->addRunner(runners[i].GetString()); - } - return exe; - } + exe->addRunner(runners[i].GetString()); } - else - throw std::runtime_error("Unknown executor type: " + name); + + return exe; } - return std::make_unique(10); + throw std::runtime_error("Unknown executor type: " + name); } int main(int argc, char **argv) diff --git a/daggyr/libdaggyr/src/Server.cpp b/daggyr/libdaggyr/src/Server.cpp index 5537113..4432b5e 100644 --- a/daggyr/libdaggyr/src/Server.cpp +++ b/daggyr/libdaggyr/src/Server.cpp @@ -159,6 +159,8 @@ namespace daggy::daggyr { .resourcesUsed = resourcesUsed}); } + std::cout << "Enqueued " << runID << " / " << taskName << std::endl; + response.send(Pistache::Http::Code::Ok, ""); } @@ -175,6 +177,7 @@ namespace daggy::daggyr { std::string payload; + // Check to see if it's pending bool found = false; { std::lock_guard lock(pendingGuard_); @@ -193,11 +196,13 @@ namespace daggy::daggyr { curCapacity_.cores += it->second.resourcesUsed.cores; curCapacity_.memoryMB += it->second.resourcesUsed.memoryMB; } + std::cout << "Resolved " << it->first.first << " / " + << it->first.second << std::endl; } - else { - payload = R"({ "state": "RUNNING" })"; - found = true; - } + } + else { + payload = R"({ "state": "RUNNING" })"; + found = true; } } @@ -211,6 +216,7 @@ namespace daggy::daggyr { payload = R"({ "state": "COMPLETED", "attempt": )" + attemptRecordToJSON(it->second) + "}"; } + response.send(Pistache::Http::Code::Ok, payload); } diff --git a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp index 1aed18a..89b07a7 100644 --- a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp @@ -55,6 +55,7 @@ namespace daggy::executors::task { DAGRunID runID; std::string taskName; std::string runnerURL; + uint32_t retries; }; // Resolves jobs through polling diff --git a/libdaggy/src/DAGRunner.cpp b/libdaggy/src/DAGRunner.cpp index 164fbc8..d4c0fad 100644 --- a/libdaggy/src/DAGRunner.cpp +++ b/libdaggy/src/DAGRunner.cpp @@ -106,8 +106,12 @@ namespace daggy { taskAttemptCounts_[taskName] = 1; logger_.updateTaskState(runID_, taskName, RunState::RUNNING); - runningTasks_.emplace(taskName, - executor_.execute(runID_, taskName, task)); + try { + auto fut = executor_.execute(runID_, taskName, task); + runningTasks_.emplace(taskName, std::move(fut)); + } + catch (std::exception &e) { + } ++nRunningTasks_; auto nextTask = dag_.visitNext(); diff --git a/libdaggy/src/Utilities.cpp b/libdaggy/src/Utilities.cpp index 894cc6d..52b10a9 100644 --- a/libdaggy/src/Utilities.cpp +++ b/libdaggy/src/Utilities.cpp @@ -234,6 +234,7 @@ namespace daggy { curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2); if (trace) { curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace); diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp index c6fa415..ee0ae4a 100644 --- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp +++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp @@ -135,6 +135,7 @@ std::future DaggyRunnerTaskExecutor::execute( // Capacities for a runner can be negative, meaning that they're currently // oversubscribed. std::vector> impacts; + for (const auto &runner : runners_) { try { const auto &[code, doc] = JSON_HTTP_REQUEST(runner + "/v1/capacity"); @@ -158,8 +159,14 @@ std::future DaggyRunnerTaskExecutor::execute( } } - if (impacts.empty()) - throw std::runtime_error("No runners available for execution"); + if (impacts.empty()) { + std::promise prom; + auto fut = prom.get_future(); + AttemptRecord record{.rc = -1, + .executorLog = "No runners available for execution"}; + prom.set_value(std::move(record)); + return fut; + } auto cit = impacts.begin(); for (auto it = impacts.begin(); it != impacts.end(); ++it) { @@ -167,8 +174,19 @@ std::future DaggyRunnerTaskExecutor::execute( cit = it; } - RunningTask rt{ - .prom{}, .runID = runID, .taskName = taskName, .runnerURL = cit->first}; + std::stringstream ss; + ss << cit->first << "/v1/task/" << runID << "/" << taskName; + auto url = ss.str(); + + const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST"); + if (response.code != HTTPCode::Ok) + throw std::runtime_error("Unable to submit task: " + response.body); + + RunningTask rt{.prom{}, + .runID = runID, + .taskName = taskName, + .runnerURL = cit->first, + .retries = 3}; auto fut = rt.prom.get_future(); @@ -194,34 +212,42 @@ void DaggyRunnerTaskExecutor::monitor() { std::vector> resolvedJobs; - std::lock_guard lock(rtGuard_); - for (auto &[taskID, task] : runningTasks_) { - try { - const auto &[code, json] = JSON_HTTP_REQUEST( - task.runnerURL + "/v1/task/" + std::to_string(taskID.first) + - "/" + taskID.second); - if (code != HTTPCode::Ok) { - AttemptRecord record{ - .rc = -1, .executorLog = "Unable to query runner for progress"}; - task.prom.set_value(std::move(record)); - resolvedJobs.emplace_back(taskID); + { + std::lock_guard lock(rtGuard_); + for (auto &[taskID, task] : runningTasks_) { + try { + const auto &[code, json] = JSON_HTTP_REQUEST( + task.runnerURL + "/v1/task/" + std::to_string(taskID.first) + + "/" + taskID.second); + if (code != HTTPCode::Ok) { + --task.retries; + + if (task.retries == 0) { + AttemptRecord record{ + .rc = -1, + .executorLog = "Unable to query runner for progress"}; + task.prom.set_value(std::move(record)); + resolvedJobs.emplace_back(taskID); + } + continue; + } + + if (json["state"] == "COMPLETED") { + auto attempt = attemptRecordFromJSON(json["attempt"]); + task.prom.set_value(std::move(attempt)); + resolvedJobs.emplace_back(taskID); + } + } + catch (std::runtime_error &e) { continue; } - - if (json["state"] == "COMPLETED") { - task.prom.set_value(attemptRecordFromJSON(json["attempt"])); - resolvedJobs.emplace_back(taskID); - } } - catch (std::runtime_error &e) { - continue; - } - for (const auto &tid : resolvedJobs) { runningTasks_.extract(tid); } } - std::this_thread::sleep_for(std::chrono::seconds(1)); + + std::this_thread::sleep_for(std::chrono::milliseconds(250)); } } }