#include #include #include #include #include #include #include #include #include // Add executors here #include #include // Add loggers here #include #include #include "daggy/executors/task/TaskExecutor.hpp" #include "daggy/loggers/dag_run/DAGRunLogger.hpp" namespace rj = rapidjson; static std::atomic running{true}; void signalHandler(int signal) { switch (signal) { case SIGHUP: break; case SIGINT: case SIGTERM: running = false; break; default: break; } } void daemonize() { pid_t pid; struct sigaction newSigAction; sigset_t newSigSet; /* Check if parent process id is set */ if (getppid() == 1) { return; } /* Set signal mask - signals we want to block */ sigemptyset(&newSigSet); sigaddset(&newSigSet, SIGCHLD); /* ignore child - i.e. we don't need to wait for it */ sigaddset(&newSigSet, SIGTSTP); /* ignore Tty stop signals */ sigaddset(&newSigSet, SIGTTOU); /* ignore Tty background writes */ sigaddset(&newSigSet, SIGTTIN); /* ignore Tty background reads */ sigprocmask(SIG_BLOCK, &newSigSet, nullptr); /* Block the above specified signals */ /* Set up a signal handler */ newSigAction.sa_handler = signalHandler; sigemptyset(&newSigAction.sa_mask); newSigAction.sa_flags = 0; /* Signals to handle */ sigaction(SIGHUP, &newSigAction, nullptr); /* catch hangup signal */ sigaction(SIGTERM, &newSigAction, nullptr); /* catch term signal */ sigaction(SIGINT, &newSigAction, nullptr); /* catch interrupt signal */ // Fork once pid = fork(); if (pid < 0) { exit(EXIT_FAILURE); } if (pid > 0) { exit(EXIT_SUCCESS); } /* On success: The child process becomes session leader */ if (setsid() < 0) { std::cerr << "Unable to setsid" << std::endl; exit(EXIT_FAILURE); } /* Catch, ignore and handle signals */ signal(SIGCHLD, SIG_IGN); signal(SIGHUP, SIG_IGN); /* Fork off for the second time*/ pid = fork(); if (pid < 0) exit(EXIT_FAILURE); if (pid > 0) exit(EXIT_SUCCESS); umask(0); /* Change the working directory to the root directory */ /* or another appropriated directory */ auto rc = chdir("/"); (void)rc; /* Close all open file descriptors */ for (int x = sysconf(_SC_OPEN_MAX); x >= 0; x--) { close(x); } } namespace dl = daggy::loggers::dag_run; std::unique_ptr loggerFactory(const rj::Value &config) { if (config.HasMember("logger")) { const auto &logConf = config["logger"]; if (!logConf.IsObject()) throw std::runtime_error("logger config is not an object"); if (!logConf.HasMember("name")) throw std::runtime_error("logger config is missing logger name"); if (!logConf.HasMember("config")) throw std::runtime_error("logger config is missing logger config"); std::string name = logConf["name"].GetString(); const auto &logConfig = logConf["config"]; if (name == "OStreamLogger") { if (logConfig.HasMember("file")) { std::ofstream ofh(logConfig["file"].GetString()); return std::make_unique(ofh); } } else if (name == "RedisLogger") { std::string host = "localhost"; uint16_t port = 6379; std::string prefix = "daggy"; if (logConfig.HasMember("prefix")) prefix = logConfig["prefix"].GetString(); if (logConfig.HasMember("host")) host = logConfig["host"].GetString(); if (logConfig.HasMember("port")) port = logConfig["port"].GetInt(); return std::make_unique(prefix, host, port); } else throw std::runtime_error("Unknown logger type: " + name); } return std::make_unique(std::cout); } namespace de = daggy::executors::task; std::unique_ptr executorFactory(const rj::Value &config) { if (config.HasMember("executor")) { const auto &execConf = config["executor"]; if (!execConf.IsObject()) throw std::runtime_error("Executor config is not an object"); if (!execConf.HasMember("name")) throw std::runtime_error("Executor config is missing execger name"); if (!execConf.HasMember("config")) throw std::runtime_error("Executor config is missing config"); std::string name = execConf["name"].GetString(); const auto &execConfig = execConf["config"]; if (name == "ForkingTaskExecutor") { size_t threads = 10; if (execConfig.HasMember("threads")) threads = execConfig["threads"].GetInt64(); return std::make_unique(threads); } else if (name == "SlurmTaskExecutor") { return std::make_unique(); } else throw std::runtime_error("Unknown executor type: " + name); } return std::make_unique(10); } int main(int argc, char **argv) { argparse::ArgumentParser args("Daggy"); args.add_argument("-v", "--verbose") .default_value(false) .implicit_value(true); args.add_argument("-d", "--daemon").default_value(false).implicit_value(true); args.add_argument("--config"); try { args.parse_args(argc, argv); } catch (std::exception &e) { std::cout << "Error: " << e.what() << std::endl; std::cout << args; exit(1); } bool verbose = args.get("--verbose"); bool asDaemon = args.get("--daemon"); auto configFile = args.get("--config"); std::ifstream ifh(configFile); std::string config; std::getline(ifh, config, '\0'); ifh.close(); rj::Document doc; daggy::checkRJParse(doc.Parse(config.c_str())); std::string listenIP = "127.0.0.1"; int listenPort = 2503; size_t webThreads = 50; size_t dagThreads = 50; if (doc.HasMember("ip")) listenIP = doc["ip"].GetString(); if (doc.HasMember("port")) listenPort = doc["port"].GetInt(); if (doc.HasMember("web-threads")) webThreads = doc["web-threads"].GetInt64(); if (doc.HasMember("dag-threads")) dagThreads = doc["dag-threads"].GetInt64(); if (verbose) { std::cout << "Server running at http://" << listenIP << ':' << listenPort << std::endl << "Max DAG Processing: " << dagThreads << std::endl << "Max Web Clients: " << webThreads << std::endl << std::endl << "Ctrl-C to exit" << std::endl; } if (asDaemon) { daemonize(); } auto logger = loggerFactory(doc); auto executor = executorFactory(doc); Pistache::Address listenSpec(listenIP, listenPort); daggy::Server server(listenSpec, *logger, *executor, dagThreads); server.init(webThreads); server.start(); running = true; while (running) { std::this_thread::sleep_for(std::chrono::seconds(30)); } server.shutdown(); }