diff --git a/README.md b/README.md index f41213d..ab1983b 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,12 @@ and [postgres](https://postgresql.org). Building == +**Requirements:** + +- git +- cmake >= 3.14 +- gcc >= 9 + ``` git clone https://gitlab.com/iroddis/daggy cd daggy diff --git a/TODO.md b/TODO.md index 11898e8..ffca01e 100644 --- a/TODO.md +++ b/TODO.md @@ -6,8 +6,10 @@ Tasks - [ ] Add in authorization scheme (maybe PAM auth endpoint with JWT?) - [ ] Flesh out server and interface - Core Functionality - - [ ] Handle return on errored DAG / Task - - [ ] Clearing a DAG Task + - Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma + separated list + - Allow for tasks to define next tasks + - Add execution gates - Executors - [ ] Slurm Executor - Loggers @@ -22,4 +24,6 @@ Tasks - [X] Add ability to define child -> parent relationships - [X] Rip apart scheduler and re-implement runDag as a utility function, taking an execution pool, task list, and logger - - [X] Resume a failed DAG \ No newline at end of file + - [X] Resume a failed DAG + - [X] Handle return on errored DAG / Task + - [X] Clearing a DAG Task diff --git a/daggy/include/daggy/AttemptRecord.hpp b/daggy/include/daggy/AttemptRecord.hpp deleted file mode 100644 index 5b3515e..0000000 --- a/daggy/include/daggy/AttemptRecord.hpp +++ /dev/null @@ -1,17 +0,0 @@ -#pragma once - -#include -#include - -#include "Defines.hpp" - -namespace daggy { - struct AttemptRecord { - TimePoint startTime; - TimePoint stopTime; - int rc; // RC from the task - std::string executorLog; // Logs from the dag_executor - std::string outputLog; // stdout from command - std::string errorLog; // stderr from command - }; -} diff --git a/daggy/include/daggy/DAGRun.hpp b/daggy/include/daggy/DAGRun.hpp deleted file mode 100644 index 2cb7b8a..0000000 --- a/daggy/include/daggy/DAGRun.hpp +++ /dev/null @@ -1,21 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "DAG.hpp" -#include "Task.hpp" -#include "AttemptRecord.hpp" - -namespace daggy { - using ParameterValue = std::variant>; - using TaskRun = std::vector; - - struct DAGRun { - std::vector tasks; - std::unordered_map parameters; - DAG dag; - std::vector taskRuns; - }; -} diff --git a/daggy/include/daggy/Defines.hpp b/daggy/include/daggy/Defines.hpp index 3d5291d..b1c7798 100644 --- a/daggy/include/daggy/Defines.hpp +++ b/daggy/include/daggy/Defines.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -29,4 +30,29 @@ namespace daggy { KILLED = 1 << 3, COMPLETED = 1 << 4 }; + + struct Task { + std::string name; + std::vector command; + uint32_t maxRetries; + uint32_t retryIntervalSeconds; // Time to wait between retries + std::unordered_set children; + + bool operator==(const Task &other) const { + return (name == other.name) + and (maxRetries == other.maxRetries) + and (retryIntervalSeconds == other.retryIntervalSeconds) + and (command == other.command) + and (children == other.children); + } + }; + + struct AttemptRecord { + TimePoint startTime; + TimePoint stopTime; + int rc; // RC from the task + std::string executorLog; // Logs from the dag_executor + std::string outputLog; // stdout from command + std::string errorLog; // stderr from command + }; } diff --git a/daggy/include/daggy/Serialization.hpp b/daggy/include/daggy/Serialization.hpp index c908c07..43a1068 100644 --- a/daggy/include/daggy/Serialization.hpp +++ b/daggy/include/daggy/Serialization.hpp @@ -8,8 +8,6 @@ #include #include "Defines.hpp" -#include "Task.hpp" -#include "AttemptRecord.hpp" namespace rj = rapidjson; diff --git a/daggy/include/daggy/Task.hpp b/daggy/include/daggy/Task.hpp deleted file mode 100644 index 7bdb8ce..0000000 --- a/daggy/include/daggy/Task.hpp +++ /dev/null @@ -1,24 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -namespace daggy { - struct Task { - std::string name; - std::vector command; - uint32_t maxRetries; - uint32_t retryIntervalSeconds; // Time to wait between retries - std::unordered_set children; - - bool operator==(const Task &other) const { - return (name == other.name) - and (maxRetries == other.maxRetries) - and (retryIntervalSeconds == other.retryIntervalSeconds) - and (command == other.command) - and (children == other.children); - } - }; -} diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index e25f335..0b7e203 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -9,7 +9,6 @@ #include "daggy/loggers/dag_run/DAGRunLogger.hpp" #include "daggy/executors/task/TaskExecutor.hpp" -#include "Task.hpp" #include "Defines.hpp" #include "DAG.hpp" diff --git a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp index d549a1e..3b0bd4b 100644 --- a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp +++ b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp @@ -16,4 +16,4 @@ namespace daggy { }; } } -} \ No newline at end of file +} diff --git a/daggy/include/daggy/executors/task/TaskExecutor.hpp b/daggy/include/daggy/executors/task/TaskExecutor.hpp index 749ecb1..0b7c116 100644 --- a/daggy/include/daggy/executors/task/TaskExecutor.hpp +++ b/daggy/include/daggy/executors/task/TaskExecutor.hpp @@ -6,8 +6,7 @@ #include #include -#include "daggy/Task.hpp" -#include "daggy/AttemptRecord.hpp" +#include "daggy/Defines.hpp" #include "daggy/ThreadPool.hpp" /* diff --git a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp index d9852fb..e5594b8 100644 --- a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp @@ -2,8 +2,6 @@ #include -#include "../../Task.hpp" -#include "../../AttemptRecord.hpp" #include "../../Defines.hpp" #include "Defines.hpp" diff --git a/daggy/include/daggy/loggers/dag_run/Defines.hpp b/daggy/include/daggy/loggers/dag_run/Defines.hpp index ca5292d..fde2b8e 100644 --- a/daggy/include/daggy/loggers/dag_run/Defines.hpp +++ b/daggy/include/daggy/loggers/dag_run/Defines.hpp @@ -1,37 +1,39 @@ #pragma once -namespace daggy { - namespace loggers { - namespace dag_run { - struct TaskUpdateRecord { - TimePoint time; - TaskID taskID; - RunState newState; - }; +#include +#include +#include +#include - struct DAGUpdateRecord { - TimePoint time; - RunState newState; - }; - // Pretty heavy weight, but - struct DAGRunRecord { - std::string name; - std::vector tasks; - std::vector runStates; - std::vector> taskAttempts; - std::vector taskStateChanges; - std::vector dagStateChanges; - }; +namespace daggy::loggers::dag_run { + struct TaskUpdateRecord { + TimePoint time; + TaskID taskID; + RunState newState; + }; - struct DAGRunSummary { - DAGRunID runID; - std::string name; - RunState runState; - TimePoint startTime; - TimePoint lastUpdate; - std::unordered_map taskStateCounts; - }; - } - } -} \ No newline at end of file + struct DAGUpdateRecord { + TimePoint time; + RunState newState; + }; + + // Pretty heavy weight, but + struct DAGRunRecord { + std::string name; + std::vector tasks; + std::vector taskRunStates; + std::vector> taskAttempts; + std::vector taskStateChanges; + std::vector dagStateChanges; + }; + + struct DAGRunSummary { + DAGRunID runID; + std::string name; + RunState runState; + TimePoint startTime; + TimePoint lastUpdate; + std::unordered_map taskStateCounts; + }; +} diff --git a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp index ac3689b..84d1756 100644 --- a/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp @@ -6,65 +6,62 @@ #include #include "DAGRunLogger.hpp" +#include "Defines.hpp" namespace fs = std::filesystem; namespace rj = rapidjson; -namespace daggy { - namespace loggers { - namespace dag_run { - /* - * This logger should only be used for debug purposes. It's not really optimized for querying, and will - * use a ton of inodes to track state. - * - * On the plus side, it's trivial to look at without using the API. - * - * Filesystem logger creates the following structure: - * {root}/ - * runs/ - * {runID}/ - * meta.json --- Contains the DAG name, task definitions - * states.csv --- DAG state changes - * {taskName}/ - * states.csv --- TASK state changes - * {attempt}/ - * metadata.json --- timestamps and rc - * output.log - * error.log - * executor.log - */ - class FileSystemLogger : public DAGRunLogger { - public: - FileSystemLogger(fs::path root); +namespace daggy::loggers::dag_run { + /* + * This logger should only be used for debug purposes. It's not really optimized for querying, and will + * use a ton of inodes to track state. + * + * On the plus side, it's trivial to look at without using the API. + * + * Filesystem logger creates the following structure: + * {root}/ + * runs/ + * {runID}/ + * meta.json --- Contains the DAG name, task definitions + * states.csv --- DAG state changes + * {taskName}/ + * states.csv --- TASK state changes + * {attempt}/ + * metadata.json --- timestamps and rc + * output.log + * error.log + * executor.log + */ + class FileSystemLogger : public DAGRunLogger { + public: + FileSystemLogger(fs::path root); - // Execution - DAGRunID startDAGRun(std::string name, const std::vector &tasks) override; + // Execution + DAGRunID startDAGRun(std::string name, const std::vector &tasks) override; - void updateDAGRunState(DAGRunID dagRunID, RunState state) override; + void updateDAGRunState(DAGRunID dagRunID, RunState state) override; - void - logTaskAttempt(DAGRunID, const std::string &taskName, const AttemptRecord &attempt) override; + void + logTaskAttempt(DAGRunID, const std::string &taskName, const AttemptRecord &attempt) override; - void updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) override; + void updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) override; - // Querying - std::vector getDAGs(uint32_t stateMask) override; + // Querying + std::vector getDAGs(uint32_t stateMask) override; - DAGRunRecord getDAGRun(DAGRunID dagRunID) override; + DAGRunRecord getDAGRun(DAGRunID dagRunID) override; - private: - fs::path root_; - std::atomic nextRunID_; - std::mutex lock_; + private: + fs::path root_; + std::atomic nextRunID_; + std::mutex lock_; - // std::unordered_map runLocks; + // std::unordered_map runLocks; - inline const fs::path getCurrentPath() const; + inline const fs::path getCurrentPath() const; - inline const fs::path getRunsRoot() const; + inline const fs::path getRunsRoot() const; - inline const fs::path getRunRoot(DAGRunID runID) const; - }; - } - } + inline const fs::path getRunRoot(DAGRunID runID) const; + }; } diff --git a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp index 990c4fc..3bbc5dd 100644 --- a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp @@ -4,6 +4,7 @@ #include #include "DAGRunLogger.hpp" +#include "Defines.hpp" namespace daggy { namespace loggers { @@ -38,4 +39,4 @@ namespace daggy { }; } } -} \ No newline at end of file +} diff --git a/daggy/src/Serialization.cpp b/daggy/src/Serialization.cpp index 31acf36..bed95d7 100644 --- a/daggy/src/Serialization.cpp +++ b/daggy/src/Serialization.cpp @@ -228,4 +228,4 @@ namespace daggy { ss >> std::get_time(&dt, "%Y-%m-%d %H:%M:%S %Z"); return Clock::from_time_t(mktime(&dt)); } -} \ No newline at end of file +} diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp index dc266e3..8bad0fa 100644 --- a/daggy/src/Server.cpp +++ b/daggy/src/Server.cpp @@ -1,3 +1,5 @@ +#include + #include #include @@ -64,6 +66,12 @@ namespace daggy { .bind(&Server::handleRunDAG, this) .produces(MIME(Application, Json), MIME(Application, Xml)) .response(Http::Code::Ok, "Run a DAG"); + // List detailed DAG run + dagPath + .route(desc_.get("/:runID")) + .bind(&Server::handleGetDAGRun, this) + .produces(MIME(Application, Json), MIME(Application, Xml)) + .response(Http::Code::Ok, "Details of a specific DAG run"); // List all DAG runs dagPath @@ -72,12 +80,7 @@ namespace daggy { .produces(MIME(Application, Json), MIME(Application, Xml)) .response(Http::Code::Ok, "The list of all known DAG Runs"); - // List detailed DAG run - dagPath - .route(desc_.get("/{id}")) - .bind(&Server::handleGetDAGRun, this) - .produces(MIME(Application, Json), MIME(Application, Xml)) - .response(Http::Code::Ok, "Details of a specific DAG run"); + } /* @@ -127,7 +130,7 @@ namespace daggy { auto runID = logger_.startDAGRun(runName, tasks); auto dag = buildDAGFromTasks(tasks); - auto fut = runnerPool_.addTask( + runnerPool_.addTask( [this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); }); response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}"); @@ -135,10 +138,99 @@ namespace daggy { void Server::handleGetDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request, response)) return; + auto dagRuns = logger_.getDAGs(0); + std::stringstream ss; + ss << '['; + + bool first = true; + for (const auto &run : dagRuns) { + if (first) { + first = false; + } else { + ss << ", "; + } + + ss << " {" + << R"("runID": )" << run.runID << ',' + << R"("name": )" << std::quoted(run.name) << "," + << R"("startTime": )" << std::quoted(timePointToString(run.startTime)) << ',' + << R"("lastUpdate": )" << std::quoted(timePointToString(run.lastUpdate)) << ',' + << R"("taskCounts": {)"; + bool firstState = true; + for (const auto &[state, count] : run.taskStateCounts) { + if (firstState) { + firstState = false; + } else { + ss << ", "; + } + ss << std::quoted(magic_enum::enum_name(state)) << ':' << count; + } + ss << '}' // end of taskCounts + << '}'; // end of item + } + + ss << ']'; + response.send(Pistache::Http::Code::Ok, ss.str()); } void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request, response)) return; + if (!request.hasParam(":runID")) { REQ_ERROR(Not_Found, "No runID provided in URL"); } + DAGRunID runID = request.param(":runID").as(); + auto run = logger_.getDAGRun(runID); + + bool first = true; + std::stringstream ss; + ss << "{" + << R"("runID": )" << runID << ',' + << R"("name": )" << std::quoted(run.name) << ',' + << R"("tasks": )" << tasksToJSON(run.tasks) << ','; + + // task run states + ss << R"("taskStates": [ )"; + first = true; + for (const auto &state : run.taskRunStates) { + if (first) { first = false; } else { ss << ','; } + ss << std::quoted(magic_enum::enum_name(state)); + } + ss << "],"; + + // Attempt records + first = true; + ss << R"("taskAttempts": [ )"; + for (const auto &attempts : run.taskAttempts) { + if (first) { first = false; } else { ss << ','; } + ss << '['; + bool firstAttempt = true; + for (const auto &attempt : attempts) { + if (firstAttempt) { firstAttempt = false; } else { ss << ','; } + ss << '{' + << R"("startTime":)" << std::quoted(timePointToString(attempt.startTime)) << ',' + << R"("stopTime":)" << std::quoted(timePointToString(attempt.stopTime)) << ',' + << R"("rc":)" << attempt.rc << ',' + << R"("outputLog":)" << std::quoted(attempt.outputLog) << ',' + << R"("errorLog":)" << std::quoted(attempt.errorLog) << ',' + << R"("executorLog":)" << std::quoted(attempt.executorLog) + << '}'; + } + ss << ']'; + } + ss << "],"; + + // DAG state changes + first = true; + ss << R"("dagStateChanges": [ )"; + for (const auto &change : run.dagStateChanges) { + if (first) { first = false; } else { ss << ','; } + ss << '{' + << R"("newState": )" << std::quoted(magic_enum::enum_name(change.newState)) << ',' + << R"("time": )" << std::quoted(timePointToString(change.time)) + << '}'; + } + ss << "]"; + ss << '}'; + + response.send(Pistache::Http::Code::Ok, ss.str()); } void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index d830683..7d4a725 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -174,6 +174,11 @@ namespace daggy { } std::this_thread::sleep_for(250ms); } + + if (dag.allVisited()) { + logger.updateDAGRunState(runID, RunState::COMPLETED); + } + return dag; } diff --git a/daggy/src/loggers/dag_run/FileSystemLogger.cpp b/daggy/src/loggers/dag_run/FileSystemLogger.cpp index 2b5c6e8..2de031b 100644 --- a/daggy/src/loggers/dag_run/FileSystemLogger.cpp +++ b/daggy/src/loggers/dag_run/FileSystemLogger.cpp @@ -160,7 +160,7 @@ namespace daggy { for (const auto &task : record.tasks) { auto taskStateFile = runRoot / task.name / "states.csv"; if (!fs::exists(taskStateFile)) { - record.runStates.push_back(RunState::QUEUED); + record.taskRunStates.push_back(RunState::QUEUED); continue; } @@ -169,9 +169,9 @@ namespace daggy { std::stringstream ss{line}; while (std::getline(ss, token, ',')) { continue; } RunState taskState = magic_enum::enum_cast(token).value(); - record.runStates.emplace_back(taskState); + record.taskRunStates.emplace_back(taskState); ifh.close(); } return record; } -} \ No newline at end of file +} diff --git a/daggy/src/loggers/dag_run/OStreamLogger.cpp b/daggy/src/loggers/dag_run/OStreamLogger.cpp index 35a034e..a778951 100644 --- a/daggy/src/loggers/dag_run/OStreamLogger.cpp +++ b/daggy/src/loggers/dag_run/OStreamLogger.cpp @@ -17,8 +17,7 @@ namespace daggy { dagRuns_.push_back({ .name = name, .tasks = tasks, - //.runStates = std::vector(tasks.size(), RunState::QUEUED), - .runStates{tasks.size(), RunState::QUEUED}, + .taskRunStates{tasks.size(), RunState::QUEUED}, .taskAttempts = std::vector>(tasks.size()) }); @@ -56,21 +55,53 @@ namespace daggy { void OStreamLogger::updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) { std::lock_guard lock(guard_); - os_ << "Task State Change (" << dagRunID << '/' << taskName << "): " << magic_enum::enum_name(state) - << std::endl; - - const auto &tasks = dagRuns_[dagRunID].tasks; + auto &dagRun = dagRuns_[dagRunID]; + const auto &tasks = dagRun.tasks; auto it = std::find_if(tasks.begin(), tasks.end(), [&taskName](const Task &a) { return a.name == taskName; }); if (it == tasks.end()) throw std::runtime_error("No such task: " + taskName); size_t taskID = it - tasks.begin(); - dagRuns_[dagRunID].taskStateChanges.push_back({Clock::now(), taskID, state}); + dagRun.taskStateChanges.push_back({Clock::now(), taskID, state}); + dagRun.taskRunStates[taskID] = state; + + os_ << "Task State Change (" << dagRunID << '/' << taskName << " [task_id: " << taskID << "]): " + << magic_enum::enum_name(state) + << std::endl; } // Querying - std::vector OStreamLogger::getDAGs(uint32_t stateMask) { return {}; } + std::vector OStreamLogger::getDAGs(uint32_t stateMask) { + std::vector summaries; + std::lock_guard lock(guard_); + size_t i = 0; + for (const auto &run : dagRuns_) { + DAGRunSummary summary{ + .runID = i, + .name = run.name, + .runState = run.dagStateChanges.back().newState, + .startTime = run.dagStateChanges.front().time, + .lastUpdate = std::max(run.taskStateChanges.back().time, + run.dagStateChanges.back().time) + }; - DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunID) { return dagRuns_[dagRunID]; } + std::vector states(run.tasks.size()); + for (const auto &taskUpdate : run.taskStateChanges) { + states[taskUpdate.taskID] = taskUpdate.newState; + } + + for (const auto &taskState : states) { + summary.taskStateCounts[taskState]++; + } + + summaries.emplace_back(summary); + } + return summaries; + } + + DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunID) { + std::lock_guard lock(guard_); + return dagRuns_[dagRunID]; + } } } } diff --git a/tests/unit_server.cpp b/tests/unit_server.cpp index 189e4ab..0715699 100644 --- a/tests/unit_server.cpp +++ b/tests/unit_server.cpp @@ -4,11 +4,14 @@ #include #include +#include #include "daggy/Server.hpp" #include "daggy/executors/task/ForkingTaskExecutor.hpp" #include "daggy/loggers/dag_run/OStreamLogger.hpp" +namespace rj = rapidjson; + Pistache::Http::Response REQUEST(std::string url, std::string payload = "") { Pistache::Http::Experimental::Client client; @@ -81,12 +84,80 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") { "parents": [ "touch" ] } ] - })"; + })"; + + + // Submit, and get the runID + daggy::DAGRunID runID = 0; + { + auto response = REQUEST(baseURL + "/v1/dagrun/", dagRun); + REQUIRE(response.code() == Pistache::Http::Code::Ok); + + rj::Document doc; + rj::ParseResult parseResult = doc.Parse(response.body().c_str()); + REQUIRE(parseResult); + REQUIRE(doc.IsObject()); + REQUIRE(doc.HasMember("runID")); + + runID = doc["runID"].GetUint64(); + } + + // Ensure our runID shows up in the list of running DAGs + { + auto response = REQUEST(baseURL + "/v1/dagrun/"); + REQUIRE(response.code() == Pistache::Http::Code::Ok); + + rj::Document doc; + rj::ParseResult parseResult = doc.Parse(response.body().c_str()); + REQUIRE(parseResult); + REQUIRE(doc.IsArray()); + REQUIRE(doc.Size() >= 1); + + // Ensure that our DAG is in the list and matches our given DAGRunID + bool found = false; + const auto &runs = doc.GetArray(); + for (size_t i = 0; i < runs.Size(); ++i) { + const auto &run = runs[i]; + REQUIRE(run.IsObject()); + REQUIRE(run.HasMember("name")); + REQUIRE(run.HasMember("runID")); + + std::string runName = run["name"].GetString(); + if (runName == "unit_server") { + REQUIRE(run["runID"].GetUint64() == runID); + found = true; + break; + } + } + REQUIRE(found); + } + + // Wait until our DAG is complete + bool complete = false; + while (!complete) { + complete = true; + auto response = REQUEST(baseURL + "/v1/dagrun/" + std::to_string(runID)); + REQUIRE(response.code() == Pistache::Http::Code::Ok); + rj::Document doc; + rj::ParseResult parseResult = doc.Parse(response.body().c_str()); + REQUIRE(parseResult); + REQUIRE(doc.IsObject()); + + REQUIRE(doc.HasMember("taskStates")); + const auto &taskStates = doc["taskStates"].GetArray(); + REQUIRE(taskStates.Size() == 3); + + for (size_t i = 0; i < taskStates.Size(); ++i) { + std::string state = taskStates[i].GetString(); + if (state != "COMPLETED") { + complete = false; + break; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } - auto response = REQUEST(baseURL + "/v1/dagrun/", dagRun); - REQUIRE(response.code() == Pistache::Http::Code::Ok); std::this_thread::sleep_for(std::chrono::seconds(2)); - for (const auto &pth : std::vector{"dagrun_A", "dagrun_B"}) { REQUIRE(fs::exists(pth)); fs::remove(pth); @@ -94,4 +165,4 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") { } server.shutdown(); -} \ No newline at end of file +}