Adapting the agent

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-10-05 09:19:07 -03:00
parent 0d6cea4152
commit ca9a32c032
2 changed files with 29 additions and 15 deletions
+15 -14
View File
@@ -7,8 +7,11 @@ use serde::Serialize;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use config::*; use config::*;
use waterfall::executors::agent_executor::TaskSubmission;
use waterfall::prelude::*; use waterfall::prelude::*;
type TaskDetails = serde_json::Value;
#[derive(Serialize)] #[derive(Serialize)]
struct SimpleError { struct SimpleError {
error: String, error: String,
@@ -19,30 +22,28 @@ async fn get_resources(data: web::Data<GlobalConfig>) -> impl Responder {
} }
async fn submit_task( async fn submit_task(
details: web::Json<TaskDetails>, details: web::Json<TaskSubmission>,
data: web::Data<GlobalConfig>, data: web::Data<GlobalConfig>,
) -> impl Responder { ) -> impl Responder {
let (response, mut rx) = mpsc::unbounded_channel(); let (response, rx) = oneshot::channel();
let submission = data.into_inner();
let trx = data.tracker.clone(); let trx = data.tracker.clone();
data.executor data.executor
.send(ExecutorMessage::ExecuteTask { .send(ExecutorMessage::ExecuteTask {
details: details.into_inner(), details: submission.details,
output_options: TaskOutputOptions::default(), output_options: submission.output_options,
tracker: trx, varmap: submission.varmap,
response, response,
}) })
.unwrap(); .unwrap();
match rx.recv().await.unwrap() { HttpResponse::Ok().json(rx.await.unwrap())
RunnerMessage::ExecutionReport { attempt, .. } => HttpResponse::Ok().json(attempt),
other => HttpResponse::BadRequest().json(SimpleError {
error: format!("Unexpected message {:?}", other),
}),
}
} }
/*
async fn stop_task( async fn stop_task(
path: web::Path<(RunID, TaskID)>, path: web::Path<(RunID, TaskID)>,
data: web::Data<GlobalConfig>, data: web::Data<GlobalConfig>,
@@ -61,6 +62,7 @@ async fn stop_task(
rx.await.unwrap(); rx.await.unwrap();
HttpResponse::Ok() HttpResponse::Ok()
} }
*/
async fn ready() -> impl Responder { async fn ready() -> impl Responder {
HttpResponse::Ok() HttpResponse::Ok()
@@ -148,8 +150,7 @@ async fn main() -> std::io::Result<()> {
.service( .service(
web::scope("/api/v1") web::scope("/api/v1")
.route("/resources", web::get().to(get_resources)) .route("/resources", web::get().to(get_resources))
.route("/{run_id}/{task_id}", web::post().to(submit_task)) .route("/run", web::post().to(submit_task)),
.route("/{run_id}/{task_id}", web::delete().to(stop_task)),
) )
}) })
.bind(config.listen_spec())? .bind(config.listen_spec())?
@@ -157,7 +158,7 @@ async fn main() -> std::io::Result<()> {
.await; .await;
config.executor.send(ExecutorMessage::Stop {}).unwrap(); config.executor.send(ExecutorMessage::Stop {}).unwrap();
config.tracker.send(TrackerMessage::Stop {}).unwrap(); config.storage.send(StorageMessage::Stop {}).unwrap();
res res
} }
+14 -1
View File
@@ -102,6 +102,14 @@ fn validate_task(details: &TaskDetails, max_capacities: &[TaskResources]) -> Res
} }
} }
/// Contains specifics on how to run a local task
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TaskSubmission {
details: TaskDetails,
varmap: VarMap,
output_options: TaskOutputOptions,
}
async fn submit_task( async fn submit_task(
base_url: String, base_url: String,
details: TaskDetails, details: TaskDetails,
@@ -111,7 +119,12 @@ async fn submit_task(
) -> TaskAttempt { ) -> TaskAttempt {
let submit_url = format!("{}/run", base_url); let submit_url = format!("{}/run", base_url);
let mut attempt = TaskAttempt::new(); let mut attempt = TaskAttempt::new();
match client.post(submit_url).json(&details).send().await { let submission = TaskSubmission {
details,
varmap,
output_options,
};
match client.post(submit_url).json(&submission).send().await {
Ok(result) => { Ok(result) => {
if result.status() == reqwest::StatusCode::OK { if result.status() == reqwest::StatusCode::OK {
attempt = result.json().await.unwrap(); attempt = result.json().await.unwrap();