- Roughing out the DAGLogger interface
- Adding spec for FileSystem Logger
This commit is contained in:
10
TODO.md
10
TODO.md
@@ -1,5 +1,5 @@
|
|||||||
- Rip apart scheduler and re-implement runDag as a utility function, taking an execution pool, task list, and logger
|
- [ ] Rip apart scheduler and re-implement runDag as a utility function, taking an execution pool, task list, and logger
|
||||||
- Separate concerns for DAG logger vs DAG definition storage
|
- [ ] Separate concerns for DAG logger vs DAG definition storage
|
||||||
- Add in authorization scheme (maybe JWT?)
|
- [ ] Add in authorization scheme (maybe JWT?)
|
||||||
- Flesh out server and interface
|
- [ ] Flesh out server and interface
|
||||||
- Add ability to define child -> parent relationships
|
- [X] Add ability to define child -> parent relationships
|
||||||
|
|||||||
68
daggy/include/daggy/DAGLogger.hpp
Normal file
68
daggy/include/daggy/DAGLogger.hpp
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

#include "DAGRun.hpp"
|
||||||
|
|
||||||
|
/*
|
||||||
|
DAGLogger represents the interface to store all the state information
|
||||||
|
for daggy to run. Abstracted in case other back-end solutions need to
|
||||||
|
be supported.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace daggy {
|
||||||
|
// Presumably identifies a stored DAG definition; it is not referenced by
// anything else in this header -- TODO confirm it is still needed.
using DAGDefID = int16_t;
|
||||||
|
// Identifier of a single DAG run: returned by DAGLogger::startDAGRun and
// accepted by the other DAGLogger methods.
using DAGRunID = size_t;
|
||||||
|
|
||||||
|
// State of a DAG run or of a single task within a run.
//
// Each state occupies its own bit so that any set of states can be OR-ed
// into a uint32_t mask (DAGLogger::getDAGs takes such a mask). No state may
// have the value 0: a zero-valued enumerator can never be selected with
// `mask & state`.
enum class RunState : uint32_t {
    QUEUED    = 1,
    RUNNING   = 1 << 1,
    ERRORED   = 1 << 2,
    KILLED    = 1 << 3,
    COMPLETED = 1 << 4
};
|
||||||
|
|
||||||
|
struct TaskUpdateRecord {
|
||||||
|
TimePoint time;
|
||||||
|
size_t taskID;
|
||||||
|
RunState newState;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct DAGUpdateRecord {
|
||||||
|
TimePoint time;
|
||||||
|
RunState newState;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Pretty heavy weight, but
|
||||||
|
struct DAGRunRecord {
|
||||||
|
std::string name;
|
||||||
|
std::vector<Task> tasks;
|
||||||
|
std::vector<RunState> runStates;
|
||||||
|
std::vector<std::vector<AttemptRecord>> taskAttempts;
|
||||||
|
std::vector<TaskUpdateRecord> taskStateChanges;
|
||||||
|
std::vector<DAGUpdateRecord> dagStateChanges;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct DAGRunSummary {
|
||||||
|
DAGRunID runID;
|
||||||
|
std::string name;
|
||||||
|
RunState runState;
|
||||||
|
TimePoint startTime;
|
||||||
|
TimePoint lastUpdate;
|
||||||
|
std::unordered_map<RunState, size_t> taskStates;
|
||||||
|
};
|
||||||
|
|
||||||
|
class DAGLogger {
|
||||||
|
public:
|
||||||
|
// Execution
|
||||||
|
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) = 0;
|
||||||
|
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0;
|
||||||
|
virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) = 0;
|
||||||
|
virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) = 0;
|
||||||
|
virtual void updateTaskState(DAGRunID dagRunId, RunState state) = 0;
|
||||||
|
|
||||||
|
// Querying
|
||||||
|
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) = 0;
|
||||||
|
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0;
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -1,33 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <string>
|
|
||||||
|
|
||||||
#include "DAGRun.hpp"
|
|
||||||
|
|
||||||
/*
|
|
||||||
MetaStore represents the interface to store all the state information
|
|
||||||
for daggy to run. Abstracted in case other back-end solutions need to
|
|
||||||
be supported.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace daggy {
|
|
||||||
using DAGDefID = int16_t;
|
|
||||||
using DAGRunID = size_t;
|
|
||||||
|
|
||||||
// Removed side of the diff: the earlier, sequentially numbered run-state enum.
// The added DAGLogger.hpp declares RunState in its place.
enum class DAGRunState : uint32_t {
|
|
||||||
QUEUED = 0,
|
|
||||||
RUNNING,
|
|
||||||
ERRORED,
|
|
||||||
KILLED,
|
|
||||||
COMPLETED
|
|
||||||
};
|
|
||||||
|
|
||||||
// Removed side of the diff: the earlier DAGLogger interface. It only had the
// execution-side methods; the added DAGLogger.hpp version also declares
// updateTaskState, getDAGs and getDAGRun.
class DAGLogger {
|
|
||||||
public:
|
|
||||||
// Execution
|
|
||||||
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) = 0;
|
|
||||||
virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) = 0;
|
|
||||||
virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) = 0;
|
|
||||||
virtual void updateDAGRunState(DAGRunID dagRunId, DAGRunState state) = 0;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
@@ -7,7 +7,7 @@
|
|||||||
|
|
||||||
#include <rapidjson/document.h>
|
#include <rapidjson/document.h>
|
||||||
|
|
||||||
#include "Logger.hpp"
|
#include "DAGLogger.hpp"
|
||||||
#include "TaskExecutor.hpp"
|
#include "TaskExecutor.hpp"
|
||||||
#include "Task.hpp"
|
#include "Task.hpp"
|
||||||
#include "Defines.hpp"
|
#include "Defines.hpp"
|
||||||
|
|||||||
55
daggy/include/daggy/dagloggers/FileSystemLogger.hpp
Normal file
55
daggy/include/daggy/dagloggers/FileSystemLogger.hpp
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <filesystem>
|
||||||
|
|
||||||
|
#include <rapidjson/document.h>
|
||||||
|
#include "../DAGLogger.hpp"
|
||||||
|
|
||||||
|
namespace fs = std::filesystem;
|
||||||
|
namespace rj = rapidjson;
|
||||||
|
|
||||||
|
namespace daggy {
|
||||||
|
/*
|
||||||
|
* This logger should only be used for debug purposes. It's not really optimized for querying, and will
|
||||||
|
* use a ton of inodes to track state.
|
||||||
|
*
|
||||||
|
* On the plus side, it's trivial to look at without using the API.
|
||||||
|
*
|
||||||
|
* Filesystem logger creates the following structure:
|
||||||
|
* {root}/
|
||||||
|
* current/
|
||||||
|
* {DAGRunID}.{STATE} -- A file for each DAG not in a COMPLETE state for faster lookups
|
||||||
|
* runs/
|
||||||
|
* {runID}/
|
||||||
|
* meta.json --- Contains the DAG name, task definitions
|
||||||
|
* {taskID}/
|
||||||
|
* states --- State changes
|
||||||
|
* {attempt}/
|
||||||
|
* meta.json --- timestamps and rc
|
||||||
|
* stdout
|
||||||
|
* stderr
|
||||||
|
* execlog
|
||||||
|
*/
|
||||||
|
class FileSystemLogger : DAGLogger {
|
||||||
|
public:
|
||||||
|
FileSystemLogger(fs::path root);
|
||||||
|
|
||||||
|
// Execution
|
||||||
|
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) override;
|
||||||
|
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override;
|
||||||
|
virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) override;
|
||||||
|
virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) override;
|
||||||
|
virtual void updateTaskState(DAGRunID dagRunId, RunState state) override;
|
||||||
|
|
||||||
|
// Querying
|
||||||
|
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
|
||||||
|
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId);
|
||||||
|
|
||||||
|
private:
|
||||||
|
fs::path root_;
|
||||||
|
std::mutex lock_;
|
||||||
|
|
||||||
|
const fs::path getCurrentPath() const;
|
||||||
|
const fs::path getRunsRoot() const;
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -1,6 +1,5 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <iostream>
|
|
||||||
#include "../TaskExecutor.hpp"
|
#include "../TaskExecutor.hpp"
|
||||||
|
|
||||||
namespace daggy {
|
namespace daggy {
|
||||||
|
|||||||
28
daggy/src/dagloggers/FileSystemLogger.cpp
Normal file
28
daggy/src/dagloggers/FileSystemLogger.cpp
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
#include <daggy/dagloggers/FileSystemLogger.hpp>
|
||||||
|
|
||||||
|
namespace fs = std::filesystem;
|
||||||
|
|
||||||
|
namespace daggy {
|
||||||
|
// Returns the "current" directory directly under the logger root.
const fs::path FileSystemLogger::getCurrentPath() const
{
    fs::path currentDir = root_;
    currentDir /= "current";
    return currentDir;
}
|
||||||
|
// Returns the "runs" directory directly under the logger root.
const fs::path FileSystemLogger::getRunsRoot() const
{
    fs::path runsDir = root_;
    runsDir /= "runs";
    return runsDir;
}
|
||||||
|
|
||||||
|
FileSystemLogger(fs::path root)
|
||||||
|
: root_(root)
|
||||||
|
{
|
||||||
|
std::vector<fs::paths> reqPaths{ root_, getCurrentPath(), getRunsRoot()};
|
||||||
|
for (const path : reqPaths) {
|
||||||
|
if (! fs::exists(path)) { fs::create_directory(path); }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execution
|
||||||
|
// TODO: not implemented yet -- nothing is written to disk.
// Returns a value-initialized DAGRunID so that control no longer flows off
// the end of a non-void function, which is undefined behavior.
DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector<Task> & tasks)
{
    (void) name;
    (void) tasks;
    return DAGRunID{};
}
|
||||||
|
// Stub: currently does nothing with the run id or the new state.
void FileSystemLogger::updateDAGRunState(DAGRunID dagRunId, RunState state)
{
}
|
||||||
|
// Stub: currently records nothing about the attempt.
void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt)
{
}
|
||||||
|
// Stub: currently does nothing with the run id or the task id.
void FileSystemLogger::markTaskComplete(DAGRunID dagRun, size_t taskID)
{
}
|
||||||
|
// Stub: currently does nothing with the run id or the new state.
void FileSystemLogger::updateTaskState(DAGRunID dagRunId, RunState state)
{
}
|
||||||
|
|
||||||
|
// Querying
|
||||||
|
// TODO: not implemented yet -- the filesystem is not consulted.
// Returns an empty list so that control no longer flows off the end of a
// non-void function, which is undefined behavior.
std::vector<DAGRunSummary> FileSystemLogger::getDAGs(uint32_t stateMask)
{
    (void) stateMask;
    return {};
}
|
||||||
|
// TODO: not implemented yet -- the filesystem is not consulted.
// Returns a value-initialized (empty) record so that control no longer flows
// off the end of a non-void function, which is undefined behavior.
DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunId)
{
    (void) dagRunId;
    return DAGRunRecord{};
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user