- Adding TODO list to keep track of things.

- Adding .idea to gitignore
- Adding DAG execution to utilities, redefining the logger
This commit is contained in:
Ian Roddis
2021-08-05 15:24:17 -03:00
parent 745c950280
commit 1849a2fee4
11 changed files with 154 additions and 62 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
build
.cache
cmake-build-*
.idea

View File

@@ -1,4 +1,5 @@
- Rip apart scheduler and re-implement runDag as a utility function, taking an execution pool, task list, and logger
- Separate concerns for DAG logger vs DAG definition storage
- Add in authorization scheme (maybe JWT?)
- Flesh out server and interface
- Flesh out server and interface
- Add ability to define child -> parent relationships

View File

@@ -0,0 +1,33 @@
#pragma once
#include <string>
#include "DAGRun.hpp"
/*
DAGLogger represents the interface for recording the execution state of a
DAG run (run creation, per-task attempts, task completion, and run state
transitions). Abstracted in case other back-end solutions need to be
supported.
*/
namespace daggy {
// Identifies a stored DAG definition version; signed so -1 can mean "latest".
using DAGDefID = int16_t;
// Identifies one execution of a DAG.
using DAGRunID = size_t;
// Lifecycle states of a single DAG run.
enum class DAGRunState : uint32_t {
    QUEUED = 0,
    RUNNING,
    ERRORED,
    KILLED,
    COMPLETED
};
// Abstract sink for DAG execution state: run creation, per-task attempt
// records, task completion, and run state transitions. Abstracted so other
// back-end stores can be plugged in.
class DAGLogger {
public:
    // Implementations are used (and may be destroyed) through this base,
    // so the destructor must be virtual.
    virtual ~DAGLogger() = default;
    // Execution
    // Registers a new run of DAG `name` over `tasks`; returns its run ID.
    virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) = 0;
    // Records one execution attempt (including its return code) of a task.
    virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) = 0;
    // Marks task `taskID` of run `dagRun` as finished.
    virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) = 0;
    // Transitions the whole run to `state` (see DAGRunState).
    virtual void updateDAGRunState(DAGRunID dagRunId, DAGRunState state) = 0;
};
}

View File

@@ -1,42 +0,0 @@
#pragma once
#include <string>
#include "DAGRun.hpp"
/*
MetaStore represents the interface to store all the state information
for daggy to run. Abstracted in case other back-end solutions need to
be supported.
*/
namespace daggy {
using DAGDefID = int16_t;
using DAGRunID = size_t;
// Abstract back-end store for DAG definitions and run state.
// NOTE(review): every member is private (class body with no access
// specifier), so as written this interface can neither be called nor
// overridden by implementations — presumably `public:` was intended.
// It also lacks a virtual destructor for a polymorphic base.
class MetaStore {
// Basic storage + retrieval of DAG Definitions
// Stores `definition` under `name`; returns the stored version's ID.
virtual DAGDefID storeDAGDefinition(std::string name, std::string definition) = 0;
// Returns the most recent version ID stored for `name`.
virtual DAGDefID getCurrentDAGVersion(std::string name) = 0;
// Fetches one stored definition; version == -1 presumably selects the
// latest version — TODO confirm against implementations.
virtual std::string getDAGDefinition(std::string name, DAGDefID version = -1) = 0;
// DAG Run State
/*
* startDAGRun // DAG starts up, returns a DAGID for future updates
* updateDAGRun // DAG State transitions
* updateTaskState // Task state updates
*/
virtual DAGRunID startDAGRun(std::string dagName, DAGDefID version, DAGRun dagRun
) = 0;
// Per-task state transition within a run.
virtual void updateTask(DAGRunID rid, std::string taskName, VertexState state) = 0;
// Whole-run state transition.
virtual void updateDAGRun(DAGRunID rid, DAGState state) = 0;
// Retrievals
virtual DAGRun & getDAGRun(DAGRunID) = 0;
};
}

View File

@@ -8,6 +8,7 @@
#include "Task.hpp"
#include "AttemptRecord.hpp"
#include "ThreadPool.hpp"
/*
Executors run Tasks, returning a future with the results.
@@ -17,11 +18,13 @@
namespace daggy {
// Abstract base for task executors. Concrete executors (e.g.
// ForkingTaskExecutor) implement runCommand; the embedded ThreadPool is
// what runDAG schedules task queues onto.
class TaskExecutor {
public:
    TaskExecutor() = default;
    // Sizes the internal thread pool. `explicit` so a bare size_t cannot
    // silently convert into an executor.
    explicit TaskExecutor(size_t nThreads) : threadPool(nThreads) {}
    // Derived executors are used through this base, so destruction must
    // dispatch virtually.
    virtual ~TaskExecutor() = default;
    // Human-readable executor name (e.g. for logging).
    virtual const std::string getName() const = 0;
    // This will block if the executor is full
    virtual AttemptRecord runCommand(std::vector<std::string> cmd) = 0;
    // Public so callers (see runDAG) can enqueue work directly;
    // NOTE(review): consider encapsulating behind a method later.
    ThreadPool threadPool;
};
}

View File

@@ -7,7 +7,10 @@
#include <rapidjson/document.h>
#include "Logger.hpp"
#include "TaskExecutor.hpp"
#include "Task.hpp"
#include "ThreadPool.hpp"
namespace rj = rapidjson;
@@ -16,10 +19,20 @@ namespace daggy {
using ParameterValues = std::unordered_map<std::string, ParameterValue>;
using Command = std::vector<std::string>;
// Dealing with JSON
ParameterValues parseParameters(const std::string & jsonSpec);
ParameterValues parseParameters(const rj::Document & spec);
std::vector<Task> buildTasks(const std::string & jsonSpec, const ParameterValues & parameters = {});
std::vector<Task> buildTasks(const rj::Document & spec, const ParameterValues & parameters = {});
std::vector<Command> expandCommands(const std::vector<std::string> & command, const ParameterValues & parameters);
// DAG execution
// DAG vertex IDs should correspond to the position of tasks in vector. e.g. Vertex ID 0 corresponds to tasks[0]
// I'm not crazy about this loose coupling, but it keeps the interface simple for now.
void runDAG(DAGRunID runID,
std::vector<Task> tasks,
TaskExecutor & executor,
DAGLogger & logger,
DAG dag);
}

View File

@@ -7,7 +7,9 @@ namespace daggy {
namespace executor {
class ForkingTaskExecutor : public TaskExecutor {
public:
ForkingTaskExecutor() = default;
ForkingTaskExecutor(size_t nThreads)
: TaskExecutor(nThreads)
{}
const std::string getName() const override { return "ForkingTaskExecutor"; }

View File

@@ -71,6 +71,9 @@ namespace daggy {
const std::vector<std::string> reqFields{"name", "command"};
std::unordered_map<std::string, std::vector<std::string>> childrenMap;
// Maps child -> parent
std::unordered_map<std::string, std::vector<std::string>> parentMap;
std::unordered_map<std::string, size_t> taskIndex;
// Tasks
for (size_t i = 0; i < spec.Size(); ++i) {
@@ -79,7 +82,7 @@ namespace daggy {
}
const auto & taskSpec = spec[i].GetObject();
for (const auto reqField : reqFields) {
for (const auto & reqField : reqFields) {
if (! taskSpec.HasMember(reqField.c_str())) {
throw std::runtime_error("Task " + std::to_string(i) + " is missing required field " + reqField);
}
@@ -87,10 +90,14 @@ namespace daggy {
// Grab the standard fields with defaults;
std::string name = taskSpec["name"].GetString();
taskIndex[name] = i;
uint8_t maxRetries = 0;
if (taskSpec.HasMember("maxRetries")) { maxRetries = taskSpec["maxRetries"].GetInt(); }
uint8_t retryIntervalSeconds = 0;
if (taskSpec.HasMember("retryIntervalSeconds")) { retryIntervalSeconds = taskSpec["retryIntervalSeconds"].GetInt(); }
// Children / parents
std::vector<std::string> children;
if (taskSpec.HasMember("children")) {
const auto & specChildren = taskSpec["children"].GetArray();
@@ -98,17 +105,22 @@ namespace daggy {
children.emplace_back(specChildren[c].GetString());
}
}
if (taskSpec.HasMember("parents")) {
const auto & specParents = taskSpec["parents"].GetArray();
for (size_t c = 0; c < specParents.Size(); ++c) {
parentMap[name].emplace_back(specParents[c].GetString());
}
}
// Build out the commands
std::vector<std::string> command;
for (size_t cmd = 0; cmd < taskSpec["command"].Size(); ++cmd) {
command.push_back(taskSpec["command"][cmd].GetString());
command.emplace_back(taskSpec["command"][cmd].GetString());
}
auto commands = expandCommands(command, parameters);
// Create the tasks
auto & taskNames = childrenMap[name];
size_t tid = 0;
for (size_t tid = 0; tid < commands.size(); ++tid) {
std::string taskName = name + "_" + std::to_string(tid);
taskNames.push_back(taskName);
@@ -122,6 +134,16 @@ namespace daggy {
}
}
// Update any missing child -> parent relationship
for (auto & task : tasks) {
auto pit = parentMap.find(task.name);
if (pit == parentMap.end()) { continue; }
for (const auto & parent : pit->second) {
tasks[taskIndex[parent]].children.emplace_back(task.name);
}
}
// At the end, replace the names of the children with all the expanded versions
for (auto & task : tasks) {
std::vector<std::string> children;
@@ -143,4 +165,66 @@ namespace daggy {
}
return buildTasks(doc, parameters);
}
}
// Executes `tasks` according to the dependency structure in `dag`.
// Vertex IDs in `dag` index into `tasks` (vertex 0 == tasks[0]).
// Polls every 250ms: harvests finished task futures, marks successful
// tasks' vertices as visited, then schedules every newly-unblocked task
// on the executor's thread pool. Returns once every vertex is visited.
// NOTE(review): `runID` and `logger` are currently unused — attempts and
// completions are presumably meant to be recorded via
// logger.logTaskAttempt / markTaskComplete; confirm and wire up.
void runDAG(DAGRunID runID,
            std::vector<Task> tasks,
            TaskExecutor & executor,
            DAGLogger & logger,
            DAG dag) {
    // Book-keeping for one scheduled task: its vertex id, the pending
    // result future, and whether that future has been harvested.
    struct TaskState {
        size_t tid;
        std::future<std::vector<AttemptRecord>> fut;
        bool complete;
    };
    std::vector<TaskState> taskStates;
    while (!dag.allVisited()) {
        // Harvest finished tasks. Check readiness with wait_for(0): a
        // bare fut.get() would block on the first still-running task and
        // defeat the polling loop.
        for (auto & taskState : taskStates) {
            if (taskState.complete || !taskState.fut.valid()) { continue; }
            if (taskState.fut.wait_for(std::chrono::seconds(0)) != std::future_status::ready) { continue; }
            auto attemptRecords = taskState.fut.get();
            // Only a successful final attempt completes the DAG visit.
            // NOTE(review): a task that exhausts its retries is marked
            // complete here but its vertex never completes, so this loop
            // spins forever — failure propagation still needs a design.
            if (!attemptRecords.empty() && attemptRecords.back().rc == 0) {
                dag.completeVisit(taskState.tid);
            }
            taskState.complete = true;
        }
        // Collect every task whose dependencies are now satisfied into one
        // TaskQueue so a single DAG cannot dominate the thread pool.
        auto tq = std::make_shared<TaskQueue>();
        for (auto t = dag.visitNext(); t.has_value(); t = dag.visitNext()) {
            auto tid = t.value();
            taskStates.push_back(TaskState{
                .tid = tid,
                .fut = tq->addTask(
                    [tid, &tasks, &executor]() {
                        std::vector<AttemptRecord> attempts;
                        // Always run at least once: with the default
                        // maxRetries == 0 the old while-loop never ran and
                        // .back() on the empty result vector was UB.
                        // Retry until success or the attempt budget is spent.
                        do {
                            attempts.push_back(executor.runCommand(tasks[tid].command));
                        } while (attempts.back().rc != 0 &&
                                 attempts.size() < tasks[tid].maxRetries);
                        return attempts;
                    }),
                .complete = false
            });
        }
        if (! tq->empty()) {
            executor.threadPool.addTasks(tq);
        }
        std::this_thread::sleep_for(250ms);
    }
}
}

View File

@@ -6,7 +6,7 @@
#include "catch.hpp"
TEST_CASE("Basic Execution", "[forking_executor]") {
daggy::executor::ForkingTaskExecutor ex;
daggy::executor::ForkingTaskExecutor ex(10);
SECTION("Simple Run") {
std::vector<std::string> cmd{"/usr/bin/echo", "abc", "123"};

View File

@@ -7,7 +7,7 @@
#include "catch.hpp"
TEST_CASE("Basic Scheduler Execution", "[scheduler]") {
daggy::executor::ForkingTaskExecutor ex;
daggy::executor::ForkingTaskExecutor ex(10);
daggy::Scheduler sched(ex);
std::vector<daggy::Task> tasks {

View File

@@ -63,17 +63,14 @@ TEST_CASE("Building Tasks", "[utilities_build_tasks]") {
auto params = daggy::parseParameters(testParams);
std::string testTasks = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["B"]}, {"name": "B", "command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])";
auto tasks = daggy::buildTasks(testTasks, params);
/*
for (const auto & task : tasks) {
std::cout << task.name << ": ";
for (const auto & part : task.children) {
std::cout << part << " ";
}
std::cout << std::endl;
}
*/
REQUIRE(tasks.size() == 4);
}
SECTION("Build with expansion using parents instead of children") {
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name"})"};
auto params = daggy::parseParameters(testParams);
std::string testTasks = R"([{"name": "A", "command": ["/bin/echo", "A"]}, {"name": "B", "command": ["/bin/echo", "B", "{{SOURCE}}", "{{DATE}}"], "parents": ["A"]},{"name": "C", "command": ["/bin/echo", "C"], "parents": ["A"]}])";
auto tasks = daggy::buildTasks(testTasks, params);
REQUIRE(tasks.size() == 4);
}
}