diff --git a/CMakeLists.txt b/CMakeLists.txt index 753977e..25684bf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.10) +cmake_minimum_required(VERSION 3.14) project(overall) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED True) @@ -6,5 +6,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS True) find_package (Threads REQUIRED) +include(cmake/Pistache.cmake) + add_subdirectory(daggy) add_subdirectory(tests) diff --git a/README.md b/README.md new file mode 100644 index 0000000..302589a --- /dev/null +++ b/README.md @@ -0,0 +1,14 @@ +Daggy: Ya like dags? + +Description +== + +Daggy is a work orchestration framework for running workflows modeled as +directed, acyclic graphs (DAGs). These are quite useful when modeling +data ingestion / processing pipelines. + + +Requirements +== + +- rapidjson diff --git a/cmake/Pistache.cmake b/cmake/Pistache.cmake new file mode 100644 index 0000000..cf113a3 --- /dev/null +++ b/cmake/Pistache.cmake @@ -0,0 +1,9 @@ +project(pistache NONE) +include(ExternalProject) + +ExternalProject_Add(pistache + GIT_REPOSITORY https://github.com/pistacheio/pistache.git + GIT_TAG master + INSTALL_COMMAND "" + TEST_COMMAND "" +) diff --git a/daggy/include/daggy/DAG.impl b/daggy/include/daggy/DAG.impl index b348ea1..b56847c 100644 --- a/daggy/include/daggy/DAG.impl +++ b/daggy/include/daggy/DAG.impl @@ -33,7 +33,7 @@ void DAG::addEdge(const T & from, const T & to) { auto & src = vertices.at(from); auto & dst = vertices.at(to); - if (shortest_path(to, from).size() > 1) { + if (shortestPath(to, from).size() > 1) { throw std::runtime_error("Unable to add edge that would result in a cycle"); } @@ -51,7 +51,7 @@ std::deque DAG::shortestPath(const T & from, const T & to) { auto & src = vertices.at(from); for (const auto & cid : src.children) { - auto pth = shortest_path(cid, to); + auto pth = shortestPath(cid, to); if (subpath.size() == 0 or subpath.size() > pth.size()) subpath.swap(pth); } diff --git a/daggy/include/daggy/Executor.hpp b/daggy/include/daggy/Executor.hpp new file mode 100644 index 0000000..a9e822c --- /dev/null +++ b/daggy/include/daggy/Executor.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "Task.hpp" + +using Clock = std::chrono::system_clock; + +/* + Executors run Tasks, returning a future with the results. + If there are many retries, logs are returned for each attempt. +*/ + +namespace daggy { + struct AttemptRecord { + std::chrono::time_point startTime; + std::chrono::time_point stopTime; + int rc; // RC from the task + std::string metaLog; // Logs from the executor + std::string output; // stdout from command + std::string error; // stderr from command + }; + + using TaskResult = std::vector; + + class Executor { + public: + Executor(size_t maxParallelism) : maxParallelism_(maxPA + virtual const std::string getName() const = 0; + virtual std::future runTask(Task & task) = 0; + private: + size_t maxParallelism_; + }; +} diff --git a/daggy/include/daggy/Scheduler.hpp b/daggy/include/daggy/Scheduler.hpp new file mode 100644 index 0000000..6fb91f9 --- /dev/null +++ b/daggy/include/daggy/Scheduler.hpp @@ -0,0 +1,18 @@ +#pragma once + +#include +#include +#include + +#include "Executor.hpp" + +namespace daggy { + class Scheduler { + public: + void registerExecutor(std::shared_ptr executor); + void runDAG(std::unordered_map tasks); + private: + std::unordered_map> executors; + std::unordered_map>> jobs; + }; +} diff --git a/daggy/include/daggy/Task.hpp b/daggy/include/daggy/Task.hpp new file mode 100644 index 0000000..282a8e5 --- /dev/null +++ b/daggy/include/daggy/Task.hpp @@ -0,0 +1,13 @@ +#pragma once + +#include +#include + +namespace daggy { + struct Task { + std::string name; + std::string command; + uint8_t max_retries; + uint32_t retry_interval_seconds; // Time to wait between retries + }; +} diff --git a/tests/unit_dag.cpp b/tests/unit_dag.cpp index 2879b16..c145ef6 100644 --- a/tests/unit_dag.cpp +++ b/tests/unit_dag.cpp @@ -13,10 +13,10 @@ TEST_CASE("DAG Basic Tests", "[dag]") { dag.addEdge(i-1, i); } - REQUIRE(dag.shortest_path(0,9).size() == 10); + REQUIRE(dag.shortestPath(0,9).size() == 10); dag.addEdge(5, 9); - REQUIRE(dag.shortest_path(0,9).size() == 7); + REQUIRE(dag.shortestPath(0,9).size() == 7); REQUIRE_THROWS(dag.addEdge(9, 5)); }