Checkpointing work

This commit is contained in:
Ian Roddis
2021-06-20 12:05:13 -03:00
parent 60cab3d5e0
commit b7b8d5b6a1
2 changed files with 113 additions and 5 deletions

View File

@@ -7,24 +7,68 @@
#include "DAG.hpp"
#include "Executor.hpp"
#include "ThreadPool.hpp"
namespace daggy {
using ParameterValue = std::variant<std::string, std::vector<std::string>>;
using TaskRun = std::vector<AttemptRecord>;
class Scheduler {
public:
enum class DAGState : uint32_t {
UNKNOWN = 0,
QUEUED,
RUNNING,
ERRORED,
COMPLETE
};
public:
Scheduler(size_t schedulerThreads = 10);
// Register an executor
void registerExecutor(std::shared_ptr<Executor> executor
, size_t maxParallelTasks
);
void runTasks(std::vector<Task> tasks
// returns DagRun ID
void scheduleDAG(std::string runName
, std::vector<Task> tasks
, std::unordered_map<std::string, ParameterValue> parameters
, DAG dag = {} // Allows for loading of an existing DAG
);
// get the current status of a DAG
DAGState dagRunStatus(std::string runName);
// get the current DAG
DAG dagRunState();
private:
std::unordered_map<std::string, std::shared_ptr<Executor>> executors_;
std::unordered_map<std::string, std::vector<std::future<AttemptRecord>>> jobs_;
std::unordered_map<std::string, ParameterValue> parameters_;
struct ExecutionPool {
std::shared_ptr<Executor> executor;
ThreadPool workers;
// taskid -> result
std::unordered_map<std::string, std::future<void>> jobs;
};
struct DAGRun {
std::vector<Task> tasks;
std::unordered_map<std::string, ParameterValue> parameters;
DAG dag;
std::vector<TaskRun> taskRuns;
std::mutex taskGuard_;
};
void runDAG(DAGRun & dagRun);
std::unordered_map<std::string, ExecutionPool> executorPools_;
std::unordered_map<std::string, DAGRun> runs_;
ThreadPool schedulers_;
std::mutex mtx_;
std::condition_variable cv_;
};
}

View File

@@ -1,7 +1,71 @@
#include <daggy/Scheduler.hpp>
namespace daggy {
Scheduler::Scheduler(size_t schedulerThreads = 10)
: schedulers_(schedulerThreads)
{ }
void Scheduler::registerExecutor(std::shared_ptr<Executor> executor, size_t maxParallelTasks) {
executors_.emplace(executor->getName(), executor);
executorPools_.emplace(executor->getName()
, ExecutionPool{
.executor = executor
, .workers = ThreadPool{maxParallelTasks}
, .jobs = {}
});
}
void Scheduler::scheduleDAG(std::string runName
, std::vector<Task> tasks
, std::unordered_map<std::string, ParameterValue> parameters
, DAG dag
)
{
// Initialize the dag
if (dag.empty()) {
std::unordered_map<std::string, size_t> tids;
// Add all the vertices
for (size_t i = 0; i < tasks.size(); ++i) {
tids[tasks[i].name] = dag.addVertex();
}
// Add edges
for (size_t i = 0; i < tasks.size(); ++i) {
for (const auto & c : tasks[i].children) {
dag.addEdge(i, tids[c]);
}
}
dag.reset();
}
// Create the DAGRun
DAGRun run{
.tasks = tasks
, .parameters = parameters
, .dag = dag
, .taskRuns = TaskRun(tasks.size())
};
{
std::lock_guard<std::mutex> guard(mtx_);
runs_.emplace(runName, std::move(run));
auto & dr = runs_[runName];
schedulers_.addTask([&]() { runDAG(dr); });
}
}
void Scheduler::runDAG(DAGRun & run)
{
using namespace std::chrono_literals;
while (! run.dag.allVisited()) {
// Check for any completed tasks
auto t = run.dag.visitNext();
if (! t.has_value()) {
std::this_thread::sleep_for(250ms);
continue;
}
}
}
}