* Formatting code with clang-tidy

* Roughing in more metastore work
This commit is contained in:
Ian Roddis
2021-07-22 12:57:51 -03:00
parent 987718334f
commit 0349a5109b
16 changed files with 561 additions and 599 deletions

View File

@@ -0,0 +1,17 @@
#pragma once
#include <chrono>
#include <string>
namespace daggy {
    // Clock used to timestamp task attempts.
    using Clock = std::chrono::system_clock;

    // Result of one attempt at running a task's command.
    //
    // Every member has a well-defined value after default construction:
    // the time points are the clock's epoch, the strings are empty and
    // rc is 0 (the same value that value-initialization `AttemptRecord{}`
    // already produced), so a record can be created first and filled in
    // afterwards without ever exposing an indeterminate rc.
    struct AttemptRecord {
        std::chrono::time_point<Clock> startTime; // Taken from Clock when the attempt starts
        std::chrono::time_point<Clock> stopTime;  // Taken from Clock when the attempt stops
        int rc{0};                                // RC from the task
        std::string metaLog;                      // Logs from the executor
        std::string output;                       // stdout from command
        std::string error;                        // stderr from command
    };
}

View File

@@ -16,45 +16,53 @@
namespace daggy { namespace daggy {
enum class VertexState : uint32_t { enum class VertexState : uint32_t {
UNVISITED = 0, UNVISITED = 0,
VISITING, VISITING,
VISITED VISITED
}; };
struct Vertex { struct Vertex {
VertexState state; VertexState state;
uint32_t depCount; uint32_t depCount;
std::unordered_set<size_t> children; std::unordered_set<size_t> children;
}; };
using Edge = std::pair<size_t,size_t>; using Edge = std::pair<size_t, size_t>;
class DAG { class DAG {
public: public:
// Vertices // Vertices
size_t addVertex(); size_t addVertex();
const std::vector<Vertex> & getVertices();
// Edges const std::vector<Vertex> &getVertices();
void addEdge(const size_t src, const size_t dst);
void dropEdge(const size_t src, const size_t dst);
bool hasPath(const size_t from, const size_t to) const;
const std::vector<Edge> & getEdges();
// Attributes // Edges
size_t size() const; void addEdge(const size_t src, const size_t dst);
bool empty() const;
// Traversal void dropEdge(const size_t src, const size_t dst);
void reset();
VertexState getVertexState(const size_t id) const;
bool allVisited() const;
std::optional<const size_t> visitNext(); bool hasPath(const size_t from, const size_t to) const;
void completeVisit(const size_t id);
const std::vector<Edge> &getEdges();
// Attributes
size_t size() const;
bool empty() const;
// Traversal
void reset();
VertexState getVertexState(const size_t id) const;
bool allVisited() const;
std::optional<const size_t> visitNext();
void completeVisit(const size_t id);
private: private:
std::vector<Vertex> vertices_; std::vector<Vertex> vertices_;
}; };
} }

View File

@@ -1,70 +0,0 @@
// Number of vertices currently stored in the graph.
size_t DAG::size() const {
    const size_t vertexCount = vertices_.size();
    return vertexCount;
}
// True when the graph holds no vertices at all.
bool DAG::empty() const {
    return vertices_.size() == 0;
}
// Append a new, unvisited vertex with no dependencies and no children.
//
// Returns the id of the new vertex, i.e. its index into vertices_. The id
// is captured before the push so that it refers to the vertex just added
// rather than to one past the end of the container.
size_t DAG::addVertex() {
    const size_t id = vertices_.size();
    vertices_.push_back(Vertex{.state = VertexState::UNVISITED, .depCount = 0});
    return id;
}
// Remove the edge from -> to. Removing an edge that does not exist is a
// no-op.
//
// Throws std::runtime_error if either id does not name a vertex.
void DAG::dropEdge(const size_t from, const size_t to) {
    if (from >= vertices_.size() || to >= vertices_.size())
        throw std::runtime_error("No such vertex");
    // erase() drops the key directly; extract() would build a node handle
    // only for it to be discarded.
    vertices_[from].children.erase(to);
}
// Add the edge from -> to.
//
// Throws std::runtime_error if either id does not name a vertex, or if the
// edge would introduce a cycle (a self-loop included).
void DAG::addEdge(const size_t from, const size_t to) {
    if (from >= vertices_.size() || to >= vertices_.size())
        throw std::runtime_error("No such vertex");
    // hasPath() only follows edges that already exist, so on its own it
    // cannot tell that from -> from would be a cycle; reject it explicitly.
    if (from == to || hasPath(to, from))
        throw std::runtime_error("Adding edge would result in a cycle");
    vertices_[from].children.insert(to);
}
// Report whether `to` can be reached from `from` by following one or more
// edges. A vertex therefore only has a path to itself if it lies on a cycle.
//
// Throws std::runtime_error if either id does not name a vertex.
bool DAG::hasPath(const size_t from, const size_t to) const {
    if (from >= vertices_.size() || to >= vertices_.size())
        throw std::runtime_error("No such vertex");

    // Iterative depth-first search. Each vertex is expanded at most once,
    // so shared descendants are not re-explored and the walk terminates
    // even if the graph happens to contain a cycle.
    std::vector<char> seen(vertices_.size(), 0);
    std::vector<size_t> pending;
    pending.push_back(from);

    while (!pending.empty()) {
        const size_t current = pending.back();
        pending.pop_back();
        for (const size_t child : vertices_[current].children) {
            if (child == to) return true;
            if (!seen[child]) {
                seen[child] = 1;
                pending.push_back(child);
            }
        }
    }
    return false;
}
// Put the graph back into its pre-traversal state: every vertex becomes
// UNVISITED and its depCount is recomputed as the number of edges that
// point at it.
void DAG::reset() {
    // Pass 1: wipe the traversal bookkeeping on every vertex.
    for (auto &vertex : vertices_) {
        vertex.depCount = 0;
        vertex.state = VertexState::UNVISITED;
    }

    // Pass 2: each outgoing edge contributes one dependency to its target.
    for (size_t parent = 0; parent < vertices_.size(); ++parent) {
        for (const auto child : vertices_[parent].children) {
            vertices_[child].depCount += 1;
        }
    }
}
// True when every vertex is in the VISITED state (trivially true for an
// empty graph).
bool DAG::allVisited() const {
    bool everyVertexDone = true;
    // Stop scanning as soon as one vertex that is not VISITED is found.
    for (auto it = vertices_.begin(); everyVertexDone && it != vertices_.end(); ++it) {
        everyVertexDone = (it->state == VertexState::VISITED);
    }
    return everyVertexDone;
}
// Find the lowest-numbered vertex that is UNVISITED and has a depCount of
// zero, mark it VISITING and return its id. Returns an empty optional when
// no vertex qualifies.
std::optional<const size_t > DAG::visitNext() {
    const size_t vertexCount = vertices_.size();
    size_t idx = 0;
    while (idx < vertexCount) {
        auto &candidate = vertices_[idx];
        const bool ready = candidate.state == VertexState::UNVISITED
                           && candidate.depCount == 0;
        if (ready) {
            candidate.state = VertexState::VISITING;
            return idx;
        }
        ++idx;
    }
    return std::nullopt;
}
// Mark vertex `id` as VISITED and take one dependency off each of its
// children. `id` is used to index vertices_ directly, without a range check.
void DAG::completeVisit(const size_t id) {
    auto &finished = vertices_[id];
    finished.state = VertexState::VISITED;

    const auto &dependents = finished.children;
    for (auto it = dependents.begin(); it != dependents.end(); ++it) {
        vertices_[*it].depCount -= 1;
    }
}

View File

@@ -0,0 +1,21 @@
#pragma once
#include <string>
#include <unordered_map>
#include <variant>
#include "DAG.hpp"
#include "Task.hpp"
#include "AttemptRecord.hpp"
namespace daggy {
// A parameter value is either a single string or a list of strings.
using ParameterValue = std::variant<std::string, std::vector<std::string>>;
// The attempt records collected for one task.
using TaskRun = std::vector<AttemptRecord>;
// Bundles the data that belongs to one run of a DAG: the task definitions,
// the parameters supplied for the run, the dependency graph and the
// attempt history of each task.
struct DAGRun {
// Task definitions for this run.
// NOTE(review): the scheduler indexes this vector with vertex ids taken
// from `dag`, so positions here presumably match vertex ids -- confirm.
std::vector<Task> tasks;
// Parameters for the run, keyed by name.
std::unordered_map<std::string, ParameterValue> parameters;
// Dependency graph; it also carries the traversal state of the run.
DAG dag;
// Attempt history per task.
// NOTE(review): the scheduler sizes this to tasks.size(), so entries
// presumably line up with `tasks` by index -- confirm.
std::vector<TaskRun> taskRuns;
};
}

View File

@@ -7,7 +7,7 @@
#include <vector> #include <vector>
#include "Task.hpp" #include "Task.hpp"
#include "AttemptRecord.hpp"
/* /*
Executors run Tasks, returning a future with the results. Executors run Tasks, returning a future with the results.
@@ -15,23 +15,13 @@
*/ */
namespace daggy { namespace daggy {
using Clock = std::chrono::system_clock; class Executor {
struct AttemptRecord {
std::chrono::time_point<Clock> startTime;
std::chrono::time_point<Clock> stopTime;
int rc; // RC from the task
std::string metaLog; // Logs from the executor
std::string output; // stdout from command
std::string error; // stderr from command
};
class Executor {
public: public:
Executor() = default; Executor() = default;
virtual const std::string getName() const = 0;
// This will block if the executor is full virtual const std::string getName() const = 0;
virtual AttemptRecord runCommand(std::vector<std::string> cmd) = 0;
}; // This will block if the executor is full
virtual AttemptRecord runCommand(std::vector<std::string> cmd) = 0;
};
} }

View File

@@ -2,6 +2,8 @@
#include <string> #include <string>
#include "DAGRun.hpp"
/* /*
MetaStore represents the interface to store all the state information MetaStore represents the interface to store all the state information
for daggy to run. Abstracted in case other back-end solutions need to for daggy to run. Abstracted in case other back-end solutions need to
@@ -9,17 +11,31 @@
*/ */
namespace daggy { namespace daggy {
using DAGDefID = int16_t; // future proofing using DAGDefID = int16_t;
using DAGRunID = size_t;
// This struct will contain transitions for class MetaStore {
struct DAGRunEvent { }; // Basic storage + retrieval of DAG Definitions
virtual DAGDefID storeDAGDefinition(std::string name, std::string definition) = 0;
class MetaStore { virtual DAGDefID getCurrentDAGVersion(std::string name) = 0;
// Basic storage + retrieval of DAG Definitions
virtual void storeDAGDefinition(std::string name, std::string definition) = 0;
virtual DAGDefID getCurrentDAGVersion(std::string name) = 0;
virtual std::string getDAGDefinition(std::string name, DAGDefID version = -1) = 0;
// DAG Run State virtual std::string getDAGDefinition(std::string name, DAGDefID version = -1) = 0;
};
// DAG Run State
/*
* startDAGRun // DAG starts up, returns a DAGID for future updates
* updateDAGRun // DAG State transitions
* updateTaskState // Task state updates
*/
virtual DAGRunID startDAGRun(std::string dagName, DAGDefID version, DAGRun dagRun
) = 0;
virtual void updateTask(DAGRunID rid, std::string taskName, VertexState state) = 0;
virtual void updateDAGRun(DAGRunID rid, DAGState state) = 0;
// Retrievals
};
} }

View File

@@ -7,67 +7,51 @@
#include "DAG.hpp" #include "DAG.hpp"
#include "Executor.hpp" #include "Executor.hpp"
#include "DAGRun.hpp"
#include "ThreadPool.hpp" #include "ThreadPool.hpp"
namespace daggy { namespace daggy {
using ParameterValue = std::variant<std::string, std::vector<std::string>>; enum class DAGState : uint32_t {
using TaskRun = std::vector<AttemptRecord>;
class Scheduler {
public:
enum class DAGState : uint32_t {
UNKNOWN = 0, UNKNOWN = 0,
QUEUED, QUEUED,
RUNNING, RUNNING,
ERRORED, ERRORED,
COMPLETE COMPLETE
}; };
class Scheduler {
public: public:
Scheduler( public:
Executor & executor Scheduler(
, size_t executorThreads = 30 Executor &executor, size_t executorThreads = 30, size_t schedulerThreads = 10);
, size_t schedulerThreads = 10);
~Scheduler(); ~Scheduler();
// returns DagRun ID // returns DagRun ID
std::future<void> std::future<void>
scheduleDAG(std::string runName scheduleDAG(std::string runName, std::vector<Task> tasks,
, std::vector<Task> tasks std::unordered_map<std::string, ParameterValue> parameters,
, std::unordered_map<std::string, ParameterValue> parameters DAG dag = {} // Allows for loading of an existing DAG
, DAG dag = {} // Allows for loading of an existing DAG );
);
// get the current status of a DAG // get the current DAG
DAGState dagRunStatus(std::string runName); DAG dagRunState();
// get the current DAG // Complete running DAGs and shutdown
DAG dagRunState(); void drain();
// Complete running DAGs and shutdown
void drain();
private: private:
void runDAG(const std::string &name, DAGRun &dagRun);
struct DAGRun { std::vector<AttemptRecord> runTask(const Task &task);
std::vector<Task> tasks;
std::unordered_map<std::string, ParameterValue> parameters;
DAG dag;
std::vector<TaskRun> taskRuns;
std::mutex taskGuard_;
};
void runDAG(const std::string & name, DAGRun & dagRun); std::unordered_map<std::string, DAGRun> runs_;
std::vector<AttemptRecord> runTask(const Task & task); std::vector<std::future<void>> futs_;
Executor &executor_;
std::unordered_map<std::string, DAGRun> runs_; ThreadPool schedulers_;
std::vector<std::future<void>> futs_; ThreadPool executors_;
Executor & executor_; std::unordered_map<std::string, std::future<void>> jobs;
ThreadPool schedulers_; std::mutex mtx_;
ThreadPool executors_; std::condition_variable cv_;
std::unordered_map<std::string, std::future<void>> jobs; };
std::mutex mtx_;
std::condition_variable cv_;
};
} }

View File

@@ -7,44 +7,45 @@
// #include <pistache/thirdparty/serializer/rapidjson.h> // #include <pistache/thirdparty/serializer/rapidjson.h>
namespace daggy { namespace daggy {
class Server { class Server {
public: public:
Server(Pistache::Address addr) Server(Pistache::Address addr)
: endpoint_(addr) : endpoint_(addr), desc_("Daggy API", "0.1") {}
, desc_("Daggy API", "0.1")
{}
void init(int threads = 1); void init(int threads = 1);
void start(); void start();
private: private:
void createDescription(); void createDescription();
// //
// DAG Definition handlers // DAG Definition handlers
// //
void listDAGs(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); void listDAGs(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
void upsertDAG(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
void deleteDAG(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
void getDAG(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
// void upsertDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
// DAG Runs
//
void runDAG(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); void deleteDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
// List void getDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
void getDAGRuns(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
// Get status of specific run //
void getDAGRun(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); // DAG Runs
//
Pistache::Http::Endpoint endpoint_; void runDAG(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
Pistache::Rest::Description desc_;
Pistache::Rest::Router router_;
}; // List
void getDAGRuns(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
// Get status of specific run
void getDAGRun(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
Pistache::Http::Endpoint endpoint_;
Pistache::Rest::Description desc_;
Pistache::Rest::Router router_;
};
} }

View File

@@ -5,11 +5,11 @@
#include <vector> #include <vector>
namespace daggy { namespace daggy {
struct Task { struct Task {
std::string name; std::string name;
std::vector<std::string> command; std::vector<std::string> command;
uint8_t max_retries; uint8_t max_retries;
uint32_t retry_interval_seconds; // Time to wait between retries uint32_t retry_interval_seconds; // Time to wait between retries
std::vector<std::string> children; std::vector<std::string> children;
}; };
} }

View File

@@ -14,155 +14,152 @@ using namespace std::chrono_literals;
namespace daggy { namespace daggy {
/* /*
A Task Queue is a collection of async tasks to be executed by the A Task Queue is a collection of async tasks to be executed by the
thread pool. Using individual task queues allows for a rough QoS thread pool. Using individual task queues allows for a rough QoS
when a single thread may be submitting batches of requests -- when a single thread may be submitting batches of requests --
one producer won't starve out another, but all tasks will be run one producer won't starve out another, but all tasks will be run
as quickly as possible. as quickly as possible.
*/ */
class TaskQueue { class TaskQueue {
public: public:
template<class F, class... Args> template<class F, class... Args>
decltype(auto) addTask(F&& f, Args&&... args) { decltype(auto) addTask(F &&f, Args &&... args) {
// using return_type = std::invoke_result<F, Args...>::type; // using return_type = std::invoke_result<F, Args...>::type;
using return_type = std::invoke_result_t<F, Args...>; using return_type = std::invoke_result_t<F, Args...>;
std::packaged_task<return_type()> task( std::packaged_task<return_type()> task(
std::bind(std::forward<F>(f), std::forward<Args>(args)...) std::bind(std::forward<F>(f), std::forward<Args>(args)...)
); );
std::future<return_type> res = task.get_future(); std::future<return_type> res = task.get_future();
{ {
std::lock_guard<std::mutex> guard(mtx_); std::lock_guard<std::mutex> guard(mtx_);
tasks_.emplace(std::move(task)); tasks_.emplace(std::move(task));
} }
return res; return res;
} }
std::packaged_task<void()> pop() { std::packaged_task<void()> pop() {
std::lock_guard<std::mutex> guard(mtx_); std::lock_guard<std::mutex> guard(mtx_);
auto task = std::move(tasks_.front()); auto task = std::move(tasks_.front());
tasks_.pop(); tasks_.pop();
return task; return task;
} }
size_t size() { size_t size() {
std::lock_guard<std::mutex> guard(mtx_); std::lock_guard<std::mutex> guard(mtx_);
return tasks_.size(); return tasks_.size();
} }
bool empty() { bool empty() {
std::lock_guard<std::mutex> guard(mtx_); std::lock_guard<std::mutex> guard(mtx_);
return tasks_.empty(); return tasks_.empty();
} }
private: private:
std::queue< std::packaged_task<void()> > tasks_; std::queue<std::packaged_task<void()> > tasks_;
std::mutex mtx_; std::mutex mtx_;
}; };
class ThreadPool { class ThreadPool {
public: public:
explicit ThreadPool(size_t nWorkers) explicit ThreadPool(size_t nWorkers)
: :
tqit_(taskQueues_.begin()) tqit_(taskQueues_.begin()), stop_(false), drain_(false) {
, stop_(false) resize(nWorkers);
, drain_(false)
{
resize(nWorkers);
}
~ThreadPool() { shutdown(); }
void shutdown() {
stop_ = true;
cv_.notify_all();
for (std::thread& worker : workers_) {
if (worker.joinable())
worker.join();
} }
}
void drain() { ~ThreadPool() { shutdown(); }
drain_ = true;
while (true) {
{
std::lock_guard<std::mutex> guard(mtx_);
if (taskQueues_.empty()) break;
}
std::this_thread::sleep_for(250ms);
}
}
void restart() { void shutdown() {
drain_ = false; stop_ = true;
} cv_.notify_all();
for (std::thread &worker : workers_) {
void resize(size_t nWorkers) { if (worker.joinable())
shutdown(); worker.join();
workers_.clear();
stop_ = false;
for(size_t i = 0;i< nWorkers;++i)
workers_.emplace_back( [&] {
while (true) {
std::packaged_task<void()> task;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [&]{ return stop_ || ! taskQueues_.empty(); });
if(taskQueues_.empty()) {
if(stop_) return;
continue;
}
if (tqit_ == taskQueues_.end()) tqit_ = taskQueues_.begin();
task = std::move((*tqit_)->pop());
if ((*tqit_)->empty()) {
tqit_ = taskQueues_.erase(tqit_);
} else {
tqit_++;
}
}
task();
} }
} }
);
};
template<class F, class... Args> void drain() {
decltype(auto) addTask(F&& f, Args&&... args) { drain_ = true;
if (drain_) throw std::runtime_error("Unable to add task to draining pool"); while (true) {
auto tq = std::make_shared<TaskQueue>(); {
std::lock_guard<std::mutex> guard(mtx_);
if (taskQueues_.empty()) break;
}
std::this_thread::sleep_for(250ms);
}
}
auto fut = tq->addTask(f, args...); void restart() {
drain_ = false;
}
{ void resize(size_t nWorkers) {
shutdown();
workers_.clear();
stop_ = false;
for (size_t i = 0; i < nWorkers; ++i)
workers_.emplace_back([&] {
while (true) {
std::packaged_task<void()> task;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [&] { return stop_ || !taskQueues_.empty(); });
if (taskQueues_.empty()) {
if (stop_) return;
continue;
}
if (tqit_ == taskQueues_.end()) tqit_ = taskQueues_.begin();
task = std::move((*tqit_)->pop());
if ((*tqit_)->empty()) {
tqit_ = taskQueues_.erase(tqit_);
} else {
tqit_++;
}
}
task();
}
}
);
};
template<class F, class... Args>
decltype(auto) addTask(F &&f, Args &&... args) {
if (drain_) throw std::runtime_error("Unable to add task to draining pool");
auto tq = std::make_shared<TaskQueue>();
auto fut = tq->addTask(f, args...);
{
std::lock_guard<std::mutex> guard(mtx_);
taskQueues_.push_back(tq);
}
cv_.notify_one();
return fut;
}
void addTasks(std::shared_ptr<TaskQueue> tq) {
if (drain_) throw std::runtime_error("Unable to add task to draining pool");
std::lock_guard<std::mutex> guard(mtx_); std::lock_guard<std::mutex> guard(mtx_);
taskQueues_.push_back(tq); taskQueues_.push_back(tq);
} cv_.notify_one();
cv_.notify_one();
return fut;
} }
void addTasks(std::shared_ptr<TaskQueue> tq) {
if (drain_) throw std::runtime_error("Unable to add task to draining pool");
std::lock_guard<std::mutex> guard(mtx_);
taskQueues_.push_back(tq);
cv_.notify_one();
}
private: private:
// need to keep track of threads so we can join them // need to keep track of threads so we can join them
std::vector< std::thread > workers_; std::vector<std::thread> workers_;
// the task queue // the task queue
std::list<std::shared_ptr<TaskQueue>> taskQueues_; std::list<std::shared_ptr<TaskQueue>> taskQueues_;
std::list<std::shared_ptr<TaskQueue>>::iterator tqit_; std::list<std::shared_ptr<TaskQueue>>::iterator tqit_;
// synchronization // synchronization
std::mutex mtx_; std::mutex mtx_;
std::condition_variable cv_; std::condition_variable cv_;
std::atomic<bool> stop_; std::atomic<bool> stop_;
std::atomic<bool> drain_; std::atomic<bool> drain_;
}; };
} }

View File

@@ -4,13 +4,14 @@
#include "../Executor.hpp" #include "../Executor.hpp"
namespace daggy { namespace daggy {
namespace executor { namespace executor {
class ForkingExecutor : public Executor { class ForkingExecutor : public Executor {
public: public:
ForkingExecutor() = default; ForkingExecutor() = default;
const std::string getName() const override { return "ForkingExecutor"; }
AttemptRecord runCommand(std::vector<std::string> cmd) override; const std::string getName() const override { return "ForkingExecutor"; }
};
} AttemptRecord runCommand(std::vector<std::string> cmd) override;
};
}
} }

View File

@@ -2,78 +2,79 @@
#include <stdexcept> #include <stdexcept>
namespace daggy { namespace daggy {
size_t DAG::size() const { return vertices_.size(); } size_t DAG::size() const { return vertices_.size(); }
bool DAG::empty() const { return vertices_.empty(); }
size_t DAG::addVertex() { bool DAG::empty() const { return vertices_.empty(); }
vertices_.push_back(Vertex{.state = VertexState::UNVISITED, .depCount = 0});
return vertices_.size() - 1;
}
void DAG::dropEdge(const size_t from, const size_t to) { size_t DAG::addVertex() {
if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from)); vertices_.push_back(Vertex{.state = VertexState::UNVISITED, .depCount = 0});
if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to)); return vertices_.size() - 1;
vertices_[from].children.extract(to);
}
void DAG::addEdge(const size_t from, const size_t to) {
if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from));
if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to));
if (hasPath(to, from))
throw std::runtime_error("Adding edge would result in a cycle");
vertices_[from].children.insert(to);
}
bool DAG::hasPath(const size_t from, const size_t to) const {
if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from));
if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to));
for (const auto & child : vertices_[from].children) {
if (child == to) return true;
if (hasPath(child, to)) return true;
} }
return false; void DAG::dropEdge(const size_t from, const size_t to) {
} if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from));
if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to));
void DAG::reset() { vertices_[from].children.extract(to);
// Reset the state of all vertices
for (auto & v : vertices_) {
v.state = VertexState::UNVISITED;
v.depCount = 0;
} }
// Calculate the upstream count void DAG::addEdge(const size_t from, const size_t to) {
for (auto & v : vertices_) { if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from));
for (auto c : v.children) { if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to));
++vertices_[c].depCount; if (hasPath(to, from))
} throw std::runtime_error("Adding edge would result in a cycle");
vertices_[from].children.insert(to);
} }
}
bool DAG::allVisited() const { bool DAG::hasPath(const size_t from, const size_t to) const {
for (const auto & v : vertices_) { if (from >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(from));
if (v.state != VertexState::VISITED) return false; if (to >= vertices_.size()) throw std::runtime_error("No such vertex " + std::to_string(to));
for (const auto &child : vertices_[from].children) {
if (child == to) return true;
if (hasPath(child, to)) return true;
}
return false;
} }
return true;
}
std::optional<const size_t> DAG::visitNext() { void DAG::reset() {
for (size_t i = 0; i < vertices_.size(); ++i) { // Reset the state of all vertices
auto & v = vertices_[i]; for (auto &v : vertices_) {
v.state = VertexState::UNVISITED;
v.depCount = 0;
}
if (v.state != VertexState::UNVISITED) continue; // Calculate the upstream count
if (v.depCount != 0) continue; for (auto &v : vertices_) {
v.state = VertexState::VISITING; for (auto c : v.children) {
return i; ++vertices_[c].depCount;
}
}
} }
return {};
}
void DAG::completeVisit(const size_t id) { bool DAG::allVisited() const {
auto & v = vertices_[id]; for (const auto &v : vertices_) {
v.state = VertexState::VISITED; if (v.state != VertexState::VISITED) return false;
for (auto c : v.children) { }
--vertices_[c].depCount; return true;
}
std::optional<const size_t> DAG::visitNext() {
for (size_t i = 0; i < vertices_.size(); ++i) {
auto &v = vertices_[i];
if (v.state != VertexState::UNVISITED) continue;
if (v.depCount != 0) continue;
v.state = VertexState::VISITING;
return i;
}
return {};
}
void DAG::completeVisit(const size_t id) {
auto &v = vertices_[id];
v.state = VertexState::VISITED;
for (auto c : v.children) {
--vertices_[c].depCount;
}
} }
}
} }

View File

@@ -3,116 +3,110 @@
using namespace std::chrono_literals; using namespace std::chrono_literals;
namespace daggy { namespace daggy {
Scheduler::Scheduler(Executor & executor Scheduler::Scheduler(Executor &executor, size_t executorThreads, size_t schedulerThreads)
, size_t executorThreads : executor_(executor), schedulers_(schedulerThreads), executors_(executorThreads) {}
, size_t schedulerThreads)
: executor_(executor)
, schedulers_(schedulerThreads)
, executors_(executorThreads)
{ }
Scheduler::~Scheduler() { Scheduler::~Scheduler() {
executors_.shutdown(); executors_.shutdown();
schedulers_.shutdown(); schedulers_.shutdown();
} }
std::future<void> std::future<void>
Scheduler::scheduleDAG(std::string runName Scheduler::scheduleDAG(std::string runName, std::vector<Task> tasks,
, std::vector<Task> tasks std::unordered_map<std::string, ParameterValue> parameters, DAG dag
, std::unordered_map<std::string, ParameterValue> parameters ) {
, DAG dag // Initialize the dag if one wasn't provided
) if (dag.empty()) {
{ std::unordered_map<std::string, size_t> taskIDs;
// Initialize the dag
if (dag.empty()) {
std::unordered_map<std::string, size_t> tids;
// Add all the vertices // Add all the vertices
for (size_t i = 0; i < tasks.size(); ++i) { for (const auto &task : tasks) {
tids[tasks[i].name] = dag.addVertex(); taskIDs[task.name] = dag.addVertex();
} }
// Add edges // Add edges
for (size_t i = 0; i < tasks.size(); ++i) { for (size_t i = 0; i < tasks.size(); ++i) {
for (const auto & c : tasks[i].children) { for (const auto &c : tasks[i].children) {
dag.addEdge(i, tids[c]); dag.addEdge(i, taskIDs[c]);
}
}
dag.reset();
} }
}
dag.reset(); // Create the DAGRun
std::lock_guard<std::mutex> guard(mtx_);
auto &dr = runs_[runName];
dr.tasks = tasks;
dr.parameters = std::move(parameters);
dr.dag = dag;
dr.taskRuns = std::vector<TaskRun>{tasks.size()};
// return std::move(schedulers_.addTask([&]() { runDAG(runName, dr); }));
return std::move(schedulers_.addTask([&]() { runDAG(runName, dr); }));
} }
// Create the DAGRun void Scheduler::runDAG(const std::string &name, DAGRun &run) {
std::lock_guard<std::mutex> guard(mtx_); struct TaskState {
auto & dr = runs_[runName]; size_t tid;
std::future<std::vector<AttemptRecord>> fut;
bool complete;
};
dr.tasks = tasks; std::vector<TaskState> tasks;
dr.parameters = parameters;
dr.dag = dag;
dr.taskRuns = std::vector<TaskRun>{tasks.size()};
// return std::move(schedulers_.addTask([&]() { runDAG(runName, dr); })); while (!run.dag.allVisited()) {
return std::move(schedulers_.addTask([&]() { runDAG(runName, dr); }));
}
void Scheduler::runDAG(const std::string & name, DAGRun & run) // Check for any completed tasks
{ for (auto &task : tasks) {
struct Task { if (task.complete) continue;
size_t tid;
std::future<std::vector<AttemptRecord>> fut;
bool complete;
};
std::vector<Task> tasks; if (task.fut.valid()) {
auto ars = task.fut.get();
if (ars.back().rc == 0) {
run.dag.completeVisit(task.tid);
}
task.complete = true;
}
}
while (! run.dag.allVisited()) { // Add all remaining tasks in a task queue to avoid dominating the thread pool
auto tq = std::make_shared<TaskQueue>();
auto t = run.dag.visitNext();
while (t.has_value()) {
// Schedule the task to run
TaskState tsk{.tid = t.value(), .fut = tq->addTask(
[&]() { return runTask(run.tasks[t.value()]); }), .complete = false
};
tasks.push_back(std::move(tsk));
// Check for any completed tasks //
for (auto & task : tasks) { auto nt = run.dag.visitNext();
if (task.complete) continue; if (not nt.has_value()) break;
t.emplace(nt.value());
}
if (! tq->empty()) {
executors_.addTasks(tq);
}
if (task.fut.valid()) { std::this_thread::sleep_for(250ms);
auto ars = task.fut.get();
if (ars.back().rc == 0) {
run.dag.completeVisit(task.tid);
}
task.complete = true;
} }
}
// Get the next dag to run
auto t = run.dag.visitNext();
while (t.has_value()) {
// Schedule the task to run
Task tsk{ .tid = t.value()
, .fut = executors_.addTask([&](){return runTask(run.tasks[t.value()]);})
, .complete = false
};
tasks.push_back(std::move(tsk));
//
auto nt = run.dag.visitNext();
if (not nt.has_value()) break;
t.emplace(nt.value());
}
std::this_thread::sleep_for(250ms);
}
}
std::vector<AttemptRecord>
Scheduler::runTask(const Task & task) {
std::vector<AttemptRecord> attempts;
while (attempts.size() < task.max_retries) {
attempts.push_back(executor_.runCommand(task.command));
if (attempts.back().rc == 0) break;
} }
return attempts; std::vector<AttemptRecord>
} Scheduler::runTask(const Task &task) {
std::vector<AttemptRecord> attempts;
void Scheduler::drain() { while (attempts.size() < task.max_retries) {
schedulers_.drain(); attempts.push_back(executor_.runCommand(task.command));
} if (attempts.back().rc == 0) break;
}
return attempts;
}
void Scheduler::drain() {
schedulers_.drain();
}
} }

View File

@@ -3,55 +3,54 @@
using namespace Pistache; using namespace Pistache;
namespace daggy { namespace daggy {
void Server::init(int threads) { void Server::init(int threads) {
auto opts = Http::Endpoint::options() auto opts = Http::Endpoint::options()
.threads(threads) .threads(threads);
; endpoint_.init(opts);
endpoint_.init(opts); createDescription();
createDescription(); }
}
void Server::start() { void Server::start() {
router_.initFromDescription(desc_); router_.initFromDescription(desc_);
endpoint_.setHandler(router_.handler()); endpoint_.setHandler(router_.handler());
endpoint_.serve(); endpoint_.serve();
} }
void Server::createDescription() { void Server::createDescription() {
desc_ desc_
.info() .info()
.license("Apache", "http://www.apache.org/licenses/LICENSE-2.0") .license("Apache", "http://www.apache.org/licenses/LICENSE-2.0");
;
auto backendErrorResponse = desc_.response(Http::Code::Internal_Server_Error, "An error occured with the backend"); auto backendErrorResponse = desc_.response(Http::Code::Internal_Server_Error,
"An error occured with the backend");
desc_ desc_
.schemes(Rest::Scheme::Http) .schemes(Rest::Scheme::Http)
.basePath("/v1") .basePath("/v1")
.produces(MIME(Application, Json)) .produces(MIME(Application, Json))
.consumes(MIME(Application, Json)); .consumes(MIME(Application, Json));
/* /*
desc_ desc_
.route(desc_.get("/ready")) .route(desc_.get("/ready"))
.bind(&Generic::handleReady) .bind(&Generic::handleReady)
.response(Http::Code::Ok, "Response to the /ready call") .response(Http::Code::Ok, "Response to the /ready call")
.hide(); .hide();
*/ */
auto versionPath = desc_.path("/v1"); auto versionPath = desc_.path("/v1");
auto accountsPath = versionPath.path("/accounts"); auto accountsPath = versionPath.path("/accounts");
/* /*
accountsPath accountsPath
.route(desc_.get("/all")) .route(desc_.get("/all"))
.bind(&BankerService::retrieveAllAccounts, this) .bind(&BankerService::retrieveAllAccounts, this)
.produces(MIME(Application, Json), MIME(Application, Xml)) .produces(MIME(Application, Json), MIME(Application, Xml))
.response(Http::Code::Ok, "The list of all account"); .response(Http::Code::Ok, "The list of all account");
*/ */
} }
} }

View File

@@ -10,79 +10,80 @@
using namespace daggy::executor; using namespace daggy::executor;
std::string slurp(int fd) { std::string slurp(int fd) {
std::string result; std::string result;
const ssize_t BUFFER_SIZE = 4096; const ssize_t BUFFER_SIZE = 4096;
char buffer[BUFFER_SIZE]; char buffer[BUFFER_SIZE];
struct pollfd pfd{ .fd = fd, .events = POLLIN, .revents = 0 }; struct pollfd pfd{.fd = fd, .events = POLLIN, .revents = 0};
poll(&pfd, 1, 1);
while (pfd.revents & POLLIN) {
ssize_t bytes = read(fd, buffer, BUFFER_SIZE);
if (bytes == 0) {
break;
} else {
result.append(buffer, bytes);
}
pfd.revents = 0;
poll(&pfd, 1, 1); poll(&pfd, 1, 1);
}
return result; while (pfd.revents & POLLIN) {
ssize_t bytes = read(fd, buffer, BUFFER_SIZE);
if (bytes == 0) {
break;
} else {
result.append(buffer, bytes);
}
pfd.revents = 0;
poll(&pfd, 1, 1);
}
return result;
} }
daggy::AttemptRecord daggy::AttemptRecord
ForkingExecutor::runCommand(std::vector<std::string> cmd) ForkingExecutor::runCommand(std::vector<std::string> cmd) {
{ AttemptRecord rec;
AttemptRecord rec;
rec.startTime = Clock::now(); rec.startTime = Clock::now();
// Need to convert the strings // Need to convert the strings
std::vector<char *> argv; std::vector<char *> argv;
for (const auto & s : cmd) { for (const auto &s : cmd) {
argv.push_back(const_cast<char *>(s.c_str())); argv.push_back(const_cast<char *>(s.c_str()));
} }
argv.push_back(nullptr); argv.push_back(nullptr);
// Create the pipe // Create the pipe
int stdoutPipe[2]; pipe2(stdoutPipe, O_DIRECT); int stdoutPipe[2];
int stderrPipe[2]; pipe2(stderrPipe, O_DIRECT); pipe2(stdoutPipe, O_DIRECT);
int stderrPipe[2];
pipe2(stderrPipe, O_DIRECT);
pid_t child = fork();
if (child < 0) {
throw std::runtime_error("Unable to fork child");
} else if (child == 0) { // child
while ((dup2(stdoutPipe[1], STDOUT_FILENO) == -1) && (errno == EINTR)) {}
while ((dup2(stderrPipe[1], STDERR_FILENO) == -1) && (errno == EINTR)) {}
close(stdoutPipe[0]);
close(stderrPipe[0]);
execvp(argv[0], argv.data());
exit(-1);
}
std::atomic<bool> running = true;
std::thread stdoutReader([&]() { while (running) rec.output.append(slurp(stdoutPipe[0])); });
std::thread stderrReader([&]() { while (running) rec.error.append(slurp(stderrPipe[0])); });
int rc = 0;
waitpid(child, &rc, 0);
running = false;
rec.stopTime = Clock::now();
if (WIFEXITED(rc)) {
rec.rc = WEXITSTATUS(rc);
} else {
rec.rc = -1;
}
stdoutReader.join();
stderrReader.join();
pid_t child = fork();
if (child < 0) {
throw std::runtime_error("Unable to fork child");
} else if (child == 0) { // child
while ((dup2(stdoutPipe[1], STDOUT_FILENO) == -1) && (errno == EINTR)) {}
while ((dup2(stderrPipe[1], STDERR_FILENO) == -1) && (errno == EINTR)) {}
close(stdoutPipe[0]); close(stdoutPipe[0]);
close(stderrPipe[0]); close(stderrPipe[0]);
execvp(argv[0], argv.data());
exit(-1);
}
std::atomic<bool> running = true; return rec;
std::thread stdoutReader([&]() { while(running) rec.output.append(slurp(stdoutPipe[0])); });
std::thread stderrReader([&]() { while(running) rec.error.append(slurp(stderrPipe[0])); });
int rc = 0;
waitpid(child, &rc, 0);
running = false;
rec.stopTime = Clock::now();
if (WIFEXITED(rc)) {
rec.rc = WEXITSTATUS(rc);
} else {
rec.rc = -1;
}
stdoutReader.join();
stderrReader.join();
close(stdoutPipe[0]);
close(stderrPipe[0]);
return rec;
} }

View File

@@ -17,7 +17,9 @@ TEST_CASE("Basic Scheduler Execution", "[scheduler]") {
}; };
SECTION("Simple Run") { SECTION("Simple Run") {
auto fut = sched.scheduleDAG("Simple", tasks, {}); auto fut_a = sched.scheduleDAG("Simple 1", tasks, {});
fut.get(); auto fut_b = sched.scheduleDAG("Simple 2", tasks, {});
fut_a.get();
fut_b.get();
} }
} }