Checkpointing progress on task builder.

This commit is contained in:
Ian Roddis
2021-08-02 11:59:43 -03:00
parent 54e8170c68
commit a5b4e6ce42
3 changed files with 68 additions and 38 deletions

View File

@@ -17,6 +17,6 @@ namespace daggy {
ParameterValues parseParameters(const std::string & jsonSpec);
ParameterValues parseParameters(const rj::Document & spec);
// std::vector<Task> buildTasks(const std::string & jsonSpec, const ParameterValues & parameters);
// std::vector<Task> buildTasks(const rj::Document & spec, const ParameterValues & parameters);
}
std::vector<Task> buildTasks(const std::string & jsonSpec, const ParameterValues & parameters);
std::vector<Task> buildTasks(const rj::Document & spec, const ParameterValues & parameters);
}

View File

@@ -17,6 +17,7 @@ namespace daggy {
if (! it->name.IsString()) {
throw std::runtime_error("All keys must be strings.");
}
std::string name = std::string{"{{"} + it->name.GetString() + "}}";
if (it->value.IsArray()) {
std::vector<std::string> values;
for (size_t i = 0; i < it->value.Size(); ++i) {
@@ -25,9 +26,9 @@ namespace daggy {
}
values.emplace_back(it->value[i].GetString());
}
parameters[it->name.GetString()] = values;
parameters[name] = values;
} else if (it->value.IsString()) {
parameters[it->name.GetString()] = it->value.GetString();
parameters[name] = it->value.GetString();
} else {
throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " is not a string or an array.");
}
@@ -35,33 +36,60 @@ namespace daggy {
return parameters;
}
/*
std::vector<Task> buildTasks(const std::string & jsonSpec) {
std::vector<Task> buildTasks(const rj::Document & spec, const ParameterValues & parameters) {
std::vector<Task> tasks;
if (!spec.IsArray()) { throw std::runtime_error("Tasks is not an array"); }
const std::vector<std::string> reqFields{"name", "command"};
const std::vector<std::string> optionalFields{"max_retries", "retry_interval", "children"};
// Tasks
for (size_t i = 0; i < spec.Size(); ++i) {
if (! spec[i].IsObject()) {
throw std::runtime_error("Task " + std::to_string(i) + " is not a dictionary.");
}
const auto & taskSpec = spec[i].GetObject();
for (const auto reqField : reqFields) {
if (! taskSpec.HasMember(reqField.c_str())) {
throw std::runtime_error("Task " + std::to_string(i) + " is missing required field " + reqField);
}
}
// Build the first task as-is
Task task;
task.name = taskSpec["name"].GetString();
task.command = taskSpec["command"].GetString();
if (taskSpec.HasMember("max_retries")) { task.max_retries = taskSpec["max_retries"].GetInt(); }
if (taskSpec.HasMember("retry_interval_seconds")) { task.retry_interval_seconds = taskSpec["retry_interval_seconds"].GetInt(); }
if (taskSpec.HasMember("children")) {
const auto & children = taskSpec["children"].GetArray();
for (size_t c = 0; c < children.Size(); ++c) {
task.children.emplace_back(children[c].GetString());
}
}
// We need to iterate over the tasks multiple times to interpolate
std::vector<Task> taskGroup;
taskGroup.push_back(task);
std::string command = taskSpec["command"];
Task task;
task.name = taskSpec["name"].GetString();
task.name = taskSpec["name"].GetString();
}
return tasks;
}
std::vector<Task> buildTasks(const std::string & jsonSpec, const ParameterValues & parameters) {
rj::Document doc;
rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
if (! parseResult) {
throw std::runtime_error("Unable to parse spec: ");
}
return buildTasks(doc);
return buildTasks(doc, parameters);
}
std::vector<Task> buildTasks(const rj::Document & spec) {
std::vector<Task> tasks;
if (!spec.IsObject()) { throw std::runtime_error("Spec is not a JSON dictionary"); }
// Parameter Parsing
auto parameters = parseParameters(spec);
// Tasks
if (spec.HasMember("tasks")) {
auto & sTasks = spec["tasks"];
if (! sTasks.IsArray()) {
throw std::runtime_error("tasks member must be an array");
}
for (size_t i = 0; i < sTasks.Size(); ++i) {
tasks.push_back(parseTask(sTasks[i]));
}
}
return tasks;
}
*/
}
}

View File

@@ -3,24 +3,26 @@
{
"name": "pull_data_a",
"max_retries": 3,
"retry_delay_seconds": 600,
"command": "/path/to/pull.sh --date {DATE} --source {SOURCE}_A",
"verification_command": "/path/to/pull_verify.sh --date {DATE} --source {SOURCE}_A",
"timeout_seconds": 30
"retry_interval_seconds": 600,
"if": "/path/to/should_pull.sh --date {{DATE}} --source {{SOURCE}}_A",
"command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_A",
"verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_A",
"timeout_seconds": 30,
"children": [ "merge_data" ]
},
{
"name": "pull_data_b",
"max_retries": 3,
"retry_delay_seconds": 600,
"command": "/path/to/pull.sh --date {DATE} --source {SOURCE}_B",
"verification_command": "/path/to/pull_verify.sh --date {DATE} --source {SOURCE}_B",
"timeout_seconds": 30
"retry_interval_seconds": 600,
"if": "/path/to/should_pull.sh --date {{DATE}} --source {{SOURCE}}_B",
"command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_B",
"verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_B",
"timeout_seconds": 30,
"children": [ "merge_data" ]
},
{
"name": "merge_data",
"command": "/path/to/merge.sh --left {SOURCE}_A --right {SOURCE}_B"
"command": "/path/to/merge.sh --left {{SOURCE}}_A --right {{SOURCE}}_B"
}
]
}