diff --git a/README.md b/README.md index 23fac46..69078c4 100644 --- a/README.md +++ b/README.md @@ -22,3 +22,14 @@ make Architecture == +- Server + - Interactive endpoint + - DAGs + - CRUDs DAG definitions + - Kick off runs + - Return status +- Scheduler + - Accepts Executors + - Accepts task lists and parameters + - Runs and monitors DAGs + diff --git a/daggy/CMakeLists.txt b/daggy/CMakeLists.txt index d37eadc..bb1ca2a 100644 --- a/daggy/CMakeLists.txt +++ b/daggy/CMakeLists.txt @@ -2,7 +2,7 @@ project(daggy) #ExternalProject_Add_StepDependencies(pistache_extern build) -file(GLOB SOURCES src/*.cpp) +file(GLOB SOURCES src/*.cpp src/**/*.cpp) add_library(${PROJECT_NAME} STATIC ${SOURCES}) include_directories(${PISTACHE_INCLUDE_DIR}) target_include_directories(${PROJECT_NAME} PUBLIC include) diff --git a/daggy/include/daggy/Executor.hpp b/daggy/include/daggy/Executor.hpp index f317500..9b22f96 100644 --- a/daggy/include/daggy/Executor.hpp +++ b/daggy/include/daggy/Executor.hpp @@ -8,7 +8,6 @@ #include "Task.hpp" -using Clock = std::chrono::system_clock; /* Executors run Tasks, returning a future with the results. @@ -16,6 +15,8 @@ using Clock = std::chrono::system_clock; */ namespace daggy { + using Clock = std::chrono::system_clock; + struct AttemptRecord { std::chrono::time_point startTime; std::chrono::time_point stopTime; @@ -25,16 +26,11 @@ namespace daggy { std::string error; // stderr from command }; - using TaskResult = std::vector; - class Executor { public: - Executor(size_t maxParallelism) : maxParallelism_(maxParallelism) {} virtual const std::string getName() const = 0; // This will block if the executor is full - virtual std::future runTask(Task & task) = 0; - private: - size_t maxParallelism_; + virtual AttemptRecord runCommand(std::vector cmd) = 0; }; } diff --git a/daggy/include/daggy/Scheduler.hpp b/daggy/include/daggy/Scheduler.hpp index cfdc980..3583bec 100644 --- a/daggy/include/daggy/Scheduler.hpp +++ b/daggy/include/daggy/Scheduler.hpp @@ -9,13 +9,13 @@ #include "Executor.hpp" namespace daggy { - using Parameter = std::variant>; + using ParameterValue = std::variant>; class Scheduler { public: // Register an executor void registerExecutor(std::shared_ptr executor); - void runTasks(std::vector tasks, std::vector parameters); + void runTasks(std::vector tasks, std::unordered_map parameters); private: std::unordered_map> executors; diff --git a/daggy/include/daggy/executors/ForkingExecutor.hpp b/daggy/include/daggy/executors/ForkingExecutor.hpp new file mode 100644 index 0000000..9e14c67 --- /dev/null +++ b/daggy/include/daggy/executors/ForkingExecutor.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include "../Executor.hpp" + +namespace daggy { + namespace executor { + class ForkingExecutor : Executor { + public: + const std::string getName() const override { return "ForkingExecutor"; } + + AttemptRecord runCommand(std::vector cmd) override; + }; + } +} diff --git a/daggy/src/executors/ForkingExecutor.cpp b/daggy/src/executors/ForkingExecutor.cpp new file mode 100644 index 0000000..b814612 --- /dev/null +++ b/daggy/src/executors/ForkingExecutor.cpp @@ -0,0 +1,87 @@ +#include + +#include +#include +#include + +using namespace daggy::executor; + +std::string slurp(int fd) { + std::string result; + + const size_t BUFFER_SIZE = 4096; + char buffer[BUFFER_SIZE]; + + struct pollfd pfd{ .fd = fd, .events = POLLIN, .revents = 0 }; + poll(&pfd, 1, 0); + + while (pfd.revents & POLLIN) { + ssize_t bytes = read(fd, buffer, BUFFER_SIZE); + if (bytes == -1) { + if (errno == EINTR) { + continue; + } else { + perror("read"); + exit(1); + } + } else if (bytes == 0) { + break; + } else { + result += buffer; + if (bytes < BUFFER_SIZE) break; + } + } + + return result; +} + + +daggy::AttemptRecord + ForkingExecutor::runCommand(std::vector cmd) +{ + AttemptRecord rec; + + rec.startTime = Clock::now(); + + // Need to convert the strings + std::vector argv; + for (const auto & s : cmd) { + argv.push_back(const_cast(s.c_str())); + } + argv.push_back(nullptr); + + // Create the pipe + int stdoutPipe[2]; pipe(stdoutPipe); + int stderrPipe[2]; pipe(stderrPipe); + + pid_t child = fork(); + if (child < 0) { + throw std::runtime_error("Unable to fork child"); + } else if (child == 0) { // child + while ((dup2(stdoutPipe[1], STDOUT_FILENO) == -1) && (errno == EINTR)) {} + while ((dup2(stderrPipe[1], STDERR_FILENO) == -1) && (errno == EINTR)) {} + close(stdoutPipe[0]); + close(stderrPipe[0]); + execvp(argv[0], argv.data()); + exit(-1); + } + + int rc = 0; + waitpid(child, &rc, 0); + + rec.stopTime = Clock::now(); + if (WIFEXITED(rc)) { + rec.rc = WEXITSTATUS(rc); + } else { + rec.rc = -1; + } + + + rec.output = slurp(stdoutPipe[0]); + rec.error = slurp(stderrPipe[0]); + + close(stdoutPipe[0]); + close(stderrPipe[0]); + + return rec; +} diff --git a/tests/unit_executor_forkingexecutor.cpp b/tests/unit_executor_forkingexecutor.cpp new file mode 100644 index 0000000..36d5fda --- /dev/null +++ b/tests/unit_executor_forkingexecutor.cpp @@ -0,0 +1,29 @@ +#include + +#include "daggy/executors/ForkingExecutor.hpp" + +#include "catch.hpp" + +TEST_CASE("Basic Execution", "[forking_executor]") { + daggy::executor::ForkingExecutor ex; + + SECTION("Simple Run") { + std::vector cmd{"/usr/bin/echo", "abc", "123"}; + + auto rec = ex.runCommand(cmd); + + REQUIRE(rec.rc == 0); + REQUIRE(rec.output == "abc 123\n"); + REQUIRE(rec.error.empty()); + } + + SECTION("Error Run") { + std::vector cmd{"/usr/bin/expr", "1", "+", "+"}; + + auto rec = ex.runCommand(cmd); + + REQUIRE(rec.rc == 2); + REQUIRE(rec.error == "/usr/bin/expr: syntax error: missing argument after ‘+’\n"); + REQUIRE(rec.output.empty()); + } +}