diff --git a/TODO.md b/TODO.md index dfe5618..b6fb763 100644 --- a/TODO.md +++ b/TODO.md @@ -4,11 +4,9 @@ Tasks - Open - REST Server - [ ] Add in authorization scheme (maybe PAM auth endpoint with JWT?) - - [ ] Flesh out server and interface - Core Functionality - Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma separated list - - Add execution gates - Executors - [ ] Slurm Executor - Loggers @@ -17,12 +15,13 @@ Tasks - [ ] Add more error checking - [ ] General logger - [ ] Redis DAGRunLogger - -- Completed - - Core Functionality - - [X] Add ability to define child -> parent relationships - - [X] Rip apart scheduler and re-implement runDag as a utility function, taking an execution pool, task list, - and logger - - [X] Resume a failed DAG - - [X] Handle return on errored DAG / Task - - [X] Clearing a DAG Task + - Server + - [ ] Multiple execution pools + - [ ] per-Executor parameters + - Utilities + - daggyd + - [ ] Add config file support + - [ ] Support for all the different executors / state loggers + - daggyc + - [ ] Submission + - [ ] Querying diff --git a/daggy/src/Serialization.cpp b/daggy/src/Serialization.cpp index 63dcfe9..9c73a70 100644 --- a/daggy/src/Serialization.cpp +++ b/daggy/src/Serialization.cpp @@ -22,7 +22,7 @@ namespace daggy { if (!it->name.IsString()) { throw std::runtime_error("All keys must be strings."); } - std::string name = std::string{"{{"} + it->name.GetString() + "}}"; + std::string name = it->name.GetString(); if (it->value.IsArray()) { std::vector values; for (size_t i = 0; i < it->value.Size(); ++i) { diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp index d4b504d..541c81f 100644 --- a/daggy/src/Server.cpp +++ b/daggy/src/Server.cpp @@ -80,8 +80,6 @@ namespace daggy { .bind(&Server::handleGetDAGRuns, this) .produces(MIME(Application, Json), MIME(Application, Xml)) .response(Http::Code::Ok, "The list of all known DAG Runs"); - - } /* diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index ae96bad..d183273 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -21,7 +21,8 @@ namespace daggy { std::vector expandedPart{part}; // Find all values of parameters, and expand them - for (const auto &[param, paramValue]: parameters) { + for (const auto &[paramRaw, paramValue]: parameters) { + std::string param = "{{" + paramRaw + "}}"; auto pos = part.find(param); if (pos == std::string::npos) continue; std::vector newExpandedPart; diff --git a/tests/unit_serialization.cpp b/tests/unit_serialization.cpp index ab14631..dd11fb0 100644 --- a/tests/unit_serialization.cpp +++ b/tests/unit_serialization.cpp @@ -13,8 +13,8 @@ TEST_CASE("Deserialize Parameters", "[deserialize_parameters]") { std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"}; auto params = daggy::parametersFromJSON(testParams); REQUIRE(params.size() == 2); - REQUIRE(std::holds_alternative>(params["{{DATE}}"])); - REQUIRE(std::holds_alternative(params["{{SOURCE}}"])); + REQUIRE(std::holds_alternative>(params["DATE"])); + REQUIRE(std::holds_alternative(params["SOURCE"])); }SECTION("Invalid JSON") { std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name")"}; REQUIRE_THROWS(daggy::parametersFromJSON(testParams)); @@ -66,4 +66,4 @@ TEST_CASE("Task Serialization", "[serialize_tasks]") { REQUIRE(task == other); } } -} \ No newline at end of file +} diff --git a/utils/daggyd/daggyd.cpp b/utils/daggyd/daggyd.cpp index dffd09d..fd2105e 100644 --- a/utils/daggyd/daggyd.cpp +++ b/utils/daggyd/daggyd.cpp @@ -1,5 +1,9 @@ #include #include +#include + +#include +#include #include @@ -11,6 +15,86 @@ // Add loggers here #include +/* +#include +#include +#include +#include +#include +*/ + +static std::atomic running{true}; + +void signalHandler(int signal) { + switch (signal) { + case SIGHUP: + break; + case SIGINT: + case SIGTERM: + running = false; + break; + } +} + +void daemonize() { + pid_t pid; + + struct sigaction newSigAction; + sigset_t newSigSet; + + /* Check if parent process id is set */ + if (getppid() == 1) { return; } + + /* Set signal mask - signals we want to block */ + sigemptyset(&newSigSet); + sigaddset(&newSigSet, SIGCHLD); /* ignore child - i.e. we don't need to wait for it */ + sigaddset(&newSigSet, SIGTSTP); /* ignore Tty stop signals */ + sigaddset(&newSigSet, SIGTTOU); /* ignore Tty background writes */ + sigaddset(&newSigSet, SIGTTIN); /* ignore Tty background reads */ + sigprocmask(SIG_BLOCK, &newSigSet, NULL); /* Block the above specified signals */ + + /* Set up a signal handler */ + newSigAction.sa_handler = signalHandler; + sigemptyset(&newSigAction.sa_mask); + newSigAction.sa_flags = 0; + + /* Signals to handle */ + sigaction(SIGHUP, &newSigAction, NULL); /* catch hangup signal */ + sigaction(SIGTERM, &newSigAction, NULL); /* catch term signal */ + sigaction(SIGINT, &newSigAction, NULL); /* catch interrupt signal */ + + // Fork once + pid = fork(); + if (pid < 0) { exit(EXIT_FAILURE); } + if (pid > 0) { exit(EXIT_SUCCESS); } + + /* On success: The child process becomes session leader */ + if (setsid() < 0) { + std::cerr << "Unable to setsid" << std::endl; + exit(EXIT_FAILURE); + } + + /* Catch, ignore and handle signals */ + signal(SIGCHLD, SIG_IGN); + signal(SIGHUP, SIG_IGN); + + /* Fork off for the second time*/ + pid = fork(); + if (pid < 0) + exit(EXIT_FAILURE); + if (pid > 0) + exit(EXIT_SUCCESS); + + umask(0); + + /* Change the working directory to the root directory */ + /* or another appropriated directory */ + chdir("/"); + + /* Close all open file descriptors */ + for (auto x = sysconf(_SC_OPEN_MAX); x >= 0; x--) { close(x); } +} + int main(int argc, char **argv) { argparse::ArgumentParser args("Daggy"); @@ -22,25 +106,25 @@ int main(int argc, char **argv) { .implicit_value(true); args.add_argument("--ip") .help("IP address to listen to") - .default_value("localhost"); + .default_value(std::string{"127.0.0.1"}); args.add_argument("--log-file") .help("File to log to.") - .default_value("daggyd.log"); + .default_value(std::string{"daggyd.log"}); args.add_argument("--port") .help("Port to listen to") .default_value(2503) .action([](const std::string &value) { return std::stoi(value); }); args.add_argument("--dag-threads") .help("Number of DAGs to run concurrently") - .default_value(10) + .default_value(10UL) .action([](const std::string &value) { return std::stoull(value); }); args.add_argument("--web-threads") .help("Number of web requests to support concurrently") - .default_value(30) + .default_value(30UL) .action([](const std::string &value) { return std::stoull(value); }); args.add_argument("--executor-threads") .help("Number of tasks to run concurrently") - .default_value(30) + .default_value(30UL) .action([](const std::string &value) { return std::stoull(value); }); try { @@ -51,16 +135,60 @@ int main(int argc, char **argv) { exit(1); } - std::ofstream logFile(args.get("--log-file"), std::ios::app); - daggy::loggers::dag_run::OStreamLogger logger(logFile); - daggy::executors::task::ForkingTaskExecutor executor(args.get("--executor-threads")); - Pistache::Address listenSpec(args.get("--ip"), args.get("--port")); + bool verbose = args.get("--verbose"); + bool asDaemon = args.get("--daemon"); + std::string logFileName = args.get("--log-file"); + std::string listenIP = args.get("--ip"); + uint16_t listenPort = args.get("--port"); + size_t executorThreads = args.get("--executor-threads"); + size_t webThreads = args.get("--web-threads"); + size_t dagThreads = args.get("--dag-threads"); - daggy::Server server(listenSpec, logger, executor, args.get("--dag-threads")); - server.init(args.get("--web-threads")); + if (logFileName == "-") { + if (asDaemon) { + std::cout << "Unable to daemonize if logging to stdout" << std::endl; + exit(1); + } + } else { + fs::path logFn{logFileName}; + if (!logFn.is_absolute()) { + logFileName = (fs::current_path() / logFileName).string(); + } + } + + if (verbose) { + std::cout << "Server running at http://" << listenIP << ':' << listenPort << std::endl + << "Max DAG Processing: " << dagThreads << std::endl + << "Max Task Execution: " << executorThreads << std::endl + << "Max Web Clients: " << webThreads << std::endl + << "Logging to: " << logFileName << std::endl + << std::endl << "Ctrl-C to exit" << std::endl; + } + + if (asDaemon) { + daemonize(); + } + + std::ofstream logFH; + std::unique_ptr logger; + if (logFileName == "-") { + logger = std::make_unique(std::cout); + } else { + logFH.open(logFileName, std::ios::app); + logger = std::make_unique(logFH); + } + + daggy::executors::task::ForkingTaskExecutor executor(executorThreads); + Pistache::Address listenSpec(listenIP, listenPort); + + daggy::Server server(listenSpec, *logger, executor, dagThreads); + server.init(webThreads); server.start(); - std::cout << "Server running at http://localhost:2503, Ctrl-C to exit" << std::endl; - while (true) { + + + running = true; + while (running) { std::this_thread::sleep_for(std::chrono::seconds(30)); } -} + server.shutdown(); +} \ No newline at end of file