From b63739c0a989ec1ed65b400a79252d9f2e3af055 Mon Sep 17 00:00:00 2001
From: Ian Roddis
Date: Tue, 22 Feb 2022 10:32:31 -0400
Subject: [PATCH] Adding StopTask endpoint and endpoint documentation

---
 daggyd/libdaggyd/include/daggyd/Server.hpp |   1 +
 daggyd/libdaggyd/src/Server.cpp            |  32 ++++
 endpoints.rst                              | 201 +++++++++++++++++++++
 libdaggy/include/daggy/DAGRunner.hpp       |   1 +
 libdaggy/src/DAGRunner.cpp                 |   6 +
 5 files changed, 241 insertions(+)
 create mode 100644 endpoints.rst

diff --git a/daggyd/libdaggyd/include/daggyd/Server.hpp b/daggyd/libdaggyd/include/daggyd/Server.hpp
index 26ed627..cc6cd1a 100644
--- a/daggyd/libdaggyd/include/daggyd/Server.hpp
+++ b/daggyd/libdaggyd/include/daggyd/Server.hpp
@@ -50,6 +50,7 @@ namespace daggy::daggyd {
     DAGGY_REST_HANDLER(handleGetDAGRunState); // X
     DAGGY_REST_HANDLER(handleSetDAGRunState); // X
     DAGGY_REST_HANDLER(handleGetTask); // X
+    DAGGY_REST_HANDLER(handleStopTask); // X
     DAGGY_REST_HANDLER(handleGetTaskState); // X
     DAGGY_REST_HANDLER(handleSetTaskState); // X
 
diff --git a/daggyd/libdaggyd/src/Server.cpp b/daggyd/libdaggyd/src/Server.cpp
index 4e02077..088fd68 100644
--- a/daggyd/libdaggyd/src/Server.cpp
+++ b/daggyd/libdaggyd/src/Server.cpp
@@ -167,6 +167,10 @@ namespace daggy::daggyd {
         .bind(&Server::handleGetTask, this)
         .produces(MIME(Application, Json))
         .response(Http::Code::Ok, "Details of a specific task");
+    taskPath.route(desc_.del("/"))
+        .bind(&Server::handleStopTask, this)
+        .produces(MIME(Application, Json))
+        .response(Http::Code::Ok, "Kill a specific task");
 
     /*
        Task State paths
@@ -682,6 +686,34 @@ namespace daggy::daggyd {
     }
   }
 
+  /// Handle DELETE on a task path: ask the matching runner to stop the task.
+  ///
+  /// Returns without responding here if handleAuth() rejects the request.
+  /// Otherwise always answers 200 with an empty body, including when no
+  /// runner is registered under the requested run ID.
+  void Server::handleStopTask(const Pistache::Rest::Request &request,
+                              Pistache::Http::ResponseWriter response)
+  {
+    if (!handleAuth(request))
+      return;
+
+    // Parse the run ID as the key type of runners_ so it can be looked up
+    // directly.
+    auto runID =
+        request.param(":runID").as<decltype(runners_)::key_type>();
+    auto taskName = request.param(":taskName").as<std::string>();
+
+    {
+      // runnerGuard_ is held only for the lookup and the stop request.
+      std::lock_guard lock(runnerGuard_);
+      auto it = runners_.find(runID);
+      if (it != runners_.end()) {
+        it->second->stopTask(taskName);
+      }
+    }
+    response.send(Pistache::Http::Code::Ok, "");
+  }
+
   void Server::handleSetTaskState(const Pistache::Rest::Request &request,
                                   Pistache::Http::ResponseWriter response)
   {
diff --git a/endpoints.rst b/endpoints.rst
new file mode 100644
index 0000000..269e9e8
--- /dev/null
+++ b/endpoints.rst
@@ -0,0 +1,201 @@
+Paths
+=====
+
++----------------------------------------------+--------+---------------------------------------------------------------------------+--------------------+-------------------------------+
+| Path                                         | Verb   | Description                                                               | Payload            | Result                        |
++==============================================+========+===========================================================================+====================+===============================+
+| ``/ready``                                   | GET    | Ready check                                                               |                    | ``{ "msg": "Ya like DAGs?"}`` |
++----------------------------------------------+--------+---------------------------------------------------------------------------+--------------------+-------------------------------+
+| ``/v1/dagruns?all=1``                        | GET    | Retrieve list of known dags                                               |                    | :ref:`Example List of Runs`   |
++----------------------------------------------+--------+---------------------------------------------------------------------------+--------------------+-------------------------------+
+| ``/v1/dagrun``                               | POST   | Submit a DAG for execution                                                | :ref:`Example DAG` | ``{ "runID": 0 }``            |
++----------------------------------------------+--------+---------------------------------------------------------------------------+--------------------+-------------------------------+
+| ``/v1/dagrun/validate``                      | POST   | Submit a DAG for validation                                               | :ref:`Example DAG` | ``{ "valid": true }``         |
++----------------------------------------------+--------+---------------------------------------------------------------------------+--------------------+-------------------------------+
+| ``/v1/dagrun/{runid}``                       | GET    | Retrieve full state of dagrun                                             |                    | :ref:`Example DAG Run`        |
++----------------------------------------------+--------+---------------------------------------------------------------------------+--------------------+-------------------------------+
+| ``/v1/dagrun/{runid}``                       | DELETE | Kill a running DAG                                                        |                    |                               |
++----------------------------------------------+--------+---------------------------------------------------------------------------+--------------------+-------------------------------+
+| ``/v1/dagrun/{runid}/state``                 | GET    | Retrieve current state of run                                             |                    | ``{ "state": "QUEUED" }``     |
++----------------------------------------------+--------+---------------------------------------------------------------------------+--------------------+-------------------------------+
+| ``/v1/dagrun/{runid}/state/{state}``         | PATCH  | Set the state of a DAG Run. Can be used to restart an errored/killed DAG. |                    | ``{ "runID": 0 }``            |
++----------------------------------------------+--------+---------------------------------------------------------------------------+--------------------+-------------------------------+
+| ``/v1/dagrun/{runid}/task/{taskName}``       | GET    | Get all the details of a specific task.                                   |                    | :ref:`Example Task Details`   |
++----------------------------------------------+--------+---------------------------------------------------------------------------+--------------------+-------------------------------+
+| ``/v1/dagrun/{runid}/task/{taskName}``       | DELETE | Kill a task                                                               |                    |                               |
++----------------------------------------------+--------+---------------------------------------------------------------------------+--------------------+-------------------------------+
+| ``/v1/dagrun/{runid}/task/{taskName}/state`` | GET    | Get the current state of a task                                           |                    | ``{ "state": "QUEUED" }``     |
++----------------------------------------------+--------+---------------------------------------------------------------------------+--------------------+-------------------------------+
+
+Example List of Runs
+====================
+
+.. code:: json
+
+  [
+    {
+      "runID": 0,
+      "tag": "mediumtest",
+      "state": "COMPLETED",
+      "startTime": "1645112245996647771",
+      "lastUpdate": "1645112265427198113",
+      "taskCounts": {
+        "COMPLETED": 25
+      }
+    },
+    {
+      "runID": 1,
+      "tag": "mediumtest",
+      "state": "COMPLETED",
+      "startTime": "1645218441353189612",
+      "lastUpdate": "1645218455775103710",
+      "taskCounts": {
+        "COMPLETED": 25
+      }
+    },
+    {
+      "runID": 2,
+      "tag": "mediumtest",
+      "state": "RUNNING",
+      "startTime": "1645539903281234588",
+      "lastUpdate": "1645539912499992547",
+      "taskCounts": {
+        "COMPLETED": 11,
+        "RUNNING": 14
+      }
+    }
+  ]
+
+Example DAG
+===========
+
+.. code:: json
+
+  {
+    "tag": "mediumtest",
+    "parameters": {
+      "JOBNO": [
+        "1", "2"
+      ]
+    },
+    "tasks": {
+      "simple": {
+        "job": {
+          "command": [ "/usr/bin/python3", "script.py", "{{JOBNO}}" ],
+          "cores": "1",
+          "memoryMB": "100"
+        }
+      }
+    }
+  }
+
+
+Example DAG Run
+===============
+
+.. code:: json
+
+  {
+    "runID": 0,
+    "tag": "mediumtest",
+    "tasks": {
+      "simple_0": {
+        "maxRetries": 0,
+        "retryIntervalSeconds": 0,
+        "job": {
+          "environment": [],
+          "memoryMB": "100",
+          "cores": "1",
+          "command": [
+            "/usr/bin/python3",
+            "script.py",
+            "1"
+          ]
+        },
+        "children": [],
+        "parents": [],
+        "isGenerator": false
+      },
+      "simple_1": {
+        "maxRetries": 0,
+        "retryIntervalSeconds": 0,
+        "job": {
+          "environment": [],
+          "memoryMB": "100",
+          "cores": "1",
+          "command": [
+            "/usr/bin/python3",
+            "script.py",
+            "2"
+          ]
+        },
+        "children": [],
+        "parents": [],
+        "isGenerator": false
+      }
+    },
+    "taskStates": {
+      "simple_0": "COMPLETED",
+      "simple_1": "COMPLETED"
+    },
+    "taskAttempts": {
+      "simple_0": [
+        {
+          "startTime": "1645112246001010638",
+          "stopTime": "1645112256116473300",
+          "rc": 0,
+          "outputLog": "Echoing script.py 1 env is not found, value is >><< \n",
+          "errorLog": "",
+          "executorLog": ""
+        }
+      ],
+      "simple_1": [
+        {
+          "startTime": "1645112246001027500",
+          "stopTime": "1645112256115818901",
+          "rc": 0,
+          "outputLog": "Echoing script.py 2 env is not found, value is >><< \n",
+          "errorLog": "",
+          "executorLog": ""
+        }
+      ]
+    },
+    "dagStateChanges": [
+      {
+        "time": "1645112245996647771",
+        "state": "QUEUED"
+      },
+      {
+        "time": "1645112245997607895",
+        "state": "RUNNING"
+      },
+      {
+        "time": "1645112265427198113",
+        "state": "COMPLETED"
+      }
+    ]
+  }
+
+
+Example Task Details
+====================
+
+.. code:: json
+
+  {
+    "maxRetries": 0,
+    "retryIntervalSeconds": 0,
+    "job": {
+      "environment": [],
+      "memoryMB": "100",
+      "cores": "1",
+      "command": [
+        "/usr/bin/python3",
+        "script.py",
+        "1"
+      ]
+    },
+    "children": [],
+    "parents": [],
+    "isGenerator": false
+  }
diff --git a/libdaggy/include/daggy/DAGRunner.hpp b/libdaggy/include/daggy/DAGRunner.hpp
index caaa8ab..5c2b888 100644
--- a/libdaggy/include/daggy/DAGRunner.hpp
+++ b/libdaggy/include/daggy/DAGRunner.hpp
@@ -31,6 +31,7 @@ namespace daggy {
     TaskDAG run();
     void resetRunning();
     void stop(bool kill = false, bool blocking = false);
+    void stopTask(const std::string &taskName);
 
   private:
     void collectFinished();
diff --git a/libdaggy/src/DAGRunner.cpp b/libdaggy/src/DAGRunner.cpp
index fe64cfd..941c5fa 100644
--- a/libdaggy/src/DAGRunner.cpp
+++ b/libdaggy/src/DAGRunner.cpp
@@ -90,6 +90,12 @@ namespace daggy {
     }
   }
 
+  /// Forward a stop request for taskName, under this runner's run ID, to the executor.
+  void DAGRunner::stopTask(const std::string &taskName)
+  {
+    executor_.stop(runID_, taskName);
+  }
+
   void DAGRunner::queuePending()
   {
     if (!running_)