- Adding StdOutLogger and adding tests for DAG execution to verify it works.
- Roughing in FileSystemLogger - Deleting Scheduler code and associated unit tests as being too complicated for maintenance. - Refactoring namespaces for loggers and executors.
This commit is contained in:
@@ -10,8 +10,10 @@ find_package (Threads REQUIRED)
|
|||||||
|
|
||||||
include(cmake/rapidjson.cmake)
|
include(cmake/rapidjson.cmake)
|
||||||
include(cmake/Pistache.cmake)
|
include(cmake/Pistache.cmake)
|
||||||
|
include(cmake/MagicEnum.cmake)
|
||||||
|
|
||||||
include_directories(${RAPIDJSON_INCLUDE_DIR})
|
include_directories(${RAPIDJSON_INCLUDE_DIR})
|
||||||
|
include_directories(${MAGIC_ENUM_INCLUDE_DIR})
|
||||||
|
|
||||||
add_subdirectory(daggy)
|
add_subdirectory(daggy)
|
||||||
add_subdirectory(tests)
|
add_subdirectory(tests)
|
||||||
|
|||||||
17
cmake/MagicEnum.cmake
Normal file
17
cmake/MagicEnum.cmake
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
include(ExternalProject)
|
||||||
|
# Download RapidJSON
|
||||||
|
ExternalProject_Add(
|
||||||
|
magic-enum
|
||||||
|
PREFIX "third_party/magic-enum"
|
||||||
|
GIT_REPOSITORY "https://github.com/Neargye/magic_enum"
|
||||||
|
GIT_TAG "v0.7.3"
|
||||||
|
TIMEOUT 10
|
||||||
|
CONFIGURE_COMMAND ""
|
||||||
|
BUILD_COMMAND ""
|
||||||
|
INSTALL_COMMAND ""
|
||||||
|
UPDATE_COMMAND ""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Magic Enums is a header-only
|
||||||
|
ExternalProject_Get_Property(magic-enum source_dir)
|
||||||
|
set(MAGIC_ENUM_INCLUDE_DIR ${source_dir}/include)
|
||||||
@@ -7,4 +7,4 @@ add_library(${PROJECT_NAME} STATIC ${SOURCES})
|
|||||||
include_directories(${PISTACHE_INCLUDE_DIR})
|
include_directories(${PISTACHE_INCLUDE_DIR})
|
||||||
target_include_directories(${PROJECT_NAME} PUBLIC include)
|
target_include_directories(${PROJECT_NAME} PUBLIC include)
|
||||||
target_link_libraries(${PROJECT_NAME} pistache pthread)
|
target_link_libraries(${PROJECT_NAME} pistache pthread)
|
||||||
add_dependencies(${PROJECT_NAME} PistacheDownload rapidjson)
|
add_dependencies(${PROJECT_NAME} PistacheDownload rapidjson magic-enum)
|
||||||
@@ -10,7 +10,7 @@ namespace daggy {
|
|||||||
TimePoint startTime;
|
TimePoint startTime;
|
||||||
TimePoint stopTime;
|
TimePoint stopTime;
|
||||||
int rc; // RC from the task
|
int rc; // RC from the task
|
||||||
std::string metaLog; // Logs from the executor
|
std::string metaLog; // Logs from the dag_executor
|
||||||
std::string output; // stdout from command
|
std::string output; // stdout from command
|
||||||
std::string error; // stderr from command
|
std::string error; // stderr from command
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1,68 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <string>
|
|
||||||
|
|
||||||
#include "DAGRun.hpp"
|
|
||||||
|
|
||||||
/*
|
|
||||||
DAGLogger represents the interface to store all the state information
|
|
||||||
for daggy to run. Abstracted in case other back-end solutions need to
|
|
||||||
be supported.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace daggy {
|
|
||||||
using DAGDefID = int16_t;
|
|
||||||
using DAGRunID = size_t;
|
|
||||||
|
|
||||||
enum class RunState : uint32_t {
|
|
||||||
QUEUED = 0,
|
|
||||||
RUNNING = 1,
|
|
||||||
ERRORED = 1 << 1,
|
|
||||||
KILLED = 1 << 2,
|
|
||||||
COMPLETED = 1 << 3
|
|
||||||
};
|
|
||||||
|
|
||||||
struct TaskUpdateRecord {
|
|
||||||
TimePoint time;
|
|
||||||
size_t taskID;
|
|
||||||
RunState newState;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct DAGUpdateRecord {
|
|
||||||
TimePoint time;
|
|
||||||
RunState newState;
|
|
||||||
};
|
|
||||||
|
|
||||||
// Pretty heavy weight, but
|
|
||||||
struct DAGRunRecord {
|
|
||||||
std::string name;
|
|
||||||
std::vector<Task> tasks;
|
|
||||||
std::vector<RunState> runStates;
|
|
||||||
std::vector<std::vector<AttemptRecord>> taskAttempts;
|
|
||||||
std::vector<TaskUpdateRecord> taskStateChanges;
|
|
||||||
std::vector<DAGUpdateRecord> dagStateChanges;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct DAGRunSummary {
|
|
||||||
DAGRunID runID;
|
|
||||||
std::string name;
|
|
||||||
RunState runState;
|
|
||||||
TimePoint startTime;
|
|
||||||
TimePoint lastUpdate;
|
|
||||||
std::unordered_map<RunState, size_t> taskStates;
|
|
||||||
};
|
|
||||||
|
|
||||||
class DAGLogger {
|
|
||||||
public:
|
|
||||||
// Execution
|
|
||||||
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) = 0;
|
|
||||||
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0;
|
|
||||||
virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) = 0;
|
|
||||||
virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) = 0;
|
|
||||||
virtual void updateTaskState(DAGRunID dagRunId, RunState state) = 0;
|
|
||||||
|
|
||||||
// Querying
|
|
||||||
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) = 0;
|
|
||||||
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
@@ -19,4 +19,5 @@ namespace daggy {
|
|||||||
// DAG Runs
|
// DAG Runs
|
||||||
using DAGDefID = int16_t;
|
using DAGDefID = int16_t;
|
||||||
using DAGRunID = size_t;
|
using DAGRunID = size_t;
|
||||||
|
using TaskID = size_t;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,30 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <chrono>
|
|
||||||
#include <future>
|
|
||||||
#include <string>
|
|
||||||
#include <thread>
|
|
||||||
#include <vector>
|
|
||||||
|
|
||||||
#include "Task.hpp"
|
|
||||||
#include "AttemptRecord.hpp"
|
|
||||||
#include "ThreadPool.hpp"
|
|
||||||
|
|
||||||
/*
|
|
||||||
Executors run Tasks, returning a future with the results.
|
|
||||||
If there are many retries, logs are returned for each attempt.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace daggy {
|
|
||||||
class TaskExecutor {
|
|
||||||
public:
|
|
||||||
TaskExecutor(size_t nThreads) : threadPool(nThreads) {};
|
|
||||||
|
|
||||||
virtual const std::string getName() const = 0;
|
|
||||||
|
|
||||||
// This will block if the executor is full
|
|
||||||
virtual AttemptRecord runCommand(std::vector<std::string> cmd) = 0;
|
|
||||||
|
|
||||||
ThreadPool threadPool;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
@@ -7,19 +7,29 @@
|
|||||||
|
|
||||||
#include <rapidjson/document.h>
|
#include <rapidjson/document.h>
|
||||||
|
|
||||||
#include "DAGLogger.hpp"
|
#include "daggy/loggers/dag_run/DAGLoggerBase.hpp"
|
||||||
#include "TaskExecutor.hpp"
|
#include "daggy/executors/task/TaskExecutor.hpp"
|
||||||
#include "Task.hpp"
|
#include "Task.hpp"
|
||||||
#include "Defines.hpp"
|
#include "Defines.hpp"
|
||||||
|
#include "DAG.hpp"
|
||||||
|
|
||||||
namespace daggy {
|
namespace daggy {
|
||||||
std::vector<Command> expandCommands(const std::vector<std::string> & command, const ParameterValues & parameters);
|
std::vector<Command> expandCommands(const std::vector<std::string> & command, const ParameterValues & parameters);
|
||||||
|
|
||||||
|
DAG buildDAGFromTasks(const std::vector<Task> & tasks);
|
||||||
|
|
||||||
// Blocking call
|
// Blocking call
|
||||||
|
std::vector<AttemptRecord>
|
||||||
|
runTask(DAGRunID runID,
|
||||||
|
TaskID taskID,
|
||||||
|
const Task & task,
|
||||||
|
executors::task::TaskExecutor & executor,
|
||||||
|
loggers::dag_run::DAGLoggerBase & logger);
|
||||||
|
|
||||||
void runDAG(DAGRunID runID,
|
void runDAG(DAGRunID runID,
|
||||||
std::vector<Task> tasks,
|
std::vector<Task> tasks,
|
||||||
TaskExecutor & executor,
|
executors::task::TaskExecutor & executor,
|
||||||
DAGLogger & logger,
|
loggers::dag_run::DAGLoggerBase & logger,
|
||||||
DAG dag);
|
DAG dag);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,59 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <filesystem>
|
|
||||||
|
|
||||||
#include <rapidjson/document.h>
|
|
||||||
#include "../DAGLogger.hpp"
|
|
||||||
|
|
||||||
namespace fs = std::filesystem;
|
|
||||||
namespace rj = rapidjson;
|
|
||||||
|
|
||||||
namespace daggy {
|
|
||||||
/*
|
|
||||||
* This logger should only be used for debug purposes. It's not really optimized for querying, and will
|
|
||||||
* use a ton of inodes to track state.
|
|
||||||
*
|
|
||||||
* On the plus side, it's trivial to look at without using the API.
|
|
||||||
*
|
|
||||||
* Filesystem logger creates the following structure:
|
|
||||||
* {root}/
|
|
||||||
* current/
|
|
||||||
* {DAGRunID}.{STATE} -- A file for each DAG not in a COMPLETE state for faster lookups
|
|
||||||
* runs/
|
|
||||||
* {runID}/
|
|
||||||
* meta.json --- Contains the DAG name, task definitions
|
|
||||||
* {taskID}/
|
|
||||||
* states --- State changes
|
|
||||||
* {attempt}/
|
|
||||||
* meta.json --- timestamps and rc
|
|
||||||
* stdout
|
|
||||||
* stderr
|
|
||||||
* execlog
|
|
||||||
*/
|
|
||||||
class FileSystemLogger : DAGLogger {
|
|
||||||
public:
|
|
||||||
FileSystemLogger(fs::path root);
|
|
||||||
|
|
||||||
// Execution
|
|
||||||
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) override;
|
|
||||||
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override;
|
|
||||||
virtual void logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt) override;
|
|
||||||
virtual void markTaskComplete(DAGRunID dagRun, size_t taskID) override;
|
|
||||||
virtual void updateTaskState(DAGRunID dagRunId, RunState state) override;
|
|
||||||
|
|
||||||
// Querying
|
|
||||||
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
|
|
||||||
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId);
|
|
||||||
|
|
||||||
private:
|
|
||||||
fs::path root_;
|
|
||||||
std::atomic<DAGRunID> nextRunID_;
|
|
||||||
std::mutex lock_;
|
|
||||||
|
|
||||||
std::unordered_map<fs::path, std::mutex> runLocks;
|
|
||||||
|
|
||||||
inline const fs::path getCurrentPath() const;
|
|
||||||
inline const fs::path getRunsRoot() const;
|
|
||||||
inline const fs::path getRunRoot(DAGRunID runID) const;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
@@ -1,18 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include "../TaskExecutor.hpp"
|
|
||||||
|
|
||||||
namespace daggy {
|
|
||||||
namespace executor {
|
|
||||||
class ForkingTaskExecutor : public TaskExecutor {
|
|
||||||
public:
|
|
||||||
ForkingTaskExecutor(size_t nThreads)
|
|
||||||
: TaskExecutor(nThreads)
|
|
||||||
{}
|
|
||||||
|
|
||||||
const std::string getName() const override { return "ForkingTaskExecutor"; }
|
|
||||||
|
|
||||||
AttemptRecord runCommand(std::vector<std::string> cmd) override;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
20
daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp
Normal file
20
daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "TaskExecutor.hpp"
|
||||||
|
|
||||||
|
namespace daggy {
|
||||||
|
namespace executors {
|
||||||
|
namespace task {
|
||||||
|
class ForkingTaskExecutor : public TaskExecutor {
|
||||||
|
public:
|
||||||
|
ForkingTaskExecutor(size_t nThreads)
|
||||||
|
: TaskExecutor(nThreads)
|
||||||
|
{}
|
||||||
|
|
||||||
|
const std::string getName() const override { return "ForkingTaskExecutor"; }
|
||||||
|
|
||||||
|
AttemptRecord runCommand(std::vector<std::string> cmd) override;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
34
daggy/include/daggy/executors/task/TaskExecutor.hpp
Normal file
34
daggy/include/daggy/executors/task/TaskExecutor.hpp
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
|
#include <future>
|
||||||
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "daggy/Task.hpp"
|
||||||
|
#include "daggy/AttemptRecord.hpp"
|
||||||
|
#include "daggy/ThreadPool.hpp"
|
||||||
|
|
||||||
|
/*
|
||||||
|
Executors run Tasks, returning a future with the results.
|
||||||
|
If there are many retries, logs are returned for each attempt.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace daggy {
|
||||||
|
namespace executors {
|
||||||
|
namespace task {
|
||||||
|
class TaskExecutor {
|
||||||
|
public:
|
||||||
|
TaskExecutor(size_t nThreads) : threadPool(nThreads) {};
|
||||||
|
|
||||||
|
virtual const std::string getName() const = 0;
|
||||||
|
|
||||||
|
// This will block if the dag_executor is full
|
||||||
|
virtual AttemptRecord runCommand(std::vector<std::string> cmd) = 0;
|
||||||
|
|
||||||
|
ThreadPool threadPool;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
69
daggy/include/daggy/loggers/dag_run/DAGLoggerBase.hpp
Normal file
69
daggy/include/daggy/loggers/dag_run/DAGLoggerBase.hpp
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
#include "daggy/DAGRun.hpp"
|
||||||
|
|
||||||
|
/*
|
||||||
|
DAGLoggerBase represents the interface to store all the state information
|
||||||
|
for daggy to run. Abstracted in case other back-end solutions need to
|
||||||
|
be supported.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace daggy {
|
||||||
|
namespace loggers {
|
||||||
|
namespace dag_run {
|
||||||
|
enum class RunState : uint32_t {
|
||||||
|
QUEUED = 0,
|
||||||
|
RUNNING = 1,
|
||||||
|
RETRY = 1 << 1,
|
||||||
|
ERRORED = 1 << 2,
|
||||||
|
KILLED = 1 << 3,
|
||||||
|
COMPLETED = 1 << 4
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TaskUpdateRecord {
|
||||||
|
TimePoint time;
|
||||||
|
TaskID taskID;
|
||||||
|
RunState newState;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct DAGUpdateRecord {
|
||||||
|
TimePoint time;
|
||||||
|
RunState newState;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Pretty heavy weight, but
|
||||||
|
struct DAGRunRecord {
|
||||||
|
std::string name;
|
||||||
|
std::vector<Task> tasks;
|
||||||
|
std::vector<RunState> runStates;
|
||||||
|
std::vector<std::vector<AttemptRecord>> taskAttempts;
|
||||||
|
std::vector<TaskUpdateRecord> taskStateChanges;
|
||||||
|
std::vector<DAGUpdateRecord> dagStateChanges;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct DAGRunSummary {
|
||||||
|
DAGRunID runID;
|
||||||
|
std::string name;
|
||||||
|
RunState runState;
|
||||||
|
TimePoint startTime;
|
||||||
|
TimePoint lastUpdate;
|
||||||
|
std::unordered_map<RunState, size_t> taskStateCounts;
|
||||||
|
};
|
||||||
|
|
||||||
|
class DAGLoggerBase {
|
||||||
|
public:
|
||||||
|
// Execution
|
||||||
|
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) = 0;
|
||||||
|
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) = 0;
|
||||||
|
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord & attempt) = 0;
|
||||||
|
virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) = 0;
|
||||||
|
|
||||||
|
// Querying
|
||||||
|
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) = 0;
|
||||||
|
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId) = 0;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
64
daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp
Normal file
64
daggy/include/daggy/loggers/dag_run/FileSystemLogger.hpp
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <filesystem>
|
||||||
|
#include <atomic>
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
|
#include <rapidjson/document.h>
|
||||||
|
#include "DAGLoggerBase.hpp"
|
||||||
|
|
||||||
|
namespace fs = std::filesystem;
|
||||||
|
namespace rj = rapidjson;
|
||||||
|
|
||||||
|
namespace daggy {
|
||||||
|
namespace loggers {
|
||||||
|
namespace dag_run {
|
||||||
|
/*
|
||||||
|
* This logger should only be used for debug purposes. It's not really optimized for querying, and will
|
||||||
|
* use a ton of inodes to track state.
|
||||||
|
*
|
||||||
|
* On the plus side, it's trivial to look at without using the API.
|
||||||
|
*
|
||||||
|
* Filesystem logger creates the following structure:
|
||||||
|
* {root}/
|
||||||
|
* current/
|
||||||
|
* {DAGRunID}.{STATE} -- A file for each DAG not in a COMPLETE state for faster lookups
|
||||||
|
* runs/
|
||||||
|
* {runID}/
|
||||||
|
* meta.json --- Contains the DAG name, task definitions
|
||||||
|
* {taskID}/
|
||||||
|
* states --- State changes
|
||||||
|
* {attempt}/
|
||||||
|
* meta.json --- timestamps and rc
|
||||||
|
* stdout
|
||||||
|
* stderr
|
||||||
|
* execlog
|
||||||
|
*/
|
||||||
|
class FileSystemLogger : DAGLoggerBase {
|
||||||
|
public:
|
||||||
|
FileSystemLogger(fs::path root);
|
||||||
|
|
||||||
|
// Execution
|
||||||
|
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) override;
|
||||||
|
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override;
|
||||||
|
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord & attempt) override;
|
||||||
|
virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override;
|
||||||
|
|
||||||
|
// Querying
|
||||||
|
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
|
||||||
|
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId);
|
||||||
|
|
||||||
|
private:
|
||||||
|
fs::path root_;
|
||||||
|
std::atomic<DAGRunID> nextRunID_;
|
||||||
|
std::mutex lock_;
|
||||||
|
|
||||||
|
// std::unordered_map<fs::path, std::mutex> runLocks;
|
||||||
|
|
||||||
|
inline const fs::path getCurrentPath() const;
|
||||||
|
inline const fs::path getRunsRoot() const;
|
||||||
|
inline const fs::path getRunRoot(DAGRunID runID) const;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
35
daggy/include/daggy/loggers/dag_run/StdOutLogger.hpp
Normal file
35
daggy/include/daggy/loggers/dag_run/StdOutLogger.hpp
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
|
#include "DAGLoggerBase.hpp"
|
||||||
|
|
||||||
|
namespace daggy {
|
||||||
|
namespace loggers {
|
||||||
|
namespace dag_run {
|
||||||
|
/*
|
||||||
|
* This logger should only be used for debug purposes. It doesn't actually log anything, just prints stuff
|
||||||
|
* to stdout.
|
||||||
|
*/
|
||||||
|
class StdOutLogger : public DAGLoggerBase {
|
||||||
|
public:
|
||||||
|
StdOutLogger();
|
||||||
|
|
||||||
|
// Execution
|
||||||
|
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> & tasks) override;
|
||||||
|
virtual void updateDAGRunState(DAGRunID dagRunId, RunState state) override;
|
||||||
|
virtual void logTaskAttempt(DAGRunID, TaskID taskID, const AttemptRecord & attempt) override;
|
||||||
|
virtual void updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state) override;
|
||||||
|
|
||||||
|
// Querying
|
||||||
|
virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
|
||||||
|
virtual DAGRunRecord getDAGRun(DAGRunID dagRunId);
|
||||||
|
|
||||||
|
private:
|
||||||
|
DAGRunID nextRunID_;
|
||||||
|
std::mutex guard_;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -30,11 +30,49 @@ namespace daggy {
|
|||||||
return commands;
|
return commands;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DAG buildDAGFromTasks(const std::vector<Task> & tasks) {
|
||||||
|
DAG dag;
|
||||||
|
std::unordered_map<std::string, size_t> taskIDs;
|
||||||
|
|
||||||
|
// Add all the vertices
|
||||||
|
for (const auto &task : tasks) {
|
||||||
|
taskIDs[task.name] = dag.addVertex();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add edges
|
||||||
|
for (size_t i = 0; i < tasks.size(); ++i) {
|
||||||
|
for (const auto &c : tasks[i].children) {
|
||||||
|
dag.addEdge(i, taskIDs[c]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dag.reset();
|
||||||
|
return dag;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<AttemptRecord> runTask(DAGRunID runID,
|
||||||
|
TaskID taskID,
|
||||||
|
const Task & task,
|
||||||
|
executors::task::TaskExecutor & executor,
|
||||||
|
loggers::dag_run::DAGLoggerBase & logger)
|
||||||
|
{
|
||||||
|
std::vector<AttemptRecord> attempts;
|
||||||
|
logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RUNNING );
|
||||||
|
|
||||||
|
while (attempts.size() < task.maxRetries + 1) {
|
||||||
|
attempts.push_back(executor.runCommand(task.command));
|
||||||
|
logger.logTaskAttempt(runID, taskID, attempts.back());
|
||||||
|
if (attempts.back().rc == 0) break;
|
||||||
|
logger.updateTaskState(runID, taskID, loggers::dag_run::RunState::RETRY );
|
||||||
|
}
|
||||||
|
return attempts;
|
||||||
|
}
|
||||||
|
|
||||||
void runDAG(DAGRunID runID,
|
void runDAG(DAGRunID runID,
|
||||||
std::vector<Task> tasks,
|
std::vector<Task> tasks,
|
||||||
TaskExecutor & executor,
|
executors::task::TaskExecutor & executor,
|
||||||
DAGLogger & logger,
|
loggers::dag_run::DAGLoggerBase & logger,
|
||||||
DAG dag) {
|
DAG dag) {
|
||||||
|
logger.updateDAGRunState(runID, loggers::dag_run::RunState::RUNNING);
|
||||||
|
|
||||||
struct TaskState {
|
struct TaskState {
|
||||||
size_t tid;
|
size_t tid;
|
||||||
@@ -51,10 +89,17 @@ namespace daggy {
|
|||||||
|
|
||||||
if (taskState.fut.valid()) {
|
if (taskState.fut.valid()) {
|
||||||
auto attemptRecords = taskState.fut.get();
|
auto attemptRecords = taskState.fut.get();
|
||||||
if (attemptRecords.back().rc == 0) {
|
if (attemptRecords.empty()) {
|
||||||
dag.completeVisit(taskState.tid);
|
logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED );
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (attemptRecords.back().rc == 0) {
|
||||||
|
logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::COMPLETED );
|
||||||
|
dag.completeVisit(taskState.tid);
|
||||||
|
taskState.complete = true;
|
||||||
|
} else {
|
||||||
|
logger.updateTaskState(runID, taskState.tid, loggers::dag_run::RunState::ERRORED );
|
||||||
}
|
}
|
||||||
taskState.complete = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -66,21 +111,11 @@ namespace daggy {
|
|||||||
auto tid = t.value();
|
auto tid = t.value();
|
||||||
TaskState tsk{
|
TaskState tsk{
|
||||||
.tid = tid,
|
.tid = tid,
|
||||||
.fut = tq->addTask(
|
.fut = tq->addTask([tid, runID, &tasks, &executor, &logger]() {return runTask(runID, tid, tasks[tid], executor, logger);}),
|
||||||
[tid, &tasks, &executor]() {
|
.complete = false
|
||||||
std::vector<AttemptRecord> attempts;
|
|
||||||
|
|
||||||
while (attempts.size() < tasks[tid].maxRetries) {
|
|
||||||
attempts.push_back(executor.runCommand(tasks[tid].command));
|
|
||||||
if (attempts.back().rc == 0) break;
|
|
||||||
}
|
|
||||||
return attempts;
|
|
||||||
})
|
|
||||||
, .complete = false
|
|
||||||
};
|
};
|
||||||
taskStates.push_back(std::move(tsk));
|
taskStates.push_back(std::move(tsk));
|
||||||
|
|
||||||
//
|
|
||||||
auto nextTask = dag.visitNext();
|
auto nextTask = dag.visitNext();
|
||||||
if (not nextTask.has_value()) break;
|
if (not nextTask.has_value()) break;
|
||||||
t.emplace(nextTask.value());
|
t.emplace(nextTask.value());
|
||||||
|
|||||||
@@ -1,28 +1,32 @@
|
|||||||
#include <daggy/dagloggers/FileSystemLogger.hpp>
|
#include <daggy/loggers/dag_run/FileSystemLogger.hpp>
|
||||||
|
|
||||||
namespace fs = std::filesystem;
|
namespace fs = std::filesystem;
|
||||||
|
|
||||||
|
using namespace daggy::loggers::dag_run;
|
||||||
|
|
||||||
namespace daggy {
|
namespace daggy {
|
||||||
inline const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; }
|
inline const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; }
|
||||||
inline const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; }
|
inline const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; }
|
||||||
inline const fs::path getRunRoot(DAGRunID runID) const { return getRunsRoot() / std::to_string(runID); }
|
inline const fs::path FileSystemLogger::getRunRoot(DAGRunID runID) const { return getRunsRoot() / std::to_string(runID); }
|
||||||
|
|
||||||
FileSystemLogger(fs::path root)
|
FileSystemLogger::FileSystemLogger(fs::path root)
|
||||||
: root_(root)
|
: root_(root)
|
||||||
, nextRunID_(0)
|
, nextRunID_(0)
|
||||||
{
|
{
|
||||||
const std::vector<fs::paths> reqPaths{ root_, getCurrentPath(), getRunsRoot()};
|
const std::vector<fs::path> reqPaths{ root_, getCurrentPath(), getRunsRoot()};
|
||||||
for (const auto & path : reqPaths) {
|
for (const auto & path : reqPaths) {
|
||||||
if (! fs::exists(path)) { fs::create_directory(path); }
|
if (! fs::exists(path)) { fs::create_directory(path); }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the next run ID
|
// Get the next run ID
|
||||||
size_t runID = 0;
|
size_t runID = 0;
|
||||||
for (auto & dir : fs::std::filesystem::directory_iterator(getRunsRoot())) {
|
for (auto & dir : fs::directory_iterator(getRunsRoot())) {
|
||||||
try {
|
try {
|
||||||
runID = std::stoull(dir.stem());
|
runID = std::stoull(dir.path().stem());
|
||||||
if (runID > nextRunID_) nextRunID_ = runID + 1;
|
if (runID > nextRunID_) nextRunID_ = runID + 1;
|
||||||
} catch {}
|
} catch (std::exception & e) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -32,14 +36,13 @@ namespace daggy {
|
|||||||
|
|
||||||
// TODO make this threadsafe
|
// TODO make this threadsafe
|
||||||
fs::path runDir = getRunRoot(runID);
|
fs::path runDir = getRunRoot(runID);
|
||||||
std::lock_guard<std::mutex> guard(runLocks[runDir]);
|
// std::lock_guard<std::mutex> guard(runLocks[runDir]);
|
||||||
|
|
||||||
// Init the directory
|
// Init the directory
|
||||||
}
|
}
|
||||||
void FileSystemLogger::updateDAGRunState(DAGRunID dagRunId, RunState state){ }
|
void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state){ }
|
||||||
void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt){ }
|
void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt){ }
|
||||||
void FileSystemLogger::markTaskComplete(DAGRunID dagRun, size_t taskID){ }
|
void FileSystemLogger::updateTaskState(DAGRunID dagRunId, TaskID taskID, RunState state){ }
|
||||||
void FileSystemLogger::updateTaskState(DAGRunID dagRunId, RunState state){ }
|
|
||||||
|
|
||||||
// Querying
|
// Querying
|
||||||
std::vector<DAGRunSummary> FileSystemLogger::getDAGs(uint32_t stateMask){ }
|
std::vector<DAGRunSummary> FileSystemLogger::getDAGs(uint32_t stateMask){ }
|
||||||
|
|||||||
39
daggy/src/dagloggers/StdOutLogger.cpp
Normal file
39
daggy/src/dagloggers/StdOutLogger.cpp
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
#include <magic_enum.hpp>
|
||||||
|
|
||||||
|
#include <daggy/loggers/dag_run/StdOutLogger.hpp>
|
||||||
|
|
||||||
|
namespace daggy {
|
||||||
|
namespace loggers {
|
||||||
|
namespace dag_run {
|
||||||
|
StdOutLogger::StdOutLogger() : nextRunID_(0) { }
|
||||||
|
|
||||||
|
// Execution
|
||||||
|
DAGRunID StdOutLogger::startDAGRun(std::string name, const std::vector<Task> & tasks) {
|
||||||
|
std::lock_guard<std::mutex> lock(guard_);
|
||||||
|
size_t runID = nextRunID_++;
|
||||||
|
std::cout << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size() << " tasks" << std::endl;
|
||||||
|
return runID;
|
||||||
|
}
|
||||||
|
|
||||||
|
void StdOutLogger::updateDAGRunState(DAGRunID dagRunID, RunState state){
|
||||||
|
std::lock_guard<std::mutex> lock(guard_);
|
||||||
|
std::cout << "DAG State Change(" << dagRunID << "): " << magic_enum::enum_name(state) << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
void StdOutLogger::logTaskAttempt(DAGRunID dagRunID, size_t taskID, const AttemptRecord & attempt){
|
||||||
|
std::lock_guard<std::mutex> lock(guard_);
|
||||||
|
const std::string & msg = attempt.rc == 0 ? attempt.output : attempt.error;
|
||||||
|
std::cout << "Task Attempt (" << dagRunID << '/' << taskID << "): Ran with RC " << attempt.rc << ": " << msg << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
void StdOutLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) {
|
||||||
|
std::lock_guard<std::mutex> lock(guard_);
|
||||||
|
std::cout << "Task State Change (" << dagRunID << '/' << taskID << "): " << magic_enum::enum_name(state) << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Querying
|
||||||
|
std::vector<DAGRunSummary> StdOutLogger::getDAGs(uint32_t stateMask){ return {}; }
|
||||||
|
DAGRunRecord StdOutLogger::getDAGRun(DAGRunID dagRunId) { return {}; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,14 +1,11 @@
|
|||||||
#include <daggy/executors/ForkingTaskExecutor.hpp>
|
#include <daggy/executors/task/ForkingTaskExecutor.hpp>
|
||||||
|
|
||||||
#include <array>
|
|
||||||
#include <utility>
|
|
||||||
|
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <wait.h>
|
#include <wait.h>
|
||||||
#include <poll.h>
|
#include <poll.h>
|
||||||
|
|
||||||
using namespace daggy::executor;
|
using namespace daggy::executors::task;
|
||||||
|
|
||||||
std::string slurp(int fd) {
|
std::string slurp(int fd) {
|
||||||
std::string result;
|
std::string result;
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
|
|
||||||
#include "daggy/executors/ForkingTaskExecutor.hpp"
|
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
|
||||||
|
|
||||||
#include "catch.hpp"
|
#include "catch.hpp"
|
||||||
|
|
||||||
TEST_CASE("Basic Execution", "[forking_executor]") {
|
TEST_CASE("Basic Execution", "[forking_executor]") {
|
||||||
daggy::executor::ForkingTaskExecutor ex(10);
|
daggy::executors::task::ForkingTaskExecutor ex(10);
|
||||||
|
|
||||||
SECTION("Simple Run") {
|
SECTION("Simple Run") {
|
||||||
std::vector<std::string> cmd{"/usr/bin/echo", "abc", "123"};
|
std::vector<std::string> cmd{"/usr/bin/echo", "abc", "123"};
|
||||||
|
|||||||
@@ -6,9 +6,11 @@
|
|||||||
|
|
||||||
#include "daggy/Utilities.hpp"
|
#include "daggy/Utilities.hpp"
|
||||||
#include "daggy/Serialization.hpp"
|
#include "daggy/Serialization.hpp"
|
||||||
|
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
|
||||||
|
#include "daggy/loggers/dag_run/StdOutLogger.hpp"
|
||||||
|
|
||||||
TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
|
TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
|
||||||
SECTION("Basic Parse") {
|
SECTION("Basic expansion") {
|
||||||
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"};
|
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name", "TYPE": ["a", "b", "c"]})"};
|
||||||
auto params = daggy::parametersFromJSON(testParams);
|
auto params = daggy::parametersFromJSON(testParams);
|
||||||
std::vector<std::string> cmd{"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}", "{{TYPE}}"};
|
std::vector<std::string> cmd{"/usr/bin/echo", "{{DATE}}", "{{SOURCE}}", "{{TYPE}}"};
|
||||||
@@ -26,4 +28,16 @@ TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
|
|||||||
// TYPE isn't used, so it's just |DATE| * |SOURCE|
|
// TYPE isn't used, so it's just |DATE| * |SOURCE|
|
||||||
REQUIRE(allCommands.size() == 2);
|
REQUIRE(allCommands.size() == 2);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
|
||||||
|
daggy::executors::task::ForkingTaskExecutor ex(10);
|
||||||
|
daggy::loggers::dag_run::StdOutLogger logger;
|
||||||
|
|
||||||
|
std::string taskJSON = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["C"]}, {"name": "B", "command": ["/bin/echo", "B"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])";
|
||||||
|
auto tasks = daggy::tasksFromJSON(taskJSON);
|
||||||
|
auto dag = daggy::buildDAGFromTasks(tasks);
|
||||||
|
|
||||||
|
auto runID = logger.startDAGRun("test_run", tasks);
|
||||||
|
daggy::runDAG(runID, tasks, ex, logger, dag);
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user