From ea3f67f226ad5abb01cf718746a44361db371807 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Fri, 4 Feb 2022 11:58:45 -0400 Subject: [PATCH] Adding a general logger and integrating it with daggyr --- daggyr/daggyr/daggyr.cpp | 51 ++++++++++++---- daggyr/libdaggyr/include/daggyr/Server.hpp | 4 +- daggyr/libdaggyr/src/Server.cpp | 40 +++++++++--- libdaggy/include/daggy/GeneralLogger.hpp | 71 ++++++++++++++++++++++ libdaggy/src/CMakeLists.txt | 1 + libdaggy/src/GeneralLogger.cpp | 66 ++++++++++++++++++++ libdaggy/tests/CMakeLists.txt | 1 + libdaggy/tests/unit_generallogger.cpp | 31 ++++++++++ 8 files changed, 245 insertions(+), 20 deletions(-) create mode 100644 libdaggy/include/daggy/GeneralLogger.hpp create mode 100644 libdaggy/src/GeneralLogger.cpp create mode 100644 libdaggy/tests/unit_generallogger.cpp diff --git a/daggyr/daggyr/daggyr.cpp b/daggyr/daggyr/daggyr.cpp index 999a9c7..5ce87e8 100644 --- a/daggyr/daggyr/daggyr.cpp +++ b/daggyr/daggyr/daggyr.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -109,8 +110,29 @@ void daemonize() } } +daggy::GeneralLogger getLogger(const std::string &logFile, + daggy::LogLevel level, std::ofstream &ofh) +{ + if (logFile == "-") { + std::cout << "Logging to cout" << std::endl; + return daggy::GeneralLogger(std::cout, level); + } + if (logFile == "") { + std::cout << "No logging" << std::endl; + return daggy::GeneralLogger(std::cout, daggy::LogLevel::NONE); + } + + std::cout << "Logging to " << logFile << std::endl; + ofh.open(logFile, std::ios::ate | std::ios::binary); + if (!ofh.good()) { + throw std::runtime_error("Unable to open log file " + logFile); + } + return daggy::GeneralLogger(ofh, level); +} + int main(int argc, char **argv) { + std::ios::sync_with_stdio(false); argparse::ArgumentParser args("Daggy"); args.add_argument("-v", "--verbose") @@ -118,6 +140,8 @@ int main(int argc, char **argv) .implicit_value(true); args.add_argument("-d", "--daemon").default_value(false).implicit_value(true); args.add_argument("--config").default_value(std::string{}); + args.add_argument("--log-file").default_value(std::string{}); + args.add_argument("--log-level").default_value(std::string{"NONE"}); args.add_argument("--ip").default_value(std::string{"127.0.0.1"}); args.add_argument("--port").default_value(2504u).action( [](const std::string &value) -> unsigned { return std::stoul(value); }); @@ -135,9 +159,10 @@ int main(int argc, char **argv) sysinfo(&systemInfo); - bool verbose = args.get("--verbose"); bool asDaemon = args.get("--daemon"); auto configFile = args.get("--config"); + auto logFile = args.get("--log-file"); + auto logLevel = args.get("--log-level"); std::string listenIP = args.get("--ip"); auto listenPort = args.get("--port"); size_t webThreads = 50; @@ -156,6 +181,10 @@ int main(int argc, char **argv) if (doc.HasMember("ip")) listenIP = doc["ip"].GetString(); + if (doc.HasMember("log-file")) + logFile = doc["log-file"].GetString(); + if (doc.HasMember("log-level")) + logLevel = doc["log-level"].GetString(); if (doc.HasMember("port")) listenPort = doc["port"].GetInt(); if (doc.HasMember("web-threads")) @@ -169,22 +198,21 @@ int main(int argc, char **argv) } } - if (verbose) { - std::cout << "Server running at http://" << listenIP << ':' << listenPort - << std::endl - << "Max Cores: " << maxCores << std::endl - << "Max Memory: " << maxMemoryMB << " MB" << std::endl - << "Max Web Clients: " << webThreads << std::endl - << std::endl - << "Ctrl-C to exit" << std::endl; - } + std::ofstream ofh; + auto logger = + getLogger(logFile, daggy::LogLevel::_from_string(logLevel.c_str()), ofh); + logger.info("Server running at http://" + listenIP + ':' + + std::to_string(listenPort)); + logger.info("Max Cores: " + std::to_string(maxCores)); + logger.info("Max Cores: " + std::to_string(maxMemoryMB)); + logger.info("Web Threads: " + std::to_string(webThreads)); if (asDaemon) { daemonize(); } Pistache::Address listenSpec(listenIP, listenPort); - daggy::daggyr::Server server(listenSpec, maxCores, maxMemoryMB); + daggy::daggyr::Server server(listenSpec, maxCores, maxMemoryMB, logger); server.init(webThreads); server.start(); @@ -193,4 +221,5 @@ int main(int argc, char **argv) std::this_thread::sleep_for(std::chrono::seconds(30)); } server.shutdown(); + logger.shutdown(); } diff --git a/daggyr/libdaggyr/include/daggyr/Server.hpp b/daggyr/libdaggyr/include/daggyr/Server.hpp index fba0861..d515b58 100644 --- a/daggyr/libdaggyr/include/daggyr/Server.hpp +++ b/daggyr/libdaggyr/include/daggyr/Server.hpp @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -25,7 +26,7 @@ namespace daggy::daggyr { { public: Server(const Pistache::Address &listenSpec, ssize_t maxCores, - ssize_t maxMemoryMB); + ssize_t maxMemoryMB, GeneralLogger &logger); ~Server(); Server &setSSLCertificates(const fs::path &cert, const fs::path &key); @@ -56,6 +57,7 @@ namespace daggy::daggyr { Pistache::Rest::Description desc_; Pistache::Rest::Router router_; + GeneralLogger &logger_; executors::task::ForkingTaskExecutor executor_; using TaskID = std::pair; diff --git a/daggyr/libdaggyr/src/Server.cpp b/daggyr/libdaggyr/src/Server.cpp index a134e48..025d209 100644 --- a/daggyr/libdaggyr/src/Server.cpp +++ b/daggyr/libdaggyr/src/Server.cpp @@ -32,9 +32,10 @@ namespace daggy::daggyr { } Server::Server(const Pistache::Address &listenSpec, ssize_t maxCores, - ssize_t maxMemoryMB) + ssize_t maxMemoryMB, GeneralLogger &logger) : endpoint_(listenSpec) , desc_("Daggy Runner API", "0.1") + , logger_(logger) , executor_(maxCores) , maxCapacity_{maxCores, maxMemoryMB} { @@ -132,16 +133,21 @@ namespace daggy::daggyr { if (!handleAuth(request)) return; - auto runID = request.param(":runID").as(); - auto taskName = request.param(":taskName").as(); + auto runID = request.param(":runID").as(); + auto taskName = request.param(":taskName").as(); + auto requestID = std::to_string(runID) + "/" + taskName; + logger_.info("Received task " + requestID); Capacity resourcesUsed; Task task; + logger_.debug([&] { return requestID + ": " + taskToJSON(task); }); try { task = taskFromJSON(taskName, request.body()); resourcesUsed = capacityFromTask(task); } catch (std::exception &e) { + logger_.warn( + [&] { return requestID + ": Unable to parse task: " + e.what(); }); REQ_RESPONSE(Not_Acceptable, e.what()); } @@ -153,6 +159,8 @@ namespace daggy::daggyr { runningTasks_.emplace(std::move(tid), std::move(fut)); } + logger_.debug(requestID + ": Task successfully enqueued"); + response.send(Pistache::Http::Code::Ok, ""); } @@ -162,8 +170,9 @@ namespace daggy::daggyr { if (!handleAuth(request)) return; - auto runID = request.param(":runID").as(); - auto taskName = request.param(":taskName").as(); + auto runID = request.param(":runID").as(); + auto taskName = request.param(":taskName").as(); + auto requestID = std::to_string(runID) + "/" + taskName; auto taskID = std::make_pair(runID, taskName); std::unordered_map::node_type @@ -172,10 +181,16 @@ namespace daggy::daggyr { { std::lock_guard lock(rtGuard_); auto it = runningTasks_.find(taskID); - if (it == runningTasks_.end() || !it->second->ready()) { + if (it == runningTasks_.end()) { + logger_.warn(requestID + ": Polled about unknown task"); + notFound = true; + } + else if (!it->second->ready()) { + logger_.debug(requestID + ": Polled but task not ready yet"); notFound = true; } else { + logger_.debug(requestID + ": Polled and ready."); node = runningTasks_.extract(taskID); } } @@ -189,9 +204,14 @@ namespace daggy::daggyr { attemptRecordToJSON(node.mapped()->get())); // If the promise fails, then reinsert the result for later polling if (prom.isRejected()) { + logger_.warn(requestID + + ": Record sent to poller, but failed to complete transfer."); std::lock_guard lock(rtGuard_); runningTasks_.insert(std::move(node)); } + else { + logger_.debug(requestID + ": Record send successfully"); + } } void Server::handleStopTask(const Pistache::Rest::Request &request, @@ -200,9 +220,13 @@ namespace daggy::daggyr { if (!handleAuth(request)) return; - auto runID = request.param(":runID").as(); - auto taskName = request.param(":taskName").as(); + auto runID = request.param(":runID").as(); + auto taskName = request.param(":taskName").as(); + auto requestID = std::to_string(runID) + "/" + taskName; + logger_.debug([&] { + return std::to_string(runID) + "/" + taskName + ": Stop task received."; + }); executor_.stop(runID, taskName); REQ_RESPONSE(Ok, ""); diff --git a/libdaggy/include/daggy/GeneralLogger.hpp b/libdaggy/include/daggy/GeneralLogger.hpp new file mode 100644 index 0000000..a15e56b --- /dev/null +++ b/libdaggy/include/daggy/GeneralLogger.hpp @@ -0,0 +1,71 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "Defines.hpp" + +namespace daggy { + BETTER_ENUM(LogLevel, int, NONE = 0, ERROR, WARN, INFO, DEBUG); + + class GeneralLogger + { + public: + explicit GeneralLogger(std::ostream &os, LogLevel level = LogLevel::WARN); + ~GeneralLogger(); + + void setLevel(LogLevel level); + void log(const std::string &msg, LogLevel level); + + // Function will only be called if required + void log(const std::function &fun, LogLevel level); + + template + void error(const T &msg) + { + log(msg, LogLevel::ERROR); + } + template + void warn(const T &msg) + { + log(msg, LogLevel::WARN); + } + template + void info(const T &msg) + { + log(msg, LogLevel::INFO); + } + template + void debug(const T &msg) + { + log(msg, LogLevel::DEBUG); + } + + void shutdown(); + + private: + void emitMessages(); + + struct LogMessage + { + LogLevel level; + std::string msg; + TimePoint time; + }; + + std::atomic running_; + std::ostream &os_; + LogLevel level_; + + std::condition_variable newMessage_; + std::mutex messageGuard_; + std::deque messages_; + + std::thread messageEmiter_; + }; + +} // namespace daggy diff --git a/libdaggy/src/CMakeLists.txt b/libdaggy/src/CMakeLists.txt index 75f6ac4..c4ff5e6 100644 --- a/libdaggy/src/CMakeLists.txt +++ b/libdaggy/src/CMakeLists.txt @@ -2,6 +2,7 @@ target_sources(${PROJECT_NAME} PRIVATE Serialization.cpp Utilities.cpp DAGRunner.cpp + GeneralLogger.cpp ) add_subdirectory(executors) diff --git a/libdaggy/src/GeneralLogger.cpp b/libdaggy/src/GeneralLogger.cpp new file mode 100644 index 0000000..3db85f2 --- /dev/null +++ b/libdaggy/src/GeneralLogger.cpp @@ -0,0 +1,66 @@ +#include +#include + +namespace daggy { + GeneralLogger::GeneralLogger(std::ostream &os, LogLevel level) + : running_(true) + , os_(os) + , level_(level) + , messageEmiter_(&GeneralLogger::emitMessages, this) + { + } + + GeneralLogger::~GeneralLogger() + { + shutdown(); + } + + void GeneralLogger::shutdown() + { + if (!running_) + return; + running_ = false; + newMessage_.notify_one(); + messageEmiter_.join(); + os_.flush(); + } + + void GeneralLogger::setLevel(LogLevel level) + { + level_ = level; + } + void GeneralLogger::log(const std::string &msg, LogLevel level) + { + if (level > level_) { + return; + } + { + std::lock_guard lock(messageGuard_); + messages_.emplace_back( + LogMessage{.level = level, .msg = msg, .time = Clock::now()}); + } + newMessage_.notify_one(); + } + + void GeneralLogger::log(const std::function &fun, + LogLevel level) + { + if (level > level_) + return; + log(fun(), level); + } + + void GeneralLogger::emitMessages() + { + while (running_ || !messages_.empty()) { + std::unique_lock lock(messageGuard_); + newMessage_.wait(lock, [&] { return !(messages_.empty() && running_); }); + for (const auto &msg : messages_) { + os_ << timePointToString(msg.time) << " [" << msg.level._to_string() + << "] " << msg.msg << '\n'; + } + os_.flush(); + messages_.clear(); + } + } +} // namespace daggy diff --git a/libdaggy/tests/CMakeLists.txt b/libdaggy/tests/CMakeLists.txt index 23c9a2d..6c96733 100644 --- a/libdaggy/tests/CMakeLists.txt +++ b/libdaggy/tests/CMakeLists.txt @@ -11,6 +11,7 @@ add_executable(${PROJECT_NAME} main.cpp unit_serialization.cpp unit_threadpool.cpp unit_utilities.cpp + unit_generallogger.cpp # integration tests int_basic.cpp # Performance checks diff --git a/libdaggy/tests/unit_generallogger.cpp b/libdaggy/tests/unit_generallogger.cpp new file mode 100644 index 0000000..c73ec30 --- /dev/null +++ b/libdaggy/tests/unit_generallogger.cpp @@ -0,0 +1,31 @@ +#include +#include +#include +#include + +using namespace daggy; + +TEST_CASE("General Logger", "[general_logger]") +{ + std::stringstream ss; + GeneralLogger logger(ss); + + SECTION("Logger logs a message") + { + std::string testMessage = "Test Message"; + logger.setLevel(LogLevel::INFO); + logger.warn(testMessage); + logger.shutdown(); + + auto captured = ss.str(); + REQUIRE(!captured.empty()); + REQUIRE(captured.find(testMessage) != std::string::npos); + } + + SECTION("Logger does not emit messages of higher levels") + { + logger.setLevel(LogLevel::INFO); + logger.debug("Test Message"); + REQUIRE(ss.str().empty()); + } +}