Fixing issue with resolving
This commit is contained in:
@@ -204,9 +204,11 @@ namespace daggy::daggyr {
|
|||||||
}
|
}
|
||||||
std::cout << "Resolved " << it->runID << " / " << it->taskName
|
std::cout << "Resolved " << it->runID << " / " << it->taskName
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
it = pending_.erase(it);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
payload << R"("state": "PENDING")";
|
payload << R"("state": "PENDING")";
|
||||||
|
++it;
|
||||||
}
|
}
|
||||||
payload << "}";
|
payload << "}";
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -234,7 +234,7 @@ namespace daggy {
|
|||||||
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
|
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
|
||||||
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter);
|
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter);
|
||||||
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
|
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
|
||||||
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10);
|
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2);
|
||||||
|
|
||||||
if (trace) {
|
if (trace) {
|
||||||
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace);
|
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace);
|
||||||
|
|||||||
@@ -149,7 +149,6 @@ std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
|
|||||||
|
|
||||||
double impact =
|
double impact =
|
||||||
std::min(cores / caps.total.cores, memoryMB / caps.total.memoryMB);
|
std::min(cores / caps.total.cores, memoryMB / caps.total.memoryMB);
|
||||||
std::cout << runner << ": " << impact << std::endl;
|
|
||||||
impacts.emplace_back(runner, impact);
|
impacts.emplace_back(runner, impact);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -162,10 +161,8 @@ std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
|
|||||||
return fut;
|
return fut;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::sort(impacts.begin(), impacts.end());
|
std::sort(impacts.begin(), impacts.end(),
|
||||||
for (const auto &[runner, impact] : impacts) {
|
[](const auto &a, const auto &b) { return a.second < b.second; });
|
||||||
std::cout << "\t" << runner << ": " << impact << std::endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
runner = impacts.back().first;
|
runner = impacts.back().first;
|
||||||
auto &caps = runners_.at(runner);
|
auto &caps = runners_.at(runner);
|
||||||
@@ -173,8 +170,6 @@ std::future<AttemptRecord> DaggyRunnerTaskExecutor::execute(
|
|||||||
caps.current.memoryMB -= taskUsed.memoryMB;
|
caps.current.memoryMB -= taskUsed.memoryMB;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::cout << "Queuing on runner: " << runner << std::endl;
|
|
||||||
|
|
||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
ss << runner << "/v1/task/" << runID << "/" << taskName;
|
ss << runner << "/v1/task/" << runID << "/" << taskName;
|
||||||
auto url = ss.str();
|
auto url = ss.str();
|
||||||
@@ -221,80 +216,74 @@ void DaggyRunnerTaskExecutor::addRunner(const std::string &url)
|
|||||||
void DaggyRunnerTaskExecutor::monitor()
|
void DaggyRunnerTaskExecutor::monitor()
|
||||||
{
|
{
|
||||||
while (running_) {
|
while (running_) {
|
||||||
|
std::unordered_map<std::pair<DAGRunID, std::string>,
|
||||||
|
std::optional<AttemptRecord>>
|
||||||
|
resolvedJobs;
|
||||||
|
|
||||||
|
std::unordered_map<std::pair<DAGRunID, std::string>, Capacity>
|
||||||
|
taskResources;
|
||||||
|
|
||||||
{
|
{
|
||||||
std::unordered_map<std::pair<DAGRunID, std::string>,
|
std::lock_guard<std::mutex> lock(rtGuard_);
|
||||||
std::optional<AttemptRecord>>
|
for (const auto &[tid, info] : runningTasks_) {
|
||||||
resolvedJobs;
|
taskResources.emplace(tid, info.resources);
|
||||||
|
|
||||||
std::unordered_map<std::pair<DAGRunID, std::string>, Capacity>
|
|
||||||
taskResources;
|
|
||||||
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(rtGuard_);
|
|
||||||
for (const auto &[tid, info] : runningTasks_) {
|
|
||||||
taskResources.emplace(tid, info.resources);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(runnersGuard_);
|
std::lock_guard<std::mutex> lock(runnersGuard_);
|
||||||
for (auto &[runnerURL, caps] : runners_) {
|
for (auto &[runnerURL, caps] : runners_) {
|
||||||
try {
|
try {
|
||||||
const auto &[code, json] =
|
const auto &[code, json] = JSON_HTTP_REQUEST(runnerURL + "/v1/poll");
|
||||||
JSON_HTTP_REQUEST(runnerURL + "/v1/poll");
|
if (code != HTTPCode::Ok)
|
||||||
if (code != HTTPCode::Ok)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
const auto tasks = json.GetArray();
|
|
||||||
for (size_t idx = 0; idx < tasks.Size(); ++idx) {
|
|
||||||
const auto &task = tasks[idx];
|
|
||||||
if (task["state"] == "PENDING") {
|
|
||||||
resolvedJobs.emplace(
|
|
||||||
std::make_pair(task["runID"].GetInt64(),
|
|
||||||
task["taskName"].GetString()),
|
|
||||||
std::nullopt);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
auto tid = std::make_pair(task["runID"].GetInt64(),
|
|
||||||
task["taskName"].GetString());
|
|
||||||
const auto &res = taskResources.at(tid);
|
|
||||||
caps.current.cores += res.cores;
|
|
||||||
caps.current.memoryMB += res.memoryMB;
|
|
||||||
|
|
||||||
resolvedJobs.emplace(tid,
|
|
||||||
attemptRecordFromJSON(task["attempt"]));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (std::exception &e) {
|
|
||||||
std::cout << "Curl timeout failed for runner " << runnerURL << ": "
|
|
||||||
<< e.what() << std::endl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<std::pair<DAGRunID, std::string>> completedTasks;
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(rtGuard_);
|
|
||||||
for (auto &[taskID, task] : runningTasks_) {
|
|
||||||
auto it = resolvedJobs.find(taskID);
|
|
||||||
if (it == resolvedJobs.end()) {
|
|
||||||
--task.retries;
|
|
||||||
|
|
||||||
if (task.retries == 0) {
|
|
||||||
AttemptRecord record{
|
|
||||||
.rc = -1,
|
|
||||||
.executorLog = "Unable to query runner for progress"};
|
|
||||||
task.prom.set_value(std::move(record));
|
|
||||||
completedTasks.emplace_back(taskID);
|
|
||||||
}
|
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
|
const auto tasks = json.GetArray();
|
||||||
|
for (size_t idx = 0; idx < tasks.Size(); ++idx) {
|
||||||
|
const auto &task = tasks[idx];
|
||||||
|
if (task["state"] == "PENDING") {
|
||||||
|
resolvedJobs.emplace(std::make_pair(task["runID"].GetInt64(),
|
||||||
|
task["taskName"].GetString()),
|
||||||
|
std::nullopt);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
auto tid = std::make_pair(task["runID"].GetInt64(),
|
||||||
|
task["taskName"].GetString());
|
||||||
|
const auto &res = taskResources.at(tid);
|
||||||
|
caps.current.cores += res.cores;
|
||||||
|
caps.current.memoryMB += res.memoryMB;
|
||||||
|
|
||||||
|
resolvedJobs.emplace(tid, attemptRecordFromJSON(task["attempt"]));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if (it->second.has_value()) {
|
}
|
||||||
// Task has completed
|
catch (std::exception &e) {
|
||||||
task.prom.set_value(it->second.value());
|
std::cout << "Curl timeout failed for runner " << runnerURL << ": "
|
||||||
|
<< e.what() << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<std::pair<DAGRunID, std::string>> completedTasks;
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(rtGuard_);
|
||||||
|
for (auto &[taskID, task] : runningTasks_) {
|
||||||
|
auto it = resolvedJobs.find(taskID);
|
||||||
|
if (it == resolvedJobs.end()) {
|
||||||
|
--task.retries;
|
||||||
|
|
||||||
|
if (task.retries == 0) {
|
||||||
|
AttemptRecord record{
|
||||||
|
.rc = -1, .executorLog = "Unable to query runner for progress"};
|
||||||
|
task.prom.set_value(std::move(record));
|
||||||
completedTasks.emplace_back(taskID);
|
completedTasks.emplace_back(taskID);
|
||||||
}
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
else if (it->second.has_value()) {
|
||||||
|
// Task has completed
|
||||||
|
task.prom.set_value(it->second.value());
|
||||||
|
completedTasks.emplace_back(taskID);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (const auto &tid : completedTasks) {
|
for (const auto &tid : completedTasks) {
|
||||||
|
|||||||
Reference in New Issue
Block a user