From 612bc8af8a1510b335cd60707b5afae11f01fde6 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Mon, 18 Oct 2021 12:12:04 -0300 Subject: [PATCH] Adding config file supoprt for daggyd --- utils/daggyd/daggyd.cpp | 185 +++++++++++++++++++++++----------------- 1 file changed, 107 insertions(+), 78 deletions(-) diff --git a/utils/daggyd/daggyd.cpp b/utils/daggyd/daggyd.cpp index 8c86bbe..f0c77a9 100644 --- a/utils/daggyd/daggyd.cpp +++ b/utils/daggyd/daggyd.cpp @@ -1,32 +1,26 @@ +#include #include #include #include #include +#include #include #include #include // Add executors here -#ifdef DAGGY_ENABLE_SLURM -#include -#else #include -#endif +#include // Add loggers here #include -#ifdef DAGGY_ENABLE_REDIS #include -#endif -/* -#include -#include -#include -#include -#include -*/ +#include "daggy/executors/task/TaskExecutor.hpp" +#include "daggy/loggers/dag_run/DAGRunLogger.hpp" + +namespace rj = rapidjson; static std::atomic running{true}; @@ -115,6 +109,78 @@ void daemonize() } } +namespace dl = daggy::loggers::dag_run; + +std::unique_ptr loggerFactory(const rj::Value &config) +{ + if (config.HasMember("logger")) { + const auto &logConf = config["logger"]; + if (!logConf.IsObject()) + throw std::runtime_error("logger config is not an object"); + if (!logConf.HasMember("name")) + throw std::runtime_error("logger config is missing logger name"); + if (!logConf.HasMember("config")) + throw std::runtime_error("logger config is missing logger config"); + + std::string name = logConf["name"].GetString(); + const auto &logConfig = logConf["config"]; + if (name == "OStreamLogger") { + if (logConfig.HasMember("file")) { + std::ofstream ofh(logConfig["file"].GetString()); + return std::make_unique(ofh); + } + } + else if (name == "RedisLogger") { + std::string host = "localhost"; + uint16_t port = 6379; + std::string prefix = "daggy"; + + if (logConfig.HasMember("prefix")) + prefix = logConfig["prefix"].GetString(); + if (logConfig.HasMember("host")) + host = logConfig["host"].GetString(); + if (logConfig.HasMember("port")) + port = logConfig["port"].GetInt(); + + return std::make_unique(prefix, host, port); + } + else + throw std::runtime_error("Unknown logger type: " + name); + } + return std::make_unique(std::cout); +} + +namespace de = daggy::executors::task; + +std::unique_ptr executorFactory(const rj::Value &config) +{ + if (config.HasMember("executor")) { + const auto &execConf = config["executor"]; + if (!execConf.IsObject()) + throw std::runtime_error("Executor config is not an object"); + if (!execConf.HasMember("name")) + throw std::runtime_error("Executor config is missing execger name"); + if (!execConf.HasMember("config")) + throw std::runtime_error("Executor config is missing config"); + std::string name = execConf["name"].GetString(); + const auto &execConfig = execConf["config"]; + + if (name == "ForkingTaskExecutor") { + size_t threads = 10; + if (execConfig.HasMember("threads")) + threads = execConfig["threads"].GetInt64(); + return std::make_unique(threads); + } + else if (name == "SlurmTaskExecutor") { + return std::make_unique(); + } + else + throw std::runtime_error("Unknown executor type: " + name); + } + + return std::make_unique(10); +} + int main(int argc, char **argv) { argparse::ArgumentParser args("Daggy"); @@ -123,29 +189,7 @@ int main(int argc, char **argv) .default_value(false) .implicit_value(true); args.add_argument("-d", "--daemon").default_value(false).implicit_value(true); - args.add_argument("--ip") - .help("IP address to listen to") - .default_value(std::string{"127.0.0.1"}); - args.add_argument("--log-file") - .help("File to log to.") - .default_value(std::string{"daggyd.log"}); - args.add_argument("--port") - .help("Port to listen to") - .default_value(2503) - .action([](const std::string &value) { return std::stoi(value); }); - args.add_argument("--dag-threads") - .help("Number of DAGs to run concurrently") - .default_value(10UL) - .action([](const std::string &value) { return std::stoull(value); }); - args.add_argument("--web-threads") - .help("Number of web requests to support concurrently") - .default_value(30UL) - .action([](const std::string &value) { return std::stoull(value); }); - args.add_argument("--executor-threads") - .help("Number of tasks to run concurrently") - .default_value(30UL) - .action([](const std::string &value) { return std::stoull(value); }); - + args.add_argument("--config"); try { args.parse_args(argc, argv); } @@ -155,35 +199,37 @@ int main(int argc, char **argv) exit(1); } - bool verbose = args.get("--verbose"); - bool asDaemon = args.get("--daemon"); - auto logFileName = args.get("--log-file"); - auto listenIP = args.get("--ip"); - auto listenPort = args.get("--port"); - auto executorThreads = args.get("--executor-threads"); - auto webThreads = args.get("--web-threads"); - auto dagThreads = args.get("--dag-threads"); + bool verbose = args.get("--verbose"); + bool asDaemon = args.get("--daemon"); + auto configFile = args.get("--config"); - if (logFileName == "-") { - if (asDaemon) { - std::cout << "Unable to daemonize if logging to stdout" << std::endl; - exit(1); - } - } - else { - fs::path logFn{logFileName}; - if (!logFn.is_absolute()) { - logFileName = (fs::current_path() / logFileName).string(); - } - } + std::ifstream ifh(configFile); + std::string config; + std::getline(ifh, config, '\0'); + ifh.close(); + + rj::Document doc; + daggy::checkRJParse(doc.Parse(config.c_str())); + + std::string listenIP = "127.0.0.1"; + int listenPort = 2503; + size_t webThreads = 50; + size_t dagThreads = 50; + + if (doc.HasMember("ip")) + listenIP = doc["ip"].GetString(); + if (doc.HasMember("port")) + listenPort = doc["port"].GetInt(); + if (doc.HasMember("web-threads")) + webThreads = doc["web-threads"].GetInt64(); + if (doc.HasMember("dag-threads")) + dagThreads = doc["dag-threads"].GetInt64(); if (verbose) { std::cout << "Server running at http://" << listenIP << ':' << listenPort << std::endl << "Max DAG Processing: " << dagThreads << std::endl - << "Max Task Execution: " << executorThreads << std::endl << "Max Web Clients: " << webThreads << std::endl - << "Logging to: " << logFileName << std::endl << std::endl << "Ctrl-C to exit" << std::endl; } @@ -192,29 +238,12 @@ int main(int argc, char **argv) daemonize(); } - std::ofstream logFH; - std::unique_ptr logger; - if (logFileName == "-") { - logger = -#ifdef DAGGY_ENABLE_REDIS - std::make_unique(); -#else - std::make_unique(std::cout); -#endif - } - else { - logFH.open(logFileName, std::ios::app); - logger = std::make_unique(logFH); - } + auto logger = loggerFactory(doc); + auto executor = executorFactory(doc); -#ifdef DAGGY_ENABLE_SLURM - daggy::executors::task::SlurmTaskExecutor executor; -#else - daggy::executors::task::ForkingTaskExecutor executor(executorThreads); -#endif Pistache::Address listenSpec(listenIP, listenPort); - daggy::Server server(listenSpec, *logger, executor, dagThreads); + daggy::Server server(listenSpec, *logger, *executor, dagThreads); server.init(webThreads); server.start();