Daggy: Ya like dags?
==

Description
==

Daggy is a work orchestration framework for running workflows modeled as directed acyclic graphs (DAGs). These are
quite useful for modeling data ingestion and processing pipelines.

Below is an example workflow where data is pulled from three sources (A, B, C), some work is done on them, and a report
is generated.

Each step depends on the success of its upstream dependencies; e.g. `Derive_Data_AB` can't run until `Transform_A` and
`Transform_B` have completed successfully.

```mermaid
graph LR
Pull_A-->Transform_A;
Pull_B-->Transform_B;
Pull_C-->Transform_C;

Transform_A-->Derive_Data_AB;
Transform_B-->Derive_Data_AB;
Derive_Data_AB-->Derive_Data_ABC;
Transform_C-->Derive_Data_ABC;

Derive_Data_ABC-->Report;
```

Individual tasks (vertices) are run via a task executor. Daggy supports multiple executors, from a local executor (via
fork) to distributed work managers like [slurm](https://slurm.schedmd.com/overview.html)
or [kubernetes](https://kubernetes.io/) (both planned).

State is maintained via state loggers. Currently daggy supports an in-memory state logger (OStreamLogger) and a
filesystem logger (FileSystemLogger). Future plans include supporting [redis](https://redis.io)
and [postgres](https://postgresql.org).

Building
==

**Requirements:**

- git
- cmake >= 3.14
- gcc >= 8

```
git clone https://gitlab.com/iroddis/daggy
cd daggy
mkdir build
cd build
cmake ..
make
```

DAG Run Definition
==

daggy works as a standalone library, but generally runs as a service with a REST interface. This documentation is
specifically for submitting DAGs to the REST server for execution (a DAG run).

DAGs are defined in JSON as a set of `tasks`, along with optional `taskParameters` and `executionParameters` (future).

Basic Definition
--

A DAG Run definition consists of a dictionary that defines a set of tasks. Each task has the following attributes:

| Attribute            | Required | Description                                             |
|----------------------|----------|---------------------------------------------------------|
| name                 | Yes      | Name of this task. Must be unique.                      |
| command              | Yes      | The command to execute, as an array of strings.         |
| maxRetries           | No       | If a task fails, how many times to retry (default: 0).  |
| retryIntervalSeconds | No       | How many seconds to wait between retries.               |
| children             | No       | List of names of tasks that depend on this task.        |
| parents              | No       | List of names of tasks that this task depends on.       |

Defining both `parents` and `children` is not required; one or the other is sufficient. Both are supported so that you
can express task dependencies in whichever direction feels most natural.

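For example, the following two `tasks` lists describe the same two-task DAG; the first declares the edge from the
parent's side, the second from the child's (the task names and commands here are just illustrative):

```json
[
  { "name": "parent_task", "command": ["/bin/true"], "children": ["child_task"] },
  { "name": "child_task",  "command": ["/bin/true"] }
]
```

```json
[
  { "name": "parent_task", "command": ["/bin/true"] },
  { "name": "child_task",  "command": ["/bin/true"], "parents": ["parent_task"] }
]
```
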
Below is an example DAG Run submission:

```json
{
  "tasks": [
    {
      "name": "task_one",
      "command": [
        "/usr/bin/touch",
        "/tmp/somefile"
      ],
      "maxRetries": 3,
      "retryIntervalSeconds": 30
    },
    {
      "name": "task_two",
      "command": [
        "/usr/bin/touch",
        "/tmp/someotherfile"
      ],
      "maxRetries": 3,
      "retryIntervalSeconds": 30,
      "parents": [
        "task_one"
      ]
    }
  ]
}
```

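A definition like this can then be POSTed to the running daggy server. Below is a minimal sketch, assuming the
definition above is saved as `dag_run.json` and the server listens on localhost port 8080; the endpoint path and port
are placeholders, not a documented route:

```
# Hypothetical endpoint and port; check your server configuration for the real values.
curl -X POST \
     -H 'Content-Type: application/json' \
     -d @dag_run.json \
     http://localhost:8080/dag_runs
```
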
Task Parameters
--

Task commands can be parameterized by passing in an optional `taskParameters` member. Each parameter consists of a name
and either a single string value or an array of string values. Task commands will be regenerated based on the values of
the parameters.

For instance:

```json
{
  "taskParameters": {
    "DIRECTORY": "/var/tmp",
    "FILE": "somefile"
  },
  "tasks": [
    {
      "name": "task_one",
      "command": [
        "/usr/bin/touch",
        "{{DIRECTORY}}/{{FILE}}"
      ],
      "maxRetries": 3,
      "retryIntervalSeconds": 30
    }
  ]
}
```

`task_one`'s command, when run, will touch `/var/tmp/somefile`, since the values of `DIRECTORY` and `FILE` will be
populated from the `taskParameters` values.

Where a parameter has an array of values, any task referencing it is duplicated once per element of the cartesian
product of all array-valued parameters it references, and the copies are given numeric suffixes (`_1`, `_2`, ...).

Example:

```json
{
  "taskParameters": {
    "DIRECTORY": "/var/tmp",
    "FILE": "somefile",
    "DATE": [
      "2021-01-01",
      "2021-02-01",
      "2021-03-01"
    ]
  },
  "tasks": [
    {
      "name": "populate_inputs",
      "command": [
        "/usr/bin/touch",
        "{{DIRECTORY}}/{{FILE}}"
      ]
    },
    {
      "name": "calc_date",
      "command": [
        "/path/to/calculator",
        "{{DIRECTORY}}/{{FILE}}",
        "{{DATE}}"
      ],
      "parents": [
        "populate_inputs"
      ]
    },
    {
      "name": "generate_report",
      "command": [
        "/path/to/generator"
      ],
      "parents": [
        "calc_date"
      ]
    }
  ]
}
```

Conceptually, this DAG looks like this:

```mermaid
graph LR
populate_inputs-->calc_date
calc_date-->generate_report
```

Once the parameters have been populated, the new DAG will look like this:

```mermaid
graph LR
populate_inputs-->calc_date_1
populate_inputs-->calc_date_2
populate_inputs-->calc_date_3
calc_date_1-->generate_report
calc_date_2-->generate_report
calc_date_3-->generate_report
```

- `calc_date_1` will have the command `/path/to/calculator /var/tmp/somefile 2021-01-01`
- `calc_date_2` will have the command `/path/to/calculator /var/tmp/somefile 2021-02-01`
- `calc_date_3` will have the command `/path/to/calculator /var/tmp/somefile 2021-03-01`

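To make the expansion concrete, here is a small, self-contained C++ sketch of these semantics. It is not daggy's
implementation, only an illustration of the documented behaviour: scalar parameters substitute in place, while
array-valued parameters multiply the command via a cartesian product:

```cpp
// Illustrative sketch of the parameter-expansion semantics described above.
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Replace every "{{NAME}}" occurrence in `text` with `value`.
std::string substitute(std::string text, const std::string& name, const std::string& value) {
    const std::string token = "{{" + name + "}}";
    for (auto pos = text.find(token); pos != std::string::npos; pos = text.find(token, pos + value.size())) {
        text.replace(pos, token.size(), value);
    }
    return text;
}

int main() {
    // taskParameters from the example; scalars are one-element lists.
    std::map<std::string, std::vector<std::string>> params = {
        {"DIRECTORY", {"/var/tmp"}},
        {"FILE", {"somefile"}},
        {"DATE", {"2021-01-01", "2021-02-01", "2021-03-01"}},
    };
    std::vector<std::string> command = {"/path/to/calculator", "{{DIRECTORY}}/{{FILE}}", "{{DATE}}"};

    // Start with one copy of the command, then take the cartesian product
    // with each parameter's values; scalars contribute a factor of one.
    std::vector<std::vector<std::string>> expanded = {command};
    for (const auto& [name, values] : params) {
        std::vector<std::vector<std::string>> next;
        for (const auto& cmd : expanded) {
            for (const auto& value : values) {
                std::vector<std::string> copy;
                for (const auto& arg : cmd) copy.push_back(substitute(arg, name, value));
                next.push_back(std::move(copy));
            }
        }
        expanded = std::move(next);
    }

    // Prints the three commands of calc_date_1 .. calc_date_3 above.
    int n = 0;
    for (const auto& cmd : expanded) {
        std::cout << "calc_date_" << ++n << ":";
        for (const auto& arg : cmd) std::cout << ' ' << arg;
        std::cout << '\n';
    }
}
```
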
Execution Parameters
--

(future work)

The REST server can be configured with multiple pools of executors. For instance, it might be helpful to run certain
jobs on slurm with a specific set of restrictions, or to allow for local execution as well as execution on a slurm
cluster.

`executionParameters` is an optional member of the submission that alters how the DAG is executed.

| Attribute      | Description                                                              |
|----------------|--------------------------------------------------------------------------|
| pool           | Name of the executor pool the DAG should run on                          |
| poolParameters | Any parameters the executor accepts that might modify how a task is run  |

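Since this feature is planned rather than implemented, the schema is not final; a submission using these attributes
might look like the sketch below. The pool name and its parameters are hypothetical:

```json
{
  "executionParameters": {
    "pool": "slurm_batch",
    "poolParameters": {
      "partition": "compute"
    }
  },
  "tasks": [
    {
      "name": "task_one",
      "command": [
        "/usr/bin/touch",
        "/tmp/somefile"
      ]
    }
  ]
}
```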