- Adding dagRun REST method

This commit is contained in:
Ian Roddis
2021-08-14 11:11:12 -03:00
parent ce2c0dd30f
commit 0a2a66bc59
2 changed files with 99 additions and 5 deletions

View File

@@ -4,18 +4,30 @@
#include <pistache/endpoint.h>
#include <pistache/http.h>
#include <rapidjson/document.h>
#include "loggers/dag_run/DAGRunLogger.hpp"
#include "executors/task/TaskExecutor.hpp"
namespace daggy {
class Server {
public:
Server(Pistache::Address addr)
: endpoint_(addr), desc_("Daggy API", "0.1") {}
Server(Pistache::Address addr
, loggers::dag_run::DAGRunLogger & logger
, executors::task::TaskExecutor & executor
, ThreadPool & runnerPool
)
: endpoint_(addr)
, desc_("Daggy API", "0.1")
, logger_(logger)
, executor_(executor)
, runnerPool_(runnerPool)
{}
void init(int threads = 1);
void start();
void shutdown();
private:
void createDescription();
@@ -27,8 +39,14 @@ namespace daggy {
void handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
bool handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response);
Pistache::Http::Endpoint endpoint_;
Pistache::Rest::Description desc_;
Pistache::Rest::Router router_;
loggers::dag_run::DAGRunLogger & logger_;
executors::task::TaskExecutor & executor_;
ThreadPool & runnerPool_;
};
}

View File

@@ -1,5 +1,11 @@
#include <daggy/Server.hpp>
#include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp>
#define REQ_ERROR(code,msg) response.send(Pistache::Http::Code::code, msg); return;
namespace rj = rapidjson;
using namespace Pistache;
namespace daggy {
@@ -14,7 +20,11 @@ namespace daggy {
router_.initFromDescription(desc_);
endpoint_.setHandler(router_.handler());
endpoint_.serve();
endpoint_.serveThreaded();
}
void Server::shutdown() {
endpoint_.shutdown();
}
void Server::createDescription() {
@@ -65,7 +75,73 @@ namespace daggy {
.response(Http::Code::Ok, "Details of a specific DAG run");
}
/*
* {
* "name": "DAG Run Name"
* "parameters": {...}
* "tasks": {...}
*/
void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
if (! handleAuth(request, response)) return;
rj::Document doc;
try {
doc.Parse(request.body().c_str());
} catch (std::exception &e) {
REQ_ERROR(Bad_Request, std::string{"Invalid JSON payload: "} + e.what());
}
if (! doc.IsObject()) { REQ_ERROR(Bad_Request, "Payload is not a dictionary."); }
if (! doc.HasMember("name")) { REQ_ERROR(Bad_Request, "DAG Run is missing a name."); }
if (! doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); }
std::string runName = doc["name"].GetString();
std::vector<Task> tasks;
try {
auto parsedTasks = tasksFromJSON(doc["tasks"].GetArray());
tasks.swap(parsedTasks);
} catch (std::exception &e) {
REQ_ERROR(Bad_Request, e.what());
}
// Get parameters if there are any
ParameterValues parameters;
if (doc.HasMember("parameters")) {
try {
auto parsedParams = parametersFromJSON(doc["parameters"].GetObject());
parameters.swap(parsedParams);
} catch (std::exception &e) {
REQ_ERROR(Bad_Request, e.what());
}
}
// Get a run ID
auto runID = logger_.startDAGRun(runName, tasks);
auto dag = buildDAGFromTasks(tasks);
auto fut = runnerPool_.addTask([this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); });
response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}");
}
void Server::handleGetDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
if (! handleAuth(request, response)) return;
}
void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
if (! handleAuth(request, response)) return;
}
void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
response.send(Pistache::Http::Code::Ok, "All good here");
response.send(Pistache::Http::Code::Ok, "Ya like DAGs?");
}
/*
* handleAuth will check any auth methods and handle any responses in the case of failed auth. If it returns
* false, callers should cease handling the response
*/
bool Server::handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response) {
(void)response;
return true;
}
}