#include #include #include #include #include #include #include #include #include #include #include #include namespace rj = rapidjson; using namespace daggy; TEST_CASE("rest_endpoint", "[server_basic]") { std::stringstream ss; Pistache::Address listenSpec("localhost", Pistache::Port(0)); const ssize_t maxCores = 10, maxMemoryMB = 1000; daggyr::Server server(listenSpec, maxCores, maxMemoryMB); server.init(10); server.start(); const std::string host = "localhost:"; const std::string baseURL = host + std::to_string(server.getPort()); SECTION("Ready Endpoint") { auto response = HTTP_REQUEST(baseURL + "/ready"); REQUIRE(response.code == HTTPCode::Ok); } SECTION("Task Missing Cores should Fail") { std::string taskSpec = R"({ "job": { "command": [ "/bin/touch", "dagrun_{{FILE}}" ]}, "memoryMB": 100 })"; auto response = HTTP_REQUEST(baseURL + "/v1/task/0/sample_task", taskSpec, "POST"); REQUIRE(response.code == HTTPCode::Not_Acceptable); } SECTION("Task Missing MemoryMB should Fail") { std::string taskSpec = R"({ "job": { "command": [ "/bin/touch", "dagrun_{{FILE}}" ]}, "cores": 100 })"; auto response = HTTP_REQUEST(baseURL + "/v1/task/0/sample_task", taskSpec, "POST"); REQUIRE(response.code == HTTPCode::Not_Acceptable); } SECTION("Task submission and get result") { std::string taskSpec = R"({ "job": { "command": [ "/bin/echo", "hello", "world" ], "cores": "1", "memoryMB": "100" } })"; // Submit { auto response = HTTP_REQUEST(baseURL + "/v1/task/0/sample_task", taskSpec, "POST"); REQUIRE(response.code == HTTPCode::Ok); } while (true) { std::this_thread::sleep_for(250ms); auto [code, doc] = JSON_HTTP_REQUEST(baseURL + "/v1/poll"); REQUIRE(doc.IsArray()); if (doc.Size() == 0) continue; const auto &task = doc[0]; REQUIRE(task.HasMember("state")); std::string state = task["state"].GetString(); if (state != "COMPLETED") continue; REQUIRE(task.HasMember("attempt")); auto attempt = attemptRecordFromJSON(task["attempt"]); REQUIRE(attempt.rc == 0); REQUIRE(attempt.outputLog == "hello world\n"); break; } } SECTION("Task capacity changes") { std::string taskSpec = R"({ "job": { "command": [ "/bin/sleep", "5" ], "cores": "1", "memoryMB": "100" } })"; auto getCapacity = [&]() -> daggy::executors::task::daggy_runner::Capacity { daggy::executors::task::daggy_runner::Capacity cap; auto [code, doc] = JSON_HTTP_REQUEST(baseURL + "/v1/capacity"); REQUIRE(doc.IsObject()); REQUIRE(doc.HasMember("current")); const auto &cur = doc["current"]; REQUIRE(cur.IsObject()); REQUIRE(cur.HasMember("cores")); REQUIRE(cur.HasMember("memoryMB")); cap.cores = cur["cores"].GetInt64(); cap.memoryMB = cur["memoryMB"].GetInt64(); return cap; }; auto preCap = getCapacity(); // Submit { auto response = HTTP_REQUEST(baseURL + "/v1/task/0/sample_task", taskSpec, "POST"); REQUIRE(response.code == HTTPCode::Ok); } auto postCap = getCapacity(); REQUIRE(postCap.cores == preCap.cores - 1); REQUIRE(postCap.memoryMB == preCap.memoryMB - 100); // Ensure the current job is running { auto [code, doc] = JSON_HTTP_REQUEST(baseURL + "/v1/poll"); REQUIRE(doc.IsArray()); REQUIRE(doc.Size() > 0); REQUIRE(doc[0].HasMember("state")); REQUIRE(doc[0]["state"] != "COMPLETED"); } // Stop it { auto [code, doc] = JSON_HTTP_REQUEST(baseURL + "/v1/task/0/sample_task", "", "DELETE"); REQUIRE(code == HTTPCode::Ok); } // Grab it and ensure it was killed while (true) { auto [code, doc] = JSON_HTTP_REQUEST(baseURL + "/v1/poll"); REQUIRE(code == HTTPCode::Ok); REQUIRE(doc.IsArray()); if (doc.Size() == 0) continue; const auto &task = doc[0]; REQUIRE(task.HasMember("state")); std::string state = task["state"].GetString(); if (state != "COMPLETED") continue; REQUIRE(task.HasMember("attempt")); auto attempt = attemptRecordFromJSON(task["attempt"]); REQUIRE(attempt.rc != 0); break; } } server.shutdown(); }