- Checkpointing work on expanding commands.
This commit is contained in:
@@ -14,6 +14,8 @@
|
|||||||
#include "DAG.hpp"
|
#include "DAG.hpp"
|
||||||
|
|
||||||
namespace daggy {
|
namespace daggy {
|
||||||
|
std::string globalSub(std::string string, const std::string &pattern, const std::string &replacement);
|
||||||
|
|
||||||
std::vector<Command> expandCommands(const std::vector<std::string> &command, const ParameterValues ¶meters);
|
std::vector<Command> expandCommands(const std::vector<std::string> &command, const ParameterValues ¶meters);
|
||||||
|
|
||||||
DAG buildDAGFromTasks(const std::vector<Task> &tasks);
|
DAG buildDAGFromTasks(const std::vector<Task> &tasks);
|
||||||
|
|||||||
@@ -3,28 +3,47 @@
|
|||||||
#include <daggy/Utilities.hpp>
|
#include <daggy/Utilities.hpp>
|
||||||
|
|
||||||
namespace daggy {
|
namespace daggy {
|
||||||
|
std::string globalSub(std::string string, const std::string &pattern, const std::string &replacement) {
|
||||||
|
size_t pos = string.find(pattern);
|
||||||
|
while (pos != std::string::npos) {
|
||||||
|
string.replace(pos, pattern.size(), replacement);
|
||||||
|
pos = string.find(pattern);
|
||||||
|
}
|
||||||
|
return string;
|
||||||
|
}
|
||||||
|
|
||||||
std::vector<std::vector<std::string>>
|
std::vector<std::vector<std::string>>
|
||||||
expandCommands(const std::vector<std::string> &command, const ParameterValues ¶meters) {
|
expandCommands(const std::vector<std::string> &command, const ParameterValues ¶meters) {
|
||||||
std::vector<std::vector<std::string>> commands{{}};
|
std::vector<std::vector<std::string>> commands{{}};
|
||||||
|
|
||||||
for (const auto &part : command) {
|
for (const auto &part : command) {
|
||||||
// this isn't an interpolated value
|
std::vector<std::string> expandedPart;
|
||||||
if (parameters.find(part) == parameters.end()) {
|
|
||||||
for (auto &cmd : commands) cmd.push_back(part);
|
for (const auto &[param, paramValue] : parameters) {
|
||||||
continue;
|
auto pos = part.find(param);
|
||||||
}
|
if (pos == std::string::npos) continue;
|
||||||
auto &inVal = parameters.at(part);
|
std::vector<std::string> newExpandedPart;
|
||||||
if (std::holds_alternative<std::string>(inVal)) {
|
|
||||||
for (auto &cmd : commands) cmd.push_back(std::get<std::string>(inVal));
|
if (std::holds_alternative<std::string>(paramValue)) {
|
||||||
continue;
|
for (auto &cmd : expandedPart) {
|
||||||
|
newExpandedPart.push_back(globalSub(cmd, param, std::get<std::string>(paramValue)));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (const auto &val : std::get<std::vector<std::string>>(paramValue)) {
|
||||||
|
for (auto cmd : expandedPart) {
|
||||||
|
newExpandedPart.push_back(globalSub(cmd, param, val));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expandedPart.swap(newExpandedPart);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ends up being expensive, as it's a cartesian product
|
|
||||||
std::vector<std::vector<std::string>> newCommands;
|
std::vector<std::vector<std::string>> newCommands;
|
||||||
for (const auto &val : std::get<std::vector<std::string>>(inVal)) {
|
for (const auto &newPart : expandedPart) {
|
||||||
for (auto cmd : commands) {
|
for (auto cmd : commands) {
|
||||||
cmd.push_back(val);
|
cmd.push_back(newPart);
|
||||||
newCommands.push_back(cmd);
|
newCommands.emplace_back(cmd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
commands.swap(newCommands);
|
commands.swap(newCommands);
|
||||||
@@ -136,4 +155,4 @@ namespace daggy {
|
|||||||
os << std::put_time(std::localtime(&t_c), "%Y-%m-%d %H:%M:%S %Z");
|
os << std::put_time(std::localtime(&t_c), "%Y-%m-%d %H:%M:%S %Z");
|
||||||
return os;
|
return os;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,10 @@
|
|||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
|
|
||||||
|
#include <iomanip>
|
||||||
|
#include <algorithm>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
#include <catch2/catch.hpp>
|
#include <catch2/catch.hpp>
|
||||||
|
|
||||||
#include "daggy/Utilities.hpp"
|
#include "daggy/Utilities.hpp"
|
||||||
@@ -9,6 +13,12 @@
|
|||||||
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
|
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
|
||||||
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
|
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
|
||||||
|
|
||||||
|
TEST_CASE("String Utilities", "[utilities_string]") {
|
||||||
|
std::string test = "/this/is/{{A}}/test/{{A}}";
|
||||||
|
auto res = daggy::globalSub(test, "{{A}}", "hello");
|
||||||
|
REQUIRE(res == "/this/is/hello/test/hello");
|
||||||
|
}
|
||||||
|
|
||||||
TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
|
TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
|
||||||
SECTION("Basic expansion") {
|
SECTION("Basic expansion") {
|
||||||
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"};
|
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"};
|
||||||
@@ -28,6 +38,22 @@ TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
|
|||||||
// TYPE isn't used, so it's just |DATE| * |SOURCE|
|
// TYPE isn't used, so it's just |DATE| * |SOURCE|
|
||||||
REQUIRE(allCommands.size() == 2);
|
REQUIRE(allCommands.size() == 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SECTION("Expand within a command part") {
|
||||||
|
std::string testParams{
|
||||||
|
R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": ["A", "B"], "TYPE": ["a", "b", "c"]})"};
|
||||||
|
auto params = daggy::parametersFromJSON(testParams);
|
||||||
|
std::vector<std::string> cmd{"/usr/bin/touch", "/tmp/{{DATE}}_{{SOURCE}}"};
|
||||||
|
auto result = daggy::expandCommands(cmd, params);
|
||||||
|
|
||||||
|
// TYPE isn't used, so it's just |DATE| * |SOURCE|
|
||||||
|
REQUIRE(result.size() == 4);
|
||||||
|
|
||||||
|
for (const auto &command : result) {
|
||||||
|
std::copy(command.begin(), command.end(), std::ostream_iterator<std::string>(std::cout, " "));
|
||||||
|
std::cout << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
|
TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
|
||||||
@@ -41,4 +67,4 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
|
|||||||
|
|
||||||
auto runID = logger.startDAGRun("test_run", tasks);
|
auto runID = logger.startDAGRun("test_run", tasks);
|
||||||
daggy::runDAG(runID, tasks, ex, logger, dag);
|
daggy::runDAG(runID, tasks, ex, logger, dag);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user