138 lines
5.1 KiB
C++
138 lines
5.1 KiB
C++
#include <iostream>
|
|
#include <filesystem>
|
|
#include <fstream>
|
|
|
|
#include <iomanip>
|
|
#include <algorithm>
|
|
#include <iostream>
|
|
|
|
#include <catch2/catch.hpp>
|
|
|
|
#include "daggy/Utilities.hpp"
|
|
#include "daggy/Serialization.hpp"
|
|
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
|
|
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
|
|
|
|
// Shorthand for the standard filesystem library; the DAG runner tests use it to
// check for, and then remove, the files their tasks create.
namespace fs = ::std::filesystem;
|
|
|
|
TEST_CASE("String Utilities", "[utilities_string]") {
    // The placeholder appears twice; globalSub must replace every occurrence,
    // not just the first one.
    std::string templated{"/this/is/{{A}}/test/{{A}}"};
    auto substituted = daggy::globalSub(templated, "{{A}}", "hello");
    REQUIRE(substituted == "/this/is/hello/test/hello");
}
|
|
|
|
TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
    // Parameter set used by the first two sections: DATE has two values,
    // SOURCE is a single scalar, TYPE has three values.
    std::string scalarSourceJSON{
        R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"};

    SECTION("Basic expansion") {
        auto parameters = daggy::parametersFromJSON(scalarSourceJSON);
        std::vector<std::string> command{"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}", "{{TYPE}}"};

        // Every parameter is referenced, so the count is |DATE| * |SOURCE| * |TYPE| = 2 * 1 * 3.
        auto expanded = daggy::expandCommands(command, parameters);
        REQUIRE(expanded.size() == 6);
    }

    SECTION("Skip over unused parameters") {
        auto parameters = daggy::parametersFromJSON(scalarSourceJSON);
        std::vector<std::string> command{"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}"};

        // TYPE is never referenced and must not multiply the result: |DATE| * |SOURCE| = 2 * 1.
        auto expanded = daggy::expandCommands(command, parameters);
        REQUIRE(expanded.size() == 2);
    }

    SECTION("Expand within a command part") {
        // Same DATE/TYPE values as above, but SOURCE is now a two-element list.
        std::string listSourceJSON{
            R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": ["A", "B"], "TYPE": ["a", "b", "c"]})"};
        auto parameters = daggy::parametersFromJSON(listSourceJSON);

        // Both placeholders sit inside one argument rather than in separate ones.
        std::vector<std::string> command{"/usr/bin/touch", "{{DATE}}_{{SOURCE}}"};

        // TYPE is unused, so the count is |DATE| * |SOURCE| = 2 * 2.
        auto expanded = daggy::expandCommands(command, parameters);
        REQUIRE(expanded.size() == 4);
    }
}
|
|
|
|
TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
    // Catch2 re-executes this body from the top for each SECTION, so every
    // section gets its own executor and its own in-memory run logger.
    daggy::executors::task::ForkingTaskExecutor ex(10);
    std::stringstream ss;
    daggy::loggers::dag_run::OStreamLogger logger(ss);

    SECTION("Simple execution") {
        std::string prefix = "asdlk_";
        std::vector<std::string> letters{"A", "B", "C"};

        // Delete the files the tasks are expected to create. Running this first
        // matters: a file left over from an earlier aborted run would otherwise
        // satisfy the fs::exists checks below even if the task never ran.
        auto removeOutputs = [&]() {
            for (const auto &letter : letters) {
                fs::remove(fs::path{prefix + letter});
            }
        };
        removeOutputs();

        // A and B each touch their own file and both list C as a child;
        // C touches its own file.
        std::string taskJSON = R"([{"name": "A", "command": ["/usr/bin/touch", ")"
                               + prefix + R"(A"], "children": ["C"]}, {"name": "B", "command": ["/usr/bin/touch", ")"
                               + prefix + R"(B"], "children": ["C"]}, {"name": "C", "command": ["/usr/bin/touch", ")"
                               + prefix + R"(C"]}])";
        auto tasks = daggy::tasksFromJSON(taskJSON);
        auto dag = daggy::buildDAGFromTasks(tasks);

        auto runID = logger.startDAGRun("test_run", tasks);
        auto endDAG = daggy::runDAG(runID, tasks, ex, logger, dag);

        REQUIRE(endDAG.allVisited());

        for (const auto &letter : letters) {
            REQUIRE(fs::exists(fs::path{prefix + letter}));
        }
        removeOutputs();

        // Every task should have exactly one recorded attempt, and it succeeded.
        auto record = logger.getDAGRun(runID);
        for (const auto &attempts : record.taskAttempts) {
            REQUIRE(attempts.size() == 1);
            REQUIRE(attempts.front().rc == 0);
        }
    }

    SECTION("Recovery from Error") {
        // Remove everything this section creates. Running it up front keeps a
        // leftover "noexist" directory from letting B succeed on its first try.
        auto cleanup = []() {
            std::vector<fs::path> paths{"rec_error_A", "noexist"};
            for (const auto &pth : paths) {
                // remove_all returns 0 without error when the path is absent.
                fs::remove_all(pth);
            }
        };

        cleanup();

        // A writes into the working directory and succeeds. B and C write into
        // "noexist/", which does not exist yet, so B's touch fails. C is a child
        // of A and B and is not expected to run on the first pass.
        std::string goodPrefix = "rec_error_";
        std::string badPrefix = "noexist/rec_error_";
        std::string taskJSON = R"([{"name": "A", "command": ["/usr/bin/touch", ")"
                               + goodPrefix +
                               R"(A"], "children": ["C"]}, {"name": "B", "command": ["/usr/bin/touch", ")"
                               + badPrefix + R"(B"], "children": ["C"]}, {"name": "C", "command": ["/usr/bin/touch", ")"
                               + badPrefix + R"(C"]}])";
        auto tasks = daggy::tasksFromJSON(taskJSON);
        auto dag = daggy::buildDAGFromTasks(tasks);

        auto runID = logger.startDAGRun("test_run", tasks);

        auto tryDAG = daggy::runDAG(runID, tasks, ex, logger, dag);
        REQUIRE(!tryDAG.allVisited());

        // Create the missing directory, then continue the same run from the
        // partially-visited DAG. resetRunning() presumably clears tasks still
        // marked as running after the failed pass (NOTE(review): confirm).
        fs::create_directory("noexist");
        tryDAG.resetRunning();
        auto endDAG = daggy::runDAG(runID, tasks, ex, logger, tryDAG);

        REQUIRE(endDAG.allVisited());

        // Per-task attempt history for the whole run (both passes).
        auto record = logger.getDAGRun(runID);
        REQUIRE(record.taskAttempts[0].size() == 1); // A succeeded on its first attempt
        REQUIRE(record.taskAttempts[1].size() == 2); // B failed, then succeeded on the retry
        REQUIRE(record.taskAttempts[2].size() == 1); // C ran only once, after B's retry succeeded

        cleanup();
    }
}
|