- Running project through clang-tidy formatter.

This commit is contained in:
Ian Roddis
2021-08-09 15:07:16 -03:00
parent 1fcdf66829
commit 30aea0818c
10 changed files with 129 additions and 99 deletions

View File

@@ -14,10 +14,12 @@ namespace rj = rapidjson;
namespace daggy { namespace daggy {
// Parameters // Parameters
ParameterValues parametersFromJSON(const std::string & jsonSpec); ParameterValues parametersFromJSON(const std::string &jsonSpec);
ParameterValues parametersFromJSON(const rj::Document & spec);
ParameterValues parametersFromJSON(const rj::Document &spec);
// Tasks // Tasks
std::vector<Task> tasksFromJSON(const std::string & jsonSpec, const ParameterValues & parameters = {}); std::vector<Task> tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters = {});
std::vector<Task> tasksFromJSON(const rj::Document & spec, const ParameterValues & parameters = {});
std::vector<Task> tasksFromJSON(const rj::Document &spec, const ParameterValues &parameters = {});
} }

View File

@@ -14,22 +14,22 @@
#include "DAG.hpp" #include "DAG.hpp"
namespace daggy { namespace daggy {
std::vector<Command> expandCommands(const std::vector<std::string> & command, const ParameterValues & parameters); std::vector<Command> expandCommands(const std::vector<std::string> &command, const ParameterValues &parameters);
DAG buildDAGFromTasks(const std::vector<Task> & tasks); DAG buildDAGFromTasks(const std::vector<Task> &tasks);
// Blocking call // Blocking call
std::vector<AttemptRecord> std::vector<AttemptRecord>
runTask(DAGRunID runID, runTask(DAGRunID runID,
TaskID taskID, TaskID taskID,
const Task & task, const Task &task,
executors::task::TaskExecutor & executor, executors::task::TaskExecutor &executor,
loggers::dag_run::DAGLoggerBase & logger); loggers::dag_run::DAGLoggerBase &logger);
void runDAG(DAGRunID runID, void runDAG(DAGRunID runID,
std::vector<Task> tasks, std::vector<Task> tasks,
executors::task::TaskExecutor & executor, executors::task::TaskExecutor &executor,
loggers::dag_run::DAGLoggerBase & logger, loggers::dag_run::DAGLoggerBase &logger,
DAG dag); DAG dag);
} }

View File

@@ -8,8 +8,7 @@ namespace daggy {
class ForkingTaskExecutor : public TaskExecutor { class ForkingTaskExecutor : public TaskExecutor {
public: public:
ForkingTaskExecutor(size_t nThreads) ForkingTaskExecutor(size_t nThreads)
: TaskExecutor(nThreads) : TaskExecutor(nThreads) {}
{}
const std::string getName() const override { return "ForkingTaskExecutor"; } const std::string getName() const override { return "ForkingTaskExecutor"; }

View File

@@ -16,7 +16,7 @@ namespace daggy {
enum class RunState : uint32_t { enum class RunState : uint32_t {
QUEUED = 0, QUEUED = 0,
RUNNING = 1, RUNNING = 1,
RETRY = 1 << 1, RETRY = 1 << 1,
ERRORED = 1 << 2, ERRORED = 1 << 2,
KILLED = 1 << 3, KILLED = 1 << 3,
COMPLETED = 1 << 4 COMPLETED = 1 << 4
@@ -46,7 +46,7 @@ namespace daggy {
struct DAGRunSummary { struct DAGRunSummary {
DAGRunID runID; DAGRunID runID;
std::string name; std::string name;
RunState runState; RunState runState;
TimePoint startTime; TimePoint startTime;
TimePoint lastUpdate; TimePoint lastUpdate;
std::unordered_map<RunState, size_t> taskStateCounts; std::unordered_map<RunState, size_t> taskStateCounts;
@@ -55,13 +55,17 @@ namespace daggy {
class DAGLoggerBase { class DAGLoggerBase {
public: public:
// Execution // Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) = 0; virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) = 0;
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0; virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord & attempt) = 0;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) = 0;
virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) = 0; virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) = 0;
// Querying // Querying
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) = 0; virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) = 0;
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0; virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0;
}; };
} }

View File

@@ -39,13 +39,17 @@ namespace daggy {
FileSystemLogger(fs::path root); FileSystemLogger(fs::path root);
// Execution // Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) override; virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override;
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override; virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord & attempt) override;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override;
virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override; virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override;
// Querying // Querying
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override; virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId); virtual DAGRunRecord getDAGRun(DAGRunID dagRunId);
private: private:
@@ -56,7 +60,9 @@ namespace daggy {
// std::unordered_map<fs::path, std::mutex> runLocks; // std::unordered_map<fs::path, std::mutex> runLocks;
inline const fs::path getCurrentPath() const; inline const fs::path getCurrentPath() const;
inline const fs::path getRunsRoot() const; inline const fs::path getRunsRoot() const;
inline const fs::path getRunRoot(DAGRunID runID) const; inline const fs::path getRunRoot(DAGRunID runID) const;
}; };
} }

View File

@@ -17,13 +17,17 @@ namespace daggy {
StdOutLogger(); StdOutLogger();
// Execution // Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) override; virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override;
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override; virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord & attempt) override;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override;
virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override; virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override;
// Querying // Querying
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override; virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId); virtual DAGRunRecord getDAGRun(DAGRunID dagRunId);
private: private:

View File

@@ -3,28 +3,30 @@
namespace daggy { namespace daggy {
ParameterValues parametersFromJSON(const std::string & jsonSpec) { ParameterValues parametersFromJSON(const std::string &jsonSpec) {
rj::Document doc; rj::Document doc;
rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str()); rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
if (! parseResult) { if (!parseResult) {
throw std::runtime_error("Parameters spec is not valid JSON"); throw std::runtime_error("Parameters spec is not valid JSON");
} }
return parametersFromJSON(doc); return parametersFromJSON(doc);
} }
ParameterValues parametersFromJSON(const rj::Document & spec) { ParameterValues parametersFromJSON(const rj::Document &spec) {
std::unordered_map<std::string, ParameterValue> parameters; std::unordered_map<std::string, ParameterValue> parameters;
if (!spec.IsObject()) { throw std::runtime_error("Parameters in spec is not a JSON dictionary"); } if (!spec.IsObject()) { throw std::runtime_error("Parameters in spec is not a JSON dictionary"); }
for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) { for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) {
if (! it->name.IsString()) { if (!it->name.IsString()) {
throw std::runtime_error("All keys must be strings."); throw std::runtime_error("All keys must be strings.");
} }
std::string name = std::string{"{{"} + it->name.GetString() + "}}"; std::string name = std::string{"{{"} + it->name.GetString() + "}}";
if (it->value.IsArray()) { if (it->value.IsArray()) {
std::vector<std::string> values; std::vector<std::string> values;
for (size_t i = 0; i < it->value.Size(); ++i) { for (size_t i = 0; i < it->value.Size(); ++i) {
if (! it->value[i].IsString()) { if (!it->value[i].IsString()) {
throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " item " + std::to_string(i) + " is not a string."); throw std::runtime_error(
"Attribute for " + std::string{it->name.GetString()} + " item " + std::to_string(i) +
" is not a string.");
} }
values.emplace_back(it->value[i].GetString()); values.emplace_back(it->value[i].GetString());
} }
@@ -32,22 +34,23 @@ namespace daggy {
} else if (it->value.IsString()) { } else if (it->value.IsString()) {
parameters[name] = it->value.GetString(); parameters[name] = it->value.GetString();
} else { } else {
throw std::runtime_error("Attribute for " + std::string{it->name.GetString()} + " is not a string or an array."); throw std::runtime_error(
"Attribute for " + std::string{it->name.GetString()} + " is not a string or an array.");
} }
} }
return parameters; return parameters;
} }
std::vector<Task> tasksFromJSON(const std::string & jsonSpec, const ParameterValues & parameters) { std::vector<Task> tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters) {
rj::Document doc; rj::Document doc;
rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str()); rj::ParseResult parseResult = doc.Parse(jsonSpec.c_str());
if (! parseResult) { if (!parseResult) {
throw std::runtime_error("Unable to parse spec: "); throw std::runtime_error("Unable to parse spec: ");
} }
return tasksFromJSON(doc, parameters); return tasksFromJSON(doc, parameters);
} }
std::vector<Task> tasksFromJSON(const rj::Document & spec, const ParameterValues & parameters) { std::vector<Task> tasksFromJSON(const rj::Document &spec, const ParameterValues &parameters) {
std::vector<Task> tasks; std::vector<Task> tasks;
if (!spec.IsArray()) { throw std::runtime_error("Tasks is not an array"); } if (!spec.IsArray()) { throw std::runtime_error("Tasks is not an array"); }
@@ -59,13 +62,13 @@ namespace daggy {
// Tasks // Tasks
for (size_t i = 0; i < spec.Size(); ++i) { for (size_t i = 0; i < spec.Size(); ++i) {
if (! spec[i].IsObject()) { if (!spec[i].IsObject()) {
throw std::runtime_error("Task " + std::to_string(i) + " is not a dictionary."); throw std::runtime_error("Task " + std::to_string(i) + " is not a dictionary.");
} }
const auto & taskSpec = spec[i].GetObject(); const auto &taskSpec = spec[i].GetObject();
for (const auto & reqField : reqFields) { for (const auto &reqField : reqFields) {
if (! taskSpec.HasMember(reqField.c_str())) { if (!taskSpec.HasMember(reqField.c_str())) {
throw std::runtime_error("Task " + std::to_string(i) + " is missing required field " + reqField); throw std::runtime_error("Task " + std::to_string(i) + " is missing required field " + reqField);
} }
} }
@@ -77,18 +80,19 @@ namespace daggy {
uint8_t maxRetries = 0; uint8_t maxRetries = 0;
if (taskSpec.HasMember("maxRetries")) { maxRetries = taskSpec["maxRetries"].GetInt(); } if (taskSpec.HasMember("maxRetries")) { maxRetries = taskSpec["maxRetries"].GetInt(); }
uint8_t retryIntervalSeconds = 0; uint8_t retryIntervalSeconds = 0;
if (taskSpec.HasMember("retryIntervalSeconds")) { retryIntervalSeconds = taskSpec["retryIntervalSeconds"].GetInt(); } if (taskSpec.HasMember(
"retryIntervalSeconds")) { retryIntervalSeconds = taskSpec["retryIntervalSeconds"].GetInt(); }
// Children / parents // Children / parents
std::vector<std::string> children; std::vector<std::string> children;
if (taskSpec.HasMember("children")) { if (taskSpec.HasMember("children")) {
const auto & specChildren = taskSpec["children"].GetArray(); const auto &specChildren = taskSpec["children"].GetArray();
for (size_t c = 0; c < specChildren.Size(); ++c) { for (size_t c = 0; c < specChildren.Size(); ++c) {
children.emplace_back(specChildren[c].GetString()); children.emplace_back(specChildren[c].GetString());
} }
} }
if (taskSpec.HasMember("parents")) { if (taskSpec.HasMember("parents")) {
const auto & specParents = taskSpec["parents"].GetArray(); const auto &specParents = taskSpec["parents"].GetArray();
for (size_t c = 0; c < specParents.Size(); ++c) { for (size_t c = 0; c < specParents.Size(); ++c) {
parentMap[name].emplace_back(specParents[c].GetString()); parentMap[name].emplace_back(specParents[c].GetString());
} }
@@ -102,35 +106,35 @@ namespace daggy {
auto commands = expandCommands(command, parameters); auto commands = expandCommands(command, parameters);
// Create the tasks // Create the tasks
auto & taskNames = childrenMap[name]; auto &taskNames = childrenMap[name];
for (size_t tid = 0; tid < commands.size(); ++tid) { for (size_t tid = 0; tid < commands.size(); ++tid) {
std::string taskName = name + "_" + std::to_string(tid); std::string taskName = name + "_" + std::to_string(tid);
taskNames.push_back(taskName); taskNames.push_back(taskName);
tasks.emplace_back(Task { tasks.emplace_back(Task{
.name = name + "_" + std::to_string(tid), .name = name + "_" + std::to_string(tid),
.command = commands[tid], .command = commands[tid],
.maxRetries = maxRetries, .maxRetries = maxRetries,
.retryIntervalSeconds = retryIntervalSeconds, .retryIntervalSeconds = retryIntervalSeconds,
.children = children .children = children
}); });
} }
} }
// Update any missing child -> parent relationship // Update any missing child -> parent relationship
for (auto & task : tasks) { for (auto &task : tasks) {
auto pit = parentMap.find(task.name); auto pit = parentMap.find(task.name);
if (pit == parentMap.end()) { continue; } if (pit == parentMap.end()) { continue; }
for (const auto & parent : pit->second) { for (const auto &parent : pit->second) {
tasks[taskIndex[parent]].children.emplace_back(task.name); tasks[taskIndex[parent]].children.emplace_back(task.name);
} }
} }
// At the end, replace the names of the children with all the expanded versions // At the end, replace the names of the children with all the expanded versions
for (auto & task : tasks) { for (auto &task : tasks) {
std::vector<std::string> children; std::vector<std::string> children;
for (const auto & child : task.children) { for (const auto &child : task.children) {
auto & newChildren = childrenMap[child]; auto &newChildren = childrenMap[child];
std::copy(newChildren.begin(), newChildren.end(), std::back_inserter(children)); std::copy(newChildren.begin(), newChildren.end(), std::back_inserter(children));
} }
task.children.swap(children); task.children.swap(children);

View File

@@ -2,16 +2,16 @@
namespace daggy { namespace daggy {
std::vector<std::vector<std::string>> std::vector<std::vector<std::string>>
expandCommands(const std::vector<std::string> & command, const ParameterValues & parameters) { expandCommands(const std::vector<std::string> &command, const ParameterValues &parameters) {
std::vector<std::vector<std::string>> commands{ {} }; std::vector<std::vector<std::string>> commands{{}};
for (const auto & part : command) { for (const auto &part : command) {
// this isn't an interpolated value // this isn't an interpolated value
if (parameters.find(part) == parameters.end()) { if (parameters.find(part) == parameters.end()) {
for (auto &cmd : commands) cmd.push_back(part); for (auto &cmd : commands) cmd.push_back(part);
continue; continue;
} }
auto & inVal = parameters.at(part); auto &inVal = parameters.at(part);
if (std::holds_alternative<std::string>(inVal)) { if (std::holds_alternative<std::string>(inVal)) {
for (auto &cmd : commands) cmd.push_back(std::get<std::string>(inVal)); for (auto &cmd : commands) cmd.push_back(std::get<std::string>(inVal));
continue; continue;
@@ -19,7 +19,7 @@ namespace daggy {
// Ends up being expensive, as it's a cartesian product // Ends up being expensive, as it's a cartesian product
std::vector<std::vector<std::string>> newCommands; std::vector<std::vector<std::string>> newCommands;
for (const auto & val : std::get<std::vector<std::string>>(inVal)) { for (const auto &val : std::get<std::vector<std::string>>(inVal)) {
for (auto cmd : commands) { for (auto cmd : commands) {
cmd.push_back(val); cmd.push_back(val);
newCommands.push_back(cmd); newCommands.push_back(cmd);
@@ -30,7 +30,7 @@ namespace daggy {
return commands; return commands;
} }
DAG buildDAGFromTasks(const std::vector<Task> & tasks) { DAG buildDAGFromTasks(const std::vector<Task> &tasks) {
DAG dag; DAG dag;
std::unordered_map<std::string, size_t> taskIDs; std::unordered_map<std::string, size_t> taskIDs;
@@ -50,27 +50,26 @@ namespace daggy {
} }
std::vector<AttemptRecord> runTask(DAGRunID runID, std::vector<AttemptRecord> runTask(DAGRunID runID,
TaskID taskID, TaskID taskID,
const Task & task, const Task &task,
executors::task::TaskExecutor & executor, executors::task::TaskExecutor &executor,
loggers::dag_run::DAGLoggerBase & logger) loggers::dag_run::DAGLoggerBase &logger) {
{
std::vector<AttemptRecord> attempts; std::vector<AttemptRecord> attempts;
logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RUNNING ); logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RUNNING);
while (attempts.size() < task.maxRetries + 1) { while (attempts.size() < task.maxRetries + 1) {
attempts.push_back(executor.runCommand(task.command)); attempts.push_back(executor.runCommand(task.command));
logger.logTaskAttempt(runID, taskID, attempts.back()); logger.logTaskAttempt(runID, taskID, attempts.back());
if (attempts.back().rc == 0) break; if (attempts.back().rc == 0) break;
logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RETRY ); logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RETRY);
} }
return attempts; return attempts;
} }
void runDAG(DAGRunID runID, void runDAG(DAGRunID runID,
std::vector<Task> tasks, std::vector<Task> tasks,
executors::task::TaskExecutor & executor, executors::task::TaskExecutor &executor,
loggers::dag_run::DAGLoggerBase & logger, loggers::dag_run::DAGLoggerBase &logger,
DAG dag) { DAG dag) {
logger.updateDAGRunState(runID, loggers::dag_run::RunState::RUNNING); logger.updateDAGRunState(runID, loggers::dag_run::RunState::RUNNING);
@@ -90,15 +89,15 @@ namespace daggy {
if (taskState.fut.valid()) { if (taskState.fut.valid()) {
auto attemptRecords = taskState.fut.get(); auto attemptRecords = taskState.fut.get();
if (attemptRecords.empty()) { if (attemptRecords.empty()) {
logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED ); logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED);
continue; continue;
} }
if (attemptRecords.back().rc == 0) { if (attemptRecords.back().rc == 0) {
logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::COMPLETED ); logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::COMPLETED);
dag.completeVisit(taskState.tid); dag.completeVisit(taskState.tid);
taskState.complete = true; taskState.complete = true;
} else { } else {
logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED ); logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED);
} }
} }
} }
@@ -110,9 +109,11 @@ namespace daggy {
// Schedule the task to run // Schedule the task to run
auto tid = t.value(); auto tid = t.value();
TaskState tsk{ TaskState tsk{
.tid = tid, .tid = tid,
.fut = tq->addTask([tid, runID, &tasks, &executor, &logger]() {return runTask(runID, tid, tasks[tid], executor, logger);}), .fut = tq->addTask([tid, runID, &tasks, &executor, &logger]() {
.complete = false return runTask(runID, tid, tasks[tid], executor, logger);
}),
.complete = false
}; };
taskStates.push_back(std::move(tsk)); taskStates.push_back(std::move(tsk));
@@ -120,7 +121,7 @@ namespace daggy {
if (not nextTask.has_value()) break; if (not nextTask.has_value()) break;
t.emplace(nextTask.value()); t.emplace(nextTask.value());
} }
if (! tq->empty()) { if (!tq->empty()) {
executor.threadPool.addTasks(tq); executor.threadPool.addTasks(tq);
} }
std::this_thread::sleep_for(250ms); std::this_thread::sleep_for(250ms);

View File

@@ -6,32 +6,34 @@ using namespace daggy::loggers::dag_run;
namespace daggy { namespace daggy {
inline const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; } inline const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; }
inline const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; } inline const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; }
inline const fs::path FileSystemLogger::getRunRoot(DAGRunID runID) const { return getRunsRoot() / std::to_string(runID); }
inline const fs::path FileSystemLogger::getRunRoot(DAGRunID runID) const {
return getRunsRoot() / std::to_string(runID);
}
FileSystemLogger::FileSystemLogger(fs::path root) FileSystemLogger::FileSystemLogger(fs::path root)
: root_(root) : root_(root), nextRunID_(0) {
, nextRunID_(0) const std::vector<fs::path> reqPaths{root_, getCurrentPath(), getRunsRoot()};
{ for (const auto &path : reqPaths) {
const std::vector<fs::path> reqPaths{ root_, getCurrentPath(), getRunsRoot()}; if (!fs::exists(path)) { fs::create_directory(path); }
for (const auto & path : reqPaths) {
if (! fs::exists(path)) { fs::create_directory(path); }
} }
// Get the next run ID // Get the next run ID
size_t runID = 0; size_t runID = 0;
for (auto & dir : fs::directory_iterator(getRunsRoot())) { for (auto &dir : fs::directory_iterator(getRunsRoot())) {
try { try {
runID = std::stoull(dir.path().stem()); runID = std::stoull(dir.path().stem());
if (runID > nextRunID_) nextRunID_ = runID + 1; if (runID > nextRunID_) nextRunID_ = runID + 1;
} catch (std::exception & e) { } catch (std::exception &e) {
continue; continue;
} }
} }
} }
// Execution // Execution
DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector<Task> & tasks){ DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector<Task> &tasks) {
DAGRunID runID = nextRunID_++; DAGRunID runID = nextRunID_++;
// TODO make this threadsafe // TODO make this threadsafe
@@ -40,11 +42,15 @@ namespace daggy {
// Init the directory // Init the directory
} }
void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state){ }
void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt){ } void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {}
void FileSystemLogger::updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state){ }
void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord &attempt) {}
void FileSystemLogger::updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) {}
// Querying // Querying
std::vector<DAGRunSummary> FileSystemLogger::getDAGs(uint32_t stateMask){ } std::vector<DAGRunSummary> FileSystemLogger::getDAGs(uint32_t stateMask) {}
DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunId) {} DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunId) {}
} }

View File

@@ -5,34 +5,38 @@
namespace daggy { namespace daggy {
namespace loggers { namespace loggers {
namespace dag_run { namespace dag_run {
StdOutLogger::StdOutLogger() : nextRunID_(0) { } StdOutLogger::StdOutLogger() : nextRunID_(0) {}
// Execution // Execution
DAGRunID StdOutLogger::startDAGRun(std::string name, const std::vector<Task> & tasks) { DAGRunID StdOutLogger::startDAGRun(std::string name, const std::vector<Task> &tasks) {
std::lock_guard<std::mutex> lock(guard_); std::lock_guard<std::mutex> lock(guard_);
size_t runID = nextRunID_++; size_t runID = nextRunID_++;
std::cout << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size() << " tasks" << std::endl; std::cout << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size()
<< " tasks" << std::endl;
return runID; return runID;
} }
void StdOutLogger::updateDAGRunState(DAGRunID dagRunID, RunState state){ void StdOutLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {
std::lock_guard<std::mutex> lock(guard_); std::lock_guard<std::mutex> lock(guard_);
std::cout << "DAG State Change(" << dagRunID << "): " << magic_enum::enum_name(state) << std::endl; std::cout << "DAG State Change(" << dagRunID << "): " << magic_enum::enum_name(state) << std::endl;
} }
void StdOutLogger::logTaskAttempt(DAGRunID dagRunID, size_t taskID, const AttemptRecord & attempt){ void StdOutLogger::logTaskAttempt(DAGRunID dagRunID, size_t taskID, const AttemptRecord &attempt) {
std::lock_guard<std::mutex> lock(guard_); std::lock_guard<std::mutex> lock(guard_);
const std::string & msg = attempt.rc == 0 ? attempt.output : attempt.error; const std::string &msg = attempt.rc == 0 ? attempt.output : attempt.error;
std::cout << "Task Attempt (" << dagRunID << '/' << taskID << "): Ran with RC " << attempt.rc << ": " << msg << std::endl; std::cout << "Task Attempt (" << dagRunID << '/' << taskID << "): Ran with RC " << attempt.rc << ": "
<< msg << std::endl;
} }
void StdOutLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) { void StdOutLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) {
std::lock_guard<std::mutex> lock(guard_); std::lock_guard<std::mutex> lock(guard_);
std::cout << "Task State Change (" << dagRunID << '/' << taskID << "): " << magic_enum::enum_name(state) << std::endl; std::cout << "Task State Change (" << dagRunID << '/' << taskID << "): " << magic_enum::enum_name(state)
<< std::endl;
} }
// Querying // Querying
std::vector<DAGRunSummary> StdOutLogger::getDAGs(uint32_t stateMask){ return {}; } std::vector<DAGRunSummary> StdOutLogger::getDAGs(uint32_t stateMask) { return {}; }
DAGRunRecord StdOutLogger::getDAGRun(DAGRunID dagRunId) { return {}; } DAGRunRecord StdOutLogger::getDAGRun(DAGRunID dagRunId) { return {}; }
} }
} }