#include #include #include #include #include #include #include #include #include #include #include #define REQ_RESPONSE(code, msg) \ std::stringstream ss; \ ss << R"({"message": )" << std::quoted(msg) << "}\n"; \ response.send(Pistache::Http::Code::code, ss.str()); \ return; using namespace Pistache; namespace daggy::daggyd { void addResponseHeaders(Pistache::Http::ResponseWriter &response) { response.headers().add( std::make_shared( "*")); response.headers().add( std::make_shared( "content-type")); response.headers().add( std::make_shared( "PUT, OPTIONS, PATCH, GET, HEAD, CONNECT, DELETE, POST, TRACE")); response.headers().add( std::make_shared( "application/javascript")); } void Server::init(size_t threads) { auto opts = Http::Endpoint::options() .threads(threads) .flags(Pistache::Tcp::Options::ReuseAddr | Pistache::Tcp::Options::ReusePort) .maxRequestSize(4294967296) .maxResponseSize(4294967296); endpoint_.init(opts); createDescription(); } Server::Server(const Pistache::Address &listenSpec, loggers::dag_run::DAGRunLogger &logger, executors::task::TaskExecutor &executor, size_t nDAGRunners, const fs::path &staticAssetsDir) : endpoint_(listenSpec) , desc_("Daggy API", "0.1") , logger_(logger) , executor_(executor) , runnerPool_(nDAGRunners) , staticAssetsDir_(staticAssetsDir) { } Server::~Server() { shutdown(); } void Server::start() { router_.initFromDescription(desc_); endpoint_.setHandler(router_.handler()); endpoint_.serveThreaded(); } Server &Server::setSSLCertificates(const fs::path &cert, const fs::path &key) { endpoint_.useSSL(cert, key); return *this; } void Server::shutdown() { endpoint_.shutdown(); runnerPool_.shutdown(); } uint16_t Server::getPort() const { return endpoint_.getPort(); } void Server::createDescription() { desc_.info().license("MIT", "https://opensource.org/licenses/MIT"); auto backendErrorResponse = desc_.response(Http::Code::Internal_Server_Error, R"({"error": "An error occurred with the backend"})"); desc_.route(desc_.get("/")) .bind(&Server::handleStatic, this) .response(Http::Code::Ok, "Serve static assets") .hide(); desc_.route(desc_.get("/*")) .bind(&Server::handleStatic, this) .response(Http::Code::Ok, "Serve static assets") .hide(); desc_.route(desc_.get("/*/*")) .bind(&Server::handleStatic, this) .response(Http::Code::Ok, "Serve static assets") .hide(); desc_.schemes(Rest::Scheme::Http) .basePath("/v1") .produces(MIME(Application, Json)) .consumes(MIME(Application, Json)); desc_.route(desc_.get("/ready")) .bind(&Server::handleReady, this) .response(Http::Code::Ok, "Response to the /ready call") .hide(); auto versionPath = desc_.path("/v1"); /* DAG Run Summaries */ auto dagRunsPath = versionPath.path("/dagruns"); dagRunsPath.route(desc_.options("/")).bind(&Server::handleCORS, this); dagRunsPath.route(desc_.get("/")) .bind(&Server::handleQueryDAGs, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "List summaries DAGs"); /* Individual DAG Run routes */ auto dagRunPath = versionPath.path("/dagrun"); dagRunPath.route(desc_.options("/")).bind(&Server::handleCORS, this); dagRunPath.route(desc_.post("/")) .bind(&Server::handleRunDAG, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Run a DAG"); dagRunPath.route(desc_.options("/validate")) .bind(&Server::handleCORS, this); dagRunPath.route(desc_.post("/validate")) .bind(&Server::handleValidateDAG, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Validate a DAG Run Spec"); /* Management of a specific DAG */ auto specificDAGRunPath = dagRunPath.path("/:runID"); specificDAGRunPath.route(desc_.options("/")) .bind(&Server::handleCORS, this); specificDAGRunPath.route(desc_.del("/")) .bind(&Server::handleStopDAGRun, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Kill a running dag"); specificDAGRunPath.route(desc_.get("/")) .bind(&Server::handleGetDAGRun, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Full DAG Run"); specificDAGRunPath.route(desc_.options("/state")) .bind(&Server::handleCORS, this); specificDAGRunPath.route(desc_.get("/state")) .bind(&Server::handleGetDAGRunState, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Structure of a DAG and DAG and Task run states"); specificDAGRunPath.route(desc_.options("/state/:state")) .bind(&Server::handleCORS, this); specificDAGRunPath.route(desc_.patch("/state/:state")) .bind(&Server::handleSetDAGRunState, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Change the state of a DAG"); /* Task paths */ auto taskPath = specificDAGRunPath.path("/task/:taskName"); taskPath.route(desc_.options("/")).bind(&Server::handleCORS, this); taskPath.route(desc_.get("/")) .bind(&Server::handleGetTask, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Details of a specific task"); taskPath.route(desc_.del("/")) .bind(&Server::handleStopTask, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Kill a specific task"); /* Task State paths */ auto taskStatePath = taskPath.path("/state"); taskStatePath.route(desc_.options("/")).bind(&Server::handleCORS, this); taskStatePath.route(desc_.get("/")) .bind(&Server::handleGetTaskState, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Get a task state"); taskStatePath.route(desc_.options("/:state")) .bind(&Server::handleCORS, this); taskStatePath.route(desc_.patch("/:state")) .bind(&Server::handleSetTaskState, this) .produces(MIME(Application, Json)) .response(Http::Code::Ok, "Set a task state"); } void Server::handleCORS(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { addResponseHeaders(response); response.send(Pistache::Http::Code::Ok, ""); } void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; addResponseHeaders(response); DAGRunID runID = 0; try { DAGSpec dagSpec; dagSpec = dagFromJSON(request.body()); dagSpec.tasks = expandTaskSet(dagSpec.tasks, executor_, dagSpec.taskConfig.variables); // Get a run ID runID = logger_.startDAGRun(dagSpec); auto dag = buildDAGFromTasks(dagSpec.tasks); queueDAG_(runID, dag, dagSpec.taskConfig); } catch (std::runtime_error &e) { REQ_RESPONSE(Not_Acceptable, e.what()); } response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}\n"); } void Server::handleValidateDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { addResponseHeaders(response); try { dagFromJSON(request.body()); response.send(Pistache::Http::Code::Ok, R"({"valid": true}\n)"); } catch (std::exception &e) { std::string error = e.what(); response.send( Pistache::Http::Code::Ok, std::string{R"({"valid": false, "error": })"} + error + "}\n"); } } void Server::handleQueryDAGs(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { addResponseHeaders(response); if (!handleAuth(request)) return; bool all = false; std::string tag = ""; if (request.query().has("tag")) { tag = request.query().get("tag").value(); } if (request.query().has("all")) { auto val = request.query().get("all").value(); if (val == "true" or val == "1") { all = true; } } auto dagRuns = logger_.queryDAGRuns(tag, all); std::stringstream ss; // default to json ss << '['; bool first = true; for (const auto &run : dagRuns) { if (first) { first = false; } else { ss << ", "; } ss << " {" << R"("runID": )" << run.runID << ',' << R"("tag": )" << std::quoted(run.tag) << "," << R"("state": )" << std::quoted(run.runState._to_string()) << "," << R"("startTime": )" << std::quoted(timePointToString(run.startTime)) << ',' << R"("lastUpdate": )" << std::quoted(timePointToString(run.lastUpdate)) << ',' << R"("taskCounts": {)"; bool firstState = true; for (const auto &[state, count] : run.taskStateCounts) { if (firstState) { firstState = false; } else { ss << ", "; } ss << std::quoted(state._to_string()) << ':' << count; } ss << '}' // end of taskCounts << '}'; // end of item } ss << "]\n"; response.send(Pistache::Http::Code::Ok, ss.str()); } void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { addResponseHeaders(response); if (!handleAuth(request)) return; if (!request.hasParam(":runID")) { REQ_RESPONSE(Not_Found, "No runID provided in URL"); } auto runID = request.param(":runID").as(); auto run = logger_.getDAGRun(runID); std::optional filterState; if (request.query().has("state")) { auto val = request.query().get("state").value(); filterState = RunState::_from_string(val.c_str()); } std::stringstream ss; bool first = true; ss << "{" << R"("runID": )" << runID << ',' << R"("tag": )" << std::quoted(run.dagSpec.tag) << ',' << R"("tasks": )" << tasksToJSON(run.dagSpec.tasks) << ','; // task run states ss << R"("taskStates": { )"; first = true; for (const auto &[name, state] : run.taskRunStates) { if (first) { first = false; } else { ss << ','; } ss << std::quoted(name) << ": " << std::quoted(state._to_string()); } ss << "},"; // Attempt records first = true; ss << R"("taskAttempts": { )"; for (const auto &[taskName, attempts] : run.taskAttempts) { if (first) { first = false; } else { ss << ','; } ss << std::quoted(taskName) << ": ["; bool firstAttempt = true; for (const auto &attempt : attempts) { if (firstAttempt) { firstAttempt = false; } else { ss << ','; } ss << attemptRecordToJSON(attempt); } ss << ']'; } ss << "},"; // DAG state changes first = true; ss << R"("dagStateChanges": [ )"; for (const auto &change : run.dagStateChanges) { if (first) { first = false; } else { ss << ','; } ss << stateUpdateRecordToJSON(change); } ss << "]"; ss << "}\n"; response.send(Pistache::Http::Code::Ok, ss.str()); } void Server::handleStopDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { addResponseHeaders(response); if (!handleAuth(request)) return; if (!request.hasParam(":runID")) { REQ_RESPONSE(Not_Found, "No runID provided in URL"); } auto runID = request.param(":runID").as(); { std::lock_guard lock(runnerGuard_); auto it = runners_.find(runID); if (it != runners_.end()) { it->second->stop(true, false); } } response.send(Pistache::Http::Code::Ok, ""); } void Server::handleGetDAGRunState(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; DAGRunID runID = request.param(":runID").as(); RunState state = RunState::QUEUED; try { state = logger_.getDAGRunState(runID); std::stringstream ss; ss << R"({ "runID": )" << runID << R"(, "state": )" << std::quoted(state._to_string()) << '}'; response.send(Pistache::Http::Code::Ok, ss.str()); } catch (std::exception &e) { REQ_RESPONSE(Not_Found, e.what()); } } void Server::queueDAG_(DAGRunID runID, const TaskDAG &dag, const TaskParameters &taskParameters) { std::lock_guard lock(runnerGuard_); /* auto it = runners_.emplace( std::piecewise_construct, std::forward_as_tuple(runID), std::forward_as_tuple(runID, executor_, logger_, dag, taskParameters)); */ auto it = runners_.emplace( runID, std::make_shared(runID, executor_, logger_, dag, taskParameters)); if (!it.second) throw std::runtime_error("A DAGRun with the same ID is already running"); auto runner = it.first->second; runnerPool_.addTask([runner, runID, this]() { runner->run(); std::lock_guard lock(this->runnerGuard_); this->runners_.extract(runID); }); } void Server::handleSetDAGRunState(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { if (!handleAuth(request)) return; addResponseHeaders(response); // TODO handle state transition DAGRunID runID = request.param(":runID").as(); RunState newState = RunState::_from_string( request.param(":state").as().c_str()); std::shared_ptr runner{nullptr}; { std::lock_guard lock(runnerGuard_); auto it = runners_.find(runID); if (runners_.find(runID) != runners_.end()) { runner = it->second; } } if (runner) { switch (newState) { case RunState::PAUSED: case RunState::KILLED: { runner->stop(true, true); logger_.updateDAGRunState(runID, newState); break; } default: { REQ_RESPONSE(Method_Not_Allowed, std::string{"Cannot transition to state "} + newState._to_string()); } } } else { switch (newState) { case RunState::QUEUED: { auto dagRun = logger_.getDAGRun(runID); auto dag = buildDAGFromTasks(dagRun.dagSpec.tasks, dagRun.taskStateChanges); dag.resetRunning(); queueDAG_(runID, dag, dagRun.dagSpec.taskConfig); break; } default: REQ_RESPONSE( Method_Not_Allowed, std::string{"DAG not running, cannot transition to state "} + newState._to_string()); } } REQ_RESPONSE(Ok, ""); } void Server::handleGetTask(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { addResponseHeaders(response); if (!handleAuth(request)) return; auto runID = request.param(":runID").as(); auto taskName = request.param(":taskName").as(); std::stringstream ss; Task task; try { task = logger_.getTask(runID, taskName); } catch (std::exception &e) { REQ_RESPONSE(Not_Found, e.what()); } ss << taskToJSON(task); response.send(Pistache::Http::Code::Ok, ss.str()); } void Server::handleStopTask(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { addResponseHeaders(response); if (!handleAuth(request)) return; auto runID = request.param(":runID").as(); auto taskName = request.param(":taskName").as(); std::shared_ptr runner{nullptr}; { std::lock_guard lock(runnerGuard_); auto it = runners_.find(runID); if (runners_.find(runID) != runners_.end()) { runner = it->second; } } if (runner) { runner->stopTask(taskName); } REQ_RESPONSE(Ok, ""); } void Server::handleGetTaskState(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { addResponseHeaders(response); if (!handleAuth(request)) return; auto runID = request.param(":runID").as(); auto taskName = request.param(":taskName").as(); try { auto state = logger_.getTaskState(runID, taskName); std::stringstream ss; ss << R"({ "runID": )" << runID << R"(, "taskName": )" << std::quoted(taskName) << R"(, "state": )" << std::quoted(state._to_string()) << "}\n"; response.send(Pistache::Http::Code::Ok, ss.str()); } catch (std::exception &e) { REQ_RESPONSE(Not_Found, e.what()); } } void Server::handleSetTaskState(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { addResponseHeaders(response); if (!handleAuth(request)) return; // TODO implement handling of task state auto runID = request.param(":runID").as(); auto taskName = request.param(":taskName").as(); RunState state = RunState::_from_string( request.param(":state").as().c_str()); try { logger_.updateTaskState(runID, taskName, state); response.send(Pistache::Http::Code::Ok, ""); } catch (std::exception &e) { REQ_RESPONSE(Not_Found, e.what()); } } void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { addResponseHeaders(response); REQ_RESPONSE(Ok, "Ya like DAGs?"); } void Server::handleStatic(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { addResponseHeaders(response); std::string file = "index.html"; auto splats = request.splat(); if (!splats.empty()) { file = splats[0].as(); for (size_t i = 1; i < splats.size(); ++i) file += "/" + splats[i].as(); } auto fn = staticAssetsDir_ / file; auto ext = fn.extension(); if (!fs::exists(fn)) { std::cout << "Can't find " << fn << std::endl; REQ_RESPONSE(Not_Found, ""); } std::string contentType; if (ext == ".svg") { contentType = "image/svg+xml"; } else if (ext == ".html") { contentType = "text/html"; } else if (ext == ".css") { contentType = "text/css"; } else if (ext == ".js") { contentType = "text/javascript"; } else { REQ_RESPONSE(Bad_Request, "I don't know how to serve that kind of file"); } response.headers().remove(); response.headers().add( std::make_shared(contentType)); std::stringstream ss; std::ifstream ifh; ifh.open(fn, std::ios::binary); ss << ifh.rdbuf(); response.send(Pistache::Http::Code::Ok, ss.str()); } /* * handleAuth will check any auth methods and handle any responses in the * case of failed auth. If it returns false, callers should cease handling * the response */ bool Server::handleAuth(const Pistache::Rest::Request &request) { return true; } } // namespace daggy::daggyd