Adding a general logger and integrating it with daggyr

This commit is contained in:
Ian Roddis
2022-02-04 11:58:45 -04:00
parent 57e93b5045
commit ea3f67f226
8 changed files with 245 additions and 20 deletions

View File

@@ -5,6 +5,7 @@
#include <argparse.hpp>
#include <atomic>
#include <csignal>
#include <daggy/GeneralLogger.hpp>
#include <daggy/Serialization.hpp>
#include <daggyr/Server.hpp>
#include <fstream>
@@ -109,8 +110,29 @@ void daemonize()
}
}
daggy::GeneralLogger getLogger(const std::string &logFile,
daggy::LogLevel level, std::ofstream &ofh)
{
if (logFile == "-") {
std::cout << "Logging to cout" << std::endl;
return daggy::GeneralLogger(std::cout, level);
}
if (logFile == "") {
std::cout << "No logging" << std::endl;
return daggy::GeneralLogger(std::cout, daggy::LogLevel::NONE);
}
std::cout << "Logging to " << logFile << std::endl;
ofh.open(logFile, std::ios::ate | std::ios::binary);
if (!ofh.good()) {
throw std::runtime_error("Unable to open log file " + logFile);
}
return daggy::GeneralLogger(ofh, level);
}
int main(int argc, char **argv)
{
std::ios::sync_with_stdio(false);
argparse::ArgumentParser args("Daggy");
args.add_argument("-v", "--verbose")
@@ -118,6 +140,8 @@ int main(int argc, char **argv)
.implicit_value(true);
args.add_argument("-d", "--daemon").default_value(false).implicit_value(true);
args.add_argument("--config").default_value(std::string{});
args.add_argument("--log-file").default_value(std::string{});
args.add_argument("--log-level").default_value(std::string{"NONE"});
args.add_argument("--ip").default_value(std::string{"127.0.0.1"});
args.add_argument("--port").default_value(2504u).action(
[](const std::string &value) -> unsigned { return std::stoul(value); });
@@ -135,9 +159,10 @@ int main(int argc, char **argv)
sysinfo(&systemInfo);
bool verbose = args.get<bool>("--verbose");
bool asDaemon = args.get<bool>("--daemon");
auto configFile = args.get<std::string>("--config");
auto logFile = args.get<std::string>("--log-file");
auto logLevel = args.get<std::string>("--log-level");
std::string listenIP = args.get<std::string>("--ip");
auto listenPort = args.get<unsigned>("--port");
size_t webThreads = 50;
@@ -156,6 +181,10 @@ int main(int argc, char **argv)
if (doc.HasMember("ip"))
listenIP = doc["ip"].GetString();
if (doc.HasMember("log-file"))
logFile = doc["log-file"].GetString();
if (doc.HasMember("log-level"))
logLevel = doc["log-level"].GetString();
if (doc.HasMember("port"))
listenPort = doc["port"].GetInt();
if (doc.HasMember("web-threads"))
@@ -169,22 +198,21 @@ int main(int argc, char **argv)
}
}
if (verbose) {
std::cout << "Server running at http://" << listenIP << ':' << listenPort
<< std::endl
<< "Max Cores: " << maxCores << std::endl
<< "Max Memory: " << maxMemoryMB << " MB" << std::endl
<< "Max Web Clients: " << webThreads << std::endl
<< std::endl
<< "Ctrl-C to exit" << std::endl;
}
std::ofstream ofh;
auto logger =
getLogger(logFile, daggy::LogLevel::_from_string(logLevel.c_str()), ofh);
logger.info("Server running at http://" + listenIP + ':' +
std::to_string(listenPort));
logger.info("Max Cores: " + std::to_string(maxCores));
logger.info("Max Cores: " + std::to_string(maxMemoryMB));
logger.info("Web Threads: " + std::to_string(webThreads));
if (asDaemon) {
daemonize();
}
Pistache::Address listenSpec(listenIP, listenPort);
daggy::daggyr::Server server(listenSpec, maxCores, maxMemoryMB);
daggy::daggyr::Server server(listenSpec, maxCores, maxMemoryMB, logger);
server.init(webThreads);
server.start();
@@ -193,4 +221,5 @@ int main(int argc, char **argv)
std::this_thread::sleep_for(std::chrono::seconds(30));
}
server.shutdown();
logger.shutdown();
}

View File

@@ -5,6 +5,7 @@
#include <pistache/http.h>
#include <daggy/DAGRunner.hpp>
#include <daggy/GeneralLogger.hpp>
#include <daggy/ThreadPool.hpp>
#include <daggy/executors/task/DaggyRunnerTaskExecutor.hpp>
#include <daggy/executors/task/ForkingTaskExecutor.hpp>
@@ -25,7 +26,7 @@ namespace daggy::daggyr {
{
public:
Server(const Pistache::Address &listenSpec, ssize_t maxCores,
ssize_t maxMemoryMB);
ssize_t maxMemoryMB, GeneralLogger &logger);
~Server();
Server &setSSLCertificates(const fs::path &cert, const fs::path &key);
@@ -56,6 +57,7 @@ namespace daggy::daggyr {
Pistache::Rest::Description desc_;
Pistache::Rest::Router router_;
GeneralLogger &logger_;
executors::task::ForkingTaskExecutor executor_;
using TaskID = std::pair<DAGRunID, std::string>;

View File

@@ -32,9 +32,10 @@ namespace daggy::daggyr {
}
Server::Server(const Pistache::Address &listenSpec, ssize_t maxCores,
ssize_t maxMemoryMB)
ssize_t maxMemoryMB, GeneralLogger &logger)
: endpoint_(listenSpec)
, desc_("Daggy Runner API", "0.1")
, logger_(logger)
, executor_(maxCores)
, maxCapacity_{maxCores, maxMemoryMB}
{
@@ -132,16 +133,21 @@ namespace daggy::daggyr {
if (!handleAuth(request))
return;
auto runID = request.param(":runID").as<DAGRunID>();
auto taskName = request.param(":taskName").as<std::string>();
auto runID = request.param(":runID").as<DAGRunID>();
auto taskName = request.param(":taskName").as<std::string>();
auto requestID = std::to_string(runID) + "/" + taskName;
logger_.info("Received task " + requestID);
Capacity resourcesUsed;
Task task;
logger_.debug([&] { return requestID + ": " + taskToJSON(task); });
try {
task = taskFromJSON(taskName, request.body());
resourcesUsed = capacityFromTask(task);
}
catch (std::exception &e) {
logger_.warn(
[&] { return requestID + ": Unable to parse task: " + e.what(); });
REQ_RESPONSE(Not_Acceptable, e.what());
}
@@ -153,6 +159,8 @@ namespace daggy::daggyr {
runningTasks_.emplace(std::move(tid), std::move(fut));
}
logger_.debug(requestID + ": Task successfully enqueued");
response.send(Pistache::Http::Code::Ok, "");
}
@@ -162,8 +170,9 @@ namespace daggy::daggyr {
if (!handleAuth(request))
return;
auto runID = request.param(":runID").as<DAGRunID>();
auto taskName = request.param(":taskName").as<std::string>();
auto runID = request.param(":runID").as<DAGRunID>();
auto taskName = request.param(":taskName").as<std::string>();
auto requestID = std::to_string(runID) + "/" + taskName;
auto taskID = std::make_pair(runID, taskName);
std::unordered_map<TaskID, daggy::executors::task::TaskFuture>::node_type
@@ -172,10 +181,16 @@ namespace daggy::daggyr {
{
std::lock_guard<std::mutex> lock(rtGuard_);
auto it = runningTasks_.find(taskID);
if (it == runningTasks_.end() || !it->second->ready()) {
if (it == runningTasks_.end()) {
logger_.warn(requestID + ": Polled about unknown task");
notFound = true;
}
else if (!it->second->ready()) {
logger_.debug(requestID + ": Polled but task not ready yet");
notFound = true;
}
else {
logger_.debug(requestID + ": Polled and ready.");
node = runningTasks_.extract(taskID);
}
}
@@ -189,9 +204,14 @@ namespace daggy::daggyr {
attemptRecordToJSON(node.mapped()->get()));
// If the promise fails, then reinsert the result for later polling
if (prom.isRejected()) {
logger_.warn(requestID +
": Record sent to poller, but failed to complete transfer.");
std::lock_guard<std::mutex> lock(rtGuard_);
runningTasks_.insert(std::move(node));
}
else {
logger_.debug(requestID + ": Record send successfully");
}
}
void Server::handleStopTask(const Pistache::Rest::Request &request,
@@ -200,9 +220,13 @@ namespace daggy::daggyr {
if (!handleAuth(request))
return;
auto runID = request.param(":runID").as<DAGRunID>();
auto taskName = request.param(":taskName").as<std::string>();
auto runID = request.param(":runID").as<DAGRunID>();
auto taskName = request.param(":taskName").as<std::string>();
auto requestID = std::to_string(runID) + "/" + taskName;
logger_.debug([&] {
return std::to_string(runID) + "/" + taskName + ": Stop task received.";
});
executor_.stop(runID, taskName);
REQ_RESPONSE(Ok, "");

View File

@@ -0,0 +1,71 @@
#pragma once
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include "Defines.hpp"
namespace daggy {
BETTER_ENUM(LogLevel, int, NONE = 0, ERROR, WARN, INFO, DEBUG);
class GeneralLogger
{
public:
explicit GeneralLogger(std::ostream &os, LogLevel level = LogLevel::WARN);
~GeneralLogger();
void setLevel(LogLevel level);
void log(const std::string &msg, LogLevel level);
// Function will only be called if required
void log(const std::function<std::string()> &fun, LogLevel level);
template <typename T>
void error(const T &msg)
{
log(msg, LogLevel::ERROR);
}
template <typename T>
void warn(const T &msg)
{
log(msg, LogLevel::WARN);
}
template <typename T>
void info(const T &msg)
{
log(msg, LogLevel::INFO);
}
template <typename T>
void debug(const T &msg)
{
log(msg, LogLevel::DEBUG);
}
void shutdown();
private:
void emitMessages();
struct LogMessage
{
LogLevel level;
std::string msg;
TimePoint time;
};
std::atomic<bool> running_;
std::ostream &os_;
LogLevel level_;
std::condition_variable newMessage_;
std::mutex messageGuard_;
std::deque<LogMessage> messages_;
std::thread messageEmiter_;
};
} // namespace daggy

View File

@@ -2,6 +2,7 @@ target_sources(${PROJECT_NAME} PRIVATE
Serialization.cpp
Utilities.cpp
DAGRunner.cpp
GeneralLogger.cpp
)
add_subdirectory(executors)

View File

@@ -0,0 +1,66 @@
#include <daggy/GeneralLogger.hpp>
#include <daggy/Serialization.hpp>
namespace daggy {
GeneralLogger::GeneralLogger(std::ostream &os, LogLevel level)
: running_(true)
, os_(os)
, level_(level)
, messageEmiter_(&GeneralLogger::emitMessages, this)
{
}
GeneralLogger::~GeneralLogger()
{
shutdown();
}
void GeneralLogger::shutdown()
{
if (!running_)
return;
running_ = false;
newMessage_.notify_one();
messageEmiter_.join();
os_.flush();
}
void GeneralLogger::setLevel(LogLevel level)
{
level_ = level;
}
void GeneralLogger::log(const std::string &msg, LogLevel level)
{
if (level > level_) {
return;
}
{
std::lock_guard<std::mutex> lock(messageGuard_);
messages_.emplace_back(
LogMessage{.level = level, .msg = msg, .time = Clock::now()});
}
newMessage_.notify_one();
}
void GeneralLogger::log(const std::function<std::string()> &fun,
LogLevel level)
{
if (level > level_)
return;
log(fun(), level);
}
void GeneralLogger::emitMessages()
{
while (running_ || !messages_.empty()) {
std::unique_lock<std::mutex> lock(messageGuard_);
newMessage_.wait(lock, [&] { return !(messages_.empty() && running_); });
for (const auto &msg : messages_) {
os_ << timePointToString(msg.time) << " [" << msg.level._to_string()
<< "] " << msg.msg << '\n';
}
os_.flush();
messages_.clear();
}
}
} // namespace daggy

View File

@@ -11,6 +11,7 @@ add_executable(${PROJECT_NAME} main.cpp
unit_serialization.cpp
unit_threadpool.cpp
unit_utilities.cpp
unit_generallogger.cpp
# integration tests
int_basic.cpp
# Performance checks

View File

@@ -0,0 +1,31 @@
#include <catch2/catch.hpp>
#include <daggy/GeneralLogger.hpp>
#include <iostream>
#include <sstream>
using namespace daggy;
TEST_CASE("General Logger", "[general_logger]")
{
std::stringstream ss;
GeneralLogger logger(ss);
SECTION("Logger logs a message")
{
std::string testMessage = "Test Message";
logger.setLevel(LogLevel::INFO);
logger.warn(testMessage);
logger.shutdown();
auto captured = ss.str();
REQUIRE(!captured.empty());
REQUIRE(captured.find(testMessage) != std::string::npos);
}
SECTION("Logger does not emit messages of higher levels")
{
logger.setLevel(LogLevel::INFO);
logger.debug("Test Message");
REQUIRE(ss.str().empty());
}
}