Adding support for RedisJSON logger

Squashed commit of the following:

commit dc3a1bf07b5e7afdfd45e56f34596300dab6fd70
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Wed Oct 13 15:15:28 2021 -0300

    Updating documentation a bit

commit 8ec9c8c74f587368b32d034d3240a5537a69d4b1
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Wed Oct 13 15:11:23 2021 -0300

    Completing tests for redis

commit a6308dfa35b40b5a147394af8e3322ada871eb92
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Wed Oct 13 14:56:22 2021 -0300

    Resolving some errors with forking environment

commit 34691b6f85abae67001f4a4c234a4f7314407331
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Wed Oct 13 10:53:55 2021 -0300

    Checkpointing work on unit tests

commit 44c2b50fde30348938d901703ead9e279c3cd237
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Wed Oct 13 09:09:58 2021 -0300

    Checkpointing work on redis

commit a8051b725257087e25bc452673633ba6b40e3985
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Fri Oct 8 13:31:41 2021 -0300

    Checkpointing progress, changing state updates to a single record type

commit 456b84ad8c7dee0ff0dd39d5a7caead1ccd1126c
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Thu Oct 7 16:43:48 2021 -0300

    Checkpointing progress

commit f19dcaa4e417c3f2f6e527c288fe51401c9fe1d7
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Thu Oct 7 11:53:35 2021 -0300

    Moving back to hiredis to avoid boost dependency

commit e4bea6c589e82c82fd41476f164d946d77677193
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Wed Oct 6 10:41:16 2021 -0300

    fixing comments

commit 807a73c2a406817001eec048483938545a60194c
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Wed Oct 6 10:40:38 2021 -0300

    Switching to redis-cpp

commit d060c008d4d96bf3a81a19d35067f95f3638b8ca
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Oct 5 17:54:06 2021 -0300

    Adding hiredis dep
This commit is contained in:
Ian Roddis
2021-10-13 15:18:01 -03:00
parent 0d4c45f5fc
commit 9a0d2bb145
25 changed files with 909 additions and 137 deletions


@@ -12,13 +12,15 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall -Werror")
if(CMAKE_BUILD_TYPE MATCHES "Debug")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread -fno-omit-frame-pointer")
set(TSAN_OPTIONS "suppressions=${CMAKE_CURRENT_SOURCE_DIR}/tests/tsan.supp")
endif()
set(THIRD_PARTY_DIR ${CMAKE_BINARY_DIR}/third_party)
find_package(Threads REQUIRED)
option(DAGGY_ENABLE_SLURM "add support for SLURM executor" ON)
option(DAGGY_ENABLE_SLURM "add support for slurm executor" ON)
option(DAGGY_ENABLE_REDIS "add support for redis logger" ON)
option(DAGGY_ENABLE_BENCHMARKS "Add catch2 benchmarks" ON)
include(cmake/rapidjson.cmake)


@@ -30,8 +30,10 @@ Individual tasks (vertices) are run via a task executor. Daggy supports multiple
fork), to distributed work managers like [slurm](https://slurm.schedmd.com/overview.html)
or [kubernetes](https://kubernetes.io/) (planned).
State is maintained via state loggers. Currently daggy supports an in-memory state manager (OStreamLogger).
Future plans include supporting [redis](https://redis.io) and [postgres](https://postgresql.org).
State is maintained via state loggers. Currently daggy supports an in-memory state manager (OStreamLogger) and a
[RedisJSON](https://oss.redis.com/redisjson/) logger.
Future plans include supporting [postgres](https://postgresql.org).
Building
==
@@ -377,3 +379,9 @@ files will then be read after the task has completed, and stored in the AttemptR
For this reason, it's important that the `tmpDir` directory **be readable by the daggy engine**, i.e., in a distributed
environment it should be on a shared filesystem. If this isn't the case, the job output will not be captured by daggy,
although it will still be available wherever it was written by slurm.
Loggers
=======
RedisJSON
---------
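The RedisJSON logger stores each DAG run as a RedisJSON document, keyed by run ID, on a Redis server that has the
RedisJSON module loaded. A minimal construction sketch (host and port shown are the constructor defaults; the "demo"
tag and the empty task set are illustrative, and the namespace of RunState is inferred from the tests):

#include <daggy/loggers/dag_run/RedisLogger.hpp>

int main()
{
    // Assumes daggy was built with DAGGY_ENABLE_REDIS and a RedisJSON-enabled
    // server is reachable on localhost.
    daggy::loggers::dag_run::RedisLogger logger("127.0.0.1", 6379);

    // A real spec would also carry tasks.
    auto runID = logger.startDAGRun(daggy::DAGSpec{.tag = "demo"});
    logger.updateDAGRunState(runID, daggy::RunState::RUNNING);
}

Because every operation opens its own short-lived connection, the logger holds no connection state beyond the host
and port it was constructed with.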


@@ -7,23 +7,14 @@ Tasks
- Quality of Life
- Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma-separated
list
- Cancel DAG execution entirely
- Marking tasks explicitly complete
- Executors
- Forking
- Add support for environment variables with `execvpe`
- Slurm
- Add support for environment variables
- Loggers
- Add in ability for loggers to be asynchronous
- Additional Methods
- Get DAG State
- Get Task Details
- Stream updates?
- Implementations
- [ ] General logger
- [ ] SQL logger (sqlite, postgres)
- [ ] Redis DAGRunLogger
- Server
- [ ] Multiple executors
- [ ] Log to general logger


@@ -12,4 +12,4 @@ ExternalProject_Add(${PROJECT_NAME}-external
CONFIGURE_COMMAND "")
add_library(${PROJECT_NAME} INTERFACE)
add_dependencies(${PROJECT_NAME} ${PROJECT_NAME}-external)
target_include_directories(${PROJECT_NAME} SYSTEM INTERFACE ${THIRD_PARTY_DIR}/${PROJECT_NAME}/include)
target_include_directories(${PROJECT_NAME} SYSTEM INTERFACE ${THIRD_PARTY_DIR}/${PROJECT_NAME}/include)


@@ -1,5 +1,5 @@
# SLURM
message("-- DAGGY_ENABLED_SLURM is set to ${DAGGY_ENABLE_SLURM}")
# Slurm support
message("-- DAGGY_ENABLE_SLURM is set to ${DAGGY_ENABLE_SLURM}")
if (DAGGY_ENABLE_SLURM)
find_library(SLURM_LIB libslurm.so libslurm.a slurm REQUIRED)
find_path(SLURM_INCLUDE_DIR "slurm/slurm.h" REQUIRED)
@@ -16,3 +16,10 @@ if (DAGGY_ENABLE_SLURM)
target_compile_definitions(slurm INTERFACE DAGGY_ENABLE_SLURM)
target_link_libraries(slurm INTERFACE dl resolv)
endif ()
# Redis support
message("-- DAGGY_ENABLE_REDIS is set to ${DAGGY_ENABLE_SLURM}")
if (DAGGY_ENABLE_REDIS)
include(cmake/hiredis.cmake)
# define on the hiredis target so consumers of redis support inherit it
target_compile_definitions(hiredis INTERFACE DAGGY_ENABLE_REDIS)
endif ()

cmake/hiredis.cmake (new file)

@@ -0,0 +1,23 @@
project(hiredis)
include(ExternalProject)
set_directory_properties(PROPERTIES EP_UPDATE_DISCONNECTED true)
ExternalProject_Add(hiredisDownload
PREFIX ${hiredis_root}
GIT_REPOSITORY https://github.com/redis/hiredis.git
GIT_TAG "v1.0.1"
CONFIGURE_COMMAND ""
BUILD_IN_SOURCE ON
INSTALL_COMMAND ""
)
ExternalProject_Get_Property(hiredisDownload SOURCE_DIR)
set(HIREDIS_INCLUDE_DIR ${SOURCE_DIR})
set(HIREDIS_LIB_DIR ${SOURCE_DIR})
add_library(${PROJECT_NAME} SHARED IMPORTED)
add_dependencies(${PROJECT_NAME} hiredisDownload)
target_include_directories(${PROJECT_NAME} INTERFACE ${HIREDIS_INCLUDE_DIR})
set_target_properties(${PROJECT_NAME} PROPERTIES IMPORTED_LOCATION "${HIREDIS_LIB_DIR}/libhiredis.a")


@@ -6,6 +6,10 @@ IF (DAGGY_ENABLE_SLURM)
target_link_libraries(${PROJECT_NAME} slurm)
endif ()
IF (DAGGY_ENABLE_REDIS)
target_link_libraries(${PROJECT_NAME} hiredis)
endif ()
target_include_directories(${PROJECT_NAME} PUBLIC include)
target_link_libraries(${PROJECT_NAME} pistache pthread rapidjson better-enums)


@@ -9,12 +9,14 @@
#include "Defines.hpp"
#include "Utilities.hpp"
#include "loggers/dag_run/Defines.hpp"
namespace rj = rapidjson;
namespace daggy {
void checkRJParse(const rj::ParseResult &result,
const std::string &prefix = "");
void dumpJSON(const rj::Value &doc, std::ostream &os);
// Parameters
ConfigValues configFromJSON(const std::string &jsonSpec);
@@ -26,6 +28,8 @@ namespace daggy {
// Tasks
Task taskFromJSON(const std::string &name, const rj::Value &spec,
const ConfigValues &jobDefaults = {});
Task taskFromJSON(const std::string &name, const std::string &spec,
const ConfigValues &jobDefaults = {});
TaskSet tasksFromJSON(const std::string &jsonSpec,
const ConfigValues &jobDefaults = {});
@@ -43,6 +47,8 @@ namespace daggy {
// Attempt Records
std::string attemptRecordToJSON(const AttemptRecord &attemptRecord);
AttemptRecord attemptRecordFromJSON(const std::string &json);
AttemptRecord attemptRecordFromJSON(const rj::Value &spec);
// default serialization
std::ostream &operator<<(std::ostream &os, const Task &task);
@@ -50,4 +56,13 @@ namespace daggy {
std::string timePointToString(const TimePoint &tp);
TimePoint stringToTimePoint(const std::string &timeStr);
/*
DAGRun Loggers
*/
namespace logger = loggers::dag_run;
std::string stateUpdateRecordToJSON(const logger::StateUpdateRecord &rec);
logger::StateUpdateRecord stateUpdateRecordFromJSON(const rj::Value &json);
logger::StateUpdateRecord stateUpdateRecordFromJSON(const std::string &json);
} // namespace daggy


@@ -26,8 +26,10 @@ namespace daggy {
const ConfigValues &interpolatedValues = {});
TaskDAG buildDAGFromTasks(
TaskSet &tasks,
const std::vector<loggers::dag_run::TaskUpdateRecord> &updates = {});
const TaskSet &tasks,
const std::unordered_map<std::string,
std::vector<loggers::dag_run::StateUpdateRecord>>
&updates = {});
void updateDAGFromTasks(TaskDAG &dag, const TaskSet &tasks);


@@ -43,8 +43,8 @@ namespace daggy::loggers::dag_run {
virtual RunState getDAGRunState(DAGRunID dagRunID) = 0;
virtual DAGRunRecord getDAGRun(DAGRunID dagRunID) = 0;
virtual Task &getTask(DAGRunID dagRunID, const std::string &taskName) = 0;
virtual RunState &getTaskState(DAGRunID dagRunID,
const std::string &taskName) = 0;
virtual Task getTask(DAGRunID dagRunID, const std::string &taskName) = 0;
virtual RunState getTaskState(DAGRunID dagRunID,
const std::string &taskName) = 0;
};
} // namespace daggy::loggers::dag_run


@@ -9,17 +9,10 @@
#include "../../Defines.hpp"
namespace daggy::loggers::dag_run {
struct TaskUpdateRecord
struct StateUpdateRecord
{
TimePoint time;
std::string taskName;
RunState newState;
};
struct DAGUpdateRecord
{
TimePoint time;
RunState newState;
RunState state;
};
// Pretty heavy weight, but
@@ -28,8 +21,9 @@ namespace daggy::loggers::dag_run {
DAGSpec dagSpec;
std::unordered_map<std::string, RunState> taskRunStates;
std::unordered_map<std::string, std::vector<AttemptRecord>> taskAttempts;
std::vector<TaskUpdateRecord> taskStateChanges;
std::vector<DAGUpdateRecord> dagStateChanges;
std::unordered_map<std::string, std::vector<StateUpdateRecord>>
taskStateChanges;
std::vector<StateUpdateRecord> dagStateChanges;
};
struct DAGRunSummary
@@ -42,3 +36,4 @@ namespace daggy::loggers::dag_run {
std::unordered_map<RunState, size_t> taskStateCounts;
};
} // namespace daggy::loggers::dag_run


@@ -43,9 +43,9 @@ namespace daggy::loggers::dag_run {
RunState getDAGRunState(DAGRunID dagRunID) override;
DAGRunRecord getDAGRun(DAGRunID dagRunID) override;
Task &getTask(DAGRunID dagRunID, const std::string &taskName) override;
RunState &getTaskState(DAGRunID dagRunID,
const std::string &taskName) override;
Task getTask(DAGRunID dagRunID, const std::string &taskName) override;
RunState getTaskState(DAGRunID dagRunID,
const std::string &taskName) override;
private:
std::mutex guard_;


@@ -0,0 +1,125 @@
#pragma once
#include <iterator>
#ifdef DAGGY_ENABLE_REDIS
#include <hiredis.h>
#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <variant>
#include <vector>
/*
Why a Redis Helper? This wraps hiredis structs in a class with a destructor
that will clean up after itself.
The query() method is a bit wonky with all the variants, but it works well
enough.
Important note: The hiredis context is not thread safe, so neither is this.
Create contexts as needed.
*/
namespace daggy::loggers::dag_run::redis {
using RedisDatum = std::variant<std::string, double, size_t>;
// Either a single Datum, or a vector of Datum
struct RedisData
{
void operator=(const RedisDatum &val)
{
data_ = val;
}
void operator=(const RedisData &other)
{
data_ = other.data_;
}
void operator=(const std::vector<RedisDatum> &other)
{
data_ = other;
}
RedisDatum asDatum()
{
return std::get<RedisDatum>(data_);
}
template <typename T>
T as()
{
return std::get<T>(std::get<RedisDatum>(data_));
}
template <typename T>
std::vector<T> asList()
{
std::vector<T> data;
const auto &inp = std::get<std::vector<RedisDatum>>(data_);
std::transform(inp.begin(), inp.end(), std::back_inserter(data),
[](const auto &i) { return std::get<T>(i); });
return data;
}
template <typename T, typename V>
std::unordered_map<T, V> asHash()
{
std::unordered_map<T, V> data;
const auto &inp = std::get<std::vector<RedisDatum>>(data_);
if (inp.size() % 2 != 0)
throw std::runtime_error("Number of items is not even");
for (size_t i = 0; i < inp.size(); i += 2) {
data.emplace(std::get<T>(inp[i]), std::get<V>(inp[i + 1]));
}
return data;
}
std::variant<RedisDatum, std::vector<RedisDatum>> data_;
};
class RedisContext
{
public:
RedisContext(const std::string &host, int port);
template <class... Args>
RedisData query(Args &&...args)
{
redisReply *reply = static_cast<redisReply *>(
redisCommand(ctx_, std::forward<Args>(args)...));
if (!reply) {
throw std::runtime_error("Cannot query redis.");
}
if (reply->type == REDIS_REPLY_ERROR) {
if (reply->str) {
std::string error{reply->str};
throw std::runtime_error("Error querying redis: " + error);
}
else {
throw std::runtime_error("Unknown error querying redis");
}
}
auto data = parseReply_(reply);
freeReplyObject(reply);
return data;
}
~RedisContext()
{
redisFree(ctx_);
}
private:
RedisData parseReply_(const redisReply *reply);
redisContext *ctx_;
};
} // namespace daggy::loggers::dag_run::redis
#endif
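A usage sketch, assuming DAGGY_ENABLE_REDIS is defined and a server is reachable (key names here are hypothetical;
command strings follow hiredis' printf-style redisCommand API):

#include <daggy/loggers/dag_run/RedisHelper.hpp>

using daggy::loggers::dag_run::redis::RedisContext;

void example()
{
    // One context per thread; the destructor frees the hiredis connection.
    RedisContext ctx("127.0.0.1", 6379);

    // query() frees each hiredis reply before returning.
    ctx.query("SET %s %s", "greeting", "hello");
    std::string val = ctx.query("GET %s", "greeting").as<std::string>();

    // HGETALL replies with a flat [field, value, ...] array; asHash() pairs it up.
    auto hash =
        ctx.query("HGETALL %s", "someHash").asHash<std::string, std::string>();
}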


@@ -0,0 +1,75 @@
#pragma once
#ifdef DAGGY_ENABLE_REDIS
#include <iostream>
#include <mutex>
#include "DAGRunLogger.hpp"
#include "Defines.hpp"
#include "RedisHelper.hpp"
namespace daggy::loggers::dag_run {
/*
Run IDs are obtained from the counter dagRunIDs;
keys are constructed from the dagRunID.
- dagRunIDs is an INTEGER COUNTER that returns the next dagRunID
- {runid}_spec is a RedisJSON document of the form:
{
"tag": tag,
"tasks": { ...tasks... },
...
}
*/
class RedisLogger : public DAGRunLogger
{
public:
explicit RedisLogger(const std::string &host = "127.0.0.1",
int port = 6379);
// Execution
DAGRunID startDAGRun(const DAGSpec &dagSpec) override;
void addTask(DAGRunID dagRunID, const std::string &taskName,
const Task &task) override;
void updateTask(DAGRunID dagRunID, const std::string &taskName,
const Task &task) override;
void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
void logTaskAttempt(DAGRunID, const std::string &taskName,
const AttemptRecord &attempt) override;
void updateTaskState(DAGRunID dagRunID, const std::string &taskName,
RunState state) override;
// Querying
DAGSpec getDAGSpec(DAGRunID dagRunID) override;
std::vector<DAGRunSummary> queryDAGRuns(const std::string &tag = "",
bool all = false) override;
RunState getDAGRunState(DAGRunID dagRunID) override;
DAGRunRecord getDAGRun(DAGRunID dagRunID) override;
Task getTask(DAGRunID dagRunID, const std::string &taskName) override;
RunState getTaskState(DAGRunID dagRunID,
const std::string &taskName) override;
private:
std::string host_;
int port_;
const std::string dagRunIDsKey_;
inline const std::string getDAGTagMembersKey_(const std::string &) const;
inline const std::string getDAGRunKey_(DAGRunID) const;
};
} // namespace daggy::loggers::dag_run
#endif
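To make the key scheme concrete: after three runs, two of them tagged "nightly", the keyspace would look roughly
like this (values illustrative, derived from startDAGRun, getDAGRunKey_, and getDAGTagMembersKey_):

dagRunIDs        "3"              plain string counter; INCR hands out the next run ID
tags_nightly     {"1", "3"}       SET of run IDs sharing the tag
1_spec, 2_spec,
3_spec           RedisJSON documents holding tag, startTime, stateUpdates,
                 taskStates, taskStateUpdates, taskAttempts, tasks, and taskConfig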


@@ -1,5 +1,7 @@
#include <rapidjson/error/en.h>
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
#include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp>
#include <iomanip>
@@ -17,6 +19,14 @@ namespace daggy {
}
}
void dumpJSON(const rj::Value &doc, std::ostream &os)
{
rj::StringBuffer buffer;
rj::Writer<rj::StringBuffer> writer(buffer);
doc.Accept(writer);
os << buffer.GetString() << std::endl;
}
ConfigValues configFromJSON(const std::string &jsonSpec)
{
rj::Document doc;
@@ -159,6 +169,14 @@ namespace daggy {
return task;
}
Task taskFromJSON(const std::string &name, const std::string &spec,
const ConfigValues &jobDefaults)
{
rj::Document doc;
checkRJParse(doc.Parse(spec.c_str()));
return taskFromJSON(name, doc, jobDefaults);
}
TaskSet tasksFromJSON(const std::string &jsonSpec,
const ConfigValues &jobDefaults)
{
@@ -266,12 +284,34 @@ namespace daggy {
<< R"("startTime": )" << std::quoted(timePointToString(record.startTime))
<< ',' << R"("stopTime": )"
<< std::quoted(timePointToString(record.stopTime)) << ',' << R"("rc": )"
<< std::to_string(record.rc) << ',' << R"("executorLog": )"
<< record.rc << ',' << R"("executorLog": )"
<< std::quoted(record.executorLog) << ',' << R"("outputLog": )"
<< std::quoted(record.outputLog) << ',' << R"("errorLog": )"
<< std::quoted(record.errorLog) << '}';
return ss.str();
std::string json = ss.str();
return globalSub(json, "\n", "\\n");
}
AttemptRecord attemptRecordFromJSON(const std::string &json)
{
std::string jsonNew = globalSub(json, "\\n", "\n");
rj::Document doc;
checkRJParse(doc.Parse(jsonNew.c_str()), "Parsing config");
return attemptRecordFromJSON(doc);
}
AttemptRecord attemptRecordFromJSON(const rj::Value &spec)
{
AttemptRecord rec;
rec.startTime = stringToTimePoint(spec["startTime"].GetString());
rec.stopTime = stringToTimePoint(spec["stopTime"].GetString());
rec.rc = spec["rc"].GetInt();
rec.executorLog = spec["executorLog"].GetString();
rec.outputLog = spec["outputLog"].GetString();
rec.errorLog = spec["errorLog"].GetString();
return rec;
}
std::string timePointToString(const TimePoint &tp)
@@ -328,4 +368,31 @@ namespace daggy {
return dagFromJSON(doc);
}
std::string stateUpdateRecordToJSON(const logger::StateUpdateRecord &rec)
{
std::stringstream ss;
ss << R"({ "time": )" << std::quoted(timePointToString(rec.time))
<< R"(, "state": )" << std::quoted(rec.state._to_string()) << "}";
return ss.str();
}
logger::StateUpdateRecord stateUpdateRecordFromJSON(const rj::Value &json)
{
logger::StateUpdateRecord rec{.state = RunState::QUEUED};
if (!json.HasMember("time"))
throw std::runtime_error("StateUpdateRecord missing required field time");
if (!json.HasMember("state"))
throw std::runtime_error(
"StateUpdateRecord missing required field state");
rec.state = RunState::_from_string(json["state"].GetString());
rec.time = stringToTimePoint(json["time"].GetString());
return rec;
}
logger::StateUpdateRecord stateUpdateRecordFromJSON(const std::string &json)
{
rj::Document doc;
checkRJParse(doc.Parse(json.c_str()), "Parsing config");
return stateUpdateRecordFromJSON(doc);
}
} // namespace daggy
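A round-trip sketch of the new StateUpdateRecord serialization (the timestamp value is illustrative; Clock and the
logger namespace alias are assumed to come in through Serialization.hpp, as the sources suggest):

#include <cassert>
#include <daggy/Serialization.hpp>

void stateUpdateRoundTrip()
{
    using namespace daggy;
    logger::StateUpdateRecord rec{.time = Clock::now(),
                                  .state = RunState::RUNNING};
    // e.g. { "time": "2021-10-13 15:18:01", "state": "RUNNING" }
    std::string json = stateUpdateRecordToJSON(rec);

    auto back = stateUpdateRecordFromJSON(json);
    assert(back.state == +RunState::RUNNING);
    assert(timePointToString(back.time) == timePointToString(rec.time));
}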


@@ -328,9 +328,7 @@ namespace daggy {
else {
ss << ',';
}
ss << '{' << R"("newState": )"
<< std::quoted(change.newState._to_string()) << ',' << R"("time": )"
<< std::quoted(timePointToString(change.time)) << '}';
ss << stateUpdateRecordToJSON(change);
}
ss << "]";
ss << '}';


@@ -105,25 +105,29 @@ namespace daggy {
}
TaskDAG buildDAGFromTasks(
TaskSet &tasks,
const std::vector<loggers::dag_run::TaskUpdateRecord> &updates)
const TaskSet &tasks,
const std::unordered_map<std::string,
std::vector<loggers::dag_run::StateUpdateRecord>>
&updates)
{
TaskDAG dag;
updateDAGFromTasks(dag, tasks);
// Replay any updates
for (const auto &update : updates) {
switch (update.newState) {
case RunState::RUNNING:
case RunState::RETRY:
case RunState::PAUSED:
case RunState::ERRORED:
case RunState::KILLED:
dag.setVertexState(update.taskName, RunState::RUNNING);
break;
case RunState::COMPLETED:
case RunState::QUEUED:
break;
for (const auto &[taskName, taskUpdates] : updates) {
for (const auto &update : taskUpdates) {
switch (update.state) {
case RunState::RUNNING:
case RunState::RETRY:
case RunState::PAUSED:
case RunState::ERRORED:
case RunState::KILLED:
dag.setVertexState(taskName, RunState::RUNNING);
break;
case RunState::COMPLETED:
case RunState::QUEUED:
break;
}
}
}


@@ -105,14 +105,13 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task,
argv.push_back(nullptr);
// Populate the environment
auto it = task.job.find("environment");
if (it != task.job.end()) {
const auto environment = std::get<Command>(task.job.at("environment"));
std::transform(
environment.begin(), environment.end(), std::back_inserter(envp),
[](const std::string &s) { return const_cast<char *>(s.c_str()); });
envp.push_back(nullptr);
}
auto environment = (task.job.count("environment") == 0
? std::vector<std::string>{}
: std::get<Command>(task.job.at("environment")));
std::transform(
environment.begin(), environment.end(), std::back_inserter(envp),
[](const std::string &s) { return const_cast<char *>(s.c_str()); });
envp.push_back(nullptr);
// Create the pipe
int stdoutPipe[2];
@@ -135,8 +134,10 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task,
}
close(stdoutPipe[0]);
close(stderrPipe[0]);
execvpe(argv[0], argv.data(), envp.data());
exit(-1);
char **env = (envp.empty() ? nullptr : envp.data());
auto res = execvpe(argv[0], argv.data(), env);
std::cout << res << std::endl;
exit(errno);
}
std::atomic<bool> reading = true;
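For context, a task exercising the environment-population change above might be constructed like this (shape
inferred from the executor and the test fixtures; the variable values are made up):

// Each "KEY=value" string lands in envp and reaches the child via execvpe().
daggy::Task task{
    .job{{"command", std::vector<std::string>{"/usr/bin/env"}},
         {"environment",
          std::vector<std::string>{"GREETING=hello", "TARGET=world"}}}};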


@@ -1,3 +1,5 @@
target_sources(${PROJECT_NAME} PRIVATE
OStreamLogger.cpp
RedisLogger.cpp
RedisHelper.cpp
)


@@ -5,7 +5,7 @@
#include <daggy/loggers/dag_run/OStreamLogger.hpp>
#include <iterator>
namespace daggy { namespace loggers { namespace dag_run {
namespace daggy::loggers::dag_run {
OStreamLogger::OStreamLogger(std::ostream &os)
: os_(os)
{
@@ -93,7 +93,7 @@ namespace daggy { namespace loggers { namespace dag_run {
RunState state)
{
auto &dagRun = dagRuns_.at(dagRunID);
dagRun.taskStateChanges.push_back({Clock::now(), taskName, state});
dagRun.taskStateChanges[taskName].push_back({Clock::now(), state});
auto it = dagRun.taskRunStates.find(taskName);
if (it == dagRun.taskRunStates.end()) {
dagRun.taskRunStates.emplace(taskName, state);
@@ -121,18 +121,26 @@ namespace daggy { namespace loggers { namespace dag_run {
size_t i = 0;
for (const auto &run : dagRuns_) {
if ((!all) &&
(run.dagStateChanges.back().newState == +RunState::COMPLETED)) {
(run.dagStateChanges.back().state == +RunState::COMPLETED)) {
continue;
}
if (!tag.empty() and tag != run.dagSpec.tag)
continue;
TimePoint lastTaskUpdate;
for (const auto &[_, updates] : run.taskStateChanges) {
for (const auto &update : updates) {
if (update.time > lastTaskUpdate)
lastTaskUpdate = update.time;
}
}
DAGRunSummary summary{
.runID = i,
.tag = run.dagSpec.tag,
.runState = run.dagStateChanges.back().newState,
.runState = run.dagStateChanges.back().state,
.startTime = run.dagStateChanges.front().time,
.lastUpdate = std::max<TimePoint>(run.taskStateChanges.back().time,
.lastUpdate = std::max<TimePoint>(lastTaskUpdate,
run.dagStateChanges.back().time)};
for (const auto &[_, taskState] : run.taskRunStates) {
@@ -153,19 +161,20 @@ namespace daggy { namespace loggers { namespace dag_run {
RunState OStreamLogger::getDAGRunState(DAGRunID dagRunID)
{
std::lock_guard<std::mutex> lock(guard_);
return dagRuns_.at(dagRunID).dagStateChanges.back().newState;
return dagRuns_.at(dagRunID).dagStateChanges.back().state;
}
Task &OStreamLogger::getTask(DAGRunID dagRunID, const std::string &taskName)
Task OStreamLogger::getTask(DAGRunID dagRunID, const std::string &taskName)
{
std::lock_guard<std::mutex> lock(guard_);
return dagRuns_.at(dagRunID).dagSpec.tasks.at(taskName);
}
RunState &OStreamLogger::getTaskState(DAGRunID dagRunID,
const std::string &taskName)
RunState OStreamLogger::getTaskState(DAGRunID dagRunID,
const std::string &taskName)
{
std::lock_guard<std::mutex> lock(guard_);
return dagRuns_.at(dagRunID).taskRunStates.at(taskName);
}
}}} // namespace daggy::loggers::dag_run
} // namespace daggy::loggers::dag_run


@@ -0,0 +1,90 @@
#include <stdexcept>
#ifdef DAGGY_ENABLE_REDIS
#include <daggy/loggers/dag_run/RedisHelper.hpp>
namespace daggy::loggers::dag_run::redis {
RedisContext::RedisContext(const std::string &host, int port)
{
const struct timeval timeout = {0, 250000}; // 0.25 seconds
ctx_ = redisConnectWithTimeout(host.c_str(), port, timeout);
if (ctx_ == nullptr || ctx_->err) {
std::string detail = (ctx_ != nullptr) ? ctx_->errstr : "allocation failure";
if (ctx_ != nullptr)
redisFree(ctx_);
throw std::runtime_error("Unable to connect to redis server at " + host +
":" + std::to_string(port) + " (" + detail + ")");
}
}
RedisData RedisContext::parseReply_(const redisReply *reply)
{
RedisData data;
switch (reply->type) {
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STRING:
case REDIS_REPLY_VERB: {
std::string raw(reply->str, reply->len);
if (raw.size() >= 2 && raw.front() == '"' && raw.back() == '"') {
data = raw.substr(1, raw.size() - 2);
}
else {
data = RedisDatum{raw};
}
break;
}
case REDIS_REPLY_DOUBLE: {
data = RedisDatum{reply->dval};
break;
}
case REDIS_REPLY_INTEGER: {
data = RedisDatum{(size_t)reply->integer};
break;
}
case REDIS_REPLY_ARRAY: {
std::vector<RedisDatum> parts;
for (size_t i = 0UL; i < reply->elements; ++i) {
parts.push_back(parseReply_(reply->element[i]).asDatum());
}
data = parts;
break;
}
}
return data;
}
} // namespace daggy::loggers::dag_run::redis
#endif


@@ -0,0 +1,316 @@
#include <stdexcept>
#ifdef DAGGY_ENABLE_REDIS
#include <enum.h>
#include <algorithm>
#include <daggy/Serialization.hpp>
#include <daggy/loggers/dag_run/RedisLogger.hpp>
#include <iomanip>
#include <iterator>
namespace daggy::loggers::dag_run {
RedisLogger::RedisLogger(const std::string &host, int port)
: host_(host)
, port_(port)
, dagRunIDsKey_("dagRunIDs")
{
redis::RedisContext ctx(host_, port_);
auto resp = ctx.query("exists %s", dagRunIDsKey_.c_str());
if (resp.as<size_t>() == 0) {
ctx.query("set %s %s", dagRunIDsKey_.c_str(), "0");
}
}
// Execution
DAGRunID RedisLogger::startDAGRun(const DAGSpec &dagSpec)
{
redis::RedisContext ctx(host_, port_);
auto resp = ctx.query("incr %s", dagRunIDsKey_.c_str());
DAGRunID runID = resp.as<size_t>();
// Store the DAGRun
std::stringstream ss;
ss << "{"
<< R"("tag": )" << std::quoted(dagSpec.tag) << R"(, "startTime": )"
<< std::quoted(timePointToString(Clock::now()))
<< R"(, "stateUpdates": [])"
<< R"(, "taskStates": {})"
<< R"(, "taskStateUpdates": {})"
<< R"(, "taskAttempts": {})"
<< R"(, "tasks": )" << tasksToJSON(dagSpec.tasks)
<< R"(, "taskConfig": {)"
<< R"("variables": )" << configToJSON(dagSpec.taskConfig.variables)
<< R"(, "jobDefaults": )" << configToJSON(dagSpec.taskConfig.jobDefaults)
<< R"(} })";
const auto &dagKey = getDAGRunKey_(runID);
ctx.query("JSON.SET %s . %s", dagKey.c_str(), ss.str().c_str());
for (const auto &[taskName, _] : dagSpec.tasks) {
ctx.query("JSON.SET %s %s []", dagKey.c_str(),
(".taskStateUpdates." + taskName).c_str());
ctx.query("JSON.SET %s %s []", dagKey.c_str(),
(".taskAttempts." + taskName).c_str());
}
// store tags
ctx.query("SADD %s %s", getDAGTagMembersKey_(dagSpec.tag).c_str(),
std::to_string(runID).c_str());
// Store tasks, initial states
for (const auto &[taskName, task] : dagSpec.tasks) {
updateTaskState(runID, taskName, RunState::QUEUED);
}
// Update the dag run state
updateDAGRunState(runID, RunState::QUEUED);
return runID;
}
void RedisLogger::addTask(DAGRunID dagRunID, const std::string &taskName,
const Task &task)
{
updateTask(dagRunID, taskName, task);
updateTaskState(dagRunID, taskName, RunState::QUEUED);
}
void RedisLogger::updateTask(DAGRunID dagRunID, const std::string &taskName,
const Task &task)
{
redis::RedisContext ctx(host_, port_);
std::string taskJSON = taskToJSON(task);
ctx.query("JSON.SET %s %s %s", getDAGRunKey_(dagRunID).c_str(),
(".tasks." + taskName).c_str(), taskJSON.c_str(),
taskToJSON(task).c_str());
}
void RedisLogger::updateDAGRunState(DAGRunID dagRunID, RunState state)
{
redis::RedisContext ctx(host_, port_);
const auto &dagKey = getDAGRunKey_(dagRunID);
// Set the state
ctx.query(R"(JSON.SET %s .state "%s")", dagKey.c_str(), state._to_string());
ctx.query(R"(JSON.SET %s .lastUpdateTime "%s")", dagKey.c_str(),
timePointToString(Clock::now()).c_str());
// Add the update record
StateUpdateRecord rec{.time = Clock::now(), .state = state};
ctx.query("JSON.ARRAPPEND %s .stateUpdates %s", dagKey.c_str(),
stateUpdateRecordToJSON(rec).c_str());
}
void RedisLogger::logTaskAttempt(DAGRunID dagRunID,
const std::string &taskName,
const AttemptRecord &attempt)
{
redis::RedisContext ctx(host_, port_);
ctx.query("JSON.ARRAPPEND %s %s %s", getDAGRunKey_(dagRunID).c_str(),
(".taskAttempts." + taskName).c_str(),
attemptRecordToJSON(attempt).c_str());
}
void RedisLogger::updateTaskState(DAGRunID dagRunID,
const std::string &taskName, RunState state)
{
redis::RedisContext ctx(host_, port_);
const auto &dagKey = getDAGRunKey_(dagRunID);
// Set the state
ctx.query(R"(JSON.SET %s %s "%s")", dagKey.c_str(),
(".taskStates." + taskName).c_str(), state._to_string());
ctx.query(R"(JSON.SET %s .lastUpdateTime "%s")", dagKey.c_str(),
timePointToString(Clock::now()).c_str());
// Add the update record
StateUpdateRecord rec{.time = Clock::now(), .state = state};
ctx.query("JSON.ARRAPPEND %s %s %s", dagKey.c_str(),
(".taskStateUpdates." + taskName).c_str(),
stateUpdateRecordToJSON(rec).c_str());
}
// Querying
DAGSpec RedisLogger::getDAGSpec(DAGRunID dagRunID)
{
redis::RedisContext ctx(host_, port_);
const auto &dagKey = getDAGRunKey_(dagRunID);
DAGSpec spec;
spec.tag = ctx.query("JSON.GET %s .tag", dagKey.c_str()).as<std::string>();
auto tasks =
ctx.query("JSON.GET %s .tasks", dagKey.c_str()).as<std::string>();
spec.tasks = tasksFromJSON(tasks);
auto taskVars =
ctx.query("JSON.GET %s .taskConfig.variables", dagKey.c_str())
.as<std::string>();
spec.taskConfig.variables = configFromJSON(taskVars);
auto jobDefaults =
ctx.query("JSON.GET %s .taskConfig.jobDefaults", dagKey.c_str())
.as<std::string>();
spec.taskConfig.jobDefaults = configFromJSON(jobDefaults);
return spec;
}
std::vector<DAGRunSummary> RedisLogger::queryDAGRuns(const std::string &tag,
bool all)
{
redis::RedisContext ctx(host_, port_);
std::vector<DAGRunSummary> summaries;
auto reply = ctx.query("GET %s", dagRunIDsKey_.c_str());
size_t maxRuns = std::stoull(reply.as<std::string>());
RunState state = RunState::QUEUED;
for (size_t runID = 1; runID <= maxRuns; ++runID) {
try {
state = getDAGRunState(runID);
}
catch (std::runtime_error &e) {
continue;
}
if (!all and state == +RunState::COMPLETED)
continue;
const auto &dagKey = getDAGRunKey_(runID);
DAGRunSummary summary{
.runID = runID,
.tag =
ctx.query("JSON.GET %s .tag", dagKey.c_str()).as<std::string>(),
.runState = state,
.startTime = stringToTimePoint(
ctx.query("JSON.GET %s .startTime", dagKey.c_str())
.as<std::string>()),
.lastUpdate = stringToTimePoint(
ctx.query("JSON.GET %s .lastUpdateTime", dagKey.c_str())
.as<std::string>())};
auto taskStates = ctx.query("JSON.GET %s .taskStates", dagKey.c_str())
.as<std::string>();
rj::Document doc;
checkRJParse(doc.Parse(taskStates.c_str()));
for (auto it = doc.MemberBegin(); it != doc.MemberEnd(); ++it) {
std::string stateStr = it->value.GetString();
auto taskState = RunState::_from_string(stateStr.c_str());
summary.taskStateCounts[taskState]++;
}
summaries.emplace_back(summary);
}
return summaries;
}
DAGRunRecord RedisLogger::getDAGRun(DAGRunID dagRunID)
{
DAGRunRecord rec;
redis::RedisContext ctx(host_, port_);
rec.dagSpec = getDAGSpec(dagRunID);
auto json = ctx.query("JSON.GET %s", getDAGRunKey_(dagRunID).c_str())
.as<std::string>();
rj::Document doc;
checkRJParse(doc.Parse(json.c_str()));
// Populate taskRunStates
const auto &taskStates = doc["taskStates"].GetObject();
for (auto it = taskStates.MemberBegin(); it != taskStates.MemberEnd();
++it) {
rec.taskRunStates.emplace(it->name.GetString(),
RunState::_from_string(it->value.GetString()));
}
// Populate taskAttempts
const auto &taskAttempts = doc["taskAttempts"].GetObject();
for (auto it = taskAttempts.MemberBegin(); it != taskAttempts.MemberEnd();
++it) {
const std::string taskName = it->name.GetString();
const auto &newAttempts = it->value.GetArray();
auto &attempts = rec.taskAttempts[taskName];
for (size_t i = 0; i < newAttempts.Size(); ++i) {
auto rec = attemptRecordFromJSON(newAttempts[i]);
attempts.emplace_back(rec);
}
}
// Populate taskStateChanges
const auto &taskStateUpdates = doc["taskStateUpdates"].GetObject();
for (auto it = taskStateUpdates.MemberBegin();
it != taskStateUpdates.MemberEnd(); ++it) {
std::string taskName = it->name.GetString();
const auto &updates = it->value.GetArray();
auto &taskUpdates = rec.taskStateChanges[taskName];
for (size_t i = 0; i < updates.Size(); ++i) {
taskUpdates.emplace_back(stateUpdateRecordFromJSON(updates[i]));
}
}
// Populate DAG Updates
const auto &dagStateUpdates = doc["stateUpdates"].GetArray();
for (size_t i = 0; i < dagStateUpdates.Size(); ++i) {
rec.dagStateChanges.emplace_back(
stateUpdateRecordFromJSON(dagStateUpdates[i]));
}
return rec;
}
RunState RedisLogger::getDAGRunState(DAGRunID dagRunID)
{
redis::RedisContext ctx(host_, port_);
auto resp =
ctx.query("JSON.GET %s .state", getDAGRunKey_(dagRunID).c_str());
std::string stateStr = resp.as<std::string>();
if (stateStr.empty())
throw std::runtime_error("No such dagrun");
return RunState::_from_string(stateStr.c_str());
}
Task RedisLogger::getTask(DAGRunID dagRunID, const std::string &taskName)
{
redis::RedisContext ctx(host_, port_);
auto resp = ctx.query("JSON.GET %s %s", getDAGRunKey_(dagRunID).c_str(),
(".tasks." + taskName).c_str());
return taskFromJSON(taskName, resp.as<std::string>());
}
RunState RedisLogger::getTaskState(DAGRunID dagRunID,
const std::string &taskName)
{
redis::RedisContext ctx(host_, port_);
auto resp = ctx.query("JSON.GET %s %s", getDAGRunKey_(dagRunID).c_str(),
(".taskStates." + taskName).c_str());
return RunState::_from_string(resp.as<std::string>().c_str());
}
inline const std::string RedisLogger::getDAGTagMembersKey_(
const std::string &tag) const
{
return "tags_" + tag;
}
inline const std::string RedisLogger::getDAGRunKey_(DAGRunID runID) const
{
return std::to_string(runID) + "_spec";
}
} // namespace daggy::loggers::dag_run
#endif
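And a sketch of the querying side (the "nightly" tag and the output format are illustrative):

#include <iostream>

#include <daggy/loggers/dag_run/RedisLogger.hpp>

void listRuns()
{
    daggy::loggers::dag_run::RedisLogger logger;
    // all = true also includes COMPLETED runs.
    for (const auto &s : logger.queryDAGRuns("nightly", /*all=*/true)) {
        std::cout << s.runID << " " << s.tag << " "
                  << s.runState._to_string() << "\n";
    }
}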


@@ -4,88 +4,126 @@
#include <iostream>
#include <sstream>
#include "daggy/Serialization.hpp"
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
#include "daggy/loggers/dag_run/RedisLogger.hpp"
using namespace daggy;
using namespace daggy::loggers::dag_run;
const TaskSet SAMPLE_TASKS{
{"work_a",
Task{.job{{"command", std::vector<std::string>{"/bin/echo", "a"}}},
Task{.definedName{"work_a"},
.job{{"command", std::vector<std::string>{"/bin/echo", "a"}}},
.children{"c"}}},
{"work_b",
Task{.job{{"command", std::vector<std::string>{"/bin/echo", "b"}}},
Task{.definedName{"work_b"},
.job{{"command", std::vector<std::string>{"/bin/echo", "b"}}},
.children{"c"}}},
{"work_c",
Task{.job{{"command", std::vector<std::string>{"/bin/echo", "c"}}}}}};
Task{.definedName{"work_c"},
.job{{"command", std::vector<std::string>{"/bin/echo", "c"}}}}}};
inline DAGRunID testDAGRunInit(DAGRunLogger &logger, const std::string &tag,
const TaskSet &tasks)
{
auto runID = logger.startDAGRun(DAGSpec{.tag = tag, .tasks = tasks});
// Verify run shows up in the list
namespace {
void testDAGRunInit(DAGRunLogger &logger, const std::string &tag,
const TaskSet &tasks)
{
auto runs = logger.queryDAGRuns();
REQUIRE(!runs.empty());
auto it = std::find_if(runs.begin(), runs.end(),
[runID](const auto &r) { return r.runID == runID; });
REQUIRE(it != runs.end());
REQUIRE(it->tag == tag);
REQUIRE(it->runState == +RunState::QUEUED);
}
auto runID = logger.startDAGRun(DAGSpec{.tag = tag, .tasks = tasks});
// Verify states
{
REQUIRE(logger.getDAGRunState(runID) == +RunState::QUEUED);
for (const auto &[k, _] : tasks) {
REQUIRE(logger.getTaskState(runID, k) == +RunState::QUEUED);
// Verify run shows up in the list
SECTION("New run shows up in list of runs")
{
auto runs = logger.queryDAGRuns();
REQUIRE(!runs.empty());
auto it = std::find_if(runs.begin(), runs.end(), [runID](const auto &r) {
return r.runID == runID;
});
REQUIRE(it != runs.end());
REQUIRE(it->tag == tag);
REQUIRE(it->runState == +RunState::QUEUED);
}
// Verify dagSpec matches
SECTION("Can retrieve DAG Spec")
{
auto spec = logger.getDAGSpec(runID);
REQUIRE(spec.tag == tag);
REQUIRE(spec.tasks == tasks);
}
// Verify states
SECTION("DAG State matches expectations")
{
REQUIRE(logger.getDAGRunState(runID) == +RunState::QUEUED);
for (const auto &[k, _] : tasks) {
REQUIRE(logger.getTaskState(runID, k) == +RunState::QUEUED);
}
}
// Verify integrity of run
SECTION("Can retrieve the full run")
{
auto dagRun = logger.getDAGRun(runID);
REQUIRE(dagRun.dagSpec.tag == tag);
REQUIRE(dagRun.dagSpec.tasks == tasks);
REQUIRE(dagRun.taskRunStates.size() == tasks.size());
auto nonQueuedTask = std::find_if(
dagRun.taskRunStates.begin(), dagRun.taskRunStates.end(),
[](const auto &a) { return a.second != +RunState::QUEUED; });
REQUIRE(nonQueuedTask == dagRun.taskRunStates.end());
REQUIRE(dagRun.dagStateChanges.size() == 1);
REQUIRE(dagRun.dagStateChanges.back().state == +RunState::QUEUED);
}
// Update DAG state and ensure that it's updated;
SECTION("Can update DAG state and retrieve new state")
{
logger.updateDAGRunState(runID, RunState::RUNNING);
auto dagRun = logger.getDAGRun(runID);
REQUIRE(dagRun.dagStateChanges.back().state == +RunState::RUNNING);
}
// Update a task state
SECTION("Can update task state and retrieve new state")
{
for (const auto &[k, v] : tasks)
logger.updateTaskState(runID, k, RunState::RUNNING);
auto dagRun = logger.getDAGRun(runID);
for (const auto &[k, v] : tasks) {
REQUIRE(dagRun.taskRunStates.at(k) == +RunState::RUNNING);
}
}
SECTION("Log task attempt and retrieve it")
{
std::cout << "Task attempts" << std::endl;
logger.logTaskAttempt(runID, "work_a",
AttemptRecord{.rc = 2, .errorLog = "help"});
auto dagRun = logger.getDAGRun(runID);
REQUIRE(dagRun.taskAttempts["work_a"].size() == 1);
REQUIRE(dagRun.taskAttempts["work_a"][0].errorLog == "help");
REQUIRE(dagRun.taskAttempts["work_a"][0].rc == 2);
}
}
// Verify integrity of run
{
auto dagRun = logger.getDAGRun(runID);
REQUIRE(dagRun.dagSpec.tag == tag);
REQUIRE(dagRun.dagSpec.tasks == tasks);
REQUIRE(dagRun.taskRunStates.size() == tasks.size());
auto nonQueuedTask = std::find_if(
dagRun.taskRunStates.begin(), dagRun.taskRunStates.end(),
[](const auto &a) { return a.second != +RunState::QUEUED; });
REQUIRE(nonQueuedTask == dagRun.taskRunStates.end());
REQUIRE(dagRun.dagStateChanges.size() == 1);
REQUIRE(dagRun.dagStateChanges.back().newState == +RunState::QUEUED);
}
// Update DAG state and ensure that it's updated;
{
logger.updateDAGRunState(runID, RunState::RUNNING);
auto dagRun = logger.getDAGRun(runID);
REQUIRE(dagRun.dagStateChanges.back().newState == +RunState::RUNNING);
}
// Update a task state
{
for (const auto &[k, v] : tasks)
logger.updateTaskState(runID, k, RunState::RUNNING);
auto dagRun = logger.getDAGRun(runID);
for (const auto &[k, v] : tasks) {
REQUIRE(dagRun.taskRunStates.at(k) == +RunState::RUNNING);
}
}
return runID;
}
} // namespace
TEST_CASE("ostream_logger", "[ostream_logger]")
{
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
SECTION("DAGRun Starts")
{
testDAGRunInit(logger, "init_test", SAMPLE_TASKS);
}
testDAGRunInit(logger, "init_test", SAMPLE_TASKS);
}
#ifdef DAGGY_ENABLE_REDIS
TEST_CASE("redis_logger", "[redis_logger]")
{
daggy::loggers::dag_run::RedisLogger logger;
testDAGRunInit(logger, "init_test", SAMPLE_TASKS);
}
#endif


@@ -6,6 +6,7 @@
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
#include "daggy/executors/task/NoopTaskExecutor.hpp"
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
#include "daggy/loggers/dag_run/RedisLogger.hpp"
namespace fs = std::filesystem;


@@ -79,8 +79,7 @@ TEST_CASE("forking_executor", "[forking_executor]")
REQUIRE(rec.outputLog.find(valTwo) != std::string::npos);
REQUIRE(rec.errorLog.empty());
if (fs::exists(scriptFile))
fs::remove_all(scriptFile);
// if (fs::exists(scriptFile)) fs::remove_all(scriptFile);
}
SECTION("Error Run")