From 14d0ef4a3f9bc18d984cfa227015fdf597843088 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Sat, 13 Nov 2021 12:09:51 -0400 Subject: [PATCH] Adding environment interpolation for noop, forking, and slurm executors --- .../executors/task/ForkingTaskExecutor.cpp | 23 +++++++--- .../src/executors/task/NoopTaskExecutor.cpp | 19 ++++++-- .../src/executors/task/SlurmTaskExecutor.cpp | 20 +++++++-- libdaggy/tests/CMakeLists.txt | 1 + libdaggy/tests/unit_executor_noopexecutor.cpp | 43 +++++++++++++++++++ .../tests/unit_executor_slurmexecutor.cpp | 40 +++++++++++------ 6 files changed, 119 insertions(+), 27 deletions(-) create mode 100644 libdaggy/tests/unit_executor_noopexecutor.cpp diff --git a/libdaggy/src/executors/task/ForkingTaskExecutor.cpp b/libdaggy/src/executors/task/ForkingTaskExecutor.cpp index 5ec27bf..359d929 100644 --- a/libdaggy/src/executors/task/ForkingTaskExecutor.cpp +++ b/libdaggy/src/executors/task/ForkingTaskExecutor.cpp @@ -135,8 +135,7 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task, close(stdoutPipe[0]); close(stderrPipe[0]); char **env = (envp.empty() ? nullptr : envp.data()); - auto res = execvpe(argv[0], argv.data(), env); - std::cout << res << std::endl; + execvpe(argv[0], argv.data(), env); exit(errno); } @@ -217,11 +216,23 @@ std::vector ForkingTaskExecutor::expandTaskParameters( { std::vector newValues; - const auto command = std::get(job.at("command")); - for (const auto &expandedCommand : - interpolateValues(command, expansionValues)) { + auto command = + (job.count("command") == 0 ? Command{} + : std::get(job.at("command"))); + + auto environment = (job.count("environment") == 0 + ? Command{} + : std::get(job.at("environment"))); + + Command both(command); + std::copy(environment.begin(), environment.end(), std::back_inserter(both)); + + for (const auto &parts : interpolateValues(both, expansionValues)) { ConfigValues newCommand{job}; - newCommand.at("command") = expandedCommand; + newCommand["command"] = + Command(parts.begin(), parts.begin() + command.size()); + newCommand["environment"] = + Command(parts.begin() + command.size(), parts.end()); newValues.emplace_back(newCommand); } diff --git a/libdaggy/src/executors/task/NoopTaskExecutor.cpp b/libdaggy/src/executors/task/NoopTaskExecutor.cpp index 9e875f4..5d9d434 100644 --- a/libdaggy/src/executors/task/NoopTaskExecutor.cpp +++ b/libdaggy/src/executors/task/NoopTaskExecutor.cpp @@ -31,12 +31,23 @@ namespace daggy::executors::task { const ConfigValues &job, const ConfigValues &expansionValues) { std::vector newValues; + auto command = + (job.count("command") == 0 ? Command{} + : std::get(job.at("command"))); - const auto command = std::get(job.at("command")); - for (const auto &expandedCommand : - interpolateValues(command, expansionValues)) { + auto environment = (job.count("environment") == 0 + ? Command{} + : std::get(job.at("environment"))); + + Command both(command); + std::copy(environment.begin(), environment.end(), std::back_inserter(both)); + + for (const auto &parts : interpolateValues(both, expansionValues)) { ConfigValues newCommand{job}; - newCommand.at("command") = expandedCommand; + newCommand["command"] = + Command(parts.begin(), parts.begin() + command.size()); + newCommand["environment"] = + Command(parts.begin() + command.size(), parts.end()); newValues.emplace_back(newCommand); } diff --git a/libdaggy/src/executors/task/SlurmTaskExecutor.cpp b/libdaggy/src/executors/task/SlurmTaskExecutor.cpp index c0fad0b..26b44a5 100644 --- a/libdaggy/src/executors/task/SlurmTaskExecutor.cpp +++ b/libdaggy/src/executors/task/SlurmTaskExecutor.cpp @@ -125,11 +125,23 @@ namespace daggy::executors::task { { std::vector newValues; - const auto command = std::get(job.at("command")); - for (const auto &expandedCommand : - interpolateValues(command, expansionValues)) { + auto command = + (job.count("command") == 0 ? Command{} + : std::get(job.at("command"))); + + auto environment = (job.count("environment") == 0 + ? Command{} + : std::get(job.at("environment"))); + + Command both(command); + std::copy(environment.begin(), environment.end(), std::back_inserter(both)); + + for (const auto &parts : interpolateValues(both, expansionValues)) { ConfigValues newCommand{job}; - newCommand.at("command") = expandedCommand; + newCommand["command"] = + Command(parts.begin(), parts.begin() + command.size()); + newCommand["environment"] = + Command(parts.begin() + command.size(), parts.end()); newValues.emplace_back(newCommand); } diff --git a/libdaggy/tests/CMakeLists.txt b/libdaggy/tests/CMakeLists.txt index e4890c9..23c9a2d 100644 --- a/libdaggy/tests/CMakeLists.txt +++ b/libdaggy/tests/CMakeLists.txt @@ -7,6 +7,7 @@ add_executable(${PROJECT_NAME} main.cpp unit_dagrun_loggers.cpp unit_executor_forkingexecutor.cpp unit_executor_slurmexecutor.cpp + unit_executor_noopexecutor.cpp unit_serialization.cpp unit_threadpool.cpp unit_utilities.cpp diff --git a/libdaggy/tests/unit_executor_noopexecutor.cpp b/libdaggy/tests/unit_executor_noopexecutor.cpp new file mode 100644 index 0000000..220fb08 --- /dev/null +++ b/libdaggy/tests/unit_executor_noopexecutor.cpp @@ -0,0 +1,43 @@ +#include +#include +#include +#include +#include + +#include "daggy/Serialization.hpp" +#include "daggy/Utilities.hpp" +#include "daggy/executors/task/NoopTaskExecutor.hpp" + +namespace fs = std::filesystem; + +TEST_CASE("noop_executor", "[noop_executor]") +{ + daggy::executors::task::NoopTaskExecutor ex; + + SECTION("Simple Run") + { + daggy::Task task{ + .job{{"command", daggy::executors::task::NoopTaskExecutor::Command{ + "/usr/bin/echo", "abc", "123"}}}}; + + REQUIRE(ex.validateTaskParameters(task.job)); + + auto recFuture = ex.execute(0, "command", task); + auto rec = recFuture.get(); + + REQUIRE(rec.rc == 0); + } + + SECTION("Expansion with environment") + { + std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"}; + auto params = daggy::configFromJSON(testParams); + + std::string taskJSON = + R"({"B": {"job": {"command": ["/usr/bin/echo", "{{DATE}}"], "environment": [ "TEST={{DATE}}"] }, "children": ["C"]}})"; + auto tasks = daggy::tasksFromJSON(taskJSON); + + auto result = daggy::expandTaskSet(tasks, ex, params); + REQUIRE(result.size() == 2); + } +} diff --git a/libdaggy/tests/unit_executor_slurmexecutor.cpp b/libdaggy/tests/unit_executor_slurmexecutor.cpp index cbde965..09e567a 100644 --- a/libdaggy/tests/unit_executor_slurmexecutor.cpp +++ b/libdaggy/tests/unit_executor_slurmexecutor.cpp @@ -13,7 +13,7 @@ namespace fs = std::filesystem; #ifdef DAGGY_ENABLE_SLURM -TEST_CASE("slurm environment", "[slurm_env]") +TEST_CASE("slurm environment", "[slurm][slurm_env]") { daggy::executors::task::SlurmTaskExecutor ex; @@ -27,7 +27,7 @@ TEST_CASE("slurm environment", "[slurm_env]") {"tmpDir", fs::current_path().string()}}; } -TEST_CASE("slurm_execution", "[slurm_executor]") +TEST_CASE("slurm_execution", "[slurm][slurm_executor]") { daggy::executors::task::SlurmTaskExecutor ex; @@ -65,23 +65,36 @@ TEST_CASE("slurm_execution", "[slurm_executor]") if (fs::exists(scriptFile)) fs::remove_all(scriptFile); + std::string valOne = "funky_times"; + std::string valTwo = "bleep_bloop"; + std::string valThree = "SOME-DATE"; + + std::string testParams{R"({"DATE": ")" + valThree + R"("})"}; + auto params = daggy::configFromJSON(testParams); + std::ofstream ofh(scriptFile); - ofh << "#!/bin/bash\necho \"${DAGGY_TEST_VAR}\"\necho " - "\"${DAGGY_TEST_VAR2}\"\n"; + ofh << R"(#!/bin/bash + echo "${DAGGY_TEST_VAR}" + echo "${DAGGY_TEST_VAR2}" + echo "${DATE}" + )"; ofh.close(); fs::permissions(scriptFile, fs::perms::owner_all, fs::perm_options::replace); - std::string valOne = "funky_times"; - std::string valTwo = "bleep_bloop"; + daggy::Task rawTask{.job{ + {"command", + daggy::executors::task::SlurmTaskExecutor::Command{ + scriptFile.string()}}, + {"environment", std::vector{"DAGGY_TEST_VAR=" + valOne, + "DAGGY_TEST_VAR2=" + valTwo, + "DATE={{DATE}}"}}}}; + rawTask.job.merge(defaultJobValues); - daggy::Task task{.job{{"command", - daggy::executors::task::SlurmTaskExecutor::Command{ - scriptFile.string()}}, - {"environment", std::vector{ - "DAGGY_TEST_VAR=" + valOne, - "DAGGY_TEST_VAR2=" + valTwo}}}}; - task.job.merge(defaultJobValues); + auto tasks = + daggy::expandTaskSet(daggy::TaskSet{{"cmd", rawTask}}, ex, params); + REQUIRE(tasks.size() == 1); + auto task = tasks.at("cmd_0"); REQUIRE(ex.validateTaskParameters(task.job)); @@ -92,6 +105,7 @@ TEST_CASE("slurm_execution", "[slurm_executor]") REQUIRE(rec.outputLog.size() >= 6); REQUIRE(rec.outputLog.find(valOne) != std::string::npos); REQUIRE(rec.outputLog.find(valTwo) != std::string::npos); + REQUIRE(rec.outputLog.find(valThree) != std::string::npos); REQUIRE(rec.errorLog.empty()); if (fs::exists(scriptFile))