Roughing in more scheduler and threadpool

This commit is contained in:
Ian Roddis
2021-06-16 07:36:17 -03:00
parent 40f6b283be
commit e44b7c4e8f
4 changed files with 37 additions and 6 deletions

View File

@@ -3,7 +3,7 @@ project(daggy)
#ExternalProject_Add_StepDependencies(pistache_extern build)
file(GLOB SOURCES src/*.cpp src/**/*.cpp)
add_library(${PROJECT_NAME} STATIC ${SOURCES})
add_library(${PROJECT_NAME} STATIC ${SOURCES} src/Scheduler.cpp)
include_directories(${PISTACHE_INCLUDE_DIR})
target_include_directories(${PROJECT_NAME} PUBLIC include)
target_link_libraries(${PROJECT_NAME} pistache pthread)

View File

@@ -14,12 +14,17 @@ namespace daggy {
class Scheduler {
public:
// Register an executor
void registerExecutor(std::shared_ptr<Executor> executor);
void runTasks(std::vector<Task> tasks, std::unordered_map<std::string, ParameterValue> parameters);
void registerExecutor(std::shared_ptr<Executor> executor
, size_t maxParallelTasks
);
void runTasks(std::vector<Task> tasks
, std::unordered_map<std::string, ParameterValue> parameters)
, DAG dag = {} // Allows for loading of an existing DAG
);
private:
std::unordered_map<std::string, std::shared_ptr<Executor>> executors;
std::unordered_map<std::string, std::vector<std::future<TaskResult>>> jobs;
std::vector<Parameter> parameters;
std::unordered_map<std::string, std::shared_ptr<Executor>> executors_;
std::unordered_map<std::string, std::vector<std::future<TaskResult>>> jobs_;
std::unordered_map<std::string, ParameterValue> parameters_;
};
}

View File

@@ -0,0 +1,19 @@
#pragma once
#include <atomic>
#include <thread>
#include <vector>
#include <memory>
#include <condition_variable>
namespace daggy {
class ThreadPool {
public:
ThreadPool(size_t nWorkers);
private:
std::atomic<bool> shutdown_;
std::mutex guard_;
std::condition_variable cv_;
std::vector<std::thread> workers;
};
}

7
daggy/src/Scheduler.cpp Normal file
View File

@@ -0,0 +1,7 @@
#pragma once
namespace daggy {
void Scheduler::registerExecutor(std::shared_ptr<Executor> executor, size_t maxParallelTasks) {
executors_.emplace(executor->getName(), executor);
}
}