Changing Executor interface to allow for more flexible tasks down the road
This commit is contained in:
@@ -12,7 +12,7 @@ namespace daggy {
|
|||||||
|
|
||||||
const std::string getName() const override { return "ForkingTaskExecutor"; }
|
const std::string getName() const override { return "ForkingTaskExecutor"; }
|
||||||
|
|
||||||
AttemptRecord runCommand(std::vector<std::string> cmd) override;
|
AttemptRecord runCommand(const Task &task) override;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ namespace daggy {
|
|||||||
virtual const std::string getName() const = 0;
|
virtual const std::string getName() const = 0;
|
||||||
|
|
||||||
// This will block if the dag_executor is full
|
// This will block if the dag_executor is full
|
||||||
virtual AttemptRecord runCommand(std::vector<std::string> cmd) = 0;
|
virtual AttemptRecord runCommand(const Task &task) = 0;
|
||||||
|
|
||||||
ThreadPool threadPool;
|
ThreadPool threadPool;
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -95,7 +95,7 @@ namespace daggy {
|
|||||||
logger.updateTaskState(runID, task.name, RunState::RUNNING);
|
logger.updateTaskState(runID, task.name, RunState::RUNNING);
|
||||||
|
|
||||||
while (attempts.size() < task.maxRetries + 1) {
|
while (attempts.size() < task.maxRetries + 1) {
|
||||||
attempts.push_back(executor.runCommand(task.command));
|
attempts.push_back(executor.runCommand(task));
|
||||||
logger.logTaskAttempt(runID, task.name, attempts.back());
|
logger.logTaskAttempt(runID, task.name, attempts.back());
|
||||||
if (attempts.back().rc == 0) break;
|
if (attempts.back().rc == 0) break;
|
||||||
logger.updateTaskState(runID, task.name, RunState::RETRY);
|
logger.updateTaskState(runID, task.name, RunState::RETRY);
|
||||||
@@ -169,8 +169,8 @@ namespace daggy {
|
|||||||
executor.threadPool.addTasks(tq);
|
executor.threadPool.addTasks(tq);
|
||||||
}
|
}
|
||||||
if (running > 0 and errored == running) {
|
if (running > 0 and errored == running) {
|
||||||
logger.updateDAGRunState(runID, RunState::ERRORED);
|
logger.updateDAGRunState(runID, RunState::ERRORED);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
std::this_thread::sleep_for(250ms);
|
std::this_thread::sleep_for(250ms);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,14 +31,14 @@ std::string slurp(int fd) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
daggy::AttemptRecord
|
daggy::AttemptRecord
|
||||||
ForkingTaskExecutor::runCommand(std::vector<std::string> cmd) {
|
ForkingTaskExecutor::runCommand(const Task &task) {
|
||||||
AttemptRecord rec;
|
AttemptRecord rec;
|
||||||
|
|
||||||
rec.startTime = Clock::now();
|
rec.startTime = Clock::now();
|
||||||
|
|
||||||
// Need to convert the strings
|
// Need to convert the strings
|
||||||
std::vector<char *> argv;
|
std::vector<char *> argv;
|
||||||
for (const auto &s : cmd) {
|
for (const auto &s : task.command) {
|
||||||
argv.push_back(const_cast<char *>(s.c_str()));
|
argv.push_back(const_cast<char *>(s.c_str()));
|
||||||
}
|
}
|
||||||
argv.push_back(nullptr);
|
argv.push_back(nullptr);
|
||||||
|
|||||||
@@ -9,9 +9,9 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
|
|||||||
daggy::executors::task::ForkingTaskExecutor ex(10);
|
daggy::executors::task::ForkingTaskExecutor ex(10);
|
||||||
|
|
||||||
SECTION("Simple Run") {
|
SECTION("Simple Run") {
|
||||||
std::vector<std::string> cmd{"/usr/bin/echo", "abc", "123"};
|
daggy::Task task{.command{"/usr/bin/echo", "abc", "123"}};
|
||||||
|
|
||||||
auto rec = ex.runCommand(cmd);
|
auto rec = ex.runCommand(task);
|
||||||
|
|
||||||
REQUIRE(rec.rc == 0);
|
REQUIRE(rec.rc == 0);
|
||||||
REQUIRE(rec.outputLog == "abc 123\n");
|
REQUIRE(rec.outputLog == "abc 123\n");
|
||||||
@@ -19,9 +19,9 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
|
|||||||
}
|
}
|
||||||
|
|
||||||
SECTION("Error Run") {
|
SECTION("Error Run") {
|
||||||
std::vector<std::string> cmd{"/usr/bin/expr", "1", "+", "+"};
|
daggy::Task task{.command{"/usr/bin/expr", "1", "+", "+"}};
|
||||||
|
|
||||||
auto rec = ex.runCommand(cmd);
|
auto rec = ex.runCommand(task);
|
||||||
|
|
||||||
REQUIRE(rec.rc == 2);
|
REQUIRE(rec.rc == 2);
|
||||||
REQUIRE(rec.errorLog == "/usr/bin/expr: syntax error: missing argument after ‘+’\n");
|
REQUIRE(rec.errorLog == "/usr/bin/expr: syntax error: missing argument after ‘+’\n");
|
||||||
@@ -36,9 +36,9 @@ TEST_CASE("Basic Execution", "[forking_executor]") {
|
|||||||
for (const auto &bigFile : BIG_FILES) {
|
for (const auto &bigFile : BIG_FILES) {
|
||||||
if (!std::filesystem::exists(bigFile)) continue;
|
if (!std::filesystem::exists(bigFile)) continue;
|
||||||
|
|
||||||
std::vector<std::string> cmd{"/usr/bin/cat", bigFile};
|
daggy::Task task{.command{"/usr/bin/cat", bigFile}};
|
||||||
|
|
||||||
auto rec = ex.runCommand(cmd);
|
auto rec = ex.runCommand(task);
|
||||||
|
|
||||||
REQUIRE(rec.rc == 0);
|
REQUIRE(rec.rc == 0);
|
||||||
REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile));
|
REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile));
|
||||||
|
|||||||
Reference in New Issue
Block a user