Adding wfd config, endpoint to get state

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-10-06 15:32:05 -03:00
parent eb590c848e
commit f5ca3315f0
3 changed files with 46 additions and 14 deletions
+17
View File
@@ -0,0 +1,17 @@
{
"server": {
"ip": "127.0.0.1",
"port": 2503
},
"storage": {
"type": "redis",
"url": "redis://localhost",
"prefix": "another"
},
"executor": {
"type": "local",
"workers": 10
}
}
+28 -13
View File
@@ -3,7 +3,7 @@ use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, R
use clap::Parser;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use waterfall::prelude::*;
#[derive(Serialize, Deserialize, Debug)]
@@ -90,24 +90,19 @@ struct SimpleError {
}
async fn get_state(state: web::Data<AppState>) -> impl Responder {
// Build the current state
// let (response, rx) = oneshot::channel();
let (response, rx) = oneshot::channel();
/*
state
.config
.tracker
.send(TrackerMessage::GetRun { run_id, response })
.storage_tx
.send(StorageMessage::LoadState { response })
.unwrap();
match rx.await.unwrap() {
Ok(run) => HttpResponse::Ok().json(run),
match rx.await {
Ok(world) => HttpResponse::Ok().json(world),
Err(error) => HttpResponse::BadRequest().json(SimpleError {
error: format!("{:?}", error),
}),
}
*/
HttpResponse::Ok().body("")
}
/*
@@ -152,8 +147,8 @@ struct Args {
#[derive(Clone)]
struct AppState {
exe_tx: mpsc::UnboundedSender<ExecutorMessage>,
storage_tx: mpsc::UnboundedSender<StorageMessage>,
runner_tx: mpsc::UnboundedSender<RunnerMessage>,
}
#[actix_web::main]
@@ -175,10 +170,28 @@ async fn main() -> std::io::Result<()> {
// Start the workers
let (exe_tx, exe_handle) = config.executor.start();
let (storage_tx, storage_handle) = config.storage.start();
let (runner_tx, runner_rx) = mpsc::unbounded_channel();
let data = web::Data::new(AppState {
exe_tx: exe_tx.clone(),
storage_tx: storage_tx.clone(),
runner_tx: runner_tx.clone(),
});
let tasks = world_def.taskset().unwrap();
let mut runner = Runner::new(
tasks,
world_def.variables,
runner_rx,
exe_tx.clone(),
storage_tx.clone(),
world_def.output_options,
args.force_recheck,
)
.await
.unwrap();
let runner_handle = tokio::spawn(async move {
runner.run().await;
});
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
@@ -236,6 +249,8 @@ async fn main() -> std::io::Result<()> {
.await;
// Shutdown the runner
runner_tx.send(RunnerMessage::Stop {}).unwrap();
runner_handle.await.unwrap();
exe_tx.send(ExecutorMessage::Stop {}).unwrap();
exe_handle.await.unwrap();
storage_tx.send(StorageMessage::Stop {}).unwrap();
+1 -1
View File
@@ -1,6 +1,6 @@
pub use crate::calendar::Calendar;
pub use crate::executors::*;
pub use crate::runner::Runner;
pub use crate::runner::{Runner, RunnerMessage};
pub use crate::storage::*;
pub use crate::task::{TaskDefinition, TaskResources};
pub use crate::world::WorldDefinition;