From b436ea6da53aa10b4d27c3a2bd200de8d36ec4c3 Mon Sep 17 00:00:00 2001 From: Kinesin Data Technologies Incorporated <93931750+kinesintech@users.noreply.github.com> Date: Thu, 29 Sep 2022 08:57:20 -0300 Subject: [PATCH] Adding executors and varmap --- src/executors/local_executor.rs | 263 ++++++++++++++++++++++++++++++++ src/executors/mod.rs | 179 ++++++++++++++++++++++ src/lib.rs | 2 + src/varmap.rs | 84 ++++++++++ 4 files changed, 528 insertions(+) create mode 100644 src/executors/local_executor.rs create mode 100644 src/executors/mod.rs create mode 100644 src/varmap.rs diff --git a/src/executors/local_executor.rs b/src/executors/local_executor.rs new file mode 100644 index 0000000..7e20c2c --- /dev/null +++ b/src/executors/local_executor.rs @@ -0,0 +1,263 @@ +use super::*; +use futures::stream::futures_unordered::FuturesUnordered; +use psutil; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::process::Stdio; +use tokio::process::Command; +use tokio::sync::{mpsc, oneshot}; +use tokio::time::{sleep, Duration}; + +use futures::StreamExt; +use tokio::io::AsyncReadExt; + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(untagged)] +enum Cmd { + Simple(String), + Split(Vec), +} + +impl Cmd { + fn generate(&self, varmap: &VarMap) -> Vec { + let cmd = match self { + Cmd::Simple(s) => s.split_whitespace().map(|x| x.to_string()).collect(), + Cmd::Split(v) => v.clone(), + }; + + cmd.into_iter().map(|x| varmap.apply_to(&x)).collect() + } +} + +/// Contains specifics on how to run a local task +#[derive(Serialize, Deserialize, Clone, Debug)] +struct LocalTaskDetail { + /// The command and all arguments to run + command: Cmd, + + /// Environment variables to set + #[serde(default)] + environment: HashMap>, + + /// Timeout in seconds + #[serde(default)] + timeout: u64, +} + +fn extract_details(details: &TaskDetails) -> Result { + serde_json::from_value::(details.clone()) +} + +fn validate_task(details: &TaskDetails) -> Result<()> { + if let Err(err) = extract_details(details) { + Err(anyhow!("{}", err)) + } else { + Ok(()) + } +} + +struct ChildStats { + max_cpu: f32, + avg_cpu: f32, + max_rss: u64, + avg_rss: f32, +} + +// Collect performance stats for a child +async fn gather_child_stats(pid: psutil::Pid) -> Result { + let mut stats = ChildStats { + max_cpu: 0.0, + avg_cpu: 0.0, + max_rss: 0, + avg_rss: 0.0, + }; + let mut periods: f32 = 0.0; + + let mut proc = psutil::process::Process::new(pid)?; + + while let (Ok(pct), Ok(mem)) = (proc.cpu_percent(), proc.memory_info()) { + // update CPU + if pct > stats.max_cpu { + stats.max_cpu = pct; + } + stats.avg_cpu += pct; + + // update RSS + let rss = mem.rss(); + if rss > stats.max_rss { + stats.max_rss = rss; + } + stats.avg_rss += rss as f32; + + periods += 1.0; + sleep(Duration::from_millis(100)).await; + } + if periods > 0.0 { + stats.avg_cpu /= periods; + stats.avg_rss /= periods; + } + Ok(stats) +} + +async fn run_task( + task: TaskDetails, + mut stop_rx: oneshot::Receiver<()>, + output_options: TaskOutputOptions, + varmap: VarMap, +) -> Result { + let mut details = extract_details(&task).unwrap(); + let mut attempt = TaskAttempt::new(); + let cmd = details.command.generate(&varmap); + details.command = Cmd::Split(cmd.clone()); + let (program, args) = cmd.split_first().unwrap(); + attempt.executor.push(format!("{:?}\n", details)); + + let mut command = Command::new(program); + command.stdout(Stdio::piped()); + command.stderr(Stdio::piped()); + command.args(args); + + // Need to convert optional + let cmd_env: HashMap = details + .environment + .iter() + .filter(|(_, v)| v.is_some()) + .map(|(k, v)| (k.clone(), varmap.apply_to(&v.clone().unwrap()))) + .collect(); + + command.env_clear(); + command.envs(cmd_env); + + attempt.start_time = Utc::now(); + let mut child = command.spawn()?; + + // Start getting performance stats + let pid = child.id().unwrap(); + let perf_monitor = tokio::spawn(async move { gather_child_stats(pid).await }); + + // Read from stdout constantly to prevent pipe blocking + let mut stdout_handle = child.stdout.take().unwrap(); + let stdout_reader: tokio::task::JoinHandle>> = tokio::spawn(async move { + let mut data = Vec::new(); + stdout_handle.read_to_end(&mut data).await?; + Ok(data) + }); + + // Read from stderr constantly to prevent pipe blocking + let mut stderr_handle = child.stderr.take().unwrap(); + let stderr_reader: tokio::task::JoinHandle>> = tokio::spawn(async move { + let mut data = Vec::new(); + stderr_handle.read_to_end(&mut data).await?; + Ok(data) + }); + + // Generate a timeout message, if needed + let (timeout_tx, mut timeout_rx) = oneshot::channel(); + if details.timeout > 0 { + let timeout = details.timeout; + tokio::spawn(async move { + sleep(Duration::from_millis(1000 * timeout)).await; + timeout_tx.send(()).unwrap_or(()); + }); + } + + tokio::select! { + _ = child.wait() => {}, + _ = (&mut stop_rx) => { + attempt.killed = true; + child.kill().await.unwrap_or(()); + attempt.executor.push("Task was killed by request".to_owned()); + } + _ = (&mut timeout_rx) => { + child.kill().await.unwrap_or(()); + attempt.killed = true; + attempt.executor.push("Task exceeded the timeout interval and was killed".to_owned()); + } + } + + // Get any output + let mut stdout = String::from_utf8_lossy(&stdout_reader.await??).to_string(); + let mut stderr = String::from_utf8_lossy(&stderr_reader.await??).to_string(); + + let output = child.wait_with_output().await.unwrap(); + attempt.exit_code = output.status.code().unwrap_or(-1i32); + attempt.succeeded = output.status.success(); + if !(attempt.succeeded && output_options.discard_successful) { + if output_options.truncate { + stdout = head_tail( + &stdout, + output_options.head_bytes, + output_options.tail_bytes, + ); + stderr = head_tail( + &stdout, + output_options.head_bytes, + output_options.tail_bytes, + ); + } + attempt.output = stdout; + attempt.error = stderr; + } + + // Set stats + if let Ok(stats) = perf_monitor.await? { + attempt.max_cpu = stats.max_cpu; + attempt.avg_cpu = stats.avg_cpu; + attempt.max_rss = stats.max_rss; + attempt.avg_rss = stats.avg_rss; + } + + attempt.stop_time = Utc::now(); + Ok(attempt) +} + +/// The mpsc channel can be sized to fit max parallelism +pub async fn start_local_executor( + max_parallel: usize, + mut exe_msgs: mpsc::UnboundedReceiver, +) { + let mut running = FuturesUnordered::new(); + + while let Some(msg) = exe_msgs.recv().await { + use ExecutorMessage::{ExecuteTask, Stop, ValidateTask}; + match msg { + ValidateTask { details, response } => { + tokio::spawn(async move { + let result = validate_task(&details); + response.send(result).unwrap_or(()); + }); + } + ExecuteTask { + details, + varmap, + output_options, + response, + kill, + } => { + if running.len() == max_parallel { + running.next().await; + } + running.push(tokio::spawn(async move { + let attempt = match run_task(details, kill, output_options, varmap).await { + Ok(attempt) => attempt, + Err(e) => TaskAttempt { + succeeded: false, + executor: vec![format!("Failed to launch command: {:?}", e)], + ..TaskAttempt::new() + }, + }; + response.send(attempt).unwrap(); + })); + } + Stop {} => { + break; + } + } + } +} + +pub fn start(max_parallel: usize, msgs: mpsc::UnboundedReceiver) { + tokio::spawn(async move { + start_local_executor(max_parallel, msgs).await; + }); +} diff --git a/src/executors/mod.rs b/src/executors/mod.rs new file mode 100644 index 0000000..221066a --- /dev/null +++ b/src/executors/mod.rs @@ -0,0 +1,179 @@ +use super::*; +pub mod local_executor; + +fn default_bytes() -> usize { + 20480 +} + +/// Options in how to handle task output. Some tasks can be quite +/// verbose, and the output may not be needed. +#[derive(Clone, Serialize, Deserialize, Copy, Debug, PartialEq, Hash, Eq)] +#[serde(deny_unknown_fields)] +pub struct TaskOutputOptions { + /// If true, output from successful tasks is discarded entirely, in + /// keeping with the UNIX philosophy of no news is good news + #[serde(default)] + pub discard_successful: bool, + + /// If true, and output is not discarded, truncate the output of + /// each task to a maximum of the first / last `preserve` kb of + /// data + #[serde(default)] + pub truncate: bool, + + /// Number of KB of output to preserve at the beginning of the ouptut + #[serde(default = "default_bytes")] + pub head_bytes: usize, + + /// Number of KB of output to preserve at the end of the outut + #[serde(default = "default_bytes")] + pub tail_bytes: usize, +} + +impl Default for TaskOutputOptions { + fn default() -> Self { + TaskOutputOptions { + discard_successful: true, + truncate: true, + head_bytes: default_bytes(), + tail_bytes: default_bytes(), + } + } +} + +/// Messages for interacting with an Executor +#[derive(Debug)] +pub enum ExecutorMessage { + /// Validate a set of tasks. + /// Errors + /// Returns the vector of task issues + ValidateTask { + details: serde_json::Value, + response: oneshot::Sender>, + }, + + /// Execute the given task, along with enough information + /// Errors + /// Will return `Err` if the tasks are invalid, according to the executor + ExecuteTask { + details: serde_json::Value, + varmap: VarMap, + output_options: TaskOutputOptions, + response: oneshot::Sender, + kill: oneshot::Receiver<()>, + }, + Stop {}, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct TaskAttempt { + #[serde(default)] + pub task_name: String, + + #[serde(default = "chrono::Utc::now")] + pub scheduled_time: DateTime, + + #[serde(default = "chrono::Utc::now")] + pub start_time: DateTime, + + #[serde(default = "chrono::Utc::now")] + pub stop_time: DateTime, + + #[serde(default)] + pub succeeded: bool, + + #[serde(default)] + pub killed: bool, + + #[serde(default)] + pub infra_failure: bool, + + #[serde(default)] + pub output: String, + + #[serde(default)] + pub error: String, + + #[serde(default)] + pub executor: Vec, + + #[serde(default)] + pub exit_code: i32, + + /// as a percentage + #[serde(default)] + pub max_cpu: f32, + + /// as a percentage + #[serde(default)] + pub avg_cpu: f32, + + /// In bytes + #[serde(default)] + pub max_rss: u64, + + /// In bytes + #[serde(default)] + pub avg_rss: f32, +} + +impl Default for TaskAttempt { + fn default() -> Self { + TaskAttempt { + task_name: String::new(), + scheduled_time: Utc::now(), + start_time: Utc::now(), + stop_time: Utc::now(), + succeeded: false, + killed: false, + infra_failure: false, + output: "".to_owned(), + error: "".to_owned(), + executor: Vec::new(), + exit_code: 0i32, + max_cpu: 0.0, + avg_cpu: 0.0, + max_rss: 0, + avg_rss: 0.0, + } + } +} + +impl TaskAttempt { + #[must_use] + pub fn new() -> Self { + TaskAttempt::default() + } +} + +/// Keeps the first / last bytes of a str +#[must_use] +pub fn head_tail(data: &str, head: usize, tail: usize) -> String { + if data.len() < head + tail { + data.to_owned() + } else { + let n_chars = data.chars().count(); + let charsize = (data.len() as f64 / n_chars as f64).ceil() as usize; + let head_chars = head / charsize; + let tail_chars = tail / charsize; + let mut tail: String = data.chars().rev().take(tail_chars).collect(); + tail = tail.chars().rev().collect(); + format!( + "{}\n...\n{}", + data.chars().take(head_chars).collect::(), + tail + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_head_tail() { + let sample = "This is a very long string".to_owned(); + assert_eq!(head_tail(&sample, 5, 5), "This \n...\ntring".to_owned()); + assert_eq!(head_tail(&sample, 50, 50), sample); + } +} diff --git a/src/lib.rs b/src/lib.rs index c9345a3..4cf2c6b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ use crate::interval_set::*; use crate::requirement::*; use crate::schedule::*; use crate::task::*; +use crate::varmap::*; pub type Resource = String; pub type TaskDetails = serde_json::Value; @@ -26,3 +27,4 @@ pub mod interval_set; pub mod requirement; pub mod schedule; pub mod task; +pub mod varmap; diff --git a/src/varmap.rs b/src/varmap.rs new file mode 100644 index 0000000..c601493 --- /dev/null +++ b/src/varmap.rs @@ -0,0 +1,84 @@ +use super::*; +use std::ops::{Deref, DerefMut}; + +#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] +pub struct VarMap(HashMap); + +impl Deref for VarMap { + type Target = HashMap; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for VarMap { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl VarMap { + // Derive variables from a given interval + pub fn from_interval(int: &Interval, tz: Tz) -> Self { + let start = int.start.with_timezone(&tz); + let end = int.end.with_timezone(&tz); + + VarMap(HashMap::from([ + ("PERIOD_START".to_owned(), format!("{}", start)), + ("PERIOD_END".to_owned(), format!("{}", end)), + ("yyyy".to_owned(), format!("{}", end.year())), + ("mm".to_owned(), format!("{}", end.month())), + ("dd".to_owned(), format!("{}", end.day())), + ( + "yyyymmdd".to_owned(), + format!("{}{}{}", end.year(), end.month(), end.day()), + ), + ( + "hhmmss".to_owned(), + format!("{}{}{}", end.hour(), end.minute(), end.second()), + ), + ])) + } + + /// Interpolate values into a string, assuming string has variables + /// as ${varname} + pub fn apply_to(&self, s: &str) -> String { + let mut expanded = s.to_string(); + for (key, value) in self.0.iter() { + expanded = expanded.replace(&format!("${{{}}}", key), value); + } + expanded + } +} + +impl From> for VarMap { + fn from(data: HashMap) -> Self { + VarMap(data) + } +} + +impl<'a> FromIterator<(&'a String, &'a String)> for VarMap { + fn from_iter>(iter: I) -> Self { + let mut data = HashMap::new(); + for (k, v) in iter { + data.insert(k.clone(), v.clone()); + } + VarMap(data) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn check_simple_apply() { + let s = "This is a ${test} of home and ${test} of away ${beep}"; + let vm = VarMap(HashMap::from([("test".to_owned(), "alpha".to_owned())])); + + assert_eq!( + &vm.apply_to(s), + "This is a alpha of home and alpha of away ${beep}" + ); + } +}