diff --git a/CMakeLists.txt b/CMakeLists.txt index f2107c7..0c1fdd0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,13 +12,15 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall -Werror") if(CMAKE_BUILD_TYPE MATCHES "Debug") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread -fno-omit-frame-pointer") + set(TSAN_OPTIONS "suppressions=${CMAKE_CURRENT_SOURCE_DIR}/tests/tsan.supp") endif() set(THIRD_PARTY_DIR ${CMAKE_BINARY_DIR}/third_party) find_package(Threads REQUIRED) -option(DAGGY_ENABLE_SLURM "add support for SLURM executor" ON) +option(DAGGY_ENABLE_SLURM "add support for slurm executor" ON) +option(DAGGY_ENABLE_REDIS "add support for redis logger" ON) option(DAGGY_ENABLE_BENCHMARKS "Add catch2 benchmarks" ON) include(cmake/rapidjson.cmake) diff --git a/README.md b/README.md index 3a2d128..53c6a78 100644 --- a/README.md +++ b/README.md @@ -30,8 +30,10 @@ Individual tasks (vertices) are run via a task executor. Daggy supports multiple fork), to distributed work managers like [slurm](https://slurm.schedmd.com/overview.html) or [kubernetes](https://kubernetes.io/) (planned). -State is maintained via state loggers. Currently daggy supports an in-memory state manager (OStreamLogger). -Future plans include supporting [redis](https://redis.io) and [postgres](https://postgresql.org). +State is maintained via state loggers. Currently daggy supports an in-memory state manager (OStreamLogger), and +[RedisJSON](https://oss.redis.com/redisjson/). + +Future plans include supporting [postgres](https://postgresql.org). Building == @@ -377,3 +379,9 @@ files will then be read after the task has completed, and stored in the AttemptR For this reason, it's important that the `tmpDir` directory **be readable by the daggy engine**. i.e in a distributed environment, it should be a shared filesystem. If this isn't the case, the job output will not be captured by daggy, although it will still be available wherever it was written by slurm. 
+ +Loggers +======= + +RedisJSON +--------- diff --git a/TODO.md b/TODO.md index d1b3abc..97ced37 100644 --- a/TODO.md +++ b/TODO.md @@ -7,23 +7,14 @@ Tasks - Quality of Life - Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma separated list - - Cancel DAG execution entirely - - Marking tasks explicitly complete - Executors - - Forking - - Add support for environment variables with `execvpe` - - Slurm - - Add support for environment variables - Loggers - Add in ability for loggers to be asynchronous - Additional Methods - - Get DAG State - - Get Task Details - Stream updates? - Implementations - [ ] General logger - [ ] SQL logger (sqlite, postgres) - - [ ] Redis DAGRunLogger - Server - [ ] Multiple executors - [ ] Log to general logger diff --git a/cmake/argparse.cmake b/cmake/argparse.cmake index cb82b3c..0a14442 100644 --- a/cmake/argparse.cmake +++ b/cmake/argparse.cmake @@ -12,4 +12,4 @@ ExternalProject_Add(${PROJECT_NAME}-external CONFIGURE_COMMAND "") add_library(${PROJECT_NAME} INTERFACE) add_dependencies(${PROJECT_NAME} ${PROJECT_NAME}-external) -target_include_directories(${PROJECT_NAME} SYSTEM INTERFACE ${THIRD_PARTY_DIR}/${PROJECT_NAME}/include) \ No newline at end of file +target_include_directories(${PROJECT_NAME} SYSTEM INTERFACE ${THIRD_PARTY_DIR}/${PROJECT_NAME}/include) diff --git a/cmake/daggy_features.cmake b/cmake/daggy_features.cmake index d929413..c7cce8d 100644 --- a/cmake/daggy_features.cmake +++ b/cmake/daggy_features.cmake @@ -1,5 +1,5 @@ -# SLURM -message("-- DAGGY_ENABLED_SLURM is set to ${DAGGY_ENABLE_SLURM}") +# Slurm support +message("-- DAGGY_ENABLE_SLURM is set to ${DAGGY_ENABLE_SLURM}") if (DAGGY_ENABLE_SLURM) find_library(SLURM_LIB libslurm.so libslurm.a slurm REQUIRED) find_path(SLURM_INCLUDE_DIR "slurm/slurm.h" REQUIRED) @@ -16,3 +16,10 @@ if (DAGGY_ENABLE_SLURM) target_compile_definitions(slurm INTERFACE DAGGY_ENABLE_SLURM) target_link_libraries(slurm INTERFACE dl resolv) 
endif () + +# Redis support +message("-- DAGGY_ENABLE_REDIS is set to ${DAGGY_ENABLE_REDIS}") +if (DAGGY_ENABLE_REDIS) + include(cmake/hiredis.cmake) + target_compile_definitions(hiredis INTERFACE DAGGY_ENABLE_REDIS) +endif () diff --git a/cmake/hiredis.cmake b/cmake/hiredis.cmake new file mode 100644 index 0000000..ce92dbc --- /dev/null +++ b/cmake/hiredis.cmake @@ -0,0 +1,23 @@ +project(hiredis) + +include(ExternalProject) + +set_directory_properties(PROPERTIES EP_UPDATE_DISCONNECTED true) + +ExternalProject_Add(hiredisDownload + PREFIX ${hiredis_root} + GIT_REPOSITORY https://github.com/redis/hiredis.git + GIT_TAG "v1.0.1" + CONFIGURE_COMMAND "" + BUILD_IN_SOURCE ON + INSTALL_COMMAND "" + ) + +ExternalProject_Get_Property(hiredisDownload SOURCE_DIR) +set(HIREDIS_INCLUDE_DIR ${SOURCE_DIR}) +set(HIREDIS_LIB_DIR ${SOURCE_DIR}) + +add_library(${PROJECT_NAME} SHARED IMPORTED) +add_dependencies(${PROJECT_NAME} hiredisDownload) +target_include_directories(${PROJECT_NAME} INTERFACE ${HIREDIS_INCLUDE_DIR}) +set_target_properties(${PROJECT_NAME} PROPERTIES IMPORTED_LOCATION "${HIREDIS_LIB_DIR}/libhiredis.a") diff --git a/daggy/CMakeLists.txt b/daggy/CMakeLists.txt index 3469849..2413319 100644 --- a/daggy/CMakeLists.txt +++ b/daggy/CMakeLists.txt @@ -6,6 +6,10 @@ IF (DAGGY_ENABLE_SLURM) target_link_libraries(${PROJECT_NAME} slurm) endif () +IF (DAGGY_ENABLE_REDIS) + target_link_libraries(${PROJECT_NAME} hiredis) +endif () + target_include_directories(${PROJECT_NAME} PUBLIC include) target_link_libraries(${PROJECT_NAME} pistache pthread rapidjson better-enums) diff --git a/daggy/include/daggy/Serialization.hpp b/daggy/include/daggy/Serialization.hpp index 5259d3a..0c6e28e 100644 --- a/daggy/include/daggy/Serialization.hpp +++ b/daggy/include/daggy/Serialization.hpp @@ -9,12 +9,14 @@ #include "Defines.hpp" #include "Utilities.hpp" +#include "loggers/dag_run/Defines.hpp" namespace rj = rapidjson; namespace daggy { void checkRJParse(const rj::ParseResult &result, const 
std::string &prefix = ""); + void dumpJSON(const rj::Value &doc, std::ostream &os); // Parameters ConfigValues configFromJSON(const std::string &jsonSpec); @@ -26,6 +28,8 @@ namespace daggy { // Tasks Task taskFromJSON(const std::string &name, const rj::Value &spec, const ConfigValues &jobDefaults = {}); + Task taskFromJSON(const std::string &name, const std::string &spec, + const ConfigValues &jobDefaults = {}); TaskSet tasksFromJSON(const std::string &jsonSpec, const ConfigValues &jobDefaults = {}); @@ -43,6 +47,8 @@ namespace daggy { // Attempt Records std::string attemptRecordToJSON(const AttemptRecord &attemptRecord); + AttemptRecord attemptRecordFromJSON(const std::string &json); + AttemptRecord attemptRecordFromJSON(const rj::Value &spec); // default serialization std::ostream &operator<<(std::ostream &os, const Task &task); @@ -50,4 +56,13 @@ namespace daggy { std::string timePointToString(const TimePoint &tp); TimePoint stringToTimePoint(const std::string &timeStr); + + /* + DAGRun Loggers + */ + namespace logger = loggers::dag_run; + + std::string stateUpdateRecordToJSON(const logger::StateUpdateRecord &rec); + logger::StateUpdateRecord stateUpdateRecordFromJSON(const rj::Value &json); + logger::StateUpdateRecord stateUpdateRecordFromJSON(const std::string &json); } // namespace daggy diff --git a/daggy/include/daggy/Utilities.hpp b/daggy/include/daggy/Utilities.hpp index 8c2f4dc..7207615 100644 --- a/daggy/include/daggy/Utilities.hpp +++ b/daggy/include/daggy/Utilities.hpp @@ -26,8 +26,10 @@ namespace daggy { const ConfigValues &interpolatedValues = {}); TaskDAG buildDAGFromTasks( - TaskSet &tasks, - const std::vector &updates = {}); + const TaskSet &tasks, + const std::unordered_map> + &updates = {}); void updateDAGFromTasks(TaskDAG &dag, const TaskSet &tasks); diff --git a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp index 3232dc5..06e4b30 100644 --- 
a/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/DAGRunLogger.hpp @@ -43,8 +43,8 @@ namespace daggy::loggers::dag_run { virtual RunState getDAGRunState(DAGRunID dagRunID) = 0; virtual DAGRunRecord getDAGRun(DAGRunID dagRunID) = 0; - virtual Task &getTask(DAGRunID dagRunID, const std::string &taskName) = 0; - virtual RunState &getTaskState(DAGRunID dagRunID, - const std::string &taskName) = 0; + virtual Task getTask(DAGRunID dagRunID, const std::string &taskName) = 0; + virtual RunState getTaskState(DAGRunID dagRunID, + const std::string &taskName) = 0; }; } // namespace daggy::loggers::dag_run diff --git a/daggy/include/daggy/loggers/dag_run/Defines.hpp b/daggy/include/daggy/loggers/dag_run/Defines.hpp index db2d810..65d817e 100644 --- a/daggy/include/daggy/loggers/dag_run/Defines.hpp +++ b/daggy/include/daggy/loggers/dag_run/Defines.hpp @@ -9,17 +9,10 @@ #include "../../Defines.hpp" namespace daggy::loggers::dag_run { - struct TaskUpdateRecord + struct StateUpdateRecord { TimePoint time; - std::string taskName; - RunState newState; - }; - - struct DAGUpdateRecord - { - TimePoint time; - RunState newState; + RunState state; }; // Pretty heavy weight, but @@ -28,8 +21,9 @@ namespace daggy::loggers::dag_run { DAGSpec dagSpec; std::unordered_map taskRunStates; std::unordered_map> taskAttempts; - std::vector taskStateChanges; - std::vector dagStateChanges; + std::unordered_map> + taskStateChanges; + std::vector dagStateChanges; }; struct DAGRunSummary @@ -42,3 +36,4 @@ namespace daggy::loggers::dag_run { std::unordered_map taskStateCounts; }; } // namespace daggy::loggers::dag_run + diff --git a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp index 53c7bb8..4e942e0 100644 --- a/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp +++ b/daggy/include/daggy/loggers/dag_run/OStreamLogger.hpp @@ -43,9 +43,9 @@ namespace daggy::loggers::dag_run { RunState 
getDAGRunState(DAGRunID dagRunID) override; DAGRunRecord getDAGRun(DAGRunID dagRunID) override; - Task &getTask(DAGRunID dagRunID, const std::string &taskName) override; - RunState &getTaskState(DAGRunID dagRunID, - const std::string &taskName) override; + Task getTask(DAGRunID dagRunID, const std::string &taskName) override; + RunState getTaskState(DAGRunID dagRunID, + const std::string &taskName) override; private: std::mutex guard_; diff --git a/daggy/include/daggy/loggers/dag_run/RedisHelper.hpp b/daggy/include/daggy/loggers/dag_run/RedisHelper.hpp new file mode 100644 index 0000000..4e63884 --- /dev/null +++ b/daggy/include/daggy/loggers/dag_run/RedisHelper.hpp @@ -0,0 +1,125 @@ +#pragma once + +#include +#ifdef DAGGY_ENABLE_REDIS + +#include + +#include +#include +#include +#include +#include +#include + +/* + Why a Redis Helper? This wraps hiredis structs in a class with a destructor + that will clean up after itself. + + The query() method is a bit wonky with all the variants, but it works well + enough. + + Important note: The hiredis context is not thread safe, so neither is this. + Create contexts as needed. 
+ */ + +namespace daggy::loggers::dag_run::redis { + using RedisDatum = std::variant; + + // Either a single Datum, or a vector of Datum + struct RedisData + { + void operator=(const RedisDatum &val) + { + data_ = val; + } + + void operator=(const RedisData &other) + { + data_ = other.data_; + } + + void operator=(const std::vector &other) + { + data_ = other; + } + + RedisDatum asDatum() + { + return std::get(data_); + } + + template + T as() + { + return std::get(std::get(data_)); + } + + template + std::vector asList() + { + std::vector data; + const auto &inp = std::get>(data_); + std::transform(inp.begin(), inp.end(), std::back_inserter(data), + [](const auto &i) { return std::get(i); }); + return data; + } + + template + std::unordered_map asHash() + { + std::unordered_map data; + const auto &inp = std::get>(data_); + if (inp.size() % 2 != 0) + throw std::runtime_error("Number of items is not even"); + for (size_t i = 0; i < inp.size(); i += 2) { + data.emplace(std::get(inp[i]), std::get(inp[i + 1])); + } + return data; + } + + std::variant> data_; + }; + + class RedisContext + { + public: + RedisContext(const std::string &host, int port); + + template + RedisData query(Args &&...args) + { + redisReply *reply = static_cast( + redisCommand(ctx_, std::forward(args)...)); + + if (!reply) { + throw std::runtime_error("Cannot query redis."); + } + + if (reply->type == REDIS_REPLY_ERROR) { + if (reply->str) { + std::string error{reply->str}; + throw std::runtime_error("Error querying redis: " + error); + } + else { + throw std::runtime_error("Unknown error querying redis"); + } + } + + auto data = parseReply_(reply); + freeReplyObject(reply); + return data; + } + + ~RedisContext() + { + redisFree(ctx_); + } + + private: + RedisData parseReply_(const redisReply *reply); + redisContext *ctx_; + }; +} // namespace daggy::loggers::dag_run::redis + +#endif diff --git a/daggy/include/daggy/loggers/dag_run/RedisLogger.hpp 
b/daggy/include/daggy/loggers/dag_run/RedisLogger.hpp new file mode 100644 index 0000000..4f74064 --- /dev/null +++ b/daggy/include/daggy/loggers/dag_run/RedisLogger.hpp @@ -0,0 +1,75 @@ +#pragma once + +#ifdef DAGGY_ENABLE_REDIS + +#include +#include + +#include "DAGRunLogger.hpp" +#include "Defines.hpp" +#include "RedisHelper.hpp" + +namespace daggy::loggers::dag_run { + + /* + RunIDS are obtained from the counter dagRunIDs; + + Keys are constructed from the dagRunID. + + - dagRunIDs is an INTEGER COUNTER that returns the next dagRunID + + - {runid}_spec is a HASH from taskName -> taskJSON + + { + "tag": tag, + "tasks": { ...tasks... }, + */ + + class RedisLogger : public DAGRunLogger + { + public: + explicit RedisLogger(const std::string &host = "127.0.0.1", + int port = 6379); + + // Execution + DAGRunID startDAGRun(const DAGSpec &dagSpec) override; + + void addTask(DAGRunID dagRunID, const std::string &taskName, + const Task &task) override; + + void updateTask(DAGRunID dagRunID, const std::string &taskName, + const Task &task) override; + + void updateDAGRunState(DAGRunID dagRunID, RunState state) override; + + void logTaskAttempt(DAGRunID, const std::string &taskName, + const AttemptRecord &attempt) override; + + void updateTaskState(DAGRunID dagRunID, const std::string &taskName, + RunState state) override; + + // Querying + DAGSpec getDAGSpec(DAGRunID dagRunID) override; + + std::vector queryDAGRuns(const std::string &tag = "", + bool all = false) override; + + RunState getDAGRunState(DAGRunID dagRunID) override; + DAGRunRecord getDAGRun(DAGRunID dagRunID) override; + + Task getTask(DAGRunID dagRunID, const std::string &taskName) override; + RunState getTaskState(DAGRunID dagRunID, + const std::string &taskName) override; + + private: + std::string host_; + int port_; + + const std::string dagRunIDsKey_; + + inline const std::string getDAGTagMembersKey_(const std::string &) const; + inline const std::string getDAGRunKey_(DAGRunID) const; + }; +} // 
namespace daggy::loggers::dag_run + +#endif diff --git a/daggy/src/Serialization.cpp b/daggy/src/Serialization.cpp index 422670b..86ba8dd 100644 --- a/daggy/src/Serialization.cpp +++ b/daggy/src/Serialization.cpp @@ -1,5 +1,7 @@ #include +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" #include #include #include @@ -17,6 +19,14 @@ namespace daggy { } } + void dumpJSON(const rj::Value &doc, std::ostream &os) + { + rj::StringBuffer buffer; + rj::Writer writer(buffer); + doc.Accept(writer); + os << buffer.GetString() << std::endl; + } + ConfigValues configFromJSON(const std::string &jsonSpec) { rj::Document doc; @@ -159,6 +169,14 @@ namespace daggy { return task; } + Task taskFromJSON(const std::string &name, const std::string &spec, + const ConfigValues &jobDefaults) + { + rj::Document doc; + checkRJParse(doc.Parse(spec.c_str())); + return taskFromJSON(name, doc, jobDefaults); + } + TaskSet tasksFromJSON(const std::string &jsonSpec, const ConfigValues &jobDefaults) { @@ -266,12 +284,34 @@ namespace daggy { << R"("startTime": )" << std::quoted(timePointToString(record.startTime)) << ',' << R"("stopTime": )" << std::quoted(timePointToString(record.stopTime)) << ',' << R"("rc": )" - << std::to_string(record.rc) << ',' << R"("executorLog": )" + << record.rc << ',' << R"("executorLog": )" << std::quoted(record.executorLog) << ',' << R"("outputLog": )" << std::quoted(record.outputLog) << ',' << R"("errorLog": )" << std::quoted(record.errorLog) << '}'; - return ss.str(); + std::string json = ss.str(); + return globalSub(json, "\n", "\\n"); + } + + AttemptRecord attemptRecordFromJSON(const std::string &json) + { + std::string jsonNew = globalSub(json, "\\n", "\n"); + rj::Document doc; + checkRJParse(doc.Parse(jsonNew.c_str()), "Parsing config"); + return attemptRecordFromJSON(doc); + } + + AttemptRecord attemptRecordFromJSON(const rj::Value &spec) + { + AttemptRecord rec; + rec.startTime = stringToTimePoint(spec["startTime"].GetString()); + rec.stopTime = 
stringToTimePoint(spec["stopTime"].GetString()); + rec.rc = spec["rc"].GetInt(); + rec.executorLog = spec["executorLog"].GetString(); + rec.outputLog = spec["outputLog"].GetString(); + rec.errorLog = spec["errorLog"].GetString(); + + return rec; } std::string timePointToString(const TimePoint &tp) @@ -328,4 +368,31 @@ namespace daggy { return dagFromJSON(doc); } + std::string stateUpdateRecordToJSON(const logger::StateUpdateRecord &rec) + { + std::stringstream ss; + ss << R"({ "time": )" << std::quoted(timePointToString(rec.time)) + << R"(, "state": )" << std::quoted(rec.state._to_string()) << "}"; + return ss.str(); + } + + logger::StateUpdateRecord stateUpdateRecordFromJSON(const rj::Value &json) + { + logger::StateUpdateRecord rec{.state = RunState::QUEUED}; + if (!json.HasMember("time")) + throw std::runtime_error("StateUpdateRecord missing required field time"); + if (!json.HasMember("state")) + throw std::runtime_error( + "StateUpdateRecord missing required field state"); + + rec.state = RunState::_from_string(json["state"].GetString()); + rec.time = stringToTimePoint(json["time"].GetString()); + return rec; + } + logger::StateUpdateRecord stateUpdateRecordFromJSON(const std::string &json) + { + rj::Document doc; + checkRJParse(doc.Parse(json.c_str()), "Parsing config"); + return stateUpdateRecordFromJSON(doc); + } } // namespace daggy diff --git a/daggy/src/Server.cpp b/daggy/src/Server.cpp index 9062706..e76509b 100644 --- a/daggy/src/Server.cpp +++ b/daggy/src/Server.cpp @@ -328,9 +328,7 @@ namespace daggy { else { ss << ','; } - ss << '{' << R"("newState": )" - << std::quoted(change.newState._to_string()) << ',' << R"("time": )" - << std::quoted(timePointToString(change.time)) << '}'; + ss << stateUpdateRecordToJSON(change); } ss << "]"; ss << '}'; diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index 99258d0..c530908 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -105,25 +105,29 @@ namespace daggy { } TaskDAG 
buildDAGFromTasks( - TaskSet &tasks, - const std::vector &updates) + const TaskSet &tasks, + const std::unordered_map> + &updates) { TaskDAG dag; updateDAGFromTasks(dag, tasks); // Replay any updates - for (const auto &update : updates) { - switch (update.newState) { - case RunState::RUNNING: - case RunState::RETRY: - case RunState::PAUSED: - case RunState::ERRORED: - case RunState::KILLED: - dag.setVertexState(update.taskName, RunState::RUNNING); - break; - case RunState::COMPLETED: - case RunState::QUEUED: - break; + for (const auto &[taskName, taskUpdates] : updates) { + for (const auto &update : taskUpdates) { + switch (update.state) { + case RunState::RUNNING: + case RunState::RETRY: + case RunState::PAUSED: + case RunState::ERRORED: + case RunState::KILLED: + dag.setVertexState(taskName, RunState::RUNNING); + break; + case RunState::COMPLETED: + case RunState::QUEUED: + break; + } } } diff --git a/daggy/src/executors/task/ForkingTaskExecutor.cpp b/daggy/src/executors/task/ForkingTaskExecutor.cpp index 0fcdd8c..5ec27bf 100644 --- a/daggy/src/executors/task/ForkingTaskExecutor.cpp +++ b/daggy/src/executors/task/ForkingTaskExecutor.cpp @@ -105,14 +105,13 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task, argv.push_back(nullptr); // Populate the environment - auto it = task.job.find("environment"); - if (it != task.job.end()) { - const auto environment = std::get(task.job.at("environment")); - std::transform( - environment.begin(), environment.end(), std::back_inserter(envp), - [](const std::string &s) { return const_cast(s.c_str()); }); - envp.push_back(nullptr); - } + auto environment = (task.job.count("environment") == 0 + ? 
std::vector{} + : std::get(task.job.at("environment"))); + std::transform( + environment.begin(), environment.end(), std::back_inserter(envp), + [](const std::string &s) { return const_cast(s.c_str()); }); + envp.push_back(nullptr); // Create the pipe int stdoutPipe[2]; @@ -135,8 +134,10 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task, } close(stdoutPipe[0]); close(stderrPipe[0]); - execvpe(argv[0], argv.data(), envp.data()); - exit(-1); + char **env = (envp.empty() ? nullptr : envp.data()); + auto res = execvpe(argv[0], argv.data(), env); + std::cout << res << std::endl; + exit(errno); } std::atomic reading = true; diff --git a/daggy/src/loggers/dag_run/CMakeLists.txt b/daggy/src/loggers/dag_run/CMakeLists.txt index c3fee3e..b295de5 100644 --- a/daggy/src/loggers/dag_run/CMakeLists.txt +++ b/daggy/src/loggers/dag_run/CMakeLists.txt @@ -1,3 +1,5 @@ target_sources(${PROJECT_NAME} PRIVATE OStreamLogger.cpp + RedisLogger.cpp + RedisHelper.cpp ) diff --git a/daggy/src/loggers/dag_run/OStreamLogger.cpp b/daggy/src/loggers/dag_run/OStreamLogger.cpp index 1fc9bb3..23b3278 100644 --- a/daggy/src/loggers/dag_run/OStreamLogger.cpp +++ b/daggy/src/loggers/dag_run/OStreamLogger.cpp @@ -5,7 +5,7 @@ #include #include -namespace daggy { namespace loggers { namespace dag_run { +namespace daggy::loggers::dag_run { OStreamLogger::OStreamLogger(std::ostream &os) : os_(os) { @@ -93,7 +93,7 @@ namespace daggy { namespace loggers { namespace dag_run { RunState state) { auto &dagRun = dagRuns_.at(dagRunID); - dagRun.taskStateChanges.push_back({Clock::now(), taskName, state}); + dagRun.taskStateChanges[taskName].push_back({Clock::now(), state}); auto it = dagRun.taskRunStates.find(taskName); if (it == dagRun.taskRunStates.end()) { dagRun.taskRunStates.emplace(taskName, state); @@ -121,18 +121,26 @@ namespace daggy { namespace loggers { namespace dag_run { size_t i = 0; for (const auto &run : dagRuns_) { if ((!all) && - (run.dagStateChanges.back().newState == 
+RunState::COMPLETED)) { + (run.dagStateChanges.back().state == +RunState::COMPLETED)) { continue; } if (!tag.empty() and tag != run.dagSpec.tag) continue; + TimePoint lastTaskUpdate; + for (const auto &[_, updates] : run.taskStateChanges) { + for (const auto &update : updates) { + if (update.time > lastTaskUpdate) + lastTaskUpdate = update.time; + } + } + DAGRunSummary summary{ .runID = i, .tag = run.dagSpec.tag, - .runState = run.dagStateChanges.back().newState, + .runState = run.dagStateChanges.back().state, .startTime = run.dagStateChanges.front().time, - .lastUpdate = std::max(run.taskStateChanges.back().time, + .lastUpdate = std::max(lastTaskUpdate, run.dagStateChanges.back().time)}; for (const auto &[_, taskState] : run.taskRunStates) { @@ -153,19 +161,20 @@ namespace daggy { namespace loggers { namespace dag_run { RunState OStreamLogger::getDAGRunState(DAGRunID dagRunID) { std::lock_guard lock(guard_); - return dagRuns_.at(dagRunID).dagStateChanges.back().newState; + return dagRuns_.at(dagRunID).dagStateChanges.back().state; } - Task &OStreamLogger::getTask(DAGRunID dagRunID, const std::string &taskName) + Task OStreamLogger::getTask(DAGRunID dagRunID, const std::string &taskName) { std::lock_guard lock(guard_); return dagRuns_.at(dagRunID).dagSpec.tasks.at(taskName); } - RunState &OStreamLogger::getTaskState(DAGRunID dagRunID, - const std::string &taskName) + + RunState OStreamLogger::getTaskState(DAGRunID dagRunID, + const std::string &taskName) { std::lock_guard lock(guard_); return dagRuns_.at(dagRunID).taskRunStates.at(taskName); } -}}} // namespace daggy::loggers::dag_run +} // namespace daggy::loggers::dag_run diff --git a/daggy/src/loggers/dag_run/RedisHelper.cpp b/daggy/src/loggers/dag_run/RedisHelper.cpp new file mode 100644 index 0000000..5f82e80 --- /dev/null +++ b/daggy/src/loggers/dag_run/RedisHelper.cpp @@ -0,0 +1,90 @@ +#include +#ifdef DAGGY_ENABLE_REDIS + +#include + +namespace daggy::loggers::dag_run::redis { + 
RedisContext::RedisContext(const std::string &host, int port) + { + const struct timeval timeout = {0, 250000}; // .250 seconds + ctx_ = redisConnectWithTimeout(host.c_str(), port, timeout); + if (ctx_ == nullptr) { + throw std::runtime_error("Unable to ping redis server at " + host + ":" + + std::to_string(port)); + } + } + + RedisData RedisContext::parseReply_(const redisReply *reply) + { + RedisData data; + + /* + switch (reply->type) { + case REDIS_REPLY_ERROR: { + std::cout << "\tERROR " << reply->str << std::endl; + break; + } + case REDIS_REPLY_STRING: { + std::cout << "\tSTRING" << std::endl; + break; + } + case REDIS_REPLY_VERB: { + std::cout << "\tVERB" << std::endl; + break; + } + case REDIS_REPLY_DOUBLE: { + std::cout << "\tDOUBLE" << std::endl; + break; + } + case REDIS_REPLY_INTEGER: { + std::cout << "\tINTEGER" << std::endl; + break; + } + case REDIS_REPLY_ARRAY: { + std::cout << "\tARRAY" << std::endl; + break; + } + case REDIS_REPLY_NIL: { + std::cout << "\tNIL" << std::endl; + break; + } + } + */ + + switch (reply->type) { + case REDIS_REPLY_ERROR: + case REDIS_REPLY_STRING: + case REDIS_REPLY_VERB: { + std::string raw(reply->str); + if (raw[0] == '"' and raw[raw.size() - 1] == '"') { + data = raw.substr(1, raw.size() - 2); + } + else { + data = RedisDatum{raw}; + } + break; + } + case REDIS_REPLY_DOUBLE: { + data = RedisDatum{reply->dval}; + break; + } + case REDIS_REPLY_INTEGER: { + data = RedisDatum{(size_t)reply->integer}; + break; + } + case REDIS_REPLY_ARRAY: { + std::vector parts; + for (size_t i = 0UL; i < reply->elements; ++i) { + parts.push_back(parseReply_(reply->element[i]).asDatum()); + } + data = parts; + break; + } + } + + return data; + } + +} // namespace daggy::loggers::dag_run::redis + +#endif diff --git a/daggy/src/loggers/dag_run/RedisLogger.cpp b/daggy/src/loggers/dag_run/RedisLogger.cpp new file mode 100644 index 0000000..ce3915c --- /dev/null +++ b/daggy/src/loggers/dag_run/RedisLogger.cpp @@ -0,0 +1,316 @@ +#include 
+#ifdef DAGGY_ENABLE_REDIS + +#include + +#include +#include +#include +#include +#include + +namespace daggy::loggers::dag_run { + RedisLogger::RedisLogger(const std::string &host, int port) + : host_(host) + , port_(port) + , dagRunIDsKey_("dagRunIDs") + { + redis::RedisContext ctx(host_, port_); + + auto resp = ctx.query("exists %s", dagRunIDsKey_.c_str()); + + if (resp.as() == 0) { + ctx.query("set %s %s", dagRunIDsKey_.c_str(), "0"); + } + } + + // Execution + DAGRunID RedisLogger::startDAGRun(const DAGSpec &dagSpec) + { + redis::RedisContext ctx(host_, port_); + + auto resp = ctx.query("incr %s", dagRunIDsKey_.c_str()); + + DAGRunID runID = resp.as(); + + // Store the DAGRun + std::stringstream ss; + + ss << "{" + << R"("tag": )" << std::quoted(dagSpec.tag) << R"(, "startTime": )" + << std::quoted(timePointToString(Clock::now())) + << R"(, "stateUpdates": [])" + << R"(, "taskStates": {})" + << R"(, "taskStateUpdates": {})" + << R"(, "taskAttempts": {})" + << R"(, "tasks": )" << tasksToJSON(dagSpec.tasks) + << R"(, "taskConfig": {)" + << R"("variables": )" << configToJSON(dagSpec.taskConfig.variables) + << R"(, "jobDefaults": )" << configToJSON(dagSpec.taskConfig.jobDefaults) + << R"(} })"; + + const auto &dagKey = getDAGRunKey_(runID); + + ctx.query("JSON.SET %s . %s", dagKey.c_str(), ss.str().c_str()); + + for (const auto &[taskName, _] : dagSpec.tasks) { + ctx.query("JSON.SET %s %s []", dagKey.c_str(), + (".taskStateUpdates." + taskName).c_str()); + ctx.query("JSON.SET %s %s []", dagKey.c_str(), + (".taskAttempts." 
+ taskName).c_str()); + } + + // store tags + ctx.query("SADD %s %s", getDAGTagMembersKey_(dagSpec.tag).c_str(), + std::to_string(runID).c_str()); + + // Store tasks, initial states + for (const auto &[taskName, task] : dagSpec.tasks) { + updateTaskState(runID, taskName, RunState::QUEUED); + } + + // Update the dag run state + updateDAGRunState(runID, RunState::QUEUED); + + return runID; + } + + void RedisLogger::addTask(DAGRunID dagRunID, const std::string &taskName, + const Task &task) + { + updateTask(dagRunID, taskName, task); + updateTaskState(dagRunID, taskName, RunState::QUEUED); + } + + void RedisLogger::updateTask(DAGRunID dagRunID, const std::string &taskName, + const Task &task) + { + redis::RedisContext ctx(host_, port_); + std::string taskJSON = taskToJSON(task); + ctx.query("JSON.SET %s %s %s", getDAGRunKey_(dagRunID).c_str(), + (".tasks." + taskName).c_str(), + taskJSON.c_str()); + } + + void RedisLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) + { + redis::RedisContext ctx(host_, port_); + const auto &dagKey = getDAGRunKey_(dagRunID); + + // Set the state + ctx.query(R"(JSON.SET %s .state "%s")", dagKey.c_str(), state._to_string()); + + ctx.query(R"(JSON.SET %s .lastUpdateTime "%s")", dagKey.c_str(), + timePointToString(Clock::now()).c_str()); + + // Add the update record + StateUpdateRecord rec{.time = Clock::now(), .state = state}; + ctx.query("JSON.ARRAPPEND %s .stateUpdates %s", dagKey.c_str(), + stateUpdateRecordToJSON(rec).c_str()); + } + + void RedisLogger::logTaskAttempt(DAGRunID dagRunID, + const std::string &taskName, + const AttemptRecord &attempt) + { + redis::RedisContext ctx(host_, port_); + ctx.query("JSON.ARRAPPEND %s %s %s", getDAGRunKey_(dagRunID).c_str(), + (".taskAttempts." 
+ taskName).c_str(), + attemptRecordToJSON(attempt).c_str()); + } + + void RedisLogger::updateTaskState(DAGRunID dagRunID, + const std::string &taskName, RunState state) + { + redis::RedisContext ctx(host_, port_); + const auto &dagKey = getDAGRunKey_(dagRunID); + + // Set the state + ctx.query(R"(JSON.SET %s %s "%s")", dagKey.c_str(), + (".taskStates." + taskName).c_str(), state._to_string()); + + ctx.query(R"(JSON.SET %s .lastUpdateTime "%s")", dagKey.c_str(), + timePointToString(Clock::now()).c_str()); + + // Add the update record + StateUpdateRecord rec{.time = Clock::now(), .state = state}; + ctx.query("JSON.ARRAPPEND %s %s %s", dagKey.c_str(), + (".taskStateUpdates." + taskName).c_str(), + stateUpdateRecordToJSON(rec).c_str()); + } + + // Querying + DAGSpec RedisLogger::getDAGSpec(DAGRunID dagRunID) + { + redis::RedisContext ctx(host_, port_); + const auto &dagKey = getDAGRunKey_(dagRunID); + + DAGSpec spec; + + spec.tag = ctx.query("JSON.GET %s .tag", dagKey.c_str()).as(); + + auto tasks = + ctx.query("JSON.GET %s .tasks", dagKey.c_str()).as(); + spec.tasks = tasksFromJSON(tasks); + + auto taskVars = + ctx.query("JSON.GET %s .taskConfig.variables", dagKey.c_str()) + .as(); + spec.taskConfig.variables = configFromJSON(taskVars); + + auto jobDefaults = + ctx.query("JSON.GET %s .taskConfig.jobDefaults", dagKey.c_str()) + .as(); + spec.taskConfig.jobDefaults = configFromJSON(jobDefaults); + + return spec; + }; + + std::vector RedisLogger::queryDAGRuns(const std::string &tag, + bool all) + { + redis::RedisContext ctx(host_, port_); + std::vector summaries; + + auto reply = ctx.query("GET %s", dagRunIDsKey_.c_str()); + + size_t maxRuns = std::stoull(reply.as()); + + RunState state = RunState::QUEUED; + for (size_t runID = 1; runID <= maxRuns; ++runID) { + try { + state = getDAGRunState(runID); + } + catch (std::runtime_error &e) { + continue; + } + if (!all and state == +RunState::COMPLETED) + continue; + const auto &dagKey = getDAGRunKey_(runID); + DAGRunSummary 
summary{ + .runID = runID, + .tag = + ctx.query("JSON.GET %s .tag", dagKey.c_str()).as(), + .runState = state, + .startTime = stringToTimePoint( + ctx.query("JSON.GET %s .startTime", dagKey.c_str()) + .as()), + .lastUpdate = stringToTimePoint( + ctx.query("JSON.GET %s .lastUpdateTime", dagKey.c_str()) + .as())}; + + auto taskStates = ctx.query("JSON.GET %s .taskStates", dagKey.c_str()) + .as(); + + rj::Document doc; + checkRJParse(doc.Parse(taskStates.c_str())); + + for (auto it = doc.MemberBegin(); it != doc.MemberEnd(); ++it) { + std::string stateStr = it->value.GetString(); + auto taskState = RunState::_from_string(stateStr.c_str()); + summary.taskStateCounts[taskState]++; + } + summaries.emplace_back(summary); + } + + return summaries; + } + + DAGRunRecord RedisLogger::getDAGRun(DAGRunID dagRunID) + { + DAGRunRecord rec; + redis::RedisContext ctx(host_, port_); + + rec.dagSpec = getDAGSpec(dagRunID); + + auto json = ctx.query("JSON.GET %s", getDAGRunKey_(dagRunID).c_str()) + .as(); + + rj::Document doc; + checkRJParse(doc.Parse(json.c_str())); + + // Populate taskRunStates + const auto &taskStates = doc["taskStates"].GetObject(); + for (auto it = taskStates.MemberBegin(); it != taskStates.MemberEnd(); + ++it) { + rec.taskRunStates.emplace(it->name.GetString(), + RunState::_from_string(it->value.GetString())); + } + + // Populate taskAttempts + const auto &taskAttempts = doc["taskAttempts"].GetObject(); + for (auto it = taskAttempts.MemberBegin(); it != taskAttempts.MemberEnd(); + ++it) { + const std::string taskName = it->name.GetString(); + const auto &newAttempts = it->value.GetArray(); + auto &attempts = rec.taskAttempts[taskName]; + for (size_t i = 0; i < newAttempts.Size(); ++i) { + auto rec = attemptRecordFromJSON(newAttempts[i]); + attempts.emplace_back(rec); + } + } + + // Populate taskStateChanges + const auto &taskStateUpdates = doc["taskStateUpdates"].GetObject(); + for (auto it = taskStateUpdates.MemberBegin(); + it != taskStateUpdates.MemberEnd(); 
++it) { + std::string taskName = it->name.GetString(); + const auto &updates = it->value.GetArray(); + auto &taskUpdates = rec.taskStateChanges[taskName]; + for (size_t i = 0; i < updates.Size(); ++i) { + taskUpdates.emplace_back(stateUpdateRecordFromJSON(updates[i])); + } + } + + // Populate DAG Updates + const auto &dagStateUpdates = doc["stateUpdates"].GetArray(); + for (size_t i = 0; i < dagStateUpdates.Size(); ++i) { + rec.dagStateChanges.emplace_back( + stateUpdateRecordFromJSON(dagStateUpdates[i])); + } + + return rec; + } + + RunState RedisLogger::getDAGRunState(DAGRunID dagRunID) + { + redis::RedisContext ctx(host_, port_); + auto resp = + ctx.query("JSON.GET %s .state", getDAGRunKey_(dagRunID).c_str()); + std::string stateStr = resp.as(); + if (stateStr.empty()) + throw std::runtime_error("No such dagrun"); + return RunState::_from_string(stateStr.c_str()); + } + + Task RedisLogger::getTask(DAGRunID dagRunID, const std::string &taskName) + { + redis::RedisContext ctx(host_, port_); + auto resp = ctx.query("JSON.GET %s %s", getDAGRunKey_(dagRunID).c_str(), + (".tasks." + taskName).c_str()); + return taskFromJSON(taskName, resp.as()); + } + + RunState RedisLogger::getTaskState(DAGRunID dagRunID, + const std::string &taskName) + { + redis::RedisContext ctx(host_, port_); + auto resp = ctx.query("JSON.GET %s %s", getDAGRunKey_(dagRunID).c_str(), + (".taskStates." 
+ taskName).c_str()); + return RunState::_from_string(resp.as().c_str()); + } + + inline const std::string RedisLogger::getDAGTagMembersKey_( + const std::string &tag) const + { + return "tags_" + tag; + } + + inline const std::string RedisLogger::getDAGRunKey_(DAGRunID runID) const + { + return std::to_string(runID) + "_spec"; + } + +} // namespace daggy::loggers::dag_run + +#endif diff --git a/tests/unit_dagrun_loggers.cpp b/tests/unit_dagrun_loggers.cpp index ed480e9..06e0902 100644 --- a/tests/unit_dagrun_loggers.cpp +++ b/tests/unit_dagrun_loggers.cpp @@ -4,88 +4,126 @@ #include #include +#include "daggy/Serialization.hpp" #include "daggy/loggers/dag_run/OStreamLogger.hpp" +#include "daggy/loggers/dag_run/RedisLogger.hpp" using namespace daggy; using namespace daggy::loggers::dag_run; const TaskSet SAMPLE_TASKS{ {"work_a", - Task{.job{{"command", std::vector{"/bin/echo", "a"}}}, + Task{.definedName{"work_a"}, + .job{{"command", std::vector{"/bin/echo", "a"}}}, .children{"c"}}}, {"work_b", - Task{.job{{"command", std::vector{"/bin/echo", "b"}}}, + Task{.definedName{"work_b"}, + .job{{"command", std::vector{"/bin/echo", "b"}}}, .children{"c"}}}, {"work_c", - Task{.job{{"command", std::vector{"/bin/echo", "c"}}}}}}; + Task{.definedName{"work_c"}, + .job{{"command", std::vector{"/bin/echo", "c"}}}}}}; -inline DAGRunID testDAGRunInit(DAGRunLogger &logger, const std::string &tag, - const TaskSet &tasks) -{ - auto runID = logger.startDAGRun(DAGSpec{.tag = tag, .tasks = tasks}); - - // Verify run shows up in the list +namespace { + void testDAGRunInit(DAGRunLogger &logger, const std::string &tag, + const TaskSet &tasks) { - auto runs = logger.queryDAGRuns(); - REQUIRE(!runs.empty()); - auto it = std::find_if(runs.begin(), runs.end(), - [runID](const auto &r) { return r.runID == runID; }); - REQUIRE(it != runs.end()); - REQUIRE(it->tag == tag); - REQUIRE(it->runState == +RunState::QUEUED); - } + auto runID = logger.startDAGRun(DAGSpec{.tag = tag, .tasks = tasks}); - // 
Verify states - { - REQUIRE(logger.getDAGRunState(runID) == +RunState::QUEUED); - for (const auto &[k, _] : tasks) { - REQUIRE(logger.getTaskState(runID, k) == +RunState::QUEUED); + // Verify run shows up in the list + SECTION("New run shows up in list of runs") + { + auto runs = logger.queryDAGRuns(); + REQUIRE(!runs.empty()); + auto it = std::find_if(runs.begin(), runs.end(), [runID](const auto &r) { + return r.runID == runID; + }); + REQUIRE(it != runs.end()); + REQUIRE(it->tag == tag); + REQUIRE(it->runState == +RunState::QUEUED); + } + + // Verify dagSpec matches + SECTION("Can retrieve DAG Spec") + { + auto spec = logger.getDAGSpec(runID); + REQUIRE(spec.tag == tag); + REQUIRE(spec.tasks == tasks); + } + + // Verify states + SECTION("DAG State matches expectations") + { + REQUIRE(logger.getDAGRunState(runID) == +RunState::QUEUED); + for (const auto &[k, _] : tasks) { + REQUIRE(logger.getTaskState(runID, k) == +RunState::QUEUED); + } + } + + // Verify integrity of run + SECTION("Can retrieve the full run") + { + auto dagRun = logger.getDAGRun(runID); + + REQUIRE(dagRun.dagSpec.tag == tag); + REQUIRE(dagRun.dagSpec.tasks == tasks); + + REQUIRE(dagRun.taskRunStates.size() == tasks.size()); + auto nonQueuedTask = std::find_if( + dagRun.taskRunStates.begin(), dagRun.taskRunStates.end(), + [](const auto &a) { return a.second != +RunState::QUEUED; }); + + REQUIRE(nonQueuedTask == dagRun.taskRunStates.end()); + REQUIRE(dagRun.dagStateChanges.size() == 1); + REQUIRE(dagRun.dagStateChanges.back().state == +RunState::QUEUED); + } + + // Update DAG state and ensure that it's updated; + SECTION("Can update DAG state and retrieve new state") + { + logger.updateDAGRunState(runID, RunState::RUNNING); + auto dagRun = logger.getDAGRun(runID); + REQUIRE(dagRun.dagStateChanges.back().state == +RunState::RUNNING); + } + + // Update a task state + SECTION("Can update task state and retrieve new state") + { + for (const auto &[k, v] : tasks) + logger.updateTaskState(runID, k, 
RunState::RUNNING); + auto dagRun = logger.getDAGRun(runID); + for (const auto &[k, v] : tasks) { + REQUIRE(dagRun.taskRunStates.at(k) == +RunState::RUNNING); + } + } + + SECTION("Log task attempt and retrieve it") + { + std::cout << "Task attempts" << std::endl; + logger.logTaskAttempt(runID, "work_a", + AttemptRecord{.rc = 2, .errorLog = "help"}); + auto dagRun = logger.getDAGRun(runID); + + REQUIRE(dagRun.taskAttempts["work_a"].size() == 1); + REQUIRE(dagRun.taskAttempts["work_a"][0].errorLog == "help"); + REQUIRE(dagRun.taskAttempts["work_a"][0].rc == 2); } } - - // Verify integrity of run - { - auto dagRun = logger.getDAGRun(runID); - - REQUIRE(dagRun.dagSpec.tag == tag); - REQUIRE(dagRun.dagSpec.tasks == tasks); - - REQUIRE(dagRun.taskRunStates.size() == tasks.size()); - auto nonQueuedTask = std::find_if( - dagRun.taskRunStates.begin(), dagRun.taskRunStates.end(), - [](const auto &a) { return a.second != +RunState::QUEUED; }); - REQUIRE(nonQueuedTask == dagRun.taskRunStates.end()); - REQUIRE(dagRun.dagStateChanges.size() == 1); - REQUIRE(dagRun.dagStateChanges.back().newState == +RunState::QUEUED); - } - - // Update DAG state and ensure that it's updated; - { - logger.updateDAGRunState(runID, RunState::RUNNING); - auto dagRun = logger.getDAGRun(runID); - REQUIRE(dagRun.dagStateChanges.back().newState == +RunState::RUNNING); - } - - // Update a task state - { - for (const auto &[k, v] : tasks) - logger.updateTaskState(runID, k, RunState::RUNNING); - auto dagRun = logger.getDAGRun(runID); - for (const auto &[k, v] : tasks) { - REQUIRE(dagRun.taskRunStates.at(k) == +RunState::RUNNING); - } - } - - return runID; -} +} // namespace TEST_CASE("ostream_logger", "[ostream_logger]") { std::stringstream ss; daggy::loggers::dag_run::OStreamLogger logger(ss); - SECTION("DAGRun Starts") - { - testDAGRunInit(logger, "init_test", SAMPLE_TASKS); - } + testDAGRunInit(logger, "init_test", SAMPLE_TASKS); } + +#ifdef DAGGY_ENABLE_REDIS +TEST_CASE("redis_logger", 
"[redis_logger]") +{ + daggy::loggers::dag_run::RedisLogger logger; + + testDAGRunInit(logger, "init_test", SAMPLE_TASKS); +} +#endif diff --git a/tests/unit_dagrunner.cpp b/tests/unit_dagrunner.cpp index b5d7b8a..b3eedc0 100644 --- a/tests/unit_dagrunner.cpp +++ b/tests/unit_dagrunner.cpp @@ -6,6 +6,7 @@ #include "daggy/executors/task/ForkingTaskExecutor.hpp" #include "daggy/executors/task/NoopTaskExecutor.hpp" #include "daggy/loggers/dag_run/OStreamLogger.hpp" +#include "daggy/loggers/dag_run/RedisLogger.hpp" namespace fs = std::filesystem; diff --git a/tests/unit_executor_forkingexecutor.cpp b/tests/unit_executor_forkingexecutor.cpp index 7aa7f0a..9738d3a 100644 --- a/tests/unit_executor_forkingexecutor.cpp +++ b/tests/unit_executor_forkingexecutor.cpp @@ -79,8 +79,7 @@ TEST_CASE("forking_executor", "[forking_executor]") REQUIRE(rec.outputLog.find(valTwo) != std::string::npos); REQUIRE(rec.errorLog.empty()); - if (fs::exists(scriptFile)) - fs::remove_all(scriptFile); + // if (fs::exists(scriptFile)) fs::remove_all(scriptFile); } SECTION("Error Run")