# Daggy: Ya like dags?

## Description
Daggy is a work orchestration framework for running workflows modeled as directed acyclic graphs (DAGs). DAGs are a natural fit for data ingestion and processing pipelines.
Below is an example workflow where data is pulled from three sources (A, B, C), some work is done on them, and a report is generated.
Each step depends on the success of its upstream dependencies, e.g. Derive_Data_AB can't run until Transform_A and
Transform_B have completed successfully.
```mermaid
graph LR
    Pull_A-->Transform_A;
    Pull_B-->Transform_B;
    Pull_C-->Transform_C;
    Transform_A-->Derive_Data_AB;
    Transform_B-->Derive_Data_AB;
    Derive_Data_AB-->Derive_Data_ABC;
    Transform_C-->Derive_Data_ABC;
    Derive_Data_ABC-->Report;
```
Individual tasks (vertices) are run via a task executor. Daggy supports multiple executors, from a local executor (via fork) to distributed work managers such as Slurm or Kubernetes (planned), as well as daggy's own remote runner.
State is maintained via state loggers. Daggy currently supports an in-memory state manager (OStreamLogger) and RedisJSON; Postgres support is planned.
## Building
Requirements:

- git
- cmake >= 3.14
- gcc >= 8
- npm (if you want the webui)
- libslurm (if Slurm support is needed)
```bash
git clone https://gitlab.com/iroddis/daggy
cd daggy
mkdir build
cd build
cmake [-DDAGGY_ENABLE_SLURM=ON] ..
make
tests/tests   # for unit tests

# Web UI
cd ../webui
npm install
npm run build

# Launching daggyd with the web UI (from the repository root)
cd ..
build/bin/daggyd -v --assets-dir webui/dist
```
## DAG Run Definition
daggy works as a standalone library, but generally runs as a service with a REST interface. This documentation is specifically for submitting DAGs to the REST server for execution (a DAG run).
DAGs are defined in JSON as a set of tasks, along with optional jobDefaults and executionParameters (future work) sections.
### Basic Definition
A DAG Run definition consists of a dictionary that defines a set of tasks. Each task has the following attributes:
| Attribute | Required | Description |
|---|---|---|
| name | Yes | Name of this task. Must be unique. |
| job | Yes | The executor-specific job definition, including the command to execute (see Executors) |
| maxRetries | No | If a task fails, how many times to retry (default: 0) |
| retryIntervalSeconds | No | How many seconds to wait between retries |
| children | No | List of names of tasks that depend on this task |
| parents | No | List of names of tasks that this task depends on |
| isGenerator | No | If true, the output of this task is parsed as additional task definitions (default: false) |
Defining both parents and children is not required; one or the other is sufficient. Both are supported so that you can express task dependencies in whichever direction feels most natural.
Below is an example DAG Run submission:
```json
{
  "tasks": {
    "task_one": {
      "job": {
        "command": [ "/bin/touch", "/tmp/somefile" ]
      },
      "maxRetries": 3,
      "retryIntervalSeconds": 30
    },
    "task_two": {
      "job": {
        "command": [ "/bin/touch", "/tmp/someotherfile" ]
      },
      "maxRetries": 3,
      "retryIntervalSeconds": 30,
      "parents": [ "task_one" ]
    }
  }
}
```
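Both styles express the same graph. As a minimal sketch, the same two-task DAG can be written with the dependency declared from task_one's side via children:

```json
{
  "tasks": {
    "task_one": {
      "job": { "command": [ "/bin/touch", "/tmp/somefile" ] },
      "children": [ "task_two" ]
    },
    "task_two": {
      "job": { "command": [ "/bin/touch", "/tmp/someotherfile" ] }
    }
  }
}
```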
### Task Parameters
Task commands can be parameterized by passing in an optional parameters member. Each parameter consists of a name and either a string value or an array of string values. Tasks are regenerated based on the values of the parameters. For instance:
```json
{
  "parameters": {
    "DIRECTORY": "/var/tmp",
    "FILE": "somefile"
  },
  "tasks": {
    "task_one": {
      "job": {
        "command": [ "/bin/touch", "{{DIRECTORY}}/{{FILE}}" ]
      },
      "maxRetries": 3,
      "retryIntervalSeconds": 30
    }
  }
}
```
task_one's command, when run, will touch /var/tmp/somefile, since the values of DIRECTORY and FILE are populated from the parameters.
When a parameter holds an array of values, every task referencing it is duplicated: one task instance is created for each element of the cartesian product of all relevant parameter values.
Example:
```json
{
  "parameters": {
    "DIRECTORY": "/var/tmp",
    "FILE": "somefile",
    "DATE": [ "2021-01-01", "2021-02-01", "2021-03-01" ]
  },
  "tasks": {
    "populate_inputs": {
      "job": {
        "command": [ "/bin/touch", "{{DIRECTORY}}/{{FILE}}" ]
      },
      "children": [ "calc_date" ]
    },
    "calc_date": {
      "job": {
        "command": [ "/path/to/calculator", "{{DIRECTORY}}/{{FILE}}", "{{DATE}}" ]
      },
      "children": [ "generate_report" ]
    },
    "generate_report": {
      "job": {
        "command": [ "/path/to/generator" ]
      }
    }
  }
}
```
Conceptually, this DAG looks like this:
```mermaid
graph LR
    populate_inputs-->calc_date
    calc_date-->generate_report
```
Once the parameters have been populated, the new DAG will look like this:
```mermaid
graph LR
    populate_inputs-->calc_date_1
    populate_inputs-->calc_date_2
    populate_inputs-->calc_date_3
    calc_date_1-->generate_report
    calc_date_2-->generate_report
    calc_date_3-->generate_report
```
- `calc_date_1` will have the command `/path/to/calculator /var/tmp/somefile 2021-01-01`
- `calc_date_2` will have the command `/path/to/calculator /var/tmp/somefile 2021-02-01`
- `calc_date_3` will have the command `/path/to/calculator /var/tmp/somefile 2021-03-01`
NB: When a task template resolves to multiple task instances, all of those new instances are still referred to by the original name for the purposes of creating dependencies. E.g. to add a dependency dynamically (see next section), you must refer to `"children": [ "calc_date" ]`, not to the individual calc_date_1.
### Tasks Generating Tasks
Some DAG structures can only be fully known at runtime. For instance, if a job pulls multiple files from a source, each of which can be processed independently, it is useful for the DAG to be able to modify itself on the fly to accommodate them.
Enter the generator task. If a task is defined with "isGenerator": true, the output of the task is assumed to be a
JSON dictionary containing new tasks to run. The new tasks will go through parameter expansion as described above, using
the same parameter list as the original DAG. New tasks can define their own dependencies.
NB: Generated tasks won't have any children dependencies unless you define them. If there are parameterized
dependencies, you must use the name of the original task (e.g. use calc_date, not calc_date_1) to add a dependency.
NB: If you add a child dependency to a task that has already completed, that task won't restart. Best practice is to create a dependency from the generator task to the task the new tasks will depend on.
```json
{
  "tasks": {
    "pull_files": {
      "job": {
        "command": [ "/path/to/puller/script", "{{DATE}}" ]
      },
      "isGenerator": true,
      "children": [ "generate_report" ]
    },
    "generate_report": {
      "job": {
        "command": [ "/path/to/generator" ]
      }
    }
  }
}
```
```mermaid
graph LR
    pull_files-->generate_report
```
The output of the puller task might be:
```json
{
  "calc_file_a": {
    "job": {
      "command": [ "/path/to/calculator", "/path/to/data/file/a" ]
    },
    "children": [ "generate_report" ]
  },
  "calc_file_b": {
    "job": {
      "command": [ "/path/to/calculator", "/path/to/data/file/b" ]
    },
    "children": [ "generate_report" ]
  }
}
```
Once the puller task runs, its output is parsed as additional tasks to run. The new DAG will look like this:
```mermaid
graph LR
    pull_files-->generate_report
    pull_files-->calc_file_a
    pull_files-->calc_file_b
    calc_file_a-->generate_report
    calc_file_b-->generate_report
```
Note that it was important for generate_report to depend on pull_files; otherwise the two tasks would run concurrently, and generate_report wouldn't have any files to report on.
## Execution Parameters
(future work)
The REST server can be configured with multiple pools of executors. For instance, it might be helpful to run certain jobs on Slurm with a specific set of restrictions, or to allow local execution as well as execution on a Slurm cluster.
executionParameters is an optional member of the DAG submission that alters how the DAG is executed.
| Attribute | Description |
|---|---|
| pool | Names the executor the DAG should run on |
| poolParameters | Any parameters the executor accepts that might modify how a task is run |
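As this is future work, the exact shape isn't final. The sketch below shows how a submission might carry these attributes; the pool name and the parameter inside poolParameters are illustrative assumptions, not part of the current API:

```json
{
  "executionParameters": {
    "pool": "slurm_high_mem",
    "poolParameters": { "minMemoryMB": "4096" }
  },
  "tasks": {
    "some_task": {
      "job": { "command": [ "/bin/echo", "param1" ] }
    }
  }
}
```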
## Default Job Values
A DAG can be submitted with an extra jobDefaults section. These values are used to fill in defaults for all tasks wherever a task doesn't override them. This can be useful for cases like Slurm execution, where tasks often share default memory and runtime requirements, as sketched below.
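A minimal sketch, assuming jobDefaults mirrors the per-task job structure and using the Slurm executor's minMemoryMB and timeLimitSeconds fields described under Executors (paths and limits are illustrative): both tasks inherit the defaults, and long_task overrides the time limit.

```json
{
  "jobDefaults": {
    "minMemoryMB": "1024",
    "timeLimitSeconds": "300"
  },
  "tasks": {
    "short_task": {
      "job": { "command": [ "/path/to/quick/script" ] }
    },
    "long_task": {
      "job": {
        "command": [ "/path/to/slow/script" ],
        "timeLimitSeconds": "3600"
      }
    }
  }
}
```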
## Executors
Different executors require different structures for the job task member.
### Local Executor (ForkingTaskExecutor)
The ForkingTaskExecutor runs tasks on the local machine, forking to run each task and using threads to monitor completion and capture output.
| Field | Sample | Description |
|---|---|---|
| command | `[ "/bin/echo", "param1" ]` | The command to run |
| commandString | `"/bin/echo param1"` | The command to run, as a single string. Quoted arguments are handled properly. |
| environment | `[ "DATE=2021-05-03" ]` | Environment variables to set for the script |
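A minimal sketch of a task using this executor, built from the sample values in the table (the task name is illustrative):

```json
{
  "tasks": {
    "print_param": {
      "job": {
        "command": [ "/bin/echo", "param1" ],
        "environment": [ "DATE=2021-05-03" ]
      }
    }
  }
}
```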
### Slurm Executor (SlurmTaskExecutor)
The slurm executor requires that the daggy server be running on a node capable of submitting jobs.
To enable Slurm support, use `cmake -DDAGGY_ENABLE_SLURM=ON ..` when configuring the project.
Required job config values:
| Field | Sample | Description |
|---|---|---|
| command | `[ "/bin/echo", "param1" ]` | The command to run on a Slurm host |
| commandString | `"/bin/echo param1"` | The command to run, as a single string. Quoted arguments are handled properly. |
| environment | `[ "DATE=2021-05-03" ]` | Environment variables to set for the script |
| minCPUs | `"1"` | Minimum number of CPUs required |
| minMemoryMB | `"1"` | Minimum memory required, in MB |
| minTmpDiskMB | `"1"` | Minimum temporary disk required, in MB |
| priority | `"100"` | Slurm priority |
| timeLimitSeconds | `"100"` | Number of seconds to allow the job to run for |
| userID | `"1002"` | Numeric UID that the job should run as |
| workDir | `"/tmp/"` | Directory to use for work |
| tmpDir | `"/tmp/"` | Directory to use for temporary files, as well as stdout/stderr capture |
Daggy will submit the command to run, capturing the output in `${tmpDir}/${taskName}_{RANDOM}.{stderr,stdout}`. Those files are read after the task has completed and stored in the AttemptRecord for later retrieval.
For this reason, it's important that the tmpDir directory be readable by the daggy engine, i.e. in a distributed environment it should be on a shared filesystem. If it isn't, the job output will not be captured by daggy, although it will still be available wherever Slurm wrote it.
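Putting the fields together, a sketch of a Slurm task definition; the paths, limits, and task name are illustrative, not prescribed values:

```json
{
  "tasks": {
    "slurm_task": {
      "job": {
        "command": [ "/bin/echo", "param1" ],
        "minCPUs": "1",
        "minMemoryMB": "1024",
        "minTmpDiskMB": "100",
        "priority": "100",
        "timeLimitSeconds": "600",
        "userID": "1002",
        "workDir": "/shared/work/",
        "tmpDir": "/shared/tmp/"
      }
    }
  }
}
```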
### DaggyRunnerTaskExecutor
Daggy Runners (daggyr in this project) are daemons that can be run on remote hosts and then allocated work.
Tasks submitted to this type of runner require cores and memoryMB attributes. Remote runners advertise a specific capacity that is consumed as tasks run on them. Right now those capacities are merely advisory; it's possible to oversubscribe a runner, and the constraints are not enforced.
Enforcement via cgroups is planned.
| Field | Sample | Description |
|---|---|---|
| command | `[ "/bin/echo", "param1" ]` | The command to run |
| commandString | `"/bin/echo param1"` | The command to run, as a single string. Quoted arguments are handled properly. |
| environment | `[ "DATE=2021-05-03" ]` | Environment variables to set for the script |
| cores | `"1"` | Number of cores required by the task |
| memoryMB | `"100"` | Amount of memory (RSS) required by the task, in MB |
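A sketch of a task definition for a daggy runner, combining the sample values from the table (the task name is illustrative):

```json
{
  "tasks": {
    "runner_task": {
      "job": {
        "command": [ "/bin/echo", "param1" ],
        "environment": [ "DATE=2021-05-03" ],
        "cores": "1",
        "memoryMB": "100"
      }
    }
  }
}
```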