Fixing daggyr issues when reporting on tasks with very large outputs (>10kb).

Squashed commit of the following:

commit b87fa418b4aca78928186a8fa992bef701e044a4
Author: Ian Roddis <tech@kinesin.ca>
Date:   Mon Feb 14 12:55:34 2022 -0400

    removing memory leak

commit 5e284ab92dbea991262a08c0cd50d6fc2f912e3b
Author: Ian Roddis <tech@kinesin.ca>
Date:   Mon Feb 14 11:58:57 2022 -0400

    Speeding up serialization, fixing payload sizing issue on daggyr

commit e5e358820da4c2587741abdc3b6b103e5a4d4dd3
Author: Ian Roddis <tech@kinesin.ca>
Date:   Sun Feb 13 22:24:04 2022 -0400

    changing newlines to std::endl for flush goodness

commit 705ec86b75be947e64f4124ec8017cba2c8465e6
Author: Ian Roddis <tech@kinesin.ca>
Date:   Sun Feb 13 22:16:56 2022 -0400

    adding more logging

commit aa3db9c23e55da7a0523dc57e268b605ce8faac3
Author: Ian Roddis <tech@kinesin.ca>
Date:   Sun Feb 13 22:13:56 2022 -0400

    Adding threadid

commit 3b1a0f1333b2d43bc5ecad0746435504babbaa61
Author: Ian Roddis <tech@kinesin.ca>
Date:   Sun Feb 13 22:13:24 2022 -0400

    Adding some debugging

commit 804507e65251858fa597b7c27bcece8d8dfd589d
Author: Ian Roddis <tech@kinesin.ca>
Date:   Sun Feb 13 21:52:53 2022 -0400

    Removing curl global cleanup
This commit is contained in:
Ian Roddis, 2022-02-15 11:22:21 -04:00
commit 71756d9ec2 (parent 0d365f3a7b)
13 changed files with 238 additions and 31 deletions
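
As an aside on the "changing newlines to std::endl" commit listed above: std::endl writes '\n' and then flushes the stream, while a bare '\n' may leave the line sitting in the buffer. A minimal illustration, not part of the diff:

    #include <iostream>

    int main()
    {
        std::cout << "buffered line\n";              // '\n' alone may sit in the buffer
        std::cout << "flushed line" << std::endl;    // std::endl appends '\n' and flushes
        return 0;
    }

That immediate flush is what makes log lines visible when tailing output from a wedged worker.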


@@ -22,4 +22,4 @@ file(MAKE_DIRECTORY ${PISTACHE_INCLUDE_DIR})
 add_library(${PROJECT_NAME} SHARED IMPORTED)
 add_dependencies(${PROJECT_NAME} PistacheDownload)
 target_include_directories(${PROJECT_NAME} INTERFACE ${PISTACHE_INCLUDE_DIR})
 set_target_properties(${PROJECT_NAME} PROPERTIES IMPORTED_LOCATION "${PISTACHE_LIB_DIR}/libpistache.a")


@@ -257,6 +257,7 @@ std::unique_ptr<de::TaskExecutor> executorFactory(const rj::Value &config)
 int main(int argc, char **argv)
 {
+    curl_global_init(CURL_GLOBAL_ALL);
     argparse::ArgumentParser args("Daggy");
     args.add_argument("-v", "--verbose")
@@ -339,4 +340,5 @@ int main(int argc, char **argv)
         std::this_thread::sleep_for(std::chrono::seconds(30));
     }
     server.shutdown();
+    curl_global_cleanup();
 }


@@ -126,6 +126,7 @@ daggy::GeneralLogger getLogger(const std::string &logFile,
 int main(int argc, char **argv)
 {
+    curl_global_init(CURL_GLOBAL_ALL);
     std::ios::sync_with_stdio(false);
     argparse::ArgumentParser args("Daggy");
@@ -216,4 +217,5 @@ int main(int argc, char **argv)
     }
     server.shutdown();
     logger.shutdown();
+    curl_global_cleanup();
 }


@@ -43,6 +43,7 @@ namespace daggy::daggyr {
     private:
         void createDescription();
+        void serializeResults();
         bool handleAuth(const Pistache::Rest::Request &request);
@@ -59,11 +60,17 @@ namespace daggy::daggyr {
         GeneralLogger &logger_;
         executors::task::ForkingTaskExecutor executor_;
+        std::thread serializer_;
+        std::atomic<bool> running_;
         using TaskID = std::pair<DAGRunID, std::string>;
         Capacity maxCapacity_;
         std::mutex rtGuard_;
         std::unordered_map<TaskID, daggy::executors::task::TaskFuture>
             runningTasks_;
+        std::mutex resultsGuard_;
+        std::unordered_map<TaskID, Future<std::string>>
+            results_;
     };
 } // namespace daggy::daggyr
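
The new results_ map gives the poll endpoint a three-way answer: no entry means the task is unknown (404), an entry whose future is not yet ready means it is still running or serializing (428), and a ready future carries the serialized payload (200). A minimal stand-in sketch of that protocol, using std::optional in place of daggy's Future (names here are hypothetical, not from the codebase):

    #include <optional>
    #include <string>
    #include <unordered_map>

    enum class Poll { NotFound, NotReady, Ready };

    // ResultMap stands in for results_: an entry is created at enqueue time
    // and filled in by the serializer thread once the task finishes.
    using ResultMap = std::unordered_map<std::string, std::optional<std::string>>;

    Poll poll(const ResultMap &results, const std::string &task)
    {
        auto it = results.find(task);
        if (it == results.end()) return Poll::NotFound; // 404: never enqueued here
        if (!it->second) return Poll::NotReady;         // 428: known, still running
        return Poll::Ready;                             // 200: payload available
    }

    int main()
    {
        ResultMap results;
        results["run1/task"];                       // enqueue: entry exists, not ready
        results["run1/task"] = "{\"rc\": 0}";       // serializer thread fills it in
        return poll(results, "run1/task") == Poll::Ready ? 0 : 1;
    }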


@@ -25,8 +25,8 @@ namespace daggy::daggyr {
             .threads(threads)
             .flags(Pistache::Tcp::Options::ReuseAddr |
                    Pistache::Tcp::Options::ReusePort)
-            .maxRequestSize(102400)
-            .maxResponseSize(102400);
+            .maxRequestSize(4294967296)
+            .maxResponseSize(4294967296);
         endpoint_.init(opts);
         createDescription();
     }
@@ -37,6 +37,8 @@ namespace daggy::daggyr {
         , desc_("Daggy Runner API", "0.1")
         , logger_(logger)
         , executor_(maxCores)
+        , serializer_(&Server::serializeResults, this)
+        , running_(true)
         , maxCapacity_{maxCores, maxMemoryMB}
     {
     }
@@ -63,6 +65,40 @@ namespace daggy::daggyr {
     void Server::shutdown()
     {
         endpoint_.shutdown();
+        running_ = false;
+        serializer_.join();
+    }
+
+    void Server::serializeResults() {
+        using Node = std::unordered_map<TaskID, daggy::executors::task::TaskFuture>::node_type;
+        std::vector<TaskID> ready;
+        std::vector<Node> nodes;
+        while (running_) {
+            ready.clear();
+            nodes.clear();
+            std::this_thread::sleep_for(1s);
+            {
+                std::lock_guard<std::mutex> rtLock(rtGuard_);
+                for (const auto &[taskid, fut] : runningTasks_) {
+                    if (fut->ready())
+                        ready.push_back(taskid);
+                }
+                for (const auto &tid : ready) {
+                    nodes.emplace_back(runningTasks_.extract(tid));
+                }
+            }
+            // Insert the results
+            {
+                std::lock_guard<std::mutex> resultsLock(resultsGuard_);
+                for (const auto &node : nodes) {
+                    auto json = attemptRecordToJSON(node.mapped()->get());
+                    results_[node.key()].set(json);
+                }
+            }
+        }
     }

     uint16_t Server::getPort() const
@@ -153,10 +189,15 @@ namespace daggy::daggyr {
         auto tid = std::make_pair(runID, taskName);
         auto fut = executor_.execute(runID, taskName, task);
+        Future<std::string> strFut;
         {
             std::lock_guard<std::mutex> lock(rtGuard_);
-            runningTasks_.emplace(std::move(tid), std::move(fut));
+            runningTasks_.emplace(tid, std::move(fut));
+        }
+        {
+            std::lock_guard<std::mutex> lock(resultsGuard_);
+            results_[tid];
         }
         logger_.debug(requestID + ": Task successfully enqueued");
@@ -175,42 +216,42 @@ namespace daggy::daggyr {
         auto requestID = std::to_string(runID) + "/" + taskName;
         auto taskID = std::make_pair(runID, taskName);
-        std::unordered_map<TaskID, daggy::executors::task::TaskFuture>::node_type
+        std::unordered_map<TaskID, Future<std::string>>::node_type
             node;
-        bool notFound = false;
+        Pistache::Http::Code code = Pistache::Http::Code::Ok;
         {
-            std::lock_guard<std::mutex> lock(rtGuard_);
-            auto it = runningTasks_.find(taskID);
-            if (it == runningTasks_.end()) {
+            std::lock_guard<std::mutex> lock(resultsGuard_);
+            auto it = results_.find(taskID);
+            if (it == results_.end()) {
                 logger_.warn(requestID + ": Polled about unknown task");
-                notFound = true;
+                code = Pistache::Http::Code::Not_Found;
             }
-            else if (!it->second->ready()) {
+            else if (!it->second.ready()) {
                 logger_.debug(requestID + ": Polled but task not ready yet");
-                notFound = true;
+                code = Pistache::Http::Code::Precondition_Required;
             }
             else {
                 logger_.debug(requestID + ": Polled and ready.");
-                node = runningTasks_.extract(taskID);
+                node = results_.extract(taskID);
             }
         }
-        if (notFound) {
-            response.send(Pistache::Http::Code::Not_Found, "");
+        if (code != Pistache::Http::Code::Ok) {
+            response.send(code, "");
             return;
         }
-        auto prom = response.send(Pistache::Http::Code::Ok,
-                                  attemptRecordToJSON(node.mapped()->get()));
+        std::string payload = node.mapped().get();
+        auto prom = response.send(Pistache::Http::Code::Ok, payload);
+        // If the promise fails, then reinsert the result for later polling
         if (prom.isRejected()) {
             logger_.warn(requestID +
                          ": Record sent to poller, but failed to complete transfer.");
-            std::lock_guard<std::mutex> lock(rtGuard_);
-            runningTasks_.insert(std::move(node));
+            std::lock_guard<std::mutex> lock(resultsGuard_);
+            results_.insert(std::move(node));
         }
         else {
-            logger_.debug(requestID + ": Record send successfully");
+            logger_.debug(requestID + ": Record send successfully. ");
        }
    }
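
The reinsert-on-failure path above leans on unordered_map node handles: extract() detaches the entry, payload included, without copying, and insert(std::move(node)) hands it back if the HTTP promise is rejected, so a multi-hundred-kilobyte log is never duplicated. A self-contained sketch of the pattern (illustrative names only, not from the codebase):

    #include <string>
    #include <unordered_map>
    #include <utility>

    int main()
    {
        std::unordered_map<int, std::string> results{{1, std::string(512000, 'x')}};

        auto node = results.extract(1);      // map no longer owns the entry; no copy
        bool sendRejected = true;            // pretend the transfer failed
        if (sendRejected && !node.empty())
            results.insert(std::move(node)); // put it back for a later poll

        return results.count(1) == 1 ? 0 : 1;
    }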


@@ -34,7 +34,6 @@ namespace daggy {
         void set(const T val)
         {
             if (val_) {
-                std::cout << "Future already has a value!" << std::endl;
                 throw std::runtime_error("Future already has a value");
             }
             val_.emplace(val);


@@ -50,6 +50,7 @@ namespace daggy {
         Ok = 200,
         Not_Found = 404,
         Not_Acceptable = 406,
+        Not_Ready = 428,
         Server_Error = 500
     };


@@ -23,6 +23,7 @@ namespace daggy {
     std::string dumpJSON(const rj::Value &doc)
     {
         rj::StringBuffer buffer;
+        buffer.Clear();
         rj::Writer<rj::StringBuffer> writer(buffer);
         doc.Accept(writer);
         return buffer.GetString();
@@ -281,8 +282,68 @@ namespace daggy {
         return os;
     }

+    // From https://stackoverflow.com/questions/7724448/simple-json-string-escape-for-c
+    enum State { ESCAPED, UNESCAPED };
+
+    std::string escapeJSON(const std::string &input)
+    {
+        std::string output;
+        output.reserve(input.length());
+        for (std::string::size_type i = 0; i < input.length(); ++i)
+        {
+            switch (input[i]) {
+            case '"':
+                output += "\\\"";
+                break;
+            case '/':
+                output += "\\/";
+                break;
+            case '\b':
+                output += "\\b";
+                break;
+            case '\f':
+                output += "\\f";
+                break;
+            case '\n':
+                output += "\\n";
+                break;
+            case '\r':
+                output += "\\r";
+                break;
+            case '\t':
+                output += "\\t";
+                break;
+            case '\\':
+                output += "\\\\";
+                break;
+            default:
+                output += input[i];
+                break;
+            }
+        }
+        return output;
+    }
+
     std::string attemptRecordToJSON(const AttemptRecord &record)
     {
+        std::stringstream ss;
+        ss << "{"
+           << R"("startTime": ")" << timePointToString(record.startTime) << "\","
+           << R"("stopTime": ")" << timePointToString(record.stopTime) << "\","
+           << R"("rc": )" << record.rc << ","
+           << R"("outputLog": ")" << escapeJSON(record.outputLog) << "\","
+           << R"("errorLog": ")" << escapeJSON(record.errorLog) << "\","
+           << R"("executorLog": ")" << escapeJSON(record.executorLog) << '"'
+           << "}";
+        return ss.str();
+    }
+
+    /*
+    std::string attemptRecordToJSON(const AttemptRecord &record) {
         rj::Document doc;
         doc.SetObject();
         auto &alloc = doc.GetAllocator();
@@ -314,9 +375,9 @@ namespace daggy {
                      rj::Value().SetString(record.executorLog.c_str(),
                                            record.executorLog.size(), alloc),
                      alloc);
         return dumpJSON(doc);
     }
+    */

     AttemptRecord attemptRecordFromJSON(const std::string &json)
     {
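
A quick standalone check, not part of the commit, that strings passed through the hand-rolled escaper survive a round trip through RapidJSON, which the codebase already uses for parsing; the escapeJSON declaration mirrors the one added above:

    #include <cassert>
    #include <string>
    #include <rapidjson/document.h>

    namespace daggy { std::string escapeJSON(const std::string &input); } // from Serialization.cpp

    int main()
    {
        std::string raw = "tabs\tand \"quotes\"\nand \\backslashes\\";
        std::string json = "{\"log\": \"" + daggy::escapeJSON(raw) + "\"}";

        rapidjson::Document doc;
        doc.Parse(json.c_str());
        assert(!doc.HasParseError());
        assert(doc["log"].GetString() == raw); // escaping is lossless
        return 0;
    }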


@@ -225,16 +225,17 @@ namespace daggy {
         CURLcode res;
         struct curl_slist *headers = NULL;
-        curl_global_init(CURL_GLOBAL_ALL);
         curl = curl_easy_init();
         if (curl) {
             std::stringstream buffer;
+            char errbuf[CURL_ERROR_SIZE];
             curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
             curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter);
             curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
-            curl_easy_setopt(curl, CURLOPT_TIMEOUT, 8);
+            curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, &errbuf);
+            curl_easy_setopt(curl, CURLOPT_TIMEOUT, 8L);
+            curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
             if (trace) {
                 curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace);
@@ -253,19 +254,19 @@ namespace daggy {
             res = curl_easy_perform(curl);
             if (res != CURLE_OK) {
                 curl_easy_cleanup(curl);
                 response.code = HTTPCode::Server_Error;
                 response.body = std::string{"CURL Failed: "} + curl_easy_strerror(res);
                 return response;
             }
-            curl_easy_cleanup(curl);
             curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response.code);
+            curl_easy_cleanup(curl);
             response.body = buffer.str();
+        } else {
+            throw std::runtime_error("Unable to init cURL object");
         }
-        curl_global_cleanup();
         return response;
     }
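
For context on the error-buffer change: libcurl requires the buffer handed to CURLOPT_ERRORBUFFER to be at least CURL_ERROR_SIZE bytes and to outlive the transfer, and CURLOPT_NOSIGNAL avoids SIGALRM-based timeouts in multithreaded code. A minimal sketch under those assumptions (the URL is a placeholder):

    #include <cstdio>
    #include <curl/curl.h>

    int main()
    {
        curl_global_init(CURL_GLOBAL_ALL);
        CURL *curl = curl_easy_init();
        if (!curl)
            return 1;

        char errbuf[CURL_ERROR_SIZE] = {0};
        curl_easy_setopt(curl, CURLOPT_URL, "http://localhost:9999/"); // placeholder
        curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errbuf);
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, 8L);
        curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L); // no SIGALRM in threaded code

        if (curl_easy_perform(curl) != CURLE_OK)
            std::fprintf(stderr, "curl error: %s\n", errbuf);

        curl_easy_cleanup(curl);
        curl_global_cleanup();
        return 0;
    }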


@@ -16,6 +16,7 @@ add_executable(${PROJECT_NAME} main.cpp
     int_basic.cpp
     # Performance checks
     perf_dag.cpp
+    perf_serialization.cpp
 )
 target_link_libraries(${PROJECT_NAME} libdaggy stdc++fs Catch2::Catch2)


@@ -49,7 +49,7 @@ const size_t MAX_CHILDREN = 10;
 static auto DAG = createDAG(N_NODES, MAX_CHILDREN);
-TEST_CASE("massive DAGs", "[dag_performance]")
+TEST_CASE("massive DAGs", "[dag_performance][perf]")
 {
     BENCHMARK_ADVANCED("dag.reset")(Catch::Benchmark::Chronometer meter)
     {


@@ -0,0 +1,50 @@
+#ifdef CATCH_CONFIG_ENABLE_BENCHMARKING
+#include <catch2/catch.hpp>
+#include <iostream>
+#include <random>
+
+#include <daggy/Serialization.hpp>
+
+std::string random_string(std::size_t length)
+{
+    const std::string CHARACTERS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz;\"'\n\t{}[]()";
+    std::random_device random_device;
+    std::mt19937 generator(random_device());
+    std::uniform_int_distribution<> distribution(0, CHARACTERS.size() - 1);
+    std::string random_string;
+    for (std::size_t i = 0; i < length; ++i)
+    {
+        random_string += CHARACTERS[distribution(generator)];
+    }
+    return random_string;
+}
+
+using namespace daggy;
+
+TEST_CASE("attempt output", "[serialize_attempt_performance][perf]")
+{
+    size_t LOGSIZE = 512000;
+    // Need lots of data
+    AttemptRecord attempt{
+        .startTime = Clock::now(),
+        .stopTime = Clock::now(),
+        .rc = 0,
+        .executorLog = random_string(LOGSIZE),
+        .outputLog = random_string(LOGSIZE),
+        .errorLog = random_string(LOGSIZE)
+    };
+
+    BENCHMARK_ADVANCED("rj_dump")(Catch::Benchmark::Chronometer meter)
+    {
+        meter.measure([&] { return attemptRecordToJSON(attempt); });
+    };
+}
+#endif


@@ -102,3 +102,45 @@ TEST_CASE("task_serialization", "[serialize_tasks]")
         }
     }
 }
+
+TEST_CASE("attempt_serialization", "[serialize_attempt]")
+{
+    SECTION("Serialize Simple Attempt")
+    {
+        using namespace daggy;
+        AttemptRecord attempt{
+            .startTime = Clock::now(),
+            .stopTime = Clock::now(),
+            .rc = 0,
+            .executorLog = "",
+            .outputLog = "",
+            .errorLog = "",
+        };
+
+        auto json = attemptRecordToJSON(attempt);
+        rj::Document doc;
+        REQUIRE_NOTHROW(checkRJParse(doc.Parse(json.c_str()), "Parsing AttemptRecord"));
+    }
+
+    SECTION("Serialize Attempt with complicated logs")
+    {
+        using namespace daggy;
+        AttemptRecord attempt{
+            .startTime = Clock::now(),
+            .stopTime = Clock::now(),
+            .rc = 0,
+            .executorLog = "",
+            .outputLog = "This is a testament to\nmore complicated, \"potentially quoted\"\nproblem\tspaces\n",
+            .errorLog = "",
+        };
+
+        auto json = attemptRecordToJSON(attempt);
+        rj::Document doc;
+        REQUIRE_NOTHROW(checkRJParse(doc.Parse(json.c_str()), "Parsing AttemptRecord"));
+    }
+}