Fixing issue in dagrunner where tasks were never removed from running list

This commit is contained in:
Ian Roddis
2022-01-13 12:52:00 -04:00
parent 854ca5be22
commit 9e7d78788b
3 changed files with 9 additions and 4 deletions

View File

@@ -587,8 +587,7 @@ namespace daggy::daggyd {
return a.startTime < b.startTime; return a.startTime < b.startTime;
}); });
for (size_t i = 0; i < tr->attempts.size(); ++i) { for (const auto &attempt : tr->attempts) {
const auto &attempt = tr->attempts[i];
ss << "<tr><td valign=top>" << timePointToString(attempt.startTime) ss << "<tr><td valign=top>" << timePointToString(attempt.startTime)
<< "</td><td><pre>rc: " << attempt.rc << "</td><td><pre>rc: " << attempt.rc
<< "\n\nstdout:\n--------------\n" << "\n\nstdout:\n--------------\n"

View File

@@ -124,10 +124,12 @@ namespace daggy {
void DAGRunner::collectFinished() void DAGRunner::collectFinished()
{ {
std::vector<std::string> completedTasks;
for (auto &[taskName, fut] : runningTasks_) { for (auto &[taskName, fut] : runningTasks_) {
if (fut->ready()) { if (fut->ready()) {
auto attempt = fut->get(); auto attempt = fut->get();
logger_.logTaskAttempt(runID_, taskName, attempt); logger_.logTaskAttempt(runID_, taskName, attempt);
completedTasks.push_back(taskName);
// Not a reference, since adding tasks will invalidate references // Not a reference, since adding tasks will invalidate references
auto vert = dag_.getVertex(taskName); auto vert = dag_.getVertex(taskName);
@@ -196,6 +198,10 @@ namespace daggy {
} }
} }
} }
for (const auto &taskName : completedTasks) {
runningTasks_.extract(taskName);
}
} }
void DAGRunner::stop(bool kill, bool blocking) void DAGRunner::stop(bool kill, bool blocking)

View File

@@ -175,12 +175,12 @@ namespace daggy::loggers::dag_run {
const std::string &taskName) const std::string &taskName)
{ {
std::lock_guard<std::mutex> lock(guard_); std::lock_guard<std::mutex> lock(guard_);
const auto &run = dagRuns_.at(dagRunID); auto &run = dagRuns_.at(dagRunID);
return TaskRecord{.task = run.dagSpec.tasks.at(taskName), return TaskRecord{.task = run.dagSpec.tasks.at(taskName),
.state = run.taskRunStates.at(taskName), .state = run.taskRunStates.at(taskName),
.stateChanges = run.taskStateChanges.at(taskName), .stateChanges = run.taskStateChanges.at(taskName),
.attempts = run.taskAttempts.at(taskName)}; .attempts = run.taskAttempts[taskName]};
} }
RunState OStreamLogger::getTaskState(DAGRunID dagRunID, RunState OStreamLogger::getTaskState(DAGRunID dagRunID,