Adding JSON parsing helpers to Utilities.
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -1,3 +1,3 @@
|
|||||||
build
|
build
|
||||||
.cache
|
.cache
|
||||||
/cmake-build-debug/
|
cmake-build-*
|
||||||
|
|||||||
10
README.md
10
README.md
@@ -33,3 +33,13 @@ Architecture
|
|||||||
- Accepts task lists and parameters
|
- Accepts task lists and parameters
|
||||||
- Runs and monitors DAGs
|
- Runs and monitors DAGs
|
||||||
|
|
||||||
|
Flow
|
||||||
|
==
|
||||||
|
|
||||||
|
1. DAG Generated from JSON spec
|
||||||
|
2. Execution Pool Selected
|
||||||
|
3. DAGRun created from storage
|
||||||
|
4. DAGExecutor runs with
|
||||||
|
- Logger object
|
||||||
|
- Execution Pool
|
||||||
|
- DAG definition
|
||||||
@@ -2,7 +2,7 @@ include(ExternalProject)
|
|||||||
# Download RapidJSON
|
# Download RapidJSON
|
||||||
ExternalProject_Add(
|
ExternalProject_Add(
|
||||||
rapidjson
|
rapidjson
|
||||||
PREFIX "vendor/rapidjson"
|
PREFIX "third_party/rapidjson"
|
||||||
GIT_REPOSITORY "https://github.com/Tencent/rapidjson.git"
|
GIT_REPOSITORY "https://github.com/Tencent/rapidjson.git"
|
||||||
GIT_TAG f54b0e47a08782a6131cc3d60f94d038fa6e0a51
|
GIT_TAG f54b0e47a08782a6131cc3d60f94d038fa6e0a51
|
||||||
TIMEOUT 10
|
TIMEOUT 10
|
||||||
@@ -18,4 +18,4 @@ ExternalProject_Add(
|
|||||||
|
|
||||||
# Prepare RapidJSON (RapidJSON is a header-only library)
|
# Prepare RapidJSON (RapidJSON is a header-only library)
|
||||||
ExternalProject_Get_Property(rapidjson source_dir)
|
ExternalProject_Get_Property(rapidjson source_dir)
|
||||||
set(RAPIDJSON_INCLUDE_DIR ${source_dir}/include)
|
set(RAPIDJSON_INCLUDE_DIR ${source_dir}/include)
|
||||||
@@ -3,8 +3,8 @@ project(daggy)
|
|||||||
#ExternalProject_Add_StepDependencies(pistache_extern build)
|
#ExternalProject_Add_StepDependencies(pistache_extern build)
|
||||||
|
|
||||||
file(GLOB SOURCES src/*.cpp src/**/*.cpp)
|
file(GLOB SOURCES src/*.cpp src/**/*.cpp)
|
||||||
add_library(${PROJECT_NAME} STATIC ${SOURCES} src/Scheduler.cpp)
|
add_library(${PROJECT_NAME} STATIC ${SOURCES})
|
||||||
include_directories(${PISTACHE_INCLUDE_DIR})
|
include_directories(${PISTACHE_INCLUDE_DIR})
|
||||||
target_include_directories(${PROJECT_NAME} PUBLIC include)
|
target_include_directories(${PROJECT_NAME} PUBLIC include)
|
||||||
target_link_libraries(${PROJECT_NAME} pistache pthread)
|
target_link_libraries(${PROJECT_NAME} pistache pthread)
|
||||||
add_dependencies(${PROJECT_NAME} PistacheDownload)
|
add_dependencies(${PROJECT_NAME} PistacheDownload rapidjson)
|
||||||
22
daggy/include/daggy/Utilities.hpp
Normal file
22
daggy/include/daggy/Utilities.hpp
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <vector>
|
||||||
|
#include <string>
|
||||||
|
#include <variant>
|
||||||
|
#include <unordered_map>
|
||||||
|
|
||||||
|
#include <rapidjson/document.h>
|
||||||
|
|
||||||
|
#include "Task.hpp"
|
||||||
|
|
||||||
|
namespace rj = rapidjson;
|
||||||
|
|
||||||
|
namespace daggy {
|
||||||
|
using ParameterValue = std::variant<std::string, std::vector<std::string>>;
|
||||||
|
using ParameterValues = std::unordered_map<std::string, ParameterValue>;
|
||||||
|
|
||||||
|
ParameterValues parseParameters(const std::string & jsonSpec);
|
||||||
|
ParameterValues parseParameters(const rj::Document & spec);
|
||||||
|
// std::vector<Task> buildTasks(const std::string & jsonSpec, const ParameterValues & parameters);
|
||||||
|
// std::vector<Task> buildTasks(const rj::Document & spec, const ParameterValues & parameters);
|
||||||
|
}
|
||||||
67
daggy/src/Utilities.cpp
Normal file
67
daggy/src/Utilities.cpp
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
#include <daggy/Utilities.hpp>
|
||||||
|
|
||||||
|
namespace daggy {
|
||||||
|
ParameterValues parseParameters(const std::string & jsonSpec) {
|
||||||
|
rj::Document doc;
|
||||||
|
rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
|
||||||
|
if (! parseResult) {
|
||||||
|
throw std::runtime_error("Parameters spec is not valid JSON");
|
||||||
|
}
|
||||||
|
return parseParameters(doc);
|
||||||
|
}
|
||||||
|
|
||||||
|
ParameterValues parseParameters(const rj::Document & spec) {
|
||||||
|
std::unordered_map<std::string, ParameterValue> parameters;
|
||||||
|
if (!spec.IsObject()) { throw std::runtime_error("Parameters in spec is not a JSON dictionary"); }
|
||||||
|
for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) {
|
||||||
|
if (! it->name.IsString()) {
|
||||||
|
throw std::runtime_error("All keys must be strings.");
|
||||||
|
}
|
||||||
|
if (it->value.IsArray()) {
|
||||||
|
std::vector<std::string> values;
|
||||||
|
for (size_t i = 0; i < it->value.Size(); ++i) {
|
||||||
|
if (! it->value[i].IsString()) {
|
||||||
|
throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " item " + std::to_string(i) + " is not a string.");
|
||||||
|
}
|
||||||
|
values.emplace_back(it->value[i].GetString());
|
||||||
|
}
|
||||||
|
parameters[it->name.GetString()] = values;
|
||||||
|
} else if (it->value.IsString()) {
|
||||||
|
parameters[it->name.GetString()] = it->value.GetString();
|
||||||
|
} else {
|
||||||
|
throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " is not a string or an array.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return parameters;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
std::vector<Task> buildTasks(const std::string & jsonSpec) {
|
||||||
|
rj::Document doc;
|
||||||
|
rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
|
||||||
|
if (! parseResult) {
|
||||||
|
throw std::runtime_error("Unable to parse spec: ");
|
||||||
|
}
|
||||||
|
return buildTasks(doc);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<Task> buildTasks(const rj::Document & spec) {
|
||||||
|
std::vector<Task> tasks;
|
||||||
|
if (!spec.IsObject()) { throw std::runtime_error("Spec is not a JSON dictionary"); }
|
||||||
|
|
||||||
|
// Parameter Parsing
|
||||||
|
auto parameters = parseParameters(spec);
|
||||||
|
// Tasks
|
||||||
|
if (spec.HasMember("tasks")) {
|
||||||
|
auto & sTasks = spec["tasks"];
|
||||||
|
if (! sTasks.IsArray()) {
|
||||||
|
throw std::runtime_error("tasks member must be an array");
|
||||||
|
}
|
||||||
|
for (size_t i = 0; i < sTasks.Size(); ++i) {
|
||||||
|
tasks.push_back(parseTask(sTasks[i]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tasks;
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
}
|
||||||
@@ -1,12 +1,4 @@
|
|||||||
{
|
{
|
||||||
"config": {
|
|
||||||
"timeout": 3600,
|
|
||||||
"executor": "local"
|
|
||||||
},
|
|
||||||
"parameter_list": {
|
|
||||||
"DATE": [],
|
|
||||||
"SOURCE": ""
|
|
||||||
},
|
|
||||||
"tasks": [
|
"tasks": [
|
||||||
{
|
{
|
||||||
"name": "pull_data_a",
|
"name": "pull_data_a",
|
||||||
|
|||||||
@@ -2,4 +2,4 @@ project(tests)
|
|||||||
file(GLOB UNIT_TESTS unit_*.cpp)
|
file(GLOB UNIT_TESTS unit_*.cpp)
|
||||||
file(GLOB INTEGRATION_TESTS int_*.cpp)
|
file(GLOB INTEGRATION_TESTS int_*.cpp)
|
||||||
add_executable(tests main.cpp ${UNIT_TESTS} ${INTEGRATION_TESTS})
|
add_executable(tests main.cpp ${UNIT_TESTS} ${INTEGRATION_TESTS})
|
||||||
target_link_libraries(tests daggy stdc++fs)
|
target_link_libraries(tests daggy stdc++fs)
|
||||||
31
tests/unit_utilities.cpp
Normal file
31
tests/unit_utilities.cpp
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
#include <iostream>
|
||||||
|
#include <filesystem>
|
||||||
|
#include <fstream>
|
||||||
|
|
||||||
|
#include "catch.hpp"
|
||||||
|
|
||||||
|
#include "daggy/Utilities.hpp"
|
||||||
|
|
||||||
|
namespace fs = std::filesystem;
|
||||||
|
|
||||||
|
TEST_CASE("Parameter Parsing", "[utilities_parse_parameters]") {
|
||||||
|
SECTION("Basic Parse") {
|
||||||
|
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
|
||||||
|
auto params = daggy::parseParameters(testParams);
|
||||||
|
REQUIRE(params.size() == 2);
|
||||||
|
REQUIRE(std::holds_alternative<std::vector<std::string>>(params["DATE"]));
|
||||||
|
REQUIRE(std::holds_alternative<std::string>(params["SOURCE"]));
|
||||||
|
}
|
||||||
|
SECTION("Invalid JSON") {
|
||||||
|
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name")"};
|
||||||
|
REQUIRE_THROWS(daggy::parseParameters(testParams));
|
||||||
|
}
|
||||||
|
SECTION("Non-string Keys") {
|
||||||
|
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], 6: "name"})"};
|
||||||
|
REQUIRE_THROWS(daggy::parseParameters(testParams));
|
||||||
|
}
|
||||||
|
SECTION("Non-array/Non-string values") {
|
||||||
|
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": {"name": "kevin"}})"};
|
||||||
|
REQUIRE_THROWS(daggy::parseParameters(testParams));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user