Changing behaviour of runner to submit at most MAX_SUBMITS for execution before checking for completed tasks

This commit is contained in:
Ian Roddis
2022-01-28 14:28:46 -04:00
parent 5af8deabec
commit 782e6a8b97

View File

@@ -55,7 +55,7 @@ namespace daggy {
} }
} }
std::this_thread::sleep_for(250ms); std::this_thread::sleep_for(100ms);
{ {
std::lock_guard<std::mutex> lock(runGuard_); std::lock_guard<std::mutex> lock(runGuard_);
allVisited = dag_.allVisited(); allVisited = dag_.allVisited();
@@ -95,12 +95,21 @@ namespace daggy {
if (!running_) if (!running_)
return; return;
// Check for any completed tasks const size_t MAX_SUBMITS = 25;
// Add all remaining tasks in a task queue to avoid dominating the thread size_t n_submitted = 0;
// pool
auto t = dag_.visitNext(); /*
while (t.has_value()) { In cases where there are many tasks ready to execute,
// Schedule the task to run the blocking nature of executor_.execute(...) means
that tasks will get executed and the futures resolve,
but we won't know until all pending tasks are enqueued.
To avoid this, submit at most MAX_SUBMITS tasks before
returning to allow completed tasks to be updated.
*/
while (n_submitted < MAX_SUBMITS) {
auto t = dag_.visitNext();
auto &taskName = t.value().first; auto &taskName = t.value().first;
taskAttemptCounts_[taskName] = 1; taskAttemptCounts_[taskName] = 1;
@@ -114,11 +123,7 @@ namespace daggy {
std::cout << "Unable to execute task: " << e.what() << std::endl; std::cout << "Unable to execute task: " << e.what() << std::endl;
} }
++nRunningTasks_; ++nRunningTasks_;
++n_submitted;
auto nextTask = dag_.visitNext();
if (not nextTask.has_value())
break;
t.emplace(nextTask.value());
} }
} }