Adding in inherited environment variables to local executor

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-09-29 14:07:01 -03:00
parent 4ba2a3aa26
commit 1746470367
2 changed files with 37 additions and 5 deletions
+33 -5
View File
@@ -11,6 +11,8 @@ use tokio::time::{sleep, Duration};
use futures::StreamExt; use futures::StreamExt;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
type Environment = HashMap<String, Option<String>>;
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(untagged)] #[serde(untagged)]
enum Cmd { enum Cmd {
@@ -37,7 +39,7 @@ struct LocalTaskDetail {
/// Environment variables to set /// Environment variables to set
#[serde(default)] #[serde(default)]
environment: HashMap<String, Option<String>>, environment: Environment,
/// Timeout in seconds /// Timeout in seconds
#[serde(default)] #[serde(default)]
@@ -104,6 +106,7 @@ async fn run_task(
mut stop_rx: oneshot::Receiver<()>, mut stop_rx: oneshot::Receiver<()>,
output_options: TaskOutputOptions, output_options: TaskOutputOptions,
varmap: VarMap, varmap: VarMap,
mut env: Environment,
) -> Result<TaskAttempt> { ) -> Result<TaskAttempt> {
let mut details = extract_details(&task).unwrap(); let mut details = extract_details(&task).unwrap();
let mut attempt = TaskAttempt::new(); let mut attempt = TaskAttempt::new();
@@ -117,9 +120,10 @@ async fn run_task(
command.stderr(Stdio::piped()); command.stderr(Stdio::piped());
command.args(args); command.args(args);
// Need to convert optional // Build out environment. This takes the initial environment, and will
let cmd_env: HashMap<String, String> = details // upsert it with the task details.
.environment env.extend(details.environment);
let cmd_env: HashMap<String, String> = env
.iter() .iter()
.filter(|(_, v)| v.is_some()) .filter(|(_, v)| v.is_some())
.map(|(k, v)| (k.clone(), varmap.apply_to(&v.clone().unwrap()))) .map(|(k, v)| (k.clone(), varmap.apply_to(&v.clone().unwrap())))
@@ -218,6 +222,29 @@ pub async fn start_local_executor(
) { ) {
let mut running = FuturesUnordered::new(); let mut running = FuturesUnordered::new();
/*
Inherited environment vars
*/
let default_vars = [
"LANG",
"HOSTNAME",
"LOGNAME",
"USER",
"PATH",
"HOME",
"XDG_CONFIG_HOME",
"ALL_PROXY",
"FTP_PROXY",
"HTTPS_PROXY",
"HTTP_PROXY",
"NO_PROXY",
];
let inherited_env: Environment = default_vars
.iter()
.map(|envvar| (envvar.to_string(), std::env::var(envvar).ok()))
.collect();
while let Some(msg) = exe_msgs.recv().await { while let Some(msg) = exe_msgs.recv().await {
use ExecutorMessage::{ExecuteTask, Stop, ValidateTask}; use ExecutorMessage::{ExecuteTask, Stop, ValidateTask};
match msg { match msg {
@@ -237,8 +264,9 @@ pub async fn start_local_executor(
if running.len() == max_parallel { if running.len() == max_parallel {
running.next().await; running.next().await;
} }
let env = inherited_env.clone();
running.push(tokio::spawn(async move { running.push(tokio::spawn(async move {
let attempt = match run_task(details, kill, output_options, varmap).await { let attempt = match run_task(details, kill, output_options, varmap, env).await {
Ok(attempt) => attempt, Ok(attempt) => attempt,
Err(e) => TaskAttempt { Err(e) => TaskAttempt {
succeeded: false, succeeded: false,
+4
View File
@@ -18,6 +18,10 @@ impl DerefMut for VarMap {
} }
impl VarMap { impl VarMap {
pub fn new() -> Self {
VarMap(HashMap::new())
}
// Derive variables from a given interval // Derive variables from a given interval
pub fn from_interval(int: &Interval, tz: Tz) -> Self { pub fn from_interval(int: &Interval, tz: Tz) -> Self {
let start = int.start.with_timezone(&tz); let start = int.start.with_timezone(&tz);