diff --git a/daggyd/libdaggyd/src/Server.cpp b/daggyd/libdaggyd/src/Server.cpp index 43314b2..fbd0405 100644 --- a/daggyd/libdaggyd/src/Server.cpp +++ b/daggyd/libdaggyd/src/Server.cpp @@ -587,8 +587,7 @@ namespace daggy::daggyd { return a.startTime < b.startTime; }); - for (size_t i = 0; i < tr->attempts.size(); ++i) { - const auto &attempt = tr->attempts[i]; + for (const auto &attempt : tr->attempts) { ss << "
rc: " << attempt.rc
<< "\n\nstdout:\n--------------\n"
diff --git a/libdaggy/src/DAGRunner.cpp b/libdaggy/src/DAGRunner.cpp
index deed5f4..e56d472 100644
--- a/libdaggy/src/DAGRunner.cpp
+++ b/libdaggy/src/DAGRunner.cpp
@@ -124,10 +124,12 @@ namespace daggy {
void DAGRunner::collectFinished()
{
+ std::vector completedTasks;
for (auto &[taskName, fut] : runningTasks_) {
if (fut->ready()) {
auto attempt = fut->get();
logger_.logTaskAttempt(runID_, taskName, attempt);
+ completedTasks.push_back(taskName);
// Not a reference, since adding tasks will invalidate references
auto vert = dag_.getVertex(taskName);
@@ -196,6 +198,10 @@ namespace daggy {
}
}
}
+
+ for (const auto &taskName : completedTasks) {
+ runningTasks_.extract(taskName);
+ }
}
void DAGRunner::stop(bool kill, bool blocking)
diff --git a/libdaggy/src/loggers/dag_run/OStreamLogger.cpp b/libdaggy/src/loggers/dag_run/OStreamLogger.cpp
index 7490dbd..64c44a2 100644
--- a/libdaggy/src/loggers/dag_run/OStreamLogger.cpp
+++ b/libdaggy/src/loggers/dag_run/OStreamLogger.cpp
@@ -175,12 +175,12 @@ namespace daggy::loggers::dag_run {
const std::string &taskName)
{
std::lock_guard lock(guard_);
- const auto &run = dagRuns_.at(dagRunID);
+ auto &run = dagRuns_.at(dagRunID);
return TaskRecord{.task = run.dagSpec.tasks.at(taskName),
.state = run.taskRunStates.at(taskName),
.stateChanges = run.taskStateChanges.at(taskName),
- .attempts = run.taskAttempts.at(taskName)};
+ .attempts = run.taskAttempts[taskName]};
}
RunState OStreamLogger::getTaskState(DAGRunID dagRunID,