diff --git a/daggy/include/daggy/Server.hpp b/daggy/include/daggy/Server.hpp index 5f50a1d..d951eda 100644 --- a/daggy/include/daggy/Server.hpp +++ b/daggy/include/daggy/Server.hpp @@ -4,18 +4,30 @@ #include #include -#include +#include "loggers/dag_run/DAGRunLogger.hpp" +#include "executors/task/TaskExecutor.hpp" namespace daggy { class Server { public: - Server(Pistache::Address addr) - : endpoint_(addr), desc_("Daggy API", "0.1") {} + Server(Pistache::Address addr + , loggers::dag_run::DAGRunLogger & logger + , executors::task::TaskExecutor & executor + , ThreadPool & runnerPool + ) + : endpoint_(addr) + , desc_("Daggy API", "0.1") + , logger_(logger) + , executor_(executor) + , runnerPool_(runnerPool) + {} void init(int threads = 1); void start(); + void shutdown(); + private: void createDescription(); @@ -27,8 +39,14 @@ namespace daggy { void handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); + bool handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response); + Pistache::Http::Endpoint endpoint_; Pistache::Rest::Description desc_; Pistache::Rest::Router router_; + + loggers::dag_run::DAGRunLogger & logger_; + executors::task::TaskExecutor & executor_; + ThreadPool & runnerPool_; }; } diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp index 2b4a313..8fb02e9 100644 --- a/daggy/src/Server.cpp +++ b/daggy/src/Server.cpp @@ -1,5 +1,11 @@ #include +#include +#include + +#define REQ_ERROR(code,msg) response.send(Pistache::Http::Code::code, msg); return; + +namespace rj = rapidjson; using namespace Pistache; namespace daggy { @@ -14,7 +20,11 @@ namespace daggy { router_.initFromDescription(desc_); endpoint_.setHandler(router_.handler()); - endpoint_.serve(); + endpoint_.serveThreaded(); + } + + void Server::shutdown() { + endpoint_.shutdown(); } void Server::createDescription() { @@ -65,7 +75,73 @@ namespace daggy { .response(Http::Code::Ok, "Details of a specific DAG run"); } + /* + * { + * "name": "DAG Run Name" + * "parameters": {...} + * "tasks": {...} + */ + void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { + if (! handleAuth(request, response)) return; + + rj::Document doc; + try { + doc.Parse(request.body().c_str()); + } catch (std::exception &e) { + REQ_ERROR(Bad_Request, std::string{"Invalid JSON payload: "} + e.what()); + } + + if (! doc.IsObject()) { REQ_ERROR(Bad_Request, "Payload is not a dictionary."); } + if (! doc.HasMember("name")) { REQ_ERROR(Bad_Request, "DAG Run is missing a name."); } + if (! doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); } + + std::string runName = doc["name"].GetString(); + std::vector tasks; + try { + auto parsedTasks = tasksFromJSON(doc["tasks"].GetArray()); + tasks.swap(parsedTasks); + } catch (std::exception &e) { + REQ_ERROR(Bad_Request, e.what()); + } + + // Get parameters if there are any + ParameterValues parameters; + if (doc.HasMember("parameters")) { + try { + auto parsedParams = parametersFromJSON(doc["parameters"].GetObject()); + parameters.swap(parsedParams); + } catch (std::exception &e) { + REQ_ERROR(Bad_Request, e.what()); + } + } + + // Get a run ID + auto runID = logger_.startDAGRun(runName, tasks); + auto dag = buildDAGFromTasks(tasks); + + auto fut = runnerPool_.addTask([this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); }); + + response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}"); + } + + void Server::handleGetDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { + if (! handleAuth(request, response)) return; + } + + void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { + if (! handleAuth(request, response)) return; + } + void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { - response.send(Pistache::Http::Code::Ok, "All good here"); + response.send(Pistache::Http::Code::Ok, "Ya like DAGs?"); + } + + /* + * handleAuth will check any auth methods and handle any responses in the case of failed auth. If it returns + * false, callers should cease handling the response + */ + bool Server::handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response) { + (void)response; + return true; } }