From ac490b6f3e3a1d1a06c35b648a5e394b4453b4a0 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Thu, 30 Dec 2021 13:44:17 -0400 Subject: [PATCH] Adding html output --- daggyd/daggyd/daggyd.cpp | 38 +++-- daggyd/libdaggyd/src/Server.cpp | 272 ++++++++++++++++++++++---------- 2 files changed, 214 insertions(+), 96 deletions(-) diff --git a/daggyd/daggyd/daggyd.cpp b/daggyd/daggyd/daggyd.cpp index 1204ff2..bffd732 100644 --- a/daggyd/daggyd/daggyd.cpp +++ b/daggyd/daggyd/daggyd.cpp @@ -222,7 +222,7 @@ int main(int argc, char **argv) .default_value(false) .implicit_value(true); args.add_argument("-d", "--daemon").default_value(false).implicit_value(true); - args.add_argument("--config"); + args.add_argument("--config").default_value(std::string{}); args.add_argument("--ip").default_value(std::string{"127.0.0.1"}); args.add_argument("--port").default_value(2503u).action( [](const std::string &value) -> unsigned { return std::stoul(value); }); @@ -241,26 +241,30 @@ int main(int argc, char **argv) auto configFile = args.get("--config"); std::string listenIP = args.get("--ip"); auto listenPort = args.get("--port"); - - std::ifstream ifh(configFile); - std::string config; - std::getline(ifh, config, '\0'); - ifh.close(); + size_t webThreads = 50; + size_t dagThreads = 50; rj::Document doc; - daggy::checkRJParse(doc.Parse(config.c_str())); + if (!configFile.empty()) { + std::ifstream ifh(configFile); + std::string config; + std::getline(ifh, config, '\0'); + ifh.close(); - size_t webThreads = 50; - size_t dagThreads = 50; + daggy::checkRJParse(doc.Parse(config.c_str())); - if (doc.HasMember("ip")) - listenIP = doc["ip"].GetString(); - if (doc.HasMember("port")) - listenPort = doc["port"].GetInt(); - if (doc.HasMember("web-threads")) - webThreads = doc["web-threads"].GetInt64(); - if (doc.HasMember("dag-threads")) - dagThreads = doc["dag-threads"].GetInt64(); + if (doc.HasMember("ip")) + listenIP = doc["ip"].GetString(); + if (doc.HasMember("port")) + listenPort = doc["port"].GetInt(); + if (doc.HasMember("web-threads")) + webThreads = doc["web-threads"].GetInt64(); + if (doc.HasMember("dag-threads")) + dagThreads = doc["dag-threads"].GetInt64(); + } + else { + doc.SetObject(); + } if (verbose) { std::cout << "Server running at http://" << listenIP << ':' << listenPort diff --git a/daggyd/libdaggyd/src/Server.cpp b/daggyd/libdaggyd/src/Server.cpp index 4e9ebf8..d916a0d 100644 --- a/daggyd/libdaggyd/src/Server.cpp +++ b/daggyd/libdaggyd/src/Server.cpp @@ -19,6 +19,17 @@ using namespace Pistache; namespace daggy::daggyd { + + bool requestIsForJSON(const Pistache::Rest::Request &request) + { + auto acceptedMimeTypes = + request.headers().get()->media(); + auto fit = + std::find(acceptedMimeTypes.begin(), acceptedMimeTypes.end(), + Pistache::Http::Mime::MediaType::fromString("text/html")); + return fit == acceptedMimeTypes.end(); + } + void Server::init(size_t threads) { auto opts = Http::Endpoint::options() @@ -209,48 +220,93 @@ namespace daggy::daggyd { tag = request.query().get("tag").value(); } + bool isJSON = requestIsForJSON(request); + if (request.hasParam(":all")) { - auto val = request.query().get("all").value(); + auto val = request.query().get(":all").value(); if (val == "true" or val == "1") { all = true; } } + else if (!isJSON) { + all = true; + } auto dagRuns = logger_.queryDAGRuns(tag, all); std::stringstream ss; - ss << '['; + if (isJSON) { + // default to json + ss << '['; - bool first = true; - for (const auto &run : dagRuns) { - if (first) { - first = false; - } - else { - ss << ", "; - } - - ss << " {" - << R"("runID": )" << run.runID << ',' << R"("tag": )" - << std::quoted(run.tag) << "," - << R"("startTime": )" << std::quoted(timePointToString(run.startTime)) - << ',' << R"("lastUpdate": )" - << std::quoted(timePointToString(run.lastUpdate)) << ',' - << R"("taskCounts": {)"; - bool firstState = true; - for (const auto &[state, count] : run.taskStateCounts) { - if (firstState) { - firstState = false; + bool first = true; + for (const auto &run : dagRuns) { + if (first) { + first = false; } else { ss << ", "; } - ss << std::quoted(state._to_string()) << ':' << count; - } - ss << '}' // end of taskCounts - << '}'; // end of item - } - ss << ']'; + ss << " {" + << R"("runID": )" << run.runID << ',' << R"("tag": )" + << std::quoted(run.tag) << "," + << R"("startTime": )" + << std::quoted(timePointToString(run.startTime)) << ',' + << R"("lastUpdate": )" + << std::quoted(timePointToString(run.lastUpdate)) << ',' + << R"("taskCounts": {)"; + bool firstState = true; + for (const auto &[state, count] : run.taskStateCounts) { + if (firstState) { + firstState = false; + } + else { + ss << ", "; + } + ss << std::quoted(state._to_string()) << ':' << count; + } + ss << '}' // end of taskCounts + << '}'; // end of item + } + ss << ']'; + } + else { + // HTML + ss << "
Daggy " + "Runs

Current Runs


"; + std::cout << "Reporting on " << dagRuns.size() << " dag runs\n"; + if (!dagRuns.empty()) { + std::sort(dagRuns.begin(), dagRuns.end(), + [](const auto &a, const auto &b) { + return a.startTime > b.startTime; + }); + ss << ""; + for (auto &ds : dagRuns) { + size_t nTasks = 0; + for (const auto &[k, cnt] : ds.taskStateCounts) + nTasks += cnt; + ss << "" + << R"(" + << "" + << "" + << "" + << "" + << "" + << "" + << "" + << "" + << "" + << "" + << ""; + } + ss << "
Run IDTagState# " + "TasksStart TimeLast " + "UpdateQueuedRunningRetryErroredCompleted
)" + << ds.runID << "" << ds.tag << "" << ds.runState << "" << nTasks << "" << timePointToString(ds.startTime) << "" << timePointToString(ds.lastUpdate) << "" << ds.taskStateCounts[RunState::QUEUED] << "" << ds.taskStateCounts[RunState::RUNNING] << "" << ds.taskStateCounts[RunState::RETRY] << "" << ds.taskStateCounts[RunState::ERRORED] << "" << ds.taskStateCounts[RunState::COMPLETED] << "
"; + } + ss << ""; + } response.send(Pistache::Http::Code::Ok, ss.str()); } @@ -262,69 +318,127 @@ namespace daggy::daggyd { if (!request.hasParam(":runID")) { REQ_RESPONSE(Not_Found, "No runID provided in URL"); } - auto runID = request.param(":runID").as(); - auto run = logger_.getDAGRun(runID); + auto runID = request.param(":runID").as(); + auto run = logger_.getDAGRun(runID); + bool isJSON = requestIsForJSON(request); - bool first = true; std::stringstream ss; - ss << "{" - << R"("runID": )" << runID << ',' << R"("tag": )" - << std::quoted(run.dagSpec.tag) << ',' << R"("tasks": )" - << tasksToJSON(run.dagSpec.tasks) << ','; + if (isJSON) { + bool first = true; + ss << "{" + << R"("runID": )" << runID << ',' << R"("tag": )" + << std::quoted(run.dagSpec.tag) << ',' << R"("tasks": )" + << tasksToJSON(run.dagSpec.tasks) << ','; - // task run states - ss << R"("taskStates": { )"; - first = true; - for (const auto &[name, state] : run.taskRunStates) { - if (first) { - first = false; - } - else { - ss << ','; - } - ss << std::quoted(name) << ": " << std::quoted(state._to_string()); - } - ss << "},"; - - // Attempt records - first = true; - ss << R"("taskAttempts": { )"; - for (const auto &[taskName, attempts] : run.taskAttempts) { - if (first) { - first = false; - } - else { - ss << ','; - } - ss << std::quoted(taskName) << ": ["; - bool firstAttempt = true; - for (const auto &attempt : attempts) { - if (firstAttempt) { - firstAttempt = false; + // task run states + ss << R"("taskStates": { )"; + first = true; + for (const auto &[name, state] : run.taskRunStates) { + if (first) { + first = false; } else { ss << ','; } - ss << attemptRecordToJSON(attempt); + ss << std::quoted(name) << ": " << std::quoted(state._to_string()); } - ss << ']'; - } - ss << "},"; + ss << "},"; - // DAG state changes - first = true; - ss << R"("dagStateChanges": [ )"; - for (const auto &change : run.dagStateChanges) { - if (first) { - first = false; + // Attempt records + first = true; + ss << R"("taskAttempts": { )"; + for (const auto &[taskName, attempts] : run.taskAttempts) { + if (first) { + first = false; + } + else { + ss << ','; + } + ss << std::quoted(taskName) << ": ["; + bool firstAttempt = true; + for (const auto &attempt : attempts) { + if (firstAttempt) { + firstAttempt = false; + } + else { + ss << ','; + } + ss << attemptRecordToJSON(attempt); + } + ss << ']'; } - else { - ss << ','; + ss << "},"; + + // DAG state changes + first = true; + ss << R"("dagStateChanges": [ )"; + for (const auto &change : run.dagStateChanges) { + if (first) { + first = false; + } + else { + ss << ','; + } + ss << stateUpdateRecordToJSON(change); } - ss << stateUpdateRecordToJSON(change); + ss << "]"; + ss << '}'; + } + else { + ss << R"( +
+ Details for RunID )" + << runID << R"( + + +
+ +
+
+ )" + << "graph LR;\n"; + + std::unordered_map> + taskClassMap; + for (const auto &[taskName, task] : run.dagSpec.tasks) { + taskClassMap[task.definedName].emplace(taskName); + } + for (const auto &[taskName, task] : run.dagSpec.tasks) { + for (const auto &child : task.children) { + for (const auto &ci : taskClassMap[child]) { + ss << " " << taskName << "-->" << ci << '\n'; + } + } + ss << "click " << taskName << " href \"/v1/dagrun/" << runID << "/task/" + << taskName << "\"\n"; + ss << "style " << taskName << " fill: #"; + switch (run.taskStateChanges[taskName].back().state) { + case RunState::QUEUED: + ss << "55f"; + break; + case RunState::RUNNING: + ss << "5a5"; + break; + case RunState::RETRY: + ss << "55a"; + break; + case RunState::ERRORED: + ss << "55F"; + break; + case RunState::COMPLETED: + ss << "5f5"; + break; + case RunState::KILLED: + ss << "fff"; + break; + case RunState::PAUSED: + ss << "333"; + break; + } + ss << '\n'; + } + ss << "
"; } - ss << "]"; - ss << '}'; response.send(Pistache::Http::Code::Ok, ss.str()); }