- Adding unit tests for Server

This commit is contained in:
Ian Roddis
2021-08-19 14:23:40 -03:00
parent 0a2a66bc59
commit db47bc1593
5 changed files with 150 additions and 34 deletions

View File

@@ -1,5 +1,7 @@
#pragma once #pragma once
#include <filesystem>
#include <pistache/description.h> #include <pistache/description.h>
#include <pistache/endpoint.h> #include <pistache/endpoint.h>
#include <pistache/http.h> #include <pistache/http.h>
@@ -7,25 +9,28 @@
#include "loggers/dag_run/DAGRunLogger.hpp" #include "loggers/dag_run/DAGRunLogger.hpp"
#include "executors/task/TaskExecutor.hpp" #include "executors/task/TaskExecutor.hpp"
namespace fs = std::filesystem;
namespace daggy { namespace daggy {
class Server { class Server {
public: public:
Server(Pistache::Address addr Server(const Pistache::Address &listenSpec, loggers::dag_run::DAGRunLogger &logger,
, loggers::dag_run::DAGRunLogger & logger executors::task::TaskExecutor &executor,
, executors::task::TaskExecutor & executor size_t nDAGRunners
, ThreadPool & runnerPool )
) : endpoint_(listenSpec), desc_("Daggy API", "0.1"), logger_(logger), executor_(executor),
: endpoint_(addr) runnerPool_(nDAGRunners) {}
, desc_("Daggy API", "0.1")
, logger_(logger) Server &setWebHandlerThreads(size_t nThreads);
, executor_(executor)
, runnerPool_(runnerPool) Server &setSSLCertificates(const fs::path &cert, const fs::path &key);
{}
void init(int threads = 1); void init(int threads = 1);
void start(); void start();
uint16_t getPort() const;
void shutdown(); void shutdown();
private: private:
@@ -39,14 +44,14 @@ namespace daggy {
void handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); void handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
bool handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response); bool handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter &response);
Pistache::Http::Endpoint endpoint_; Pistache::Http::Endpoint endpoint_;
Pistache::Rest::Description desc_; Pistache::Rest::Description desc_;
Pistache::Rest::Router router_; Pistache::Rest::Router router_;
loggers::dag_run::DAGRunLogger & logger_; loggers::dag_run::DAGRunLogger &logger_;
executors::task::TaskExecutor & executor_; executors::task::TaskExecutor &executor_;
ThreadPool & runnerPool_; ThreadPool runnerPool_;
}; };
} }

View File

@@ -3,7 +3,7 @@
#include <daggy/Serialization.hpp> #include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp> #include <daggy/Utilities.hpp>
#define REQ_ERROR(code,msg) response.send(Pistache::Http::Code::code, msg); return; #define REQ_ERROR(code, msg) response.send(Pistache::Http::Code::code, msg); return;
namespace rj = rapidjson; namespace rj = rapidjson;
using namespace Pistache; using namespace Pistache;
@@ -11,7 +11,8 @@ using namespace Pistache;
namespace daggy { namespace daggy {
void Server::init(int threads) { void Server::init(int threads) {
auto opts = Http::Endpoint::options() auto opts = Http::Endpoint::options()
.threads(threads); .threads(threads)
.flags(Pistache::Tcp::Options::ReuseAddr | Pistache::Tcp::Options::ReusePort);
endpoint_.init(opts); endpoint_.init(opts);
createDescription(); createDescription();
} }
@@ -27,6 +28,10 @@ namespace daggy {
endpoint_.shutdown(); endpoint_.shutdown();
} }
uint16_t Server::getPort() const {
return endpoint_.getPort();
}
void Server::createDescription() { void Server::createDescription() {
desc_ desc_
.info() .info()
@@ -51,7 +56,7 @@ namespace daggy {
auto versionPath = desc_.path("/v1"); auto versionPath = desc_.path("/v1");
auto dagPath = versionPath.path("/dag"); auto dagPath = versionPath.path("/dagrun");
// Run a DAG // Run a DAG
dagPath dagPath
@@ -82,7 +87,7 @@ namespace daggy {
* "tasks": {...} * "tasks": {...}
*/ */
void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
if (! handleAuth(request, response)) return; if (!handleAuth(request, response)) return;
rj::Document doc; rj::Document doc;
try { try {
@@ -91,9 +96,9 @@ namespace daggy {
REQ_ERROR(Bad_Request, std::string{"Invalid JSON payload: "} + e.what()); REQ_ERROR(Bad_Request, std::string{"Invalid JSON payload: "} + e.what());
} }
if (! doc.IsObject()) { REQ_ERROR(Bad_Request, "Payload is not a dictionary."); } if (!doc.IsObject()) { REQ_ERROR(Bad_Request, "Payload is not a dictionary."); }
if (! doc.HasMember("name")) { REQ_ERROR(Bad_Request, "DAG Run is missing a name."); } if (!doc.HasMember("name")) { REQ_ERROR(Bad_Request, "DAG Run is missing a name."); }
if (! doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); } if (!doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); }
std::string runName = doc["name"].GetString(); std::string runName = doc["name"].GetString();
std::vector<Task> tasks; std::vector<Task> tasks;
@@ -119,17 +124,18 @@ namespace daggy {
auto runID = logger_.startDAGRun(runName, tasks); auto runID = logger_.startDAGRun(runName, tasks);
auto dag = buildDAGFromTasks(tasks); auto dag = buildDAGFromTasks(tasks);
auto fut = runnerPool_.addTask([this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); }); auto fut = runnerPool_.addTask(
[this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); });
response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}"); response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}");
} }
void Server::handleGetDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { void Server::handleGetDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
if (! handleAuth(request, response)) return; if (!handleAuth(request, response)) return;
} }
void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
if (! handleAuth(request, response)) return; if (!handleAuth(request, response)) return;
} }
void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
@@ -140,8 +146,8 @@ namespace daggy {
* handleAuth will check any auth methods and handle any responses in the case of failed auth. If it returns * handleAuth will check any auth methods and handle any responses in the case of failed auth. If it returns
* false, callers should cease handling the response * false, callers should cease handling the response
*/ */
bool Server::handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response) { bool Server::handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter &response) {
(void)response; (void) response;
return true; return true;
} }
} }

View File

@@ -1,4 +1,15 @@
{ {
"parameters": {
"SOURCE": [
"a",
"b",
"c"
],
"DATE": [
"2021-01-01",
"2021-01-02"
]
},
"tasks": [ "tasks": [
{ {
"name": "pull_data_a", "name": "pull_data_a",
@@ -8,7 +19,9 @@
"command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_A", "command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_A",
"verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_A", "verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_A",
"timeout_seconds": 30, "timeout_seconds": 30,
"children": [ "merge_data" ] "children": [
"merge_data"
]
}, },
{ {
"name": "pull_data_b", "name": "pull_data_b",
@@ -18,7 +31,9 @@
"command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_B", "command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_B",
"verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_B", "verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_B",
"timeout_seconds": 30, "timeout_seconds": 30,
"children": [ "merge_data" ] "children": [
"merge_data"
]
}, },
{ {
"name": "merge_data", "name": "merge_data",

93
tests/unit_server.cpp Normal file
View File

@@ -0,0 +1,93 @@
#include <iostream>
#include <filesystem>
#include <fstream>
#include <catch2/catch.hpp>
#include <pistache/client.h>
#include "daggy/Server.hpp"
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
Pistache::Http::Response
REQUEST(Pistache::Http::Experimental::Client &client, std::string url, std::string payload = "") {
Pistache::Http::Response response;
auto reqSpec = (payload.empty() ? client.get(url) : client.post(url));
reqSpec.timeout(std::chrono::seconds(2));
if (!payload.empty()) {
reqSpec.body(payload);
}
auto request = reqSpec.send();
bool ok = false, error = false;
std::string msg;
request.then(
[&](Pistache::Http::Response rsp) {
ok = true;
response = rsp;
},
[&](std::exception_ptr ptr) {
error = true;
try {
std::rethrow_exception(ptr);
} catch (std::exception &e) {
msg = e.what();
}
}
);
Pistache::Async::Barrier<Pistache::Http::Response> barrier(request);
barrier.wait_for(std::chrono::seconds(2));
if (error) {
throw std::runtime_error(msg);
}
return response;
}
TEST_CASE("Server Basic Endpoints", "[server_basic]") {
std::stringstream ss;
daggy::executors::task::ForkingTaskExecutor executor(10);
daggy::loggers::dag_run::OStreamLogger logger(ss);
Pistache::Address listenSpec("localhost", Pistache::Port(0));
const size_t nDAGRunners = 10,
nWebThreads = 10;
daggy::Server server(listenSpec, logger, executor, 10);
server.init(nWebThreads);
server.start();
Pistache::Http::Experimental::Client client;
const std::string host = "localhost:";
const std::string baseURL = host + std::to_string(server.getPort());
client.init();
SECTION("/ready endpoint") {
auto response = REQUEST(client, baseURL + "/ready");
REQUIRE(response.code() == Pistache::Http::Code::Ok);
}
SECTION("dag submission") {
// submit a DAGRun
std::string dagRun = R"({
"name": "unit_server",
"parameters": { "DIRS": [ "A", "B" ] },
"tasks": [
{ "name": "touch",
"command": [ "/usr/bin/touch", "/tmp/{{DIRS}}" ]
},
{
"name": "cat",
"command": [ "/usr/bin/cat", "/tmp/A", "/tmp/B" ]
"parents": [ "touch" ]
}
]
}
)";
auto response = REQUEST(client, baseURL + "/v1/dagrun/", dagRun);
REQUIRE(response.code() == Pistache::Http::Code::Ok);
}
server.shutdown();
client.shutdown();
}

View File

@@ -18,11 +18,8 @@ struct Options {
size_t webThreads = 50; size_t webThreads = 50;
size_t dagThreads = 20; size_t dagThreads = 20;
// Pool name -> Executor Type + nWorkers std::unique_ptr<daggy::executors::task::TaskExecutor> executor;
std::unordered_map<std::string, std::pair<std::string, size_t>> executors; std::unique_ptr<daggy::loggers::dag_run::DAGRunLogger> logger;
// Logger Config
std::vector<std::unique_ptr<daggy::loggers::dag_run::DAGRunLogger>> loggers;
}; };
int main(int argc, char **argv) { int main(int argc, char **argv) {