- Adding more documentation

- Renaming "parameters" to "taskParameters" for clarity, and to distinguish it from "executionParameters", which will be implemented down the road.
Ian Roddis
2021-08-23 14:17:22 -03:00
parent 6b9baffe27
commit 808a7f9af4
3 changed files with 197 additions and 28 deletions

README.md

@@ -3,9 +3,36 @@ Daggy: Ya like dags?
Description
==
Daggy is a work orchestration framework for running workflows modeled as directed acyclic graphs (DAGs). These are
quite useful when modeling data ingestion / processing pipelines.
Below is an example workflow where data is pulled from three sources (A, B, C), some work is done on them, and a report
is generated.
Each step depends on the success of its upstream dependencies, e.g. `Derive_Data_AB` can't run until `Transform_A` and
`Transform_B` have completed successfully.
```mermaid
graph LR
Pull_A-->Transform_A;
Pull_B-->Transform_B;
Pull_C-->Transform_C;
Transform_A-->Derive_Data_AB;
Transform_B-->Derive_Data_AB;
Derive_Data_AB-->Derive_Data_ABC;
Transform_C-->Derive_Data_ABC;
Derive_Data_ABC-->Report;
```
Individual tasks (vertices) are run via a task executor. Daggy supports multiple executors, from a local executor (via
fork) to distributed work managers like [slurm](https://slurm.schedmd.com/overview.html)
or [kubernetes](https://kubernetes.io/) (both planned).
State is maintained via state loggers. Currently, Daggy supports an in-memory state manager (OStreamLogger) and a
filesystem logger (FileSystemLogger). Future plans include supporting [redis](https://redis.io)
and [postgres](https://postgresql.org).

Building
==
@@ -19,27 +46,169 @@ cmake ..
make
```
Architecture
==
- Server
  - Interactive endpoint
  - DAGs
    - CRUDs DAG definitions
    - Kick off runs
    - Return status
- Scheduler
  - Accepts Executors
  - Accepts task lists and parameters
  - Runs and monitors DAGs

DAG Run Definition
==
Daggy works as a standalone library, but generally runs as a service with a REST interface. This documentation is
specifically for submitting DAGs to the REST server for execution (a DAG run).

Flow
==
DAGs are defined in JSON as a set of `tasks`, along with optional `taskParameters` and `executionParameters` (future).
1. DAG generated from JSON spec
2. Execution pool selected
3. DAGRun created from storage
4. DAGExecutor runs with:
   - Logger object
   - Execution pool
   - DAG definition
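
For reference, a complete submission combines these members in a single JSON document. The task shown here is
illustrative; the optional top-level `name`, which labels the run, appears in the server handler excerpted further
below.

```json
{
  "name": "example_run",
  "taskParameters": {
    "FILE": "somefile"
  },
  "tasks": [
    {
      "name": "touch_file",
      "command": [
        "/usr/bin/touch",
        "/tmp/{{FILE}}"
      ]
    }
  ]
}
```
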
Basic Definition
--
A DAG Run definition consists of a dictionary that defines a set of tasks. Each task has the following attributes:

| Attribute            | Required | Description                                            |
|----------------------|----------|--------------------------------------------------------|
| name                 | Yes      | Name of this task. Must be unique.                     |
| command              | Yes      | The command to execute, as an array of argument strings |
| maxRetries           | No       | If a task fails, how many times to retry (default: 0)  |
| retryIntervalSeconds | No       | How many seconds to wait between retries               |
| children             | No       | List of names of tasks that depend on this task        |
| parents              | No       | List of names of tasks that this task depends on       |

Defining both `parents` and `children` is not required; one or the other is sufficient. Both are supported so that you
can define task dependencies in whichever direction feels most natural.

Below is an example DAG Run submission:
```json
{
  "tasks": [
    {
      "name": "task_one",
      "command": [
        "/usr/bin/touch",
        "/tmp/somefile"
      ],
      "maxRetries": 3,
      "retryIntervalSeconds": 30
    },
    {
      "name": "task_two",
      "command": [
        "/usr/bin/touch",
        "/tmp/someotherfile"
      ],
      "maxRetries": 3,
      "retryIntervalSeconds": 30,
      "parents": [
        "task_one"
      ]
    }
  ]
}
```
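
The same dependency could instead be declared from the parent's side using `children`:

```json
{
  "tasks": [
    {
      "name": "task_one",
      "command": [ "/usr/bin/touch", "/tmp/somefile" ],
      "children": [ "task_two" ]
    },
    {
      "name": "task_two",
      "command": [ "/usr/bin/touch", "/tmp/someotherfile" ]
    }
  ]
}
```
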
Task Parameters
--
Task commands can be parameterized by passing in an optional `taskParameters` member. Each parameter consists of a name
and either a string value or an array of string values. Task commands are regenerated with each `{{NAME}}` placeholder
replaced by the corresponding parameter value.

For instance:
```json
{
  "taskParameters": {
    "DIRECTORY": "/var/tmp",
    "FILE": "somefile"
  },
  "tasks": [
    {
      "name": "task_one",
      "command": [
        "/usr/bin/touch",
        "{{DIRECTORY}}/{{FILE}}"
      ],
      "maxRetries": 3,
      "retryIntervalSeconds": 30
    }
  ]
}
```
`task_one`'s command, when run, will touch `/var/tmp/somefile`, since the values of `DIRECTORY` and `FILE` are
substituted from the `taskParameters` values.

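That is, once the parameters are applied, the task behaves as if it had been defined with the literal command:

```json
"command": [
  "/usr/bin/touch",
  "/var/tmp/somefile"
]
```
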
When a parameter has an array of values, any task referencing it is duplicated, once for each combination in the
Cartesian product of all relevant parameter values.

Example:
```json
{
  "taskParameters": {
    "DIRECTORY": "/var/tmp",
    "FILE": "somefile",
    "DATE": [
      "2021-01-01",
      "2021-02-01",
      "2021-03-01"
    ]
  },
  "tasks": [
    {
      "name": "populate_inputs",
      "command": [
        "/usr/bin/touch",
        "{{DIRECTORY}}/{{FILE}}"
      ]
    },
    {
      "name": "calc_date",
      "command": [
        "/path/to/calculator",
        "{{DIRECTORY}}/{{FILE}}",
        "{{DATE}}"
      ],
      "parents": [
        "populate_inputs"
      ]
    },
    {
      "name": "generate_report",
      "command": [
        "/path/to/generator"
      ],
      "parents": [
        "calc_date"
      ]
    }
  ]
}
```
Conceptually, this DAG looks like this:
```mermaid
graph LR
populate_inputs-->calc_date
calc_date-->generate_report
```
Once the parameters have been populated, the new DAG will look like this:
```mermaid
graph LR
populate_inputs-->calc_date_1
populate_inputs-->calc_date_2
populate_inputs-->calc_date_3
calc_date_1-->generate_report
calc_date_2-->generate_report
calc_date_3-->generate_report
```
- `calc_date_1` will have the command `/path/to/calculator /var/tmp/somefile 2021-01-01`
- `calc_date_2` will have the command `/path/to/calculator /var/tmp/somefile 2021-02-01`
- `calc_date_3` will have the command `/path/to/calculator /var/tmp/somefile 2021-03-01`
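
Put differently, the expansion behaves roughly as if each `calc_date_N` copy had been defined by hand. A sketch of the
first copy (the exact internal representation may differ):

```json
{
  "name": "calc_date_1",
  "command": [
    "/path/to/calculator",
    "/var/tmp/somefile",
    "2021-01-01"
  ],
  "parents": [ "populate_inputs" ],
  "children": [ "generate_report" ]
}
```
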
Execution Parameters
--
(future work)

The REST server can be configured with multiple pools of executors. For instance, it might be helpful to run certain
jobs on slurm with a specific set of restrictions, or to allow for local execution as well as execution on a slurm
cluster. `executionParameters` is an optional top-level member that alters how the DAG is executed.

| Attribute | Description |
|-----------|-------------|
| pool | Names the executor the DAG should run on |
| poolParameters | Any parameters the executor accepts that might modify how a task is run |
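
As this is future work, the schema is not final. A speculative sketch of a submission targeting a slurm pool might look
like the following, where the pool name `slurm_batch` and the `partition` entry are invented purely for illustration:

```json
{
  "executionParameters": {
    "pool": "slurm_batch",
    "poolParameters": {
      "partition": "batch"
    }
  },
  "tasks": [
    {
      "name": "task_one",
      "command": [ "/usr/bin/touch", "/tmp/somefile" ]
    }
  ]
}
```
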


@@ -83,7 +83,7 @@ namespace daggy {
     /*
      * {
      *  "name": "DAG Run Name"
-     *  "parameters": {...}
+     *  "taskParameters": {...}
      *  "tasks": {...}
      */
     void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
@@ -104,9 +104,9 @@ namespace daggy {
         // Get parameters if there are any
         ParameterValues parameters;
-        if (doc.HasMember("parameters")) {
+        if (doc.HasMember("taskParameters")) {
             try {
-                auto parsedParams = parametersFromJSON(doc["parameters"].GetObject());
+                auto parsedParams = parametersFromJSON(doc["taskParameters"].GetObject());
                 parameters.swap(parsedParams);
             } catch (std::exception &e) {
                 REQ_ERROR(Bad_Request, e.what());


@@ -70,7 +70,7 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") {
     SECTION("Simple DAGRun Submission") {
         std::string dagRun = R"({
             "name": "unit_server",
-            "parameters": { "FILE": [ "A", "B" ] },
+            "taskParameters": { "FILE": [ "A", "B" ] },
             "tasks": [
                 { "name": "touch",
                   "command": [ "/usr/bin/touch", "/tmp/{{FILE}}" ]