diff --git a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp index 99abade..d549a1e 100644 --- a/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp +++ b/daggy/include/daggy/executors/task/ForkingTaskExecutor.hpp @@ -12,7 +12,7 @@ namespace daggy { const std::string getName() const override { return "ForkingTaskExecutor"; } - AttemptRecord runCommand(std::vector cmd) override; + AttemptRecord runCommand(const Task &task) override; }; } } diff --git a/daggy/include/daggy/executors/task/TaskExecutor.hpp b/daggy/include/daggy/executors/task/TaskExecutor.hpp index ecfad02..749ecb1 100644 --- a/daggy/include/daggy/executors/task/TaskExecutor.hpp +++ b/daggy/include/daggy/executors/task/TaskExecutor.hpp @@ -25,7 +25,7 @@ namespace daggy { virtual const std::string getName() const = 0; // This will block if the dag_executor is full - virtual AttemptRecord runCommand(std::vector cmd) = 0; + virtual AttemptRecord runCommand(const Task &task) = 0; ThreadPool threadPool; }; diff --git a/daggy/src/Utilities.cpp b/daggy/src/Utilities.cpp index b2557aa..d830683 100644 --- a/daggy/src/Utilities.cpp +++ b/daggy/src/Utilities.cpp @@ -95,7 +95,7 @@ namespace daggy { logger.updateTaskState(runID, task.name, RunState::RUNNING); while (attempts.size() < task.maxRetries + 1) { - attempts.push_back(executor.runCommand(task.command)); + attempts.push_back(executor.runCommand(task)); logger.logTaskAttempt(runID, task.name, attempts.back()); if (attempts.back().rc == 0) break; logger.updateTaskState(runID, task.name, RunState::RETRY); @@ -169,8 +169,8 @@ namespace daggy { executor.threadPool.addTasks(tq); } if (running > 0 and errored == running) { - logger.updateDAGRunState(runID, RunState::ERRORED); - break; + logger.updateDAGRunState(runID, RunState::ERRORED); + break; } std::this_thread::sleep_for(250ms); } diff --git a/daggy/src/executors/task/ForkingTaskExecutor.cpp b/daggy/src/executors/task/ForkingTaskExecutor.cpp index fc0b60b..f1b3eeb 100644 --- a/daggy/src/executors/task/ForkingTaskExecutor.cpp +++ b/daggy/src/executors/task/ForkingTaskExecutor.cpp @@ -31,14 +31,14 @@ std::string slurp(int fd) { } daggy::AttemptRecord -ForkingTaskExecutor::runCommand(std::vector cmd) { +ForkingTaskExecutor::runCommand(const Task &task) { AttemptRecord rec; rec.startTime = Clock::now(); // Need to convert the strings std::vector argv; - for (const auto &s : cmd) { + for (const auto &s : task.command) { argv.push_back(const_cast(s.c_str())); } argv.push_back(nullptr); diff --git a/tests/unit_executor_forkingexecutor.cpp b/tests/unit_executor_forkingexecutor.cpp index 61e61c0..a943439 100644 --- a/tests/unit_executor_forkingexecutor.cpp +++ b/tests/unit_executor_forkingexecutor.cpp @@ -9,9 +9,9 @@ TEST_CASE("Basic Execution", "[forking_executor]") { daggy::executors::task::ForkingTaskExecutor ex(10); SECTION("Simple Run") { - std::vector cmd{"/usr/bin/echo", "abc", "123"}; + daggy::Task task{.command{"/usr/bin/echo", "abc", "123"}}; - auto rec = ex.runCommand(cmd); + auto rec = ex.runCommand(task); REQUIRE(rec.rc == 0); REQUIRE(rec.outputLog == "abc 123\n"); @@ -19,9 +19,9 @@ TEST_CASE("Basic Execution", "[forking_executor]") { } SECTION("Error Run") { - std::vector cmd{"/usr/bin/expr", "1", "+", "+"}; + daggy::Task task{.command{"/usr/bin/expr", "1", "+", "+"}}; - auto rec = ex.runCommand(cmd); + auto rec = ex.runCommand(task); REQUIRE(rec.rc == 2); REQUIRE(rec.errorLog == "/usr/bin/expr: syntax error: missing argument after ‘+’\n"); @@ -36,9 +36,9 @@ TEST_CASE("Basic Execution", "[forking_executor]") { for (const auto &bigFile : BIG_FILES) { if (!std::filesystem::exists(bigFile)) continue; - std::vector cmd{"/usr/bin/cat", bigFile}; + daggy::Task task{.command{"/usr/bin/cat", bigFile}}; - auto rec = ex.runCommand(cmd); + auto rec = ex.runCommand(task); REQUIRE(rec.rc == 0); REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile));