diff --git a/daggyd/daggyd/daggyd.cpp b/daggyd/daggyd/daggyd.cpp
index 1b39260..ed78a2d 100644
--- a/daggyd/daggyd/daggyd.cpp
+++ b/daggyd/daggyd/daggyd.cpp
@@ -116,6 +116,7 @@ void daemonize()
 }
 
 namespace dl = daggy::loggers::dag_run;
+std::ofstream ofh;
 
 std::unique_ptr loggerFactory(const rj::Value &config)
 {
@@ -138,7 +139,7 @@ std::unique_ptr loggerFactory(const rj::Value &config)
         if (fn == "-")
             return std::make_unique(std::cout);
 
-        std::ofstream ofh(logConfig["file"].GetString());
+        ofh.open(logConfig["file"].GetString());
         return std::make_unique(ofh);
     }
 }
diff --git a/daggyr/libdaggyr/src/Server.cpp b/daggyr/libdaggyr/src/Server.cpp
index 2b72ef9..f0ee0df 100644
--- a/daggyr/libdaggyr/src/Server.cpp
+++ b/daggyr/libdaggyr/src/Server.cpp
@@ -161,8 +161,6 @@ namespace daggy::daggyr {
                                      .resourcesUsed = resourcesUsed});
         }
 
-        std::cout << "Enqueuing " << runID << " / " << taskName << std::endl;
-
         response.send(Pistache::Http::Code::Ok, "");
     }
 
@@ -202,8 +200,6 @@ namespace daggy::daggyr {
                     curCapacity_.cores += it->resourcesUsed.cores;
                     curCapacity_.memoryMB += it->resourcesUsed.memoryMB;
                 }
-                std::cout << "Resolved " << it->runID << " / " << it->taskName
-                          << std::endl;
                 it = pending_.erase(it);
             }
             else {
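The daggyd.cpp hunk above is a lifetime fix rather than a cleanup: the stream-backed logger only keeps a reference to the std::ofstream it is given, so a function-local stream dies as soon as loggerFactory returns and the daemon is left logging into a destroyed object. A minimal sketch of the pattern, using a hypothetical OStreamLoggerSketch stand-in (the real dl:: logger types are not shown in this diff):

    #include <fstream>
    #include <iostream>
    #include <memory>
    #include <string>

    // Hypothetical stand-in for the dl:: stream logger: it stores only a
    // reference, so whoever creates it must keep the stream alive.
    struct OStreamLoggerSketch {
        explicit OStreamLoggerSketch(std::ostream &os) : os_(os) {}
        void log(const std::string &msg) { os_ << msg << '\n'; }
        std::ostream &os_;
    };

    // File-scope stream, as in the patch: it outlives every logger that refers to it.
    std::ofstream logStream;

    std::unique_ptr<OStreamLoggerSketch> makeFileLogger(const std::string &path)
    {
        // A local std::ofstream here would be destroyed on return, leaving the
        // logger with a dangling reference -- the bug the patch removes.
        logStream.open(path);
        return std::make_unique<OStreamLoggerSketch>(logStream);
    }

A longer-term option would be for the logger to own its stream, but the file-scope handle is the smallest change that keeps the reference held by the logger valid.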
diff --git a/libdaggy/include/daggy/ThreadPool.hpp b/libdaggy/include/daggy/ThreadPool.hpp
index a4212aa..5774d10 100644
--- a/libdaggy/include/daggy/ThreadPool.hpp
+++ b/libdaggy/include/daggy/ThreadPool.hpp
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -13,65 +14,11 @@
 using namespace std::chrono_literals;
 
 namespace daggy {
-
-    /*
-      A Task Queue is a collection of async tasks to be executed by the
-      thread pool. Using individual task queues allows for a rough QoS
-      when a single thread may be submitting batches of requests --
-      one producer won't starve out another, but all tasks will be run
-      as quickly as possible.
-     */
-    class TaskQueue
-    {
-    public:
-        template <typename F, typename... Args>
-        decltype(auto) addTask(F &&f, Args &&...args)
-        {
-            // using return_type = std::invoke_result<F, Args...>::type;
-            using return_type = std::invoke_result_t<F, Args...>;
-
-            std::packaged_task<return_type()> task(
-                std::bind(std::forward<F>(f), std::forward<Args>(args)...));
-
-            std::future<return_type> res = task.get_future();
-            {
-                std::lock_guard guard(mtx_);
-                tasks_.emplace(std::move(task));
-            }
-            return res;
-        }
-
-        std::packaged_task<void()> pop()
-        {
-            std::lock_guard guard(mtx_);
-            auto task = std::move(tasks_.front());
-            tasks_.pop();
-            return task;
-        }
-
-        size_t size()
-        {
-            std::lock_guard guard(mtx_);
-            return tasks_.size();
-        }
-
-        bool empty()
-        {
-            std::lock_guard guard(mtx_);
-            return tasks_.empty();
-        }
-
-    private:
-        std::queue<std::packaged_task<void()>> tasks_;
-        std::mutex mtx_;
-    };
-
     class ThreadPool
     {
     public:
         explicit ThreadPool(size_t nWorkers)
-            : tqit_(taskQueues_.begin())
-            , stop_(false)
+            : stop_(false)
             , drain_(false)
         {
             resize(nWorkers);
@@ -98,7 +45,7 @@
             while (true) {
                 {
                     std::lock_guard guard(mtx_);
-                    if (taskQueues_.empty())
+                    if (tasks_.empty())
                         break;
                 }
                 std::this_thread::sleep_for(250ms);
@@ -118,25 +65,18 @@
             for (size_t i = 0; i < nWorkers; ++i)
                 workers_.emplace_back([&] {
+                    std::packaged_task<void()> task;
                     while (true) {
-                        std::packaged_task<void()> task;
                         {
                             std::unique_lock lock(mtx_);
-                            cv_.wait(lock, [&] { return stop_ || !taskQueues_.empty(); });
-                            if (taskQueues_.empty()) {
+                            cv_.wait(lock, [&] { return stop_ || !tasks_.empty(); });
+                            if (tasks_.empty()) {
                                 if (stop_)
                                     return;
                                 continue;
                             }
-                            if (tqit_ == taskQueues_.end())
-                                tqit_ = taskQueues_.begin();
-                            task = (*tqit_)->pop();
-                            if ((*tqit_)->empty()) {
-                                tqit_ = taskQueues_.erase(tqit_);
-                            }
-                            else {
-                                tqit_++;
-                            }
+                            task.swap(tasks_.front());
+                            tasks_.pop();
                         }
                         task();
                     }
@@ -148,25 +88,18 @@
        {
            if (drain_)
                throw std::runtime_error("Unable to add task to draining pool");
-            auto tq = std::make_shared<TaskQueue>();
+            using return_type = std::invoke_result_t<F, Args...>;
 
-            auto fut = tq->addTask(f, args...);
+            std::packaged_task<return_type()> task(
+                std::bind(std::forward<F>(f), std::forward<Args>(args)...));
+            std::future<return_type> res = task.get_future();
            {
                std::lock_guard guard(mtx_);
-                taskQueues_.push_back(tq);
+                tasks_.emplace(std::move(task));
            }
            cv_.notify_one();
-            return fut;
-        }
-
-        void addTasks(std::shared_ptr<TaskQueue> &tq)
-        {
-            if (drain_)
-                throw std::runtime_error("Unable to add task to draining pool");
-            std::lock_guard guard(mtx_);
-            taskQueues_.push_back(tq);
-            cv_.notify_one();
+            return res;
        }
 
        size_t size() const
@@ -174,12 +107,17 @@
        {
            return workers_.size();
        }
 
+        size_t queueSize()
+        {
+            std::lock_guard lock(mtx_);
+            return tasks_.size();
+        }
+
    private:
        // need to keep track of threads, so we can join them
        std::vector<std::thread> workers_;
        // the task queue
-        std::list<std::shared_ptr<TaskQueue>> taskQueues_;
-        std::list<std::shared_ptr<TaskQueue>>::iterator tqit_;
+        std::queue<std::packaged_task<void()>> tasks_;
 
        // synchronization
        std::mutex mtx_;
@@ -187,5 +125,4 @@
        std::atomic<bool> stop_;
        std::atomic<bool> drain_;
    };
-
 } // namespace daggy
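With TaskQueue removed, ThreadPool is a single shared FIFO: addTask packages the callable, hands back its future, and the workers swap tasks off the queue. A minimal usage sketch against the new interface, assuming the header installs as <daggy/ThreadPool.hpp> and that the destructor stops and joins the workers (the stop_ flag suggests it does, but that code is outside this hunk):

    #include <daggy/ThreadPool.hpp>  // assumed include path for the header above

    #include <future>
    #include <iostream>
    #include <vector>

    int main()
    {
        daggy::ThreadPool pool(4);  // four worker threads

        // Each addTask call returns a std::future for that task's result.
        std::vector<std::future<int>> results;
        for (int i = 0; i < 8; ++i)
            results.push_back(pool.addTask([i] { return i * i; }));

        std::cout << "still queued: " << pool.queueSize() << '\n';

        int sum = 0;
        for (auto &f : results)
            sum += f.get();  // blocks until the corresponding task has run
        std::cout << "sum of squares: " << sum << '\n';
        return 0;
    }

The QoS argument for per-producer queues in the deleted comment goes away with this change; every producer now shares one FIFO, which is simpler but lets a burst from one caller delay everyone else.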
diff --git a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp
index 65be9b1..1a9d268 100644
--- a/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp
+++ b/libdaggy/include/daggy/executors/task/DaggyRunnerTaskExecutor.hpp
@@ -51,7 +51,7 @@ namespace daggy::executors::task {
 
         bool stop(DAGRunID runID, const std::string &taskName) override;
 
-        std::string description() const;
+        std::string description() const override;
 
         void addRunner(const std::string &url);
 
diff --git a/libdaggy/src/DAGRunner.cpp b/libdaggy/src/DAGRunner.cpp
index d4c0fad..342d17e 100644
--- a/libdaggy/src/DAGRunner.cpp
+++ b/libdaggy/src/DAGRunner.cpp
@@ -111,6 +111,7 @@ namespace daggy {
                 runningTasks_.emplace(taskName, std::move(fut));
             }
             catch (std::exception &e) {
+                std::cout << "Unable to execute task: " << e.what() << std::endl;
             }
 
             ++nRunningTasks_;
diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp
index 52e4a39..d7a0c1a 100644
--- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp
+++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp
@@ -154,8 +154,6 @@ std::future DaggyRunnerTaskExecutor::execute(
     // Capacities for a runner can be negative, meaning that they're currently
     // oversubscribed.
     std::vector> impacts;
-    std::string runner;
-
     {
         std::lock_guard lock(runnersGuard_);
         for (auto &[runner, caps] : runners_) {
@@ -191,28 +189,43 @@ std::future DaggyRunnerTaskExecutor::execute(
             prom.set_value(std::move(record));
             return fut;
         }
+    }
 
-    std::sort(impacts.begin(), impacts.end(),
-        [](const auto &a, const auto &b) { return a.second < b.second; });
+    std::sort(impacts.begin(), impacts.end(),
+              [](const auto &a, const auto &b) { return a.second > b.second; });
 
-    runner = impacts.back().first;
+    std::string submitted_runner;
+    for (const auto &[runner, _] : impacts) {
         auto &caps = runners_.at(runner);
         caps.current.cores -= taskUsed.cores;
         caps.current.memoryMB -= taskUsed.memoryMB;
+
+        std::stringstream ss;
+        ss << runner << "/v1/task/" << runID << "/" << taskName;
+        auto url = ss.str();
+
+        const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST");
+        if (response.code != HTTPCode::Ok) {
+            continue;
+            // throw std::runtime_error("Unable to submit task: " + response.body);
+        }
+
+        submitted_runner = runner;
+        break;
    }
 
-    std::stringstream ss;
-    ss << runner << "/v1/task/" << runID << "/" << taskName;
-    auto url = ss.str();
-
-    const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST");
-    if (response.code != HTTPCode::Ok)
-        throw std::runtime_error("Unable to submit task: " + response.body);
+    if (submitted_runner.empty()) {
+        std::promise<AttemptRecord> prom;
+        auto fut = prom.get_future();
+        AttemptRecord record{.rc = -1,
+                             .executorLog = "No runners available for execution"};
+        prom.set_value(std::move(record));
+        return fut;
+    }
 
     RunningTask rt{.prom{},
                    .runID = runID,
                    .taskName = taskName,
-                   .runnerURL = runner,
+                   .runnerURL = submitted_runner,
                    .retries = 3,
                    .resources = taskUsed};
@@ -250,6 +263,8 @@ void DaggyRunnerTaskExecutor::addRunner(const std::string &url)
 
 void DaggyRunnerTaskExecutor::monitor()
 {
+    std::unordered_map runners;
+
     while (running_) {
         std::unordered_map, std::optional>
             resolvedJobs;
@@ -258,6 +273,7 @@ void DaggyRunnerTaskExecutor::monitor()
         std::unordered_map, Capacity>
             taskResources;
 
+        // Cache what's running now
         {
             std::lock_guard lock(rtGuard_);
             for (const auto &[tid, info] : runningTasks_) {
@@ -267,39 +283,40 @@ void DaggyRunnerTaskExecutor::monitor()
         {
             std::lock_guard lock(runnersGuard_);
-            for (auto &[runnerURL, caps] : runners_) {
-                rj::Document doc;
-                try {
-                    auto [code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll");
-                    if (code != HTTPCode::Ok)
-                        continue;
-                    doc.Swap(json);
-                }
-                catch (std::exception &e) {
-                    std::cout << "Curl failed for runner " << runnerURL << ": "
-                              << e.what() << std::endl;
-                }
+            runners = runners_;
+        }
 
-                const auto tasks = doc.GetArray();
-                for (size_t idx = 0; idx < tasks.Size(); ++idx) {
-                    const auto &task = tasks[idx];
-                    if (task["state"] == "PENDING") {
-                        resolvedJobs.emplace(std::make_pair(task["runID"].GetInt64(),
                                                             task["taskName"].GetString()),
-                                             std::nullopt);
-                    }
-                    else {
-                        auto tid = std::make_pair(task["runID"].GetInt64(),
-                                                  task["taskName"].GetString());
-                        auto it = taskResources.find(tid);
-                        if (it != taskResources.end()) {
-                            const auto &res = taskResources.at(tid);
-                            caps.current.cores += res.cores;
-                            caps.current.memoryMB += res.memoryMB;
-                        }
+        for (auto &[runnerURL, caps] : runners) {
+            rj::Document doc;
+            try {
+                auto [code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll");
+                if (code != HTTPCode::Ok)
+                    continue;
+                doc.Swap(json);
+            }
+            catch (std::exception &e) {
+                continue;
+            }
 
-                        resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"]));
+            const auto tasks = doc.GetArray();
+            for (size_t idx = 0; idx < tasks.Size(); ++idx) {
+                const auto &task = tasks[idx];
+                auto tid = std::make_pair(task["runID"].GetInt64(),
+                                          task["taskName"].GetString());
+
+                if (task["state"] == "PENDING") {
+                    resolvedJobs.emplace(tid, std::nullopt);
+                }
+                else {
+                    auto it = taskResources.find(tid);
+                    if (it != taskResources.end()) {
+                        const auto &res = taskResources.at(tid);
+                        caps.current.cores += res.cores;
+                        caps.current.memoryMB += res.memoryMB;
                     }
+
+                    auto attempt = attemptRecordFromJSON(task["attempt"]);
+                    resolvedJobs.emplace(tid, std::move(attempt));
                 }
             }
         }
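Two things change in this executor: submission now tries runners one at a time in sorted impact order and falls back to the next on a failed POST, and monitor() now copies the runner table under runnersGuard_ and polls the copy, so no HTTP call happens while the lock is held. The copy-then-iterate pattern in isolation (a sketch with a simplified RunnerCaps stand-in; the real map value type is not shown in this diff):

    #include <mutex>
    #include <string>
    #include <unordered_map>

    struct RunnerCaps {          // simplified stand-in for the real per-runner record
        int cores = 0;
        int memoryMB = 0;
    };

    std::mutex runnersGuard;
    std::unordered_map<std::string, RunnerCaps> runners;  // shared, mutated elsewhere

    void pollAll()
    {
        // Snapshot under the lock: cheap, and writers are never blocked by slow I/O.
        std::unordered_map<std::string, RunnerCaps> snapshot;
        {
            std::lock_guard lock(runnersGuard);
            snapshot = runners;
        }

        // Do the slow polling against the snapshot, lock-free.
        for (auto &[url, caps] : snapshot) {
            // ... poll url + "/v1/poll" and update bookkeeping here ...
            (void)url;
            (void)caps;
        }
    }

The trade-off, visible in the hunk, is that capacity adjustments applied to the copy's caps do not flow back into runners_, so any reconciliation has to happen under the lock afterwards.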
diff --git a/libdaggy/src/executors/task/ForkingTaskExecutor.cpp b/libdaggy/src/executors/task/ForkingTaskExecutor.cpp
index ca39994..f645c13 100644
--- a/libdaggy/src/executors/task/ForkingTaskExecutor.cpp
+++ b/libdaggy/src/executors/task/ForkingTaskExecutor.cpp
@@ -97,7 +97,7 @@ std::future ForkingTaskExecutor::execute(
     std::lock_guard lock(taskControlsGuard_);
     auto [it, ins] = taskControls_.emplace(key, true);
     auto &running = it->second;
-    return tp_.addTask([this, task, &running, key]() {
+    return tp_.addTask([this, task, taskName, &running, key]() {
         auto ret = this->runTask(task, running);
         std::lock_guard lock(this->taskControlsGuard_);
         this->taskControls_.extract(key);
@@ -147,12 +147,16 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task,
     // Create the pipe
     int stdoutPipe[2];
     int pipeRC = pipe2(stdoutPipe, O_DIRECT);
-    if (pipeRC != 0)
+    if (pipeRC != 0) {
+        std::cerr << "Unable to create pipe for stdout: " << pipeRC << std::endl;
         throw std::runtime_error("Unable to create pipe for stdout");
+    }
 
     int stderrPipe[2];
     pipeRC = pipe2(stderrPipe, O_DIRECT);
-    if (pipeRC != 0)
+    if (pipeRC != 0) {
+        std::cerr << "Unable to create pipe for stderr" << std::endl;
         throw std::runtime_error("Unable to create pipe for stderr");
+    }
 
     pid_t child = fork();
     if (child < 0) {
@@ -187,7 +191,7 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task,
         if (childInfo.si_pid > 0) {
             break;
         }
-        std::this_thread::sleep_for(250ms);
+        std::this_thread::sleep_for(100ms);
     }
 
     if (!running) {
@@ -215,6 +219,8 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task,
 
     close(stdoutPipe[0]);
     close(stderrPipe[0]);
+    close(stdoutPipe[1]);
+    close(stderrPipe[1]);
 
     return rec;
 }
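The two added close() calls at the end of runTask release the parent's write ends alongside the read ends that were already being closed; pipe2() hands the parent both descriptors of each pipe, so without them every task leaked two fds. The usual parent/child pipe discipline looks roughly like this (an illustrative sketch, not the executor's code):

    #include <sys/types.h>
    #include <sys/wait.h>
    #include <unistd.h>

    #include <cstdio>
    #include <string>

    // Every descriptor returned by pipe() must eventually be closed in the
    // parent, or the process leaks fds and a blocking reader never sees EOF.
    int main()
    {
        int out[2];
        if (pipe(out) != 0)
            return 1;

        pid_t child = fork();
        if (child == 0) {                    // child: write end becomes stdout
            close(out[0]);
            dup2(out[1], STDOUT_FILENO);
            close(out[1]);
            execlp("echo", "echo", "hello", static_cast<char *>(nullptr));
            _exit(127);
        }

        close(out[1]);                       // parent: drop the write end it won't use
        char buf[256];
        ssize_t n;
        std::string captured;
        while ((n = read(out[0], buf, sizeof buf)) > 0)
            captured.append(buf, static_cast<size_t>(n));  // EOF once all write ends close
        close(out[0]);                       // ...and the read end when done

        waitpid(child, nullptr, 0);
        std::printf("%s", captured.c_str());
        return 0;
    }

Whether or not the reader relies on seeing EOF, the descriptors have to be released; a long-lived daemon that leaks two per task eventually hits its fd limit.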
diff --git a/libdaggy/tests/unit_threadpool.cpp b/libdaggy/tests/unit_threadpool.cpp
index 054a25b..852ba98 100644
--- a/libdaggy/tests/unit_threadpool.cpp
+++ b/libdaggy/tests/unit_threadpool.cpp
@@ -15,14 +15,12 @@ TEST_CASE("threadpool", "[threadpool]")
 
     SECTION("Adding large tasks queues with return values")
     {
-        auto tq = std::make_shared();
         std::vector> res;
         for (size_t i = 0; i < 100; ++i)
-            res.emplace_back(tq->addTask([&cnt]() {
+            res.emplace_back(tp.addTask([&cnt]() {
                 cnt++;
                 return cnt.load();
             }));
-        tp.addTasks(tq);
 
         for (auto &r : res)
             r.get();
 
         REQUIRE(cnt == 100);