Completing task building.
This commit is contained in:
@@ -8,8 +8,8 @@ namespace daggy {
|
||||
/// Description of a single task: what to run, how retries are configured,
/// and which tasks are listed as its children.
///
/// All integer members carry default initializers so that a default-constructed
/// Task (`Task t;`) never exposes indeterminate retry settings.
struct Task {
    /// Name identifying the task.
    std::string name;

    /// The command, stored as a list of strings.
    std::vector<std::string> command;

    // NOTE(review): both a snake_case and a camelCase spelling of the retry
    // settings are present. Presumably the snake_case pair is the pre-rename
    // spelling -- confirm no remaining users and drop it.
    uint8_t max_retries = 0;
    uint32_t retry_interval_seconds = 0; // Time to wait between retries

    /// Retry limit for the task.
    uint8_t maxRetries = 0;
    uint32_t retryIntervalSeconds = 0; // Time to wait between retries

    /// Names of the tasks listed as children of this one.
    std::vector<std::string> children;
};
|
||||
}
|
||||
|
||||
@@ -14,9 +14,12 @@ namespace rj = rapidjson;
|
||||
namespace daggy {
|
||||
using ParameterValue = std::variant<std::string, std::vector<std::string>>;
|
||||
using ParameterValues = std::unordered_map<std::string, ParameterValue>;
|
||||
using Command = std::vector<std::string>;
|
||||
|
||||
ParameterValues parseParameters(const std::string & jsonSpec);
|
||||
ParameterValues parseParameters(const rj::Document & spec);
|
||||
std::vector<Task> buildTasks(const std::string & jsonSpec, const ParameterValues & parameters);
|
||||
std::vector<Task> buildTasks(const rj::Document & spec, const ParameterValues & parameters);
|
||||
std::vector<Task> buildTasks(const std::string & jsonSpec, const ParameterValues & parameters = {});
|
||||
std::vector<Task> buildTasks(const rj::Document & spec, const ParameterValues & parameters = {});
|
||||
|
||||
std::vector<Command> expandCommands(const std::vector<std::string> & command, const ParameterValues & parameters);
|
||||
}
|
||||
|
||||
@@ -98,7 +98,7 @@ namespace daggy {
|
||||
Scheduler::runTask(const Task &task) {
|
||||
std::vector<AttemptRecord> attempts;
|
||||
|
||||
while (attempts.size() < task.max_retries) {
|
||||
while (attempts.size() < task.maxRetries) {
|
||||
attempts.push_back(executor_.runCommand(task.command));
|
||||
if (attempts.back().rc == 0) break;
|
||||
}
|
||||
|
||||
@@ -36,13 +36,41 @@ namespace daggy {
|
||||
return parameters;
|
||||
}
|
||||
|
||||
std::vector<std::vector<std::string>>
|
||||
expandCommands(const std::vector<std::string> & command, const ParameterValues & parameters) {
|
||||
std::vector<std::vector<std::string>> commands{ {} };
|
||||
|
||||
for (const auto & part : command) {
|
||||
// this isn't an interpolated value
|
||||
if (parameters.find(part) == parameters.end()) {
|
||||
for (auto &cmd : commands) cmd.push_back(part);
|
||||
continue;
|
||||
}
|
||||
auto & inVal = parameters.at(part);
|
||||
if (std::holds_alternative<std::string>(inVal)) {
|
||||
for (auto &cmd : commands) cmd.push_back(std::get<std::string>(inVal));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Ends up being expensive, as it's a cartesian product
|
||||
std::vector<std::vector<std::string>> newCommands;
|
||||
for (const auto & val : std::get<std::vector<std::string>>(inVal)) {
|
||||
for (auto cmd : commands) {
|
||||
cmd.push_back(val);
|
||||
newCommands.push_back(cmd);
|
||||
}
|
||||
}
|
||||
commands.swap(newCommands);
|
||||
}
|
||||
return commands;
|
||||
}
|
||||
|
||||
std::vector<Task> buildTasks(const rj::Document & spec, const ParameterValues & parameters) {
|
||||
std::vector<Task> tasks;
|
||||
if (!spec.IsArray()) { throw std::runtime_error("Tasks is not an array"); }
|
||||
|
||||
const std::vector<std::string> reqFields{"name", "command"};
|
||||
const std::vector<std::string> optionalFields{"max_retries", "retry_interval", "children"};
|
||||
std::unordered_map<std::string, std::vector<std::string>> childrenMap;
|
||||
|
||||
// Tasks
|
||||
for (size_t i = 0; i < spec.Size(); ++i) {
|
||||
@@ -57,30 +85,53 @@ namespace daggy {
|
||||
}
|
||||
}
|
||||
|
||||
// Build the first task as-is
|
||||
Task task;
|
||||
task.name = taskSpec["name"].GetString();
|
||||
task.command = taskSpec["command"].GetString();
|
||||
|
||||
if (taskSpec.HasMember("max_retries")) { task.max_retries = taskSpec["max_retries"].GetInt(); }
|
||||
if (taskSpec.HasMember("retry_interval_seconds")) { task.retry_interval_seconds = taskSpec["retry_interval_seconds"].GetInt(); }
|
||||
// Grab the standard fields with defaults;
|
||||
std::string name = taskSpec["name"].GetString();
|
||||
uint8_t maxRetries = 0;
|
||||
if (taskSpec.HasMember("maxRetries")) { maxRetries = taskSpec["maxRetries"].GetInt(); }
|
||||
uint8_t retryIntervalSeconds = 0;
|
||||
if (taskSpec.HasMember("retryIntervalSeconds")) { retryIntervalSeconds = taskSpec["retryIntervalSeconds"].GetInt(); }
|
||||
std::vector<std::string> children;
|
||||
if (taskSpec.HasMember("children")) {
|
||||
const auto & children = taskSpec["children"].GetArray();
|
||||
for (size_t c = 0; c < children.Size(); ++c) {
|
||||
task.children.emplace_back(children[c].GetString());
|
||||
const auto & specChildren = taskSpec["children"].GetArray();
|
||||
for (size_t c = 0; c < specChildren.Size(); ++c) {
|
||||
children.emplace_back(specChildren[c].GetString());
|
||||
}
|
||||
}
|
||||
|
||||
// We need to iterate over the tasks multiple times to interpolate
|
||||
std::vector<Task> taskGroup;
|
||||
taskGroup.push_back(task);
|
||||
// Build out the commands
|
||||
std::vector<std::string> command;
|
||||
for (size_t cmd = 0; cmd < taskSpec["command"].Size(); ++cmd) {
|
||||
command.push_back(taskSpec["command"][cmd].GetString());
|
||||
}
|
||||
auto commands = expandCommands(command, parameters);
|
||||
|
||||
std::string command = taskSpec["command"];
|
||||
|
||||
Task task;
|
||||
task.name = taskSpec["name"].GetString();
|
||||
task.name = taskSpec["name"].GetString();
|
||||
// Create the tasks
|
||||
auto & taskNames = childrenMap[name];
|
||||
size_t tid = 0;
|
||||
for (size_t tid = 0; tid < commands.size(); ++tid) {
|
||||
std::string taskName = name + "_" + std::to_string(tid);
|
||||
taskNames.push_back(taskName);
|
||||
tasks.emplace_back(Task {
|
||||
.name = name + "_" + std::to_string(tid),
|
||||
.command = commands[tid],
|
||||
.maxRetries = maxRetries,
|
||||
.retryIntervalSeconds = retryIntervalSeconds,
|
||||
.children = children
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// At the end, replace the names of the children with all the expanded versions
|
||||
for (auto & task : tasks) {
|
||||
std::vector<std::string> children;
|
||||
for (const auto & child : task.children) {
|
||||
auto & newChildren = childrenMap[child];
|
||||
std::copy(newChildren.begin(), newChildren.end(), std::back_inserter(children));
|
||||
}
|
||||
task.children.swap(children);
|
||||
}
|
||||
|
||||
return tasks;
|
||||
}
|
||||
|
||||
|
||||
@@ -13,8 +13,8 @@ TEST_CASE("Parameter Parsing", "[utilities_parse_parameters]") {
|
||||
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
|
||||
auto params = daggy::parseParameters(testParams);
|
||||
REQUIRE(params.size() == 2);
|
||||
REQUIRE(std::holds_alternative<std::vector<std::string>>(params["DATE"]));
|
||||
REQUIRE(std::holds_alternative<std::string>(params["SOURCE"]));
|
||||
REQUIRE(std::holds_alternative<std::vector<std::string>>(params["{{DATE}}"]));
|
||||
REQUIRE(std::holds_alternative<std::string>(params["{{SOURCE}}"]));
|
||||
}
|
||||
SECTION("Invalid JSON") {
|
||||
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name")"};
|
||||
@@ -28,4 +28,52 @@ TEST_CASE("Parameter Parsing", "[utilities_parse_parameters]") {
|
||||
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": {"name": "kevin"}})"};
|
||||
REQUIRE_THROWS(daggy::parseParameters(testParams));
|
||||
}
|
||||
}
|
||||
|
||||
// Checks how many commands daggy::expandCommands produces for a command
// containing list-valued and single-valued parameters.
TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
    SECTION("Basic Parse") {
        const std::string paramJson{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"};
        const auto parsed = daggy::parseParameters(paramJson);

        const std::vector<std::string> commandTemplate{"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}", "{{TYPE}}"};
        const auto expanded = daggy::expandCommands(commandTemplate, parsed);

        // Two DATE values times three TYPE values; SOURCE is a single value.
        REQUIRE(expanded.size() == 6);
    }

    SECTION("Skip over unused parameters") {
        const std::string paramJson{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"};
        const auto parsed = daggy::parseParameters(paramJson);

        const std::vector<std::string> commandTemplate{"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}"};
        const auto expanded = daggy::expandCommands(commandTemplate, parsed);

        // TYPE never appears in the command, so only |DATE| * |SOURCE| remain.
        REQUIRE(expanded.size() == 2);
    }
}
|
||||
|
||||
// Checks the number of tasks daggy::buildTasks produces, with and without
// parameters supplied.
TEST_CASE("Building Tasks", "[utilities_build_tasks]") {
    SECTION("Build with no expansion") {
        const std::string taskJson{R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["C"]}, {"name": "B", "command": ["/bin/echo", "B"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])"};

        const auto built = daggy::buildTasks(taskJson);

        REQUIRE(built.size() == 3);
    }

    SECTION("Build with expansion") {
        const std::string paramJson{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
        const auto parsed = daggy::parseParameters(paramJson);

        const std::string taskJson{R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["B"]}, {"name": "B", "command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])"};
        const auto built = daggy::buildTasks(taskJson, parsed);

        // Only B references a list-valued parameter (DATE, two values), so the
        // expected total is A + 2 x B + C.
        REQUIRE(built.size() == 4);
    }
}
|
||||
Reference in New Issue
Block a user