Adding example for agent execution

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-10-05 10:35:48 -03:00
parent d82b000f9b
commit 923025bc49
3 changed files with 40 additions and 28 deletions
+17
View File
@@ -0,0 +1,17 @@
{
"storage": {
"type": "redis",
"url": "redis://localhost",
"prefix": "world"
},
"executor": {
"type": "agent",
"targets": [
{
"base_url": "http://localhost:2504/api/v1",
"resources": { "cores": 10 }
}
]
}
}
+6 -6
View File
@@ -7,9 +7,9 @@
}, },
"tasks": { "tasks": {
"task_a": { "task_a": {
"up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}", "resources": { "cores": 1 } }, "up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}", "resources": { "cores": 1 } }, "down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}", "resources": { "cores": 1 } }, "check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"provides": [ "task_a" ], "provides": [ "task_a" ],
@@ -21,9 +21,9 @@
"valid_to": "2022-01-08T09:00:00" "valid_to": "2022-01-08T09:00:00"
}, },
"task_b": { "task_b": {
"up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}", "resources": { "cores": 1 } }, "up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}", "resources": { "cores": 1 } }, "down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}", "resources": { "cores": 1 } }, "check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"provides": [ "task_b" ], "provides": [ "task_b" ],
"requires": [ { "resource": "task_a", "offset": 0 } ], "requires": [ { "resource": "task_a", "offset": 0 } ],
+17 -22
View File
@@ -115,7 +115,7 @@ async fn submit_task(
output_options: TaskOutputOptions, output_options: TaskOutputOptions,
client: reqwest::Client, client: reqwest::Client,
varmap: VarMap, varmap: VarMap,
) -> TaskAttempt { ) -> Result<TaskAttempt> {
let submit_url = format!("{}/run", base_url); let submit_url = format!("{}/run", base_url);
let mut attempt = TaskAttempt::new(); let mut attempt = TaskAttempt::new();
let submission = TaskSubmission { let submission = TaskSubmission {
@@ -130,28 +130,21 @@ async fn submit_task(
attempt attempt
.executor .executor
.push(format!("Executed on agent at {}", base_url)); .push(format!("Executed on agent at {}", base_url));
Ok(attempt)
} else { } else {
attempt.succeeded = false; Err(anyhow!(
attempt.infra_failure = true;
attempt.executor.push(format!(
"Unable to dispatch to agent at {}: {:?}", "Unable to dispatch to agent at {}: {:?}",
base_url, base_url,
result.text().await.unwrap() result.text().await.unwrap()
)); ))
} }
} }
Err(e) => { Err(e) => Err(anyhow!(
warn!("Failed to submit task: {:?}", e); "Unable to dispatch to agent at {}: {:?}",
attempt.succeeded = false; base_url,
attempt.infra_failure = true; e
attempt.executor.push(format!( )),
"Unable to dispatch to agent at {}: {:?}",
base_url, e
));
}
} }
attempt
} }
// async fn select_target() -> Option<usize> {} // async fn select_target() -> Option<usize> {}
@@ -211,11 +204,12 @@ async fn start_agent_executor(
}) { }) {
// There is a remote agent with capacity // There is a remote agent with capacity
Some((tid, target)) => { Some((tid, target)) => {
info!("Dispatching job to {}", target.base_url);
target.current_resources.sub(&resources).unwrap(); target.current_resources.sub(&resources).unwrap();
let base_url = target.base_url.clone(); let base_url = target.base_url.clone();
let submit_client = client.clone(); let submit_client = client.clone();
running.push(tokio::spawn(async move { running.push(tokio::spawn(async move {
let attempt = submit_task( let res = submit_task(
base_url, base_url,
details, details,
output_options, output_options,
@@ -223,8 +217,11 @@ async fn start_agent_executor(
varmap, varmap,
) )
.await; .await;
let rc = attempt.succeeded; let mut rc = false;
response.send(attempt).unwrap(); if let Ok(attempt) = res {
response.send(attempt).unwrap();
rc = true;
}
(tid, resources, rc) (tid, resources, rc)
})); }));
break; break;
@@ -233,13 +230,11 @@ async fn start_agent_executor(
None => { None => {
// Give the outstanding tasks a chance to complete or agents // Give the outstanding tasks a chance to complete or agents
// recover // recover
tokio::time::sleep(tokio::time::Duration::from_millis(250)).await; tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
info!("Waiting to run message");
// Refresh any disabled targets // Refresh any disabled targets
for (tid, target) in targets.iter_mut().enumerate() { for (tid, target) in targets.iter_mut().enumerate() {
if target.enabled { if target.enabled {
info!("Skipping {} as it is enabled", target.base_url);
continue; continue;
} }
target.refresh_resources(&client).await; target.refresh_resources(&client).await;