From f5ca3315f0d4234755a63f2125aa11684f2dde5f Mon Sep 17 00:00:00 2001 From: Kinesin Data Technologies Incorporated <93931750+kinesintech@users.noreply.github.com> Date: Thu, 6 Oct 2022 15:32:05 -0300 Subject: [PATCH] Adding wfd config, endpoint to get state --- examples/wfd.json | 17 +++++++++++++++++ src/bin/wfd/main.rs | 41 ++++++++++++++++++++++++++++------------- src/prelude.rs | 2 +- 3 files changed, 46 insertions(+), 14 deletions(-) create mode 100644 examples/wfd.json diff --git a/examples/wfd.json b/examples/wfd.json new file mode 100644 index 0000000..6d8982d --- /dev/null +++ b/examples/wfd.json @@ -0,0 +1,17 @@ +{ + "server": { + "ip": "127.0.0.1", + "port": 2503 + }, + "storage": { + "type": "redis", + "url": "redis://localhost", + "prefix": "another" + }, + "executor": { + "type": "local", + "workers": 10 + } +} + + diff --git a/src/bin/wfd/main.rs b/src/bin/wfd/main.rs index b3d44f5..b5af41e 100644 --- a/src/bin/wfd/main.rs +++ b/src/bin/wfd/main.rs @@ -3,7 +3,7 @@ use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, R use clap::Parser; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use waterfall::prelude::*; #[derive(Serialize, Deserialize, Debug)] @@ -90,24 +90,19 @@ struct SimpleError { } async fn get_state(state: web::Data) -> impl Responder { - // Build the current state - // let (response, rx) = oneshot::channel(); + let (response, rx) = oneshot::channel(); - /* state - .config - .tracker - .send(TrackerMessage::GetRun { run_id, response }) + .storage_tx + .send(StorageMessage::LoadState { response }) .unwrap(); - match rx.await.unwrap() { - Ok(run) => HttpResponse::Ok().json(run), + match rx.await { + Ok(world) => HttpResponse::Ok().json(world), Err(error) => HttpResponse::BadRequest().json(SimpleError { error: format!("{:?}", error), }), } - */ - HttpResponse::Ok().body("") } /* @@ -152,8 +147,8 @@ struct Args { #[derive(Clone)] struct AppState { - exe_tx: mpsc::UnboundedSender, storage_tx: mpsc::UnboundedSender, + runner_tx: mpsc::UnboundedSender, } #[actix_web::main] @@ -175,10 +170,28 @@ async fn main() -> std::io::Result<()> { // Start the workers let (exe_tx, exe_handle) = config.executor.start(); let (storage_tx, storage_handle) = config.storage.start(); + let (runner_tx, runner_rx) = mpsc::unbounded_channel(); let data = web::Data::new(AppState { - exe_tx: exe_tx.clone(), storage_tx: storage_tx.clone(), + runner_tx: runner_tx.clone(), + }); + + let tasks = world_def.taskset().unwrap(); + let mut runner = Runner::new( + tasks, + world_def.variables, + runner_rx, + exe_tx.clone(), + storage_tx.clone(), + world_def.output_options, + args.force_recheck, + ) + .await + .unwrap(); + + let runner_handle = tokio::spawn(async move { + runner.run().await; }); env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); @@ -236,6 +249,8 @@ async fn main() -> std::io::Result<()> { .await; // Shutdown the runner + runner_tx.send(RunnerMessage::Stop {}).unwrap(); + runner_handle.await.unwrap(); exe_tx.send(ExecutorMessage::Stop {}).unwrap(); exe_handle.await.unwrap(); storage_tx.send(StorageMessage::Stop {}).unwrap(); diff --git a/src/prelude.rs b/src/prelude.rs index 14dffe9..7a5e578 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,6 +1,6 @@ pub use crate::calendar::Calendar; pub use crate::executors::*; -pub use crate::runner::Runner; +pub use crate::runner::{Runner, RunnerMessage}; pub use crate::storage::*; pub use crate::task::{TaskDefinition, TaskResources}; pub use crate::world::WorldDefinition;