diff --git a/CMakeLists.txt b/CMakeLists.txt
index 86d85f5..88187e0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -14,6 +14,12 @@ include(cmake/better-enums.cmake)
 include(cmake/argparse.cmake)
 include(cmake/Catch2.cmake)
 
+option(DAGGY_ENABLE_SLURM "add support for SLURM executor" ON)
+
+if (DAGGY_ENABLE_SLURM)
+    add_compile_options(-DDAGGY_ENABLE_SLURM)
+endif ()
+
 add_subdirectory(daggy)
 add_subdirectory(tests)
 add_subdirectory(utils)
diff --git a/README.md b/README.md
index ecf58be..4601c13 100644
--- a/README.md
+++ b/README.md
@@ -321,14 +321,52 @@ jobs on slurm with a specific set of restrictions, or allow for local execution
 | pool           | Names the executor the DAG should run on                                |
 | poolParameters | Any parameters the executor accepts that might modify how a task is run |
 
-Executors
-=========
-
-Different executors require different structures for the `job` task member.
-
 Default Job Values
 ------------------
 
 A DAG can be submitted with the extra section `jobDefaults`. These values will be used to fill in default values for
 all tasks if they aren't overridden. This can be useful for cases like Slurm execution, where tasks will share default
 memory and runtime requirements.
+
+Executors
+=========
+
+Different executors require different structures for the `job` task member.
+
+Local Executor (ForkingTaskExecutor)
+------------------------------------
+
+The ForkingTaskExecutor runs tasks on the local machine, forking to run each task and using threads to monitor
+completion and capture output.
+
+| Field   | Sample                          | Description                          |
+|---------|---------------------------------|--------------------------------------|
+| command | `[ "/usr/bin/echo", "param1" ]` | The command to run on the local host |
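+
+For example, a minimal task entry for the local executor might look like this (the task name is illustrative):
+
+```json
+{
+  "myTask": {
+    "job": {
+      "command": [ "/usr/bin/echo", "param1" ]
+    }
+  }
+}
+```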
+
+Slurm Executor (SlurmTaskExecutor)
+----------------------------------
+
+The Slurm executor requires that the daggy server be running on a node that is able to submit Slurm jobs.
+
+To enable Slurm support, pass `-DDAGGY_ENABLE_SLURM=ON` to `cmake` when configuring the project.
+
+Required `job` config values:
+
+| Field            | Sample                          | Description                                                      |
+|------------------|---------------------------------|------------------------------------------------------------------|
+| command          | `[ "/usr/bin/echo", "param1" ]` | The command to run on a slurm host                               |
+| minCPUs          | `"1"`                           | Minimum number of CPUs required                                  |
+| minMemoryMB      | `"1"`                           | Minimum memory required, in MB                                   |
+| minTmpDiskMB     | `"1"`                           | Minimum temporary disk required, in MB                           |
+| priority         | `"100"`                         | Slurm priority                                                   |
+| timeLimitSeconds | `"100"`                         | Number of seconds the job is allowed to run                      |
+| userID           | `"1002"`                        | Numeric UID that the job should run as                           |
+| workDir          | `"/tmp/"`                       | Working directory for the job                                    |
+| tmpDir           | `"/tmp/"`                       | Directory for temporary files, as well as stdout/stderr capture  |
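+
+A complete Slurm task entry using the sample values above might look like this (task name and paths are illustrative):
+
+```json
+{
+  "myTask": {
+    "job": {
+      "command": [ "/usr/bin/echo", "param1" ],
+      "minCPUs": "1",
+      "minMemoryMB": "100",
+      "minTmpDiskMB": "10",
+      "priority": "100",
+      "timeLimitSeconds": "100",
+      "userID": "1002",
+      "workDir": "/tmp/",
+      "tmpDir": "/tmp/"
+    }
+  }
+}
+```
+
+Everything except `command` can also be supplied once for the whole DAG via the `jobDefaults` section described above,
+rather than repeated on every task.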
+
+Daggy will submit the `command` to run, capturing the output in `${tmpDir}/${taskName}_{RANDOM}.{stderr,stdout}`.
+Those files will then be read after the task has completed, and stored in the AttemptRecord for later retrieval.
+
+For this reason, it's important that the `tmpDir` directory **be readable by the daggy engine**; in a distributed
+environment, it should be on a shared filesystem. If it isn't, the job output will not be captured by daggy, although
+it will still be available wherever slurm wrote it.
diff --git a/TODO.md b/TODO.md
index d7e6289..6ed7f5e 100644
--- a/TODO.md
+++ b/TODO.md
@@ -8,10 +8,8 @@ Tasks
 - Add support for Task.runSpec
   - Executors can validate runspecs
   - Executors can expand runspecs
-  - Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma
-    separated list
-- Executors
-  - [ ] Slurm Executor
+  - Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma separated
+    list
 - Loggers
   - [ ] FileSystemLogger
   - [ ] Add unit tests
diff --git a/daggy/CMakeLists.txt b/daggy/CMakeLists.txt
index 735f300..038a9bb 100644
--- a/daggy/CMakeLists.txt
+++ b/daggy/CMakeLists.txt
@@ -1,6 +1,11 @@
 project(libdaggy)
 
+set(LINK_LIBS pistache pthread rapidjson better-enums)
+if (DAGGY_ENABLE_SLURM)
+    set(LINK_LIBS ${LINK_LIBS} slurm)
+endif ()
+
 file(GLOB_RECURSE SOURCES src/*.cpp)
 add_library(${PROJECT_NAME} STATIC ${SOURCES})
 target_include_directories(${PROJECT_NAME} PUBLIC include)
-target_link_libraries(${PROJECT_NAME} pistache pthread rapidjson better-enums)
\ No newline at end of file
+target_link_libraries(${PROJECT_NAME} ${LINK_LIBS})
\ No newline at end of file
diff --git a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp
index bc0e60e..298b57b 100644
--- a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp
+++ b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp
@@ -17,7 +17,7 @@ namespace daggy::executors::task {
         expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) override;
 
         // Runs the task
-        AttemptRecord execute(const Task &task) override;
+        AttemptRecord execute(const std::string &taskName, const Task &task) override;
     };
 }
diff --git a/daggy/include/daggy/executors/task/SlurmTaskExecutor.hpp b/daggy/include/daggy/executors/task/SlurmTaskExecutor.hpp
new file mode 100644
index 0000000..5dc3009
--- /dev/null
+++ b/daggy/include/daggy/executors/task/SlurmTaskExecutor.hpp
@@ -0,0 +1,21 @@
+#pragma once
+
+#include "TaskExecutor.hpp"
+
+namespace daggy::executors::task {
+    class SlurmTaskExecutor : public TaskExecutor {
+    public:
+        using Command = std::vector<std::string>;
+
+        SlurmTaskExecutor(size_t nThreads);
+
+        // Validates the job to ensure that all required values are set and are of the right type
+        bool validateTaskParameters(const ConfigValues &job) override;
+
+        std::vector<ConfigValues>
+        expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) override;
+
+        // Runs the task
+        AttemptRecord execute(const std::string &taskName, const Task &task) override;
+    };
+}
diff --git a/daggy/include/daggy/executors/task/TaskExecutor.hpp b/daggy/include/daggy/executors/task/TaskExecutor.hpp
index 80948c9..14fb508 100644
--- a/daggy/include/daggy/executors/task/TaskExecutor.hpp
+++ b/daggy/include/daggy/executors/task/TaskExecutor.hpp
@@ -27,7 +27,7 @@ namespace daggy::executors::task {
         expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) = 0;
 
         // Blocking execution of a task
-        virtual AttemptRecord execute(const Task &task) = 0;
+        virtual AttemptRecord execute(const std::string &taskName, const Task &task) = 0;
 
         ThreadPool threadPool;
     };
diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp
index 66711ab..b133778 100644
--- a/daggy/src/Utilities.cpp
+++ b/daggy/src/Utilities.cpp
@@ -135,7 +135,7 @@ namespace daggy {
         logger.updateTaskState(runID, taskName, RunState::RUNNING);
 
         while (attempts.size() < task.maxRetries + 1) {
-            attempts.push_back(executor.execute(task));
+            attempts.push_back(executor.execute(taskName, task));
             logger.logTaskAttempt(runID, taskName, attempts.back());
             if (attempts.back().rc == 0) break;
             logger.updateTaskState(runID, taskName, RunState::RETRY);
diff --git a/daggy/src/executors/task/ForkingTaskExecutor.cpp b/daggy/src/executors/task/ForkingTaskExecutor.cpp
index 3f6184d..40c793b 100644
--- a/daggy/src/executors/task/ForkingTaskExecutor.cpp
+++ b/daggy/src/executors/task/ForkingTaskExecutor.cpp
@@ -32,7 +32,7 @@ std::string slurp(int fd) {
 }
 
 daggy::AttemptRecord
-ForkingTaskExecutor::execute(const Task &task) {
+ForkingTaskExecutor::execute(const std::string &, const Task &task) {
     AttemptRecord rec;
 
     rec.startTime = Clock::now();
diff --git a/daggy/src/executors/task/SlurmTaskExecutor.cpp b/daggy/src/executors/task/SlurmTaskExecutor.cpp
new file mode 100644
index 0000000..4cc96ab
--- /dev/null
+++ b/daggy/src/executors/task/SlurmTaskExecutor.cpp
@@ -0,0 +1,246 @@
+#ifdef DAGGY_ENABLE_SLURM
+#include <sys/resource.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include <chrono>
+#include <cstdlib>
+#include <filesystem>
+#include <fstream>
+#include <random>
+#include <sstream>
+#include <stdexcept>
+#include <thread>
+#include <unordered_set>
+
+#include <slurm/slurm.h>
+
+#include "daggy/executors/task/SlurmTaskExecutor.hpp"
+#include "daggy/Utilities.hpp"
+
+namespace fs = std::filesystem;
+
+namespace daggy::executors::task {
+    std::string getUniqueTag(size_t nChars = 6) {
+        std::string result(nChars, '\0');
+        static std::random_device dev;
+        static std::mt19937 rng(dev());
+
+        std::uniform_int_distribution<size_t> dist(0, 61);
+
+        const char *v = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+        for (size_t i = 0; i < nChars; i++) {
+            result[i] = v[dist(rng)];
+        }
+        return result;
+    }
+
+    void readAndClean(const fs::path &fn, std::string &dest) {
+        if (!fs::exists(fn)) return;
+
+        std::ifstream ifh;
+        ifh.open(fn);
+        std::string contents(std::istreambuf_iterator<char>{ifh}, {});
+        ifh.close();
+        fs::remove_all(fn);
+
+        dest.swap(contents);
+    }
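+
+    // The constructor exports the submit-side environment that slurm tooling conventionally
+    // provides (SLURM_PRIO_PROCESS, SLURM_SUBMIT_DIR, SLURM_SUBMIT_HOST, SLURM_UMASK),
+    // derived from the daggy process itself.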
+    SlurmTaskExecutor::SlurmTaskExecutor(size_t nThreads)
+            : TaskExecutor(nThreads) {
+        // setenv(3) copies its arguments, so the temporary strings built here are safe
+        // (putenv(3) would keep a pointer to them)
+        setenv("SLURM_PRIO_PROCESS", std::to_string(getpriority(PRIO_PROCESS, 0)).c_str(), 1);
+        setenv("SLURM_SUBMIT_DIR", fs::current_path().string().c_str(), 1);
+
+        const size_t MAX_HOSTNAME_LENGTH = 50;
+        std::string submitHost(MAX_HOSTNAME_LENGTH, '\0');
+        gethostname(submitHost.data(), MAX_HOSTNAME_LENGTH);
+        submitHost.resize(submitHost.find('\0'));
+        setenv("SLURM_SUBMIT_HOST", submitHost.c_str(), 1);
+
+        mode_t mask = umask(0);
+        umask(mask); // umask(2) can only be read by setting it; restore the old mask
+
+        std::stringstream ss;
+        ss << '0'
+           << ((mask >> 6) & 07)
+           << ((mask >> 3) & 07)
+           << (mask & 07);
+        setenv("SLURM_UMASK", ss.str().c_str(), 1);
+    }
+
+    // Validates the job to ensure that all required values are set
+    bool SlurmTaskExecutor::validateTaskParameters(const ConfigValues &job) {
+        const std::unordered_set<std::string> requiredFields{
+                "minCPUs",
+                "minMemoryMB",
+                "minTmpDiskMB",
+                "priority",
+                "timeLimitSeconds",
+                "userID",
+                "workDir",
+                "tmpDir",
+                "command"
+        };
+
+        for (const auto &requiredField: requiredFields) {
+            if (job.count(requiredField) == 0) {
+                throw std::runtime_error("Missing field " + requiredField);
+            }
+        }
+        return true;
+    }
+
+    std::vector<ConfigValues>
+    SlurmTaskExecutor::expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) {
+        std::vector<ConfigValues> newValues;
+
+        const auto command = std::get<Command>(job.at("command"));
+        for (const auto &expandedCommand: interpolateValues(command, expansionValues)) {
+            ConfigValues newCommand{job};
+            newCommand.at("command") = expandedCommand;
+            newValues.emplace_back(newCommand);
+        }
+
+        return newValues;
+    }
+
+    AttemptRecord SlurmTaskExecutor::execute(const std::string &taskName, const Task &task) {
+        AttemptRecord record;
+        std::stringstream executorLog;
+
+        const auto &job = task.job;
+        const auto uniqueTaskName = taskName + "_" + getUniqueTag(6);
+
+        fs::path tmpDir = std::get<std::string>(job.at("tmpDir"));
+        std::string stdoutFile = (tmpDir / (uniqueTaskName + ".stdout")).string();
+        std::string stderrFile = (tmpDir / (uniqueTaskName + ".stderr")).string();
+        std::string workDir = std::get<std::string>(job.at("workDir"));
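+
+        // The batch script below just re-executes its arguments ("$@"), so the task's command
+        // is passed through argv. argv[0] becomes $0 inside the script, so it is seeded with a
+        // placeholder rather than a real argument.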
execution\n"; + break; + case JOB_PREEMPTED: /* terminated due to preemption */ + executorLog << "Job terminated due to pre-emption.\n"; + break; + case JOB_BOOT_FAIL: /* terminated due to node boot failure */ + executorLog << "Job failed to run due to failure of compute node to boot.\n"; + break; + case JOB_DEADLINE: /* terminated on deadline */ + executorLog << "Job terminated due to deadline.\n"; + break; + case JOB_OOM: /* experienced out of memory error */ + executorLog << "Job terminated due to out-of-memory.\n"; + break; + } + waiting = false; + record.rc = jobInfo.exit_code; + } + slurm_free_job_info_msg(jobStatus); + + readAndClean(stdoutFile, record.outputLog); + readAndClean(stderrFile, record.errorLog); + record.executorLog = executorLog.str(); + } else { + executorLog << "Failed to poll for job: " + << slurm_strerror(error_code); + } + } + + + return record; + } +} +#endif \ No newline at end of file diff --git a/tests/unit_dag.cpp b/tests/unit_dag.cpp index 16c4727..f38a79d 100644 --- a/tests/unit_dag.cpp +++ b/tests/unit_dag.cpp @@ -4,7 +4,7 @@ #include -TEST_CASE("DAG Construction Tests", "[dag]") { +TEST_CASE("dag_construction", "[dag]") { daggy::DAG dag; REQUIRE(dag.size() == 0); @@ -34,7 +34,7 @@ TEST_CASE("DAG Construction Tests", "[dag]") { } } -TEST_CASE("DAG Traversal Tests", "[dag]") { +TEST_CASE("dag_traversal", "[dag]") { daggy::DAG dag; const int N_VERTICES = 10; diff --git a/tests/unit_dagrun_loggers.cpp b/tests/unit_dagrun_loggers.cpp index 1f826a9..1b5cf29 100644 --- a/tests/unit_dagrun_loggers.cpp +++ b/tests/unit_dagrun_loggers.cpp @@ -54,7 +54,7 @@ TEST_CASE("Filesystem Logger", "[filesystem_logger]") { } */ -TEST_CASE("ostream Logger", "[ostream_logger]") { +TEST_CASE("ostream_logger", "[ostream_logger]") { //cleanup(); std::stringstream ss; daggy::loggers::dag_run::OStreamLogger logger(ss); diff --git a/tests/unit_executor_forkingexecutor.cpp b/tests/unit_executor_forkingexecutor.cpp index d225f20..98d80bc 100644 --- a/tests/unit_executor_forkingexecutor.cpp +++ b/tests/unit_executor_forkingexecutor.cpp @@ -7,7 +7,7 @@ #include -TEST_CASE("Basic Execution", "[forking_executor]") { +TEST_CASE("forking_executor", "[forking_executor]") { daggy::executors::task::ForkingTaskExecutor ex(10); SECTION("Simple Run") { @@ -16,7 +16,7 @@ TEST_CASE("Basic Execution", "[forking_executor]") { REQUIRE(ex.validateTaskParameters(task.job)); - auto rec = ex.execute(task); + auto rec = ex.execute("command", task); REQUIRE(rec.rc == 0); REQUIRE(rec.outputLog.size() >= 6); @@ -27,7 +27,7 @@ TEST_CASE("Basic Execution", "[forking_executor]") { daggy::Task task{.job{ {"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/expr", "1", "+", "+"}}}}; - auto rec = ex.execute(task); + auto rec = ex.execute("command", task); REQUIRE(rec.rc == 2); REQUIRE(rec.errorLog.size() >= 20); @@ -45,7 +45,7 @@ TEST_CASE("Basic Execution", "[forking_executor]") { daggy::Task task{.job{ {"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/cat", bigFile}}}}; - auto rec = ex.execute(task); + auto rec = ex.execute("command", task); REQUIRE(rec.rc == 0); REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile)); diff --git a/tests/unit_executor_slurmexecutor.cpp b/tests/unit_executor_slurmexecutor.cpp new file mode 100644 index 0000000..9bcade1 --- /dev/null +++ b/tests/unit_executor_slurmexecutor.cpp @@ -0,0 +1,105 @@ +#include +#include + +#include "daggy/executors/task/SlurmTaskExecutor.hpp" +#include "daggy/Serialization.hpp" +#include 
"daggy/Utilities.hpp" + +#include + +namespace fs = std::filesystem; + +#ifdef DAGGY_ENABLE_SLURM + +TEST_CASE("slurm_execution", "[slurm_executor]") { + daggy::executors::task::SlurmTaskExecutor ex(10); + + daggy::ConfigValues defaultJobValues{ + {"minCPUs", "1"}, + {"minMemoryMB", "100"}, + {"minTmpDiskMB", "10"}, + {"priority", "1"}, + {"timeLimitSeconds", "200"}, + {"userID", "1002"}, + {"workDir", fs::current_path().string()}, + {"tmpDir", fs::current_path().string()} + }; + + SECTION("Simple Run") { + daggy::Task task{.job{ + {"command", std::vector{"/usr/bin/echo", "abc", "123"}} + }}; + + task.job.merge(defaultJobValues); + + REQUIRE(ex.validateTaskParameters(task.job)); + + auto rec = ex.execute("command", task); + + REQUIRE(rec.rc == 0); + REQUIRE(rec.outputLog.size() >= 6); + REQUIRE(rec.errorLog.empty()); + } + + SECTION("Error Run") { + daggy::Task task{.job{ + {"command", daggy::executors::task::SlurmTaskExecutor::Command{"/usr/bin/expr", "1", "+", "+"}}}}; + task.job.merge(defaultJobValues); + + auto rec = ex.execute("command", task); + + REQUIRE(rec.rc != 0); + REQUIRE(rec.errorLog.size() >= 20); + REQUIRE(rec.outputLog.empty()); + } + + SECTION("Large Output") { + const std::vector BIG_FILES{ + "/usr/share/dict/linux.words", "/usr/share/dict/cracklib-small", "/etc/ssh/moduli" + }; + + for (const auto &bigFile: BIG_FILES) { + if (!std::filesystem::exists(bigFile)) continue; + + daggy::Task task{.job{ + {"command", daggy::executors::task::SlurmTaskExecutor::Command{"/usr/bin/cat", bigFile}}}}; + task.job.merge(defaultJobValues); + + auto rec = ex.execute("command", task); + + REQUIRE(rec.rc == 0); + REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile)); + REQUIRE(rec.errorLog.empty()); + break; + } + } + + SECTION("Parameter Expansion") { + std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"}; + auto params = daggy::configFromJSON(testParams); + + std::string taskJSON = R"({"B": {"job": {"command": ["/usr/bin/echo", "{{DATE}}"]}, "children": ["C"]}})"; + auto tasks = daggy::tasksFromJSON(taskJSON, defaultJobValues); + + auto result = daggy::expandTaskSet(tasks, ex, params); + REQUIRE(result.size() == 2); + } + + SECTION("Build with expansion") { + std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"}; + auto params = daggy::configFromJSON(testParams); + std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}, "children": ["B"]}, "B": {"job": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"]}, "children": ["C"]}, "C": {"job": {"command": ["/bin/echo", "C"]}}})"; + auto tasks = daggy::expandTaskSet(daggy::tasksFromJSON(testTasks, defaultJobValues), ex, params); + REQUIRE(tasks.size() == 4); + } + + SECTION("Build with expansion using parents instead of children") { + std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"}; + auto params = daggy::configFromJSON(testParams); + std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}}, "B": {"job": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"]}, "parents": ["A"]}, "C": {"job": {"command": ["/bin/echo", "C"]}, "parents": ["A"]}})"; + auto tasks = daggy::expandTaskSet(daggy::tasksFromJSON(testTasks, defaultJobValues), ex, params); + + REQUIRE(tasks.size() == 4); + } +} +#endif \ No newline at end of file diff --git a/tests/unit_serialization.cpp b/tests/unit_serialization.cpp index 74a3671..75f1a4c 100644 --- a/tests/unit_serialization.cpp +++ b/tests/unit_serialization.cpp @@ -8,7 +8,7 
+    daggy::ConfigValues defaultJobValues{
+            {"minCPUs",          "1"},
+            {"minMemoryMB",      "100"},
+            {"minTmpDiskMB",     "10"},
+            {"priority",         "1"},
+            {"timeLimitSeconds", "200"},
+            {"userID",           "1002"},
+            {"workDir",          fs::current_path().string()},
+            {"tmpDir",           fs::current_path().string()}
+    };
+
+    SECTION("Simple Run") {
+        daggy::Task task{.job{
+                {"command", std::vector<std::string>{"/usr/bin/echo", "abc", "123"}}
+        }};
+
+        task.job.merge(defaultJobValues);
+
+        REQUIRE(ex.validateTaskParameters(task.job));
+
+        auto rec = ex.execute("command", task);
+
+        REQUIRE(rec.rc == 0);
+        REQUIRE(rec.outputLog.size() >= 6);
+        REQUIRE(rec.errorLog.empty());
+    }
+
+    SECTION("Error Run") {
+        daggy::Task task{.job{
+                {"command", daggy::executors::task::SlurmTaskExecutor::Command{"/usr/bin/expr", "1", "+", "+"}}}};
+        task.job.merge(defaultJobValues);
+
+        auto rec = ex.execute("command", task);
+
+        REQUIRE(rec.rc != 0);
+        REQUIRE(rec.errorLog.size() >= 20);
+        REQUIRE(rec.outputLog.empty());
+    }
+
+    SECTION("Large Output") {
+        const std::vector<std::string> BIG_FILES{
+                "/usr/share/dict/linux.words", "/usr/share/dict/cracklib-small", "/etc/ssh/moduli"
+        };
+
+        for (const auto &bigFile: BIG_FILES) {
+            if (!std::filesystem::exists(bigFile)) continue;
+
+            daggy::Task task{.job{
+                    {"command", daggy::executors::task::SlurmTaskExecutor::Command{"/usr/bin/cat", bigFile}}}};
+            task.job.merge(defaultJobValues);
+
+            auto rec = ex.execute("command", task);
+
+            REQUIRE(rec.rc == 0);
+            REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile));
+            REQUIRE(rec.errorLog.empty());
+            break;
+        }
+    }
+
+    SECTION("Parameter Expansion") {
+        std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"};
+        auto params = daggy::configFromJSON(testParams);
+
+        std::string taskJSON = R"({"B": {"job": {"command": ["/usr/bin/echo", "{{DATE}}"]}, "children": ["C"]}})";
+        auto tasks = daggy::tasksFromJSON(taskJSON, defaultJobValues);
+
+        auto result = daggy::expandTaskSet(tasks, ex, params);
+        REQUIRE(result.size() == 2);
+    }
+
+    SECTION("Build with expansion") {
+        std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
+        auto params = daggy::configFromJSON(testParams);
+        std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}, "children": ["B"]}, "B": {"job": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"]}, "children": ["C"]}, "C": {"job": {"command": ["/bin/echo", "C"]}}})";
+        auto tasks = daggy::expandTaskSet(daggy::tasksFromJSON(testTasks, defaultJobValues), ex, params);
+        REQUIRE(tasks.size() == 4);
+    }
+
+    SECTION("Build with expansion using parents instead of children") {
+        std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
+        auto params = daggy::configFromJSON(testParams);
+        std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}}, "B": {"job": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"]}, "parents": ["A"]}, "C": {"job": {"command": ["/bin/echo", "C"]}, "parents": ["A"]}})";
+        auto tasks = daggy::expandTaskSet(daggy::tasksFromJSON(testTasks, defaultJobValues), ex, params);
+
+        REQUIRE(tasks.size() == 4);
+    }
+}
+#endif
\ No newline at end of file
diff --git a/tests/unit_serialization.cpp b/tests/unit_serialization.cpp
index 74a3671..75f1a4c 100644
--- a/tests/unit_serialization.cpp
+++ b/tests/unit_serialization.cpp
@@ -8,7 +8,7 @@
 
 namespace fs = std::filesystem;
 
-TEST_CASE("Deserialize Parameters", "[deserialize_parameters]") {
+TEST_CASE("parameter_deserialization", "[deserialize_parameters]") {
     SECTION("Basic Parse") {
         std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
         auto params = daggy::configFromJSON(testParams);
@@ -27,7 +27,7 @@ TEST_CASE("Deserialize Parameters", "[deserialize_parameters]") {
     }
 }
 
-TEST_CASE("Task Deserialization", "[deserialize_task]") {
+TEST_CASE("task_deserialization", "[deserialize_task]") {
     SECTION("Build with no expansion") {
         std::string testTasks = R"({
             "A": {
@@ -70,7 +70,7 @@ TEST_CASE("Task Deserialization", "[deserialize_task]") {
     }
 }
 
-TEST_CASE("Task Serialization", "[serialize_tasks]") {
+TEST_CASE("task_serialization", "[serialize_tasks]") {
     SECTION("Build with no expansion") {
         std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}, "children": ["C"]}, "B": {"job": {"command": ["/bin/echo", "B"]}, "children": ["C"]}, "C": {"job": {"command": ["/bin/echo", "C"]}}})";
         auto tasks = daggy::tasksFromJSON(testTasks);
diff --git a/tests/unit_server.cpp b/tests/unit_server.cpp
index 581b210..cfae715 100644
--- a/tests/unit_server.cpp
+++ b/tests/unit_server.cpp
@@ -50,7 +50,7 @@ REQUEST(std::string url, std::string payload = "") {
     return response;
 }
 
-TEST_CASE("Server Basic Endpoints", "[server_basic]") {
+TEST_CASE("rest_endpoint", "[server_basic]") {
     std::stringstream ss;
     daggy::executors::task::ForkingTaskExecutor executor(10);
     daggy::loggers::dag_run::OStreamLogger logger(ss);
diff --git a/tests/unit_threadpool.cpp b/tests/unit_threadpool.cpp
index e3d5912..9191a9a 100644
--- a/tests/unit_threadpool.cpp
+++ b/tests/unit_threadpool.cpp
@@ -7,7 +7,7 @@
 
 using namespace daggy;
 
-TEST_CASE("Threadpool Construction", "[threadpool]") {
+TEST_CASE("threadpool", "[threadpool]") {
     std::atomic<int> cnt(0);
 
     ThreadPool tp(10);
@@ -22,7 +22,7 @@
             return cnt.load();
         })));
         tp.addTasks(tq);
-        for (auto &r : res) r.get();
+        for (auto &r: res) r.get();
 
         REQUIRE(cnt == 100);
     }
@@ -35,7 +35,7 @@
             cnt++;
             return;
         }));
-        for (auto &r : res) r.get();
+        for (auto &r: res) r.get();
 
         REQUIRE(cnt == 100);
     }
 }
diff --git a/tests/unit_utilities.cpp b/tests/unit_utilities.cpp
index 82a5b7c..1577755 100644
--- a/tests/unit_utilities.cpp
+++ b/tests/unit_utilities.cpp
@@ -15,13 +15,13 @@
 
 namespace fs = std::filesystem;
 
-TEST_CASE("String Utilities", "[utilities_string]") {
+TEST_CASE("string_utilities", "[utilities_string]") {
     std::string test = "/this/is/{{A}}/test/{{A}}";
     auto res = daggy::globalSub(test, "{{A}}", "hello");
     REQUIRE(res == "/this/is/hello/test/hello");
 }
 
-TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
+TEST_CASE("string_expansion", "[utilities_parameter_expansion]") {
     SECTION("Basic expansion") {
         std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"};
         auto params = daggy::configFromJSON(testParams);
@@ -55,7 +55,7 @@ TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
     }
 }
 
-TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
+TEST_CASE("dag_runner", "[utilities_dag_runner]") {
     daggy::executors::task::ForkingTaskExecutor ex(10);
     std::stringstream ss;
     daggy::loggers::dag_run::OStreamLogger logger(ss);
diff --git a/utils/daggyd/daggyd.cpp b/utils/daggyd/daggyd.cpp
index fd2105e..a7bf2bd 100644
--- a/utils/daggyd/daggyd.cpp
+++ b/utils/daggyd/daggyd.cpp
@@ -10,7 +10,13 @@
 #include <daggy/Server.hpp>
 
 // Add executors here
+#ifdef DAGGY_ENABLE_SLURM
+
+#include <daggy/executors/task/SlurmTaskExecutor.hpp>
+
+#else
 #include <daggy/executors/task/ForkingTaskExecutor.hpp>
+#endif
 
 // Add loggers here
 #include <daggy/loggers/dag_run/OStreamLogger.hpp>
@@ -178,7 +184,11 @@ int main(int argc, char **argv) {
         logger = std::make_unique<daggy::loggers::dag_run::OStreamLogger>(logFH);
     }
 
+#ifdef DAGGY_ENABLE_SLURM
+    daggy::executors::task::SlurmTaskExecutor executor(executorThreads);
+#else
     daggy::executors::task::ForkingTaskExecutor executor(executorThreads);
+#endif
 
     Pistache::Address listenSpec(listenIP, listenPort);
     daggy::Server server(listenSpec, *logger, executor, dagThreads);