Files
daggy/daggyd/daggyd/daggyd.cpp
2025-05-31 10:13:09 -03:00

357 lines
11 KiB
C++

#include <rapidjson/document.h>
#include <sys/stat.h>
#include <argparse.hpp>
#include <atomic>
#include <csignal>
#include <daggy/Serialization.hpp>
#include <daggyd/Server.hpp>
#include <fstream>
#include <iostream>
// Add executors here
#include <daggy/executors/task/DaggyRunnerTaskExecutor.hpp>
#include <daggy/executors/task/ForkingTaskExecutor.hpp>
#include <daggy/executors/task/SSHTaskExecutor.hpp>
#ifdef DAGGY_ENABLE_SLURM
#include <daggy/executors/task/SlurmTaskExecutor.hpp>
#endif
// Add loggers here
#include <daggy/executors/task/TaskExecutor.hpp>
#include <daggy/loggers/dag_run/DAGRunLogger.hpp>
#include <daggy/loggers/dag_run/OStreamLogger.hpp>
#ifdef DAGGY_ENABLE_REDIS
#include <daggy/loggers/dag_run/RedisLogger.hpp>
#endif
namespace rj = rapidjson;
static std::atomic<bool> running{true};
void signalHandler(int signal)
{
switch (signal) {
case SIGHUP:
break;
case SIGINT:
case SIGTERM:
running = false;
break;
default:
break;
}
}
void daemonize()
{
pid_t pid;
struct sigaction newSigAction;
sigset_t newSigSet;
/* Check if parent process id is set */
if (getppid() == 1) {
return;
}
/* Set signal mask - signals we want to block */
sigemptyset(&newSigSet);
sigaddset(&newSigSet,
SIGCHLD); /* ignore child - i.e. we don't need to wait for it */
sigaddset(&newSigSet, SIGTSTP); /* ignore Tty stop signals */
sigaddset(&newSigSet, SIGTTOU); /* ignore Tty background writes */
sigaddset(&newSigSet, SIGTTIN); /* ignore Tty background reads */
sigprocmask(SIG_BLOCK, &newSigSet,
nullptr); /* Block the above specified signals */
/* Set up a signal handler */
newSigAction.sa_handler = signalHandler;
sigemptyset(&newSigAction.sa_mask);
newSigAction.sa_flags = 0;
/* Signals to handle */
sigaction(SIGHUP, &newSigAction, nullptr); /* catch hangup signal */
sigaction(SIGTERM, &newSigAction, nullptr); /* catch term signal */
sigaction(SIGINT, &newSigAction, nullptr); /* catch interrupt signal */
// Fork once
pid = fork();
if (pid < 0) {
exit(EXIT_FAILURE);
}
if (pid > 0) {
exit(EXIT_SUCCESS);
}
/* On success: The child process becomes session leader */
if (setsid() < 0) {
std::cerr << "Unable to setsid" << std::endl;
exit(EXIT_FAILURE);
}
/* Catch, ignore and handle signals */
signal(SIGCHLD, SIG_IGN);
signal(SIGHUP, SIG_IGN);
/* Fork off for the second time*/
pid = fork();
if (pid < 0)
exit(EXIT_FAILURE);
if (pid > 0)
exit(EXIT_SUCCESS);
umask(0);
/* Change the working directory to the root directory */
/* or another appropriated directory */
auto rc = chdir("/");
(void)rc;
/* Close all open file descriptors */
for (int x = sysconf(_SC_OPEN_MAX); x >= 0; x--) {
close(x);
}
}
namespace dl = daggy::loggers::dag_run;
std::ofstream ofh;
std::unique_ptr<dl::DAGRunLogger> loggerFactory(const rj::Value &config)
{
if (!config.HasMember("logger"))
return std::make_unique<dl::OStreamLogger>(std::cout);
const auto &logConf = config["logger"];
if (!logConf.IsObject())
throw std::runtime_error("logger config is not an object");
if (!logConf.HasMember("name"))
throw std::runtime_error("logger config is missing logger name");
if (!logConf.HasMember("config"))
throw std::runtime_error("logger config is missing logger config");
std::string name = logConf["name"].GetString();
const auto &logConfig = logConf["config"];
if (name == "OStreamLogger") {
if (logConfig.HasMember("file")) {
std::string fn = logConfig["file"].GetString();
if (fn == "-")
return std::make_unique<dl::OStreamLogger>(std::cout);
ofh.open(logConfig["file"].GetString());
return std::make_unique<dl::OStreamLogger>(ofh);
}
}
#ifdef DAGGY_ENABLE_REDIS
else if (name == "RedisLogger") {
std::string host = "localhost";
uint16_t port = 6379;
std::string prefix = "daggy";
if (logConfig.HasMember("prefix"))
prefix = logConfig["prefix"].GetString();
if (logConfig.HasMember("host"))
host = logConfig["host"].GetString();
if (logConfig.HasMember("port"))
port = logConfig["port"].GetInt();
return std::make_unique<dl::RedisLogger>(prefix, host, port);
}
#endif
throw std::runtime_error("Unknown logger type: " + name);
}
namespace de = daggy::executors::task;
std::unique_ptr<de::TaskExecutor> executorFactory(const rj::Value &config)
{
if (!config.HasMember("executor"))
return std::make_unique<de::ForkingTaskExecutor>(10);
const auto &execConf = config["executor"];
if (!execConf.IsObject())
throw std::runtime_error("Executor config is not an object");
if (!execConf.HasMember("name"))
throw std::runtime_error("Executor config is missing name");
if (!execConf.HasMember("config"))
throw std::runtime_error("Executor config is missing config");
std::string name = execConf["name"].GetString();
const auto &execConfig = execConf["config"];
if (name == "ForkingTaskExecutor") {
size_t threads = 10;
if (execConfig.HasMember("threads"))
threads = execConfig["threads"].GetInt64();
return std::make_unique<de::ForkingTaskExecutor>(threads);
}
#ifdef DAGGY_ENABLE_SLURM
else if (name == "SlurmTaskExecutor") {
return std::make_unique<de::SlurmTaskExecutor>();
}
#endif
else if (name == "DaggyRunnerTaskExecutor") {
if (!execConfig.HasMember("runners"))
throw std::runtime_error(
"DaggyRunnerExecutor config needs at least one remote runner");
auto exe = std::make_unique<de::DaggyRunnerTaskExecutor>();
const auto &runners = execConfig["runners"];
if (!runners.IsArray())
throw std::runtime_error(
"DaggyRunnerExecutor runners must be an array of urls");
for (size_t i = 0; i < runners.Size(); ++i) {
if (!runners[i].IsString())
throw std::runtime_error(
"DaggyRunnerExecutor runners must be an array of urls");
exe->addRunner(runners[i].GetString());
}
return exe;
}
else if (name == "SSHTaskExecutor") {
if (!execConfig.HasMember("hosts"))
throw std::runtime_error(
"SSHTaskExecutor config needs at least one host");
std::unordered_map<std::string,
daggy::executors::task::SSHTaskExecutor::RemoteHost>
remoteHosts;
const auto &hosts = execConfig["hosts"];
if (!hosts.IsObject())
throw std::runtime_error(
"SSHTaskExecutor hosts must be a dictionary of host => {cores, "
"memoryMB}");
for (auto it = hosts.MemberBegin(); it != hosts.MemberEnd(); ++it) {
if (!it->name.IsString())
throw std::runtime_error("Hostnames names must be a string.");
if (!it->value.IsObject())
throw std::runtime_error("Hostname definitions must be an object.");
const std::string hostName = it->name.GetString();
const auto &caps = it->value.GetObject();
if (!caps.HasMember("cores"))
throw std::runtime_error("Host " + hostName +
" is missing cores count.");
if (!caps.HasMember("memoryMB"))
throw std::runtime_error("Host " + hostName +
" is missing memoryMB size.");
size_t cores = caps["cores"].GetInt64();
size_t mem = caps["memoryMB"].GetInt64();
remoteHosts.emplace(hostName,
daggy::executors::task::SSHTaskExecutor::RemoteHost{
.cores = cores, .memoryMB = mem});
}
auto exe = std::make_unique<de::SSHTaskExecutor>(remoteHosts);
return exe;
}
throw std::runtime_error("Unknown executor type: >>" + name + "<<");
}
int main(int argc, char **argv)
{
curl_global_init(CURL_GLOBAL_ALL);
argparse::ArgumentParser args("Daggy");
args.add_argument("-v", "--verbose")
.default_value(false)
.implicit_value(true);
args.add_argument("-d", "--daemon").default_value(false).implicit_value(true);
args.add_argument("--assets-dir").default_value(std::string{});
args.add_argument("--config").default_value(std::string{});
args.add_argument("--ip").default_value(std::string{"127.0.0.1"});
args.add_argument("--port").default_value(2503u).action(
[](const std::string &value) -> unsigned { return std::stoul(value); });
args.add_argument("--jwt-secret").default_value(std::string{});
try {
args.parse_args(argc, argv);
}
catch (std::exception &e) {
std::cout << "Error: " << e.what() << std::endl;
std::cout << args;
exit(1);
}
bool verbose = args.get<bool>("--verbose");
bool asDaemon = args.get<bool>("--daemon");
auto configFile = args.get<std::string>("--config");
auto staticAssetsDir = args.get<std::string>("--assets-dir");
std::string listenIP = args.get<std::string>("--ip");
auto listenPort = args.get<unsigned>("--port");
auto jwtSecret = args.get<std::string>("--jwt-secret");
size_t webThreads = 50;
size_t dagThreads = 50;
rj::Document doc;
if (!configFile.empty()) {
if (!fs::exists(configFile)) {
std::cerr << "No such file " << configFile << std::endl;
exit(1);
}
std::ifstream ifh(configFile);
std::string config;
std::getline(ifh, config, '\0');
ifh.close();
daggy::checkRJParse(doc.Parse(config.c_str()));
if (doc.HasMember("ip"))
listenIP = doc["ip"].GetString();
if (doc.HasMember("assets-dir"))
staticAssetsDir = doc["assets-dir"].GetString();
if (doc.HasMember("port"))
listenPort = doc["port"].GetInt();
if (doc.HasMember("web-threads"))
webThreads = doc["web-threads"].GetInt64();
if (doc.HasMember("dag-threads"))
dagThreads = doc["dag-threads"].GetInt64();
if (doc.HasMember("jwt-secret"))
jwtSecret = doc["jwt-secret"].GetString();
}
else {
doc.SetObject();
}
auto logger = loggerFactory(doc);
auto executor = executorFactory(doc);
if (verbose) {
std::cout << "Server running at http://" << listenIP << ':' << listenPort
<< std::endl
<< "Max DAG Processing: " << dagThreads << std::endl
<< "Max Web Clients: " << webThreads << std::endl
<< "Executor: " << executor->description() << std::endl
<< std::endl
<< "Ctrl-C to exit" << std::endl;
}
if (asDaemon) {
daemonize();
}
Pistache::Address listenSpec(listenIP, listenPort);
daggy::daggyd::Server server(listenSpec, *logger, *executor, dagThreads,
staticAssetsDir);
if (!jwtSecret.empty()) {
server.setJWTSecret(jwtSecret);
}
server.init(webThreads);
server.start();
running = true;
while (running) {
std::this_thread::sleep_for(std::chrono::seconds(30));
}
server.shutdown();
curl_global_cleanup();
}