From 468993edb57989beb576730c651e28940a0c9aee Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Mon, 5 Jul 2021 11:57:38 -0300 Subject: [PATCH] Checkpointing work --- daggy/include/daggy/Executor.hpp | 1 + daggy/include/daggy/Scheduler.hpp | 31 +++--- daggy/include/daggy/Server.hpp | 2 +- daggy/include/daggy/Task.hpp | 2 +- daggy/include/daggy/ThreadPool.hpp | 3 + .../daggy/executors/ForkingExecutor.hpp | 3 +- daggy/src/DAG.cpp | 13 ++- daggy/src/Scheduler.cpp | 99 ++++++++++++++----- daggy/src/ThreadPool.cpp | 7 +- daggy/src/executors/ForkingExecutor.cpp | 1 - tests/unit_dag.cpp | 14 +++ 11 files changed, 125 insertions(+), 51 deletions(-) diff --git a/daggy/include/daggy/Executor.hpp b/daggy/include/daggy/Executor.hpp index 9b22f96..97dd95d 100644 --- a/daggy/include/daggy/Executor.hpp +++ b/daggy/include/daggy/Executor.hpp @@ -28,6 +28,7 @@ namespace daggy { class Executor { public: + Executor() = default; virtual const std::string getName() const = 0; // This will block if the executor is full diff --git a/daggy/include/daggy/Scheduler.hpp b/daggy/include/daggy/Scheduler.hpp index 5be9bc7..83f7cc4 100644 --- a/daggy/include/daggy/Scheduler.hpp +++ b/daggy/include/daggy/Scheduler.hpp @@ -24,12 +24,12 @@ namespace daggy { }; public: - Scheduler(size_t schedulerThreads = 10); + Scheduler( + Executor & executor + , size_t executorThreads = 30 + , size_t schedulerThreads = 10); - // Register an executor - void registerExecutor(std::shared_ptr executor - , size_t maxParallelTasks - ); + ~Scheduler(); // returns DagRun ID void scheduleDAG(std::string runName @@ -44,17 +44,11 @@ namespace daggy { // get the current DAG DAG dagRunState(); + // Complete running DAGs and shutdown + void drain(); + private: - - struct ExecutionPool { - std::shared_ptr executor; - ThreadPool workers; - - // taskid -> result - std::unordered_map> jobs; - }; - struct DAGRun { std::vector tasks; std::unordered_map parameters; @@ -63,11 +57,14 @@ namespace daggy { std::mutex taskGuard_; }; - void runDAG(DAGRun & dagRun); + void runDAG(const std::string & name, DAGRun & dagRun); + std::vector runTask(const Task & task); - std::unordered_map executorPools_; std::unordered_map runs_; - ThreadPool schedulers_; + Executor & executor_; + ThreadPool schedulers_; + ThreadPool executorWorkers_; + std::unordered_map> jobs; std::mutex mtx_; std::condition_variable cv_; }; diff --git a/daggy/include/daggy/Server.hpp b/daggy/include/daggy/Server.hpp index 7dfeaf7..2457720 100644 --- a/daggy/include/daggy/Server.hpp +++ b/daggy/include/daggy/Server.hpp @@ -4,7 +4,7 @@ #include #include -#include +// #include namespace daggy { class Server { diff --git a/daggy/include/daggy/Task.hpp b/daggy/include/daggy/Task.hpp index d7d0aa3..bf26470 100644 --- a/daggy/include/daggy/Task.hpp +++ b/daggy/include/daggy/Task.hpp @@ -7,7 +7,7 @@ namespace daggy { struct Task { std::string name; - std::string command; + std::vector command; uint8_t max_retries; uint32_t retry_interval_seconds; // Time to wait between retries std::vector children; diff --git a/daggy/include/daggy/ThreadPool.hpp b/daggy/include/daggy/ThreadPool.hpp index 6ed7a30..c24fced 100644 --- a/daggy/include/daggy/ThreadPool.hpp +++ b/daggy/include/daggy/ThreadPool.hpp @@ -29,6 +29,9 @@ namespace daggy { void shutdown(); std::future addTask(AsyncTask fn); + + size_t queueSize(); + private: using QueuedAsyncTask = std::shared_ptr>; diff --git a/daggy/include/daggy/executors/ForkingExecutor.hpp b/daggy/include/daggy/executors/ForkingExecutor.hpp index de10d34..c7a1b4a 100644 --- a/daggy/include/daggy/executors/ForkingExecutor.hpp +++ b/daggy/include/daggy/executors/ForkingExecutor.hpp @@ -5,8 +5,9 @@ namespace daggy { namespace executor { - class ForkingExecutor : Executor { + class ForkingExecutor : public Executor { public: + ForkingExecutor() = default; const std::string getName() const override { return "ForkingExecutor"; } AttemptRecord runCommand(std::vector cmd) override; diff --git a/daggy/src/DAG.cpp b/daggy/src/DAG.cpp index d560ccd..56f4d0e 100644 --- a/daggy/src/DAG.cpp +++ b/daggy/src/DAG.cpp @@ -1,4 +1,5 @@ #include +#include namespace daggy { size_t DAG::size() const { return vertices_.size(); } @@ -6,22 +7,26 @@ namespace daggy { size_t DAG::addVertex() { vertices_.push_back(Vertex{.state = VertexState::UNVISITED, .depCount = 0}); - return vertices_.size(); + return vertices_.size() - 1; } void DAG::dropEdge(const size_t from, const size_t to) { + if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from)); + if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to)); vertices_[from].children.extract(to); } void DAG::addEdge(const size_t from, const size_t to) { + if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from)); + if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to)); if (hasPath(to, from)) throw std::runtime_error("Adding edge would result in a cycle"); vertices_[from].children.insert(to); } bool DAG::hasPath(const size_t from, const size_t to) const { - bool pathFound = false; - + if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from)); + if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to)); for (const auto & child : vertices_[from].children) { if (child == to) return true; if (hasPath(child, to)) return true; @@ -52,7 +57,7 @@ namespace daggy { return true; } - std::optional DAG::visitNext() { + std::optional DAG::visitNext() { for (size_t i = 0; i < vertices_.size(); ++i) { auto & v = vertices_[i]; diff --git a/daggy/src/Scheduler.cpp b/daggy/src/Scheduler.cpp index 8e01e1b..0260470 100644 --- a/daggy/src/Scheduler.cpp +++ b/daggy/src/Scheduler.cpp @@ -1,17 +1,20 @@ #include +using namespace std::chrono_literals; + namespace daggy { - Scheduler::Scheduler(size_t schedulerThreads = 10) - : schedulers_(schedulerThreads) + Scheduler::Scheduler(Executor & executor + , size_t executorThreads + , size_t schedulerThreads) + : executor_(executor) + , schedulers_(schedulerThreads) + , executorWorkers_(executorThreads) { } - void Scheduler::registerExecutor(std::shared_ptr executor, size_t maxParallelTasks) { - executorPools_.emplace(executor->getName() - , ExecutionPool{ - .executor = executor - , .workers = ThreadPool{maxParallelTasks} - , .jobs = {} - }); + + Scheduler::~Scheduler() { + executorWorkers_.shutdown(); + schedulers_.shutdown(); } void Scheduler::scheduleDAG(std::string runName @@ -39,33 +42,79 @@ namespace daggy { } // Create the DAGRun - DAGRun run{ - .tasks = tasks - , .parameters = parameters - , .dag = dag - , .taskRuns = TaskRun(tasks.size()) - }; - { std::lock_guard guard(mtx_); - runs_.emplace(runName, std::move(run)); auto & dr = runs_[runName]; - schedulers_.addTask([&]() { runDAG(dr); }); + + dr.tasks = tasks; + dr.parameters = parameters; + dr.dag = dag; + dr.taskRuns = std::vector{tasks.size()}; + + schedulers_.addTask([&]() { runDAG(runName, dr); }); } } - void Scheduler::runDAG(DAGRun & run) + void Scheduler::runDAG(const std::string & name, DAGRun & run) { - using namespace std::chrono_literals; + std::unordered_map>> tasks; + std::cout << "Running dag " << name << std::endl; while (! run.dag.allVisited()) { // Check for any completed tasks - - auto t = run.dag.visitNext(); - if (! t.has_value()) { - std::this_thread::sleep_for(250ms); - continue; + for (auto & [tid, fut] : tasks) { + std::cout << "Checking tid " << tid << std::endl; + if (fut.valid()) { + auto ars = fut.get(); + if (ars.back().rc == 0) { + std::cout << "Completing node " << tid << std::endl; + run.dag.completeVisit(tid); + } + } } + + // Get the next dag to run + auto t = run.dag.visitNext(); + while (t.has_value()) { + std::cout << "Scheduling " << t.value() << std::endl; + // Schedule the task to run + + std::packaged_task()> node([&]() { + return runTask(run.tasks[t.value()]); + }); + + tasks.emplace(t.value(), node.get_future()); + node(); + + // + auto nt = run.dag.visitNext(); + if (not nt.has_value()) break; + t.emplace(nt.value()); + } + + std::cout << "sleeping" << std::endl; + std::this_thread::sleep_for(250ms); + } + } + + std::vector + Scheduler::runTask(const Task & task) { + std::vector attempts; + + while (attempts.size() < task.max_retries) { + auto fut = executorWorkers_.addTask([&]() { + attempts.push_back(executor_.runCommand(task.command)); + }); + fut.get(); + if (attempts.back().rc == 0) break; + } + + return attempts; + } + + void Scheduler::drain() { + while (schedulers_.queueSize()) { + std::this_thread::sleep_for(250ms); } } } diff --git a/daggy/src/ThreadPool.cpp b/daggy/src/ThreadPool.cpp index 2b740a0..6168d3d 100644 --- a/daggy/src/ThreadPool.cpp +++ b/daggy/src/ThreadPool.cpp @@ -34,7 +34,7 @@ void ThreadPool::shutdown() { cv_.notify_all(); for (auto & w : workers_) { - w.join(); + if (w.joinable()) w.join(); } } @@ -48,3 +48,8 @@ std::future ThreadPool::addTask(std::function fn) { cv_.notify_one(); return result; } + +size_t ThreadPool::queueSize() { + std::unique_lock lk(guard_); + return taskQueue_.size(); +} diff --git a/daggy/src/executors/ForkingExecutor.cpp b/daggy/src/executors/ForkingExecutor.cpp index b86bef9..8ff3b96 100644 --- a/daggy/src/executors/ForkingExecutor.cpp +++ b/daggy/src/executors/ForkingExecutor.cpp @@ -33,7 +33,6 @@ std::string slurp(int fd) { return result; } - daggy::AttemptRecord ForkingExecutor::runCommand(std::vector cmd) { diff --git a/tests/unit_dag.cpp b/tests/unit_dag.cpp index b0cc3f6..bdf57f0 100644 --- a/tests/unit_dag.cpp +++ b/tests/unit_dag.cpp @@ -21,6 +21,20 @@ TEST_CASE("DAG Construction Tests", "[dag]") { // Cannot add an edge that would result in a cycle REQUIRE_THROWS(dag.addEdge(9, 5)); + + // Bounds checking + SECTION("addEdge Bounds Checking") { + REQUIRE_THROWS(dag.addEdge(20, 0)); + REQUIRE_THROWS(dag.addEdge(0, 20)); + } + SECTION("dropEdge Bounds Checking") { + REQUIRE_THROWS(dag.dropEdge(20, 0)); + REQUIRE_THROWS(dag.dropEdge(0, 20)); + } + SECTION("hasPath Bounds Checking") { + REQUIRE_THROWS(dag.hasPath(20, 0)); + REQUIRE_THROWS(dag.hasPath(0, 20)); + } } TEST_CASE("DAG Traversal Tests", "[dag]") {