- Roughing out the DAGLogger interface

- Adding spec for FileSystem Logger
This commit is contained in:
Ian Roddis
2021-08-08 12:22:39 -03:00
parent 1849a2fee4
commit 5a3796a8a1
8 changed files with 160 additions and 42 deletions

10
TODO.md
View File

@@ -1,5 +1,5 @@
- Rip apart scheduler and re-implement runDag as a utility function, taking an execution pool, task list, and logger
- Separate concerns for DAG logger vs DAG definition storage
- Add in authorization scheme (maybe JWT?)
- Flesh out server and interface
- Add ability to define child -> parent relationships
- [ ] Rip apart scheduler and re-implement runDag as a utility function, taking an execution pool, task list, and logger
- [ ] Separate concerns for DAG logger vs DAG definition storage
- [ ] Add in authorization scheme (maybe JWT?)
- [ ] Flesh out server and interface
- [X] Add ability to define child -> parent relationships

View File

@@ -5,10 +5,11 @@
namespace daggy {
using Clock = std::chrono::system_clock;
using TimePoint = std::chrono::time_point<Clock>;
struct AttemptRecord {
std::chrono::time_point<Clock> startTime;
std::chrono::time_point<Clock> stopTime;
TimePoint startTime;
TimePoint stopTime;
int rc; // RC from the task
std::string metaLog; // Logs from the executor
std::string output; // stdout from command

View File

@@ -0,0 +1,68 @@
#pragma once
#include <string>
#include "DAGRun.hpp"
/*
DAGLogger represents the interface to store all the state information
for daggy to run. Abstracted in case other back-end solutions need to
be supported.
*/
namespace daggy {
using DAGDefID = int16_t;
using DAGRunID = size_t;
enum class RunState : uint32_t {
QUEUED = 0,
RUNNING = 1,
ERRORED = 1 << 1,
KILLED = 1 << 2,
COMPLETED = 1 << 3
};
struct TaskUpdateRecord {
TimePoint time;
size_t taskID;
RunState newState;
};
struct DAGUpdateRecord {
TimePoint time;
RunState newState;
};
// Pretty heavy weight, but
struct DAGRunRecord {
std::string name;
std::vector<Task> tasks;
std::vector<RunState> runStates;
std::vector<std::vector<AttemptRecord>> taskAttempts;
std::vector<TaskUpdateRecord> taskStateChanges;
std::vector<DAGUpdateRecord> dagStateChanges;
};
struct DAGRunSummary {
DAGRunID runID;
std::string name;
RunState runState;
TimePoint startTime;
TimePoint lastUpdate;
std::unordered_map<RunState, size_t> taskStates;
};
class DAGLogger {
public:
// Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) = 0;
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0;
virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) = 0;
virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) = 0;
virtual void updateTaskState(DAGRunID dagRunId, RunState state) = 0;
// Querying
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) = 0;
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0;
};
}

View File

@@ -1,33 +0,0 @@
#pragma once
#include <string>
#include "DAGRun.hpp"
/*
MetaStore represents the interface to store all the state information
for daggy to run. Abstracted in case other back-end solutions need to
be supported.
*/
namespace daggy {
using DAGDefID = int16_t;
using DAGRunID = size_t;
enum class DAGRunState : uint32_t {
QUEUED = 0,
RUNNING,
ERRORED,
KILLED,
COMPLETED
};
class DAGLogger {
public:
// Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) = 0;
virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) = 0;
virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) = 0;
virtual void updateDAGRunState(DAGRunID dagRunId, DAGRunState state) = 0;
};
}

View File

@@ -7,7 +7,7 @@
#include <rapidjson/document.h>
#include "Logger.hpp"
#include "DAGLogger.hpp"
#include "TaskExecutor.hpp"
#include "Task.hpp"
#include "ThreadPool.hpp"

View File

@@ -0,0 +1,55 @@
#pragma once
#include <filesystem>
#include <rapidjson/document.h>
#include "../DAGLogger.hpp"
namespace fs = std::filesystem;
namespace rj = rapidjson;
namespace daggy {
/*
* This logger should only be used for debug purposes. It's not really optimized for querying, and will
* use a ton of inodes to track state.
*
* On the plus side, it's trivial to look at without using the API.
*
* Filesystem logger creates the following structure:
* {root}/
* current/
* {DAGRunID}.{STATE} -- A file for each DAG not in a COMPLETE state for faster lookups
* runs/
* {runID}/
* meta.json --- Contains the DAG name, task definitions
* {taskID}/
* states --- State changes
* {attempt}/
* meta.json --- timestamps and rc
* stdout
* stderr
* execlog
*/
class FileSystemLogger : DAGLogger {
public:
FileSystemLogger(fs::path root);
// Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) override;
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override;
virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) override;
virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) override;
virtual void updateTaskState(DAGRunID dagRunId, RunState state) override;
// Querying
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId);
private:
fs::path root_;
std::mutex lock_;
const fs::path getCurrentPath() const;
const fs::path getRunsRoot() const;
};
}

View File

@@ -1,6 +1,5 @@
#pragma once
#include <iostream>
#include "../TaskExecutor.hpp"
namespace daggy {

View File

@@ -0,0 +1,28 @@
#include <daggy/dagloggers/FileSystemLogger.hpp>
namespace fs = std::filesystem;
namespace daggy {
const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; }
const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; }
FileSystemLogger(fs::path root)
: root_(root)
{
std::vector<fs::paths> reqPaths{ root_, getCurrentPath(), getRunsRoot()};
for (const path : reqPaths) {
if (! fs::exists(path)) { fs::create_directory(path); }
}
}
// Execution
DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector<Task> & tasks){ }
void FileSystemLogger::updateDAGRunState(DAGRunID dagRunId, RunState state){ }
void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt){ }
void FileSystemLogger::markTaskComplete(DAGRunID dagRun, size_t taskID){ }
void FileSystemLogger::updateTaskState(DAGRunID dagRunId, RunState state){ }
// Querying
std::vector<DAGRunSummary> FileSystemLogger::getDAGs(uint32_t stateMask){ }
DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunId) {}
}