From 9e7d78788b9a9166ebee4c3e6ae547db7203853a Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Thu, 13 Jan 2022 12:52:00 -0400 Subject: [PATCH] Fixing issue in dagrunner where tasks were never removed from running list --- daggyd/libdaggyd/src/Server.cpp | 3 +-- libdaggy/src/DAGRunner.cpp | 6 ++++++ libdaggy/src/loggers/dag_run/OStreamLogger.cpp | 4 ++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/daggyd/libdaggyd/src/Server.cpp b/daggyd/libdaggyd/src/Server.cpp index 43314b2..fbd0405 100644 --- a/daggyd/libdaggyd/src/Server.cpp +++ b/daggyd/libdaggyd/src/Server.cpp @@ -587,8 +587,7 @@ namespace daggy::daggyd { return a.startTime < b.startTime; }); - for (size_t i = 0; i < tr->attempts.size(); ++i) { - const auto &attempt = tr->attempts[i]; + for (const auto &attempt : tr->attempts) { ss << "" << timePointToString(attempt.startTime) << "
rc: " << attempt.rc
            << "\n\nstdout:\n--------------\n"
diff --git a/libdaggy/src/DAGRunner.cpp b/libdaggy/src/DAGRunner.cpp
index deed5f4..e56d472 100644
--- a/libdaggy/src/DAGRunner.cpp
+++ b/libdaggy/src/DAGRunner.cpp
@@ -124,10 +124,12 @@ namespace daggy {
 
   void DAGRunner::collectFinished()
   {
+    std::vector completedTasks;
     for (auto &[taskName, fut] : runningTasks_) {
       if (fut->ready()) {
         auto attempt = fut->get();
         logger_.logTaskAttempt(runID_, taskName, attempt);
+        completedTasks.push_back(taskName);
 
         // Not a reference, since adding tasks will invalidate references
         auto vert  = dag_.getVertex(taskName);
@@ -196,6 +198,10 @@ namespace daggy {
         }
       }
     }
+
+    for (const auto &taskName : completedTasks) {
+      runningTasks_.extract(taskName);
+    }
   }
 
   void DAGRunner::stop(bool kill, bool blocking)
diff --git a/libdaggy/src/loggers/dag_run/OStreamLogger.cpp b/libdaggy/src/loggers/dag_run/OStreamLogger.cpp
index 7490dbd..64c44a2 100644
--- a/libdaggy/src/loggers/dag_run/OStreamLogger.cpp
+++ b/libdaggy/src/loggers/dag_run/OStreamLogger.cpp
@@ -175,12 +175,12 @@ namespace daggy::loggers::dag_run {
                                           const std::string &taskName)
   {
     std::lock_guard lock(guard_);
-    const auto &run = dagRuns_.at(dagRunID);
+    auto &run = dagRuns_.at(dagRunID);
 
     return TaskRecord{.task         = run.dagSpec.tasks.at(taskName),
                       .state        = run.taskRunStates.at(taskName),
                       .stateChanges = run.taskStateChanges.at(taskName),
-                      .attempts     = run.taskAttempts.at(taskName)};
+                      .attempts     = run.taskAttempts[taskName]};
   }
 
   RunState OStreamLogger::getTaskState(DAGRunID dagRunID,