Large re-organization to split daggyd away from the core libdaggy.

This paves the way for implementing daggys and other utilities.

Squashed commit of the following:

commit 1f77239ab3c9e44d190eef94531a39501c8c4dfe
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Mon Oct 18 16:25:02 2021 -0300

    Adding README, stdout support for daggyd logging

commit c2c237224e84a3be68aaa597ce98af1365e74a13
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Mon Oct 18 16:10:29 2021 -0300

    removing old daggyd

commit cfea2baf61ca10c535801c5a391d2d525a1a2d04
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Mon Oct 18 16:10:09 2021 -0300

    Moving tests into their sub-project folders

commit e41ca42069bea1db16dd76b6684a3f692fef6b15
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Mon Oct 18 15:57:40 2021 -0300

    Splitting out daggyd from libdaggy

commit be97b146c1d2446f5c03cb78707e921f18c60bd8
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Mon Oct 18 15:56:55 2021 -0300

    Splitting out daggyd from libdaggy

commit cb61e140e9d6d8832d61fb7037fd4c0ff6edad00
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Mon Oct 18 15:49:47 2021 -0300

    moving daggy to libdaggy
This commit is contained in:
Ian Roddis
2021-10-18 16:28:40 -03:00
parent 612bc8af8a
commit 470a6f2bb7
59 changed files with 586 additions and 52 deletions

View File

@@ -0,0 +1,8 @@
# Core libdaggy sources; executor and logger implementations are pulled in
# from their own subdirectories below.
target_sources(${PROJECT_NAME} PRIVATE
  Serialization.cpp
  Utilities.cpp
  DAGRunner.cpp
)
add_subdirectory(executors)
add_subdirectory(loggers)

213
libdaggy/src/DAGRunner.cpp Normal file
View File

@@ -0,0 +1,213 @@
#include <chrono>
#include <daggy/DAGRunner.hpp>
#include <mutex>
#include <stdexcept>
namespace daggy {
// Construct a runner for a single DAG run.
//
// The executor and logger are borrowed (held by reference) and must outlive
// this object; the DAG is taken by value and mutated as the run progresses.
DAGRunner::DAGRunner(DAGRunID runID, executors::task::TaskExecutor &executor,
                     loggers::dag_run::DAGRunLogger &logger, TaskDAG dag,
                     const TaskParameters &taskParams)
    : runID_(runID)
    , executor_(executor)
    , logger_(logger)
    , dag_(dag)
    , taskParams_(taskParams)
    , running_(true)
    , kill_(true) // both flags are re-initialized at the top of run()
    , nRunningTasks_(0)
    , nErroredTasks_(0)
{
}
// Destructor blocks until runGuard_ is free, i.e. until the run loop's
// current critical section has finished — a minimal guard against tearing
// the runner down mid-iteration.
DAGRunner::~DAGRunner()
{
    std::lock_guard<std::mutex> lock(runGuard_);
}
// Drive the DAG to completion (or until stopped), returning the final DAG.
//
// Polls every 250 ms. Each iteration (under runGuard_): kills in-flight
// tasks if a kill-stop was requested, reaps finished futures, and queues
// newly-runnable tasks. Terminal states logged: KILLED (stopped while
// live tasks remained), ERRORED (every counted-running task has errored),
// COMPLETED (every vertex visited).
TaskDAG DAGRunner::run()
{
    kill_ = false;
    running_ = true;
    logger_.updateDAGRunState(runID_, RunState::RUNNING);
    bool allVisited;
    {
        std::lock_guard<std::mutex> lock(runGuard_);
        allVisited = dag_.allVisited();
    }
    while (!allVisited) {
        {
            std::lock_guard<std::mutex> runLock(runGuard_);
            // stop(kill=true) was requested: abort everything in flight.
            if (!running_ and kill_) {
                killRunning();
            }
            collectFinished();
            queuePending();
            // Non-kill stop: leave once no live (non-errored) task remains.
            if (!running_ and (nRunningTasks_ - nErroredTasks_ <= 0)) {
                logger_.updateDAGRunState(runID_, RunState::KILLED);
                break;
            }
            // Every task still counted as running has errored: dead end.
            if (nRunningTasks_ > 0 and nErroredTasks_ == nRunningTasks_) {
                logger_.updateDAGRunState(runID_, RunState::ERRORED);
                break;
            }
        }
        std::this_thread::sleep_for(250ms);
        {
            std::lock_guard<std::mutex> lock(runGuard_);
            allVisited = dag_.allVisited();
        }
    }
    // NOTE(review): this final check reads dag_ without holding runGuard_ —
    // confirm no other thread can still be mutating the DAG at this point.
    if (dag_.allVisited()) {
        logger_.updateDAGRunState(runID_, RunState::COMPLETED);
    }
    running_ = false;
    return dag_;
}
// Reset all per-run bookkeeping so the DAG can be executed again.
// Throws std::runtime_error if the runner is still active.
//
// NOTE(review): running_ is tested before runGuard_ is acquired, so a
// concurrently starting run() could slip in between the check and the
// lock — confirm callers serialize reset against run.
void DAGRunner::resetRunning()
{
    if (running_)
        throw std::runtime_error("Unable to reset while DAG is running.");
    std::lock_guard<std::mutex> lock(runGuard_);
    nRunningTasks_ = 0;
    nErroredTasks_ = 0;
    runningTasks_.clear();
    taskAttemptCounts_.clear();
    dag_.resetRunning();
}
// Ask the executor to stop every task this runner currently has in flight.
// Caller is expected to hold runGuard_ (see run()).
void DAGRunner::killRunning()
{
    for (auto it = runningTasks_.cbegin(); it != runningTasks_.cend(); ++it) {
        executor_.stop(runID_, it->first);
    }
}
// Pull every currently-runnable vertex out of the DAG and hand it to the
// executor, tracking the returned future in runningTasks_. Does nothing
// once the runner has been asked to stop.
void DAGRunner::queuePending()
{
    if (!running_)
        return;

    // Keep draining the DAG until no more vertices are ready to visit.
    for (auto next = dag_.visitNext(); next.has_value();
         next = dag_.visitNext()) {
        const auto &name = next.value().first;
        const auto &spec = next.value().second;
        // First attempt for this task; retries bump the counter later.
        taskAttemptCounts_[name] = 1;
        logger_.updateTaskState(runID_, name, RunState::RUNNING);
        runningTasks_.emplace(name, executor_.execute(runID_, name, spec));
        ++nRunningTasks_;
    }
}
// Reap every running task whose future is ready.
//
// rc == 0: the task is marked COMPLETED; generator tasks additionally have
// their stdout parsed as a task-set JSON, which is expanded and spliced
// into the DAG as children of the generator. rc != 0: the task is retried
// up to task.maxRetries times, after which it is marked ERRORED (unless it
// was already killed, in which case it just stops being counted).
//
// Caller is expected to hold runGuard_ (see run()).
void DAGRunner::collectFinished()
{
    for (auto &[taskName, fut] : runningTasks_) {
        // wait_for(1ms) doubles as a cheap readiness probe.
        if (fut.valid() and fut.wait_for(1ms) == std::future_status::ready) {
            auto attempt = fut.get();
            logger_.logTaskAttempt(runID_, taskName, attempt);
            // Not a reference, since adding tasks will invalidate references
            auto vert = dag_.getVertex(taskName);
            auto &task = vert.data;
            if (attempt.rc == 0) {
                logger_.updateTaskState(runID_, taskName, RunState::COMPLETED);
                if (task.isGenerator) {
                    // Parse the output and update the DAGs
                    try {
                        auto parsedTasks =
                            tasksFromJSON(attempt.outputLog, taskParams_.jobDefaults);
                        auto newTasks =
                            expandTaskSet(parsedTasks, executor_, taskParams_.variables);
                        updateDAGFromTasks(dag_, newTasks);
                        // Add in dependencies from current task to new tasks
                        for (const auto &[ntName, ntTask] : newTasks) {
                            logger_.addTask(runID_, ntName, ntTask);
                            task.children.insert(ntName);
                        }
                        // Efficiently add new edges from generator task
                        // to children
                        std::unordered_set<std::string> baseNames;
                        for (const auto &[k, v] : parsedTasks) {
                            baseNames.insert(v.definedName);
                        }
                        dag_.addEdgeIf(taskName, [&](const auto &v) {
                            return baseNames.count(v.data.definedName) > 0;
                        });
                        logger_.updateTask(runID_, taskName, task);
                    }
                    catch (std::exception &e) {
                        // Record the parse failure as its own attempt and
                        // flag the generator as errored.
                        logger_.logTaskAttempt(
                            runID_, taskName,
                            AttemptRecord{
                                .executorLog =
                                    std::string{"Failed to parse JSON output: "} +
                                    e.what()});
                        logger_.updateTaskState(runID_, taskName, RunState::ERRORED);
                        ++nErroredTasks_;
                    }
                }
                dag_.completeVisit(taskName);
                --nRunningTasks_;
            }
            else {
                // RC isn't 0
                if (taskAttemptCounts_[taskName] <= task.maxRetries) {
                    logger_.updateTaskState(runID_, taskName, RunState::RETRY);
                    // Assigning to an existing key: no rehash, so the
                    // range-for iteration above stays valid.
                    runningTasks_[taskName] = executor_.execute(runID_, taskName, task);
                    ++taskAttemptCounts_[taskName];
                }
                else {
                    if (logger_.getTaskState(runID_, taskName) == +RunState::RUNNING or
                        logger_.getTaskState(runID_, taskName) == +RunState::RETRY) {
                        logger_.updateTaskState(runID_, taskName, RunState::ERRORED);
                        ++nErroredTasks_;
                    }
                    else {
                        // Task was killed
                        --nRunningTasks_;
                    }
                }
            }
        }
    }
}
// Request that the run loop stop. When `kill` is set the loop will also
// signal the executor to abort in-flight tasks; when `blocking` is set
// this call waits until no live (non-errored) task remains.
void DAGRunner::stop(bool kill, bool blocking)
{
    kill_ = kill;
    running_ = false;
    if (!blocking)
        return;
    while (true) {
        bool drained;
        {
            std::lock_guard<std::mutex> lock(runGuard_);
            drained = (nRunningTasks_ - nErroredTasks_ == 0);
        }
        if (drained)
            break;
        std::this_thread::sleep_for(250ms);
    }
}
} // namespace daggy

View File

@@ -0,0 +1,418 @@
#include <rapidjson/document.h>
#include <rapidjson/error/en.h>
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
#include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp>
#include <iomanip>
#include <sstream>
namespace daggy {
// Throw std::runtime_error with a descriptive message when a rapidjson
// parse failed; no-op on success. When `prefix` is non-empty it is
// prepended to the message as "prefix:".
void checkRJParse(const rj::ParseResult &result, const std::string &prefix)
{
    if (result)
        return;
    std::stringstream msg;
    if (!prefix.empty())
        msg << prefix << ':';
    msg << "Error parsing JSON: " << rj::GetParseError_En(result.Code())
        << " at byte offset " << result.Offset();
    throw std::runtime_error(msg.str());
}
// Serialize any rapidjson value to a compact JSON string.
std::string dumpJSON(const rj::Value &doc)
{
    rj::StringBuffer out;
    rj::Writer<rj::StringBuffer> writer(out);
    doc.Accept(writer);
    return std::string{out.GetString()};
}
// Parse JSON text and convert the resulting document to ConfigValues.
ConfigValues configFromJSON(const std::string &jsonSpec)
{
    rj::Document parsed;
    checkRJParse(parsed.Parse(jsonSpec.c_str()), "Parsing config");
    return configFromJSON(parsed);
}
// Convert a JSON object into ConfigValues: every member must be either a
// string or an array of strings; anything else throws std::runtime_error.
ConfigValues configFromJSON(const rj::Value &spec)
{
    std::unordered_map<std::string, ConfigValue> parameters;
    if (!spec.IsObject()) {
        throw std::runtime_error("Parameters in spec is not a JSON dictionary");
    }
    for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) {
        if (!it->name.IsString()) {
            throw std::runtime_error("All keys must be strings.");
        }
        std::string name = it->name.GetString();
        if (it->value.IsArray()) {
            // Array member: every element must itself be a string.
            std::vector<std::string> values;
            for (size_t i = 0; i < it->value.Size(); ++i) {
                if (!it->value[i].IsString()) {
                    throw std::runtime_error(
                        "Attribute for " + std::string{it->name.GetString()} +
                        " item " + std::to_string(i) + " is not a string.");
                }
                values.emplace_back(it->value[i].GetString());
            }
            parameters[name] = values;
        }
        else if (it->value.IsString()) {
            parameters[name] = it->value.GetString();
        }
        else {
            throw std::runtime_error("Attribute for " +
                                     std::string{it->name.GetString()} +
                                     " is not a string or an array.");
        }
    }
    return parameters;
}
// Render a ConfigValues map as a JSON object. Scalar entries become quoted
// strings; list entries become arrays of quoted strings. Quoting is done
// with std::quoted (escapes quotes and backslashes).
std::string configToJSON(const ConfigValues &config)
{
    std::stringstream out;
    out << '{';
    const char *sep = "";
    for (const auto &[key, value] : config) {
        out << sep << std::quoted(key) << ": ";
        sep = ", ";
        if (std::holds_alternative<std::string>(value)) {
            out << std::quoted(std::get<std::string>(value));
        }
        else {
            out << '[';
            const char *itemSep = "";
            for (const auto &item : std::get<std::vector<std::string>>(value)) {
                out << itemSep << std::quoted(item);
                itemSep = ", ";
            }
            out << ']';
        }
    }
    out << '}';
    return out.str();
}
// Build a Task named `name` from a JSON object, seeding the job map from
// `jobDefaults`. Recognized fields: isGenerator (bool), maxRetries (int),
// retryIntervalSeconds (int), children / parents (arrays of task names),
// and job (object of string or string-array values).
//
// NOTE(review): optional fields are read with GetBool()/GetInt()/
// GetString() without type checks — a wrongly-typed field hits rapidjson's
// internal assertions rather than producing a clean error; confirm this is
// acceptable for untrusted specs.
Task taskFromJSON(const std::string &name, const rj::Value &spec,
                  const ConfigValues &jobDefaults)
{
    Task task{.definedName = name,
              .isGenerator = false,
              .maxRetries = 0,
              .retryIntervalSeconds = 0,
              .job = jobDefaults};
    if (!spec.IsObject()) {
        throw std::runtime_error("Tasks is not an object");
    }
    // Grab the standard fields with defaults;
    if (spec.HasMember("isGenerator")) {
        task.isGenerator = spec["isGenerator"].GetBool();
    }
    if (spec.HasMember("maxRetries")) {
        task.maxRetries = spec["maxRetries"].GetInt();
    }
    if (spec.HasMember("retryIntervalSeconds")) {
        task.retryIntervalSeconds = spec["retryIntervalSeconds"].GetInt();
    }
    // Children / parents
    if (spec.HasMember("children")) {
        const auto &specChildren = spec["children"].GetArray();
        for (size_t c = 0; c < specChildren.Size(); ++c) {
            task.children.insert(specChildren[c].GetString());
        }
    }
    if (spec.HasMember("parents")) {
        const auto &specParents = spec["parents"].GetArray();
        for (size_t c = 0; c < specParents.Size(); ++c) {
            task.parents.insert(specParents[c].GetString());
        }
    }
    if (spec.HasMember("job")) {
        const auto &params = spec["job"];
        if (!params.IsObject())
            throw std::runtime_error("job is not a dictionary.");
        for (auto it = params.MemberBegin(); it != params.MemberEnd(); ++it) {
            if (!it->name.IsString())
                throw std::runtime_error("job key must be a string.");
            if (it->value.IsArray()) {
                std::vector<std::string> values;
                for (size_t i = 0; i < it->value.Size(); ++i) {
                    values.emplace_back(it->value[i].GetString());
                }
                // insert_or_assign: job entries override the defaults.
                task.job.insert_or_assign(it->name.GetString(), values);
            }
            else {
                task.job.insert_or_assign(it->name.GetString(),
                                          it->value.GetString());
            }
        }
    }
    return task;
}
// Convenience overload: parse the JSON text, then delegate to the
// rj::Value overload.
Task taskFromJSON(const std::string &name, const std::string &spec,
                  const ConfigValues &jobDefaults)
{
    rj::Document parsed;
    checkRJParse(parsed.Parse(spec.c_str()));
    return taskFromJSON(name, parsed, jobDefaults);
}
// Convenience overload: parse the JSON text, then delegate to the
// rj::Value overload.
TaskSet tasksFromJSON(const std::string &jsonSpec,
                      const ConfigValues &jobDefaults)
{
    rj::Document parsed;
    checkRJParse(parsed.Parse(jsonSpec.c_str()));
    return tasksFromJSON(parsed, jobDefaults);
}
// Decode a JSON object of {taskName: taskSpec} into a TaskSet, then
// normalize it so every dependency is expressed via the parent's
// `children` set (all `parents` sets end up empty).
TaskSet tasksFromJSON(const rj::Value &spec, const ConfigValues &jobDefaults)
{
    TaskSet tasks;
    if (!spec.IsObject()) {
        throw std::runtime_error("Tasks is not an object");
    }
    // Tasks
    for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) {
        if (!it->name.IsString())
            throw std::runtime_error("Task names must be a string.");
        if (!it->value.IsObject())
            throw std::runtime_error("Task definitions must be an object.");
        const auto &taskName = it->name.GetString();
        tasks.emplace(taskName, taskFromJSON(taskName, it->value, jobDefaults));
    }
    // Normalize tasks so all the children are populated.
    //
    // BUG FIX: edges are collected first and applied afterwards. The
    // original called tasks[p] (which inserts when the parent name is
    // unknown) while range-iterating `tasks`; inserting into an
    // unordered_map can rehash and invalidate the loop's iterators — UB.
    std::vector<std::pair<std::string, std::string>> edges; // {parent, child}
    for (auto &[name, task] : tasks) {
        for (const auto &parent : task.parents) {
            edges.emplace_back(parent, name);
        }
        task.parents.clear();
    }
    for (const auto &[parent, child] : edges) {
        // Preserves the original behaviour of implicitly creating an empty
        // entry for a parent that was never declared.
        tasks[parent].children.insert(child);
    }
    return tasks;
}
// I really want to do this with rapidjson, but damn they make it ugly and
// difficult. So we'll shortcut and generate the JSON directly.
// Serialize one Task to JSON by direct string generation (see the comment
// above: rapidjson is deliberately avoided here).
std::string taskToJSON(const Task &task)
{
    std::stringstream out;
    // Emits a JSON array of quoted names, e.g. ["a","b"].
    auto emitNameArray = [&out](const auto &names) {
        out << '[';
        const char *sep = "";
        for (const auto &name : names) {
            out << sep << std::quoted(name);
            sep = ",";
        }
        out << ']';
    };
    out << "{"
        << R"("maxRetries": )" << task.maxRetries << ','
        << R"("retryIntervalSeconds": )" << task.retryIntervalSeconds << ','
        << R"("job": )" << configToJSON(task.job) << ','
        << R"("children": )";
    emitNameArray(task.children);
    out << ',' << R"("parents": )";
    emitNameArray(task.parents);
    out << ',' << R"("isGenerator": )" << (task.isGenerator ? "true" : "false")
        << '}';
    return out.str();
}
// Serialize a whole TaskSet as a JSON object keyed by task name.
std::string tasksToJSON(const TaskSet &tasks)
{
    std::stringstream out;
    out << "{";
    const char *sep = "";
    for (const auto &[name, task] : tasks) {
        out << sep << std::quoted(name) << ": " << taskToJSON(task);
        sep = ",";
    }
    out << "}";
    return out.str();
}
// Stream a Task in its JSON representation.
std::ostream &operator<<(std::ostream &os, const Task &task)
{
    return os << taskToJSON(task);
}
// Serialize an AttemptRecord to JSON via rapidjson. Timestamps are encoded
// as tick-count strings (see timePointToString).
std::string attemptRecordToJSON(const AttemptRecord &record)
{
    rj::Document doc;
    doc.SetObject();
    auto &alloc = doc.GetAllocator();
    // SetString(ptr, len, alloc) copies the bytes into the document, so the
    // local std::string temporaries are safe to destroy afterwards.
    auto startTime = timePointToString(record.startTime);
    doc.AddMember(
        "startTime",
        rj::Value().SetString(startTime.c_str(), startTime.size(), alloc),
        alloc);
    auto stopTime = timePointToString(record.stopTime);
    doc.AddMember(
        "stopTime",
        rj::Value().SetString(stopTime.c_str(), stopTime.size(), alloc), alloc);
    doc.AddMember("rc", rj::Value().SetInt(record.rc), alloc);
    doc.AddMember("outputLog",
                  rj::Value().SetString(record.outputLog.c_str(),
                                        record.outputLog.size(), alloc),
                  alloc);
    doc.AddMember("errorLog",
                  rj::Value().SetString(record.errorLog.c_str(),
                                        record.errorLog.size(), alloc),
                  alloc);
    doc.AddMember("executorLog",
                  rj::Value().SetString(record.executorLog.c_str(),
                                        record.executorLog.size(), alloc),
                  alloc);
    return dumpJSON(doc);
}
// Convenience overload: parse the JSON text, then delegate to the
// rj::Value overload.
AttemptRecord attemptRecordFromJSON(const std::string &json)
{
    rj::Document parsed;
    checkRJParse(parsed.Parse(json.c_str()), "Parsing AttemptRecord");
    return attemptRecordFromJSON(parsed);
}
// Rebuild an AttemptRecord from its JSON form; all fields are assumed
// present (as produced by attemptRecordToJSON).
AttemptRecord attemptRecordFromJSON(const rj::Value &spec)
{
    AttemptRecord record;
    record.startTime = stringToTimePoint(spec["startTime"].GetString());
    record.stopTime = stringToTimePoint(spec["stopTime"].GetString());
    record.rc = spec["rc"].GetInt();
    record.executorLog = spec["executorLog"].GetString();
    record.outputLog = spec["outputLog"].GetString();
    record.errorLog = spec["errorLog"].GetString();
    return record;
}
// Encode a TimePoint as the decimal count of clock ticks since the epoch.
std::string timePointToString(const TimePoint &tp)
{
    const auto ticks = tp.time_since_epoch().count();
    return std::to_string(ticks);
}
// Decode a TimePoint from the decimal tick count produced by
// timePointToString; the tick unit is taken to be nanoseconds.
TimePoint stringToTimePoint(const std::string &timeString)
{
    const std::chrono::nanoseconds sinceEpoch{std::stoull(timeString)};
    return TimePoint{sinceEpoch};
}
// Decode a full DAG specification: required "tag" and "tasks", optional
// "parameters" (interpolation variables) and "jobDefaults" (seed for every
// task's job map).
//
// NOTE(review): the error text mentions a missing "name", but the field
// actually required is "tag" — consider aligning the message.
DAGSpec dagFromJSON(const rj::Value &spec)
{
    DAGSpec info;
    if (!spec.IsObject()) {
        throw std::runtime_error("Payload is not a dictionary.");
    }
    if (!spec.HasMember("tag")) {
        throw std::runtime_error("DAG Run is missing a name.");
    }
    if (!spec.HasMember("tasks")) {
        throw std::runtime_error("DAG Run has no tasks.");
    }
    info.tag = spec["tag"].GetString();
    // Get parameters if there are any
    if (spec.HasMember("parameters")) {
        info.taskConfig.variables = configFromJSON(spec["parameters"]);
    }
    // Job Defaults
    if (spec.HasMember("jobDefaults")) {
        info.taskConfig.jobDefaults = configFromJSON(spec["jobDefaults"]);
    }
    // Get the tasks (jobDefaults must be parsed first so they seed each task)
    info.tasks = tasksFromJSON(spec["tasks"], info.taskConfig.jobDefaults);
    return info;
}
// Convenience overload: parse the JSON text, then delegate to the
// rj::Value overload.
DAGSpec dagFromJSON(const std::string &jsonSpec)
{
    rj::Document parsed;
    checkRJParse(parsed.Parse(jsonSpec.c_str()), "Parsing config");
    return dagFromJSON(parsed);
}
// Render a StateUpdateRecord as a small JSON object with quoted
// "time" (tick-count string) and "state" (enum name) fields.
std::string stateUpdateRecordToJSON(const logger::StateUpdateRecord &rec)
{
    std::stringstream out;
    out << R"({ "time": )" << std::quoted(timePointToString(rec.time));
    out << R"(, "state": )" << std::quoted(rec.state._to_string());
    out << "}";
    return out.str();
}
// Parse a StateUpdateRecord from JSON; both "time" and "state" are
// required and throw std::runtime_error when absent.
logger::StateUpdateRecord stateUpdateRecordFromJSON(const rj::Value &json)
{
    if (!json.HasMember("time"))
        throw std::runtime_error("StateUpdateRecord missing required field time");
    if (!json.HasMember("state"))
        throw std::runtime_error(
            "StateUpdateRecord missing required field state");
    logger::StateUpdateRecord rec{.state = RunState::QUEUED};
    rec.state = RunState::_from_string(json["state"].GetString());
    rec.time = stringToTimePoint(json["time"].GetString());
    return rec;
}
// Convenience overload: parse the JSON text, then delegate to the
// rj::Value overload.
logger::StateUpdateRecord stateUpdateRecordFromJSON(const std::string &json)
{
    rj::Document parsed;
    checkRJParse(parsed.Parse(json.c_str()), "Parsing config");
    return stateUpdateRecordFromJSON(parsed);
}
} // namespace daggy

142
libdaggy/src/Utilities.cpp Normal file
View File

@@ -0,0 +1,142 @@
#include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp>
#include <future>
#include <iomanip>
using namespace std::chrono_literals;
namespace daggy {
// Replace every occurrence of `pattern` in `string` with `replacement`.
// Scanning resumes just past each inserted replacement, so replacement
// text that itself contains the pattern is never re-matched.
std::string globalSub(std::string string, const std::string &pattern,
                      const std::string &replacement)
{
    for (auto at = string.find(pattern); at != std::string::npos;
         at = string.find(pattern, at + replacement.size())) {
        string.replace(at, pattern.size(), replacement);
    }
    return string;
}
// Expand "{{name}}" placeholders in `raw` against `values`, returning the
// cartesian product of all list-valued substitutions. Each element of
// `raw` is expanded independently: a scalar value yields one substitution,
// a list value yields one variant per list entry, and the variants of
// successive elements multiply together.
//
// NOTE(review): placeholders are detected in the ORIGINAL part text only
// (part.find below), so a placeholder introduced by an earlier
// substitution within the same part is not expanded — confirm intended.
std::vector<std::vector<std::string>> interpolateValues(
    const std::vector<std::string> &raw, const ConfigValues &values)
{
    // Seed with one empty command so the product loop below has a base.
    std::vector<std::vector<std::string>> cooked{{}};
    for (const auto &part : raw) {
        std::vector<std::string> expandedPart{part};
        // Find all values of parameters, and expand them
        for (const auto &[paramRaw, paramValue] : values) {
            std::string param = "{{" + paramRaw + "}}";
            auto pos = part.find(param);
            if (pos == std::string::npos)
                continue;
            std::vector<std::string> newExpandedPart;
            if (std::holds_alternative<std::string>(paramValue)) {
                // Scalar: substitute in place in every variant so far.
                for (auto &cmd : expandedPart) {
                    newExpandedPart.push_back(
                        globalSub(cmd, param, std::get<std::string>(paramValue)));
                }
            }
            else {
                // List: one new variant per (value, existing variant) pair.
                for (const auto &val :
                     std::get<std::vector<std::string>>(paramValue)) {
                    for (const auto &cmd : expandedPart) {
                        newExpandedPart.push_back(globalSub(cmd, param, val));
                    }
                }
            }
            expandedPart.swap(newExpandedPart);
        }
        // Cross this part's variants with every command built so far.
        std::vector<std::vector<std::string>> newCommands;
        for (const auto &newPart : expandedPart) {
            for (auto cmd : cooked) {
                cmd.push_back(newPart);
                newCommands.emplace_back(cmd);
            }
        }
        cooked.swap(newCommands);
    }
    return cooked;
}
// Validate every task's job against the executor and expand parameterized
// jobs, producing one "<baseName>_<i>" task per expanded job.
TaskSet expandTaskSet(const TaskSet &tasks,
                      executors::task::TaskExecutor &executor,
                      const ConfigValues &interpolatedValues)
{
    TaskSet expanded;
    for (const auto &[baseName, task] : tasks) {
        executor.validateTaskParameters(task.job);
        size_t suffix = 0;
        for (const auto &job :
             executor.expandTaskParameters(task.job, interpolatedValues)) {
            Task variant{task};
            variant.job = job;
            expanded.emplace(baseName + "_" + std::to_string(suffix), variant);
            ++suffix;
        }
    }
    return expanded;
}
// Merge `tasks` into `dag`: add one vertex per task, then an edge from
// each task to every vertex whose definedName appears in its children set.
// Throws std::runtime_error if the resulting graph contains a cycle.
void updateDAGFromTasks(TaskDAG &dag, const TaskSet &tasks)
{
    // Add the missing vertices
    for (const auto &[name, task] : tasks) {
        dag.addVertex(name, task);
    }
    // Add edges
    for (const auto &[name, t] : tasks) {
        // Re-bind: structured bindings can't be lambda-captured in C++17.
        const auto &task = t;
        dag.addEdgeIf(name, [&task](const auto &v) {
            return task.children.count(v.data.definedName) > 0;
        });
    }
    if (!dag.isValid()) {
        throw std::runtime_error("DAG contains a cycle");
    }
}
// Reconstruct a DAG from persisted tasks plus their state-update history.
// Any update implying the task left QUEUED (RUNNING / RETRY / PAUSED /
// ERRORED / KILLED) marks the vertex RUNNING; COMPLETED and QUEUED
// updates leave the vertex untouched.
//
// NOTE(review): a COMPLETED update does not mark the vertex completed —
// confirm whether completed tasks are meant to re-run after a rebuild.
TaskDAG buildDAGFromTasks(
    const TaskSet &tasks,
    const std::unordered_map<std::string,
                             std::vector<loggers::dag_run::StateUpdateRecord>>
        &updates)
{
    TaskDAG dag;
    updateDAGFromTasks(dag, tasks);
    // Replay any updates
    for (const auto &[taskName, taskUpdates] : updates) {
        for (const auto &update : taskUpdates) {
            switch (update.state) {
            case RunState::RUNNING:
            case RunState::RETRY:
            case RunState::PAUSED:
            case RunState::ERRORED:
            case RunState::KILLED:
                dag.setVertexState(taskName, RunState::RUNNING);
                break;
            case RunState::COMPLETED:
            case RunState::QUEUED:
                break;
            }
        }
    }
    return dag;
}
// Stream a TimePoint as its raw tick count since the epoch.
//
// BUG FIX: the original inserted std::endl, which appended a newline and
// flushed the stream on every insertion. A stream insertion operator
// should emit only the value and leave line handling to the caller
// (notably, this kept the value composable inside larger expressions).
std::ostream &operator<<(std::ostream &os, const TimePoint &tp)
{
    os << tp.time_since_epoch().count();
    return os;
}
} // namespace daggy

View File

@@ -0,0 +1 @@
# Executors are grouped by kind; currently only task executors exist.
add_subdirectory(task)

View File

@@ -0,0 +1,5 @@
# Task-executor implementations compiled into the main target.
# (SlurmTaskExecutor.cpp compiles to a stub unless DAGGY_ENABLE_SLURM is set.)
target_sources(${PROJECT_NAME} PRIVATE
  SlurmTaskExecutor.cpp
  NoopTaskExecutor.cpp
  ForkingTaskExecutor.cpp
)

View File

@@ -0,0 +1,229 @@
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include <wait.h>
#include <daggy/Utilities.hpp>
#include <daggy/executors/task/ForkingTaskExecutor.hpp>
#include <iomanip>
using namespace daggy::executors::task;
// Drain all currently-available bytes from `fd` into a string.
//
// Uses poll() with a 1 ms timeout, so it returns as soon as no more data
// is immediately readable; it does NOT block until EOF. A read() of 0
// (EOF) also ends the loop.
//
// BUG FIX: when read() failed (returned -1) the original appended
// `buffer` with a negative length — std::string::append then receives a
// huge size_t and reads far past the buffer (undefined behaviour). Read
// errors now terminate the loop cleanly.
std::string slurp(int fd)
{
    std::string result;
    constexpr size_t BUFFER_SIZE = 4096;
    char buffer[BUFFER_SIZE];

    struct pollfd pfd
    {
        .fd = fd, .events = POLLIN, .revents = 0
    };

    poll(&pfd, 1, 1);
    while (pfd.revents & POLLIN) {
        ssize_t bytes = read(fd, buffer, BUFFER_SIZE);
        if (bytes <= 0) {
            // 0 → EOF; negative → read error (e.g. EINTR/EBADF): stop.
            break;
        }
        result.append(buffer, static_cast<size_t>(bytes));
        pfd.revents = 0;
        poll(&pfd, 1, 1);
    }
    return result;
}
// Create the executor with a worker pool of `nThreads` threads; each task
// submitted via execute() occupies one worker for its full duration.
ForkingTaskExecutor::ForkingTaskExecutor(size_t nThreads)
    : tp_(nThreads)
{
}
// NOTE(review): clearing taskControls_ destroys the per-task control flags
// while in-flight runTask() calls may still hold references to them
// (execute() captures them by reference) — confirm the thread pool is
// fully drained before this destructor runs.
ForkingTaskExecutor::~ForkingTaskExecutor()
{
    std::lock_guard<std::mutex> lock(taskControlsGuard_);
    taskControls_.clear();
}
// Request cancellation of a running task by clearing its control flag;
// runTask() observes the flag and SIGKILLs the child. Unknown keys are
// treated as already-stopped. Always reports success.
bool ForkingTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
{
    const std::string key = std::to_string(runID) + "_" + taskName;
    std::lock_guard<std::mutex> lock(taskControlsGuard_);
    auto entry = taskControls_.find(key);
    if (entry != taskControls_.end()) {
        entry->second = false;
    }
    return true;
}
// Queue `task` on the thread pool and return a future for its attempt.
// A per-(run, task) control flag is registered in taskControls_ so stop()
// can request cancellation; the worker removes its entry when it finishes.
std::future<daggy::AttemptRecord> ForkingTaskExecutor::execute(
    DAGRunID runID, const std::string &taskName, const Task &task)
{
    std::string key = std::to_string(runID) + "_" + taskName;
    std::lock_guard<std::mutex> lock(taskControlsGuard_);
    // Reference into the map stays valid: unordered_map never relocates
    // mapped values on rehash.
    auto [it, ins] = taskControls_.emplace(key, true);
    auto &running = it->second;
    return tp_.addTask([this, task, &running, key]() {
        auto ret = this->runTask(task, running);
        // Drop the control entry once the task has fully finished.
        std::lock_guard<std::mutex> lock(this->taskControlsGuard_);
        this->taskControls_.extract(key);
        return ret;
    });
}
// Fork/exec the task's command, streaming the child's stdout/stderr into
// the returned AttemptRecord while polling for its exit. Clearing
// `running` (via stop()) causes the child to be SIGKILLed. Blocks until
// the child has been collected.
//
// NOTE(review): the parent never closes the write ends of the pipes, so
// the reader threads rely on the `reading` flag plus slurp()'s 1 ms poll
// timeout — not EOF — to terminate; confirm this is intentional.
daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task,
                                                  std::atomic<bool> &running)
{
    AttemptRecord rec;
    rec.startTime = Clock::now();
    // Need to convert the strings
    std::vector<char *> argv;
    std::vector<char *> envp;
    // Populate the command: either tokenize "commandString" (std::quoted
    // honours embedded quotes) or copy the pre-tokenized "command" array.
    Command command;
    if (task.job.count("commandString")) {
        std::stringstream ss;
        ss << std::get<std::string>(task.job.at("commandString"));
        std::string tok;
        while (ss >> std::quoted(tok)) {
            command.push_back(tok);
        }
    }
    else {
        const auto cmd = std::get<Command>(task.job.at("command"));
        std::copy(cmd.begin(), cmd.end(), std::back_inserter(command));
    }
    // const_cast is safe here: execvpe does not modify the strings, and
    // `command` outlives the fork/exec below.
    std::transform(
        command.begin(), command.end(), std::back_inserter(argv),
        [](const std::string &s) { return const_cast<char *>(s.c_str()); });
    argv.push_back(nullptr);
    // Populate the environment
    auto environment = (task.job.count("environment") == 0
                            ? std::vector<std::string>{}
                            : std::get<Command>(task.job.at("environment")));
    std::transform(
        environment.begin(), environment.end(), std::back_inserter(envp),
        [](const std::string &s) { return const_cast<char *>(s.c_str()); });
    envp.push_back(nullptr);
    // Create the pipes; O_DIRECT selects "packet" mode (one read per write).
    int stdoutPipe[2];
    int pipeRC = pipe2(stdoutPipe, O_DIRECT);
    if (pipeRC != 0)
        throw std::runtime_error("Unable to create pipe for stdout");
    int stderrPipe[2];
    pipeRC = pipe2(stderrPipe, O_DIRECT);
    if (pipeRC != 0)
        throw std::runtime_error("Unable to create pipe for stderr");
    pid_t child = fork();
    if (child < 0) {
        throw std::runtime_error("Unable to fork child");
    }
    else if (child == 0) { // child
        // Re-point stdout/stderr at the pipes, retrying on EINTR.
        while ((dup2(stdoutPipe[1], STDOUT_FILENO) == -1) && (errno == EINTR)) {
        }
        while ((dup2(stderrPipe[1], STDERR_FILENO) == -1) && (errno == EINTR)) {
        }
        close(stdoutPipe[0]);
        close(stderrPipe[0]);
        char **env = (envp.empty() ? nullptr : envp.data());
        auto res = execvpe(argv[0], argv.data(), env);
        // Only reached if exec failed; exit with errno so the parent sees
        // a non-zero rc.
        std::cout << res << std::endl;
        exit(errno);
    }
    // Parent: drain both pipes on background threads while waiting.
    std::atomic<bool> reading = true;
    std::thread stdoutReader([&]() {
        while (reading)
            rec.outputLog.append(slurp(stdoutPipe[0]));
    });
    std::thread stderrReader([&]() {
        while (reading)
            rec.errorLog.append(slurp(stderrPipe[0]));
    });
    siginfo_t childInfo;
    // Poll for child exit every 250 ms; si_pid stays 0 until it exits.
    while (running) {
        childInfo.si_pid = 0;
        waitid(P_PID, child, &childInfo, WEXITED | WNOHANG);
        if (childInfo.si_pid > 0) {
            break;
        }
        std::this_thread::sleep_for(250ms);
    }
    if (!running) {
        rec.executorLog = "Killed";
        // Send the kills until pid is dead
        while (kill(child, SIGKILL) != -1) {
            // Need to collect the child to avoid a zombie process
            waitid(P_PID, child, &childInfo, WEXITED | WNOHANG);
            std::this_thread::sleep_for(50ms);
        }
    }
    reading = false;
    rec.stopTime = Clock::now();
    if (childInfo.si_pid > 0) {
        rec.rc = childInfo.si_status;
    }
    else {
        // Child never collected (e.g. killed before exit was observed).
        rec.rc = -1;
    }
    stdoutReader.join();
    stderrReader.join();
    close(stdoutPipe[0]);
    close(stderrPipe[0]);
    return rec;
}
// A job must provide either "command" (array of strings) or
// "commandString" (single string); "environment", when present, must be
// an array of strings. Throws std::runtime_error on the first violation;
// returns true on success.
bool ForkingTaskExecutor::validateTaskParameters(const ConfigValues &job)
{
    if (job.count("command") != 0) {
        if (!std::holds_alternative<Command>(job.at("command")))
            throw std::runtime_error(R"(command must be an array of strings)");
    }
    else if (job.count("commandString") == 0) {
        throw std::runtime_error(R"(command or commandString must be defined.)");
    }
    else if (!std::holds_alternative<std::string>(job.at("commandString"))) {
        throw std::runtime_error(R"(commandString must be a string)");
    }
    if (job.count("environment") != 0 and
        !std::holds_alternative<Command>(job.at("environment"))) {
        throw std::runtime_error(R"(environment must be an array of strings)");
    }
    return true;
}
std::vector<daggy::ConfigValues> ForkingTaskExecutor::expandTaskParameters(
const ConfigValues &job, const ConfigValues &expansionValues)
{
std::vector<ConfigValues> newValues;
const auto command = std::get<Command>(job.at("command"));
for (const auto &expandedCommand :
interpolateValues(command, expansionValues)) {
ConfigValues newCommand{job};
newCommand.at("command") = expandedCommand;
newValues.emplace_back(newCommand);
}
return newValues;
}

View File

@@ -0,0 +1,51 @@
#include <daggy/Utilities.hpp>
#include <daggy/executors/task/NoopTaskExecutor.hpp>
namespace daggy::executors::task {
// "Execute" a task without doing any work: immediately fulfil the returned
// future with a zero-rc AttemptRecord whose logs all carry the task name.
std::future<daggy::AttemptRecord> NoopTaskExecutor::execute(
    DAGRunID runID, const std::string &taskName, const Task &task)
{
    const auto now = Clock::now();
    AttemptRecord record{.startTime = now,
                         .stopTime = now,
                         .rc = 0,
                         .executorLog = taskName,
                         .outputLog = taskName,
                         .errorLog = taskName};
    std::promise<daggy::AttemptRecord> done;
    done.set_value(std::move(record));
    return done.get_future();
}
// Ensure the job carries a "command" entry holding an array of strings.
// Throws std::runtime_error otherwise; returns true on success.
bool NoopTaskExecutor::validateTaskParameters(const ConfigValues &job)
{
    const auto cmd = job.find("command");
    if (cmd == job.end())
        throw std::runtime_error(R"(job does not have a "command" argument)");
    if (!std::holds_alternative<Command>(cmd->second))
        throw std::runtime_error(
            R"(taskParameter's "command" must be an array of strings)");
    return true;
}
std::vector<daggy::ConfigValues> NoopTaskExecutor::expandTaskParameters(
const ConfigValues &job, const ConfigValues &expansionValues)
{
std::vector<ConfigValues> newValues;
const auto command = std::get<Command>(job.at("command"));
for (const auto &expandedCommand :
interpolateValues(command, expansionValues)) {
ConfigValues newCommand{job};
newCommand.at("command") = expandedCommand;
newValues.emplace_back(newCommand);
}
return newValues;
}
// Stopping a no-op task is trivially successful: there is never any
// running work to cancel.
bool NoopTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
{
    return true;
}
} // namespace daggy::executors::task

View File

@@ -0,0 +1,347 @@
#include <iomanip>
#include <iterator>
#include <mutex>
#include <stdexcept>
#ifdef DAGGY_ENABLE_SLURM
#include <slurm/slurm.h>
#include <string.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <csignal>
#include <cstdlib>
#include <daggy/Utilities.hpp>
#include <daggy/executors/task/SlurmTaskExecutor.hpp>
#include <filesystem>
#include <fstream>
#include <random>
namespace fs = std::filesystem;
namespace daggy::executors::task {
// Produce a random alphanumeric tag of `nChars` characters, used to make
// per-task file names unique. The RNG is seeded once per process.
std::string getUniqueTag(size_t nChars = 6)
{
    static const char *kAlphabet =
        "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
    static std::random_device dev;
    static std::mt19937 rng(dev());
    std::uniform_int_distribution<int> dist(0, 61); // 62 symbols, inclusive
    std::string tag;
    tag.reserve(nChars);
    for (size_t i = 0; i < nChars; ++i) {
        tag.push_back(kAlphabet[dist(rng)]);
    }
    return tag;
}
// Read the whole file at `fn` into `dest`, then delete the file.
// Leaves `dest` untouched when the file does not exist.
void readAndClean(const fs::path &fn, std::string &dest)
{
    if (!fs::exists(fn))
        return;
    std::string contents;
    {
        std::ifstream in;
        in.open(fn);
        contents.assign(std::istreambuf_iterator<char>{in},
                        std::istreambuf_iterator<char>{});
    }
    fs::remove_all(fn);
    dest.swap(contents);
}
// Start the monitor thread and seed the SLURM_* environment variables
// that slurm's submission machinery expects.
//
// BUG FIX: the original handed putenv() pointers owned by local
// std::strings (and one owned by a temporary from ss.str()). putenv()
// does not copy its argument — it stores the pointer directly in
// `environ` — so the environment dangled as soon as this constructor
// returned. setenv() copies both name and value and is safe here.
SlurmTaskExecutor::SlurmTaskExecutor()
    : running_(true)
    , monitorWorker_(&SlurmTaskExecutor::monitor, this)
{
    setenv("SLURM_PRIO_PROCESS",
           std::to_string(getpriority(PRIO_PROCESS, 0)).c_str(), 1);
    setenv("SLURM_SUBMIT_DIR", fs::current_path().string().c_str(), 1);

    const size_t MAX_HOSTNAME_LENGTH = 50;
    std::string submitHost(MAX_HOSTNAME_LENGTH, '\0');
    gethostname(submitHost.data(), MAX_HOSTNAME_LENGTH);
    submitHost.resize(submitHost.find('\0')); // trim at the NUL terminator
    setenv("SLURM_SUBMIT_HOST", submitHost.c_str(), 1);

    // Report the process umask as a three-digit octal string ("0XYZ").
    uint32_t mask = umask(0);
    umask(mask); // umask(0) was only a read; restore the old mask
    std::stringstream ss;
    ss << '0' << uint32_t{((mask >> 6) & 07)} << uint32_t{((mask >> 3) & 07)}
       << uint32_t{(mask & 07)};
    setenv("SLURM_UMASK", ss.str().c_str(), 1);
}
// Stop the monitor thread, then fail every still-pending future with
// rc = -1 so waiters are released instead of blocking forever.
SlurmTaskExecutor::~SlurmTaskExecutor()
{
    running_ = false;
    monitorWorker_.join();
    // Resolve the remaining futures
    std::lock_guard<std::mutex> lock(promiseGuard_);
    for (auto &[jobID, job] : runningJobs_) {
        job.prom.set_value(
            AttemptRecord{.rc = -1, .executorLog = "executor killed"});
    }
    runningJobs_.clear();
}
// Validates the job to ensure that all required values are set and are of
// the right type,
bool SlurmTaskExecutor::validateTaskParameters(const ConfigValues &job)
{
const std::unordered_set<std::string> requiredFields{
"minCPUs", "minMemoryMB", "minTmpDiskMB", "priority",
"timeLimitSeconds", "userID", "workDir", "tmpDir"};
for (const auto &requiredField : requiredFields) {
if (job.count(requiredField) == 0) {
throw std::runtime_error("Missing field " + requiredField);
}
}
// Require command or commandString
if (job.count("command") + job.count("commandString") == 0)
throw std::runtime_error(
"Either command or commandString must be specified");
if (job.count("environment")) {
if (!std::holds_alternative<Command>(job.at("environment")))
throw std::runtime_error(R"(environment must be an array of strings)");
}
return true;
}
std::vector<ConfigValues> SlurmTaskExecutor::expandTaskParameters(
const ConfigValues &job, const ConfigValues &expansionValues)
{
std::vector<ConfigValues> newValues;
const auto command = std::get<Command>(job.at("command"));
for (const auto &expandedCommand :
interpolateValues(command, expansionValues)) {
ConfigValues newCommand{job};
newCommand.at("command") = expandedCommand;
newValues.emplace_back(newCommand);
}
return newValues;
}
// Submit `task` as a slurm batch job and return a future that the monitor
// thread resolves when the job finishes. The job's stdout/stderr are
// redirected to uniquely-named files under the job's tmpDir, which the
// monitor later reads back (see readAndClean).
std::future<AttemptRecord> SlurmTaskExecutor::execute(
    DAGRunID runID, const std::string &taskName, const Task &task)
{
    std::stringstream executorLog;
    const auto &job = task.job;
    // Unique tag keeps concurrent attempts of the same task from
    // clobbering each other's output files.
    const auto uniqueTaskName = taskName + "_" + getUniqueTag(6);
    fs::path tmpDir = std::get<std::string>(job.at("tmpDir"));
    std::string stdoutFile = (tmpDir / (uniqueTaskName + ".stdout")).string();
    std::string stderrFile = (tmpDir / (uniqueTaskName + ".stderr")).string();
    std::string workDir = std::get<std::string>(job.at("workDir"));
    // Convert command to argc / argv
    //
    // NOTE(review): argv is seeded with a leading nullptr before the
    // command tokens and a trailing nullptr is appended; jd.argc counts
    // both. Verify against slurm's expected argv layout for batch scripts.
    std::vector<char *> argv{nullptr};
    // Populate the command: tokenize "commandString" (quote-aware) or copy
    // the pre-tokenized "command" array.
    Command command;
    if (task.job.count("commandString")) {
        std::stringstream ss;
        ss << std::get<std::string>(task.job.at("commandString"));
        std::string tok;
        while (ss >> std::quoted(tok)) {
            command.push_back(tok);
        }
    }
    else {
        const auto cmd = std::get<Command>(task.job.at("command"));
        std::copy(cmd.begin(), cmd.end(), std::back_inserter(command));
    }
    std::transform(
        command.begin(), command.end(), std::back_inserter(argv),
        [](const std::string &s) { return const_cast<char *>(s.c_str()); });
    argv.push_back(nullptr);
    // NOTE(review): env is seeded with one empty string and envp gets no
    // terminating nullptr (jd.env_size carries the count) — confirm this
    // matches what slurm_submit_batch_job expects.
    std::vector<std::string> env{""};
    std::vector<char *> envp;
    auto it = task.job.find("environment");
    if (it != task.job.end()) {
        const auto environment = std::get<Command>(task.job.at("environment"));
        std::copy(environment.begin(), environment.end(),
                  std::back_inserter(env));
    }
    std::transform(
        env.begin(), env.end(), std::back_inserter(envp),
        [](const std::string &s) { return const_cast<char *>(s.c_str()); });
    // The batch script simply runs its arguments.
    char script[] = "#!/bin/bash\n$@\n";
    char stdinFile[] = "/dev/null";
    // taken from slurm
    int error_code;
    job_desc_msg_t jd;
    submit_response_msg_t *resp_msg;
    slurm_init_job_desc_msg(&jd);
    jd.contiguous = 1;
    jd.name = const_cast<char *>(taskName.c_str());
    // Resource limits come in as decimal strings in the job map.
    jd.min_cpus = std::stoi(std::get<std::string>(job.at("minCPUs")));
    jd.pn_min_memory = std::stoi(std::get<std::string>(job.at("minMemoryMB")));
    jd.pn_min_tmp_disk =
        std::stoi(std::get<std::string>(job.at("minTmpDiskMB")));
    jd.priority = std::stoi(std::get<std::string>(job.at("priority")));
    jd.shared = 0;
    jd.time_limit =
        std::stoi(std::get<std::string>(job.at("timeLimitSeconds")));
    jd.min_nodes = 1;
    jd.user_id = std::stoi(std::get<std::string>(job.at("userID")));
    jd.argv = argv.data();
    jd.argc = argv.size();
    // TODO figure out the script to run
    jd.script = script;
    jd.std_in = stdinFile;
    jd.std_err = const_cast<char *>(stderrFile.c_str());
    jd.std_out = const_cast<char *>(stdoutFile.c_str());
    jd.work_dir = const_cast<char *>(workDir.c_str());
    // jd.env_size = 1;
    // jd.environment = env;
    jd.env_size = envp.size();
    jd.environment = envp.data();
    error_code = slurm_submit_batch_job(&jd, &resp_msg);
    if (error_code) {
        std::stringstream ss;
        ss << "Unable to submit slurm job: " << slurm_strerror(error_code);
        throw std::runtime_error(ss.str());
    }
    uint32_t jobID = resp_msg->job_id;
    executorLog << "Job " << resp_msg->job_submit_user_msg << '\n';
    slurm_free_submit_response_response_msg(resp_msg);
    // Register the pending job so the monitor thread can resolve it.
    std::lock_guard<std::mutex> lock(promiseGuard_);
    Job newJob{.prom{},
               .stdoutFile = stdoutFile,
               .stderrFile = stderrFile,
               .runID = runID,
               .taskName = taskName};
    auto fut = newJob.prom.get_future();
    runningJobs_.emplace(jobID, std::move(newJob));
    return fut;
}
// Request termination of a running task's slurm job. Always reports
// success; monitor() observes the resulting state change and resolves
// the task's promise.
bool SlurmTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
{
  // Hopefully this isn't a common thing, so just look the job up and
  // kill it.
  size_t targetJobID = 0;
  {
    std::lock_guard<std::mutex> lock(promiseGuard_);
    auto match = std::find_if(
        runningJobs_.begin(), runningJobs_.end(), [&](const auto &entry) {
          return entry.second.runID == runID and
                 entry.second.taskName == taskName;
        });
    if (match != runningJobs_.end())
      targetJobID = match->first;
  }
  if (targetJobID == 0)
    return true;
  // Send the kill message to slurm
  slurm_kill_job(targetJobID, SIGKILL, KILL_HURRY);
  return true;
}
// Background polling loop. Once per second, query slurm for the status
// of every submitted job; jobs that have reached a terminal state have
// their stdout/stderr collected from disk, their promise resolved, and
// are removed from the running set. Runs until running_ is cleared.
void SlurmTaskExecutor::monitor()
{
  while (running_) {
    {
      std::lock_guard<std::mutex> lock(promiseGuard_);
      // Scoped per poll cycle; previously this set was declared outside
      // the while loop and accumulated IDs forever.
      std::unordered_set<size_t> resolvedJobs;
      for (auto &[jobID, job] : runningJobs_) {
        job_info_msg_t *jobStatus;
        int error_code =
            slurm_load_job(&jobStatus, jobID, SHOW_ALL | SHOW_DETAIL);
        if (error_code != SLURM_SUCCESS)
          continue;
        uint32_t idx = jobStatus->record_count;
        if (idx == 0) {
          // No records yet; free the reply (previously leaked here) and
          // retry on the next cycle.
          slurm_free_job_info_msg(jobStatus);
          continue;
        }
        idx--;
        const slurm_job_info_t &jobInfo = jobStatus->job_array[idx];
        AttemptRecord record;
        bool terminal = true;
        switch (jobInfo.job_state) {
        case JOB_PENDING:
        case JOB_SUSPENDED:
        case JOB_RUNNING:
          // Still in flight; check again on the next cycle.
          terminal = false;
          break;
        // Job has finished
        case JOB_COMPLETE: /* completed execution successfully */
          record.rc = jobInfo.exit_code;
          break;
        case JOB_FAILED: /* completed execution unsuccessfully */
          record.rc = jobInfo.exit_code;
          record.executorLog = "Script errored.\n";
          break;
        case JOB_CANCELLED: /* cancelled by user */
          record.rc = 9; // matches SIGKILL
          record.executorLog = "Job cancelled by user.\n";
          break;
        case JOB_TIMEOUT: /* terminated on reaching time limit */
          record.rc = jobInfo.exit_code;
          record.executorLog = "Job exceeded time limit.\n";
          break;
        case JOB_NODE_FAIL: /* terminated on node failure */
          record.rc = jobInfo.exit_code;
          record.executorLog = "Node failed during execution\n";
          break;
        case JOB_PREEMPTED: /* terminated due to preemption */
          record.rc = jobInfo.exit_code;
          record.executorLog = "Job terminated due to pre-emption.\n";
          break;
        case JOB_BOOT_FAIL: /* terminated due to node boot failure */
          record.rc = jobInfo.exit_code;
          record.executorLog =
              "Job failed to run due to failure of compute node to "
              "boot.\n";
          break;
        case JOB_DEADLINE: /* terminated on deadline */
          record.rc = jobInfo.exit_code;
          record.executorLog = "Job terminated due to deadline.\n";
          break;
        case JOB_OOM: /* experienced out of memory error */
          record.rc = jobInfo.exit_code;
          record.executorLog = "Job terminated due to out-of-memory.\n";
          break;
        }
        // Free the reply on every path; the in-flight cases previously
        // leaked it by `continue`-ing straight out of the switch.
        slurm_free_job_info_msg(jobStatus);
        if (!terminal)
          continue;
        readAndClean(job.stdoutFile, record.outputLog);
        readAndClean(job.stderrFile, record.errorLog);
        job.prom.set_value(std::move(record));
        resolvedJobs.insert(jobID);
      }
      for (const auto &jobID : resolvedJobs) {
        runningJobs_.extract(jobID);
      }
    }
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
}
} // namespace daggy::executors::task
#endif

View File

@@ -0,0 +1 @@
# Loggers are grouped by category; dag_run holds the DAG-run loggers.
add_subdirectory(dag_run)

View File

@@ -0,0 +1,5 @@
# DAG-run logger implementations compiled into the main library target.
target_sources(${PROJECT_NAME} PRIVATE
OStreamLogger.cpp
RedisLogger.cpp
RedisHelper.cpp
)

View File

@@ -0,0 +1,180 @@
#include <enum.h>
#include <algorithm>
#include <daggy/Serialization.hpp>
#include <daggy/loggers/dag_run/OStreamLogger.hpp>
#include <iterator>
namespace daggy::loggers::dag_run {
// Stream-backed DAG-run logger; `os` must outlive this logger.
OStreamLogger::OStreamLogger(std::ostream &os)
    : os_(os)
{
}
OStreamLogger::~OStreamLogger()
{
  // Drop all in-memory run records; taken under the lock in case a
  // concurrent reader is still active during teardown.
  std::lock_guard<std::mutex> lock(guard_);
  dagRuns_.clear();
}
// Execution
// Register a new DAG run, mark the run and all of its tasks QUEUED, and
// announce the run (plus each task's job config) on the output stream.
// Returns the index of the new run record.
DAGRunID OStreamLogger::startDAGRun(const DAGSpec &dagSpec)
{
  std::lock_guard<std::mutex> lock(guard_);
  const size_t newRunID = dagRuns_.size();
  dagRuns_.emplace_back(DAGRunRecord{.dagSpec = dagSpec});
  for (const auto &[taskName, unused] : dagSpec.tasks)
    _updateTaskState(newRunID, taskName, RunState::QUEUED);
  _updateDAGRunState(newRunID, RunState::QUEUED);
  os_ << "Starting new DAGRun tagged " << dagSpec.tag << " with ID "
      << newRunID << " and " << dagSpec.tasks.size() << " tasks" << std::endl;
  for (const auto &[taskName, task] : dagSpec.tasks)
    os_ << "TASK (" << taskName << "): " << configToJSON(task.job)
        << std::endl;
  return newRunID;
}
// Attach a task to an existing run and mark it QUEUED.
void OStreamLogger::addTask(DAGRunID dagRunID, const std::string &taskName,
                            const Task &task)
{
  std::lock_guard<std::mutex> lock(guard_);
  dagRuns_[dagRunID].dagSpec.tasks[taskName] = task;
  _updateTaskState(dagRunID, taskName, RunState::QUEUED);
}
// Replace the stored definition of an existing task; its state history
// is left untouched.
void OStreamLogger::updateTask(DAGRunID dagRunID, const std::string &taskName,
                               const Task &task)
{
  std::lock_guard<std::mutex> lock(guard_);
  dagRuns_[dagRunID].dagSpec.tasks[taskName] = task;
}
// Public, thread-safe entry point for run-level state changes.
void OStreamLogger::updateDAGRunState(DAGRunID dagRunID, RunState state)
{
  std::lock_guard<std::mutex> stateLock(guard_);
  _updateDAGRunState(dagRunID, state);
}
// Unlocked core: echo the run-level state change and append a
// timestamped record to the run's history. Caller must hold guard_.
void OStreamLogger::_updateDAGRunState(DAGRunID dagRunID, RunState state)
{
  os_ << "DAG State Change(" << dagRunID << "): " << state._to_string()
      << std::endl;
  auto &history = dagRuns_[dagRunID].dagStateChanges;
  history.push_back({Clock::now(), state});
}
// Record one execution attempt for a task and echo its output: stdout
// on success (rc == 0), stderr otherwise.
void OStreamLogger::logTaskAttempt(DAGRunID dagRunID,
                                   const std::string &taskName,
                                   const AttemptRecord &attempt)
{
  std::lock_guard<std::mutex> lock(guard_);
  const bool succeeded = (attempt.rc == 0);
  const std::string &msg = succeeded ? attempt.outputLog : attempt.errorLog;
  os_ << "Task Attempt (" << dagRunID << '/' << taskName << "): Ran with RC "
      << attempt.rc << ": " << msg << std::endl;
  dagRuns_[dagRunID].taskAttempts[taskName].push_back(attempt);
}
// Public, thread-safe entry point for task-level state changes.
void OStreamLogger::updateTaskState(DAGRunID dagRunID,
                                    const std::string &taskName,
                                    RunState state)
{
  std::lock_guard<std::mutex> stateLock(guard_);
  _updateTaskState(dagRunID, taskName, state);
}
// Unlocked core for task state transitions; caller must hold guard_.
// Appends a timestamped record to the task's history, updates the
// task's current-state map, and echoes the change to the stream.
void OStreamLogger::_updateTaskState(DAGRunID dagRunID,
                                     const std::string &taskName,
                                     RunState state)
{
  auto &dagRun = dagRuns_.at(dagRunID);
  dagRun.taskStateChanges[taskName].push_back({Clock::now(), state});
  // insert_or_assign replaces the previous manual find/emplace/assign
  // sequence with a single lookup; it also works for mapped types
  // without a default constructor.
  dagRun.taskRunStates.insert_or_assign(taskName, state);
  os_ << "Task State Change (" << dagRunID << '/' << taskName
      << "): " << state._to_string() << std::endl;
}
// Querying
// Return a copy of the spec the run was started with, including any
// tasks added afterwards via addTask.
// (Also drops the stray semicolon that followed the function body.)
DAGSpec OStreamLogger::getDAGSpec(DAGRunID dagRunID)
{
  std::lock_guard<std::mutex> lock(guard_);
  return dagRuns_.at(dagRunID).dagSpec;
}
// Summarize DAG runs, optionally filtered by tag. Unless `all` is set,
// runs whose latest state is COMPLETED are omitted. A summary's
// lastUpdate is the most recent of the run-level and task-level updates.
std::vector<DAGRunSummary> OStreamLogger::queryDAGRuns(const std::string &tag,
                                                       bool all)
{
  std::vector<DAGRunSummary> summaries;
  std::lock_guard<std::mutex> lock(guard_);
  // BUG FIX: the run index previously never advanced, so every summary
  // reported runID 0. Iterate by index so filtered-out runs still
  // consume their IDs.
  for (size_t runID = 0; runID < dagRuns_.size(); ++runID) {
    const auto &run = dagRuns_[runID];
    if ((!all) &&
        (run.dagStateChanges.back().state == +RunState::COMPLETED)) {
      continue;
    }
    if (!tag.empty() and tag != run.dagSpec.tag)
      continue;
    // Most recent task-level update across all tasks of this run.
    TimePoint lastTaskUpdate;
    for (const auto &[_, updates] : run.taskStateChanges) {
      for (const auto &update : updates) {
        if (update.time > lastTaskUpdate)
          lastTaskUpdate = update.time;
      }
    }
    DAGRunSummary summary{
        .runID = runID,
        .tag = run.dagSpec.tag,
        .runState = run.dagStateChanges.back().state,
        .startTime = run.dagStateChanges.front().time,
        .lastUpdate = std::max<TimePoint>(lastTaskUpdate,
                                          run.dagStateChanges.back().time)};
    for (const auto &[_, taskState] : run.taskRunStates) {
      summary.taskStateCounts[taskState]++;
    }
    summaries.emplace_back(std::move(summary));
  }
  return summaries;
}
// Return a full copy of a run's record (spec, attempts, state history).
DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunID)
{
  std::lock_guard<std::mutex> recordLock(guard_);
  return dagRuns_.at(dagRunID);
}
// Current run-level state, i.e. the most recent state change recorded.
RunState OStreamLogger::getDAGRunState(DAGRunID dagRunID)
{
  std::lock_guard<std::mutex> recordLock(guard_);
  const auto &changes = dagRuns_.at(dagRunID).dagStateChanges;
  return changes.back().state;
}
// Look up a task definition by name within a run's spec.
Task OStreamLogger::getTask(DAGRunID dagRunID, const std::string &taskName)
{
  std::lock_guard<std::mutex> recordLock(guard_);
  const auto &spec = dagRuns_.at(dagRunID).dagSpec;
  return spec.tasks.at(taskName);
}
// Current state of a single task within a run.
RunState OStreamLogger::getTaskState(DAGRunID dagRunID,
                                     const std::string &taskName)
{
  std::lock_guard<std::mutex> recordLock(guard_);
  const auto &record = dagRuns_.at(dagRunID);
  return record.taskRunStates.at(taskName);
}
} // namespace daggy::loggers::dag_run

View File

@@ -0,0 +1,90 @@
#include <stdexcept>
#ifdef DAGGY_ENABLE_REDIS
#include <daggy/loggers/dag_run/RedisHelper.hpp>
namespace daggy::loggers::dag_run::redis {
// Open a connection to the redis server with a 250 ms timeout.
// Throws std::runtime_error if the connection cannot be established.
RedisContext::RedisContext(const std::string &host, int port)
{
  const struct timeval timeout = {0, 250000}; // .250 seconds
  ctx_ = redisConnectWithTimeout(host.c_str(), port, timeout);
  // hiredis can return a non-null context with its error flag set (e.g.
  // connection refused); treat that as failure too, and free the context
  // since a throwing constructor never runs the destructor.
  if (ctx_ != nullptr and ctx_->err) {
    redisFree(ctx_);
    ctx_ = nullptr;
  }
  if (ctx_ == nullptr) {
    throw std::runtime_error("Unable to ping redis server at " + host + ":" +
                             std::to_string(port));
  }
}
// Convert a raw hiredis reply into a RedisData value.
// String-like replies arriving wrapped in double quotes are unwrapped;
// arrays are flattened (recursively) into a vector of RedisDatum.
// REDIS_REPLY_NIL and any unhandled reply types yield a
// default-constructed RedisData.
RedisData RedisContext::parseReply_(const redisReply *reply)
{
  RedisData data;
  switch (reply->type) {
  case REDIS_REPLY_ERROR:
  case REDIS_REPLY_STRING:
  case REDIS_REPLY_VERB: {
    // Use the explicit reply length so payloads with embedded NULs
    // survive intact.
    std::string raw(reply->str, reply->len);
    // Strip one layer of surrounding double quotes if present. The size
    // check also guards raw.back() against empty payloads (previously
    // raw[raw.size() - 1] was undefined behavior for an empty string).
    if (raw.size() >= 2 and raw.front() == '"' and raw.back() == '"') {
      data = raw.substr(1, raw.size() - 2);
    }
    else {
      data = RedisDatum{raw};
    }
    break;
  }
  case REDIS_REPLY_DOUBLE: {
    data = RedisDatum{reply->dval};
    break;
  }
  case REDIS_REPLY_INTEGER: {
    data = RedisDatum{(size_t)reply->integer};
    break;
  }
  case REDIS_REPLY_ARRAY: {
    std::vector<RedisDatum> parts;
    parts.reserve(reply->elements);
    for (size_t i = 0UL; i < reply->elements; ++i) {
      parts.push_back(parseReply_(reply->element[i]).asDatum());
    }
    data = parts;
    break;
  }
    // REDIS_REPLY_NIL and anything else: fall through and return the
    // default-constructed value.
  }
  return data;
}
} // namespace daggy::loggers::dag_run::redis
#endif

View File

@@ -0,0 +1,265 @@
#include <stdexcept>
#ifdef DAGGY_ENABLE_REDIS
#include <enum.h>
#include <algorithm>
#include <daggy/Serialization.hpp>
#include <daggy/loggers/dag_run/RedisLogger.hpp>
#include <iomanip>
#include <iterator>
namespace daggy::loggers::dag_run {
// Connect to redis and ensure the run-ID counter key exists, creating
// it with an initial value of 0 on first use.
RedisLogger::RedisLogger(const std::string &prefix, const std::string &host,
                         int port)
    : prefix_(prefix)
    , dagRunIDsKey_(prefix_ + "_dagRunIDs")
    , ctx_(host, port)
{
  const auto existsReply = ctx_.query("exists %s", dagRunIDsKey_.c_str());
  if (existsReply.as<size_t>() == 0) {
    ctx_.query("set %s %s", dagRunIDsKey_.c_str(), "0");
  }
}
// Execution
// Allocate a new run ID from the redis counter, persist the spec (tag,
// start time, task variables/defaults, and each task definition), and
// mark the run and all of its tasks QUEUED.
DAGRunID RedisLogger::startDAGRun(const DAGSpec &dagSpec)
{
  auto resp = ctx_.query("incr %s", dagRunIDsKey_.c_str());
  DAGRunID runID = resp.as<size_t>();
  ctx_.query("SET %s %s", getTagKey_(runID).c_str(), dagSpec.tag.c_str());
  ctx_.query("SET %s %s", getStartTimeKey_(runID).c_str(),
             timePointToString(Clock::now()).c_str());
  ctx_.query("SET %s %s", getTaskVariablesKey_(runID).c_str(),
             configToJSON(dagSpec.taskConfig.variables).c_str());
  ctx_.query("SET %s %s", getTaskDefaultsKey_(runID).c_str(),
             configToJSON(dagSpec.taskConfig.jobDefaults).c_str());
  // Store tasks and their initial states. (BUG FIX: a second loop
  // previously re-queued every task, duplicating its QUEUED state
  // record.)
  for (const auto &[taskName, task] : dagSpec.tasks) {
    ctx_.query("HSET %s %s %s", getTasksKey_(runID).c_str(), taskName.c_str(),
               taskToJSON(task).c_str());
    updateTaskState(runID, taskName, RunState::QUEUED);
  }
  // Update the dag run state
  updateDAGRunState(runID, RunState::QUEUED);
  return runID;
}
// Register a new task on an existing run: persist its definition, then
// queue it.
void RedisLogger::addTask(DAGRunID dagRunID, const std::string &taskName,
                          const Task &task)
{
  updateTask(dagRunID, taskName, task);
  updateTaskState(dagRunID, taskName, RunState::QUEUED);
}
// Persist (or overwrite) a task's JSON-serialized definition in the
// run's task hash.
void RedisLogger::updateTask(DAGRunID dagRunID, const std::string &taskName,
                             const Task &task)
{
  const std::string serialized = taskToJSON(task);
  ctx_.query("HSET %s %s %s", getTasksKey_(dagRunID).c_str(),
             taskName.c_str(), serialized.c_str());
}
// Persist a run-level state change: update the current-state and
// last-update keys, then append a timestamped record to the history
// list.
void RedisLogger::updateDAGRunState(DAGRunID dagRunID, RunState state)
{
  // Capture a single timestamp so the last-update key and the history
  // record agree exactly (Clock::now() was previously sampled twice).
  const auto now = Clock::now();
  // Set the state
  ctx_.query("SET %s %s", getDAGStateKey_(dagRunID).c_str(),
             state._to_string());
  ctx_.query("SET %s %s", getLastUpdateKey_(dagRunID).c_str(),
             timePointToString(now).c_str());
  // Add the update record
  StateUpdateRecord rec{.time = now, .state = state};
  ctx_.query("RPUSH %s %s", getDAGStateUpdateKey_(dagRunID).c_str(),
             stateUpdateRecordToJSON(rec).c_str());
}
// Append one serialized attempt record to the task's attempt list.
void RedisLogger::logTaskAttempt(DAGRunID dagRunID,
                                 const std::string &taskName,
                                 const AttemptRecord &attempt)
{
  const std::string serialized = attemptRecordToJSON(attempt);
  ctx_.query("RPUSH %s %s", getTaskAttemptKey_(dagRunID, taskName).c_str(),
             serialized.c_str());
}
// Persist a task-level state change: update the per-task current-state
// hash and the run's last-update key, then append a timestamped record
// to the task's history list.
void RedisLogger::updateTaskState(DAGRunID dagRunID,
                                  const std::string &taskName, RunState state)
{
  // Single timestamp so the last-update key and history record agree.
  const auto now = Clock::now();
  // Set the state
  ctx_.query(R"(HSET %s %s %s)", getTaskStatesKey_(dagRunID).c_str(),
             taskName.c_str(), state._to_string());
  // BUG FIX: getLastUpdateKey_'s std::string was previously passed
  // straight through query()'s C-style varargs for %s, which is
  // undefined behavior; %s requires a NUL-terminated char pointer.
  ctx_.query(R"(SET %s %s)", getLastUpdateKey_(dagRunID).c_str(),
             timePointToString(now).c_str());
  // Add the update record
  StateUpdateRecord rec{.time = now, .state = state};
  ctx_.query("RPUSH %s %s",
             getTaskStateUpdateKey_(dagRunID, taskName).c_str(),
             stateUpdateRecordToJSON(rec).c_str());
}
// Querying
// Reassemble a run's DAGSpec from its redis keys: tag, per-task
// definitions, task variables, and job defaults.
// (Also drops the stray semicolon that followed the function body.)
DAGSpec RedisLogger::getDAGSpec(DAGRunID dagRunID)
{
  DAGSpec spec;
  spec.tag =
      ctx_.query("GET %s", getTagKey_(dagRunID).c_str()).as<std::string>();
  auto tasks = ctx_.query("HGETALL %s", getTasksKey_(dagRunID).c_str())
                   .asHash<std::string, std::string>();
  for (const auto &[taskName, taskJSON] : tasks) {
    spec.tasks.emplace(taskName, taskFromJSON(taskName, taskJSON));
  }
  auto taskVars = ctx_.query("GET %s", getTaskVariablesKey_(dagRunID).c_str())
                      .as<std::string>();
  spec.taskConfig.variables = configFromJSON(taskVars);
  auto jobDefaults =
      ctx_.query("GET %s", getTaskDefaultsKey_(dagRunID).c_str())
          .as<std::string>();
  spec.taskConfig.jobDefaults = configFromJSON(jobDefaults);
  return spec;
}
// Summarize runs 1..N (N read from the run-ID counter), optionally
// filtered by tag; unless `all` is set, COMPLETED runs are skipped.
// Runs whose state key is missing (e.g. purged) are silently ignored.
std::vector<DAGRunSummary> RedisLogger::queryDAGRuns(const std::string &tag,
                                                     bool all)
{
  std::vector<DAGRunSummary> summaries;
  auto reply = ctx_.query("GET %s", dagRunIDsKey_.c_str());
  size_t maxRuns = std::stoull(reply.as<std::string>());
  RunState state = RunState::QUEUED;
  for (size_t runID = 1; runID <= maxRuns; ++runID) {
    try {
      state = getDAGRunState(runID);
    }
    catch (std::runtime_error &e) {
      // No state recorded under this ID; skip it.
      continue;
    }
    if (!all and state == +RunState::COMPLETED)
      continue;
    const auto dagTag =
        ctx_.query("GET %s", getTagKey_(runID).c_str()).as<std::string>();
    if (!tag.empty() and dagTag != tag)
      continue;
    const auto startTime =
        ctx_.query("GET %s", getStartTimeKey_(runID).c_str())
            .as<std::string>();
    const auto lastTime =
        ctx_.query("GET %s", getLastUpdateKey_(runID).c_str())
            .as<std::string>();
    DAGRunSummary summary{
        .runID = runID,
        .tag = dagTag,
        .runState = state,
        .startTime = stringToTimePoint(startTime),
        .lastUpdate = stringToTimePoint(lastTime),
    };
    auto taskStates =
        ctx_.query("HGETALL %s", getTaskStatesKey_(runID).c_str())
            .asHash<std::string, std::string>();
    // Binding renamed from `state` to stop it shadowing the run-level
    // state variable above.
    for (const auto &[taskName, taskStateStr] : taskStates) {
      auto taskState = RunState::_from_string(taskStateStr.c_str());
      summary.taskStateCounts[taskState]++;
    }
    summaries.emplace_back(std::move(summary));
  }
  return summaries;
}
// Reconstruct a run's full record from redis: the spec, the run-level
// state history, each task's current state, and per-task attempt and
// state-change lists. All JSON payloads are deserialized via the
// *FromJSON helpers from Serialization.hpp.
DAGRunRecord RedisLogger::getDAGRun(DAGRunID dagRunID)
{
  DAGRunRecord rec;
  rec.dagSpec = getDAGSpec(dagRunID);
  // Run-level state history (oldest first, per RPUSH ordering).
  auto dagStateUpdates =
      ctx_.query("LRANGE %s 0 -1", getDAGStateUpdateKey_(dagRunID).c_str())
          .asList<std::string>();
  std::transform(dagStateUpdates.begin(), dagStateUpdates.end(),
                 std::back_inserter(rec.dagStateChanges),
                 [](const auto &s) { return stateUpdateRecordFromJSON(s); });
  // Current state per task.
  auto taskStates =
      ctx_.query("HGETALL %s", getTaskStatesKey_(dagRunID).c_str())
          .asHash<std::string, std::string>();
  for (const auto &[taskName, state] : taskStates) {
    rec.taskRunStates.emplace(taskName,
                              RunState::_from_string(state.c_str()));
  }
  // Per-task histories, keyed off the tasks present in the spec.
  for (const auto &[taskName, _] : rec.dagSpec.tasks) {
    // Attempt records for this task.
    auto taskAttempts =
        ctx_.query("LRANGE %s 0 -1",
                   getTaskAttemptKey_(dagRunID, taskName).c_str())
            .asList<std::string>();
    std::transform(taskAttempts.begin(), taskAttempts.end(),
                   std::back_inserter(rec.taskAttempts[taskName]),
                   [](const auto &s) { return attemptRecordFromJSON(s); });
    // State-change history for this task.
    auto taskStateUpdates =
        ctx_.query("LRANGE %s 0 -1",
                   getTaskStateUpdateKey_(dagRunID, taskName).c_str())
            .asList<std::string>();
    auto &stateUpdates = rec.taskStateChanges[taskName];
    std::transform(taskStateUpdates.begin(), taskStateUpdates.end(),
                   std::back_inserter(stateUpdates), [](const auto &s) {
                     return stateUpdateRecordFromJSON(s);
                   });
  }
  return rec;
}
// Current run-level state; throws std::runtime_error for unknown runs
// (an absent key reads back as the empty string).
RunState RedisLogger::getDAGRunState(DAGRunID dagRunID)
{
  const auto stateStr =
      ctx_.query("GET %s", getDAGStateKey_(dagRunID).c_str())
          .as<std::string>();
  if (stateStr.empty())
    throw std::runtime_error("No such dagrun");
  return RunState::_from_string(stateStr.c_str());
}
// Fetch and deserialize one task definition from the run's task hash.
Task RedisLogger::getTask(DAGRunID dagRunID, const std::string &taskName)
{
  const auto taskJSON =
      ctx_.query("HGET %s %s", getTasksKey_(dagRunID).c_str(),
                 taskName.c_str())
          .as<std::string>();
  return taskFromJSON(taskName, taskJSON);
}
// Current state of one task, read from the run's task-state hash.
RunState RedisLogger::getTaskState(DAGRunID dagRunID,
                                   const std::string &taskName)
{
  const auto stateStr =
      ctx_.query("HGET %s %s", getTaskStatesKey_(dagRunID).c_str(),
                 taskName.c_str())
          .as<std::string>();
  return RunState::_from_string(stateStr.c_str());
}
} // namespace daggy::loggers::dag_run
#endif