- More work on DAGLoggers

- Still need unit tests for the FilesystemLogger
This commit is contained in:
Ian Roddis
2021-08-13 10:23:55 -03:00
parent 4d519cc596
commit 9f90f54b67
13 changed files with 221 additions and 54 deletions

View File

@@ -10,8 +10,8 @@ namespace daggy {
TimePoint startTime;
TimePoint stopTime;
int rc; // RC from the task
std::string metaLog; // Logs from the dag_executor
std::string output; // stdout from command
std::string error; // stderr from command
std::string executorLog; // Logs from the dag_executor
std::string outputLog; // stdout from command
std::string errorLog; // stderr from command
};
}

View File

@@ -20,4 +20,5 @@ namespace daggy {
using DAGDefID = int16_t;
using DAGRunID = size_t;
using TaskID = size_t;
}

View File

@@ -9,6 +9,7 @@
#include "Defines.hpp"
#include "Task.hpp"
#include "AttemptRecord.hpp"
namespace rj = rapidjson;
@@ -16,17 +17,24 @@ namespace daggy {
// Parameters
ParameterValues parametersFromJSON(const std::string &jsonSpec);
ParameterValues parametersFromJSON(const rj::Document &spec);
ParameterValues parametersFromJSON(const rj::Value &spec);
// Tasks
std::vector<Task> tasksFromJSON(const std::string &jsonSpec, const ParameterValues &parameters = {});
std::vector<Task> tasksFromJSON(const rj::Document &spec, const ParameterValues &parameters = {});
std::vector<Task> tasksFromJSON(const rj::Value &spec, const ParameterValues &parameters = {});
std::string taskToJSON(const Task &task);
std::string tasksToJSON(const std::vector<Task> &tasks);
// Attempt Records
std::string attemptRecordToJSON(const AttemptRecord &attemptRecord);
// default serialization
std::ostream &operator<<(std::ostream &os, const Task &task);
std::string timePointToString(const TimePoint &tp);
TimePoint stringToTimePoint(const std::string &timeStr);
}

View File

@@ -32,4 +32,5 @@ namespace daggy {
loggers::dag_run::DAGRunLogger &logger,
DAG dag);
std::ostream &operator<<(std::ostream &os, const TimePoint &tp);
}

View File

@@ -23,9 +23,10 @@ namespace daggy {
virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) = 0;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) = 0;
virtual void
logTaskAttempt(DAGRunID dagRunID, const std::string &taskName, const AttemptRecord &attempt) = 0;
virtual void updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) = 0;
virtual void updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) = 0;
// Querying
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) = 0;

View File

@@ -21,36 +21,36 @@ namespace daggy {
*
* Filesystem logger creates the following structure:
* {root}/
* current/
* {DAGRunID}.{STATE} -- A file for each DAG not in a COMPLETE state for faster lookups
* runs/
* {runID}/
* meta.json --- Contains the DAG name, task definitions
* {taskID}/
* states --- State changes
* states.csv --- DAG state changes
* {taskName}/
* states.csv --- TASK state changes
* {attempt}/
* meta.json --- timestamps and rc
* stdout
* stderr
* execlog
* metadata.json --- timestamps and rc
* output.log
* error.log
* executor.log
*/
class FileSystemLogger : public DAGRunLogger {
public:
FileSystemLogger(fs::path root);
// Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override;
DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override;
virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override;
void
logTaskAttempt(DAGRunID, const std::string &taskName, const AttemptRecord &attempt) override;
virtual void updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) override;
void updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) override;
// Querying
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
virtual DAGRunRecord getDAGRun(DAGRunID dagRunID);
DAGRunRecord getDAGRun(DAGRunID dagRunID) override;
private:
fs::path root_;

View File

@@ -17,18 +17,19 @@ namespace daggy {
OStreamLogger(std::ostream &os);
// Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override;
DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override;
virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) override;
void
logTaskAttempt(DAGRunID, const std::string &taskName, const AttemptRecord &attempt) override;
virtual void updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) override;
void updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) override;
// Querying
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
virtual DAGRunRecord getDAGRun(DAGRunID dagRunID);
DAGRunRecord getDAGRun(DAGRunID dagRunID) override;
private:
DAGRunID nextRunID_;

View File

@@ -15,7 +15,7 @@ namespace daggy {
return parametersFromJSON(doc);
}
ParameterValues parametersFromJSON(const rj::Document &spec) {
ParameterValues parametersFromJSON(const rj::Value &spec) {
std::unordered_map<std::string, ParameterValue> parameters;
if (!spec.IsObject()) { throw std::runtime_error("Parameters in spec is not a JSON dictionary"); }
for (auto it = spec.MemberBegin(); it != spec.MemberEnd(); ++it) {
@@ -53,7 +53,7 @@ namespace daggy {
return tasksFromJSON(doc, parameters);
}
std::vector<Task> tasksFromJSON(const rj::Document &spec, const ParameterValues &parameters) {
std::vector<Task> tasksFromJSON(const rj::Value &spec, const ParameterValues &parameters) {
std::vector<Task> tasks;
if (!spec.IsArray()) { throw std::runtime_error("Tasks is not an array"); }
@@ -200,4 +200,32 @@ namespace daggy {
os << taskToJSON(task);
return os;
}
std::string attemptRecordToJSON(const AttemptRecord &record) {
std::stringstream ss;
ss << "{"
<< R"("startTime": )" << std::quoted(timePointToString(record.startTime)) << ','
<< R"("stopTime": )" << std::quoted(timePointToString(record.stopTime)) << ','
<< R"("rc": )" << std::to_string(record.rc) << ','
<< R"("executorLog": )" << std::quoted(record.executorLog) << ','
<< R"("outputLog": )" << std::quoted(record.outputLog) << ','
<< R"("errorLog": )" << std::quoted(record.errorLog)
<< '}';
return ss.str();
}
std::string timePointToString(const TimePoint &tp) {
std::stringstream ss;
ss << tp;
return ss.str();
}
TimePoint stringToTimePoint(const std::string &timeString) {
std::tm dt;
std::stringstream ss{timeString};
ss >> std::get_time(&dt, "%Y-%m-%d %H:%M:%S %Z");
return Clock::from_time_t(mktime(&dt));
}
}

View File

@@ -1,3 +1,5 @@
#include <iomanip>
#include <daggy/Utilities.hpp>
namespace daggy {
@@ -55,13 +57,13 @@ namespace daggy {
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger) {
std::vector<AttemptRecord> attempts;
logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RUNNING);
logger.updateTaskState(runID, task.name, loggers::dag_run::RunState::RUNNING);
while (attempts.size() < task.maxRetries + 1) {
attempts.push_back(executor.runCommand(task.command));
logger.logTaskAttempt(runID, taskID, attempts.back());
logger.logTaskAttempt(runID, task.name, attempts.back());
if (attempts.back().rc == 0) break;
logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RETRY);
logger.updateTaskState(runID, task.name, loggers::dag_run::RunState::RETRY);
}
return attempts;
}
@@ -88,16 +90,17 @@ namespace daggy {
if (taskState.fut.valid()) {
auto attemptRecords = taskState.fut.get();
const auto &taskName = tasks[taskState.tid].name;
if (attemptRecords.empty()) {
logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED);
logger.updateTaskState(runID, taskName, loggers::dag_run::RunState::ERRORED);
continue;
}
if (attemptRecords.back().rc == 0) {
logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::COMPLETED);
logger.updateTaskState(runID, taskName, loggers::dag_run::RunState::COMPLETED);
dag.completeVisit(taskState.tid);
taskState.complete = true;
} else {
logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED);
logger.updateTaskState(runID, taskName, loggers::dag_run::RunState::ERRORED);
}
}
}
@@ -127,4 +130,10 @@ namespace daggy {
std::this_thread::sleep_for(250ms);
}
}
std::ostream &operator<<(std::ostream &os, const TimePoint &tp) {
auto t_c = Clock::to_time_t(tp);
os << std::put_time(std::localtime(&t_c), "%Y-%m-%d %H:%M:%S %Z");
return os;
}
}

View File

@@ -62,8 +62,8 @@ ForkingTaskExecutor::runCommand(std::vector<std::string> cmd) {
}
std::atomic<bool> running = true;
std::thread stdoutReader([&]() { while (running) rec.output.append(slurp(stdoutPipe[0])); });
std::thread stderrReader([&]() { while (running) rec.error.append(slurp(stderrPipe[0])); });
std::thread stdoutReader([&]() { while (running) rec.outputLog.append(slurp(stdoutPipe[0])); });
std::thread stderrReader([&]() { while (running) rec.errorLog.append(slurp(stderrPipe[0])); });
int rc = 0;
waitpid(child, &rc, 0);

View File

@@ -1,4 +1,10 @@
#include <fstream>
#include <magic_enum.hpp>
#include <daggy/loggers/dag_run/FileSystemLogger.hpp>
#include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp>
namespace fs = std::filesystem;
@@ -44,17 +50,128 @@ namespace daggy {
fs::create_directories(runRoot);
// Create meta.json with DAGRun Name and task definitions
std::ofstream ofh(runRoot / "metadata.json", std::ios::trunc | std::ios::binary);
ofh << R"({ "name": )" << std::quoted(name) << R"(, "tasks": )" << tasksToJSON(tasks) << "}\n";
ofh.close();
// Task directories
for (const auto &task : tasks) {
auto taskDir = runRoot / task.name;
fs::create_directories(taskDir);
std::ofstream ofh(taskDir / "states.csv");
}
return runID;
}
void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {}
void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {
std::ofstream ofh(getRunRoot(dagRunID) / "states.csv", std::ios::binary | std::ios::app);
ofh << std::quoted(timePointToString(Clock::now())) << ',' << magic_enum::enum_name(state) << '\n';
ofh.flush();
ofh.close();
}
void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord &attempt) {}
void
FileSystemLogger::logTaskAttempt(DAGRunID dagRunID, const std::string &taskName,
const AttemptRecord &attempt) {
auto taskRoot = getRunRoot(dagRunID) / taskName;
size_t i = 1;
while (fs::exists(taskRoot / std::to_string(i))) { ++i; }
void FileSystemLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) {}
auto attemptDir = taskRoot / std::to_string(i);
fs::create_directories(attemptDir);
std::ofstream ofh;
// Metadata
ofh.open(attemptDir / "metadata.json");
ofh << "{\n"
<< R"("startTime": )" << std::quoted(timePointToString(attempt.startTime)) << ",\n"
<< R"("stopTime": )" << std::quoted(timePointToString(attempt.stopTime)) << ",\n"
<< R"("rc": )" << attempt.rc << '\n'
<< '}';
// output
ofh.open(attemptDir / "executor.log");
ofh << attempt.executorLog << std::flush;
ofh.close();
// Output
ofh.open(attemptDir / "output.log");
ofh << attempt.outputLog << std::flush;
ofh.close();
// Error
ofh.open(attemptDir / "error.log");
ofh << attempt.errorLog << std::flush;
ofh.close();
}
void FileSystemLogger::updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) {
std::ofstream ofh(getRunRoot(dagRunID) / taskName / "states.csv", std::ios::binary | std::ios::app);
ofh << std::quoted(timePointToString(Clock::now())) << ',' << magic_enum::enum_name(state) << '\n';
ofh.flush();
ofh.close();
}
// Querying
std::vector<DAGRunSummary> FileSystemLogger::getDAGs(uint32_t stateMask) { return {}; }
std::vector<DAGRunSummary> FileSystemLogger::getDAGs(uint32_t stateMask) {
return {};
}
DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunID) { return {}; }
}
DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunID) {
DAGRunRecord record;
auto runRoot = getRunRoot(dagRunID);
if (!fs::exists(runRoot)) {
throw std::runtime_error("No DAGRun with that ID exists");
}
std::ifstream ifh(runRoot / "metadata.json", std::ios::binary);
std::string metaData;
std::getline(ifh, metaData, '\0');
ifh.close();
rj::Document doc;
doc.Parse(metaData.c_str());
record.name = doc["name"].GetString();
record.tasks = tasksFromJSON(doc["tasks"].GetObject());
// DAG State Changes
std::string line;
std::string token;
auto dagStateFile = runRoot / "states.csv";
ifh.open(dagStateFile);
while (std::getline(ifh, line)) {
std::stringstream ss{line};
std::string time;
std::string state;
std::getline(ss, time, ',');
std::getline(ss, state);
record.dagStateChanges.emplace_back(DAGUpdateRecord{
.time = stringToTimePoint(time),
.newState = magic_enum::enum_cast<RunState>(state).value()
});
}
ifh.close();
// Task states
for (const auto &task : record.tasks) {
auto taskStateFile = runRoot / task.name / "states.csv";
if (!fs::exists(taskStateFile)) {
record.runStates.push_back(RunState::QUEUED);
continue;
}
ifh.open(taskStateFile);
while (std::getline(ifh, line)) { continue; }
std::stringstream ss{line};
while (std::getline(ss, token, ',')) { continue; }
RunState taskState = magic_enum::enum_cast<RunState>(token).value();
record.runStates.emplace_back(taskState);
ifh.close();
}
return record;
}
}

View File

@@ -21,16 +21,17 @@ namespace daggy {
os_ << "DAG State Change(" << dagRunID << "): " << magic_enum::enum_name(state) << std::endl;
}
void OStreamLogger::logTaskAttempt(DAGRunID dagRunID, size_t taskID, const AttemptRecord &attempt) {
void OStreamLogger::logTaskAttempt(DAGRunID dagRunID, const std::string &taskName,
const AttemptRecord &attempt) {
std::lock_guard<std::mutex> lock(guard_);
const std::string &msg = attempt.rc == 0 ? attempt.output : attempt.error;
os_ << "Task Attempt (" << dagRunID << '/' << taskID << "): Ran with RC " << attempt.rc << ": "
const std::string &msg = attempt.rc == 0 ? attempt.outputLog : attempt.errorLog;
os_ << "Task Attempt (" << dagRunID << '/' << taskName << "): Ran with RC " << attempt.rc << ": "
<< msg << std::endl;
}
void OStreamLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) {
void OStreamLogger::updateTaskState(DAGRunID dagRunID, const std::string &taskName, RunState state) {
std::lock_guard<std::mutex> lock(guard_);
os_ << "Task State Change (" << dagRunID << '/' << taskID << "): " << magic_enum::enum_name(state)
os_ << "Task State Change (" << dagRunID << '/' << taskName << "): " << magic_enum::enum_name(state)
<< std::endl;
}

View File

@@ -14,8 +14,8 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
auto rec = ex.runCommand(cmd);
REQUIRE(rec.rc == 0);
REQUIRE(rec.output == "abc 123\n");
REQUIRE(rec.error.empty());
REQUIRE(rec.outputLog == "abc 123\n");
REQUIRE(rec.errorLog.empty());
}
SECTION("Error Run") {
@@ -24,8 +24,8 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
auto rec = ex.runCommand(cmd);
REQUIRE(rec.rc == 2);
REQUIRE(rec.error == "/usr/bin/expr: syntax error: missing argument after +\n");
REQUIRE(rec.output.empty());
REQUIRE(rec.errorLog == "/usr/bin/expr: syntax error: missing argument after +\n");
REQUIRE(rec.outputLog.empty());
}
SECTION("Large Output") {
@@ -41,8 +41,8 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
auto rec = ex.runCommand(cmd);
REQUIRE(rec.rc == 0);
REQUIRE(rec.output.size() == std::filesystem::file_size(bigFile));
REQUIRE(rec.error.empty());
REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile));
REQUIRE(rec.errorLog.empty());
}
}
}