Files
daggy/libdaggy/src/Utilities.cpp
2022-01-12 13:11:55 -04:00

295 lines
8.1 KiB
C++

#include <daggy/Serialization.hpp>
#include <daggy/Utilities.hpp>
#include <future>
#include <iomanip>
using namespace std::chrono_literals;
// libcurl debug callback (installed through CURLOPT_DEBUGFUNCTION) that dumps
// trace information to stderr.
//
//   handle - easy handle the data belongs to (unused)
//   type   - kind of data being reported
//   data   - payload; libcurl does NOT NUL-terminate it
//   size   - number of valid bytes in `data`
//   userp  - user pointer (unused)
//
// Always returns 0, as libcurl requires from a debug callback.
static int http_trace(CURL *handle, curl_infotype type, char *data, size_t size,
                      void *userp)
{
  (void)handle; /* prevent compiler warning */
  (void)userp;

  const char *text = nullptr;
  switch (type) {
    case CURLINFO_TEXT:
      // Bound the output by `size`: the buffer is not NUL-terminated, so a
      // plain "%s" would read past its end.
      fprintf(stderr, "== Info: %.*s", static_cast<int>(size), data);
      return 0;
    case CURLINFO_HEADER_OUT:
      text = "=> Send header";
      break;
    case CURLINFO_DATA_OUT:
      text = "=> Send data";
      break;
    case CURLINFO_SSL_DATA_OUT:
      text = "=> Send SSL data";
      break;
    case CURLINFO_HEADER_IN:
      text = "<= Recv header";
      break;
    case CURLINFO_DATA_IN:
      text = "<= Recv data";
      break;
    case CURLINFO_SSL_DATA_IN:
      text = "<= Recv SSL data";
      break;
    default: /* in case a new one is introduced to shock us */
      return 0;
  }

  std::cerr << "\n================== " << text
            << " ==================" << std::endl;
  // Write exactly `size` bytes instead of streaming `data` as a C string.
  std::cerr.write(data, static_cast<std::streamsize>(size));
  std::cerr << std::endl;
  return 0;
}
// Write callback handed to libcurl via CURLOPT_WRITEFUNCTION: appends the
// received chunk (size * nmemb bytes starting at `in`) to the stream `out`
// and returns the number of bytes consumed.
//
// NOTE(review): libcurl documents this callback with size_t for the sizes and
// the return value; the `uint` types here are kept so the signature stays
// compatible with any existing declaration -- confirm before changing.
uint curlWriter(char *in, uint size, uint nmemb, std::stringstream *out)
{
  const uint byteCount = size * nmemb;
  out->write(in, byteCount);
  return byteCount;
}
namespace daggy {
// Returns a copy of `string` in which every non-overlapping occurrence of
// `pattern` has been replaced by `replacement`, scanning left to right.
//
// Text inserted by a replacement is never rescanned, so a replacement that
// itself contains `pattern` does not recurse. An empty `pattern` leaves the
// input unchanged (it would otherwise match at every position and never
// terminate).
std::string globalSub(std::string string, const std::string &pattern,
                      const std::string &replacement)
{
  if (pattern.empty()) {
    return string;
  }

  for (size_t pos = string.find(pattern); pos != std::string::npos;
       // Resume the search just past the text that was inserted.
       pos = string.find(pattern, pos + replacement.size())) {
    string.replace(pos, pattern.size(), replacement);
  }
  return string;
}
// Expands `values` into every combination of concrete key -> string
// assignments.
//
// A key holding a single string is added to every combination built so far; a
// key holding a list of strings multiplies the combinations, one copy per list
// element. The result starts from one empty assignment, so a key with an empty
// list yields an empty result.
std::vector<std::unordered_map<std::string, std::string>>
generateCartesianValues(const ConfigValues &values)
{
  using Assignment = std::unordered_map<std::string, std::string>;

  std::vector<Assignment> combos{{}};
  for (const auto &[key, value] : values) {
    // Scalar value: extend each existing combination in place.
    if (const auto *single = std::get_if<std::string>(&value)) {
      for (auto &combo : combos) {
        combo.emplace(key, *single);
      }
      continue;
    }

    // List value: fan every existing combination out once per list element.
    std::vector<Assignment> expanded;
    for (const auto &choice : std::get<std::vector<std::string>>(value)) {
      for (auto combo : combos) {  // deliberate copy, extended below
        combo.emplace(key, choice);
        expanded.emplace_back(combo);
      }
    }
    combos.swap(expanded);
  }
  return combos;
}
// Returns the keys of `values` whose placeholder "{{key}}" occurs in at least
// one element of `input`.
std::unordered_set<std::string> matchingParameters(
    const std::vector<std::string> &input, const ConfigValues &values)
{
  std::unordered_set<std::string> referenced;
  for (const auto &[name, ignored] : values) {
    const std::string placeholder = "{{" + name + "}}";
    for (const auto &part : input) {
      if (part.find(placeholder) != std::string::npos) {
        referenced.insert(name);
        break;  // one hit is enough for this key
      }
    }
  }
  return referenced;
}
std::vector<std::vector<std::string>> interpolateValues(
const std::vector<std::string> &raw, const ConfigValues &values)
{
std::vector<std::vector<std::string>> cooked;
auto matchParams = matchingParameters(raw, values);
if (matchParams.empty()) {
cooked.emplace_back(raw);
return cooked;
}
ConfigValues paramSubset;
for (const auto &[k, v] : values) {
if (matchParams.count(k) == 0)
continue;
paramSubset.emplace(k, v);
}
const auto valueSets = generateCartesianValues(paramSubset);
for (const auto &valueSet : valueSets) {
std::vector<std::string> item(raw);
for (auto &part : item) {
for (const auto &[k, v] : valueSet) {
part = globalSub(part, "{{" + k + "}}", v);
}
}
cooked.emplace_back(item);
}
return cooked;
}
// Builds a new TaskSet in which every task of `tasks` is replaced by one task
// per job returned from executor.expandTaskParameters(task.job,
// interpolatedValues).
//
// Each job is first checked with executor.validateTaskParameters. Generated
// tasks are copies of the original with only `job` replaced, and are stored
// under "<baseName>_<index>", with the index counting from 0 per base task.
TaskSet expandTaskSet(const TaskSet &tasks,
                      executors::task::TaskExecutor &executor,
                      const ConfigValues &interpolatedValues)
{
  TaskSet expanded;
  for (const auto &[baseName, task] : tasks) {
    executor.validateTaskParameters(task.job);
    const auto jobs =
        executor.expandTaskParameters(task.job, interpolatedValues);

    size_t index = 0;
    for (const auto &job : jobs) {
      Task clone{task};
      clone.job = job;
      expanded.emplace(baseName + "_" + std::to_string(index), clone);
      index++;
    }
  }
  return expanded;
}
// Adds every task in `tasks` to `dag` as a vertex, then adds edges from each
// task to the vertices whose `definedName` is listed in that task's
// `children`.
//
// Throws std::runtime_error("DAG contains a cycle") when dag.isValid() reports
// false afterwards.
void updateDAGFromTasks(TaskDAG &dag, const TaskSet &tasks)
{
  // Pass 1: vertices.
  for (const auto &[taskName, task] : tasks) {
    dag.addVertex(taskName, task);
  }

  // Pass 2: edges, once every vertex is present.
  for (const auto &[taskName, entry] : tasks) {
    // Bind to an ordinary reference so the lambda can capture it.
    const auto &parent = entry;
    dag.addEdgeIf(taskName, [&parent](const auto &vertex) {
      return parent.children.count(vertex.data.definedName) != 0;
    });
  }

  if (dag.isValid()) {
    return;
  }
  throw std::runtime_error("DAG contains a cycle");
}
// Creates a TaskDAG from `tasks` (via updateDAGFromTasks) and replays the
// recorded state `updates` onto it.
//
// During replay, an update whose state is RUNNING, RETRY, PAUSED, ERRORED or
// KILLED sets the vertex state to RunState::RUNNING; COMPLETED and QUEUED
// updates change nothing.
// NOTE(review): COMPLETED updates are not applied to the DAG here -- confirm
// that this is the intended replay behaviour.
TaskDAG buildDAGFromTasks(
    const TaskSet &tasks,
    const std::unordered_map<std::string,
                             std::vector<loggers::dag_run::StateUpdateRecord>>
        &updates)
{
  TaskDAG dag;
  updateDAGFromTasks(dag, tasks);

  for (const auto &[taskName, history] : updates) {
    for (const auto &record : history) {
      const auto state = record.state;
      const bool marksRunning =
          state == RunState::RUNNING || state == RunState::RETRY ||
          state == RunState::PAUSED || state == RunState::ERRORED ||
          state == RunState::KILLED;
      if (marksRunning) {
        dag.setVertexState(taskName, RunState::RUNNING);
      }
    }
  }
  return dag;
}
// Streams a TimePoint as the raw tick count of its time_since_epoch(),
// followed by std::endl (newline plus flush).
std::ostream &operator<<(std::ostream &os, const TimePoint &tp)
{
  const auto ticks = tp.time_since_epoch().count();
  return os << ticks << std::endl;
}
// Performs a blocking HTTP request with libcurl and returns the status code
// together with the response body.
//
//   url     - target URL
//   payload - request body; when non-empty it is sent with the header
//             "Content-Type: Application/Json"
//   method  - HTTP verb, passed to CURLOPT_CUSTOMREQUEST
//   trace   - when true, libcurl's verbose output is routed through
//             http_trace
//
// Failures to set up or perform the transfer are reported as
// HTTPCode::Server_Error with a body starting with "CURL Failed: ".
HTTPResponse HTTP_REQUEST(const std::string &url, const std::string &payload,
                          const std::string &method, bool trace)
{
  HTTPResponse response{.code = HTTPCode::Ok, .body = ""};

  curl_global_init(CURL_GLOBAL_ALL);
  CURL *curl = curl_easy_init();
  if (curl == nullptr) {
    // No handle means no request was made; do not report success.
    curl_global_cleanup();
    response.code = HTTPCode::Server_Error;
    response.body = "CURL Failed: unable to create easy handle";
    return response;
  }

  std::stringstream buffer;
  struct curl_slist *headers = nullptr;

  curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriter);
  curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
  // curl_easy_setopt is variadic; numeric options must be passed as long.
  curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);

  if (trace) {
    curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, http_trace);
    curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
  }

  if (!payload.empty()) {
    curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE,
                     static_cast<long>(payload.size()));
    curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload.c_str());
    headers = curl_slist_append(headers, "Content-Type: Application/Json");
  }
  curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, method.c_str());
  // An empty "Expect:" header suppresses libcurl's "Expect: 100-continue".
  headers = curl_slist_append(headers, "Expect:");
  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);

  const CURLcode res = curl_easy_perform(curl);
  if (res != CURLE_OK) {
    response.code = HTTPCode::Server_Error;
    response.body = std::string{"CURL Failed: "} + curl_easy_strerror(res);
  }
  else {
    // CURLINFO_RESPONSE_CODE writes a long; read it into one and convert,
    // and do so while the handle is still alive.
    long httpStatus = 0;
    curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpStatus);
    response.code = static_cast<HTTPCode>(httpStatus);
    response.body = buffer.str();
  }

  // Release everything on both the success and the failure path.
  curl_slist_free_all(headers);
  curl_easy_cleanup(curl);
  curl_global_cleanup();
  return response;
}
// Issues an HTTP request via HTTP_REQUEST and parses the response body as
// JSON.
//
//   url, payload, method, trace - forwarded unchanged to HTTP_REQUEST
//
// Returns the HTTP status code paired with the parsed document. If parsing
// throws, the document is instead an object with a single "error" member
// holding the raw response body, or the exception message when the body is
// empty.
std::pair<HTTPCode, rj::Document> JSON_HTTP_REQUEST(
    const std::string &url, const std::string &payload,
    const std::string &method, bool trace)
{
  // Forward `trace` so callers that ask for tracing actually get it.
  auto response = HTTP_REQUEST(url, payload, method, trace);

  rj::Document doc;
  try {
    checkRJParse(doc.Parse(response.body.c_str()));
  }
  catch (const std::exception &e) {
    doc.SetObject();
    auto &alloc = doc.GetAllocator();
    std::string message = (response.body.empty() ? e.what() : response.body);
    doc.AddMember(
        "error",
        rj::Value().SetString(message.c_str(), message.size(), alloc), alloc);
  }
  return std::make_pair(response.code, std::move(doc));
}
} // namespace daggy