- Adding daemonization for daggyd

- Changing how parameter keys are stored in the map to allow for future key handling.
This commit is contained in:
Ian Roddis
2021-09-01 13:32:16 -03:00
parent 4e71bf5917
commit e746f8c163
6 changed files with 158 additions and 32 deletions

21
TODO.md
View File

@@ -4,11 +4,9 @@ Tasks
- Open
- REST Server
- [ ] Add in authorization scheme (maybe PAM auth endpoint with JWT?)
- [ ] Flesh out server and interface
- Core Functionality
- Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma
separated list
- Add execution gates
- Executors
- [ ] Slurm Executor
- Loggers
@@ -17,12 +15,13 @@ Tasks
- [ ] Add more error checking
- [ ] General logger
- [ ] Redis DAGRunLogger
- Completed
- Core Functionality
- [X] Add ability to define child -> parent relationships
- [X] Rip apart scheduler and re-implement runDag as a utility function, taking an execution pool, task list,
and logger
- [X] Resume a failed DAG
- [X] Handle return on errored DAG / Task
- [X] Clearing a DAG Task
- Server
- [ ] Multiple execution pools
- [ ] per-Executor parameters
- Utilities
- daggyd
- [ ] Add config file support
- [ ] Support for all the different executors / state loggers
- daggyc
- [ ] Submission
- [ ] Querying

View File

@@ -22,7 +22,7 @@ namespace daggy {
if (!it->name.IsString()) {
throw std::runtime_error("All keys must be strings.");
}
std::string name = std::string{"{{"} + it->name.GetString() + "}}";
std::string name = it->name.GetString();
if (it->value.IsArray()) {
std::vector<std::string> values;
for (size_t i = 0; i < it->value.Size(); ++i) {

View File

@@ -80,8 +80,6 @@ namespace daggy {
.bind(&Server::handleGetDAGRuns, this)
.produces(MIME(Application, Json), MIME(Application, Xml))
.response(Http::Code::Ok, "The list of all known DAG Runs");
}
/*

View File

@@ -21,7 +21,8 @@ namespace daggy {
std::vector<std::string> expandedPart{part};
// Find all values of parameters, and expand them
for (const auto &[param, paramValue]: parameters) {
for (const auto &[paramRaw, paramValue]: parameters) {
std::string param = "{{" + paramRaw + "}}";
auto pos = part.find(param);
if (pos == std::string::npos) continue;
std::vector<std::string> newExpandedPart;

View File

@@ -13,8 +13,8 @@ TEST_CASE("Deserialize Parameters", "[deserialize_parameters]") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
auto params = daggy::parametersFromJSON(testParams);
REQUIRE(params.size() == 2);
REQUIRE(std::holds_alternative<std::vector<std::string>>(params["{{DATE}}"]));
REQUIRE(std::holds_alternative<std::string>(params["{{SOURCE}}"]));
REQUIRE(std::holds_alternative<std::vector<std::string>>(params["DATE"]));
REQUIRE(std::holds_alternative<std::string>(params["SOURCE"]));
}SECTION("Invalid JSON") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name")"};
REQUIRE_THROWS(daggy::parametersFromJSON(testParams));
@@ -66,4 +66,4 @@ TEST_CASE("Task Serialization", "[serialize_tasks]") {
REQUIRE(task == other);
}
}
}
}

View File

@@ -1,5 +1,9 @@
#include <iostream>
#include <fstream>
#include <atomic>
#include <sys/stat.h>
#include <signal.h>
#include <argparse.hpp>
@@ -11,6 +15,86 @@
// Add loggers here
#include <daggy/loggers/dag_run/OStreamLogger.hpp>
/*
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <syslog.h>
*/
static std::atomic<bool> running{true};
void signalHandler(int signal) {
switch (signal) {
case SIGHUP:
break;
case SIGINT:
case SIGTERM:
running = false;
break;
}
}
void daemonize() {
pid_t pid;
struct sigaction newSigAction;
sigset_t newSigSet;
/* Check if parent process id is set */
if (getppid() == 1) { return; }
/* Set signal mask - signals we want to block */
sigemptyset(&newSigSet);
sigaddset(&newSigSet, SIGCHLD); /* ignore child - i.e. we don't need to wait for it */
sigaddset(&newSigSet, SIGTSTP); /* ignore Tty stop signals */
sigaddset(&newSigSet, SIGTTOU); /* ignore Tty background writes */
sigaddset(&newSigSet, SIGTTIN); /* ignore Tty background reads */
sigprocmask(SIG_BLOCK, &newSigSet, NULL); /* Block the above specified signals */
/* Set up a signal handler */
newSigAction.sa_handler = signalHandler;
sigemptyset(&newSigAction.sa_mask);
newSigAction.sa_flags = 0;
/* Signals to handle */
sigaction(SIGHUP, &newSigAction, NULL); /* catch hangup signal */
sigaction(SIGTERM, &newSigAction, NULL); /* catch term signal */
sigaction(SIGINT, &newSigAction, NULL); /* catch interrupt signal */
// Fork once
pid = fork();
if (pid < 0) { exit(EXIT_FAILURE); }
if (pid > 0) { exit(EXIT_SUCCESS); }
/* On success: The child process becomes session leader */
if (setsid() < 0) {
std::cerr << "Unable to setsid" << std::endl;
exit(EXIT_FAILURE);
}
/* Catch, ignore and handle signals */
signal(SIGCHLD, SIG_IGN);
signal(SIGHUP, SIG_IGN);
/* Fork off for the second time*/
pid = fork();
if (pid < 0)
exit(EXIT_FAILURE);
if (pid > 0)
exit(EXIT_SUCCESS);
umask(0);
/* Change the working directory to the root directory */
/* or another appropriated directory */
chdir("/");
/* Close all open file descriptors */
for (auto x = sysconf(_SC_OPEN_MAX); x >= 0; x--) { close(x); }
}
int main(int argc, char **argv) {
argparse::ArgumentParser args("Daggy");
@@ -22,25 +106,25 @@ int main(int argc, char **argv) {
.implicit_value(true);
args.add_argument("--ip")
.help("IP address to listen to")
.default_value("localhost");
.default_value(std::string{"127.0.0.1"});
args.add_argument("--log-file")
.help("File to log to.")
.default_value("daggyd.log");
.default_value(std::string{"daggyd.log"});
args.add_argument("--port")
.help("Port to listen to")
.default_value(2503)
.action([](const std::string &value) { return std::stoi(value); });
args.add_argument("--dag-threads")
.help("Number of DAGs to run concurrently")
.default_value(10)
.default_value(10UL)
.action([](const std::string &value) { return std::stoull(value); });
args.add_argument("--web-threads")
.help("Number of web requests to support concurrently")
.default_value(30)
.default_value(30UL)
.action([](const std::string &value) { return std::stoull(value); });
args.add_argument("--executor-threads")
.help("Number of tasks to run concurrently")
.default_value(30)
.default_value(30UL)
.action([](const std::string &value) { return std::stoull(value); });
try {
@@ -51,16 +135,60 @@ int main(int argc, char **argv) {
exit(1);
}
std::ofstream logFile(args.get<std::string>("--log-file"), std::ios::app);
daggy::loggers::dag_run::OStreamLogger logger(logFile);
daggy::executors::task::ForkingTaskExecutor executor(args.get<size_t>("--executor-threads"));
Pistache::Address listenSpec(args.get<std::string>("--ip"), args.get<uint16_t>("--port"));
bool verbose = args.get<bool>("--verbose");
bool asDaemon = args.get<bool>("--daemon");
std::string logFileName = args.get<std::string>("--log-file");
std::string listenIP = args.get<std::string>("--ip");
uint16_t listenPort = args.get<int>("--port");
size_t executorThreads = args.get<size_t>("--executor-threads");
size_t webThreads = args.get<size_t>("--web-threads");
size_t dagThreads = args.get<size_t>("--dag-threads");
daggy::Server server(listenSpec, logger, executor, args.get<size_t>("--dag-threads"));
server.init(args.get<size_t>("--web-threads"));
if (logFileName == "-") {
if (asDaemon) {
std::cout << "Unable to daemonize if logging to stdout" << std::endl;
exit(1);
}
} else {
fs::path logFn{logFileName};
if (!logFn.is_absolute()) {
logFileName = (fs::current_path() / logFileName).string();
}
}
if (verbose) {
std::cout << "Server running at http://" << listenIP << ':' << listenPort << std::endl
<< "Max DAG Processing: " << dagThreads << std::endl
<< "Max Task Execution: " << executorThreads << std::endl
<< "Max Web Clients: " << webThreads << std::endl
<< "Logging to: " << logFileName << std::endl
<< std::endl << "Ctrl-C to exit" << std::endl;
}
if (asDaemon) {
daemonize();
}
std::ofstream logFH;
std::unique_ptr<daggy::loggers::dag_run::DAGRunLogger> logger;
if (logFileName == "-") {
logger = std::make_unique<daggy::loggers::dag_run::OStreamLogger>(std::cout);
} else {
logFH.open(logFileName, std::ios::app);
logger = std::make_unique<daggy::loggers::dag_run::OStreamLogger>(logFH);
}
daggy::executors::task::ForkingTaskExecutor executor(executorThreads);
Pistache::Address listenSpec(listenIP, listenPort);
daggy::Server server(listenSpec, *logger, executor, dagThreads);
server.init(webThreads);
server.start();
std::cout << "Server running at http://localhost:2503, Ctrl-C to exit" << std::endl;
while (true) {
running = true;
while (running) {
std::this_thread::sleep_for(std::chrono::seconds(30));
}
}
server.shutdown();
}