- Adding unit tests for Server
This commit is contained in:
@@ -1,5 +1,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <filesystem>
|
||||||
|
|
||||||
#include <pistache/description.h>
|
#include <pistache/description.h>
|
||||||
#include <pistache/endpoint.h>
|
#include <pistache/endpoint.h>
|
||||||
#include <pistache/http.h>
|
#include <pistache/http.h>
|
||||||
@@ -7,25 +9,28 @@
|
|||||||
#include "loggers/dag_run/DAGRunLogger.hpp"
|
#include "loggers/dag_run/DAGRunLogger.hpp"
|
||||||
#include "executors/task/TaskExecutor.hpp"
|
#include "executors/task/TaskExecutor.hpp"
|
||||||
|
|
||||||
|
namespace fs = std::filesystem;
|
||||||
|
|
||||||
namespace daggy {
|
namespace daggy {
|
||||||
class Server {
|
class Server {
|
||||||
public:
|
public:
|
||||||
Server(Pistache::Address addr
|
Server(const Pistache::Address &listenSpec, loggers::dag_run::DAGRunLogger &logger,
|
||||||
, loggers::dag_run::DAGRunLogger & logger
|
executors::task::TaskExecutor &executor,
|
||||||
, executors::task::TaskExecutor & executor
|
size_t nDAGRunners
|
||||||
, ThreadPool & runnerPool
|
)
|
||||||
)
|
: endpoint_(listenSpec), desc_("Daggy API", "0.1"), logger_(logger), executor_(executor),
|
||||||
: endpoint_(addr)
|
runnerPool_(nDAGRunners) {}
|
||||||
, desc_("Daggy API", "0.1")
|
|
||||||
, logger_(logger)
|
Server &setWebHandlerThreads(size_t nThreads);
|
||||||
, executor_(executor)
|
|
||||||
, runnerPool_(runnerPool)
|
Server &setSSLCertificates(const fs::path &cert, const fs::path &key);
|
||||||
{}
|
|
||||||
|
|
||||||
void init(int threads = 1);
|
void init(int threads = 1);
|
||||||
|
|
||||||
void start();
|
void start();
|
||||||
|
|
||||||
|
uint16_t getPort() const;
|
||||||
|
|
||||||
void shutdown();
|
void shutdown();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@@ -39,14 +44,14 @@ namespace daggy {
|
|||||||
|
|
||||||
void handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
|
void handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
|
||||||
|
|
||||||
bool handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response);
|
bool handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter &response);
|
||||||
|
|
||||||
Pistache::Http::Endpoint endpoint_;
|
Pistache::Http::Endpoint endpoint_;
|
||||||
Pistache::Rest::Description desc_;
|
Pistache::Rest::Description desc_;
|
||||||
Pistache::Rest::Router router_;
|
Pistache::Rest::Router router_;
|
||||||
|
|
||||||
loggers::dag_run::DAGRunLogger & logger_;
|
loggers::dag_run::DAGRunLogger &logger_;
|
||||||
executors::task::TaskExecutor & executor_;
|
executors::task::TaskExecutor &executor_;
|
||||||
ThreadPool & runnerPool_;
|
ThreadPool runnerPool_;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
#include <daggy/Serialization.hpp>
|
#include <daggy/Serialization.hpp>
|
||||||
#include <daggy/Utilities.hpp>
|
#include <daggy/Utilities.hpp>
|
||||||
|
|
||||||
#define REQ_ERROR(code,msg) response.send(Pistache::Http::Code::code, msg); return;
|
#define REQ_ERROR(code, msg) response.send(Pistache::Http::Code::code, msg); return;
|
||||||
|
|
||||||
namespace rj = rapidjson;
|
namespace rj = rapidjson;
|
||||||
using namespace Pistache;
|
using namespace Pistache;
|
||||||
@@ -11,7 +11,8 @@ using namespace Pistache;
|
|||||||
namespace daggy {
|
namespace daggy {
|
||||||
void Server::init(int threads) {
|
void Server::init(int threads) {
|
||||||
auto opts = Http::Endpoint::options()
|
auto opts = Http::Endpoint::options()
|
||||||
.threads(threads);
|
.threads(threads)
|
||||||
|
.flags(Pistache::Tcp::Options::ReuseAddr | Pistache::Tcp::Options::ReusePort);
|
||||||
endpoint_.init(opts);
|
endpoint_.init(opts);
|
||||||
createDescription();
|
createDescription();
|
||||||
}
|
}
|
||||||
@@ -27,6 +28,10 @@ namespace daggy {
|
|||||||
endpoint_.shutdown();
|
endpoint_.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint16_t Server::getPort() const {
|
||||||
|
return endpoint_.getPort();
|
||||||
|
}
|
||||||
|
|
||||||
void Server::createDescription() {
|
void Server::createDescription() {
|
||||||
desc_
|
desc_
|
||||||
.info()
|
.info()
|
||||||
@@ -51,7 +56,7 @@ namespace daggy {
|
|||||||
|
|
||||||
auto versionPath = desc_.path("/v1");
|
auto versionPath = desc_.path("/v1");
|
||||||
|
|
||||||
auto dagPath = versionPath.path("/dag");
|
auto dagPath = versionPath.path("/dagrun");
|
||||||
|
|
||||||
// Run a DAG
|
// Run a DAG
|
||||||
dagPath
|
dagPath
|
||||||
@@ -82,7 +87,7 @@ namespace daggy {
|
|||||||
* "tasks": {...}
|
* "tasks": {...}
|
||||||
*/
|
*/
|
||||||
void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
|
void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
|
||||||
if (! handleAuth(request, response)) return;
|
if (!handleAuth(request, response)) return;
|
||||||
|
|
||||||
rj::Document doc;
|
rj::Document doc;
|
||||||
try {
|
try {
|
||||||
@@ -91,9 +96,9 @@ namespace daggy {
|
|||||||
REQ_ERROR(Bad_Request, std::string{"Invalid JSON payload: "} + e.what());
|
REQ_ERROR(Bad_Request, std::string{"Invalid JSON payload: "} + e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (! doc.IsObject()) { REQ_ERROR(Bad_Request, "Payload is not a dictionary."); }
|
if (!doc.IsObject()) { REQ_ERROR(Bad_Request, "Payload is not a dictionary."); }
|
||||||
if (! doc.HasMember("name")) { REQ_ERROR(Bad_Request, "DAG Run is missing a name."); }
|
if (!doc.HasMember("name")) { REQ_ERROR(Bad_Request, "DAG Run is missing a name."); }
|
||||||
if (! doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); }
|
if (!doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); }
|
||||||
|
|
||||||
std::string runName = doc["name"].GetString();
|
std::string runName = doc["name"].GetString();
|
||||||
std::vector<Task> tasks;
|
std::vector<Task> tasks;
|
||||||
@@ -119,17 +124,18 @@ namespace daggy {
|
|||||||
auto runID = logger_.startDAGRun(runName, tasks);
|
auto runID = logger_.startDAGRun(runName, tasks);
|
||||||
auto dag = buildDAGFromTasks(tasks);
|
auto dag = buildDAGFromTasks(tasks);
|
||||||
|
|
||||||
auto fut = runnerPool_.addTask([this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); });
|
auto fut = runnerPool_.addTask(
|
||||||
|
[this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); });
|
||||||
|
|
||||||
response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}");
|
response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}");
|
||||||
}
|
}
|
||||||
|
|
||||||
void Server::handleGetDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
|
void Server::handleGetDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
|
||||||
if (! handleAuth(request, response)) return;
|
if (!handleAuth(request, response)) return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
|
void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
|
||||||
if (! handleAuth(request, response)) return;
|
if (!handleAuth(request, response)) return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
|
void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
|
||||||
@@ -140,8 +146,8 @@ namespace daggy {
|
|||||||
* handleAuth will check any auth methods and handle any responses in the case of failed auth. If it returns
|
* handleAuth will check any auth methods and handle any responses in the case of failed auth. If it returns
|
||||||
* false, callers should cease handling the response
|
* false, callers should cease handling the response
|
||||||
*/
|
*/
|
||||||
bool Server::handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response) {
|
bool Server::handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter &response) {
|
||||||
(void)response;
|
(void) response;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,15 @@
|
|||||||
{
|
{
|
||||||
|
"parameters": {
|
||||||
|
"SOURCE": [
|
||||||
|
"a",
|
||||||
|
"b",
|
||||||
|
"c"
|
||||||
|
],
|
||||||
|
"DATE": [
|
||||||
|
"2021-01-01",
|
||||||
|
"2021-01-02"
|
||||||
|
]
|
||||||
|
},
|
||||||
"tasks": [
|
"tasks": [
|
||||||
{
|
{
|
||||||
"name": "pull_data_a",
|
"name": "pull_data_a",
|
||||||
@@ -8,7 +19,9 @@
|
|||||||
"command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_A",
|
"command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_A",
|
||||||
"verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_A",
|
"verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_A",
|
||||||
"timeout_seconds": 30,
|
"timeout_seconds": 30,
|
||||||
"children": [ "merge_data" ]
|
"children": [
|
||||||
|
"merge_data"
|
||||||
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name": "pull_data_b",
|
"name": "pull_data_b",
|
||||||
@@ -18,7 +31,9 @@
|
|||||||
"command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_B",
|
"command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_B",
|
||||||
"verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_B",
|
"verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_B",
|
||||||
"timeout_seconds": 30,
|
"timeout_seconds": 30,
|
||||||
"children": [ "merge_data" ]
|
"children": [
|
||||||
|
"merge_data"
|
||||||
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name": "merge_data",
|
"name": "merge_data",
|
||||||
|
|||||||
93
tests/unit_server.cpp
Normal file
93
tests/unit_server.cpp
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
#include <iostream>
|
||||||
|
#include <filesystem>
|
||||||
|
#include <fstream>
|
||||||
|
|
||||||
|
#include <catch2/catch.hpp>
|
||||||
|
#include <pistache/client.h>
|
||||||
|
|
||||||
|
#include "daggy/Server.hpp"
|
||||||
|
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
|
||||||
|
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
|
||||||
|
|
||||||
|
Pistache::Http::Response
|
||||||
|
REQUEST(Pistache::Http::Experimental::Client &client, std::string url, std::string payload = "") {
|
||||||
|
Pistache::Http::Response response;
|
||||||
|
auto reqSpec = (payload.empty() ? client.get(url) : client.post(url));
|
||||||
|
reqSpec.timeout(std::chrono::seconds(2));
|
||||||
|
if (!payload.empty()) {
|
||||||
|
reqSpec.body(payload);
|
||||||
|
}
|
||||||
|
auto request = reqSpec.send();
|
||||||
|
bool ok = false, error = false;
|
||||||
|
std::string msg;
|
||||||
|
request.then(
|
||||||
|
[&](Pistache::Http::Response rsp) {
|
||||||
|
ok = true;
|
||||||
|
response = rsp;
|
||||||
|
},
|
||||||
|
[&](std::exception_ptr ptr) {
|
||||||
|
error = true;
|
||||||
|
try {
|
||||||
|
std::rethrow_exception(ptr);
|
||||||
|
} catch (std::exception &e) {
|
||||||
|
msg = e.what();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
Pistache::Async::Barrier<Pistache::Http::Response> barrier(request);
|
||||||
|
barrier.wait_for(std::chrono::seconds(2));
|
||||||
|
if (error) {
|
||||||
|
throw std::runtime_error(msg);
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("Server Basic Endpoints", "[server_basic]") {
|
||||||
|
std::stringstream ss;
|
||||||
|
daggy::executors::task::ForkingTaskExecutor executor(10);
|
||||||
|
daggy::loggers::dag_run::OStreamLogger logger(ss);
|
||||||
|
Pistache::Address listenSpec("localhost", Pistache::Port(0));
|
||||||
|
|
||||||
|
const size_t nDAGRunners = 10,
|
||||||
|
nWebThreads = 10;
|
||||||
|
|
||||||
|
daggy::Server server(listenSpec, logger, executor, 10);
|
||||||
|
server.init(nWebThreads);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
Pistache::Http::Experimental::Client client;
|
||||||
|
const std::string host = "localhost:";
|
||||||
|
const std::string baseURL = host + std::to_string(server.getPort());
|
||||||
|
client.init();
|
||||||
|
|
||||||
|
SECTION("/ready endpoint") {
|
||||||
|
auto response = REQUEST(client, baseURL + "/ready");
|
||||||
|
REQUIRE(response.code() == Pistache::Http::Code::Ok);
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("dag submission") {
|
||||||
|
// submit a DAGRun
|
||||||
|
std::string dagRun = R"({
|
||||||
|
"name": "unit_server",
|
||||||
|
"parameters": { "DIRS": [ "A", "B" ] },
|
||||||
|
"tasks": [
|
||||||
|
{ "name": "touch",
|
||||||
|
"command": [ "/usr/bin/touch", "/tmp/{{DIRS}}" ]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "cat",
|
||||||
|
"command": [ "/usr/bin/cat", "/tmp/A", "/tmp/B" ]
|
||||||
|
"parents": [ "touch" ]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
)";
|
||||||
|
|
||||||
|
auto response = REQUEST(client, baseURL + "/v1/dagrun/", dagRun);
|
||||||
|
REQUIRE(response.code() == Pistache::Http::Code::Ok);
|
||||||
|
}
|
||||||
|
|
||||||
|
server.shutdown();
|
||||||
|
client.shutdown();
|
||||||
|
}
|
||||||
@@ -18,11 +18,8 @@ struct Options {
|
|||||||
size_t webThreads = 50;
|
size_t webThreads = 50;
|
||||||
size_t dagThreads = 20;
|
size_t dagThreads = 20;
|
||||||
|
|
||||||
// Pool name -> Executor Type + nWorkers
|
std::unique_ptr<daggy::executors::task::TaskExecutor> executor;
|
||||||
std::unordered_map<std::string, std::pair<std::string, size_t>> executors;
|
std::unique_ptr<daggy::loggers::dag_run::DAGRunLogger> logger;
|
||||||
|
|
||||||
// Logger Config
|
|
||||||
std::vector<std::unique_ptr<daggy::loggers::dag_run::DAGRunLogger>> loggers;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
int main(int argc, char **argv) {
|
int main(int argc, char **argv) {
|
||||||
|
|||||||
Reference in New Issue
Block a user