Adding in task attempts drilldown

This commit is contained in:
Ian Roddis
2022-01-06 15:20:06 -04:00
parent 1786b53f7b
commit 856e5bd2f4
7 changed files with 151 additions and 51 deletions

View File

@@ -44,7 +44,11 @@ namespace daggy::loggers::dag_run {
virtual DAGRunRecord getDAGRun(DAGRunID dagRunID) = 0;
virtual Task getTask(DAGRunID dagRunID, const std::string &taskName) = 0;
virtual TaskRecord getTaskRecord(DAGRunID dagRunID,
const std::string &taskName) = 0;
virtual RunState getTaskState(DAGRunID dagRunID,
const std::string &taskName) = 0;
const std::string &taskName) = 0;
};
} // namespace daggy::loggers::dag_run

View File

@@ -15,10 +15,19 @@ namespace daggy::loggers::dag_run {
RunState state;
};
struct TaskRecord
{
Task task;
RunState state;
std::vector<StateUpdateRecord> stateChanges;
std::vector<AttemptRecord> attempts;
};
// Pretty heavy weight, but
struct DAGRunRecord
{
DAGSpec dagSpec;
std::unordered_map<std::string, TaskRecord> taskRecords;
std::unordered_map<std::string, RunState> taskRunStates;
std::unordered_map<std::string, std::vector<AttemptRecord>> taskAttempts;
std::unordered_map<std::string, std::vector<StateUpdateRecord>>

View File

@@ -44,6 +44,8 @@ namespace daggy::loggers::dag_run {
DAGRunRecord getDAGRun(DAGRunID dagRunID) override;
Task getTask(DAGRunID dagRunID, const std::string &taskName) override;
TaskRecord getTaskRecord(DAGRunID dagRunID,
const std::string &taskName) override;
RunState getTaskState(DAGRunID dagRunID,
const std::string &taskName) override;

View File

@@ -59,6 +59,8 @@ namespace daggy::loggers::dag_run {
DAGRunRecord getDAGRun(DAGRunID dagRunID) override;
Task getTask(DAGRunID dagRunID, const std::string &taskName) override;
TaskRecord getTaskRecord(DAGRunID dagRunID,
const std::string &taskName) override;
RunState getTaskState(DAGRunID dagRunID,
const std::string &taskName) override;

View File

@@ -170,6 +170,18 @@ namespace daggy::loggers::dag_run {
return dagRuns_.at(dagRunID).dagSpec.tasks.at(taskName);
}
TaskRecord OStreamLogger::getTaskRecord(DAGRunID dagRunID,
const std::string &taskName)
{
std::lock_guard<std::mutex> lock(guard_);
const auto &run = dagRuns_.at(dagRunID);
return TaskRecord{.task = run.dagSpec.tasks.at(taskName),
.state = run.taskRunStates.at(taskName),
.stateChanges = run.taskStateChanges.at(taskName),
.attempts = run.taskAttempts.at(taskName)};
}
RunState OStreamLogger::getTaskState(DAGRunID dagRunID,
const std::string &taskName)
{

View File

@@ -252,6 +252,48 @@ namespace daggy::loggers::dag_run {
return taskFromJSON(taskName, resp.as<std::string>());
}
TaskRecord RedisLogger::getTaskRecord(DAGRunID dagRunID,
const std::string &taskName)
{
// Task State
auto taskState = RunState::_from_string(
ctx_.query("HGET %s %s", getTaskStatesKey_(dagRunID).c_str(),
taskName.c_str())
.as<std::string>()
.c_str());
// task
auto task = taskFromJSON(
taskName, ctx_.query("HGET %s %s", getTasksKey_(dagRunID).c_str(),
taskName.c_str())
.as<std::string>());
// Attempts
auto attemptJSONS =
ctx_.query("LRANGE %s 0 -1",
getTaskAttemptKey_(dagRunID, taskName).c_str())
.asList<std::string>();
std::vector<AttemptRecord> attempts;
std::transform(attemptJSONS.begin(), attemptJSONS.end(),
std::back_inserter(attempts),
[](const auto &s) { return attemptRecordFromJSON(s); });
// Populate stateUpdates
auto taskStateUpdates =
ctx_.query("LRANGE %s 0 -1",
getTaskStateUpdateKey_(dagRunID, taskName).c_str())
.asList<std::string>();
std::vector<StateUpdateRecord> stateUpdates;
std::transform(taskStateUpdates.begin(), taskStateUpdates.end(),
std::back_inserter(stateUpdates),
[](const auto &s) { return stateUpdateRecordFromJSON(s); });
return TaskRecord{.task = task,
.state = taskState,
.stateChanges = stateUpdates,
.attempts = attempts};
}
RunState RedisLogger::getTaskState(DAGRunID dagRunID,
const std::string &taskName)
{