Adding local forking executor and associated tests

This commit is contained in:
Ian Roddis
2021-06-15 13:38:54 -03:00
parent 2cfe3e21d5
commit 81f0935f36
7 changed files with 147 additions and 10 deletions

View File

@@ -22,3 +22,14 @@ make
Architecture
==
- Server
- Interactive endpoint
- DAGs
- CRUDs DAG definitions
- Kick off runs
- Return status
- Scheduler
- Accepts Executors
- Accepts task lists and parameters
- Runs and monitors DAGs

View File

@@ -2,7 +2,7 @@ project(daggy)
#ExternalProject_Add_StepDependencies(pistache_extern build)
file(GLOB SOURCES src/*.cpp)
file(GLOB SOURCES src/*.cpp src/**/*.cpp)
add_library(${PROJECT_NAME} STATIC ${SOURCES})
include_directories(${PISTACHE_INCLUDE_DIR})
target_include_directories(${PROJECT_NAME} PUBLIC include)

View File

@@ -8,7 +8,6 @@
#include "Task.hpp"
using Clock = std::chrono::system_clock;
/*
Executors run Tasks, returning a future with the results.
@@ -16,6 +15,8 @@ using Clock = std::chrono::system_clock;
*/
namespace daggy {
using Clock = std::chrono::system_clock;
struct AttemptRecord {
std::chrono::time_point<Clock> startTime;
std::chrono::time_point<Clock> stopTime;
@@ -25,16 +26,11 @@ namespace daggy {
std::string error; // stderr from command
};
using TaskResult = std::vector<AttemptRecord>;
class Executor {
public:
Executor(size_t maxParallelism) : maxParallelism_(maxParallelism) {}
virtual const std::string getName() const = 0;
// This will block if the executor is full
virtual std::future<TaskResult> runTask(Task & task) = 0;
private:
size_t maxParallelism_;
virtual AttemptRecord runCommand(std::vector<std::string> cmd) = 0;
};
}

View File

@@ -9,13 +9,13 @@
#include "Executor.hpp"
namespace daggy {
using Parameter = std::variant<std::string, std::vector<std::string>>;
using ParameterValue = std::variant<std::string, std::vector<std::string>>;
class Scheduler {
public:
// Register an executor
void registerExecutor(std::shared_ptr<Executor> executor);
void runTasks(std::vector<Task> tasks, std::vector<Parameter> parameters);
void runTasks(std::vector<Task> tasks, std::unordered_map<std::string, ParameterValue> parameters);
private:
std::unordered_map<std::string, std::shared_ptr<Executor>> executors;

View File

@@ -0,0 +1,14 @@
#pragma once
#include "../Executor.hpp"
namespace daggy {
namespace executor {
class ForkingExecutor : Executor {
public:
const std::string getName() const override { return "ForkingExecutor"; }
AttemptRecord runCommand(std::vector<std::string> cmd) override;
};
}
}

View File

@@ -0,0 +1,87 @@
#include <daggy/executors/ForkingExecutor.hpp>
#include <unistd.h>
#include <wait.h>
#include <poll.h>
using namespace daggy::executor;
std::string slurp(int fd) {
std::string result;
const size_t BUFFER_SIZE = 4096;
char buffer[BUFFER_SIZE];
struct pollfd pfd{ .fd = fd, .events = POLLIN, .revents = 0 };
poll(&pfd, 1, 0);
while (pfd.revents & POLLIN) {
ssize_t bytes = read(fd, buffer, BUFFER_SIZE);
if (bytes == -1) {
if (errno == EINTR) {
continue;
} else {
perror("read");
exit(1);
}
} else if (bytes == 0) {
break;
} else {
result += buffer;
if (bytes < BUFFER_SIZE) break;
}
}
return result;
}
daggy::AttemptRecord
ForkingExecutor::runCommand(std::vector<std::string> cmd)
{
AttemptRecord rec;
rec.startTime = Clock::now();
// Need to convert the strings
std::vector<char *> argv;
for (const auto & s : cmd) {
argv.push_back(const_cast<char *>(s.c_str()));
}
argv.push_back(nullptr);
// Create the pipe
int stdoutPipe[2]; pipe(stdoutPipe);
int stderrPipe[2]; pipe(stderrPipe);
pid_t child = fork();
if (child < 0) {
throw std::runtime_error("Unable to fork child");
} else if (child == 0) { // child
while ((dup2(stdoutPipe[1], STDOUT_FILENO) == -1) && (errno == EINTR)) {}
while ((dup2(stderrPipe[1], STDERR_FILENO) == -1) && (errno == EINTR)) {}
close(stdoutPipe[0]);
close(stderrPipe[0]);
execvp(argv[0], argv.data());
exit(-1);
}
int rc = 0;
waitpid(child, &rc, 0);
rec.stopTime = Clock::now();
if (WIFEXITED(rc)) {
rec.rc = WEXITSTATUS(rc);
} else {
rec.rc = -1;
}
rec.output = slurp(stdoutPipe[0]);
rec.error = slurp(stderrPipe[0]);
close(stdoutPipe[0]);
close(stderrPipe[0]);
return rec;
}

View File

@@ -0,0 +1,29 @@
#include <iostream>
#include "daggy/executors/ForkingExecutor.hpp"
#include "catch.hpp"
TEST_CASE("Basic Execution", "[forking_executor]") {
daggy::executor::ForkingExecutor ex;
SECTION("Simple Run") {
std::vector<std::string> cmd{"/usr/bin/echo", "abc", "123"};
auto rec = ex.runCommand(cmd);
REQUIRE(rec.rc == 0);
REQUIRE(rec.output == "abc 123\n");
REQUIRE(rec.error.empty());
}
SECTION("Error Run") {
std::vector<std::string> cmd{"/usr/bin/expr", "1", "+", "+"};
auto rec = ex.runCommand(cmd);
REQUIRE(rec.rc == 2);
REQUIRE(rec.error == "/usr/bin/expr: syntax error: missing argument after +\n");
REQUIRE(rec.output.empty());
}
}