Squashed commit of the following:

commit b06b11cbb5d09c6d091551e39767cd3316f88376
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Oct 5 11:57:37 2021 -0300

    Fixing failing unit test

commit fe2a43a19b2a16a9aedd9e95e71e672935ecaeb1
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Oct 5 11:54:01 2021 -0300

    Adding endpoints and updating documentation

commit 46e0deeefb8b06291ae5e2d6b8ec4749c5b0ea6f
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Oct 5 11:49:43 2021 -0300

    Completing unit tests and relevant fixes

commit e0569f370624844feee6aae4708bfe683f4156cf
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Mon Oct 4 17:30:59 2021 -0300

    Adding in gcc tsan for debug builds to help with race conditions, fixing many of those, and fixing a really crummy assumption about how futures worked; this will speed up task execution by a ton.
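    For context on the futures fix: the old runDAG loop treated a merely valid std::future as finished and called get() on it, which blocks; the new DAGRunner polls readiness first. A minimal sketch of the pattern, with hypothetical names:

    ```
    #include <chrono>
    #include <future>
    #include <string>
    #include <unordered_map>

    // Sketch only: std::future::valid() means "has shared state", not "result is
    // ready", so calling get() on a merely-valid future blocks the scheduler loop.
    // Checking wait_for() with a tiny timeout keeps the loop non-blocking, which
    // is what DAGRunner::collectFinished() in this change does.
    void collectReady(std::unordered_map<std::string, std::future<int>> &running)
    {
        using namespace std::chrono_literals;
        for (auto &[name, fut] : running) {
            if (fut.valid() && fut.wait_for(1ms) == std::future_status::ready) {
                int rc = fut.get(); // guaranteed not to block here
                (void)name;
                (void)rc; // a real caller would record the attempt and update state
            }
        }
    }
    ```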

commit c748a4f592e1ada5546908be5281d04f4749539d
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Mon Oct 4 10:14:43 2021 -0300

    Checkpointing work that seems to have resolved the race condition

commit 7a79f2943e0d50545d976a28b4b379340a90dded
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Wed Sep 29 09:27:07 2021 -0300

    Completing the rough-in for DAG killing / pausing / resuming

commit 4cf8d81d5f6fcf4a7dd83d8fca3e23f153aa8acb
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Sep 28 14:53:50 2021 -0300

    Adding dagrunner unit tests, adding a resetRunning method to resume

commit 54e2c1f9f5e7d5b339d71be024e0e390c4d2bf61
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Sep 28 14:45:57 2021 -0300

    Refactoring runDAG into DAGRunner

commit 682be7a11e2fae850e1bc3e207628d2335768c2b
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Sep 28 14:34:43 2021 -0300

    Adding DAGRunner class to replace Utilities::runDAG, making Slurm cancellation rc agree with SIGKILL

commit 4171b3a6998791abfc71b04f8de1ae93c4f90a78
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Sep 28 14:14:17 2021 -0300

    Adding unit tests for stopping jobs to slurm

commit dc0b1ff26a5d98471164132d35bb8a552cc75ff8
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Sep 28 14:04:15 2021 -0300

    Adding in stop method for task executors

commit e752b44f55113be54392bcbb5c3d6f251d673cfa
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Sep 28 12:32:06 2021 -0300

    Adding additional tests for loggers

commit f0773d5a84a422738fc17c9277a2b735a21a3d04
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Sep 28 12:29:21 2021 -0300

    Unit tests pass

commit 993ff2810de2d53dc6a59ab53d620fecf152d4a0
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Sep 28 12:24:34 2021 -0300

    Adding handling for new routes, still need to add tests for new routes

commit 676623b14e45759872a2dbcbc98f6a744e022a71
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Tue Sep 28 12:12:43 2021 -0300

    Adding handling for new routes, still need to add tests for new routes

commit b9edb6ba291eb064f4c459a308ea6912fba9fa02
Author: Ian Roddis <gitlab@ie2r.com>
Date:   Mon Sep 27 11:59:14 2021 -0300

    Defining new endpoints, fixing dag resumption code, adding PAUSED state, refactoring DAGSpec and adding deserializer
parent dded91220f
commit 65ab439848
32 changed files with 1538 additions and 618 deletions


@@ -1,9 +1,18 @@
 cmake_minimum_required(VERSION 3.14)
 project(overall)
+if(NOT CMAKE_BUILD_TYPE)
+  set(CMAKE_BUILD_TYPE "Debug")
+endif()
 set(CMAKE_CXX_STANDARD 17)
 set(CMAKE_CXX_STANDARD_REQUIRED True)
 set(CMAKE_EXPORT_COMPILE_COMMANDS True)
-SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall -Werror")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall -Werror")
+if(CMAKE_BUILD_TYPE MATCHES "Debug")
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread -fno-omit-frame-pointer")
+endif()
 set(THIRD_PARTY_DIR ${CMAKE_BINARY_DIR}/third_party)
@@ -19,6 +28,8 @@ include(cmake/argparse.cmake)
 include(cmake/Catch2.cmake)
 include(cmake/daggy_features.cmake)
+message("-- CMAKE Build Type is ${CMAKE_BUILD_TYPE}")
 # use, i.e. don't skip the full RPATH for the build tree
 set(CMAKE_SKIP_BUILD_RPATH FALSE)


@@ -28,11 +28,10 @@ graph LR
 Individual tasks (vertices) are run via a task executor. Daggy supports multiple executors, from local executor (via
 fork), to distributed work managers like [slurm](https://slurm.schedmd.com/overview.html)
-or [kubernetes](https://kubernetes.io/) (both planned).
+or [kubernetes](https://kubernetes.io/) (planned).
-State is maintained via state loggers. Currently daggy supports an in-memory state manager (OStreamLogger), and a
-filesystem logger (FileSystemLogger). Future plans include supporting [redis](https://redis.io)
-and [postgres](https://postgresql.org).
+State is maintained via state loggers. Currently daggy supports an in-memory state manager (OStreamLogger).
+Future plans include supporting [redis](https://redis.io) and [postgres](https://postgresql.org).
 Building
 ==
@@ -43,13 +42,17 @@ Building
 - cmake >= 3.14
 - gcc >= 8
+- libslurm (if needed)
 ```
 git clone https://gitlab.com/iroddis/daggy
 cd daggy
 mkdir build
 cd build
-cmake ..
+cmake [-DDAGGY_ENABLE_SLURM=ON] ..
 make
+tests/tests # for unit tests
 ```
 DAG Run Definition

@@ -1,5 +1,5 @@
 # SLURM
-message("DAGGY_ENABLED_SLURM is set to ${DAGGY_ENABLE_SLURM}")
+message("-- DAGGY_ENABLED_SLURM is set to ${DAGGY_ENABLE_SLURM}")
 if (DAGGY_ENABLE_SLURM)
   find_library(SLURM_LIB libslurm.so libslurm.a slurm REQUIRED)
   find_path(SLURM_INCLUDE_DIR "slurm/slurm.h" REQUIRED)


@@ -0,0 +1,55 @@
#pragma once
#include <rapidjson/document.h>
#include <future>
#include <iomanip>
#include <string>
#include <unordered_map>
#include <variant>
#include <vector>
#include "DAG.hpp"
#include "Defines.hpp"
#include "Serialization.hpp"
#include "Utilities.hpp"
#include "daggy/executors/task/TaskExecutor.hpp"
#include "daggy/loggers/dag_run/DAGRunLogger.hpp"
using namespace std::chrono_literals;
namespace daggy {
class DAGRunner
{
public:
DAGRunner(DAGRunID runID, executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger, TaskDAG dag,
const TaskParameters &taskParams);
~DAGRunner();
TaskDAG run();
void resetRunning();
void stop(bool kill = false, bool blocking = false);
private:
void collectFinished();
void queuePending();
void killRunning();
DAGRunID runID_;
executors::task::TaskExecutor &executor_;
loggers::dag_run::DAGRunLogger &logger_;
TaskDAG dag_;
const TaskParameters &taskParams_;
std::atomic<bool> running_;
std::atomic<bool> kill_;
ssize_t nRunningTasks_;
ssize_t nErroredTasks_;
std::unordered_map<std::string, std::future<AttemptRecord>> runningTasks_;
std::unordered_map<std::string, size_t> taskAttemptCounts_;
std::mutex runGuard_;
};
} // namespace daggy
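A rough usage sketch of the new DAGRunner, as Server::queueDAG_ later in this diff drives it (assuming an executor, logger, DAG, and task parameters are already built):

```
// Sketch only: how a DAGRunner is intended to be driven.
daggy::DAGRunner runner(runID, executor, logger, dag, taskParams);

// run() loops until every vertex has been visited, dispatching tasks through
// the executor and recording RUNNING/RETRY/ERRORED/COMPLETED states in the
// logger; it returns the (possibly expanded) DAG.
auto finishedDag = runner.run();

// From another thread: stop(kill, blocking) halts scheduling and, with
// kill=true, asks the executor to stop in-flight tasks; resetRunning() clears
// the in-flight bookkeeping so a paused or killed run can be resumed.
// runner.stop(/*kill=*/true, /*blocking=*/true);
// runner.resetRunning();
```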


@@ -22,9 +22,8 @@ namespace daggy {
   // DAG Runs
   using DAGRunID = size_t;
-  BETTER_ENUM(RunState, uint32_t, QUEUED = 1 << 0, RUNNING = 1 << 1,
-              RETRY = 1 << 2, ERRORED = 1 << 3, KILLED = 1 << 4,
-              COMPLETED = 1 << 5);
+  BETTER_ENUM(RunState, uint32_t, QUEUED = 1, RUNNING, RETRY, ERRORED, KILLED,
+              PAUSED, COMPLETED);
   struct Task
   {
@@ -50,6 +49,20 @@ namespace daggy {
   using TaskSet = std::unordered_map<std::string, Task>;
+  // All the components required to define and run a DAG
+  struct TaskParameters
+  {
+    ConfigValues variables;
+    ConfigValues jobDefaults;
+  };
+
+  struct DAGSpec
+  {
+    std::string tag;
+    TaskSet tasks;
+    TaskParameters taskConfig;
+  };
   struct AttemptRecord
   {
     TimePoint startTime;
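The RunState change above drops the bitmask values (the old stateMask-style queries go away in this diff) and adds PAUSED; the better-enums idioms used with it elsewhere in the change look like this (a small sketch, names from this codebase):

```
// Sketch only: BETTER_ENUM usage as it appears later in this diff.
daggy::RunState s = daggy::RunState::QUEUED;

// Comparisons against an enumerator use unary '+' with BETTER_ENUM:
bool queued = (s == +daggy::RunState::QUEUED);

// String round-tripping, as handleSetDAGRunState / handleGetTaskState do:
const char *name = s._to_string();                          // "QUEUED"
daggy::RunState back = daggy::RunState::_from_string(name); // QUEUED again
```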


@@ -8,6 +8,7 @@
 #include <vector>
 #include "Defines.hpp"
+#include "Utilities.hpp"
 namespace rj = rapidjson;
@@ -36,6 +37,10 @@ namespace daggy {
   std::string tasksToJSON(const TaskSet &tasks);
+  // Full specs
+  DAGSpec dagFromJSON(const rj::Value &spec);
+  DAGSpec dagFromJSON(const std::string &jsonSpec);
   // Attempt Records
   std::string attemptRecordToJSON(const AttemptRecord &attemptRecord);


@@ -6,10 +6,15 @@
 #include <filesystem>
+#include "DAGRunner.hpp"
 #include "ThreadPool.hpp"
 #include "executors/task/TaskExecutor.hpp"
 #include "loggers/dag_run/DAGRunLogger.hpp"
+#define DAGGY_REST_HANDLER(func) \
+  void func(const Pistache::Rest::Request &request, \
+            Pistache::Http::ResponseWriter response);
 namespace fs = std::filesystem;
 namespace daggy {
@@ -18,14 +23,8 @@ namespace daggy {
   public:
     Server(const Pistache::Address &listenSpec,
            loggers::dag_run::DAGRunLogger &logger,
-           executors::task::TaskExecutor &executor, size_t nDAGRunners)
-      : endpoint_(listenSpec)
-      , desc_("Daggy API", "0.1")
-      , logger_(logger)
-      , executor_(executor)
-      , runnerPool_(nDAGRunners)
-    {
-    }
+           executors::task::TaskExecutor &executor, size_t nDAGRunners);
+    ~Server();
     Server &setSSLCertificates(const fs::path &cert, const fs::path &key);
@@ -39,21 +38,21 @@
   private:
     void createDescription();
-    void handleRunDAG(const Pistache::Rest::Request &request,
-                      Pistache::Http::ResponseWriter response);
-    void handleGetDAGRuns(const Pistache::Rest::Request &request,
-                          Pistache::Http::ResponseWriter response);
-    void handleGetDAGRun(const Pistache::Rest::Request &request,
-                         Pistache::Http::ResponseWriter response);
-    void handleReady(const Pistache::Rest::Request &request,
-                     Pistache::Http::ResponseWriter response);
-    bool handleAuth(const Pistache::Rest::Request &request,
-                    Pistache::Http::ResponseWriter &response);
+    void queueDAG_(DAGRunID runID, const TaskDAG &dag,
+                   const TaskParameters &taskParameters);
+    DAGGY_REST_HANDLER(handleReady);          // X
+    DAGGY_REST_HANDLER(handleQueryDAGs);      // X
+    DAGGY_REST_HANDLER(handleRunDAG);         // X
+    DAGGY_REST_HANDLER(handleValidateDAG);    // X
+    DAGGY_REST_HANDLER(handleGetDAGRun);      // X
+    DAGGY_REST_HANDLER(handleGetDAGRunState); // X
+    DAGGY_REST_HANDLER(handleSetDAGRunState); // X
+    DAGGY_REST_HANDLER(handleGetTask);        // X
+    DAGGY_REST_HANDLER(handleGetTaskState);   // X
+    DAGGY_REST_HANDLER(handleSetTaskState);   // X
+    bool handleAuth(const Pistache::Rest::Request &request);
     Pistache::Http::Endpoint endpoint_;
     Pistache::Rest::Description desc_;
@@ -62,5 +61,8 @@
     loggers::dag_run::DAGRunLogger &logger_;
     executors::task::TaskExecutor &executor_;
     ThreadPool runnerPool_;
+    std::mutex runnerGuard_;
+    std::unordered_map<DAGRunID, std::shared_ptr<DAGRunner>> runners_;
   };
 } // namespace daggy
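DAGGY_REST_HANDLER above only stamps out the Pistache handler signature, so each declaration in the list expands to the same two-argument method; for example:

```
// DAGGY_REST_HANDLER(handleReady); expands to:
void handleReady(const Pistache::Rest::Request &request,
                 Pistache::Http::ResponseWriter response);
```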


@@ -31,9 +31,5 @@ namespace daggy {
   void updateDAGFromTasks(TaskDAG &dag, const TaskSet &tasks);
-  TaskDAG runDAG(DAGRunID runID, executors::task::TaskExecutor &executor,
-                 loggers::dag_run::DAGRunLogger &logger, TaskDAG dag,
-                 const ConfigValues job = {});
   std::ostream &operator<<(std::ostream &os, const TimePoint &tp);
 } // namespace daggy


@@ -10,10 +10,8 @@ namespace daggy::executors::task {
   public:
     using Command = std::vector<std::string>;
-    explicit ForkingTaskExecutor(size_t nThreads)
-      : tp_(nThreads)
-    {
-    }
+    explicit ForkingTaskExecutor(size_t nThreads);
+    ~ForkingTaskExecutor() override;
     // Validates the job to ensure that all required values are set and are of
     // the right type,
@@ -23,11 +21,16 @@
       const ConfigValues &job, const ConfigValues &expansionValues) override;
     // Runs the task
-    std::future<AttemptRecord> execute(const std::string &taskName,
+    std::future<AttemptRecord> execute(DAGRunID runID,
+                                       const std::string &taskName,
                                        const Task &task) override;
+    bool stop(DAGRunID runID, const std::string &taskName) override;
   private:
     ThreadPool tp_;
-    AttemptRecord runTask(const Task &task);
+    std::mutex taskControlsGuard_;
+    AttemptRecord runTask(const Task &task, std::atomic<bool> &running);
+    std::unordered_map<std::string, std::atomic<bool>> taskControls_;
   };
 } // namespace daggy::executors::task


@@ -16,7 +16,10 @@ namespace daggy::executors::task {
       const ConfigValues &job, const ConfigValues &expansionValues) override;
     // Runs the task
-    std::future<AttemptRecord> execute(const std::string &taskName,
+    std::future<AttemptRecord> execute(DAGRunID runID,
+                                       const std::string &taskName,
                                        const Task &task) override;
+    bool stop(DAGRunID runID, const std::string &taskName) override;
   };
 } // namespace daggy::executors::task


@@ -19,15 +19,20 @@ namespace daggy::executors::task {
       const ConfigValues &job, const ConfigValues &expansionValues) override;
     // Runs the task
-    std::future<AttemptRecord> execute(const std::string &taskName,
+    std::future<AttemptRecord> execute(DAGRunID runID,
+                                       const std::string &taskName,
                                        const Task &task) override;
+    bool stop(DAGRunID runID, const std::string &taskName) override;
   private:
     struct Job
     {
       std::promise<AttemptRecord> prom;
       std::string stdoutFile;
       std::string stderrFile;
+      DAGRunID runID;
+      std::string taskName;
     };
     std::mutex promiseGuard_;


@@ -27,7 +27,11 @@ namespace daggy::executors::task {
       const ConfigValues &job, const ConfigValues &expansionValues) = 0;
     // Blocking execution of a task
-    virtual std::future<AttemptRecord> execute(const std::string &taskName,
+    virtual std::future<AttemptRecord> execute(DAGRunID runID,
+                                               const std::string &taskName,
                                                const Task &task) = 0;
+    // Kill a currently executing task. This will resolve the future.
+    virtual bool stop(DAGRunID runID, const std::string &taskName) = 0;
   };
 } // namespace daggy::executors::task


@@ -17,8 +17,8 @@ namespace daggy::loggers::dag_run {
   public:
     virtual ~DAGRunLogger() = default;
-    // Execution
-    virtual DAGRunID startDAGRun(std::string name, const TaskSet &tasks) = 0;
+    // Insertion / Updates
+    virtual DAGRunID startDAGRun(const DAGSpec &dagSpec) = 0;
     virtual void addTask(DAGRunID dagRunID, const std::string &taskName,
                          const Task &task) = 0;
@@ -35,8 +35,16 @@
                                  RunState state) = 0;
     // Querying
-    virtual std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) = 0;
-    virtual DAGRunRecord getDAGRun(DAGRunID dagRunID) = 0;
+    virtual DAGSpec getDAGSpec(DAGRunID dagRunID) = 0;
+    virtual std::vector<DAGRunSummary> queryDAGRuns(const std::string &tag = "",
+                                                    bool all = false) = 0;
+    virtual RunState getDAGRunState(DAGRunID dagRunID) = 0;
+    virtual DAGRunRecord getDAGRun(DAGRunID dagRunID) = 0;
+    virtual Task &getTask(DAGRunID dagRunID, const std::string &taskName) = 0;
+    virtual RunState &getTaskState(DAGRunID dagRunID,
+                                   const std::string &taskName) = 0;
   };
 } // namespace daggy::loggers::dag_run


@@ -25,8 +25,7 @@ namespace daggy::loggers::dag_run {
   // Pretty heavy weight, but
   struct DAGRunRecord
   {
-    std::string name;
-    TaskSet tasks;
+    DAGSpec dagSpec;
     std::unordered_map<std::string, RunState> taskRunStates;
     std::unordered_map<std::string, std::vector<AttemptRecord>> taskAttempts;
     std::vector<TaskUpdateRecord> taskStateChanges;
@@ -36,7 +35,7 @@
   struct DAGRunSummary
   {
     DAGRunID runID;
-    std::string name;
+    std::string tag;
     RunState runState;
     TimePoint startTime;
     TimePoint lastUpdate;


@@ -15,9 +15,10 @@ namespace daggy::loggers::dag_run {
   {
   public:
     explicit OStreamLogger(std::ostream &os);
+    ~OStreamLogger() override;
     // Execution
-    DAGRunID startDAGRun(std::string name, const TaskSet &tasks) override;
+    DAGRunID startDAGRun(const DAGSpec &dagSpec) override;
     void addTask(DAGRunID dagRunID, const std::string &taskName,
                  const Task &task) override;
@@ -34,10 +35,18 @@
                          RunState state) override;
     // Querying
-    std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
+    DAGSpec getDAGSpec(DAGRunID dagRunID) override;
+    std::vector<DAGRunSummary> queryDAGRuns(const std::string &tag = "",
+                                            bool all = false) override;
+    RunState getDAGRunState(DAGRunID dagRunID) override;
     DAGRunRecord getDAGRun(DAGRunID dagRunID) override;
+    Task &getTask(DAGRunID dagRunID, const std::string &taskName) override;
+    RunState &getTaskState(DAGRunID dagRunID,
+                           const std::string &taskName) override;
   private:
     std::mutex guard_;
     std::ostream &os_;


@@ -2,6 +2,7 @@ target_sources(${PROJECT_NAME} PRIVATE
   Serialization.cpp
   Server.cpp
   Utilities.cpp
+  DAGRunner.cpp
 )
 add_subdirectory(executors)

daggy/src/DAGRunner.cpp (new file, 213 lines)

@@ -0,0 +1,213 @@
#include <chrono>
#include <daggy/DAGRunner.hpp>
#include <mutex>
#include <stdexcept>
namespace daggy {
DAGRunner::DAGRunner(DAGRunID runID, executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger, TaskDAG dag,
const TaskParameters &taskParams)
: runID_(runID)
, executor_(executor)
, logger_(logger)
, dag_(dag)
, taskParams_(taskParams)
, running_(true)
, kill_(true)
, nRunningTasks_(0)
, nErroredTasks_(0)
{
}
DAGRunner::~DAGRunner()
{
std::lock_guard<std::mutex> lock(runGuard_);
}
TaskDAG DAGRunner::run()
{
kill_ = false;
running_ = true;
logger_.updateDAGRunState(runID_, RunState::RUNNING);
bool allVisited;
{
std::lock_guard<std::mutex> lock(runGuard_);
allVisited = dag_.allVisited();
}
while (!allVisited) {
{
std::lock_guard<std::mutex> runLock(runGuard_);
if (!running_ and kill_) {
killRunning();
}
collectFinished();
queuePending();
if (!running_ and (nRunningTasks_ - nErroredTasks_ <= 0)) {
logger_.updateDAGRunState(runID_, RunState::KILLED);
break;
}
if (nRunningTasks_ > 0 and nErroredTasks_ == nRunningTasks_) {
logger_.updateDAGRunState(runID_, RunState::ERRORED);
break;
}
}
std::this_thread::sleep_for(250ms);
{
std::lock_guard<std::mutex> lock(runGuard_);
allVisited = dag_.allVisited();
}
}
if (dag_.allVisited()) {
logger_.updateDAGRunState(runID_, RunState::COMPLETED);
}
running_ = false;
return dag_;
}
void DAGRunner::resetRunning()
{
if (running_)
throw std::runtime_error("Unable to reset while DAG is running.");
std::lock_guard<std::mutex> lock(runGuard_);
nRunningTasks_ = 0;
nErroredTasks_ = 0;
runningTasks_.clear();
taskAttemptCounts_.clear();
dag_.resetRunning();
}
void DAGRunner::killRunning()
{
for (const auto &[taskName, _] : runningTasks_) {
executor_.stop(runID_, taskName);
}
}
void DAGRunner::queuePending()
{
if (!running_)
return;
// Check for any completed tasks
// Add all remaining tasks in a task queue to avoid dominating the thread
// pool
auto t = dag_.visitNext();
while (t.has_value()) {
// Schedule the task to run
auto &taskName = t.value().first;
auto &task = t.value().second;
taskAttemptCounts_[taskName] = 1;
logger_.updateTaskState(runID_, taskName, RunState::RUNNING);
runningTasks_.emplace(taskName,
executor_.execute(runID_, taskName, task));
++nRunningTasks_;
auto nextTask = dag_.visitNext();
if (not nextTask.has_value())
break;
t.emplace(nextTask.value());
}
}
void DAGRunner::collectFinished()
{
for (auto &[taskName, fut] : runningTasks_) {
if (fut.valid() and fut.wait_for(1ms) == std::future_status::ready) {
auto attempt = fut.get();
logger_.logTaskAttempt(runID_, taskName, attempt);
// Not a reference, since adding tasks will invalidate references
auto vert = dag_.getVertex(taskName);
auto &task = vert.data;
if (attempt.rc == 0) {
logger_.updateTaskState(runID_, taskName, RunState::COMPLETED);
if (task.isGenerator) {
// Parse the output and update the DAGs
try {
auto parsedTasks =
tasksFromJSON(attempt.outputLog, taskParams_.jobDefaults);
auto newTasks =
expandTaskSet(parsedTasks, executor_, taskParams_.variables);
updateDAGFromTasks(dag_, newTasks);
// Add in dependencies from current task to new tasks
for (const auto &[ntName, ntTask] : newTasks) {
logger_.addTask(runID_, ntName, ntTask);
task.children.insert(ntName);
}
// Efficiently add new edges from generator task
// to children
std::unordered_set<std::string> baseNames;
for (const auto &[k, v] : parsedTasks) {
baseNames.insert(v.definedName);
}
dag_.addEdgeIf(taskName, [&](const auto &v) {
return baseNames.count(v.data.definedName) > 0;
});
logger_.updateTask(runID_, taskName, task);
}
catch (std::exception &e) {
logger_.logTaskAttempt(
runID_, taskName,
AttemptRecord{
.executorLog =
std::string{"Failed to parse JSON output: "} +
e.what()});
logger_.updateTaskState(runID_, taskName, RunState::ERRORED);
++nErroredTasks_;
}
}
dag_.completeVisit(taskName);
--nRunningTasks_;
}
else {
// RC isn't 0
if (taskAttemptCounts_[taskName] <= task.maxRetries) {
logger_.updateTaskState(runID_, taskName, RunState::RETRY);
runningTasks_[taskName] = executor_.execute(runID_, taskName, task);
++taskAttemptCounts_[taskName];
}
else {
if (logger_.getTaskState(runID_, taskName) == +RunState::RUNNING or
logger_.getTaskState(runID_, taskName) == +RunState::RETRY) {
logger_.updateTaskState(runID_, taskName, RunState::ERRORED);
++nErroredTasks_;
}
else {
// Task was killed
--nRunningTasks_;
}
}
}
}
}
}
void DAGRunner::stop(bool kill, bool blocking)
{
kill_ = kill;
running_ = false;
if (blocking) {
while (true) {
{
std::lock_guard<std::mutex> lock(runGuard_);
if (nRunningTasks_ - nErroredTasks_ == 0)
break;
}
std::this_thread::sleep_for(250ms);
}
}
}
} // namespace daggy


@@ -276,17 +276,56 @@ namespace daggy {
   std::string timePointToString(const TimePoint &tp)
   {
-    std::stringstream ss;
-    ss << tp;
-    return ss.str();
+    return std::to_string(tp.time_since_epoch().count());
   }
   TimePoint stringToTimePoint(const std::string &timeString)
   {
-    std::tm dt{};
-    std::stringstream ss{timeString};
-    ss >> std::get_time(&dt, "%Y-%m-%d %H:%M:%S %Z");
-    return Clock::from_time_t(mktime(&dt));
+    using namespace std::chrono;
+    size_t nanos = std::stoull(timeString);
+    nanoseconds dur(nanos);
+    return TimePoint(dur);
+  }
+
+  DAGSpec dagFromJSON(const rj::Value &spec)
+  {
+    DAGSpec info;
+    if (!spec.IsObject()) {
+      throw std::runtime_error("Payload is not a dictionary.");
+    }
+    if (!spec.HasMember("tag")) {
+      throw std::runtime_error("DAG Run is missing a name.");
+    }
+    if (!spec.HasMember("tasks")) {
+      throw std::runtime_error("DAG Run has no tasks.");
+    }
+    info.tag = spec["tag"].GetString();
+    // Get parameters if there are any
+    if (spec.HasMember("parameters")) {
+      info.taskConfig.variables = configFromJSON(spec["parameters"]);
+    }
+    // Job Defaults
+    if (spec.HasMember("jobDefaults")) {
+      info.taskConfig.jobDefaults = configFromJSON(spec["jobDefaults"]);
+    }
+    // Get the tasks
+    info.tasks = tasksFromJSON(spec["tasks"], info.taskConfig.jobDefaults);
+    return info;
+  }
+
+  DAGSpec dagFromJSON(const std::string &jsonSpec)
+  {
+    rj::Document doc;
+    checkRJParse(doc.Parse(jsonSpec.c_str()), "Parsing config");
+    return dagFromJSON(doc);
   }
 } // namespace daggy
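A hedged sketch of how the new deserializer and timestamp encoding are used (the payload shape follows the checks in dagFromJSON above; the exact task schema comes from tasksFromJSON and is assumed here):

```
// Illustrative only: "tag" and "tasks" are required, "parameters" and
// "jobDefaults" are optional; the inner task layout is an assumption.
const std::string spec = R"({
  "tag": "example-run",
  "parameters": {},
  "jobDefaults": {},
  "tasks": {
    "hello": { "job": { "command": ["/bin/echo", "hello"] } }
  }
})";
daggy::DAGSpec parsed = daggy::dagFromJSON(spec);

// Timestamps now round-trip as nanoseconds-since-epoch strings rather than
// formatted date strings:
auto now = daggy::Clock::now();
std::string encoded = daggy::timePointToString(now);   // e.g. "1633443457000000000"
daggy::TimePoint decoded = daggy::stringToTimePoint(encoded);
```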


@@ -4,12 +4,18 @@
 #include <daggy/Server.hpp>
 #include <daggy/Utilities.hpp>
 #include <iomanip>
+#include <mutex>
+#include <numeric>
+#include <stdexcept>
+#include <thread>
+#include <utility>
-#define REQ_ERROR(code, msg) \
-  response.send(Pistache::Http::Code::code, msg); \
+#define REQ_RESPONSE(code, msg) \
+  std::stringstream ss; \
+  ss << R"({"message": )" << std::quoted(msg) << "}"; \
+  response.send(Pistache::Http::Code::code, ss.str()); \
   return;
-namespace rj = rapidjson;
 using namespace Pistache;
 namespace daggy {
@@ -25,6 +31,22 @@ namespace daggy {
     createDescription();
   }
+  Server::Server(const Pistache::Address &listenSpec,
+                 loggers::dag_run::DAGRunLogger &logger,
+                 executors::task::TaskExecutor &executor, size_t nDAGRunners)
+    : endpoint_(listenSpec)
+    , desc_("Daggy API", "0.1")
+    , logger_(logger)
+    , executor_(executor)
+    , runnerPool_(nDAGRunners)
+  {
+  }
+
+  Server::~Server()
+  {
+    shutdown();
+  }
   void Server::start()
   {
     router_.initFromDescription(desc_);
@@ -42,6 +64,7 @@
   void Server::shutdown()
   {
     endpoint_.shutdown();
+    runnerPool_.shutdown();
   }
   uint16_t Server::getPort() const
@@ -55,7 +78,7 @@
     auto backendErrorResponse =
       desc_.response(Http::Code::Internal_Server_Error,
-                     "An error occurred with the backend");
+                     R"({"error": "An error occurred with the backend"})");
     desc_.schemes(Rest::Scheme::Http)
       .basePath("/v1")
@@ -69,111 +92,131 @@
     auto versionPath = desc_.path("/v1");
-    auto dagPath = versionPath.path("/dagrun");
-    // Run a DAG
-    dagPath.route(desc_.post("/"))
-      .bind(&Server::handleRunDAG, this)
-      .produces(MIME(Application, Json), MIME(Application, Xml))
-      .response(Http::Code::Ok, "Run a DAG");
-    // List detailed DAG run
-    dagPath.route(desc_.get("/:runID"))
-      .bind(&Server::handleGetDAGRun, this)
-      .produces(MIME(Application, Json), MIME(Application, Xml))
-      .response(Http::Code::Ok, "Details of a specific DAG run");
-    // List all DAG runs
-    dagPath.route(desc_.get("/"))
-      .bind(&Server::handleGetDAGRuns, this)
-      .produces(MIME(Application, Json), MIME(Application, Xml))
-      .response(Http::Code::Ok, "The list of all known DAG Runs");
+    /*
+      DAG Run Summaries
+    */
+    auto dagRunsPath = versionPath.path("/dagruns");
+    dagRunsPath.route(desc_.get("/"))
+      .bind(&Server::handleQueryDAGs, this)
+      .produces(MIME(Application, Json))
+      .response(Http::Code::Ok, "List summaries DAGs");
+    /*
+      Individual DAG Run routes
+    */
+    auto dagRunPath = versionPath.path("/dagrun");
+    dagRunPath.route(desc_.post("/"))
+      .bind(&Server::handleRunDAG, this)
+      .produces(MIME(Application, Json))
+      .response(Http::Code::Ok, "Run a DAG");
+    dagRunPath.route(desc_.post("/validate"))
+      .bind(&Server::handleValidateDAG, this)
+      .produces(MIME(Application, Json))
+      .response(Http::Code::Ok, "Validate a DAG Run Spec");
+    /*
+      Management of a specific DAG
+    */
+    auto specificDAGRunPath = dagRunPath.path("/:runID");
+    specificDAGRunPath.route(desc_.get("/"))
+      .bind(&Server::handleGetDAGRun, this)
+      .produces(MIME(Application, Json))
+      .response(Http::Code::Ok, "Full DAG Run");
+    specificDAGRunPath.route(desc_.get("/state"))
+      .bind(&Server::handleGetDAGRunState, this)
+      .produces(MIME(Application, Json))
+      .response(Http::Code::Ok,
+                "Structure of a DAG and DAG and Task run states");
+    specificDAGRunPath.route(desc_.patch("/state/:state"))
+      .bind(&Server::handleSetDAGRunState, this)
+      .produces(MIME(Application, Json))
+      .response(Http::Code::Ok, "Change the state of a DAG");
+    /*
+      Task paths
+    */
+    auto taskPath = specificDAGRunPath.path("/task/:taskName");
+    taskPath.route(desc_.get("/"))
+      .bind(&Server::handleGetTask, this)
+      .produces(MIME(Application, Json))
+      .response(Http::Code::Ok, "Details of a specific task");
+    /*
+      Task State paths
+    */
+    auto taskStatePath = taskPath.path("/state");
+    taskStatePath.route(desc_.get("/"))
+      .bind(&Server::handleGetTaskState, this)
+      .produces(MIME(Application, Json))
+      .response(Http::Code::Ok, "Get a task state");
+    taskStatePath.route(desc_.patch("/:state"))
+      .bind(&Server::handleSetTaskState, this)
+      .produces(MIME(Application, Json))
+      .response(Http::Code::Ok, "Set a task state");
   }
-  /*
-   * {
-   *   "name": "DAG Run Name"
-   *   "job": {...}
-   *   "tasks": {...}
-   */
   void Server::handleRunDAG(const Pistache::Rest::Request &request,
                             Pistache::Http::ResponseWriter response)
   {
-    if (!handleAuth(request, response))
+    if (!handleAuth(request))
       return;
-    rj::Document doc;
-    try {
-      doc.Parse(request.body().c_str());
-    }
-    catch (std::exception &e) {
-      REQ_ERROR(Bad_Request, std::string{"Invalid JSON payload: "} + e.what());
-    }
-    if (!doc.IsObject()) {
-      REQ_ERROR(Bad_Request, "Payload is not a dictionary.");
-    }
-    if (!doc.HasMember("name")) {
-      REQ_ERROR(Bad_Request, "DAG Run is missing a name.");
-    }
-    if (!doc.HasMember("tasks")) {
-      REQ_ERROR(Bad_Request, "DAG Run has no tasks.");
-    }
-    std::string runName = doc["name"].GetString();
-    // Get parameters if there are any
-    ConfigValues parameters;
-    if (doc.HasMember("parameters")) {
-      try {
-        auto parsedParams = configFromJSON(doc["parameters"].GetObject());
-        parameters.swap(parsedParams);
-      }
-      catch (std::exception &e) {
-        REQ_ERROR(Bad_Request, e.what());
-      }
-    }
-    // Job Defaults
-    ConfigValues jobDefaults;
-    if (doc.HasMember("jobDefaults")) {
-      try {
-        auto parsedJobDefaults = configFromJSON(doc["jobDefaults"].GetObject());
-        jobDefaults.swap(parsedJobDefaults);
-      }
-      catch (std::exception &e) {
-        REQ_ERROR(Bad_Request, e.what());
-      }
-    }
-    // Get the tasks
-    TaskSet tasks;
-    try {
-      auto taskTemplates = tasksFromJSON(doc["tasks"], jobDefaults);
-      auto expandedTasks = expandTaskSet(taskTemplates, executor_, parameters);
-      tasks.swap(expandedTasks);
-    }
-    catch (std::exception &e) {
-      REQ_ERROR(Bad_Request, e.what());
-    }
+    auto dagSpec = dagFromJSON(request.body());
+    dagSpec.tasks =
+      expandTaskSet(dagSpec.tasks, executor_, dagSpec.taskConfig.variables);
     // Get a run ID
-    auto runID = logger_.startDAGRun(runName, tasks);
-    auto dag = buildDAGFromTasks(tasks);
-    runnerPool_.addTask([this, parameters, runID, dag]() {
-      runDAG(runID, this->executor_, this->logger_, dag, parameters);
-    });
+    DAGRunID runID = logger_.startDAGRun(dagSpec);
+    auto dag = buildDAGFromTasks(dagSpec.tasks);
+    queueDAG_(runID, dag, dagSpec.taskConfig);
     response.send(Pistache::Http::Code::Ok,
                   R"({"runID": )" + std::to_string(runID) + "}");
   }
-  void Server::handleGetDAGRuns(const Pistache::Rest::Request &request,
-                                Pistache::Http::ResponseWriter response)
-  {
-    if (!handleAuth(request, response))
-      return;
-    auto dagRuns = logger_.getDAGs(0);
+  void Server::handleValidateDAG(const Pistache::Rest::Request &request,
+                                 Pistache::Http::ResponseWriter response)
+  {
+    try {
+      dagFromJSON(request.body());
+      response.send(Pistache::Http::Code::Ok, R"({"valid": true})");
+    }
+    catch (std::exception &e) {
+      std::string error = e.what();
+      response.send(Pistache::Http::Code::Ok,
+                    std::string{R"({"valid": true, "error": })"} + error + "}");
+    }
+  }
+  void Server::handleQueryDAGs(const Pistache::Rest::Request &request,
+                               Pistache::Http::ResponseWriter response)
+  {
+    if (!handleAuth(request))
+      return;
+    bool all = false;
+    std::string tag = "";
+    if (request.query().has("tag")) {
+      tag = request.query().get("tag").value();
+    }
+    if (request.hasParam(":all")) {
+      auto val = request.query().get("all").value();
+      if (val == "true" or val == "1") {
+        all = true;
+      }
+    }
+    auto dagRuns = logger_.queryDAGRuns(tag, all);
     std::stringstream ss;
     ss << '[';
@@ -187,8 +230,8 @@
     }
     ss << " {"
-       << R"("runID": )" << run.runID << ',' << R"("name": )"
-       << std::quoted(run.name) << ","
+       << R"("runID": )" << run.runID << ',' << R"("tag": )"
+       << std::quoted(run.tag) << ","
        << R"("startTime": )" << std::quoted(timePointToString(run.startTime))
        << ',' << R"("lastUpdate": )"
        << std::quoted(timePointToString(run.lastUpdate)) << ','
@@ -214,10 +257,10 @@
   void Server::handleGetDAGRun(const Pistache::Rest::Request &request,
                                Pistache::Http::ResponseWriter response)
   {
-    if (!handleAuth(request, response))
+    if (!handleAuth(request))
      return;
     if (!request.hasParam(":runID")) {
-      REQ_ERROR(Not_Found, "No runID provided in URL");
+      REQ_RESPONSE(Not_Found, "No runID provided in URL");
     }
     auto runID = request.param(":runID").as<size_t>();
     auto run = logger_.getDAGRun(runID);
@@ -225,9 +268,9 @@
     bool first = true;
     std::stringstream ss;
     ss << "{"
-       << R"("runID": )" << runID << ',' << R"("name": )"
-       << std::quoted(run.name) << ',' << R"("tasks": )"
-       << tasksToJSON(run.tasks) << ',';
+       << R"("runID": )" << runID << ',' << R"("tag": )"
+       << std::quoted(run.dagSpec.tag) << ',' << R"("tasks": )"
+       << tasksToJSON(run.dagSpec.tasks) << ',';
     // task run states
     ss << R"("taskStates": { )";
@@ -295,21 +338,179 @@
     response.send(Pistache::Http::Code::Ok, ss.str());
   }
void Server::handleGetDAGRunState(const Pistache::Rest::Request &request,
Pistache::Http::ResponseWriter response)
{
if (!handleAuth(request))
return;
DAGRunID runID = request.param(":runID").as<DAGRunID>();
RunState state = RunState::QUEUED;
try {
state = logger_.getDAGRunState(runID);
std::stringstream ss;
ss << R"({ "runID": )" << runID << R"(, "state": )"
<< std::quoted(state._to_string()) << '}';
response.send(Pistache::Http::Code::Ok, ss.str());
}
catch (std::exception &e) {
REQ_RESPONSE(Not_Found, e.what());
}
}
void Server::queueDAG_(DAGRunID runID, const TaskDAG &dag,
const TaskParameters &taskParameters)
{
std::lock_guard<std::mutex> lock(runnerGuard_);
/*
auto it = runners_.emplace(
std::piecewise_construct, std::forward_as_tuple(runID),
std::forward_as_tuple(runID, executor_, logger_, dag,
taskParameters));
*/
auto it = runners_.emplace(
runID, std::make_shared<DAGRunner>(runID, executor_, logger_, dag,
taskParameters));
if (!it.second)
throw std::runtime_error("A DAGRun with the same ID is already running");
auto runner = it.first->second;
runnerPool_.addTask([runner, runID, this]() {
runner->run();
std::lock_guard<std::mutex> lock(this->runnerGuard_);
this->runners_.extract(runID);
});
}
void Server::handleSetDAGRunState(const Pistache::Rest::Request &request,
Pistache::Http::ResponseWriter response)
{
if (!handleAuth(request))
return;
// TODO handle state transition
DAGRunID runID = request.param(":runID").as<DAGRunID>();
RunState newState = RunState::_from_string(
request.param(":state").as<std::string>().c_str());
std::shared_ptr<DAGRunner> runner{nullptr};
{
std::lock_guard<std::mutex> lock(runnerGuard_);
auto it = runners_.find(runID);
if (runners_.find(runID) != runners_.end()) {
runner = it->second;
}
}
if (runner) {
switch (newState) {
case RunState::PAUSED:
case RunState::KILLED: {
runner->stop(true, true);
logger_.updateDAGRunState(runID, newState);
break;
}
default: {
REQ_RESPONSE(Method_Not_Allowed,
std::string{"Cannot transition to state "} +
newState._to_string());
}
}
}
else {
switch (newState) {
case RunState::QUEUED: {
auto dagRun = logger_.getDAGRun(runID);
auto dag =
buildDAGFromTasks(dagRun.dagSpec.tasks, dagRun.taskStateChanges);
dag.resetRunning();
queueDAG_(runID, dag, dagRun.dagSpec.taskConfig);
break;
}
default:
REQ_RESPONSE(
Method_Not_Allowed,
std::string{"DAG not running, cannot transition to state "} +
newState._to_string());
}
}
REQ_RESPONSE(Ok, "");
}
void Server::handleGetTask(const Pistache::Rest::Request &request,
Pistache::Http::ResponseWriter response)
{
if (!handleAuth(request))
return;
auto runID = request.param(":runID").as<DAGRunID>();
auto taskName = request.param(":taskName").as<std::string>();
try {
auto task = logger_.getTask(runID, taskName);
response.send(Pistache::Http::Code::Ok, taskToJSON(task));
}
catch (std::exception &e) {
REQ_RESPONSE(Not_Found, e.what());
}
}
void Server::handleGetTaskState(const Pistache::Rest::Request &request,
Pistache::Http::ResponseWriter response)
{
if (!handleAuth(request))
return;
auto runID = request.param(":runID").as<DAGRunID>();
auto taskName = request.param(":taskName").as<std::string>();
try {
auto state = logger_.getTaskState(runID, taskName);
std::stringstream ss;
ss << R"({ "runID": )" << runID << R"(, "taskName": )"
<< std::quoted(taskName) << R"(, "state": )"
<< std::quoted(state._to_string()) << '}';
response.send(Pistache::Http::Code::Ok, ss.str());
}
catch (std::exception &e) {
REQ_RESPONSE(Not_Found, e.what());
}
}
void Server::handleSetTaskState(const Pistache::Rest::Request &request,
Pistache::Http::ResponseWriter response)
{
if (!handleAuth(request))
return;
// TODO implement handling of task state
auto runID = request.param(":runID").as<DAGRunID>();
auto taskName = request.param(":taskName").as<std::string>();
RunState state = RunState::_from_string(
request.param(":state").as<std::string>().c_str());
try {
logger_.updateTaskState(runID, taskName, state);
response.send(Pistache::Http::Code::Ok, "");
}
catch (std::exception &e) {
REQ_RESPONSE(Not_Found, e.what());
}
}
   void Server::handleReady(const Pistache::Rest::Request &request,
                            Pistache::Http::ResponseWriter response)
   {
-    response.send(Pistache::Http::Code::Ok, "Ya like DAGs?");
+    response.send(Pistache::Http::Code::Ok, R"({ "msg": "Ya like DAGs?"})");
   }
   /*
-   * handleAuth will check any auth methods and handle any responses in the case
-   * of failed auth. If it returns false, callers should cease handling the
-   * response
+   * handleAuth will check any auth methods and handle any responses in the
+   * case of failed auth. If it returns false, callers should cease handling
+   * the response
   */
-  bool Server::handleAuth(const Pistache::Rest::Request &request,
-                          Pistache::Http::ResponseWriter &response)
+  bool Server::handleAuth(const Pistache::Rest::Request &request)
   {
-    (void)response;
     return true;
   }
 } // namespace daggy


@@ -92,8 +92,9 @@ namespace daggy {
   }
   // Add edges
-  for (const auto &[name, task] : tasks) {
-    dag.addEdgeIf(name, [&](const auto &v) {
+  for (const auto &[name, t] : tasks) {
+    const auto &task = t;
+    dag.addEdgeIf(name, [&task](const auto &v) {
       return task.children.count(v.data.definedName) > 0;
     });
   }
@@ -115,10 +116,10 @@
       switch (update.newState) {
       case RunState::RUNNING:
      case RunState::RETRY:
+      case RunState::PAUSED:
      case RunState::ERRORED:
      case RunState::KILLED:
        dag.setVertexState(update.taskName, RunState::RUNNING);
-        dag.setVertexState(update.taskName, RunState::COMPLETED);
        break;
      case RunState::COMPLETED:
      case RunState::QUEUED:
@@ -129,120 +130,9 @@
     return dag;
   }
TaskDAG runDAG(DAGRunID runID, executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger, TaskDAG dag,
const ConfigValues parameters)
{
logger.updateDAGRunState(runID, RunState::RUNNING);
std::unordered_map<std::string, std::future<AttemptRecord>> runningTasks;
std::unordered_map<std::string, size_t> taskAttemptCounts;
size_t running = 0;
size_t errored = 0;
while (!dag.allVisited()) {
// Check for any completed tasks
for (auto &[taskName, fut] : runningTasks) {
if (fut.valid()) {
auto attempt = fut.get();
logger.logTaskAttempt(runID, taskName, attempt);
// Not a reference, since adding tasks will invalidate references
auto vert = dag.getVertex(taskName);
auto &task = vert.data;
if (attempt.rc == 0) {
logger.updateTaskState(runID, taskName, RunState::COMPLETED);
if (task.isGenerator) {
// Parse the output and update the DAGs
try {
auto parsedTasks = tasksFromJSON(attempt.outputLog);
auto newTasks =
expandTaskSet(parsedTasks, executor, parameters);
updateDAGFromTasks(dag, newTasks);
// Add in dependencies from current task to new tasks
for (const auto &[ntName, ntTask] : newTasks) {
logger.addTask(runID, ntName, ntTask);
task.children.insert(ntName);
}
// Efficiently add new edges from generator task
// to children
std::unordered_set<std::string> baseNames;
for (const auto &[k, v] : parsedTasks) {
baseNames.insert(v.definedName);
}
dag.addEdgeIf(taskName, [&](const auto &v) {
return baseNames.count(v.data.definedName) > 0;
});
logger.updateTask(runID, taskName, task);
}
catch (std::exception &e) {
logger.logTaskAttempt(
runID, taskName,
AttemptRecord{
.executorLog =
std::string{"Failed to parse JSON output: "} +
e.what()});
logger.updateTaskState(runID, taskName, RunState::ERRORED);
++errored;
}
}
dag.completeVisit(taskName);
--running;
}
else {
// RC isn't 0
if (taskAttemptCounts[taskName] <= task.maxRetries) {
logger.updateTaskState(runID, taskName, RunState::RETRY);
runningTasks[taskName] = executor.execute(taskName, task);
++taskAttemptCounts[taskName];
}
else {
logger.updateTaskState(runID, taskName, RunState::ERRORED);
++errored;
}
}
}
}
// Add all remaining tasks in a task queue to avoid dominating the thread
// pool
auto t = dag.visitNext();
while (t.has_value()) {
// Schedule the task to run
auto &taskName = t.value().first;
auto &task = t.value().second;
taskAttemptCounts[taskName] = 1;
logger.updateTaskState(runID, taskName, RunState::RUNNING);
runningTasks.emplace(taskName, executor.execute(taskName, task));
++running;
auto nextTask = dag.visitNext();
if (not nextTask.has_value())
break;
t.emplace(nextTask.value());
}
if (running > 0 and errored == running) {
logger.updateDAGRunState(runID, RunState::ERRORED);
break;
}
std::this_thread::sleep_for(250ms);
}
if (dag.allVisited()) {
logger.updateDAGRunState(runID, RunState::COMPLETED);
}
return dag;
}
   std::ostream &operator<<(std::ostream &os, const TimePoint &tp)
   {
-    auto t_c = Clock::to_time_t(tp);
-    os << std::put_time(std::localtime(&t_c), "%Y-%m-%d %H:%M:%S %Z");
+    os << tp.time_since_epoch().count() << std::endl;
     return os;
   }
 } // namespace daggy


@@ -36,13 +36,45 @@ std::string slurp(int fd)
   return result;
 }
-std::future<daggy::AttemptRecord> ForkingTaskExecutor::execute(
-  const std::string &taskName, const Task &task)
+ForkingTaskExecutor::ForkingTaskExecutor(size_t nThreads)
+  : tp_(nThreads)
 {
-  return tp_.addTask([this, task]() { return this->runTask(task); });
 }
-daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task)
+ForkingTaskExecutor::~ForkingTaskExecutor()
+{
+  std::lock_guard<std::mutex> lock(taskControlsGuard_);
+  taskControls_.clear();
+}
+
+bool ForkingTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
+{
+  std::string key = std::to_string(runID) + "_" + taskName;
+  std::lock_guard<std::mutex> lock(taskControlsGuard_);
+  auto it = taskControls_.find(key);
+  if (it == taskControls_.end())
+    return true;
+  it->second = false;
+  return true;
+}
+
+std::future<daggy::AttemptRecord> ForkingTaskExecutor::execute(
+  DAGRunID runID, const std::string &taskName, const Task &task)
+{
+  std::string key = std::to_string(runID) + "_" + taskName;
+  std::lock_guard<std::mutex> lock(taskControlsGuard_);
+  auto [it, ins] = taskControls_.emplace(key, true);
+  auto &running = it->second;
+  return tp_.addTask([this, task, &running, key]() {
+    auto ret = this->runTask(task, running);
+    std::lock_guard<std::mutex> lock(this->taskControlsGuard_);
+    this->taskControls_.extract(key);
+    return ret;
+  });
+}
+
+daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task,
+                                                  std::atomic<bool> &running)
 {
   AttemptRecord rec;
@@ -81,23 +113,41 @@ daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task)
     exit(-1);
   }
-  std::atomic<bool> running = true;
+  std::atomic<bool> reading = true;
   std::thread stdoutReader([&]() {
-    while (running)
+    while (reading)
       rec.outputLog.append(slurp(stdoutPipe[0]));
   });
   std::thread stderrReader([&]() {
-    while (running)
+    while (reading)
      rec.errorLog.append(slurp(stderrPipe[0]));
   });
-  int rc = 0;
-  waitpid(child, &rc, 0);
-  running = false;
+  siginfo_t childInfo;
+  while (running) {
+    childInfo.si_pid = 0;
+    waitid(P_PID, child, &childInfo, WEXITED | WNOHANG);
+    if (childInfo.si_pid > 0) {
+      break;
+    }
+    std::this_thread::sleep_for(250ms);
+  }
+  if (!running) {
+    rec.executorLog = "Killed";
+    // Send the kills until pid is dead
+    while (kill(child, SIGKILL) != -1) {
+      // Need to collect the child to avoid a zombie process
+      waitid(P_PID, child, &childInfo, WEXITED | WNOHANG);
+      std::this_thread::sleep_for(50ms);
+    }
+  }
+  reading = false;
   rec.stopTime = Clock::now();
-  if (WIFEXITED(rc)) {
-    rec.rc = WEXITSTATUS(rc);
+  if (childInfo.si_pid > 0) {
+    rec.rc = childInfo.si_status;
   }
   else {
     rec.rc = -1;


@@ -3,7 +3,7 @@
 namespace daggy::executors::task {
   std::future<daggy::AttemptRecord> NoopTaskExecutor::execute(
-    const std::string &taskName, const Task &task)
+    DAGRunID runID, const std::string &taskName, const Task &task)
   {
     std::promise<daggy::AttemptRecord> promise;
     auto ts = Clock::now();
@@ -42,4 +42,10 @@
     return newValues;
   }
+
+  bool NoopTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
+  {
+    return true;
+  }
 } // namespace daggy::executors::task


@@ -1,4 +1,5 @@
 #include <iterator>
+#include <mutex>
 #include <stdexcept>
 #ifdef DAGGY_ENABLE_SLURM
 #include <slurm/slurm.h>
@@ -6,6 +7,7 @@
 #include <sys/stat.h>
 #include <sys/types.h>
+#include <csignal>
 #include <cstdlib>
 #include <daggy/Utilities.hpp>
 #include <daggy/executors/task/SlurmTaskExecutor.hpp>
@@ -115,7 +117,7 @@
   }
   std::future<AttemptRecord> SlurmTaskExecutor::execute(
-    const std::string &taskName, const Task &task)
+    DAGRunID runID, const std::string &taskName, const Task &task)
   {
     std::stringstream executorLog;
@@ -191,13 +193,39 @@
     slurm_free_submit_response_response_msg(resp_msg);
     std::lock_guard<std::mutex> lock(promiseGuard_);
-    Job newJob{.prom{}, .stdoutFile = stdoutFile, .stderrFile = stderrFile};
+    Job newJob{.prom{},
+               .stdoutFile = stdoutFile,
+               .stderrFile = stderrFile,
+               .runID = runID,
+               .taskName = taskName};
     auto fut = newJob.prom.get_future();
     runningJobs_.emplace(jobID, std::move(newJob));
     return fut;
   }
+
+  bool SlurmTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
+  {
+    // Hopefully this isn't a common thing, so just scrap the current jobs and
+    // kill them
+    size_t jobID = 0;
+    {
+      std::lock_guard<std::mutex> lock(promiseGuard_);
+      for (const auto &[k, v] : runningJobs_) {
+        if (v.runID == runID and v.taskName == taskName) {
+          jobID = k;
+          break;
+        }
+      }
+      if (jobID == 0)
+        return true;
+    }
+    // Send the kill message to slurm
+    slurm_kill_job(jobID, SIGKILL, KILL_HURRY);
+    return true;
+  }
   void SlurmTaskExecutor::monitor()
   {
     std::unordered_set<size_t> resolvedJobs;
@@ -225,32 +253,40 @@
         // Job has finished
         case JOB_COMPLETE: /* completed execution successfully */
         case JOB_FAILED: /* completed execution unsuccessfully */
+          record.rc = jobInfo.exit_code;
           record.executorLog = "Script errored.\n";
           break;
         case JOB_CANCELLED: /* cancelled by user */
+          record.rc = 9; // matches SIGKILL
           record.executorLog = "Job cancelled by user.\n";
           break;
         case JOB_TIMEOUT: /* terminated on reaching time limit */
+          record.rc = jobInfo.exit_code;
           record.executorLog = "Job exceeded time limit.\n";
           break;
         case JOB_NODE_FAIL: /* terminated on node failure */
+          record.rc = jobInfo.exit_code;
           record.executorLog = "Node failed during execution\n";
           break;
         case JOB_PREEMPTED: /* terminated due to preemption */
+          record.rc = jobInfo.exit_code;
           record.executorLog = "Job terminated due to pre-emption.\n";
           break;
         case JOB_BOOT_FAIL: /* terminated due to node boot failure */
+          record.rc = jobInfo.exit_code;
           record.executorLog =
-            "Job failed to run due to failure of compute node to boot.\n";
+            "Job failed to run due to failure of compute node to "
+            "boot.\n";
           break;
         case JOB_DEADLINE: /* terminated on deadline */
+          record.rc = jobInfo.exit_code;
           record.executorLog = "Job terminated due to deadline.\n";
           break;
         case JOB_OOM: /* experienced out of memory error */
+          record.rc = jobInfo.exit_code;
           record.executorLog = "Job terminated due to out-of-memory.\n";
           break;
         }
-        record.rc = jobInfo.exit_code;
         slurm_free_job_info_msg(jobStatus);
         readAndClean(job.stdoutFile, record.outputLog);
@@ -265,7 +301,7 @@
       }
     }
-    std::this_thread::sleep_for(std::chrono::milliseconds(250));
+    std::this_thread::sleep_for(std::chrono::seconds(1));
   }
 }
 } // namespace daggy::executors::task


@@ -11,20 +11,26 @@ namespace daggy { namespace loggers { namespace dag_run {
{ {
} }
OStreamLogger::~OStreamLogger()
{
std::lock_guard<std::mutex> lock(guard_);
dagRuns_.clear();
}
// Execution // Execution
DAGRunID OStreamLogger::startDAGRun(std::string name, const TaskSet &tasks) DAGRunID OStreamLogger::startDAGRun(const DAGSpec &dagSpec)
{ {
std::lock_guard<std::mutex> lock(guard_); std::lock_guard<std::mutex> lock(guard_);
size_t runID = dagRuns_.size(); size_t runID = dagRuns_.size();
dagRuns_.push_back({.name = name, .tasks = tasks}); dagRuns_.emplace_back(DAGRunRecord{.dagSpec = dagSpec});
for (const auto &[name, _] : tasks) { for (const auto &[name, _] : dagSpec.tasks) {
_updateTaskState(runID, name, RunState::QUEUED); _updateTaskState(runID, name, RunState::QUEUED);
} }
_updateDAGRunState(runID, RunState::QUEUED); _updateDAGRunState(runID, RunState::QUEUED);
os_ << "Starting new DAGRun named " << name << " with ID " << runID os_ << "Starting new DAGRun tagged " << dagSpec.tag << " with ID " << runID
<< " and " << tasks.size() << " tasks" << std::endl; << " and " << dagSpec.tasks.size() << " tasks" << std::endl;
for (const auto &[name, task] : tasks) { for (const auto &[name, task] : dagSpec.tasks) {
os_ << "TASK (" << name << "): " << configToJSON(task.job); os_ << "TASK (" << name << "): " << configToJSON(task.job);
os_ << std::endl; os_ << std::endl;
} }
@@ -35,8 +41,8 @@ namespace daggy { namespace loggers { namespace dag_run {
const Task &task) const Task &task)
{ {
std::lock_guard<std::mutex> lock(guard_); std::lock_guard<std::mutex> lock(guard_);
auto &dagRun = dagRuns_[dagRunID]; auto &dagRun = dagRuns_[dagRunID];
dagRun.tasks[taskName] = task; dagRun.dagSpec.tasks[taskName] = task;
_updateTaskState(dagRunID, taskName, RunState::QUEUED); _updateTaskState(dagRunID, taskName, RunState::QUEUED);
} }
@@ -44,8 +50,8 @@ namespace daggy { namespace loggers { namespace dag_run {
const Task &task) const Task &task)
{ {
std::lock_guard<std::mutex> lock(guard_); std::lock_guard<std::mutex> lock(guard_);
auto &dagRun = dagRuns_[dagRunID]; auto &dagRun = dagRuns_[dagRunID];
dagRun.tasks[taskName] = task; dagRun.dagSpec.tasks[taskName] = task;
} }
void OStreamLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) void OStreamLogger::updateDAGRunState(DAGRunID dagRunID, RunState state)
@@ -101,15 +107,29 @@ namespace daggy { namespace loggers { namespace dag_run {
} }
// Querying // Querying
std::vector<DAGRunSummary> OStreamLogger::getDAGs(uint32_t stateMask) DAGSpec OStreamLogger::getDAGSpec(DAGRunID dagRunID)
{
std::lock_guard<std::mutex> lock(guard_);
return dagRuns_.at(dagRunID).dagSpec;
};
std::vector<DAGRunSummary> OStreamLogger::queryDAGRuns(const std::string &tag,
bool all)
{ {
std::vector<DAGRunSummary> summaries; std::vector<DAGRunSummary> summaries;
std::lock_guard<std::mutex> lock(guard_); std::lock_guard<std::mutex> lock(guard_);
size_t i = 0; size_t i = 0;
for (const auto &run : dagRuns_) { for (const auto &run : dagRuns_) {
if ((!all) &&
(run.dagStateChanges.back().newState == +RunState::COMPLETED)) {
continue;
}
if (!tag.empty() and tag != run.dagSpec.tag)
continue;
DAGRunSummary summary{ DAGRunSummary summary{
.runID = i, .runID = i,
.name = run.name, .tag = run.dagSpec.tag,
.runState = run.dagStateChanges.back().newState, .runState = run.dagStateChanges.back().newState,
.startTime = run.dagStateChanges.front().time, .startTime = run.dagStateChanges.front().time,
.lastUpdate = std::max<TimePoint>(run.taskStateChanges.back().time, .lastUpdate = std::max<TimePoint>(run.taskStateChanges.back().time,
@@ -126,10 +146,26 @@ namespace daggy { namespace loggers { namespace dag_run {
DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunID)
{
if (dagRunID >= dagRuns_.size()) {
throw std::runtime_error("No such DAGRun ID");
}
std::lock_guard<std::mutex> lock(guard_);
return dagRuns_.at(dagRunID);
}
RunState OStreamLogger::getDAGRunState(DAGRunID dagRunID)
{
std::lock_guard<std::mutex> lock(guard_);
return dagRuns_.at(dagRunID).dagStateChanges.back().newState;
}
Task &OStreamLogger::getTask(DAGRunID dagRunID, const std::string &taskName)
{
std::lock_guard<std::mutex> lock(guard_);
return dagRuns_.at(dagRunID).dagSpec.tasks.at(taskName);
}
RunState &OStreamLogger::getTaskState(DAGRunID dagRunID,
const std::string &taskName)
{
std::lock_guard<std::mutex> lock(guard_);
return dagRuns_.at(dagRunID).taskRunStates.at(taskName);
}
}}} // namespace daggy::loggers::dag_run
endpoints.otl Normal file
@@ -0,0 +1,30 @@
ready [handleReady]
v1
    dagruns
        :GET - Summary of running DAGs [handleListDAGs]
        {tag}
    dagrun
        :POST - submit a dag run [handleRunDAG]
        validate
            :POST - Ensure a submitted DAG run is valid [handleValidateDAG]
        {runID}
            :GET - Full DAG run information [handleGetDAG]
            summary
                :GET - RunState of DAG, and task RunState counts
            state
                :PATCH - Change the state of a DAG (paused, killed)
                :GET - Summary of dag run (structure + runstates)
            tasks
                {taskName}
                    :GET - Full task definition and output
                    state
                        :GET - Get the task state
                        :PATCH - Set the task state
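A quick way to read this outline is to walk the routes with a small client. The sketch below is illustrative only and is not part of the commit: it assumes a daggy server already listening on localhost:8080, and the helper names (request, collect), the port, and the demo payload are invented for the example. It mirrors the libcurl approach used by the REQUEST helper in tests/unit_server.cpp to submit a DAG run and then query it.

// Illustrative sketch only -- not part of this commit. Assumes a daggy server
// is listening on localhost:8080; port and payload are made up.
#include <curl/curl.h>

#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>

// Collect the response body into a stringstream (same idea as curlWriter in
// tests/unit_server.cpp).
static size_t collect(char *in, size_t size, size_t nmemb, void *out)
{
  static_cast<std::stringstream *>(out)->write(in, size * nmemb);
  return size * nmemb;
}

static std::string request(const std::string &url,
                           const std::string &payload = "",
                           const std::string &method = "GET")
{
  std::stringstream body;
  CURL *curl = curl_easy_init();
  if (!curl)
    throw std::runtime_error("curl_easy_init failed");
  curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, collect);
  curl_easy_setopt(curl, CURLOPT_WRITEDATA, &body);
  curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, method.c_str());
  if (!payload.empty())
    curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload.c_str());
  if (curl_easy_perform(curl) != CURLE_OK) {
    curl_easy_cleanup(curl);
    throw std::runtime_error("request to " + url + " failed");
  }
  curl_easy_cleanup(curl);
  return body.str();
}

int main()
{
  curl_global_init(CURL_GLOBAL_ALL);
  const std::string base = "http://localhost:8080";

  // POST /v1/dagrun - submit a run (handleRunDAG); the reply carries the runID.
  std::cout << request(base + "/v1/dagrun/",
                       R"({"tag":"demo","tasks":{"hello":{"job":{"command":["/bin/echo","hi"]}}}})",
                       "POST")
            << std::endl;

  // GET /v1/dagruns - summary of running DAGs (handleListDAGs).
  std::cout << request(base + "/v1/dagruns") << std::endl;

  // GET /v1/dagrun/{runID} - full run information (handleGetDAG), assuming runID 0.
  std::cout << request(base + "/v1/dagrun/0") << std::endl;

  curl_global_cleanup();
  return 0;
}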
@@ -2,6 +2,7 @@ project(tests)
add_executable(tests main.cpp
# unit tests
unit_dag.cpp
unit_dagrunner.cpp
unit_dagrun_loggers.cpp
unit_executor_forkingexecutor.cpp
unit_executor_slurmexecutor.cpp
@@ -14,4 +15,4 @@ add_executable(tests main.cpp
# Performance checks
perf_dag.cpp
)
target_link_libraries(tests libdaggy stdc++fs Catch2::Catch2 curl)
@@ -2,11 +2,10 @@
#include <filesystem>
#include <fstream>
#include <iostream>
#include <sstream>
#include "daggy/loggers/dag_run/OStreamLogger.hpp" #include "daggy/loggers/dag_run/OStreamLogger.hpp"
namespace fs = std::filesystem;
using namespace daggy; using namespace daggy;
using namespace daggy::loggers::dag_run; using namespace daggy::loggers::dag_run;
@@ -20,28 +19,68 @@ const TaskSet SAMPLE_TASKS{
{"work_c", {"work_c",
Task{.job{{"command", std::vector<std::string>{"/bin/echo", "c"}}}}}}; Task{.job{{"command", std::vector<std::string>{"/bin/echo", "c"}}}}}};
inline DAGRunID testDAGRunInit(DAGRunLogger &logger, const std::string &name, inline DAGRunID testDAGRunInit(DAGRunLogger &logger, const std::string &tag,
const TaskSet &tasks) const TaskSet &tasks)
{ {
auto runID = logger.startDAGRun(name, tasks); auto runID = logger.startDAGRun(DAGSpec{.tag = tag, .tasks = tasks});
auto dagRun = logger.getDAGRun(runID);
REQUIRE(dagRun.tasks == tasks); // Verify run shows up in the list
{
auto runs = logger.queryDAGRuns();
REQUIRE(!runs.empty());
auto it = std::find_if(runs.begin(), runs.end(),
[runID](const auto &r) { return r.runID == runID; });
REQUIRE(it != runs.end());
REQUIRE(it->tag == tag);
REQUIRE(it->runState == +RunState::QUEUED);
}
// Verify states
{
REQUIRE(logger.getDAGRunState(runID) == +RunState::QUEUED);
for (const auto &[k, _] : tasks) {
REQUIRE(logger.getTaskState(runID, k) == +RunState::QUEUED);
}
}
// Verify integrity of run
{
auto dagRun = logger.getDAGRun(runID);
REQUIRE(dagRun.dagSpec.tag == tag);
REQUIRE(dagRun.dagSpec.tasks == tasks);
REQUIRE(dagRun.taskRunStates.size() == tasks.size());
auto nonQueuedTask = std::find_if(
dagRun.taskRunStates.begin(), dagRun.taskRunStates.end(),
[](const auto &a) { return a.second != +RunState::QUEUED; });
REQUIRE(nonQueuedTask == dagRun.taskRunStates.end());
REQUIRE(dagRun.dagStateChanges.size() == 1);
REQUIRE(dagRun.dagStateChanges.back().newState == +RunState::QUEUED);
}
// Update DAG state and ensure that it's updated;
{
logger.updateDAGRunState(runID, RunState::RUNNING);
auto dagRun = logger.getDAGRun(runID);
REQUIRE(dagRun.dagStateChanges.back().newState == +RunState::RUNNING);
}
// Update a task state
{
for (const auto &[k, v] : tasks)
logger.updateTaskState(runID, k, RunState::RUNNING);
auto dagRun = logger.getDAGRun(runID);
for (const auto &[k, v] : tasks) {
REQUIRE(dagRun.taskRunStates.at(k) == +RunState::RUNNING);
}
}
REQUIRE(dagRun.dagStateChanges.size() == 1);
REQUIRE(dagRun.dagStateChanges.back().newState == +RunState::QUEUED);
return runID;
}
TEST_CASE("ostream_logger", "[ostream_logger]") TEST_CASE("ostream_logger", "[ostream_logger]")
{ {
// cleanup();
std::stringstream ss; std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss); daggy::loggers::dag_run::OStreamLogger logger(ss);
@@ -49,6 +88,4 @@ TEST_CASE("ostream_logger", "[ostream_logger]")
{
testDAGRunInit(logger, "init_test", SAMPLE_TASKS);
}
// cleanup();
}
tests/unit_dagrunner.cpp Normal file
@@ -0,0 +1,256 @@
#include <catch2/catch.hpp>
#include <filesystem>
#include <fstream>
#include "daggy/DAGRunner.hpp"
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
#include "daggy/executors/task/NoopTaskExecutor.hpp"
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
namespace fs = std::filesystem;
TEST_CASE("dagrunner", "[dagrunner_order_preservation]")
{
daggy::executors::task::NoopTaskExecutor ex;
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
daggy::TimePoint globalStartTime = daggy::Clock::now();
daggy::DAGSpec dagSpec;
std::string testParams{
R"({"DATE": ["2021-05-06", "2021-05-07", "2021-05-08", "2021-05-09" ]})"};
dagSpec.taskConfig.variables = daggy::configFromJSON(testParams);
std::string taskJSON = R"({
"A": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "B","D" ]},
"B": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "C","D","E" ]},
"C": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "D"]},
"D": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "E"]},
"E": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}}
})";
dagSpec.tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex,
dagSpec.taskConfig.variables);
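// 5 task templates (A..E) x 4 DATE values = 20 expanded tasks.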
REQUIRE(dagSpec.tasks.size() == 20);
auto dag = daggy::buildDAGFromTasks(dagSpec.tasks);
auto runID = logger.startDAGRun(dagSpec);
daggy::DAGRunner runner(runID, ex, logger, dag, dagSpec.taskConfig);
auto endDAG = runner.run();
REQUIRE(endDAG.allVisited());
// Ensure the run order
auto rec = logger.getDAGRun(runID);
daggy::TimePoint globalStopTime = daggy::Clock::now();
std::array<daggy::TimePoint, 5> minTimes;
minTimes.fill(globalStartTime);
std::array<daggy::TimePoint, 5> maxTimes;
maxTimes.fill(globalStopTime);
for (const auto &[k, v] : rec.taskAttempts) {
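// Task names begin with 'A'..'E'; k[0] - 65 maps that leading letter to an
// index into minTimes/maxTimes (65 == 'A').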
size_t idx = k[0] - 65;
auto &startTime = minTimes[idx];
auto &stopTime = maxTimes[idx];
startTime = std::max(startTime, v.front().startTime);
stopTime = std::min(stopTime, v.back().stopTime);
}
for (size_t i = 0; i < 5; ++i) {
for (size_t j = i + 1; j < 4; ++j) {
REQUIRE(maxTimes[i] < minTimes[j]);
}
}
}
TEST_CASE("DAGRunner simple execution", "[dagrunner_simple]")
{
daggy::executors::task::ForkingTaskExecutor ex(10);
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
daggy::DAGSpec dagSpec;
SECTION("Simple execution")
{
std::string prefix = (fs::current_path() / "asdlk").string();
std::unordered_map<std::string, std::string> files{
{"A", prefix + "_A"}, {"B", prefix + "_B"}, {"C", prefix + "_C"}};
std::string taskJSON =
R"({"A": {"job": {"command": ["/usr/bin/touch", ")" + files.at("A") +
R"("]}, "children": ["C"]}, "B": {"job": {"command": ["/usr/bin/touch", ")" +
files.at("B") +
R"("]}, "children": ["C"]}, "C": {"job": {"command": ["/usr/bin/touch", ")" +
files.at("C") + R"("]}}})";
dagSpec.tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex);
auto dag = daggy::buildDAGFromTasks(dagSpec.tasks);
auto runID = logger.startDAGRun(dagSpec);
daggy::DAGRunner runner(runID, ex, logger, dag, dagSpec.taskConfig);
auto endDAG = runner.run();
REQUIRE(endDAG.allVisited());
for (const auto &[_, file] : files) {
REQUIRE(fs::exists(file));
fs::remove(file);
}
// Get the DAG Run Attempts
auto record = logger.getDAGRun(runID);
for (const auto &[_, attempts] : record.taskAttempts) {
REQUIRE(attempts.size() == 1);
REQUIRE(attempts.front().rc == 0);
}
}
}
TEST_CASE("DAG Runner Restart old DAG", "[dagrunner_restart]")
{
daggy::executors::task::ForkingTaskExecutor ex(10);
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
daggy::DAGSpec dagSpec;
SECTION("Recovery from Error")
{
auto cleanup = []() {
// Cleanup
std::vector<fs::path> paths{"rec_error_A", "noexist"};
for (const auto &pth : paths) {
if (fs::exists(pth))
fs::remove_all(pth);
}
};
cleanup();
std::string goodPrefix = "rec_error_";
std::string badPrefix = "noexist/rec_error_";
std::string taskJSON =
R"({"A": {"job": {"command": ["/usr/bin/touch", ")" + goodPrefix +
R"(A"]}, "children": ["C"]}, "B": {"job": {"command": ["/usr/bin/touch", ")" +
badPrefix +
R"(B"]}, "children": ["C"]}, "C": {"job": {"command": ["/usr/bin/touch", ")" +
badPrefix + R"(C"]}}})";
dagSpec.tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex);
auto dag = daggy::buildDAGFromTasks(dagSpec.tasks);
auto runID = logger.startDAGRun(dagSpec);
daggy::DAGRunner runner(runID, ex, logger, dag, dagSpec.taskConfig);
auto tryDAG = runner.run();
REQUIRE(!tryDAG.allVisited());
// Create the missing dir, then continue to run the DAG
fs::create_directory("noexist");
runner.resetRunning();
auto endDAG = runner.run();
REQUIRE(endDAG.allVisited());
// Get the DAG Run Attempts
auto record = logger.getDAGRun(runID);
REQUIRE(record.taskAttempts["A_0"].size() == 1); // A ran fine
REQUIRE(record.taskAttempts["B_0"].size() ==
2); // B errored and had to be retried
REQUIRE(record.taskAttempts["C_0"].size() ==
1); // C wasn't run because B errored
cleanup();
}
}
TEST_CASE("DAG Runner Generator Tasks", "[dagrunner_generator]")
{
daggy::executors::task::ForkingTaskExecutor ex(10);
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
daggy::DAGSpec dagSpec;
SECTION("Generator tasks")
{
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"};
dagSpec.taskConfig.variables = daggy::configFromJSON(testParams);
std::string generatorOutput =
R"({"B": {"job": {"command": ["/usr/bin/echo", "-e", "{{DATE}}"]}, "children": ["C"]}})";
fs::path ofn = fs::current_path() / "generator_test_output.json";
std::ofstream ofh{ofn};
ofh << generatorOutput << std::endl;
ofh.close();
daggy::TimePoint globalStartTime = daggy::Clock::now();
std::stringstream jsonTasks;
jsonTasks
<< R"({ "A": { "job": {"command": [ "/usr/bin/cat", )"
<< std::quoted(ofn.string())
<< R"(]}, "children": ["C"], "isGenerator": true},)"
<< R"("C": { "job": {"command": [ "/usr/bin/echo", "hello!"]} } })";
dagSpec.tasks = daggy::tasksFromJSON(jsonTasks.str());
REQUIRE(dagSpec.tasks.size() == 2);
REQUIRE(dagSpec.tasks["A"].children ==
std::unordered_set<std::string>{"C"});
dagSpec.tasks =
daggy::expandTaskSet(dagSpec.tasks, ex, dagSpec.taskConfig.variables);
REQUIRE(dagSpec.tasks.size() == 2);
REQUIRE(dagSpec.tasks["A_0"].children ==
std::unordered_set<std::string>{"C"});
auto dag = daggy::buildDAGFromTasks(dagSpec.tasks);
REQUIRE(dag.size() == 2);
auto runID = logger.startDAGRun(dagSpec);
daggy::DAGRunner runner(runID, ex, logger, dag, dagSpec.taskConfig);
auto finalDAG = runner.run();
REQUIRE(finalDAG.allVisited());
REQUIRE(finalDAG.size() == 4);
// Check the logger
auto record = logger.getDAGRun(runID);
REQUIRE(record.dagSpec.tasks.size() == 4);
REQUIRE(record.taskRunStates.size() == 4);
for (const auto &[taskName, attempts] : record.taskAttempts) {
REQUIRE(attempts.size() == 1);
REQUIRE(attempts.back().rc == 0);
}
// Ensure that children were updated properly
REQUIRE(record.dagSpec.tasks["A_0"].children ==
std::unordered_set<std::string>{"B_0", "B_1", "C"});
REQUIRE(record.dagSpec.tasks["B_0"].children ==
std::unordered_set<std::string>{"C"});
REQUIRE(record.dagSpec.tasks["B_1"].children ==
std::unordered_set<std::string>{"C"});
REQUIRE(record.dagSpec.tasks["C_0"].children.empty());
// Ensure they were run in the right order
// All A's get run before B's, which run before C's
daggy::TimePoint globalStopTime = daggy::Clock::now();
std::array<daggy::TimePoint, 3> minTimes;
minTimes.fill(globalStartTime);
std::array<daggy::TimePoint, 3> maxTimes;
maxTimes.fill(globalStopTime);
for (const auto &[k, v] : record.taskAttempts) {
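// Leading letter 'A'..'C' selects the index via k[0] - 65 (65 == 'A').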
size_t idx = k[0] - 65;
auto &startTime = minTimes[idx];
auto &stopTime = maxTimes[idx];
startTime = std::max(startTime, v.front().startTime);
stopTime = std::min(stopTime, v.back().stopTime);
}
for (size_t i = 0; i < 3; ++i) {
for (size_t j = i + 1; j < 2; ++j) {
REQUIRE(maxTimes[i] < minTimes[j]);
}
}
}
}
@@ -1,6 +1,7 @@
#include <catch2/catch.hpp>
#include <filesystem>
#include <iostream>
#include <thread>
#include "daggy/Serialization.hpp" #include "daggy/Serialization.hpp"
#include "daggy/Utilities.hpp" #include "daggy/Utilities.hpp"
@@ -18,7 +19,7 @@ TEST_CASE("forking_executor", "[forking_executor]")
REQUIRE(ex.validateTaskParameters(task.job));
auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get();
REQUIRE(rec.rc == 0);
@@ -32,7 +33,7 @@ TEST_CASE("forking_executor", "[forking_executor]")
.job{{"command", daggy::executors::task::ForkingTaskExecutor::Command{ .job{{"command", daggy::executors::task::ForkingTaskExecutor::Command{
"/usr/bin/expr", "1", "+", "+"}}}}; "/usr/bin/expr", "1", "+", "+"}}}};
auto recFuture = ex.execute("command", task); auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get(); auto rec = recFuture.get();
REQUIRE(rec.rc == 2); REQUIRE(rec.rc == 2);
@@ -40,6 +41,28 @@ TEST_CASE("forking_executor", "[forking_executor]")
REQUIRE(rec.outputLog.empty());
}
SECTION("Killing a long task")
{
daggy::Task task{
.job{{"command", daggy::executors::task::ForkingTaskExecutor::Command{
"/usr/bin/sleep", "30"}}}};
auto start = daggy::Clock::now();
auto recFuture = ex.execute(0, "command", task);
std::this_thread::sleep_for(1s);
ex.stop(0, "command");
auto rec = recFuture.get();
auto stop = daggy::Clock::now();
REQUIRE(rec.rc == 9);
REQUIRE(rec.errorLog.empty());
REQUIRE(rec.outputLog.empty());
REQUIRE(rec.executorLog == "Killed");
REQUIRE(
std::chrono::duration_cast<std::chrono::seconds>(stop - start).count() <
20);
}
SECTION("Large Output") SECTION("Large Output")
{ {
const std::vector<std::string> BIG_FILES{"/usr/share/dict/linux.words", const std::vector<std::string> BIG_FILES{"/usr/share/dict/linux.words",
@@ -54,7 +77,7 @@ TEST_CASE("forking_executor", "[forking_executor]")
.job{{"command", daggy::executors::task::ForkingTaskExecutor::Command{ .job{{"command", daggy::executors::task::ForkingTaskExecutor::Command{
"/usr/bin/cat", bigFile}}}}; "/usr/bin/cat", bigFile}}}};
auto recFuture = ex.execute("command", task); auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get(); auto rec = recFuture.get();
REQUIRE(rec.rc == 0); REQUIRE(rec.rc == 0);
@@ -34,7 +34,7 @@ TEST_CASE("slurm_execution", "[slurm_executor]")
REQUIRE(ex.validateTaskParameters(task.job));
auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get();
REQUIRE(rec.rc == 0);
@@ -49,7 +49,7 @@ TEST_CASE("slurm_execution", "[slurm_executor]")
"/usr/bin/expr", "1", "+", "+"}}}}; "/usr/bin/expr", "1", "+", "+"}}}};
task.job.merge(defaultJobValues); task.job.merge(defaultJobValues);
auto recFuture = ex.execute("command", task); auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get(); auto rec = recFuture.get();
REQUIRE(rec.rc != 0); REQUIRE(rec.rc != 0);
@@ -57,6 +57,23 @@ TEST_CASE("slurm_execution", "[slurm_executor]")
REQUIRE(rec.outputLog.empty());
}
SECTION("Killing a long task")
{
daggy::Task task{
.job{{"command", daggy::executors::task::SlurmTaskExecutor::Command{
"/usr/bin/sleep", "30"}}}};
task.job.merge(defaultJobValues);
auto recFuture = ex.execute(0, "command", task);
ex.stop(0, "command");
auto rec = recFuture.get();
REQUIRE(rec.rc == 9);
REQUIRE(rec.errorLog.empty());
REQUIRE(rec.outputLog.empty());
REQUIRE(rec.executorLog == "Job cancelled by user.\n");
}
SECTION("Large Output") SECTION("Large Output")
{ {
const std::vector<std::string> BIG_FILES{"/usr/share/dict/linux.words", const std::vector<std::string> BIG_FILES{"/usr/share/dict/linux.words",
@@ -72,7 +89,7 @@ TEST_CASE("slurm_execution", "[slurm_executor]")
"/usr/bin/cat", bigFile}}}}; "/usr/bin/cat", bigFile}}}};
task.job.merge(defaultJobValues); task.job.merge(defaultJobValues);
auto recFuture = ex.execute("command", task); auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get(); auto rec = recFuture.get();
REQUIRE(rec.rc == 0); REQUIRE(rec.rc == 0);
@@ -1,51 +1,131 @@
#include <curl/curl.h>
#include <pistache/client.h>
#include <rapidjson/document.h>
#include <sys/stat.h>
#include <catch2/catch.hpp>
#include <daggy/Serialization.hpp>
#include <daggy/Server.hpp>
#include <daggy/executors/task/ForkingTaskExecutor.hpp>
#include <daggy/executors/task/NoopTaskExecutor.hpp>
#include <daggy/loggers/dag_run/OStreamLogger.hpp>
#include <filesystem>
#include <iostream>
#include <thread>
namespace rj = rapidjson;
Pistache::Http::Response REQUEST(const std::string &url,
const std::string &payload = "")
{
Pistache::Http::Experimental::Client client;
client.init();
Pistache::Http::Response response;
auto reqSpec = (payload.empty() ? client.get(url) : client.post(url));
reqSpec.timeout(std::chrono::seconds(2));
if (!payload.empty()) {
reqSpec.body(payload);
}
auto request = reqSpec.send();
bool ok = false, error = false;
std::string msg;
request.then(
[&](Pistache::Http::Response rsp) {
ok = true;
response = std::move(rsp);
},
[&](std::exception_ptr ptr) {
error = true;
try {
std::rethrow_exception(std::move(ptr));
}
catch (std::exception &e) {
msg = e.what();
}
});
Pistache::Async::Barrier<Pistache::Http::Response> barrier(request);
barrier.wait_for(std::chrono::seconds(2));
client.shutdown();
if (error) {
throw std::runtime_error(msg);
}

using namespace daggy;

#ifdef DEBUG_HTTP
static int my_trace(CURL *handle, curl_infotype type, char *data, size_t size,
void *userp)
{
const char *text;
(void)handle; /* prevent compiler warning */
(void)userp;
switch (type) {
case CURLINFO_TEXT:
fprintf(stderr, "== Info: %s", data);
default: /* in case a new one is introduced to shock us */
return 0;
case CURLINFO_HEADER_OUT:
text = "=> Send header";
break;
case CURLINFO_DATA_OUT:
text = "=> Send data";
break;
case CURLINFO_SSL_DATA_OUT:
text = "=> Send SSL data";
break;
case CURLINFO_HEADER_IN:
text = "<= Recv header";
break;
case CURLINFO_DATA_IN:
text = "<= Recv data";
break;
case CURLINFO_SSL_DATA_IN:
text = "<= Recv SSL data";
break;
}
std::cerr << "\n================== " << text
<< " ==================" << std::endl
<< data << std::endl;
return 0;
}
#endif
enum HTTPCode : long
{
Ok = 200,
Not_Found = 404
};
struct HTTPResponse
{
HTTPCode code;
std::string body;
};
uint curlWriter(char *in, uint size, uint nmemb, std::stringstream *out)
{
uint r;
r = size * nmemb;
out->write(in, r);
return r;
}
HTTPResponse REQUEST(const std::string &url, const std::string &payload = "",
const std::string &method = "GET")
{
HTTPResponse response;
CURL *curl;
CURLcode res;
struct curl_slist *headers = NULL;
curl_global_init(CURL_GLOBAL_ALL);
curl = curl_easy_init();
if (curl) {
std::stringstream buffer;
#ifdef DEBUG_HTTP
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, my_trace);
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
#endif
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
if (!payload.empty()) {
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, payload.size());
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload.c_str());
headers = curl_slist_append(headers, "Content-Type: Application/Json");
}
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, method.c_str());
headers = curl_slist_append(headers, "Expect:");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
res = curl_easy_perform(curl);
if (res != CURLE_OK) {
curl_easy_cleanup(curl);
throw std::runtime_error(std::string{"CURL Failed: "} +
curl_easy_strerror(res));
}
// Read the response code before releasing the handle
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response.code);
curl_easy_cleanup(curl);
response.body = buffer.str();
}
curl_global_cleanup();
return response;
}
@@ -68,19 +148,19 @@ TEST_CASE("rest_endpoint", "[server_basic]")
SECTION("Ready Endpoint") SECTION("Ready Endpoint")
{ {
auto response = REQUEST(baseURL + "/ready"); auto response = REQUEST(baseURL + "/ready");
REQUIRE(response.code() == Pistache::Http::Code::Ok); REQUIRE(response.code == HTTPCode::Ok);
} }
SECTION("Querying a non-existent dagrunid should fail ") SECTION("Querying a non-existent dagrunid should fail ")
{ {
auto response = REQUEST(baseURL + "/v1/dagrun/100"); auto response = REQUEST(baseURL + "/v1/dagrun/100");
REQUIRE(response.code() != Pistache::Http::Code::Ok); REQUIRE(response.code != HTTPCode::Ok);
} }
SECTION("Simple DAGRun Submission") SECTION("Simple DAGRun Submission")
{ {
std::string dagRun = R"({ std::string dagRun = R"({
"name": "unit_server", "tag": "unit_server",
"parameters": { "FILE": [ "A", "B" ] }, "parameters": { "FILE": [ "A", "B" ] },
"tasks": { "tasks": {
"touch": { "job": { "command": [ "/usr/bin/touch", "dagrun_{{FILE}}" ]} }, "touch": { "job": { "command": [ "/usr/bin/touch", "dagrun_{{FILE}}" ]} },
@@ -90,14 +170,16 @@ TEST_CASE("rest_endpoint", "[server_basic]")
}
})";
auto dagSpec = daggy::dagFromJSON(dagRun);
// Submit, and get the runID
daggy::DAGRunID runID = 0;
{
auto response = REQUEST(baseURL + "/v1/dagrun/", dagRun, "POST");
REQUIRE(response.code == HTTPCode::Ok);
rj::Document doc;
daggy::checkRJParse(doc.Parse(response.body.c_str()));
REQUIRE(doc.IsObject());
REQUIRE(doc.HasMember("runID"));
@@ -106,11 +188,11 @@ TEST_CASE("rest_endpoint", "[server_basic]")
// Ensure our runID shows up in the list of running DAGs
{
auto response = REQUEST(baseURL + "/v1/dagruns?all=1");
REQUIRE(response.code == HTTPCode::Ok);
rj::Document doc;
daggy::checkRJParse(doc.Parse(response.body.c_str()));
REQUIRE(doc.IsArray());
REQUIRE(doc.Size() >= 1);
@@ -120,10 +202,10 @@ TEST_CASE("rest_endpoint", "[server_basic]")
for (size_t i = 0; i < runs.Size(); ++i) {
const auto &run = runs[i];
REQUIRE(run.IsObject());
REQUIRE(run.HasMember("tag"));
REQUIRE(run.HasMember("runID"));
std::string runName = run["tag"].GetString();
if (runName == "unit_server") {
REQUIRE(run["runID"].GetUint64() == runID);
found = true;
@@ -133,13 +215,28 @@ TEST_CASE("rest_endpoint", "[server_basic]")
REQUIRE(found);
}
// Ensure we can get one of our tasks
{
auto response = REQUEST(baseURL + "/v1/dagrun/" + std::to_string(runID) +
"/task/cat_0");
REQUIRE(response.code == HTTPCode::Ok);
rj::Document doc;
daggy::checkRJParse(doc.Parse(response.body.c_str()));
REQUIRE_NOTHROW(daggy::taskFromJSON("cat", doc));
auto task = daggy::taskFromJSON("cat", doc);
REQUIRE(task == dagSpec.tasks.at("cat"));
}
// Wait until our DAG is complete
bool complete = true;
for (auto i = 0; i < 10; ++i) {
auto response = REQUEST(baseURL + "/v1/dagrun/" + std::to_string(runID));
REQUIRE(response.code == HTTPCode::Ok);
rj::Document doc;
daggy::checkRJParse(doc.Parse(response.body.c_str()));
REQUIRE(doc.IsObject());
REQUIRE(doc.HasMember("taskStates"));
@@ -173,6 +270,113 @@ TEST_CASE("rest_endpoint", "[server_basic]")
fs::remove(pth);
}
}
}
TEST_CASE("Server cancels and resumes execution", "[server_resume]")
{
std::stringstream ss;
daggy::executors::task::ForkingTaskExecutor executor(10);
daggy::loggers::dag_run::OStreamLogger logger(ss);
Pistache::Address listenSpec("localhost", Pistache::Port(0));
const size_t nDAGRunners = 10, nWebThreads = 10;
daggy::Server server(listenSpec, logger, executor, nDAGRunners);
server.init(nWebThreads);
server.start();
const std::string host = "localhost:";
const std::string baseURL = host + std::to_string(server.getPort());
SECTION("Cancel / Resume DAGRun")
{
std::string dagRunJSON = R"({
"tag": "unit_server",
"tasks": {
"touch_A": { "job": { "command": [ "/usr/bin/touch", "resume_touch_a" ]}, "children": ["touch_C"] },
"sleep_B": { "job": { "command": [ "/usr/bin/sleep", "3" ]}, "children": ["touch_C"] },
"touch_C": { "job": { "command": [ "/usr/bin/touch", "resume_touch_c" ]} }
}
})";
auto dagSpec = daggy::dagFromJSON(dagRunJSON);
// Submit, and get the runID
daggy::DAGRunID runID;
{
auto response = REQUEST(baseURL + "/v1/dagrun/", dagRunJSON, "POST");
REQUIRE(response.code == HTTPCode::Ok);
rj::Document doc;
daggy::checkRJParse(doc.Parse(response.body.c_str()));
REQUIRE(doc.IsObject());
REQUIRE(doc.HasMember("runID"));
runID = doc["runID"].GetUint64();
}
std::this_thread::sleep_for(1s);
// Stop the current run
{
auto response = REQUEST(
baseURL + "/v1/dagrun/" + std::to_string(runID) + "/state/KILLED", "",
"PATCH");
REQUIRE(response.code == HTTPCode::Ok);
REQUIRE(logger.getDAGRunState(runID) == +daggy::RunState::KILLED);
}
// Verify that the run still exists
{
auto dagRun = logger.getDAGRun(runID);
REQUIRE(dagRun.taskRunStates.at("touch_A_0") ==
+daggy::RunState::COMPLETED);
REQUIRE(fs::exists("resume_touch_a"));
REQUIRE(dagRun.taskRunStates.at("sleep_B_0") ==
+daggy::RunState::ERRORED);
REQUIRE(dagRun.taskRunStates.at("touch_C_0") == +daggy::RunState::QUEUED);
}
// Set the errored task state
{
auto url = baseURL + "/v1/dagrun/" + std::to_string(runID) +
"/task/sleep_B_0/state/QUEUED";
auto response = REQUEST(url, "", "PATCH");
REQUIRE(response.code == HTTPCode::Ok);
REQUIRE(logger.getTaskState(runID, "sleep_B_0") ==
+daggy::RunState::QUEUED);
}
// Resume
{
struct stat s;
lstat("resume_touch_A", &s);
auto preMTime = s.st_mtim.tv_sec;
auto response = REQUEST(
baseURL + "/v1/dagrun/" + std::to_string(runID) + "/state/QUEUED", "",
"PATCH");
// Wait for run to complete
std::this_thread::sleep_for(5s);
REQUIRE(logger.getDAGRunState(runID) == +daggy::RunState::COMPLETED);
REQUIRE(fs::exists("resume_touch_c"));
REQUIRE(fs::exists("resume_touch_a"));
for (const auto &[taskName, task] : dagSpec.tasks) {
REQUIRE(logger.getTaskState(runID, taskName + "_0") ==
+daggy::RunState::COMPLETED);
}
// Ensure "touch_A" wasn't run again
lstat("resume_touch_A", &s);
auto postMTime = s.st_mtim.tv_sec;
REQUIRE(preMTime == postMTime);
}
}
server.shutdown();
}
@@ -8,11 +8,6 @@
#include "daggy/Serialization.hpp" #include "daggy/Serialization.hpp"
#include "daggy/Utilities.hpp" #include "daggy/Utilities.hpp"
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
#include "daggy/executors/task/NoopTaskExecutor.hpp"
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
namespace fs = std::filesystem;
TEST_CASE("string_utilities", "[utilities_string]") TEST_CASE("string_utilities", "[utilities_string]")
{ {
@@ -59,234 +54,3 @@ TEST_CASE("string_expansion", "[utilities_parameter_expansion]")
REQUIRE(result.size() == 4);
}
}
TEST_CASE("dag_runner_order", "[dagrun_order]")
{
daggy::executors::task::NoopTaskExecutor ex;
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
daggy::TimePoint globalStartTime = daggy::Clock::now();
std::string testParams{
R"({"DATE": ["2021-05-06", "2021-05-07", "2021-05-08", "2021-05-09" ]})"};
auto params = daggy::configFromJSON(testParams);
std::string taskJSON = R"({
"A": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "B","D" ]},
"B": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "C","D","E" ]},
"C": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "D"]},
"D": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}, "children": [ "E"]},
"E": {"job": {"command": ["/usr/bin/touch", "{{DATE}}"]}}
})";
auto tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex, params);
REQUIRE(tasks.size() == 20);
auto dag = daggy::buildDAGFromTasks(tasks);
auto runID = logger.startDAGRun("test_run", tasks);
auto endDAG = daggy::runDAG(runID, ex, logger, dag);
REQUIRE(endDAG.allVisited());
// Ensure the run order
auto rec = logger.getDAGRun(runID);
daggy::TimePoint globalStopTime = daggy::Clock::now();
std::array<daggy::TimePoint, 5> minTimes;
minTimes.fill(globalStartTime);
std::array<daggy::TimePoint, 5> maxTimes;
maxTimes.fill(globalStopTime);
for (const auto &[k, v] : rec.taskAttempts) {
size_t idx = k[0] - 65;
auto &startTime = minTimes[idx];
auto &stopTime = maxTimes[idx];
startTime = std::max(startTime, v.front().startTime);
stopTime = std::min(stopTime, v.back().stopTime);
}
for (size_t i = 0; i < 5; ++i) {
for (size_t j = i + 1; j < 4; ++j) {
REQUIRE(maxTimes[i] < minTimes[j]);
}
}
}
TEST_CASE("dag_runner", "[utilities_dag_runner]")
{
daggy::executors::task::ForkingTaskExecutor ex(10);
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
SECTION("Simple execution")
{
std::string prefix = (fs::current_path() / "asdlk").string();
std::unordered_map<std::string, std::string> files{
{"A", prefix + "_A"}, {"B", prefix + "_B"}, {"C", prefix + "_C"}};
std::string taskJSON =
R"({"A": {"job": {"command": ["/usr/bin/touch", ")" + files.at("A") +
R"("]}, "children": ["C"]}, "B": {"job": {"command": ["/usr/bin/touch", ")" +
files.at("B") +
R"("]}, "children": ["C"]}, "C": {"job": {"command": ["/usr/bin/touch", ")" +
files.at("C") + R"("]}}})";
auto tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex);
auto dag = daggy::buildDAGFromTasks(tasks);
auto runID = logger.startDAGRun("test_run", tasks);
auto endDAG = daggy::runDAG(runID, ex, logger, dag);
REQUIRE(endDAG.allVisited());
for (const auto &[_, file] : files) {
REQUIRE(fs::exists(file));
fs::remove(file);
}
// Get the DAG Run Attempts
auto record = logger.getDAGRun(runID);
for (const auto &[_, attempts] : record.taskAttempts) {
REQUIRE(attempts.size() == 1);
REQUIRE(attempts.front().rc == 0);
}
}
}
TEST_CASE("runDAG_recovery", "[runDAG]")
{
daggy::executors::task::ForkingTaskExecutor ex(10);
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
SECTION("Recovery from Error")
{
auto cleanup = []() {
// Cleanup
std::vector<fs::path> paths{"rec_error_A", "noexist"};
for (const auto &pth : paths) {
if (fs::exists(pth))
fs::remove_all(pth);
}
};
cleanup();
std::string goodPrefix = "rec_error_";
std::string badPrefix = "noexist/rec_error_";
std::string taskJSON =
R"({"A": {"job": {"command": ["/usr/bin/touch", ")" + goodPrefix +
R"(A"]}, "children": ["C"]}, "B": {"job": {"command": ["/usr/bin/touch", ")" +
badPrefix +
R"(B"]}, "children": ["C"]}, "C": {"job": {"command": ["/usr/bin/touch", ")" +
badPrefix + R"(C"]}}})";
auto tasks = expandTaskSet(daggy::tasksFromJSON(taskJSON), ex);
auto dag = daggy::buildDAGFromTasks(tasks);
auto runID = logger.startDAGRun("test_run", tasks);
auto tryDAG = daggy::runDAG(runID, ex, logger, dag);
REQUIRE(!tryDAG.allVisited());
// Create the missing dir, then continue to run the DAG
fs::create_directory("noexist");
tryDAG.resetRunning();
auto endDAG = daggy::runDAG(runID, ex, logger, tryDAG);
REQUIRE(endDAG.allVisited());
// Get the DAG Run Attempts
auto record = logger.getDAGRun(runID);
REQUIRE(record.taskAttempts["A_0"].size() == 1); // A ran fine
REQUIRE(record.taskAttempts["B_0"].size() ==
2); // B errored and had to be retried
REQUIRE(record.taskAttempts["C_0"].size() ==
1); // C wasn't run because B errored
cleanup();
}
}
TEST_CASE("runDAG_generator", "[runDAG_generator]")
{
daggy::executors::task::ForkingTaskExecutor ex(10);
std::stringstream ss;
daggy::loggers::dag_run::OStreamLogger logger(ss);
SECTION("Generator tasks")
{
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ]})"};
auto params = daggy::configFromJSON(testParams);
std::string generatorOutput =
R"({"B": {"job": {"command": ["/usr/bin/echo", "-e", "{{DATE}}"]}, "children": ["C"]}})";
fs::path ofn = fs::current_path() / "generator_test_output.json";
std::ofstream ofh{ofn};
ofh << generatorOutput << std::endl;
ofh.close();
daggy::TimePoint globalStartTime = daggy::Clock::now();
std::stringstream jsonTasks;
jsonTasks
<< R"({ "A": { "job": {"command": [ "/usr/bin/cat", )"
<< std::quoted(ofn.string())
<< R"(]}, "children": ["C"], "isGenerator": true},)"
<< R"("C": { "job": {"command": [ "/usr/bin/echo", "hello!"]} } })";
auto baseTasks = daggy::tasksFromJSON(jsonTasks.str());
REQUIRE(baseTasks.size() == 2);
REQUIRE(baseTasks["A"].children == std::unordered_set<std::string>{"C"});
auto tasks = daggy::expandTaskSet(baseTasks, ex, params);
REQUIRE(tasks.size() == 2);
REQUIRE(tasks["A_0"].children == std::unordered_set<std::string>{"C"});
auto dag = daggy::buildDAGFromTasks(tasks);
REQUIRE(dag.size() == 2);
auto runID = logger.startDAGRun("generator_run", tasks);
auto finalDAG = daggy::runDAG(runID, ex, logger, dag, params);
REQUIRE(finalDAG.allVisited());
REQUIRE(finalDAG.size() == 4);
// Check the logger
auto record = logger.getDAGRun(runID);
REQUIRE(record.tasks.size() == 4);
REQUIRE(record.taskRunStates.size() == 4);
for (const auto &[taskName, attempts] : record.taskAttempts) {
REQUIRE(attempts.size() == 1);
REQUIRE(attempts.back().rc == 0);
}
// Ensure that children were updated properly
REQUIRE(record.tasks["A_0"].children ==
std::unordered_set<std::string>{"B_0", "B_1", "C"});
REQUIRE(record.tasks["B_0"].children ==
std::unordered_set<std::string>{"C"});
REQUIRE(record.tasks["B_1"].children ==
std::unordered_set<std::string>{"C"});
REQUIRE(record.tasks["C_0"].children.empty());
// Ensure they were run in the right order
// All A's get run before B's, which run before C's
daggy::TimePoint globalStopTime = daggy::Clock::now();
std::array<daggy::TimePoint, 3> minTimes;
minTimes.fill(globalStartTime);
std::array<daggy::TimePoint, 3> maxTimes;
maxTimes.fill(globalStopTime);
for (const auto &[k, v] : record.taskAttempts) {
size_t idx = k[0] - 65;
auto &startTime = minTimes[idx];
auto &stopTime = maxTimes[idx];
startTime = std::max(startTime, v.front().startTime);
stopTime = std::min(stopTime, v.back().stopTime);
}
for (size_t i = 0; i < 3; ++i) {
for (size_t j = i + 1; j < 2; ++j) {
REQUIRE(maxTimes[i] < minTimes[j]);
}
}
}
}