Checkpointing work

This commit is contained in:
Ian Roddis
2022-01-12 12:50:46 -04:00
parent 04e95cfcf3
commit 9a5a247f15
21 changed files with 320 additions and 160 deletions

View File

@@ -47,7 +47,8 @@ namespace daggy {
ssize_t nRunningTasks_;
ssize_t nErroredTasks_;
std::unordered_map<std::string, std::future<AttemptRecord>> runningTasks_;
std::unordered_map<std::string, daggy::executors::task::TaskFuture>
runningTasks_;
std::unordered_map<std::string, size_t> taskAttemptCounts_;
std::mutex runGuard_;

View File

@@ -9,6 +9,8 @@
#include <variant>
#include <vector>
#include "Future.hpp"
namespace daggy {
// Commands and parameters
using ConfigValue = std::variant<std::string, std::vector<std::string>>;
@@ -72,6 +74,7 @@ namespace daggy {
std::string outputLog; // stdout from command
std::string errorLog; // stderr from command
};
} // namespace daggy
BETTER_ENUMS_DECLARE_STD_HASH(daggy::RunState)

View File

@@ -0,0 +1,113 @@
#pragma once
#include <atomic>
#include <chrono>
#include <exception>
#include <iostream>
#include <optional>
#include <thread>
namespace daggy {
enum class FutureState : uint8_t
{
NOT_READY,
OK,
ERROR,
};
template <class T>
class Future
{
public:
Future()
: state_{FutureState::NOT_READY}
, val_(std::nullopt)
{
}
FutureState state()
{
return state_;
}
void set(const T val)
{
if (val_) {
std::cout << "Future already has a value!" << std::endl;
throw std::runtime_error("Future already has a value");
}
val_.emplace(val);
state_ = FutureState::OK;
}
bool ready() const
{
return state_.load() != FutureState::NOT_READY;
}
void setException(const std::exception &e)
{
exp_ = e;
state_ = FutureState::ERROR;
}
T get()
{
while (!ready()) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
if (state_ == FutureState::ERROR)
throw exp_;
return *val_;
}
private:
std::atomic<FutureState> state_;
std::optional<T> val_;
std::exception exp_;
};
template <>
struct Future<void>
{
public:
Future()
: state_{FutureState::NOT_READY}
{
}
FutureState state()
{
return state_;
}
bool ready() const
{
return state_ != FutureState::NOT_READY;
}
void set()
{
state_ = FutureState::OK;
}
void setException(const std::exception &e)
{
exp_ = e;
state_ = FutureState::ERROR;
}
void get()
{
if (state_ == FutureState::NOT_READY)
throw std::runtime_error("Value is not ready");
if (state_ == FutureState::ERROR)
throw exp_;
}
private:
std::atomic<FutureState> state_;
std::exception exp_;
};
} // namespace daggy

View File

@@ -3,17 +3,17 @@
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <list>
#include <memory>
#include <queue>
#include <thread>
#include <vector>
#include "Future.hpp"
using namespace std::chrono_literals;
namespace daggy {
class ThreadPool
{
public:
@@ -65,7 +65,7 @@ namespace daggy {
for (size_t i = 0; i < nWorkers; ++i)
workers_.emplace_back([&] {
std::packaged_task<void()> task;
std::function<void()> task;
while (true) {
{
std::unique_lock<std::mutex> lock(mtx_);
@@ -88,15 +88,30 @@ namespace daggy {
{
if (drain_)
throw std::runtime_error("Unable to add task to draining pool");
using return_type = std::invoke_result_t<F, Args...>;
std::packaged_task<return_type()> task(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
auto callable =
std::bind(std::forward<F>(f), std::forward<Args>(args)...);
auto res = std::make_shared<Future<return_type>>();
std::future<return_type> res = task.get_future();
{
std::lock_guard<std::mutex> guard(mtx_);
tasks_.emplace(std::move(task));
tasks_.emplace([res, task = std::move(callable)]() -> void {
try {
if constexpr ((std::is_same<return_type, void>::value)) {
task();
res->set();
}
else {
return_type val = task();
res->set(val);
}
}
catch (std::exception &e) {
res->setException(e);
}
});
}
cv_.notify_one();
return res;
@@ -117,7 +132,7 @@ namespace daggy {
// need to keep track of threads, so we can join them
std::vector<std::thread> workers_;
// the task queue
std::queue<std::packaged_task<void()>> tasks_;
std::queue<std::function<void()>> tasks_;
// synchronization
std::mutex mtx_;

View File

@@ -45,9 +45,8 @@ namespace daggy::executors::task {
const ConfigValues &job, const ConfigValues &expansionValues) override;
// Runs the task
std::future<AttemptRecord> execute(DAGRunID runID,
const std::string &taskName,
const Task &task) override;
TaskFuture execute(DAGRunID runID, const std::string &taskName,
const Task &task) override;
bool stop(DAGRunID runID, const std::string &taskName) override;
@@ -60,7 +59,7 @@ namespace daggy::executors::task {
struct RunningTask
{
std::promise<AttemptRecord> prom;
TaskFuture fut;
DAGRunID runID;
std::string taskName;
std::string runnerURL;

View File

@@ -25,9 +25,8 @@ namespace daggy::executors::task {
const ConfigValues &job, const ConfigValues &expansionValues) override;
// Runs the task
std::future<AttemptRecord> execute(DAGRunID runID,
const std::string &taskName,
const Task &task) override;
TaskFuture execute(DAGRunID runID, const std::string &taskName,
const Task &task) override;
bool stop(DAGRunID runID, const std::string &taskName) override;

View File

@@ -16,12 +16,11 @@ namespace daggy::executors::task {
const ConfigValues &job, const ConfigValues &expansionValues) override;
// Runs the task
std::future<AttemptRecord> execute(DAGRunID runID,
const std::string &taskName,
const Task &task) override;
TaskFuture execute(DAGRunID runID, const std::string &taskName,
const Task &task) override;
bool stop(DAGRunID runID, const std::string &taskName) override;
std::string description() const;
std::string description() const override;
};
} // namespace daggy::executors::task

View File

@@ -19,9 +19,8 @@ namespace daggy::executors::task {
const ConfigValues &job, const ConfigValues &expansionValues) override;
// Runs the task
std::future<AttemptRecord> execute(DAGRunID runID,
const std::string &taskName,
const Task &task) override;
TaskFuture execute(DAGRunID runID, const std::string &taskName,
const Task &task) override;
bool stop(DAGRunID runID, const std::string &taskName) override;
@@ -30,7 +29,7 @@ namespace daggy::executors::task {
private:
struct Job
{
std::promise<AttemptRecord> prom;
TaskFuture fut;
std::string stdoutFile;
std::string stderrFile;
DAGRunID runID;

View File

@@ -13,6 +13,8 @@
*/
namespace daggy::executors::task {
using TaskFuture = std::shared_ptr<Future<AttemptRecord>>;
class TaskExecutor
{
public:
@@ -27,9 +29,8 @@ namespace daggy::executors::task {
const ConfigValues &job, const ConfigValues &expansionValues) = 0;
// Blocking execution of a task
virtual std::future<AttemptRecord> execute(DAGRunID runID,
const std::string &taskName,
const Task &task) = 0;
virtual TaskFuture execute(DAGRunID runID, const std::string &taskName,
const Task &task) = 0;
// Kill a currently executing task. This will resolve the future.
virtual bool stop(DAGRunID runID, const std::string &taskName) = 0;

View File

@@ -102,12 +102,12 @@ namespace daggy {
while (t.has_value()) {
// Schedule the task to run
auto &taskName = t.value().first;
auto &task = t.value().second;
taskAttemptCounts_[taskName] = 1;
logger_.updateTaskState(runID_, taskName, RunState::RUNNING);
try {
auto fut = executor_.execute(runID_, taskName, task);
auto &task = t.value().second;
auto fut = executor_.execute(runID_, taskName, task);
runningTasks_.emplace(taskName, std::move(fut));
}
catch (std::exception &e) {
@@ -125,8 +125,8 @@ namespace daggy {
void DAGRunner::collectFinished()
{
for (auto &[taskName, fut] : runningTasks_) {
if (fut.valid() and fut.wait_for(1ms) == std::future_status::ready) {
auto attempt = fut.get();
if (fut->ready()) {
auto attempt = fut->get();
logger_.logTaskAttempt(runID_, taskName, attempt);
// Not a reference, since adding tasks will invalidate references

View File

@@ -234,7 +234,7 @@ namespace daggy {
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2);
// curl_easy_setopt(curl, CURLOPT_TIMEOUT, 30);
if (trace) {
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace);

View File

@@ -145,8 +145,9 @@ std::vector<ConfigValues> DaggyRunnerTaskExecutor::expandTaskParameters(
}
// Runs the task
std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
DAGRunID runID, const std::string &taskName, const Task &task)
TaskFuture DaggyRunnerTaskExecutor::execute(DAGRunID runID,
const std::string &taskName,
const Task &task)
{
auto taskUsed = capacityFromTask(task);
@@ -183,11 +184,9 @@ std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
}
if (impacts.empty()) {
std::promise<AttemptRecord> prom;
auto fut = prom.get_future();
AttemptRecord record{.rc = -1,
.executorLog = "No runners available for execution"};
prom.set_value(std::move(record));
auto fut = std::make_shared<Future<AttemptRecord>>();
fut->set(AttemptRecord{
.rc = -1, .executorLog = "No runners available for execution"});
return fut;
}
}
@@ -217,22 +216,20 @@ std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
}
if (submitted_runner.empty()) {
std::promise<AttemptRecord> prom;
auto fut = prom.get_future();
AttemptRecord record{.rc = -1,
.executorLog = "No runners available for execution"};
prom.set_value(std::move(record));
auto fut = std::make_shared<Future<AttemptRecord>>();
fut->set(AttemptRecord{
.rc = -1, .executorLog = "No runners available for execution"});
return fut;
}
RunningTask rt{.prom{},
RunningTask rt{.fut = std::make_shared<Future<AttemptRecord>>(),
.runID = runID,
.taskName = taskName,
.runnerURL = submitted_runner,
.retries = 3,
.resources = taskUsed};
auto fut = rt.prom.get_future();
TaskFuture fut = rt.fut;
std::lock_guard<std::mutex> lock(rtGuard_);
runningTasks_.emplace(std::make_pair(runID, taskName), std::move(rt));
@@ -293,34 +290,34 @@ void DaggyRunnerTaskExecutor::monitor()
rj::Document doc;
try {
auto [code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll");
if (code != HTTPCode::Ok)
if (code != HTTPCode::Ok) {
std::cout << "Unable to poll: " << code << ": " << dumpJSON(json)
<< std::endl;
continue;
}
doc.Swap(json);
}
catch (std::exception &e) {
std::cout << "Unable to poll: " << e.what() << std::endl;
continue;
}
std::cout << "Doc is now: " << doc.Size() << std::endl;
const auto tasks = doc.GetArray();
for (size_t idx = 0; idx < tasks.Size(); ++idx) {
const auto &task = tasks[idx];
auto tid = std::make_pair(task["runID"].GetInt64(),
task["taskName"].GetString());
if (task["state"] == "PENDING") {
resolvedJobs.emplace(tid, std::nullopt);
auto it = taskResources.find(tid);
if (it != taskResources.end()) {
const auto &res = taskResources.at(tid);
caps.current.cores += res.cores;
caps.current.memoryMB += res.memoryMB;
}
else {
auto it = taskResources.find(tid);
if (it != taskResources.end()) {
const auto &res = taskResources.at(tid);
caps.current.cores += res.cores;
caps.current.memoryMB += res.memoryMB;
}
auto attempt = attemptRecordFromJSON(task["attempt"]);
resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"]));
}
auto attempt = attemptRecordFromJSON(task["attempt"]);
resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"]));
}
}
@@ -329,20 +326,11 @@ void DaggyRunnerTaskExecutor::monitor()
std::lock_guard<std::mutex> lock(rtGuard_);
for (auto &[taskID, task] : runningTasks_) {
auto it = resolvedJobs.find(taskID);
if (it == resolvedJobs.end()) {
--task.retries;
if (task.retries == 0) {
AttemptRecord record{
.rc = -1, .executorLog = "Unable to query runner for progress"};
task.prom.set_value(std::move(record));
completedTasks.emplace_back(taskID);
}
if (it == resolvedJobs.end())
continue;
}
else if (it->second.has_value()) {
if (it->second.has_value()) {
// Task has completed
task.prom.set_value(it->second.value());
task.fut->set(std::move(it->second.value()));
completedTasks.emplace_back(taskID);
}
}
@@ -351,6 +339,6 @@ void DaggyRunnerTaskExecutor::monitor()
}
}
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}

View File

@@ -90,8 +90,9 @@ bool ForkingTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
return true;
}
std::future<daggy::AttemptRecord> ForkingTaskExecutor::execute(
DAGRunID runID, const std::string &taskName, const Task &task)
TaskFuture ForkingTaskExecutor::execute(DAGRunID runID,
const std::string &taskName,
const Task &task)
{
std::string key = std::to_string(runID) + "_" + taskName;
std::lock_guard<std::mutex> lock(taskControlsGuard_);

View File

@@ -8,18 +8,20 @@ namespace daggy::executors::task {
return "NoopTaskExecutor";
}
std::future<daggy::AttemptRecord> NoopTaskExecutor::execute(
DAGRunID runID, const std::string &taskName, const Task &task)
TaskFuture NoopTaskExecutor::execute(DAGRunID runID,
const std::string &taskName,
const Task &task)
{
std::promise<daggy::AttemptRecord> promise;
auto ts = Clock::now();
promise.set_value(AttemptRecord{.startTime = ts,
.stopTime = ts,
.rc = 0,
.executorLog = taskName,
.outputLog = taskName,
.errorLog = taskName});
return promise.get_future();
auto ts = Clock::now();
auto fut = std::make_shared<Future<AttemptRecord>>();
fut->set(AttemptRecord{.startTime = ts,
.stopTime = ts,
.rc = 0,
.executorLog = taskName,
.outputLog = taskName,
.errorLog = taskName});
return fut;
}
bool NoopTaskExecutor::validateTaskParameters(const ConfigValues &job)

View File

@@ -87,8 +87,7 @@ namespace daggy::executors::task {
// Resolve the remaining futures
std::lock_guard<std::mutex> lock(promiseGuard_);
for (auto &[jobID, job] : runningJobs_) {
job.prom.set_value(
AttemptRecord{.rc = -1, .executorLog = "executor killed"});
job.fut->set(AttemptRecord{.rc = -1, .executorLog = "executor killed"});
}
runningJobs_.clear();
}
@@ -153,8 +152,9 @@ namespace daggy::executors::task {
return newValues;
}
std::future<AttemptRecord> SlurmTaskExecutor::execute(
DAGRunID runID, const std::string &taskName, const Task &task)
TaskFuture SlurmTaskExecutor::execute(DAGRunID runID,
const std::string &taskName,
const Task &task)
{
std::stringstream executorLog;
@@ -247,12 +247,12 @@ namespace daggy::executors::task {
slurm_free_submit_response_response_msg(resp_msg);
std::lock_guard<std::mutex> lock(promiseGuard_);
Job newJob{.prom{},
Job newJob{.fut = std::make_shared<Future<AttemptRecord>>(),
.stdoutFile = stdoutFile,
.stderrFile = stderrFile,
.runID = runID,
.taskName = taskName};
auto fut = newJob.prom.get_future();
auto fut = newJob.fut;
runningJobs_.emplace(jobID, std::move(newJob));
return fut;
@@ -348,7 +348,7 @@ namespace daggy::executors::task {
readAndClean(job.stdoutFile, record.outputLog);
readAndClean(job.stderrFile, record.errorLog);
job.prom.set_value(std::move(record));
job.fut->set(std::move(record));
resolvedJobs.insert(jobID);
}

View File

@@ -23,7 +23,7 @@ TEST_CASE("forking_executor", "[forking_executor]")
REQUIRE(ex.validateTaskParameters(task.job));
auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get();
auto rec = recFuture->get();
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() >= 6);
@@ -37,7 +37,7 @@ TEST_CASE("forking_executor", "[forking_executor]")
REQUIRE(ex.validateTaskParameters(task.job));
auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get();
auto rec = recFuture->get();
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() >= 6);
@@ -71,7 +71,7 @@ TEST_CASE("forking_executor", "[forking_executor]")
REQUIRE(ex.validateTaskParameters(task.job));
auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get();
auto rec = recFuture->get();
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() >= 6);
@@ -89,7 +89,7 @@ TEST_CASE("forking_executor", "[forking_executor]")
"/usr/bin/expr", "1", "+", "+"}}}};
auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get();
auto rec = recFuture->get();
REQUIRE(rec.rc == 2);
REQUIRE(rec.errorLog.size() >= 20);
@@ -106,7 +106,7 @@ TEST_CASE("forking_executor", "[forking_executor]")
auto recFuture = ex.execute(0, "command", task);
std::this_thread::sleep_for(1s);
ex.stop(0, "command");
auto rec = recFuture.get();
auto rec = recFuture->get();
auto stop = daggy::Clock::now();
REQUIRE(rec.rc == 9);
@@ -133,7 +133,7 @@ TEST_CASE("forking_executor", "[forking_executor]")
"/usr/bin/cat", bigFile}}}};
auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get();
auto rec = recFuture->get();
REQUIRE(rec.rc == 0);
REQUIRE(rec.outputLog.size() == std::filesystem::file_size(bigFile));

View File

@@ -23,7 +23,7 @@ TEST_CASE("noop_executor", "[noop_executor]")
REQUIRE(ex.validateTaskParameters(task.job));
auto recFuture = ex.execute(0, "command", task);
auto rec = recFuture.get();
auto rec = recFuture->get();
REQUIRE(rec.rc == 0);
}

View File

@@ -11,24 +11,24 @@ TEST_CASE("threadpool", "[threadpool]")
std::atomic<uint32_t> cnt(0);
ThreadPool tp(10);
std::vector<std::future<uint32_t>> rets;
std::vector<std::shared_ptr<Future<uint32_t>>> rets;
SECTION("Adding large tasks queues with return values")
{
std::vector<std::future<uint32_t>> res;
std::vector<std::shared_ptr<Future<uint32_t>>> res;
for (size_t i = 0; i < 100; ++i)
res.emplace_back(tp.addTask([&cnt]() {
cnt++;
return cnt.load();
}));
for (auto &r : res)
r.get();
r->get();
REQUIRE(cnt == 100);
}
SECTION("Slow runs")
{
std::vector<std::future<void>> res;
std::vector<std::shared_ptr<Future<void>>> res;
using namespace std::chrono_literals;
for (size_t i = 0; i < 100; ++i)
res.push_back(tp.addTask([&cnt]() {
@@ -37,7 +37,7 @@ TEST_CASE("threadpool", "[threadpool]")
return;
}));
for (auto &r : res)
r.get();
r->get();
REQUIRE(cnt == 100);
}
}