Adding SSH executor

This commit is contained in:
Ian Roddis
2022-01-14 15:39:57 -04:00
parent 9e7d78788b
commit 07646de2cd
4 changed files with 242 additions and 0 deletions

View File

@@ -12,6 +12,7 @@
// Add executors here
#include <daggy/executors/task/DaggyRunnerTaskExecutor.hpp>
#include <daggy/executors/task/ForkingTaskExecutor.hpp>
#include <daggy/executors/task/SSHTaskExecutor.hpp>
#ifdef DAGGY_ENABLE_SLURM
#include <daggy/executors/task/SlurmTaskExecutor.hpp>
@@ -208,7 +209,45 @@ std::unique_ptr<de::TaskExecutor> executorFactory(const rj::Value &config)
"DaggyRunnerExecutor runners must be an array of urls");
exe->addRunner(runners[i].GetString());
}
}
else if (name == "SSHTaskExecutor") {
if (!execConfig.HasMember("hosts"))
throw std::runtime_error(
"SSHTaskExecutor config needs at least one host");
std::unordered_map<std::string,
daggy::executors::task::SSHTaskExecutor::RemoteHost>
remoteHosts;
const auto &hosts = execConfig["hosts"];
if (!hosts.IsObject())
throw std::runtime_error(
"SSHTaskExecutor hosts must be a dictionary of host => {cores, "
"memoryMB}");
for (auto it = hosts.MemberBegin(); it != hosts.MemberEnd(); ++it) {
if (!it->name.IsString())
throw std::runtime_error("Hostnames names must be a string.");
if (!it->value.IsObject())
throw std::runtime_error("Hostname definitions must be an object.");
const std::string hostName = it->name.GetString();
const auto &caps = it->value.GetObject();
if (!caps.HasMember("cores"))
throw std::runtime_error("Host " + hostName +
" is missing cores count.");
if (!caps.HasMember("memoryMB"))
throw std::runtime_error("Host " + hostName +
" is missing memoryMB size.");
size_t cores = caps["cores"].GetInt64();
size_t mem = caps["memoryMB"].GetInt64();
remoteHosts.emplace(hostName,
daggy::executors::task::SSHTaskExecutor::RemoteHost{
.cores = cores, .memoryMB = mem});
}
auto exe = std::make_unique<de::SSHTaskExecutor>(remoteHosts);
return exe;
}

View File

@@ -0,0 +1,60 @@
#pragma once
#include "ForkingTaskExecutor.hpp"
#include "TaskExecutor.hpp"
namespace daggy::executors::task {
class SSHTaskExecutor : public TaskExecutor
{
public:
struct RemoteHost
{
size_t cores;
size_t memoryMB;
};
using Command = std::vector<std::string>;
explicit SSHTaskExecutor(std::unordered_map<std::string, RemoteHost> hosts);
// Validates the job to ensure that all required values are set and are of
// the right type,
bool validateTaskParameters(const ConfigValues &job) override;
std::vector<ConfigValues> expandTaskParameters(
const ConfigValues &job, const ConfigValues &expansionValues) override;
// Runs the task
TaskFuture execute(DAGRunID runID, const std::string &taskName,
const Task &task) override;
bool stop(DAGRunID runID, const std::string &taskName) override;
std::string description() const override;
private:
struct RunningTask
{
std::string host;
size_t cores;
size_t memoryMB;
TaskFuture fut;
TaskFuture feFuture;
int sshRetries;
DAGRunID runID;
std::string taskName;
Task task;
};
std::unordered_map<std::string, RemoteHost> hosts_;
ForkingTaskExecutor fe_;
std::mutex hostGuard_;
std::condition_variable hostCV_;
std::deque<RunningTask> runningTasks_;
void monitor();
std::atomic<bool> running_;
std::thread monitorWorker_;
};
} // namespace daggy::executors::task

View File

@@ -3,4 +3,5 @@ target_sources(${PROJECT_NAME} PRIVATE
NoopTaskExecutor.cpp
ForkingTaskExecutor.cpp
DaggyRunnerTaskExecutor.cpp
SSHTaskExecutor.cpp
)

View File

@@ -0,0 +1,142 @@
#include <daggy/Utilities.hpp>
#include <daggy/executors/task/SSHTaskExecutor.hpp>
#include <iomanip>
#include <numeric>
using namespace daggy::executors::task;
SSHTaskExecutor::SSHTaskExecutor(
std::unordered_map<std::string, RemoteHost> hosts)
: hosts_(hosts)
, fe_(std::accumulate(
hosts_.begin(), hosts_.end(), 0UL,
[](size_t t, const auto &a) { return t + a.second.cores; }))
, running_(true)
, monitorWorker_(&SSHTaskExecutor::monitor, this)
{
}
std::string SSHTaskExecutor::description() const
{
std::stringstream ss;
ss << "SSHTaskExecutor with total cores on " << hosts_.size() << " hosts";
return ss.str();
}
bool SSHTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
{
return fe_.stop(runID, taskName);
}
TaskFuture SSHTaskExecutor::execute(DAGRunID runID, const std::string &taskName,
const Task &task)
{
std::vector<std::string> newCommand{"ssh"};
std::string user = "";
if (task.job.count("user") > 1)
user = std::get<std::string>(task.job.at("user")) + "@";
if (task.job.count("port") > 1) {
newCommand.push_back("-p");
newCommand.push_back(std::get<std::string>(task.job.at("port")));
}
size_t coresNeeded = std::stoull(std::get<std::string>(task.job.at("cores")));
size_t memoryMBNeeded =
std::stoull(std::get<std::string>(task.job.at("memoryMB")));
RemoteHost *host;
std::string hostname = "";
// Block until a host is found
std::unique_lock<std::mutex> lock(hostGuard_);
// Wait for a host to be available
hostCV_.wait(lock, [&] {
for (auto &r : hosts_) {
if (r.second.cores >= coresNeeded and
r.second.memoryMB >= memoryMBNeeded) {
host = &r.second;
hostname = r.first;
return true;
}
}
return false;
});
host->cores -= coresNeeded;
host->memoryMB -= memoryMBNeeded;
Task sshTask{task};
newCommand.push_back(user + hostname);
const auto oldCommand =
std::get<std::vector<std::string>>(task.job.at("command"));
std::copy(oldCommand.begin(), oldCommand.end(),
std::back_inserter(newCommand));
sshTask.job["command"] = newCommand;
RunningTask rt{
.host = hostname,
.cores = coresNeeded,
.memoryMB = memoryMBNeeded,
.fut = std::make_shared<Future<AttemptRecord>>(),
.feFuture = fe_.execute(runID, taskName, sshTask),
.sshRetries = 3,
.runID = runID,
.taskName = taskName,
.task = sshTask,
};
auto fut = rt.fut;
runningTasks_.emplace_back(std::move(rt));
return fut;
}
void SSHTaskExecutor::monitor()
{
while (running_) {
{
std::lock_guard<std::mutex> lock(hostGuard_);
while (!runningTasks_.empty() and runningTasks_.front().fut->ready())
runningTasks_.pop_front();
for (auto &rt : runningTasks_) {
if (rt.feFuture->ready() and !rt.fut->ready()) {
auto attempt = rt.feFuture->get();
// SSH is a bit flakey, but will error with 255 if it doesn't work
if (attempt.rc == 255) {
--rt.sshRetries;
if (rt.sshRetries > 0) {
/*
std::cout << "Resubmitting: " << rt.sshRetries;
for (const auto &i : std::get<std::vector<std::string>>(
rt.task.job.at("command")))
std::cout << " " << i;
std::cout << std::endl;
*/
rt.feFuture = fe_.execute(rt.runID, rt.taskName, rt.task);
continue;
}
}
rt.fut->set(rt.feFuture->get());
hosts_[rt.host].cores += rt.cores;
hosts_[rt.host].memoryMB += rt.memoryMB;
hostCV_.notify_one();
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
bool SSHTaskExecutor::validateTaskParameters(const ConfigValues &job)
{
return fe_.validateTaskParameters(job);
// TODO add in requirement for memory, cores, users, privkey, port
return true;
}
std::vector<daggy::ConfigValues> SSHTaskExecutor::expandTaskParameters(
const ConfigValues &job, const ConfigValues &expansionValues)
{
return fe_.expandTaskParameters(job, expansionValues);
}