Implement a set of clang-tidy suggested changes. Remove FileSystemLogger

Ian Roddis
2021-09-22 10:30:27 -03:00
parent 288ce28d29
commit e7b15b3847
25 changed files with 82 additions and 397 deletions
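
Most of the changes below follow a handful of mechanical clang-tidy patterns: default member initializers, [[nodiscard]] on const accessors, explicit single-argument constructors, std::string parameters passed by const reference, nullptr over NULL, and C++17 nested namespace declarations. As a rough illustration only (the Widget type and demo namespaces below are hypothetical and not part of the repository), the before/after shape of these fixes is:

    #include <string>

    // Before: the shapes clang-tidy flags
    namespace demo_old { namespace detail {         // old-style nested namespaces
        struct Widget {
            int count;                               // uninitialized member
            Widget(std::string name);                // implicit single-arg conversion
            bool empty() const;                      // result easy to ignore
            void add(const std::string name);        // std::string copied by value
        };
    }}  // namespace demo_old::detail

    // After: the style this commit converges on
    namespace demo_new::detail {                     // C++17 nested namespace
        struct Widget {
            int count = 0;                           // default member initializer
            explicit Widget(std::string name);       // no implicit conversions
            [[nodiscard]] bool empty() const;        // caller must use the result
            void add(const std::string &name);       // pass by const reference
        };
    }  // namespace demo_new::detail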

View File

@@ -23,8 +23,8 @@ namespace daggy {
     template <typename K, typename V>
     struct Vertex
     {
-        RunState state;
-        uint32_t depCount;
+        RunState state = RunState::QUEUED;
+        uint32_t depCount = 0;
         V data;
         std::unordered_set<K> children;
     };
@@ -32,8 +32,6 @@ namespace daggy {
     template <typename K, typename V>
     class DAG
     {
-        using Edge = std::pair<K, K>;
     public:
         // Vertices
         void addVertex(K id, V data);
@@ -41,21 +39,19 @@ namespace daggy {
         std::unordered_set<K> getVertices() const;
         // Edges
-        void addEdge(const K &src, const K &dst);
+        void addEdge(const K &from, const K &to);
         void addEdgeIf(const K &src,
                        std::function<bool(const Vertex<K, V> &v)> predicate);
-        bool isValid() const;
-        bool hasVertex(const K &from);
-        const std::vector<Edge> &getEdges();
+        [[nodiscard]] bool isValid() const;
+        bool hasVertex(const K &id);
         // Attributes
-        size_t size() const;
-        bool empty() const;
+        [[nodiscard]] size_t size() const;
+        [[nodiscard]] bool empty() const;
         // Reset the DAG to completely unvisited
         void reset();
@@ -63,14 +59,12 @@ namespace daggy {
         // Reset any vertex with RUNNING state to QUEUED
         void resetRunning();
-        RunState getVertexState(const K &id) const;
         void setVertexState(const K &id, RunState state);
         void forEach(
             std::function<void(const std::pair<K, Vertex<K, V>> &)> fun) const;
-        bool allVisited() const;
+        [[nodiscard]] bool allVisited() const;
         std::optional<std::pair<K, V>> visitNext();

View File

@@ -167,16 +167,10 @@ namespace daggy {
     }
     template <typename K, typename V>
-    void DAG<K, V>::forEach(std::function<void(const std::pair<K, Vertex<K, V>> &)
-                                >
-                                fun) const
+    void DAG<K, V>::forEach(
+        std::function<void(const std::pair<K, Vertex<K, V>> &)> fun) const
     {
-        for (auto it = vertices_.begin(); it != vertices_.
-                 end();
-             ++it) {
+        for (auto it = vertices_.begin(); it != vertices_.end(); ++it) {
             fun(*it);
         }
     }

View File

@@ -27,11 +27,9 @@ namespace daggy {
     {
     }
-    Server &setWebHandlerThreads(size_t nThreads);
     Server &setSSLCertificates(const fs::path &cert, const fs::path &key);
-    void init(int threads = 1);
+    void init(size_t threads = 1);
     void start();

View File

@@ -130,7 +130,7 @@ namespace daggy {
            }
            if (tqit_ == taskQueues_.end())
                tqit_ = taskQueues_.begin();
-           task = std::move((*tqit_)->pop());
+           task = (*tqit_)->pop();
            if ((*tqit_)->empty()) {
                tqit_ = taskQueues_.erase(tqit_);
            }
@@ -160,7 +160,7 @@ namespace daggy {
            return fut;
        }
-       void addTasks(std::shared_ptr<TaskQueue> tq)
+       void addTasks(std::shared_ptr<TaskQueue> &tq)
        {
            if (drain_)
                throw std::runtime_error("Unable to add task to draining pool");
@@ -170,7 +170,7 @@ namespace daggy {
        }
    private:
-       // need to keep track of threads so we can join them
+       // need to keep track of threads, so we can join them
       std::vector<std::thread> workers_;
       // the task queue
       std::list<std::shared_ptr<TaskQueue>> taskQueues_;

View File

@@ -10,7 +10,7 @@ namespace daggy::executors::task {
    public:
        using Command = std::vector<std::string>;
-       ForkingTaskExecutor(size_t nThreads)
+       explicit ForkingTaskExecutor(size_t nThreads)
            : tp_(nThreads)
        {
        }

View File

@@ -9,7 +9,7 @@ namespace daggy::executors::task {
        using Command = std::vector<std::string>;
        SlurmTaskExecutor();
-       ~SlurmTaskExecutor();
+       ~SlurmTaskExecutor() override;
        // Validates the job to ensure that all required values are set and are of
        // the right type,

View File

@@ -11,7 +11,7 @@
       be supported.
    */
-   namespace daggy { namespace loggers { namespace dag_run {
+   namespace daggy::loggers::dag_run {
       class DAGRunLogger
       {
       public:
@@ -20,10 +20,10 @@ namespace daggy { namespace loggers { namespace dag_run {
           // Execution
           virtual DAGRunID startDAGRun(std::string name, const TaskSet &tasks) = 0;
-          virtual void addTask(DAGRunID dagRunID, const std::string taskName,
+          virtual void addTask(DAGRunID dagRunID, const std::string &taskName,
                                const Task &task) = 0;
-          virtual void updateTask(DAGRunID dagRunID, const std::string taskName,
+          virtual void updateTask(DAGRunID dagRunID, const std::string &taskName,
                                   const Task &task) = 0;
           virtual void updateDAGRunState(DAGRunID dagRunID, RunState state) = 0;
@@ -39,4 +39,4 @@ namespace daggy { namespace loggers { namespace dag_run {
           virtual DAGRunRecord getDAGRun(DAGRunID dagRunID) = 0;
       };
-   }}} // namespace daggy::loggers::dag_run
+   } // namespace daggy::loggers::dag_run

View File

@@ -1,70 +0,0 @@
#pragma once
#include <rapidjson/document.h>
#include <atomic>
#include <filesystem>
#include <mutex>
#include "DAGRunLogger.hpp"
#include "Defines.hpp"
namespace fs = std::filesystem;
namespace rj = rapidjson;
namespace daggy::loggers::dag_run {
/*
* This logger should only be used for debug purposes. It's not really
* optimized for querying, and will use a ton of inodes to track state.
*
* On the plus side, it's trivial to look at without using the API.
*
* Filesystem logger creates the following structure:
* {root}/
* runs/
* {runID}/
* meta.json --- Contains the DAG name, task definitions
* states.csv --- DAG state changes
* {taskName}/
* states.csv --- TASK state changes
* {attempt}/
* metadata.json --- timestamps and rc
* output.log
* error.log
* executor.log
*/
class FileSystemLogger : public DAGRunLogger
{
public:
FileSystemLogger(fs::path root);
// Execution
DAGRunID startDAGRun(std::string name, const TaskSet &tasks) override;
void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
void logTaskAttempt(DAGRunID, const std::string &taskName,
const AttemptRecord &attempt) override;
void updateTaskState(DAGRunID dagRunID, const std::string &taskName,
RunState state) override;
// Querying
std::vector<DAGRunSummary> getDAGs(uint32_t stateMask) override;
DAGRunRecord getDAGRun(DAGRunID dagRunID) override;
private:
fs::path root_;
std::atomic<DAGRunID> nextRunID_;
std::mutex lock_;
// std::unordered_map<fs::path, std::mutex> runLocks;
inline const fs::path getCurrentPath() const;
inline const fs::path getRunsRoot() const;
inline const fs::path getRunRoot(DAGRunID runID) const;
};
} // namespace daggy::loggers::dag_run
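
With FileSystemLogger removed, OStreamLogger is the remaining DAGRunLogger implementation touched by this commit. A minimal sketch of how a former FileSystemLogger call site might be repointed, assuming only what is visible in this diff (OStreamLogger takes a std::ostream & and the include path used in the tests); the log file name below is illustrative:

    #include <fstream>
    #include <daggy/loggers/dag_run/OStreamLogger.hpp>

    int main()
    {
        // Previously something like: FileSystemLogger logger{"/var/daggy"};
        // OStreamLogger writes to any std::ostream, so an append-mode file
        // stream keeps an on-disk record without the per-run directory tree.
        std::ofstream logStream("daggy_runs.log", std::ios::app);
        daggy::loggers::dag_run::OStreamLogger logger(logStream);
        // `logger` can now be handed to anything expecting a DAGRunLogger &.
    }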

View File

@@ -6,7 +6,7 @@
 #include "DAGRunLogger.hpp"
 #include "Defines.hpp"
-namespace daggy { namespace loggers { namespace dag_run {
+namespace daggy::loggers::dag_run {
    /*
     * This logger should only be used for debug purposes. It doesn't actually log
     * anything, just prints stuff to stdout.
@@ -14,15 +14,15 @@ namespace daggy { namespace loggers { namespace dag_run {
    class OStreamLogger : public DAGRunLogger
    {
    public:
-       OStreamLogger(std::ostream &os);
+       explicit OStreamLogger(std::ostream &os);
        // Execution
        DAGRunID startDAGRun(std::string name, const TaskSet &tasks) override;
-       void addTask(DAGRunID dagRunID, const std::string taskName,
+       void addTask(DAGRunID dagRunID, const std::string &taskName,
                     const Task &task) override;
-       void updateTask(DAGRunID dagRunID, const std::string taskName,
+       void updateTask(DAGRunID dagRunID, const std::string &taskName,
                        const Task &task) override;
        void updateDAGRunState(DAGRunID dagRunID, RunState state) override;
@@ -48,4 +48,4 @@ namespace daggy { namespace loggers { namespace dag_run {
        void _updateDAGRunState(DAGRunID dagRunID, RunState state);
    };
-}}} // namespace daggy::loggers::dag_run
+} // namespace daggy::loggers::dag_run

View File

@@ -77,9 +77,9 @@ namespace daggy {
            }
            else {
                ss << '[';
-               const auto &vals = std::get<std::vector<std::string>>(v);
+               const auto &values = std::get<std::vector<std::string>>(v);
                bool firstVal = true;
-               for (const auto &val : vals) {
+               for (const auto &val : values) {
                    if (firstVal) {
                        firstVal = false;
                    }
@@ -200,7 +200,7 @@ namespace daggy {
    std::string taskToJSON(const Task &task)
    {
        std::stringstream ss;
-       bool first = false;
+       bool first;
        ss << "{"
           << R"("maxRetries": )" << task.maxRetries << ','
@@ -283,7 +283,7 @@ namespace daggy {
    TimePoint stringToTimePoint(const std::string &timeString)
    {
-       std::tm dt;
+       std::tm dt{};
        std::stringstream ss{timeString};
        ss >> std::get_time(&dt, "%Y-%m-%d %H:%M:%S %Z");
        return Clock::from_time_t(mktime(&dt));

View File

@@ -13,7 +13,7 @@ namespace rj = rapidjson;
 using namespace Pistache;
 namespace daggy {
-   void Server::init(int threads)
+   void Server::init(size_t threads)
    {
        auto opts = Http::Endpoint::options()
                        .threads(threads)
@@ -33,6 +33,12 @@ namespace daggy {
        endpoint_.serveThreaded();
    }
+   Server &Server::setSSLCertificates(const fs::path &cert, const fs::path &key)
+   {
+       endpoint_.useSSL(cert, key);
+       return *this;
+   }
    void Server::shutdown()
    {
        endpoint_.shutdown();
@@ -47,8 +53,9 @@ namespace daggy {
    {
        desc_.info().license("MIT", "https://opensource.org/licenses/MIT");
-       auto backendErrorResponse = desc_.response(
-           Http::Code::Internal_Server_Error, "An error occured with the backend");
+       auto backendErrorResponse =
+           desc_.response(Http::Code::Internal_Server_Error,
+                          "An error occurred with the backend");
        desc_.schemes(Rest::Scheme::Http)
            .basePath("/v1")
@@ -212,8 +219,8 @@ namespace daggy {
        if (!request.hasParam(":runID")) {
            REQ_ERROR(Not_Found, "No runID provided in URL");
        }
-       DAGRunID runID = request.param(":runID").as<size_t>();
+       auto runID = request.param(":runID").as<size_t>();
        auto run = logger_.getDAGRun(runID);
        bool first = true;
        std::stringstream ss;
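
The one behavioral addition in this file is the setSSLCertificates definition. A hypothetical call-site sketch, assuming an already-constructed Server instance named server and PEM files at the paths shown (none of which appear in this diff):

    // Hypothetical wiring; `server`, the cert/key paths, and the thread count
    // are illustrative values, not taken from the repository.
    server.setSSLCertificates("/etc/daggy/server.crt", "/etc/daggy/server.key");
    server.init(4);   // init(size_t threads = 1) per the header change above
    server.start();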

View File

@@ -42,7 +42,7 @@ namespace daggy {
            else {
                for (const auto &val :
                     std::get<std::vector<std::string>>(paramValue)) {
-                   for (auto cmd : expandedPart) {
+                   for (const auto &cmd : expandedPart) {
                        newExpandedPart.push_back(globalSub(cmd, param, val));
                    }
                }

View File

@@ -1,5 +1,5 @@
 target_sources(${PROJECT_NAME} PRIVATE
-    ForkingTaskExecutor.cpp
-    SlurmTaskExecutor.cpp
-    NoopTaskExecutor.cpp
+    SlurmTaskExecutor.cpp
+    NoopTaskExecutor.cpp
+    ForkingTaskExecutor.cpp
 )

View File

@@ -2,12 +2,11 @@
 #include <stdexcept>
 #ifdef DAGGY_ENABLE_SLURM
 #include <slurm/slurm.h>
-#include <stdlib.h>
 #include <sys/resource.h>
 #include <sys/stat.h>
-#include <sys/time.h>
 #include <sys/types.h>
+#include <cstdlib>
 #include <daggy/Utilities.hpp>
 #include <daggy/executors/task/SlurmTaskExecutor.hpp>
 #include <filesystem>
@@ -223,7 +222,6 @@ namespace daggy::executors::task {
        case JOB_SUSPENDED:
        case JOB_RUNNING:
            continue;
-           break;
        // Job has finished
        case JOB_COMPLETE: /* completed execution successfully */
        case JOB_FAILED: /* completed execution unsuccessfully */

View File

@@ -1,4 +1,3 @@
 target_sources(${PROJECT_NAME} PRIVATE
-    FileSystemLogger.cpp
     OStreamLogger.cpp
 )

View File

@@ -1,212 +0,0 @@
#include <enum.h>
#include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp>
#include <daggy/loggers/dag_run/FileSystemLogger.hpp>
#include <fstream>
#include <iomanip>
namespace fs = std::filesystem;
using namespace daggy::loggers::dag_run;
namespace daggy {
inline const fs::path FileSystemLogger::getCurrentPath() const
{
return root_ / "current";
}
inline const fs::path FileSystemLogger::getRunsRoot() const
{
return root_ / "runs";
}
inline const fs::path FileSystemLogger::getRunRoot(DAGRunID runID) const
{
return getRunsRoot() / std::to_string(runID);
}
FileSystemLogger::FileSystemLogger(fs::path root)
: root_(root)
, nextRunID_(0)
{
const std::vector<fs::path> reqPaths{root_, getCurrentPath(),
getRunsRoot()};
for (const auto &path : reqPaths) {
if (!fs::exists(path)) {
fs::create_directories(path);
}
}
// Get the next run ID
for (auto &dir : fs::directory_iterator(getRunsRoot())) {
try {
size_t runID = std::stoull(dir.path().stem());
if (runID > nextRunID_)
nextRunID_ = runID + 1;
}
catch (std::exception &e) {
continue;
}
}
}
// Execution
DAGRunID FileSystemLogger::startDAGRun(std::string name, const TaskSet &tasks)
{
DAGRunID runID = nextRunID_++;
// TODO make this threadsafe
fs::path runDir = getRunRoot(runID);
// std::lock_guard<std::mutex> guard(runLocks[runDir]);
// Init the directory
fs::path runRoot = getRunsRoot() / std::to_string(runID);
fs::create_directories(runRoot);
// Create meta.json with DAGRun Name and task definitions
std::ofstream ofh(runRoot / "metadata.json",
std::ios::trunc | std::ios::binary);
ofh << R"({ "name": )" << std::quoted(name) << R"(, "tasks": )"
<< tasksToJSON(tasks) << "}\n";
ofh.close();
// Task directories
for (const auto &[name, task] : tasks) {
auto taskDir = runRoot / name;
fs::create_directories(taskDir);
std::ofstream ofh(taskDir / "states.csv");
}
return runID;
}
void FileSystemLogger::updateDAGRunState(DAGRunID dagRunID, RunState state)
{
std::ofstream ofh(getRunRoot(dagRunID) / "states.csv",
std::ios::binary | std::ios::app);
ofh << std::quoted(timePointToString(Clock::now())) << ','
<< state._to_string() << '\n';
ofh.flush();
ofh.close();
}
void FileSystemLogger::logTaskAttempt(DAGRunID dagRunID,
const std::string &taskName,
const AttemptRecord &attempt)
{
auto taskRoot = getRunRoot(dagRunID) / taskName;
size_t i = 1;
while (fs::exists(taskRoot / std::to_string(i))) {
++i;
}
auto attemptDir = taskRoot / std::to_string(i);
fs::create_directories(attemptDir);
std::ofstream ofh;
// Metadata
ofh.open(attemptDir / "metadata.json");
ofh << "{\n"
<< R"("startTime": )"
<< std::quoted(timePointToString(attempt.startTime)) << ",\n"
<< R"("stopTime": )" << std::quoted(timePointToString(attempt.stopTime))
<< ",\n"
<< R"("rc": )" << attempt.rc << '\n'
<< '}';
// output
ofh.open(attemptDir / "executor.log");
ofh << attempt.executorLog << std::flush;
ofh.close();
// Output
ofh.open(attemptDir / "output.log");
ofh << attempt.outputLog << std::flush;
ofh.close();
// Error
ofh.open(attemptDir / "error.log");
ofh << attempt.errorLog << std::flush;
ofh.close();
}
void FileSystemLogger::updateTaskState(DAGRunID dagRunID,
const std::string &taskName,
RunState state)
{
std::ofstream ofh(getRunRoot(dagRunID) / taskName / "states.csv",
std::ios::binary | std::ios::app);
ofh << std::quoted(timePointToString(Clock::now())) << ','
<< state._to_string() << '\n';
ofh.flush();
ofh.close();
}
// Querying
std::vector<DAGRunSummary> FileSystemLogger::getDAGs(uint32_t stateMask)
{
return {};
}
DAGRunRecord FileSystemLogger::getDAGRun(DAGRunID dagRunID)
{
DAGRunRecord record;
auto runRoot = getRunRoot(dagRunID);
if (!fs::exists(runRoot)) {
throw std::runtime_error("No DAGRun with that ID exists");
}
std::ifstream ifh(runRoot / "metadata.json", std::ios::binary);
std::string metaData;
std::getline(ifh, metaData, '\0');
ifh.close();
rj::Document doc;
doc.Parse(metaData.c_str());
record.name = doc["name"].GetString();
record.tasks = tasksFromJSON(doc["tasks"]);
// DAG State Changes
std::string line;
std::string token;
auto dagStateFile = runRoot / "states.csv";
ifh.open(dagStateFile);
while (std::getline(ifh, line)) {
std::stringstream ss{line};
std::string time;
std::string state;
std::getline(ss, time, ',');
std::getline(ss, state);
record.dagStateChanges.emplace_back(
DAGUpdateRecord{.time = stringToTimePoint(time),
.newState = RunState::_from_string(state.c_str())});
}
ifh.close();
// Task states
for (const auto &[taskName, task] : record.tasks) {
auto taskStateFile = runRoot / taskName / "states.csv";
if (!fs::exists(taskStateFile)) {
record.taskRunStates.emplace(taskName, RunState::QUEUED);
continue;
}
ifh.open(taskStateFile);
while (std::getline(ifh, line)) {
continue;
}
std::stringstream ss{line};
while (std::getline(ss, token, ',')) {
continue;
}
RunState taskState = RunState::_from_string(token.c_str());
record.taskRunStates.emplace(taskName, taskState);
ifh.close();
}
return record;
}
} // namespace daggy

View File

@@ -31,7 +31,7 @@ namespace daggy { namespace loggers { namespace dag_run {
        return runID;
    }
-   void OStreamLogger::addTask(DAGRunID dagRunID, const std::string taskName,
+   void OStreamLogger::addTask(DAGRunID dagRunID, const std::string &taskName,
                                const Task &task)
    {
        std::lock_guard<std::mutex> lock(guard_);
@@ -40,7 +40,7 @@ namespace daggy { namespace loggers { namespace dag_run {
        _updateTaskState(dagRunID, taskName, RunState::QUEUED);
    }
-   void OStreamLogger::updateTask(DAGRunID dagRunID, const std::string taskName,
+   void OStreamLogger::updateTask(DAGRunID dagRunID, const std::string &taskName,
                                   const Task &task)
    {
        std::lock_guard<std::mutex> lock(guard_);

View File

@@ -1,8 +1,6 @@
 #include <catch2/catch.hpp>
 #include <iostream>
-#include "daggy/DAG.hpp"
 TEST_CASE("General tests", "[general]")
 {
     REQUIRE(1 == 1);

View File

@@ -53,14 +53,14 @@ TEST_CASE("dag_traversal", "[dag]")
    std::vector<std::pair<int, int>> edges{{0, 6}, {1, 5}, {5, 6}, {6, 7}, {2, 3},
                                           {3, 5}, {4, 7}, {7, 8}, {7, 9}};
-   for (auto const [from, to] : edges) {
+   for (const auto &[from, to] : edges) {
        dag.addEdge(from, to);
    }
    SECTION("Basic Traversal")
    {
        dag.reset();
-       std::vector<int> visitOrder(N_VERTICES);
+       std::vector<size_t> visitOrder(N_VERTICES);
        size_t i = 0;
        while (!dag.allVisited()) {
            const auto v = dag.visitNext().value();
@@ -70,7 +70,7 @@ TEST_CASE("dag_traversal", "[dag]")
        }
        // Ensure visit order is preserved
-       for (auto const [from, to] : edges) {
+       for (const auto &[from, to] : edges) {
            REQUIRE(visitOrder[from] <= visitOrder[to]);
        }
    }

View File

@@ -3,7 +3,6 @@
 #include <fstream>
 #include <iostream>
-#include "daggy/loggers/dag_run/FileSystemLogger.hpp"
 #include "daggy/loggers/dag_run/OStreamLogger.hpp"
 namespace fs = std::filesystem;
@@ -40,26 +39,6 @@ inline DAGRunID testDAGRunInit(DAGRunLogger &logger, const std::string &name,
    return runID;
 }
-/*
-TEST_CASE("Filesystem Logger", "[filesystem_logger]") {
-    const fs::path logRoot{"fs_logger_unit"};
-    auto cleanup = [&]() {
-        if (fs::exists(logRoot)) {
-            fs::remove_all(logRoot);
-        }
-    };
-    //cleanup();
-    daggy::loggers::dag_run::FileSystemLogger logger(logRoot);
-    SECTION("DAGRun Starts") {
-        testDAGRunInit(logger, "init_test", SAMPLE_TASKS);
-    }
-    // cleanup();
-}
-*/
 TEST_CASE("ostream_logger", "[ostream_logger]")
 {
    // cleanup();

View File

@@ -1,4 +1,3 @@
-#include <sys/types.h>
 #include <unistd.h>
 #include <catch2/catch.hpp>

View File

@@ -7,12 +7,12 @@
 #include <daggy/executors/task/ForkingTaskExecutor.hpp>
 #include <daggy/loggers/dag_run/OStreamLogger.hpp>
 #include <filesystem>
-#include <fstream>
 #include <iostream>
 namespace rj = rapidjson;
-Pistache::Http::Response REQUEST(std::string url, std::string payload = "")
+Pistache::Http::Response REQUEST(const std::string &url,
+                                 const std::string &payload = "")
 {
    Pistache::Http::Experimental::Client client;
    client.init();
@@ -28,12 +28,12 @@ Pistache::Http::Response REQUEST(std::string url, std::string payload = "")
    request.then(
        [&](Pistache::Http::Response rsp) {
            ok = true;
-           response = rsp;
+           response = std::move(rsp);
        },
        [&](std::exception_ptr ptr) {
            error = true;
            try {
-               std::rethrow_exception(ptr);
+               std::rethrow_exception(std::move(ptr));
            }
            catch (std::exception &e) {
                msg = e.what();

View File

@@ -18,10 +18,10 @@ TEST_CASE("threadpool", "[threadpool]")
    auto tq = std::make_shared<daggy::TaskQueue>();
    std::vector<std::future<uint32_t>> res;
    for (size_t i = 0; i < 100; ++i)
-       res.emplace_back(std::move(tq->addTask([&cnt]() {
+       res.emplace_back(tq->addTask([&cnt]() {
            cnt++;
            return cnt.load();
-       })));
+       }));
    tp.addTasks(tq);
    for (auto &r : res)
        r.get();

View File

@@ -5,7 +5,6 @@
 #include <fstream>
 #include <iomanip>
 #include <iostream>
-#include <random>
 #include "daggy/Serialization.hpp"
 #include "daggy/Utilities.hpp"
@@ -67,7 +66,7 @@ TEST_CASE("dag_runner_order", "[dagrun_order]")
    std::stringstream ss;
    daggy::loggers::dag_run::OStreamLogger logger(ss);
-   daggy::TimePoint startTime = daggy::Clock::now();
+   daggy::TimePoint globalStartTime = daggy::Clock::now();
    std::string testParams{
        R"({"DATE": ["2021-05-06", "2021-05-07", "2021-05-08", "2021-05-09" ]})"};
@@ -94,11 +93,11 @@ TEST_CASE("dag_runner_order", "[dagrun_order]")
    // Ensure the run order
    auto rec = logger.getDAGRun(runID);
-   daggy::TimePoint stopTime = daggy::Clock::now();
+   daggy::TimePoint globalStopTime = daggy::Clock::now();
    std::array<daggy::TimePoint, 5> minTimes;
-   minTimes.fill(startTime);
+   minTimes.fill(globalStartTime);
    std::array<daggy::TimePoint, 5> maxTimes;
-   maxTimes.fill(stopTime);
+   maxTimes.fill(globalStopTime);
    for (const auto &[k, v] : rec.taskAttempts) {
        size_t idx = k[0] - 65;

View File

@@ -1,8 +1,8 @@
-#include <signal.h>
 #include <sys/stat.h>
 #include <argparse.hpp>
 #include <atomic>
+#include <csignal>
 #include <daggy/Server.hpp>
 #include <fstream>
 #include <iostream>
@@ -38,6 +38,8 @@ void signalHandler(int signal)
    case SIGTERM:
        running = false;
        break;
+   default:
+       break;
    }
 }
@@ -61,7 +63,7 @@ void daemonize()
    sigaddset(&newSigSet, SIGTTOU); /* ignore Tty background writes */
    sigaddset(&newSigSet, SIGTTIN); /* ignore Tty background reads */
    sigprocmask(SIG_BLOCK, &newSigSet,
-               NULL); /* Block the above specified signals */
+               nullptr); /* Block the above specified signals */
    /* Set up a signal handler */
    newSigAction.sa_handler = signalHandler;
@@ -69,9 +71,9 @@ void daemonize()
    newSigAction.sa_flags = 0;
    /* Signals to handle */
-   sigaction(SIGHUP, &newSigAction, NULL); /* catch hangup signal */
-   sigaction(SIGTERM, &newSigAction, NULL); /* catch term signal */
-   sigaction(SIGINT, &newSigAction, NULL); /* catch interrupt signal */
+   sigaction(SIGHUP, &newSigAction, nullptr); /* catch hangup signal */
+   sigaction(SIGTERM, &newSigAction, nullptr); /* catch term signal */
+   sigaction(SIGINT, &newSigAction, nullptr); /* catch interrupt signal */
    // Fork once
    pid = fork();
@@ -107,7 +109,7 @@ void daemonize()
    (void)rc;
    /* Close all open file descriptors */
-   for (auto x = sysconf(_SC_OPEN_MAX); x >= 0; x--) {
+   for (int x = sysconf(_SC_OPEN_MAX); x >= 0; x--) {
        close(x);
    }
 }
@@ -152,14 +154,14 @@ int main(int argc, char **argv)
        exit(1);
    }
    bool verbose = args.get<bool>("--verbose");
    bool asDaemon = args.get<bool>("--daemon");
-   std::string logFileName = args.get<std::string>("--log-file");
-   std::string listenIP = args.get<std::string>("--ip");
-   uint16_t listenPort = args.get<int>("--port");
-   size_t executorThreads = args.get<size_t>("--executor-threads");
-   size_t webThreads = args.get<size_t>("--web-threads");
-   size_t dagThreads = args.get<size_t>("--dag-threads");
+   auto logFileName = args.get<std::string>("--log-file");
+   auto listenIP = args.get<std::string>("--ip");
+   auto listenPort = args.get<int>("--port");
+   auto executorThreads = args.get<size_t>("--executor-threads");
+   auto webThreads = args.get<size_t>("--web-threads");
+   auto dagThreads = args.get<size_t>("--dag-threads");
    if (logFileName == "-") {
        if (asDaemon) {