From e4e50fc219ec061df6bbe6030ccc84db7281af5a Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Sun, 17 Oct 2021 10:52:27 -0300 Subject: [PATCH] Migrating to vanilla redis from rejson, using a threadsafe RedisHelper Squashed commit of the following: commit f3549005c0192fd77bf47d208b74a11dd21380fa Author: Ian Roddis Date: Sun Oct 17 10:52:12 2021 -0300 Fixing issues with serialization of attempt records that included newlines commit ab387d62850428e320a05cdf54fd2026369d0bb6 Author: Ian Roddis Date: Fri Oct 15 12:57:14 2021 -0300 Migrating to vanilla redis from rejson commit f648cf065dea2d0a7e30aaec17441e9a37531ba0 Author: Ian Roddis Date: Fri Oct 15 10:23:34 2021 -0300 Making rediscontext threadsafe commit ca51d95c5c4a5f4aaa13c5abe32da161e919d66c Author: Ian Roddis Date: Fri Oct 15 10:01:33 2021 -0300 updating todo --- TODO.md | 4 + daggy/include/daggy/Serialization.hpp | 2 +- .../daggy/loggers/dag_run/RedisHelper.hpp | 3 + .../daggy/loggers/dag_run/RedisLogger.hpp | 41 ++- daggy/src/Serialization.cpp | 50 +++- daggy/src/Utilities.cpp | 2 +- daggy/src/loggers/dag_run/RedisLogger.cpp | 267 +++++++----------- tests/unit_dagrun_loggers.cpp | 6 +- tests/unit_dagrunner.cpp | 9 +- 9 files changed, 193 insertions(+), 191 deletions(-) diff --git a/TODO.md b/TODO.md index 97ced37..d80bc2d 100644 --- a/TODO.md +++ b/TODO.md @@ -15,6 +15,10 @@ Tasks - Implementations - [ ] General logger - [ ] SQL logger (sqlite, postgres) + - RedisLogger + - Convert to vanilla redis + - HGETALL + - Add prefix to distinguish daggyd instances - Server - [ ] Multiple executors - [ ] Log to general logger diff --git a/daggy/include/daggy/Serialization.hpp b/daggy/include/daggy/Serialization.hpp index 0c6e28e..38e0eb3 100644 --- a/daggy/include/daggy/Serialization.hpp +++ b/daggy/include/daggy/Serialization.hpp @@ -16,7 +16,7 @@ namespace rj = rapidjson; namespace daggy { void checkRJParse(const rj::ParseResult &result, const std::string &prefix = ""); - void dumpJSON(const rj::Value &doc, std::ostream &os); + std::string dumpJSON(const rj::Value &doc); // Parameters ConfigValues configFromJSON(const std::string &jsonSpec); diff --git a/daggy/include/daggy/loggers/dag_run/RedisHelper.hpp b/daggy/include/daggy/loggers/dag_run/RedisHelper.hpp index 4e63884..2d1eeb4 100644 --- a/daggy/include/daggy/loggers/dag_run/RedisHelper.hpp +++ b/daggy/include/daggy/loggers/dag_run/RedisHelper.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -89,6 +90,7 @@ namespace daggy::loggers::dag_run::redis { template RedisData query(Args &&...args) { + std::lock_guard lock(contextGuard_); redisReply *reply = static_cast( redisCommand(ctx_, std::forward(args)...)); @@ -119,6 +121,7 @@ namespace daggy::loggers::dag_run::redis { private: RedisData parseReply_(const redisReply *reply); redisContext *ctx_; + std::mutex contextGuard_; }; } // namespace daggy::loggers::dag_run::redis diff --git a/daggy/include/daggy/loggers/dag_run/RedisLogger.hpp b/daggy/include/daggy/loggers/dag_run/RedisLogger.hpp index 4f74064..28d66e2 100644 --- a/daggy/include/daggy/loggers/dag_run/RedisLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/RedisLogger.hpp @@ -28,8 +28,9 @@ namespace daggy::loggers::dag_run { class RedisLogger : public DAGRunLogger { public: - explicit RedisLogger(const std::string &host = "127.0.0.1", - int port = 6379); + explicit RedisLogger(const std::string &prefix = "daggy", + const std::string &host = "127.0.0.1", + int port = 6379); // Execution DAGRunID startDAGRun(const DAGSpec &dagSpec) override; @@ -62,13 +63,39 @@ namespace daggy::loggers::dag_run { const std::string &taskName) override; private: - std::string host_; - int port_; - + const std::string prefix_; const std::string dagRunIDsKey_; + redis::RedisContext ctx_; - inline const std::string getDAGTagMembersKey_(const std::string &) const; - inline const std::string getDAGRunKey_(DAGRunID) const; + inline const std::string getDAGPrefix_(DAGRunID runID) const + { + return prefix_ + "_" + std::to_string(runID) + "_"; + } + +#define GET_DAG_KEY(name, extra) \ + inline std::string name(DAGRunID runID) const \ + { \ + return getDAGPrefix_(runID) + extra; \ + } + + GET_DAG_KEY(getTagKey_, "tag"); + GET_DAG_KEY(getTasksKey_, "tasks"); + GET_DAG_KEY(getDAGStateKey_, "state"); + GET_DAG_KEY(getDAGStateUpdateKey_, "stateUpdate"); + GET_DAG_KEY(getTaskStatesKey_, "taskStates"); + GET_DAG_KEY(getTaskVariablesKey_, "taskVariables"); + GET_DAG_KEY(getTaskDefaultsKey_, "taskDefaults"); + GET_DAG_KEY(getStartTimeKey_, "startTime"); + GET_DAG_KEY(getLastUpdateKey_, "lastUpdate"); + +#define GET_TASK_KEY(name, category) \ + inline std::string name(DAGRunID runID, const std::string &taskName) const \ + { \ + return getDAGPrefix_(runID) + category + "_" + taskName; \ + } + + GET_TASK_KEY(getTaskStateUpdateKey_, "taskUpdateState"); + GET_TASK_KEY(getTaskAttemptKey_, "taskAttempt"); }; } // namespace daggy::loggers::dag_run diff --git a/daggy/src/Serialization.cpp b/daggy/src/Serialization.cpp index 86ba8dd..5a3f5ae 100644 --- a/daggy/src/Serialization.cpp +++ b/daggy/src/Serialization.cpp @@ -1,3 +1,4 @@ +#include #include #include "rapidjson/stringbuffer.h" @@ -19,12 +20,12 @@ namespace daggy { } } - void dumpJSON(const rj::Value &doc, std::ostream &os) + std::string dumpJSON(const rj::Value &doc) { rj::StringBuffer buffer; rj::Writer writer(buffer); doc.Accept(writer); - os << buffer.GetString() << std::endl; + return buffer.GetString(); } ConfigValues configFromJSON(const std::string &jsonSpec) @@ -278,26 +279,45 @@ namespace daggy { std::string attemptRecordToJSON(const AttemptRecord &record) { - std::stringstream ss; + rj::Document doc; + doc.SetObject(); + auto &alloc = doc.GetAllocator(); - ss << "{" - << R"("startTime": )" << std::quoted(timePointToString(record.startTime)) - << ',' << R"("stopTime": )" - << std::quoted(timePointToString(record.stopTime)) << ',' << R"("rc": )" - << record.rc << ',' << R"("executorLog": )" - << std::quoted(record.executorLog) << ',' << R"("outputLog": )" - << std::quoted(record.outputLog) << ',' << R"("errorLog": )" - << std::quoted(record.errorLog) << '}'; + auto startTime = timePointToString(record.startTime); + doc.AddMember( + "startTime", + rj::Value().SetString(startTime.c_str(), startTime.size(), alloc), + alloc); - std::string json = ss.str(); - return globalSub(json, "\n", "\\n"); + auto stopTime = timePointToString(record.stopTime); + doc.AddMember( + "stopTime", + rj::Value().SetString(stopTime.c_str(), stopTime.size(), alloc), alloc); + + doc.AddMember("rc", rj::Value().SetInt(record.rc), alloc); + + doc.AddMember("outputLog", + rj::Value().SetString(record.outputLog.c_str(), + record.outputLog.size(), alloc), + alloc); + + doc.AddMember("errorLog", + rj::Value().SetString(record.errorLog.c_str(), + record.errorLog.size(), alloc), + alloc); + + doc.AddMember("executorLog", + rj::Value().SetString(record.executorLog.c_str(), + record.executorLog.size(), alloc), + alloc); + + return dumpJSON(doc); } AttemptRecord attemptRecordFromJSON(const std::string &json) { - std::string jsonNew = globalSub(json, "\\n", "\n"); rj::Document doc; - checkRJParse(doc.Parse(jsonNew.c_str()), "Parsing config"); + checkRJParse(doc.Parse(json.c_str()), "Parsing AttemptRecord"); return attemptRecordFromJSON(doc); } diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index c530908..aca020a 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -12,7 +12,7 @@ namespace daggy { size_t pos = string.find(pattern); while (pos != std::string::npos) { string.replace(pos, pattern.size(), replacement); - pos = string.find(pattern); + pos = string.find(pattern, pos + replacement.size()); } return string; } diff --git a/daggy/src/loggers/dag_run/RedisLogger.cpp b/daggy/src/loggers/dag_run/RedisLogger.cpp index ce3915c..ab923c6 100644 --- a/daggy/src/loggers/dag_run/RedisLogger.cpp +++ b/daggy/src/loggers/dag_run/RedisLogger.cpp @@ -10,60 +10,40 @@ #include namespace daggy::loggers::dag_run { - RedisLogger::RedisLogger(const std::string &host, int port) - : host_(host) - , port_(port) - , dagRunIDsKey_("dagRunIDs") + RedisLogger::RedisLogger(const std::string &prefix, const std::string &host, + int port) + : prefix_(prefix) + , dagRunIDsKey_(prefix_ + "_dagRunIDs") + , ctx_(host, port) { - redis::RedisContext ctx(host_, port_); - - auto resp = ctx.query("exists %s", dagRunIDsKey_.c_str()); + auto resp = ctx_.query("exists %s", dagRunIDsKey_.c_str()); if (resp.as() == 0) { - ctx.query("set %s %s", dagRunIDsKey_.c_str(), "0"); + ctx_.query("set %s %s", dagRunIDsKey_.c_str(), "0"); } } // Execution DAGRunID RedisLogger::startDAGRun(const DAGSpec &dagSpec) { - redis::RedisContext ctx(host_, port_); - - auto resp = ctx.query("incr %s", dagRunIDsKey_.c_str()); + auto resp = ctx_.query("incr %s", dagRunIDsKey_.c_str()); DAGRunID runID = resp.as(); - // Store the DAGRun - std::stringstream ss; + ctx_.query("SET %s %s", getTagKey_(runID).c_str(), dagSpec.tag.c_str()); + ctx_.query("SET %s %s", getStartTimeKey_(runID).c_str(), + timePointToString(Clock::now()).c_str()); + ctx_.query("SET %s %s", getTaskVariablesKey_(runID).c_str(), + configToJSON(dagSpec.taskConfig.variables).c_str()); + ctx_.query("SET %s %s", getTaskDefaultsKey_(runID).c_str(), + configToJSON(dagSpec.taskConfig.jobDefaults).c_str()); - ss << "{" - << R"("tag": )" << std::quoted(dagSpec.tag) << R"(, "startTime": )" - << std::quoted(timePointToString(Clock::now())) - << R"(, "stateUpdates": [])" - << R"(, "taskStates": {})" - << R"(, "taskStateUpdates": {})" - << R"(, "taskAttempts": {})" - << R"(, "tasks": )" << tasksToJSON(dagSpec.tasks) - << R"(, "taskConfig": {)" - << R"("variables": )" << configToJSON(dagSpec.taskConfig.variables) - << R"(, "jobDefaults": )" << configToJSON(dagSpec.taskConfig.jobDefaults) - << R"(} })"; - - const auto &dagKey = getDAGRunKey_(runID); - - ctx.query("JSON.SET %s . %s", dagKey.c_str(), ss.str().c_str()); - - for (const auto &[taskName, _] : dagSpec.tasks) { - ctx.query("JSON.SET %s %s []", dagKey.c_str(), - (".taskStateUpdates." + taskName).c_str()); - ctx.query("JSON.SET %s %s []", dagKey.c_str(), - (".taskAttempts." + taskName).c_str()); + for (const auto &[taskName, task] : dagSpec.tasks) { + ctx_.query("HSET %s %s %s", getTasksKey_(runID).c_str(), taskName.c_str(), + taskToJSON(task).c_str()); + updateTaskState(runID, taskName, RunState::QUEUED); } - // store tags - ctx.query("SADD %s %s", getDAGTagMembersKey_(dagSpec.tag).c_str(), - std::to_string(runID).c_str()); - // Store tasks, initial states for (const auto &[taskName, task] : dagSpec.tasks) { updateTaskState(runID, taskName, RunState::QUEUED); @@ -85,81 +65,71 @@ namespace daggy::loggers::dag_run { void RedisLogger::updateTask(DAGRunID dagRunID, const std::string &taskName, const Task &task) { - redis::RedisContext ctx(host_, port_); - std::string taskJSON = taskToJSON(task); - ctx.query("JSON.SET %s %s %s", getDAGRunKey_(dagRunID).c_str(), - (".tasks." + taskName).c_str(), taskJSON.c_str(), - taskToJSON(task).c_str()); + ctx_.query("HSET %s %s %s", getTasksKey_(dagRunID).c_str(), + taskName.c_str(), taskToJSON(task).c_str()); } void RedisLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) { - redis::RedisContext ctx(host_, port_); - const auto &dagKey = getDAGRunKey_(dagRunID); - // Set the state - ctx.query(R"(JSON.SET %s .state "%s")", dagKey.c_str(), state._to_string()); + ctx_.query("SET %s %s", getDAGStateKey_(dagRunID).c_str(), + state._to_string()); - ctx.query(R"(JSON.SET %s .lastUpdateTime "%s")", dagKey.c_str(), - timePointToString(Clock::now()).c_str()); + ctx_.query("SET %s %s", getLastUpdateKey_(dagRunID).c_str(), + timePointToString(Clock::now()).c_str()); // Add the update record StateUpdateRecord rec{.time = Clock::now(), .state = state}; - ctx.query("JSON.ARRAPPEND %s .stateUpdates %s", dagKey.c_str(), - stateUpdateRecordToJSON(rec).c_str()); + ctx_.query("RPUSH %s %s", getDAGStateUpdateKey_(dagRunID).c_str(), + stateUpdateRecordToJSON(rec).c_str()); } void RedisLogger::logTaskAttempt(DAGRunID dagRunID, const std::string &taskName, const AttemptRecord &attempt) { - redis::RedisContext ctx(host_, port_); - ctx.query("JSON.ARRAPPEND %s %s %s", getDAGRunKey_(dagRunID).c_str(), - (".taskAttempts." + taskName).c_str(), - attemptRecordToJSON(attempt).c_str()); + std::string attemptJSON = attemptRecordToJSON(attempt); + ctx_.query("RPUSH %s %s", getTaskAttemptKey_(dagRunID, taskName).c_str(), + attemptJSON.c_str()); } void RedisLogger::updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) { - redis::RedisContext ctx(host_, port_); - const auto &dagKey = getDAGRunKey_(dagRunID); - // Set the state - ctx.query(R"(JSON.SET %s %s "%s")", dagKey.c_str(), - (".taskStates." + taskName).c_str(), state._to_string()); + ctx_.query(R"(HSET %s %s %s)", getTaskStatesKey_(dagRunID).c_str(), + taskName.c_str(), state._to_string()); - ctx.query(R"(JSON.SET %s .lastUpdateTime "%s")", dagKey.c_str(), - timePointToString(Clock::now()).c_str()); + ctx_.query(R"(SET %s %s)", getLastUpdateKey_(dagRunID), + timePointToString(Clock::now()).c_str()); // Add the update record StateUpdateRecord rec{.time = Clock::now(), .state = state}; - ctx.query("JSON.ARRAPPEND %s %s %s", dagKey.c_str(), - (".taskStateUpdates." + taskName).c_str(), - stateUpdateRecordToJSON(rec).c_str()); + ctx_.query("RPUSH %s %s", + getTaskStateUpdateKey_(dagRunID, taskName).c_str(), + stateUpdateRecordToJSON(rec).c_str()); } // Querying DAGSpec RedisLogger::getDAGSpec(DAGRunID dagRunID) { - redis::RedisContext ctx(host_, port_); - const auto &dagKey = getDAGRunKey_(dagRunID); - DAGSpec spec; - spec.tag = ctx.query("JSON.GET %s .tag", dagKey.c_str()).as(); + spec.tag = + ctx_.query("GET %s", getTagKey_(dagRunID).c_str()).as(); - auto tasks = - ctx.query("JSON.GET %s .tasks", dagKey.c_str()).as(); - spec.tasks = tasksFromJSON(tasks); + auto tasks = ctx_.query("HGETALL %s", getTasksKey_(dagRunID).c_str()) + .asHash(); + for (const auto &[taskName, taskJSON] : tasks) { + spec.tasks.emplace(taskName, taskFromJSON(taskName, taskJSON)); + } - auto taskVars = - ctx.query("JSON.GET %s .taskConfig.variables", dagKey.c_str()) - .as(); + auto taskVars = ctx_.query("GET %s", getTaskVariablesKey_(dagRunID).c_str()) + .as(); spec.taskConfig.variables = configFromJSON(taskVars); auto jobDefaults = - ctx.query("JSON.GET %s .taskConfig.jobDefaults", dagKey.c_str()) + ctx_.query("GET %s", getTaskDefaultsKey_(dagRunID).c_str()) .as(); spec.taskConfig.jobDefaults = configFromJSON(jobDefaults); @@ -169,10 +139,9 @@ namespace daggy::loggers::dag_run { std::vector RedisLogger::queryDAGRuns(const std::string &tag, bool all) { - redis::RedisContext ctx(host_, port_); std::vector summaries; - auto reply = ctx.query("GET %s", dagRunIDsKey_.c_str()); + auto reply = ctx_.query("GET %s", dagRunIDsKey_.c_str()); size_t maxRuns = std::stoull(reply.as()); @@ -186,28 +155,31 @@ namespace daggy::loggers::dag_run { } if (!all and state == +RunState::COMPLETED) continue; - const auto &dagKey = getDAGRunKey_(runID); + const auto dagTag = + ctx_.query("GET %s", getTagKey_(runID).c_str()).as(); + if (!tag.empty() and dagTag != tag) + continue; + + const auto startTime = + ctx_.query("GET %s", getStartTimeKey_(runID).c_str()) + .as(); + const auto lastTime = + ctx_.query("GET %s", getLastUpdateKey_(runID).c_str()) + .as(); + DAGRunSummary summary{ - .runID = runID, - .tag = - ctx.query("JSON.GET %s .tag", dagKey.c_str()).as(), - .runState = state, - .startTime = stringToTimePoint( - ctx.query("JSON.GET %s .startTime", dagKey.c_str()) - .as()), - .lastUpdate = stringToTimePoint( - ctx.query("JSON.GET %s .lastUpdateTime", dagKey.c_str()) - .as())}; + .runID = runID, + .tag = dagTag, + .runState = state, + .startTime = stringToTimePoint(startTime), + .lastUpdate = stringToTimePoint(lastTime), + }; - auto taskStates = ctx.query("JSON.GET %s .taskStates", dagKey.c_str()) - .as(); - - rj::Document doc; - checkRJParse(doc.Parse(taskStates.c_str())); - - for (auto it = doc.MemberBegin(); it != doc.MemberEnd(); ++it) { - std::string stateStr = it->value.GetString(); - auto taskState = RunState::_from_string(stateStr.c_str()); + auto taskStates = + ctx_.query("HGETALL %s", getTaskStatesKey_(runID).c_str()) + .asHash(); + for (const auto &[taskName, state] : taskStates) { + auto taskState = RunState::_from_string(state.c_str()); summary.taskStateCounts[taskState]++; } summaries.emplace_back(summary); @@ -219,54 +191,46 @@ namespace daggy::loggers::dag_run { DAGRunRecord RedisLogger::getDAGRun(DAGRunID dagRunID) { DAGRunRecord rec; - redis::RedisContext ctx(host_, port_); rec.dagSpec = getDAGSpec(dagRunID); - auto json = ctx.query("JSON.GET %s", getDAGRunKey_(dagRunID).c_str()) - .as(); - - rj::Document doc; - checkRJParse(doc.Parse(json.c_str())); + // Populate DAG Updates + auto dagStateUpdates = + ctx_.query("LRANGE %s 0 -1", getDAGStateUpdateKey_(dagRunID).c_str()) + .asList(); + std::transform(dagStateUpdates.begin(), dagStateUpdates.end(), + std::back_inserter(rec.dagStateChanges), + [](const auto &s) { return stateUpdateRecordFromJSON(s); }); // Populate taskRunStates - const auto &taskStates = doc["taskStates"].GetObject(); - for (auto it = taskStates.MemberBegin(); it != taskStates.MemberEnd(); - ++it) { - rec.taskRunStates.emplace(it->name.GetString(), - RunState::_from_string(it->value.GetString())); + auto taskStates = + ctx_.query("HGETALL %s", getTaskStatesKey_(dagRunID).c_str()) + .asHash(); + for (const auto &[taskName, state] : taskStates) { + rec.taskRunStates.emplace(taskName, + RunState::_from_string(state.c_str())); } - // Populate taskAttempts - const auto &taskAttempts = doc["taskAttempts"].GetObject(); - for (auto it = taskAttempts.MemberBegin(); it != taskAttempts.MemberEnd(); - ++it) { - const std::string taskName = it->name.GetString(); - const auto &newAttempts = it->value.GetArray(); - auto &attempts = rec.taskAttempts[taskName]; - for (size_t i = 0; i < newAttempts.Size(); ++i) { - auto rec = attemptRecordFromJSON(newAttempts[i]); - attempts.emplace_back(rec); - } - } + for (const auto &[taskName, _] : rec.dagSpec.tasks) { + // Populate taskAttempts + auto taskAttempts = + ctx_.query("LRANGE %s 0 -1", + getTaskAttemptKey_(dagRunID, taskName).c_str()) + .asList(); + std::transform(taskAttempts.begin(), taskAttempts.end(), + std::back_inserter(rec.taskAttempts[taskName]), + [](const auto &s) { return attemptRecordFromJSON(s); }); - // Populate taskStateChanges - const auto &taskStateUpdates = doc["taskStateUpdates"].GetObject(); - for (auto it = taskStateUpdates.MemberBegin(); - it != taskStateUpdates.MemberEnd(); ++it) { - std::string taskName = it->name.GetString(); - const auto &updates = it->value.GetArray(); - auto &taskUpdates = rec.taskStateChanges[taskName]; - for (size_t i = 0; i < updates.Size(); ++i) { - taskUpdates.emplace_back(stateUpdateRecordFromJSON(updates[i])); - } - } - - // Populate DAG Updates - const auto &dagStateUpdates = doc["stateUpdates"].GetArray(); - for (size_t i = 0; i < dagStateUpdates.Size(); ++i) { - rec.dagStateChanges.emplace_back( - stateUpdateRecordFromJSON(dagStateUpdates[i])); + // Populate stateUpdates + auto taskStateUpdates = + ctx_.query("LRANGE %s 0 -1", + getTaskStateUpdateKey_(dagRunID, taskName).c_str()) + .asList(); + auto &stateUpdates = rec.taskStateChanges[taskName]; + std::transform(taskStateUpdates.begin(), taskStateUpdates.end(), + std::back_inserter(stateUpdates), [](const auto &s) { + return stateUpdateRecordFromJSON(s); + }); } return rec; @@ -274,9 +238,7 @@ namespace daggy::loggers::dag_run { RunState RedisLogger::getDAGRunState(DAGRunID dagRunID) { - redis::RedisContext ctx(host_, port_); - auto resp = - ctx.query("JSON.GET %s .state", getDAGRunKey_(dagRunID).c_str()); + auto resp = ctx_.query("GET %s", getDAGStateKey_(dagRunID).c_str()); std::string stateStr = resp.as(); if (stateStr.empty()) throw std::runtime_error("No such dagrun"); @@ -285,32 +247,19 @@ namespace daggy::loggers::dag_run { Task RedisLogger::getTask(DAGRunID dagRunID, const std::string &taskName) { - redis::RedisContext ctx(host_, port_); - auto resp = ctx.query("JSON.GET %s %s", getDAGRunKey_(dagRunID).c_str(), - (".tasks." + taskName).c_str()); + auto resp = ctx_.query("HGET %s %s", getTasksKey_(dagRunID).c_str(), + taskName.c_str()); return taskFromJSON(taskName, resp.as()); } RunState RedisLogger::getTaskState(DAGRunID dagRunID, const std::string &taskName) { - redis::RedisContext ctx(host_, port_); - auto resp = ctx.query("JSON.GET %s %s", getDAGRunKey_(dagRunID).c_str(), - (".taskStates." + taskName).c_str()); + auto resp = ctx_.query("HGET %s %s", getTaskStatesKey_(dagRunID).c_str(), + taskName.c_str()); return RunState::_from_string(resp.as().c_str()); } - inline const std::string RedisLogger::getDAGTagMembersKey_( - const std::string &tag) const - { - return "tags_" + tag; - } - - inline const std::string RedisLogger::getDAGRunKey_(DAGRunID runID) const - { - return std::to_string(runID) + "_spec"; - } - } // namespace daggy::loggers::dag_run #endif diff --git a/tests/unit_dagrun_loggers.cpp b/tests/unit_dagrun_loggers.cpp index 06e0902..a92f1d4 100644 --- a/tests/unit_dagrun_loggers.cpp +++ b/tests/unit_dagrun_loggers.cpp @@ -99,13 +99,13 @@ namespace { SECTION("Log task attempt and retrieve it") { - std::cout << "Task attempts" << std::endl; + std::string error = "long error string\nwith new\n lines"; logger.logTaskAttempt(runID, "work_a", - AttemptRecord{.rc = 2, .errorLog = "help"}); + AttemptRecord{.rc = 2, .errorLog = error}); auto dagRun = logger.getDAGRun(runID); REQUIRE(dagRun.taskAttempts["work_a"].size() == 1); - REQUIRE(dagRun.taskAttempts["work_a"][0].errorLog == "help"); + REQUIRE(dagRun.taskAttempts["work_a"][0].errorLog == error); REQUIRE(dagRun.taskAttempts["work_a"][0].rc == 2); } } diff --git a/tests/unit_dagrunner.cpp b/tests/unit_dagrunner.cpp index b3eedc0..fa7432b 100644 --- a/tests/unit_dagrunner.cpp +++ b/tests/unit_dagrunner.cpp @@ -6,11 +6,10 @@ #include "daggy/executors/task/ForkingTaskExecutor.hpp" #include "daggy/executors/task/NoopTaskExecutor.hpp" #include "daggy/loggers/dag_run/OStreamLogger.hpp" -#include "daggy/loggers/dag_run/RedisLogger.hpp" namespace fs = std::filesystem; -TEST_CASE("dagrunner", "[dagrunner_order_preservation]") +TEST_CASE("dagrunner", "[dagrunner][dagrunner_order_preservation]") { daggy::executors::task::NoopTaskExecutor ex; std::stringstream ss; @@ -70,7 +69,7 @@ TEST_CASE("dagrunner", "[dagrunner_order_preservation]") } } -TEST_CASE("DAGRunner simple execution", "[dagrunner_simple]") +TEST_CASE("DAGRunner simple execution", "[dagrunner][dagrunner_simple]") { daggy::executors::task::ForkingTaskExecutor ex(10); std::stringstream ss; @@ -110,7 +109,7 @@ TEST_CASE("DAGRunner simple execution", "[dagrunner_simple]") } } -TEST_CASE("DAG Runner Restart old DAG", "[dagrunner_restart]") +TEST_CASE("DAG Runner Restart old DAG", "[dagrunner][dagrunner_restart]") { daggy::executors::task::ForkingTaskExecutor ex(10); std::stringstream ss; @@ -167,7 +166,7 @@ TEST_CASE("DAG Runner Restart old DAG", "[dagrunner_restart]") } } -TEST_CASE("DAG Runner Generator Tasks", "[dagrunner_generator]") +TEST_CASE("DAG Runner Generator Tasks", "[dagrunner][dagrunner_generator]") { daggy::executors::task::ForkingTaskExecutor ex(10); std::stringstream ss;