- More refactoring

- Fixing cmake source discovery with GLOB_RECURSE
This commit is contained in:
Ian Roddis
2021-08-09 15:24:28 -03:00
parent 30aea0818c
commit 9a3671aba4
8 changed files with 93 additions and 83 deletions

View File

@@ -2,7 +2,7 @@ project(daggy)
#ExternalProject_Add_StepDependencies(pistache_extern build)
file(GLOB SOURCES src/*.cpp src/**/*.cpp)
file(GLOB_RECURSE SOURCES src/*.cpp)
add_library(${PROJECT_NAME} STATIC ${SOURCES})
include_directories(${PISTACHE_INCLUDE_DIR})
target_include_directories(${PROJECT_NAME} PUBLIC include)

View File

@@ -7,7 +7,7 @@
#include <rapidjson/document.h>
#include "daggy/loggers/dag_run/DAGLoggerBase.hpp"
#include "daggy/loggers/dag_run/DAGLogger.hpp"
#include "daggy/executors/task/TaskExecutor.hpp"
#include "Task.hpp"
#include "Defines.hpp"
@@ -24,12 +24,12 @@ namespace daggy {
TaskID taskID,
const Task &task,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGLoggerBase &logger);
loggers::dag_run::DAGLogger &logger);
void runDAG(DAGRunID runID,
std::vector<Task> tasks,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGLoggerBase &logger,
loggers::dag_run::DAGLogger &logger,
DAG dag);
}

View File

@@ -0,0 +1,37 @@
#pragma once
#include <string>
#include "../../Task.hpp"
#include "../../AttemptRecord.hpp"
#include "../../Defines.hpp"
#include "Defines.hpp"
/*
DAGLogger represents the interface to store all the state information
for daggy to run. Abstracted in case other back-end solutions need to
be supported.
*/
namespace daggy {
namespace loggers {
namespace dag_run {
class DAGLogger {
public:
// Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) = 0;
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) = 0;
virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) = 0;
// Querying
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) = 0;
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0;
};
}
}
}

View File

@@ -1,73 +0,0 @@
#pragma once
#include <string>
#include "daggy/DAGRun.hpp"
/*
DAGLoggerBase represents the interface to store all the state information
for daggy to run. Abstracted in case other back-end solutions need to
be supported.
*/
namespace daggy {
namespace loggers {
namespace dag_run {
enum class RunState : uint32_t {
QUEUED = 0,
RUNNING = 1,
RETRY = 1 << 1,
ERRORED = 1 << 2,
KILLED = 1 << 3,
COMPLETED = 1 << 4
};
struct TaskUpdateRecord {
TimePoint time;
TaskID taskID;
RunState newState;
};
struct DAGUpdateRecord {
TimePoint time;
RunState newState;
};
// Pretty heavy weight, but
struct DAGRunRecord {
std::string name;
std::vector<Task> tasks;
std::vector<RunState> runStates;
std::vector<std::vector<AttemptRecord>> taskAttempts;
std::vector<TaskUpdateRecord> taskStateChanges;
std::vector<DAGUpdateRecord> dagStateChanges;
};
struct DAGRunSummary {
DAGRunID runID;
std::string name;
RunState runState;
TimePoint startTime;
TimePoint lastUpdate;
std::unordered_map<RunState, size_t> taskStateCounts;
};
class DAGLoggerBase {
public:
// Execution
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) = 0;
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0;
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord &attempt) = 0;
virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) = 0;
// Querying
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) = 0;
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0;
};
}
}
}

View File

@@ -0,0 +1,46 @@
#pragma once
namespace daggy {
namespace loggers {
namespace dag_run {
enum class RunState : uint32_t {
QUEUED = 0,
RUNNING = 1,
RETRY = 1 << 1,
ERRORED = 1 << 2,
KILLED = 1 << 3,
COMPLETED = 1 << 4
};
struct TaskUpdateRecord {
TimePoint time;
TaskID taskID;
RunState newState;
};
struct DAGUpdateRecord {
TimePoint time;
RunState newState;
};
// Pretty heavy weight, but
struct DAGRunRecord {
std::string name;
std::vector <Task> tasks;
std::vector <RunState> runStates;
std::vector <std::vector<AttemptRecord>> taskAttempts;
std::vector <TaskUpdateRecord> taskStateChanges;
std::vector <DAGUpdateRecord> dagStateChanges;
};
struct DAGRunSummary {
DAGRunID runID;
std::string name;
RunState runState;
TimePoint startTime;
TimePoint lastUpdate;
std::unordered_map <RunState, size_t> taskStateCounts;
};
}
}
}

View File

@@ -5,7 +5,7 @@
#include <mutex>
#include <rapidjson/document.h>
#include "DAGLoggerBase.hpp"
#include "DAGLogger.hpp"
namespace fs = std::filesystem;
namespace rj = rapidjson;
@@ -34,7 +34,7 @@ namespace daggy {
* stderr
* execlog
*/
class FileSystemLogger : DAGLoggerBase {
class FileSystemLogger : public DAGLogger {
public:
FileSystemLogger(fs::path root);

View File

@@ -3,7 +3,7 @@
#include <iostream>
#include <mutex>
#include "DAGLoggerBase.hpp"
#include "DAGLogger.hpp"
namespace daggy {
namespace loggers {
@@ -12,7 +12,7 @@ namespace daggy {
* This logger should only be used for debug purposes. It doesn't actually log anything, just prints stuff
* to stdout.
*/
class StdOutLogger : public DAGLoggerBase {
class StdOutLogger : public DAGLogger {
public:
StdOutLogger();

View File

@@ -53,7 +53,7 @@ namespace daggy {
TaskID taskID,
const Task &task,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGLoggerBase &logger) {
loggers::dag_run::DAGLogger &logger) {
std::vector<AttemptRecord> attempts;
logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RUNNING);
@@ -69,7 +69,7 @@ namespace daggy {
void runDAG(DAGRunID runID,
std::vector<Task> tasks,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGLoggerBase &logger,
loggers::dag_run::DAGLogger &logger,
DAG dag) {
logger.updateDAGRunState(runID, loggers::dag_run::RunState::RUNNING);