From 923025bc4939a1b5fc0112767fbffe627b2faa7f Mon Sep 17 00:00:00 2001 From: Kinesin Data Technologies Incorporated <93931750+kinesintech@users.noreply.github.com> Date: Wed, 5 Oct 2022 10:35:48 -0300 Subject: [PATCH] Adding example for agent execution --- examples/config_wfw.json | 17 ++++++++++++++ examples/world.json | 12 +++++----- src/executors/agent_executor.rs | 39 ++++++++++++++------------------- 3 files changed, 40 insertions(+), 28 deletions(-) create mode 100644 examples/config_wfw.json diff --git a/examples/config_wfw.json b/examples/config_wfw.json new file mode 100644 index 0000000..f594891 --- /dev/null +++ b/examples/config_wfw.json @@ -0,0 +1,17 @@ +{ + "storage": { + "type": "redis", + "url": "redis://localhost", + "prefix": "world" + }, + "executor": { + "type": "agent", + "targets": [ + { + "base_url": "http://localhost:2504/api/v1", + "resources": { "cores": 10 } + } + ] + } +} + diff --git a/examples/world.json b/examples/world.json index 52a784e..139b8fb 100644 --- a/examples/world.json +++ b/examples/world.json @@ -7,9 +7,9 @@ }, "tasks": { "task_a": { - "up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}", "resources": { "cores": 1 } }, - "down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}", "resources": { "cores": 1 } }, - "check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}", "resources": { "cores": 1 } }, + "up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, + "down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, + "check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, "provides": [ "task_a" ], @@ -21,9 +21,9 @@ "valid_to": "2022-01-08T09:00:00" }, "task_b": { - "up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}", "resources": { "cores": 1 } }, - "down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}", "resources": { "cores": 1 } }, - "check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}", "resources": { "cores": 1 } }, + "up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, + "down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, + "check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, "provides": [ "task_b" ], "requires": [ { "resource": "task_a", "offset": 0 } ], diff --git a/src/executors/agent_executor.rs b/src/executors/agent_executor.rs index 64f94cb..2b1b71d 100644 --- a/src/executors/agent_executor.rs +++ b/src/executors/agent_executor.rs @@ -115,7 +115,7 @@ async fn submit_task( output_options: TaskOutputOptions, client: reqwest::Client, varmap: VarMap, -) -> TaskAttempt { +) -> Result { let submit_url = format!("{}/run", base_url); let mut attempt = TaskAttempt::new(); let submission = TaskSubmission { @@ -130,28 +130,21 @@ async fn submit_task( attempt .executor .push(format!("Executed on agent at {}", base_url)); + Ok(attempt) } else { - attempt.succeeded = false; - attempt.infra_failure = true; - attempt.executor.push(format!( + Err(anyhow!( "Unable to dispatch to agent at {}: {:?}", base_url, result.text().await.unwrap() - )); + )) } } - Err(e) => { - warn!("Failed to submit task: {:?}", e); - attempt.succeeded = false; - attempt.infra_failure = true; - attempt.executor.push(format!( - "Unable to dispatch to agent at {}: {:?}", - base_url, e - )); - } + Err(e) => Err(anyhow!( + "Unable to dispatch to agent at {}: {:?}", + base_url, + e + )), } - - attempt } // async fn select_target() -> Option {} @@ -211,11 +204,12 @@ async fn start_agent_executor( }) { // There is a remote agent with capacity Some((tid, target)) => { + info!("Dispatching job to {}", target.base_url); target.current_resources.sub(&resources).unwrap(); let base_url = target.base_url.clone(); let submit_client = client.clone(); running.push(tokio::spawn(async move { - let attempt = submit_task( + let res = submit_task( base_url, details, output_options, @@ -223,8 +217,11 @@ async fn start_agent_executor( varmap, ) .await; - let rc = attempt.succeeded; - response.send(attempt).unwrap(); + let mut rc = false; + if let Ok(attempt) = res { + response.send(attempt).unwrap(); + rc = true; + } (tid, resources, rc) })); break; @@ -233,13 +230,11 @@ async fn start_agent_executor( None => { // Give the outstanding tasks a chance to complete or agents // recover - tokio::time::sleep(tokio::time::Duration::from_millis(250)).await; - info!("Waiting to run message"); + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; // Refresh any disabled targets for (tid, target) in targets.iter_mut().enumerate() { if target.enabled { - info!("Skipping {} as it is enabled", target.base_url); continue; } target.refresh_resources(&client).await;