diff --git a/README.md b/README.md
index aca019b..b9146dd 100644
--- a/README.md
+++ b/README.md
@@ -3,9 +3,36 @@ Daggy: Ya like dags?
 Description
 ==
-Daggy is a work orchestration framework for running workflows modeled as
-directed, acyclic graphs (DAGs). These are quite useful when modeling
-data ingestion / processing pipelines.
+Daggy is a work orchestration framework for running workflows modeled as directed, acyclic graphs (DAGs). These are
+quite useful when modeling data ingestion / processing pipelines.
+
+Below is an example workflow where data is pulled from three sources (A, B, C), some work is done on them, and a report
+is generated.
+
+Each step depends on the success of its upstream dependencies, e.g. `Derive_Data_AB` can't run until `Transform_A` and
+`Transform_B` have completed successfully.
+
+```mermaid
+graph LR
+    Pull_A-->Transform_A;
+    Pull_B-->Transform_B;
+    Pull_C-->Transform_C;
+
+    Transform_A-->Derive_Data_AB;
+    Transform_B-->Derive_Data_AB;
+    Derive_Data_AB-->Derive_Data_ABC;
+    Transform_C-->Derive_Data_ABC;
+
+    Derive_Data_ABC-->Report;
+```
+
+Individual tasks (vertices) are run via a task executor. Daggy supports multiple executors, from a local executor (via
+fork) to distributed work managers like [slurm](https://slurm.schedmd.com/overview.html)
+or [kubernetes](https://kubernetes.io/) (both planned).
+
+State is maintained via state loggers. Currently, daggy supports an in-memory logger (OStreamLogger) and a
+filesystem logger (FileSystemLogger). Future plans include supporting [redis](https://redis.io)
+and [postgres](https://postgresql.org).
 
 Building
 ==
@@ -19,27 +46,169 @@ cmake ..
 make
 ```
 
-Architecture
-==
+DAG Run Definition
+==
 
-- Server
-  - Interactive endpoint
-  - DAGs
-    - CRUDs DAG definitions
-    - Kick off runs
-    - Return status
-- Scheduler
-  - Accepts Executors
-  - Accepts task lists and parameters
-  - Runs and monitors DAGs
+Daggy works as a standalone library, but generally runs as a service with a REST interface. This section covers
+submitting DAGs to the REST server for execution (a DAG run).
 
-Flow
-==
+DAGs are defined in JSON as a set of `tasks`, along with optional `taskParameters` and `executionParameters` (future).
 
-1. DAG Generated from JSON spec
-2. Execution Pool Selected
-3. DAGRun created from storage
-4. DAGExecutor runs with
-   - Logger object
-   - Execution Pool
-   - DAG definition
\ No newline at end of file
+Basic Definition
+--
+
+A DAG Run definition consists of a dictionary that defines a set of tasks. Each task has the following attributes:
+
+| Attribute            | Required | Description                                             |
+|----------------------|----------|---------------------------------------------------------|
+| name                 | Yes      | Name of this task. Must be unique.                      |
+| command              | Yes      | The command to execute.                                 |
+| maxRetries           | No       | If a task fails, how many times to retry (default: 0).  |
+| retryIntervalSeconds | No       | How many seconds to wait between retries.               |
+| children             | No       | List of names of tasks that depend on this task.        |
+| parents              | No       | List of names of tasks that this task depends on.       |
+
+Defining both `parents` and `children` is not required; one or the other is sufficient. Both are supported so that you
+can define your task dependencies in whichever way is most natural to you.
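+
+For example, this minimal sketch (the task names and commands here are illustrative, not from the daggy sources)
+expresses "run `load` after `extract`" using `children`; the full submission example below expresses the same kind of
+dependency using `parents`:
+
+```json
+{
+  "tasks": [
+    {
+      "name": "extract",
+      "command": [ "/usr/bin/true" ],
+      "children": [ "load" ]
+    },
+    {
+      "name": "load",
+      "command": [ "/usr/bin/true" ]
+    }
+  ]
+}
+```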
+
+Below is an example DAG Run submission:
+
+```json
+{
+  "tasks": [
+    {
+      "name": "task_one",
+      "command": [
+        "/usr/bin/touch",
+        "/tmp/somefile"
+      ],
+      "maxRetries": 3,
+      "retryIntervalSeconds": 30
+    },
+    {
+      "name": "task_two",
+      "command": [
+        "/usr/bin/touch",
+        "/tmp/someotherfile"
+      ],
+      "maxRetries": 3,
+      "retryIntervalSeconds": 30,
+      "parents": [
+        "task_one"
+      ]
+    }
+  ]
+}
+```
+
+Task Parameters
+--
+
+Task commands can be parameterized by passing in an optional `taskParameters` member. Each parameter consists of a name
+and either a string value or an array of string values. Task commands will be regenerated based on the values of the
+parameters.
+
+For instance:
+
+```json
+{
+  "taskParameters": {
+    "DIRECTORY": "/var/tmp",
+    "FILE": "somefile"
+  },
+  "tasks": [
+    {
+      "name": "task_one",
+      "command": [
+        "/usr/bin/touch",
+        "{{DIRECTORY}}/{{FILE}}"
+      ],
+      "maxRetries": 3,
+      "retryIntervalSeconds": 30
+    }
+  ]
+}
+```
+
+`task_one`'s command, when run, will touch `/var/tmp/somefile`, since the values of `DIRECTORY` and `FILE` will be
+populated from the `taskParameters` values.
+
+When a parameter has an array of values, any task referencing that parameter will be duplicated over the Cartesian
+product of all relevant values.
+
+Example:
+
+```json
+{
+  "taskParameters": {
+    "DIRECTORY": "/var/tmp",
+    "FILE": "somefile",
+    "DATE": [
+      "2021-01-01",
+      "2021-02-01",
+      "2021-03-01"
+    ]
+  },
+  "tasks": [
+    {
+      "name": "populate_inputs",
+      "command": [
+        "/usr/bin/touch",
+        "{{DIRECTORY}}/{{FILE}}"
+      ],
+      "children": [
+        "calc_date"
+      ]
+    },
+    {
+      "name": "calc_date",
+      "command": [
+        "/path/to/calculator",
+        "{{DIRECTORY}}/{{FILE}}",
+        "{{DATE}}"
+      ],
+      "children": [
+        "generate_report"
+      ]
+    },
+    {
+      "name": "generate_report",
+      "command": [
+        "/path/to/generator"
+      ]
+    }
+  ]
+}
+```
+
+Conceptually, this DAG looks like this:
+
+```mermaid
+graph LR
+    populate_inputs-->calc_date
+    calc_date-->generate_report
+```
+
+Once the parameters have been populated, the new DAG will look like this:
+
+```mermaid
+graph LR
+    populate_inputs-->calc_date_1
+    populate_inputs-->calc_date_2
+    populate_inputs-->calc_date_3
+    calc_date_1-->generate_report
+    calc_date_2-->generate_report
+    calc_date_3-->generate_report
+```
+
+- `calc_date_1` will have the command `/path/to/calculator /var/tmp/somefile 2021-01-01`
+- `calc_date_2` will have the command `/path/to/calculator /var/tmp/somefile 2021-02-01`
+- `calc_date_3` will have the command `/path/to/calculator /var/tmp/somefile 2021-03-01`
+
+Execution Parameters
+--
+
+(future work)
+
+The REST server can be configured with multiple pools of executors. For instance, it might be helpful to run certain
+jobs on slurm with a specific set of restrictions, or to allow both local execution and execution on a slurm cluster.
+
+`executionParameters` is an optional member of the submission that alters how the DAG is executed.
+
+| Attribute      | Description                                                              |
+|----------------|--------------------------------------------------------------------------|
+| pool           | Names the executor pool the DAG should run on                            |
+| poolParameters | Any parameters the executor accepts that might modify how a task is run  |
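+
+Since this is future work, the exact schema may change. Below is a hypothetical sketch of a submission that targets a
+slurm pool; the pool name `slurm_short` and the `partition` pool parameter are illustrative assumptions, not part of
+the current API:
+
+```json
+{
+  "executionParameters": {
+    "pool": "slurm_short",
+    "poolParameters": {
+      "partition": "short"
+    }
+  },
+  "tasks": [
+    {
+      "name": "task_one",
+      "command": [ "/usr/bin/touch", "/tmp/somefile" ]
+    }
+  ]
+}
+```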
\ No newline at end of file
diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp
index 201bb51..caccf83 100644
--- a/daggy/src/Server.cpp
+++ b/daggy/src/Server.cpp
@@ -83,7 +83,7 @@ namespace daggy {
     /*
      * {
      *   "name": "DAG Run Name"
-     *   "parameters": {...}
+     *   "taskParameters": {...}
      *   "tasks": {...}
      */
     void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
@@ -104,9 +104,9 @@
         // Get parameters if there are any
         ParameterValues parameters;
-        if (doc.HasMember("parameters")) {
+        if (doc.HasMember("taskParameters")) {
             try {
-                auto parsedParams = parametersFromJSON(doc["parameters"].GetObject());
+                auto parsedParams = parametersFromJSON(doc["taskParameters"].GetObject());
                 parameters.swap(parsedParams);
             } catch (std::exception &e) {
                 REQ_ERROR(Bad_Request, e.what());
diff --git a/tests/unit_server.cpp b/tests/unit_server.cpp
index f6bfe46..0bf8354 100644
--- a/tests/unit_server.cpp
+++ b/tests/unit_server.cpp
@@ -70,7 +70,7 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") {
     SECTION("Simple DAGRun Submission") {
         std::string dagRun = R"({
             "name": "unit_server",
-            "parameters": { "FILE": [ "A", "B" ] },
+            "taskParameters": { "FILE": [ "A", "B" ] },
             "tasks": [
                 { "name": "touch",
                   "command": [ "/usr/bin/touch", "/tmp/{{FILE}}" ]