Adding executor description for verbose output, adding output directory for build artifacts

This commit is contained in:
Ian Roddis
2022-01-07 14:37:06 -04:00
parent 856e5bd2f4
commit 4da78c9dd6
12 changed files with 61 additions and 3 deletions

View File

@@ -10,6 +10,10 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
set(CMAKE_EXPORT_COMPILE_COMMANDS True)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall -Werror")
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
if(CMAKE_BUILD_TYPE MATCHES "Debug")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer")
# set(TSAN_OPTIONS "suppressions=${CMAKE_CURRENT_DIR}/tests/tsan.supp")

View File

@@ -270,11 +270,15 @@ int main(int argc, char **argv)
doc.SetObject();
}
auto logger = loggerFactory(doc);
auto executor = executorFactory(doc);
if (verbose) {
std::cout << "Server running at http://" << listenIP << ':' << listenPort
<< std::endl
<< "Max DAG Processing: " << dagThreads << std::endl
<< "Max Web Clients: " << webThreads << std::endl
<< "Executor: " << executor->description() << std::endl
<< std::endl
<< "Ctrl-C to exit" << std::endl;
}
@@ -283,9 +287,6 @@ int main(int argc, char **argv)
daemonize();
}
auto logger = loggerFactory(doc);
auto executor = executorFactory(doc);
Pistache::Address listenSpec(listenIP, listenPort);
daggy::daggyd::Server server(listenSpec, *logger, *executor, dagThreads);

View File

@@ -169,6 +169,11 @@ namespace daggy {
cv_.notify_one();
}
size_t size() const
{
return workers_.size();
}
private:
// need to keep track of threads, so we can join them
std::vector<std::thread> workers_;

View File

@@ -51,6 +51,8 @@ namespace daggy::executors::task {
bool stop(DAGRunID runID, const std::string &taskName) override;
std::string description() const;
void addRunner(const std::string &url);
private:

View File

@@ -31,6 +31,8 @@ namespace daggy::executors::task {
bool stop(DAGRunID runID, const std::string &taskName) override;
std::string description() const override;
private:
ThreadPool tp_;
std::mutex taskControlsGuard_;

View File

@@ -21,5 +21,7 @@ namespace daggy::executors::task {
const Task &task) override;
bool stop(DAGRunID runID, const std::string &taskName) override;
std::string description() const;
};
} // namespace daggy::executors::task

View File

@@ -25,6 +25,8 @@ namespace daggy::executors::task {
bool stop(DAGRunID runID, const std::string &taskName) override;
std::string description() const override;
private:
struct Job
{

View File

@@ -33,5 +33,8 @@ namespace daggy::executors::task {
// Kill a currently executing task. This will resolve the future.
virtual bool stop(DAGRunID runID, const std::string &taskName) = 0;
// Report a description of the executor
virtual std::string description() const = 0;
};
} // namespace daggy::executors::task

View File

@@ -88,6 +88,25 @@ DaggyRunnerTaskExecutor::~DaggyRunnerTaskExecutor()
monitorWorker_.join();
}
std::string DaggyRunnerTaskExecutor::description() const
{
std::stringstream ss;
ss << "DaggyRunnerTaskExecutor running with " << runners_.size()
<< " runners: [";
bool first = true;
for (const auto &[runner, _] : runners_) {
if (first) {
first = false;
}
else {
ss << ", ";
}
ss << runner;
}
ss << "]";
return ss.str();
}
// Validates the job to ensure that all required values are set and are of
// the right type,
bool DaggyRunnerTaskExecutor::validateTaskParameters(const ConfigValues &job)

View File

@@ -72,6 +72,13 @@ ForkingTaskExecutor::~ForkingTaskExecutor()
taskControls_.clear();
}
std::string ForkingTaskExecutor::description() const
{
std::stringstream ss;
ss << "ForkingTaskExecutor with " << tp_.size() << " threads";
return ss.str();
}
bool ForkingTaskExecutor::stop(DAGRunID runID, const std::string &taskName)
{
std::string key = std::to_string(runID) + "_" + taskName;

View File

@@ -2,6 +2,12 @@
#include <daggy/executors/task/NoopTaskExecutor.hpp>
namespace daggy::executors::task {
std::string NoopTaskExecutor::description() const
{
return "NoopTaskExecutor";
}
std::future<daggy::AttemptRecord> NoopTaskExecutor::execute(
DAGRunID runID, const std::string &taskName, const Task &task)
{

View File

@@ -93,6 +93,11 @@ namespace daggy::executors::task {
runningJobs_.clear();
}
std::string SlurmTaskExecutor::description() const
{
return "SlurmTaskExecutor";
}
// Validates the job to ensure that all required values are set and are of
// the right type,
bool SlurmTaskExecutor::validateTaskParameters(const ConfigValues &job)