Adding rough-in for more components, pistache dependency pull

This commit is contained in:
Ian Roddis
2021-06-03 17:57:56 -03:00
parent fbe77d03f9
commit 315638f650
8 changed files with 99 additions and 5 deletions

View File

@@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 3.10)
cmake_minimum_required(VERSION 3.14)
project(overall)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED True)
@@ -6,5 +6,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS True)
find_package (Threads REQUIRED)
include(cmake/Pistache.cmake)
add_subdirectory(daggy)
add_subdirectory(tests)

14
README.md Normal file
View File

@@ -0,0 +1,14 @@
Daggy: Ya like dags?
Description
==
Daggy is a work orchestration framework for running workflows modeled as
directed, acyclic graphs (DAGs). These are quite useful when modeling
data ingestion / processing pipelines.
Requirements
==
- rapidjson

9
cmake/Pistache.cmake Normal file
View File

@@ -0,0 +1,9 @@
project(pistache NONE)
include(ExternalProject)
ExternalProject_Add(pistache
GIT_REPOSITORY https://github.com/pistacheio/pistache.git
GIT_TAG master
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@@ -33,7 +33,7 @@ void DAG<T>::addEdge(const T & from, const T & to) {
auto & src = vertices.at(from);
auto & dst = vertices.at(to);
if (shortest_path(to, from).size() > 1) {
if (shortestPath(to, from).size() > 1) {
throw std::runtime_error("Unable to add edge that would result in a cycle");
}
@@ -51,7 +51,7 @@ std::deque<T> DAG<T>::shortestPath(const T & from, const T & to) {
auto & src = vertices.at(from);
for (const auto & cid : src.children) {
auto pth = shortest_path(cid, to);
auto pth = shortestPath(cid, to);
if (subpath.size() == 0 or subpath.size() > pth.size())
subpath.swap(pth);
}

View File

@@ -0,0 +1,38 @@
#pragma once
#include <vector>
#include <thread>
#include <future>
#include <string>
#include <chrono>
#include "Task.hpp"
using Clock = std::chrono::system_clock;
/*
Executors run Tasks, returning a future with the results.
If there are many retries, logs are returned for each attempt.
*/
namespace daggy {
struct AttemptRecord {
std::chrono::time_point<Clock> startTime;
std::chrono::time_point<Clock> stopTime;
int rc; // RC from the task
std::string metaLog; // Logs from the executor
std::string output; // stdout from command
std::string error; // stderr from command
};
using TaskResult = std::vector<AttemptRecord>;
class Executor {
public:
Executor(size_t maxParallelism) : maxParallelism_(maxPA
virtual const std::string getName() const = 0;
virtual std::future<TaskResult> runTask(Task & task) = 0;
private:
size_t maxParallelism_;
};
}

View File

@@ -0,0 +1,18 @@
#pragma once
#include <memory>
#include <unordered_map>
#include <string>
#include "Executor.hpp"
namespace daggy {
class Scheduler {
public:
void registerExecutor(std::shared_ptr<Executor> executor);
void runDAG(std::unordered_map<std::string, Task> tasks);
private:
std::unordered_map<std::string, std::shared_ptr<Executor>> executors;
std::unordered_map<std::string, std::vector<std::future<TaskResult>>> jobs;
};
}

View File

@@ -0,0 +1,13 @@
#pragma once
#include <string>
#include <cstdint>
namespace daggy {
struct Task {
std::string name;
std::string command;
uint8_t max_retries;
uint32_t retry_interval_seconds; // Time to wait between retries
};
}

View File

@@ -13,10 +13,10 @@ TEST_CASE("DAG Basic Tests", "[dag]") {
dag.addEdge(i-1, i);
}
REQUIRE(dag.shortest_path(0,9).size() == 10);
REQUIRE(dag.shortestPath(0,9).size() == 10);
dag.addEdge(5, 9);
REQUIRE(dag.shortest_path(0,9).size() == 7);
REQUIRE(dag.shortestPath(0,9).size() == 7);
REQUIRE_THROWS(dag.addEdge(9, 5));
}