From 71756d9ec2badc02e9d6b37b257ff4880d17378d Mon Sep 17 00:00:00 2001
From: Ian Roddis
Date: Tue, 15 Feb 2022 11:22:21 -0400
Subject: [PATCH] Fixing daggyr issues when reporting on tasks with very large outputs (>10kb).

Squashed commit of the following:

commit b87fa418b4aca78928186a8fa992bef701e044a4
Author: Ian Roddis
Date:   Mon Feb 14 12:55:34 2022 -0400

    removing memory leak

commit 5e284ab92dbea991262a08c0cd50d6fc2f912e3b
Author: Ian Roddis
Date:   Mon Feb 14 11:58:57 2022 -0400

    Speeding up serialization, fixing payload sizing issue on daggyr

commit e5e358820da4c2587741abdc3b6b103e5a4d4dd3
Author: Ian Roddis
Date:   Sun Feb 13 22:24:04 2022 -0400

    changing newlines to std::endl for flush goodness

commit 705ec86b75be947e64f4124ec8017cba2c8465e6
Author: Ian Roddis
Date:   Sun Feb 13 22:16:56 2022 -0400

    adding more logging

commit aa3db9c23e55da7a0523dc57e268b605ce8faac3
Author: Ian Roddis
Date:   Sun Feb 13 22:13:56 2022 -0400

    Adding threadid

commit 3b1a0f1333b2d43bc5ecad0746435504babbaa61
Author: Ian Roddis
Date:   Sun Feb 13 22:13:24 2022 -0400

    Adding some debugging

commit 804507e65251858fa597b7c27bcece8d8dfd589d
Author: Ian Roddis
Date:   Sun Feb 13 21:52:53 2022 -0400

    Removing curl global cleanup
---
 cmake/pistache.cmake                       |  2 +-
 daggyd/daggyd/daggyd.cpp                   |  2 +
 daggyr/daggyr/daggyr.cpp                   |  2 +
 daggyr/libdaggyr/include/daggyr/Server.hpp |  7 ++
 daggyr/libdaggyr/src/Server.cpp            | 81 ++++++++++++++++------
 libdaggy/include/daggy/Future.hpp          |  1 -
 libdaggy/include/daggy/Utilities.hpp       |  1 +
 libdaggy/src/Serialization.cpp             | 63 ++++++++++++++++-
 libdaggy/src/Utilities.cpp                 | 15 ++--
 libdaggy/tests/CMakeLists.txt              |  1 +
 libdaggy/tests/perf_dag.cpp                |  2 +-
 libdaggy/tests/perf_serialization.cpp      | 50 +++++++++++++
 libdaggy/tests/unit_serialization.cpp      | 42 +++++++++++
 13 files changed, 238 insertions(+), 31 deletions(-)
 create mode 100644 libdaggy/tests/perf_serialization.cpp

diff --git a/cmake/pistache.cmake b/cmake/pistache.cmake
index 3080bf9..8b5591d 100644
--- a/cmake/pistache.cmake
+++ b/cmake/pistache.cmake
@@ -22,4 +22,4 @@ file(MAKE_DIRECTORY ${PISTACHE_INCLUDE_DIR})
 add_library(${PROJECT_NAME} SHARED IMPORTED)
 add_dependencies(${PROJECT_NAME} PistacheDownload)
 target_include_directories(${PROJECT_NAME} INTERFACE ${PISTACHE_INCLUDE_DIR})
-set_target_properties(${PROJECT_NAME} PROPERTIES IMPORTED_LOCATION "${PISTACHE_LIB_DIR}/libpistache.a")
\ No newline at end of file
+set_target_properties(${PROJECT_NAME} PROPERTIES IMPORTED_LOCATION "${PISTACHE_LIB_DIR}/libpistache.a")
diff --git a/daggyd/daggyd/daggyd.cpp b/daggyd/daggyd/daggyd.cpp
index 368842c..fe1e19f 100644
--- a/daggyd/daggyd/daggyd.cpp
+++ b/daggyd/daggyd/daggyd.cpp
@@ -257,6 +257,7 @@ std::unique_ptr executorFactory(const rj::Value &config)
 
 int main(int argc, char **argv)
 {
+    curl_global_init(CURL_GLOBAL_ALL);
     argparse::ArgumentParser args("Daggy");
 
     args.add_argument("-v", "--verbose")
@@ -339,4 +340,5 @@ int main(int argc, char **argv)
         std::this_thread::sleep_for(std::chrono::seconds(30));
     }
     server.shutdown();
+    curl_global_cleanup();
 }
diff --git a/daggyr/daggyr/daggyr.cpp b/daggyr/daggyr/daggyr.cpp
index b6ba84c..ad7a2d2 100644
--- a/daggyr/daggyr/daggyr.cpp
+++ b/daggyr/daggyr/daggyr.cpp
@@ -126,6 +126,7 @@ daggy::GeneralLogger getLogger(const std::string &logFile,
 
 int main(int argc, char **argv)
 {
+    curl_global_init(CURL_GLOBAL_ALL);
     std::ios::sync_with_stdio(false);
 
     argparse::ArgumentParser args("Daggy");
@@ -216,4 +217,5 @@ int main(int argc, char **argv)
     }
     server.shutdown();
     logger.shutdown();
+    curl_global_cleanup();
 }
diff --git a/daggyr/libdaggyr/include/daggyr/Server.hpp b/daggyr/libdaggyr/include/daggyr/Server.hpp
index d515b58..a867ddd 100644
--- a/daggyr/libdaggyr/include/daggyr/Server.hpp
+++ b/daggyr/libdaggyr/include/daggyr/Server.hpp
@@ -43,6 +43,7 @@ namespace daggy::daggyr {
 
     private:
         void createDescription();
+        void serializeResults();
 
         bool handleAuth(const Pistache::Rest::Request &request);
 
@@ -59,11 +60,17 @@ namespace daggy::daggyr {
         GeneralLogger &logger_;
         executors::task::ForkingTaskExecutor executor_;
+        std::thread serializer_;
+        std::atomic<bool> running_;
 
         using TaskID = std::pair;
         Capacity maxCapacity_;
 
         std::mutex rtGuard_;
         std::unordered_map runningTasks_;
+
+        std::mutex resultsGuard_;
+        std::unordered_map>
+            results_;
     };
 
 } // namespace daggy::daggyr
diff --git a/daggyr/libdaggyr/src/Server.cpp b/daggyr/libdaggyr/src/Server.cpp
index 025d209..4c17cb2 100644
--- a/daggyr/libdaggyr/src/Server.cpp
+++ b/daggyr/libdaggyr/src/Server.cpp
@@ -25,8 +25,8 @@ namespace daggy::daggyr {
             .threads(threads)
             .flags(Pistache::Tcp::Options::ReuseAddr |
                    Pistache::Tcp::Options::ReusePort)
-            .maxRequestSize(102400)
-            .maxResponseSize(102400);
+            .maxRequestSize(4294967296)
+            .maxResponseSize(4294967296);
         endpoint_.init(opts);
         createDescription();
     }
@@ -37,6 +37,8 @@ namespace daggy::daggyr {
         , desc_("Daggy Runner API", "0.1")
         , logger_(logger)
         , executor_(maxCores)
+        , serializer_(&Server::serializeResults, this)
+        , running_(true)
         , maxCapacity_{maxCores, maxMemoryMB}
     {
     }
@@ -63,6 +65,40 @@ namespace daggy::daggyr {
     void Server::shutdown()
     {
         endpoint_.shutdown();
+        running_ = false;
+        serializer_.join();
+    }
+
+    void Server::serializeResults() {
+        using Node = std::unordered_map::node_type;
+
+        std::vector ready;
+        std::vector nodes;
+        while (running_) {
+            ready.clear();
+            nodes.clear();
+            std::this_thread::sleep_for(1s);
+            {
+                std::lock_guard rtLock(rtGuard_);
+                for (const auto & [taskid, fut] : runningTasks_) {
+                    if (fut->ready())
+                        ready.push_back(taskid);
+                }
+
+                for (const auto & tid : ready) {
+                    nodes.emplace_back(runningTasks_.extract(tid));
+                }
+            }
+
+            // Insert the results
+            {
+                std::lock_guard resultsLock(resultsGuard_);
+                for (const auto & node : nodes) {
+                    auto json = attemptRecordToJSON(node.mapped()->get());
+                    results_[node.key()].set(json);
+                }
+            }
+        }
     }
 
     uint16_t Server::getPort() const
@@ -153,10 +189,15 @@ namespace daggy::daggyr {
 
         auto tid = std::make_pair(runID, taskName);
         auto fut = executor_.execute(runID, taskName, task);
+        Future strFut;
 
         {
             std::lock_guard lock(rtGuard_);
-            runningTasks_.emplace(std::move(tid), std::move(fut));
+            runningTasks_.emplace(tid, std::move(fut));
+        }
+        {
+            std::lock_guard lock(resultsGuard_);
+            results_[tid];
         }
 
         logger_.debug(requestID + ": Task successfully enqueued");
@@ -175,42 +216,42 @@ namespace daggy::daggyr {
         auto requestID = std::to_string(runID) + "/" + taskName;
         auto taskID = std::make_pair(runID, taskName);
 
-        std::unordered_map::node_type
+        std::unordered_map>::node_type
             node;
-        bool notFound = false;
+        Pistache::Http::Code code = Pistache::Http::Code::Ok;
         {
-            std::lock_guard lock(rtGuard_);
-            auto it = runningTasks_.find(taskID);
-            if (it == runningTasks_.end()) {
+            std::lock_guard lock(resultsGuard_);
+            auto it = results_.find(taskID);
+            if (it == results_.end()) {
                 logger_.warn(requestID + ": Polled about unknown task");
-                notFound = true;
+                code = Pistache::Http::Code::Not_Found;
             }
-            else if (!it->second->ready()) {
+            else if (!it->second.ready()) {
                 logger_.debug(requestID + ": Polled but task not ready yet");
-                notFound = true;
+                code = Pistache::Http::Code::Precondition_Required;
             }
             else {
                 logger_.debug(requestID + ": Polled and ready.");
-                node = runningTasks_.extract(taskID);
+                node = results_.extract(taskID);
             }
         }
 
-        if (notFound) {
-            response.send(Pistache::Http::Code::Not_Found, "");
+        if (code != Pistache::Http::Code::Ok) {
+            response.send(code, "");
             return;
         }
 
-        auto prom = response.send(Pistache::Http::Code::Ok,
-                                  attemptRecordToJSON(node.mapped()->get()));
-        // If the promise fails, then reinsert the result for later polling
+        std::string payload = node.mapped().get();
+        auto prom = response.send(Pistache::Http::Code::Ok, payload);
+
         if (prom.isRejected()) {
             logger_.warn(requestID +
                          ": Record sent to poller, but failed to complete transfer.");
-            std::lock_guard lock(rtGuard_);
-            runningTasks_.insert(std::move(node));
+            std::lock_guard lock(resultsGuard_);
+            results_.insert(std::move(node));
         }
         else {
-            logger_.debug(requestID + ": Record send successfully");
+            logger_.debug(requestID + ": Record sent successfully.");
         }
     }
 
diff --git a/libdaggy/include/daggy/Future.hpp b/libdaggy/include/daggy/Future.hpp
index 771e536..c0c0cb8 100644
--- a/libdaggy/include/daggy/Future.hpp
+++ b/libdaggy/include/daggy/Future.hpp
@@ -34,7 +34,6 @@ namespace daggy {
         void set(const T val)
         {
             if (val_) {
-                std::cout << "Future already has a value!" << std::endl;
                 throw std::runtime_error("Future already has a value");
             }
             val_.emplace(val);
diff --git a/libdaggy/include/daggy/Utilities.hpp b/libdaggy/include/daggy/Utilities.hpp
index 47a4af2..fa5ce2c 100644
--- a/libdaggy/include/daggy/Utilities.hpp
+++ b/libdaggy/include/daggy/Utilities.hpp
@@ -50,6 +50,7 @@ namespace daggy {
         Ok = 200,
         Not_Found = 404,
         Not_Acceptable = 406,
+        Not_Ready = 428,
         Server_Error = 500
     };
 
diff --git a/libdaggy/src/Serialization.cpp b/libdaggy/src/Serialization.cpp
index 0f6a673..1056a87 100644
--- a/libdaggy/src/Serialization.cpp
+++ b/libdaggy/src/Serialization.cpp
@@ -23,6 +23,7 @@ namespace daggy {
     std::string dumpJSON(const rj::Value &doc) {
         rj::StringBuffer buffer;
+        buffer.Clear();
         rj::Writer writer(buffer);
         doc.Accept(writer);
         return buffer.GetString();
     }
@@ -281,8 +282,68 @@ namespace daggy {
 
         return os;
     }
 
+    // From https://stackoverflow.com/questions/7724448/simple-json-string-escape-for-c
+    enum State {ESCAPED, UNESCAPED};
+    std::string escapeJSON(const std::string& input)
+    {
+        std::string output;
+        output.reserve(input.length());
+
+        for (std::string::size_type i = 0; i < input.length(); ++i)
+        {
+            switch (input[i]) {
+                case '"':
+                    output += "\\\"";
+                    break;
+                case '/':
+                    output += "\\/";
+                    break;
+                case '\b':
+                    output += "\\b";
+                    break;
+                case '\f':
+                    output += "\\f";
+                    break;
+                case '\n':
+                    output += "\\n";
+                    break;
+                case '\r':
+                    output += "\\r";
+                    break;
+                case '\t':
+                    output += "\\t";
+                    break;
+                case '\\':
+                    output += "\\\\";
+                    break;
+                default:
+                    output += input[i];
+                    break;
+            }
+
+        }
+
+        return output;
+    }
+
     std::string attemptRecordToJSON(const AttemptRecord &record) {
+        std::stringstream ss;
+
+        ss << "{"
+           << R"("startTime": ")" << timePointToString(record.startTime) << "\","
+           << R"("stopTime": ")" << timePointToString(record.stopTime) << "\","
+           << R"("rc": )" << record.rc << ","
+           << R"("outputLog": ")" << escapeJSON(record.outputLog) << "\","
+           << R"("errorLog": ")" << escapeJSON(record.errorLog) << "\","
+           << R"("executorLog": ")" << escapeJSON(record.executorLog) << '"'
+           << "}";
+
+        return ss.str();
+    }
+
+    /*
+    std::string attemptRecordToJSON(const AttemptRecord &record) {
         rj::Document doc;
         doc.SetObject();
         auto &alloc = doc.GetAllocator();
@@ -314,9 +375,9 @@ namespace daggy {
                       rj::Value().SetString(record.executorLog.c_str(),
                                             record.executorLog.size(), alloc),
                       alloc);
-
         return dumpJSON(doc);
     }
+    */
 
     AttemptRecord attemptRecordFromJSON(const std::string &json)
     {
diff --git a/libdaggy/src/Utilities.cpp b/libdaggy/src/Utilities.cpp
index 9d18add..494e993 100644
--- a/libdaggy/src/Utilities.cpp
+++ b/libdaggy/src/Utilities.cpp
@@ -225,16 +225,17 @@ namespace daggy {
     CURLcode res;
     struct curl_slist *headers = NULL;
 
-    curl_global_init(CURL_GLOBAL_ALL);
-
     curl = curl_easy_init();
     if (curl) {
         std::stringstream buffer;
+        char errbuf[CURL_ERROR_SIZE];
 
         curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
         curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter);
         curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
-        curl_easy_setopt(curl, CURLOPT_TIMEOUT, 8);
+        curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, &errbuf);
+        curl_easy_setopt(curl, CURLOPT_TIMEOUT, 8L);
+        curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
 
         if (trace) {
             curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace);
@@ -253,19 +254,19 @@ namespace daggy {
         res = curl_easy_perform(curl);
 
         if (res != CURLE_OK) {
+            curl_easy_cleanup(curl);
             response.code = HTTPCode::Server_Error;
             response.body = std::string{"CURL Failed: "} + curl_easy_strerror(res);
             return response;
         }
 
-        curl_easy_cleanup(curl);
-
         curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response.code);
+        curl_easy_cleanup(curl);
         response.body = buffer.str();
+    } else {
+        throw std::runtime_error("Unable to init cURL object");
     }
 
-    curl_global_cleanup();
-
     return response;
 }
 
diff --git a/libdaggy/tests/CMakeLists.txt b/libdaggy/tests/CMakeLists.txt
index 8c8d730..5b269ad 100644
--- a/libdaggy/tests/CMakeLists.txt
+++ b/libdaggy/tests/CMakeLists.txt
@@ -16,6 +16,7 @@ add_executable(${PROJECT_NAME} main.cpp
     int_basic.cpp
     # Performance checks
     perf_dag.cpp
+    perf_serialization.cpp
 )
 
 target_link_libraries(${PROJECT_NAME} libdaggy stdc++fs Catch2::Catch2)
diff --git a/libdaggy/tests/perf_dag.cpp b/libdaggy/tests/perf_dag.cpp
index f315257..9b8447c 100644
--- a/libdaggy/tests/perf_dag.cpp
+++ b/libdaggy/tests/perf_dag.cpp
@@ -49,7 +49,7 @@ const size_t MAX_CHILDREN = 10;
 
 static auto DAG = createDAG(N_NODES, MAX_CHILDREN);
 
-TEST_CASE("massive DAGs", "[dag_performance]")
+TEST_CASE("massive DAGs", "[dag_performance][perf]")
 {
     BENCHMARK_ADVANCED("dag.reset")(Catch::Benchmark::Chronometer meter)
     {
diff --git a/libdaggy/tests/perf_serialization.cpp b/libdaggy/tests/perf_serialization.cpp
new file mode 100644
index 0000000..d5cde3a
--- /dev/null
+++ b/libdaggy/tests/perf_serialization.cpp
@@ -0,0 +1,50 @@
+#ifdef CATCH_CONFIG_ENABLE_BENCHMARKING
+
+#include
+#include
+#include
+
+#include
+
+std::string random_string(std::size_t length)
+{
+    const std::string CHARACTERS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz;\"'\n\t{}[]()";
+
+    std::random_device random_device;
+    std::mt19937 generator(random_device());
+    std::uniform_int_distribution<> distribution(0, CHARACTERS.size() - 1);
+
+    std::string random_string;
+
+    for (std::size_t i = 0; i < length; ++i)
+    {
+        random_string += CHARACTERS[distribution(generator)];
+    }
+
+    return random_string;
+}
+
+using namespace daggy;
+
+TEST_CASE("attempt output", "[serialize_attempt_performance][perf]")
+{
+    size_t LOGSIZE = 512000;
+    // Need lots of data
+    AttemptRecord attempt{
+        .startTime = Clock::now(),
+        .stopTime = Clock::now(),
+        .rc = 0,
+        .executorLog = random_string(LOGSIZE),
+        .outputLog = random_string(LOGSIZE),
+        .errorLog = random_string(LOGSIZE)
+    };
+
+
+    BENCHMARK_ADVANCED("rj_dump")(Catch::Benchmark::Chronometer meter)
+    {
+        meter.measure([&] { return attemptRecordToJSON(attempt); });
+    };
+
+}
+
+#endif
diff --git a/libdaggy/tests/unit_serialization.cpp b/libdaggy/tests/unit_serialization.cpp
index 37b2109..13faf7a 100644
--- a/libdaggy/tests/unit_serialization.cpp
+++ b/libdaggy/tests/unit_serialization.cpp
@@ -102,3 +102,45 @@ TEST_CASE("task_serialization", "[serialize_tasks]")
         }
     }
 }
+
+TEST_CASE("attempt_serialization", "[serialize_attempt]")
+{
+    SECTION("Serialize Simple Attempt")
+    {
+        using namespace daggy;
+
+        AttemptRecord attempt{
+            .startTime = Clock::now(),
+            .stopTime = Clock::now(),
+            .rc = 0,
+            .executorLog = "",
+            .outputLog = "",
+            .errorLog = "",
+        };
+
+        auto json = attemptRecordToJSON(attempt);
+
+        rj::Document doc;
+        REQUIRE_NOTHROW(checkRJParse(doc.Parse(json.c_str()), "Parsing AttemptRecord"));
+    }
+
+    SECTION("Serialize Attempt with complicated logs")
+    {
+        using namespace daggy;
+
+        AttemptRecord attempt{
+            .startTime = Clock::now(),
+            .stopTime = Clock::now(),
+            .rc = 0,
+            .executorLog = "",
+            .outputLog = "This is a testament to\nmore complicated, \"potentially quoted\"\nproblem\tspaces\n",
+            .errorLog = "",
+        };
+
+        auto json = attemptRecordToJSON(attempt);
+
+        rj::Document doc;
+        REQUIRE_NOTHROW(checkRJParse(doc.Parse(json.c_str()), "Parsing AttemptRecord"));
+    }
+
+}