Adding executors and varmap

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-09-29 08:57:20 -03:00
parent a3a0f0f3e1
commit b436ea6da5
4 changed files with 528 additions and 0 deletions
+263
View File
@@ -0,0 +1,263 @@
use super::*;
use futures::stream::futures_unordered::FuturesUnordered;
use psutil;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::process::Stdio;
use tokio::process::Command;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Duration};
use futures::StreamExt;
use tokio::io::AsyncReadExt;
/// The command line of a local task, accepted in either of two shapes.
///
/// `#[serde(untagged)]` means no variant tag appears in the serialized
/// form: a plain string corresponds to `Simple`, a list of strings to `Split`.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(untagged)]
enum Cmd {
/// The whole command line as one string; split on whitespace by `generate`
Simple(String),
/// The program followed by its arguments, already split
Split(Vec<String>),
}
impl Cmd {
    /// Produce the argv for this command with variables from `varmap` expanded.
    ///
    /// A `Simple` command is split on whitespace *before* expansion, so an
    /// expanded value that contains whitespace still ends up in one argument.
    fn generate(&self, varmap: &VarMap) -> Vec<String> {
        match self {
            Cmd::Simple(line) => line
                .split_whitespace()
                .map(|token| varmap.apply_to(token))
                .collect(),
            Cmd::Split(parts) => parts.iter().map(|part| varmap.apply_to(part)).collect(),
        }
    }
}
/// Contains specifics on how to run a local task.
///
/// Deserialized from a task's JSON details by `extract_details`.
#[derive(Serialize, Deserialize, Clone, Debug)]
struct LocalTaskDetail {
/// The command and all arguments to run
command: Cmd,
/// Environment variables to set. `run_task` skips entries whose value is
/// `None`, and clears the child's environment first, so only the
/// entries listed here are passed to the command. Defaults to empty.
#[serde(default)]
environment: HashMap<String, Option<String>>,
/// Timeout in seconds. Defaults to 0, which `run_task` treats as "no timeout".
#[serde(default)]
timeout: u64,
}
/// Deserialize a task's JSON details into a `LocalTaskDetail`.
///
/// The value is cloned because `serde_json::from_value` takes it by value.
fn extract_details(details: &TaskDetails) -> Result<LocalTaskDetail, serde_json::Error> {
    let owned = details.clone();
    serde_json::from_value(owned)
}
/// Check that `details` can be deserialized as a local task.
///
/// Returns `Ok(())` when it can; otherwise the deserialization error's
/// message is returned as the `Err`.
fn validate_task(details: &TaskDetails) -> Result<()> {
    match extract_details(details) {
        Ok(_) => Ok(()),
        Err(err) => Err(anyhow!("{}", err)),
    }
}
/// Resource usage summary of a child process, as accumulated by
/// `gather_child_stats` from periodic samples.
struct ChildStats {
/// Largest CPU percentage seen in any sample
max_cpu: f32,
/// Mean CPU percentage over all samples
avg_cpu: f32,
/// Largest resident set size seen in any sample
max_rss: u64,
/// Mean resident set size over all samples
avg_rss: f32,
}
/// Sample CPU and memory usage of process `pid` until it can no longer be queried.
///
/// One sample is taken every 100 ms. Sampling stops the first time either the
/// CPU query or the memory query fails. The averages are taken over the samples
/// collected up to that point and stay at zero when there were none.
///
/// Returns `Err` only if the process handle cannot be created at all.
async fn gather_child_stats(pid: psutil::Pid) -> Result<ChildStats> {
    let mut process = psutil::process::Process::new(pid)?;

    let mut samples: f32 = 0.0;
    let mut cpu_peak: f32 = 0.0;
    let mut cpu_total: f32 = 0.0;
    let mut rss_peak: u64 = 0;
    let mut rss_total: f32 = 0.0;

    loop {
        // CPU is queried before memory; either failure ends the sampling
        let (cpu, rss) = match (process.cpu_percent(), process.memory_info()) {
            (Ok(pct), Ok(mem)) => (pct, mem.rss()),
            _ => break,
        };

        cpu_peak = cpu_peak.max(cpu);
        cpu_total += cpu;

        rss_peak = rss_peak.max(rss);
        rss_total += rss as f32;

        samples += 1.0;
        sleep(Duration::from_millis(100)).await;
    }

    let (avg_cpu, avg_rss) = if samples > 0.0 {
        (cpu_total / samples, rss_total / samples)
    } else {
        (cpu_total, rss_total)
    };

    Ok(ChildStats {
        max_cpu: cpu_peak,
        avg_cpu,
        max_rss: rss_peak,
        avg_rss,
    })
}
async fn run_task(
task: TaskDetails,
mut stop_rx: oneshot::Receiver<()>,
output_options: TaskOutputOptions,
varmap: VarMap,
) -> Result<TaskAttempt> {
let mut details = extract_details(&task).unwrap();
let mut attempt = TaskAttempt::new();
let cmd = details.command.generate(&varmap);
details.command = Cmd::Split(cmd.clone());
let (program, args) = cmd.split_first().unwrap();
attempt.executor.push(format!("{:?}\n", details));
let mut command = Command::new(program);
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
command.args(args);
// Need to convert optional
let cmd_env: HashMap<String, String> = details
.environment
.iter()
.filter(|(_, v)| v.is_some())
.map(|(k, v)| (k.clone(), varmap.apply_to(&v.clone().unwrap())))
.collect();
command.env_clear();
command.envs(cmd_env);
attempt.start_time = Utc::now();
let mut child = command.spawn()?;
// Start getting performance stats
let pid = child.id().unwrap();
let perf_monitor = tokio::spawn(async move { gather_child_stats(pid).await });
// Read from stdout constantly to prevent pipe blocking
let mut stdout_handle = child.stdout.take().unwrap();
let stdout_reader: tokio::task::JoinHandle<Result<Vec<u8>>> = tokio::spawn(async move {
let mut data = Vec::new();
stdout_handle.read_to_end(&mut data).await?;
Ok(data)
});
// Read from stderr constantly to prevent pipe blocking
let mut stderr_handle = child.stderr.take().unwrap();
let stderr_reader: tokio::task::JoinHandle<Result<Vec<u8>>> = tokio::spawn(async move {
let mut data = Vec::new();
stderr_handle.read_to_end(&mut data).await?;
Ok(data)
});
// Generate a timeout message, if needed
let (timeout_tx, mut timeout_rx) = oneshot::channel();
if details.timeout > 0 {
let timeout = details.timeout;
tokio::spawn(async move {
sleep(Duration::from_millis(1000 * timeout)).await;
timeout_tx.send(()).unwrap_or(());
});
}
tokio::select! {
_ = child.wait() => {},
_ = (&mut stop_rx) => {
attempt.killed = true;
child.kill().await.unwrap_or(());
attempt.executor.push("Task was killed by request".to_owned());
}
_ = (&mut timeout_rx) => {
child.kill().await.unwrap_or(());
attempt.killed = true;
attempt.executor.push("Task exceeded the timeout interval and was killed".to_owned());
}
}
// Get any output
let mut stdout = String::from_utf8_lossy(&stdout_reader.await??).to_string();
let mut stderr = String::from_utf8_lossy(&stderr_reader.await??).to_string();
let output = child.wait_with_output().await.unwrap();
attempt.exit_code = output.status.code().unwrap_or(-1i32);
attempt.succeeded = output.status.success();
if !(attempt.succeeded && output_options.discard_successful) {
if output_options.truncate {
stdout = head_tail(
&stdout,
output_options.head_bytes,
output_options.tail_bytes,
);
stderr = head_tail(
&stdout,
output_options.head_bytes,
output_options.tail_bytes,
);
}
attempt.output = stdout;
attempt.error = stderr;
}
// Set stats
if let Ok(stats) = perf_monitor.await? {
attempt.max_cpu = stats.max_cpu;
attempt.avg_cpu = stats.avg_cpu;
attempt.max_rss = stats.max_rss;
attempt.avg_rss = stats.avg_rss;
}
attempt.stop_time = Utc::now();
Ok(attempt)
}
/// Run the local executor loop until a `Stop` message arrives or the channel closes.
///
/// `ValidateTask` requests are answered from a spawned task. `ExecuteTask`
/// requests are spawned as well, but when `max_parallel` spawned executions are
/// already being tracked the loop first waits for one of them to finish. That
/// wait also delays the handling of every later message.
///
/// A failure inside `run_task` is reported to the requester as an unsuccessful
/// `TaskAttempt` whose `executor` log carries the error.
pub async fn start_local_executor(
    max_parallel: usize,
    mut exe_msgs: mpsc::UnboundedReceiver<ExecutorMessage>,
) {
    let mut running = FuturesUnordered::new();
    while let Some(msg) = exe_msgs.recv().await {
        use ExecutorMessage::{ExecuteTask, Stop, ValidateTask};
        match msg {
            ValidateTask { details, response } => {
                tokio::spawn(async move {
                    let result = validate_task(&details);
                    response.send(result).unwrap_or(());
                });
            }
            ExecuteTask {
                details,
                varmap,
                output_options,
                response,
                kill,
            } => {
                // At capacity: wait for one tracked execution to finish first
                if running.len() == max_parallel {
                    running.next().await;
                }
                running.push(tokio::spawn(async move {
                    let attempt = match run_task(details, kill, output_options, varmap).await {
                        Ok(attempt) => attempt,
                        Err(e) => TaskAttempt {
                            succeeded: false,
                            executor: vec![format!("Failed to launch command: {:?}", e)],
                            ..TaskAttempt::new()
                        },
                    };
                    // The requester may have dropped its receiver; as with
                    // validation replies, that is not worth a panic
                    response.send(attempt).unwrap_or(());
                }));
            }
            Stop {} => {
                break;
            }
        }
    }
}
/// Spawn the local executor loop as a background tokio task.
///
/// The join handle is dropped, so the loop runs detached until it receives
/// `Stop` or its message channel closes.
pub fn start(max_parallel: usize, msgs: mpsc::UnboundedReceiver<ExecutorMessage>) {
    let executor_loop = start_local_executor(max_parallel, msgs);
    tokio::spawn(executor_loop);
}
+179
View File
@@ -0,0 +1,179 @@
use super::*;
pub mod local_executor;
/// Default number of bytes kept at each end of truncated output (20 KiB).
fn default_bytes() -> usize {
    const KIB: usize = 1024;
    20 * KIB
}
/// Options for how to handle task output. Some tasks can be quite
/// verbose, and the output may not be needed.
///
/// Note that the two flags default to `false` when omitted during
/// deserialization, whereas `TaskOutputOptions::default()` sets both to `true`.
#[derive(Clone, Serialize, Deserialize, Copy, Debug, PartialEq, Hash, Eq)]
#[serde(deny_unknown_fields)]
pub struct TaskOutputOptions {
/// If true, output from successful tasks is discarded entirely, in
/// keeping with the UNIX philosophy of no news is good news
#[serde(default)]
pub discard_successful: bool,
/// If true, and output is not discarded, truncate the output of
/// each task to at most the first `head_bytes` and last `tail_bytes`
/// bytes of data
#[serde(default)]
pub truncate: bool,
/// Number of bytes of output to preserve at the beginning of the output
#[serde(default = "default_bytes")]
pub head_bytes: usize,
/// Number of bytes of output to preserve at the end of the output
#[serde(default = "default_bytes")]
pub tail_bytes: usize,
}
impl Default for TaskOutputOptions {
    /// Discard output of successful tasks, and truncate whatever is kept to
    /// the default number of head and tail bytes.
    fn default() -> Self {
        let bytes = default_bytes();
        Self {
            head_bytes: bytes,
            tail_bytes: bytes,
            truncate: true,
            discard_successful: true,
        }
    }
}
/// Messages for interacting with an Executor
#[derive(Debug)]
pub enum ExecutorMessage {
/// Validate the details of a task.
///
/// The outcome is sent back on `response`: `Ok(())` when the executor
/// accepts the details, `Err` describing the problem otherwise.
ValidateTask {
details: serde_json::Value,
response: oneshot::Sender<Result<()>>,
},
/// Execute the given task.
///
/// `varmap` supplies the variables to expand and `output_options` controls
/// how much output is kept. The resulting `TaskAttempt` is sent back on
/// `response`; the local executor reports a task that could not be run as
/// an unsuccessful attempt rather than as an `Err`. `kill` lets the
/// requester ask for the running task to be stopped.
ExecuteTask {
details: serde_json::Value,
varmap: VarMap,
output_options: TaskOutputOptions,
response: oneshot::Sender<TaskAttempt>,
kill: oneshot::Receiver<()>,
},
/// Ask the executor to stop processing messages
Stop {},
}
/// The record of one attempt at running a task.
///
/// Every field has a serde default, so a partial document deserializes;
/// missing timestamps default to the time of deserialization.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TaskAttempt {
/// Name of the task (not set by the local executor in this file's view)
#[serde(default)]
pub task_name: String,
/// presumably when the task was scheduled to run — TODO confirm against the scheduler
#[serde(default = "chrono::Utc::now")]
pub scheduled_time: DateTime<Utc>,
/// When the executor started the attempt
#[serde(default = "chrono::Utc::now")]
pub start_time: DateTime<Utc>,
/// When the executor finished the attempt
#[serde(default = "chrono::Utc::now")]
pub stop_time: DateTime<Utc>,
/// Whether the command exited successfully
#[serde(default)]
pub succeeded: bool,
/// Whether the executor killed the task (on request or on timeout)
#[serde(default)]
pub killed: bool,
/// presumably marks failures of the infrastructure rather than the task — TODO confirm
#[serde(default)]
pub infra_failure: bool,
/// Captured standard output (possibly truncated or discarded)
#[serde(default)]
pub output: String,
/// Captured standard error (possibly truncated or discarded)
#[serde(default)]
pub error: String,
/// Notes written by the executor itself, e.g. the expanded command or why
/// the task was killed
#[serde(default)]
pub executor: Vec<String>,
/// Exit code of the command; the local executor stores -1 when the
/// process reported no code
#[serde(default)]
pub exit_code: i32,
/// Peak CPU usage, as a percentage
#[serde(default)]
pub max_cpu: f32,
/// Average CPU usage, as a percentage
#[serde(default)]
pub avg_cpu: f32,
/// Peak resident set size, in bytes
#[serde(default)]
pub max_rss: u64,
/// Average resident set size, in bytes
#[serde(default)]
pub avg_rss: f32,
}
impl Default for TaskAttempt {
    /// An empty, unsuccessful attempt whose three timestamps are each set to
    /// the current time.
    fn default() -> Self {
        Self {
            // identification and timing
            task_name: String::new(),
            scheduled_time: Utc::now(),
            start_time: Utc::now(),
            stop_time: Utc::now(),
            // outcome flags
            succeeded: false,
            killed: false,
            infra_failure: false,
            // captured output and executor notes
            output: String::new(),
            error: String::new(),
            executor: vec![],
            exit_code: 0,
            // resource usage
            max_cpu: 0.0,
            avg_cpu: 0.0,
            max_rss: 0,
            avg_rss: 0.0,
        }
    }
}
impl TaskAttempt {
#[must_use]
pub fn new() -> Self {
TaskAttempt::default()
}
}
/// Keeps the first `head` and last `tail` bytes of a str.
///
/// If `data` is no longer than `head + tail` bytes it is returned unchanged.
/// Otherwise the result is the head, a `"\n...\n"` marker, and the tail. Both
/// cut points are moved inwards to the nearest UTF-8 character boundary, so
/// each kept piece never exceeds its byte budget and no character is split.
#[must_use]
pub fn head_tail(data: &str, head: usize, tail: usize) -> String {
    // `<=`: input that fits exactly needs no marker. `saturating_add` keeps
    // huge budgets from overflowing.
    if data.len() <= head.saturating_add(tail) {
        return data.to_owned();
    }

    // From here on `head + tail < data.len()`, so both indices are in range
    // and the head piece ends before the tail piece starts.
    let mut head_end = head;
    while !data.is_char_boundary(head_end) {
        head_end -= 1;
    }
    let mut tail_start = data.len() - tail;
    while !data.is_char_boundary(tail_start) {
        tail_start += 1;
    }

    format!("{}\n...\n{}", &data[..head_end], &data[tail_start..])
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Long input is reduced to head + marker + tail; input within the budget
    /// comes back untouched.
    #[test]
    fn test_head_tail() {
        let sample = "This is a very long string";

        let truncated = head_tail(sample, 5, 5);
        assert_eq!(truncated, "This \n...\ntring");

        let untouched = head_tail(sample, 50, 50);
        assert_eq!(untouched, sample);
    }
}
+2
View File
@@ -15,6 +15,7 @@ use crate::interval_set::*;
use crate::requirement::*; use crate::requirement::*;
use crate::schedule::*; use crate::schedule::*;
use crate::task::*; use crate::task::*;
use crate::varmap::*;
pub type Resource = String; pub type Resource = String;
pub type TaskDetails = serde_json::Value; pub type TaskDetails = serde_json::Value;
@@ -26,3 +27,4 @@ pub mod interval_set;
pub mod requirement; pub mod requirement;
pub mod schedule; pub mod schedule;
pub mod task; pub mod task;
pub mod varmap;
+84
View File
@@ -0,0 +1,84 @@
use super::*;
use std::ops::{Deref, DerefMut};
/// A set of named variables (name -> value) that can be interpolated into
/// strings with `apply_to`. Thin wrapper around a `HashMap`, which it
/// derefs to.
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
pub struct VarMap(HashMap<String, String>);
impl Deref for VarMap {
type Target = HashMap<String, String>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for VarMap {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl VarMap {
    /// Derive variables from a given interval, expressed in timezone `tz`.
    ///
    /// `PERIOD_START` and `PERIOD_END` are the interval bounds in their
    /// default display format. The remaining variables describe the *end* of
    /// the interval and are zero-padded to the width their name implies
    /// (`yyyy`, `mm`, `dd`, `yyyymmdd`, `hhmmss`), so the concatenated forms
    /// are unambiguous: `20220901` rather than `202291`.
    pub fn from_interval(int: &Interval, tz: Tz) -> Self {
        let start = int.start.with_timezone(&tz);
        let end = int.end.with_timezone(&tz);
        let (year, month, day) = (end.year(), end.month(), end.day());
        let (hour, minute, second) = (end.hour(), end.minute(), end.second());
        VarMap(HashMap::from([
            ("PERIOD_START".to_owned(), start.to_string()),
            ("PERIOD_END".to_owned(), end.to_string()),
            ("yyyy".to_owned(), format!("{:04}", year)),
            ("mm".to_owned(), format!("{:02}", month)),
            ("dd".to_owned(), format!("{:02}", day)),
            (
                "yyyymmdd".to_owned(),
                format!("{:04}{:02}{:02}", year, month, day),
            ),
            (
                "hhmmss".to_owned(),
                format!("{:02}{:02}{:02}", hour, minute, second),
            ),
        ]))
    }

    /// Interpolate values into a string, assuming string has variables
    /// as ${varname}
    ///
    /// The input is scanned once from left to right. `${name}` is replaced when
    /// `name` is a known variable; anything else, including unknown variables
    /// and an unterminated `${`, is copied through unchanged. Substituted
    /// values are not scanned again, so the result does not depend on the
    /// map's iteration order.
    pub fn apply_to(&self, s: &str) -> String {
        let mut expanded = String::with_capacity(s.len());
        let mut rest = s;
        while let Some(start) = rest.find("${") {
            let after = &rest[start + 2..];
            let hit = after
                .find('}')
                .and_then(|end| self.0.get(&after[..end]).map(|value| (end, value)));
            match hit {
                Some((end, value)) => {
                    expanded.push_str(&rest[..start]);
                    expanded.push_str(value);
                    rest = &after[end + 1..];
                }
                None => {
                    // Not a known variable: keep the `${` literally and resume
                    // right after it, so a later `${name}` can still match
                    expanded.push_str(&rest[..start + 2]);
                    rest = after;
                }
            }
        }
        expanded.push_str(rest);
        expanded
    }
}
impl From<HashMap<String, String>> for VarMap {
fn from(data: HashMap<String, String>) -> Self {
VarMap(data)
}
}
impl<'a> FromIterator<(&'a String, &'a String)> for VarMap {
    /// Build a `VarMap` by cloning borrowed (name, value) pairs; when a name
    /// occurs more than once the last value wins.
    fn from_iter<I: IntoIterator<Item = (&'a String, &'a String)>>(iter: I) -> Self {
        let data: HashMap<String, String> = iter
            .into_iter()
            .map(|(name, value)| (name.clone(), value.clone()))
            .collect();
        VarMap(data)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every occurrence of a known variable is replaced; unknown ones are left as-is.
    #[test]
    fn check_simple_apply() {
        let vm = VarMap(HashMap::from([("test".to_owned(), "alpha".to_owned())]));
        let template = "This is a ${test} of home and ${test} of away ${beep}";

        let applied = vm.apply_to(template);

        assert_eq!(
            &applied,
            "This is a alpha of home and alpha of away ${beep}"
        );
    }
}