From d871572d5e780d8134726219aefe9e5c416d9665 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Wed, 16 Jun 2021 10:48:31 -0300 Subject: [PATCH] Checkpointing work on first-pass implementation --- daggy/include/daggy/ThreadPool.hpp | 14 +++++++++- daggy/src/ThreadPool.cpp | 44 ++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 daggy/src/ThreadPool.cpp diff --git a/daggy/include/daggy/ThreadPool.hpp b/daggy/include/daggy/ThreadPool.hpp index ad8b1d7..420a1ae 100644 --- a/daggy/include/daggy/ThreadPool.hpp +++ b/daggy/include/daggy/ThreadPool.hpp @@ -5,15 +5,27 @@ #include #include #include +#include +#include namespace daggy { + using AsyncTask = std::function; class ThreadPool { public: ThreadPool(size_t nWorkers); + ~ThreadPool(); + + ThreadPool(const ThreadPool & other) = delete; + ThreadPool(ThreadPool & other) = delete; + + void shutdown(); + + std::future addTask(AsyncTask fn); private: std::atomic shutdown_; std::mutex guard_; std::condition_variable cv_; - std::vector workers; + std::vector workers_; + std::deque taskQueue_; }; } diff --git a/daggy/src/ThreadPool.cpp b/daggy/src/ThreadPool.cpp new file mode 100644 index 0000000..3e4963a --- /dev/null +++ b/daggy/src/ThreadPool.cpp @@ -0,0 +1,44 @@ +#include + +using namespace daggy; + +ThreadPool::ThreadPool(size_t nWorkers) { + for (size_t i = 0; i < nWorkers; ++i) { + workers_.emplace_back([&]() { + while (true) { + AsyncTask tsk; + + { + std::unique_lock lk(guard_); + cv_.wait(lk, []{ return true; }); + if (shutdown_) return; + tsk = taskQueue_.front(); + taskQueue_.pop_front(); + } + + tsk(); + } + }); + } +} + +ThreadPool::~ThreadPool() { + shutdown(); +} + +void ThreadPool::shutdown() { + shutdown_ = true; + cv_.notify_all(); + + for (auto & w : workers_) { + w.join(); + } +} + +std::future ThreadPool::addTask(std::function fn) { + { + std::unique_lock lk(guard_); + taskQueue_.push_back(fn); + } + cv_.notify_one(); +}