diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index 5d55476..a9c7159 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -17,6 +17,6 @@ namespace daggy { ParameterValues parseParameters(const std::string & jsonSpec); ParameterValues parseParameters(const rj::Document & spec); - // std::vector buildTasks(const std::string & jsonSpec, const ParameterValues & parameters); - // std::vector buildTasks(const rj::Document & spec, const ParameterValues & parameters); -} \ No newline at end of file + std::vector buildTasks(const std::string & jsonSpec, const ParameterValues & parameters); + std::vector buildTasks(const rj::Document & spec, const ParameterValues & parameters); +} diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index c517b5b..99a8737 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -17,6 +17,7 @@ namespace daggy { if (! it->name.IsString()) { throw std::runtime_error("All keys must be strings."); } + std::string name = std::string{"{{"} + it->name.GetString() + "}}"; if (it->value.IsArray()) { std::vector values; for (size_t i = 0; i < it->value.Size(); ++i) { @@ -25,9 +26,9 @@ namespace daggy { } values.emplace_back(it->value[i].GetString()); } - parameters[it->name.GetString()] = values; + parameters[name] = values; } else if (it->value.IsString()) { - parameters[it->name.GetString()] = it->value.GetString(); + parameters[name] = it->value.GetString(); } else { throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " is not a string or an array."); } @@ -35,33 +36,60 @@ namespace daggy { return parameters; } - /* - std::vector buildTasks(const std::string & jsonSpec) { + + std::vector buildTasks(const rj::Document & spec, const ParameterValues & parameters) { + std::vector tasks; + if (!spec.IsArray()) { throw std::runtime_error("Tasks is not an array"); } + + const std::vector reqFields{"name", "command"}; + const std::vector optionalFields{"max_retries", "retry_interval", "children"}; + + // Tasks + for (size_t i = 0; i < spec.Size(); ++i) { + if (! spec[i].IsObject()) { + throw std::runtime_error("Task " + std::to_string(i) + " is not a dictionary."); + } + const auto & taskSpec = spec[i].GetObject(); + + for (const auto reqField : reqFields) { + if (! taskSpec.HasMember(reqField.c_str())) { + throw std::runtime_error("Task " + std::to_string(i) + " is missing required field " + reqField); + } + } + + // Build the first task as-is + Task task; + task.name = taskSpec["name"].GetString(); + task.command = taskSpec["command"].GetString(); + + if (taskSpec.HasMember("max_retries")) { task.max_retries = taskSpec["max_retries"].GetInt(); } + if (taskSpec.HasMember("retry_interval_seconds")) { task.retry_interval_seconds = taskSpec["retry_interval_seconds"].GetInt(); } + if (taskSpec.HasMember("children")) { + const auto & children = taskSpec["children"].GetArray(); + for (size_t c = 0; c < children.Size(); ++c) { + task.children.emplace_back(children[c].GetString()); + } + } + + // We need to iterate over the tasks multiple times to interpolate + std::vector taskGroup; + taskGroup.push_back(task); + + std::string command = taskSpec["command"]; + + Task task; + task.name = taskSpec["name"].GetString(); + task.name = taskSpec["name"].GetString(); + } + return tasks; + } + + std::vector buildTasks(const std::string & jsonSpec, const ParameterValues & parameters) { rj::Document doc; rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str()); if (! parseResult) { throw std::runtime_error("Unable to parse spec: "); } - return buildTasks(doc); + return buildTasks(doc, parameters); } - - std::vector buildTasks(const rj::Document & spec) { - std::vector tasks; - if (!spec.IsObject()) { throw std::runtime_error("Spec is not a JSON dictionary"); } - - // Parameter Parsing - auto parameters = parseParameters(spec); - // Tasks - if (spec.HasMember("tasks")) { - auto & sTasks = spec["tasks"]; - if (! sTasks.IsArray()) { - throw std::runtime_error("tasks member must be an array"); - } - for (size_t i = 0; i < sTasks.Size(); ++i) { - tasks.push_back(parseTask(sTasks[i])); - } - } - return tasks; - } - */ -} \ No newline at end of file +} diff --git a/examples/sample_dag.json b/examples/sample_dag.json index 0fe8c32..b7fb4f9 100644 --- a/examples/sample_dag.json +++ b/examples/sample_dag.json @@ -3,24 +3,26 @@ { "name": "pull_data_a", "max_retries": 3, - "retry_delay_seconds": 600, - "command": "/path/to/pull.sh --date {DATE} --source {SOURCE}_A", - "verification_command": "/path/to/pull_verify.sh --date {DATE} --source {SOURCE}_A", - "timeout_seconds": 30 + "retry_interval_seconds": 600, + "if": "/path/to/should_pull.sh --date {{DATE}} --source {{SOURCE}}_A", + "command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_A", + "verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_A", + "timeout_seconds": 30, "children": [ "merge_data" ] }, { "name": "pull_data_b", "max_retries": 3, - "retry_delay_seconds": 600, - "command": "/path/to/pull.sh --date {DATE} --source {SOURCE}_B", - "verification_command": "/path/to/pull_verify.sh --date {DATE} --source {SOURCE}_B", - "timeout_seconds": 30 + "retry_interval_seconds": 600, + "if": "/path/to/should_pull.sh --date {{DATE}} --source {{SOURCE}}_B", + "command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_B", + "verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_B", + "timeout_seconds": 30, "children": [ "merge_data" ] }, { "name": "merge_data", - "command": "/path/to/merge.sh --left {SOURCE}_A --right {SOURCE}_B" + "command": "/path/to/merge.sh --left {{SOURCE}}_A --right {{SOURCE}}_B" } ] }