Daggy: Ya like dags?
====================

Description
==

Daggy is a work orchestration framework for running workflows modeled as directed, acyclic graphs (DAGs). These are
quite useful when modeling data ingestion / processing pipelines.

Below is an example workflow where data is pulled from three sources (A, B, C), some work is done on them, and a report
is generated.

Each step depends on the success of its upstream dependencies, e.g. `Derive_Data_AB` can't run until `Transform_A` and
`Transform_B` have completed successfully.

```mermaid
graph LR
Pull_A-->Transform_A;
Pull_B-->Transform_B;
Pull_C-->Transform_C;

Transform_A-->Derive_Data_AB;
Transform_B-->Derive_Data_AB;
Derive_Data_AB-->Derive_Data_ABC;
Transform_C-->Derive_Data_ABC;

Derive_Data_ABC-->Report;
```

Individual tasks (vertices) are run via a task executor. Daggy supports multiple executors, from a local executor (via
fork) to distributed work managers like [slurm](https://slurm.schedmd.com/overview.html)
or [kubernetes](https://kubernetes.io/) (planned), as well as daggy's own executor.

State is maintained via state loggers. Currently daggy supports an in-memory state manager (OStreamLogger) and
[RedisJSON](https://oss.redis.com/redisjson/).

Future plans include supporting [postgres](https://postgresql.org).

Building
==

**Requirements:**

- git
- cmake >= 3.14
- gcc >= 8

- npm (if you want the webui)
- libslurm (if needed)

```sh
git clone https://gitlab.com/iroddis/daggy
cd daggy
mkdir build
cd build
cmake [-DDAGGY_ENABLE_SLURM=ON] ..
make

tests/tests # for unit tests

# Web UI
cd webui
npm install
npm run build

# Launching daggyd with the web UI
build/bin/daggyd -v --assets-dir webui/dist
```

DAG Run Definition
===

daggy works as a standalone library, but generally runs as a service with a REST interface. This documentation is
specifically for submitting DAGs to the REST server for execution (a DAG run).

DAGs are defined in JSON as a set of `tasks`, along with optional `parameters`, `jobDefaults`, and
`executionParameters` (future) sections.

Basic Definition
--

A DAG Run definition consists of a dictionary that defines a set of tasks. Each task has the following attributes:

| Attribute            | Required | Description                                                             |
|----------------------|----------|-------------------------------------------------------------------------|
| name                 | Yes      | Name of this task (the key in the `tasks` dictionary). Must be unique.  |
| job                  | Yes      | The executor-specific job definition, including the command to execute  |
| maxRetries           | No       | If a task fails, how many times to retry (default: 0)                   |
| retryIntervalSeconds | No       | How many seconds to wait between retries                                |
| children             | No       | List of names of tasks that depend on this task                         |
| parents              | No       | List of names of tasks that this task depends on                        |
| isGenerator          | No       | The output of this task generates additional task definitions           |

Defining both `parents` and `children` is not required; one or the other is sufficient. Both are supported to allow you
to define your task dependencies in the way that is most natural to how you think.

Below is an example DAG Run submission:

```json
{
  "tasks": {
    "task_one": {
      "job": {
        "command": [
          "/bin/touch",
          "/tmp/somefile"
        ]
      },
      "maxRetries": 3,
      "retryIntervalSeconds": 30
    },
    "task_two": {
      "job": {
        "command": [
          "/bin/touch",
          "/tmp/someotherfile"
        ]
      },
      "maxRetries": 3,
      "retryIntervalSeconds": 30,
      "parents": [
        "task_one"
      ]
    }
  }
}
```

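Saved to a file, such a definition can be POSTed to a running daggyd. The host, port, and path below are illustrative
assumptions, not daggy's documented route; check daggyd's actual REST routes:

```sh
# Submit a DAG run definition to a running daggyd instance.
# NOTE: the host, port, and /api/runs path are assumptions for illustration.
curl -X POST \
     -H "Content-Type: application/json" \
     --data @dag_run.json \
     http://localhost:8080/api/runs
```
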
Task Parameters
--

Task commands can be parameterized by passing in an optional `parameters` member. Each parameter consists of a name and
either a string value or an array of string values. Task commands reference parameters with `{{NAME}}`-style
placeholders, which are substituted with the parameter values when the DAG is expanded.

For instance:

```json
{
  "parameters": {
    "DIRECTORY": "/var/tmp",
    "FILE": "somefile"
  },
  "tasks": {
    "task_one": {
      "job": {
        "command": [
          "/bin/touch",
          "{{DIRECTORY}}/{{FILE}}"
        ]
      },
      "maxRetries": 3,
      "retryIntervalSeconds": 30
    }
  }
}
```

`task_one`'s command, when run, will touch `/var/tmp/somefile`, since the values of `DIRECTORY` and `FILE` will be
populated from the `parameters` values.

In the case where a parameter has an array of values, any tasks referencing that value will be duplicated with the
cartesian product of all relevant values.

Example:

```json
{
  "parameters": {
    "DIRECTORY": "/var/tmp",
    "FILE": "somefile",
    "DATE": [
      "2021-01-01",
      "2021-02-01",
      "2021-03-01"
    ]
  },
  "tasks": {
    "populate_inputs": {
      "job": {
        "command": [
          "/bin/touch",
          "{{DIRECTORY}}/{{FILE}}"
        ]
      },
      "children": [
        "calc_date"
      ]
    },
    "calc_date": {
      "job": {
        "command": [
          "/path/to/calculator",
          "{{DIRECTORY}}/{{FILE}}",
          "{{DATE}}"
        ]
      },
      "children": [
        "generate_report"
      ]
    },
    "generate_report": {
      "job": {
        "command": [
          "/path/to/generator"
        ]
      }
    }
  }
}
```

Conceptually, this DAG looks like this:

```mermaid
graph LR
populate_inputs-->calc_date
calc_date-->generate_report
```

Once the parameters have been populated, the new DAG will look like this:

```mermaid
graph LR
populate_inputs-->calc_date_1
populate_inputs-->calc_date_2
populate_inputs-->calc_date_3
calc_date_1-->generate_report
calc_date_2-->generate_report
calc_date_3-->generate_report
```

- `calc_date_1` will have the command `/path/to/calculator /var/tmp/somefile 2021-01-01`
- `calc_date_2` will have the command `/path/to/calculator /var/tmp/somefile 2021-02-01`
- `calc_date_3` will have the command `/path/to/calculator /var/tmp/somefile 2021-03-01`

**NB**: When a task template resolves to multiple task instances, all of those new instances are still referred to by
the original name for the purposes of creating dependencies, e.g. to add a dependency dynamically (see next section),
you must refer to `"children": [ "calc_date" ]`, not to the individual `calc_date_1`.

Tasks Generating Tasks
----------------------

Some DAG structures can only be fully known at runtime. For instance, if a job pulls multiple files from a source, each
of which can be processed independently, it is useful for the DAG to be able to modify itself on the fly to accommodate
them.

Enter the `generator` task. If a task is defined with `"isGenerator": true`, the output of the task is assumed to be a
JSON dictionary containing new tasks to run. The new tasks will go through parameter expansion as described above, using
the same parameter list as the original DAG. New tasks can define their own dependencies.

**NB:** Generated tasks won't have any child dependencies unless you define them. If there are parameterized
dependencies, you must use the name of the original task (e.g. use `calc_date`, not `calc_date_1`) to add a dependency.

**NB:** If you add a child dependency to a task that has already completed, that task won't restart. Best practice is to
create a dependency from the generator task to the task that the new tasks will depend on.

```json
{
  "tasks": {
    "pull_files": {
      "job": {
        "command": [
          "/path/to/puller/script",
          "{{DATE}}"
        ]
      },
      "isGenerator": true,
      "children": [
        "generate_report"
      ]
    },
    "generate_report": {
      "job": {
        "command": [
          "/path/to/generator"
        ]
      }
    }
  }
}
```

```mermaid
graph LR
pull_files-->generate_report
```

The output of the puller task might be:

```json
{
  "calc_file_a": {
    "job": {
      "command": [
        "/path/to/calculator",
        "/path/to/data/file/a"
      ]
    },
    "children": [
      "generate_report"
    ]
  },
  "calc_file_b": {
    "job": {
      "command": [
        "/path/to/calculator",
        "/path/to/data/file/b"
      ]
    },
    "children": [
      "generate_report"
    ]
  }
}
```

Once the first task runs, its output is parsed as additional tasks to run. The new DAG will look like this:

```mermaid
graph LR
pull_files-->generate_report
pull_files-->calc_file_a
pull_files-->calc_file_b
calc_file_a-->generate_report
calc_file_b-->generate_report
```

Note that it was important that `generate_report` depend on `pull_files`; otherwise the two tasks would run
concurrently, and `generate_report` wouldn't have any files to report on.

Execution Parameters
--

(future work)

The REST server can be configured with multiple pools of executors. For instance, it might be helpful to run certain
jobs on slurm with a specific set of restrictions, or to allow for local execution as well as execution on a slurm
cluster.

`executionParameters` is an optional top-level member that alters how the DAG is executed.

| Attribute      | Description                                                              |
|----------------|--------------------------------------------------------------------------|
| pool           | Names the executor pool the DAG should run on                            |
| poolParameters | Any parameters the executor accepts that might modify how a task is run  |

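Since this is future work, the exact shape isn't final. As a sketch, a submission might look like the following, where
`slurm_pool` is a hypothetical configured pool name and the `poolParameters` contents are assumptions:

```json
{
  "executionParameters": {
    "pool": "slurm_pool",
    "poolParameters": {
      "minMemoryMB": "1024"
    }
  },
  "tasks": {
    "task_one": {
      "job": {
        "command": [ "/bin/echo", "hello" ]
      }
    }
  }
}
```
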
Default Job Values
------------------

A DAG can be submitted with the extra section `jobDefaults`. These values are used as defaults for all tasks unless a
task overrides them. This can be useful for cases like Slurm execution, where tasks will share default memory and
runtime requirements.

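For example, a sketch assuming slurm-style `job` fields (documented under the Slurm Executor below): here `task_two`
overrides the default time limit, while `task_one` inherits both defaults.

```json
{
  "jobDefaults": {
    "minMemoryMB": "2048",
    "timeLimitSeconds": "3600"
  },
  "tasks": {
    "task_one": {
      "job": {
        "command": [ "/path/to/calculator", "input_a" ]
      }
    },
    "task_two": {
      "job": {
        "command": [ "/path/to/calculator", "input_b" ],
        "timeLimitSeconds": "7200"
      }
    }
  }
}
```
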
Executors
=========

Different executors require different structures for the `job` task member.

Local Executor (ForkingTaskExecutor)
------------------------------------

The ForkingTaskExecutor runs tasks on the local box, forking to run the task and using threads to monitor completion
and capture output.

| Field         | Sample                      | Description                                                        |
|---------------|-----------------------------|--------------------------------------------------------------------|
| command       | `[ "/bin/echo", "param1" ]` | The command to run                                                 |
| commandString | `"/bin/echo param1"`        | The command to run as a string. Quoted args are properly handled.  |
| environment   | `[ "DATE=2021-05-03" ]`     | Environment variables to set for the script                        |

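For example, a local-executor `job` built from the fields above (`command` and `commandString` appear to be alternative
forms of the same thing, so only `command` is shown):

```json
{
  "job": {
    "command": [ "/bin/echo", "param1" ],
    "environment": [ "DATE=2021-05-03" ]
  }
}
```
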
Slurm Executor (SlurmTaskExecutor)
----------------------------------

The slurm executor requires that the daggy server be running on a node capable of submitting jobs.

To enable slurm support, use `cmake -DDAGGY_ENABLE_SLURM=ON ..` when configuring the project.

Required `job` config values:

| Field            | Sample                      | Description                                                             |
|------------------|-----------------------------|-------------------------------------------------------------------------|
| command          | `[ "/bin/echo", "param1" ]` | The command to run on a slurm host                                      |
| commandString    | `"/bin/echo param1"`        | The command to run as a string. Quoted args are properly handled.       |
| environment      | `[ "DATE=2021-05-03" ]`     | Environment variables to set for the script                             |
| minCPUs          | `"1"`                       | Minimum number of CPUs required                                         |
| minMemoryMB      | `"1"`                       | Minimum memory required, in MB                                          |
| minTmpDiskMB     | `"1"`                       | Minimum temporary disk required, in MB                                  |
| priority         | `"100"`                     | Slurm priority                                                          |
| timeLimitSeconds | `"100"`                     | Number of seconds to allow the job to run for                           |
| userID           | `"1002"`                    | Numeric UID that the job should run as                                  |
| workDir          | `"/tmp/"`                   | Directory to use for work                                               |
| tmpDir           | `"/tmp/"`                   | Directory to use for temporary files, as well as stdout/stderr capture  |

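Putting the required values together, a slurm `job` might look like this sketch (all values are illustrative; the
shared paths anticipate the note on `tmpDir` below):

```json
{
  "job": {
    "command": [ "/path/to/calculator", "input.dat" ],
    "environment": [ "DATE=2021-05-03" ],
    "minCPUs": "1",
    "minMemoryMB": "1024",
    "minTmpDiskMB": "100",
    "priority": "100",
    "timeLimitSeconds": "3600",
    "userID": "1002",
    "workDir": "/shared/work/",
    "tmpDir": "/shared/tmp/"
  }
}
```
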
Daggy will submit the `command` to run, capturing the output in `${tmpDir}/${taskName}_{RANDOM}.{stderr,stdout}`
(e.g. an attempt of a task named `calc_file_a` might write `/shared/tmp/calc_file_a_18342.stdout`). Those files will
then be read after the task has completed, and stored in the AttemptRecord for later retrieval.

For this reason, it's important that the `tmpDir` directory **be readable by the daggy engine**, i.e. in a distributed
environment, it should be a shared filesystem. If this isn't the case, the job output will not be captured by daggy,
although it will still be available wherever it was written by slurm.

DaggyRunnerTaskExecutor
-----------------------

Daggy Runners (`daggyr` in this project) are daemons that can be run on remote hosts and then allocated work.

Tasks submitted to this type of runner require `cores` and `memoryMB` attributes. Remote runners have specific
capacities that are consumed when tasks run on them. Right now those capacities are merely advisory; it's possible
to oversubscribe a runner, and the constraints are not enforced.

Enforcement via cgroups is planned.

| Field         | Sample                      | Description                                                        |
|---------------|-----------------------------|--------------------------------------------------------------------|
| command       | `[ "/bin/echo", "param1" ]` | The command to run                                                 |
| commandString | `"/bin/echo param1"`        | The command to run as a string. Quoted args are properly handled.  |
| environment   | `[ "DATE=2021-05-03" ]`     | Environment variables to set for the script                        |
| cores         | `"1"`                       | Number of cores required by the task                               |
| memoryMB      | `"100"`                     | Amount of memory (RSS) required by the task, in MB                 |

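A runner `job` sketch, combining the common fields with the runner-specific capacity attributes (values illustrative):

```json
{
  "job": {
    "command": [ "/bin/echo", "param1" ],
    "environment": [ "DATE=2021-05-03" ],
    "cores": "1",
    "memoryMB": "100"
  }
}
```
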
Loggers
=======

RedisJSON
---------