- Adding StdOutLogger and adding tests for DAG execution to verify it works.

- Roughing in FileSystemLogger
- Deleting Scheduler code and associated unit tests as being too complicated for maintenance.
- Refactoring namespaces for loggers and executors.
This commit is contained in:
Ian Roddis
2021-08-09 14:59:54 -03:00
parent a8e85f8feb
commit 28c5b3eea3
3 changed files with 0 additions and 194 deletions

View File

@@ -1,57 +0,0 @@
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include <variant>
#include "DAG.hpp"
#include "TaskExecutor.hpp"
#include "DAGRun.hpp"
#include "ThreadPool.hpp"
namespace daggy {
enum class DAGState : uint32_t {
UNKNOWN = 0,
QUEUED,
RUNNING,
ERRORED,
COMPLETE
};
class Scheduler {
public:
public:
Scheduler(
TaskExecutor &executor, size_t executorThreads = 30, size_t schedulerThreads = 10);
~Scheduler();
// returns DagRun ID
std::future<void>
scheduleDAG(std::string runName, std::vector<Task> tasks,
std::unordered_map<std::string, ParameterValue> parameters,
DAG dag = {} // Allows for loading of an existing DAG
);
// get the current DAG
DAG dagRunState();
// Complete running DAGs and shutdown
void drain();
private:
void runDAG(const std::string &name, DAGRun &dagRun);
std::vector<AttemptRecord> runTask(const Task &task);
std::unordered_map<std::string, DAGRun> runs_;
std::vector<std::future<void>> futs_;
TaskExecutor &executor_;
ThreadPool schedulers_;
ThreadPool executors_;
std::unordered_map<std::string, std::future<void>> jobs;
std::mutex mtx_;
std::condition_variable cv_;
};
}

View File

@@ -1,112 +0,0 @@
#include <daggy/Scheduler.hpp>
using namespace std::chrono_literals;
namespace daggy {
Scheduler::Scheduler(TaskExecutor &executor, size_t executorThreads, size_t schedulerThreads)
: executor_(executor), schedulers_(schedulerThreads), executors_(executorThreads) {}
Scheduler::~Scheduler() {
executors_.shutdown();
schedulers_.shutdown();
}
std::future<void>
Scheduler::scheduleDAG(std::string runName, std::vector<Task> tasks,
std::unordered_map<std::string, ParameterValue> parameters, DAG dag
) {
// Initialize the dag if one wasn't provided
if (dag.empty()) {
std::unordered_map<std::string, size_t> taskIDs;
// Add all the vertices
for (const auto &task : tasks) {
taskIDs[task.name] = dag.addVertex();
}
// Add edges
for (size_t i = 0; i < tasks.size(); ++i) {
for (const auto &c : tasks[i].children) {
dag.addEdge(i, taskIDs[c]);
}
}
dag.reset();
}
// Create the DAGRun
std::lock_guard<std::mutex> guard(mtx_);
auto &dr = runs_[runName];
dr.tasks = tasks;
dr.parameters = std::move(parameters);
dr.dag = dag;
dr.taskRuns = std::vector<TaskRun>{tasks.size()};
// return std::move(schedulers_.addTask([&]() { runDAG(runName, dr); }));
return std::move(schedulers_.addTask([&]() { runDAG(runName, dr); }));
}
void Scheduler::runDAG(const std::string &name, DAGRun &run) {
struct TaskState {
size_t tid;
std::future<std::vector<AttemptRecord>> fut;
bool complete;
};
std::vector<TaskState> tasks;
while (!run.dag.allVisited()) {
// Check for any completed tasks
for (auto &task : tasks) {
if (task.complete) continue;
if (task.fut.valid()) {
auto ars = task.fut.get();
if (ars.back().rc == 0) {
run.dag.completeVisit(task.tid);
}
task.complete = true;
}
}
// Add all remaining tasks in a task queue to avoid dominating the thread pool
auto tq = std::make_shared<TaskQueue>();
auto t = run.dag.visitNext();
while (t.has_value()) {
// Schedule the task to run
TaskState tsk{.tid = t.value(), .fut = tq->addTask(
[&]() { return runTask(run.tasks[t.value()]); }), .complete = false
};
tasks.push_back(std::move(tsk));
//
auto nt = run.dag.visitNext();
if (not nt.has_value()) break;
t.emplace(nt.value());
}
if (! tq->empty()) {
executors_.addTasks(tq);
}
std::this_thread::sleep_for(250ms);
}
}
std::vector<AttemptRecord>
Scheduler::runTask(const Task &task) {
std::vector<AttemptRecord> attempts;
while (attempts.size() < task.maxRetries) {
attempts.push_back(executor_.runCommand(task.command));
if (attempts.back().rc == 0) break;
}
return attempts;
}
void Scheduler::drain() {
schedulers_.drain();
}
}