Fixing up threadpool

This commit is contained in:
Ian Roddis
2021-06-20 10:33:35 -03:00
parent 1003e88303
commit 209ec6f380
4 changed files with 32 additions and 14 deletions

View File

@@ -22,7 +22,7 @@ namespace daggy {
std::future<void> addTask(AsyncTask fn); std::future<void> addTask(AsyncTask fn);
private: private:
using QueuedAsyncTask = std::shared_ptr<AsyncTask>; using QueuedAsyncTask = std::shared_ptr<std::packaged_task<void()>>;
std::atomic<bool> shutdown_; std::atomic<bool> shutdown_;
std::mutex guard_; std::mutex guard_;

View File

@@ -13,7 +13,9 @@ ThreadPool::ThreadPool(size_t nWorkers) {
std::unique_lock<std::mutex> lk(guard_); std::unique_lock<std::mutex> lk(guard_);
cv_.wait(lk, []{ return true; }); cv_.wait(lk, []{ return true; });
if (shutdown_) return; if (shutdown_) return;
tsk.swap(taskQueue_.front()); if (taskQueue_.empty()) continue;
tsk = taskQueue_.front();
taskQueue_.pop_front(); taskQueue_.pop_front();
} }
@@ -41,7 +43,7 @@ std::future<void> ThreadPool::addTask(std::function<void()> fn) {
std::future<void> result = task->get_future(); std::future<void> result = task->get_future();
{ {
std::unique_lock<std::mutex> lk(guard_); std::unique_lock<std::mutex> lk(guard_);
taskQueue_.emplace_back(std::move(task)); taskQueue_.push_back(task);
} }
cv_.notify_one(); cv_.notify_one();
return result; return result;

View File

@@ -29,13 +29,22 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
} }
SECTION("Large Output") { SECTION("Large Output") {
const std::string BIG_FILE{"/usr/share/dict/linux.words"}; const std::vector<std::string> BIG_FILES{
std::vector<std::string> cmd{"/usr/bin/cat", BIG_FILE}; "/usr/share/dict/linux.words"
, "/usr/share/dict/cracklib-small"
, "/etc/ssh/moduli"
};
auto rec = ex.runCommand(cmd); for (const auto & bigFile : BIG_FILES) {
if (! std::filesystem::exists(bigFile)) continue;
REQUIRE(rec.rc == 0); std::vector<std::string> cmd{"/usr/bin/cat", bigFile};
REQUIRE(rec.output.size() == std::filesystem::file_size(BIG_FILE));
REQUIRE(rec.error.empty()); auto rec = ex.runCommand(cmd);
REQUIRE(rec.rc == 0);
REQUIRE(rec.output.size() == std::filesystem::file_size(bigFile));
REQUIRE(rec.error.empty());
}
} }
} }

View File

@@ -12,13 +12,20 @@ TEST_CASE("Threadpool Construction", "[threadpool]") {
ThreadPool tp(10); ThreadPool tp(10);
std::vector<std::future<void>> res; std::vector<std::future<void>> res;
for (size_t i = 0; i < 100; ++i) {
res.push_back(tp.addTask([&cnt]() -> void { cnt++; return; })); SECTION("Simple runs") {
for (size_t i = 0; i < 100; ++i)
res.push_back(tp.addTask([&cnt]() { cnt++; return; }));
for (auto & r : res) r.get();
REQUIRE(cnt == 100);
} }
for (auto & r : res) { SECTION("Slow runs") {
r.get(); using namespace std::chrono_literals;
for (size_t i = 0; i < 100; ++i)
res.push_back(tp.addTask([&cnt]() { std::this_thread::sleep_for(20ms); cnt++; return; }));
for (auto & r : res) r.get();
REQUIRE(cnt == 100);
} }
REQUIRE(cnt == 100);
} }