#include #include #include #include #include #include #include using namespace daggy::executors::task; namespace daggy::executors::task::forking_executor { void validateTaskParameters(const daggy::ConfigValues &job) { // command or commandString is required if (job.count("command")) { if (!std::holds_alternative(job.at("command"))) throw std::runtime_error(R"(command must be an array of strings)"); } else { if (job.count("commandString") == 0) { throw std::runtime_error( R"(command or commandString must be defined.)"); } if (!std::holds_alternative(job.at("commandString"))) throw std::runtime_error(R"(commandString must be a string)"); } if (job.count("environment")) { if (!std::holds_alternative(job.at("environment"))) throw std::runtime_error(R"(environment must be an array of strings)"); } } } // namespace daggy::executors::task::forking_executor std::string slurp(int fd) { std::string result; const ssize_t BUFFER_SIZE = 4096; char buffer[BUFFER_SIZE]; struct pollfd pfd { .fd = fd, .events = POLLIN, .revents = 0 }; poll(&pfd, 1, 1); while (pfd.revents & POLLIN) { ssize_t bytes = read(fd, buffer, BUFFER_SIZE); if (bytes == 0) { break; } else { result.append(buffer, bytes); } pfd.revents = 0; poll(&pfd, 1, 1); } return result; } ForkingTaskExecutor::ForkingTaskExecutor(size_t nThreads) : tp_(nThreads) { } ForkingTaskExecutor::~ForkingTaskExecutor() { std::lock_guard lock(taskControlsGuard_); taskControls_.clear(); } std::string ForkingTaskExecutor::description() const { std::stringstream ss; ss << "ForkingTaskExecutor with " << tp_.size() << " threads"; return ss.str(); } bool ForkingTaskExecutor::stop(DAGRunID runID, const std::string &taskName) { std::string key = std::to_string(runID) + "_" + taskName; std::lock_guard lock(taskControlsGuard_); auto it = taskControls_.find(key); if (it == taskControls_.end()) return true; it->second = false; return true; } TaskFuture ForkingTaskExecutor::execute(DAGRunID runID, const std::string &taskName, const Task &task) { std::string key = std::to_string(runID) + "_" + taskName; std::unique_lock lock(taskControlsGuard_); auto [it, ins] = taskControls_.emplace(key, true); lock.unlock(); auto &running = it->second; return tp_.addTask([this, task, taskName, &running, key]() { auto ret = this->runTask(task, running); std::lock_guard lock(this->taskControlsGuard_); this->taskControls_.extract(key); return ret; }); } daggy::AttemptRecord ForkingTaskExecutor::runTask(const Task &task, std::atomic &running) { AttemptRecord rec{.rc = -1}; rec.startTime = Clock::now(); // Need to convert the strings std::vector argv; std::vector envp; // Populate the command Command command; if (task.job.count("commandString")) { std::stringstream ss; ss << std::get(task.job.at("commandString")); std::string tok; while (ss >> std::quoted(tok)) { command.push_back(tok); } } else { const auto cmd = std::get(task.job.at("command")); std::copy(cmd.begin(), cmd.end(), std::back_inserter(command)); } std::transform( command.begin(), command.end(), std::back_inserter(argv), [](const std::string &s) { return const_cast(s.c_str()); }); argv.push_back(nullptr); // Populate the environment auto environment = (task.job.count("environment") == 0 ? std::vector{} : std::get(task.job.at("environment"))); std::transform( environment.begin(), environment.end(), std::back_inserter(envp), [](const std::string &s) { return const_cast(s.c_str()); }); envp.push_back(nullptr); // Create the pipe int stdoutPipe[2]; int pipeRC = pipe2(stdoutPipe, O_DIRECT); if (pipeRC != 0) { rec.executorLog = "Unable to create pipe for stdout, check limits"; return rec; } int stderrPipe[2]; pipeRC = pipe2(stderrPipe, O_DIRECT); if (pipeRC != 0) { rec.executorLog = "Unable to create pipe for stderr, check limits"; return rec; } pid_t child = fork(); if (child < 0) { rec.executorLog = "Unable to fork."; return rec; } else if (child == 0) { // child while ((dup2(stdoutPipe[1], STDOUT_FILENO) == -1) && (errno == EINTR)) { } while ((dup2(stderrPipe[1], STDERR_FILENO) == -1) && (errno == EINTR)) { } close(stdoutPipe[0]); close(stderrPipe[0]); char **env = (envp.empty() ? nullptr : envp.data()); execvpe(argv[0], argv.data(), env); std::cerr << "Unable to launch \"" << argv[0] << "\": " << std::strerror(errno) << std::endl; exit(errno); } std::atomic reading = true; std::thread stdoutReader([&]() { while (reading) rec.outputLog.append(slurp(stdoutPipe[0])); }); std::thread stderrReader([&]() { while (reading) rec.errorLog.append(slurp(stderrPipe[0])); }); siginfo_t childInfo; while (running) { childInfo.si_pid = 0; waitid(P_PID, child, &childInfo, WEXITED | WNOHANG); if (childInfo.si_pid > 0) { break; } std::this_thread::sleep_for(100ms); } if (!running) { rec.executorLog = "Killed"; // Send the kills until pid is dead while (kill(child, SIGKILL) != -1) { // Need to collect the child to avoid a zombie process waitid(P_PID, child, &childInfo, WEXITED | WNOHANG); std::this_thread::sleep_for(50ms); } } reading = false; rec.stopTime = Clock::now(); if (childInfo.si_pid > 0) { rec.rc = childInfo.si_status; } else { rec.rc = -1; } stdoutReader.join(); stderrReader.join(); close(stdoutPipe[0]); close(stderrPipe[0]); close(stdoutPipe[1]); close(stderrPipe[1]); return rec; } bool ForkingTaskExecutor::validateTaskParameters(const ConfigValues &job) { forking_executor::validateTaskParameters(job); return true; } std::vector ForkingTaskExecutor::expandTaskParameters( const ConfigValues &job, const ConfigValues &expansionValues) { std::vector newValues; auto command = (job.count("command") == 0 ? Command{} : std::get(job.at("command"))); auto environment = (job.count("environment") == 0 ? Command{} : std::get(job.at("environment"))); Command both(command); std::copy(environment.begin(), environment.end(), std::back_inserter(both)); for (const auto &parts : interpolateValues(both, expansionValues)) { ConfigValues newCommand{job}; newCommand["command"] = Command(parts.begin(), parts.begin() + command.size()); newCommand["environment"] = Command(parts.begin() + command.size(), parts.end()); newValues.emplace_back(newCommand); } return newValues; }