diff --git a/libdaggy/src/DAGRunner.cpp b/libdaggy/src/DAGRunner.cpp index e56d472..c2773a3 100644 --- a/libdaggy/src/DAGRunner.cpp +++ b/libdaggy/src/DAGRunner.cpp @@ -55,7 +55,7 @@ namespace daggy { } } - std::this_thread::sleep_for(250ms); + std::this_thread::sleep_for(100ms); { std::lock_guard lock(runGuard_); allVisited = dag_.allVisited(); @@ -95,12 +95,21 @@ namespace daggy { if (!running_) return; - // Check for any completed tasks - // Add all remaining tasks in a task queue to avoid dominating the thread - // pool - auto t = dag_.visitNext(); - while (t.has_value()) { - // Schedule the task to run + const size_t MAX_SUBMITS = 25; + size_t n_submitted = 0; + + /* + In cases where there are many tasks ready to execute, + the blocking nature of executor_.execute(...) means + that tasks will get executed and the futures resolve, + but we won't know until all pending tasks are enqueued. + + To avoid this, submit at most MAX_SUBMITS tasks before + returning to allow completed tasks to be updated. + */ + + while (n_submitted < MAX_SUBMITS) { + auto t = dag_.visitNext(); auto &taskName = t.value().first; taskAttemptCounts_[taskName] = 1; @@ -114,11 +123,7 @@ namespace daggy { std::cout << "Unable to execute task: " << e.what() << std::endl; } ++nRunningTasks_; - - auto nextTask = dag_.visitNext(); - if (not nextTask.has_value()) - break; - t.emplace(nextTask.value()); + ++n_submitted; } }