Migrating from ReJSON to vanilla Redis, using a thread-safe RedisContext

Squashed commit of the following:

commit f3549005c0192fd77bf47d208b74a11dd21380fa
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Sun Oct 17 10:52:12 2021 -0300

    Fixing issues with serialization of attempt records that included newlines

commit ab387d62850428e320a05cdf54fd2026369d0bb6
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Fri Oct 15 12:57:14 2021 -0300

    Migrating to vanilla Redis from ReJSON

commit f648cf065dea2d0a7e30aaec17441e9a37531ba0
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Fri Oct 15 10:23:34 2021 -0300

    Making RedisContext thread-safe

commit ca51d95c5c4a5f4aaa13c5abe32da161e919d66c
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Fri Oct 15 10:01:33 2021 -0300

    Updating TODO
Author: Ian Roddis
Date:   2021-10-17 10:52:27 -03:00
Parent: 08260043d0
Commit: e4e50fc219

9 changed files with 193 additions and 191 deletions

View File

@@ -15,6 +15,10 @@ Tasks
 - Implementations
 - [ ] General logger
 - [ ] SQL logger (sqlite, postgres)
+- RedisLogger
+  - Convert to vanilla redis
+  - HGETALL
+  - Add prefix to distinguish daggyd instances
 - Server
 - [ ] Multiple executors
 - [ ] Log to general logger

View File

@@ -16,7 +16,7 @@ namespace rj = rapidjson;
 namespace daggy {
   void checkRJParse(const rj::ParseResult &result,
                     const std::string &prefix = "");
-  void dumpJSON(const rj::Value &doc, std::ostream &os);
+  std::string dumpJSON(const rj::Value &doc);

   // Parameters
   ConfigValues configFromJSON(const std::string &jsonSpec);

View File

@@ -8,6 +8,7 @@
 #include <algorithm>
 #include <cstdlib>
 #include <iostream>
+#include <mutex>
 #include <string>
 #include <variant>
 #include <vector>
@@ -89,6 +90,7 @@ namespace daggy::loggers::dag_run::redis {
     template <class... Args>
     RedisData query(Args &&...args)
     {
+      std::lock_guard<std::mutex> lock(contextGuard_);
       redisReply *reply = static_cast<redisReply *>(
          redisCommand(ctx_, std::forward<Args>(args)...));
@@ -119,6 +121,7 @@ namespace daggy::loggers::dag_run::redis {
   private:
     RedisData parseReply_(const redisReply *reply);
     redisContext *ctx_;
+    std::mutex contextGuard_;
   };
 } // namespace daggy::loggers::dag_run::redis
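
Note: the mutex added above serializes every command issued through the shared hiredis connection, since a redisContext is not safe to use from multiple threads at once. A minimal stand-alone sketch of the same pattern (a hypothetical GuardedRedis wrapper, not the project's RedisContext):

    // Hypothetical illustration of the locking pattern: one mutex guards every
    // redisCommand() call against a single, shared hiredis connection.
    #include <hiredis/hiredis.h>

    #include <mutex>
    #include <stdexcept>
    #include <string>
    #include <utility>

    class GuardedRedis {
    public:
      GuardedRedis(const std::string &host, int port)
          : ctx_(redisConnect(host.c_str(), port))
      {
        if (ctx_ == nullptr || ctx_->err)
          throw std::runtime_error("redis connect failed");
      }
      ~GuardedRedis() { redisFree(ctx_); }

      template <class... Args>
      redisReply *query(Args &&...args)
      {
        std::lock_guard<std::mutex> lock(guard_);  // one command at a time
        return static_cast<redisReply *>(
            redisCommand(ctx_, std::forward<Args>(args)...));
      }

    private:
      redisContext *ctx_;
      std::mutex guard_;  // mirrors contextGuard_ in the diff above
    };

Holding the lock for the whole redisCommand() round trip keeps each caller's request paired with its reply; callers still own the returned reply and must freeReplyObject() it.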

View File

@@ -28,8 +28,9 @@ namespace daggy::loggers::dag_run {
   class RedisLogger : public DAGRunLogger
   {
   public:
-    explicit RedisLogger(const std::string &host = "127.0.0.1",
-                         int port = 6379);
+    explicit RedisLogger(const std::string &prefix = "daggy",
+                         const std::string &host = "127.0.0.1",
+                         int port = 6379);

     // Execution
     DAGRunID startDAGRun(const DAGSpec &dagSpec) override;
@@ -62,13 +63,39 @@ namespace daggy::loggers::dag_run {
                            const std::string &taskName) override;

   private:
-    std::string host_;
-    int port_;
+    const std::string prefix_;
     const std::string dagRunIDsKey_;
+    redis::RedisContext ctx_;

-    inline const std::string getDAGTagMembersKey_(const std::string &) const;
-    inline const std::string getDAGRunKey_(DAGRunID) const;
+    inline const std::string getDAGPrefix_(DAGRunID runID) const
+    {
+      return prefix_ + "_" + std::to_string(runID) + "_";
+    }
+
+#define GET_DAG_KEY(name, extra)                \
+  inline std::string name(DAGRunID runID) const \
+  {                                             \
+    return getDAGPrefix_(runID) + extra;        \
+  }
+
+    GET_DAG_KEY(getTagKey_, "tag");
+    GET_DAG_KEY(getTasksKey_, "tasks");
+    GET_DAG_KEY(getDAGStateKey_, "state");
+    GET_DAG_KEY(getDAGStateUpdateKey_, "stateUpdate");
+    GET_DAG_KEY(getTaskStatesKey_, "taskStates");
+    GET_DAG_KEY(getTaskVariablesKey_, "taskVariables");
+    GET_DAG_KEY(getTaskDefaultsKey_, "taskDefaults");
+    GET_DAG_KEY(getStartTimeKey_, "startTime");
+    GET_DAG_KEY(getLastUpdateKey_, "lastUpdate");
+
+#define GET_TASK_KEY(name, category)                                          \
+  inline std::string name(DAGRunID runID, const std::string &taskName) const \
+  {                                                                           \
+    return getDAGPrefix_(runID) + category + "_" + taskName;                  \
+  }
+
+    GET_TASK_KEY(getTaskStateUpdateKey_, "taskUpdateState");
+    GET_TASK_KEY(getTaskAttemptKey_, "taskAttempt");
   };
 } // namespace daggy::loggers::dag_run
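
Note: with the single JSON document gone, each run's data is spread across plain Redis keys built from the instance prefix, the run ID and a suffix. A stand-alone illustration of the key names the helpers above would produce (assuming the default prefix "daggy", run ID 7 and a task named "work_a"; this mirrors getDAGPrefix_/GET_DAG_KEY/GET_TASK_KEY, it is not the class itself):

    #include <iostream>
    #include <string>

    int main()
    {
      const std::string prefix = "daggy";
      const std::string dagPrefix = prefix + "_" + std::to_string(7) + "_";

      std::cout << dagPrefix + "tag" << '\n'                      // daggy_7_tag (string)
                << dagPrefix + "tasks" << '\n'                    // daggy_7_tasks (hash)
                << dagPrefix + "stateUpdate" << '\n'              // daggy_7_stateUpdate (list)
                << dagPrefix + "taskAttempt_" + "work_a" << '\n'; // daggy_7_taskAttempt_work_a (list)
      return 0;
    }

The per-instance prefix is what lets several daggyd instances share one Redis server without clobbering each other's keys, per the TODO item above.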

View File

@@ -1,3 +1,4 @@
+#include <rapidjson/document.h>
 #include <rapidjson/error/en.h>
 #include "rapidjson/stringbuffer.h"
@@ -19,12 +20,12 @@ namespace daggy {
     }
   }

-  void dumpJSON(const rj::Value &doc, std::ostream &os)
+  std::string dumpJSON(const rj::Value &doc)
   {
     rj::StringBuffer buffer;
     rj::Writer<rj::StringBuffer> writer(buffer);
     doc.Accept(writer);
-    os << buffer.GetString() << std::endl;
+    return buffer.GetString();
   }

   ConfigValues configFromJSON(const std::string &jsonSpec)
@@ -278,26 +279,45 @@ namespace daggy {
   std::string attemptRecordToJSON(const AttemptRecord &record)
   {
-    std::stringstream ss;
-    ss << "{"
-       << R"("startTime": )" << std::quoted(timePointToString(record.startTime))
-       << ',' << R"("stopTime": )"
-       << std::quoted(timePointToString(record.stopTime)) << ',' << R"("rc": )"
-       << record.rc << ',' << R"("executorLog": )"
-       << std::quoted(record.executorLog) << ',' << R"("outputLog": )"
-       << std::quoted(record.outputLog) << ',' << R"("errorLog": )"
-       << std::quoted(record.errorLog) << '}';
-    std::string json = ss.str();
-    return globalSub(json, "\n", "\\n");
+    rj::Document doc;
+    doc.SetObject();
+    auto &alloc = doc.GetAllocator();
+
+    auto startTime = timePointToString(record.startTime);
+    doc.AddMember(
+        "startTime",
+        rj::Value().SetString(startTime.c_str(), startTime.size(), alloc),
+        alloc);
+
+    auto stopTime = timePointToString(record.stopTime);
+    doc.AddMember(
+        "stopTime",
+        rj::Value().SetString(stopTime.c_str(), stopTime.size(), alloc), alloc);
+
+    doc.AddMember("rc", rj::Value().SetInt(record.rc), alloc);
+    doc.AddMember("outputLog",
+                  rj::Value().SetString(record.outputLog.c_str(),
+                                        record.outputLog.size(), alloc),
+                  alloc);
+    doc.AddMember("errorLog",
+                  rj::Value().SetString(record.errorLog.c_str(),
+                                        record.errorLog.size(), alloc),
+                  alloc);
+    doc.AddMember("executorLog",
+                  rj::Value().SetString(record.executorLog.c_str(),
+                                        record.executorLog.size(), alloc),
+                  alloc);
+
+    return dumpJSON(doc);
   }

   AttemptRecord attemptRecordFromJSON(const std::string &json)
   {
-    std::string jsonNew = globalSub(json, "\\n", "\n");
     rj::Document doc;
-    checkRJParse(doc.Parse(jsonNew.c_str()), "Parsing config");
+    checkRJParse(doc.Parse(json.c_str()), "Parsing AttemptRecord");
     return attemptRecordFromJSON(doc);
   }
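
Note: building the record with a rapidjson::Document instead of hand-assembled text lets the Writer escape embedded newlines (and quotes, backslashes) in the log fields, which is why the manual globalSub("\n", "\\n") round trip can be dropped. A minimal, self-contained round-trip sketch of that behaviour (independent of the project's helpers):

    #include <rapidjson/document.h>
    #include <rapidjson/stringbuffer.h>
    #include <rapidjson/writer.h>

    #include <cassert>
    #include <string>

    int main()
    {
      const std::string log = "line one\nline two";

      rapidjson::Document doc;
      doc.SetObject();
      auto &alloc = doc.GetAllocator();
      doc.AddMember("errorLog",
                    rapidjson::Value().SetString(log.c_str(), log.size(), alloc),
                    alloc);

      rapidjson::StringBuffer buffer;
      rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
      doc.Accept(writer);  // yields {"errorLog":"line one\nline two"}

      rapidjson::Document parsed;
      parsed.Parse(buffer.GetString());
      assert(parsed["errorLog"].GetString() == log);  // newline survives intact
      return 0;
    }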

View File

@@ -12,7 +12,7 @@ namespace daggy {
     size_t pos = string.find(pattern);
     while (pos != std::string::npos) {
       string.replace(pos, pattern.size(), replacement);
-      pos = string.find(pattern);
+      pos = string.find(pattern, pos + replacement.size());
     }
     return string;
   }
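
Note: resuming the search just past the inserted text fixes two problems with the old loop: it no longer re-scans the already-substituted prefix, and it cannot spin forever when the replacement itself still contains the pattern. A stand-alone copy of the fixed loop with a call that previously never terminated:

    #include <cassert>
    #include <string>

    static std::string globalSubFixed(std::string string, const std::string &pattern,
                                      const std::string &replacement)
    {
      size_t pos = string.find(pattern);
      while (pos != std::string::npos) {
        string.replace(pos, pattern.size(), replacement);
        // Continue searching after the inserted replacement, not from 0.
        pos = string.find(pattern, pos + replacement.size());
      }
      return string;
    }

    int main()
    {
      // With the old "pos = string.find(pattern);" this call looped forever:
      // every "t" -> "tt" expansion was found again at the same position.
      assert(globalSubFixed("touch", "t", "tt") == "ttouch");
      return 0;
    }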

View File

@@ -10,60 +10,40 @@
 #include <iterator>

 namespace daggy::loggers::dag_run {
-  RedisLogger::RedisLogger(const std::string &host, int port)
-      : host_(host)
-      , port_(port)
-      , dagRunIDsKey_("dagRunIDs")
+  RedisLogger::RedisLogger(const std::string &prefix, const std::string &host,
+                           int port)
+      : prefix_(prefix)
+      , dagRunIDsKey_(prefix_ + "_dagRunIDs")
+      , ctx_(host, port)
   {
-    redis::RedisContext ctx(host_, port_);
-    auto resp = ctx.query("exists %s", dagRunIDsKey_.c_str());
+    auto resp = ctx_.query("exists %s", dagRunIDsKey_.c_str());
     if (resp.as<size_t>() == 0) {
-      ctx.query("set %s %s", dagRunIDsKey_.c_str(), "0");
+      ctx_.query("set %s %s", dagRunIDsKey_.c_str(), "0");
     }
   }

   // Execution
   DAGRunID RedisLogger::startDAGRun(const DAGSpec &dagSpec)
   {
-    redis::RedisContext ctx(host_, port_);
-    auto resp = ctx.query("incr %s", dagRunIDsKey_.c_str());
+    auto resp = ctx_.query("incr %s", dagRunIDsKey_.c_str());
     DAGRunID runID = resp.as<size_t>();

-    // Store the DAGRun
-    std::stringstream ss;
-    ss << "{"
-       << R"("tag": )" << std::quoted(dagSpec.tag) << R"(, "startTime": )"
-       << std::quoted(timePointToString(Clock::now()))
-       << R"(, "stateUpdates": [])"
-       << R"(, "taskStates": {})"
-       << R"(, "taskStateUpdates": {})"
-       << R"(, "taskAttempts": {})"
-       << R"(, "tasks": )" << tasksToJSON(dagSpec.tasks)
-       << R"(, "taskConfig": {)"
-       << R"("variables": )" << configToJSON(dagSpec.taskConfig.variables)
-       << R"(, "jobDefaults": )" << configToJSON(dagSpec.taskConfig.jobDefaults)
-       << R"(} })";
-    const auto &dagKey = getDAGRunKey_(runID);
-    ctx.query("JSON.SET %s . %s", dagKey.c_str(), ss.str().c_str());
-    for (const auto &[taskName, _] : dagSpec.tasks) {
-      ctx.query("JSON.SET %s %s []", dagKey.c_str(),
-                (".taskStateUpdates." + taskName).c_str());
-      ctx.query("JSON.SET %s %s []", dagKey.c_str(),
-                (".taskAttempts." + taskName).c_str());
+    ctx_.query("SET %s %s", getTagKey_(runID).c_str(), dagSpec.tag.c_str());
+    ctx_.query("SET %s %s", getStartTimeKey_(runID).c_str(),
+               timePointToString(Clock::now()).c_str());
+    ctx_.query("SET %s %s", getTaskVariablesKey_(runID).c_str(),
+               configToJSON(dagSpec.taskConfig.variables).c_str());
+    ctx_.query("SET %s %s", getTaskDefaultsKey_(runID).c_str(),
+               configToJSON(dagSpec.taskConfig.jobDefaults).c_str());
+
+    for (const auto &[taskName, task] : dagSpec.tasks) {
+      ctx_.query("HSET %s %s %s", getTasksKey_(runID).c_str(), taskName.c_str(),
+                 taskToJSON(task).c_str());
+      updateTaskState(runID, taskName, RunState::QUEUED);
     }
-    // store tags
-    ctx.query("SADD %s %s", getDAGTagMembersKey_(dagSpec.tag).c_str(),
-              std::to_string(runID).c_str());

     // Store tasks, initial states
     for (const auto &[taskName, task] : dagSpec.tasks) {
       updateTaskState(runID, taskName, RunState::QUEUED);
@@ -85,81 +65,71 @@ namespace daggy::loggers::dag_run {
   void RedisLogger::updateTask(DAGRunID dagRunID, const std::string &taskName,
                                const Task &task)
   {
-    redis::RedisContext ctx(host_, port_);
-    std::string taskJSON = taskToJSON(task);
-    ctx.query("JSON.SET %s %s %s", getDAGRunKey_(dagRunID).c_str(),
-              (".tasks." + taskName).c_str(), taskJSON.c_str(),
-              taskToJSON(task).c_str());
+    ctx_.query("HSET %s %s %s", getTasksKey_(dagRunID).c_str(),
+               taskName.c_str(), taskToJSON(task).c_str());
   }

   void RedisLogger::updateDAGRunState(DAGRunID dagRunID, RunState state)
   {
-    redis::RedisContext ctx(host_, port_);
-    const auto &dagKey = getDAGRunKey_(dagRunID);
     // Set the state
-    ctx.query(R"(JSON.SET %s .state "%s")", dagKey.c_str(), state._to_string());
-    ctx.query(R"(JSON.SET %s .lastUpdateTime "%s")", dagKey.c_str(),
-              timePointToString(Clock::now()).c_str());
+    ctx_.query("SET %s %s", getDAGStateKey_(dagRunID).c_str(),
+               state._to_string());
+    ctx_.query("SET %s %s", getLastUpdateKey_(dagRunID).c_str(),
+               timePointToString(Clock::now()).c_str());

     // Add the update record
     StateUpdateRecord rec{.time = Clock::now(), .state = state};
-    ctx.query("JSON.ARRAPPEND %s .stateUpdates %s", dagKey.c_str(),
-              stateUpdateRecordToJSON(rec).c_str());
+    ctx_.query("RPUSH %s %s", getDAGStateUpdateKey_(dagRunID).c_str(),
+               stateUpdateRecordToJSON(rec).c_str());
   }

   void RedisLogger::logTaskAttempt(DAGRunID dagRunID,
                                    const std::string &taskName,
                                    const AttemptRecord &attempt)
   {
-    redis::RedisContext ctx(host_, port_);
-    ctx.query("JSON.ARRAPPEND %s %s %s", getDAGRunKey_(dagRunID).c_str(),
-              (".taskAttempts." + taskName).c_str(),
-              attemptRecordToJSON(attempt).c_str());
+    std::string attemptJSON = attemptRecordToJSON(attempt);
+    ctx_.query("RPUSH %s %s", getTaskAttemptKey_(dagRunID, taskName).c_str(),
+               attemptJSON.c_str());
   }

   void RedisLogger::updateTaskState(DAGRunID dagRunID,
                                     const std::string &taskName, RunState state)
   {
-    redis::RedisContext ctx(host_, port_);
-    const auto &dagKey = getDAGRunKey_(dagRunID);
     // Set the state
-    ctx.query(R"(JSON.SET %s %s "%s")", dagKey.c_str(),
-              (".taskStates." + taskName).c_str(), state._to_string());
-    ctx.query(R"(JSON.SET %s .lastUpdateTime "%s")", dagKey.c_str(),
-              timePointToString(Clock::now()).c_str());
+    ctx_.query(R"(HSET %s %s %s)", getTaskStatesKey_(dagRunID).c_str(),
+               taskName.c_str(), state._to_string());
+    ctx_.query(R"(SET %s %s)", getLastUpdateKey_(dagRunID),
+               timePointToString(Clock::now()).c_str());

     // Add the update record
     StateUpdateRecord rec{.time = Clock::now(), .state = state};
-    ctx.query("JSON.ARRAPPEND %s %s %s", dagKey.c_str(),
-              (".taskStateUpdates." + taskName).c_str(),
-              stateUpdateRecordToJSON(rec).c_str());
+    ctx_.query("RPUSH %s %s",
+               getTaskStateUpdateKey_(dagRunID, taskName).c_str(),
+               stateUpdateRecordToJSON(rec).c_str());
   }

   // Querying
   DAGSpec RedisLogger::getDAGSpec(DAGRunID dagRunID)
   {
-    redis::RedisContext ctx(host_, port_);
-    const auto &dagKey = getDAGRunKey_(dagRunID);
     DAGSpec spec;
-    spec.tag = ctx.query("JSON.GET %s .tag", dagKey.c_str()).as<std::string>();
+    spec.tag =
+        ctx_.query("GET %s", getTagKey_(dagRunID).c_str()).as<std::string>();

-    auto tasks =
-        ctx.query("JSON.GET %s .tasks", dagKey.c_str()).as<std::string>();
-    spec.tasks = tasksFromJSON(tasks);
+    auto tasks = ctx_.query("HGETALL %s", getTasksKey_(dagRunID).c_str())
+                     .asHash<std::string, std::string>();
+    for (const auto &[taskName, taskJSON] : tasks) {
+      spec.tasks.emplace(taskName, taskFromJSON(taskName, taskJSON));
+    }

-    auto taskVars =
-        ctx.query("JSON.GET %s .taskConfig.variables", dagKey.c_str())
-            .as<std::string>();
+    auto taskVars = ctx_.query("GET %s", getTaskVariablesKey_(dagRunID).c_str())
+                        .as<std::string>();
     spec.taskConfig.variables = configFromJSON(taskVars);

     auto jobDefaults =
-        ctx.query("JSON.GET %s .taskConfig.jobDefaults", dagKey.c_str())
+        ctx_.query("GET %s", getTaskDefaultsKey_(dagRunID).c_str())
             .as<std::string>();
     spec.taskConfig.jobDefaults = configFromJSON(jobDefaults);
@@ -169,10 +139,9 @@ namespace daggy::loggers::dag_run {
   std::vector<DAGRunSummary> RedisLogger::queryDAGRuns(const std::string &tag,
                                                        bool all)
   {
-    redis::RedisContext ctx(host_, port_);
     std::vector<DAGRunSummary> summaries;
-    auto reply = ctx.query("GET %s", dagRunIDsKey_.c_str());
+    auto reply = ctx_.query("GET %s", dagRunIDsKey_.c_str());
     size_t maxRuns = std::stoull(reply.as<std::string>());
@@ -186,28 +155,31 @@ namespace daggy::loggers::dag_run {
       }
       if (!all and state == +RunState::COMPLETED)
         continue;

-      const auto &dagKey = getDAGRunKey_(runID);
+      const auto dagTag =
+          ctx_.query("GET %s", getTagKey_(runID).c_str()).as<std::string>();
+      if (!tag.empty() and dagTag != tag)
+        continue;
+
+      const auto startTime =
+          ctx_.query("GET %s", getStartTimeKey_(runID).c_str())
+              .as<std::string>();
+      const auto lastTime =
+          ctx_.query("GET %s", getLastUpdateKey_(runID).c_str())
+              .as<std::string>();

       DAGRunSummary summary{
           .runID = runID,
-          .tag =
-              ctx.query("JSON.GET %s .tag", dagKey.c_str()).as<std::string>(),
-          .runState = state,
-          .startTime = stringToTimePoint(
-              ctx.query("JSON.GET %s .startTime", dagKey.c_str())
-                  .as<std::string>()),
-          .lastUpdate = stringToTimePoint(
-              ctx.query("JSON.GET %s .lastUpdateTime", dagKey.c_str())
-                  .as<std::string>())};
+          .tag = dagTag,
+          .runState = state,
+          .startTime = stringToTimePoint(startTime),
+          .lastUpdate = stringToTimePoint(lastTime),
+      };

-      auto taskStates = ctx.query("JSON.GET %s .taskStates", dagKey.c_str())
-                            .as<std::string>();
-
-      rj::Document doc;
-      checkRJParse(doc.Parse(taskStates.c_str()));
-      for (auto it = doc.MemberBegin(); it != doc.MemberEnd(); ++it) {
-        std::string stateStr = it->value.GetString();
-        auto taskState = RunState::_from_string(stateStr.c_str());
+      auto taskStates =
+          ctx_.query("HGETALL %s", getTaskStatesKey_(runID).c_str())
+              .asHash<std::string, std::string>();
+      for (const auto &[taskName, state] : taskStates) {
+        auto taskState = RunState::_from_string(state.c_str());
         summary.taskStateCounts[taskState]++;
       }
       summaries.emplace_back(summary);
@@ -219,54 +191,46 @@ namespace daggy::loggers::dag_run {
   DAGRunRecord RedisLogger::getDAGRun(DAGRunID dagRunID)
   {
     DAGRunRecord rec;
-    redis::RedisContext ctx(host_, port_);
     rec.dagSpec = getDAGSpec(dagRunID);

-    auto json = ctx.query("JSON.GET %s", getDAGRunKey_(dagRunID).c_str())
-                    .as<std::string>();
-
-    rj::Document doc;
-    checkRJParse(doc.Parse(json.c_str()));
+    // Populate DAG Updates
+    auto dagStateUpdates =
+        ctx_.query("LRANGE %s 0 -1", getDAGStateUpdateKey_(dagRunID).c_str())
+            .asList<std::string>();
+    std::transform(dagStateUpdates.begin(), dagStateUpdates.end(),
+                   std::back_inserter(rec.dagStateChanges),
+                   [](const auto &s) { return stateUpdateRecordFromJSON(s); });

     // Populate taskRunStates
-    const auto &taskStates = doc["taskStates"].GetObject();
-    for (auto it = taskStates.MemberBegin(); it != taskStates.MemberEnd();
-         ++it) {
-      rec.taskRunStates.emplace(it->name.GetString(),
-                                RunState::_from_string(it->value.GetString()));
+    auto taskStates =
+        ctx_.query("HGETALL %s", getTaskStatesKey_(dagRunID).c_str())
+            .asHash<std::string, std::string>();
+    for (const auto &[taskName, state] : taskStates) {
+      rec.taskRunStates.emplace(taskName,
+                                RunState::_from_string(state.c_str()));
     }

-    // Populate taskAttempts
-    const auto &taskAttempts = doc["taskAttempts"].GetObject();
-    for (auto it = taskAttempts.MemberBegin(); it != taskAttempts.MemberEnd();
-         ++it) {
-      const std::string taskName = it->name.GetString();
-      const auto &newAttempts = it->value.GetArray();
-      auto &attempts = rec.taskAttempts[taskName];
-      for (size_t i = 0; i < newAttempts.Size(); ++i) {
-        auto rec = attemptRecordFromJSON(newAttempts[i]);
-        attempts.emplace_back(rec);
-      }
-    }
+    for (const auto &[taskName, _] : rec.dagSpec.tasks) {
+      // Populate taskAttempts
+      auto taskAttempts =
+          ctx_.query("LRANGE %s 0 -1",
+                     getTaskAttemptKey_(dagRunID, taskName).c_str())
+              .asList<std::string>();
+      std::transform(taskAttempts.begin(), taskAttempts.end(),
+                     std::back_inserter(rec.taskAttempts[taskName]),
+                     [](const auto &s) { return attemptRecordFromJSON(s); });

-    // Populate taskStateChanges
-    const auto &taskStateUpdates = doc["taskStateUpdates"].GetObject();
-    for (auto it = taskStateUpdates.MemberBegin();
-         it != taskStateUpdates.MemberEnd(); ++it) {
-      std::string taskName = it->name.GetString();
-      const auto &updates = it->value.GetArray();
-      auto &taskUpdates = rec.taskStateChanges[taskName];
-      for (size_t i = 0; i < updates.Size(); ++i) {
-        taskUpdates.emplace_back(stateUpdateRecordFromJSON(updates[i]));
-      }
-    }
-
-    // Populate DAG Updates
-    const auto &dagStateUpdates = doc["stateUpdates"].GetArray();
-    for (size_t i = 0; i < dagStateUpdates.Size(); ++i) {
-      rec.dagStateChanges.emplace_back(
-          stateUpdateRecordFromJSON(dagStateUpdates[i]));
+      // Populate stateUpdates
+      auto taskStateUpdates =
+          ctx_.query("LRANGE %s 0 -1",
+                     getTaskStateUpdateKey_(dagRunID, taskName).c_str())
+              .asList<std::string>();
+      auto &stateUpdates = rec.taskStateChanges[taskName];
+      std::transform(taskStateUpdates.begin(), taskStateUpdates.end(),
+                     std::back_inserter(stateUpdates), [](const auto &s) {
+                       return stateUpdateRecordFromJSON(s);
+                     });
     }

     return rec;
@@ -274,9 +238,7 @@ namespace daggy::loggers::dag_run {
   RunState RedisLogger::getDAGRunState(DAGRunID dagRunID)
   {
-    redis::RedisContext ctx(host_, port_);
-    auto resp =
-        ctx.query("JSON.GET %s .state", getDAGRunKey_(dagRunID).c_str());
+    auto resp = ctx_.query("GET %s", getDAGStateKey_(dagRunID).c_str());
     std::string stateStr = resp.as<std::string>();
     if (stateStr.empty())
       throw std::runtime_error("No such dagrun");
@@ -285,32 +247,19 @@ namespace daggy::loggers::dag_run {
   Task RedisLogger::getTask(DAGRunID dagRunID, const std::string &taskName)
   {
-    redis::RedisContext ctx(host_, port_);
-    auto resp = ctx.query("JSON.GET %s %s", getDAGRunKey_(dagRunID).c_str(),
-                          (".tasks." + taskName).c_str());
+    auto resp = ctx_.query("HGET %s %s", getTasksKey_(dagRunID).c_str(),
+                           taskName.c_str());
     return taskFromJSON(taskName, resp.as<std::string>());
   }

   RunState RedisLogger::getTaskState(DAGRunID dagRunID,
                                      const std::string &taskName)
   {
-    redis::RedisContext ctx(host_, port_);
-    auto resp = ctx.query("JSON.GET %s %s", getDAGRunKey_(dagRunID).c_str(),
-                          (".taskStates." + taskName).c_str());
+    auto resp = ctx_.query("HGET %s %s", getTaskStatesKey_(dagRunID).c_str(),
+                           taskName.c_str());
     return RunState::_from_string(resp.as<std::string>().c_str());
   }
-
-  inline const std::string RedisLogger::getDAGTagMembersKey_(
-      const std::string &tag) const
-  {
-    return "tags_" + tag;
-  }
-
-  inline const std::string RedisLogger::getDAGRunKey_(DAGRunID runID) const
-  {
-    return std::to_string(runID) + "_spec";
-  }
 } // namespace daggy::loggers::dag_run
 #endif
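
Note: taken together, the hunks above replace the one ReJSON document per run with plain keys; roughly (using the default "daggy" prefix):

  - JSON.SET <run> .<field>       ->  SET daggy_<runID>_<field> (tag, state, startTime, lastUpdate, taskVariables, taskDefaults)
  - JSON.SET <run> .tasks.<task>  ->  HSET daggy_<runID>_tasks <task> <taskJSON>
  - JSON.ARRAPPEND <run> .<list>  ->  RPUSH daggy_<runID>_stateUpdate, daggy_<runID>_taskAttempt_<task>, daggy_<runID>_taskUpdateState_<task>
  - JSON.GET <run> <path>         ->  GET / HGET / HGETALL / LRANGE <key> 0 -1 on the corresponding key

The old SADD-based tag index is dropped entirely; queryDAGRuns now filters runs by the stored tag value instead.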

View File

@@ -99,13 +99,13 @@ namespace {
     SECTION("Log task attempt and retrieve it")
     {
-      std::cout << "Task attempts" << std::endl;
+      std::string error = "long error string\nwith new\n lines";
       logger.logTaskAttempt(runID, "work_a",
-                            AttemptRecord{.rc = 2, .errorLog = "help"});
+                            AttemptRecord{.rc = 2, .errorLog = error});
       auto dagRun = logger.getDAGRun(runID);
       REQUIRE(dagRun.taskAttempts["work_a"].size() == 1);
-      REQUIRE(dagRun.taskAttempts["work_a"][0].errorLog == "help");
+      REQUIRE(dagRun.taskAttempts["work_a"][0].errorLog == error);
       REQUIRE(dagRun.taskAttempts["work_a"][0].rc == 2);
     }
   }

View File

@@ -6,11 +6,10 @@
 #include "daggy/executors/task/ForkingTaskExecutor.hpp"
 #include "daggy/executors/task/NoopTaskExecutor.hpp"
 #include "daggy/loggers/dag_run/OStreamLogger.hpp"
-#include "daggy/loggers/dag_run/RedisLogger.hpp"

 namespace fs = std::filesystem;

-TEST_CASE("dagrunner", "[dagrunner_order_preservation]")
+TEST_CASE("dagrunner", "[dagrunner][dagrunner_order_preservation]")
 {
   daggy::executors::task::NoopTaskExecutor ex;
   std::stringstream ss;
@@ -70,7 +69,7 @@ TEST_CASE("dagrunner", "[dagrunner_order_preservation]")
   }
 }

-TEST_CASE("DAGRunner simple execution", "[dagrunner_simple]")
+TEST_CASE("DAGRunner simple execution", "[dagrunner][dagrunner_simple]")
 {
   daggy::executors::task::ForkingTaskExecutor ex(10);
   std::stringstream ss;
@@ -110,7 +109,7 @@ TEST_CASE("DAGRunner simple execution", "[dagrunner_simple]")
   }
 }

-TEST_CASE("DAG Runner Restart old DAG", "[dagrunner_restart]")
+TEST_CASE("DAG Runner Restart old DAG", "[dagrunner][dagrunner_restart]")
 {
   daggy::executors::task::ForkingTaskExecutor ex(10);
   std::stringstream ss;
@@ -167,7 +166,7 @@ TEST_CASE("DAG Runner Restart old DAG", "[dagrunner_restart]")
   }
 }

-TEST_CASE("DAG Runner Generator Tasks", "[dagrunner_generator]")
+TEST_CASE("DAG Runner Generator Tasks", "[dagrunner][dagrunner_generator]")
 {
   daggy::executors::task::ForkingTaskExecutor ex(10);
   std::stringstream ss;