Adding html output
This commit is contained in:
@@ -222,7 +222,7 @@ int main(int argc, char **argv)
|
|||||||
.default_value(false)
|
.default_value(false)
|
||||||
.implicit_value(true);
|
.implicit_value(true);
|
||||||
args.add_argument("-d", "--daemon").default_value(false).implicit_value(true);
|
args.add_argument("-d", "--daemon").default_value(false).implicit_value(true);
|
||||||
args.add_argument("--config");
|
args.add_argument("--config").default_value(std::string{});
|
||||||
args.add_argument("--ip").default_value(std::string{"127.0.0.1"});
|
args.add_argument("--ip").default_value(std::string{"127.0.0.1"});
|
||||||
args.add_argument("--port").default_value(2503u).action(
|
args.add_argument("--port").default_value(2503u).action(
|
||||||
[](const std::string &value) -> unsigned { return std::stoul(value); });
|
[](const std::string &value) -> unsigned { return std::stoul(value); });
|
||||||
@@ -241,26 +241,30 @@ int main(int argc, char **argv)
|
|||||||
auto configFile = args.get<std::string>("--config");
|
auto configFile = args.get<std::string>("--config");
|
||||||
std::string listenIP = args.get<std::string>("--ip");
|
std::string listenIP = args.get<std::string>("--ip");
|
||||||
auto listenPort = args.get<unsigned>("--port");
|
auto listenPort = args.get<unsigned>("--port");
|
||||||
|
size_t webThreads = 50;
|
||||||
std::ifstream ifh(configFile);
|
size_t dagThreads = 50;
|
||||||
std::string config;
|
|
||||||
std::getline(ifh, config, '\0');
|
|
||||||
ifh.close();
|
|
||||||
|
|
||||||
rj::Document doc;
|
rj::Document doc;
|
||||||
daggy::checkRJParse(doc.Parse(config.c_str()));
|
if (!configFile.empty()) {
|
||||||
|
std::ifstream ifh(configFile);
|
||||||
|
std::string config;
|
||||||
|
std::getline(ifh, config, '\0');
|
||||||
|
ifh.close();
|
||||||
|
|
||||||
size_t webThreads = 50;
|
daggy::checkRJParse(doc.Parse(config.c_str()));
|
||||||
size_t dagThreads = 50;
|
|
||||||
|
|
||||||
if (doc.HasMember("ip"))
|
if (doc.HasMember("ip"))
|
||||||
listenIP = doc["ip"].GetString();
|
listenIP = doc["ip"].GetString();
|
||||||
if (doc.HasMember("port"))
|
if (doc.HasMember("port"))
|
||||||
listenPort = doc["port"].GetInt();
|
listenPort = doc["port"].GetInt();
|
||||||
if (doc.HasMember("web-threads"))
|
if (doc.HasMember("web-threads"))
|
||||||
webThreads = doc["web-threads"].GetInt64();
|
webThreads = doc["web-threads"].GetInt64();
|
||||||
if (doc.HasMember("dag-threads"))
|
if (doc.HasMember("dag-threads"))
|
||||||
dagThreads = doc["dag-threads"].GetInt64();
|
dagThreads = doc["dag-threads"].GetInt64();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
doc.SetObject();
|
||||||
|
}
|
||||||
|
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
std::cout << "Server running at http://" << listenIP << ':' << listenPort
|
std::cout << "Server running at http://" << listenIP << ':' << listenPort
|
||||||
|
|||||||
@@ -19,6 +19,17 @@
|
|||||||
using namespace Pistache;
|
using namespace Pistache;
|
||||||
|
|
||||||
namespace daggy::daggyd {
|
namespace daggy::daggyd {
|
||||||
|
|
||||||
|
bool requestIsForJSON(const Pistache::Rest::Request &request)
|
||||||
|
{
|
||||||
|
auto acceptedMimeTypes =
|
||||||
|
request.headers().get<Pistache::Http::Header::Accept>()->media();
|
||||||
|
auto fit =
|
||||||
|
std::find(acceptedMimeTypes.begin(), acceptedMimeTypes.end(),
|
||||||
|
Pistache::Http::Mime::MediaType::fromString("text/html"));
|
||||||
|
return fit == acceptedMimeTypes.end();
|
||||||
|
}
|
||||||
|
|
||||||
void Server::init(size_t threads)
|
void Server::init(size_t threads)
|
||||||
{
|
{
|
||||||
auto opts = Http::Endpoint::options()
|
auto opts = Http::Endpoint::options()
|
||||||
@@ -209,48 +220,93 @@ namespace daggy::daggyd {
|
|||||||
tag = request.query().get("tag").value();
|
tag = request.query().get("tag").value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool isJSON = requestIsForJSON(request);
|
||||||
|
|
||||||
if (request.hasParam(":all")) {
|
if (request.hasParam(":all")) {
|
||||||
auto val = request.query().get("all").value();
|
auto val = request.query().get(":all").value();
|
||||||
if (val == "true" or val == "1") {
|
if (val == "true" or val == "1") {
|
||||||
all = true;
|
all = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else if (!isJSON) {
|
||||||
|
all = true;
|
||||||
|
}
|
||||||
|
|
||||||
auto dagRuns = logger_.queryDAGRuns(tag, all);
|
auto dagRuns = logger_.queryDAGRuns(tag, all);
|
||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
ss << '[';
|
if (isJSON) {
|
||||||
|
// default to json
|
||||||
|
ss << '[';
|
||||||
|
|
||||||
bool first = true;
|
bool first = true;
|
||||||
for (const auto &run : dagRuns) {
|
for (const auto &run : dagRuns) {
|
||||||
if (first) {
|
if (first) {
|
||||||
first = false;
|
first = false;
|
||||||
}
|
|
||||||
else {
|
|
||||||
ss << ", ";
|
|
||||||
}
|
|
||||||
|
|
||||||
ss << " {"
|
|
||||||
<< R"("runID": )" << run.runID << ',' << R"("tag": )"
|
|
||||||
<< std::quoted(run.tag) << ","
|
|
||||||
<< R"("startTime": )" << std::quoted(timePointToString(run.startTime))
|
|
||||||
<< ',' << R"("lastUpdate": )"
|
|
||||||
<< std::quoted(timePointToString(run.lastUpdate)) << ','
|
|
||||||
<< R"("taskCounts": {)";
|
|
||||||
bool firstState = true;
|
|
||||||
for (const auto &[state, count] : run.taskStateCounts) {
|
|
||||||
if (firstState) {
|
|
||||||
firstState = false;
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
ss << ", ";
|
ss << ", ";
|
||||||
}
|
}
|
||||||
ss << std::quoted(state._to_string()) << ':' << count;
|
|
||||||
}
|
|
||||||
ss << '}' // end of taskCounts
|
|
||||||
<< '}'; // end of item
|
|
||||||
}
|
|
||||||
|
|
||||||
ss << ']';
|
ss << " {"
|
||||||
|
<< R"("runID": )" << run.runID << ',' << R"("tag": )"
|
||||||
|
<< std::quoted(run.tag) << ","
|
||||||
|
<< R"("startTime": )"
|
||||||
|
<< std::quoted(timePointToString(run.startTime)) << ','
|
||||||
|
<< R"("lastUpdate": )"
|
||||||
|
<< std::quoted(timePointToString(run.lastUpdate)) << ','
|
||||||
|
<< R"("taskCounts": {)";
|
||||||
|
bool firstState = true;
|
||||||
|
for (const auto &[state, count] : run.taskStateCounts) {
|
||||||
|
if (firstState) {
|
||||||
|
firstState = false;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ss << ", ";
|
||||||
|
}
|
||||||
|
ss << std::quoted(state._to_string()) << ':' << count;
|
||||||
|
}
|
||||||
|
ss << '}' // end of taskCounts
|
||||||
|
<< '}'; // end of item
|
||||||
|
}
|
||||||
|
ss << ']';
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// HTML
|
||||||
|
ss << "<html><header><title>Daggy "
|
||||||
|
"Runs</title></header><body><center><h2>Current Runs</h2><br>";
|
||||||
|
std::cout << "Reporting on " << dagRuns.size() << " dag runs\n";
|
||||||
|
if (!dagRuns.empty()) {
|
||||||
|
std::sort(dagRuns.begin(), dagRuns.end(),
|
||||||
|
[](const auto &a, const auto &b) {
|
||||||
|
return a.startTime > b.startTime;
|
||||||
|
});
|
||||||
|
ss << "<table><tr><th>Run ID</th><th>Tag</th><th>State</th><th># "
|
||||||
|
"Tasks</th><th>Start Time</th><th>Last "
|
||||||
|
"Update</th><th>Queued</th><th>Running</th><th>Retry</"
|
||||||
|
"th><th>Errored</th><th>Completed</th></tr>";
|
||||||
|
for (auto &ds : dagRuns) {
|
||||||
|
size_t nTasks = 0;
|
||||||
|
for (const auto &[k, cnt] : ds.taskStateCounts)
|
||||||
|
nTasks += cnt;
|
||||||
|
ss << "<tr>"
|
||||||
|
<< R"(<td><a href="/v1/api/dagrun/)" << ds.runID << R"(">)"
|
||||||
|
<< ds.runID << "</a></td>"
|
||||||
|
<< "<td>" << ds.tag << "</td>"
|
||||||
|
<< "<td>" << ds.runState << "</td>"
|
||||||
|
<< "<td>" << nTasks << "</td>"
|
||||||
|
<< "<td>" << timePointToString(ds.startTime) << "</td>"
|
||||||
|
<< "<td>" << timePointToString(ds.lastUpdate) << "</td>"
|
||||||
|
<< "<td>" << ds.taskStateCounts[RunState::QUEUED] << "</td>"
|
||||||
|
<< "<td>" << ds.taskStateCounts[RunState::RUNNING] << "</td>"
|
||||||
|
<< "<td>" << ds.taskStateCounts[RunState::RETRY] << "</td>"
|
||||||
|
<< "<td>" << ds.taskStateCounts[RunState::ERRORED] << "</td>"
|
||||||
|
<< "<td>" << ds.taskStateCounts[RunState::COMPLETED] << "</td>"
|
||||||
|
<< "</tr>";
|
||||||
|
}
|
||||||
|
ss << "</table>";
|
||||||
|
}
|
||||||
|
ss << "</body></html>";
|
||||||
|
}
|
||||||
response.send(Pistache::Http::Code::Ok, ss.str());
|
response.send(Pistache::Http::Code::Ok, ss.str());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -262,69 +318,127 @@ namespace daggy::daggyd {
|
|||||||
if (!request.hasParam(":runID")) {
|
if (!request.hasParam(":runID")) {
|
||||||
REQ_RESPONSE(Not_Found, "No runID provided in URL");
|
REQ_RESPONSE(Not_Found, "No runID provided in URL");
|
||||||
}
|
}
|
||||||
auto runID = request.param(":runID").as<size_t>();
|
auto runID = request.param(":runID").as<size_t>();
|
||||||
auto run = logger_.getDAGRun(runID);
|
auto run = logger_.getDAGRun(runID);
|
||||||
|
bool isJSON = requestIsForJSON(request);
|
||||||
|
|
||||||
bool first = true;
|
|
||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
ss << "{"
|
if (isJSON) {
|
||||||
<< R"("runID": )" << runID << ',' << R"("tag": )"
|
bool first = true;
|
||||||
<< std::quoted(run.dagSpec.tag) << ',' << R"("tasks": )"
|
ss << "{"
|
||||||
<< tasksToJSON(run.dagSpec.tasks) << ',';
|
<< R"("runID": )" << runID << ',' << R"("tag": )"
|
||||||
|
<< std::quoted(run.dagSpec.tag) << ',' << R"("tasks": )"
|
||||||
|
<< tasksToJSON(run.dagSpec.tasks) << ',';
|
||||||
|
|
||||||
// task run states
|
// task run states
|
||||||
ss << R"("taskStates": { )";
|
ss << R"("taskStates": { )";
|
||||||
first = true;
|
first = true;
|
||||||
for (const auto &[name, state] : run.taskRunStates) {
|
for (const auto &[name, state] : run.taskRunStates) {
|
||||||
if (first) {
|
if (first) {
|
||||||
first = false;
|
first = false;
|
||||||
}
|
|
||||||
else {
|
|
||||||
ss << ',';
|
|
||||||
}
|
|
||||||
ss << std::quoted(name) << ": " << std::quoted(state._to_string());
|
|
||||||
}
|
|
||||||
ss << "},";
|
|
||||||
|
|
||||||
// Attempt records
|
|
||||||
first = true;
|
|
||||||
ss << R"("taskAttempts": { )";
|
|
||||||
for (const auto &[taskName, attempts] : run.taskAttempts) {
|
|
||||||
if (first) {
|
|
||||||
first = false;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
ss << ',';
|
|
||||||
}
|
|
||||||
ss << std::quoted(taskName) << ": [";
|
|
||||||
bool firstAttempt = true;
|
|
||||||
for (const auto &attempt : attempts) {
|
|
||||||
if (firstAttempt) {
|
|
||||||
firstAttempt = false;
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
ss << ',';
|
ss << ',';
|
||||||
}
|
}
|
||||||
ss << attemptRecordToJSON(attempt);
|
ss << std::quoted(name) << ": " << std::quoted(state._to_string());
|
||||||
}
|
}
|
||||||
ss << ']';
|
ss << "},";
|
||||||
}
|
|
||||||
ss << "},";
|
|
||||||
|
|
||||||
// DAG state changes
|
// Attempt records
|
||||||
first = true;
|
first = true;
|
||||||
ss << R"("dagStateChanges": [ )";
|
ss << R"("taskAttempts": { )";
|
||||||
for (const auto &change : run.dagStateChanges) {
|
for (const auto &[taskName, attempts] : run.taskAttempts) {
|
||||||
if (first) {
|
if (first) {
|
||||||
first = false;
|
first = false;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ss << ',';
|
||||||
|
}
|
||||||
|
ss << std::quoted(taskName) << ": [";
|
||||||
|
bool firstAttempt = true;
|
||||||
|
for (const auto &attempt : attempts) {
|
||||||
|
if (firstAttempt) {
|
||||||
|
firstAttempt = false;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ss << ',';
|
||||||
|
}
|
||||||
|
ss << attemptRecordToJSON(attempt);
|
||||||
|
}
|
||||||
|
ss << ']';
|
||||||
}
|
}
|
||||||
else {
|
ss << "},";
|
||||||
ss << ',';
|
|
||||||
|
// DAG state changes
|
||||||
|
first = true;
|
||||||
|
ss << R"("dagStateChanges": [ )";
|
||||||
|
for (const auto &change : run.dagStateChanges) {
|
||||||
|
if (first) {
|
||||||
|
first = false;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ss << ',';
|
||||||
|
}
|
||||||
|
ss << stateUpdateRecordToJSON(change);
|
||||||
}
|
}
|
||||||
ss << stateUpdateRecordToJSON(change);
|
ss << "]";
|
||||||
|
ss << '}';
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ss << R"(<html>
|
||||||
|
<header>
|
||||||
|
<title>Details for RunID )"
|
||||||
|
<< runID << R"(</title>
|
||||||
|
<script src="https://cdn.jsdelivr.net/npm/mermaid/dist/mermaid.min.js"></script>
|
||||||
|
<script>mermaid.initialize({startOnLoad:true});</script>
|
||||||
|
</header>
|
||||||
|
<body>
|
||||||
|
<center>
|
||||||
|
<div class="mermaid">
|
||||||
|
)"
|
||||||
|
<< "graph LR;\n";
|
||||||
|
|
||||||
|
std::unordered_map<std::string, std::unordered_set<std::string>>
|
||||||
|
taskClassMap;
|
||||||
|
for (const auto &[taskName, task] : run.dagSpec.tasks) {
|
||||||
|
taskClassMap[task.definedName].emplace(taskName);
|
||||||
|
}
|
||||||
|
for (const auto &[taskName, task] : run.dagSpec.tasks) {
|
||||||
|
for (const auto &child : task.children) {
|
||||||
|
for (const auto &ci : taskClassMap[child]) {
|
||||||
|
ss << " " << taskName << "-->" << ci << '\n';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ss << "click " << taskName << " href \"/v1/dagrun/" << runID << "/task/"
|
||||||
|
<< taskName << "\"\n";
|
||||||
|
ss << "style " << taskName << " fill: #";
|
||||||
|
switch (run.taskStateChanges[taskName].back().state) {
|
||||||
|
case RunState::QUEUED:
|
||||||
|
ss << "55f";
|
||||||
|
break;
|
||||||
|
case RunState::RUNNING:
|
||||||
|
ss << "5a5";
|
||||||
|
break;
|
||||||
|
case RunState::RETRY:
|
||||||
|
ss << "55a";
|
||||||
|
break;
|
||||||
|
case RunState::ERRORED:
|
||||||
|
ss << "55F";
|
||||||
|
break;
|
||||||
|
case RunState::COMPLETED:
|
||||||
|
ss << "5f5";
|
||||||
|
break;
|
||||||
|
case RunState::KILLED:
|
||||||
|
ss << "fff";
|
||||||
|
break;
|
||||||
|
case RunState::PAUSED:
|
||||||
|
ss << "333";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
ss << '\n';
|
||||||
|
}
|
||||||
|
ss << "</div><center></body></html>";
|
||||||
}
|
}
|
||||||
ss << "]";
|
|
||||||
ss << '}';
|
|
||||||
|
|
||||||
response.send(Pistache::Http::Code::Ok, ss.str());
|
response.send(Pistache::Http::Code::Ok, ss.str());
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user