- Adding dagRun REST method

This commit is contained in:
Ian Roddis
2021-08-14 11:11:12 -03:00
parent 2525731f5a
commit 4c6bc2a540
2 changed files with 99 additions and 5 deletions

View File

@@ -4,18 +4,30 @@
#include <pistache/endpoint.h> #include <pistache/endpoint.h>
#include <pistache/http.h> #include <pistache/http.h>
#include <rapidjson/document.h> #include "loggers/dag_run/DAGRunLogger.hpp"
#include "executors/task/TaskExecutor.hpp"
namespace daggy { namespace daggy {
class Server { class Server {
public: public:
Server(Pistache::Address addr) Server(Pistache::Address addr
: endpoint_(addr), desc_("Daggy API", "0.1") {} , loggers::dag_run::DAGRunLogger & logger
, executors::task::TaskExecutor & executor
, ThreadPool & runnerPool
)
: endpoint_(addr)
, desc_("Daggy API", "0.1")
, logger_(logger)
, executor_(executor)
, runnerPool_(runnerPool)
{}
void init(int threads = 1); void init(int threads = 1);
void start(); void start();
void shutdown();
private: private:
void createDescription(); void createDescription();
@@ -27,8 +39,14 @@ namespace daggy {
void handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); void handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
bool handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response);
Pistache::Http::Endpoint endpoint_; Pistache::Http::Endpoint endpoint_;
Pistache::Rest::Description desc_; Pistache::Rest::Description desc_;
Pistache::Rest::Router router_; Pistache::Rest::Router router_;
loggers::dag_run::DAGRunLogger & logger_;
executors::task::TaskExecutor & executor_;
ThreadPool & runnerPool_;
}; };
} }

View File

@@ -1,5 +1,11 @@
#include <daggy/Server.hpp> #include <daggy/Server.hpp>
#include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp>
#define REQ_ERROR(code,msg) response.send(Pistache::Http::Code::code, msg); return;
namespace rj = rapidjson;
using namespace Pistache; using namespace Pistache;
namespace daggy { namespace daggy {
@@ -14,7 +20,11 @@ namespace daggy {
router_.initFromDescription(desc_); router_.initFromDescription(desc_);
endpoint_.setHandler(router_.handler()); endpoint_.setHandler(router_.handler());
endpoint_.serve(); endpoint_.serveThreaded();
}
void Server::shutdown() {
endpoint_.shutdown();
} }
void Server::createDescription() { void Server::createDescription() {
@@ -65,7 +75,73 @@ namespace daggy {
.response(Http::Code::Ok, "Details of a specific DAG run"); .response(Http::Code::Ok, "Details of a specific DAG run");
} }
/*
* {
* "name": "DAG Run Name"
* "parameters": {...}
* "tasks": {...}
*/
void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
if (! handleAuth(request, response)) return;
rj::Document doc;
try {
doc.Parse(request.body().c_str());
} catch (std::exception &e) {
REQ_ERROR(Bad_Request, std::string{"Invalid JSON payload: "} + e.what());
}
if (! doc.IsObject()) { REQ_ERROR(Bad_Request, "Payload is not a dictionary."); }
if (! doc.HasMember("name")) { REQ_ERROR(Bad_Request, "DAG Run is missing a name."); }
if (! doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); }
std::string runName = doc["name"].GetString();
std::vector<Task> tasks;
try {
auto parsedTasks = tasksFromJSON(doc["tasks"].GetArray());
tasks.swap(parsedTasks);
} catch (std::exception &e) {
REQ_ERROR(Bad_Request, e.what());
}
// Get parameters if there are any
ParameterValues parameters;
if (doc.HasMember("parameters")) {
try {
auto parsedParams = parametersFromJSON(doc["parameters"].GetObject());
parameters.swap(parsedParams);
} catch (std::exception &e) {
REQ_ERROR(Bad_Request, e.what());
}
}
// Get a run ID
auto runID = logger_.startDAGRun(runName, tasks);
auto dag = buildDAGFromTasks(tasks);
auto fut = runnerPool_.addTask([this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); });
response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}");
}
void Server::handleGetDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
if (! handleAuth(request, response)) return;
}
void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
if (! handleAuth(request, response)) return;
}
void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
response.send(Pistache::Http::Code::Ok, "All good here"); response.send(Pistache::Http::Code::Ok, "Ya like DAGs?");
}
/*
* handleAuth will check any auth methods and handle any responses in the case of failed auth. If it returns
* false, callers should cease handling the response
*/
bool Server::handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response) {
(void)response;
return true;
} }
} }