diff --git a/examples/world.json b/examples/world.json index 31ca5b1..52a784e 100644 --- a/examples/world.json +++ b/examples/world.json @@ -7,9 +7,9 @@ }, "tasks": { "task_a": { - "up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}" }, - "down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}" }, - "check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}" }, + "up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}", "resources": { "cores": 1 } }, + "down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}", "resources": { "cores": 1 } }, + "check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}", "resources": { "cores": 1 } }, "provides": [ "task_a" ], @@ -21,9 +21,9 @@ "valid_to": "2022-01-08T09:00:00" }, "task_b": { - "up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}" }, - "down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}" }, - "check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}" }, + "up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}", "resources": { "cores": 1 } }, + "down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}", "resources": { "cores": 1 } }, + "check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}", "resources": { "cores": 1 } }, "provides": [ "task_b" ], "requires": [ { "resource": "task_a", "offset": 0 } ], diff --git a/src/bin/waterfallw/main.rs b/src/bin/waterfallw/main.rs index 5fdc378..09dafa5 100644 --- a/src/bin/waterfallw/main.rs +++ b/src/bin/waterfallw/main.rs @@ -29,7 +29,8 @@ async fn submit_task( let submission = details.into_inner(); - let (_, kill) = oneshot::channel(); + // Need to keep this unused, otherwise the LE will kill it immediately + let (kill_tx, kill) = oneshot::channel(); data.executor .send(ExecutorMessage::ExecuteTask { details: submission.details, diff --git a/src/executors/agent_executor.rs b/src/executors/agent_executor.rs index a9a4b04..64f94cb 100644 --- a/src/executors/agent_executor.rs +++ b/src/executors/agent_executor.rs @@ -69,8 +69,7 @@ impl AgentTarget { #[derive(Serialize, Deserialize, Clone, Debug)] struct AgentTaskDetail { /// The command and all arguments to run - #[serde(default)] - command: Vec, + command: Cmd, /// Environment variables to set #[serde(default)] @@ -142,6 +141,7 @@ async fn submit_task( } } Err(e) => { + warn!("Failed to submit task: {:?}", e); attempt.succeeded = false; attempt.infra_failure = true; attempt.executor.push(format!( diff --git a/src/executors/local_executor.rs b/src/executors/local_executor.rs index f81803d..0ed8fc8 100644 --- a/src/executors/local_executor.rs +++ b/src/executors/local_executor.rs @@ -13,24 +13,6 @@ use tokio::io::AsyncReadExt; type Environment = HashMap>; -#[derive(Serialize, Deserialize, Clone, Debug)] -#[serde(untagged)] -enum Cmd { - Simple(String), - Split(Vec), -} - -impl Cmd { - fn generate(&self, varmap: &VarMap) -> Vec { - let cmd = match self { - Cmd::Simple(s) => s.split_whitespace().map(|x| x.to_string()).collect(), - Cmd::Split(v) => v.clone(), - }; - - cmd.into_iter().map(|x| varmap.apply_to(&x)).collect() - } -} - /// Contains specifics on how to run a local task #[derive(Serialize, Deserialize, Clone, Debug)] struct LocalTaskDetail { diff --git a/src/executors/mod.rs b/src/executors/mod.rs index 7998530..b175225 100644 --- a/src/executors/mod.rs +++ b/src/executors/mod.rs @@ -30,6 +30,24 @@ fn default_bytes() -> usize { 20480 } +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(untagged)] +pub enum Cmd { + Simple(String), + Split(Vec), +} + +impl Cmd { + pub fn generate(&self, varmap: &VarMap) -> Vec { + let cmd = match self { + Cmd::Simple(s) => s.split_whitespace().map(|x| x.to_string()).collect(), + Cmd::Split(v) => v.clone(), + }; + + cmd.into_iter().map(|x| varmap.apply_to(&x)).collect() + } +} + /// Options in how to handle task output. Some tasks can be quite /// verbose, and the output may not be needed. #[derive(Clone, Serialize, Deserialize, Copy, Debug, PartialEq, Hash, Eq)]