From 0349a5109ba181fd73bd94198a8e9080235dd456 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Thu, 22 Jul 2021 12:57:51 -0300 Subject: [PATCH] * Formatting code with clang-tidy * Roughing in more metastore work --- daggy/include/daggy/AttemptRecord.hpp | 17 ++ daggy/include/daggy/DAG.hpp | 70 ++--- daggy/include/daggy/DAGImpl.hpp | 70 ----- daggy/include/daggy/DAGRun.hpp | 21 ++ daggy/include/daggy/Executor.hpp | 26 +- daggy/include/daggy/MetaStore.hpp | 36 ++- daggy/include/daggy/Scheduler.hpp | 74 ++---- daggy/include/daggy/Server.hpp | 55 ++-- daggy/include/daggy/Task.hpp | 14 +- daggy/include/daggy/ThreadPool.hpp | 249 +++++++++--------- .../daggy/executors/ForkingExecutor.hpp | 17 +- daggy/src/DAG.cpp | 121 ++++----- daggy/src/Scheduler.cpp | 180 ++++++------- daggy/src/Server.cpp | 79 +++--- daggy/src/executors/ForkingExecutor.cpp | 123 ++++----- tests/unit_scheduler.cpp | 8 +- 16 files changed, 561 insertions(+), 599 deletions(-) create mode 100644 daggy/include/daggy/AttemptRecord.hpp delete mode 100644 daggy/include/daggy/DAGImpl.hpp create mode 100644 daggy/include/daggy/DAGRun.hpp diff --git a/daggy/include/daggy/AttemptRecord.hpp b/daggy/include/daggy/AttemptRecord.hpp new file mode 100644 index 0000000..e7f5cdb --- /dev/null +++ b/daggy/include/daggy/AttemptRecord.hpp @@ -0,0 +1,17 @@ +#pragma once + +#include +#include + +namespace daggy { + using Clock = std::chrono::system_clock; + + struct AttemptRecord { + std::chrono::time_point startTime; + std::chrono::time_point stopTime; + int rc; // RC from the task + std::string metaLog; // Logs from the executor + std::string output; // stdout from command + std::string error; // stderr from command + }; +} diff --git a/daggy/include/daggy/DAG.hpp b/daggy/include/daggy/DAG.hpp index 5654d1c..37c2fee 100644 --- a/daggy/include/daggy/DAG.hpp +++ b/daggy/include/daggy/DAG.hpp @@ -16,45 +16,53 @@ namespace daggy { - enum class VertexState : uint32_t { - UNVISITED = 0, - VISITING, - VISITED - }; + enum class VertexState : uint32_t { + UNVISITED = 0, + VISITING, + VISITED + }; - struct Vertex { - VertexState state; - uint32_t depCount; - std::unordered_set children; - }; + struct Vertex { + VertexState state; + uint32_t depCount; + std::unordered_set children; + }; - using Edge = std::pair; + using Edge = std::pair; - class DAG { + class DAG { public: - // Vertices - size_t addVertex(); - const std::vector & getVertices(); + // Vertices + size_t addVertex(); - // Edges - void addEdge(const size_t src, const size_t dst); - void dropEdge(const size_t src, const size_t dst); - bool hasPath(const size_t from, const size_t to) const; - const std::vector & getEdges(); + const std::vector &getVertices(); - // Attributes - size_t size() const; - bool empty() const; + // Edges + void addEdge(const size_t src, const size_t dst); - // Traversal - void reset(); - VertexState getVertexState(const size_t id) const; - bool allVisited() const; + void dropEdge(const size_t src, const size_t dst); - std::optional visitNext(); - void completeVisit(const size_t id); + bool hasPath(const size_t from, const size_t to) const; + + const std::vector &getEdges(); + + // Attributes + size_t size() const; + + bool empty() const; + + // Traversal + void reset(); + + VertexState getVertexState(const size_t id) const; + + bool allVisited() const; + + std::optional visitNext(); + + void completeVisit(const size_t id); private: - std::vector vertices_; - }; + std::vector vertices_; + }; } diff --git a/daggy/include/daggy/DAGImpl.hpp b/daggy/include/daggy/DAGImpl.hpp deleted file mode 100644 index 4627107..0000000 --- a/daggy/include/daggy/DAGImpl.hpp +++ /dev/null @@ -1,70 +0,0 @@ -size_t DAG::size() const { return vertices_.size(); } -bool DAG::empty() const { return vertices_.empty(); } - -size_t DAG::addVertex() { - vertices_.push_back(Vertex{.state = VertexState::UNVISITED, .depCount = 0}); - return vertices_.size(); -} - -void DAG::dropEdge(const size_t from, const size_t to) { - vertices_[from].children.extract(to); -} - -void DAG::addEdge(const size_t from, const size_t to) { - if (hasPath(to, from)) - throw std::runtime_error("Adding edge would result in a cycle"); - vertices_[from].children.insert(to); -} - -bool DAG::hasPath(const size_t from, const size_t to) const { - bool pathFound = false; - - for (const auto & child : vertices_[from].children) { - if (child == to) return true; - if (hasPath(child, to)) return true; - } - - return false; -} - -void DAG::reset() { - // Reset the state of all vertices - for (auto & v : vertices_) { - v.state = VertexState::UNVISITED; - v.depCount = 0; - } - - // Calculate the upstream count - for (auto & v : vertices_) { - for (auto c : v.children) { - ++vertices_[c].depCount; - } - } -} - -bool DAG::allVisited() const { - for (const auto & v : vertices_) { - if (v.state != VertexState::VISITED) return false; - } - return true; -} - -std::optional DAG::visitNext() { - for (size_t i = 0; i < vertices_.size(); ++i) { - auto & v = vertices_[i]; - - if (v.state != VertexState::UNVISITED) continue; - if (v.depCount != 0) continue; - v.state = VertexState::VISITING; - return i; - } - return {}; -} - -void DAG::completeVisit(const size_t id) { - auto & v = vertices_[id]; - v.state = VertexState::VISITED; - for (auto c : v.children) { - --vertices_[c].depCount; - } -} diff --git a/daggy/include/daggy/DAGRun.hpp b/daggy/include/daggy/DAGRun.hpp new file mode 100644 index 0000000..2cb7b8a --- /dev/null +++ b/daggy/include/daggy/DAGRun.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include +#include +#include + +#include "DAG.hpp" +#include "Task.hpp" +#include "AttemptRecord.hpp" + +namespace daggy { + using ParameterValue = std::variant>; + using TaskRun = std::vector; + + struct DAGRun { + std::vector tasks; + std::unordered_map parameters; + DAG dag; + std::vector taskRuns; + }; +} diff --git a/daggy/include/daggy/Executor.hpp b/daggy/include/daggy/Executor.hpp index 97dd95d..53243dc 100644 --- a/daggy/include/daggy/Executor.hpp +++ b/daggy/include/daggy/Executor.hpp @@ -7,7 +7,7 @@ #include #include "Task.hpp" - +#include "AttemptRecord.hpp" /* Executors run Tasks, returning a future with the results. @@ -15,23 +15,13 @@ */ namespace daggy { - using Clock = std::chrono::system_clock; - - struct AttemptRecord { - std::chrono::time_point startTime; - std::chrono::time_point stopTime; - int rc; // RC from the task - std::string metaLog; // Logs from the executor - std::string output; // stdout from command - std::string error; // stderr from command - }; - - class Executor { + class Executor { public: - Executor() = default; - virtual const std::string getName() const = 0; + Executor() = default; - // This will block if the executor is full - virtual AttemptRecord runCommand(std::vector cmd) = 0; - }; + virtual const std::string getName() const = 0; + + // This will block if the executor is full + virtual AttemptRecord runCommand(std::vector cmd) = 0; + }; } diff --git a/daggy/include/daggy/MetaStore.hpp b/daggy/include/daggy/MetaStore.hpp index 9c24307..ee0f5dc 100644 --- a/daggy/include/daggy/MetaStore.hpp +++ b/daggy/include/daggy/MetaStore.hpp @@ -2,6 +2,8 @@ #include +#include "DAGRun.hpp" + /* MetaStore represents the interface to store all the state information for daggy to run. Abstracted in case other back-end solutions need to @@ -9,17 +11,31 @@ */ namespace daggy { - using DAGDefID = int16_t; // future proofing + using DAGDefID = int16_t; + using DAGRunID = size_t; - // This struct will contain transitions for - struct DAGRunEvent { }; + class MetaStore { + // Basic storage + retrieval of DAG Definitions + virtual DAGDefID storeDAGDefinition(std::string name, std::string definition) = 0; - class MetaStore { - // Basic storage + retrieval of DAG Definitions - virtual void storeDAGDefinition(std::string name, std::string definition) = 0; - virtual DAGDefID getCurrentDAGVersion(std::string name) = 0; - virtual std::string getDAGDefinition(std::string name, DAGDefID version = -1) = 0; + virtual DAGDefID getCurrentDAGVersion(std::string name) = 0; - // DAG Run State - }; + virtual std::string getDAGDefinition(std::string name, DAGDefID version = -1) = 0; + + // DAG Run State + + /* + * startDAGRun // DAG starts up, returns a DAGID for future updates + * updateDAGRun // DAG State transitions + * updateTaskState // Task state updates + */ + virtual DAGRunID startDAGRun(std::string dagName, DAGDefID version, DAGRun dagRun + ) = 0; + + virtual void updateTask(DAGRunID rid, std::string taskName, VertexState state) = 0; + + virtual void updateDAGRun(DAGRunID rid, DAGState state) = 0; + + // Retrievals + }; } diff --git a/daggy/include/daggy/Scheduler.hpp b/daggy/include/daggy/Scheduler.hpp index f81f79e..09901be 100644 --- a/daggy/include/daggy/Scheduler.hpp +++ b/daggy/include/daggy/Scheduler.hpp @@ -7,67 +7,51 @@ #include "DAG.hpp" #include "Executor.hpp" +#include "DAGRun.hpp" #include "ThreadPool.hpp" namespace daggy { - using ParameterValue = std::variant>; - using TaskRun = std::vector; - - class Scheduler { - public: - enum class DAGState : uint32_t { + enum class DAGState : uint32_t { UNKNOWN = 0, QUEUED, RUNNING, ERRORED, COMPLETE - }; + }; + class Scheduler { public: - Scheduler( - Executor & executor - , size_t executorThreads = 30 - , size_t schedulerThreads = 10); + public: + Scheduler( + Executor &executor, size_t executorThreads = 30, size_t schedulerThreads = 10); - ~Scheduler(); + ~Scheduler(); - // returns DagRun ID - std::future - scheduleDAG(std::string runName - , std::vector tasks - , std::unordered_map parameters - , DAG dag = {} // Allows for loading of an existing DAG - ); + // returns DagRun ID + std::future + scheduleDAG(std::string runName, std::vector tasks, + std::unordered_map parameters, + DAG dag = {} // Allows for loading of an existing DAG + ); - // get the current status of a DAG - DAGState dagRunStatus(std::string runName); + // get the current DAG + DAG dagRunState(); - // get the current DAG - DAG dagRunState(); - - // Complete running DAGs and shutdown - void drain(); + // Complete running DAGs and shutdown + void drain(); private: + void runDAG(const std::string &name, DAGRun &dagRun); - struct DAGRun { - std::vector tasks; - std::unordered_map parameters; - DAG dag; - std::vector taskRuns; - std::mutex taskGuard_; - }; + std::vector runTask(const Task &task); - void runDAG(const std::string & name, DAGRun & dagRun); - std::vector runTask(const Task & task); - - std::unordered_map runs_; - std::vector> futs_; - Executor & executor_; - ThreadPool schedulers_; - ThreadPool executors_; - std::unordered_map> jobs; - std::mutex mtx_; - std::condition_variable cv_; - }; + std::unordered_map runs_; + std::vector> futs_; + Executor &executor_; + ThreadPool schedulers_; + ThreadPool executors_; + std::unordered_map> jobs; + std::mutex mtx_; + std::condition_variable cv_; + }; } diff --git a/daggy/include/daggy/Server.hpp b/daggy/include/daggy/Server.hpp index 2457720..cf81fb6 100644 --- a/daggy/include/daggy/Server.hpp +++ b/daggy/include/daggy/Server.hpp @@ -7,44 +7,45 @@ // #include namespace daggy { - class Server { + class Server { public: - Server(Pistache::Address addr) - : endpoint_(addr) - , desc_("Daggy API", "0.1") - {} + Server(Pistache::Address addr) + : endpoint_(addr), desc_("Daggy API", "0.1") {} - void init(int threads = 1); + void init(int threads = 1); - void start(); + void start(); private: - void createDescription(); + void createDescription(); - // - // DAG Definition handlers - // + // + // DAG Definition handlers + // - void listDAGs(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); - void upsertDAG(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); - void deleteDAG(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); - void getDAG(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void listDAGs(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - // - // DAG Runs - // + void upsertDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - void runDAG(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void deleteDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - // List - void getDAGRuns(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void getDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - // Get status of specific run - void getDAGRun(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + // + // DAG Runs + // - Pistache::Http::Endpoint endpoint_; - Pistache::Rest::Description desc_; - Pistache::Rest::Router router_; + void runDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); - }; + // List + void getDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); + + // Get status of specific run + void getDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); + + Pistache::Http::Endpoint endpoint_; + Pistache::Rest::Description desc_; + Pistache::Rest::Router router_; + + }; } diff --git a/daggy/include/daggy/Task.hpp b/daggy/include/daggy/Task.hpp index bf26470..098fbbd 100644 --- a/daggy/include/daggy/Task.hpp +++ b/daggy/include/daggy/Task.hpp @@ -5,11 +5,11 @@ #include namespace daggy { - struct Task { - std::string name; - std::vector command; - uint8_t max_retries; - uint32_t retry_interval_seconds; // Time to wait between retries - std::vector children; - }; + struct Task { + std::string name; + std::vector command; + uint8_t max_retries; + uint32_t retry_interval_seconds; // Time to wait between retries + std::vector children; + }; } diff --git a/daggy/include/daggy/ThreadPool.hpp b/daggy/include/daggy/ThreadPool.hpp index 389d6db..887441b 100644 --- a/daggy/include/daggy/ThreadPool.hpp +++ b/daggy/include/daggy/ThreadPool.hpp @@ -14,155 +14,152 @@ using namespace std::chrono_literals; namespace daggy { - /* - A Task Queue is a collection of async tasks to be executed by the - thread pool. Using individual task queues allows for a rough QoS - when a single thread may be submitting batches of requests -- - one producer won't starve out another, but all tasks will be run - as quickly as possible. - */ - class TaskQueue { + /* + A Task Queue is a collection of async tasks to be executed by the + thread pool. Using individual task queues allows for a rough QoS + when a single thread may be submitting batches of requests -- + one producer won't starve out another, but all tasks will be run + as quickly as possible. + */ + class TaskQueue { public: - template - decltype(auto) addTask(F&& f, Args&&... args) { - // using return_type = std::invoke_result::type; - using return_type = std::invoke_result_t; + template + decltype(auto) addTask(F &&f, Args &&... args) { + // using return_type = std::invoke_result::type; + using return_type = std::invoke_result_t; - std::packaged_task task( - std::bind(std::forward(f), std::forward(args)...) - ); + std::packaged_task task( + std::bind(std::forward(f), std::forward(args)...) + ); - std::future res = task.get_future(); - { - std::lock_guard guard(mtx_); - tasks_.emplace(std::move(task)); - } - return res; + std::future res = task.get_future(); + { + std::lock_guard guard(mtx_); + tasks_.emplace(std::move(task)); + } + return res; } - std::packaged_task pop() { - std::lock_guard guard(mtx_); - auto task = std::move(tasks_.front()); - tasks_.pop(); - return task; - } + std::packaged_task pop() { + std::lock_guard guard(mtx_); + auto task = std::move(tasks_.front()); + tasks_.pop(); + return task; + } - size_t size() { - std::lock_guard guard(mtx_); - return tasks_.size(); - } + size_t size() { + std::lock_guard guard(mtx_); + return tasks_.size(); + } - bool empty() { - std::lock_guard guard(mtx_); - return tasks_.empty(); - } + bool empty() { + std::lock_guard guard(mtx_); + return tasks_.empty(); + } private: - std::queue< std::packaged_task > tasks_; - std::mutex mtx_; - }; + std::queue > tasks_; + std::mutex mtx_; + }; - class ThreadPool { + class ThreadPool { public: - explicit ThreadPool(size_t nWorkers) - : - tqit_(taskQueues_.begin()) - , stop_(false) - , drain_(false) - { - resize(nWorkers); - } - - ~ThreadPool() { shutdown(); } - - void shutdown() { - stop_ = true; - cv_.notify_all(); - for (std::thread& worker : workers_) { - if (worker.joinable()) - worker.join(); + explicit ThreadPool(size_t nWorkers) + : + tqit_(taskQueues_.begin()), stop_(false), drain_(false) { + resize(nWorkers); } - } - void drain() { - drain_ = true; - while (true) { - { - std::lock_guard guard(mtx_); - if (taskQueues_.empty()) break; - } - std::this_thread::sleep_for(250ms); - } - } + ~ThreadPool() { shutdown(); } - void restart() { - drain_ = false; - } - - void resize(size_t nWorkers) { - shutdown(); - workers_.clear(); - stop_ = false; - - for(size_t i = 0;i< nWorkers;++i) - workers_.emplace_back( [&] { - while (true) { - std::packaged_task task; - { - std::unique_lock lock(mtx_); - cv_.wait(lock, [&]{ return stop_ || ! taskQueues_.empty(); }); - if(taskQueues_.empty()) { - if(stop_) return; - continue; - } - if (tqit_ == taskQueues_.end()) tqit_ = taskQueues_.begin(); - task = std::move((*tqit_)->pop()); - if ((*tqit_)->empty()) { - tqit_ = taskQueues_.erase(tqit_); - } else { - tqit_++; - } - } - task(); + void shutdown() { + stop_ = true; + cv_.notify_all(); + for (std::thread &worker : workers_) { + if (worker.joinable()) + worker.join(); } - } - ); - }; + } - template - decltype(auto) addTask(F&& f, Args&&... args) { - if (drain_) throw std::runtime_error("Unable to add task to draining pool"); - auto tq = std::make_shared(); + void drain() { + drain_ = true; + while (true) { + { + std::lock_guard guard(mtx_); + if (taskQueues_.empty()) break; + } + std::this_thread::sleep_for(250ms); + } + } - auto fut = tq->addTask(f, args...); + void restart() { + drain_ = false; + } - { + void resize(size_t nWorkers) { + shutdown(); + workers_.clear(); + stop_ = false; + + for (size_t i = 0; i < nWorkers; ++i) + workers_.emplace_back([&] { + while (true) { + std::packaged_task task; + { + std::unique_lock lock(mtx_); + cv_.wait(lock, [&] { return stop_ || !taskQueues_.empty(); }); + if (taskQueues_.empty()) { + if (stop_) return; + continue; + } + if (tqit_ == taskQueues_.end()) tqit_ = taskQueues_.begin(); + task = std::move((*tqit_)->pop()); + if ((*tqit_)->empty()) { + tqit_ = taskQueues_.erase(tqit_); + } else { + tqit_++; + } + } + task(); + } + } + ); + }; + + template + decltype(auto) addTask(F &&f, Args &&... args) { + if (drain_) throw std::runtime_error("Unable to add task to draining pool"); + auto tq = std::make_shared(); + + auto fut = tq->addTask(f, args...); + + { + std::lock_guard guard(mtx_); + taskQueues_.push_back(tq); + } + cv_.notify_one(); + return fut; + } + + void addTasks(std::shared_ptr tq) { + if (drain_) throw std::runtime_error("Unable to add task to draining pool"); std::lock_guard guard(mtx_); taskQueues_.push_back(tq); - } - cv_.notify_one(); - return fut; + cv_.notify_one(); } - void addTasks(std::shared_ptr tq) { - if (drain_) throw std::runtime_error("Unable to add task to draining pool"); - std::lock_guard guard(mtx_); - taskQueues_.push_back(tq); - cv_.notify_one(); - } - private: - // need to keep track of threads so we can join them - std::vector< std::thread > workers_; - // the task queue - std::list> taskQueues_; - std::list>::iterator tqit_; + // need to keep track of threads so we can join them + std::vector workers_; + // the task queue + std::list> taskQueues_; + std::list>::iterator tqit_; - // synchronization - std::mutex mtx_; - std::condition_variable cv_; - std::atomic stop_; - std::atomic drain_; - }; + // synchronization + std::mutex mtx_; + std::condition_variable cv_; + std::atomic stop_; + std::atomic drain_; + }; } diff --git a/daggy/include/daggy/executors/ForkingExecutor.hpp b/daggy/include/daggy/executors/ForkingExecutor.hpp index c7a1b4a..97d2016 100644 --- a/daggy/include/daggy/executors/ForkingExecutor.hpp +++ b/daggy/include/daggy/executors/ForkingExecutor.hpp @@ -4,13 +4,14 @@ #include "../Executor.hpp" namespace daggy { - namespace executor { - class ForkingExecutor : public Executor { - public: - ForkingExecutor() = default; - const std::string getName() const override { return "ForkingExecutor"; } + namespace executor { + class ForkingExecutor : public Executor { + public: + ForkingExecutor() = default; - AttemptRecord runCommand(std::vector cmd) override; - }; - } + const std::string getName() const override { return "ForkingExecutor"; } + + AttemptRecord runCommand(std::vector cmd) override; + }; + } } diff --git a/daggy/src/DAG.cpp b/daggy/src/DAG.cpp index 56f4d0e..c81d87a 100644 --- a/daggy/src/DAG.cpp +++ b/daggy/src/DAG.cpp @@ -2,78 +2,79 @@ #include namespace daggy { - size_t DAG::size() const { return vertices_.size(); } - bool DAG::empty() const { return vertices_.empty(); } + size_t DAG::size() const { return vertices_.size(); } - size_t DAG::addVertex() { - vertices_.push_back(Vertex{.state = VertexState::UNVISITED, .depCount = 0}); - return vertices_.size() - 1; - } + bool DAG::empty() const { return vertices_.empty(); } - void DAG::dropEdge(const size_t from, const size_t to) { - if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from)); - if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to)); - vertices_[from].children.extract(to); - } - - void DAG::addEdge(const size_t from, const size_t to) { - if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from)); - if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to)); - if (hasPath(to, from)) - throw std::runtime_error("Adding edge would result in a cycle"); - vertices_[from].children.insert(to); - } - - bool DAG::hasPath(const size_t from, const size_t to) const { - if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from)); - if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to)); - for (const auto & child : vertices_[from].children) { - if (child == to) return true; - if (hasPath(child, to)) return true; + size_t DAG::addVertex() { + vertices_.push_back(Vertex{.state = VertexState::UNVISITED, .depCount = 0}); + return vertices_.size() - 1; } - return false; - } - - void DAG::reset() { - // Reset the state of all vertices - for (auto & v : vertices_) { - v.state = VertexState::UNVISITED; - v.depCount = 0; + void DAG::dropEdge(const size_t from, const size_t to) { + if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from)); + if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to)); + vertices_[from].children.extract(to); } - // Calculate the upstream count - for (auto & v : vertices_) { - for (auto c : v.children) { - ++vertices_[c].depCount; - } + void DAG::addEdge(const size_t from, const size_t to) { + if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from)); + if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to)); + if (hasPath(to, from)) + throw std::runtime_error("Adding edge would result in a cycle"); + vertices_[from].children.insert(to); } - } - bool DAG::allVisited() const { - for (const auto & v : vertices_) { - if (v.state != VertexState::VISITED) return false; + bool DAG::hasPath(const size_t from, const size_t to) const { + if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from)); + if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to)); + for (const auto &child : vertices_[from].children) { + if (child == to) return true; + if (hasPath(child, to)) return true; + } + + return false; } - return true; - } - std::optional DAG::visitNext() { - for (size_t i = 0; i < vertices_.size(); ++i) { - auto & v = vertices_[i]; + void DAG::reset() { + // Reset the state of all vertices + for (auto &v : vertices_) { + v.state = VertexState::UNVISITED; + v.depCount = 0; + } - if (v.state != VertexState::UNVISITED) continue; - if (v.depCount != 0) continue; - v.state = VertexState::VISITING; - return i; + // Calculate the upstream count + for (auto &v : vertices_) { + for (auto c : v.children) { + ++vertices_[c].depCount; + } + } } - return {}; - } - void DAG::completeVisit(const size_t id) { - auto & v = vertices_[id]; - v.state = VertexState::VISITED; - for (auto c : v.children) { - --vertices_[c].depCount; + bool DAG::allVisited() const { + for (const auto &v : vertices_) { + if (v.state != VertexState::VISITED) return false; + } + return true; + } + + std::optional DAG::visitNext() { + for (size_t i = 0; i < vertices_.size(); ++i) { + auto &v = vertices_[i]; + + if (v.state != VertexState::UNVISITED) continue; + if (v.depCount != 0) continue; + v.state = VertexState::VISITING; + return i; + } + return {}; + } + + void DAG::completeVisit(const size_t id) { + auto &v = vertices_[id]; + v.state = VertexState::VISITED; + for (auto c : v.children) { + --vertices_[c].depCount; + } } - } } diff --git a/daggy/src/Scheduler.cpp b/daggy/src/Scheduler.cpp index aba1bc6..64cb442 100644 --- a/daggy/src/Scheduler.cpp +++ b/daggy/src/Scheduler.cpp @@ -3,116 +3,110 @@ using namespace std::chrono_literals; namespace daggy { - Scheduler::Scheduler(Executor & executor - , size_t executorThreads - , size_t schedulerThreads) - : executor_(executor) - , schedulers_(schedulerThreads) - , executors_(executorThreads) - { } + Scheduler::Scheduler(Executor &executor, size_t executorThreads, size_t schedulerThreads) + : executor_(executor), schedulers_(schedulerThreads), executors_(executorThreads) {} - Scheduler::~Scheduler() { - executors_.shutdown(); - schedulers_.shutdown(); - } + Scheduler::~Scheduler() { + executors_.shutdown(); + schedulers_.shutdown(); + } - std::future - Scheduler::scheduleDAG(std::string runName - , std::vector tasks - , std::unordered_map parameters - , DAG dag - ) - { - // Initialize the dag - if (dag.empty()) { - std::unordered_map tids; + std::future + Scheduler::scheduleDAG(std::string runName, std::vector tasks, + std::unordered_map parameters, DAG dag + ) { + // Initialize the dag if one wasn't provided + if (dag.empty()) { + std::unordered_map taskIDs; - // Add all the vertices - for (size_t i = 0; i < tasks.size(); ++i) { - tids[tasks[i].name] = dag.addVertex(); - } + // Add all the vertices + for (const auto &task : tasks) { + taskIDs[task.name] = dag.addVertex(); + } - // Add edges - for (size_t i = 0; i < tasks.size(); ++i) { - for (const auto & c : tasks[i].children) { - dag.addEdge(i, tids[c]); + // Add edges + for (size_t i = 0; i < tasks.size(); ++i) { + for (const auto &c : tasks[i].children) { + dag.addEdge(i, taskIDs[c]); + } + } + dag.reset(); } - } - dag.reset(); + + // Create the DAGRun + std::lock_guard guard(mtx_); + auto &dr = runs_[runName]; + + dr.tasks = tasks; + dr.parameters = std::move(parameters); + dr.dag = dag; + dr.taskRuns = std::vector{tasks.size()}; + + // return std::move(schedulers_.addTask([&]() { runDAG(runName, dr); })); + return std::move(schedulers_.addTask([&]() { runDAG(runName, dr); })); } - // Create the DAGRun - std::lock_guard guard(mtx_); - auto & dr = runs_[runName]; + void Scheduler::runDAG(const std::string &name, DAGRun &run) { + struct TaskState { + size_t tid; + std::future> fut; + bool complete; + }; - dr.tasks = tasks; - dr.parameters = parameters; - dr.dag = dag; - dr.taskRuns = std::vector{tasks.size()}; + std::vector tasks; - // return std::move(schedulers_.addTask([&]() { runDAG(runName, dr); })); - return std::move(schedulers_.addTask([&]() { runDAG(runName, dr); })); - } + while (!run.dag.allVisited()) { - void Scheduler::runDAG(const std::string & name, DAGRun & run) - { - struct Task { - size_t tid; - std::future> fut; - bool complete; - }; + // Check for any completed tasks + for (auto &task : tasks) { + if (task.complete) continue; - std::vector tasks; + if (task.fut.valid()) { + auto ars = task.fut.get(); + if (ars.back().rc == 0) { + run.dag.completeVisit(task.tid); + } + task.complete = true; + } + } - while (! run.dag.allVisited()) { + // Add all remaining tasks in a task queue to avoid dominating the thread pool + auto tq = std::make_shared(); + auto t = run.dag.visitNext(); + while (t.has_value()) { + // Schedule the task to run + TaskState tsk{.tid = t.value(), .fut = tq->addTask( + [&]() { return runTask(run.tasks[t.value()]); }), .complete = false + }; + tasks.push_back(std::move(tsk)); - // Check for any completed tasks - for (auto & task : tasks) { - if (task.complete) continue; + // + auto nt = run.dag.visitNext(); + if (not nt.has_value()) break; + t.emplace(nt.value()); + } + if (! tq->empty()) { + executors_.addTasks(tq); + } - if (task.fut.valid()) { - auto ars = task.fut.get(); - if (ars.back().rc == 0) { - run.dag.completeVisit(task.tid); - } - task.complete = true; + std::this_thread::sleep_for(250ms); } - } - - // Get the next dag to run - auto t = run.dag.visitNext(); - while (t.has_value()) { - // Schedule the task to run - Task tsk{ .tid = t.value() - , .fut = executors_.addTask([&](){return runTask(run.tasks[t.value()]);}) - , .complete = false - }; - tasks.push_back(std::move(tsk)); - - // - auto nt = run.dag.visitNext(); - if (not nt.has_value()) break; - t.emplace(nt.value()); - } - - std::this_thread::sleep_for(250ms); - } - } - - std::vector - Scheduler::runTask(const Task & task) { - std::vector attempts; - - while (attempts.size() < task.max_retries) { - attempts.push_back(executor_.runCommand(task.command)); - if (attempts.back().rc == 0) break; } - return attempts; - } + std::vector + Scheduler::runTask(const Task &task) { + std::vector attempts; - void Scheduler::drain() { - schedulers_.drain(); - } + while (attempts.size() < task.max_retries) { + attempts.push_back(executor_.runCommand(task.command)); + if (attempts.back().rc == 0) break; + } + + return attempts; + } + + void Scheduler::drain() { + schedulers_.drain(); + } } diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp index ac14976..216b793 100644 --- a/daggy/src/Server.cpp +++ b/daggy/src/Server.cpp @@ -3,55 +3,54 @@ using namespace Pistache; namespace daggy { - void Server::init(int threads) { - auto opts = Http::Endpoint::options() - .threads(threads) - ; - endpoint_.init(opts); - createDescription(); - } + void Server::init(int threads) { + auto opts = Http::Endpoint::options() + .threads(threads); + endpoint_.init(opts); + createDescription(); + } - void Server::start() { - router_.initFromDescription(desc_); + void Server::start() { + router_.initFromDescription(desc_); - endpoint_.setHandler(router_.handler()); - endpoint_.serve(); - } + endpoint_.setHandler(router_.handler()); + endpoint_.serve(); + } - void Server::createDescription() { - desc_ - .info() - .license("Apache", "http://www.apache.org/licenses/LICENSE-2.0") - ; + void Server::createDescription() { + desc_ + .info() + .license("Apache", "http://www.apache.org/licenses/LICENSE-2.0"); - auto backendErrorResponse = desc_.response(Http::Code::Internal_Server_Error, "An error occured with the backend"); + auto backendErrorResponse = desc_.response(Http::Code::Internal_Server_Error, + "An error occured with the backend"); - desc_ - .schemes(Rest::Scheme::Http) - .basePath("/v1") - .produces(MIME(Application, Json)) - .consumes(MIME(Application, Json)); + desc_ + .schemes(Rest::Scheme::Http) + .basePath("/v1") + .produces(MIME(Application, Json)) + .consumes(MIME(Application, Json)); - /* - desc_ - .route(desc_.get("/ready")) - .bind(&Generic::handleReady) - .response(Http::Code::Ok, "Response to the /ready call") - .hide(); - */ + /* + desc_ + .route(desc_.get("/ready")) + .bind(&Generic::handleReady) + .response(Http::Code::Ok, "Response to the /ready call") + .hide(); + */ - auto versionPath = desc_.path("/v1"); + auto versionPath = desc_.path("/v1"); - auto accountsPath = versionPath.path("/accounts"); + auto accountsPath = versionPath.path("/accounts"); - /* - accountsPath - .route(desc_.get("/all")) - .bind(&BankerService::retrieveAllAccounts, this) - .produces(MIME(Application, Json), MIME(Application, Xml)) - .response(Http::Code::Ok, "The list of all account"); - */ + /* + accountsPath + .route(desc_.get("/all")) + .bind(&BankerService::retrieveAllAccounts, this) + .produces(MIME(Application, Json), MIME(Application, Xml)) + .response(Http::Code::Ok, "The list of all account"); + */ - } + } } diff --git a/daggy/src/executors/ForkingExecutor.cpp b/daggy/src/executors/ForkingExecutor.cpp index 8ff3b96..5c4b50b 100644 --- a/daggy/src/executors/ForkingExecutor.cpp +++ b/daggy/src/executors/ForkingExecutor.cpp @@ -10,79 +10,80 @@ using namespace daggy::executor; -std::string slurp(int fd) { - std::string result; +std::string slurp(int fd) { + std::string result; - const ssize_t BUFFER_SIZE = 4096; - char buffer[BUFFER_SIZE]; + const ssize_t BUFFER_SIZE = 4096; + char buffer[BUFFER_SIZE]; - struct pollfd pfd{ .fd = fd, .events = POLLIN, .revents = 0 }; - poll(&pfd, 1, 1); - - while (pfd.revents & POLLIN) { - ssize_t bytes = read(fd, buffer, BUFFER_SIZE); - if (bytes == 0) { - break; - } else { - result.append(buffer, bytes); - } - pfd.revents = 0; + struct pollfd pfd{.fd = fd, .events = POLLIN, .revents = 0}; poll(&pfd, 1, 1); - } - return result; + while (pfd.revents & POLLIN) { + ssize_t bytes = read(fd, buffer, BUFFER_SIZE); + if (bytes == 0) { + break; + } else { + result.append(buffer, bytes); + } + pfd.revents = 0; + poll(&pfd, 1, 1); + } + + return result; } daggy::AttemptRecord - ForkingExecutor::runCommand(std::vector cmd) -{ - AttemptRecord rec; +ForkingExecutor::runCommand(std::vector cmd) { + AttemptRecord rec; - rec.startTime = Clock::now(); + rec.startTime = Clock::now(); - // Need to convert the strings - std::vector argv; - for (const auto & s : cmd) { - argv.push_back(const_cast(s.c_str())); - } - argv.push_back(nullptr); + // Need to convert the strings + std::vector argv; + for (const auto &s : cmd) { + argv.push_back(const_cast(s.c_str())); + } + argv.push_back(nullptr); - // Create the pipe - int stdoutPipe[2]; pipe2(stdoutPipe, O_DIRECT); - int stderrPipe[2]; pipe2(stderrPipe, O_DIRECT); + // Create the pipe + int stdoutPipe[2]; + pipe2(stdoutPipe, O_DIRECT); + int stderrPipe[2]; + pipe2(stderrPipe, O_DIRECT); + + pid_t child = fork(); + if (child < 0) { + throw std::runtime_error("Unable to fork child"); + } else if (child == 0) { // child + while ((dup2(stdoutPipe[1], STDOUT_FILENO) == -1) && (errno == EINTR)) {} + while ((dup2(stderrPipe[1], STDERR_FILENO) == -1) && (errno == EINTR)) {} + close(stdoutPipe[0]); + close(stderrPipe[0]); + execvp(argv[0], argv.data()); + exit(-1); + } + + std::atomic running = true; + std::thread stdoutReader([&]() { while (running) rec.output.append(slurp(stdoutPipe[0])); }); + std::thread stderrReader([&]() { while (running) rec.error.append(slurp(stderrPipe[0])); }); + + int rc = 0; + waitpid(child, &rc, 0); + running = false; + + rec.stopTime = Clock::now(); + if (WIFEXITED(rc)) { + rec.rc = WEXITSTATUS(rc); + } else { + rec.rc = -1; + } + + stdoutReader.join(); + stderrReader.join(); - pid_t child = fork(); - if (child < 0) { - throw std::runtime_error("Unable to fork child"); - } else if (child == 0) { // child - while ((dup2(stdoutPipe[1], STDOUT_FILENO) == -1) && (errno == EINTR)) {} - while ((dup2(stderrPipe[1], STDERR_FILENO) == -1) && (errno == EINTR)) {} close(stdoutPipe[0]); close(stderrPipe[0]); - execvp(argv[0], argv.data()); - exit(-1); - } - std::atomic running = true; - std::thread stdoutReader([&]() { while(running) rec.output.append(slurp(stdoutPipe[0])); }); - std::thread stderrReader([&]() { while(running) rec.error.append(slurp(stderrPipe[0])); }); - - int rc = 0; - waitpid(child, &rc, 0); - running = false; - - rec.stopTime = Clock::now(); - if (WIFEXITED(rc)) { - rec.rc = WEXITSTATUS(rc); - } else { - rec.rc = -1; - } - - stdoutReader.join(); - stderrReader.join(); - - close(stdoutPipe[0]); - close(stderrPipe[0]); - - return rec; + return rec; } diff --git a/tests/unit_scheduler.cpp b/tests/unit_scheduler.cpp index c2aa450..5e5a9f7 100644 --- a/tests/unit_scheduler.cpp +++ b/tests/unit_scheduler.cpp @@ -17,7 +17,9 @@ TEST_CASE("Basic Scheduler Execution", "[scheduler]") { }; SECTION("Simple Run") { - auto fut = sched.scheduleDAG("Simple", tasks, {}); - fut.get(); + auto fut_a = sched.scheduleDAG("Simple 1", tasks, {}); + auto fut_b = sched.scheduleDAG("Simple 2", tasks, {}); + fut_a.get(); + fut_b.get(); } -} +} \ No newline at end of file