- Fixing issue with parameter expansion on DAG submission to server
- Adding sections to unit_server tests - Adding cleanup
This commit is contained in:
@@ -101,13 +101,6 @@ namespace daggy {
|
|||||||
if (!doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); }
|
if (!doc.HasMember("tasks")) { REQ_ERROR(Bad_Request, "DAG Run has no tasks."); }
|
||||||
|
|
||||||
std::string runName = doc["name"].GetString();
|
std::string runName = doc["name"].GetString();
|
||||||
std::vector<Task> tasks;
|
|
||||||
try {
|
|
||||||
auto parsedTasks = tasksFromJSON(doc["tasks"].GetArray());
|
|
||||||
tasks.swap(parsedTasks);
|
|
||||||
} catch (std::exception &e) {
|
|
||||||
REQ_ERROR(Bad_Request, e.what());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get parameters if there are any
|
// Get parameters if there are any
|
||||||
ParameterValues parameters;
|
ParameterValues parameters;
|
||||||
@@ -120,6 +113,16 @@ namespace daggy {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the tasks
|
||||||
|
std::vector<Task> tasks;
|
||||||
|
try {
|
||||||
|
auto parsedTasks = tasksFromJSON(doc["tasks"].GetArray(), parameters);
|
||||||
|
tasks.swap(parsedTasks);
|
||||||
|
} catch (std::exception &e) {
|
||||||
|
REQ_ERROR(Bad_Request, e.what());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// Get a run ID
|
// Get a run ID
|
||||||
auto runID = logger_.startDAGRun(runName, tasks);
|
auto runID = logger_.startDAGRun(runName, tasks);
|
||||||
auto dag = buildDAGFromTasks(tasks);
|
auto dag = buildDAGFromTasks(tasks);
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
#include <iterator>
|
||||||
|
|
||||||
#include <magic_enum.hpp>
|
#include <magic_enum.hpp>
|
||||||
|
|
||||||
#include <daggy/loggers/dag_run/OStreamLogger.hpp>
|
#include <daggy/loggers/dag_run/OStreamLogger.hpp>
|
||||||
@@ -13,6 +15,11 @@ namespace daggy {
|
|||||||
size_t runID = nextRunID_++;
|
size_t runID = nextRunID_++;
|
||||||
os_ << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size()
|
os_ << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size()
|
||||||
<< " tasks" << std::endl;
|
<< " tasks" << std::endl;
|
||||||
|
for (const auto &task : tasks) {
|
||||||
|
os_ << "TASK (" << task.name << "): ";
|
||||||
|
std::copy(task.command.begin(), task.command.end(), std::ostream_iterator<std::string>(os_, " "));
|
||||||
|
os_ << std::endl;
|
||||||
|
}
|
||||||
return runID;
|
return runID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -62,29 +62,34 @@ TEST_CASE("Server Basic Endpoints", "[server_basic]") {
|
|||||||
const std::string host = "localhost:";
|
const std::string host = "localhost:";
|
||||||
const std::string baseURL = host + std::to_string(server.getPort());
|
const std::string baseURL = host + std::to_string(server.getPort());
|
||||||
|
|
||||||
{
|
SECTION ("Ready Endpoint") {
|
||||||
auto response = REQUEST(baseURL + "/ready");
|
auto response = REQUEST(baseURL + "/ready");
|
||||||
REQUIRE(response.code() == Pistache::Http::Code::Ok);
|
REQUIRE(response.code() == Pistache::Http::Code::Ok);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string dagRun = R"({
|
SECTION("Simple DAGRun Submission") {
|
||||||
"name": "unit_server",
|
std::string dagRun = R"({
|
||||||
"parameters": { "DIRS": [ "A", "B" ] },
|
"name": "unit_server",
|
||||||
"tasks": [
|
"parameters": { "FILE": [ "A", "B" ] },
|
||||||
{ "name": "touch",
|
"tasks": [
|
||||||
"command": [ "/usr/bin/touch", "/tmp/{{DIRS}}" ]
|
{ "name": "touch",
|
||||||
},
|
"command": [ "/usr/bin/touch", "/tmp/{{FILE}}" ]
|
||||||
{
|
},
|
||||||
"name": "cat",
|
{
|
||||||
"command": [ "/usr/bin/cat", "/tmp/A", "/tmp/B" ]
|
"name": "cat",
|
||||||
"parents": [ "touch" ]
|
"command": [ "/usr/bin/cat", "/tmp/A", "/tmp/B" ],
|
||||||
}
|
"parents": [ "touch" ]
|
||||||
]
|
}
|
||||||
})";
|
]
|
||||||
|
})";
|
||||||
|
|
||||||
{
|
|
||||||
auto response = REQUEST(baseURL + "/v1/dagrun/", dagRun);
|
auto response = REQUEST(baseURL + "/v1/dagrun/", dagRun);
|
||||||
REQUIRE(response.code() == Pistache::Http::Code::Ok);
|
REQUIRE(response.code() == Pistache::Http::Code::Ok);
|
||||||
|
|
||||||
|
for (const auto &pth : std::vector<fs::path>{"/tmp/A", "/tmp/B"}) {
|
||||||
|
REQUIRE(fs::exists(pth));
|
||||||
|
fs::remove(pth);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
server.shutdown();
|
server.shutdown();
|
||||||
|
|||||||
Reference in New Issue
Block a user