Massive re-org to allow per-executor job specification formats and executor-specific task validation and expansion.

A few different renames to try and keep things more consistent.
Ian Roddis
2021-09-03 09:10:38 -03:00
parent e746f8c163
commit d15580f47f
22 changed files with 509 additions and 300 deletions

166
README.md
View File

@@ -58,7 +58,7 @@ DAG Run Definition
daggy works as a standalone library, but generally runs as a service with a REST interface. This documentation is
specifically for submitting DAGs to the REST server for execution (a DAG run).
DAGs are defined in JSON as a set of `tasks`, along with optional `taskParameters` and `executionParameters` (future).
DAGs are defined in JSON as a set of `tasks`, along with optional `parameters`, `jobDefaults`, and `executionParameters` (future).
Basic Definition
--
@@ -84,18 +84,22 @@ Below is an example DAG Run submission:
{
"tasks": {
"task_one": {
"command": [
"/usr/bin/touch",
"/tmp/somefile"
],
"job": {
"command": [
"/usr/bin/touch",
"/tmp/somefile"
]
},
"maxRetries": 3,
"retryIntervalSeconds": 30
},
"task_two": {
"command": [
"/usr/bin/touch",
"/tmp/someotherfile"
],
"job": {
"command": [
"/usr/bin/touch",
"/tmp/someotherfile"
]
},
"maxRetries": 3,
"retryIntervalSeconds": 30,
"parents": [
@@ -109,24 +113,25 @@ Below is an example DAG Run submission:
Task Parameters
--
Task commands can be parameterized by passing in an optional `taskParameters` member. Each parameter consists of a name
and either a string value, or an array of string values. Task commands will be regenerated based on the values of the
parameters.
Task commands can be parameterized by passing in an optional `parameters` member. Each parameter consists of a name and
either a string value, or an array of string values. Tasks will be regenerated based on the values of the parameters.
For instance:
```json
{
"taskParameters": {
"parameters": {
"DIRECTORY": "/var/tmp",
"FILE": "somefile"
},
"tasks": {
"task_one": {
"command": [
"/usr/bin/touch",
"{{DIRECTORY}}/{{FILE}}"
],
"job": {
"command": [
"/usr/bin/touch",
"{{DIRECTORY}}/{{FILE}}"
]
},
"maxRetries": 3,
"retryIntervalSeconds": 30
}
@@ -135,7 +140,7 @@ For instance:
```
`task_one`'s command, when run, will touch `/var/tmp/somefile`, since the values of `DIRECTORY` and `FILE` will be
populated from the `taskParameters` values.
populated from the `parameters` values.
In the case where a parameter has an array of values, any tasks referencing that value will be duplicated with the
cartesian product of all relevant values.
@@ -144,7 +149,7 @@ Example:
```json
{
"taskParameters": {
"job": {
"DIRECTORY": "/var/tmp",
"FILE": "somefile",
"DATE": [
@@ -155,22 +160,28 @@ Example:
},
"tasks": {
"populate_inputs": {
"command": [
"/usr/bin/touch",
"{{DIRECTORY}}/{{FILE}}"
]
"job": {
"command": [
"/usr/bin/touch",
"{{DIRECTORY}}/{{FILE}}"
]
}
},
"calc_date": {
"command": [
"/path/to/calculator",
"{{DIRECTORY}}/{{FILE}}",
"{{DATE}}"
]
"job": {
"command": [
"/path/to/calculator",
"{{DIRECTORY}}/{{FILE}}",
"{{DATE}}"
]
}
},
"generate_report": {
"command": [
"/path/to/generator"
]
"job": {
"command": [
"/path/to/generator"
]
}
}
}
}
@@ -200,37 +211,48 @@ graph LR
- `calc_date_2` will have the command `/path/to/calculator /var/tmp/somefile 2021-02-01`
- `calc_date_3` will have the command `/path/to/calculator /var/tmp/somefile 2021-03-01`
**NB**: When a task template resolves to multiple tasks instances, all of those new instances are still referred to by
the original name for the purposes of creating dependencies. For example, to add a dependency dynamically (see next
section), you must refer to `"children": [ "calc_date" ]`, not to the individual `calc_date_1`.
Tasks Generating Tasks
----------------------
Some DAG structures cannot be known ahead of time, but only at runtime. For instance, if a job pulls multiple files
from a source, each of which can be processed independently, it would be nice if the DAG could modify itself on the fly
to accomodate that request.
Some DAG structures can only be fully known at runtime. For instance, if a job pulls multiple files from a source, each
of which can be processed independently, it would be nice if the DAG could modify itself on the fly to accommodate that
request.
Enter the `generator` task. If a task is defined with `"isGenerator": true`, the output of the task is assumed to be
a JSON dictionary containing new tasks to run. The new tasks will go through parameter expansion as described above,
and can freely define their dependencies the same way.
Enter the `generator` task. If a task is defined with `"isGenerator": true`, the output of the task is assumed to be a
JSON dictionary containing new tasks to run. The new tasks will go through parameter expansion as described above, using
the same parameter list as the original DAG. New tasks can define their own dependencies.
**NB:** Generated tasks won't have any child dependencies unless you define them. If a dependency refers to a
parameterized task, you must use the original task's name (e.g. use `calc_date`, not `calc_date_1`).
**NB:** If you add a child dependency to a task that has already completed, weird things will happen. Don't do it.
**NB:** If you add a child dependency to a task that has already completed, that task won't restart. Best practice is to
make the task that the new tasks will depend on a child of the generator task, so it can't run before the generated
tasks have been added.
```json
{
"tasks": {
"pull_files": {
"command": [
"/path/to/puller/script",
"{{DATE}}"
],
"job": {
"command": [
"/path/to/puller/script",
"{{DATE}}"
]
},
"isGenerator": true,
children: [ "generate_report" ]
"children": [
"generate_report"
]
},
"generate_report": {
"command": [
"/path/to/generator"
]
"job": {
"command": [
"/path/to/generator"
]
}
}
}
}
@@ -245,20 +267,29 @@ The output of the puller task might be:
```json
{
"calc_date_a": {
"command": [
"/path/to/calculator",
"/path/to/data/file/a"
],
"children": ["generate_report"]
"calc_date_a": {
"job": {
"command": [
"/path/to/calculator",
"/path/to/data/file/a"
]
},
"calc_date_b": {
"command": [
"/path/to/calculator",
"/path/to/data/file/b"
],
"children": ["generate_report"]
}
"children": [
"generate_report"
]
},
"calc_date_b": {
"job": {
"command": [
"/path/to/calculator",
"/path/to/data/file/b"
]
},
"children": [
"generate_report"
]
}
}
```
@@ -272,8 +303,9 @@ graph LR
calc_file_a-->generate_report
calc_file_b-->generate_report
```
Note that it was important that `generate_report` depend on `pull_files`, otherwise the two task would
run concurrently, and the `generate_report` wouldn't have any files to report on.
Note that it was important that `generate_report` depend on `pull_files`; otherwise the two tasks would run concurrently,
and `generate_report` wouldn't have any files to report on.
Execution Parameters
--
@@ -288,3 +320,15 @@ jobs on slurm with a specific set of restrictions, or allow for local execution
|-----------|-------------|
| pool | Names the executor the DAG should run on |
| poolParameters | Any parameters the executor accepts that might modify how a task is run |
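As a sketch of how this could look once implemented (the `slurm` pool name and the `partition` key are purely
illustrative, not supported values):

```json
{
  "executionParameters": {
    "pool": "slurm",
    "poolParameters": {
      "partition": "batch"
    }
  },
  "tasks": {
    "task_one": {
      "job": {
        "command": [ "/usr/bin/touch", "/tmp/somefile" ]
      }
    }
  }
}
```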
Executors
=========
Different executors require different structures for the `job` task member.
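For example, the `ForkingTaskExecutor` only requires that `job` contain a `command` array of strings; any other keys
are carried through expansion untouched:

```json
{
  "job": {
    "command": [ "/usr/bin/touch", "/tmp/somefile" ]
  }
}
```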
Default Job Values
------------------
A DAG can be submitted with the extra section `jobDefaults`. These values will be used to fill in default values for all
tasks if they aren't overridden. This can be useful for cases like Slurm execution, where tasks will share default
memory and runtime requirements.
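A minimal illustration (the `memory` and `runtime` keys here are placeholders for whatever a given executor accepts;
the `ForkingTaskExecutor` ignores keys other than `command`):

```json
{
  "jobDefaults": {
    "memory": "300M",
    "runtime": "60"
  },
  "tasks": {
    "task_one": {
      "job": {
        "command": [ "/usr/bin/touch", "/tmp/somefile" ],
        "memory": "1G"
      }
    }
  }
}
```

Here `task_one` ends up with `memory` set to `1G` (its own value wins) and `runtime` set to `60` (filled in from the
defaults).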

51
TODO.md
View File

@@ -1,27 +1,30 @@
Tasks
==
- Open
- REST Server
- [ ] Add in authorization scheme (maybe PAM auth endpoint with JWT?)
- Core Functionality
- Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma
separated list
- Executors
- [ ] Slurm Executor
- Loggers
- [ ] FileSystemLogger
- [ ] Add unit tests
- [ ] Add more error checking
- [ ] General logger
- [ ] Redis DAGRunLogger
- Server
- [ ] Multiple execution pools
- [ ] per-Executor parameters
- Utilities
- daggyd
- [ ] Add config file support
- [ ] Support for all the different executors / state loggers
- daggyc
- [ ] Submission
- [ ] Querying
- REST Server
- [ ] Add in authorization scheme (maybe PAM auth endpoint with JWT?)
- Core Functionality
- rename TaskList to TaskSet
- Add support for Task.runSpec
- Executors can validate runspecs
- Executors can expand runspecs
- Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma
separated list
- Executors
- [ ] Slurm Executor
- Loggers
- [ ] FileSystemLogger
- [ ] Add unit tests
- [ ] Add more error checking
- [ ] General logger
- [ ] Redis DAGRunLogger
- Server
- [ ] Multiple execution pools
- [ ] per-Executor parameters
- Utilities
- daggyd
- [ ] Add config file support
- [ ] Support for all the different executors / state loggers
- daggyc
- [ ] Submission
- [ ] Querying

View File

@@ -11,8 +11,8 @@
namespace daggy {
// Commands and parameters
using ParameterValue = std::variant<std::string, std::vector<std::string>>;
using ParameterValues = std::unordered_map<std::string, ParameterValue>;
using ConfigValue = std::variant<std::string, std::vector<std::string>>;
using ConfigValues = std::unordered_map<std::string, ConfigValue>;
using Command = std::vector<std::string>;
// Time
@@ -21,10 +21,9 @@ namespace daggy {
// DAG Runs
using DAGRunID = size_t;
using TaskID = size_t;
BETTER_ENUM(RunState, uint32_t,
QUEUED = 1,
QUEUED = 1 << 0,
RUNNING = 1 << 1,
RETRY = 1 << 2,
ERRORED = 1 << 3,
@@ -34,25 +33,25 @@ namespace daggy {
struct Task {
std::string definedName;
std::vector<std::string> command;
bool isGenerator; // True if the output of this task is a JSON set of tasks to complete
uint32_t maxRetries;
uint32_t retryIntervalSeconds; // Time to wait between retries
ConfigValues job; // It's up to the individual executors to convert values from strings or arrays of strings
std::unordered_set<std::string> children;
std::unordered_set<std::string> parents;
bool isGenerator; // True if the output of this task is a JSON set of tasks to complete
bool operator==(const Task &other) const {
return (definedName == other.definedName)
and (maxRetries == other.maxRetries)
and (retryIntervalSeconds == other.retryIntervalSeconds)
and (command == other.command)
and (job == other.job)
and (children == other.children)
and (parents == other.parents)
and (isGenerator == other.isGenerator);
}
};
using TaskList = std::unordered_map<std::string, Task>;
using TaskSet = std::unordered_map<std::string, Task>;
struct AttemptRecord {
TimePoint startTime;

View File

@@ -12,22 +12,26 @@
namespace rj = rapidjson;
namespace daggy {
// Parameters
ParameterValues parametersFromJSON(const std::string &jsonSpec);
void checkRJParse(const rj::ParseResult &result, const std::string &prefix = "");
ParameterValues parametersFromJSON(const rj::Value &spec);
// Parameters
ConfigValues configFromJSON(const std::string &jsonSpec);
ConfigValues configFromJSON(const rj::Value &spec);
std::string configToJSON(const ConfigValues &config);
// Tasks
TaskList
taskFromJSON(const std::string &name, const rj::Value &spec, const ParameterValues &parameters = {});
Task
taskFromJSON(const std::string &name, const rj::Value &spec, const ConfigValues &jobDefaults = {});
TaskList tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters = {});
TaskSet tasksFromJSON(const std::string &jsonSpec, const ConfigValues &jobDefaults = {});
TaskList tasksFromJSON(const rj::Value &spec, const ParameterValues &parameters = {});
TaskSet tasksFromJSON(const rj::Value &spec, const ConfigValues &jobDefaults = {});
std::string taskToJSON(const Task &task);
std::string tasksToJSON(const TaskList &tasks);
std::string tasksToJSON(const TaskSet &tasks);
// Attempt Records
std::string attemptRecordToJSON(const AttemptRecord &attemptRecord);

View File

@@ -17,13 +17,19 @@ namespace daggy {
std::string globalSub(std::string string, const std::string &pattern, const std::string &replacement);
std::vector<Command> expandCommands(const std::vector<std::string> &command, const ParameterValues &parameters);
std::vector<Command> interpolateValues(const std::vector<std::string> &raw, const ConfigValues &values);
TaskSet
expandTaskSet(const TaskSet &tasks,
executors::task::TaskExecutor &executor,
const ConfigValues &interpolatedValues = {});
TaskDAG
buildDAGFromTasks(TaskList &tasks,
buildDAGFromTasks(TaskSet &tasks,
const std::vector<loggers::dag_run::TaskUpdateRecord> &updates = {});
void updateDAGFromTasks(TaskDAG &dag, TaskList &tasks);
void updateDAGFromTasks(TaskDAG &dag, TaskSet &tasks);
// Blocking call
std::vector<AttemptRecord>
@@ -37,7 +43,7 @@ namespace daggy {
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger,
TaskDAG dag,
const ParameterValues taskParameters = {});
const ConfigValues job = {});
std::ostream &operator<<(std::ostream &os, const TimePoint &tp);
}
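Putting the new pieces together, the intended flow (as wired up in `Server.cpp` and the unit tests later in this
commit) looks roughly like the sketch below; the `main()` wrapper and the task/parameter JSON strings are placeholders,
and error handling is omitted:

```cpp
#include <iostream>
#include <string>

#include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp>
#include <daggy/executors/task/ForkingTaskExecutor.hpp>
#include <daggy/loggers/dag_run/OStreamLogger.hpp>

int main() {
    daggy::executors::task::ForkingTaskExecutor executor(4);
    daggy::loggers::dag_run::OStreamLogger logger(std::cout);

    // Placeholder inputs; a real submission arrives through the REST server.
    std::string taskJSON = R"({"A": {"job": {"command": ["/bin/echo", "{{DATE}}"]}}})";
    auto parameters = daggy::configFromJSON(R"({"DATE": ["2021-01-01", "2021-02-01"]})");
    auto jobDefaults = daggy::configFromJSON(R"({"memory": "300M"})");

    auto taskTemplates = daggy::tasksFromJSON(taskJSON, jobDefaults);       // parse task templates
    auto tasks = daggy::expandTaskSet(taskTemplates, executor, parameters); // executor-specific expansion
    auto dag = daggy::buildDAGFromTasks(tasks);

    auto runID = logger.startDAGRun("example_run", tasks);
    auto finalDAG = daggy::runDAG(runID, executor, logger, dag, parameters); // run to completion

    return finalDAG.allVisited() ? 0 : 1;
}
```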

View File

@@ -2,18 +2,22 @@
#include "TaskExecutor.hpp"
namespace daggy {
namespace executors {
namespace task {
class ForkingTaskExecutor : public TaskExecutor {
public:
ForkingTaskExecutor(size_t nThreads)
: TaskExecutor(nThreads) {}
namespace daggy::executors::task {
class ForkingTaskExecutor : public TaskExecutor {
public:
using Command = std::vector<std::string>;
const std::string getName() const override { return "ForkingTaskExecutor"; }
ForkingTaskExecutor(size_t nThreads)
: TaskExecutor(nThreads) {}
AttemptRecord runCommand(const Task &task) override;
};
}
}
// Validates the job to ensure that all required values are set and are of the right type.
bool validateTaskParameters(const ConfigValues &job) override;
std::vector<ConfigValues>
expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) override;
// Runs the task
AttemptRecord execute(const Task &task) override;
};
}

View File

@@ -14,20 +14,21 @@
If there are many retries, logs are returned for each attempt.
*/
namespace daggy {
namespace executors {
namespace task {
class TaskExecutor {
public:
TaskExecutor(size_t nThreads) : threadPool(nThreads) {};
namespace daggy::executors::task {
class TaskExecutor {
public:
TaskExecutor(size_t nThreads) : threadPool(nThreads) {};
virtual const std::string getName() const = 0;
// Validates the job to ensure that all required values are set and are of the right type.
virtual bool validateTaskParameters(const ConfigValues &job) = 0;
// This will block if the dag_executor is full
virtual AttemptRecord runCommand(const Task &task) = 0;
// Will use the expansion values to return the fully expanded tasks.
virtual std::vector<ConfigValues>
expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) = 0;
ThreadPool threadPool;
};
}
}
// Blocking execution of a task
virtual AttemptRecord execute(const Task &task) = 0;
ThreadPool threadPool;
};
}
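Any new executor (e.g. the planned Slurm executor) plugs in by implementing this interface. A minimal sketch, assuming
the header lives at `daggy/executors/task/TaskExecutor.hpp`; the `DryRunTaskExecutor` name is invented here for
illustration:

```cpp
#include <stdexcept>
#include <string>
#include <variant>
#include <vector>

#include <daggy/executors/task/TaskExecutor.hpp>

namespace daggy::executors::task {
    class DryRunTaskExecutor : public TaskExecutor {
    public:
        DryRunTaskExecutor(size_t nThreads) : TaskExecutor(nThreads) {}

        const std::string getName() const override { return "DryRunTaskExecutor"; }

        // Mirror the ForkingTaskExecutor's contract: the job must carry a "command" array.
        bool validateTaskParameters(const ConfigValues &job) override {
            auto it = job.find("command");
            if (it == job.end() || !std::holds_alternative<std::vector<std::string>>(it->second))
                throw std::runtime_error(R"(job requires a "command" array of strings)");
            return true;
        }

        // No parameter interpolation: every job expands to exactly itself.
        std::vector<ConfigValues>
        expandTaskParameters(const ConfigValues &job, const ConfigValues &) override {
            return {job};
        }

        // Pretend to run the task and report success.
        AttemptRecord execute(const Task &task) override {
            AttemptRecord rec;
            rec.rc = 0;
            rec.executorLog = "dry run of task " + task.definedName;
            return rec;
        }
    };
}
```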

View File

@@ -17,7 +17,7 @@ namespace daggy {
class DAGRunLogger {
public:
// Execution
virtual DAGRunID startDAGRun(std::string name, const TaskList &tasks) = 0;
virtual DAGRunID startDAGRun(std::string name, const TaskSet &tasks) = 0;
virtual void addTask(DAGRunID dagRunID, const std::string taskName, const Task &task) = 0;

View File

@@ -23,7 +23,7 @@ namespace daggy::loggers::dag_run {
// Pretty heavy weight, but
struct DAGRunRecord {
std::string name;
TaskList tasks;
TaskSet tasks;
std::unordered_map<std::string, RunState> taskRunStates;
std::unordered_map<std::string, std::vector<AttemptRecord>> taskAttempts;
std::vector<TaskUpdateRecord> taskStateChanges;

View File

@@ -37,7 +37,7 @@ namespace daggy::loggers::dag_run {
FileSystemLogger(fs::path root);
// Execution
DAGRunID startDAGRun(std::string name, const TaskList &tasks) override;
DAGRunID startDAGRun(std::string name, const TaskSet &tasks) override;
void updateDAGRunState(DAGRunID dagRunID, RunState state) override;

View File

@@ -18,7 +18,7 @@ namespace daggy {
OStreamLogger(std::ostream &os);
// Execution
DAGRunID startDAGRun(std::string name, const TaskList &tasks) override;
DAGRunID startDAGRun(std::string name, const TaskSet &tasks) override;
void addTask(DAGRunID dagRunID, const std::string taskName, const Task &task) override;

View File

@@ -1,22 +1,30 @@
#include <sstream>
#include <iomanip>
#include <rapidjson/error/en.h>
#include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp>
namespace daggy {
ParameterValues parametersFromJSON(const std::string &jsonSpec) {
rj::Document doc;
rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
if (!parseResult) {
throw std::runtime_error("Parameters spec is not valid JSON");
void checkRJParse(const rj::ParseResult &result, const std::string &prefix) {
if (!result) {
std::stringstream ss;
ss << (prefix.empty() ? "" : prefix + ": ")
<< "Error parsing JSON: " << rj::GetParseError_En(result.Code())
<< " at byte offset " << result.Offset();
throw std::runtime_error(ss.str());
}
return parametersFromJSON(doc);
}
ParameterValues parametersFromJSON(const rj::Value &spec) {
std::unordered_map<std::string, ParameterValue> parameters;
ConfigValues configFromJSON(const std::string &jsonSpec) {
rj::Document doc;
checkRJParse(doc.Parse(jsonSpec.c_str()), "Parsing config");
return configFromJSON(doc);
}
ConfigValues configFromJSON(const rj::Value &spec) {
std::unordered_map<std::string, ConfigValue> parameters;
if (!spec.IsObject()) { throw std::runtime_error("Parameters in spec is not a JSON dictionary"); }
for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) {
if (!it->name.IsString()) {
@@ -44,86 +52,105 @@ namespace daggy {
return parameters;
}
TaskList
taskFromJSON(const std::string &name, const rj::Value &spec, const ParameterValues &parameters) {
TaskList tasks;
std::string configToJSON(const ConfigValues &config) {
std::stringstream ss;
ss << '{';
bool first = true;
for (const auto &[k, v]: config) {
if (first) { first = false; } else { ss << ", "; }
ss << std::quoted(k) << ": ";
if (std::holds_alternative<std::string>(v)) {
ss << std::quoted(std::get<std::string>(v));
} else {
ss << '[';
const auto &vals = std::get<std::vector<std::string>>(v);
bool firstVal = true;
for (const auto &val: vals) {
if (firstVal) { firstVal = false; } else { ss << ", "; }
ss << std::quoted(val);
}
ss << ']';
}
}
ss << '}';
return ss.str();
}
Task
taskFromJSON(const std::string &name, const rj::Value &spec, const ConfigValues &jobDefaults) {
Task task{
.definedName = name,
.isGenerator = false,
.maxRetries = 0,
.retryIntervalSeconds = 0,
.job = jobDefaults
};
if (!spec.IsObject()) { throw std::runtime_error("Tasks is not an object"); }
if (!spec.HasMember("command")) {
throw std::runtime_error("Task " + name + " is missing required 'command' field");
}
// Grab the standard fields with defaults;
bool isGenerator = false;
if (spec.HasMember("isGenerator")) {
isGenerator = spec["isGenerator"].GetBool();
task.isGenerator = spec["isGenerator"].GetBool();
}
uint8_t maxRetries = 0;
if (spec.HasMember("maxRetries")) { maxRetries = spec["maxRetries"].GetInt(); }
uint8_t retryIntervalSeconds = 0;
if (spec.HasMember(
"retryIntervalSeconds")) { retryIntervalSeconds = spec["retryIntervalSeconds"].GetInt(); }
if (spec.HasMember("maxRetries")) {
task.maxRetries = spec["maxRetries"].GetInt();
}
if (spec.HasMember("retryIntervalSeconds")) {
task.retryIntervalSeconds = spec["retryIntervalSeconds"].GetInt();
}
// Children / parents
std::unordered_set<std::string> children;
if (spec.HasMember("children")) {
const auto &specChildren = spec["children"].GetArray();
for (size_t c = 0; c < specChildren.Size(); ++c) {
children.insert(specChildren[c].GetString());
task.children.insert(specChildren[c].GetString());
}
}
std::unordered_set<std::string> parents;
if (spec.HasMember("parents")) {
const auto &specParents = spec["parents"].GetArray();
for (size_t c = 0; c < specParents.Size(); ++c) {
parents.insert(specParents[c].GetString());
task.parents.insert(specParents[c].GetString());
}
}
// Build out the commands
std::vector<std::string> command;
for (size_t cmd = 0; cmd < spec["command"].Size(); ++cmd) {
command.emplace_back(spec["command"][cmd].GetString());
if (spec.HasMember("job")) {
const auto &params = spec["job"];
if (!params.IsObject()) throw std::runtime_error("job is not a dictionary.");
for (auto it = params.MemberBegin(); it != params.MemberEnd(); ++it) {
if (!it->name.IsString()) throw std::runtime_error("job key must be a string.");
if (it->value.IsArray()) {
std::vector<std::string> values;
for (size_t i = 0; i < it->value.Size(); ++i) {
values.emplace_back(it->value[i].GetString());
}
task.job.insert_or_assign(it->name.GetString(), values);
} else {
task.job.insert_or_assign(it->name.GetString(), it->value.GetString());
}
}
}
auto commands = expandCommands(command, parameters);
// Create the tasks
for (size_t tid = 0; tid < commands.size(); ++tid) {
std::string taskName = (commands.size() == 1 ? name : name + "_" + std::to_string(tid));
tasks.emplace(taskName, Task{
.definedName = name,
.command = commands[tid],
.maxRetries = maxRetries,
.retryIntervalSeconds = retryIntervalSeconds,
.children = children,
.parents = parents,
.isGenerator = isGenerator
});
}
return tasks;
return task;
}
TaskList tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters) {
TaskSet tasksFromJSON(const std::string &jsonSpec, const ConfigValues &jobDefaults) {
rj::Document doc;
rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
if (!parseResult) {
throw std::runtime_error("Unable to parse spec: ");
}
return tasksFromJSON(doc, parameters);
checkRJParse(doc.Parse(jsonSpec.c_str()));
return tasksFromJSON(doc, jobDefaults);
}
TaskList tasksFromJSON(const rj::Value &spec, const ParameterValues &parameters) {
TaskList tasks;
TaskSet tasksFromJSON(const rj::Value &spec, const ConfigValues &jobDefaults) {
TaskSet tasks;
if (!spec.IsObject()) { throw std::runtime_error("Tasks is not an object"); }
// Tasks
for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) {
if (!it->name.IsString()) throw std::runtime_error("Task names must be a string.");
if (!it->value.IsObject()) throw std::runtime_error("Task definitions must be an object.");
auto subTasks = taskFromJSON(it->name.GetString(), it->value, parameters);
tasks.merge(subTasks);
const auto &taskName = it->name.GetString();
tasks.emplace(taskName, taskFromJSON(taskName, it->value, jobDefaults));
}
return tasks;
}
@@ -138,15 +165,7 @@ namespace daggy {
<< R"("maxRetries": )" << task.maxRetries << ','
<< R"("retryIntervalSeconds": )" << task.retryIntervalSeconds << ',';
// Commands
ss << R"("command": [)";
first = true;
for (const auto &part: task.command) {
if (!first) ss << ',';
ss << std::quoted(part);
first = false;
}
ss << "],";
ss << R"("job": )" << configToJSON(task.job) << ',';
ss << R"("children": [)";
first = true;
@@ -163,7 +182,7 @@ namespace daggy {
return ss.str();
}
std::string tasksToJSON(const TaskList &tasks) {
std::string tasksToJSON(const TaskSet &tasks) {
std::stringstream ss;
ss << "{";

View File

@@ -85,7 +85,7 @@ namespace daggy {
/*
* {
* "name": "DAG Run Name"
* "taskParameters": {...}
* "job": {...}
* "tasks": {...}
*/
void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
@@ -105,21 +105,33 @@ namespace daggy {
std::string runName = doc["name"].GetString();
// Get parameters if there are any
ParameterValues parameters;
if (doc.HasMember("taskParameters")) {
ConfigValues parameters;
if (doc.HasMember("parameters")) {
try {
auto parsedParams = parametersFromJSON(doc["taskParameters"].GetObject());
auto parsedParams = configFromJSON(doc["parameters"].GetObject());
parameters.swap(parsedParams);
} catch (std::exception &e) {
REQ_ERROR(Bad_Request, e.what());
}
}
// Job Defaults
ConfigValues jobDefaults;
if (doc.HasMember("jobDefaults")) {
try {
auto parsedJobDefaults = configFromJSON(doc["jobDefaults"].GetObject());
jobDefaults.swap(parsedJobDefaults);
} catch (std::exception &e) {
REQ_ERROR(Bad_Request, e.what());
}
}
// Get the tasks
TaskList tasks;
TaskSet tasks;
try {
auto parsedTasks = tasksFromJSON(doc["tasks"], parameters);
tasks.swap(parsedTasks);
auto taskTemplates = tasksFromJSON(doc["tasks"], jobDefaults);
auto expandedTasks = expandTaskSet(taskTemplates, executor_, parameters);
tasks.swap(expandedTasks);
} catch (std::exception &e) {
REQ_ERROR(Bad_Request, e.what());
}

View File

@@ -14,14 +14,14 @@ namespace daggy {
}
std::vector<std::vector<std::string>>
expandCommands(const std::vector<std::string> &command, const ParameterValues &parameters) {
std::vector<std::vector<std::string>> commands{{}};
interpolateValues(const std::vector<std::string> &raw, const ConfigValues &values) {
std::vector<std::vector<std::string>> cooked{{}};
for (const auto &part: command) {
for (const auto &part: raw) {
std::vector<std::string> expandedPart{part};
// Find all values of parameters, and expand them
for (const auto &[paramRaw, paramValue]: parameters) {
for (const auto &[paramRaw, paramValue]: values) {
std::string param = "{{" + paramRaw + "}}";
auto pos = part.find(param);
if (pos == std::string::npos) continue;
@@ -44,17 +44,42 @@ namespace daggy {
std::vector<std::vector<std::string>> newCommands;
for (const auto &newPart: expandedPart) {
for (auto cmd: commands) {
for (auto cmd: cooked) {
cmd.push_back(newPart);
newCommands.emplace_back(cmd);
}
}
commands.swap(newCommands);
cooked.swap(newCommands);
}
return commands;
return cooked;
}
void updateDAGFromTasks(TaskDAG &dag, TaskList &tasks) {
TaskSet
expandTaskSet(const TaskSet &tasks,
executors::task::TaskExecutor &executor,
const ConfigValues &interpolatedValues) {
// Expand the tasks first
TaskSet newTaskSet;
for (const auto &[baseName, task]: tasks) {
executor.validateTaskParameters(task.job);
const auto newJobs = executor.expandTaskParameters(task.job, interpolatedValues);
if (newJobs.size() == 1) {
newTaskSet.emplace(baseName, task);
} else {
size_t i = 0;
for (const auto &newJob: newJobs) {
Task newTask{task};
newTask.job = newJob;
newTaskSet.emplace(baseName + "_" + std::to_string(i), newTask);
++i;
}
}
}
return newTaskSet;
}
void updateDAGFromTasks(TaskDAG &dag, TaskSet &tasks) {
// Add all the vertices
std::unordered_map<std::string, std::unordered_set<std::string>> definedSets;
for (const auto &[name, task]: tasks) {
@@ -79,10 +104,9 @@ namespace daggy {
}
}
TaskDAG buildDAGFromTasks(TaskList &tasks,
TaskDAG buildDAGFromTasks(TaskSet &tasks,
const std::vector<loggers::dag_run::TaskUpdateRecord> &updates) {
TaskDAG dag;
updateDAGFromTasks(dag, tasks);
dag.reset();
@@ -111,7 +135,7 @@ namespace daggy {
logger.updateTaskState(runID, taskName, RunState::RUNNING);
while (attempts.size() < task.maxRetries + 1) {
attempts.push_back(executor.runCommand(task));
attempts.push_back(executor.execute(task));
logger.logTaskAttempt(runID, taskName, attempts.back());
if (attempts.back().rc == 0) break;
logger.updateTaskState(runID, taskName, RunState::RETRY);
@@ -123,7 +147,7 @@ namespace daggy {
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger,
TaskDAG dag,
const ParameterValues taskParameters
const ConfigValues parameters
) {
logger.updateDAGRunState(runID, RunState::RUNNING);
@@ -146,9 +170,11 @@ namespace daggy {
auto &task = vert.data;
if (task.isGenerator) {
// Parse the output and update the DAGs
// TODO: Let the logger know about the new tasks
try {
auto newTasks = tasksFromJSON(attemptRecords.back().outputLog, taskParameters);
auto newTasks = expandTaskSet(tasksFromJSON(attemptRecords.back().outputLog),
executor,
parameters
);
updateDAGFromTasks(dag, newTasks);
for (const auto &[ntName, ntTask]: newTasks) {
@@ -158,6 +184,10 @@ namespace daggy {
}
logger.updateTask(runID, taskName, task);
} catch (std::exception &e) {
logger.logTaskAttempt(runID, taskName,
AttemptRecord{.executorLog =
std::string{"Failed to parse JSON output: "} +
e.what()});
logger.updateTaskState(runID, taskName, RunState::ERRORED);
++errored;
}

View File

@@ -1,4 +1,5 @@
#include <daggy/executors/task/ForkingTaskExecutor.hpp>
#include <daggy/Utilities.hpp>
#include <fcntl.h>
#include <unistd.h>
@@ -31,23 +32,26 @@ std::string slurp(int fd) {
}
daggy::AttemptRecord
ForkingTaskExecutor::runCommand(const Task &task) {
ForkingTaskExecutor::execute(const Task &task) {
AttemptRecord rec;
rec.startTime = Clock::now();
// Need to convert the strings
std::vector<char *> argv;
for (const auto &s : task.command) {
const auto command = std::get<Command>(task.job.at("command"));
for (const auto &s: command) {
argv.push_back(const_cast<char *>(s.c_str()));
}
argv.push_back(nullptr);
// Create the pipe
int stdoutPipe[2];
pipe2(stdoutPipe, O_DIRECT);
int pipeRC = pipe2(stdoutPipe, O_DIRECT);
if (pipeRC != 0) throw std::runtime_error("Unable to create pipe for stdout");
int stderrPipe[2];
pipe2(stderrPipe, O_DIRECT);
pipeRC = pipe2(stderrPipe, O_DIRECT);
if (pipeRC != 0) throw std::runtime_error("Unable to create pipe for stderr");
pid_t child = fork();
if (child < 0) {
@@ -84,3 +88,26 @@ ForkingTaskExecutor::runCommand(const Task &task) {
return rec;
}
bool ForkingTaskExecutor::validateTaskParameters(const ConfigValues &job) {
auto it = job.find("command");
if (it == job.end())
throw std::runtime_error(R"(job does not have a "command" argument)");
if (!std::holds_alternative<Command>(it->second))
throw std::runtime_error(R"(taskParameter's "command" must be an array of strings)");
return true;
}
std::vector<daggy::ConfigValues>
ForkingTaskExecutor::expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) {
std::vector<ConfigValues> newValues;
const auto command = std::get<Command>(job.at("command"));
for (const auto &expandedCommand: interpolateValues(command, expansionValues)) {
ConfigValues newCommand{job};
newCommand.at("command") = expandedCommand;
newValues.emplace_back(newCommand);
}
return newValues;
}

View File

@@ -39,7 +39,7 @@ namespace daggy {
}
// Execution
DAGRunID FileSystemLogger::startDAGRun(std::string name, const TaskList &tasks) {
DAGRunID FileSystemLogger::startDAGRun(std::string name, const TaskSet &tasks) {
DAGRunID runID = nextRunID_++;
// TODO make this threadsafe

View File

@@ -4,6 +4,7 @@
#include <enum.h>
#include <daggy/loggers/dag_run/OStreamLogger.hpp>
#include <daggy/Serialization.hpp>
namespace daggy {
namespace loggers {
@@ -11,7 +12,7 @@ namespace daggy {
OStreamLogger::OStreamLogger(std::ostream &os) : os_(os) {}
// Execution
DAGRunID OStreamLogger::startDAGRun(std::string name, const TaskList &tasks) {
DAGRunID OStreamLogger::startDAGRun(std::string name, const TaskSet &tasks) {
std::lock_guard<std::mutex> lock(guard_);
size_t runID = dagRuns_.size();
dagRuns_.push_back({
@@ -26,9 +27,7 @@ namespace daggy {
os_ << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size()
<< " tasks" << std::endl;
for (const auto &[name, task]: tasks) {
os_ << "TASK (" << name << "): ";
std::copy(task.command.begin(), task.command.end(),
std::ostream_iterator<std::string>(os_, " "));
os_ << "TASK (" << name << "): " << configToJSON(task.job);
os_ << std::endl;
}
return runID;

View File

@@ -12,13 +12,13 @@ namespace fs = std::filesystem;
using namespace daggy;
using namespace daggy::loggers::dag_run;
const TaskList SAMPLE_TASKS{
{"work_a", Task{.command{"/bin/echo", "a"}, .children{"c"}}},
{"work_b", Task{.command{"/bin/echo", "b"}, .children{"c"}}},
{"work_c", Task{.command{"/bin/echo", "c"}}}
const TaskSet SAMPLE_TASKS{
{"work_a", Task{.job{{"command", std::vector<std::string>{"/bin/echo", "a"}}}, .children{"c"}}},
{"work_b", Task{.job{{"command", std::vector<std::string>{"/bin/echo", "b"}}}, .children{"c"}}},
{"work_c", Task{.job{{"command", std::vector<std::string>{"/bin/echo", "c"}}}}}
};
inline DAGRunID testDAGRunInit(DAGRunLogger &logger, const std::string &name, const TaskList &tasks) {
inline DAGRunID testDAGRunInit(DAGRunLogger &logger, const std::string &name, const TaskSet &tasks) {
auto runID = logger.startDAGRun(name, tasks);
auto dagRun = logger.getDAGRun(runID);

View File

@@ -2,6 +2,8 @@
#include <filesystem>
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
#include "daggy/Serialization.hpp"
#include "daggy/Utilities.hpp"
#include <catch2/catch.hpp>
@@ -9,9 +11,12 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
daggy::executors::task::ForkingTaskExecutor ex(10);
SECTION("Simple Run") {
daggy::Task task{.command{"/usr/bin/echo", "abc", "123"}};
daggy::Task task{.job{
{"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/echo", "abc", "123"}}}};
auto rec = ex.runCommand(task);
REQUIRE(ex.validateTaskParameters(task.job));
auto rec = ex.execute(task);
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() >= 6);
@@ -19,9 +24,10 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
}
SECTION("Error Run") {
daggy::Task task{.command{"/usr/bin/expr", "1", "+", "+"}};
daggy::Task task{.job{
{"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/expr", "1", "+", "+"}}}};
auto rec = ex.runCommand(task);
auto rec = ex.execute(task);
REQUIRE(rec.rc == 2);
REQUIRE(rec.errorLog.size() >= 20);
@@ -33,16 +39,45 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
"/usr/share/dict/linux.words", "/usr/share/dict/cracklib-small", "/etc/ssh/moduli"
};
for (const auto &bigFile : BIG_FILES) {
for (const auto &bigFile: BIG_FILES) {
if (!std::filesystem::exists(bigFile)) continue;
daggy::Task task{.command{"/usr/bin/cat", bigFile}};
daggy::Task task{.job{
{"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/cat", bigFile}}}};
auto rec = ex.runCommand(task);
auto rec = ex.execute(task);
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile));
REQUIRE(rec.errorLog.empty());
}
}
}
SECTION("Parameter Expansion") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"};
auto params = daggy::configFromJSON(testParams);
std::string taskJSON = R"({"B": {"job": {"command": ["/usr/bin/echo", "{{DATE}}"]}, "children": ["C"]}})";
auto tasks = daggy::tasksFromJSON(taskJSON);
auto result = daggy::expandTaskSet(tasks, ex, params);
REQUIRE(result.size() == 2);
}
SECTION("Build with expansion") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
auto params = daggy::configFromJSON(testParams);
std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}, "children": ["B"]}, "B": {"job": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"]}, "children": ["C"]}, "C": {"job": {"command": ["/bin/echo", "C"]}}})";
auto tasks = daggy::expandTaskSet(daggy::tasksFromJSON(testTasks), ex, params);
REQUIRE(tasks.size() == 4);
}
SECTION("Build with expansion using parents instead of children") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
auto params = daggy::configFromJSON(testParams);
std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}}, "B": {"job": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"]}, "parents": ["A"]}, "C": {"job": {"command": ["/bin/echo", "C"]}, "parents": ["A"]}})";
auto tasks = daggy::expandTaskSet(daggy::tasksFromJSON(testTasks), ex, params);
REQUIRE(tasks.size() == 4);
}
}

View File

@@ -11,49 +11,68 @@ namespace fs = std::filesystem;
TEST_CASE("Deserialize Parameters", "[deserialize_parameters]") {
SECTION("Basic Parse") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
auto params = daggy::parametersFromJSON(testParams);
auto params = daggy::configFromJSON(testParams);
REQUIRE(params.size() == 2);
REQUIRE(std::holds_alternative<std::vector<std::string>>(params["DATE"]));
REQUIRE(std::holds_alternative<std::string>(params["SOURCE"]));
}SECTION("Invalid JSON") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name")"};
REQUIRE_THROWS(daggy::parametersFromJSON(testParams));
REQUIRE_THROWS(daggy::configFromJSON(testParams));
}SECTION("Non-string Keys") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], 6: "name"})"};
REQUIRE_THROWS(daggy::parametersFromJSON(testParams));
REQUIRE_THROWS(daggy::configFromJSON(testParams));
}SECTION("Non-array/Non-string values") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": {"name": "kevin"}})"};
REQUIRE_THROWS(daggy::parametersFromJSON(testParams));
REQUIRE_THROWS(daggy::configFromJSON(testParams));
}
}
TEST_CASE("Task Deserialization", "[deserialize_task]") {
SECTION("Build with no expansion") {
std::string testTasks = R"({ "A": {"command": ["/bin/echo", "A"], "children": ["C"]}, "B": {"command": ["/bin/echo", "B"], "children": ["C"]}, "C": {"command": ["/bin/echo", "C"]}})";
std::string testTasks = R"({
"A": {
"job": { "command": ["/bin/echo", "A"] },
"children": ["C"]
},
"B": {
"job": {"command": ["/bin/echo", "B"]},
"children": ["C"]
},
"C": {
"job": {"command": ["/bin/echo", "C"]}
}
})";
auto tasks = daggy::tasksFromJSON(testTasks);
REQUIRE(tasks.size() == 3);
}
SECTION("Build with expansion") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
auto params = daggy::parametersFromJSON(testParams);
std::string testTasks = R"({"A": {"command": ["/bin/echo", "A"], "children": ["B"]}, "B": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "children": ["C"]}, "C": {"command": ["/bin/echo", "C"]}})";
auto tasks = daggy::tasksFromJSON(testTasks, params);
REQUIRE(tasks.size() == 4);
}
SECTION("Build with expansion using parents instead of children") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
auto params = daggy::parametersFromJSON(testParams);
std::string testTasks = R"({"A": {"command": ["/bin/echo", "A"]}, "B": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "parents": ["A"]}, "C": {"command": ["/bin/echo", "C"], "parents": ["A"]}})";
auto tasks = daggy::tasksFromJSON(testTasks, params);
REQUIRE(tasks.size() == 4);
SECTION("Build with job defaults") {
std::string testTasks = R"({
"A": {
"job": { "command": ["/bin/echo", "A"] },
"children": ["B"]
},
"B": {
"job": {
"command": ["/bin/echo", "C"],
"memory": "1G"
}
}
})";
daggy::ConfigValues jobDefaults{{"runtime", "60"},
{"memory", "300M"}};
auto tasks = daggy::tasksFromJSON(testTasks, jobDefaults);
REQUIRE(tasks.size() == 2);
REQUIRE(std::get<std::string>(tasks["A"].job["runtime"]) == "60");
REQUIRE(std::get<std::string>(tasks["A"].job["memory"]) == "300M");
REQUIRE(std::get<std::string>(tasks["B"].job["runtime"]) == "60");
REQUIRE(std::get<std::string>(tasks["B"].job["memory"]) == "1G");
}
}
TEST_CASE("Task Serialization", "[serialize_tasks]") {
SECTION("Build with no expansion") {
std::string testTasks = R"({"A": {"command": ["/bin/echo", "A"], "children": ["C"]}, "B": {"command": ["/bin/echo", "B"], "children": ["C"]}, "C": {"command": ["/bin/echo", "C"]}})";
std::string testTasks = R"({"A": {"job": {"command": ["/bin/echo", "A"]}, "children": ["C"]}, "B": {"job": {"command": ["/bin/echo", "B"]}, "children": ["C"]}, "C": {"job": {"command": ["/bin/echo", "C"]}}})";
auto tasks = daggy::tasksFromJSON(testTasks);
auto genJSON = daggy::tasksToJSON(tasks);

View File

@@ -6,9 +6,10 @@
#include <pistache/client.h>
#include <rapidjson/document.h>
#include "daggy/Server.hpp"
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
#include <daggy/Server.hpp>
#include <daggy/Serialization.hpp>
#include <daggy/executors/task/ForkingTaskExecutor.hpp>
#include <daggy/loggers/dag_run/OStreamLogger.hpp>
namespace rj = rapidjson;
@@ -73,10 +74,10 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") {
SECTION("Simple DAGRun Submission") {
std::string dagRun = R"({
"name": "unit_server",
"taskParameters": { "FILE": [ "A", "B" ] },
"parameters": { "FILE": [ "A", "B" ] },
"tasks": {
"touch": { "command": [ "/usr/bin/touch", "dagrun_{{FILE}}" ] },
"cat": { "command": [ "/usr/bin/cat", "dagrun_A", "dagrun_B" ],
"touch": { "job": { "command": [ "/usr/bin/touch", "dagrun_{{FILE}}" ]} },
"cat": { "job": { "command": [ "/usr/bin/cat", "dagrun_A", "dagrun_B" ]},
"parents": [ "touch" ]
}
}
@@ -90,8 +91,7 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") {
REQUIRE(response.code() == Pistache::Http::Code::Ok);
rj::Document doc;
rj::ParseResult parseResult = doc.Parse(response.body().c_str());
REQUIRE(parseResult);
daggy::checkRJParse(doc.Parse(response.body().c_str()));
REQUIRE(doc.IsObject());
REQUIRE(doc.HasMember("runID"));
@@ -104,8 +104,7 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") {
REQUIRE(response.code() == Pistache::Http::Code::Ok);
rj::Document doc;
rj::ParseResult parseResult = doc.Parse(response.body().c_str());
REQUIRE(parseResult);
daggy::checkRJParse(doc.Parse(response.body().c_str()));
REQUIRE(doc.IsArray());
REQUIRE(doc.Size() >= 1);
@@ -134,8 +133,7 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") {
auto response = REQUEST(baseURL + "/v1/dagrun/" + std::to_string(runID));
REQUIRE(response.code() == Pistache::Http::Code::Ok);
rj::Document doc;
rj::ParseResult parseResult = doc.Parse(response.body().c_str());
REQUIRE(parseResult);
daggy::checkRJParse(doc.Parse(response.body().c_str()));
REQUIRE(doc.IsObject());
REQUIRE(doc.HasMember("taskStates"));

View File

@@ -24,18 +24,18 @@ TEST_CASE("String Utilities", "[utilities_string]") {
TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
SECTION("Basic expansion") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"};
auto params = daggy::parametersFromJSON(testParams);
auto params = daggy::configFromJSON(testParams);
std::vector<std::string> cmd{"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}", "{{TYPE}}"};
auto allCommands = daggy::expandCommands(cmd, params);
auto allCommands = daggy::interpolateValues(cmd, params);
REQUIRE(allCommands.size() == 6);
}
SECTION("Skip over unused parameters") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"};
auto params = daggy::parametersFromJSON(testParams);
auto params = daggy::configFromJSON(testParams);
std::vector<std::string> cmd{"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}"};
auto allCommands = daggy::expandCommands(cmd, params);
auto allCommands = daggy::interpolateValues(cmd, params);
// TYPE isn't used, so it's just |DATE| * |SOURCE|
REQUIRE(allCommands.size() == 2);
@@ -44,9 +44,9 @@ TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
SECTION("Expand within a command part") {
std::string testParams{
R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": ["A", "B"], "TYPE": ["a", "b", "c"]})"};
auto params = daggy::parametersFromJSON(testParams);
auto params = daggy::configFromJSON(testParams);
std::vector<std::string> cmd{"/usr/bin/touch", "{{DATE}}_{{SOURCE}}"};
auto result = daggy::expandCommands(cmd, params);
auto result = daggy::interpolateValues(cmd, params);
// TYPE isn't used, so it's just |DATE| * |SOURCE|
REQUIRE(result.size() == 4);
@@ -62,11 +62,11 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
SECTION("Simple execution") {
std::string prefix = "asdlk_";
std::string taskJSON = R"({"A": {"command": ["/usr/bin/touch", ")"
+ prefix + R"(A"], "children": ["C"]}, "B": {"command": ["/usr/bin/touch", ")"
+ prefix + R"(B"], "children": ["C"]}, "C": {"command": ["/usr/bin/touch", ")"
+ prefix + R"(C"]}})";
auto tasks = daggy::tasksFromJSON(taskJSON);
std::string taskJSON = R"({"A": {"job": {"command": ["/usr/bin/touch", ")"
+ prefix + R"(A"]}, "children": ["C"]}, "B": {"job": {"command": ["/usr/bin/touch", ")"
+ prefix + R"(B"]}, "children": ["C"]}, "C": {"job": {"command": ["/usr/bin/touch", ")"
+ prefix + R"(C"]}}})";
auto tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex);
auto dag = daggy::buildDAGFromTasks(tasks);
auto runID = logger.startDAGRun("test_run", tasks);
@@ -102,12 +102,13 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
std::string goodPrefix = "rec_error_";
std::string badPrefix = "noexist/rec_error_";
std::string taskJSON = R"({"A": {"command": ["/usr/bin/touch", ")"
std::string taskJSON = R"({"A": {"job": {"command": ["/usr/bin/touch", ")"
+ goodPrefix +
R"(A"], "children": ["C"]}, "B": {"command": ["/usr/bin/touch", ")"
+ badPrefix + R"(B"], "children": ["C"]}, "C": {"command": ["/usr/bin/touch", ")"
+ badPrefix + R"(C"]}})";
auto tasks = daggy::tasksFromJSON(taskJSON);
R"(A"]}, "children": ["C"]}, "B": {"job": {"command": ["/usr/bin/touch", ")"
+ badPrefix +
R"(B"]}, "children": ["C"]}, "C": {"job": {"command": ["/usr/bin/touch", ")"
+ badPrefix + R"(C"]}}})";
auto tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex);
auto dag = daggy::buildDAGFromTasks(tasks);
auto runID = logger.startDAGRun("test_run", tasks);
@@ -134,22 +135,31 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
SECTION("Generator tasks") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"};
auto params = daggy::parametersFromJSON(testParams);
auto params = daggy::configFromJSON(testParams);
std::string generatorOutput = R"({"B": {"job": {"command": ["/usr/bin/echo", "-e", "{{DATE}}"]}, "children": ["C"]}})";
fs::path ofn = fs::current_path() / "generator_test_output.json";
std::ofstream ofh{ofn};
ofh << generatorOutput << std::endl;
ofh.close();
std::string generatorOutput = R"({"B": {"command": ["/usr/bin/echo", "{{DATE}}"], "children": ["C"]}})";
std::stringstream jsonTasks;
jsonTasks << R"({ "A": { "command": [ "/usr/bin/echo", )" << std::quoted(generatorOutput)
<< R"(], "children": ["C"], "isGenerator": true},)"
<< R"("C": { "command": [ "/usr/bin/echo", "hello!"] } })";
jsonTasks << R"({ "A": { "job": {"command": [ "/usr/bin/cat", )" << std::quoted(ofn.string())
<< R"(]}, "children": ["C"], "isGenerator": true},)"
<< R"("C": { "job": {"command": [ "/usr/bin/echo", "hello!"]} } })";
auto tasks = daggy::tasksFromJSON(jsonTasks.str());
auto baseTasks = daggy::tasksFromJSON(jsonTasks.str());
REQUIRE(baseTasks.size() == 2);
auto tasks = daggy::expandTaskSet(baseTasks, ex, params);
REQUIRE(tasks.size() == 2);
auto dag = daggy::buildDAGFromTasks(tasks);
REQUIRE(dag.size() == 2);
auto runID = logger.startDAGRun("generator_run", tasks);
auto finalDAG = daggy::runDAG(runID, ex, logger, dag, params);
REQUIRE(finalDAG.allVisited());
REQUIRE(finalDAG.size() == 4);
// Check the logger
@@ -157,16 +167,15 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
REQUIRE(record.tasks.size() == 4);
REQUIRE(record.taskRunStates.size() == 4);
for (const auto & [taskName, attempts] : record.taskAttempts) {
for (const auto &[taskName, attempts]: record.taskAttempts) {
REQUIRE(attempts.size() == 1);
REQUIRE(attempts.back().rc == 0);
}
// Ensure that children were updated properly
REQUIRE(record.tasks["A"].children == std::unordered_set<std::string>{"B_0", "B_1", "C"});
REQUIRE(record.tasks["A"].children == std::unordered_set<std::string>{"B_0", "B_1", "C"});
REQUIRE(record.tasks["B_0"].children == std::unordered_set<std::string>{"C"});
REQUIRE(record.tasks["B_1"].children == std::unordered_set<std::string>{"C"});
REQUIRE(record.tasks["C"].children.empty());
}
}