- Making task children unordered_set to avoid multiple edges between parent and child.

This commit is contained in:
Ian Roddis
2021-08-10 12:42:54 -03:00
parent 621467dd5a
commit a152588368
3 changed files with 27 additions and 17 deletions

View File

@@ -3,6 +3,7 @@
#include <cstdint>
#include <string>
#include <vector>
#include <unordered_set>
namespace daggy {
struct Task {
@@ -10,6 +11,6 @@ namespace daggy {
std::vector<std::string> command;
uint32_t maxRetries;
uint32_t retryIntervalSeconds; // Time to wait between retries
std::vector<std::string> children;
std::unordered_set<std::string> children;
};
}

View File

@@ -87,11 +87,11 @@ namespace daggy {
"retryIntervalSeconds")) { retryIntervalSeconds = taskSpec["retryIntervalSeconds"].GetInt(); }
// Children / parents
std::vector<std::string> children;
std::unordered_set<std::string> children;
if (taskSpec.HasMember("children")) {
const auto &specChildren = taskSpec["children"].GetArray();
for (size_t c = 0; c < specChildren.Size(); ++c) {
children.emplace_back(specChildren[c].GetString());
children.insert(specChildren[c].GetString());
}
}
if (taskSpec.HasMember("parents")) {
@@ -129,16 +129,16 @@ namespace daggy {
if (pit == parentMap.end()) { continue; }
for (const auto &parent : pit->second) {
tasks[taskIndex[parent]].children.emplace_back(task.name);
tasks[taskIndex[parent]].children.insert(task.name);
}
}
// At the end, replace the names of the children with all the expanded versions
for (auto &task : tasks) {
std::vector<std::string> children;
std::unordered_set<std::string> children;
for (const auto &child : task.children) {
auto &newChildren = childrenMap[child];
std::copy(newChildren.begin(), newChildren.end(), std::back_inserter(children));
std::copy(newChildren.begin(), newChildren.end(), std::inserter(children, children.end()));
}
task.children.swap(children);
}
@@ -150,6 +150,8 @@ namespace daggy {
// So we'll shortcut and generate the JSON directly.
std::string taskToJSON(const Task &task) {
std::stringstream ss;
bool first = false;
ss << "{"
<< R"("name": )" << std::quoted(task.name) << ','
<< R"("maxRetries": )" << task.maxRetries << ','
@@ -157,16 +159,20 @@ namespace daggy {
// Commands
ss << R"("command": [)";
for (auto it = task.command.begin(); it != task.command.end(); ++it) {
ss << std::quoted(*it);
if (it != task.command.end() - 1) ss << ", ";
first = true;
for (const auto &part : task.command) {
if (!first) ss << ',';
ss << std::quoted(part);
first = false;
}
ss << "],";
ss << R"("children": [)";
for (auto it = task.children.begin(); it != task.children.end(); ++it) {
ss << std::quoted(*it);
if (it != task.children.end() - 1) ss << ", ";
first = true;
for (const auto &child : task.children) {
if (!first) ss << ',';
ss << std::quoted(child);
first = false;
}
ss << "]";
@@ -179,9 +185,11 @@ namespace daggy {
ss << "[";
for (auto it = tasks.begin(); it != tasks.end(); ++it) {
ss << taskToJSON(*it);
if (it != tasks.end() - 1) ss << ", ";
bool first = true;
for (const auto &task : tasks) {
if (!first) ss << ',';
ss << taskToJSON(task);
first = false;
}
ss << "]";

View File

@@ -44,6 +44,7 @@ namespace daggy {
fs::create_directories(runRoot);
// Create meta.json with DAGRun Name and task definitions
return runID;
}
void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {}
@@ -53,7 +54,7 @@ namespace daggy {
void FileSystemLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) {}
// Querying
std::vector<DAGRunSummary> FileSystemLogger::getDAGs(uint32_t stateMask) {}
std::vector<DAGRunSummary> FileSystemLogger::getDAGs(uint32_t stateMask) { return {}; }
DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunID) {}
DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunID) { return {}; }
}