diff --git a/README.md b/README.md index fa0377b..3a2d128 100644 --- a/README.md +++ b/README.md @@ -344,7 +344,9 @@ and capture output. | Field | Sample | Description | |---------|--------|--------------| -| command | `[ "/usr/bin/echo", "param1" ]` | The command to run on a slurm host | +| command | `[ "/usr/bin/echo", "param1" ]` | The command to run | +| commandString | `"/usr/bin/echo param1"` | The command to run, given as a single string (alternative to `command`). Double-quoted arguments are kept together as one argument. | +| environment | `[ "DATE=2021-05-03" ]` | Optional. Environment variables to set for the command, as `NAME=value` strings | Slurm Executor (SlurmTaskExecutor) ---------------------------------- @@ -358,6 +360,8 @@ Required `job` config values: | Field | Sample | Description | |---------|--------|--------------| | command | `[ "/usr/bin/echo", "param1" ]` | The command to run on a slurm host | +| commandString | `"/usr/bin/echo param1"` | The command to run, given as a single string (alternative to `command`). Double-quoted arguments are kept together as one argument. | +| environment | `[ "DATE=2021-05-03" ]` | Optional. Environment variables to set for the command, as `NAME=value` strings | | minCPUs | `"1"` | Minimum number of CPUs required | | minMemoryMB | `"1"` | Minimum memory required, in MB | | minTmpDiskMB | `"1"` | Minimum temporary disk required, in MB | diff --git a/daggy/src/executors/task/ForkingTaskExecutor.cpp b/daggy/src/executors/task/ForkingTaskExecutor.cpp index 8f1a5a0..0fcdd8c 100644 --- a/daggy/src/executors/task/ForkingTaskExecutor.cpp +++ b/daggy/src/executors/task/ForkingTaskExecutor.cpp @@ -5,6 +5,7 @@ #include #include +#include using namespace daggy::executors::task; @@ -82,12 +83,37 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task, // Need to convert the strings std::vector argv; - const auto command = std::get(task.job.at("command")); + std::vector envp; + + // Populate the command + Command command; + if (task.job.count("commandString")) { + std::stringstream ss; + ss << std::get(task.job.at("commandString")); + std::string tok; + while (ss >> std::quoted(tok)) { + command.push_back(tok); + } 
+ } + else { + const auto cmd = std::get(task.job.at("command")); + std::copy(cmd.begin(), cmd.end(), std::back_inserter(command)); + } std::transform( command.begin(), command.end(), std::back_inserter(argv), [](const std::string &s) { return const_cast(s.c_str()); }); argv.push_back(nullptr); + // Populate the environment + auto it = task.job.find("environment"); + if (it != task.job.end()) { + const auto environment = std::get(task.job.at("environment")); + std::transform( + environment.begin(), environment.end(), std::back_inserter(envp), + [](const std::string &s) { return const_cast(s.c_str()); }); + envp.push_back(nullptr); + } + // Create the pipe int stdoutPipe[2]; int pipeRC = pipe2(stdoutPipe, O_DIRECT); @@ -109,7 +135,7 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task, } close(stdoutPipe[0]); close(stderrPipe[0]); - execvp(argv[0], argv.data()); + execvpe(argv[0], argv.data(), envp.data()); exit(-1); } @@ -164,12 +190,24 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task, bool ForkingTaskExecutor::validateTaskParameters(const ConfigValues &job) { - auto it = job.find("command"); - if (it == job.end()) - throw std::runtime_error(R"(job does not have a "command" argument)"); - if (!std::holds_alternative(it->second)) - throw std::runtime_error( - R"(taskParameter's "command" must be an array of strings)"); + // command or commandString is required + if (job.count("command")) { + if (!std::holds_alternative(job.at("command"))) + throw std::runtime_error(R"(command must be an array of strings)"); + } + else { + if (job.count("commandString") == 0) { + throw std::runtime_error(R"(command or commandString must be defined.)"); + } + if (!std::holds_alternative(job.at("commandString"))) + throw std::runtime_error(R"(commandString must be a string)"); + } + + if (job.count("environment")) { + if (!std::holds_alternative(job.at("environment"))) + throw std::runtime_error(R"(environment must be an array of strings)"); + } + 
return true; } diff --git a/daggy/src/executors/task/SlurmTaskExecutor.cpp b/daggy/src/executors/task/SlurmTaskExecutor.cpp index fb07635..5a0aa9d 100644 --- a/daggy/src/executors/task/SlurmTaskExecutor.cpp +++ b/daggy/src/executors/task/SlurmTaskExecutor.cpp @@ -3,6 +3,7 @@ #include #ifdef DAGGY_ENABLE_SLURM #include +#include #include #include #include @@ -81,22 +82,40 @@ namespace daggy::executors::task { { running_ = false; monitorWorker_.join(); + + // Resolve the remaining futures + std::lock_guard lock(promiseGuard_); + for (auto &[jobID, job] : runningJobs_) { + job.prom.set_value( + AttemptRecord{.rc = -1, .executorLog = "executor killed"}); + } + runningJobs_.clear(); } - // Validates the job to ensure that all required values are set and are of the - // right type, + // Validates the job to ensure that all required values are set and are of + // the right type, bool SlurmTaskExecutor::validateTaskParameters(const ConfigValues &job) { const std::unordered_set requiredFields{ - "minCPUs", "minMemoryMB", "minTmpDiskMB", - "priority", "timeLimitSeconds", "userID", - "workDir", "tmpDir", "command"}; + "minCPUs", "minMemoryMB", "minTmpDiskMB", "priority", + "timeLimitSeconds", "userID", "workDir", "tmpDir"}; for (const auto &requiredField : requiredFields) { if (job.count(requiredField) == 0) { throw std::runtime_error("Missing field " + requiredField); } } + + // Require command or commandString + if (job.count("command") + job.count("commandString") == 0) + throw std::runtime_error( + "Either command or commandString must be specified"); + + if (job.count("environment")) { + if (!std::holds_alternative(job.at("environment"))) + throw std::runtime_error(R"(environment must be an array of strings)"); + } + return true; } @@ -131,15 +150,36 @@ namespace daggy::executors::task { // Convert command to argc / argv std::vector argv{nullptr}; - const auto command = - std::get>(task.job.at("command")); + // Populate the command + Command command; + if 
(task.job.count("commandString")) { + std::stringstream ss; + ss << std::get(task.job.at("commandString")); + std::string tok; + while (ss >> std::quoted(tok)) { + command.push_back(tok); + } + } + else { + const auto cmd = std::get(task.job.at("command")); + std::copy(cmd.begin(), cmd.end(), std::back_inserter(command)); + } std::transform( command.begin(), command.end(), std::back_inserter(argv), [](const std::string &s) { return const_cast(s.c_str()); }); + argv.push_back(nullptr); - char empty[] = ""; - char *env[1]; - env[0] = empty; + std::vector env{""}; + std::vector envp; + auto it = task.job.find("environment"); + if (it != task.job.end()) { + const auto environment = std::get(task.job.at("environment")); + std::copy(environment.begin(), environment.end(), + std::back_inserter(env)); + } + std::transform( + env.begin(), env.end(), std::back_inserter(envp), + [](const std::string &s) { return const_cast(s.c_str()); }); char script[] = "#!/bin/bash\n$@\n"; char stdinFile[] = "/dev/null"; @@ -166,20 +206,16 @@ namespace daggy::executors::task { jd.argv = argv.data(); jd.argc = argv.size(); // TODO figure out the script to run - jd.script = script; - jd.std_in = stdinFile; - jd.std_err = const_cast(stderrFile.c_str()); - jd.std_out = const_cast(stdoutFile.c_str()); - jd.work_dir = const_cast(workDir.c_str()); - jd.env_size = 1; - jd.environment = env; + jd.script = script; + jd.std_in = stdinFile; + jd.std_err = const_cast(stderrFile.c_str()); + jd.std_out = const_cast(stdoutFile.c_str()); + jd.work_dir = const_cast(workDir.c_str()); - /* TODO: Add support for environment - jobDescription.env_size = 2; - env[0] = "SLURM_ENV_0=looking_good"; - env[1] = "SLURM_ENV_1=still_good"; - jobDescription.environment = env; - */ + // jd.env_size = 1; + // jd.environment = env; + jd.env_size = envp.size(); + jd.environment = envp.data(); error_code = slurm_submit_batch_job(&jd, &resp_msg); if (error_code) { @@ -252,7 +288,9 @@ namespace daggy::executors::task { continue; 
// Job has finished case JOB_COMPLETE: /* completed execution successfully */ - case JOB_FAILED: /* completed execution unsuccessfully */ + record.rc = jobInfo.exit_code; + break; + case JOB_FAILED: /* completed execution unsuccessfully */ record.rc = jobInfo.exit_code; record.executorLog = "Script errored.\n"; break; diff --git a/tests/unit_executor_forkingexecutor.cpp b/tests/unit_executor_forkingexecutor.cpp index 276ca47..7aa7f0a 100644 --- a/tests/unit_executor_forkingexecutor.cpp +++ b/tests/unit_executor_forkingexecutor.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -7,6 +8,8 @@ #include "daggy/Utilities.hpp" #include "daggy/executors/task/ForkingTaskExecutor.hpp" +namespace fs = std::filesystem; + TEST_CASE("forking_executor", "[forking_executor]") { daggy::executors::task::ForkingTaskExecutor ex(10); @@ -27,6 +30,59 @@ TEST_CASE("forking_executor", "[forking_executor]") REQUIRE(rec.errorLog.empty()); } + SECTION("Simple Run using commandString") + { + daggy::Task task{.job{{"commandString", R"(/usr/bin/echo "abc 123")"}}}; + + REQUIRE(ex.validateTaskParameters(task.job)); + + auto recFuture = ex.execute(0, "command", task); + auto rec = recFuture.get(); + + REQUIRE(rec.rc == 0); + REQUIRE(rec.outputLog.size() >= 6); + REQUIRE(rec.errorLog.empty()); + } + + SECTION("Simple run with environment") + { + // Create the shell script + auto scriptFile = fs::current_path() / "fork_simple.sh"; + + if (fs::exists(scriptFile)) + fs::remove_all(scriptFile); + + std::ofstream ofh(scriptFile); + ofh << "#!/bin/bash\necho \"${DAGGY_TEST_VAR}\"\necho " + "\"${DAGGY_TEST_VAR2}\"\n"; + ofh.close(); + fs::permissions(scriptFile, fs::perms::owner_all, + fs::perm_options::replace); + + std::string valOne = "funky_times"; + std::string valTwo = "bleep_bloop"; + daggy::Task task{.job{{"command", + daggy::executors::task::ForkingTaskExecutor::Command{ + scriptFile.string()}}, + {"environment", std::vector{ + "DAGGY_TEST_VAR=" + valOne, + "DAGGY_TEST_VAR2=" + 
valTwo}}}}; + + REQUIRE(ex.validateTaskParameters(task.job)); + + auto recFuture = ex.execute(0, "command", task); + auto rec = recFuture.get(); + + REQUIRE(rec.rc == 0); + REQUIRE(rec.outputLog.size() >= 6); + REQUIRE(rec.outputLog.find(valOne) != std::string::npos); + REQUIRE(rec.outputLog.find(valTwo) != std::string::npos); + REQUIRE(rec.errorLog.empty()); + + if (fs::exists(scriptFile)) + fs::remove_all(scriptFile); + } + SECTION("Error Run") { daggy::Task task{ diff --git a/tests/unit_executor_slurmexecutor.cpp b/tests/unit_executor_slurmexecutor.cpp index 3dc8778..cbde965 100644 --- a/tests/unit_executor_slurmexecutor.cpp +++ b/tests/unit_executor_slurmexecutor.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include "daggy/Serialization.hpp" @@ -12,6 +13,20 @@ namespace fs = std::filesystem; #ifdef DAGGY_ENABLE_SLURM +TEST_CASE("slurm environment", "[slurm_env]") +{ + daggy::executors::task::SlurmTaskExecutor ex; + + daggy::ConfigValues defaultJobValues{{"minCPUs", "1"}, + {"minMemoryMB", "100"}, + {"minTmpDiskMB", "0"}, + {"priority", "1"}, + {"timeLimitSeconds", "200"}, + {"userID", std::to_string(getuid())}, + {"workDir", fs::current_path().string()}, + {"tmpDir", fs::current_path().string()}}; +} + TEST_CASE("slurm_execution", "[slurm_executor]") { daggy::executors::task::SlurmTaskExecutor ex; @@ -42,6 +57,62 @@ TEST_CASE("slurm_execution", "[slurm_executor]") REQUIRE(rec.errorLog.empty()); } + SECTION("Simple run with environment") + { + // Create the shell script + auto scriptFile = fs::current_path() / "slurm_simple_env.sh"; + + if (fs::exists(scriptFile)) + fs::remove_all(scriptFile); + + std::ofstream ofh(scriptFile); + ofh << "#!/bin/bash\necho \"${DAGGY_TEST_VAR}\"\necho " + "\"${DAGGY_TEST_VAR2}\"\n"; + ofh.close(); + fs::permissions(scriptFile, fs::perms::owner_all, + fs::perm_options::replace); + + std::string valOne = "funky_times"; + std::string valTwo = "bleep_bloop"; + + daggy::Task task{.job{{"command", + 
daggy::executors::task::SlurmTaskExecutor::Command{ + scriptFile.string()}}, + {"environment", std::vector{ + "DAGGY_TEST_VAR=" + valOne, + "DAGGY_TEST_VAR2=" + valTwo}}}}; + task.job.merge(defaultJobValues); + + REQUIRE(ex.validateTaskParameters(task.job)); + + auto recFuture = ex.execute(0, "command", task); + auto rec = recFuture.get(); + + REQUIRE(rec.rc == 0); + REQUIRE(rec.outputLog.size() >= 6); + REQUIRE(rec.outputLog.find(valOne) != std::string::npos); + REQUIRE(rec.outputLog.find(valTwo) != std::string::npos); + REQUIRE(rec.errorLog.empty()); + + if (fs::exists(scriptFile)) + fs::remove_all(scriptFile); + } + + SECTION("Simple Run using commandString") + { + daggy::Task task{.job{{"commandString", R"(/usr/bin/echo "abc 123")"}}}; + task.job.merge(defaultJobValues); + + REQUIRE(ex.validateTaskParameters(task.job)); + + auto recFuture = ex.execute(0, "command", task); + auto rec = recFuture.get(); + + REQUIRE(rec.rc == 0); + REQUIRE(rec.outputLog.size() >= 6); + REQUIRE(rec.errorLog.empty()); + } + SECTION("Error Run") { daggy::Task task{