Adding support for isGenerator tasks

- Changing how DAG is represented, both in code and how DAGs are defined
  in JSON.
- Removing std::vector<Task> representation in favour of a map that will
  enforce unique task names
- Task names now have a name (generated), and a definedName.
- Adding support to loggers to add tasks after a DAGRun has been
  initialized.
This commit is contained in:
Ian Roddis
2021-08-30 22:05:37 -03:00
parent dd6159dda8
commit 2c00001e0b
22 changed files with 672 additions and 396 deletions

132
README.md
View File

@@ -17,12 +17,12 @@ graph LR
Pull_A-->Transform_A; Pull_A-->Transform_A;
Pull_B-->Transform_B; Pull_B-->Transform_B;
Pull_C-->Transform_C; Pull_C-->Transform_C;
Transform_A-->Derive_Data_AB; Transform_A-->Derive_Data_AB;
Transform_B-->Derive_Data_AB; Transform_B-->Derive_Data_AB;
Derive_Data_AB-->Derive_Data_ABC; Derive_Data_AB-->Derive_Data_ABC;
Transform_C-->Derive_Data_ABC; Transform_C-->Derive_Data_ABC;
Derive_Data_ABC-->Report; Derive_Data_ABC-->Report;
``` ```
@@ -65,14 +65,15 @@ Basic Definition
A DAG Run definition consists of a dictionary that defines a set of tasks. Each task has the following attributes: A DAG Run definition consists of a dictionary that defines a set of tasks. Each task has the following attributes:
| Attribute | Required | Description | | Attribute | Required | Description |
|------------|------------|--------------------------------------------------------| |--------------|--------------|---------------------------------------------------------------|
| name | Yes | Name of this task. Must be unique. | | name | Yes | Name of this task. Must be unique. |
| command | Yes | The command to execute | | command | Yes | The command to execute |
| maxRetries | No | If a task fails, how many times to retry (default: 0) | | maxRetries | No | If a task fails, how many times to retry (default: 0) |
| retry | No | How many seconds to wait between retries. | | retry | No | How many seconds to wait between retries. |
| children | No | List of names of tasks that depend on this task | | children | No | List of names of tasks that depend on this task |
| parents | No | List of names of tasks that this task depends on | | parents | No | List of names of tasks that this task depends on |
| isGenerator | No | The output of this task generates additional task definitions |
Defining both `parents` and `children` are not required; one or the other is sufficient. Both are supported to allow you Defining both `parents` and `children` are not required; one or the other is sufficient. Both are supported to allow you
to define your task dependencies in the way that is most natural to how you think. to define your task dependencies in the way that is most natural to how you think.
@@ -81,9 +82,8 @@ Below is an example DAG Run submission:
```json ```json
{ {
"tasks": [ "tasks": {
{ "task_one": {
"name": "task_one",
"command": [ "command": [
"/usr/bin/touch", "/usr/bin/touch",
"/tmp/somefile" "/tmp/somefile"
@@ -91,8 +91,7 @@ Below is an example DAG Run submission:
"maxRetries": 3, "maxRetries": 3,
"retryIntervalSeconds": 30 "retryIntervalSeconds": 30
}, },
{ "task_two": {
"name": "task_two",
"command": [ "command": [
"/usr/bin/touch", "/usr/bin/touch",
"/tmp/someotherfile" "/tmp/someotherfile"
@@ -103,7 +102,7 @@ Below is an example DAG Run submission:
"task_one" "task_one"
] ]
} }
] }
} }
``` ```
@@ -122,9 +121,8 @@ For instance:
"DIRECTORY": "/var/tmp", "DIRECTORY": "/var/tmp",
"FILE": "somefile" "FILE": "somefile"
}, },
"tasks": [ "tasks": {
{ "task_one": {
"name": "task_one",
"command": [ "command": [
"/usr/bin/touch", "/usr/bin/touch",
"{{DIRECTORY}}/{{FILE}}" "{{DIRECTORY}}/{{FILE}}"
@@ -132,9 +130,9 @@ For instance:
"maxRetries": 3, "maxRetries": 3,
"retryIntervalSeconds": 30 "retryIntervalSeconds": 30
} }
] }
} }
``` ```
`task_one`'s command, when run, will touch `/var/tmp/somefile`, since the values of `DIRECTORY` and `FILE` will be `task_one`'s command, when run, will touch `/var/tmp/somefile`, since the values of `DIRECTORY` and `FILE` will be
populated from the `taskParameters` values. populated from the `taskParameters` values.
@@ -155,31 +153,28 @@ Example:
"2021-03-01" "2021-03-01"
] ]
}, },
"tasks": [ "tasks": {
{ "populate_inputs": {
"name": "populate_inputs",
"command": [ "command": [
"/usr/bin/touch", "/usr/bin/touch",
"{{DIRECTORY}}/{{FILE}}" "{{DIRECTORY}}/{{FILE}}"
] ]
}, },
{ "calc_date": {
"name": "calc_date",
"command": [ "command": [
"/path/to/calculator", "/path/to/calculator",
"{{DIRECTORY}}/{{FILE}}", "{{DIRECTORY}}/{{FILE}}",
"{{DATE}}" "{{DATE}}"
] ]
}, },
{ "generate_report": {
"name": "generate_report",
"command": [ "command": [
"/path/to/generator" "/path/to/generator"
] ]
} }
] }
} }
``` ```
Conceptually, this DAG looks like this: Conceptually, this DAG looks like this:
@@ -205,6 +200,81 @@ graph LR
- `calc_date_2` will have the command `/path/to/calculator /var/tmp/somefile 2021-02-01` - `calc_date_2` will have the command `/path/to/calculator /var/tmp/somefile 2021-02-01`
- `calc_date_3` will have the command `/path/to/calculator /var/tmp/somefile 2021-03-01` - `calc_date_3` will have the command `/path/to/calculator /var/tmp/somefile 2021-03-01`
Tasks Generating Tasks
----------------------
Some DAG structures cannot be known ahead of time, but only at runtime. For instance, if a job pulls multiple files
from a source, each of which can be processed independently, it would be nice if the DAG could modify itself on the fly
to accomodate that request.
Enter the `generator` task. If a task is defined with `"isGenerator": true`, the output of the task is assumed to be
a JSON dictionary containing new tasks to run. The new tasks will go through parameter expansion as described above,
and can freely define their dependencies the same way.
**NB:** Generated tasks won't have any children dependencies unless you define them. If there are parameterized
dependencies, you must use the name of the original task (e.g. use `calc_date`, not `calc_date_1`) to add a dependency.
**NB:** If you add a child dependency to a task that has already completed, weird things will happen. Don't do it.
```json
{
"tasks": {
"pull_files": {
"command": [
"/path/to/puller/script",
"{{DATE}}"
],
"isGenerator": true,
children: [ "generate_report" ]
},
"generate_report": {
"command": [
"/path/to/generator"
]
}
}
}
```
```mermaid
graph LR
pull_files-->generate_report
```
The output of the puller task might be:
```json
{
"calc_date_a": {
"command": [
"/path/to/calculator",
"/path/to/data/file/a"
],
"children": ["generate_report"]
},
"calc_date_b": {
"command": [
"/path/to/calculator",
"/path/to/data/file/b"
],
"children": ["generate_report"]
}
}
```
Once the first task runs, its output is parse as additional tasks to run. The new DAG will look like this:
```mermaid
graph LR
pull_files-->generate_report
pull_files-->calc_file_a
pull_files-->calc_file_b
calc_file_a-->generate_report
calc_file_b-->generate_report
```
Note that it was important that `generate_report` depend on `pull_files`, otherwise the two task would
run concurrently, and the `generate_report` wouldn't have any files to report on.
Execution Parameters Execution Parameters
-- --
(future work) (future work)
@@ -217,4 +287,4 @@ jobs on slurm with a specific set of restrictions, or allow for local execution
| Attribute | Description | | Attribute | Description |
|-----------|-------------| |-----------|-------------|
| pool | Names the executor the DAG should run on | | pool | Names the executor the DAG should run on |
| poolParameters | Any parameters the executor accepts that might modify how a task is run | | poolParameters | Any parameters the executor accepts that might modify how a task is run |

View File

@@ -9,6 +9,11 @@ Tasks
- Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma - Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma
separated list separated list
- Allow for tasks to define next tasks - Allow for tasks to define next tasks
- Refactor [de]serialization so that a task can be parsed by itself
- Add notation of parameterValues
- Tasks are now refered by two names:
- baseName is the original name in the spec
- name is the individual tasks
- Add execution gates - Add execution gates
- Executors - Executors
- [ ] Slurm Executor - [ ] Slurm Executor

View File

@@ -8,6 +8,7 @@
#include <iterator> #include <iterator>
#include <functional> #include <functional>
#include <optional> #include <optional>
#include <sstream>
#include "Defines.hpp" #include "Defines.hpp"
@@ -18,27 +19,33 @@
namespace daggy { namespace daggy {
template<typename K, typename V>
struct Vertex { struct Vertex {
RunState state; RunState state;
uint32_t depCount; uint32_t depCount;
std::unordered_set<size_t> children; K key;
V data;
std::unordered_set<K> children;
}; };
using Edge = std::pair<size_t, size_t>;
template<typename K, typename V>
class DAG { class DAG {
using Edge = std::pair<K, K>;
public: public:
// Vertices // Vertices
size_t addVertex(); void addVertex(K id, V data);
const std::vector<Vertex> &getVertices(); const std::vector<Vertex<K, V>> &getVertices();
// Edges // Edges
void addEdge(const size_t src, const size_t dst); void addEdge(const K &src, const K &dst);
void dropEdge(const size_t src, const size_t dst); void addEdgeIf(const K &src, std::function<bool(const Vertex<K, V> &v)> predicate);
bool hasPath(const size_t from, const size_t to) const; bool hasPath(const K &from, const K &to) const;
bool hasVertex(const K &from);
const std::vector<Edge> &getEdges(); const std::vector<Edge> &getEdges();
@@ -53,17 +60,24 @@ namespace daggy {
// Reset any vertex with RUNNING state to QUEUED // Reset any vertex with RUNNING state to QUEUED
void resetRunning(); void resetRunning();
RunState getVertexState(const size_t id) const; RunState getVertexState(const K &id) const;
void setVertexState(const size_t id, RunState state); void setVertexState(const K &id, RunState state);
void forEach(std::function<void(const Vertex<K, V> &)> fun) const;
bool allVisited() const; bool allVisited() const;
std::optional<const size_t> visitNext(); std::optional<const Vertex<K, V>> visitNext();
void completeVisit(const size_t id); Vertex<K, V> &getVertex(const K &id);
void completeVisit(const K &id);
private: private:
std::vector<Vertex> vertices_; std::unordered_map<K, Vertex<K, V>> vertices_;
std::unordered_set<K> readyVertices_;
}; };
} }
#include "DAG.impl.hxx"

View File

@@ -0,0 +1,119 @@
namespace daggy {
template<typename K, typename V>
size_t DAG<K, V>::size() const { return vertices_.size(); }
template<typename K, typename V>
bool DAG<K, V>::empty() const { return vertices_.empty(); }
template<typename K, typename V>
bool DAG<K, V>::hasVertex(const K &id) { return vertices_.count(id) != 0; }
template<typename K, typename V>
Vertex <K, V> &DAG<K, V>::getVertex(const K &id) { return vertices_.at(id); }
template<typename K, typename V>
void DAG<K, V>::addVertex(K id, V data) {
if (vertices_.count(id) != 0) {
std::stringstream ss;
ss << "A vertex with ID " << id << " already exists in the DAG";
throw std::runtime_error(ss.str());
}
vertices_.emplace(id, Vertex<K, V>{.state = RunState::QUEUED, .depCount = 0, .key = id, .data = data
});
}
template<typename K, typename V>
void DAG<K, V>::addEdge(const K &from, const K &to) {
if (vertices_.find(from) == vertices_.end()) throw std::runtime_error("No such vertex");
if (vertices_.find(to) == vertices_.end()) throw std::runtime_error("No such vertex");
if (hasPath(to, from))
throw std::runtime_error("Adding edge would result in a cycle");
vertices_.at(from).children.insert(to);
vertices_.at(to).depCount++;
}
template<typename K, typename V>
void DAG<K, V>::addEdgeIf(const K &src, std::function<bool(const Vertex <K, V> &v)> predicate) {
for (const auto &[name, vertex]: vertices_) {
if (name == src) continue;
if (predicate(vertex)) addEdge(src, name);
}
}
template<typename K, typename V>
bool DAG<K, V>::hasPath(const K &from, const K &to) const {
if (vertices_.find(from) == vertices_.end()) throw std::runtime_error("No such vertex");
if (vertices_.find(to) == vertices_.end()) throw std::runtime_error("No such vertex");
for (const auto &child: vertices_.at(from).children) {
if (child == to) return true;
if (hasPath(child, to)) return true;
}
return false;
}
template<typename K, typename V>
void DAG<K, V>::reset() {
// Reset the state of all vertices
for (auto &[_, v]: vertices_) {
v.state = RunState::QUEUED;
v.depCount = 0;
}
// Calculate the upstream count
for (auto &[_, v]: vertices_) {
for (auto c: v.children) {
vertices_.at(c).depCount++;
}
}
}
template<typename K, typename V>
void DAG<K, V>::resetRunning() {
for (auto &[k, v]: vertices_) {
if (v.state != +RunState::RUNNING) continue;
v.state = RunState::QUEUED;
}
}
template<typename K, typename V>
void DAG<K, V>::setVertexState(const K &id, RunState state) {
vertices_.at(id).state = state;
}
template<typename K, typename V>
bool DAG<K, V>::allVisited() const {
for (const auto &[_, v]: vertices_) {
if (v.state != +RunState::COMPLETED) return false;
}
return true;
}
template<typename K, typename V>
std::optional<const Vertex <K, V>> DAG<K, V>::visitNext() {
for (auto &[k, v]: vertices_) {
if (v.state != +RunState::QUEUED) continue;
if (v.depCount != 0) continue;
v.state = RunState::RUNNING;
return v;
}
return {};
}
template<typename K, typename V>
void DAG<K, V>::completeVisit(const K &id) {
auto &v = vertices_.at(id);
v.state = RunState::COMPLETED;
for (auto c: v.children) {
--vertices_.at(c).depCount;
}
}
template<typename K, typename V>
void DAG<K, V>::forEach(std::function<void(const Vertex <K, V> &)> fun) const {
for (const auto &[_, v]: vertices_) {
fun(v);
}
}
}

View File

@@ -34,20 +34,28 @@ namespace daggy {
struct Task { struct Task {
std::string name; std::string name;
// definedName is the name from the original DAGDefinition.
std::string definedName;
std::vector<std::string> command; std::vector<std::string> command;
uint32_t maxRetries; uint32_t maxRetries;
uint32_t retryIntervalSeconds; // Time to wait between retries uint32_t retryIntervalSeconds; // Time to wait between retries
std::unordered_set<std::string> children; std::unordered_set<std::string> children;
std::unordered_set<std::string> parents;
bool isGenerator; // True if the output of this task is a JSON set of tasks to complete
bool operator==(const Task &other) const { bool operator==(const Task &other) const {
return (name == other.name) return (name == other.name)
and (maxRetries == other.maxRetries) and (maxRetries == other.maxRetries)
and (retryIntervalSeconds == other.retryIntervalSeconds) and (retryIntervalSeconds == other.retryIntervalSeconds)
and (command == other.command) and (command == other.command)
and (children == other.children); and (children == other.children)
and (parents == other.parents)
and (isGenerator == other.isGenerator);
} }
}; };
using TaskList = std::unordered_map<std::string, Task>;
struct AttemptRecord { struct AttemptRecord {
TimePoint startTime; TimePoint startTime;
TimePoint stopTime; TimePoint stopTime;
@@ -58,4 +66,4 @@ namespace daggy {
}; };
} }
BETTER_ENUMS_DECLARE_STD_HASH(daggy::RunState) BETTER_ENUMS_DECLARE_STD_HASH(daggy::RunState)

View File

@@ -18,13 +18,16 @@ namespace daggy {
ParameterValues parametersFromJSON(const rj::Value &spec); ParameterValues parametersFromJSON(const rj::Value &spec);
// Tasks // Tasks
std::vector<Task> tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters = {}); TaskList
taskFromJSON(const std::string &name, const rj::Value &spec, const ParameterValues &parameters = {});
std::vector<Task> tasksFromJSON(const rj::Value &spec, const ParameterValues &parameters = {}); TaskList tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters = {});
TaskList tasksFromJSON(const rj::Value &spec, const ParameterValues &parameters = {});
std::string taskToJSON(const Task &task); std::string taskToJSON(const Task &task);
std::string tasksToJSON(const std::vector<Task> &tasks); std::string tasksToJSON(const TaskList &tasks);
// Attempt Records // Attempt Records
std::string attemptRecordToJSON(const AttemptRecord &attemptRecord); std::string attemptRecordToJSON(const AttemptRecord &attemptRecord);

View File

@@ -13,12 +13,20 @@
#include "DAG.hpp" #include "DAG.hpp"
namespace daggy { namespace daggy {
using TaskDAG = DAG<std::string, Task>;
std::string globalSub(std::string string, const std::string &pattern, const std::string &replacement); std::string globalSub(std::string string, const std::string &pattern, const std::string &replacement);
std::vector<Command> expandCommands(const std::vector<std::string> &command, const ParameterValues &parameters); std::vector<Command> expandCommands(const std::vector<std::string> &command, const ParameterValues &parameters);
DAG buildDAGFromTasks(const std::vector<Task> &tasks, std::unordered_set<std::string>
const std::vector<loggers::dag_run::TaskUpdateRecord> &updates = {}); findDerivedVertices(TaskDAG &dag, const std::string &definedName);
TaskDAG
buildDAGFromTasks(TaskList &tasks,
const std::vector<loggers::dag_run::TaskUpdateRecord> &updates = {});
void updateDAGFromTasks(TaskDAG &dag, TaskList &tasks);
// Blocking call // Blocking call
std::vector<AttemptRecord> std::vector<AttemptRecord>
@@ -28,11 +36,11 @@ namespace daggy {
executors::task::TaskExecutor &executor, executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger); loggers::dag_run::DAGRunLogger &logger);
DAG runDAG(DAGRunID runID, TaskDAG runDAG(DAGRunID runID,
std::vector<Task> tasks, executors::task::TaskExecutor &executor,
executors::task::TaskExecutor &executor, loggers::dag_run::DAGRunLogger &logger,
loggers::dag_run::DAGRunLogger &logger, TaskDAG dag,
DAG dag); const ParameterValues taskParameters = {});
std::ostream &operator<<(std::ostream &os, const TimePoint &tp); std::ostream &operator<<(std::ostream &os, const TimePoint &tp);
} }

View File

@@ -17,7 +17,11 @@ namespace daggy {
class DAGRunLogger { class DAGRunLogger {
public: public:
// Execution // Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) = 0; virtual DAGRunID startDAGRun(std::string name, const TaskList &tasks) = 0;
virtual void addTask(DAGRunID dagRunID, const std::string taskName, const Task &task) = 0;
virtual void updateTask(DAGRunID dagRunID, const std::string taskName, const Task &task) = 0;
virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) = 0; virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) = 0;

View File

@@ -4,12 +4,14 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <unordered_set> #include <unordered_set>
#include <unordered_map>
#include "../../Defines.hpp"
namespace daggy::loggers::dag_run { namespace daggy::loggers::dag_run {
struct TaskUpdateRecord { struct TaskUpdateRecord {
TimePoint time; TimePoint time;
TaskID taskID; std::string taskName;
RunState newState; RunState newState;
}; };
@@ -21,9 +23,9 @@ namespace daggy::loggers::dag_run {
// Pretty heavy weight, but // Pretty heavy weight, but
struct DAGRunRecord { struct DAGRunRecord {
std::string name; std::string name;
std::vector<Task> tasks; TaskList tasks;
std::vector<RunState> taskRunStates; std::unordered_map<std::string, RunState> taskRunStates;
std::vector<std::vector<AttemptRecord>> taskAttempts; std::unordered_map<std::string, std::vector<AttemptRecord>> taskAttempts;
std::vector<TaskUpdateRecord> taskStateChanges; std::vector<TaskUpdateRecord> taskStateChanges;
std::vector<DAGUpdateRecord> dagStateChanges; std::vector<DAGUpdateRecord> dagStateChanges;
}; };

View File

@@ -37,7 +37,7 @@ namespace daggy::loggers::dag_run {
FileSystemLogger(fs::path root); FileSystemLogger(fs::path root);
// Execution // Execution
DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override; DAGRunID startDAGRun(std::string name, const TaskList &tasks) override;
void updateDAGRunState(DAGRunID dagRunID, RunState state) override; void updateDAGRunState(DAGRunID dagRunID, RunState state) override;

View File

@@ -18,7 +18,11 @@ namespace daggy {
OStreamLogger(std::ostream &os); OStreamLogger(std::ostream &os);
// Execution // Execution
DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override; DAGRunID startDAGRun(std::string name, const TaskList &tasks) override;
void addTask(DAGRunID dagRunID, const std::string taskName, const Task &task) override;
void updateTask(DAGRunID dagRunID, const std::string taskName, const Task &task) override;
void updateDAGRunState(DAGRunID dagRunID, RunState state) override; void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
@@ -36,6 +40,10 @@ namespace daggy {
std::mutex guard_; std::mutex guard_;
std::ostream &os_; std::ostream &os_;
std::vector<DAGRunRecord> dagRuns_; std::vector<DAGRunRecord> dagRuns_;
void _updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state);
void _updateDAGRunState(DAGRunID dagRunID, RunState state);
}; };
} }
} }

View File

@@ -1,91 +0,0 @@
#include <daggy/DAG.hpp>
#include <stdexcept>
namespace daggy {
size_t DAG::size() const { return vertices_.size(); }
bool DAG::empty() const { return vertices_.empty(); }
size_t DAG::addVertex() {
vertices_.push_back(Vertex{.state = RunState::QUEUED, .depCount = 0});
return vertices_.size() - 1;
}
void DAG::dropEdge(const size_t from, const size_t to) {
if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from));
if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to));
vertices_[from].children.extract(to);
}
void DAG::addEdge(const size_t from, const size_t to) {
if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from));
if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to));
if (hasPath(to, from))
throw std::runtime_error("Adding edge would result in a cycle");
vertices_[from].children.insert(to);
}
bool DAG::hasPath(const size_t from, const size_t to) const {
if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from));
if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to));
for (const auto &child: vertices_[from].children) {
if (child == to) return true;
if (hasPath(child, to)) return true;
}
return false;
}
void DAG::reset() {
// Reset the state of all vertices
for (auto &v: vertices_) {
v.state = RunState::QUEUED;
v.depCount = 0;
}
// Calculate the upstream count
for (auto &v: vertices_) {
for (auto c: v.children) {
++vertices_[c].depCount;
}
}
}
void DAG::resetRunning() {
for (auto &v: vertices_) {
if (v.state != +RunState::RUNNING) continue;
v.state = RunState::QUEUED;
}
}
void DAG::setVertexState(const size_t id, RunState state) {
vertices_[id].state = state;
}
bool DAG::allVisited() const {
for (const auto &v: vertices_) {
if (v.state != +RunState::COMPLETED) return false;
}
return true;
}
std::optional<const size_t> DAG::visitNext() {
for (size_t i = 0; i < vertices_.size(); ++i) {
auto &v = vertices_[i];
if (v.state != +RunState::QUEUED) continue;
if (v.depCount != 0) continue;
v.state = RunState::RUNNING;
return i;
}
return {};
}
void DAG::completeVisit(const size_t id) {
auto &v = vertices_[id];
v.state = RunState::COMPLETED;
for (auto c: v.children) {
--vertices_[c].depCount;
}
}
}

View File

@@ -44,7 +44,69 @@ namespace daggy {
return parameters; return parameters;
} }
std::vector<Task> tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters) { TaskList
taskFromJSON(const std::string &name, const rj::Value &spec, const ParameterValues &parameters) {
TaskList tasks;
if (!spec.IsObject()) { throw std::runtime_error("Tasks is not an object"); }
if (!spec.HasMember("command")) {
throw std::runtime_error("Task " + name + " is missing required 'command' field");
}
// Grab the standard fields with defaults;
bool isGenerator = false;
if (spec.HasMember("isGenerator")) {
isGenerator = spec["isGenerator"].GetBool();
}
uint8_t maxRetries = 0;
if (spec.HasMember("maxRetries")) { maxRetries = spec["maxRetries"].GetInt(); }
uint8_t retryIntervalSeconds = 0;
if (spec.HasMember(
"retryIntervalSeconds")) { retryIntervalSeconds = spec["retryIntervalSeconds"].GetInt(); }
// Children / parents
std::unordered_set<std::string> children;
if (spec.HasMember("children")) {
const auto &specChildren = spec["children"].GetArray();
for (size_t c = 0; c < specChildren.Size(); ++c) {
children.insert(specChildren[c].GetString());
}
}
std::unordered_set<std::string> parents;
if (spec.HasMember("parents")) {
const auto &specParents = spec["parents"].GetArray();
for (size_t c = 0; c < specParents.Size(); ++c) {
parents.insert(specParents[c].GetString());
}
}
// Build out the commands
std::vector<std::string> command;
for (size_t cmd = 0; cmd < spec["command"].Size(); ++cmd) {
command.emplace_back(spec["command"][cmd].GetString());
}
auto commands = expandCommands(command, parameters);
// Create the tasks
for (size_t tid = 0; tid < commands.size(); ++tid) {
std::string taskName = (commands.size() == 1 ? name : name + "_" + std::to_string(tid));
tasks.emplace(taskName, Task{
.name = taskName,
.definedName = name,
.command = commands[tid],
.maxRetries = maxRetries,
.retryIntervalSeconds = retryIntervalSeconds,
.children = children,
.parents = parents,
.isGenerator = isGenerator
});
}
return tasks;
}
TaskList tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters) {
rj::Document doc; rj::Document doc;
rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str()); rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
if (!parseResult) { if (!parseResult) {
@@ -53,101 +115,22 @@ namespace daggy {
return tasksFromJSON(doc, parameters); return tasksFromJSON(doc, parameters);
} }
std::vector<Task> tasksFromJSON(const rj::Value &spec, const ParameterValues &parameters) { TaskList tasksFromJSON(const rj::Value &spec, const ParameterValues &parameters) {
std::vector<Task> tasks; TaskList tasks;
if (!spec.IsArray()) { throw std::runtime_error("Tasks is not an array"); } if (!spec.IsObject()) { throw std::runtime_error("Tasks is not an object"); }
const std::vector<std::string> reqFields{"name", "command"};
std::unordered_map<std::string, std::vector<std::string>> childrenMap;
// Maps child -> parent
std::unordered_map<std::string, std::vector<std::string>> parentMap;
std::unordered_map<std::string, size_t> taskIndex;
// Tasks // Tasks
for (size_t i = 0; i < spec.Size(); ++i) { for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) {
if (!spec[i].IsObject()) { if (!it->name.IsString()) throw std::runtime_error("Task names must be a string.");
throw std::runtime_error("Task " + std::to_string(i) + " is not a dictionary."); if (!it->value.IsObject()) throw std::runtime_error("Task definitions must be an object.");
} auto subTasks = taskFromJSON(it->name.GetString(), it->value, parameters);
const auto &taskSpec = spec[i].GetObject(); tasks.merge(subTasks);
for (const auto &reqField : reqFields) {
if (!taskSpec.HasMember(reqField.c_str())) {
throw std::runtime_error("Task " + std::to_string(i) + " is missing required field " + reqField);
}
}
// Grab the standard fields with defaults;
std::string name = taskSpec["name"].GetString();
taskIndex[name] = i;
uint8_t maxRetries = 0;
if (taskSpec.HasMember("maxRetries")) { maxRetries = taskSpec["maxRetries"].GetInt(); }
uint8_t retryIntervalSeconds = 0;
if (taskSpec.HasMember(
"retryIntervalSeconds")) { retryIntervalSeconds = taskSpec["retryIntervalSeconds"].GetInt(); }
// Children / parents
std::unordered_set<std::string> children;
if (taskSpec.HasMember("children")) {
const auto &specChildren = taskSpec["children"].GetArray();
for (size_t c = 0; c < specChildren.Size(); ++c) {
children.insert(specChildren[c].GetString());
}
}
if (taskSpec.HasMember("parents")) {
const auto &specParents = taskSpec["parents"].GetArray();
for (size_t c = 0; c < specParents.Size(); ++c) {
parentMap[name].emplace_back(specParents[c].GetString());
}
}
// Build out the commands
std::vector<std::string> command;
for (size_t cmd = 0; cmd < taskSpec["command"].Size(); ++cmd) {
command.emplace_back(taskSpec["command"][cmd].GetString());
}
auto commands = expandCommands(command, parameters);
// Create the tasks
auto &taskNames = childrenMap[name];
for (size_t tid = 0; tid < commands.size(); ++tid) {
std::string taskName = (commands.size() == 1 ? name : name + "_" + std::to_string(tid));
taskNames.push_back(taskName);
tasks.emplace_back(Task{
.name = taskName,
.command = commands[tid],
.maxRetries = maxRetries,
.retryIntervalSeconds = retryIntervalSeconds,
.children = children
});
}
} }
// Update any missing child -> parent relationship
for (auto &task : tasks) {
auto pit = parentMap.find(task.name);
if (pit == parentMap.end()) { continue; }
for (const auto &parent : pit->second) {
tasks[taskIndex[parent]].children.insert(task.name);
}
}
// At the end, replace the names of the children with all the expanded versions
for (auto &task : tasks) {
std::unordered_set<std::string> children;
for (const auto &child : task.children) {
auto &newChildren = childrenMap[child];
std::copy(newChildren.begin(), newChildren.end(), std::inserter(children, children.end()));
}
task.children.swap(children);
}
return tasks; return tasks;
} }
// I really want to do this with rapidjson, but damn they make it ugly and difficult. // I really want to do this with rapidjson, but damn they make it ugly and difficult.
// So we'll shortcut and generate the JSON directly. // So we'll shortcut and generate the JSON directly.
std::string taskToJSON(const Task &task) { std::string taskToJSON(const Task &task) {
std::stringstream ss; std::stringstream ss;
bool first = false; bool first = false;
@@ -160,7 +143,7 @@ namespace daggy {
// Commands // Commands
ss << R"("command": [)"; ss << R"("command": [)";
first = true; first = true;
for (const auto &part : task.command) { for (const auto &part: task.command) {
if (!first) ss << ','; if (!first) ss << ',';
ss << std::quoted(part); ss << std::quoted(part);
first = false; first = false;
@@ -169,29 +152,31 @@ namespace daggy {
ss << R"("children": [)"; ss << R"("children": [)";
first = true; first = true;
for (const auto &child : task.children) { for (const auto &child: task.children) {
if (!first) ss << ','; if (!first) ss << ',';
ss << std::quoted(child); ss << std::quoted(child);
first = false; first = false;
} }
ss << "]"; ss << "],";
ss << R"("isGenerator": )" << (task.isGenerator ? "true" : "false");
ss << '}'; ss << '}';
return ss.str(); return ss.str();
} }
std::string tasksToJSON(const std::vector<Task> &tasks) { std::string tasksToJSON(const TaskList &tasks) {
std::stringstream ss; std::stringstream ss;
ss << "["; ss << "{";
bool first = true; bool first = true;
for (const auto &task : tasks) { for (const auto &[name, task]: tasks) {
if (!first) ss << ','; if (!first) ss << ',';
ss << taskToJSON(task); ss << std::quoted(name) << ": " << taskToJSON(task);
first = false; first = false;
} }
ss << "]"; ss << "}";
return ss.str(); return ss.str();
} }
@@ -228,4 +213,5 @@ namespace daggy {
ss >> std::get_time(&dt, "%Y-%m-%d %H:%M:%S %Z"); ss >> std::get_time(&dt, "%Y-%m-%d %H:%M:%S %Z");
return Clock::from_time_t(mktime(&dt)); return Clock::from_time_t(mktime(&dt));
} }
} }

View File

@@ -118,21 +118,20 @@ namespace daggy {
} }
// Get the tasks // Get the tasks
std::vector<Task> tasks; TaskList tasks;
try { try {
auto parsedTasks = tasksFromJSON(doc["tasks"].GetArray(), parameters); auto parsedTasks = tasksFromJSON(doc["tasks"], parameters);
tasks.swap(parsedTasks); tasks.swap(parsedTasks);
} catch (std::exception &e) { } catch (std::exception &e) {
REQ_ERROR(Bad_Request, e.what()); REQ_ERROR(Bad_Request, e.what());
} }
// Get a run ID // Get a run ID
auto runID = logger_.startDAGRun(runName, tasks); auto runID = logger_.startDAGRun(runName, tasks);
auto dag = buildDAGFromTasks(tasks); auto dag = buildDAGFromTasks(tasks);
runnerPool_.addTask( runnerPool_.addTask(
[this, runID, tasks, dag]() { runDAG(runID, tasks, this->executor_, this->logger_, dag); }); [this, parameters, runID, dag]() { runDAG(runID, this->executor_, this->logger_, dag, parameters); });
response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}"); response.send(Pistache::Http::Code::Ok, R"({"runID": )" + std::to_string(runID) + "}");
} }
@@ -190,7 +189,7 @@ namespace daggy {
// task run states // task run states
ss << R"("taskStates": [ )"; ss << R"("taskStates": [ )";
first = true; first = true;
for (const auto &state: run.taskRunStates) { for (const auto &[_, state]: run.taskRunStates) {
if (first) { first = false; } else { ss << ','; } if (first) { first = false; } else { ss << ','; }
ss << std::quoted(state._to_string()); ss << std::quoted(state._to_string());
} }
@@ -198,10 +197,10 @@ namespace daggy {
// Attempt records // Attempt records
first = true; first = true;
ss << R"("taskAttempts": [ )"; ss << R"("taskAttempts": { )";
for (const auto &attempts: run.taskAttempts) { for (const auto &[taskName, attempts]: run.taskAttempts) {
if (first) { first = false; } else { ss << ','; } if (first) { first = false; } else { ss << ','; }
ss << '['; ss << std::quoted(taskName) << ": [";
bool firstAttempt = true; bool firstAttempt = true;
for (const auto &attempt: attempts) { for (const auto &attempt: attempts) {
if (firstAttempt) { firstAttempt = false; } else { ss << ','; } if (firstAttempt) { firstAttempt = false; } else { ss << ','; }
@@ -216,7 +215,7 @@ namespace daggy {
} }
ss << ']'; ss << ']';
} }
ss << "],"; ss << "},";
// DAG state changes // DAG state changes
first = true; first = true;

View File

@@ -1,6 +1,7 @@
#include <iomanip> #include <iomanip>
#include <daggy/Utilities.hpp> #include <daggy/Utilities.hpp>
#include <daggy/Serialization.hpp>
namespace daggy { namespace daggy {
std::string globalSub(std::string string, const std::string &pattern, const std::string &replacement) { std::string globalSub(std::string string, const std::string &pattern, const std::string &replacement) {
@@ -16,22 +17,22 @@ namespace daggy {
expandCommands(const std::vector<std::string> &command, const ParameterValues &parameters) { expandCommands(const std::vector<std::string> &command, const ParameterValues &parameters) {
std::vector<std::vector<std::string>> commands{{}}; std::vector<std::vector<std::string>> commands{{}};
for (const auto &part : command) { for (const auto &part: command) {
std::vector<std::string> expandedPart{part}; std::vector<std::string> expandedPart{part};
// Find all values of parameters, and expand them // Find all values of parameters, and expand them
for (const auto &[param, paramValue] : parameters) { for (const auto &[param, paramValue]: parameters) {
auto pos = part.find(param); auto pos = part.find(param);
if (pos == std::string::npos) continue; if (pos == std::string::npos) continue;
std::vector<std::string> newExpandedPart; std::vector<std::string> newExpandedPart;
if (std::holds_alternative<std::string>(paramValue)) { if (std::holds_alternative<std::string>(paramValue)) {
for (auto &cmd : expandedPart) { for (auto &cmd: expandedPart) {
newExpandedPart.push_back(globalSub(cmd, param, std::get<std::string>(paramValue))); newExpandedPart.push_back(globalSub(cmd, param, std::get<std::string>(paramValue)));
} }
} else { } else {
for (const auto &val : std::get<std::vector<std::string>>(paramValue)) { for (const auto &val: std::get<std::vector<std::string>>(paramValue)) {
for (auto cmd : expandedPart) { for (auto cmd: expandedPart) {
newExpandedPart.push_back(globalSub(cmd, param, val)); newExpandedPart.push_back(globalSub(cmd, param, val));
} }
} }
@@ -41,8 +42,8 @@ namespace daggy {
} }
std::vector<std::vector<std::string>> newCommands; std::vector<std::vector<std::string>> newCommands;
for (const auto &newPart : expandedPart) { for (const auto &newPart: expandedPart) {
for (auto cmd : commands) { for (auto cmd: commands) {
cmd.push_back(newPart); cmd.push_back(newPart);
newCommands.emplace_back(cmd); newCommands.emplace_back(cmd);
} }
@@ -52,33 +53,58 @@ namespace daggy {
return commands; return commands;
} }
DAG buildDAGFromTasks(const std::vector<Task> &tasks, std::unordered_set<std::string>
const std::vector<loggers::dag_run::TaskUpdateRecord> &updates) { findDerivedVertices(TaskDAG &dag, const std::string &definedName) {
DAG dag; std::unordered_set<std::string> vertices;
std::unordered_map<std::string, size_t> taskIDs; dag.forEach([&](const auto &v) {
if (v.data.definedName == definedName) {
vertices.insert(v.data.name);
}
});
return vertices;
}
void updateDAGFromTasks(TaskDAG &dag, TaskList &tasks) {
// Add all the vertices // Add all the vertices
for (const auto &task : tasks) { std::unordered_map<std::string, std::unordered_set<std::string>> definedSets;
taskIDs[task.name] = dag.addVertex(); for (const auto &[name, task]: tasks) {
dag.addVertex(name, task);
definedSets[task.definedName].insert(name);
} }
// Add edges // Add edges
for (size_t i = 0; i < tasks.size(); ++i) { for (const auto &[name, task]: tasks) {
for (const auto &c : tasks[i].children) { for (const auto &defChild: task.children) {
dag.addEdge(i, taskIDs[c]); for (const auto &child: definedSets[defChild]) {
dag.addEdge(name, child);
}
}
for (const auto &defParent: task.parents) {
for (const auto &parent: definedSets[defParent]) {
dag.addEdge(parent, name);
tasks.at(parent).children.insert(name);
}
} }
} }
}
TaskDAG buildDAGFromTasks(TaskList &tasks,
const std::vector<loggers::dag_run::TaskUpdateRecord> &updates) {
TaskDAG dag;
updateDAGFromTasks(dag, tasks);
dag.reset(); dag.reset();
// Replay any updates // Replay any updates
for (const auto &update : updates) { for (const auto &update: updates) {
switch (update.newState) { switch (update.newState) {
case RunState::RUNNING: case RunState::RUNNING:
case RunState::RETRY: case RunState::RETRY:
case RunState::ERRORED: case RunState::ERRORED:
case RunState::KILLED: case RunState::KILLED:
dag.setVertexState(update.taskID, RunState::RUNNING); dag.setVertexState(update.taskName, RunState::RUNNING);
dag.setVertexState(update.taskID, RunState::COMPLETED); dag.setVertexState(update.taskName, RunState::COMPLETED);
break; break;
} }
} }
@@ -87,7 +113,6 @@ namespace daggy {
} }
std::vector<AttemptRecord> runTask(DAGRunID runID, std::vector<AttemptRecord> runTask(DAGRunID runID,
TaskID taskID,
const Task &task, const Task &task,
executors::task::TaskExecutor &executor, executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger) { loggers::dag_run::DAGRunLogger &logger) {
@@ -103,45 +128,55 @@ namespace daggy {
return attempts; return attempts;
} }
DAG runDAG(DAGRunID runID, TaskDAG runDAG(DAGRunID runID,
std::vector<Task> tasks, executors::task::TaskExecutor &executor,
executors::task::TaskExecutor &executor, loggers::dag_run::DAGRunLogger &logger,
loggers::dag_run::DAGRunLogger &logger, TaskDAG dag,
DAG dag) { const ParameterValues taskParameters
) {
logger.updateDAGRunState(runID, RunState::RUNNING); logger.updateDAGRunState(runID, RunState::RUNNING);
struct TaskState { std::unordered_map<std::string, std::future<std::vector<AttemptRecord>>> runningTasks;
size_t tid;
std::future<std::vector<AttemptRecord>> fut;
bool complete;
};
std::vector<TaskState> taskStates;
// TODO Handle case where everything is wedged due to errors
size_t running = 0; size_t running = 0;
size_t errored = 0; size_t errored = 0;
while (!dag.allVisited()) { while (!dag.allVisited()) {
// Check for any completed tasks // Check for any completed tasks
for (auto &taskState : taskStates) { for (auto &[taskName, fut]: runningTasks) {
if (taskState.complete) continue; if (fut.valid()) {
auto attemptRecords = fut.get();
if (taskState.fut.valid()) {
auto attemptRecords = taskState.fut.get();
const auto &taskName = tasks[taskState.tid].name;
if (attemptRecords.empty()) { if (attemptRecords.empty()) {
logger.updateTaskState(runID, taskName, RunState::ERRORED); logger.updateTaskState(runID, taskName, RunState::ERRORED);
++errored; ++errored;
} }
if (attemptRecords.back().rc == 0) { if (attemptRecords.back().rc == 0) {
logger.updateTaskState(runID, taskName, RunState::COMPLETED); logger.updateTaskState(runID, taskName, RunState::COMPLETED);
dag.completeVisit(taskState.tid); auto &vert = dag.getVertex(taskName);
auto &task = vert.data;
if (task.isGenerator) {
// Parse the output and update the DAGs
// TODO: Let the logger know about the new tasks
try {
auto newTasks = tasksFromJSON(attemptRecords.back().outputLog, taskParameters);
updateDAGFromTasks(dag, newTasks);
for (const auto &[ntName, ntTask]: newTasks) {
logger.addTask(runID, ntName, ntTask);
dag.addEdge(taskName, ntName);
task.children.insert(ntName);
}
logger.updateTask(runID, taskName, task);
} catch (std::exception &e) {
logger.updateTaskState(runID, task.name, RunState::ERRORED);
++errored;
}
}
dag.completeVisit(taskName);
--running; --running;
} else { } else {
logger.updateTaskState(runID, taskName, RunState::ERRORED); logger.updateTaskState(runID, taskName, RunState::ERRORED);
++errored; ++errored;
} }
taskState.complete = true;
} }
} }
@@ -150,15 +185,10 @@ namespace daggy {
auto t = dag.visitNext(); auto t = dag.visitNext();
while (t.has_value()) { while (t.has_value()) {
// Schedule the task to run // Schedule the task to run
auto tid = t.value(); auto vertex = t.value();
TaskState tsk{ runningTasks.emplace(vertex.data.name, tq->addTask([runID, vertex, &executor, &logger]() {
.tid = tid, return runTask(runID, vertex.data, executor, logger);
.fut = tq->addTask([tid, runID, &tasks, &executor, &logger]() { }));
return runTask(runID, tid, tasks[tid], executor, logger);
}),
.complete = false
};
taskStates.push_back(std::move(tsk));
++running; ++running;
auto nextTask = dag.visitNext(); auto nextTask = dag.visitNext();

View File

@@ -39,7 +39,7 @@ namespace daggy {
} }
// Execution // Execution
DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector<Task> &tasks) { DAGRunID FileSystemLogger::startDAGRun(std::string name, const TaskList &tasks) {
DAGRunID runID = nextRunID_++; DAGRunID runID = nextRunID_++;
// TODO make this threadsafe // TODO make this threadsafe
@@ -56,8 +56,8 @@ namespace daggy {
ofh.close(); ofh.close();
// Task directories // Task directories
for (const auto &task: tasks) { for (const auto &[name, task]: tasks) {
auto taskDir = runRoot / task.name; auto taskDir = runRoot / name;
fs::create_directories(taskDir); fs::create_directories(taskDir);
std::ofstream ofh(taskDir / "states.csv"); std::ofstream ofh(taskDir / "states.csv");
} }
@@ -136,7 +136,7 @@ namespace daggy {
doc.Parse(metaData.c_str()); doc.Parse(metaData.c_str());
record.name = doc["name"].GetString(); record.name = doc["name"].GetString();
record.tasks = tasksFromJSON(doc["tasks"].GetObject()); record.tasks = tasksFromJSON(doc["tasks"]);
// DAG State Changes // DAG State Changes
std::string line; std::string line;
@@ -158,10 +158,10 @@ namespace daggy {
ifh.close(); ifh.close();
// Task states // Task states
for (const auto &task: record.tasks) { for (const auto &[_, task]: record.tasks) {
auto taskStateFile = runRoot / task.name / "states.csv"; auto taskStateFile = runRoot / task.name / "states.csv";
if (!fs::exists(taskStateFile)) { if (!fs::exists(taskStateFile)) {
record.taskRunStates.push_back(RunState::QUEUED); record.taskRunStates.emplace(task.name, RunState::QUEUED);
continue; continue;
} }
@@ -170,7 +170,7 @@ namespace daggy {
std::stringstream ss{line}; std::stringstream ss{line};
while (std::getline(ss, token, ',')) { continue; } while (std::getline(ss, token, ',')) { continue; }
RunState taskState = RunState::_from_string(token.c_str()); RunState taskState = RunState::_from_string(token.c_str());
record.taskRunStates.emplace_back(taskState); record.taskRunStates.emplace(task.name, taskState);
ifh.close(); ifh.close();
} }
return record; return record;

View File

@@ -11,19 +11,21 @@ namespace daggy {
OStreamLogger::OStreamLogger(std::ostream &os) : os_(os) {} OStreamLogger::OStreamLogger(std::ostream &os) : os_(os) {}
// Execution // Execution
DAGRunID OStreamLogger::startDAGRun(std::string name, const std::vector<Task> &tasks) { DAGRunID OStreamLogger::startDAGRun(std::string name, const TaskList &tasks) {
std::lock_guard<std::mutex> lock(guard_); std::lock_guard<std::mutex> lock(guard_);
size_t runID = dagRuns_.size(); size_t runID = dagRuns_.size();
dagRuns_.push_back({ dagRuns_.push_back({
.name = name, .name = name,
.tasks = tasks, .tasks = tasks
.taskRunStates{tasks.size(), RunState::QUEUED},
.taskAttempts = std::vector<std::vector<AttemptRecord>>(tasks.size())
}); });
for (const auto &[name, _]: tasks) {
_updateTaskState(runID, name, RunState::QUEUED);
}
_updateDAGRunState(runID, RunState::QUEUED);
os_ << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size() os_ << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size()
<< " tasks" << std::endl; << " tasks" << std::endl;
for (const auto &task: tasks) { for (const auto &[_, task]: tasks) {
os_ << "TASK (" << task.name << "): "; os_ << "TASK (" << task.name << "): ";
std::copy(task.command.begin(), task.command.end(), std::copy(task.command.begin(), task.command.end(),
std::ostream_iterator<std::string>(os_, " ")); std::ostream_iterator<std::string>(os_, " "));
@@ -32,8 +34,25 @@ namespace daggy {
return runID; return runID;
} }
void OStreamLogger::addTask(DAGRunID dagRunID, const std::string taskName, const Task &task) {
std::lock_guard<std::mutex> lock(guard_);
auto &dagRun = dagRuns_[dagRunID];
dagRun.tasks[taskName] = task;
_updateTaskState(dagRunID, taskName, RunState::QUEUED);
}
void OStreamLogger::updateTask(DAGRunID dagRunID, const std::string taskName, const Task &task) {
std::lock_guard<std::mutex> lock(guard_);
auto &dagRun = dagRuns_[dagRunID];
dagRun.tasks[taskName] = task;
}
void OStreamLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) { void OStreamLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {
std::lock_guard<std::mutex> lock(guard_); std::lock_guard<std::mutex> lock(guard_);
_updateDAGRunState(dagRunID, state);
}
void OStreamLogger::_updateDAGRunState(DAGRunID dagRunID, RunState state) {
os_ << "DAG State Change(" << dagRunID << "): " << state._to_string() << std::endl; os_ << "DAG State Change(" << dagRunID << "): " << state._to_string() << std::endl;
dagRuns_[dagRunID].dagStateChanges.push_back({Clock::now(), state}); dagRuns_[dagRunID].dagStateChanges.push_back({Clock::now(), state});
} }
@@ -45,26 +64,25 @@ namespace daggy {
os_ << "Task Attempt (" << dagRunID << '/' << taskName << "): Ran with RC " << attempt.rc << ": " os_ << "Task Attempt (" << dagRunID << '/' << taskName << "): Ran with RC " << attempt.rc << ": "
<< msg << std::endl; << msg << std::endl;
const auto &tasks = dagRuns_[dagRunID].tasks; dagRuns_[dagRunID].taskAttempts[taskName].push_back(attempt);
auto it = std::find_if(tasks.begin(), tasks.end(),
[&taskName](const Task &a) { return a.name == taskName; });
if (it == tasks.end()) throw std::runtime_error("No such task: " + taskName);
size_t taskID = it - tasks.begin();
dagRuns_[dagRunID].taskAttempts[taskID].push_back(attempt);
} }
void OStreamLogger::updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) { void OStreamLogger::updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) {
std::lock_guard<std::mutex> lock(guard_); std::lock_guard<std::mutex> lock(guard_);
auto &dagRun = dagRuns_[dagRunID]; _updateTaskState(dagRunID, taskName, state);
const auto &tasks = dagRun.tasks; }
auto it = std::find_if(tasks.begin(), tasks.end(),
[&taskName](const Task &a) { return a.name == taskName; });
if (it == tasks.end()) throw std::runtime_error("No such task: " + taskName);
size_t taskID = it - tasks.begin();
dagRun.taskStateChanges.push_back({Clock::now(), taskID, state});
dagRun.taskRunStates[taskID] = state;
os_ << "Task State Change (" << dagRunID << '/' << taskName << " [task_id: " << taskID << "]): " void OStreamLogger::_updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) {
auto &dagRun = dagRuns_.at(dagRunID);
dagRun.taskStateChanges.push_back({Clock::now(), taskName, state});
auto it = dagRun.taskRunStates.find(taskName);
if (it == dagRun.taskRunStates.end()) {
dagRun.taskRunStates.emplace(taskName, state);
} else {
it->second = state;
}
os_ << "Task State Change (" << dagRunID << '/' << taskName << "): "
<< state._to_string() << state._to_string()
<< std::endl; << std::endl;
} }
@@ -84,12 +102,7 @@ namespace daggy {
run.dagStateChanges.back().time) run.dagStateChanges.back().time)
}; };
std::vector<RunState> states(run.tasks.size(), RunState::QUEUED); for (const auto &[_, taskState]: run.taskRunStates) {
for (const auto &taskUpdate: run.taskStateChanges) {
states[taskUpdate.taskID] = taskUpdate.newState;
}
for (const auto &taskState: states) {
summary.taskStateCounts[taskState]++; summary.taskStateCounts[taskState]++;
} }

View File

@@ -5,14 +5,16 @@
#include <catch2/catch.hpp> #include <catch2/catch.hpp>
TEST_CASE("DAG Construction Tests", "[dag]") { TEST_CASE("DAG Construction Tests", "[dag]") {
daggy::DAG dag; daggy::DAG<size_t, size_t> dag;
REQUIRE(dag.size() == 0); REQUIRE(dag.size() == 0);
REQUIRE(dag.empty()); REQUIRE(dag.empty());
REQUIRE_NOTHROW(dag.addVertex()); REQUIRE_NOTHROW(dag.addVertex(0, 0));
for (int i = 1; i < 10; ++i) { for (int i = 1; i < 10; ++i) {
dag.addVertex(); dag.addVertex(i, i);
REQUIRE(dag.hasVertex(i));
REQUIRE(dag.getVertex(i).data == i);
dag.addEdge(i - 1, i); dag.addEdge(i - 1, i);
} }
@@ -26,9 +28,6 @@ TEST_CASE("DAG Construction Tests", "[dag]") {
SECTION("addEdge Bounds Checking") { SECTION("addEdge Bounds Checking") {
REQUIRE_THROWS(dag.addEdge(20, 0)); REQUIRE_THROWS(dag.addEdge(20, 0));
REQUIRE_THROWS(dag.addEdge(0, 20)); REQUIRE_THROWS(dag.addEdge(0, 20));
}SECTION("dropEdge Bounds Checking") {
REQUIRE_THROWS(dag.dropEdge(20, 0));
REQUIRE_THROWS(dag.dropEdge(0, 20));
}SECTION("hasPath Bounds Checking") { }SECTION("hasPath Bounds Checking") {
REQUIRE_THROWS(dag.hasPath(20, 0)); REQUIRE_THROWS(dag.hasPath(20, 0));
REQUIRE_THROWS(dag.hasPath(0, 20)); REQUIRE_THROWS(dag.hasPath(0, 20));
@@ -36,11 +35,11 @@ TEST_CASE("DAG Construction Tests", "[dag]") {
} }
TEST_CASE("DAG Traversal Tests", "[dag]") { TEST_CASE("DAG Traversal Tests", "[dag]") {
daggy::DAG dag; daggy::DAG<size_t, size_t> dag;
const int N_VERTICES = 10; const int N_VERTICES = 10;
for (int i = 0; i < N_VERTICES; ++i) { dag.addVertex(); } for (int i = 0; i < N_VERTICES; ++i) { dag.addVertex(i, i); }
/* /*
0 ---------------------\ 0 ---------------------\
@@ -61,24 +60,30 @@ TEST_CASE("DAG Traversal Tests", "[dag]") {
{7, 9} {7, 9}
}; };
for (auto const[from, to] : edges) { for (auto const[from, to]: edges) {
dag.addEdge(from, to); dag.addEdge(from, to);
} }
SECTION("Baisc Traversal") { SECTION("Basic Traversal") {
dag.reset(); dag.reset();
std::vector<int> visitOrder(N_VERTICES); std::vector<int> visitOrder(N_VERTICES);
size_t i = 0; size_t i = 0;
while (!dag.allVisited()) { while (!dag.allVisited()) {
const auto &v = dag.visitNext().value(); const auto &v = dag.visitNext().value();
dag.completeVisit(v); dag.completeVisit(v.key);
visitOrder[v] = i; visitOrder[v.key] = i;
++i; ++i;
} }
// Ensure visit order is preserved // Ensure visit order is preserved
for (auto const[from, to] : edges) { for (auto const[from, to]: edges) {
REQUIRE(visitOrder[from] <= visitOrder[to]); REQUIRE(visitOrder[from] <= visitOrder[to]);
} }
} }
SECTION("Iteration") {
size_t nVisited = 0;
dag.forEach([&](const daggy::Vertex<size_t, size_t> &) { ++nVisited; });
REQUIRE(nVisited == dag.size());
}
} }

View File

@@ -0,0 +1,67 @@
#include <iostream>
#include <filesystem>
#include <fstream>
#include <catch2/catch.hpp>
#include "daggy/loggers/dag_run/FileSystemLogger.hpp"
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
namespace fs = std::filesystem;
using namespace daggy;
using namespace daggy::loggers::dag_run;
const TaskList SAMPLE_TASKS{
{"work_a", Task{.command{"/bin/echo", "a"}, .children{"c"}}},
{"work_b", Task{.command{"/bin/echo", "b"}, .children{"c"}}},
{"work_c", Task{.command{"/bin/echo", "c"}}}
};
inline DAGRunID testDAGRunInit(DAGRunLogger &logger, const std::string &name, const TaskList &tasks) {
auto runID = logger.startDAGRun(name, tasks);
auto dagRun = logger.getDAGRun(runID);
REQUIRE(dagRun.tasks == tasks);
REQUIRE(dagRun.taskRunStates.size() == tasks.size());
auto nonQueuedTask = std::find_if(dagRun.taskRunStates.begin(), dagRun.taskRunStates.end(),
[](const auto &a) { return a.second != +RunState::QUEUED; });
REQUIRE(nonQueuedTask == dagRun.taskRunStates.end());
REQUIRE(dagRun.dagStateChanges.size() == 1);
REQUIRE(dagRun.dagStateChanges.back().newState == +RunState::QUEUED);
return runID;
}
/*
TEST_CASE("Filesystem Logger", "[filesystem_logger]") {
const fs::path logRoot{"fs_logger_unit"};
auto cleanup = [&]() {
if (fs::exists(logRoot)) {
fs::remove_all(logRoot);
}
};
//cleanup();
daggy::loggers::dag_run::FileSystemLogger logger(logRoot);
SECTION("DAGRun Starts") {
testDAGRunInit(logger, "init_test", SAMPLE_TASKS);
}
// cleanup();
}
*/
TEST_CASE("ostream Logger", "[ostream_logger]") {
//cleanup();
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
SECTION("DAGRun Starts") {
testDAGRunInit(logger, "init_test", SAMPLE_TASKS);
}
// cleanup();
}

View File

@@ -29,7 +29,7 @@ TEST_CASE("Deserialize Parameters", "[deserialize_parameters]") {
TEST_CASE("Task Deserialization", "[deserialize_task]") { TEST_CASE("Task Deserialization", "[deserialize_task]") {
SECTION("Build with no expansion") { SECTION("Build with no expansion") {
std::string testTasks = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["C"]}, {"name": "B", "command": ["/bin/echo", "B"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])"; std::string testTasks = R"({ "A": {"command": ["/bin/echo", "A"], "children": ["C"]}, "B": {"command": ["/bin/echo", "B"], "children": ["C"]}, "C": {"command": ["/bin/echo", "C"]}})";
auto tasks = daggy::tasksFromJSON(testTasks); auto tasks = daggy::tasksFromJSON(testTasks);
REQUIRE(tasks.size() == 3); REQUIRE(tasks.size() == 3);
} }
@@ -37,7 +37,7 @@ TEST_CASE("Task Deserialization", "[deserialize_task]") {
SECTION("Build with expansion") { SECTION("Build with expansion") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"}; std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
auto params = daggy::parametersFromJSON(testParams); auto params = daggy::parametersFromJSON(testParams);
std::string testTasks = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["B"]}, {"name": "B", "command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])"; std::string testTasks = R"({"A": {"command": ["/bin/echo", "A"], "children": ["B"]}, "B": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "children": ["C"]}, "C": {"command": ["/bin/echo", "C"]}})";
auto tasks = daggy::tasksFromJSON(testTasks, params); auto tasks = daggy::tasksFromJSON(testTasks, params);
REQUIRE(tasks.size() == 4); REQUIRE(tasks.size() == 4);
} }
@@ -45,7 +45,7 @@ TEST_CASE("Task Deserialization", "[deserialize_task]") {
SECTION("Build with expansion using parents instead of children") { SECTION("Build with expansion using parents instead of children") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"}; std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
auto params = daggy::parametersFromJSON(testParams); auto params = daggy::parametersFromJSON(testParams);
std::string testTasks = R"([{"name": "A", "command": ["/bin/echo", "A"]}, {"name": "B", "command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "parents": ["A"]},{"name": "C", "command": ["/bin/echo", "C"], "parents": ["A"]}])"; std::string testTasks = R"({"A": {"command": ["/bin/echo", "A"]}, "B": {"command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "parents": ["A"]}, "C": {"command": ["/bin/echo", "C"], "parents": ["A"]}})";
auto tasks = daggy::tasksFromJSON(testTasks, params); auto tasks = daggy::tasksFromJSON(testTasks, params);
REQUIRE(tasks.size() == 4); REQUIRE(tasks.size() == 4);
} }
@@ -53,21 +53,16 @@ TEST_CASE("Task Deserialization", "[deserialize_task]") {
TEST_CASE("Task Serialization", "[serialize_tasks]") { TEST_CASE("Task Serialization", "[serialize_tasks]") {
SECTION("Build with no expansion") { SECTION("Build with no expansion") {
std::string testTasks = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["C"]}, {"name": "B", "command": ["/bin/echo", "B"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])"; std::string testTasks = R"({"A": {"command": ["/bin/echo", "A"], "children": ["C"]}, "B": {"command": ["/bin/echo", "B"], "children": ["C"]}, "C": {"command": ["/bin/echo", "C"]}})";
auto tasks = daggy::tasksFromJSON(testTasks); auto tasks = daggy::tasksFromJSON(testTasks);
std::unordered_map<std::string, size_t> taskMap;
for (size_t i = 0; i < tasks.size(); ++i) {
taskMap[tasks[i].name] = i;
}
auto genJSON = daggy::tasksToJSON(tasks); auto genJSON = daggy::tasksToJSON(tasks);
auto regenTasks = daggy::tasksFromJSON(genJSON); auto regenTasks = daggy::tasksFromJSON(genJSON);
REQUIRE(regenTasks.size() == tasks.size()); REQUIRE(regenTasks.size() == tasks.size());
for (const auto &task : regenTasks) { for (const auto &[name, task]: regenTasks) {
const auto &other = tasks[taskMap[task.name]]; const auto &other = tasks[name];
REQUIRE(task == other); REQUIRE(task == other);
} }
} }

View File

@@ -74,16 +74,12 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") {
std::string dagRun = R"({ std::string dagRun = R"({
"name": "unit_server", "name": "unit_server",
"taskParameters": { "FILE": [ "A", "B" ] }, "taskParameters": { "FILE": [ "A", "B" ] },
"tasks": [ "tasks": {
{ "name": "touch", "touch": { "command": [ "/usr/bin/touch", "dagrun_{{FILE}}" ] },
"command": [ "/usr/bin/touch", "dagrun_{{FILE}}" ] "cat": { "command": [ "/usr/bin/cat", "dagrun_A", "dagrun_B" ],
},
{
"name": "cat",
"command": [ "/usr/bin/cat", "dagrun_A", "dagrun_B" ],
"parents": [ "touch" ] "parents": [ "touch" ]
} }
] }
})"; })";
@@ -160,7 +156,7 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") {
REQUIRE(complete); REQUIRE(complete);
std::this_thread::sleep_for(std::chrono::seconds(2)); std::this_thread::sleep_for(std::chrono::seconds(2));
for (const auto &pth : std::vector<fs::path>{"dagrun_A", "dagrun_B"}) { for (const auto &pth: std::vector<fs::path>{"dagrun_A", "dagrun_B"}) {
REQUIRE(fs::exists(pth)); REQUIRE(fs::exists(pth));
fs::remove(pth); fs::remove(pth);
} }

View File

@@ -62,20 +62,20 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
SECTION("Simple execution") { SECTION("Simple execution") {
std::string prefix = "asdlk_"; std::string prefix = "asdlk_";
std::string taskJSON = R"([{"name": "A", "command": ["/usr/bin/touch", ")" std::string taskJSON = R"({"A": {"command": ["/usr/bin/touch", ")"
+ prefix + R"(A"], "children": ["C"]}, {"name": "B", "command": ["/usr/bin/touch", ")" + prefix + R"(A"], "children": ["C"]}, "B": {"command": ["/usr/bin/touch", ")"
+ prefix + R"(B"], "children": ["C"]}, {"name": "C", "command": ["/usr/bin/touch", ")" + prefix + R"(B"], "children": ["C"]}, "C": {"command": ["/usr/bin/touch", ")"
+ prefix + R"(C"]}])"; + prefix + R"(C"]}})";
auto tasks = daggy::tasksFromJSON(taskJSON); auto tasks = daggy::tasksFromJSON(taskJSON);
auto dag = daggy::buildDAGFromTasks(tasks); auto dag = daggy::buildDAGFromTasks(tasks);
auto runID = logger.startDAGRun("test_run", tasks); auto runID = logger.startDAGRun("test_run", tasks);
auto endDAG = daggy::runDAG(runID, tasks, ex, logger, dag); auto endDAG = daggy::runDAG(runID, ex, logger, dag);
REQUIRE(endDAG.allVisited()); REQUIRE(endDAG.allVisited());
std::vector<std::string> letters{"A", "B", "C"}; std::vector<std::string> letters{"A", "B", "C"};
for (const auto &letter : letters) { for (const auto &letter: letters) {
fs::path file{prefix + letter}; fs::path file{prefix + letter};
REQUIRE(fs::exists(file)); REQUIRE(fs::exists(file));
fs::remove(file); fs::remove(file);
@@ -83,7 +83,7 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
// Get the DAG Run Attempts // Get the DAG Run Attempts
auto record = logger.getDAGRun(runID); auto record = logger.getDAGRun(runID);
for (const auto &attempts : record.taskAttempts) { for (const auto &[_, attempts]: record.taskAttempts) {
REQUIRE(attempts.size() == 1); REQUIRE(attempts.size() == 1);
REQUIRE(attempts.front().rc == 0); REQUIRE(attempts.front().rc == 0);
} }
@@ -93,45 +93,80 @@ TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
auto cleanup = []() { auto cleanup = []() {
// Cleanup // Cleanup
std::vector<fs::path> paths{"rec_error_A", "noexist"}; std::vector<fs::path> paths{"rec_error_A", "noexist"};
for (const auto &pth : paths) { for (const auto &pth: paths) {
if (fs::exists(pth)) fs::remove_all(pth); if (fs::exists(pth)) fs::remove_all(pth);
} }
}; };
cleanup(); cleanup();
// daggy::loggers::dag_run::OStreamLogger logger(std::cout);
std::string goodPrefix = "rec_error_"; std::string goodPrefix = "rec_error_";
std::string badPrefix = "noexist/rec_error_"; std::string badPrefix = "noexist/rec_error_";
std::string taskJSON = R"([{"name": "A", "command": ["/usr/bin/touch", ")" std::string taskJSON = R"({"A": {"command": ["/usr/bin/touch", ")"
+ goodPrefix + + goodPrefix +
R"(A"], "children": ["C"]}, {"name": "B", "command": ["/usr/bin/touch", ")" R"(A"], "children": ["C"]}, "B": {"command": ["/usr/bin/touch", ")"
+ badPrefix + R"(B"], "children": ["C"]}, {"name": "C", "command": ["/usr/bin/touch", ")" + badPrefix + R"(B"], "children": ["C"]}, "C": {"command": ["/usr/bin/touch", ")"
+ badPrefix + R"(C"]}])"; + badPrefix + R"(C"]}})";
auto tasks = daggy::tasksFromJSON(taskJSON); auto tasks = daggy::tasksFromJSON(taskJSON);
auto dag = daggy::buildDAGFromTasks(tasks); auto dag = daggy::buildDAGFromTasks(tasks);
auto runID = logger.startDAGRun("test_run", tasks); auto runID = logger.startDAGRun("test_run", tasks);
auto tryDAG = daggy::runDAG(runID, tasks, ex, logger, dag); auto tryDAG = daggy::runDAG(runID, ex, logger, dag);
REQUIRE(!tryDAG.allVisited()); REQUIRE(!tryDAG.allVisited());
// Create the missing dir, then continue to run the DAG // Create the missing dir, then continue to run the DAG
fs::create_directory("noexist"); fs::create_directory("noexist");
tryDAG.resetRunning(); tryDAG.resetRunning();
auto endDAG = daggy::runDAG(runID, tasks, ex, logger, tryDAG); auto endDAG = daggy::runDAG(runID, ex, logger, tryDAG);
REQUIRE(endDAG.allVisited()); REQUIRE(endDAG.allVisited());
// Get the DAG Run Attempts // Get the DAG Run Attempts
auto record = logger.getDAGRun(runID); auto record = logger.getDAGRun(runID);
REQUIRE(record.taskAttempts[0].size() == 1); // A ran fine REQUIRE(record.taskAttempts["A"].size() == 1); // A ran fine
REQUIRE(record.taskAttempts[1].size() == 2); // B errored and had to be retried REQUIRE(record.taskAttempts["B"].size() == 2); // B errored and had to be retried
REQUIRE(record.taskAttempts[2].size() == 1); // C wasn't run because B errored REQUIRE(record.taskAttempts["C"].size() == 1); // C wasn't run because B errored
cleanup(); cleanup();
} }
SECTION("Generator tasks") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"};
auto params = daggy::parametersFromJSON(testParams);
std::string generatorOutput = R"({"B": {"command": ["/usr/bin/echo", "{{DATE}}"], "children": ["C"]}})";
std::stringstream jsonTasks;
jsonTasks << R"({ "A": { "command": [ "/usr/bin/echo", )" << std::quoted(generatorOutput)
<< R"(], "children": ["C"], "isGenerator": true},)"
<< R"("C": { "command": [ "/usr/bin/echo", "hello!"] } })";
auto tasks = daggy::tasksFromJSON(jsonTasks.str());
auto dag = daggy::buildDAGFromTasks(tasks);
REQUIRE(dag.size() == 2);
auto runID = logger.startDAGRun("generator_run", tasks);
auto finalDAG = daggy::runDAG(runID, ex, logger, dag, params);
REQUIRE(finalDAG.size() == 4);
// Check the logger
auto record = logger.getDAGRun(runID);
REQUIRE(record.tasks.size() == 4);
REQUIRE(record.taskRunStates.size() == 4);
for (const auto & [taskName, attempts] : record.taskAttempts) {
REQUIRE(attempts.size() == 1);
REQUIRE(attempts.back().rc == 0);
}
// Ensure that children were updated properly
REQUIRE(record.tasks["A"].children == std::unordered_set<std::string>{"B_0", "B_1", "C"});
REQUIRE(record.tasks["B_0"].children == std::unordered_set<std::string>{"C"});
REQUIRE(record.tasks["B_1"].children == std::unordered_set<std::string>{"C"});
REQUIRE(record.tasks["C"].children.empty());
}
} }