From 856e5bd2f4a0f1743864bc58d1aa88f3cb6ed7b6 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Thu, 6 Jan 2022 15:20:06 -0400 Subject: [PATCH] Adding in task attempts drilldown --- daggyd/libdaggyd/src/Server.cpp | 129 +++++++++++------- .../daggy/loggers/dag_run/DAGRunLogger.hpp | 6 +- .../include/daggy/loggers/dag_run/Defines.hpp | 9 ++ .../daggy/loggers/dag_run/OStreamLogger.hpp | 2 + .../daggy/loggers/dag_run/RedisLogger.hpp | 2 + .../src/loggers/dag_run/OStreamLogger.cpp | 12 ++ libdaggy/src/loggers/dag_run/RedisLogger.cpp | 42 ++++++ 7 files changed, 151 insertions(+), 51 deletions(-) diff --git a/daggyd/libdaggyd/src/Server.cpp b/daggyd/libdaggyd/src/Server.cpp index 7e156ff..ea36a7a 100644 --- a/daggyd/libdaggyd/src/Server.cpp +++ b/daggyd/libdaggyd/src/Server.cpp @@ -392,62 +392,52 @@ namespace daggy::daggyd { ss << '}'; } else { + std::unordered_map stateCounts; + for (const auto &[_, state] : run.taskRunStates) { + stateCounts[state]++; + } + ss << R"(
Details for RunID )" << runID << R"( - -
-
- )" - << "graph LR;\n"; +

Summary

+ + + + + )" + << "" + << "" + << "" + << "" + << "" + << "" + << "" + << "" + << "" + << "
Run IDTagState#TasksQueuedRunningRetryErroredCompleted
" << runID << "" << run.dagSpec.tag << "" << run.dagStateChanges.back().state << "" << run.dagSpec.tasks.size() << "" << stateCounts[RunState::QUEUED] << "" << stateCounts[RunState::RUNNING] << "" << stateCounts[RunState::RETRY] << "" << stateCounts[RunState::ERRORED] << "" << stateCounts[RunState::COMPLETED] << "
" + << "

Task Details

" + << ""; - std::unordered_map> - taskClassMap; for (const auto &[taskName, task] : run.dagSpec.tasks) { - taskClassMap[task.definedName].emplace(taskName); + ss << "" + << "" + << "" + << "" + << ""; } - for (const auto &[taskName, task] : run.dagSpec.tasks) { - for (const auto &child : task.children) { - for (const auto &ci : taskClassMap[child]) { - ss << " " << taskName << "-->" << ci << '\n'; - } - } - ss << "click " << taskName << " href \"/v1/dagrun/" << runID << "/task/" - << taskName << "\"\n"; - ss << "style " << taskName << " fill: #"; - switch (run.taskStateChanges[taskName].back().state) { - case RunState::QUEUED: - ss << "55f"; - break; - case RunState::RUNNING: - ss << "5a5"; - break; - case RunState::RETRY: - ss << "55a"; - break; - case RunState::ERRORED: - ss << "55F"; - break; - case RunState::COMPLETED: - ss << "5f5"; - break; - case RunState::KILLED: - ss << "fff"; - break; - case RunState::PAUSED: - ss << "333"; - break; - } - ss << '\n'; - } - ss << "
"; + ss << "
Task Name StateLast " + "Update Logs
" << taskName << "" << run.taskRunStates.at(taskName) << "" + << timePointToString(run.taskStateChanges.at(taskName).back().time) + << "Logs" + << "
"; + response.send(Pistache::Http::Code::Ok, ss.str()); } - - response.send(Pistache::Http::Code::Ok, ss.str()); } void Server::handleGetDAGRunState(const Pistache::Rest::Request &request, @@ -557,14 +547,53 @@ namespace daggy::daggyd { auto runID = request.param(":runID").as(); auto taskName = request.param(":taskName").as(); + bool isJSON = requestIsForJSON(request); - try { - auto task = logger_.getTask(runID, taskName); - response.send(Pistache::Http::Code::Ok, taskToJSON(task)); + std::stringstream ss; + if (isJSON) { + Task task; + try { + task = logger_.getTask(runID, taskName); + } + catch (std::exception &e) { + REQ_RESPONSE(Not_Found, e.what()); + } + ss << taskToJSON(task); } - catch (std::exception &e) { - REQ_RESPONSE(Not_Found, e.what()); + else { + std::optional tr; + try { + tr.emplace(logger_.getTaskRecord(runID, taskName)); + } + catch (std::exception &e) { + REQ_RESPONSE(Not_Found, e.what()); + } + ss << "Task Details for " << runID << " / " << taskName + << "" + << "" + << "" + << "" + << "" + << ""; + + std::sort(tr->attempts.begin(), tr->attempts.end(), + [](const auto &a, const auto &b) { + return a.startTime < b.startTime; + }); + + for (size_t i = 0; i < tr->attempts.size(); ++i) { + const auto &attempt = tr->attempts[i]; + ss << ""; + } + + ss << "
Name" << taskName << "
State" << tr->state << "
Definition" << taskToJSON(tr->task) + << "
Attempts
" << timePointToString(attempt.startTime) + << "
rc: " << attempt.rc
+           << "\n\nstdout:\n--------------\n"
+           << attempt.outputLog << "\n\nstderr:\n--------------\n"
+           << attempt.errorLog << "
"; } + response.send(Pistache::Http::Code::Ok, ss.str()); } void Server::handleGetTaskState(const Pistache::Rest::Request &request, diff --git a/libdaggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp b/libdaggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp index 06e4b30..dd9a845 100644 --- a/libdaggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp +++ b/libdaggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp @@ -44,7 +44,11 @@ namespace daggy::loggers::dag_run { virtual DAGRunRecord getDAGRun(DAGRunID dagRunID) = 0; virtual Task getTask(DAGRunID dagRunID, const std::string &taskName) = 0; + + virtual TaskRecord getTaskRecord(DAGRunID dagRunID, + const std::string &taskName) = 0; + virtual RunState getTaskState(DAGRunID dagRunID, - const std::string &taskName) = 0; + const std::string &taskName) = 0; }; } // namespace daggy::loggers::dag_run diff --git a/libdaggy/include/daggy/loggers/dag_run/Defines.hpp b/libdaggy/include/daggy/loggers/dag_run/Defines.hpp index 65d817e..f4408b9 100644 --- a/libdaggy/include/daggy/loggers/dag_run/Defines.hpp +++ b/libdaggy/include/daggy/loggers/dag_run/Defines.hpp @@ -15,10 +15,19 @@ namespace daggy::loggers::dag_run { RunState state; }; + struct TaskRecord + { + Task task; + RunState state; + std::vector stateChanges; + std::vector attempts; + }; + // Pretty heavy weight, but struct DAGRunRecord { DAGSpec dagSpec; + std::unordered_map taskRecords; std::unordered_map taskRunStates; std::unordered_map> taskAttempts; std::unordered_map> diff --git a/libdaggy/include/daggy/loggers/dag_run/OStreamLogger.hpp b/libdaggy/include/daggy/loggers/dag_run/OStreamLogger.hpp index 4e942e0..87eab37 100644 --- a/libdaggy/include/daggy/loggers/dag_run/OStreamLogger.hpp +++ b/libdaggy/include/daggy/loggers/dag_run/OStreamLogger.hpp @@ -44,6 +44,8 @@ namespace daggy::loggers::dag_run { DAGRunRecord getDAGRun(DAGRunID dagRunID) override; Task getTask(DAGRunID dagRunID, const std::string &taskName) override; + TaskRecord getTaskRecord(DAGRunID dagRunID, + const std::string &taskName) override; RunState getTaskState(DAGRunID dagRunID, const std::string &taskName) override; diff --git a/libdaggy/include/daggy/loggers/dag_run/RedisLogger.hpp b/libdaggy/include/daggy/loggers/dag_run/RedisLogger.hpp index 28d66e2..30cff83 100644 --- a/libdaggy/include/daggy/loggers/dag_run/RedisLogger.hpp +++ b/libdaggy/include/daggy/loggers/dag_run/RedisLogger.hpp @@ -59,6 +59,8 @@ namespace daggy::loggers::dag_run { DAGRunRecord getDAGRun(DAGRunID dagRunID) override; Task getTask(DAGRunID dagRunID, const std::string &taskName) override; + TaskRecord getTaskRecord(DAGRunID dagRunID, + const std::string &taskName) override; RunState getTaskState(DAGRunID dagRunID, const std::string &taskName) override; diff --git a/libdaggy/src/loggers/dag_run/OStreamLogger.cpp b/libdaggy/src/loggers/dag_run/OStreamLogger.cpp index 23b3278..062127c 100644 --- a/libdaggy/src/loggers/dag_run/OStreamLogger.cpp +++ b/libdaggy/src/loggers/dag_run/OStreamLogger.cpp @@ -170,6 +170,18 @@ namespace daggy::loggers::dag_run { return dagRuns_.at(dagRunID).dagSpec.tasks.at(taskName); } + TaskRecord OStreamLogger::getTaskRecord(DAGRunID dagRunID, + const std::string &taskName) + { + std::lock_guard lock(guard_); + const auto &run = dagRuns_.at(dagRunID); + + return TaskRecord{.task = run.dagSpec.tasks.at(taskName), + .state = run.taskRunStates.at(taskName), + .stateChanges = run.taskStateChanges.at(taskName), + .attempts = run.taskAttempts.at(taskName)}; + } + RunState OStreamLogger::getTaskState(DAGRunID dagRunID, const std::string &taskName) { diff --git a/libdaggy/src/loggers/dag_run/RedisLogger.cpp b/libdaggy/src/loggers/dag_run/RedisLogger.cpp index ab923c6..071ed73 100644 --- a/libdaggy/src/loggers/dag_run/RedisLogger.cpp +++ b/libdaggy/src/loggers/dag_run/RedisLogger.cpp @@ -252,6 +252,48 @@ namespace daggy::loggers::dag_run { return taskFromJSON(taskName, resp.as()); } + TaskRecord RedisLogger::getTaskRecord(DAGRunID dagRunID, + const std::string &taskName) + { + // Task State + auto taskState = RunState::_from_string( + ctx_.query("HGET %s %s", getTaskStatesKey_(dagRunID).c_str(), + taskName.c_str()) + .as() + .c_str()); + + // task + auto task = taskFromJSON( + taskName, ctx_.query("HGET %s %s", getTasksKey_(dagRunID).c_str(), + taskName.c_str()) + .as()); + + // Attempts + auto attemptJSONS = + ctx_.query("LRANGE %s 0 -1", + getTaskAttemptKey_(dagRunID, taskName).c_str()) + .asList(); + std::vector attempts; + std::transform(attemptJSONS.begin(), attemptJSONS.end(), + std::back_inserter(attempts), + [](const auto &s) { return attemptRecordFromJSON(s); }); + + // Populate stateUpdates + auto taskStateUpdates = + ctx_.query("LRANGE %s 0 -1", + getTaskStateUpdateKey_(dagRunID, taskName).c_str()) + .asList(); + std::vector stateUpdates; + std::transform(taskStateUpdates.begin(), taskStateUpdates.end(), + std::back_inserter(stateUpdates), + [](const auto &s) { return stateUpdateRecordFromJSON(s); }); + + return TaskRecord{.task = task, + .state = taskState, + .stateChanges = stateUpdates, + .attempts = attempts}; + } + RunState RedisLogger::getTaskState(DAGRunID dagRunID, const std::string &taskName) {