Lots of fixes to poor daggyr implementation, added debugging messages

This commit is contained in:
Ian Roddis
2021-12-21 16:49:45 -04:00
parent 505ffb02bd
commit d90f49b2bb
6 changed files with 142 additions and 106 deletions

View File

@@ -119,101 +119,99 @@ namespace dl = daggy::loggers::dag_run;
std::unique_ptr<dl::DAGRunLogger> loggerFactory(const rj::Value &config) std::unique_ptr<dl::DAGRunLogger> loggerFactory(const rj::Value &config)
{ {
if (config.HasMember("logger")) { if (!config.HasMember("logger"))
const auto &logConf = config["logger"]; return std::make_unique<dl::OStreamLogger>(std::cout);
if (!logConf.IsObject())
throw std::runtime_error("logger config is not an object");
if (!logConf.HasMember("name"))
throw std::runtime_error("logger config is missing logger name");
if (!logConf.HasMember("config"))
throw std::runtime_error("logger config is missing logger config");
std::string name = logConf["name"].GetString(); const auto &logConf = config["logger"];
const auto &logConfig = logConf["config"]; if (!logConf.IsObject())
if (name == "OStreamLogger") { throw std::runtime_error("logger config is not an object");
if (logConfig.HasMember("file")) { if (!logConf.HasMember("name"))
std::string fn = logConfig["file"].GetString(); throw std::runtime_error("logger config is missing logger name");
if (fn == "-") if (!logConf.HasMember("config"))
return std::make_unique<dl::OStreamLogger>(std::cout); throw std::runtime_error("logger config is missing logger config");
std::ofstream ofh(logConfig["file"].GetString()); std::string name = logConf["name"].GetString();
return std::make_unique<dl::OStreamLogger>(ofh); const auto &logConfig = logConf["config"];
} if (name == "OStreamLogger") {
if (logConfig.HasMember("file")) {
std::string fn = logConfig["file"].GetString();
if (fn == "-")
return std::make_unique<dl::OStreamLogger>(std::cout);
std::ofstream ofh(logConfig["file"].GetString());
return std::make_unique<dl::OStreamLogger>(ofh);
} }
#ifdef DAGGY_ENABLE_REDIS
else if (name == "RedisLogger") {
std::string host = "localhost";
uint16_t port = 6379;
std::string prefix = "daggy";
if (logConfig.HasMember("prefix"))
prefix = logConfig["prefix"].GetString();
if (logConfig.HasMember("host"))
host = logConfig["host"].GetString();
if (logConfig.HasMember("port"))
port = logConfig["port"].GetInt();
return std::make_unique<dl::RedisLogger>(prefix, host, port);
}
#endif
else
throw std::runtime_error("Unknown logger type: " + name);
} }
return std::make_unique<dl::OStreamLogger>(std::cout); #ifdef DAGGY_ENABLE_REDIS
else if (name == "RedisLogger") {
std::string host = "localhost";
uint16_t port = 6379;
std::string prefix = "daggy";
if (logConfig.HasMember("prefix"))
prefix = logConfig["prefix"].GetString();
if (logConfig.HasMember("host"))
host = logConfig["host"].GetString();
if (logConfig.HasMember("port"))
port = logConfig["port"].GetInt();
return std::make_unique<dl::RedisLogger>(prefix, host, port);
}
#endif
throw std::runtime_error("Unknown logger type: " + name);
} }
namespace de = daggy::executors::task; namespace de = daggy::executors::task;
std::unique_ptr<de::TaskExecutor> executorFactory(const rj::Value &config) std::unique_ptr<de::TaskExecutor> executorFactory(const rj::Value &config)
{ {
if (config.HasMember("executor")) { if (!config.HasMember("executor"))
const auto &execConf = config["executor"]; return std::make_unique<de::ForkingTaskExecutor>(10);
if (!execConf.IsObject())
throw std::runtime_error("Executor config is not an object");
if (!execConf.HasMember("name"))
throw std::runtime_error("Executor config is missing name");
if (!execConf.HasMember("config"))
throw std::runtime_error("Executor config is missing config");
std::string name = execConf["name"].GetString();
const auto &execConfig = execConf["config"];
if (name == "ForkingTaskExecutor") { const auto &execConf = config["executor"];
size_t threads = 10; if (!execConf.IsObject())
if (execConfig.HasMember("threads")) throw std::runtime_error("Executor config is not an object");
threads = execConfig["threads"].GetInt64(); if (!execConf.HasMember("name"))
return std::make_unique<de::ForkingTaskExecutor>(threads); throw std::runtime_error("Executor config is missing name");
} if (!execConf.HasMember("config"))
throw std::runtime_error("Executor config is missing config");
std::string name = execConf["name"].GetString();
const auto &execConfig = execConf["config"];
if (name == "ForkingTaskExecutor") {
size_t threads = 10;
if (execConfig.HasMember("threads"))
threads = execConfig["threads"].GetInt64();
return std::make_unique<de::ForkingTaskExecutor>(threads);
}
#ifdef DAGGY_ENABLE_SLURM #ifdef DAGGY_ENABLE_SLURM
else if (name == "SlurmTaskExecutor") { else if (name == "SlurmTaskExecutor") {
return std::make_unique<de::SlurmTaskExecutor>(); return std::make_unique<de::SlurmTaskExecutor>();
} }
#endif #endif
else if (name == "DaggyRunnerTaskExecutor") { else if (name == "DaggyRunnerTaskExecutor") {
if (!execConfig.HasMember("runners")) if (!execConfig.HasMember("runners"))
throw std::runtime_error( throw std::runtime_error(
"DaggyRunnerExecutor config needs at least one remote runner"); "DaggyRunnerExecutor config needs at least one remote runner");
auto exe = std::make_unique<de::DaggyRunnerTaskExecutor>(); auto exe = std::make_unique<de::DaggyRunnerTaskExecutor>();
const auto &runners = execConfig["runners"]; const auto &runners = execConfig["runners"];
if (!runners.IsArray()) { if (!runners.IsArray())
throw std::runtime_error(
"DaggyRunnerExecutor runners must be an array of urls");
for (size_t i = 0; i < runners.Size(); ++i) {
if (!runners[i].IsString())
throw std::runtime_error( throw std::runtime_error(
"DaggyRunnerExecutor runners must be an array of urls"); "DaggyRunnerExecutor runners must be an array of urls");
exe->addRunner(runners[i].GetString());
for (size_t i = 0; i < runners.Size(); ++i) {
if (!runners[i].IsString())
throw std::runtime_error(
"DaggyRunnerExecutor runners must be an array of urls");
exe->addRunner(runners[i].GetString());
}
return exe;
}
} }
else
throw std::runtime_error("Unknown executor type: " + name); return exe;
} }
return std::make_unique<de::ForkingTaskExecutor>(10); throw std::runtime_error("Unknown executor type: " + name);
} }
int main(int argc, char **argv) int main(int argc, char **argv)

View File

@@ -159,6 +159,8 @@ namespace daggy::daggyr {
.resourcesUsed = resourcesUsed}); .resourcesUsed = resourcesUsed});
} }
std::cout << "Enqueued " << runID << " / " << taskName << std::endl;
response.send(Pistache::Http::Code::Ok, ""); response.send(Pistache::Http::Code::Ok, "");
} }
@@ -175,6 +177,7 @@ namespace daggy::daggyr {
std::string payload; std::string payload;
// Check to see if it's pending
bool found = false; bool found = false;
{ {
std::lock_guard<std::mutex> lock(pendingGuard_); std::lock_guard<std::mutex> lock(pendingGuard_);
@@ -193,11 +196,13 @@ namespace daggy::daggyr {
curCapacity_.cores += it->second.resourcesUsed.cores; curCapacity_.cores += it->second.resourcesUsed.cores;
curCapacity_.memoryMB += it->second.resourcesUsed.memoryMB; curCapacity_.memoryMB += it->second.resourcesUsed.memoryMB;
} }
std::cout << "Resolved " << it->first.first << " / "
<< it->first.second << std::endl;
} }
else { }
payload = R"({ "state": "RUNNING" })"; else {
found = true; payload = R"({ "state": "RUNNING" })";
} found = true;
} }
} }
@@ -211,6 +216,7 @@ namespace daggy::daggyr {
payload = R"({ "state": "COMPLETED", "attempt": )" + payload = R"({ "state": "COMPLETED", "attempt": )" +
attemptRecordToJSON(it->second) + "}"; attemptRecordToJSON(it->second) + "}";
} }
response.send(Pistache::Http::Code::Ok, payload); response.send(Pistache::Http::Code::Ok, payload);
} }

View File

@@ -55,6 +55,7 @@ namespace daggy::executors::task {
DAGRunID runID; DAGRunID runID;
std::string taskName; std::string taskName;
std::string runnerURL; std::string runnerURL;
uint32_t retries;
}; };
// Resolves jobs through polling // Resolves jobs through polling

View File

@@ -106,8 +106,12 @@ namespace daggy {
taskAttemptCounts_[taskName] = 1; taskAttemptCounts_[taskName] = 1;
logger_.updateTaskState(runID_, taskName, RunState::RUNNING); logger_.updateTaskState(runID_, taskName, RunState::RUNNING);
runningTasks_.emplace(taskName, try {
executor_.execute(runID_, taskName, task)); auto fut = executor_.execute(runID_, taskName, task);
runningTasks_.emplace(taskName, std::move(fut));
}
catch (std::exception &e) {
}
++nRunningTasks_; ++nRunningTasks_;
auto nextTask = dag_.visitNext(); auto nextTask = dag_.visitNext();

View File

@@ -234,6 +234,7 @@ namespace daggy {
curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2);
if (trace) { if (trace) {
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace); curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace);

View File

@@ -135,6 +135,7 @@ std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
// Capacities for a runner can be negative, meaning that they're currently // Capacities for a runner can be negative, meaning that they're currently
// oversubscribed. // oversubscribed.
std::vector<std::pair<std::string, double>> impacts; std::vector<std::pair<std::string, double>> impacts;
for (const auto &runner : runners_) { for (const auto &runner : runners_) {
try { try {
const auto &[code, doc] = JSON_HTTP_REQUEST(runner + "/v1/capacity"); const auto &[code, doc] = JSON_HTTP_REQUEST(runner + "/v1/capacity");
@@ -158,8 +159,14 @@ std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
} }
} }
if (impacts.empty()) if (impacts.empty()) {
throw std::runtime_error("No runners available for execution"); std::promise<AttemptRecord> prom;
auto fut = prom.get_future();
AttemptRecord record{.rc = -1,
.executorLog = "No runners available for execution"};
prom.set_value(std::move(record));
return fut;
}
auto cit = impacts.begin(); auto cit = impacts.begin();
for (auto it = impacts.begin(); it != impacts.end(); ++it) { for (auto it = impacts.begin(); it != impacts.end(); ++it) {
@@ -167,8 +174,19 @@ std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
cit = it; cit = it;
} }
RunningTask rt{ std::stringstream ss;
.prom{}, .runID = runID, .taskName = taskName, .runnerURL = cit->first}; ss << cit->first << "/v1/task/" << runID << "/" << taskName;
auto url = ss.str();
const auto response = HTTP_REQUEST(url, taskToJSON(task), "POST");
if (response.code != HTTPCode::Ok)
throw std::runtime_error("Unable to submit task: " + response.body);
RunningTask rt{.prom{},
.runID = runID,
.taskName = taskName,
.runnerURL = cit->first,
.retries = 3};
auto fut = rt.prom.get_future(); auto fut = rt.prom.get_future();
@@ -194,34 +212,42 @@ void DaggyRunnerTaskExecutor::monitor()
{ {
std::vector<std::pair<DAGRunID, std::string>> resolvedJobs; std::vector<std::pair<DAGRunID, std::string>> resolvedJobs;
std::lock_guard<std::mutex> lock(rtGuard_); {
for (auto &[taskID, task] : runningTasks_) { std::lock_guard<std::mutex> lock(rtGuard_);
try { for (auto &[taskID, task] : runningTasks_) {
const auto &[code, json] = JSON_HTTP_REQUEST( try {
task.runnerURL + "/v1/task/" + std::to_string(taskID.first) + const auto &[code, json] = JSON_HTTP_REQUEST(
"/" + taskID.second); task.runnerURL + "/v1/task/" + std::to_string(taskID.first) +
if (code != HTTPCode::Ok) { "/" + taskID.second);
AttemptRecord record{ if (code != HTTPCode::Ok) {
.rc = -1, .executorLog = "Unable to query runner for progress"}; --task.retries;
task.prom.set_value(std::move(record));
resolvedJobs.emplace_back(taskID); if (task.retries == 0) {
AttemptRecord record{
.rc = -1,
.executorLog = "Unable to query runner for progress"};
task.prom.set_value(std::move(record));
resolvedJobs.emplace_back(taskID);
}
continue;
}
if (json["state"] == "COMPLETED") {
auto attempt = attemptRecordFromJSON(json["attempt"]);
task.prom.set_value(std::move(attempt));
resolvedJobs.emplace_back(taskID);
}
}
catch (std::runtime_error &e) {
continue; continue;
} }
if (json["state"] == "COMPLETED") {
task.prom.set_value(attemptRecordFromJSON(json["attempt"]));
resolvedJobs.emplace_back(taskID);
}
} }
catch (std::runtime_error &e) {
continue;
}
for (const auto &tid : resolvedJobs) { for (const auto &tid : resolvedJobs) {
runningTasks_.extract(tid); runningTasks_.extract(tid);
} }
} }
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(250));
} }
} }
} }