Adding support for execution on Slurm grids
- Adding support for SlurmTaskExecutor in `daggyd` if DAGGY_ENABLE_SLURM is defined.
- Renaming some test cases
- Enabling compile-time Slurm support
- Adding Slurm documentation
@@ -14,6 +14,12 @@ include(cmake/better-enums.cmake)
include(cmake/argparse.cmake)
include(cmake/Catch2.cmake)

option(DAGGY_ENABLE_SLURM "add support for SLURM executor" ON)

if (DAGGY_ENABLE_SLURM)
    add_compile_options(-DDAGGY_ENABLE_SLURM)
endif ()

add_subdirectory(daggy)
add_subdirectory(tests)
add_subdirectory(utils)
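Note that the option defaults to ON, so hosts without the Slurm development libraries will need `cmake -DDAGGY_ENABLE_SLURM=OFF ..` at configure time; otherwise the `slurm` link dependency added to libdaggy below would fail to resolve.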
README.md
@@ -321,14 +321,52 @@ jobs on slurm with a specific set of restrictions, or allow for local execution

| pool           | Names the executor the DAG should run on |
| poolParameters | Any parameters the executor accepts that might modify how a task is run |

Default Job Values
------------------

A DAG can be submitted with the extra section `jobDefaults`. These values will be used to fill in default values for all
tasks if they aren't overridden. This can be useful for cases like Slurm execution, where tasks will share default
memory and runtime requirements.

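For example, a submission might look like the following sketch. The task layout follows the unit tests in this commit; the exact envelope that carries `jobDefaults` isn't shown in this diff, so treat its placement as illustrative, and the numeric values are made up:

```json
{
  "jobDefaults": {
    "minCPUs": "1",
    "minMemoryMB": "512",
    "timeLimitSeconds": "600"
  },
  "A": {"job": {"command": ["/usr/bin/echo", "hello"]}, "children": ["B"]},
  "B": {"job": {"command": ["/usr/bin/echo", "world"]}}
}
```
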
Executors
=========

Different executors require different structures for the `job` task member.

Local Executor (ForkingTaskExecutor)
------------------------------------

The ForkingTaskExecutor runs tasks on the local box, forking to run the task, and using threads to monitor completion
and capture output.

| Field   | Sample | Description |
|---------|--------|-------------|
| command | `[ "/usr/bin/echo", "param1" ]` | The command to run on the local host |

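A minimal C++ usage sketch, mirroring the unit tests in this commit (`echo_task` is an arbitrary task name):

```cpp
#include <daggy/executors/task/ForkingTaskExecutor.hpp>

int main() {
    daggy::executors::task::ForkingTaskExecutor ex(10); // 10 worker threads

    daggy::Task task{.job{
            {"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/echo", "param1"}}}};

    // execute() takes the task's name along with the task itself
    auto rec = ex.execute("echo_task", task);
    return rec.rc; // 0 on success; stdout is captured in rec.outputLog
}
```
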
Slurm Executor (SlurmTaskExecutor)
----------------------------------

The Slurm executor requires that the daggy server be running on a node capable of submitting jobs.

To enable Slurm support, use `cmake -DDAGGY_ENABLE_SLURM=ON ..` when configuring the project.

Required `job` config values:

| Field | Sample | Description |
|-------|--------|-------------|
| command | `[ "/usr/bin/echo", "param1" ]` | The command to run on a Slurm host |
| minCPUs | `"1"` | Minimum number of CPUs required |
| minMemoryMB | `"1"` | Minimum memory required, in MB |
| minTmpDiskMB | `"1"` | Minimum temporary disk required, in MB |
| priority | `"100"` | Slurm priority |
| timeLimitSeconds | `"100"` | Number of seconds to allow the job to run for |
| userID | `"1002"` | Numeric UID that the job should run as |
| workDir | `"/tmp/"` | Directory to use for work |
| tmpDir | `"/tmp/"` | Directory to use for temporary files, as well as stdout/stderr capture |

Daggy will submit the `command` to run, capturing the output in `${tmpDir}/${taskName}_{RANDOM}.{stderr,stdout}`. Those
files will then be read after the task has completed, and stored in the AttemptRecord for later retrieval.

For this reason, it's important that the `tmpDir` directory **be readable by the daggy engine**; i.e., in a distributed
environment it should be a shared filesystem. If this isn't the case, the job output will not be captured by daggy,
although it will still be available wherever it was written by Slurm.
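
Putting the table together, a complete `job` block might look like the following (a sketch using the sample values above, not tuned recommendations):

```json
{
  "job": {
    "command": ["/usr/bin/echo", "param1"],
    "minCPUs": "1",
    "minMemoryMB": "100",
    "minTmpDiskMB": "10",
    "priority": "100",
    "timeLimitSeconds": "200",
    "userID": "1002",
    "workDir": "/tmp/",
    "tmpDir": "/tmp/"
  }
}
```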
TODO.md
@@ -8,10 +8,8 @@ Tasks
- Add support for Task.runSpec
  - Executors can validate runspecs
  - Executors can expand runspecs
- Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma
  separated list
- Executors
  - [ ] Slurm Executor
- Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma separated
  list
- Loggers
  - [ ] FileSystemLogger
- [ ] Add unit tests

@@ -1,6 +1,11 @@
project(libdaggy)

SET(LINK_LIBS pistache pthread rapidjson better-enums)
IF (DAGGY_ENABLE_SLURM)
    SET(LINK_LIBS ${LINK_LIBS} slurm)
endif ()

file(GLOB_RECURSE SOURCES src/*.cpp)
add_library(${PROJECT_NAME} STATIC ${SOURCES})
target_include_directories(${PROJECT_NAME} PUBLIC include)
target_link_libraries(${PROJECT_NAME} pistache pthread rapidjson better-enums)
target_link_libraries(${PROJECT_NAME} ${LINK_LIBS})
@@ -17,7 +17,7 @@ namespace daggy::executors::task {
        expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) override;

        // Runs the task
        AttemptRecord execute(const Task &task) override;
        AttemptRecord execute(const std::string &taskName, const Task &task) override;

    };
}
daggy/include/daggy/executors/task/SlurmTaskExecutor.hpp (new file)
@@ -0,0 +1,21 @@
#pragma once

#include "TaskExecutor.hpp"

namespace daggy::executors::task {
    class SlurmTaskExecutor : public TaskExecutor {
    public:
        using Command = std::vector<std::string>;

        SlurmTaskExecutor(size_t nThreads);

        // Validates the job to ensure that all required values are set and are of the right type
        bool validateTaskParameters(const ConfigValues &job) override;

        std::vector<ConfigValues>
        expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) override;

        // Runs the task
        AttemptRecord execute(const std::string &taskName, const Task &task) override;
    };
}
@@ -27,7 +27,7 @@ namespace daggy::executors::task {
        expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) = 0;

        // Blocking execution of a task
        virtual AttemptRecord execute(const Task &task) = 0;
        virtual AttemptRecord execute(const std::string &taskName, const Task &task) = 0;

        ThreadPool threadPool;
    };
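The interface change above threads the task name into every executor (the Slurm executor uses it to build unique output paths). Below is a minimal sketch of an executor written against the new signature; `NullTaskExecutor` is invented for illustration, and only the overridden signatures come from the header above:

```cpp
#include <daggy/executors/task/TaskExecutor.hpp>

namespace daggy::executors::task {
    // Hypothetical executor that accepts any job with a command and "runs" it as a no-op.
    class NullTaskExecutor : public TaskExecutor {
    public:
        explicit NullTaskExecutor(size_t nThreads) : TaskExecutor(nThreads) {}

        bool validateTaskParameters(const ConfigValues &job) override {
            return job.count("command") > 0;
        }

        std::vector<ConfigValues>
        expandTaskParameters(const ConfigValues &job, const ConfigValues &) override {
            return {job}; // no parameter expansion
        }

        // The task name now arrives alongside the task itself.
        AttemptRecord execute(const std::string &taskName, const Task &) override {
            AttemptRecord rec;
            rec.rc = 0;
            rec.executorLog = "no-op run of " + taskName;
            return rec;
        }
    };
}
```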
@@ -135,7 +135,7 @@ namespace daggy {
        logger.updateTaskState(runID, taskName, RunState::RUNNING);

        while (attempts.size() < task.maxRetries + 1) {
            attempts.push_back(executor.execute(task));
            attempts.push_back(executor.execute(taskName, task));
            logger.logTaskAttempt(runID, taskName, attempts.back());
            if (attempts.back().rc == 0) break;
            logger.updateTaskState(runID, taskName, RunState::RETRY);
@@ -32,7 +32,7 @@ std::string slurp(int fd) {
}

daggy::AttemptRecord
ForkingTaskExecutor::execute(const Task &task) {
ForkingTaskExecutor::execute(const std::string &, const Task &task) {
    AttemptRecord rec;

    rec.startTime = Clock::now();
daggy/src/executors/task/SlurmTaskExecutor.cpp (new file)
@@ -0,0 +1,246 @@
#ifdef DAGGY_ENABLE_SLURM
#include <random>
#include <filesystem>
#include <fstream>
#include <sstream>        // std::stringstream
#include <thread>         // std::this_thread::sleep_for
#include <chrono>
#include <unordered_set>  // required-field checks below

#include <stdlib.h>
#include <unistd.h>       // gethostname
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/types.h>
#include <sys/stat.h>

#include <slurm/slurm.h>

#include <daggy/executors/task/SlurmTaskExecutor.hpp>
#include <daggy/Utilities.hpp>

namespace fs = std::filesystem;

namespace daggy::executors::task {
    // Builds a random alphanumeric tag, used to keep output filenames unique.
    std::string getUniqueTag(size_t nChars = 6) {
        std::string result(nChars, '\0');
        static std::random_device dev;
        static std::mt19937 rng(dev());

        std::uniform_int_distribution<int> dist(0, 61);

        const char *v = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";

        for (size_t i = 0; i < nChars; i++) {
            result[i] = v[dist(rng)];
        }
        return result;
    }

    // Reads a captured output file into dest, then removes it from disk.
    void readAndClean(const fs::path &fn, std::string &dest) {
        if (!fs::exists(fn)) return;

        std::ifstream ifh;
        ifh.open(fn);
        std::string contents(std::istreambuf_iterator<char>{ifh}, {});
        ifh.close();
        fs::remove_all(fn);

        dest.swap(contents);
    }

    SlurmTaskExecutor::SlurmTaskExecutor(size_t nThreads)
            : TaskExecutor(nThreads) {
        // Use setenv (which copies its argument) rather than putenv: putenv
        // keeps a pointer to the string it is handed, and these locals are
        // destroyed when the constructor returns.
        setenv("SLURM_PRIO_PROCESS", std::to_string(getpriority(PRIO_PROCESS, 0)).c_str(), 1);
        setenv("SLURM_SUBMIT_DIR", fs::current_path().string().c_str(), 1);

        const size_t MAX_HOSTNAME_LENGTH = 50;
        std::string submitHost(MAX_HOSTNAME_LENGTH, '\0');
        gethostname(submitHost.data(), MAX_HOSTNAME_LENGTH);
        submitHost.resize(submitHost.find('\0'));
        setenv("SLURM_SUBMIT_HOST", submitHost.c_str(), 1);

        mode_t mask = umask(0);
        umask(mask); // Restore the old mask

        std::stringstream ss;
        ss << '0'
           << uint32_t{(mask >> 6) & 07}
           << uint32_t{(mask >> 3) & 07}
           << uint32_t{mask & 07};
        setenv("SLURM_UMASK", ss.str().c_str(), 1);
    }

    // Validates the job to ensure that all required values are set and are of the right type
    bool SlurmTaskExecutor::validateTaskParameters(const ConfigValues &job) {
        const std::unordered_set<std::string> requiredFields{
                "minCPUs",
                "minMemoryMB",
                "minTmpDiskMB",
                "priority",
                "timeLimitSeconds",
                "userID",
                "workDir",
                "tmpDir",
                "command"
        };

        for (const auto &requiredField: requiredFields) {
            if (job.count(requiredField) == 0) {
                throw std::runtime_error("Missing field " + requiredField);
            }
        }
        return true;
    }

    // Expands `command` against the expansion values, producing one job per
    // interpolated command. For example, with expansionValues
    // {"DATE": ["2021-05-06", "2021-05-07"]}, a command of
    // ["/usr/bin/echo", "{{DATE}}"] expands into two jobs, one per date
    // (see the "Parameter Expansion" unit test in this commit).
    std::vector<ConfigValues>
    SlurmTaskExecutor::expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) {
        std::vector<ConfigValues> newValues;

        const auto command = std::get<Command>(job.at("command"));
        for (const auto &expandedCommand: interpolateValues(command, expansionValues)) {
            ConfigValues newCommand{job};
            newCommand.at("command") = expandedCommand;
            newValues.emplace_back(newCommand);
        }

        return newValues;
    }

    AttemptRecord SlurmTaskExecutor::execute(const std::string &taskName, const Task &task) {
        AttemptRecord record;
        std::stringstream executorLog;

        const auto &job = task.job;
        const auto uniqueTaskName = taskName + "_" + getUniqueTag(6);

        fs::path tmpDir = std::get<std::string>(job.at("tmpDir"));
        std::string stdoutFile = (tmpDir / (uniqueTaskName + ".stdout")).string();
        std::string stderrFile = (tmpDir / (uniqueTaskName + ".stderr")).string();
        std::string workDir = std::get<std::string>(job.at("workDir"));

        // Convert command to argc / argv. The array is null-terminated, with
        // argc not counting the terminator (the original leading nullptr
        // would have put a null pointer in argv[0]).
        std::vector<char *> argv;
        const auto command = std::get<std::vector<std::string>>(task.job.at("command"));
        for (const auto &s: command) {
            argv.push_back(const_cast<char *>(s.c_str()));
        }
        argv.push_back(nullptr);

        char empty[] = "";
        char *env[1];
        env[0] = empty;

        char script[] = "#!/bin/bash\n$@\n";

        // taken from slurm
        int error_code;
        job_desc_msg_t jd;
        submit_response_msg_t *resp_msg;

        slurm_init_job_desc_msg(&jd);
        jd.contiguous = 1;
        jd.name = const_cast<char *>(taskName.c_str());
        jd.min_cpus = std::stoi(std::get<std::string>(job.at("minCPUs")));

        jd.pn_min_memory = std::stoi(std::get<std::string>(job.at("minMemoryMB")));
        jd.pn_min_tmp_disk = std::stoi(std::get<std::string>(job.at("minTmpDiskMB")));
        jd.priority = std::stoi(std::get<std::string>(job.at("priority")));
        jd.shared = 0;
        jd.time_limit = std::stoi(std::get<std::string>(job.at("timeLimitSeconds")));
        jd.min_nodes = 1;
        jd.user_id = std::stoi(std::get<std::string>(job.at("userID")));
        jd.argv = argv.data();
        jd.argc = argv.size() - 1; // don't count the terminating nullptr
        // TODO figure out the script to run
        jd.script = script;
        jd.std_in = empty;
        jd.std_err = const_cast<char *>(stderrFile.c_str());
        jd.std_out = const_cast<char *>(stdoutFile.c_str());
        jd.work_dir = const_cast<char *>(workDir.c_str());
        jd.env_size = 1;
        jd.environment = env;

        /* TODO: Add support for environment
        jd.env_size = 2;
        env[0] = "SLURM_ENV_0=looking_good";
        env[1] = "SLURM_ENV_1=still_good";
        jd.environment = env;
        */

        error_code = slurm_submit_batch_job(&jd, &resp_msg);
        if (error_code) {
            record.rc = error_code;
            executorLog << "Unable to submit slurm job: "
                        << slurm_strerror(error_code);
            record.executorLog = executorLog.str();
            return record;
        }

        uint32_t jobID = resp_msg->job_id;
        executorLog << "Job " << resp_msg->job_submit_user_msg << '\n';
        slurm_free_submit_response_response_msg(resp_msg);

        // Poll until done
        job_info_msg_t *jobStatus;
        bool waiting = true;
        while (waiting) {
            std::this_thread::sleep_for(std::chrono::milliseconds(250));
            error_code = slurm_load_job(&jobStatus, jobID, SHOW_ALL | SHOW_DETAIL);
            if (error_code == SLURM_SUCCESS) {
                uint32_t idx = jobStatus->record_count;
                if (idx > 0) {
                    idx--;
                    const slurm_job_info_t &jobInfo = jobStatus->job_array[idx];
                    switch (jobInfo.job_state) {
                        case JOB_PENDING:
                        case JOB_SUSPENDED:
                        case JOB_RUNNING:
                            // Still in flight: poll again. (Don't `continue`
                            // here; that would skip slurm_free_job_info_msg
                            // below and leak the job info message.)
                            break;

                        // Job has finished
                        case JOB_COMPLETE: /* completed execution successfully */
                            waiting = false;
                            break;
                        case JOB_FAILED: /* completed execution unsuccessfully */
                            executorLog << "Script errored.\n";
                            waiting = false;
                            break;
                        case JOB_CANCELLED: /* cancelled by user */
                            executorLog << "Job cancelled by user.\n";
                            waiting = false;
                            break;
                        case JOB_TIMEOUT: /* terminated on reaching time limit */
                            executorLog << "Job exceeded time limit.\n";
                            waiting = false;
                            break;
                        case JOB_NODE_FAIL: /* terminated on node failure */
                            executorLog << "Node failed during execution.\n";
                            waiting = false;
                            break;
                        case JOB_PREEMPTED: /* terminated due to preemption */
                            executorLog << "Job terminated due to pre-emption.\n";
                            waiting = false;
                            break;
                        case JOB_BOOT_FAIL: /* terminated due to node boot failure */
                            executorLog << "Job failed to run due to failure of compute node to boot.\n";
                            waiting = false;
                            break;
                        case JOB_DEADLINE: /* terminated on deadline */
                            executorLog << "Job terminated due to deadline.\n";
                            waiting = false;
                            break;
                        case JOB_OOM: /* experienced out of memory error */
                            executorLog << "Job terminated due to out-of-memory.\n";
                            waiting = false;
                            break;
                    }
                    if (!waiting) {
                        record.rc = jobInfo.exit_code;
                    }
                }
                slurm_free_job_info_msg(jobStatus);

                if (!waiting) {
                    // Harvest output only once the job has reached a terminal
                    // state; readAndClean deletes the files it reads.
                    readAndClean(stdoutFile, record.outputLog);
                    readAndClean(stderrFile, record.errorLog);
                    record.executorLog = executorLog.str();
                }
            } else {
                executorLog << "Failed to poll for job: "
                            << slurm_strerror(error_code);
            }
        }

        return record;
    }
}
#endif
@@ -4,7 +4,7 @@

#include <catch2/catch.hpp>

TEST_CASE("DAG Construction Tests", "[dag]") {
TEST_CASE("dag_construction", "[dag]") {
    daggy::DAG<size_t, size_t> dag;

    REQUIRE(dag.size() == 0);
@@ -34,7 +34,7 @@ TEST_CASE("DAG Construction Tests", "[dag]") {
    }
}

TEST_CASE("DAG Traversal Tests", "[dag]") {
TEST_CASE("dag_traversal", "[dag]") {
    daggy::DAG<size_t, size_t> dag;

    const int N_VERTICES = 10;

@@ -54,7 +54,7 @@ TEST_CASE("Filesystem Logger", "[filesystem_logger]") {
    }
*/

TEST_CASE("ostream Logger", "[ostream_logger]") {
TEST_CASE("ostream_logger", "[ostream_logger]") {
    //cleanup();
    std::stringstream ss;
    daggy::loggers::dag_run::OStreamLogger logger(ss);

@@ -7,7 +7,7 @@

#include <catch2/catch.hpp>

TEST_CASE("Basic Execution", "[forking_executor]") {
TEST_CASE("forking_executor", "[forking_executor]") {
    daggy::executors::task::ForkingTaskExecutor ex(10);

    SECTION("Simple Run") {
@@ -16,7 +16,7 @@ TEST_CASE("Basic Execution", "[forking_executor]") {

        REQUIRE(ex.validateTaskParameters(task.job));

        auto rec = ex.execute(task);
        auto rec = ex.execute("command", task);

        REQUIRE(rec.rc == 0);
        REQUIRE(rec.outputLog.size() >= 6);
@@ -27,7 +27,7 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
        daggy::Task task{.job{
                {"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/expr", "1", "+", "+"}}}};

        auto rec = ex.execute(task);
        auto rec = ex.execute("command", task);

        REQUIRE(rec.rc == 2);
        REQUIRE(rec.errorLog.size() >= 20);
@@ -45,7 +45,7 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
        daggy::Task task{.job{
                {"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/cat", bigFile}}}};

        auto rec = ex.execute(task);
        auto rec = ex.execute("command", task);

        REQUIRE(rec.rc == 0);
        REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile));

tests/unit_executor_slurmexecutor.cpp (new file)
@@ -0,0 +1,105 @@
#include <iostream>
#include <filesystem>

#include "daggy/executors/task/SlurmTaskExecutor.hpp"
#include "daggy/Serialization.hpp"
#include "daggy/Utilities.hpp"

#include <catch2/catch.hpp>

namespace fs = std::filesystem;

#ifdef DAGGY_ENABLE_SLURM

TEST_CASE("slurm_execution", "[slurm_executor]") {
    daggy::executors::task::SlurmTaskExecutor ex(10);

    daggy::ConfigValues defaultJobValues{
            {"minCPUs", "1"},
            {"minMemoryMB", "100"},
            {"minTmpDiskMB", "10"},
            {"priority", "1"},
            {"timeLimitSeconds", "200"},
            {"userID", "1002"},
            {"workDir", fs::current_path().string()},
            {"tmpDir", fs::current_path().string()}
    };

    SECTION("Simple Run") {
        daggy::Task task{.job{
                {"command", std::vector<std::string>{"/usr/bin/echo", "abc", "123"}}
        }};

        task.job.merge(defaultJobValues);

        REQUIRE(ex.validateTaskParameters(task.job));

        auto rec = ex.execute("command", task);

        REQUIRE(rec.rc == 0);
        REQUIRE(rec.outputLog.size() >= 6);
        REQUIRE(rec.errorLog.empty());
    }

    SECTION("Error Run") {
        daggy::Task task{.job{
                {"command", daggy::executors::task::SlurmTaskExecutor::Command{"/usr/bin/expr", "1", "+", "+"}}}};
        task.job.merge(defaultJobValues);

        auto rec = ex.execute("command", task);

        REQUIRE(rec.rc != 0);
        REQUIRE(rec.errorLog.size() >= 20);
        REQUIRE(rec.outputLog.empty());
    }

    SECTION("Large Output") {
        const std::vector<std::string> BIG_FILES{
                "/usr/share/dict/linux.words", "/usr/share/dict/cracklib-small", "/etc/ssh/moduli"
        };

        for (const auto &bigFile: BIG_FILES) {
            if (!std::filesystem::exists(bigFile)) continue;

            daggy::Task task{.job{
                    {"command", daggy::executors::task::SlurmTaskExecutor::Command{"/usr/bin/cat", bigFile}}}};
            task.job.merge(defaultJobValues);

            auto rec = ex.execute("command", task);

            REQUIRE(rec.rc == 0);
            REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile));
            REQUIRE(rec.errorLog.empty());
            break;
        }
    }

    SECTION("Parameter Expansion") {
        std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"};
        auto params = daggy::configFromJSON(testParams);

        std::string taskJSON = R"({"B": {"job": {"command": ["/usr/bin/echo", "{{DATE}}"]}, "children": ["C"]}})";
        auto tasks = daggy::tasksFromJSON(taskJSON, defaultJobValues);

        auto result = daggy::expandTaskSet(tasks, ex, params);
        REQUIRE(result.size() == 2);
    }

    SECTION("Build with expansion") {
        std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
        auto params = daggy::configFromJSON(testParams);
        std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}, "children": ["B"]}, "B": {"job": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"]}, "children": ["C"]}, "C": {"job": {"command": ["/bin/echo", "C"]}}})";
        auto tasks = daggy::expandTaskSet(daggy::tasksFromJSON(testTasks, defaultJobValues), ex, params);
        REQUIRE(tasks.size() == 4);
    }

    SECTION("Build with expansion using parents instead of children") {
        std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
        auto params = daggy::configFromJSON(testParams);
        std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}}, "B": {"job": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"]}, "parents": ["A"]}, "C": {"job": {"command": ["/bin/echo", "C"]}, "parents": ["A"]}})";
        auto tasks = daggy::expandTaskSet(daggy::tasksFromJSON(testTasks, defaultJobValues), ex, params);

        REQUIRE(tasks.size() == 4);
    }
}
#endif
@@ -8,7 +8,7 @@

namespace fs = std::filesystem;

TEST_CASE("Deserialize Parameters", "[deserialize_parameters]") {
TEST_CASE("parameter_deserialization", "[deserialize_parameters]") {
    SECTION("Basic Parse") {
        std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
        auto params = daggy::configFromJSON(testParams);
@@ -27,7 +27,7 @@ TEST_CASE("Deserialize Parameters", "[deserialize_parameters]") {
    }
}

TEST_CASE("Task Deserialization", "[deserialize_task]") {
TEST_CASE("task_deserialization", "[deserialize_task]") {
    SECTION("Build with no expansion") {
        std::string testTasks = R"({
            "A": {
@@ -70,7 +70,7 @@ TEST_CASE("Task Deserialization", "[deserialize_task]") {
    }
}

TEST_CASE("Task Serialization", "[serialize_tasks]") {
TEST_CASE("task_serialization", "[serialize_tasks]") {
    SECTION("Build with no expansion") {
        std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}, "children": ["C"]}, "B": {"job": {"command": ["/bin/echo", "B"]}, "children": ["C"]}, "C": {"job": {"command": ["/bin/echo", "C"]}}})";
        auto tasks = daggy::tasksFromJSON(testTasks);

@@ -50,7 +50,7 @@ REQUEST(std::string url, std::string payload = "") {
    return response;
}

TEST_CASE("Server Basic Endpoints", "[server_basic]") {
TEST_CASE("rest_endpoint", "[server_basic]") {
    std::stringstream ss;
    daggy::executors::task::ForkingTaskExecutor executor(10);
    daggy::loggers::dag_run::OStreamLogger logger(ss);

@@ -7,7 +7,7 @@

using namespace daggy;

TEST_CASE("Threadpool Construction", "[threadpool]") {
TEST_CASE("threadpool", "[threadpool]") {
    std::atomic<uint32_t> cnt(0);
    ThreadPool tp(10);

@@ -22,7 +22,7 @@ TEST_CASE("Threadpool Construction", "[threadpool]") {
            return cnt.load();
        })));
    tp.addTasks(tq);
    for (auto &r : res) r.get();
    for (auto &r: res) r.get();
    REQUIRE(cnt == 100);
}

@@ -35,7 +35,7 @@ TEST_CASE("Threadpool Construction", "[threadpool]") {
        cnt++;
        return;
    }));
    for (auto &r : res) r.get();
    for (auto &r: res) r.get();
    REQUIRE(cnt == 100);
}
}

@@ -15,13 +15,13 @@

namespace fs = std::filesystem;

TEST_CASE("String Utilities", "[utilities_string]") {
TEST_CASE("string_utilities", "[utilities_string]") {
    std::string test = "/this/is/{{A}}/test/{{A}}";
    auto res = daggy::globalSub(test, "{{A}}", "hello");
    REQUIRE(res == "/this/is/hello/test/hello");
}

TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
TEST_CASE("string_expansion", "[utilities_parameter_expansion]") {
    SECTION("Basic expansion") {
        std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"};
        auto params = daggy::configFromJSON(testParams);
@@ -55,7 +55,7 @@ TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
    }
}

TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
TEST_CASE("dag_runner", "[utilities_dag_runner]") {
    daggy::executors::task::ForkingTaskExecutor ex(10);
    std::stringstream ss;
    daggy::loggers::dag_run::OStreamLogger logger(ss);

@@ -10,7 +10,13 @@
#include <daggy/Server.hpp>

// Add executors here
#ifdef DAGGY_ENABLE_SLURM
#include <daggy/executors/task/SlurmTaskExecutor.hpp>
#else
#include <daggy/executors/task/ForkingTaskExecutor.hpp>
#endif

// Add loggers here
#include <daggy/loggers/dag_run/OStreamLogger.hpp>

@@ -178,7 +184,11 @@ int main(int argc, char **argv) {
        logger = std::make_unique<daggy::loggers::dag_run::OStreamLogger>(logFH);
    }

#ifdef DAGGY_ENABLE_SLURM
    daggy::executors::task::SlurmTaskExecutor executor(executorThreads);
#else
    daggy::executors::task::ForkingTaskExecutor executor(executorThreads);
#endif
    Pistache::Address listenSpec(listenIP, listenPort);

    daggy::Server server(listenSpec, *logger, executor, dagThreads);