#include #include #include #include #include #include #include #include #include #include #define REQ_RESPONSE(code, msg) \ std::stringstream ss; \ ss << R"({"message": )" << std::quoted(msg) << "}"; \ response.send(Pistache::Http::Code::code, ss.str()); \ return; using namespace Pistache; namespace daggy::daggyd { bool requestIsForJSON(const Pistache::Rest::Request &request) { auto acceptedMimeTypes = request.headers().get()->media(); auto fit = std::find(acceptedMimeTypes.begin(), acceptedMimeTypes.end(), Pistache::Http::Mime::MediaType::fromString("text/html")); return fit == acceptedMimeTypes.end(); } void Server::init(size_t threads) { auto opts = Http::Endpoint::options() .threads(threads) .flags(Pistache::Tcp::Options::ReuseAddr | Pistache::Tcp::Options::ReusePort) .maxRequestSize(4294967296) .maxResponseSize(4294967296); endpoint_.init(opts); createDescription(); } Server::Server(const Pistache::Address &listenSpec, loggers::dag_run::DAGRunLogger &logger, executors::task::TaskExecutor &executor, size_t nDAGRunners) : endpoint_(listenSpec) , desc_("Daggy API", "0.1") , logger_(logger) , executor_(executor) , runnerPool_(nDAGRunners) { } Server::~Server() { shutdown(); } void Server::start() { router_.initFromDescription(desc_); endpoint_.setHandler(router_.handler()); endpoint_.serveThreaded(); } Server &Server::setSSLCertificates(const fs::path &cert, const fs::path &key) { endpoint_.useSSL(cert, key); return *this; } void Server::shutdown() { endpoint_.shutdown(); runnerPool_.shutdown(); } uint16_t Server::getPort() const { return endpoint_.getPort(); } void Server::createDescription() { desc_.info().license("MIT", "https://opensource.org/licenses/MIT"); auto backendErrorResponse = desc_.response(Http::Code::Internal_Server_Error, R"({"error": "An error occurred with the backend"})"); desc_.schemes(Rest::Scheme::Http) .basePath("/v1") .produces(MIME(Application, Json)) .consumes(MIME(Application, Json)); desc_.route(desc_.get("/ready")) .bind(&Server::handleReady, this) .response(Http::Code::Ok, "Response to the /ready call") .hide(); desc_.route(desc_.get("/")) .bind(&Server::handleRoot, this) .response(Http::Code::Ok, "Response to the /ready call") .hide(); auto versionPath = desc_.path("/v1"); /* DAG Run Summaries */ auto dagRunsPath = versionPath.path("/dagruns"); dagRunsPath.route(desc_.get("/")) .bind(&Server::handleQueryDAGs, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "List summaries DAGs"); /* Individual DAG Run routes */ auto dagRunPath = versionPath.path("/dagrun"); dagRunPath.route(desc_.post("/")) .bind(&Server::handleRunDAG, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Run a DAG"); dagRunPath.route(desc_.post("/validate")) .bind(&Server::handleValidateDAG, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Validate a DAG Run Spec"); /* Management of a specific DAG */ auto specificDAGRunPath = dagRunPath.path("/:runID"); specificDAGRunPath.route(desc_.del("/")) .bind(&Server::handleStopDAGRun, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Kill a running dag"); specificDAGRunPath.route(desc_.get("/")) .bind(&Server::handleGetDAGRun, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Full DAG Run"); specificDAGRunPath.route(desc_.get("/state")) .bind(&Server::handleGetDAGRunState, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Structure of a DAG and DAG and Task run states"); specificDAGRunPath.route(desc_.patch("/state/:state")) .bind(&Server::handleSetDAGRunState, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Change the state of a DAG"); /* Task paths */ auto taskPath = specificDAGRunPath.path("/task/:taskName"); taskPath.route(desc_.get("/")) .bind(&Server::handleGetTask, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Details of a specific task"); /* Task State paths */ auto taskStatePath = taskPath.path("/state"); taskStatePath.route(desc_.get("/")) .bind(&Server::handleGetTaskState, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Get a task state"); taskStatePath.route(desc_.patch("/:state")) .bind(&Server::handleSetTaskState, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Set a task state"); } void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; DAGSpec dagSpec; try { dagSpec = dagFromJSON(request.body()); } catch (std::runtime_error &e) { REQ_RESPONSE(Not_Acceptable, e.what()); } dagSpec.tasks = expandTaskSet(dagSpec.tasks, executor_, dagSpec.taskConfig.variables); // Get a run ID DAGRunID runID = logger_.startDAGRun(dagSpec); auto dag = buildDAGFromTasks(dagSpec.tasks); queueDAG_(runID, dag, dagSpec.taskConfig); response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}\n"); } void Server::handleValidateDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { try { dagFromJSON(request.body()); response.send(Pistache::Http::Code::Ok, R"({"valid": true}\n)"); } catch (std::exception &e) { std::string error = e.what(); response.send( Pistache::Http::Code::Ok, std::string{R"({"valid": true, "error": })"} + error + "}\n"); } } void Server::handleQueryDAGs(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; bool all = false; std::string tag = ""; if (request.query().has("tag")) { tag = request.query().get("tag").value(); } bool isJSON = requestIsForJSON(request); if (request.hasParam(":all")) { auto val = request.query().get(":all").value(); if (val == "true" or val == "1") { all = true; } } else if (!isJSON) { all = true; } auto dagRuns = logger_.queryDAGRuns(tag, all); std::stringstream ss; if (isJSON) { // default to json ss << '['; bool first = true; for (const auto &run : dagRuns) { if (first) { first = false; } else { ss << ", "; } ss << " {" << R"("runID": )" << run.runID << ',' << R"("tag": )" << std::quoted(run.tag) << "," << R"("startTime": )" << std::quoted(timePointToString(run.startTime)) << ',' << R"("lastUpdate": )" << std::quoted(timePointToString(run.lastUpdate)) << ',' << R"("taskCounts": {)"; bool firstState = true; for (const auto &[state, count] : run.taskStateCounts) { if (firstState) { firstState = false; } else { ss << ", "; } ss << std::quoted(state._to_string()) << ':' << count; } ss << '}' // end of taskCounts << '}'; // end of item } ss << "]\n"; } else { // HTML ss << "
Daggy " "Runs

Current Runs


"; if (!dagRuns.empty()) { std::sort(dagRuns.begin(), dagRuns.end(), [](const auto &a, const auto &b) { return a.startTime > b.startTime; }); ss << ""; for (auto &ds : dagRuns) { size_t nTasks = 0; for (const auto &[k, cnt] : ds.taskStateCounts) nTasks += cnt; ss << "" << R"(" << "" << "" << "" << "" << "" << "" << "" << "" << "" << "" << ""; } ss << "
Run IDTagState# " "TasksStart TimeLast " "UpdateQueuedRunningRetryErroredCompleted
)" << ds.runID << "" << ds.tag << "" << ds.runState << "" << nTasks << "" << timePointToString(ds.startTime) << "" << timePointToString(ds.lastUpdate) << "" << ds.taskStateCounts[RunState::QUEUED] << "" << ds.taskStateCounts[RunState::RUNNING] << "" << ds.taskStateCounts[RunState::RETRY] << "" << ds.taskStateCounts[RunState::ERRORED] << "" << ds.taskStateCounts[RunState::COMPLETED] << "
"; } ss << "\n"; } response.send(Pistache::Http::Code::Ok, ss.str()); } void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; if (!request.hasParam(":runID")) { REQ_RESPONSE(Not_Found, "No runID provided in URL"); } auto runID = request.param(":runID").as(); auto run = logger_.getDAGRun(runID); bool isJSON = requestIsForJSON(request); std::stringstream ss; if (isJSON) { bool first = true; ss << "{" << R"("runID": )" << runID << ',' << R"("tag": )" << std::quoted(run.dagSpec.tag) << ',' << R"("tasks": )" << tasksToJSON(run.dagSpec.tasks) << ','; // task run states ss << R"("taskStates": { )"; first = true; for (const auto &[name, state] : run.taskRunStates) { if (first) { first = false; } else { ss << ','; } ss << std::quoted(name) << ": " << std::quoted(state._to_string()); } ss << "},"; // Attempt records first = true; ss << R"("taskAttempts": { )"; for (const auto &[taskName, attempts] : run.taskAttempts) { if (first) { first = false; } else { ss << ','; } ss << std::quoted(taskName) << ": ["; bool firstAttempt = true; for (const auto &attempt : attempts) { if (firstAttempt) { firstAttempt = false; } else { ss << ','; } ss << attemptRecordToJSON(attempt); } ss << ']'; } ss << "},"; // DAG state changes first = true; ss << R"("dagStateChanges": [ )"; for (const auto &change : run.dagStateChanges) { if (first) { first = false; } else { ss << ','; } ss << stateUpdateRecordToJSON(change); } ss << "]"; ss << "}\n"; } else { std::unordered_map stateCounts; for (const auto &[_, state] : run.taskRunStates) { stateCounts[state]++; } ss << R"(
Details for RunID )" << runID << R"(

Summary

)" << "" << "" << "" << "" << "" << "" << "" << "" << "" << "
Run IDTagState #Tasks QueuedRunningRetry ErroredCompleted
" << runID << "" << run.dagSpec.tag << "" << run.dagStateChanges.back().state << "" << run.dagSpec.tasks.size() << "" << stateCounts[RunState::QUEUED] << "" << stateCounts[RunState::RUNNING] << "" << stateCounts[RunState::RETRY] << "" << stateCounts[RunState::ERRORED] << "" << stateCounts[RunState::COMPLETED] << "
" << "

Task Details

" << ""; for (const auto &[taskName, task] : run.dagSpec.tasks) { ss << "" << "" << "" << "" << ""; } ss << "
Task Name StateLast " "Update Logs
" << taskName << "" << run.taskRunStates.at(taskName) << "" << timePointToString(run.taskStateChanges.at(taskName).back().time) << "Logs" << "
"; } response.send(Pistache::Http::Code::Ok, ss.str()); } void Server::handleStopDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; if (!request.hasParam(":runID")) { REQ_RESPONSE(Not_Found, "No runID provided in URL"); } auto runID = request.param(":runID").as(); { std::lock_guard lock(runnerGuard_); auto it = runners_.find(runID); if (it != runners_.end()) { it->second->stop(true, false); } } response.send(Pistache::Http::Code::Ok, ""); } void Server::handleGetDAGRunState(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; DAGRunID runID = request.param(":runID").as(); RunState state = RunState::QUEUED; try { state = logger_.getDAGRunState(runID); std::stringstream ss; ss << R"({ "runID": )" << runID << R"(, "state": )" << std::quoted(state._to_string()) << '}'; response.send(Pistache::Http::Code::Ok, ss.str()); } catch (std::exception &e) { REQ_RESPONSE(Not_Found, e.what()); } } void Server::queueDAG_(DAGRunID runID, const TaskDAG &dag, const TaskParameters &taskParameters) { std::lock_guard lock(runnerGuard_); /* auto it = runners_.emplace( std::piecewise_construct, std::forward_as_tuple(runID), std::forward_as_tuple(runID, executor_, logger_, dag, taskParameters)); */ auto it = runners_.emplace( runID, std::make_shared(runID, executor_, logger_, dag, taskParameters)); if (!it.second) throw std::runtime_error("A DAGRun with the same ID is already running"); auto runner = it.first->second; runnerPool_.addTask([runner, runID, this]() { runner->run(); std::lock_guard lock(this->runnerGuard_); this->runners_.extract(runID); }); } void Server::handleSetDAGRunState(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; // TODO handle state transition DAGRunID runID = request.param(":runID").as(); RunState newState = RunState::_from_string( request.param(":state").as().c_str()); std::shared_ptr runner{nullptr}; { std::lock_guard lock(runnerGuard_); auto it = runners_.find(runID); if (runners_.find(runID) != runners_.end()) { runner = it->second; } } if (runner) { switch (newState) { case RunState::PAUSED: case RunState::KILLED: { runner->stop(true, true); logger_.updateDAGRunState(runID, newState); break; } default: { REQ_RESPONSE(Method_Not_Allowed, std::string{"Cannot transition to state "} + newState._to_string()); } } } else { switch (newState) { case RunState::QUEUED: { auto dagRun = logger_.getDAGRun(runID); auto dag = buildDAGFromTasks(dagRun.dagSpec.tasks, dagRun.taskStateChanges); dag.resetRunning(); queueDAG_(runID, dag, dagRun.dagSpec.taskConfig); break; } default: REQ_RESPONSE( Method_Not_Allowed, std::string{"DAG not running, cannot transition to state "} + newState._to_string()); } } REQ_RESPONSE(Ok, ""); } void Server::handleGetTask(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; auto runID = request.param(":runID").as(); auto taskName = request.param(":taskName").as(); bool isJSON = requestIsForJSON(request); std::stringstream ss; if (isJSON) { Task task; try { task = logger_.getTask(runID, taskName); } catch (std::exception &e) { REQ_RESPONSE(Not_Found, e.what()); } ss << taskToJSON(task); } else { std::optional tr; try { tr.emplace(logger_.getTaskRecord(runID, taskName)); } catch (std::exception &e) { REQ_RESPONSE(Not_Found, e.what()); } ss << "Task Details for " << runID << " / " << taskName << "" << "" << "" << "" << "" << ""; std::sort(tr->attempts.begin(), tr->attempts.end(), [](const auto &a, const auto &b) { return a.startTime < b.startTime; }); for (const auto &attempt : tr->attempts) { ss << ""; } ss << "
Name" << taskName << "
State" << tr->state << "
Definition" << taskToJSON(tr->task) << "
Attempts
" << timePointToString(attempt.startTime) << "
rc: " << attempt.rc
           << "\n\nstdout:\n--------------\n"
           << attempt.outputLog << "\n\nstderr:\n--------------\n"
           << attempt.errorLog << "\n\nexecutor:\n--------------\n"
           << attempt.executorLog << "
\n"; } response.send(Pistache::Http::Code::Ok, ss.str()); } void Server::handleGetTaskState(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; auto runID = request.param(":runID").as(); auto taskName = request.param(":taskName").as(); try { auto state = logger_.getTaskState(runID, taskName); std::stringstream ss; ss << R"({ "runID": )" << runID << R"(, "taskName": )" << std::quoted(taskName) << R"(, "state": )" << std::quoted(state._to_string()) << "}\n"; response.send(Pistache::Http::Code::Ok, ss.str()); } catch (std::exception &e) { REQ_RESPONSE(Not_Found, e.what()); } } void Server::handleSetTaskState(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; // TODO implement handling of task state auto runID = request.param(":runID").as(); auto taskName = request.param(":taskName").as(); RunState state = RunState::_from_string( request.param(":state").as().c_str()); try { logger_.updateTaskState(runID, taskName, state); response.send(Pistache::Http::Code::Ok, ""); } catch (std::exception &e) { REQ_RESPONSE(Not_Found, e.what()); } } void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { response.send(Pistache::Http::Code::Ok, R"({ "msg": "Ya like DAGs?"}\n)"); } void Server::handleRoot(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { response.headers().add( std::make_shared("/v1/dagruns")); response.send(Pistache::Http::Code::Moved_Permanently, R"({ "msg": "These are the dags you are looking for"}\n)"); } /* * handleAuth will check any auth methods and handle any responses in the * case of failed auth. If it returns false, callers should cease handling * the response */ bool Server::handleAuth(const Pistache::Rest::Request &request) { return true; } } // namespace daggy::daggyd