diff --git a/daggyr/libdaggyr/include/daggyr/Server.hpp b/daggyr/libdaggyr/include/daggyr/Server.hpp index 4457a52..fd49247 100644 --- a/daggyr/libdaggyr/include/daggyr/Server.hpp +++ b/daggyr/libdaggyr/include/daggyr/Server.hpp @@ -77,14 +77,15 @@ namespace daggy::daggyr { bool resolved; }; + std::mutex resolvedGuard_; + std::string resolved_; + size_t nResolved_; + void monitor(); std::atomic running_; std::thread monitorWorker_; std::mutex pendingGuard_; std::unordered_map pending_; - - std::mutex resolvedGuard_; - std::deque resolved_; }; } // namespace daggy::daggyr diff --git a/daggyr/libdaggyr/src/Server.cpp b/daggyr/libdaggyr/src/Server.cpp index c41808f..869d2cb 100644 --- a/daggyr/libdaggyr/src/Server.cpp +++ b/daggyr/libdaggyr/src/Server.cpp @@ -38,6 +38,8 @@ namespace daggy::daggyr { , executor_(maxCores) , maxCapacity_{maxCores, maxMemoryMB} , curCapacity_{maxCores, maxMemoryMB} + , resolved_("[") + , nResolved_(0) , running_(true) , monitorWorker_(&Server::monitor, this) { @@ -200,8 +202,16 @@ namespace daggy::daggyr { { std::lock_guard lock(resolvedGuard_); - for (const auto &[_, item] : payloads) - resolved_.push_back(item); + for (const auto &[_, item] : payloads) { + if (resolved_.empty()) { + resolved_ = "["; + } + + if (nResolved_ > 0) + resolved_ += ','; + resolved_ += item; + ++nResolved_; + } } std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -213,37 +223,16 @@ namespace daggy::daggyr { { if (!handleAuth(request)) return; - auto ss = Clock::now(); - - std::stringstream payload; - payload << "["; - bool first = true; - size_t cnt = 0; + std::string payload = "["; + payload.reserve(65536); { std::lock_guard lock(resolvedGuard_); - cnt = resolved_.size(); - for (const auto &item : resolved_) { - if (first) { - first = false; - } - else { - payload << ", "; - } - payload << item; - } - resolved_.clear(); + payload.swap(resolved_); + nResolved_ = 0; } - payload << "]"; + payload += "]"; - auto payloadStr = payload.str(); - response.send(Pistache::Http::Code::Ok, payloadStr); - auto ee = Clock::now(); - - std::cout - << "Completed request: with " << cnt << " updates in" - << " total (" - << std::chrono::duration_cast(ee - ss).count() - << " ns)\n"; + response.send(Pistache::Http::Code::Ok, payload); } void Server::handleStopTask(const Pistache::Rest::Request &request, diff --git a/libdaggy/src/Utilities.cpp b/libdaggy/src/Utilities.cpp index 7e2a349..9c52980 100644 --- a/libdaggy/src/Utilities.cpp +++ b/libdaggy/src/Utilities.cpp @@ -234,7 +234,7 @@ namespace daggy { curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer); - // curl_easy_setopt(curl, CURLOPT_TIMEOUT, 30); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3); if (trace) { curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace); diff --git a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp index 17479c8..296a558 100644 --- a/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp +++ b/libdaggy/src/executors/task/DaggyRunnerTaskExecutor.cpp @@ -266,6 +266,7 @@ void DaggyRunnerTaskExecutor::monitor() std::unordered_map runners; while (running_) { + std::this_thread::sleep_for(std::chrono::seconds(2)); std::unordered_map, std::optional> resolvedJobs; @@ -302,7 +303,11 @@ void DaggyRunnerTaskExecutor::monitor() std::cout << "Unable to poll: " << e.what() << std::endl; continue; } - std::cout << "Doc is now: " << doc.Size() << std::endl; + + if (!doc.IsArray()) { + std::cout << "Got nonsense from poll: " << dumpJSON(doc) << std::endl; + continue; + } const auto tasks = doc.GetArray(); for (size_t idx = 0; idx < tasks.Size(); ++idx) { @@ -338,7 +343,5 @@ void DaggyRunnerTaskExecutor::monitor() runningTasks_.extract(tid); } } - - std::this_thread::sleep_for(std::chrono::seconds(10)); } }