- Refactoring struct (de)serialization

This commit is contained in:
Ian Roddis
2021-08-09 10:33:49 -03:00
parent 7cd9fc5e6e
commit 0516bde09f
7 changed files with 230 additions and 215 deletions

View File

@@ -1,22 +0,0 @@
#pragma once
#include <vector>
#include <string>
#include <variant>
#include <unordered_map>
#include <rapidjson/document.h>
#include "Logger.hpp"
#include "TaskExecutor.hpp"
#include "Task.hpp"
namespace rj = rapidjson;
namespace daggy {
ParameterValues parseParameters(const std::string & jsonSpec);
ParameterValues parseParameters(const rj::Document & spec);
std::vector<Task> buildTasks(const std::string & jsonSpec, const ParameterValues & parameters = {});
std::vector<Task> buildTasks(const rj::Document & spec, const ParameterValues & parameters = {});
std::vector<Command> expandCommands(const std::vector<std::string> & command, const ParameterValues & parameters);
}

View File

@@ -0,0 +1,24 @@
#pragma once
#include <vector>
#include <string>
#include <variant>
#include <unordered_map>
#include <rapidjson/document.h>
#include "Logger.hpp"
#include "TaskExecutor.hpp"
#include "Task.hpp"
namespace rj = rapidjson;
namespace daggy {
// Parameters
ParameterValues parametersFromJSON(const std::string & jsonSpec);
ParameterValues parametersFromJSON(const rj::Document & spec);
// Tasks
std::vector<Task> tasksFromJSON(const std::string & jsonSpec, const ParameterValues & parameters = {});
std::vector<Task> tasksFromJSON(const rj::Document & spec, const ParameterValues & parameters = {});
}

View File

@@ -13,9 +13,9 @@
#include "Defines.hpp"
namespace daggy {
// DAG execution
// DAG vertex IDs should correspond to the position of tasks in vector. e.g. Vertex ID 0 corresponds to tasks[0]
// I'm not crazy about this loose coupling, but
std::vector<Command> expandCommands(const std::vector<std::string> & command, const ParameterValues & parameters);
// Blocking call
void runDAG(DAGRunID runID,
std::vector<Task> tasks,
TaskExecutor & executor,

141
daggy/src/Serialization.cpp Normal file
View File

@@ -0,0 +1,141 @@
#include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp>
namespace daggy {
ParameterValues parametersFromJSON(const std::string & jsonSpec) {
rj::Document doc;
rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
if (! parseResult) {
throw std::runtime_error("Parameters spec is not valid JSON");
}
return parametersFromJSON(doc);
}
ParameterValues parametersFromJSON(const rj::Document & spec) {
std::unordered_map<std::string, ParameterValue> parameters;
if (!spec.IsObject()) { throw std::runtime_error("Parameters in spec is not a JSON dictionary"); }
for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) {
if (! it->name.IsString()) {
throw std::runtime_error("All keys must be strings.");
}
std::string name = std::string{"{{"} + it->name.GetString() + "}}";
if (it->value.IsArray()) {
std::vector<std::string> values;
for (size_t i = 0; i < it->value.Size(); ++i) {
if (! it->value[i].IsString()) {
throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " item " + std::to_string(i) + " is not a string.");
}
values.emplace_back(it->value[i].GetString());
}
parameters[name] = values;
} else if (it->value.IsString()) {
parameters[name] = it->value.GetString();
} else {
throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " is not a string or an array.");
}
}
return parameters;
}
std::vector<Task> tasksFromJSON(const std::string & jsonSpec, const ParameterValues & parameters) {
rj::Document doc;
rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
if (! parseResult) {
throw std::runtime_error("Unable to parse spec: ");
}
return tasksFromJSON(doc, parameters);
}
std::vector<Task> tasksFromJSON(const rj::Document & spec, const ParameterValues & parameters) {
std::vector<Task> tasks;
if (!spec.IsArray()) { throw std::runtime_error("Tasks is not an array"); }
const std::vector<std::string> reqFields{"name", "command"};
std::unordered_map<std::string, std::vector<std::string>> childrenMap;
// Maps child -> parent
std::unordered_map<std::string, std::vector<std::string>> parentMap;
std::unordered_map<std::string, size_t> taskIndex;
// Tasks
for (size_t i = 0; i < spec.Size(); ++i) {
if (! spec[i].IsObject()) {
throw std::runtime_error("Task " + std::to_string(i) + " is not a dictionary.");
}
const auto & taskSpec = spec[i].GetObject();
for (const auto & reqField : reqFields) {
if (! taskSpec.HasMember(reqField.c_str())) {
throw std::runtime_error("Task " + std::to_string(i) + " is missing required field " + reqField);
}
}
// Grab the standard fields with defaults;
std::string name = taskSpec["name"].GetString();
taskIndex[name] = i;
uint8_t maxRetries = 0;
if (taskSpec.HasMember("maxRetries")) { maxRetries = taskSpec["maxRetries"].GetInt(); }
uint8_t retryIntervalSeconds = 0;
if (taskSpec.HasMember("retryIntervalSeconds")) { retryIntervalSeconds = taskSpec["retryIntervalSeconds"].GetInt(); }
// Children / parents
std::vector<std::string> children;
if (taskSpec.HasMember("children")) {
const auto & specChildren = taskSpec["children"].GetArray();
for (size_t c = 0; c < specChildren.Size(); ++c) {
children.emplace_back(specChildren[c].GetString());
}
}
if (taskSpec.HasMember("parents")) {
const auto & specParents = taskSpec["parents"].GetArray();
for (size_t c = 0; c < specParents.Size(); ++c) {
parentMap[name].emplace_back(specParents[c].GetString());
}
}
// Build out the commands
std::vector<std::string> command;
for (size_t cmd = 0; cmd < taskSpec["command"].Size(); ++cmd) {
command.emplace_back(taskSpec["command"][cmd].GetString());
}
auto commands = expandCommands(command, parameters);
// Create the tasks
auto & taskNames = childrenMap[name];
for (size_t tid = 0; tid < commands.size(); ++tid) {
std::string taskName = name + "_" + std::to_string(tid);
taskNames.push_back(taskName);
tasks.emplace_back(Task {
.name = name + "_" + std::to_string(tid),
.command = commands[tid],
.maxRetries = maxRetries,
.retryIntervalSeconds = retryIntervalSeconds,
.children = children
});
}
}
// Update any missing child -> parent relationship
for (auto & task : tasks) {
auto pit = parentMap.find(task.name);
if (pit == parentMap.end()) { continue; }
for (const auto & parent : pit->second) {
tasks[taskIndex[parent]].children.emplace_back(task.name);
}
}
// At the end, replace the names of the children with all the expanded versions
for (auto & task : tasks) {
std::vector<std::string> children;
for (const auto & child : task.children) {
auto & newChildren = childrenMap[child];
std::copy(newChildren.begin(), newChildren.end(), std::back_inserter(children));
}
task.children.swap(children);
}
return tasks;
}
}

View File

@@ -1,41 +1,6 @@
#include <daggy/Utilities.hpp>
namespace daggy {
ParameterValues parseParameters(const std::string & jsonSpec) {
rj::Document doc;
rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
if (! parseResult) {
throw std::runtime_error("Parameters spec is not valid JSON");
}
return parseParameters(doc);
}
ParameterValues parseParameters(const rj::Document & spec) {
std::unordered_map<std::string, ParameterValue> parameters;
if (!spec.IsObject()) { throw std::runtime_error("Parameters in spec is not a JSON dictionary"); }
for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) {
if (! it->name.IsString()) {
throw std::runtime_error("All keys must be strings.");
}
std::string name = std::string{"{{"} + it->name.GetString() + "}}";
if (it->value.IsArray()) {
std::vector<std::string> values;
for (size_t i = 0; i < it->value.Size(); ++i) {
if (! it->value[i].IsString()) {
throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " item " + std::to_string(i) + " is not a string.");
}
values.emplace_back(it->value[i].GetString());
}
parameters[name] = values;
} else if (it->value.IsString()) {
parameters[name] = it->value.GetString();
} else {
throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " is not a string or an array.");
}
}
return parameters;
}
std::vector<std::vector<std::string>>
expandCommands(const std::vector<std::string> & command, const ParameterValues & parameters) {
std::vector<std::vector<std::string>> commands{ {} };
@@ -65,107 +30,6 @@ namespace daggy {
return commands;
}
std::vector<Task> buildTasks(const rj::Document & spec, const ParameterValues & parameters) {
std::vector<Task> tasks;
if (!spec.IsArray()) { throw std::runtime_error("Tasks is not an array"); }
const std::vector<std::string> reqFields{"name", "command"};
std::unordered_map<std::string, std::vector<std::string>> childrenMap;
// Maps child -> parent
std::unordered_map<std::string, std::vector<std::string>> parentMap;
std::unordered_map<std::string, size_t> taskIndex;
// Tasks
for (size_t i = 0; i < spec.Size(); ++i) {
if (! spec[i].IsObject()) {
throw std::runtime_error("Task " + std::to_string(i) + " is not a dictionary.");
}
const auto & taskSpec = spec[i].GetObject();
for (const auto & reqField : reqFields) {
if (! taskSpec.HasMember(reqField.c_str())) {
throw std::runtime_error("Task " + std::to_string(i) + " is missing required field " + reqField);
}
}
// Grab the standard fields with defaults;
std::string name = taskSpec["name"].GetString();
taskIndex[name] = i;
uint8_t maxRetries = 0;
if (taskSpec.HasMember("maxRetries")) { maxRetries = taskSpec["maxRetries"].GetInt(); }
uint8_t retryIntervalSeconds = 0;
if (taskSpec.HasMember("retryIntervalSeconds")) { retryIntervalSeconds = taskSpec["retryIntervalSeconds"].GetInt(); }
// Children / parents
std::vector<std::string> children;
if (taskSpec.HasMember("children")) {
const auto & specChildren = taskSpec["children"].GetArray();
for (size_t c = 0; c < specChildren.Size(); ++c) {
children.emplace_back(specChildren[c].GetString());
}
}
if (taskSpec.HasMember("parents")) {
const auto & specParents = taskSpec["parents"].GetArray();
for (size_t c = 0; c < specParents.Size(); ++c) {
parentMap[name].emplace_back(specParents[c].GetString());
}
}
// Build out the commands
std::vector<std::string> command;
for (size_t cmd = 0; cmd < taskSpec["command"].Size(); ++cmd) {
command.emplace_back(taskSpec["command"][cmd].GetString());
}
auto commands = expandCommands(command, parameters);
// Create the tasks
auto & taskNames = childrenMap[name];
for (size_t tid = 0; tid < commands.size(); ++tid) {
std::string taskName = name + "_" + std::to_string(tid);
taskNames.push_back(taskName);
tasks.emplace_back(Task {
.name = name + "_" + std::to_string(tid),
.command = commands[tid],
.maxRetries = maxRetries,
.retryIntervalSeconds = retryIntervalSeconds,
.children = children
});
}
}
// Update any missing child -> parent relationship
for (auto & task : tasks) {
auto pit = parentMap.find(task.name);
if (pit == parentMap.end()) { continue; }
for (const auto & parent : pit->second) {
tasks[taskIndex[parent]].children.emplace_back(task.name);
}
}
// At the end, replace the names of the children with all the expanded versions
for (auto & task : tasks) {
std::vector<std::string> children;
for (const auto & child : task.children) {
auto & newChildren = childrenMap[child];
std::copy(newChildren.begin(), newChildren.end(), std::back_inserter(children));
}
task.children.swap(children);
}
return tasks;
}
std::vector<Task> buildTasks(const std::string & jsonSpec, const ParameterValues & parameters) {
rj::Document doc;
rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
if (! parseResult) {
throw std::runtime_error("Unable to parse spec: ");
}
return buildTasks(doc, parameters);
}
void runDAG(DAGRunID runID,
std::vector<Task> tasks,
TaskExecutor & executor,