From dfb71b63b877ed34705b3011b78b3c303b61d4e3 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Fri, 13 Aug 2021 12:43:39 -0300 Subject: [PATCH 1/8] - Sidequest: Clean up header-only library dependencies with cmake. --- CMakeLists.txt | 8 +++--- TODO.md | 2 ++ cmake/Catch2.cmake | 7 ++++++ cmake/MagicEnum.cmake | 24 ++++++++---------- cmake/argparse.cmake | 15 +++++++++++ cmake/{Pistache.cmake => pistache.cmake} | 0 cmake/rapidjson.cmake | 32 ++++++++++-------------- daggy/CMakeLists.txt | 5 +--- tests/CMakeLists.txt | 8 ------ utils/CMakeLists.txt | 1 + utils/rest_server/CMakeLists.txt | 5 ++++ utils/rest_server/daggy.cpp | 23 +++++++++++++++++ 12 files changed, 82 insertions(+), 48 deletions(-) create mode 100644 cmake/Catch2.cmake create mode 100644 cmake/argparse.cmake rename cmake/{Pistache.cmake => pistache.cmake} (100%) create mode 100644 utils/CMakeLists.txt create mode 100644 utils/rest_server/CMakeLists.txt create mode 100644 utils/rest_server/daggy.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c1950fc..69d7531 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,11 +9,11 @@ set(THIRD_PARTY_DIR ${CMAKE_BINARY_DIR}/third_party) find_package(Threads REQUIRED) include(cmake/rapidjson.cmake) -include(cmake/Pistache.cmake) +include(cmake/pistache.cmake) include(cmake/MagicEnum.cmake) - -include_directories(${RAPIDJSON_INCLUDE_DIR}) -include_directories(${MAGIC_ENUM_INCLUDE_DIR}) +include(cmake/argparse.cmake) +include(cmake/Catch2.cmake) add_subdirectory(daggy) add_subdirectory(tests) +add_subdirectory(utils) diff --git a/TODO.md b/TODO.md index 980e511..e1c7746 100644 --- a/TODO.md +++ b/TODO.md @@ -13,6 +13,8 @@ Tasks - [ ] Slurm Executor - Loggers - [ ] FileSystemLogger + - [ ] Add unit tests + - [ ] Add more error checking - [ ] General logger - Completed diff --git a/cmake/Catch2.cmake b/cmake/Catch2.cmake new file mode 100644 index 0000000..61261e4 --- /dev/null +++ b/cmake/Catch2.cmake @@ -0,0 +1,7 @@ +Include(FetchContent) + +FetchContent_Declare( + Catch2 + GIT_REPOSITORY https://github.com/catchorg/Catch2.git + GIT_TAG v2.13.7) +FetchContent_MakeAvailable(Catch2) diff --git a/cmake/MagicEnum.cmake b/cmake/MagicEnum.cmake index d63a488..5e87ff7 100644 --- a/cmake/MagicEnum.cmake +++ b/cmake/MagicEnum.cmake @@ -1,17 +1,15 @@ include(ExternalProject) -# Download RapidJSON -ExternalProject_Add( - magic-enum - PREFIX "third_party/magic-enum" - GIT_REPOSITORY "https://github.com/Neargye/magic_enum" + +project(magicEnum) + +ExternalProject_Add(${PROJECT_NAME}-external + GIT_REPOSITORY https://github.com/Neargye/magic_enum.git GIT_TAG "v0.7.3" - TIMEOUT 10 - CONFIGURE_COMMAND "" + GIT_SHALLOW TRUE + SOURCE_DIR ${THIRD_PARTY_DIR}/${PROJECT_NAME} BUILD_COMMAND "" INSTALL_COMMAND "" - UPDATE_COMMAND "" -) - -# Magic Enums is a header-only -ExternalProject_Get_Property(magic-enum source_dir) -set(MAGIC_ENUM_INCLUDE_DIR ${source_dir}/include) \ No newline at end of file + CONFIGURE_COMMAND "") +add_library(${PROJECT_NAME} INTERFACE) +add_dependencies(${PROJECT_NAME} ${PROJECT_NAME}-external) +target_include_directories(${PROJECT_NAME} SYSTEM INTERFACE ${THIRD_PARTY_DIR}/${PROJECT_NAME}/include) \ No newline at end of file diff --git a/cmake/argparse.cmake b/cmake/argparse.cmake new file mode 100644 index 0000000..cb82b3c --- /dev/null +++ b/cmake/argparse.cmake @@ -0,0 +1,15 @@ +include(ExternalProject) + +project(argparse) + +ExternalProject_Add(${PROJECT_NAME}-external + GIT_REPOSITORY https://github.com/p-ranav/argparse.git + GIT_TAG "v2.1" + GIT_SHALLOW TRUE + SOURCE_DIR ${THIRD_PARTY_DIR}/${PROJECT_NAME} + BUILD_COMMAND "" + INSTALL_COMMAND "" + CONFIGURE_COMMAND "") +add_library(${PROJECT_NAME} INTERFACE) +add_dependencies(${PROJECT_NAME} ${PROJECT_NAME}-external) +target_include_directories(${PROJECT_NAME} SYSTEM INTERFACE ${THIRD_PARTY_DIR}/${PROJECT_NAME}/include) \ No newline at end of file diff --git a/cmake/Pistache.cmake b/cmake/pistache.cmake similarity index 100% rename from cmake/Pistache.cmake rename to cmake/pistache.cmake diff --git a/cmake/rapidjson.cmake b/cmake/rapidjson.cmake index 342f537..252fe23 100644 --- a/cmake/rapidjson.cmake +++ b/cmake/rapidjson.cmake @@ -1,21 +1,15 @@ include(ExternalProject) -# Download RapidJSON -ExternalProject_Add( - rapidjson - PREFIX "third_party/rapidjson" - GIT_REPOSITORY "https://github.com/Tencent/rapidjson.git" - GIT_TAG f54b0e47a08782a6131cc3d60f94d038fa6e0a51 - TIMEOUT 10 - CMAKE_ARGS - -DRAPIDJSON_BUILD_TESTS=OFF - -DRAPIDJSON_BUILD_DOC=OFF - -DRAPIDJSON_BUILD_EXAMPLES=OFF - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "" - UPDATE_COMMAND "" -) -# Prepare RapidJSON (RapidJSON is a header-only library) -ExternalProject_Get_Property(rapidjson source_dir) -set(RAPIDJSON_INCLUDE_DIR ${source_dir}/include) \ No newline at end of file +project(rapidjson) + +ExternalProject_Add(${PROJECT_NAME}-external + GIT_REPOSITORY https://github.com/Tencent/rapidjson.git + GIT_TAG "v1.1.0" + GIT_SHALLOW TRUE + SOURCE_DIR ${THIRD_PARTY_DIR}/${PROJECT_NAME} + BUILD_COMMAND "" + INSTALL_COMMAND "" + CONFIGURE_COMMAND "") +add_library(${PROJECT_NAME} INTERFACE) +add_dependencies(${PROJECT_NAME} ${PROJECT_NAME}-external) +target_include_directories(${PROJECT_NAME} SYSTEM INTERFACE ${THIRD_PARTY_DIR}/${PROJECT_NAME}/include) \ No newline at end of file diff --git a/daggy/CMakeLists.txt b/daggy/CMakeLists.txt index 15bd0b2..9609a34 100644 --- a/daggy/CMakeLists.txt +++ b/daggy/CMakeLists.txt @@ -1,10 +1,7 @@ project(daggy) -#ExternalProject_Add_StepDependencies(pistache_extern build) - file(GLOB_RECURSE SOURCES src/*.cpp) add_library(${PROJECT_NAME} STATIC ${SOURCES}) include_directories(${PISTACHE_INCLUDE_DIR}) target_include_directories(${PROJECT_NAME} PUBLIC include) -target_link_libraries(${PROJECT_NAME} pistache pthread) -add_dependencies(${PROJECT_NAME} PistacheDownload rapidjson magic-enum) \ No newline at end of file +target_link_libraries(${PROJECT_NAME} pistache pthread rapidjson magicEnum) \ No newline at end of file diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4d67d03..fee069f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,11 +1,3 @@ -Include(FetchContent) - -FetchContent_Declare( - Catch2 - GIT_REPOSITORY https://github.com/catchorg/Catch2.git - GIT_TAG v2.13.7) -FetchContent_MakeAvailable(Catch2) - project(tests) file(GLOB UNIT_TESTS unit_*.cpp) file(GLOB INTEGRATION_TESTS int_*.cpp) diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt new file mode 100644 index 0000000..bcfa9b9 --- /dev/null +++ b/utils/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(rest_server) \ No newline at end of file diff --git a/utils/rest_server/CMakeLists.txt b/utils/rest_server/CMakeLists.txt new file mode 100644 index 0000000..e509171 --- /dev/null +++ b/utils/rest_server/CMakeLists.txt @@ -0,0 +1,5 @@ +project(rest_server) +file(GLOB SOURCES *.cpp) +add_executable(${PROJECT_NAME} ${SOURCES}) +target_link_libraries(${PROJECT_NAME} daggy stdc++fs rapidjson pistache argparse) +set_target_properties(${PROJECT_NAME} PROPERTIES OUTPUT_NAME "daggy") \ No newline at end of file diff --git a/utils/rest_server/daggy.cpp b/utils/rest_server/daggy.cpp new file mode 100644 index 0000000..c969c64 --- /dev/null +++ b/utils/rest_server/daggy.cpp @@ -0,0 +1,23 @@ +#include +#include + +#include + +#include +#include +#include + +int main(int argc, char **argv) { + argparse::ArgumentParser args("Daggy"); + + args.add_argument("-v", "--verbose"); + args.add_argument("-c", "--config") + .help("Config file"); + args.add_argument("--ip") + .help("IP address to listen to"); + args.add_argument("--port") + .help("Port to listen to") + .action([](const std::string &value) { return std::stoi(value); }); + + daggy::Server endpoint; +} \ No newline at end of file From d731f8b6dcbdb5352a6eabc5a3753fa0389e1e16 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Fri, 13 Aug 2021 13:21:55 -0300 Subject: [PATCH 2/8] - More cmake cleanup --- cmake/pistache.cmake | 36 ++++++++++--------- daggy/CMakeLists.txt | 1 - daggy/include/daggy/Server.hpp | 17 ++------- daggy/src/Server.cpp | 14 ++++---- utils/rest_server/CMakeLists.txt | 3 +- .../{daggy.cpp => rest_server.cpp} | 4 +-- 6 files changed, 33 insertions(+), 42 deletions(-) rename utils/rest_server/{daggy.cpp => rest_server.cpp} (90%) diff --git a/cmake/pistache.cmake b/cmake/pistache.cmake index f5807f1..3080bf9 100644 --- a/cmake/pistache.cmake +++ b/cmake/pistache.cmake @@ -1,23 +1,25 @@ project(pistache) + include(ExternalProject) -ExternalProject_Add(PistacheDownload - PREFIX third_party - GIT_REPOSITORY https://github.com/pistacheio/pistache.git - GIT_TAG master - INSTALL_COMMAND "" -) +set_directory_properties(PROPERTIES EP_UPDATE_DISCONNECTED true) -set(PISTACHE_INCLUDE_DIR ${CMAKE_CURRENT_BINARY_DIR}/third_party/src/PistacheDownload/include) -set(PISTACHE_LIB_DIR ${CMAKE_CURRENT_BINARY_DIR}/third_party/src/PistacheDownload-build/src) +set(pistache_root ${THIRD_PARTY_DIR}/${PROJECT_NAME}) + +ExternalProject_Add(PistacheDownload + PREFIX ${pistache_root} + GIT_REPOSITORY https://github.com/pistacheio/pistache.git + GIT_TAG master + INSTALL_COMMAND "" + ) + +ExternalProject_Get_Property(PistacheDownload SOURCE_DIR) +set(PISTACHE_INCLUDE_DIR ${SOURCE_DIR}/include) +set(PISTACHE_LIB_DIR ${pistache_root}/src/PistacheDownload-build/src) + +file(MAKE_DIRECTORY ${PISTACHE_INCLUDE_DIR}) add_library(${PROJECT_NAME} SHARED IMPORTED) -add_dependencies(${PROJECT_NAME} $PistacheDownload) -set_target_properties(${PROJECT_NAME} PROPERTIES IMPORTED_LOCATION "${PISTACHE_LIB_DIR}/libpistache.a") - -#add_library(${PROJECT_NAME} STATIC IMPORTED) - -#set_target_properties(${PROJECT_NAME} PROPERTIES IMPORTED_LOCATION "${PISTACHE_BINARY_DIR}/lib/libpistache.a") - -#file(MAKE_DIRECTORY "${PISTACHE_BINARY_DIR}/include") -#target_include_directories(${PROJECT_NAME} SYSTEM INTERFACE "${PISTACHE_BINARY_DIR}/include/") +add_dependencies(${PROJECT_NAME} PistacheDownload) +target_include_directories(${PROJECT_NAME} INTERFACE ${PISTACHE_INCLUDE_DIR}) +set_target_properties(${PROJECT_NAME} PROPERTIES IMPORTED_LOCATION "${PISTACHE_LIB_DIR}/libpistache.a") \ No newline at end of file diff --git a/daggy/CMakeLists.txt b/daggy/CMakeLists.txt index 9609a34..edea93e 100644 --- a/daggy/CMakeLists.txt +++ b/daggy/CMakeLists.txt @@ -2,6 +2,5 @@ project(daggy) file(GLOB_RECURSE SOURCES src/*.cpp) add_library(${PROJECT_NAME} STATIC ${SOURCES}) -include_directories(${PISTACHE_INCLUDE_DIR}) target_include_directories(${PROJECT_NAME} PUBLIC include) target_link_libraries(${PROJECT_NAME} pistache pthread rapidjson magicEnum) \ No newline at end of file diff --git a/daggy/include/daggy/Server.hpp b/daggy/include/daggy/Server.hpp index cf81fb6..4e212dd 100644 --- a/daggy/include/daggy/Server.hpp +++ b/daggy/include/daggy/Server.hpp @@ -4,7 +4,7 @@ #include #include -// #include +#include namespace daggy { class Server { @@ -19,18 +19,6 @@ namespace daggy { private: void createDescription(); - // - // DAG Definition handlers - // - - void listDAGs(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - - void upsertDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - - void deleteDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - - void getDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - // // DAG Runs // @@ -43,9 +31,10 @@ namespace daggy { // Get status of specific run void getDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); + void handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); + Pistache::Http::Endpoint endpoint_; Pistache::Rest::Description desc_; Pistache::Rest::Router router_; - }; } diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp index 216b793..8756606 100644 --- a/daggy/src/Server.cpp +++ b/daggy/src/Server.cpp @@ -32,13 +32,11 @@ namespace daggy { .produces(MIME(Application, Json)) .consumes(MIME(Application, Json)); - /* desc_ - .route(desc_.get("/ready")) - .bind(&Generic::handleReady) - .response(Http::Code::Ok, "Response to the /ready call") - .hide(); - */ + .route(desc_.get("/ready")) + .bind(&Server::handleReady, this) + .response(Http::Code::Ok, "Response to the /ready call") + .hide(); auto versionPath = desc_.path("/v1"); @@ -53,4 +51,8 @@ namespace daggy { */ } + + void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { + response.send(Pistache::Http::Code::Ok, "All good here"); + } } diff --git a/utils/rest_server/CMakeLists.txt b/utils/rest_server/CMakeLists.txt index e509171..c6aeeae 100644 --- a/utils/rest_server/CMakeLists.txt +++ b/utils/rest_server/CMakeLists.txt @@ -1,5 +1,4 @@ project(rest_server) file(GLOB SOURCES *.cpp) add_executable(${PROJECT_NAME} ${SOURCES}) -target_link_libraries(${PROJECT_NAME} daggy stdc++fs rapidjson pistache argparse) -set_target_properties(${PROJECT_NAME} PROPERTIES OUTPUT_NAME "daggy") \ No newline at end of file +target_link_libraries(${PROJECT_NAME} pistache stdc++fs rapidjson argparse daggy) \ No newline at end of file diff --git a/utils/rest_server/daggy.cpp b/utils/rest_server/rest_server.cpp similarity index 90% rename from utils/rest_server/daggy.cpp rename to utils/rest_server/rest_server.cpp index c969c64..76bacc0 100644 --- a/utils/rest_server/daggy.cpp +++ b/utils/rest_server/rest_server.cpp @@ -1,7 +1,7 @@ #include #include -#include +#include #include #include @@ -19,5 +19,5 @@ int main(int argc, char **argv) { .help("Port to listen to") .action([](const std::string &value) { return std::stoi(value); }); - daggy::Server endpoint; + // daggy::Server endpoint(10); } \ No newline at end of file From eb8e530f9a4dc59b1f0bce9af4aacbe30af44b58 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Fri, 13 Aug 2021 13:28:47 -0300 Subject: [PATCH 3/8] - Checkpointing work before I restart the IDE. --- utils/rest_server/rest_server.cpp | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/utils/rest_server/rest_server.cpp b/utils/rest_server/rest_server.cpp index 76bacc0..4a19187 100644 --- a/utils/rest_server/rest_server.cpp +++ b/utils/rest_server/rest_server.cpp @@ -4,8 +4,26 @@ #include #include + +// Add executors here #include + +// Add loggers here #include +#include + +struct Options { + Pistache::Address listenAddress = "0.0.0.0"; + uint16_t listenPort = 2503; + size_t webThreads = 50; + size_t dagThreads = 20; + + // Pool name -> Executor Type + nWorkers + std::unordered_map> executors; + + // Logger Config + std::vector> loggers; +}; int main(int argc, char **argv) { argparse::ArgumentParser args("Daggy"); @@ -19,5 +37,7 @@ int main(int argc, char **argv) { .help("Port to listen to") .action([](const std::string &value) { return std::stoi(value); }); + // Set some defaults + // daggy::Server endpoint(10); } \ No newline at end of file From a668bf81f1d632ffca2c170c7caf32595391f15c Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Sat, 14 Aug 2021 08:36:49 -0300 Subject: [PATCH 4/8] - Checkpointing work so I can switch computers. --- daggy/include/daggy/Server.hpp | 12 +++--------- daggy/src/Server.cpp | 29 +++++++++++++++++++++-------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/daggy/include/daggy/Server.hpp b/daggy/include/daggy/Server.hpp index 4e212dd..5f50a1d 100644 --- a/daggy/include/daggy/Server.hpp +++ b/daggy/include/daggy/Server.hpp @@ -19,17 +19,11 @@ namespace daggy { private: void createDescription(); - // - // DAG Runs - // + void handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - void runDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); + void handleGetDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - // List - void getDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - - // Get status of specific run - void getDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); + void handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); void handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp index 8756606..2b4a313 100644 --- a/daggy/src/Server.cpp +++ b/daggy/src/Server.cpp @@ -38,18 +38,31 @@ namespace daggy { .response(Http::Code::Ok, "Response to the /ready call") .hide(); + auto versionPath = desc_.path("/v1"); - auto accountsPath = versionPath.path("/accounts"); + auto dagPath = versionPath.path("/dag"); - /* - accountsPath - .route(desc_.get("/all")) - .bind(&BankerService::retrieveAllAccounts, this) - .produces(MIME(Application, Json), MIME(Application, Xml)) - .response(Http::Code::Ok, "The list of all account"); - */ + // Run a DAG + dagPath + .route(desc_.post("/")) + .bind(&Server::handleRunDAG, this) + .produces(MIME(Application, Json), MIME(Application, Xml)) + .response(Http::Code::Ok, "Run a DAG"); + // List all DAG runs + dagPath + .route(desc_.get("/")) + .bind(&Server::handleGetDAGRuns, this) + .produces(MIME(Application, Json), MIME(Application, Xml)) + .response(Http::Code::Ok, "The list of all known DAG Runs"); + + // List detailed DAG run + dagPath + .route(desc_.get("/{id}")) + .bind(&Server::handleGetDAGRun, this) + .produces(MIME(Application, Json), MIME(Application, Xml)) + .response(Http::Code::Ok, "Details of a specific DAG run"); } void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { From 2525731f5a6d56b69e22baccfbb6f03f69290522 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Sat, 14 Aug 2021 08:36:55 -0300 Subject: [PATCH 5/8] - Checkpointing work so I can switch computers. --- utils/rest_server/rest_server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/rest_server/rest_server.cpp b/utils/rest_server/rest_server.cpp index 4a19187..eaa7ef7 100644 --- a/utils/rest_server/rest_server.cpp +++ b/utils/rest_server/rest_server.cpp @@ -13,7 +13,7 @@ #include struct Options { - Pistache::Address listenAddress = "0.0.0.0"; + Pistache::IP listenAddress = Pistache::Ipv4::any(); uint16_t listenPort = 2503; size_t webThreads = 50; size_t dagThreads = 20; From 4c6bc2a5402a79ecd41b6abaaeff12e9e09a1ad4 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Sat, 14 Aug 2021 11:11:12 -0300 Subject: [PATCH 6/8] - Adding dagRun REST method --- daggy/include/daggy/Server.hpp | 24 ++++++++-- daggy/src/Server.cpp | 80 +++++++++++++++++++++++++++++++++- 2 files changed, 99 insertions(+), 5 deletions(-) diff --git a/daggy/include/daggy/Server.hpp b/daggy/include/daggy/Server.hpp index 5f50a1d..d951eda 100644 --- a/daggy/include/daggy/Server.hpp +++ b/daggy/include/daggy/Server.hpp @@ -4,18 +4,30 @@ #include #include -#include +#include "loggers/dag_run/DAGRunLogger.hpp" +#include "executors/task/TaskExecutor.hpp" namespace daggy { class Server { public: - Server(Pistache::Address addr) - : endpoint_(addr), desc_("Daggy API", "0.1") {} + Server(Pistache::Address addr + , loggers::dag_run::DAGRunLogger & logger + , executors::task::TaskExecutor & executor + , ThreadPool & runnerPool + ) + : endpoint_(addr) + , desc_("Daggy API", "0.1") + , logger_(logger) + , executor_(executor) + , runnerPool_(runnerPool) + {} void init(int threads = 1); void start(); + void shutdown(); + private: void createDescription(); @@ -27,8 +39,14 @@ namespace daggy { void handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); + bool handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response); + Pistache::Http::Endpoint endpoint_; Pistache::Rest::Description desc_; Pistache::Rest::Router router_; + + loggers::dag_run::DAGRunLogger & logger_; + executors::task::TaskExecutor & executor_; + ThreadPool & runnerPool_; }; } diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp index 2b4a313..8fb02e9 100644 --- a/daggy/src/Server.cpp +++ b/daggy/src/Server.cpp @@ -1,5 +1,11 @@ #include +#include +#include + +#define REQ_ERROR(code,msg) response.send(Pistache::Http::Code::code, msg); return; + +namespace rj = rapidjson; using namespace Pistache; namespace daggy { @@ -14,7 +20,11 @@ namespace daggy { router_.initFromDescription(desc_); endpoint_.setHandler(router_.handler()); - endpoint_.serve(); + endpoint_.serveThreaded(); + } + + void Server::shutdown() { + endpoint_.shutdown(); } void Server::createDescription() { @@ -65,7 +75,73 @@ namespace daggy { .response(Http::Code::Ok, "Details of a specific DAG run"); } + /* + * { + * "name": "DAG Run Name" + * "parameters": {...} + * "tasks": {...} + */ + void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { + if (! handleAuth(request, response)) return; + + rj::Document doc; + try { + doc.Parse(request.body().c_str()); + } catch (std::exception &e) { + REQ_ERROR(Bad_Request, std::string{"Invalid JSON payload: "} + e.what()); + } + + if (! doc.IsObject()) { REQ_ERROR(Bad_Request, "Payload is not a dictionary."); } + if (! doc.HasMember("name")) { REQ_ERROR(Bad_Request, "DAG Run is missing a name."); } + if (! doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); } + + std::string runName = doc["name"].GetString(); + std::vector tasks; + try { + auto parsedTasks = tasksFromJSON(doc["tasks"].GetArray()); + tasks.swap(parsedTasks); + } catch (std::exception &e) { + REQ_ERROR(Bad_Request, e.what()); + } + + // Get parameters if there are any + ParameterValues parameters; + if (doc.HasMember("parameters")) { + try { + auto parsedParams = parametersFromJSON(doc["parameters"].GetObject()); + parameters.swap(parsedParams); + } catch (std::exception &e) { + REQ_ERROR(Bad_Request, e.what()); + } + } + + // Get a run ID + auto runID = logger_.startDAGRun(runName, tasks); + auto dag = buildDAGFromTasks(tasks); + + auto fut = runnerPool_.addTask([this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); }); + + response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}"); + } + + void Server::handleGetDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { + if (! handleAuth(request, response)) return; + } + + void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { + if (! handleAuth(request, response)) return; + } + void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { - response.send(Pistache::Http::Code::Ok, "All good here"); + response.send(Pistache::Http::Code::Ok, "Ya like DAGs?"); + } + + /* + * handleAuth will check any auth methods and handle any responses in the case of failed auth. If it returns + * false, callers should cease handling the response + */ + bool Server::handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response) { + (void)response; + return true; } } From 90f34c21b96e1ee1d49fb436f3ad943308352c5e Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Thu, 19 Aug 2021 14:23:40 -0300 Subject: [PATCH 7/8] - Adding unit tests for Server --- daggy/include/daggy/Server.hpp | 35 +++++++----- daggy/src/Server.cpp | 30 ++++++---- examples/sample_dag.json | 19 ++++++- tests/unit_server.cpp | 93 +++++++++++++++++++++++++++++++ utils/rest_server/rest_server.cpp | 7 +-- 5 files changed, 150 insertions(+), 34 deletions(-) create mode 100644 tests/unit_server.cpp diff --git a/daggy/include/daggy/Server.hpp b/daggy/include/daggy/Server.hpp index d951eda..aa4d771 100644 --- a/daggy/include/daggy/Server.hpp +++ b/daggy/include/daggy/Server.hpp @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -7,25 +9,28 @@ #include "loggers/dag_run/DAGRunLogger.hpp" #include "executors/task/TaskExecutor.hpp" +namespace fs = std::filesystem; + namespace daggy { class Server { public: - Server(Pistache::Address addr - , loggers::dag_run::DAGRunLogger & logger - , executors::task::TaskExecutor & executor - , ThreadPool & runnerPool - ) - : endpoint_(addr) - , desc_("Daggy API", "0.1") - , logger_(logger) - , executor_(executor) - , runnerPool_(runnerPool) - {} + Server(const Pistache::Address &listenSpec, loggers::dag_run::DAGRunLogger &logger, + executors::task::TaskExecutor &executor, + size_t nDAGRunners + ) + : endpoint_(listenSpec), desc_("Daggy API", "0.1"), logger_(logger), executor_(executor), + runnerPool_(nDAGRunners) {} + + Server &setWebHandlerThreads(size_t nThreads); + + Server &setSSLCertificates(const fs::path &cert, const fs::path &key); void init(int threads = 1); void start(); + uint16_t getPort() const; + void shutdown(); private: @@ -39,14 +44,14 @@ namespace daggy { void handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - bool handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response); + bool handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter &response); Pistache::Http::Endpoint endpoint_; Pistache::Rest::Description desc_; Pistache::Rest::Router router_; - loggers::dag_run::DAGRunLogger & logger_; - executors::task::TaskExecutor & executor_; - ThreadPool & runnerPool_; + loggers::dag_run::DAGRunLogger &logger_; + executors::task::TaskExecutor &executor_; + ThreadPool runnerPool_; }; } diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp index 8fb02e9..6115d72 100644 --- a/daggy/src/Server.cpp +++ b/daggy/src/Server.cpp @@ -3,7 +3,7 @@ #include #include -#define REQ_ERROR(code,msg) response.send(Pistache::Http::Code::code, msg); return; +#define REQ_ERROR(code, msg) response.send(Pistache::Http::Code::code, msg); return; namespace rj = rapidjson; using namespace Pistache; @@ -11,7 +11,8 @@ using namespace Pistache; namespace daggy { void Server::init(int threads) { auto opts = Http::Endpoint::options() - .threads(threads); + .threads(threads) + .flags(Pistache::Tcp::Options::ReuseAddr | Pistache::Tcp::Options::ReusePort); endpoint_.init(opts); createDescription(); } @@ -27,6 +28,10 @@ namespace daggy { endpoint_.shutdown(); } + uint16_t Server::getPort() const { + return endpoint_.getPort(); + } + void Server::createDescription() { desc_ .info() @@ -51,7 +56,7 @@ namespace daggy { auto versionPath = desc_.path("/v1"); - auto dagPath = versionPath.path("/dag"); + auto dagPath = versionPath.path("/dagrun"); // Run a DAG dagPath @@ -82,7 +87,7 @@ namespace daggy { * "tasks": {...} */ void Server::handleRunDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { - if (! handleAuth(request, response)) return; + if (!handleAuth(request, response)) return; rj::Document doc; try { @@ -91,9 +96,9 @@ namespace daggy { REQ_ERROR(Bad_Request, std::string{"Invalid JSON payload: "} + e.what()); } - if (! doc.IsObject()) { REQ_ERROR(Bad_Request, "Payload is not a dictionary."); } - if (! doc.HasMember("name")) { REQ_ERROR(Bad_Request, "DAG Run is missing a name."); } - if (! doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); } + if (!doc.IsObject()) { REQ_ERROR(Bad_Request, "Payload is not a dictionary."); } + if (!doc.HasMember("name")) { REQ_ERROR(Bad_Request, "DAG Run is missing a name."); } + if (!doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); } std::string runName = doc["name"].GetString(); std::vector tasks; @@ -119,17 +124,18 @@ namespace daggy { auto runID = logger_.startDAGRun(runName, tasks); auto dag = buildDAGFromTasks(tasks); - auto fut = runnerPool_.addTask([this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); }); + auto fut = runnerPool_.addTask( + [this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); }); response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}"); } void Server::handleGetDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { - if (! handleAuth(request, response)) return; + if (!handleAuth(request, response)) return; } void Server::handleGetDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { - if (! handleAuth(request, response)) return; + if (!handleAuth(request, response)) return; } void Server::handleReady(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { @@ -140,8 +146,8 @@ namespace daggy { * handleAuth will check any auth methods and handle any responses in the case of failed auth. If it returns * false, callers should cease handling the response */ - bool Server::handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter & response) { - (void)response; + bool Server::handleAuth(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter &response) { + (void) response; return true; } } diff --git a/examples/sample_dag.json b/examples/sample_dag.json index b7fb4f9..251b400 100644 --- a/examples/sample_dag.json +++ b/examples/sample_dag.json @@ -1,4 +1,15 @@ { + "parameters": { + "SOURCE": [ + "a", + "b", + "c" + ], + "DATE": [ + "2021-01-01", + "2021-01-02" + ] + }, "tasks": [ { "name": "pull_data_a", @@ -8,7 +19,9 @@ "command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_A", "verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_A", "timeout_seconds": 30, - "children": [ "merge_data" ] + "children": [ + "merge_data" + ] }, { "name": "pull_data_b", @@ -18,7 +31,9 @@ "command": "/path/to/pull.sh --date {{DATE}} --source {{SOURCE}}_B", "verification_command": "/path/to/pull_verify.sh --date {{DATE}} --source {{SOURCE}}_B", "timeout_seconds": 30, - "children": [ "merge_data" ] + "children": [ + "merge_data" + ] }, { "name": "merge_data", diff --git a/tests/unit_server.cpp b/tests/unit_server.cpp new file mode 100644 index 0000000..ecf0836 --- /dev/null +++ b/tests/unit_server.cpp @@ -0,0 +1,93 @@ +#include +#include +#include + +#include +#include + +#include "daggy/Server.hpp" +#include "daggy/executors/task/ForkingTaskExecutor.hpp" +#include "daggy/loggers/dag_run/OStreamLogger.hpp" + +Pistache::Http::Response +REQUEST(Pistache::Http::Experimental::Client &client, std::string url, std::string payload = "") { + Pistache::Http::Response response; + auto reqSpec = (payload.empty() ? client.get(url) : client.post(url)); + reqSpec.timeout(std::chrono::seconds(2)); + if (!payload.empty()) { + reqSpec.body(payload); + } + auto request = reqSpec.send(); + bool ok = false, error = false; + std::string msg; + request.then( + [&](Pistache::Http::Response rsp) { + ok = true; + response = rsp; + }, + [&](std::exception_ptr ptr) { + error = true; + try { + std::rethrow_exception(ptr); + } catch (std::exception &e) { + msg = e.what(); + } + } + ); + + Pistache::Async::Barrier barrier(request); + barrier.wait_for(std::chrono::seconds(2)); + if (error) { + throw std::runtime_error(msg); + } + return response; +} + +TEST_CASE("Server Basic Endpoints", "[server_basic]") { + std::stringstream ss; + daggy::executors::task::ForkingTaskExecutor executor(10); + daggy::loggers::dag_run::OStreamLogger logger(ss); + Pistache::Address listenSpec("localhost", Pistache::Port(0)); + + const size_t nDAGRunners = 10, + nWebThreads = 10; + + daggy::Server server(listenSpec, logger, executor, 10); + server.init(nWebThreads); + server.start(); + + Pistache::Http::Experimental::Client client; + const std::string host = "localhost:"; + const std::string baseURL = host + std::to_string(server.getPort()); + client.init(); + + SECTION("/ready endpoint") { + auto response = REQUEST(client, baseURL + "/ready"); + REQUIRE(response.code() == Pistache::Http::Code::Ok); + } + + SECTION("dag submission") { + // submit a DAGRun + std::string dagRun = R"({ + "name": "unit_server", + "parameters": { "DIRS": [ "A", "B" ] }, + "tasks": [ + { "name": "touch", + "command": [ "/usr/bin/touch", "/tmp/{{DIRS}}" ] + }, + { + "name": "cat", + "command": [ "/usr/bin/cat", "/tmp/A", "/tmp/B" ] + "parents": [ "touch" ] + } + ] + } + )"; + + auto response = REQUEST(client, baseURL + "/v1/dagrun/", dagRun); + REQUIRE(response.code() == Pistache::Http::Code::Ok); + } + + server.shutdown(); + client.shutdown(); +} \ No newline at end of file diff --git a/utils/rest_server/rest_server.cpp b/utils/rest_server/rest_server.cpp index eaa7ef7..f244370 100644 --- a/utils/rest_server/rest_server.cpp +++ b/utils/rest_server/rest_server.cpp @@ -18,11 +18,8 @@ struct Options { size_t webThreads = 50; size_t dagThreads = 20; - // Pool name -> Executor Type + nWorkers - std::unordered_map> executors; - - // Logger Config - std::vector> loggers; + std::unique_ptr executor; + std::unique_ptr logger; }; int main(int argc, char **argv) { From 6bf376984bb120498b4fe68a1067a278eaccec27 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Thu, 19 Aug 2021 14:46:57 -0300 Subject: [PATCH 8/8] - Fixing request logic to avoid hangs when REQUIREs fail before the client has been shutdown. --- tests/unit_server.cpp | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/tests/unit_server.cpp b/tests/unit_server.cpp index ecf0836..2e39db3 100644 --- a/tests/unit_server.cpp +++ b/tests/unit_server.cpp @@ -10,7 +10,9 @@ #include "daggy/loggers/dag_run/OStreamLogger.hpp" Pistache::Http::Response -REQUEST(Pistache::Http::Experimental::Client &client, std::string url, std::string payload = "") { +REQUEST(std::string url, std::string payload = "") { + Pistache::Http::Experimental::Client client; + client.init(); Pistache::Http::Response response; auto reqSpec = (payload.empty() ? client.get(url) : client.post(url)); reqSpec.timeout(std::chrono::seconds(2)); @@ -37,6 +39,7 @@ REQUEST(Pistache::Http::Experimental::Client &client, std::string url, std::stri Pistache::Async::Barrier barrier(request); barrier.wait_for(std::chrono::seconds(2)); + client.shutdown(); if (error) { throw std::runtime_error(msg); } @@ -52,23 +55,19 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") { const size_t nDAGRunners = 10, nWebThreads = 10; - daggy::Server server(listenSpec, logger, executor, 10); + daggy::Server server(listenSpec, logger, executor, nDAGRunners); server.init(nWebThreads); server.start(); - Pistache::Http::Experimental::Client client; const std::string host = "localhost:"; const std::string baseURL = host + std::to_string(server.getPort()); - client.init(); - SECTION("/ready endpoint") { - auto response = REQUEST(client, baseURL + "/ready"); + { + auto response = REQUEST(baseURL + "/ready"); REQUIRE(response.code() == Pistache::Http::Code::Ok); } - SECTION("dag submission") { - // submit a DAGRun - std::string dagRun = R"({ + std::string dagRun = R"({ "name": "unit_server", "parameters": { "DIRS": [ "A", "B" ] }, "tasks": [ @@ -81,13 +80,12 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") { "parents": [ "touch" ] } ] - } - )"; + })"; - auto response = REQUEST(client, baseURL + "/v1/dagrun/", dagRun); + { + auto response = REQUEST(baseURL + "/v1/dagrun/", dagRun); REQUIRE(response.code() == Pistache::Http::Code::Ok); } server.shutdown(); - client.shutdown(); } \ No newline at end of file