Files
daggy/daggyr/daggyr/daggyr.cpp
2021-12-21 17:23:58 -04:00

196 lines
5.1 KiB
C++

#include <fcntl.h>
#include <rapidjson/document.h>
#include <sys/stat.h>
#include <sys/sysinfo.h>
#include <unistd.h>
#include <argparse.hpp>
#include <atomic>
#include <csignal>
#include <daggy/Serialization.hpp>
#include <daggyr/Server.hpp>
#include <fstream>
#include <iostream>
#include <string>
// Add executors here
#include <daggy/executors/task/ForkingTaskExecutor.hpp>
#include <daggy/executors/task/SlurmTaskExecutor.hpp>
// Add loggers here
#include <daggy/executors/task/TaskExecutor.hpp>
#include <daggy/loggers/dag_run/DAGRunLogger.hpp>
#include <daggy/loggers/dag_run/OStreamLogger.hpp>
#include <daggy/loggers/dag_run/RedisLogger.hpp>
namespace rj = rapidjson;
// Shutdown flag: true while the process should keep running; cleared by
// signalHandler() when a termination signal is delivered.
static std::atomic<bool> running{true};

/**
 * Signal callback shared by every signal this program catches.
 *
 * SIGINT and SIGTERM request a shutdown by clearing `running`.
 * SIGHUP and any other signal number are deliberately ignored.
 *
 * @param signal  Number of the delivered signal.
 */
void signalHandler(int signal)
{
  const bool stopRequested = (signal == SIGINT) || (signal == SIGTERM);
  if (stopRequested) {
    running = false;
  }
}
/**
 * Detach the calling process from its terminal and continue as a daemon.
 *
 * Steps, in order:
 *   1. Return immediately when the parent pid is already 1.
 *   2. Block SIGCHLD and the tty job-control signals.
 *   3. Install signalHandler() for SIGHUP, SIGTERM and SIGINT.
 *   4. fork(), setsid(), fork() again; each parent exits so only the
 *      grandchild continues.
 *   5. Clear the umask and chdir to "/".
 *   6. Close every inherited descriptor, then point stdin/stdout/stderr at
 *      /dev/null so descriptors 0-2 stay reserved.
 *
 * On a fork() or setsid() failure the process exits with EXIT_FAILURE.
 * The function only returns in the final daemon process (or in the caller
 * itself when step 1 short-circuits).
 */
void daemonize()
{
  /* Parent pid 1: nothing to detach from, keep running as-is */
  if (getppid() == 1) {
    return;
  }

  /* Set signal mask - signals we want to block */
  sigset_t newSigSet;
  sigemptyset(&newSigSet);
  sigaddset(&newSigSet, SIGCHLD); /* ignore child - no need to wait for it */
  sigaddset(&newSigSet, SIGTSTP); /* ignore Tty stop signals */
  sigaddset(&newSigSet, SIGTTOU); /* ignore Tty background writes */
  sigaddset(&newSigSet, SIGTTIN); /* ignore Tty background reads */
  sigprocmask(SIG_BLOCK, &newSigSet, nullptr);

  /* Set up a signal handler; value-initialise so no field is left as garbage */
  struct sigaction newSigAction {};
  newSigAction.sa_handler = signalHandler;
  sigemptyset(&newSigAction.sa_mask);
  newSigAction.sa_flags = 0;

  /* Signals to handle */
  sigaction(SIGHUP, &newSigAction, nullptr);  /* catch hangup signal */
  sigaction(SIGTERM, &newSigAction, nullptr); /* catch term signal */
  sigaction(SIGINT, &newSigAction, nullptr);  /* catch interrupt signal */

  // Fork once; the original process exits and the child carries on
  pid_t pid = fork();
  if (pid < 0) {
    exit(EXIT_FAILURE);
  }
  if (pid > 0) {
    exit(EXIT_SUCCESS);
  }

  /* On success: The child process becomes session leader */
  if (setsid() < 0) {
    std::cerr << "Unable to setsid" << std::endl;
    exit(EXIT_FAILURE);
  }

  /* From here on SIGCHLD and SIGHUP are ignored outright (this replaces the
   * SIGHUP handler installed above) */
  signal(SIGCHLD, SIG_IGN);
  signal(SIGHUP, SIG_IGN);

  /* Fork off for the second time so the survivor is no longer the session
   * leader */
  pid = fork();
  if (pid < 0) {
    exit(EXIT_FAILURE);
  }
  if (pid > 0) {
    exit(EXIT_SUCCESS);
  }

  umask(0);

  /* Change the working directory to the root directory. Failure is not
   * fatal, but report it while stderr is still attached. */
  if (chdir("/") != 0) {
    std::cerr << "Unable to chdir to /" << std::endl;
  }

  /* Close all open file descriptors. sysconf() returns -1 when the limit is
   * indeterminate; fall back to a conventional bound instead of skipping the
   * loop entirely. */
  const long fdLimit = sysconf(_SC_OPEN_MAX);
  const int highestFd = (fdLimit < 0) ? 1024 : static_cast<int>(fdLimit);
  for (int fd = highestFd; fd >= 0; --fd) {
    close(fd);
  }

  /* Re-attach stdin/stdout/stderr to /dev/null. Without this the next
   * descriptors opened by the process (e.g. listening sockets) would be
   * numbered 0-2 and receive anything written to std::cout / std::cerr. */
  const int devNull = open("/dev/null", O_RDWR);
  if (devNull >= 0) {
    dup2(devNull, STDIN_FILENO);
    dup2(devNull, STDOUT_FILENO);
    dup2(devNull, STDERR_FILENO);
    if (devNull > STDERR_FILENO) {
      close(devNull);
    }
  }
}
/**
 * daggyr entry point: parse options, work out capacity limits, start the
 * server and wait until `running` is cleared.
 *
 * Command line:
 *   -v / --verbose   print the effective settings before starting
 *   -d / --daemon    call daemonize() before starting the server
 *   --config FILE    JSON file whose keys override the settings below
 *   --ip ADDR        listen address (default 127.0.0.1)
 *   --port N         listen port (default 2504)
 *
 * Recognised config keys: "ip", "port", "web-threads" and
 * "capacity-overrides" with optional "cores" and "memoryMB" members.
 *
 * @return 1 when the config file cannot be opened; exits with status 1 on a
 *         command-line parse error; otherwise 0 after server shutdown.
 */
int main(int argc, char **argv)
{
  argparse::ArgumentParser args("Daggy");

  args.add_argument("-v", "--verbose")
      .default_value(false)
      .implicit_value(true);
  args.add_argument("-d", "--daemon").default_value(false).implicit_value(true);
  args.add_argument("--config").default_value(std::string{});
  args.add_argument("--ip").default_value(std::string{"127.0.0.1"});
  // argparse stores command-line values as strings unless an action converts
  // them; without this, `--port 1234` would make get<int>() throw.
  args.add_argument("--port")
      .default_value(int{2504})
      .action([](const std::string &value) { return std::stoi(value); });

  try {
    args.parse_args(argc, argv);
  }
  catch (const std::exception &e) {
    std::cout << "Error: " << e.what() << std::endl;
    std::cout << args;
    exit(1);
  }

  bool verbose = args.get<bool>("--verbose");
  bool asDaemon = args.get<bool>("--daemon");
  auto configFile = args.get<std::string>("--config");
  std::string listenIP = args.get<std::string>("--ip");
  int listenPort = args.get<int>("--port");
  size_t webThreads = 50;

  // Default core budget: leave two hardware threads free, but never go below
  // one. hardware_concurrency() is unsigned and may return 0 or 1, so compare
  // before subtracting to avoid wrapping around to a huge value.
  const unsigned int hwThreads = std::thread::hardware_concurrency();
  ssize_t maxCores = (hwThreads > 2) ? static_cast<ssize_t>(hwThreads - 2) : 1;

  // Default memory budget: 75% of physical RAM, with a 100 MB floor.
  // totalram is expressed in units of mem_unit bytes; if sysinfo() fails the
  // zero-initialised struct makes the floor apply.
  struct sysinfo systemInfo {};
  unsigned long long totalRamMB = 0;
  if (sysinfo(&systemInfo) == 0) {
    const unsigned long long unitBytes =
        (systemInfo.mem_unit != 0) ? systemInfo.mem_unit : 1ULL;
    totalRamMB = static_cast<unsigned long long>(systemInfo.totalram) *
                 unitBytes / (1024ULL * 1024ULL);
  }
  ssize_t maxMemoryMB =
      static_cast<ssize_t>(std::max(totalRamMB * 0.75, 100.0));

  if (!configFile.empty()) {
    std::ifstream ifh(configFile);
    if (!ifh) {
      // Report the real problem instead of a confusing JSON parse failure.
      std::cerr << "Error: unable to open config file " << configFile
                << std::endl;
      return 1;
    }

    // Slurp the whole file (reads up to the first NUL byte or EOF).
    std::string config;
    std::getline(ifh, config, '\0');
    ifh.close();

    rj::Document doc;
    daggy::checkRJParse(doc.Parse(config.c_str()));

    if (doc.HasMember("ip"))
      listenIP = doc["ip"].GetString();
    if (doc.HasMember("port"))
      listenPort = doc["port"].GetInt();
    if (doc.HasMember("web-threads"))
      webThreads = doc["web-threads"].GetInt64();
    if (doc.HasMember("capacity-overrides")) {
      const auto &co = doc["capacity-overrides"];
      if (co.HasMember("cores"))
        maxCores = co["cores"].GetInt64();
      // Was assigned to maxCores by mistake, clobbering the core limit and
      // leaving the memory limit untouched.
      if (co.HasMember("memoryMB"))
        maxMemoryMB = co["memoryMB"].GetInt64();
    }
  }

  if (verbose) {
    std::cout << "Server running at http://" << listenIP << ':' << listenPort
              << std::endl
              << "Max Cores: " << maxCores << std::endl
              << "Max Memory: " << maxMemoryMB << " MB" << std::endl
              << "Max Web Clients: " << webThreads << std::endl
              << std::endl
              << "Ctrl-C to exit" << std::endl;
  }

  // NOTE(review): signalHandler is only installed inside daemonize(), so in
  // foreground mode Ctrl-C uses the default action and server.shutdown()
  // below is never reached - confirm whether that is intended.
  if (asDaemon) {
    daemonize();
  }

  Pistache::Address listenSpec(listenIP, listenPort);
  daggy::daggyr::Server server(listenSpec, maxCores, maxMemoryMB);
  server.init(webThreads);
  server.start();

  // `running` starts out true; it is intentionally not reset here so that a
  // termination signal delivered during start-up is not lost.
  while (running) {
    std::this_thread::sleep_for(std::chrono::seconds(30));
  }

  server.shutdown();
}