From 54e8170c68b643802870b68bbfd1317e63a3ac18 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Mon, 2 Aug 2021 11:31:46 -0300 Subject: [PATCH] Adding JSON parsing helpers to Utilities. --- .gitignore | 2 +- README.md | 10 +++++ cmake/rapidjson.cmake | 4 +- daggy/CMakeLists.txt | 4 +- daggy/include/daggy/Utilities.hpp | 22 ++++++++++ daggy/src/Utilities.cpp | 67 +++++++++++++++++++++++++++++++ examples/sample_dag.json | 8 ---- tests/CMakeLists.txt | 2 +- tests/unit_utilities.cpp | 31 ++++++++++++++ 9 files changed, 136 insertions(+), 14 deletions(-) create mode 100644 daggy/include/daggy/Utilities.hpp create mode 100644 daggy/src/Utilities.cpp create mode 100644 tests/unit_utilities.cpp diff --git a/.gitignore b/.gitignore index aff690a..08f2b60 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ build .cache -/cmake-build-debug/ +cmake-build-* diff --git a/README.md b/README.md index 69078c4..aca019b 100644 --- a/README.md +++ b/README.md @@ -33,3 +33,13 @@ Architecture - Accepts task lists and parameters - Runs and monitors DAGs +Flow +== + +1. DAG Generated from JSON spec +2. Execution Pool Selected +3. DAGRun created from storage +4. DAGExecutor runs with + - Logger object + - Execution Pool + - DAG definition \ No newline at end of file diff --git a/cmake/rapidjson.cmake b/cmake/rapidjson.cmake index 7febd9d..342f537 100644 --- a/cmake/rapidjson.cmake +++ b/cmake/rapidjson.cmake @@ -2,7 +2,7 @@ include(ExternalProject) # Download RapidJSON ExternalProject_Add( rapidjson - PREFIX "vendor/rapidjson" + PREFIX "third_party/rapidjson" GIT_REPOSITORY "https://github.com/Tencent/rapidjson.git" GIT_TAG f54b0e47a08782a6131cc3d60f94d038fa6e0a51 TIMEOUT 10 @@ -18,4 +18,4 @@ ExternalProject_Add( # Prepare RapidJSON (RapidJSON is a header-only library) ExternalProject_Get_Property(rapidjson source_dir) -set(RAPIDJSON_INCLUDE_DIR ${source_dir}/include) +set(RAPIDJSON_INCLUDE_DIR ${source_dir}/include) \ No newline at end of file diff --git a/daggy/CMakeLists.txt b/daggy/CMakeLists.txt index 866a1a5..74e51e2 100644 --- a/daggy/CMakeLists.txt +++ b/daggy/CMakeLists.txt @@ -3,8 +3,8 @@ project(daggy) #ExternalProject_Add_StepDependencies(pistache_extern build) file(GLOB SOURCES src/*.cpp src/**/*.cpp) -add_library(${PROJECT_NAME} STATIC ${SOURCES} src/Scheduler.cpp) +add_library(${PROJECT_NAME} STATIC ${SOURCES}) include_directories(${PISTACHE_INCLUDE_DIR}) target_include_directories(${PROJECT_NAME} PUBLIC include) target_link_libraries(${PROJECT_NAME} pistache pthread) -add_dependencies(${PROJECT_NAME} PistacheDownload) +add_dependencies(${PROJECT_NAME} PistacheDownload rapidjson) \ No newline at end of file diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp new file mode 100644 index 0000000..5d55476 --- /dev/null +++ b/daggy/include/daggy/Utilities.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include +#include +#include +#include + +#include + +#include "Task.hpp" + +namespace rj = rapidjson; + +namespace daggy { + using ParameterValue = std::variant>; + using ParameterValues = std::unordered_map; + + ParameterValues parseParameters(const std::string & jsonSpec); + ParameterValues parseParameters(const rj::Document & spec); + // std::vector buildTasks(const std::string & jsonSpec, const ParameterValues & parameters); + // std::vector buildTasks(const rj::Document & spec, const ParameterValues & parameters); +} \ No newline at end of file diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp new file mode 100644 index 0000000..c517b5b --- /dev/null +++ b/daggy/src/Utilities.cpp @@ -0,0 +1,67 @@ +#include + +namespace daggy { + ParameterValues parseParameters(const std::string & jsonSpec) { + rj::Document doc; + rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str()); + if (! parseResult) { + throw std::runtime_error("Parameters spec is not valid JSON"); + } + return parseParameters(doc); + } + + ParameterValues parseParameters(const rj::Document & spec) { + std::unordered_map parameters; + if (!spec.IsObject()) { throw std::runtime_error("Parameters in spec is not a JSON dictionary"); } + for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) { + if (! it->name.IsString()) { + throw std::runtime_error("All keys must be strings."); + } + if (it->value.IsArray()) { + std::vector values; + for (size_t i = 0; i < it->value.Size(); ++i) { + if (! it->value[i].IsString()) { + throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " item " + std::to_string(i) + " is not a string."); + } + values.emplace_back(it->value[i].GetString()); + } + parameters[it->name.GetString()] = values; + } else if (it->value.IsString()) { + parameters[it->name.GetString()] = it->value.GetString(); + } else { + throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " is not a string or an array."); + } + } + return parameters; + } + + /* + std::vector buildTasks(const std::string & jsonSpec) { + rj::Document doc; + rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str()); + if (! parseResult) { + throw std::runtime_error("Unable to parse spec: "); + } + return buildTasks(doc); + } + + std::vector buildTasks(const rj::Document & spec) { + std::vector tasks; + if (!spec.IsObject()) { throw std::runtime_error("Spec is not a JSON dictionary"); } + + // Parameter Parsing + auto parameters = parseParameters(spec); + // Tasks + if (spec.HasMember("tasks")) { + auto & sTasks = spec["tasks"]; + if (! sTasks.IsArray()) { + throw std::runtime_error("tasks member must be an array"); + } + for (size_t i = 0; i < sTasks.Size(); ++i) { + tasks.push_back(parseTask(sTasks[i])); + } + } + return tasks; + } + */ +} \ No newline at end of file diff --git a/examples/sample_dag.json b/examples/sample_dag.json index 10041e9..0fe8c32 100644 --- a/examples/sample_dag.json +++ b/examples/sample_dag.json @@ -1,12 +1,4 @@ { - "config": { - "timeout": 3600, - "executor": "local" - }, - "parameter_list": { - "DATE": [], - "SOURCE": "" - }, "tasks": [ { "name": "pull_data_a", diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 46b9ac3..6d3c468 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -2,4 +2,4 @@ project(tests) file(GLOB UNIT_TESTS unit_*.cpp) file(GLOB INTEGRATION_TESTS int_*.cpp) add_executable(tests main.cpp ${UNIT_TESTS} ${INTEGRATION_TESTS}) -target_link_libraries(tests daggy stdc++fs) +target_link_libraries(tests daggy stdc++fs) \ No newline at end of file diff --git a/tests/unit_utilities.cpp b/tests/unit_utilities.cpp new file mode 100644 index 0000000..f4127c1 --- /dev/null +++ b/tests/unit_utilities.cpp @@ -0,0 +1,31 @@ +#include +#include +#include + +#include "catch.hpp" + +#include "daggy/Utilities.hpp" + +namespace fs = std::filesystem; + +TEST_CASE("Parameter Parsing", "[utilities_parse_parameters]") { + SECTION("Basic Parse") { + std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"}; + auto params = daggy::parseParameters(testParams); + REQUIRE(params.size() == 2); + REQUIRE(std::holds_alternative>(params["DATE"])); + REQUIRE(std::holds_alternative(params["SOURCE"])); + } + SECTION("Invalid JSON") { + std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name")"}; + REQUIRE_THROWS(daggy::parseParameters(testParams)); + } + SECTION("Non-string Keys") { + std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], 6: "name"})"}; + REQUIRE_THROWS(daggy::parseParameters(testParams)); + } + SECTION("Non-array/Non-string values") { + std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": {"name": "kevin"}})"}; + REQUIRE_THROWS(daggy::parseParameters(testParams)); + } +} \ No newline at end of file