// Files
// daggy/tests/unit_utilities.cpp
// 2021-09-21 09:41:11 -03:00
//
// 256 lines
// 8.5 KiB
// C++

#include <algorithm>
#include <catch2/catch.hpp>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <random>
#include "daggy/Serialization.hpp"
#include "daggy/Utilities.hpp"
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
#include "daggy/executors/task/NoopTaskExecutor.hpp"
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
namespace fs = std::filesystem;
// Checks that daggy::globalSub replaces every occurrence of the pattern in
// the input string, not just the first one.
TEST_CASE("string_utilities", "[utilities_string]")
{
  std::string templated{"/this/is/{{A}}/test/{{A}}"};
  const std::string expected{"/this/is/hello/test/hello"};

  const auto substituted = daggy::globalSub(templated, "{{A}}", "hello");

  REQUIRE(substituted == expected);
}
// Exercises daggy::interpolateValues: a command containing {{NAME}}
// placeholders is expanded against a JSON parameter set, and each section
// checks how many concrete commands come out of the expansion.
TEST_CASE("string_expansion", "[utilities_parameter_expansion]")
{
  // Parses the JSON parameter set, expands the command against it and
  // reports how many commands were produced.
  const auto countExpansions = [](std::string paramsJSON,
                                  std::vector<std::string> command) {
    auto parameters = daggy::configFromJSON(paramsJSON);
    auto expanded = daggy::interpolateValues(command, parameters);
    return expanded.size();
  };

  SECTION("Basic expansion")
  {
    // Every parameter is referenced: 2 dates x 1 source x 3 types.
    const auto produced = countExpansions(
        R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})",
        {"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}", "{{TYPE}}"});
    REQUIRE(produced == 6);
  }

  SECTION("Skip over unused parameters")
  {
    // TYPE is never referenced by the command, so only |DATE| * |SOURCE|
    // combinations are generated.
    const auto produced = countExpansions(
        R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})",
        {"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}"});
    REQUIRE(produced == 2);
  }

  SECTION("Expand within a command part")
  {
    // Two placeholders share a single argument; TYPE is again unused, so the
    // result is |DATE| * |SOURCE| with SOURCE now holding two values.
    const auto produced = countExpansions(
        R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": ["A", "B"], "TYPE": ["a", "b", "c"]})",
        {"/usr/bin/touch", "{{DATE}}_{{SOURCE}}"});
    REQUIRE(produced == 4);
  }
}
// Verifies that daggy::runDAG respects dependency order.
//
// Five task definitions (A..E) are expanded over four DATE values, giving 20
// tasks, and run with the no-op executor. The children lists form the chain
// A -> B -> C -> D -> E, so every family must be ordered before each family
// that follows it alphabetically. The recorded attempt timestamps are
// aggregated per family (keyed by the first letter of the task name) and
// compared pairwise.
TEST_CASE("dag_runner_order", "[dagrun_order]")
{
  daggy::executors::task::NoopTaskExecutor ex;
  std::stringstream ss;
  daggy::loggers::dag_run::OStreamLogger logger(ss);

  daggy::TimePoint startTime = daggy::Clock::now();

  std::string testParams{
      R"({"DATE": ["2021-05-06", "2021-05-07", "2021-05-08", "2021-05-09" ]})"};
  auto params = daggy::configFromJSON(testParams);

  std::string taskJSON = R"({
"A": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "B","D" ]},
"B": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "C","D","E" ]},
"C": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "D"]},
"D": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "E"]},
"E": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}}
})";

  auto tasks =
      daggy::expandTaskSet(daggy::tasksFromJSON(taskJSON), ex, params);
  REQUIRE(tasks.size() == 20);

  auto dag = daggy::buildDAGFromTasks(tasks);
  auto runID = logger.startDAGRun("test_run", tasks);
  auto endDAG = daggy::runDAG(runID, ex, logger, dag);
  REQUIRE(endDAG.allVisited());

  // Ensure the run order
  auto rec = logger.getDAGRun(runID);
  daggy::TimePoint stopTime = daggy::Clock::now();

  constexpr size_t kFamilies = 5; // task definitions A..E

  // Per family: the latest first-attempt start and the earliest last-attempt
  // stop seen across its expanded tasks, seeded with the bounds of the run.
  std::array<daggy::TimePoint, kFamilies> latestStart;
  latestStart.fill(startTime);
  std::array<daggy::TimePoint, kFamilies> earliestStop;
  earliestStop.fill(stopTime);

  for (const auto &[name, attempts] : rec.taskAttempts) {
    // Guard the indexing and front()/back() calls below.
    REQUIRE(!name.empty());
    REQUIRE(!attempts.empty());
    const size_t idx = static_cast<size_t>(name[0] - 'A');
    REQUIRE(idx < kFamilies);

    latestStart[idx] = std::max(latestStart[idx], attempts.front().startTime);
    earliestStop[idx] = std::min(earliestStop[idx], attempts.back().stopTime);
  }

  // NOTE(review): this compares the earliest stop of family i with the latest
  // start of family j, which is weaker than "all of i finished before any of
  // j started" (latest stop vs. earliest start) -- confirm which is intended.
  for (size_t i = 0; i < kFamilies; ++i) {
    // The upper bound must be kFamilies so that family E (index 4) is
    // compared against its predecessors as well.
    for (size_t j = i + 1; j < kFamilies; ++j) {
      REQUIRE(earliestStop[i] < latestStart[j]);
    }
  }
}
// End-to-end checks of daggy::runDAG using the forking task executor, which
// runs the real commands named in the task JSON (/usr/bin/touch, /usr/bin/cat,
// /usr/bin/echo). Each section builds a task set from JSON, runs it, and then
// inspects the run record kept by the logger. Files are created in the
// current working directory and removed again by the section that made them.
TEST_CASE("dag_runner", "[utilities_dag_runner]")
{
  daggy::executors::task::ForkingTaskExecutor ex(10);
  std::stringstream ss;
  daggy::loggers::dag_run::OStreamLogger logger(ss);

  SECTION("Simple execution")
  {
    std::string prefix = (fs::current_path() / "asdlk").string();
    std::unordered_map<std::string, std::string> files{
        {"A", prefix + "_A"}, {"B", prefix + "_B"}, {"C", prefix + "_C"}};

    // Drop leftovers from an earlier aborted run so the existence checks
    // below can only be satisfied by this run.
    for (const auto &[_, file] : files) {
      fs::remove(file);
    }

    std::string taskJSON =
        R"({"A": {"job": {"command": ["/usr/bin/touch", ")" + files.at("A") +
        R"("]}, "children": ["C"]}, "B": {"job": {"command": ["/usr/bin/touch", ")" +
        files.at("B") +
        R"("]}, "children": ["C"]}, "C": {"job": {"command": ["/usr/bin/touch", ")" +
        files.at("C") + R"("]}}})";

    auto tasks = daggy::expandTaskSet(daggy::tasksFromJSON(taskJSON), ex);
    auto dag = daggy::buildDAGFromTasks(tasks);
    auto runID = logger.startDAGRun("test_run", tasks);
    auto endDAG = daggy::runDAG(runID, ex, logger, dag);
    REQUIRE(endDAG.allVisited());

    for (const auto &[_, file] : files) {
      // Remove before asserting so a failed check does not strand the file.
      const bool created = fs::exists(file);
      fs::remove(file);
      INFO("expected output file: " << file);
      REQUIRE(created);
    }

    // Get the DAG Run Attempts
    auto record = logger.getDAGRun(runID);
    for (const auto &[_, attempts] : record.taskAttempts) {
      REQUIRE(attempts.size() == 1);
      REQUIRE(attempts.front().rc == 0);
    }
  }

  SECTION("Recovery from Error")
  {
    // Removes everything this section may have created; remove_all is a
    // no-op for paths that do not exist.
    auto cleanup = []() {
      std::vector<fs::path> paths{"rec_error_A", "noexist"};
      for (const auto &pth : paths) {
        fs::remove_all(pth);
      }
    };
    cleanup();

    // B and C write into a directory that does not exist yet, so they cannot
    // succeed on the first pass.
    std::string goodPrefix = "rec_error_";
    std::string badPrefix = "noexist/rec_error_";
    std::string taskJSON =
        R"({"A": {"job": {"command": ["/usr/bin/touch", ")" + goodPrefix +
        R"(A"]}, "children": ["C"]}, "B": {"job": {"command": ["/usr/bin/touch", ")" +
        badPrefix +
        R"(B"]}, "children": ["C"]}, "C": {"job": {"command": ["/usr/bin/touch", ")" +
        badPrefix + R"(C"]}}})";

    auto tasks = daggy::expandTaskSet(daggy::tasksFromJSON(taskJSON), ex);
    auto dag = daggy::buildDAGFromTasks(tasks);
    auto runID = logger.startDAGRun("test_run", tasks);
    auto tryDAG = daggy::runDAG(runID, ex, logger, dag);
    REQUIRE(!tryDAG.allVisited());

    // Create the missing dir, then continue to run the DAG
    fs::create_directory("noexist");
    tryDAG.resetRunning();
    auto endDAG = daggy::runDAG(runID, ex, logger, tryDAG);
    REQUIRE(endDAG.allVisited());

    // Get the DAG Run Attempts
    auto record = logger.getDAGRun(runID);
    REQUIRE(record.taskAttempts["A_0"].size() == 1); // A ran fine
    REQUIRE(record.taskAttempts["B_0"].size() ==
            2); // B errored and had to be retried
    REQUIRE(record.taskAttempts["C_0"].size() ==
            1); // C was held back while B was failing, so it ran only once
    cleanup();
  }

  SECTION("Generator tasks")
  {
    std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"};
    auto params = daggy::configFromJSON(testParams);

    // Task A is flagged as a generator and cats this file, so its output is
    // the JSON definition of task B.
    std::string generatorOutput =
        R"({"B": {"job": {"command": ["/usr/bin/echo", "-e", "{{DATE}}"]}, "children": ["C"]}})";
    fs::path ofn = fs::current_path() / "generator_test_output.json";
    std::ofstream ofh{ofn};
    ofh << generatorOutput << '\n';
    ofh.close(); // close() flushes, so the file is complete before A runs

    std::stringstream jsonTasks;
    jsonTasks
        << R"({ "A": { "job": {"command": [ "/usr/bin/cat", )"
        << std::quoted(ofn.string())
        << R"(]}, "children": ["C"], "isGenerator": true},)"
        << R"("C": { "job": {"command": [ "/usr/bin/echo", "hello!"]} } })";

    auto baseTasks = daggy::tasksFromJSON(jsonTasks.str());
    REQUIRE(baseTasks.size() == 2);
    auto tasks = daggy::expandTaskSet(baseTasks, ex, params);
    REQUIRE(tasks.size() == 2);
    auto dag = daggy::buildDAGFromTasks(tasks);
    REQUIRE(dag.size() == 2);

    auto runID = logger.startDAGRun("generator_run", tasks);
    auto finalDAG = daggy::runDAG(runID, ex, logger, dag, params);

    // The run is over, so the generator input is no longer needed; remove it
    // before asserting so it is not left behind in the working directory.
    fs::remove(ofn);

    REQUIRE(finalDAG.allVisited());
    REQUIRE(finalDAG.size() == 4);

    // Check the logger
    auto record = logger.getDAGRun(runID);
    REQUIRE(record.tasks.size() == 4);
    REQUIRE(record.taskRunStates.size() == 4);
    for (const auto &[_, attempts] : record.taskAttempts) {
      REQUIRE(attempts.size() == 1);
      REQUIRE(attempts.back().rc == 0);
    }

    // Ensure that children were updated properly
    REQUIRE(record.tasks["A_0"].children ==
            std::unordered_set<std::string>{"B_0", "B_1", "C"});
    REQUIRE(record.tasks["B_0"].children ==
            std::unordered_set<std::string>{"C"});
    REQUIRE(record.tasks["B_1"].children ==
            std::unordered_set<std::string>{"C"});
    REQUIRE(record.tasks["C_0"].children.empty());
  }
}