diff --git a/daggy/include/daggy/Server.hpp b/daggy/include/daggy/Server.hpp index d951eda..aa4d771 100644 --- a/daggy/include/daggy/Server.hpp +++ b/daggy/include/daggy/Server.hpp @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -7,25 +9,28 @@ #include "loggers/dag_run/DAGRunLogger.hpp" #include "executors/task/TaskExecutor.hpp" +namespace fs = std::filesystem; + namespace daggy { class Server { public: - Server(Pistache::Address addr - , loggers::dag_run::DAGRunLogger & logger - , executors::task::TaskExecutor & executor - , ThreadPool & runnerPool - ) - : endpoint_(addr) - , desc_("Daggy API", "0.1") - , logger_(logger) - , executor_(executor) - , runnerPool_(runnerPool) - {} + Server(const Pistache::Address &listenSpec, loggers::dag_run::DAGRunLogger &logger, + executors::task::TaskExecutor &executor, + size_t nDAGRunners + ) + : endpoint_(listenSpec), desc_("Daggy API", "0.1"), logger_(logger), executor_(executor), + runnerPool_(nDAGRunners) {} + + Server &setWebHandlerThreads(size_t nThreads); + + Server &setSSLCertificates(const fs::path &cert, const fs::path &key); void init(int threads = 1); void start(); + uint16_t getPort() const; + void shutdown(); private: @@ -39,14 +44,14 @@ namespace daggy { void handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - bool handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response); + bool handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter &response); Pistache::Http::Endpoint endpoint_; Pistache::Rest::Description desc_; Pistache::Rest::Router router_; - loggers::dag_run::DAGRunLogger & logger_; - executors::task::TaskExecutor & executor_; - ThreadPool & runnerPool_; + loggers::dag_run::DAGRunLogger &logger_; + executors::task::TaskExecutor &executor_; + ThreadPool runnerPool_; }; } diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp index 8fb02e9..6115d72 100644 --- a/daggy/src/Server.cpp +++ b/daggy/src/Server.cpp @@ -3,7 +3,7 @@ #include #include -#define REQ_ERROR(code,msg) response.send(Pistache::Http::Code::code, msg); return; +#define REQ_ERROR(code, msg) response.send(Pistache::Http::Code::code, msg); return; namespace rj = rapidjson; using namespace Pistache; @@ -11,7 +11,8 @@ using namespace Pistache; namespace daggy { void Server::init(int threads) { auto opts = Http::Endpoint::options() - .threads(threads); + .threads(threads) + .flags(Pistache::Tcp::Options::ReuseAddr | Pistache::Tcp::Options::ReusePort); endpoint_.init(opts); createDescription(); } @@ -27,6 +28,10 @@ namespace daggy { endpoint_.shutdown(); } + uint16_t Server::getPort() const { + return endpoint_.getPort(); + } + void Server::createDescription() { desc_ .info() @@ -51,7 +56,7 @@ namespace daggy { auto versionPath = desc_.path("/v1"); - auto dagPath = versionPath.path("/dag"); + auto dagPath = versionPath.path("/dagrun"); // Run a DAG dagPath @@ -82,7 +87,7 @@ namespace daggy { * "tasks": {...} */ void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { - if (! handleAuth(request, response)) return; + if (!handleAuth(request, response)) return; rj::Document doc; try { @@ -91,9 +96,9 @@ namespace daggy { REQ_ERROR(Bad_Request, std::string{"Invalid JSON payload: "} + e.what()); } - if (! doc.IsObject()) { REQ_ERROR(Bad_Request, "Payload is not a dictionary."); } - if (! doc.HasMember("name")) { REQ_ERROR(Bad_Request, "DAG Run is missing a name."); } - if (! doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); } + if (!doc.IsObject()) { REQ_ERROR(Bad_Request, "Payload is not a dictionary."); } + if (!doc.HasMember("name")) { REQ_ERROR(Bad_Request, "DAG Run is missing a name."); } + if (!doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); } std::string runName = doc["name"].GetString(); std::vector tasks; @@ -119,17 +124,18 @@ namespace daggy { auto runID = logger_.startDAGRun(runName, tasks); auto dag = buildDAGFromTasks(tasks); - auto fut = runnerPool_.addTask([this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); }); + auto fut = runnerPool_.addTask( + [this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); }); response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}"); } void Server::handleGetDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { - if (! handleAuth(request, response)) return; + if (!handleAuth(request, response)) return; } void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { - if (! handleAuth(request, response)) return; + if (!handleAuth(request, response)) return; } void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { @@ -140,8 +146,8 @@ namespace daggy { * handleAuth will check any auth methods and handle any responses in the case of failed auth. If it returns * false, callers should cease handling the response */ - bool Server::handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response) { - (void)response; + bool Server::handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter &response) { + (void) response; return true; } } diff --git a/examples/sample_dag.json b/examples/sample_dag.json index b7fb4f9..251b400 100644 --- a/examples/sample_dag.json +++ b/examples/sample_dag.json @@ -1,4 +1,15 @@ { + "parameters": { + "SOURCE": [ + "a", + "b", + "c" + ], + "DATE": [ + "2021-01-01", + "2021-01-02" + ] + }, "tasks": [ { "name": "pull_data_a", @@ -8,7 +19,9 @@ "command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_A", "verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_A", "timeout_seconds": 30, - "children": [ "merge_data" ] + "children": [ + "merge_data" + ] }, { "name": "pull_data_b", @@ -18,7 +31,9 @@ "command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_B", "verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_B", "timeout_seconds": 30, - "children": [ "merge_data" ] + "children": [ + "merge_data" + ] }, { "name": "merge_data", diff --git a/tests/unit_server.cpp b/tests/unit_server.cpp new file mode 100644 index 0000000..ecf0836 --- /dev/null +++ b/tests/unit_server.cpp @@ -0,0 +1,93 @@ +#include +#include +#include + +#include +#include + +#include "daggy/Server.hpp" +#include "daggy/executors/task/ForkingTaskExecutor.hpp" +#include "daggy/loggers/dag_run/OStreamLogger.hpp" + +Pistache::Http::Response +REQUEST(Pistache::Http::Experimental::Client &client, std::string url, std::string payload = "") { + Pistache::Http::Response response; + auto reqSpec = (payload.empty() ? client.get(url) : client.post(url)); + reqSpec.timeout(std::chrono::seconds(2)); + if (!payload.empty()) { + reqSpec.body(payload); + } + auto request = reqSpec.send(); + bool ok = false, error = false; + std::string msg; + request.then( + [&](Pistache::Http::Response rsp) { + ok = true; + response = rsp; + }, + [&](std::exception_ptr ptr) { + error = true; + try { + std::rethrow_exception(ptr); + } catch (std::exception &e) { + msg = e.what(); + } + } + ); + + Pistache::Async::Barrier barrier(request); + barrier.wait_for(std::chrono::seconds(2)); + if (error) { + throw std::runtime_error(msg); + } + return response; +} + +TEST_CASE("Server Basic Endpoints", "[server_basic]") { + std::stringstream ss; + daggy::executors::task::ForkingTaskExecutor executor(10); + daggy::loggers::dag_run::OStreamLogger logger(ss); + Pistache::Address listenSpec("localhost", Pistache::Port(0)); + + const size_t nDAGRunners = 10, + nWebThreads = 10; + + daggy::Server server(listenSpec, logger, executor, 10); + server.init(nWebThreads); + server.start(); + + Pistache::Http::Experimental::Client client; + const std::string host = "localhost:"; + const std::string baseURL = host + std::to_string(server.getPort()); + client.init(); + + SECTION("/ready endpoint") { + auto response = REQUEST(client, baseURL + "/ready"); + REQUIRE(response.code() == Pistache::Http::Code::Ok); + } + + SECTION("dag submission") { + // submit a DAGRun + std::string dagRun = R"({ + "name": "unit_server", + "parameters": { "DIRS": [ "A", "B" ] }, + "tasks": [ + { "name": "touch", + "command": [ "/usr/bin/touch", "/tmp/{{DIRS}}" ] + }, + { + "name": "cat", + "command": [ "/usr/bin/cat", "/tmp/A", "/tmp/B" ] + "parents": [ "touch" ] + } + ] + } + )"; + + auto response = REQUEST(client, baseURL + "/v1/dagrun/", dagRun); + REQUIRE(response.code() == Pistache::Http::Code::Ok); + } + + server.shutdown(); + client.shutdown(); +} \ No newline at end of file diff --git a/utils/rest_server/rest_server.cpp b/utils/rest_server/rest_server.cpp index eaa7ef7..f244370 100644 --- a/utils/rest_server/rest_server.cpp +++ b/utils/rest_server/rest_server.cpp @@ -18,11 +18,8 @@ struct Options { size_t webThreads = 50; size_t dagThreads = 20; - // Pool name -> Executor Type + nWorkers - std::unordered_map> executors; - - // Logger Config - std::vector> loggers; + std::unique_ptr executor; + std::unique_ptr logger; }; int main(int argc, char **argv) {