From 8d00621908b90d0c818872f303694caae7606b73 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Thu, 16 Dec 2021 12:16:12 -0400 Subject: [PATCH] Adding support for remote execution daemons. Squashed commit of the following: commit 69d5ef7a256b86a86d46e5ae374c00fded1497ea Author: Ian Roddis Date: Thu Dec 16 12:15:55 2021 -0400 Updating readme commit 94a9f676d0f9cc0b55cdc18c4927eaea40d82c77 Author: Ian Roddis Date: Thu Dec 16 12:05:36 2021 -0400 Fixing serialization of attempt records when querying entire dag commit 945e5f90b24abf07c9af1bc4c6bbcb33e93b8069 Author: Ian Roddis Date: Thu Dec 16 11:37:59 2021 -0400 Compiles cleanly... commit 8b23e46081d47fb80dc1a2d998fc6dc4bbf301a8 Author: Ian Roddis Date: Thu Dec 16 10:43:03 2021 -0400 Adding in missing source file to cmake build list commit 6d10d9791206e2bc15788beadeea580b8e43a853 Author: Ian Roddis Date: Thu Dec 16 10:41:43 2021 -0400 Adding new executors commit 42a2c67f4d6ae99df95d917c8621d78cd99837a1 Author: Ian Roddis Date: Thu Dec 16 10:27:14 2021 -0400 Fixing missing curl cmake dependency commit 394bc4c5d51ecee7bf14712f719c8bf7e97fb0fa Author: Ian Roddis Date: Thu Dec 16 10:21:58 2021 -0400 Fixing missing curl cmake dependency commit dd9efc8e7e7770ea1bcbccb70a1af9cfcff0414c Author: Ian Roddis Date: Wed Dec 15 17:15:38 2021 -0400 Checkpointing progress commit 3b3b55d6037bb96e46de6763f486f4ecb92fe6a0 Author: Ian Roddis Date: Wed Dec 15 14:21:18 2021 -0400 updating readme commit 303027c11452941b2a0c0d1b04ac5942e79efd74 Author: Ian Roddis Date: Wed Dec 15 14:17:16 2021 -0400 Namespacing daggyd Adding more error checking around deserialization of parameters Adding tests for runner agent commit c592eaeba12e2a449bae401e8c1d9ed236416d52 Author: Ian Roddis Date: Wed Dec 15 11:20:21 2021 -0400 Checkpointing work commit fb1862d1cefe2b53a98659cce3c8c73d88bf5d84 Author: Ian Roddis Date: Wed Dec 15 09:52:29 2021 -0400 Copying daggyd for daggyr template, adding in basic routes --- CMakeLists.txt | 5 +- README.md | 22 +- 
daggyd/daggyd/daggyd.cpp | 24 +- daggyd/libdaggyd/include/daggyd/Server.hpp | 4 +- daggyd/libdaggyd/src/Server.cpp | 13 +- daggyd/tests/unit_server.cpp | 139 +--------- daggyr/CMakeLists.txt | 3 + daggyr/README.md | 68 +++++ daggyr/daggyr/CMakeLists.txt | 4 + daggyr/daggyr/daggyr.cpp | 193 +++++++++++++ daggyr/libdaggyr/CMakeLists.txt | 8 + daggyr/libdaggyr/include/daggyr/Server.hpp | 84 ++++++ daggyr/libdaggyr/src/CMakeLists.txt | 3 + daggyr/libdaggyr/src/Server.cpp | 259 ++++++++++++++++++ daggyr/tests/CMakeLists.txt | 9 + daggyr/tests/main.cpp | 15 + daggyr/tests/unit_server.cpp | 172 ++++++++++++ libdaggy/CMakeLists.txt | 2 +- libdaggy/include/daggy/Utilities.hpp | 47 ++++ .../task/DaggyRunnerTaskExecutor.hpp | 69 +++++ .../executors/task/ForkingTaskExecutor.hpp | 4 + libdaggy/src/Serialization.cpp | 6 +- libdaggy/src/Utilities.cpp | 110 ++++++++ libdaggy/src/executors/task/CMakeLists.txt | 1 + .../task/DaggyRunnerTaskExecutor.cpp | 227 +++++++++++++++ .../executors/task/ForkingTaskExecutor.cpp | 42 +-- 26 files changed, 1373 insertions(+), 160 deletions(-) create mode 100644 daggyr/CMakeLists.txt create mode 100644 daggyr/README.md create mode 100644 daggyr/daggyr/CMakeLists.txt create mode 100644 daggyr/daggyr/daggyr.cpp create mode 100644 daggyr/libdaggyr/CMakeLists.txt create mode 100644 daggyr/libdaggyr/include/daggyr/Server.hpp create mode 100644 daggyr/libdaggyr/src/CMakeLists.txt create mode 100644 daggyr/libdaggyr/src/Server.cpp create mode 100644 daggyr/tests/CMakeLists.txt create mode 100644 daggyr/tests/main.cpp create mode 100644 daggyr/tests/unit_server.cpp create mode 100644 libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp create mode 100644 libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 99571ff..e65c5f9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,8 +11,8 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS True) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall -Werror") 
if(CMAKE_BUILD_TYPE MATCHES "Debug") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread -fno-omit-frame-pointer") - set(TSAN_OPTIONS "suppressions=${CMAKE_CURRENT_DIR}/tests/tsan.supp") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer") + # set(TSAN_OPTIONS "suppressions=${CMAKE_CURRENT_DIR}/tests/tsan.supp") endif() set(THIRD_PARTY_DIR ${CMAKE_BINARY_DIR}/third_party) @@ -48,3 +48,4 @@ enable_testing() add_subdirectory(libdaggy) add_subdirectory(daggyd) +add_subdirectory(daggyr) diff --git a/README.md b/README.md index 53c6a78..ecb1b13 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ graph LR Individual tasks (vertices) are run via a task executor. Daggy supports multiple executors, from local executor (via fork), to distributed work managers like [slurm](https://slurm.schedmd.com/overview.html) -or [kubernetes](https://kubernetes.io/) (planned). +or [kubernetes](https://kubernetes.io/) (planned), or daggy's own executor. State is maintained via state loggers. Currently daggy supports an in-memory state manager (OStreamLogger), and [RedisJSON](https://oss.redis.com/redisjson/). @@ -380,6 +380,26 @@ For this reason, it's important that the `tmpDir` directory **be readable by the environment, it should be a shared filesystem. If this isn't the case, the job output will not be captured by daggy, although it will still be available wherever it was written by slurm. +DaggyRunnerTaskExecutor +----------------------- + +Daggy Runners (`daggyr` in this project) are daemons that can be run on remote hosts, then allocated work. + +Tasks submitted to this type of runner require `cores` and `memoryMB` attributes. Remote runners have a specific +capacity that are consumed when tasks run on them. Right now those capacities are merely advisory; it's possible +to oversubscribe a runner, and the constraints are not enforced. + +Enforcement via cgroups is planned. 
+ + +| Field | Sample | Description | +|---------|--------|--------------| +| command | `[ "/usr/bin/echo", "param1" ]` | The command to run | +| commandString | `"/usr/bin/echo param1"` | The command to run as a string. Quoted args are properly handled. | +| environment | `[ "DATE=2021-05-03" ]` | Environment variables to set for script | +| cores | "1" | Number of cores required by the task | +| memoryMB | "100" | Amount of memory (RSS) required by the task, in MB | + Loggers ======= diff --git a/daggyd/daggyd/daggyd.cpp b/daggyd/daggyd/daggyd.cpp index d1a0421..7aa8471 100644 --- a/daggyd/daggyd/daggyd.cpp +++ b/daggyd/daggyd/daggyd.cpp @@ -10,6 +10,7 @@ #include // Add executors here +#include #include #include @@ -177,6 +178,27 @@ std::unique_ptr executorFactory(const rj::Value &config) else if (name == "SlurmTaskExecutor") { return std::make_unique(); } + else if (name == "DaggyRunnerTaskExecutor") { + if (!execConfig.HasMember("runners")) + throw std::runtime_error( + "DaggyRunnerExecutor config needs at least one remote runner"); + + auto exe = std::make_unique(); + + const auto &runners = execConfig["runners"]; + if (!runners.IsArray()) { + throw std::runtime_error( + "DaggyRunnerExecutor runners must be an array of urls"); + } + for (size_t i = 0; i < runners.Size(); ++i) { + if (!runners[i].IsString()) + throw std::runtime_error( + "DaggyRunnerExecutor runners must be an array of urls"); + exe->addRunner(runners[i].GetString()); + } + + return exe; + } else throw std::runtime_error("Unknown executor type: " + name); } @@ -246,7 +268,7 @@ int main(int argc, char **argv) Pistache::Address listenSpec(listenIP, listenPort); - daggy::Server server(listenSpec, *logger, *executor, dagThreads); + daggy::daggyd::Server server(listenSpec, *logger, *executor, dagThreads); server.init(webThreads); server.start(); diff --git a/daggyd/libdaggyd/include/daggyd/Server.hpp b/daggyd/libdaggyd/include/daggyd/Server.hpp index 572488c..b9b7d91 100644 ---
a/daggyd/libdaggyd/include/daggyd/Server.hpp +++ b/daggyd/libdaggyd/include/daggyd/Server.hpp @@ -16,7 +16,7 @@ namespace fs = std::filesystem; -namespace daggy { +namespace daggy::daggyd { class Server { public: @@ -64,4 +64,4 @@ namespace daggy { std::mutex runnerGuard_; std::unordered_map> runners_; }; -} // namespace daggy +} // namespace daggy::daggyd diff --git a/daggyd/libdaggyd/src/Server.cpp b/daggyd/libdaggyd/src/Server.cpp index 6b5462f..4e9ebf8 100644 --- a/daggyd/libdaggyd/src/Server.cpp +++ b/daggyd/libdaggyd/src/Server.cpp @@ -18,7 +18,7 @@ using namespace Pistache; -namespace daggy { +namespace daggy::daggyd { void Server::init(size_t threads) { auto opts = Http::Endpoint::options() @@ -305,14 +305,7 @@ namespace daggy { else { ss << ','; } - ss << '{' << R"("startTime":)" - << std::quoted(timePointToString(attempt.startTime)) << ',' - << R"("stopTime":)" - << std::quoted(timePointToString(attempt.stopTime)) << ',' - << R"("rc":)" << attempt.rc << ',' << R"("outputLog":)" - << std::quoted(attempt.outputLog) << ',' << R"("errorLog":)" - << std::quoted(attempt.errorLog) << ',' << R"("executorLog":)" - << std::quoted(attempt.executorLog) << '}'; + ss << attemptRecordToJSON(attempt); } ss << ']'; } @@ -511,4 +504,4 @@ namespace daggy { { return true; } -} // namespace daggy +} // namespace daggy::daggyd diff --git a/daggyd/tests/unit_server.cpp b/daggyd/tests/unit_server.cpp index 1049c41..8da5520 100644 --- a/daggyd/tests/unit_server.cpp +++ b/daggyd/tests/unit_server.cpp @@ -17,118 +17,6 @@ namespace rj = rapidjson; using namespace daggy; -#ifdef DEBUG_HTTP -static int my_trace(CURL *handle, curl_infotype type, char *data, size_t size, - void *userp) -{ - const char *text; - (void)handle; /* prevent compiler warning */ - (void)userp; - - switch (type) { - case CURLINFO_TEXT: - fprintf(stderr, "== Info: %s", data); - default: /* in case a new one is introduced to shock us */ - return 0; - - case CURLINFO_HEADER_OUT: - text = "=> Send header"; - break; 
- case CURLINFO_DATA_OUT: - text = "=> Send data"; - break; - case CURLINFO_SSL_DATA_OUT: - text = "=> Send SSL data"; - break; - case CURLINFO_HEADER_IN: - text = "<= Recv header"; - break; - case CURLINFO_DATA_IN: - text = "<= Recv data"; - break; - case CURLINFO_SSL_DATA_IN: - text = "<= Recv SSL data"; - break; - } - - std::cerr << "\n================== " << text - << " ==================" << std::endl - << data << std::endl; - return 0; -} -#endif - -enum HTTPCode : long -{ - Ok = 200, - Not_Found = 404 -}; - -struct HTTPResponse -{ - HTTPCode code; - std::string body; -}; - -uint curlWriter(char *in, uint size, uint nmemb, std::stringstream *out) -{ - uint r; - r = size * nmemb; - out->write(in, r); - return r; -} - -HTTPResponse REQUEST(const std::string &url, const std::string &payload = "", - const std::string &method = "GET") -{ - HTTPResponse response; - - CURL *curl; - CURLcode res; - struct curl_slist *headers = NULL; - - curl_global_init(CURL_GLOBAL_ALL); - - curl = curl_easy_init(); - if (curl) { - std::stringstream buffer; - -#ifdef DEBUG_HTTP - curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, my_trace); - curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); -#endif - - curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer); - - if (!payload.empty()) { - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, payload.size()); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload.c_str()); - headers = curl_slist_append(headers, "Content-Type: Application/Json"); - } - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, method.c_str()); - headers = curl_slist_append(headers, "Expect:"); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); - - res = curl_easy_perform(curl); - - if (res != CURLE_OK) { - curl_easy_cleanup(curl); - throw std::runtime_error(std::string{"CURL Failed: "} + - curl_easy_strerror(res)); - } - curl_easy_cleanup(curl); - - curl_easy_getinfo(curl, 
CURLINFO_RESPONSE_CODE, &response.code); - response.body = buffer.str(); - } - - curl_global_cleanup(); - - return response; -} - TEST_CASE("rest_endpoint", "[server_basic]") { std::stringstream ss; @@ -138,7 +26,7 @@ TEST_CASE("rest_endpoint", "[server_basic]") const size_t nDAGRunners = 10, nWebThreads = 10; - daggy::Server server(listenSpec, logger, executor, nDAGRunners); + daggy::daggyd::Server server(listenSpec, logger, executor, nDAGRunners); server.init(nWebThreads); server.start(); @@ -147,13 +35,13 @@ TEST_CASE("rest_endpoint", "[server_basic]") SECTION("Ready Endpoint") { - auto response = REQUEST(baseURL + "/ready"); + auto response = HTTP_REQUEST(baseURL + "/ready"); REQUIRE(response.code == HTTPCode::Ok); } SECTION("Querying a non-existent dagrunid should fail ") { - auto response = REQUEST(baseURL + "/v1/dagrun/100"); + auto response = HTTP_REQUEST(baseURL + "/v1/dagrun/100"); REQUIRE(response.code != HTTPCode::Ok); } @@ -175,7 +63,7 @@ TEST_CASE("rest_endpoint", "[server_basic]") // Submit, and get the runID daggy::DAGRunID runID = 0; { - auto response = REQUEST(baseURL + "/v1/dagrun/", dagRun, "POST"); + auto response = HTTP_REQUEST(baseURL + "/v1/dagrun/", dagRun, "POST"); REQUIRE(response.code == HTTPCode::Ok); rj::Document doc; @@ -188,7 +76,7 @@ TEST_CASE("rest_endpoint", "[server_basic]") // Ensure our runID shows up in the list of running DAGs { - auto response = REQUEST(baseURL + "/v1/dagruns?all=1"); + auto response = HTTP_REQUEST(baseURL + "/v1/dagruns?all=1"); REQUIRE(response.code == HTTPCode::Ok); rj::Document doc; @@ -217,8 +105,8 @@ TEST_CASE("rest_endpoint", "[server_basic]") // Ensure we can get one of our tasks { - auto response = REQUEST(baseURL + "/v1/dagrun/" + std::to_string(runID) + - "/task/cat_0"); + auto response = HTTP_REQUEST(baseURL + "/v1/dagrun/" + + std::to_string(runID) + "/task/cat_0"); REQUIRE(response.code == HTTPCode::Ok); rj::Document doc; @@ -233,7 +121,8 @@ TEST_CASE("rest_endpoint", "[server_basic]") // Wait 
until our DAG is complete bool complete = true; for (auto i = 0; i < 10; ++i) { - auto response = REQUEST(baseURL + "/v1/dagrun/" + std::to_string(runID)); + auto response = + HTTP_REQUEST(baseURL + "/v1/dagrun/" + std::to_string(runID)); REQUIRE(response.code == HTTPCode::Ok); rj::Document doc; daggy::checkRJParse(doc.Parse(response.body.c_str())); @@ -281,7 +170,7 @@ TEST_CASE("Server cancels and resumes execution", "[server_resume]") const size_t nDAGRunners = 10, nWebThreads = 10; - daggy::Server server(listenSpec, logger, executor, nDAGRunners); + daggy::daggyd::Server server(listenSpec, logger, executor, nDAGRunners); server.init(nWebThreads); server.start(); @@ -304,7 +193,7 @@ TEST_CASE("Server cancels and resumes execution", "[server_resume]") // Submit, and get the runID daggy::DAGRunID runID; { - auto response = REQUEST(baseURL + "/v1/dagrun/", dagRunJSON, "POST"); + auto response = HTTP_REQUEST(baseURL + "/v1/dagrun/", dagRunJSON, "POST"); REQUIRE(response.code == HTTPCode::Ok); rj::Document doc; @@ -319,7 +208,7 @@ TEST_CASE("Server cancels and resumes execution", "[server_resume]") // Stop the current run { - auto response = REQUEST( + auto response = HTTP_REQUEST( baseURL + "/v1/dagrun/" + std::to_string(runID) + "/state/KILLED", "", "PATCH"); REQUIRE(response.code == HTTPCode::Ok); @@ -342,7 +231,7 @@ TEST_CASE("Server cancels and resumes execution", "[server_resume]") { auto url = baseURL + "/v1/dagrun/" + std::to_string(runID) + "/task/sleep_B_0/state/QUEUED"; - auto response = REQUEST(url, "", "PATCH"); + auto response = HTTP_REQUEST(url, "", "PATCH"); REQUIRE(response.code == HTTPCode::Ok); REQUIRE(logger.getTaskState(runID, "sleep_B_0") == +daggy::RunState::QUEUED); @@ -355,7 +244,7 @@ TEST_CASE("Server cancels and resumes execution", "[server_resume]") lstat("resume_touch_A", &s); auto preMTime = s.st_mtim.tv_sec; - auto response = REQUEST( + auto response = HTTP_REQUEST( baseURL + "/v1/dagrun/" + std::to_string(runID) + "/state/QUEUED", "", 
"PATCH"); diff --git a/daggyr/CMakeLists.txt b/daggyr/CMakeLists.txt new file mode 100644 index 0000000..f4a80cd --- /dev/null +++ b/daggyr/CMakeLists.txt @@ -0,0 +1,3 @@ +add_subdirectory(libdaggyr) +add_subdirectory(daggyr) +add_subdirectory(tests) diff --git a/daggyr/README.md b/daggyr/README.md new file mode 100644 index 0000000..24d280a --- /dev/null +++ b/daggyr/README.md @@ -0,0 +1,68 @@ +# Daggy Runner + +`daggyr` is a REST server process that acts as a remote task executor. + +# Running it + +```bash +daggyr # That's it, will listen on 127.0.0.1:2504 , and run with a local executor +daggyr -d # Daemonize + +daggyr --config FILE # Run with a config file +``` + +# Capacity and Allocation + +On startup, a server's capacity is determined automatically. The capacities are: + +| Capacity | Determined by | Default | Notes | +|-----------|---------------------------------------|-----------------------------|----------------------------------| +| cores | `std::thread::hardware_concurrency()` | `max(1, max - 2)` | A value of 0 will mean all cores | +| memory_mb | `sysinfo.h` | `max(100, totalram * 0.75)` | `totalram` is converted to MB | + +When a `daggyd` process is selecting a runner to send a task to, it will +query the current capacities, and choose the runner that: + +- Can satisfy the requirements of the task +- Has the lowest impact, which is the largest relative drop in available capacity across all capacities. + +For instance, if a job were submitted that requires 2 cores and 5g of memory, +and three runners reported the following capacities: + +| Runner | free_cores | impact_cores | free_memory | impact_memory | max_impact | +|--------|------------|--------------|-------------|---------------|------------| +| 1 | 70 | 2.8% | 20g | 25.00% | 25% | +| 2 | 4 | 50.0% | 80g | 6.25% | 50% | +| 3 | 10 | 20.0% | 30g | 16.67% | 20% | + +Runner 3 would be selected. 
Even though it doesn't have the most memory +or CPU capacity, allocating the job to it minimizes the impact to the +overall availability. + +# Submission and Execution + +Tasks submitted to the runner are planned to be executed with [cgroups](https://www.man7.org/linux/man-pages/man7/cgroups.7.html) +to enforce limits; enforcement is not yet implemented. + +Jobs are submitted asynchronously, and rely on the client to poll for +results using `GET /v1/task/:runID/:taskName` to get the resulting +TaskAttempt. + +Runners are **stateless**, meaning that killing one will kill any +running tasks and any stored results will be lost. + +# Config Files + +```json +{ + "web-threads": 50, + "port": 2504, + "ip": "localhost", + "capacity-overrides": { + "cores": 10, + "memoryMB": 100 + } +} +``` + +Capacities can be overridden from the auto-discovered results. diff --git a/daggyr/daggyr/CMakeLists.txt b/daggyr/daggyr/CMakeLists.txt new file mode 100644 index 0000000..603e8f2 --- /dev/null +++ b/daggyr/daggyr/CMakeLists.txt @@ -0,0 +1,4 @@ +project(daggyr) +file(GLOB SOURCES daggyr.cpp) +add_executable(${PROJECT_NAME} ${SOURCES}) +target_link_libraries(${PROJECT_NAME} argparse libdaggyr libdaggy curl) diff --git a/daggyr/daggyr/daggyr.cpp b/daggyr/daggyr/daggyr.cpp new file mode 100644 index 0000000..52fddee --- /dev/null +++ b/daggyr/daggyr/daggyr.cpp @@ -0,0 +1,193 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +// Add executors here +#include +#include + +// Add loggers here +#include +#include +#include +#include + +namespace rj = rapidjson; + +static std::atomic running{true}; + +void signalHandler(int signal) +{ + switch (signal) { + case SIGHUP: + break; + case SIGINT: + case SIGTERM: + running = false; + break; + default: + break; + } +} + +void daemonize() +{ + pid_t pid; + + struct sigaction newSigAction; + sigset_t newSigSet; + + /* Check if parent process id is set */ + if (getppid() == 1) { + return; + } + + /* Set signal mask - signals we want to
block */ + sigemptyset(&newSigSet); + sigaddset(&newSigSet, + SIGCHLD); /* ignore child - i.e. we don't need to wait for it */ + sigaddset(&newSigSet, SIGTSTP); /* ignore Tty stop signals */ + sigaddset(&newSigSet, SIGTTOU); /* ignore Tty background writes */ + sigaddset(&newSigSet, SIGTTIN); /* ignore Tty background reads */ + sigprocmask(SIG_BLOCK, &newSigSet, + nullptr); /* Block the above specified signals */ + + /* Set up a signal handler */ + newSigAction.sa_handler = signalHandler; + sigemptyset(&newSigAction.sa_mask); + newSigAction.sa_flags = 0; + + /* Signals to handle */ + sigaction(SIGHUP, &newSigAction, nullptr); /* catch hangup signal */ + sigaction(SIGTERM, &newSigAction, nullptr); /* catch term signal */ + sigaction(SIGINT, &newSigAction, nullptr); /* catch interrupt signal */ + + // Fork once + pid = fork(); + if (pid < 0) { + exit(EXIT_FAILURE); + } + if (pid > 0) { + exit(EXIT_SUCCESS); + } + + /* On success: The child process becomes session leader */ + if (setsid() < 0) { + std::cerr << "Unable to setsid" << std::endl; + exit(EXIT_FAILURE); + } + + /* Catch, ignore and handle signals */ + signal(SIGCHLD, SIG_IGN); + signal(SIGHUP, SIG_IGN); + + /* Fork off for the second time*/ + pid = fork(); + if (pid < 0) + exit(EXIT_FAILURE); + if (pid > 0) + exit(EXIT_SUCCESS); + + umask(0); + + /* Change the working directory to the root directory */ + /* or another appropriated directory */ + auto rc = chdir("/"); + (void)rc; + + /* Close all open file descriptors */ + for (int x = sysconf(_SC_OPEN_MAX); x >= 0; x--) { + close(x); + } +} + +int main(int argc, char **argv) +{ + argparse::ArgumentParser args("Daggy"); + + args.add_argument("-v", "--verbose") + .default_value(false) + .implicit_value(true); + args.add_argument("-d", "--daemon").default_value(false).implicit_value(true); + args.add_argument("--config").default_value(std::string{}); + + try { + args.parse_args(argc, argv); + } + catch (std::exception &e) { + std::cout << "Error: " << e.what() 
<< std::endl; + std::cout << args; + exit(1); + } + + struct sysinfo systemInfo; + + sysinfo(&systemInfo); + + bool verbose = args.get("--verbose"); + bool asDaemon = args.get("--daemon"); + auto configFile = args.get("--config"); + std::string listenIP = "127.0.0.1"; + int listenPort = 2504; + size_t webThreads = 50; + ssize_t maxCores = std::max(3U, std::thread::hardware_concurrency()) - 2; + ssize_t maxMemoryMB = + std::max((systemInfo.totalram / (1024 * 1024) * 0.75), 100.0); + + if (!configFile.empty()) { + std::ifstream ifh(configFile); + std::string config; + std::getline(ifh, config, '\0'); + ifh.close(); + + rj::Document doc; + daggy::checkRJParse(doc.Parse(config.c_str())); + + if (doc.HasMember("ip")) + listenIP = doc["ip"].GetString(); + if (doc.HasMember("port")) + listenPort = doc["port"].GetInt(); + if (doc.HasMember("web-threads")) + webThreads = doc["web-threads"].GetInt64(); + if (doc.HasMember("capacity-overrides")) { + const auto &co = doc["capacity-overrides"]; + if (co.HasMember("cores")) + maxCores = co["cores"].GetInt64(); + if (co.HasMember("memoryMB")) + maxMemoryMB = co["memoryMB"].GetInt64(); + } + } + + if (verbose) { + std::cout << "Server running at http://" << listenIP << ':' << listenPort + << std::endl + << "Max Cores: " << maxCores << std::endl + << "Max Memory: " << maxMemoryMB << " MB" << std::endl + << "Max Web Clients: " << webThreads << std::endl + << std::endl + << "Ctrl-C to exit" << std::endl; + } + + if (asDaemon) { + daemonize(); + } + + Pistache::Address listenSpec(listenIP, listenPort); + daggy::daggyr::Server server(listenSpec, maxCores, maxMemoryMB); + server.init(webThreads); + server.start(); + + running = true; + while (running) { + std::this_thread::sleep_for(std::chrono::seconds(30)); + } + server.shutdown(); +} diff --git a/daggyr/libdaggyr/CMakeLists.txt b/daggyr/libdaggyr/CMakeLists.txt new file mode 100644 index 0000000..6765cb6 --- /dev/null +++ b/daggyr/libdaggyr/CMakeLists.txt @@ -0,0 +1,8 @@
+project(libdaggyr) + +add_library(${PROJECT_NAME} STATIC) + +target_include_directories(${PROJECT_NAME} PUBLIC include) +target_link_libraries(${PROJECT_NAME} libdaggy stdc++fs) + +add_subdirectory(src) diff --git a/daggyr/libdaggyr/include/daggyr/Server.hpp b/daggyr/libdaggyr/include/daggyr/Server.hpp new file mode 100644 index 0000000..6cd335a --- /dev/null +++ b/daggyr/libdaggyr/include/daggyr/Server.hpp @@ -0,0 +1,84 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#define DAGGY_REST_HANDLER(func) \ + void func(const Pistache::Rest::Request &request, \ + Pistache::Http::ResponseWriter response); + +namespace fs = std::filesystem; +using namespace daggy::executors::task::daggy_runner; + +namespace daggy::daggyr { + + class Server + { + public: + Server(const Pistache::Address &listenSpec, ssize_t maxCores, + ssize_t maxMemoryMB); + ~Server(); + + Server &setSSLCertificates(const fs::path &cert, const fs::path &key); + + void init(size_t threads = 1); + + void start(); + + uint16_t getPort() const; + + void shutdown(); + + static void validateTask(const Task &task); + + private: + void createDescription(); + + bool handleAuth(const Pistache::Rest::Request &request); + + DAGGY_REST_HANDLER(handleReady); + DAGGY_REST_HANDLER(handleGetCapacity); + DAGGY_REST_HANDLER(handleRunTask); + DAGGY_REST_HANDLER(handleGetTask); + DAGGY_REST_HANDLER(handleStopTask); + DAGGY_REST_HANDLER(handleValidateTask); + + Pistache::Http::Endpoint endpoint_; + Pistache::Rest::Description desc_; + Pistache::Rest::Router router_; + + executors::task::ForkingTaskExecutor executor_; + + struct TaskRecord + { + RunState state; + AttemptRecord attempt; + }; + + std::mutex capacityGuard_; + Capacity maxCapacity_; + Capacity curCapacity_; + + std::mutex pendingGuard_; + + struct PendingJob + { + std::future fut; + Capacity resourcesUsed; + }; + + std::unordered_map, PendingJob> pending_; + + std::mutex resultsGuard_; + 
std::unordered_map, AttemptRecord> + results_; + }; +} // namespace daggy::daggyr diff --git a/daggyr/libdaggyr/src/CMakeLists.txt b/daggyr/libdaggyr/src/CMakeLists.txt new file mode 100644 index 0000000..9f09ece --- /dev/null +++ b/daggyr/libdaggyr/src/CMakeLists.txt @@ -0,0 +1,3 @@ +target_sources(${PROJECT_NAME} PRIVATE + Server.cpp +) diff --git a/daggyr/libdaggyr/src/Server.cpp b/daggyr/libdaggyr/src/Server.cpp new file mode 100644 index 0000000..5537113 --- /dev/null +++ b/daggyr/libdaggyr/src/Server.cpp @@ -0,0 +1,259 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define REQ_RESPONSE(code, msg) \ + std::stringstream ss; \ + ss << R"({"message": )" << std::quoted(msg) << "}"; \ + response.send(Pistache::Http::Code::code, ss.str()); \ + return; + +using namespace Pistache; + +namespace daggy::daggyr { + void Server::init(size_t threads) + { + auto opts = Http::Endpoint::options() + .threads(threads) + .flags(Pistache::Tcp::Options::ReuseAddr | + Pistache::Tcp::Options::ReusePort) + .maxRequestSize(102400) + .maxResponseSize(102400); + endpoint_.init(opts); + createDescription(); + } + + Server::Server(const Pistache::Address &listenSpec, ssize_t maxCores, + ssize_t maxMemoryMB) + : endpoint_(listenSpec) + , desc_("Daggy Runner API", "0.1") + , executor_(maxCores) + , maxCapacity_{maxCores, maxMemoryMB} + , curCapacity_{maxCores, maxMemoryMB} + { + } + + Server::~Server() + { + shutdown(); + } + + void Server::start() + { + router_.initFromDescription(desc_); + + endpoint_.setHandler(router_.handler()); + endpoint_.serveThreaded(); + } + + Server &Server::setSSLCertificates(const fs::path &cert, const fs::path &key) + { + endpoint_.useSSL(cert, key); + return *this; + } + + void Server::shutdown() + { + endpoint_.shutdown(); + } + + uint16_t Server::getPort() const + { + return endpoint_.getPort(); + } + + void Server::createDescription() + { + desc_.info().license("MIT", 
"https://opensource.org/licenses/MIT"); + + desc_.schemes(Rest::Scheme::Http) + .basePath("/v1") + .produces(MIME(Application, Json)) + .consumes(MIME(Application, Json)); + + desc_.route(desc_.get("/ready")) + .bind(&Server::handleReady, this) + .response(Http::Code::Ok, "Response to the /ready call") + .hide(); + + auto versionPath = desc_.path("/v1"); + + versionPath.route(desc_.post("/validate")) + .bind(&Server::handleValidateTask, this) + .produces(MIME(Application, Json)) + .response(Http::Code::Ok, "Validate a task"); + + versionPath.route(desc_.post("/task/:runID/:taskName")) + .bind(&Server::handleRunTask, this) + .produces(MIME(Application, Json)) + .response(Http::Code::Ok, "Run a task"); + + versionPath.route(desc_.get("/task/:runID/:taskName")) + .bind(&Server::handleGetTask, this) + .produces(MIME(Application, Json)) + .response(Http::Code::Ok, + "Get the state and potentially the AttemptRecord of a task"); + + versionPath.route(desc_.del("/task/:runID/:taskName")) + .bind(&Server::handleStopTask, this) + .produces(MIME(Application, Json)) + .response(Http::Code::Ok, "Stop a task"); + + versionPath.route(desc_.get("/capacity")) + .bind(&Server::handleGetCapacity, this) + .produces(MIME(Application, Json)) + .response(Http::Code::Ok, "Get capacities of worker"); + } + + void Server::handleValidateTask(const Pistache::Rest::Request &request, + Pistache::Http::ResponseWriter response) + { + try { + auto task = taskFromJSON("sample_task", request.body()); + daggy::executors::task::daggy_runner::validateTaskParameters(task.job); + } + catch (std::exception &e) { + REQ_RESPONSE(Not_Acceptable, e.what()); + } + REQ_RESPONSE(Ok, "Task is valid"); + } + + void Server::handleRunTask(const Pistache::Rest::Request &request, + Pistache::Http::ResponseWriter response) + { + if (!handleAuth(request)) + return; + + auto runID = request.param(":runID").as(); + auto taskName = request.param(":taskName").as(); + + Capacity resourcesUsed; + Task task; + try { + task = 
taskFromJSON(taskName, request.body()); + resourcesUsed = capacityFromTask(task); + } + catch (std::exception &e) { + REQ_RESPONSE(Not_Acceptable, e.what()); + } + + { + std::lock_guard lock(capacityGuard_); + curCapacity_.cores -= resourcesUsed.cores; + curCapacity_.memoryMB -= resourcesUsed.memoryMB; + } + + { + std::lock_guard lock(pendingGuard_); + pending_.emplace( + std::make_pair(runID, taskName), + PendingJob{.fut = executor_.execute(runID, taskName, task), + .resourcesUsed = resourcesUsed}); + } + + response.send(Pistache::Http::Code::Ok, ""); + } + + void Server::handleGetTask(const Pistache::Rest::Request &request, + Pistache::Http::ResponseWriter response) + { + if (!handleAuth(request)) + return; + + auto runID = request.param(":runID").as(); + auto taskName = request.param(":taskName").as(); + + auto taskID = std::make_pair(runID, taskName); + + std::string payload; + + bool found = false; + { + std::lock_guard lock(pendingGuard_); + auto it = pending_.find(taskID); + if (it != pending_.end()) { + // poll it + if (it->second.fut.valid() and + it->second.fut.wait_for(1ms) == std::future_status::ready) { + auto attempt = it->second.fut.get(); + { + std::lock_guard rlock(resultsGuard_); + results_.emplace(taskID, attempt); + } + { + std::lock_guard rlock(capacityGuard_); + curCapacity_.cores += it->second.resourcesUsed.cores; + curCapacity_.memoryMB += it->second.resourcesUsed.memoryMB; + } + pending_.erase(it); + } else { + payload = R"({ "state": "RUNNING" })"; + found = true; + } + } + } + + if (!found) { + std::lock_guard lock(resultsGuard_); + auto it = results_.find(taskID); + if (it == results_.end()) { + REQ_RESPONSE(Not_Found, "No such task"); + } + + payload = R"({ "state": "COMPLETED", "attempt": )" + + attemptRecordToJSON(it->second) + "}"; + } + response.send(Pistache::Http::Code::Ok, payload); + } + + void Server::handleStopTask(const Pistache::Rest::Request &request, + Pistache::Http::ResponseWriter response) + { + if (!handleAuth(request)) + return; + + auto
runID = request.param(":runID").as(); + auto taskName = request.param(":taskName").as(); + + executor_.stop(runID, taskName); + + REQ_RESPONSE(Ok, ""); + } + + void Server::handleGetCapacity(const Pistache::Rest::Request &request, + Pistache::Http::ResponseWriter response) + { + std::string payload; + { + std::lock_guard lock(capacityGuard_); + payload = R"({ "current": )" + capacityToJSON(curCapacity_) + + R"(, "total": )" + capacityToJSON(maxCapacity_) + "}"; + } + + response.send(Pistache::Http::Code::Ok, payload); + } + + void Server::handleReady(const Pistache::Rest::Request &request, + Pistache::Http::ResponseWriter response) + { + response.send(Pistache::Http::Code::Ok, R"({ "msg": "Ready for tasks!"})"); + } + + /* + * handleAuth will check any auth methods and handle any responses in the + * case of failed auth. If it returns false, callers should cease handling + * the response + */ + bool Server::handleAuth(const Pistache::Rest::Request &request) + { + return true; + } +} // namespace daggy::daggyr diff --git a/daggyr/tests/CMakeLists.txt b/daggyr/tests/CMakeLists.txt new file mode 100644 index 0000000..2adbef6 --- /dev/null +++ b/daggyr/tests/CMakeLists.txt @@ -0,0 +1,9 @@ +project(daggyr_tests) + +add_executable(${PROJECT_NAME} main.cpp + # unit tests + unit_server.cpp + ) +target_link_libraries(${PROJECT_NAME} libdaggyr libdaggy stdc++fs Catch2::Catch2) + +add_test(${PROJECT_NAME} ${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}) diff --git a/daggyr/tests/main.cpp b/daggyr/tests/main.cpp new file mode 100644 index 0000000..4387fa6 --- /dev/null +++ b/daggyr/tests/main.cpp @@ -0,0 +1,15 @@ +#include + +#include "daggy/DAG.hpp" + +#define CATCH_CONFIG_MAIN + +#include + +TEST_CASE("Sanity tests", "[sanity]") +{ + REQUIRE(1 == 1); +} + +// compile and run +// g++ -std=c++17 -o test test.cpp && ./test diff --git a/daggyr/tests/unit_server.cpp b/daggyr/tests/unit_server.cpp new file mode 100644 index 0000000..4d598ee --- /dev/null +++ 
b/daggyr/tests/unit_server.cpp @@ -0,0 +1,172 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace rj = rapidjson; + +using namespace daggy; + +TEST_CASE("rest_endpoint", "[server_basic]") +{ + std::stringstream ss; + Pistache::Address listenSpec("localhost", Pistache::Port(0)); + + const ssize_t maxCores = 10, maxMemoryMB = 1000; + + daggyr::Server server(listenSpec, maxCores, maxMemoryMB); + server.init(10); + server.start(); + + const std::string host = "localhost:"; + const std::string baseURL = host + std::to_string(server.getPort()); + + SECTION("Ready Endpoint") + { + auto response = HTTP_REQUEST(baseURL + "/ready"); + REQUIRE(response.code == HTTPCode::Ok); + } + + SECTION("Querying a non-existent task should yield a 404") + { + auto response = HTTP_REQUEST(baseURL + "/v1/task/100/sample_name"); + REQUIRE(response.code == HTTPCode::Not_Found); + } + + SECTION("Task Missing Cores should Fail") + { + std::string taskSpec = + R"({ "job": { "command": [ "/usr/bin/touch", "dagrun_{{FILE}}" ]}, "memoryMB": 100 })"; + + auto response = + HTTP_REQUEST(baseURL + "/v1/task/0/sample_task", taskSpec, "POST"); + REQUIRE(response.code == HTTPCode::Not_Acceptable); + } + + SECTION("Task Missing MemoryMB should Fail") + { + std::string taskSpec = + R"({ "job": { "command": [ "/usr/bin/touch", "dagrun_{{FILE}}" ]}, "cores": 100 })"; + + auto response = + HTTP_REQUEST(baseURL + "/v1/task/0/sample_task", taskSpec, "POST"); + REQUIRE(response.code == HTTPCode::Not_Acceptable); + } + + SECTION("Task submission and get result") + { + std::string taskSpec = + R"({ "job": { "command": [ "/usr/bin/echo", "hello", "world" ], "cores": "1", "memoryMB": "100" } })"; + + // Submit + { + auto response = + HTTP_REQUEST(baseURL + "/v1/task/0/sample_task", taskSpec, "POST"); + REQUIRE(response.code == HTTPCode::Ok); + } + + while (true) { + auto [code, doc] = JSON_HTTP_REQUEST(baseURL + 
"/v1/task/0/sample_task"); + REQUIRE(doc.IsObject()); + REQUIRE(doc.HasMember("state")); + + std::string state = doc["state"].GetString(); + if (state != "COMPLETED") { + std::this_thread::sleep_for(250ms); + } + else { + REQUIRE(doc.HasMember("attempt")); + auto attempt = attemptRecordFromJSON(doc["attempt"]); + + REQUIRE(attempt.rc == 0); + REQUIRE(attempt.outputLog == "hello world\n"); + break; + } + } + } + + SECTION("Task capacity changes") + { + std::string taskSpec = + R"({ "job": { "command": [ "/usr/bin/sleep", "5" ], "cores": "1", "memoryMB": "100" } })"; + + auto getCapacity = [&]() -> daggy::executors::task::daggy_runner::Capacity { + daggy::executors::task::daggy_runner::Capacity cap; + auto [code, doc] = JSON_HTTP_REQUEST(baseURL + "/v1/capacity"); + REQUIRE(doc.IsObject()); + REQUIRE(doc.HasMember("current")); + const auto &cur = doc["current"]; + REQUIRE(cur.IsObject()); + REQUIRE(cur.HasMember("cores")); + REQUIRE(cur.HasMember("memoryMB")); + + cap.cores = cur["cores"].GetInt64(); + cap.memoryMB = cur["memoryMB"].GetInt64(); + + return cap; + }; + + auto preCap = getCapacity(); + + // Submit + { + auto response = + HTTP_REQUEST(baseURL + "/v1/task/0/sample_task", taskSpec, "POST"); + REQUIRE(response.code == HTTPCode::Ok); + } + + auto postCap = getCapacity(); + + REQUIRE(postCap.cores == preCap.cores - 1); + REQUIRE(postCap.memoryMB == preCap.memoryMB - 100); + + // Ensure the current job is running + { + auto [code, doc] = JSON_HTTP_REQUEST(baseURL + "/v1/task/0/sample_task"); + REQUIRE(doc.IsObject()); + REQUIRE(doc.HasMember("state")); + REQUIRE(doc["state"] != "COMPLETED"); + } + + // Stop it + { + auto [code, doc] = + JSON_HTTP_REQUEST(baseURL + "/v1/task/0/sample_task", "", "DELETE"); + REQUIRE(code == HTTPCode::Ok); + } + + // Grab it and ensure it was killed + while (true) { + auto response = HTTP_REQUEST(baseURL + "/v1/task/0/sample_task"); + + REQUIRE(response.code == HTTPCode::Ok); + rj::Document doc; + 
daggy::checkRJParse(doc.Parse(response.body.c_str())); + REQUIRE(doc.IsObject()); + REQUIRE(doc.HasMember("state")); + + std::string state = doc["state"].GetString(); + if (state != "COMPLETED") { + std::this_thread::sleep_for(250ms); + } + else { + REQUIRE(doc.HasMember("attempt")); + auto attempt = attemptRecordFromJSON(doc["attempt"]); + + REQUIRE(attempt.rc != 0); + break; + } + } + } + + server.shutdown(); +} diff --git a/libdaggy/CMakeLists.txt b/libdaggy/CMakeLists.txt index ca0e602..4a34b4a 100644 --- a/libdaggy/CMakeLists.txt +++ b/libdaggy/CMakeLists.txt @@ -11,7 +11,7 @@ IF (DAGGY_ENABLE_REDIS) endif () target_include_directories(${PROJECT_NAME} PUBLIC include) -target_link_libraries(${PROJECT_NAME} pistache pthread rapidjson better-enums) +target_link_libraries(${PROJECT_NAME} pistache curl pthread rapidjson better-enums) add_subdirectory(src) add_subdirectory(tests) diff --git a/libdaggy/include/daggy/Utilities.hpp b/libdaggy/include/daggy/Utilities.hpp index f852fb1..f3447dd 100644 --- a/libdaggy/include/daggy/Utilities.hpp +++ b/libdaggy/include/daggy/Utilities.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -12,6 +13,8 @@ #include "daggy/executors/task/TaskExecutor.hpp" #include "daggy/loggers/dag_run/DAGRunLogger.hpp" +namespace rj = rapidjson; + namespace daggy { using TaskDAG = DAG; @@ -40,4 +43,48 @@ namespace daggy { void updateDAGFromTasks(TaskDAG &dag, const TaskSet &tasks); std::ostream &operator<<(std::ostream &os, const TimePoint &tp); + + // HTTP helpers + enum HTTPCode : long + { + Ok = 200, + Not_Found = 404, + Not_Acceptable = 406 + }; + + struct HTTPResponse + { + HTTPCode code; + std::string body; + }; + + HTTPResponse HTTP_REQUEST(const std::string &url, + const std::string &payload = "", + const std::string &method = "GET", + bool trace = false); + + std::pair JSON_HTTP_REQUEST( + const std::string &url, const std::string &payload = "", + const std::string &method = "GET", bool trace = false); } // namespace daggy + 
+template +void hash_combine(std::size_t &seed, T const &key) +{ + std::hash hasher; + seed ^= hasher(key) + 0x9e3779b9 + (seed << 6) + (seed >> 2); +} + +namespace std { + template + struct hash> + { + std::size_t operator()(std::pair const &p) const + { + std::size_t seed(0); + ::hash_combine(seed, p.first); + ::hash_combine(seed, p.second); + return seed; + } + }; +} // namespace std diff --git a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp new file mode 100644 index 0000000..1aed18a --- /dev/null +++ b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp @@ -0,0 +1,69 @@ +#pragma once + +#include + +#include "TaskExecutor.hpp" + +namespace rj = rapidjson; + +namespace daggy::executors::task { + + namespace daggy_runner { + struct Capacity + { + ssize_t cores; + ssize_t memoryMB; + }; + + std::string capacityToJSON(const Capacity &cap); + Capacity capacityFromJSON(const rj::Value &spec); + Capacity capacityFromTask(const Task &task); + + void validateTaskParameters(const ConfigValues &job); + } // namespace daggy_runner + + class DaggyRunnerTaskExecutor : public TaskExecutor + { + public: + using Command = std::vector; + + DaggyRunnerTaskExecutor(); + ~DaggyRunnerTaskExecutor() override; + + // Validates the job to ensure that all required values are set and are of + // the right type, + bool validateTaskParameters(const ConfigValues &job) override; + + std::vector expandTaskParameters( + const ConfigValues &job, const ConfigValues &expansionValues) override; + + // Runs the task + std::future execute(DAGRunID runID, + const std::string &taskName, + const Task &task) override; + + bool stop(DAGRunID runID, const std::string &taskName) override; + + void addRunner(const std::string &url); + + private: + void monitor(); + + struct RunningTask + { + std::promise prom; + DAGRunID runID; + std::string taskName; + std::string runnerURL; + }; + + // Resolves jobs 
through polling + std::atomic running_; + std::thread monitorWorker_; + + std::unordered_set runners_; + std::mutex rtGuard_; + std::unordered_map, RunningTask> + runningTasks_; + }; +} // namespace daggy::executors::task diff --git a/libdaggy/include/daggy/executors/task/ForkingTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/ForkingTaskExecutor.hpp index 573c4fe..a142796 100644 --- a/libdaggy/include/daggy/executors/task/ForkingTaskExecutor.hpp +++ b/libdaggy/include/daggy/executors/task/ForkingTaskExecutor.hpp @@ -5,6 +5,10 @@ #include "TaskExecutor.hpp" namespace daggy::executors::task { + namespace forking_executor { + void validateTaskParameters(const ConfigValues &job); + } + class ForkingTaskExecutor : public TaskExecutor { public: diff --git a/libdaggy/src/Serialization.cpp b/libdaggy/src/Serialization.cpp index 6a90173..0f6a673 100644 --- a/libdaggy/src/Serialization.cpp +++ b/libdaggy/src/Serialization.cpp @@ -153,6 +153,7 @@ namespace daggy { for (auto it = params.MemberBegin(); it != params.MemberEnd(); ++it) { if (!it->name.IsString()) throw std::runtime_error("job key must be a string."); + if (it->value.IsArray()) { std::vector values; for (size_t i = 0; i < it->value.Size(); ++i) { @@ -160,10 +161,13 @@ namespace daggy { } task.job.insert_or_assign(it->name.GetString(), values); } - else { + else if (it->value.IsString()) { task.job.insert_or_assign(it->name.GetString(), it->value.GetString()); } + else { + throw std::runtime_error("Value in parameters is not a string"); + } } } diff --git a/libdaggy/src/Utilities.cpp b/libdaggy/src/Utilities.cpp index b20cf36..894cc6d 100644 --- a/libdaggy/src/Utilities.cpp +++ b/libdaggy/src/Utilities.cpp @@ -5,6 +5,53 @@ using namespace std::chrono_literals; +static int http_trace(CURL *handle, curl_infotype type, char *data, size_t size, + void *userp) +{ + const char *text; + (void)handle; /* prevent compiler warning */ + (void)userp; + + switch (type) { + case CURLINFO_TEXT: + fprintf(stderr, "== 
Info: %s", data); + default: /* in case a new one is introduced to shock us */ + return 0; + + case CURLINFO_HEADER_OUT: + text = "=> Send header"; + break; + case CURLINFO_DATA_OUT: + text = "=> Send data"; + break; + case CURLINFO_SSL_DATA_OUT: + text = "=> Send SSL data"; + break; + case CURLINFO_HEADER_IN: + text = "<= Recv header"; + break; + case CURLINFO_DATA_IN: + text = "<= Recv data"; + break; + case CURLINFO_SSL_DATA_IN: + text = "<= Recv SSL data"; + break; + } + + std::cerr << "\n================== " << text + << " ==================" << std::endl + << data << std::endl; + return 0; +} + +uint curlWriter(char *in, uint size, uint nmemb, std::stringstream *out) +{ + uint r; + r = size * nmemb; + out->write(in, r); + return r; +} + namespace daggy { std::string globalSub(std::string string, const std::string &pattern, const std::string &replacement) @@ -168,4 +215,67 @@ namespace daggy { os << tp.time_since_epoch().count() << std::endl; return os; } + + HTTPResponse HTTP_REQUEST(const std::string &url, const std::string &payload, + const std::string &method, bool trace) + { + HTTPResponse response{.code = HTTPCode::Ok, .body = ""}; + + CURL *curl; + CURLcode res; + struct curl_slist *headers = NULL; + + curl_global_init(CURL_GLOBAL_ALL); + + curl = curl_easy_init(); + if (curl) { + std::stringstream buffer; + + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer); + + if (trace) { + curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace); + curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); + } + + if (!payload.empty()) { + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, payload.size()); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload.c_str()); + headers = curl_slist_append(headers, "Content-Type: Application/Json"); + } + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, method.c_str()); + headers = curl_slist_append(headers, "Expect:"); + 
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + + res = curl_easy_perform(curl); + + if (res != CURLE_OK) { + curl_easy_cleanup(curl); + throw std::runtime_error(std::string{"CURL Failed: "} + + curl_easy_strerror(res)); + } + curl_easy_cleanup(curl); + + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response.code); + response.body = buffer.str(); + } + + curl_global_cleanup(); + + return response; + } + + std::pair JSON_HTTP_REQUEST( + const std::string &url, const std::string &payload, + const std::string &method, bool trace) + { + auto response = HTTP_REQUEST(url, payload, method); + + rj::Document doc; + checkRJParse(doc.Parse(response.body.c_str())); + return std::make_pair(response.code, std::move(doc)); + } + } // namespace daggy diff --git a/libdaggy/src/executors/task/CMakeLists.txt b/libdaggy/src/executors/task/CMakeLists.txt index 0b480d2..96a8ada 100644 --- a/libdaggy/src/executors/task/CMakeLists.txt +++ b/libdaggy/src/executors/task/CMakeLists.txt @@ -2,4 +2,5 @@ target_sources(${PROJECT_NAME} PRIVATE SlurmTaskExecutor.cpp NoopTaskExecutor.cpp ForkingTaskExecutor.cpp + DaggyRunnerTaskExecutor.cpp ) diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp new file mode 100644 index 0000000..c6fa415 --- /dev/null +++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp @@ -0,0 +1,227 @@ +#include +#include +#include +#include +#include + +using namespace daggy::executors::task; +using namespace daggy::executors::task::daggy_runner; +using namespace daggy; + +namespace daggy::executors::task::daggy_runner { + std::string capacityToJSON(const Capacity &cap) + { + return R"({ "cores": )" + std::to_string(cap.cores) + R"(, "memoryMB": )" + + std::to_string(cap.memoryMB) + "}"; + } + + Capacity capacityFromJSON(const rj::Value &spec) + { + Capacity cap{.cores = 0, .memoryMB = 0}; + + if (!spec.IsObject()) { + throw std::runtime_error("Capacity is not an object"); + } + + if 
(spec.HasMember("cores")) { + if (!spec["cores"].IsNumber()) { + throw std::runtime_error("cores member of Capacity is not an integer"); + } + cap.cores = spec["cores"].GetInt64(); + } + + if (spec.HasMember("memoryMB")) { + if (!spec["memoryMB"].IsNumber()) { + throw std::runtime_error( + "memoryMB member of Capacity is not an integer"); + } + cap.memoryMB = spec["memoryMB"].GetInt64(); + } + + return cap; + } + + Capacity capacityFromTask(const Task &task) + { + Capacity cap{.cores = 0, .memoryMB = 0}; + + cap.cores = std::stoll(std::get(task.job.at("cores"))); + cap.memoryMB = std::stoll(std::get(task.job.at("memoryMB"))); + + return cap; + } + + void validateTaskParameters(const daggy::ConfigValues &job) + { + forking_executor::validateTaskParameters(job); + + const std::array fields{"cores", "memoryMB"}; + + for (const auto &field : fields) { + if (job.count(field) == 0) + throw std::runtime_error("Missing required job parameter " + field); + + const auto &val = job.at(field); + + if (!std::holds_alternative(val)) + throw std::runtime_error(field + " in capacity is not a string"); + + try { + std::stoll(std::get(val)); + } + catch (std::exception &e) { + throw std::runtime_error(field + " in capacity is not an integer"); + } + } + } +} // namespace daggy::executors::task::daggy_runner + +DaggyRunnerTaskExecutor::DaggyRunnerTaskExecutor() + : running_(true) + , monitorWorker_(&DaggyRunnerTaskExecutor::monitor, this) +{ +} + +DaggyRunnerTaskExecutor::~DaggyRunnerTaskExecutor() +{ + running_ = false; + monitorWorker_.join(); +} + +// Validates the job to ensure that all required values are set and are of +// the right type, +bool DaggyRunnerTaskExecutor::validateTaskParameters(const ConfigValues &job) +{ + daggy_runner::validateTaskParameters(job); + + return true; +} + +std::vector DaggyRunnerTaskExecutor::expandTaskParameters( + const ConfigValues &job, const ConfigValues &expansionValues) +{ + std::vector newValues; + + auto command = + (job.count("command") 
== 0 ? Command{} + : std::get(job.at("command"))); + + auto environment = (job.count("environment") == 0 + ? Command{} + : std::get(job.at("environment"))); + + Command both(command); + std::copy(environment.begin(), environment.end(), std::back_inserter(both)); + + for (const auto &parts : interpolateValues(both, expansionValues)) { + ConfigValues newCommand{job}; + newCommand["command"] = + Command(parts.begin(), parts.begin() + command.size()); + newCommand["environment"] = + Command(parts.begin() + command.size(), parts.end()); + newValues.emplace_back(newCommand); + } + + return newValues; +} + +// Runs the task +std::future DaggyRunnerTaskExecutor::execute( + DAGRunID runID, const std::string &taskName, const Task &task) +{ + auto taskUsed = capacityFromTask(task); + + // Get the capacities for all the runners + // Capacities for a runner can be negative, meaning that they're currently + // oversubscribed. + std::vector> impacts; + for (const auto &runner : runners_) { + try { + const auto &[code, doc] = JSON_HTTP_REQUEST(runner + "/v1/capacity"); + if (code != HTTPCode::Ok) { + continue; + } + + auto curCap = capacityFromJSON(doc["current"]); + auto totCap = capacityFromJSON(doc["total"]); + + ssize_t cores = curCap.cores < 0 ? totCap.cores : curCap.cores; + ssize_t memoryMB = + curCap.memoryMB < 0 ? 
totCap.memoryMB : curCap.memoryMB; + + double impact = + std::max(taskUsed.cores / cores, taskUsed.memoryMB / memoryMB); + impacts.emplace_back(runner, impact); + } + catch (const std::exception &_) { + continue; + } + } + + if (impacts.empty()) + throw std::runtime_error("No runners available for execution"); + + auto cit = impacts.begin(); + for (auto it = impacts.begin(); it != impacts.end(); ++it) { + if (it->second < cit->second) + cit = it; + } + + RunningTask rt{ + .prom{}, .runID = runID, .taskName = taskName, .runnerURL = cit->first}; + + auto fut = rt.prom.get_future(); + + std::lock_guard lock(rtGuard_); + runningTasks_.emplace(std::make_pair(runID, taskName), std::move(rt)); + + return fut; +} + +bool DaggyRunnerTaskExecutor::stop(DAGRunID runID, const std::string &taskName) +{ + return true; +} + +void DaggyRunnerTaskExecutor::addRunner(const std::string &url) +{ + runners_.insert(url); +} + +void DaggyRunnerTaskExecutor::monitor() +{ + while (running_) { + { + std::vector> resolvedJobs; + + std::lock_guard lock(rtGuard_); + for (auto &[taskID, task] : runningTasks_) { + try { + const auto &[code, json] = JSON_HTTP_REQUEST( + task.runnerURL + "/v1/task/" + std::to_string(taskID.first) + + "/" + taskID.second); + if (code != HTTPCode::Ok) { + AttemptRecord record{ + .rc = -1, .executorLog = "Unable to query runner for progress"}; + task.prom.set_value(std::move(record)); + resolvedJobs.emplace_back(taskID); + continue; + } + + if (json["state"] == "COMPLETED") { + task.prom.set_value(attemptRecordFromJSON(json["attempt"])); + resolvedJobs.emplace_back(taskID); + } + } + catch (std::runtime_error &e) { + continue; + } + + for (const auto &tid : resolvedJobs) { + runningTasks_.extract(tid); + } + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + } +} diff --git a/libdaggy/src/executors/task/ForkingTaskExecutor.cpp b/libdaggy/src/executors/task/ForkingTaskExecutor.cpp index 359d929..3c232e5 100644 --- 
a/libdaggy/src/executors/task/ForkingTaskExecutor.cpp +++ b/libdaggy/src/executors/task/ForkingTaskExecutor.cpp @@ -9,6 +9,30 @@ using namespace daggy::executors::task; +namespace daggy::executors::task::forking_executor { + void validateTaskParameters(const daggy::ConfigValues &job) + { + // command or commandString is required + if (job.count("command")) { + if (!std::holds_alternative(job.at("command"))) + throw std::runtime_error(R"(command must be an array of strings)"); + } + else { + if (job.count("commandString") == 0) { + throw std::runtime_error( + R"(command or commandString must be defined.)"); + } + if (!std::holds_alternative(job.at("commandString"))) + throw std::runtime_error(R"(commandString must be a string)"); + } + + if (job.count("environment")) { + if (!std::holds_alternative(job.at("environment"))) + throw std::runtime_error(R"(environment must be an array of strings)"); + } + } +} // namespace daggy::executors::task::forking_executor + std::string slurp(int fd) { std::string result; @@ -190,23 +214,7 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task, bool ForkingTaskExecutor::validateTaskParameters(const ConfigValues &job) { - // command or commandString is required - if (job.count("command")) { - if (!std::holds_alternative(job.at("command"))) - throw std::runtime_error(R"(command must be an array of strings)"); - } - else { - if (job.count("commandString") == 0) { - throw std::runtime_error(R"(command or commandString must be defined.)"); - } - if (!std::holds_alternative(job.at("commandString"))) - throw std::runtime_error(R"(commandString must be a string)"); - } - - if (job.count("environment")) { - if (!std::holds_alternative(job.at("environment"))) - throw std::runtime_error(R"(environment must be an array of strings)"); - } + forking_executor::validateTaskParameters(job); return true; }