Squashed commit of the following:

commit 8a4e0fb24d191bf1c1009bd4c8800b4adab21f81
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Oct 5 17:23:21 2021 -0300

    Adding support for commandString

commit 9055cbde34d2489065b03c25c02a8bea56e42d54
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Oct 5 17:10:01 2021 -0300

    Completing support for environment variables

commit 989adef378724bbc9451c5048ea9d1285eebe2f9
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Oct 5 12:29:31 2021 -0300

    Adding environment support to ForkingTaskExecutor
This commit is contained in:
Ian Roddis
2021-10-05 17:26:30 -03:00
parent 65ab439848
commit cfefdae4f3
5 changed files with 240 additions and 33 deletions

View File

@@ -344,7 +344,9 @@ and capture output.
| Field | Sample | Description |
|---------|--------|--------------|
| command | `[ "/usr/bin/echo", "param1" ]` | The command to run on a slurm host |
| command | `[ "/usr/bin/echo", "param1" ]` | The command to run |
| commandString | `"/usr/bin/echo param1"` | The command to run as a string. Quoted args are properly handled. |
| environment | `[ "DATE=2021-05-03" ]` | Environment variables to set for script |
Slurm Executor (SlurmTaskExecutor)
----------------------------------
@@ -358,6 +360,8 @@ Required `job` config values:
| Field | Sample | Description |
|---------|--------|--------------|
| command | `[ "/usr/bin/echo", "param1" ]` | The command to run on a slurm host |
| commandString | `"/usr/bin/echo param1"` | The command to run as a string. Quoted args are properly handled. |
| environment | `[ "DATE=2021-05-03" ]` | Environment variables to set for script |
| minCPUs | `"1"` | Minimum number of CPUs required |
| minMemoryMB | `"1"` | Minimum memory required, in MB |
| minTmpDiskMB | `"1"` | Minimum temporary disk required, in MB |

View File

@@ -5,6 +5,7 @@
#include <daggy/Utilities.hpp>
#include <daggy/executors/task/ForkingTaskExecutor.hpp>
#include <iomanip>
using namespace daggy::executors::task;
@@ -82,12 +83,37 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task,
// Need to convert the strings
std::vector<char *> argv;
const auto command = std::get<Command>(task.job.at("command"));
std::vector<char *> envp;
// Populate the command
Command command;
if (task.job.count("commandString")) {
std::stringstream ss;
ss << std::get<std::string>(task.job.at("commandString"));
std::string tok;
while (ss >> std::quoted(tok)) {
command.push_back(tok);
}
}
else {
const auto cmd = std::get<Command>(task.job.at("command"));
std::copy(cmd.begin(), cmd.end(), std::back_inserter(command));
}
std::transform(
command.begin(), command.end(), std::back_inserter(argv),
[](const std::string &s) { return const_cast<char *>(s.c_str()); });
argv.push_back(nullptr);
// Populate the environment
auto it = task.job.find("environment");
if (it != task.job.end()) {
const auto environment = std::get<Command>(task.job.at("environment"));
std::transform(
environment.begin(), environment.end(), std::back_inserter(envp),
[](const std::string &s) { return const_cast<char *>(s.c_str()); });
envp.push_back(nullptr);
}
// Create the pipe
int stdoutPipe[2];
int pipeRC = pipe2(stdoutPipe, O_DIRECT);
@@ -109,7 +135,7 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task,
}
close(stdoutPipe[0]);
close(stderrPipe[0]);
execvp(argv[0], argv.data());
execvpe(argv[0], argv.data(), envp.data());
exit(-1);
}
@@ -164,12 +190,24 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task,
bool ForkingTaskExecutor::validateTaskParameters(const ConfigValues &job)
{
auto it = job.find("command");
if (it == job.end())
throw std::runtime_error(R"(job does not have a "command" argument)");
if (!std::holds_alternative<Command>(it->second))
throw std::runtime_error(
R"(taskParameter's "command" must be an array of strings)");
// command or commandString is required
if (job.count("command")) {
if (!std::holds_alternative<Command>(job.at("command")))
throw std::runtime_error(R"(command must be an array of strings)");
}
else {
if (job.count("commandString") == 0) {
throw std::runtime_error(R"(command or commandString must be defined.)");
}
if (!std::holds_alternative<std::string>(job.at("commandString")))
throw std::runtime_error(R"(commandString must be a string)");
}
if (job.count("environment")) {
if (!std::holds_alternative<Command>(job.at("environment")))
throw std::runtime_error(R"(environment must be an array of strings)");
}
return true;
}

View File

@@ -3,6 +3,7 @@
#include <stdexcept>
#ifdef DAGGY_ENABLE_SLURM
#include <slurm/slurm.h>
#include <string.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/types.h>
@@ -81,22 +82,40 @@ namespace daggy::executors::task {
{
running_ = false;
monitorWorker_.join();
// Resolve the remaining futures
std::lock_guard<std::mutex> lock(promiseGuard_);
for (auto &[jobID, job] : runningJobs_) {
job.prom.set_value(
AttemptRecord{.rc = -1, .executorLog = "executor killed"});
}
runningJobs_.clear();
}
// Validates the job to ensure that all required values are set and are of the
// right type,
// Validates the job to ensure that all required values are set and are of
// the right type,
bool SlurmTaskExecutor::validateTaskParameters(const ConfigValues &job)
{
const std::unordered_set<std::string> requiredFields{
"minCPUs", "minMemoryMB", "minTmpDiskMB",
"priority", "timeLimitSeconds", "userID",
"workDir", "tmpDir", "command"};
"minCPUs", "minMemoryMB", "minTmpDiskMB", "priority",
"timeLimitSeconds", "userID", "workDir", "tmpDir"};
for (const auto &requiredField : requiredFields) {
if (job.count(requiredField) == 0) {
throw std::runtime_error("Missing field " + requiredField);
}
}
// Require command or commandString
if (job.count("command") + job.count("commandString") == 0)
throw std::runtime_error(
"Either command or commandString must be specified");
if (job.count("environment")) {
if (!std::holds_alternative<Command>(job.at("environment")))
throw std::runtime_error(R"(environment must be an array of strings)");
}
return true;
}
@@ -131,15 +150,36 @@ namespace daggy::executors::task {
// Convert command to argc / argv
std::vector<char *> argv{nullptr};
const auto command =
std::get<std::vector<std::string>>(task.job.at("command"));
// Populate the command
Command command;
if (task.job.count("commandString")) {
std::stringstream ss;
ss << std::get<std::string>(task.job.at("commandString"));
std::string tok;
while (ss >> std::quoted(tok)) {
command.push_back(tok);
}
}
else {
const auto cmd = std::get<Command>(task.job.at("command"));
std::copy(cmd.begin(), cmd.end(), std::back_inserter(command));
}
std::transform(
command.begin(), command.end(), std::back_inserter(argv),
[](const std::string &s) { return const_cast<char *>(s.c_str()); });
argv.push_back(nullptr);
char empty[] = "";
char *env[1];
env[0] = empty;
std::vector<std::string> env{""};
std::vector<char *> envp;
auto it = task.job.find("environment");
if (it != task.job.end()) {
const auto environment = std::get<Command>(task.job.at("environment"));
std::copy(environment.begin(), environment.end(),
std::back_inserter(env));
}
std::transform(
env.begin(), env.end(), std::back_inserter(envp),
[](const std::string &s) { return const_cast<char *>(s.c_str()); });
char script[] = "#!/bin/bash\n$@\n";
char stdinFile[] = "/dev/null";
@@ -166,20 +206,16 @@ namespace daggy::executors::task {
jd.argv = argv.data();
jd.argc = argv.size();
// TODO figure out the script to run
jd.script = script;
jd.std_in = stdinFile;
jd.std_err = const_cast<char *>(stderrFile.c_str());
jd.std_out = const_cast<char *>(stdoutFile.c_str());
jd.work_dir = const_cast<char *>(workDir.c_str());
jd.env_size = 1;
jd.environment = env;
jd.script = script;
jd.std_in = stdinFile;
jd.std_err = const_cast<char *>(stderrFile.c_str());
jd.std_out = const_cast<char *>(stdoutFile.c_str());
jd.work_dir = const_cast<char *>(workDir.c_str());
/* TODO: Add support for environment
jobDescription.env_size = 2;
env[0] = "SLURM_ENV_0=looking_good";
env[1] = "SLURM_ENV_1=still_good";
jobDescription.environment = env;
*/
// jd.env_size = 1;
// jd.environment = env;
jd.env_size = envp.size();
jd.environment = envp.data();
error_code = slurm_submit_batch_job(&jd, &resp_msg);
if (error_code) {
@@ -252,7 +288,9 @@ namespace daggy::executors::task {
continue;
// Job has finished
case JOB_COMPLETE: /* completed execution successfully */
case JOB_FAILED: /* completed execution unsuccessfully */
record.rc = jobInfo.exit_code;
break;
case JOB_FAILED: /* completed execution unsuccessfully */
record.rc = jobInfo.exit_code;
record.executorLog = "Script errored.\n";
break;

View File

@@ -1,5 +1,6 @@
#include <catch2/catch.hpp>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <thread>
@@ -7,6 +8,8 @@
#include "daggy/Utilities.hpp"
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
namespace fs = std::filesystem;
TEST_CASE("forking_executor", "[forking_executor]")
{
daggy::executors::task::ForkingTaskExecutor ex(10);
@@ -27,6 +30,59 @@ TEST_CASE("forking_executor", "[forking_executor]")
REQUIRE(rec.errorLog.empty());
}
SECTION("Simple Run using commandString")
{
daggy::Task task{.job{{"commandString", R"(/usr/bin/echo "abc 123")"}}};
REQUIRE(ex.validateTaskParameters(task.job));
auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get();
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() >= 6);
REQUIRE(rec.errorLog.empty());
}
SECTION("Simple run with environment")
{
// Create the shell script
auto scriptFile = fs::current_path() / "fork_simple.sh";
if (fs::exists(scriptFile))
fs::remove_all(scriptFile);
std::ofstream ofh(scriptFile);
ofh << "#!/bin/bash\necho \"${DAGGY_TEST_VAR}\"\necho "
"\"${DAGGY_TEST_VAR2}\"\n";
ofh.close();
fs::permissions(scriptFile, fs::perms::owner_all,
fs::perm_options::replace);
std::string valOne = "funky_times";
std::string valTwo = "bleep_bloop";
daggy::Task task{.job{{"command",
daggy::executors::task::ForkingTaskExecutor::Command{
scriptFile.string()}},
{"environment", std::vector<std::string>{
"DAGGY_TEST_VAR=" + valOne,
"DAGGY_TEST_VAR2=" + valTwo}}}};
REQUIRE(ex.validateTaskParameters(task.job));
auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get();
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() >= 6);
REQUIRE(rec.outputLog.find(valOne) != std::string::npos);
REQUIRE(rec.outputLog.find(valTwo) != std::string::npos);
REQUIRE(rec.errorLog.empty());
if (fs::exists(scriptFile))
fs::remove_all(scriptFile);
}
SECTION("Error Run")
{
daggy::Task task{

View File

@@ -2,6 +2,7 @@
#include <catch2/catch.hpp>
#include <filesystem>
#include <fstream>
#include <iostream>
#include "daggy/Serialization.hpp"
@@ -12,6 +13,20 @@ namespace fs = std::filesystem;
#ifdef DAGGY_ENABLE_SLURM
TEST_CASE("slurm environment", "[slurm_env]")
{
daggy::executors::task::SlurmTaskExecutor ex;
daggy::ConfigValues defaultJobValues{{"minCPUs", "1"},
{"minMemoryMB", "100"},
{"minTmpDiskMB", "0"},
{"priority", "1"},
{"timeLimitSeconds", "200"},
{"userID", std::to_string(getuid())},
{"workDir", fs::current_path().string()},
{"tmpDir", fs::current_path().string()}};
}
TEST_CASE("slurm_execution", "[slurm_executor]")
{
daggy::executors::task::SlurmTaskExecutor ex;
@@ -42,6 +57,62 @@ TEST_CASE("slurm_execution", "[slurm_executor]")
REQUIRE(rec.errorLog.empty());
}
SECTION("Simple run with environment")
{
// Create the shell script
auto scriptFile = fs::current_path() / "slurm_simple_env.sh";
if (fs::exists(scriptFile))
fs::remove_all(scriptFile);
std::ofstream ofh(scriptFile);
ofh << "#!/bin/bash\necho \"${DAGGY_TEST_VAR}\"\necho "
"\"${DAGGY_TEST_VAR2}\"\n";
ofh.close();
fs::permissions(scriptFile, fs::perms::owner_all,
fs::perm_options::replace);
std::string valOne = "funky_times";
std::string valTwo = "bleep_bloop";
daggy::Task task{.job{{"command",
daggy::executors::task::SlurmTaskExecutor::Command{
scriptFile.string()}},
{"environment", std::vector<std::string>{
"DAGGY_TEST_VAR=" + valOne,
"DAGGY_TEST_VAR2=" + valTwo}}}};
task.job.merge(defaultJobValues);
REQUIRE(ex.validateTaskParameters(task.job));
auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get();
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() >= 6);
REQUIRE(rec.outputLog.find(valOne) != std::string::npos);
REQUIRE(rec.outputLog.find(valTwo) != std::string::npos);
REQUIRE(rec.errorLog.empty());
if (fs::exists(scriptFile))
fs::remove_all(scriptFile);
}
SECTION("Simple Run using commandString")
{
daggy::Task task{.job{{"commandString", R"(/usr/bin/echo "abc 123")"}}};
task.job.merge(defaultJobValues);
REQUIRE(ex.validateTaskParameters(task.job));
auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get();
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() >= 6);
REQUIRE(rec.errorLog.empty());
}
SECTION("Error Run")
{
daggy::Task task{