Adding environment interpolation for noop, forking, and slurm executors
This commit is contained in:
@@ -135,8 +135,7 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task,
|
|||||||
close(stdoutPipe[0]);
|
close(stdoutPipe[0]);
|
||||||
close(stderrPipe[0]);
|
close(stderrPipe[0]);
|
||||||
char **env = (envp.empty() ? nullptr : envp.data());
|
char **env = (envp.empty() ? nullptr : envp.data());
|
||||||
auto res = execvpe(argv[0], argv.data(), env);
|
execvpe(argv[0], argv.data(), env);
|
||||||
std::cout << res << std::endl;
|
|
||||||
exit(errno);
|
exit(errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -217,11 +216,23 @@ std::vector<daggy::ConfigValues> ForkingTaskExecutor::expandTaskParameters(
|
|||||||
{
|
{
|
||||||
std::vector<ConfigValues> newValues;
|
std::vector<ConfigValues> newValues;
|
||||||
|
|
||||||
const auto command = std::get<Command>(job.at("command"));
|
auto command =
|
||||||
for (const auto &expandedCommand :
|
(job.count("command") == 0 ? Command{}
|
||||||
interpolateValues(command, expansionValues)) {
|
: std::get<Command>(job.at("command")));
|
||||||
|
|
||||||
|
auto environment = (job.count("environment") == 0
|
||||||
|
? Command{}
|
||||||
|
: std::get<Command>(job.at("environment")));
|
||||||
|
|
||||||
|
Command both(command);
|
||||||
|
std::copy(environment.begin(), environment.end(), std::back_inserter(both));
|
||||||
|
|
||||||
|
for (const auto &parts : interpolateValues(both, expansionValues)) {
|
||||||
ConfigValues newCommand{job};
|
ConfigValues newCommand{job};
|
||||||
newCommand.at("command") = expandedCommand;
|
newCommand["command"] =
|
||||||
|
Command(parts.begin(), parts.begin() + command.size());
|
||||||
|
newCommand["environment"] =
|
||||||
|
Command(parts.begin() + command.size(), parts.end());
|
||||||
newValues.emplace_back(newCommand);
|
newValues.emplace_back(newCommand);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -31,12 +31,23 @@ namespace daggy::executors::task {
|
|||||||
const ConfigValues &job, const ConfigValues &expansionValues)
|
const ConfigValues &job, const ConfigValues &expansionValues)
|
||||||
{
|
{
|
||||||
std::vector<ConfigValues> newValues;
|
std::vector<ConfigValues> newValues;
|
||||||
|
auto command =
|
||||||
|
(job.count("command") == 0 ? Command{}
|
||||||
|
: std::get<Command>(job.at("command")));
|
||||||
|
|
||||||
const auto command = std::get<Command>(job.at("command"));
|
auto environment = (job.count("environment") == 0
|
||||||
for (const auto &expandedCommand :
|
? Command{}
|
||||||
interpolateValues(command, expansionValues)) {
|
: std::get<Command>(job.at("environment")));
|
||||||
|
|
||||||
|
Command both(command);
|
||||||
|
std::copy(environment.begin(), environment.end(), std::back_inserter(both));
|
||||||
|
|
||||||
|
for (const auto &parts : interpolateValues(both, expansionValues)) {
|
||||||
ConfigValues newCommand{job};
|
ConfigValues newCommand{job};
|
||||||
newCommand.at("command") = expandedCommand;
|
newCommand["command"] =
|
||||||
|
Command(parts.begin(), parts.begin() + command.size());
|
||||||
|
newCommand["environment"] =
|
||||||
|
Command(parts.begin() + command.size(), parts.end());
|
||||||
newValues.emplace_back(newCommand);
|
newValues.emplace_back(newCommand);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -125,11 +125,23 @@ namespace daggy::executors::task {
|
|||||||
{
|
{
|
||||||
std::vector<ConfigValues> newValues;
|
std::vector<ConfigValues> newValues;
|
||||||
|
|
||||||
const auto command = std::get<Command>(job.at("command"));
|
auto command =
|
||||||
for (const auto &expandedCommand :
|
(job.count("command") == 0 ? Command{}
|
||||||
interpolateValues(command, expansionValues)) {
|
: std::get<Command>(job.at("command")));
|
||||||
|
|
||||||
|
auto environment = (job.count("environment") == 0
|
||||||
|
? Command{}
|
||||||
|
: std::get<Command>(job.at("environment")));
|
||||||
|
|
||||||
|
Command both(command);
|
||||||
|
std::copy(environment.begin(), environment.end(), std::back_inserter(both));
|
||||||
|
|
||||||
|
for (const auto &parts : interpolateValues(both, expansionValues)) {
|
||||||
ConfigValues newCommand{job};
|
ConfigValues newCommand{job};
|
||||||
newCommand.at("command") = expandedCommand;
|
newCommand["command"] =
|
||||||
|
Command(parts.begin(), parts.begin() + command.size());
|
||||||
|
newCommand["environment"] =
|
||||||
|
Command(parts.begin() + command.size(), parts.end());
|
||||||
newValues.emplace_back(newCommand);
|
newValues.emplace_back(newCommand);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ add_executable(${PROJECT_NAME} main.cpp
|
|||||||
unit_dagrun_loggers.cpp
|
unit_dagrun_loggers.cpp
|
||||||
unit_executor_forkingexecutor.cpp
|
unit_executor_forkingexecutor.cpp
|
||||||
unit_executor_slurmexecutor.cpp
|
unit_executor_slurmexecutor.cpp
|
||||||
|
unit_executor_noopexecutor.cpp
|
||||||
unit_serialization.cpp
|
unit_serialization.cpp
|
||||||
unit_threadpool.cpp
|
unit_threadpool.cpp
|
||||||
unit_utilities.cpp
|
unit_utilities.cpp
|
||||||
|
|||||||
43
libdaggy/tests/unit_executor_noopexecutor.cpp
Normal file
43
libdaggy/tests/unit_executor_noopexecutor.cpp
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
#include <catch2/catch.hpp>
|
||||||
|
#include <filesystem>
|
||||||
|
#include <fstream>
|
||||||
|
#include <iostream>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
|
#include "daggy/Serialization.hpp"
|
||||||
|
#include "daggy/Utilities.hpp"
|
||||||
|
#include "daggy/executors/task/NoopTaskExecutor.hpp"
|
||||||
|
|
||||||
|
namespace fs = std::filesystem;
|
||||||
|
|
||||||
|
TEST_CASE("noop_executor", "[noop_executor]")
|
||||||
|
{
|
||||||
|
daggy::executors::task::NoopTaskExecutor ex;
|
||||||
|
|
||||||
|
SECTION("Simple Run")
|
||||||
|
{
|
||||||
|
daggy::Task task{
|
||||||
|
.job{{"command", daggy::executors::task::NoopTaskExecutor::Command{
|
||||||
|
"/usr/bin/echo", "abc", "123"}}}};
|
||||||
|
|
||||||
|
REQUIRE(ex.validateTaskParameters(task.job));
|
||||||
|
|
||||||
|
auto recFuture = ex.execute(0, "command", task);
|
||||||
|
auto rec = recFuture.get();
|
||||||
|
|
||||||
|
REQUIRE(rec.rc == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("Expansion with environment")
|
||||||
|
{
|
||||||
|
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"};
|
||||||
|
auto params = daggy::configFromJSON(testParams);
|
||||||
|
|
||||||
|
std::string taskJSON =
|
||||||
|
R"({"B": {"job": {"command": ["/usr/bin/echo", "{{DATE}}"], "environment": [ "TEST={{DATE}}"] }, "children": ["C"]}})";
|
||||||
|
auto tasks = daggy::tasksFromJSON(taskJSON);
|
||||||
|
|
||||||
|
auto result = daggy::expandTaskSet(tasks, ex, params);
|
||||||
|
REQUIRE(result.size() == 2);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -13,7 +13,7 @@ namespace fs = std::filesystem;
|
|||||||
|
|
||||||
#ifdef DAGGY_ENABLE_SLURM
|
#ifdef DAGGY_ENABLE_SLURM
|
||||||
|
|
||||||
TEST_CASE("slurm environment", "[slurm_env]")
|
TEST_CASE("slurm environment", "[slurm][slurm_env]")
|
||||||
{
|
{
|
||||||
daggy::executors::task::SlurmTaskExecutor ex;
|
daggy::executors::task::SlurmTaskExecutor ex;
|
||||||
|
|
||||||
@@ -27,7 +27,7 @@ TEST_CASE("slurm environment", "[slurm_env]")
|
|||||||
{"tmpDir", fs::current_path().string()}};
|
{"tmpDir", fs::current_path().string()}};
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_CASE("slurm_execution", "[slurm_executor]")
|
TEST_CASE("slurm_execution", "[slurm][slurm_executor]")
|
||||||
{
|
{
|
||||||
daggy::executors::task::SlurmTaskExecutor ex;
|
daggy::executors::task::SlurmTaskExecutor ex;
|
||||||
|
|
||||||
@@ -65,23 +65,36 @@ TEST_CASE("slurm_execution", "[slurm_executor]")
|
|||||||
if (fs::exists(scriptFile))
|
if (fs::exists(scriptFile))
|
||||||
fs::remove_all(scriptFile);
|
fs::remove_all(scriptFile);
|
||||||
|
|
||||||
|
std::string valOne = "funky_times";
|
||||||
|
std::string valTwo = "bleep_bloop";
|
||||||
|
std::string valThree = "SOME-DATE";
|
||||||
|
|
||||||
|
std::string testParams{R"({"DATE": ")" + valThree + R"("})"};
|
||||||
|
auto params = daggy::configFromJSON(testParams);
|
||||||
|
|
||||||
std::ofstream ofh(scriptFile);
|
std::ofstream ofh(scriptFile);
|
||||||
ofh << "#!/bin/bash\necho \"${DAGGY_TEST_VAR}\"\necho "
|
ofh << R"(#!/bin/bash
|
||||||
"\"${DAGGY_TEST_VAR2}\"\n";
|
echo "${DAGGY_TEST_VAR}"
|
||||||
|
echo "${DAGGY_TEST_VAR2}"
|
||||||
|
echo "${DATE}"
|
||||||
|
)";
|
||||||
ofh.close();
|
ofh.close();
|
||||||
fs::permissions(scriptFile, fs::perms::owner_all,
|
fs::permissions(scriptFile, fs::perms::owner_all,
|
||||||
fs::perm_options::replace);
|
fs::perm_options::replace);
|
||||||
|
|
||||||
std::string valOne = "funky_times";
|
daggy::Task rawTask{.job{
|
||||||
std::string valTwo = "bleep_bloop";
|
{"command",
|
||||||
|
daggy::executors::task::SlurmTaskExecutor::Command{
|
||||||
|
scriptFile.string()}},
|
||||||
|
{"environment", std::vector<std::string>{"DAGGY_TEST_VAR=" + valOne,
|
||||||
|
"DAGGY_TEST_VAR2=" + valTwo,
|
||||||
|
"DATE={{DATE}}"}}}};
|
||||||
|
rawTask.job.merge(defaultJobValues);
|
||||||
|
|
||||||
daggy::Task task{.job{{"command",
|
auto tasks =
|
||||||
daggy::executors::task::SlurmTaskExecutor::Command{
|
daggy::expandTaskSet(daggy::TaskSet{{"cmd", rawTask}}, ex, params);
|
||||||
scriptFile.string()}},
|
REQUIRE(tasks.size() == 1);
|
||||||
{"environment", std::vector<std::string>{
|
auto task = tasks.at("cmd_0");
|
||||||
"DAGGY_TEST_VAR=" + valOne,
|
|
||||||
"DAGGY_TEST_VAR2=" + valTwo}}}};
|
|
||||||
task.job.merge(defaultJobValues);
|
|
||||||
|
|
||||||
REQUIRE(ex.validateTaskParameters(task.job));
|
REQUIRE(ex.validateTaskParameters(task.job));
|
||||||
|
|
||||||
@@ -92,6 +105,7 @@ TEST_CASE("slurm_execution", "[slurm_executor]")
|
|||||||
REQUIRE(rec.outputLog.size() >= 6);
|
REQUIRE(rec.outputLog.size() >= 6);
|
||||||
REQUIRE(rec.outputLog.find(valOne) != std::string::npos);
|
REQUIRE(rec.outputLog.find(valOne) != std::string::npos);
|
||||||
REQUIRE(rec.outputLog.find(valTwo) != std::string::npos);
|
REQUIRE(rec.outputLog.find(valTwo) != std::string::npos);
|
||||||
|
REQUIRE(rec.outputLog.find(valThree) != std::string::npos);
|
||||||
REQUIRE(rec.errorLog.empty());
|
REQUIRE(rec.errorLog.empty());
|
||||||
|
|
||||||
if (fs::exists(scriptFile))
|
if (fs::exists(scriptFile))
|
||||||
|
|||||||
Reference in New Issue
Block a user