Squashed commit of the following: commit 69d5ef7a256b86a86d46e5ae374c00fded1497ea Author: Ian Roddis <tech@kinesin.ca> Date: Thu Dec 16 12:15:55 2021 -0400 Updating readme commit 94a9f676d0f9cc0b55cdc18c4927eaea40d82c77 Author: Ian Roddis <tech@kinesin.ca> Date: Thu Dec 16 12:05:36 2021 -0400 Fixing serialization of attempt records when querying entire dag commit 945e5f90b24abf07c9af1bc4c6bbcb33e93b8069 Author: Ian Roddis <tech@kinesin.ca> Date: Thu Dec 16 11:37:59 2021 -0400 Compiles cleanly... commit 8b23e46081d47fb80dc1a2d998fc6dc4bbf301a8 Author: Ian Roddis <tech@kinesin.ca> Date: Thu Dec 16 10:43:03 2021 -0400 Adding in missing source file to cmake build list commit 6d10d9791206e2bc15788beadeea580b8e43a853 Author: Ian Roddis <tech@kinesin.ca> Date: Thu Dec 16 10:41:43 2021 -0400 Adding new executors commit 42a2c67f4d6ae99df95d917c8621d78cd99837a1 Author: Ian Roddis <tech@kinesin.ca> Date: Thu Dec 16 10:27:14 2021 -0400 Fixing missing curl cmake dependency commit 394bc4c5d51ecee7bf14712f719c8bf7e97fb0fa Author: Ian Roddis <tech@kinesin.ca> Date: Thu Dec 16 10:21:58 2021 -0400 Fixing missing curl cmake dependency commit dd9efc8e7e7770ea1bcbccb70a1af9cfcff0414c Author: Ian Roddis <tech@kinesin.ca> Date: Wed Dec 15 17:15:38 2021 -0400 Checkpointing progress commit 3b3b55d6037bb96e46de6763f486f4ecb92fe6a0 Author: Ian Roddis <tech@kinesin.ca> Date: Wed Dec 15 14:21:18 2021 -0400 updating readme commit 303027c11452941b2a0c0d1b04ac5942e79efd74 Author: Ian Roddis <tech@kinesin.ca> Date: Wed Dec 15 14:17:16 2021 -0400 Namespacing daggyd Adding more error checking around deserialization of parameters Adding tests for runner agent commit c592eaeba12e2a449bae401e8c1d9ed236416d52 Author: Ian Roddis <tech@kinesin.ca> Date: Wed Dec 15 11:20:21 2021 -0400 Checkpointing work commit fb1862d1cefe2b53a98659cce3c8c73d88bf5d84 Author: Ian Roddis <tech@kinesin.ca> Date: Wed Dec 15 09:52:29 2021 -0400 Copying daggyd for daggyr template, adding in basic routes
508 lines
14 KiB
C++
508 lines
14 KiB
C++
#include <enum.h>
|
|
|
|
#include <daggy/Serialization.hpp>
|
|
#include <daggy/Utilities.hpp>
|
|
#include <daggyd/Server.hpp>
|
|
#include <iomanip>
|
|
#include <mutex>
|
|
#include <numeric>
|
|
#include <stdexcept>
|
|
#include <thread>
|
|
#include <utility>
|
|
|
|
/*
  REQ_RESPONSE(code, msg)

  Sends {"message": "<msg>"} on the in-scope ResponseWriter named `response`
  with status Pistache::Http::Code::<code>, then returns from the enclosing
  (void) handler.

  The body is wrapped in do { ... } while (0) so that
    - the temporary stream does not collide with a local `ss` in the caller,
    - the macro behaves as a single statement (e.g. directly after a case
      label or an un-braced if).

  `msg` may be anything std::quoted accepts (const char*, std::string, ...).
*/
#define REQ_RESPONSE(code, msg)                                     \
  do {                                                              \
    std::stringstream reqResponseStream_;                           \
    reqResponseStream_ << R"({"message": )" << std::quoted((msg))   \
                       << "}";                                      \
    response.send(Pistache::Http::Code::code,                       \
                  reqResponseStream_.str());                        \
    return;                                                         \
  } while (0)
|
|
|
|
using namespace Pistache;
|
|
|
|
namespace daggy::daggyd {
|
|
void Server::init(size_t threads)
|
|
{
|
|
auto opts = Http::Endpoint::options()
|
|
.threads(threads)
|
|
.flags(Pistache::Tcp::Options::ReuseAddr |
|
|
Pistache::Tcp::Options::ReusePort)
|
|
.maxRequestSize(4294967296)
|
|
.maxResponseSize(4294967296);
|
|
endpoint_.init(opts);
|
|
createDescription();
|
|
}
|
|
|
|
/*
  Construct a server for the given listen address.

  listenSpec:  address the HTTP endpoint is created with.
  logger:      DAG run logger used by the request handlers.
  executor:    task executor handed to every DAGRunner.
  nDAGRunners: size argument for the pool that runs DAGs.

  Only members are initialised here; init() and start() must be called
  before any request is served.
*/
Server::Server(const Pistache::Address &listenSpec,
               loggers::dag_run::DAGRunLogger &logger,
               executors::task::TaskExecutor &executor,
               size_t nDAGRunners)
  : endpoint_(listenSpec),
    desc_("Daggy API", "0.1"),
    logger_(logger),
    executor_(executor),
    runnerPool_(nDAGRunners)
{
}
|
|
|
|
/*
  Tear the server down by delegating to shutdown(), which stops the
  endpoint and the runner pool.
*/
Server::~Server()
{
  this->shutdown();
}
|
|
|
|
void Server::start()
|
|
{
|
|
router_.initFromDescription(desc_);
|
|
|
|
endpoint_.setHandler(router_.handler());
|
|
endpoint_.serveThreaded();
|
|
}
|
|
|
|
/*
  Enable SSL on the endpoint using the given certificate and key files.

  Returns *this so that calls can be chained.
*/
Server &Server::setSSLCertificates(const fs::path &cert, const fs::path &key)
{
  endpoint_.useSSL(cert, key);

  Server &self = *this;
  return self;
}
|
|
|
|
void Server::shutdown()
|
|
{
|
|
endpoint_.shutdown();
|
|
runnerPool_.shutdown();
|
|
}
|
|
|
|
uint16_t Server::getPort() const
|
|
{
|
|
return endpoint_.getPort();
|
|
}
|
|
|
|
void Server::createDescription()
|
|
{
|
|
desc_.info().license("MIT", "https://opensource.org/licenses/MIT");
|
|
|
|
auto backendErrorResponse =
|
|
desc_.response(Http::Code::Internal_Server_Error,
|
|
R"({"error": "An error occurred with the backend"})");
|
|
|
|
desc_.schemes(Rest::Scheme::Http)
|
|
.basePath("/v1")
|
|
.produces(MIME(Application, Json))
|
|
.consumes(MIME(Application, Json));
|
|
|
|
desc_.route(desc_.get("/ready"))
|
|
.bind(&Server::handleReady, this)
|
|
.response(Http::Code::Ok, "Response to the /ready call")
|
|
.hide();
|
|
|
|
auto versionPath = desc_.path("/v1");
|
|
|
|
/*
|
|
DAG Run Summaries
|
|
*/
|
|
auto dagRunsPath = versionPath.path("/dagruns");
|
|
|
|
dagRunsPath.route(desc_.get("/"))
|
|
.bind(&Server::handleQueryDAGs, this)
|
|
.produces(MIME(Application, Json))
|
|
.response(Http::Code::Ok, "List summaries DAGs");
|
|
|
|
/*
|
|
Individual DAG Run routes
|
|
*/
|
|
auto dagRunPath = versionPath.path("/dagrun");
|
|
|
|
dagRunPath.route(desc_.post("/"))
|
|
.bind(&Server::handleRunDAG, this)
|
|
.produces(MIME(Application, Json))
|
|
.response(Http::Code::Ok, "Run a DAG");
|
|
|
|
dagRunPath.route(desc_.post("/validate"))
|
|
.bind(&Server::handleValidateDAG, this)
|
|
.produces(MIME(Application, Json))
|
|
.response(Http::Code::Ok, "Validate a DAG Run Spec");
|
|
|
|
/*
|
|
Management of a specific DAG
|
|
*/
|
|
auto specificDAGRunPath = dagRunPath.path("/:runID");
|
|
|
|
specificDAGRunPath.route(desc_.get("/"))
|
|
.bind(&Server::handleGetDAGRun, this)
|
|
.produces(MIME(Application, Json))
|
|
.response(Http::Code::Ok, "Full DAG Run");
|
|
|
|
specificDAGRunPath.route(desc_.get("/state"))
|
|
.bind(&Server::handleGetDAGRunState, this)
|
|
.produces(MIME(Application, Json))
|
|
.response(Http::Code::Ok,
|
|
"Structure of a DAG and DAG and Task run states");
|
|
|
|
specificDAGRunPath.route(desc_.patch("/state/:state"))
|
|
.bind(&Server::handleSetDAGRunState, this)
|
|
.produces(MIME(Application, Json))
|
|
.response(Http::Code::Ok, "Change the state of a DAG");
|
|
|
|
/*
|
|
Task paths
|
|
*/
|
|
auto taskPath = specificDAGRunPath.path("/task/:taskName");
|
|
taskPath.route(desc_.get("/"))
|
|
.bind(&Server::handleGetTask, this)
|
|
.produces(MIME(Application, Json))
|
|
.response(Http::Code::Ok, "Details of a specific task");
|
|
|
|
/*
|
|
Task State paths
|
|
*/
|
|
auto taskStatePath = taskPath.path("/state");
|
|
|
|
taskStatePath.route(desc_.get("/"))
|
|
.bind(&Server::handleGetTaskState, this)
|
|
.produces(MIME(Application, Json))
|
|
.response(Http::Code::Ok, "Get a task state");
|
|
|
|
taskStatePath.route(desc_.patch("/:state"))
|
|
.bind(&Server::handleSetTaskState, this)
|
|
.produces(MIME(Application, Json))
|
|
.response(Http::Code::Ok, "Set a task state");
|
|
}
|
|
|
|
/*
  POST /dagrun/

  Parses the request body as a DAG spec, expands its task set, registers a
  new run with the logger, and queues the run for execution.

  Responses:
    200 {"runID": <id>}        the run was registered and queued
    400 {"message": "<what>"}  an exception was raised while parsing,
                               expanding, registering or queueing the DAG

  Exceptions are turned into a JSON error response here, matching the other
  handlers in this file, instead of being allowed to escape the handler.
*/
void Server::handleRunDAG(const Pistache::Rest::Request &request,
                          Pistache::Http::ResponseWriter response)
{
  if (!handleAuth(request))
    return;

  try {
    auto dagSpec = dagFromJSON(request.body());
    dagSpec.tasks =
        expandTaskSet(dagSpec.tasks, executor_, dagSpec.taskConfig.variables);

    // Get a run ID
    DAGRunID runID = logger_.startDAGRun(dagSpec);
    auto dag = buildDAGFromTasks(dagSpec.tasks);
    queueDAG_(runID, dag, dagSpec.taskConfig);

    response.send(Pistache::Http::Code::Ok,
                  R"({"runID": )" + std::to_string(runID) + "}");
  }
  catch (const std::exception &e) {
    REQ_RESPONSE(Bad_Request, e.what());
  }
}
|
|
|
|
/*
  POST /dagrun/validate

  Attempts to parse the request body as a DAG spec without running it.

  Always answers 200; the body carries the verdict:
    {"valid": true}                       the spec parsed
    {"valid": false, "error": "<what>"}   parsing threw; <what> is the
                                          exception message, quoted
*/
void Server::handleValidateDAG(const Pistache::Rest::Request &request,
                               Pistache::Http::ResponseWriter response)
{
  try {
    dagFromJSON(request.body());
    response.send(Pistache::Http::Code::Ok, R"({"valid": true})");
  }
  catch (const std::exception &e) {
    // The failure branch must report valid=false, and the message has to be
    // quoted for the body to remain well-formed JSON.
    std::stringstream ss;
    ss << R"({"valid": false, "error": )" << std::quoted(e.what()) << '}';
    response.send(Pistache::Http::Code::Ok, ss.str());
  }
}
|
|
|
|
/*
  GET /dagruns/

  Lists summaries of DAG runs as a JSON array.

  Query parameters:
    tag  optional; passed to the logger as the tag filter (default "").
    all  optional; "true" or "1" sets the `all` flag passed to the logger,
         anything else leaves it false.

  Each array element has the form
    {"runID": N, "tag": "...", "startTime": "...", "lastUpdate": "...",
     "taskCounts": {"<STATE>": count, ...}}
*/
void Server::handleQueryDAGs(const Pistache::Rest::Request &request,
                             Pistache::Http::ResponseWriter response)
{
  if (!handleAuth(request))
    return;

  const auto &query = request.query();

  std::string tag;
  if (query.has("tag")) {
    tag = query.get("tag").value();
  }

  // `all` is a query-string option, so its presence must be tested on the
  // query; this route has no ":all" path parameter.
  bool all = false;
  if (query.has("all")) {
    const auto val = query.get("all").value();
    all = (val == "true" || val == "1");
  }

  auto dagRuns = logger_.queryDAGRuns(tag, all);
  std::stringstream ss;
  ss << '[';

  bool first = true;
  for (const auto &run : dagRuns) {
    if (first) {
      first = false;
    }
    else {
      ss << ", ";
    }

    ss << " {"
       << R"("runID": )" << run.runID << ',' << R"("tag": )"
       << std::quoted(run.tag) << ","
       << R"("startTime": )" << std::quoted(timePointToString(run.startTime))
       << ',' << R"("lastUpdate": )"
       << std::quoted(timePointToString(run.lastUpdate)) << ','
       << R"("taskCounts": {)";

    bool firstState = true;
    for (const auto &[state, count] : run.taskStateCounts) {
      if (firstState) {
        firstState = false;
      }
      else {
        ss << ", ";
      }
      ss << std::quoted(state._to_string()) << ':' << count;
    }
    ss << '}'  // end of taskCounts
       << '}'; // end of item
  }

  ss << ']';
  response.send(Pistache::Http::Code::Ok, ss.str());
}
|
|
|
|
/*
  GET /dagrun/:runID/

  Returns the full record of one DAG run as a JSON object with the keys
  "runID", "tag", "tasks", "taskStates", "taskAttempts" and
  "dagStateChanges".

  Responses:
    200  the serialised run
    404  {"message": "..."} when no runID is present in the URL, or when
         retrieving/serialising the run throws (same convention as the
         other run/task getters in this file)
*/
void Server::handleGetDAGRun(const Pistache::Rest::Request &request,
                             Pistache::Http::ResponseWriter response)
{
  if (!handleAuth(request))
    return;
  if (!request.hasParam(":runID")) {
    REQ_RESPONSE(Not_Found, "No runID provided in URL");
  }
  // Use the same ID type as every other handler.
  const auto runID = request.param(":runID").as<DAGRunID>();

  try {
    auto run = logger_.getDAGRun(runID);

    std::stringstream ss;
    ss << "{"
       << R"("runID": )" << runID << ',' << R"("tag": )"
       << std::quoted(run.dagSpec.tag) << ',' << R"("tasks": )"
       << tasksToJSON(run.dagSpec.tasks) << ',';

    // task run states
    ss << R"("taskStates": { )";
    bool first = true;
    for (const auto &[name, state] : run.taskRunStates) {
      if (first) {
        first = false;
      }
      else {
        ss << ',';
      }
      ss << std::quoted(name) << ": " << std::quoted(state._to_string());
    }
    ss << "},";

    // Attempt records
    first = true;
    ss << R"("taskAttempts": { )";
    for (const auto &[taskName, attempts] : run.taskAttempts) {
      if (first) {
        first = false;
      }
      else {
        ss << ',';
      }
      ss << std::quoted(taskName) << ": [";
      bool firstAttempt = true;
      for (const auto &attempt : attempts) {
        if (firstAttempt) {
          firstAttempt = false;
        }
        else {
          ss << ',';
        }
        ss << attemptRecordToJSON(attempt);
      }
      ss << ']';
    }
    ss << "},";

    // DAG state changes
    first = true;
    ss << R"("dagStateChanges": [ )";
    for (const auto &change : run.dagStateChanges) {
      if (first) {
        first = false;
      }
      else {
        ss << ',';
      }
      ss << stateUpdateRecordToJSON(change);
    }
    ss << "]";
    ss << '}';

    response.send(Pistache::Http::Code::Ok, ss.str());
  }
  catch (const std::exception &e) {
    REQ_RESPONSE(Not_Found, e.what());
  }
}
|
|
|
|
/*
  GET /dagrun/:runID/state

  Responds with {"runID": N, "state": "<STATE>"} for the requested run.
  If the logger lookup (or building the response) throws, the exception
  message is returned as a 404 {"message": "..."} body.
*/
void Server::handleGetDAGRunState(const Pistache::Rest::Request &request,
                                  Pistache::Http::ResponseWriter response)
{
  if (!handleAuth(request)) {
    return;
  }

  const DAGRunID runID = request.param(":runID").as<DAGRunID>();

  try {
    const RunState currentState = logger_.getDAGRunState(runID);

    std::stringstream body;
    body << R"({ "runID": )" << runID;
    body << R"(, "state": )" << std::quoted(currentState._to_string());
    body << '}';

    response.send(Pistache::Http::Code::Ok, body.str());
  }
  catch (const std::exception &err) {
    REQ_RESPONSE(Not_Found, err.what());
  }
}
|
|
|
|
/*
  Register a DAGRunner for `runID` and schedule it on the runner pool.

  runID:          identifier of the run; used as the key in runners_.
  dag:            task graph handed to the new DAGRunner.
  taskParameters: parameters handed to the new DAGRunner.

  Throws std::runtime_error if runners_ already holds an entry for runID.

  runnerGuard_ is held while runners_ is modified, both here and in the
  pooled task, which removes the entry again once run() has returned.
*/
void Server::queueDAG_(DAGRunID runID, const TaskDAG &dag,
                       const TaskParameters &taskParameters)
{
  std::lock_guard<std::mutex> guard(runnerGuard_);

  auto runner = std::make_shared<DAGRunner>(runID, executor_, logger_, dag,
                                            taskParameters);

  const bool inserted = runners_.emplace(runID, runner).second;
  if (!inserted) {
    throw std::runtime_error("A DAGRun with the same ID is already running");
  }

  // The task keeps its own reference to the runner, so the runner stays
  // alive for the whole run regardless of what happens to the map entry.
  runnerPool_.addTask([this, runID, runner]() {
    runner->run();

    std::lock_guard<std::mutex> cleanupGuard(runnerGuard_);
    runners_.erase(runID);
  });
}
|
|
|
|
/*
  PATCH /dagrun/:runID/state/:state

  Requests a state change for a DAG run.

  If the run currently has a live runner in runners_:
    PAUSED / KILLED  stop the runner and record the new state in the logger
    anything else    405 Method_Not_Allowed
  If the run has no live runner:
    QUEUED           rebuild the DAG from the logged run, reset its running
                     tasks and queue it again
    anything else    405 Method_Not_Allowed

  Responses:
    200 {"message": ""}     transition accepted
    400 {"message": "..."}  :state is not a valid RunState name
    405 {"message": "..."}  transition not permitted
*/
void Server::handleSetDAGRunState(const Pistache::Rest::Request &request,
                                  Pistache::Http::ResponseWriter response)
{
  if (!handleAuth(request))
    return;

  // TODO handle state transition
  DAGRunID runID = request.param(":runID").as<DAGRunID>();

  // _from_string throws on an unknown name; report that to the client as a
  // bad request instead of letting the exception leave the handler.
  RunState newState = RunState::QUEUED;
  try {
    newState = RunState::_from_string(
        request.param(":state").as<std::string>().c_str());
  }
  catch (const std::exception &e) {
    REQ_RESPONSE(Bad_Request, e.what());
  }

  // Take a reference to the live runner (if any) under the lock; it is
  // used outside the critical section.
  std::shared_ptr<DAGRunner> runner{nullptr};
  {
    std::lock_guard<std::mutex> lock(runnerGuard_);
    auto it = runners_.find(runID);
    if (it != runners_.end()) {
      runner = it->second;
    }
  }

  if (runner) {
    switch (newState) {
      case RunState::PAUSED:
      case RunState::KILLED: {
        runner->stop(true, true);
        logger_.updateDAGRunState(runID, newState);
        break;
      }
      default: {
        REQ_RESPONSE(Method_Not_Allowed,
                     std::string{"Cannot transition to state "} +
                         newState._to_string());
      }
    }
  }
  else {
    switch (newState) {
      case RunState::QUEUED: {
        auto dagRun = logger_.getDAGRun(runID);
        auto dag =
            buildDAGFromTasks(dagRun.dagSpec.tasks, dagRun.taskStateChanges);
        dag.resetRunning();
        queueDAG_(runID, dag, dagRun.dagSpec.taskConfig);
        break;
      }
      default: {
        REQ_RESPONSE(
            Method_Not_Allowed,
            std::string{"DAG not running, cannot transition to state "} +
                newState._to_string());
      }
    }
  }
  REQ_RESPONSE(Ok, "");
}
|
|
|
|
/*
  GET /dagrun/:runID/task/:taskName/

  Responds with the JSON serialisation of the requested task. If looking
  the task up or serialising it throws, the exception message is returned
  as a 404 {"message": "..."} body.
*/
void Server::handleGetTask(const Pistache::Rest::Request &request,
                           Pistache::Http::ResponseWriter response)
{
  if (!handleAuth(request)) {
    return;
  }

  const auto runID = request.param(":runID").as<DAGRunID>();
  const auto taskName = request.param(":taskName").as<std::string>();

  try {
    auto taskRecord = logger_.getTask(runID, taskName);
    const auto payload = taskToJSON(taskRecord);
    response.send(Pistache::Http::Code::Ok, payload);
  }
  catch (const std::exception &err) {
    REQ_RESPONSE(Not_Found, err.what());
  }
}
|
|
|
|
/*
  GET /dagrun/:runID/task/:taskName/state/

  Responds with {"runID": N, "taskName": "...", "state": "<STATE>"}.
  If the logger lookup throws, the exception message is returned as a
  404 {"message": "..."} body.
*/
void Server::handleGetTaskState(const Pistache::Rest::Request &request,
                                Pistache::Http::ResponseWriter response)
{
  if (!handleAuth(request)) {
    return;
  }

  const auto runID = request.param(":runID").as<DAGRunID>();
  const auto taskName = request.param(":taskName").as<std::string>();

  try {
    const auto taskState = logger_.getTaskState(runID, taskName);

    std::stringstream body;
    body << R"({ "runID": )" << runID;
    body << R"(, "taskName": )" << std::quoted(taskName);
    body << R"(, "state": )" << std::quoted(taskState._to_string());
    body << '}';

    response.send(Pistache::Http::Code::Ok, body.str());
  }
  catch (const std::exception &err) {
    REQ_RESPONSE(Not_Found, err.what());
  }
}
|
|
|
|
/*
  PATCH /dagrun/:runID/task/:taskName/state/:state

  Records a new state for a task through the logger.

  Responses:
    200 (empty body)        the logger accepted the update
    400 {"message": "..."}  :state is not a valid RunState name
    404 {"message": "..."}  the logger update threw
*/
void Server::handleSetTaskState(const Pistache::Rest::Request &request,
                                Pistache::Http::ResponseWriter response)
{
  if (!handleAuth(request))
    return;

  // TODO implement handling of task state
  auto runID = request.param(":runID").as<DAGRunID>();
  auto taskName = request.param(":taskName").as<std::string>();

  // _from_string throws on an unknown name; answer with a bad-request
  // response instead of letting the exception leave the handler.
  RunState state = RunState::QUEUED;
  try {
    state = RunState::_from_string(
        request.param(":state").as<std::string>().c_str());
  }
  catch (const std::exception &e) {
    REQ_RESPONSE(Bad_Request, e.what());
  }

  try {
    logger_.updateTaskState(runID, taskName, state);
    response.send(Pistache::Http::Code::Ok, "");
  }
  catch (const std::exception &e) {
    REQ_RESPONSE(Not_Found, e.what());
  }
}
|
|
|
|
/*
  GET /ready

  Readiness probe: always answers 200 with a fixed JSON body. The request
  itself is not inspected and no auth check is performed.
*/
void Server::handleReady(const Pistache::Rest::Request &request,
                         Pistache::Http::ResponseWriter response)
{
  const std::string readyBody = R"({ "msg": "Ya like DAGs?"})";
  response.send(Pistache::Http::Code::Ok, readyBody);
}
|
|
|
|
/*
|
|
* handleAuth will check any auth methods and handle any responses in the
|
|
* case of failed auth. If it returns false, callers should cease handling
|
|
* the response
|
|
*/
|
|
bool Server::handleAuth(const Pistache::Rest::Request &request)
|
|
{
|
|
return true;
|
|
}
|
|
} // namespace daggy::daggyd
|