From 28c5b3eea3b162b1e32b662caa0256b9dee5293d Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Mon, 9 Aug 2021 14:59:54 -0300 Subject: [PATCH] - Adding StdOutLogger and adding tests for DAG execution to verify it works. - Roughing in FileSystemLogger - Deleting Scheduler code and associated unit tests as being too complicated for maintenance. - Refactoring namespaces for loggers and executors. --- daggy/include/daggy/Scheduler.hpp | 57 --------------- daggy/src/Scheduler.cpp | 112 ------------------------------ tests/unit_scheduler.cpp | 25 ------- 3 files changed, 194 deletions(-) delete mode 100644 daggy/include/daggy/Scheduler.hpp delete mode 100644 daggy/src/Scheduler.cpp delete mode 100644 tests/unit_scheduler.cpp diff --git a/daggy/include/daggy/Scheduler.hpp b/daggy/include/daggy/Scheduler.hpp deleted file mode 100644 index a3eff61..0000000 --- a/daggy/include/daggy/Scheduler.hpp +++ /dev/null @@ -1,57 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include "DAG.hpp" -#include "TaskExecutor.hpp" -#include "DAGRun.hpp" -#include "ThreadPool.hpp" - -namespace daggy { - enum class DAGState : uint32_t { - UNKNOWN = 0, - QUEUED, - RUNNING, - ERRORED, - COMPLETE - }; - - class Scheduler { - public: - public: - Scheduler( - TaskExecutor &executor, size_t executorThreads = 30, size_t schedulerThreads = 10); - - ~Scheduler(); - - // returns DagRun ID - std::future - scheduleDAG(std::string runName, std::vector tasks, - std::unordered_map parameters, - DAG dag = {} // Allows for loading of an existing DAG - ); - - // get the current DAG - DAG dagRunState(); - - // Complete running DAGs and shutdown - void drain(); - - private: - void runDAG(const std::string &name, DAGRun &dagRun); - - std::vector runTask(const Task &task); - - std::unordered_map runs_; - std::vector> futs_; - TaskExecutor &executor_; - ThreadPool schedulers_; - ThreadPool executors_; - std::unordered_map> jobs; - std::mutex mtx_; - std::condition_variable cv_; - }; -} diff --git a/daggy/src/Scheduler.cpp b/daggy/src/Scheduler.cpp deleted file mode 100644 index 75a5139..0000000 --- a/daggy/src/Scheduler.cpp +++ /dev/null @@ -1,112 +0,0 @@ -#include - -using namespace std::chrono_literals; - -namespace daggy { - Scheduler::Scheduler(TaskExecutor &executor, size_t executorThreads, size_t schedulerThreads) - : executor_(executor), schedulers_(schedulerThreads), executors_(executorThreads) {} - - - Scheduler::~Scheduler() { - executors_.shutdown(); - schedulers_.shutdown(); - } - - std::future - Scheduler::scheduleDAG(std::string runName, std::vector tasks, - std::unordered_map parameters, DAG dag - ) { - // Initialize the dag if one wasn't provided - if (dag.empty()) { - std::unordered_map taskIDs; - - // Add all the vertices - for (const auto &task : tasks) { - taskIDs[task.name] = dag.addVertex(); - } - - // Add edges - for (size_t i = 0; i < tasks.size(); ++i) { - for (const auto &c : tasks[i].children) { - dag.addEdge(i, taskIDs[c]); - } - } - dag.reset(); - } - - // Create the DAGRun - std::lock_guard guard(mtx_); - auto &dr = runs_[runName]; - - dr.tasks = tasks; - dr.parameters = std::move(parameters); - dr.dag = dag; - dr.taskRuns = std::vector{tasks.size()}; - - // return std::move(schedulers_.addTask([&]() { runDAG(runName, dr); })); - return std::move(schedulers_.addTask([&]() { runDAG(runName, dr); })); - } - - void Scheduler::runDAG(const std::string &name, DAGRun &run) { - struct TaskState { - size_t tid; - std::future> fut; - bool complete; - }; - - std::vector tasks; - - while (!run.dag.allVisited()) { - - // Check for any completed tasks - for (auto &task : tasks) { - if (task.complete) continue; - - if (task.fut.valid()) { - auto ars = task.fut.get(); - if (ars.back().rc == 0) { - run.dag.completeVisit(task.tid); - } - task.complete = true; - } - } - - // Add all remaining tasks in a task queue to avoid dominating the thread pool - auto tq = std::make_shared(); - auto t = run.dag.visitNext(); - while (t.has_value()) { - // Schedule the task to run - TaskState tsk{.tid = t.value(), .fut = tq->addTask( - [&]() { return runTask(run.tasks[t.value()]); }), .complete = false - }; - tasks.push_back(std::move(tsk)); - - // - auto nt = run.dag.visitNext(); - if (not nt.has_value()) break; - t.emplace(nt.value()); - } - if (! tq->empty()) { - executors_.addTasks(tq); - } - - std::this_thread::sleep_for(250ms); - } - } - - std::vector - Scheduler::runTask(const Task &task) { - std::vector attempts; - - while (attempts.size() < task.maxRetries) { - attempts.push_back(executor_.runCommand(task.command)); - if (attempts.back().rc == 0) break; - } - - return attempts; - } - - void Scheduler::drain() { - schedulers_.drain(); - } -} diff --git a/tests/unit_scheduler.cpp b/tests/unit_scheduler.cpp deleted file mode 100644 index cbc582f..0000000 --- a/tests/unit_scheduler.cpp +++ /dev/null @@ -1,25 +0,0 @@ -#include -#include - -#include "daggy/executors/ForkingTaskExecutor.hpp" -#include "daggy/Scheduler.hpp" - -#include "catch.hpp" - -TEST_CASE("Basic Scheduler Execution", "[scheduler]") { - daggy::executor::ForkingTaskExecutor ex(10); - daggy::Scheduler sched(ex); - - std::vector tasks { - daggy::Task{ "task_a", { "/usr/bin/echo", "task_a"}, 3, 30, { "task_c"} } - , daggy::Task{ "task_b", { "/usr/bin/echo", "task_b"}, 3, 30, { "task_c" } } - , daggy::Task{ "task_c", { "/usr/bin/echo", "task_c"}, 3, 30, {} } - }; - - SECTION("Simple Run") { - auto fut_a = sched.scheduleDAG("Simple 1", tasks, {}); - auto fut_b = sched.scheduleDAG("Simple 2", tasks, {}); - fut_a.get(); - fut_b.get(); - } -} \ No newline at end of file