Files
daggy/tests/unit_utilities.cpp
Ian Roddis 39d5ae08be Adding a No-op task executor for testing
Fixing DFS implementation of DAG validation to be much faster
Adding in additional tests to ensure the run order of expanded tasks is preserved
Adding additional compile-time checks, resolving issues that came up as a result
2021-09-20 19:05:56 -03:00

236 lines
9.0 KiB
C++

#include <iostream>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <algorithm>
#include <iostream>
#include <random>
#include <catch2/catch.hpp>
#include "daggy/Utilities.hpp"
#include "daggy/Serialization.hpp"
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
#include "daggy/executors/task/NoopTaskExecutor.hpp"
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
namespace fs = std::filesystem;
TEST_CASE("string_utilities", "[utilities_string]") {
std::string test = "/this/is/{{A}}/test/{{A}}";
auto res = daggy::globalSub(test, "{{A}}", "hello");
REQUIRE(res == "/this/is/hello/test/hello");
}
TEST_CASE("string_expansion", "[utilities_parameter_expansion]") {
SECTION("Basic expansion") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"};
auto params = daggy::configFromJSON(testParams);
std::vector<std::string> cmd{"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}", "{{TYPE}}"};
auto allCommands = daggy::interpolateValues(cmd, params);
REQUIRE(allCommands.size() == 6);
}
SECTION("Skip over unused parameters") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"};
auto params = daggy::configFromJSON(testParams);
std::vector<std::string> cmd{"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}"};
auto allCommands = daggy::interpolateValues(cmd, params);
// TYPE isn't used, so it's just |DATE| * |SOURCE|
REQUIRE(allCommands.size() == 2);
}
SECTION("Expand within a command part") {
std::string testParams{
R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": ["A", "B"], "TYPE": ["a", "b", "c"]})"};
auto params = daggy::configFromJSON(testParams);
std::vector<std::string> cmd{"/usr/bin/touch", "{{DATE}}_{{SOURCE}}"};
auto result = daggy::interpolateValues(cmd, params);
// TYPE isn't used, so it's just |DATE| * |SOURCE|
REQUIRE(result.size() == 4);
}
}
TEST_CASE("dag_runner_order", "[dagrun_order]") {
daggy::executors::task::NoopTaskExecutor ex;
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
daggy::TimePoint startTime = daggy::Clock::now();
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07", "2021-05-08", "2021-05-09" ]})"};
auto params = daggy::configFromJSON(testParams);
std::string taskJSON = R"({
"A": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "B","D" ]},
"B": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "C","D","E" ]},
"C": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "D"]},
"D": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "E"]},
"E": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}}
})";
auto tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex, params);
REQUIRE(tasks.size() == 20);
auto dag = daggy::buildDAGFromTasks(tasks);
auto runID = logger.startDAGRun("test_run", tasks);
auto endDAG = daggy::runDAG(runID, ex, logger, dag);
REQUIRE(endDAG.allVisited());
// Ensure the run order
auto rec = logger.getDAGRun(runID);
daggy::TimePoint stopTime = daggy::Clock::now();
std::array<daggy::TimePoint, 5> minTimes; minTimes.fill(startTime);
std::array<daggy::TimePoint, 5> maxTimes; maxTimes.fill(stopTime);
for (const auto &[k, v] : rec.taskAttempts) {
size_t idx = k[0] - 65;
auto & startTime = minTimes[idx];
auto & stopTime = maxTimes[idx];
startTime = std::max(startTime, v.front().startTime);
stopTime = std::min(stopTime, v.back().stopTime);
}
for (size_t i = 0; i < 5; ++i) {
for (size_t j = i+1; j < 4; ++j) {
REQUIRE(maxTimes[i] < minTimes[j]);
}
}
}
TEST_CASE("dag_runner", "[utilities_dag_runner]") {
daggy::executors::task::ForkingTaskExecutor ex(10);
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
SECTION("Simple execution") {
std::string prefix = (fs::current_path() / "asdlk").string();
std::unordered_map<std::string, std::string> files{
{"A", prefix + "_A"},
{"B", prefix + "_B"},
{"C", prefix + "_C"}};
std::string taskJSON = R"({"A": {"job": {"command": ["/usr/bin/touch", ")"
+ files.at("A") + R"("]}, "children": ["C"]}, "B": {"job": {"command": ["/usr/bin/touch", ")"
+ files.at("B") + R"("]}, "children": ["C"]}, "C": {"job": {"command": ["/usr/bin/touch", ")"
+ files.at("C") + R"("]}}})";
auto tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex);
auto dag = daggy::buildDAGFromTasks(tasks);
auto runID = logger.startDAGRun("test_run", tasks);
auto endDAG = daggy::runDAG(runID, ex, logger, dag);
REQUIRE(endDAG.allVisited());
for (const auto &[_, file] : files) {
REQUIRE(fs::exists(file));
fs::remove(file);
}
// Get the DAG Run Attempts
auto record = logger.getDAGRun(runID);
for (const auto &[_, attempts]: record.taskAttempts) {
REQUIRE(attempts.size() == 1);
REQUIRE(attempts.front().rc == 0);
}
}
SECTION("Recovery from Error") {
auto cleanup = []() {
// Cleanup
std::vector<fs::path> paths{"rec_error_A", "noexist"};
for (const auto &pth: paths) {
if (fs::exists(pth)) fs::remove_all(pth);
}
};
cleanup();
std::string goodPrefix = "rec_error_";
std::string badPrefix = "noexist/rec_error_";
std::string taskJSON = R"({"A": {"job": {"command": ["/usr/bin/touch", ")"
+ goodPrefix +
R"(A"]}, "children": ["C"]}, "B": {"job": {"command": ["/usr/bin/touch", ")"
+ badPrefix +
R"(B"]}, "children": ["C"]}, "C": {"job": {"command": ["/usr/bin/touch", ")"
+ badPrefix + R"(C"]}}})";
auto tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex);
auto dag = daggy::buildDAGFromTasks(tasks);
auto runID = logger.startDAGRun("test_run", tasks);
auto tryDAG = daggy::runDAG(runID, ex, logger, dag);
REQUIRE(!tryDAG.allVisited());
// Create the missing dir, then continue to run the DAG
fs::create_directory("noexist");
tryDAG.resetRunning();
auto endDAG = daggy::runDAG(runID, ex, logger, tryDAG);
REQUIRE(endDAG.allVisited());
// Get the DAG Run Attempts
auto record = logger.getDAGRun(runID);
REQUIRE(record.taskAttempts["A_0"].size() == 1); // A ran fine
REQUIRE(record.taskAttempts["B_0"].size() == 2); // B errored and had to be retried
REQUIRE(record.taskAttempts["C_0"].size() == 1); // C wasn't run because B errored
cleanup();
}
SECTION("Generator tasks") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"};
auto params = daggy::configFromJSON(testParams);
std::string generatorOutput = R"({"B": {"job": {"command": ["/usr/bin/echo", "-e", "{{DATE}}"]}, "children": ["C"]}})";
fs::path ofn = fs::current_path() / "generator_test_output.json";
std::ofstream ofh{ofn};
ofh << generatorOutput << std::endl;
ofh.close();
std::stringstream jsonTasks;
jsonTasks << R"({ "A": { "job": {"command": [ "/usr/bin/cat", )" << std::quoted(ofn.string())
<< R"(]}, "children": ["C"], "isGenerator": true},)"
<< R"("C": { "job": {"command": [ "/usr/bin/echo", "hello!"]} } })";
auto baseTasks = daggy::tasksFromJSON(jsonTasks.str());
REQUIRE(baseTasks.size() == 2);
auto tasks = daggy::expandTaskSet(baseTasks, ex, params);
REQUIRE(tasks.size() == 2);
auto dag = daggy::buildDAGFromTasks(tasks);
REQUIRE(dag.size() == 2);
auto runID = logger.startDAGRun("generator_run", tasks);
auto finalDAG = daggy::runDAG(runID, ex, logger, dag, params);
REQUIRE(finalDAG.allVisited());
REQUIRE(finalDAG.size() == 4);
// Check the logger
auto record = logger.getDAGRun(runID);
REQUIRE(record.tasks.size() == 4);
REQUIRE(record.taskRunStates.size() == 4);
for (const auto &[taskName, attempts]: record.taskAttempts) {
REQUIRE(attempts.size() == 1);
REQUIRE(attempts.back().rc == 0);
}
// Ensure that children were updated properly
REQUIRE(record.tasks["A_0"].children == std::unordered_set<std::string>{"B_0", "B_1", "C"});
REQUIRE(record.tasks["B_0"].children == std::unordered_set<std::string>{"C"});
REQUIRE(record.tasks["B_1"].children == std::unordered_set<std::string>{"C"});
REQUIRE(record.tasks["C_0"].children.empty());
}
}