- Removing Catch2 code from codebase, will pull it via FetchContent instead.
- Changing StdOutLogger to OStreamLogger, so that test cases output can be silenced.
This commit is contained in:
@@ -6,7 +6,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS True)
|
||||
|
||||
set(THIRD_PARTY_DIR ${CMAKE_BINARY_DIR}/third_party)
|
||||
|
||||
find_package (Threads REQUIRED)
|
||||
find_package(Threads REQUIRED)
|
||||
|
||||
include(cmake/rapidjson.cmake)
|
||||
include(cmake/Pistache.cmake)
|
||||
|
||||
@@ -12,9 +12,9 @@ namespace daggy {
|
||||
* This logger should only be used for debug purposes. It doesn't actually log anything, just prints stuff
|
||||
* to stdout.
|
||||
*/
|
||||
class StdOutLogger : public DAGLogger {
|
||||
class OStreamLogger : public DAGLogger {
|
||||
public:
|
||||
StdOutLogger();
|
||||
OStreamLogger(std::ostream &os);
|
||||
|
||||
// Execution
|
||||
virtual DAGRunID startDAGRun(std::string name, const std::vector<Task> &tasks) override;
|
||||
@@ -33,6 +33,7 @@ namespace daggy {
|
||||
private:
|
||||
DAGRunID nextRunID_;
|
||||
std::mutex guard_;
|
||||
std::ostream &os_;
|
||||
};
|
||||
}
|
||||
}
|
||||
43
daggy/src/loggers/dag_run/OStreamLogger.cpp
Normal file
43
daggy/src/loggers/dag_run/OStreamLogger.cpp
Normal file
@@ -0,0 +1,43 @@
|
||||
#include <magic_enum.hpp>
|
||||
|
||||
#include <daggy/loggers/dag_run/OStreamLogger.hpp>
|
||||
|
||||
namespace daggy {
|
||||
namespace loggers {
|
||||
namespace dag_run {
|
||||
OStreamLogger::OStreamLogger(std::ostream &os) : nextRunID_(0), os_(os) {}
|
||||
|
||||
// Execution
|
||||
DAGRunID OStreamLogger::startDAGRun(std::string name, const std::vector<Task> &tasks) {
|
||||
std::lock_guard<std::mutex> lock(guard_);
|
||||
size_t runID = nextRunID_++;
|
||||
os_ << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size()
|
||||
<< " tasks" << std::endl;
|
||||
return runID;
|
||||
}
|
||||
|
||||
void OStreamLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {
|
||||
std::lock_guard<std::mutex> lock(guard_);
|
||||
os_ << "DAG State Change(" << dagRunID << "): " << magic_enum::enum_name(state) << std::endl;
|
||||
}
|
||||
|
||||
void OStreamLogger::logTaskAttempt(DAGRunID dagRunID, size_t taskID, const AttemptRecord &attempt) {
|
||||
std::lock_guard<std::mutex> lock(guard_);
|
||||
const std::string &msg = attempt.rc == 0 ? attempt.output : attempt.error;
|
||||
os_ << "Task Attempt (" << dagRunID << '/' << taskID << "): Ran with RC " << attempt.rc << ": "
|
||||
<< msg << std::endl;
|
||||
}
|
||||
|
||||
void OStreamLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) {
|
||||
std::lock_guard<std::mutex> lock(guard_);
|
||||
os_ << "Task State Change (" << dagRunID << '/' << taskID << "): " << magic_enum::enum_name(state)
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
// Querying
|
||||
std::vector<DAGRunSummary> OStreamLogger::getDAGs(uint32_t stateMask) { return {}; }
|
||||
|
||||
DAGRunRecord OStreamLogger::getDAGRun(DAGRunID dagRunId) { return {}; }
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,43 +0,0 @@
|
||||
#include <magic_enum.hpp>
|
||||
|
||||
#include <daggy/loggers/dag_run/StdOutLogger.hpp>
|
||||
|
||||
namespace daggy {
|
||||
namespace loggers {
|
||||
namespace dag_run {
|
||||
StdOutLogger::StdOutLogger() : nextRunID_(0) {}
|
||||
|
||||
// Execution
|
||||
DAGRunID StdOutLogger::startDAGRun(std::string name, const std::vector<Task> &tasks) {
|
||||
std::lock_guard<std::mutex> lock(guard_);
|
||||
size_t runID = nextRunID_++;
|
||||
std::cout << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size()
|
||||
<< " tasks" << std::endl;
|
||||
return runID;
|
||||
}
|
||||
|
||||
void StdOutLogger::updateDAGRunState(DAGRunID dagRunID, RunState state) {
|
||||
std::lock_guard<std::mutex> lock(guard_);
|
||||
std::cout << "DAG State Change(" << dagRunID << "): " << magic_enum::enum_name(state) << std::endl;
|
||||
}
|
||||
|
||||
void StdOutLogger::logTaskAttempt(DAGRunID dagRunID, size_t taskID, const AttemptRecord &attempt) {
|
||||
std::lock_guard<std::mutex> lock(guard_);
|
||||
const std::string &msg = attempt.rc == 0 ? attempt.output : attempt.error;
|
||||
std::cout << "Task Attempt (" << dagRunID << '/' << taskID << "): Ran with RC " << attempt.rc << ": "
|
||||
<< msg << std::endl;
|
||||
}
|
||||
|
||||
void StdOutLogger::updateTaskState(DAGRunID dagRunID, TaskID taskID, RunState state) {
|
||||
std::lock_guard<std::mutex> lock(guard_);
|
||||
std::cout << "Task State Change (" << dagRunID << '/' << taskID << "): " << magic_enum::enum_name(state)
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
// Querying
|
||||
std::vector<DAGRunSummary> StdOutLogger::getDAGs(uint32_t stateMask) { return {}; }
|
||||
|
||||
DAGRunRecord StdOutLogger::getDAGRun(DAGRunID dagRunId) { return {}; }
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,13 @@
|
||||
Include(FetchContent)
|
||||
|
||||
FetchContent_Declare(
|
||||
Catch2
|
||||
GIT_REPOSITORY https://github.com/catchorg/Catch2.git
|
||||
GIT_TAG v2.13.7)
|
||||
FetchContent_MakeAvailable(Catch2)
|
||||
|
||||
project(tests)
|
||||
file(GLOB UNIT_TESTS unit_*.cpp)
|
||||
file(GLOB INTEGRATION_TESTS int_*.cpp)
|
||||
add_executable(tests main.cpp ${UNIT_TESTS} ${INTEGRATION_TESTS})
|
||||
target_link_libraries(tests daggy stdc++fs)
|
||||
target_link_libraries(tests daggy stdc++fs Catch2::Catch2)
|
||||
17698
tests/catch.hpp
17698
tests/catch.hpp
File diff suppressed because it is too large
Load Diff
@@ -2,7 +2,7 @@
|
||||
|
||||
#include "daggy/DAG.hpp"
|
||||
|
||||
#include "catch.hpp"
|
||||
#include <catch2/catch.hpp>
|
||||
|
||||
TEST_CASE("General tests", "[general]") {
|
||||
REQUIRE(1 == 1);
|
||||
|
||||
@@ -3,7 +3,8 @@
|
||||
#include "daggy/DAG.hpp"
|
||||
|
||||
#define CATCH_CONFIG_MAIN
|
||||
#include "catch.hpp"
|
||||
|
||||
#include <catch2/catch.hpp>
|
||||
|
||||
TEST_CASE("Sanity tests", "[sanity]") {
|
||||
REQUIRE(1 == 1);
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
#include "daggy/DAG.hpp"
|
||||
|
||||
#include "catch.hpp"
|
||||
#include <catch2/catch.hpp>
|
||||
|
||||
TEST_CASE("DAG Construction Tests", "[dag]") {
|
||||
daggy::DAG dag;
|
||||
@@ -12,28 +12,26 @@ TEST_CASE("DAG Construction Tests", "[dag]") {
|
||||
|
||||
REQUIRE_NOTHROW(dag.addVertex());
|
||||
for (int i = 1; i < 10; ++i) {
|
||||
dag.addVertex();
|
||||
dag.addEdge(i-1, i);
|
||||
dag.addVertex();
|
||||
dag.addEdge(i - 1, i);
|
||||
}
|
||||
|
||||
REQUIRE(dag.size() == 10);
|
||||
REQUIRE(! dag.empty());
|
||||
REQUIRE(!dag.empty());
|
||||
|
||||
// Cannot add an edge that would result in a cycle
|
||||
REQUIRE_THROWS(dag.addEdge(9, 5));
|
||||
|
||||
// Bounds checking
|
||||
SECTION("addEdge Bounds Checking") {
|
||||
REQUIRE_THROWS(dag.addEdge(20, 0));
|
||||
REQUIRE_THROWS(dag.addEdge(0, 20));
|
||||
}
|
||||
SECTION("dropEdge Bounds Checking") {
|
||||
REQUIRE_THROWS(dag.dropEdge(20, 0));
|
||||
REQUIRE_THROWS(dag.dropEdge(0, 20));
|
||||
}
|
||||
SECTION("hasPath Bounds Checking") {
|
||||
REQUIRE_THROWS(dag.hasPath(20, 0));
|
||||
REQUIRE_THROWS(dag.hasPath(0, 20));
|
||||
REQUIRE_THROWS(dag.addEdge(20, 0));
|
||||
REQUIRE_THROWS(dag.addEdge(0, 20));
|
||||
}SECTION("dropEdge Bounds Checking") {
|
||||
REQUIRE_THROWS(dag.dropEdge(20, 0));
|
||||
REQUIRE_THROWS(dag.dropEdge(0, 20));
|
||||
}SECTION("hasPath Bounds Checking") {
|
||||
REQUIRE_THROWS(dag.hasPath(20, 0));
|
||||
REQUIRE_THROWS(dag.hasPath(0, 20));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,36 +49,36 @@ TEST_CASE("DAG Traversal Tests", "[dag]") {
|
||||
4 -------------------------------/ \-----> 9
|
||||
*/
|
||||
|
||||
std::vector<std::pair<int,int>> edges{
|
||||
{0, 6}
|
||||
, {1, 5}
|
||||
, {5, 6}
|
||||
, {6, 7}
|
||||
, {2, 3}
|
||||
, {3, 5}
|
||||
, {4, 7}
|
||||
, {7, 8}
|
||||
, {7, 9}
|
||||
std::vector<std::pair<int, int>> edges{
|
||||
{0, 6},
|
||||
{1, 5},
|
||||
{5, 6},
|
||||
{6, 7},
|
||||
{2, 3},
|
||||
{3, 5},
|
||||
{4, 7},
|
||||
{7, 8},
|
||||
{7, 9}
|
||||
};
|
||||
|
||||
for (auto const [from, to] : edges) {
|
||||
dag.addEdge(from, to);
|
||||
for (auto const[from, to] : edges) {
|
||||
dag.addEdge(from, to);
|
||||
}
|
||||
|
||||
SECTION("Baisc Traversal") {
|
||||
dag.reset();
|
||||
std::vector<int> visitOrder(N_VERTICES);
|
||||
size_t i = 0;
|
||||
while (! dag.allVisited()) {
|
||||
const auto & v = dag.visitNext().value();
|
||||
dag.completeVisit(v);
|
||||
visitOrder[v] = i;
|
||||
++i;
|
||||
}
|
||||
dag.reset();
|
||||
std::vector<int> visitOrder(N_VERTICES);
|
||||
size_t i = 0;
|
||||
while (!dag.allVisited()) {
|
||||
const auto &v = dag.visitNext().value();
|
||||
dag.completeVisit(v);
|
||||
visitOrder[v] = i;
|
||||
++i;
|
||||
}
|
||||
|
||||
// Ensure visit order is preserved
|
||||
for (auto const [from, to] : edges) {
|
||||
REQUIRE(visitOrder[from] <= visitOrder[to]);
|
||||
}
|
||||
// Ensure visit order is preserved
|
||||
for (auto const[from, to] : edges) {
|
||||
REQUIRE(visitOrder[from] <= visitOrder[to]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,48 +3,46 @@
|
||||
|
||||
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
|
||||
|
||||
#include "catch.hpp"
|
||||
#include <catch2/catch.hpp>
|
||||
|
||||
TEST_CASE("Basic Execution", "[forking_executor]") {
|
||||
daggy::executors::task::ForkingTaskExecutor ex(10);
|
||||
daggy::executors::task::ForkingTaskExecutor ex(10);
|
||||
|
||||
SECTION("Simple Run") {
|
||||
std::vector<std::string> cmd{"/usr/bin/echo", "abc", "123"};
|
||||
SECTION("Simple Run") {
|
||||
std::vector<std::string> cmd{"/usr/bin/echo", "abc", "123"};
|
||||
|
||||
auto rec = ex.runCommand(cmd);
|
||||
auto rec = ex.runCommand(cmd);
|
||||
|
||||
REQUIRE(rec.rc == 0);
|
||||
REQUIRE(rec.output == "abc 123\n");
|
||||
REQUIRE(rec.error.empty());
|
||||
}
|
||||
|
||||
SECTION("Error Run") {
|
||||
std::vector<std::string> cmd{"/usr/bin/expr", "1", "+", "+"};
|
||||
|
||||
auto rec = ex.runCommand(cmd);
|
||||
|
||||
REQUIRE(rec.rc == 2);
|
||||
REQUIRE(rec.error == "/usr/bin/expr: syntax error: missing argument after ‘+’\n");
|
||||
REQUIRE(rec.output.empty());
|
||||
}
|
||||
|
||||
SECTION("Large Output") {
|
||||
const std::vector<std::string> BIG_FILES{
|
||||
"/usr/share/dict/linux.words"
|
||||
, "/usr/share/dict/cracklib-small"
|
||||
, "/etc/ssh/moduli"
|
||||
};
|
||||
|
||||
for (const auto & bigFile : BIG_FILES) {
|
||||
if (! std::filesystem::exists(bigFile)) continue;
|
||||
|
||||
std::vector<std::string> cmd{"/usr/bin/cat", bigFile};
|
||||
|
||||
auto rec = ex.runCommand(cmd);
|
||||
|
||||
REQUIRE(rec.rc == 0);
|
||||
REQUIRE(rec.output.size() == std::filesystem::file_size(bigFile));
|
||||
REQUIRE(rec.error.empty());
|
||||
REQUIRE(rec.rc == 0);
|
||||
REQUIRE(rec.output == "abc 123\n");
|
||||
REQUIRE(rec.error.empty());
|
||||
}
|
||||
|
||||
SECTION("Error Run") {
|
||||
std::vector<std::string> cmd{"/usr/bin/expr", "1", "+", "+"};
|
||||
|
||||
auto rec = ex.runCommand(cmd);
|
||||
|
||||
REQUIRE(rec.rc == 2);
|
||||
REQUIRE(rec.error == "/usr/bin/expr: syntax error: missing argument after ‘+’\n");
|
||||
REQUIRE(rec.output.empty());
|
||||
}
|
||||
|
||||
SECTION("Large Output") {
|
||||
const std::vector<std::string> BIG_FILES{
|
||||
"/usr/share/dict/linux.words", "/usr/share/dict/cracklib-small", "/etc/ssh/moduli"
|
||||
};
|
||||
|
||||
for (const auto &bigFile : BIG_FILES) {
|
||||
if (!std::filesystem::exists(bigFile)) continue;
|
||||
|
||||
std::vector<std::string> cmd{"/usr/bin/cat", bigFile};
|
||||
|
||||
auto rec = ex.runCommand(cmd);
|
||||
|
||||
REQUIRE(rec.rc == 0);
|
||||
REQUIRE(rec.output.size() == std::filesystem::file_size(bigFile));
|
||||
REQUIRE(rec.error.empty());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
#include <filesystem>
|
||||
#include <fstream>
|
||||
|
||||
#include "catch.hpp"
|
||||
#include <catch2/catch.hpp>
|
||||
|
||||
#include "daggy/Serialization.hpp"
|
||||
|
||||
@@ -15,16 +15,13 @@ TEST_CASE("Deserialize Parameters", "[deserialize_parameters]") {
|
||||
REQUIRE(params.size() == 2);
|
||||
REQUIRE(std::holds_alternative<std::vector<std::string>>(params["{{DATE}}"]));
|
||||
REQUIRE(std::holds_alternative<std::string>(params["{{SOURCE}}"]));
|
||||
}
|
||||
SECTION("Invalid JSON") {
|
||||
}SECTION("Invalid JSON") {
|
||||
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": "name")"};
|
||||
REQUIRE_THROWS(daggy::parametersFromJSON(testParams));
|
||||
}
|
||||
SECTION("Non-string Keys") {
|
||||
}SECTION("Non-string Keys") {
|
||||
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], 6: "name"})"};
|
||||
REQUIRE_THROWS(daggy::parametersFromJSON(testParams));
|
||||
}
|
||||
SECTION("Non-array/Non-string values") {
|
||||
}SECTION("Non-array/Non-string values") {
|
||||
std::string testParams{R"({"DATE": ["2021-05-06", "2021-05-07" ], "SOURCE": {"name": "kevin"}})"};
|
||||
REQUIRE_THROWS(daggy::parametersFromJSON(testParams));
|
||||
}
|
||||
|
||||
@@ -3,32 +3,39 @@
|
||||
|
||||
#include "daggy/ThreadPool.hpp"
|
||||
|
||||
#include "catch.hpp"
|
||||
#include <catch2/catch.hpp>
|
||||
|
||||
using namespace daggy;
|
||||
|
||||
TEST_CASE("Threadpool Construction", "[threadpool]") {
|
||||
std::atomic<uint32_t> cnt(0);
|
||||
ThreadPool tp(10);
|
||||
std::atomic<uint32_t> cnt(0);
|
||||
ThreadPool tp(10);
|
||||
|
||||
std::vector<std::future<uint32_t>> rets;
|
||||
std::vector<std::future<uint32_t>> rets;
|
||||
|
||||
SECTION("Adding large tasks queues with return values") {
|
||||
auto tq = std::make_shared<daggy::TaskQueue>();
|
||||
std::vector<std::future<uint32_t>> res;
|
||||
for (size_t i = 0; i < 100; ++i)
|
||||
res.emplace_back(std::move(tq->addTask([&cnt]() { cnt++; return cnt.load(); })));
|
||||
tp.addTasks(tq);
|
||||
for (auto & r : res) r.get();
|
||||
REQUIRE(cnt == 100);
|
||||
}
|
||||
SECTION("Adding large tasks queues with return values") {
|
||||
auto tq = std::make_shared<daggy::TaskQueue>();
|
||||
std::vector<std::future<uint32_t>> res;
|
||||
for (size_t i = 0; i < 100; ++i)
|
||||
res.emplace_back(std::move(tq->addTask([&cnt]() {
|
||||
cnt++;
|
||||
return cnt.load();
|
||||
})));
|
||||
tp.addTasks(tq);
|
||||
for (auto &r : res) r.get();
|
||||
REQUIRE(cnt == 100);
|
||||
}
|
||||
|
||||
SECTION("Slow runs") {
|
||||
std::vector<std::future<void>> res;
|
||||
using namespace std::chrono_literals;
|
||||
for (size_t i = 0; i < 100; ++i)
|
||||
res.push_back(tp.addTask([&cnt]() { std::this_thread::sleep_for(20ms); cnt++; return; }));
|
||||
for (auto & r : res) r.get();
|
||||
REQUIRE(cnt == 100);
|
||||
}
|
||||
SECTION("Slow runs") {
|
||||
std::vector<std::future<void>> res;
|
||||
using namespace std::chrono_literals;
|
||||
for (size_t i = 0; i < 100; ++i)
|
||||
res.push_back(tp.addTask([&cnt]() {
|
||||
std::this_thread::sleep_for(20ms);
|
||||
cnt++;
|
||||
return;
|
||||
}));
|
||||
for (auto &r : res) r.get();
|
||||
REQUIRE(cnt == 100);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,12 +2,12 @@
|
||||
#include <filesystem>
|
||||
#include <fstream>
|
||||
|
||||
#include "catch.hpp"
|
||||
#include <catch2/catch.hpp>
|
||||
|
||||
#include "daggy/Utilities.hpp"
|
||||
#include "daggy/Serialization.hpp"
|
||||
#include "daggy/executors/task/ForkingTaskExecutor.hpp"
|
||||
#include "daggy/loggers/dag_run/StdOutLogger.hpp"
|
||||
#include "daggy/loggers/dag_run/OStreamLogger.hpp"
|
||||
|
||||
TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
|
||||
SECTION("Basic expansion") {
|
||||
@@ -32,7 +32,8 @@ TEST_CASE("Parameter Expansion", "[utilities_parameter_expansion]") {
|
||||
|
||||
TEST_CASE("DAG Runner", "[utilities_dag_runner]") {
|
||||
daggy::executors::task::ForkingTaskExecutor ex(10);
|
||||
daggy::loggers::dag_run::StdOutLogger logger;
|
||||
std::stringstream ss;
|
||||
daggy::loggers::dag_run::OStreamLogger logger(ss);
|
||||
|
||||
std::string taskJSON = R"([{"name": "A", "command": ["/bin/echo", "A"], "children": ["C"]}, {"name": "B", "command": ["/bin/echo", "B"], "children": ["C"]},{"name": "C", "command": ["/bin/echo", "C"]}])";
|
||||
auto tasks = daggy::tasksFromJSON(taskJSON);
|
||||
|
||||
Reference in New Issue
Block a user