Changing how execution parallelism is handled, so that different executors can implement their own idea of parallelism.
Ian Roddis
2021-09-15 13:05:04 -03:00
parent 4562ac755e
commit a6a7501d12
13 changed files with 167 additions and 104 deletions
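
At its core, the commit swaps the blocking executor contract for a future-based one: runDAG no longer drives a shared thread pool itself, and each executor decides how its work actually runs. The two signatures below are lifted from the TaskExecutor.hpp hunk further down, shown together for orientation:

// Before: the scheduler blocked inside the executor for every attempt.
virtual AttemptRecord execute(const std::string &taskName, const Task &task) = 0;

// After: execute returns immediately with a future; parallelism is the
// executor's own concern (a ThreadPool for forking, a monitor thread for Slurm).
virtual std::future<AttemptRecord> execute(const std::string &taskName, const Task &task) = 0;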

View File

@@ -6,6 +6,7 @@
#include <pistache/endpoint.h>
#include <pistache/http.h>
#include "ThreadPool.hpp"
#include "loggers/dag_run/DAGRunLogger.hpp"
#include "executors/task/TaskExecutor.hpp"

View File

@@ -31,14 +31,6 @@ namespace daggy {
void updateDAGFromTasks(TaskDAG &dag, TaskSet &tasks);
// Blocking call
std::vector<AttemptRecord>
runTask(DAGRunID runID,
const std::string &taskName,
const Task &task,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger);
TaskDAG runDAG(DAGRunID runID,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger,

View File

@@ -1,6 +1,7 @@
#pragma once
#include "TaskExecutor.hpp"
#include <daggy/ThreadPool.hpp>
namespace daggy::executors::task {
class ForkingTaskExecutor : public TaskExecutor {
@@ -8,7 +9,7 @@ namespace daggy::executors::task {
using Command = std::vector<std::string>;
ForkingTaskExecutor(size_t nThreads)
: TaskExecutor(nThreads) {}
: tp_(nThreads) {}
// Validates the job to ensure that all required values are set and are of the right type,
bool validateTaskParameters(const ConfigValues &job) override;
@@ -17,7 +18,10 @@ namespace daggy::executors::task {
expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) override;
// Runs the task
AttemptRecord execute(const std::string &taskName, const Task &task) override;
std::future<AttemptRecord> execute(const std::string &taskName, const Task &task) override;
private:
ThreadPool tp_;
AttemptRecord runTask(const Task &task);
};
}

View File

@@ -7,7 +7,8 @@ namespace daggy::executors::task {
public:
using Command = std::vector<std::string>;
SlurmTaskExecutor(size_t nThreads);
SlurmTaskExecutor();
~SlurmTaskExecutor();
// Validates the job to ensure that all required values are set and are of the right type,
bool validateTaskParameters(const ConfigValues &job) override;
@@ -16,6 +17,21 @@ namespace daggy::executors::task {
expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) override;
// Runs the task
AttemptRecord execute(const std::string &taskName, const Task &task) override;
std::future<AttemptRecord> execute(const std::string &taskName, const Task &task) override;
private:
struct Job {
std::promise<AttemptRecord> prom;
std::string stdoutFile;
std::string stderrFile;
};
std::mutex promiseGuard_;
std::unordered_map<size_t, Job> runningJobs_;
std::atomic<bool> running_;
// Monitors jobs and resolves promises
std::thread monitorWorker_;
void monitor();
};
}
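
The header spells out the new asynchronous model: execute() submits the batch job, parks a std::promise in runningJobs_ keyed by the Slurm job ID, and hands the matching future back to the caller, while monitorWorker_ polls Slurm in the background and fulfils the promises. The hand-off in miniature (a self-contained illustration, not daggy code):

#include <future>
#include <mutex>
#include <unordered_map>

struct Result { int rc = 0; };

class JobTracker {
public:
    // Called by the submitter: park a promise under the job ID, return its future.
    std::future<Result> track(size_t jobID) {
        std::lock_guard<std::mutex> lock(guard_);
        return pending_[jobID].get_future();
    }
    // Called by the monitor thread once the external job has finished.
    void resolve(size_t jobID, Result r) {
        std::lock_guard<std::mutex> lock(guard_);
        auto it = pending_.find(jobID);
        if (it == pending_.end()) return;
        it->second.set_value(std::move(r));
        pending_.erase(it);
    }
private:
    std::mutex guard_;
    std::unordered_map<size_t, std::promise<Result>> pending_;
};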

View File

@@ -6,8 +6,7 @@
#include <thread>
#include <vector>
#include "daggy/Defines.hpp"
#include "daggy/ThreadPool.hpp"
#include <daggy/Defines.hpp>
/*
Executors run Tasks, returning a future with the results.
@@ -17,7 +16,7 @@
namespace daggy::executors::task {
class TaskExecutor {
public:
TaskExecutor(size_t nThreads) : threadPool(nThreads) {};
virtual ~TaskExecutor() = default;
// Validates the job to ensure that all required values are set and are of the right type,
virtual bool validateTaskParameters(const ConfigValues &job) = 0;
@@ -27,8 +26,6 @@ namespace daggy::executors::task {
expandTaskParameters(const ConfigValues &job, const ConfigValues &expansionValues) = 0;
// Blocking execution of a task
virtual AttemptRecord execute(const std::string &taskName, const Task &task) = 0;
ThreadPool threadPool;
virtual std::future<AttemptRecord> execute(const std::string &taskName, const Task &task) = 0;
};
}

View File

@@ -16,6 +16,8 @@ namespace daggy {
namespace dag_run {
class DAGRunLogger {
public:
virtual ~DAGRunLogger() = default;
// Execution
virtual DAGRunID startDAGRun(std::string name, const TaskSet &tasks) = 0;
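
The only addition here is the defaulted virtual destructor. Loggers are handled polymorphically, so destroying a concrete logger through a DAGRunLogger pointer needs it to be well-defined. A stand-alone illustration (Base and Derived are generic stand-ins, not daggy classes):

#include <memory>

struct Base {
    virtual ~Base() = default;   // the one line this hunk adds to DAGRunLogger
    virtual void log() = 0;
};

struct Derived : Base {
    void log() override {}
    ~Derived() override { /* flush buffers, close files, ... */ }
};

int main() {
    std::unique_ptr<Base> p = std::make_unique<Derived>();
    p->log();
    // p's destruction runs ~Derived() then ~Base(); without the virtual
    // destructor, deleting through the base pointer would be undefined behaviour.
}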

View File

@@ -1,8 +1,11 @@
#include <future>
#include <iomanip>
#include <daggy/Utilities.hpp>
#include <daggy/Serialization.hpp>
using namespace std::chrono_literals;
namespace daggy {
std::string globalSub(std::string string, const std::string &pattern, const std::string &replacement) {
size_t pos = string.find(pattern);
@@ -64,7 +67,9 @@ namespace daggy {
executor.validateTaskParameters(task.job);
const auto newJobs = executor.expandTaskParameters(task.job, interpolatedValues);
if (newJobs.size() == 1) {
newTaskSet.emplace(baseName, task);
Task newTask{task};
newTask.job = newJobs.front();
newTaskSet.emplace(baseName, newTask);
} else {
size_t i = 0;
for (const auto &newJob: newJobs) {
@@ -126,23 +131,6 @@ namespace daggy {
return dag;
}
std::vector<AttemptRecord> runTask(DAGRunID runID,
const std::string &taskName,
const Task &task,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger) {
std::vector<AttemptRecord> attempts;
logger.updateTaskState(runID, taskName, RunState::RUNNING);
while (attempts.size() < task.maxRetries + 1) {
attempts.push_back(executor.execute(taskName, task));
logger.logTaskAttempt(runID, taskName, attempts.back());
if (attempts.back().rc == 0) break;
logger.updateTaskState(runID, taskName, RunState::RETRY);
}
return attempts;
}
TaskDAG runDAG(DAGRunID runID,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger,
@@ -151,7 +139,8 @@ namespace daggy {
) {
logger.updateDAGRunState(runID, RunState::RUNNING);
std::unordered_map<std::string, std::future<std::vector<AttemptRecord>>> runningTasks;
std::unordered_map<std::string, std::future<AttemptRecord>> runningTasks;
std::unordered_map<std::string, size_t> taskAttemptCounts;
size_t running = 0;
size_t errored = 0;
@@ -159,19 +148,16 @@ namespace daggy {
// Check for any completed tasks
for (auto &[taskName, fut]: runningTasks) {
if (fut.valid()) {
auto attemptRecords = fut.get();
if (attemptRecords.empty()) {
logger.updateTaskState(runID, taskName, RunState::ERRORED);
++errored;
}
if (attemptRecords.back().rc == 0) {
auto attempt = fut.get();
logger.logTaskAttempt(runID, taskName, attempt);
if (attempt.rc == 0) {
logger.updateTaskState(runID, taskName, RunState::COMPLETED);
auto &vert = dag.getVertex(taskName);
auto &task = vert.data;
if (task.isGenerator) {
// Parse the output and update the DAGs
try {
auto newTasks = expandTaskSet(tasksFromJSON(attemptRecords.back().outputLog),
auto newTasks = expandTaskSet(tasksFromJSON(attempt.outputLog),
executor,
parameters
);
@@ -195,31 +181,36 @@ namespace daggy {
dag.completeVisit(taskName);
--running;
} else {
logger.updateTaskState(runID, taskName, RunState::ERRORED);
++errored;
// RC isn't 0
const auto & task = dag.getVertex(taskName).data;
if (taskAttemptCounts[taskName] <= task.maxRetries) {
logger.updateTaskState(runID, taskName, RunState::RETRY);
runningTasks.extract(taskName);
runningTasks.emplace(taskName, executor.execute(taskName, task));
++taskAttemptCounts[taskName];
} else {
logger.updateTaskState(runID, taskName, RunState::ERRORED);
++errored;
}
}
}
}
// Add all remaining tasks in a task queue to avoid dominating the thread pool
auto tq = std::make_shared<TaskQueue>();
auto t = dag.visitNext();
while (t.has_value()) {
// Schedule the task to run
auto &taskName = t.value().first;
auto &task = t.value().second;
runningTasks.emplace(taskName, tq->addTask([runID, taskName, task, &executor, &logger]() {
return runTask(runID, taskName, task, executor, logger);
}));
taskAttemptCounts[taskName] = 1;
runningTasks.emplace(taskName, executor.execute(taskName, task));
++running;
auto nextTask = dag.visitNext();
if (not nextTask.has_value()) break;
t.emplace(nextTask.value());
}
if (!tq->empty()) {
executor.threadPool.addTasks(tq);
}
if (running > 0 and errored == running) {
logger.updateDAGRunState(runID, RunState::ERRORED);
break;

View File

@@ -31,8 +31,14 @@ std::string slurp(int fd) {
return result;
}
std::future<daggy::AttemptRecord>
ForkingTaskExecutor::execute(const std::string &taskName, const Task &task) {
return tp_.addTask([&](){return runTask(task);});
}
daggy::AttemptRecord
ForkingTaskExecutor::execute(const std::string &, const Task &task) {
ForkingTaskExecutor::runTask(const Task &task) {
AttemptRecord rec;
rec.startTime = Clock::now();

View File

@@ -1,3 +1,5 @@
#include <iterator>
#include <stdexcept>
#ifdef DAGGY_ENABLE_SLURM
#include <random>
#include <filesystem>
@@ -44,8 +46,10 @@ namespace daggy::executors::task {
dest.swap(contents);
}
SlurmTaskExecutor::SlurmTaskExecutor(size_t nThreads)
: TaskExecutor(nThreads) {
SlurmTaskExecutor::SlurmTaskExecutor()
: running_(true)
, monitorWorker_(&SlurmTaskExecutor::monitor, this)
{
std::string priority = "SLURM_PRIO_PROCESS=" + std::to_string(getpriority(PRIO_PROCESS, 0));
std::string submitDir = "SLURM_SUBMIT_DIR=" + fs::current_path().string();
@@ -71,6 +75,11 @@ namespace daggy::executors::task {
putenv(const_cast<char *>(ss.str().c_str()));
}
SlurmTaskExecutor::~SlurmTaskExecutor() {
running_ = false;
monitorWorker_.join();
}
// Validates the job to ensure that all required values are set and are of the right type,
bool SlurmTaskExecutor::validateTaskParameters(const ConfigValues &job) {
const std::unordered_set<std::string> requiredFields{
@@ -107,8 +116,8 @@ namespace daggy::executors::task {
return newValues;
}
AttemptRecord SlurmTaskExecutor::execute(const std::string &taskName, const Task &task) {
AttemptRecord record;
std::future<AttemptRecord>
SlurmTaskExecutor::execute(const std::string &taskName, const Task &task) {
std::stringstream executorLog;
const auto &job = task.job;
@@ -122,9 +131,12 @@ namespace daggy::executors::task {
// Convert command to argc / argv
std::vector<char *> argv{nullptr};
const auto command = std::get<std::vector<std::string>>(task.job.at("command"));
for (const auto &s: command) {
argv.push_back(const_cast<char *>(s.c_str()));
}
std::transform(command.begin(),
command.end(),
std::back_inserter(argv),
[](const std::string & s) {
return const_cast<char *>(s.c_str());
});
char empty[] = "";
char *env[1];
@@ -170,78 +182,93 @@ namespace daggy::executors::task {
error_code = slurm_submit_batch_job(&jd, &resp_msg);
if (error_code) {
record.rc = error_code;
executorLog << "Unable to submit slurm job: "
std::stringstream ss;
ss << "Unable to submit slurm job: "
<< slurm_strerror(error_code);
record.executorLog = executorLog.str();
return record;
throw std::runtime_error(ss.str());
}
uint32_t jobID = resp_msg->job_id;
executorLog << "Job " << resp_msg->job_submit_user_msg << '\n';
slurm_free_submit_response_response_msg(resp_msg);
// Pool until done
job_info_msg_t * jobStatus;
bool waiting = true;
while (waiting) {
std::this_thread::sleep_for(std::chrono::milliseconds(250));
error_code = slurm_load_job(&jobStatus, jobID, SHOW_ALL | SHOW_DETAIL);
if (error_code == SLURM_SUCCESS) {
uint32_t idx = jobStatus->record_count;
if (idx > 0) {
std::lock_guard<std::mutex> lock(promiseGuard_);
Job newJob{
.prom{},
.stdoutFile = stdoutFile,
.stderrFile = stderrFile
};
auto fut = newJob.prom.get_future();
runningJobs_.emplace(jobID, std::move(newJob));
return fut;
}
void SlurmTaskExecutor::monitor() {
std::unordered_set<size_t> resolvedJobs;
while (running_) {
{
std::lock_guard<std::mutex> lock(promiseGuard_);
for (auto & [jobID, job] : runningJobs_) {
job_info_msg_t * jobStatus;
int error_code = slurm_load_job(&jobStatus, jobID, SHOW_ALL | SHOW_DETAIL);
if (error_code != SLURM_SUCCESS) continue;
uint32_t idx = jobStatus->record_count;
if (idx == 0) continue;
idx--;
const slurm_job_info_t & jobInfo = jobStatus->job_array[idx];
AttemptRecord record;
switch(jobInfo.job_state) {
case JOB_PENDING:
case JOB_SUSPENDED:
case JOB_RUNNING:
continue;
break;
// Job has finished
// Job has finished
case JOB_COMPLETE: /* completed execution successfully */
case JOB_FAILED: /* completed execution unsuccessfully */
record.rc = jobInfo.exit_code;
executorLog << "Script errored.\n";
record.executorLog = "Script errored.\n";
break;
case JOB_CANCELLED: /* cancelled by user */
executorLog << "Job cancelled by user.\n";
record.executorLog = "Job cancelled by user.\n";
break;
case JOB_TIMEOUT: /* terminated on reaching time limit */
executorLog << "Job exceeded time limit.\n";
record.executorLog = "Job exceeded time limit.\n";
break;
case JOB_NODE_FAIL: /* terminated on node failure */
executorLog << "Node failed during execution\n";
record.executorLog = "Node failed during execution\n";
break;
case JOB_PREEMPTED: /* terminated due to preemption */
executorLog << "Job terminated due to pre-emption.\n";
record.executorLog = "Job terminated due to pre-emption.\n";
break;
case JOB_BOOT_FAIL: /* terminated due to node boot failure */
executorLog << "Job failed to run due to failure of compute node to boot.\n";
record.executorLog = "Job failed to run due to failure of compute node to boot.\n";
break;
case JOB_DEADLINE: /* terminated on deadline */
executorLog << "Job terminated due to deadline.\n";
record.executorLog = "Job terminated due to deadline.\n";
break;
case JOB_OOM: /* experienced out of memory error */
executorLog << "Job terminated due to out-of-memory.\n";
record.executorLog = "Job terminated due to out-of-memory.\n";
break;
}
waiting = false;
record.rc = jobInfo.exit_code;
slurm_free_job_info_msg(jobStatus);
readAndClean(job.stdoutFile, record.outputLog);
readAndClean(job.stderrFile, record.errorLog);
job.prom.set_value(std::move(record));
resolvedJobs.insert(jobID);
}
slurm_free_job_info_msg(jobStatus);
readAndClean(stdoutFile, record.outputLog);
readAndClean(stderrFile, record.errorLog);
record.executorLog = executorLog.str();
} else {
executorLog << "Failed to poll for job: "
<< slurm_strerror(error_code);
for (const auto &jobID : resolvedJobs) {
runningJobs_.extract(jobID);
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
return record;
}
}
#endif
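
One detail of the new monitor() worth noting: it never erases from runningJobs_ while ranging over it. Finished job IDs go into resolvedJobs and are extracted only after the loop, so the iteration stays valid. The same pattern in isolation (a generic sketch, not daggy code):

#include <unordered_map>
#include <unordered_set>

// Fulfil whatever has finished, then erase those keys after the loop ends.
template <typename Map, typename Pred>
void resolveAndErase(Map &jobs, Pred finished) {
    std::unordered_set<typename Map::key_type> done;
    for (auto &[id, job] : jobs) {
        if (!finished(id, job)) continue;
        // ... set the job's promise here ...
        done.insert(id);
    }
    for (const auto &id : done) jobs.erase(id);
}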

View File

@@ -16,7 +16,8 @@ TEST_CASE("forking_executor", "[forking_executor]") {
REQUIRE(ex.validateTaskParameters(task.job));
auto rec = ex.execute("command", task);
auto recFuture = ex.execute("command", task);
auto rec = recFuture.get();
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() >= 6);
@@ -27,7 +28,8 @@ TEST_CASE("forking_executor", "[forking_executor]") {
daggy::Task task{.job{
{"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/expr", "1", "+", "+"}}}};
auto rec = ex.execute("command", task);
auto recFuture = ex.execute("command", task);
auto rec = recFuture.get();
REQUIRE(rec.rc == 2);
REQUIRE(rec.errorLog.size() >= 20);
@@ -45,7 +47,8 @@ TEST_CASE("forking_executor", "[forking_executor]") {
daggy::Task task{.job{
{"command", daggy::executors::task::ForkingTaskExecutor::Command{"/usr/bin/cat", bigFile}}}};
auto rec = ex.execute("command", task);
auto recFuture = ex.execute("command", task);
auto rec = recFuture.get();
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile));
@@ -80,4 +83,4 @@ TEST_CASE("forking_executor", "[forking_executor]") {
REQUIRE(tasks.size() == 4);
}
}
}

View File

@@ -15,7 +15,7 @@ namespace fs = std::filesystem;
#ifdef DAGGY_ENABLE_SLURM
TEST_CASE("slurm_execution", "[slurm_executor]") {
daggy::executors::task::SlurmTaskExecutor ex(10);
daggy::executors::task::SlurmTaskExecutor ex;
daggy::ConfigValues defaultJobValues{
{"minCPUs", "1"},
@@ -37,7 +37,8 @@ TEST_CASE("slurm_execution", "[slurm_executor]") {
REQUIRE(ex.validateTaskParameters(task.job));
auto rec = ex.execute("command", task);
auto recFuture = ex.execute("command", task);
auto rec = recFuture.get();
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() >= 6);
@@ -49,7 +50,8 @@ TEST_CASE("slurm_execution", "[slurm_executor]") {
{"command", daggy::executors::task::SlurmTaskExecutor::Command{"/usr/bin/expr", "1", "+", "+"}}}};
task.job.merge(defaultJobValues);
auto rec = ex.execute("command", task);
auto recFuture = ex.execute("command", task);
auto rec = recFuture.get();
REQUIRE(rec.rc != 0);
REQUIRE(rec.errorLog.size() >= 20);
@@ -68,7 +70,8 @@ TEST_CASE("slurm_execution", "[slurm_executor]") {
{"command", daggy::executors::task::SlurmTaskExecutor::Command{"/usr/bin/cat", bigFile}}}};
task.job.merge(defaultJobValues);
auto rec = ex.execute("command", task);
auto recFuture = ex.execute("command", task);
auto rec = recFuture.get();
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile));

View File

@@ -38,4 +38,25 @@ TEST_CASE("threadpool", "[threadpool]") {
for (auto &r: res) r.get();
REQUIRE(cnt == 100);
}
SECTION("parallel") {
std::vector<std::future<void>> res;
using namespace std::chrono_literals;
std::atomic<uint32_t> maxCnt{0};
for (size_t i = 0; i < 100; ++i)
res.push_back(tp.addTask([&cnt,&maxCnt, i]() {
auto delay = 20ms;
uint32_t current = cnt.fetch_add(1);
delay += i * 1ms;
std::this_thread::sleep_for(delay);
if (current > maxCnt) {
maxCnt = current;
}
cnt--;
return;
}));
for (auto &r: res) r.get();
REQUIRE(maxCnt > 1);
}
}
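
The new section exercises the property the rest of the commit leans on: ThreadPool::addTask hands back a std::future for the submitted callable, so callers can queue work and join on results later. A minimal stand-alone pool with that contract might look like this (an assumption for illustration; daggy's real ThreadPool is not part of this diff):

#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <vector>

class MiniPool {
public:
    explicit MiniPool(size_t n) {
        for (size_t i = 0; i < n; ++i)
            workers_.emplace_back([this] { run(); });
    }
    ~MiniPool() {
        {
            std::lock_guard<std::mutex> lock(m_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto &w : workers_) w.join();
    }

    // Wrap the callable in a packaged_task and return its future.
    template <typename F>
    auto addTask(F f) -> std::future<decltype(f())> {
        using R = decltype(f());
        auto task = std::make_shared<std::packaged_task<R()>>(std::move(f));
        auto fut = task->get_future();
        {
            std::lock_guard<std::mutex> lock(m_);
            work_.emplace_back([task] { (*task)(); });
        }
        cv_.notify_one();
        return fut;
    }

private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [this] { return stop_ || !work_.empty(); });
                if (stop_ && work_.empty()) return;
                job = std::move(work_.front());
                work_.pop_front();
            }
            job();
        }
    }

    std::vector<std::thread> workers_;
    std::deque<std::function<void()>> work_;
    std::mutex m_;
    std::condition_variable cv_;
    bool stop_ = false;
};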

View File

@@ -185,7 +185,7 @@ int main(int argc, char **argv) {
}
#ifdef DAGGY_ENABLE_SLURM
daggy::executors::task::SlurmTaskExecutor executor(executorThreads);
daggy::executors::task::SlurmTaskExecutor executor;
#else
daggy::executors::task::ForkingTaskExecutor executor(executorThreads);
#endif