diff --git a/Cargo.toml b/Cargo.toml index 8a3b07c..a838177 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,3 +21,5 @@ redis = { version = "*", features = ["aio", "tokio-comp"] } clap = { version = "3.1", features = ["derive"] } env_logger = "0.9" log = "0.4" +actix-web = "4" +actix-cors = "0.6" diff --git a/src/bin/agent/config.rs b/src/bin/agent/config.rs new file mode 100644 index 0000000..7857451 --- /dev/null +++ b/src/bin/agent/config.rs @@ -0,0 +1,84 @@ +pub use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use sysinfo::{RefreshKind, System, SystemExt}; +use tokio::sync::mpsc; +use waterfall::prelude::*; + +fn default_resources() -> TaskResources { + let system = System::new_with_specifics(RefreshKind::new().with_cpu().with_memory()); + let cores = (system.processors().len() as i64) - 2; + let free_memory = (system.total_memory() - system.used_memory()) as f64; + let memory_mb = ((free_memory * 0.8) as i64) / 1024; + + let mut resources = TaskResources::new(); + resources.insert("cores".to_owned(), cores); + resources.insert("memory_mb".to_owned(), memory_mb); + resources +} + +fn default_ip() -> String { + "127.0.0.1".to_owned() +} + +fn default_port() -> u32 { + 2504 +} + +#[derive(Deserialize, Debug, Clone)] +pub struct GlobalConfigSpec { + #[serde(default = "default_ip")] + pub ip: String, + + #[serde(default = "default_port")] + pub port: u32, + + #[serde(default = "default_resources")] + pub resources: TaskResources, +} + +impl Default for GlobalConfigSpec { + fn default() -> Self { + GlobalConfigSpec { + ip: String::from("127.0.0.1"), + port: default_port(), + resources: default_resources(), + } + } +} + +#[derive(Clone)] +pub struct GlobalConfig { + pub ip: String, + pub port: u32, + pub resources: TaskResources, + pub tracker: mpsc::UnboundedSender, + pub executor: mpsc::UnboundedSender, +} + +impl GlobalConfig { + pub fn new(spec: &GlobalConfigSpec) -> Self { + let def_res = default_resources(); + let cores = def_res.get("cores").unwrap(); + + let workers = spec.resources.get("cores").unwrap_or(cores); + + let (executor, exe_rx) = mpsc::unbounded_channel(); + local_executor::start(*workers as usize, exe_rx); + + // Tracker + let (tracker, trx) = mpsc::unbounded_channel(); + noop_tracker::start(trx); + + GlobalConfig { + ip: spec.ip.clone(), + port: spec.port, + resources: spec.resources.clone(), + tracker, + executor, + } + } + + pub fn listen_spec(&self) -> String { + format!("{}:{}", self.ip, self.port) + } +} diff --git a/src/bin/agent/main.rs b/src/bin/agent/main.rs new file mode 100644 index 0000000..4d5ca6f --- /dev/null +++ b/src/bin/agent/main.rs @@ -0,0 +1,163 @@ +mod config; + +use actix_cors::Cors; +use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, Responder}; +use clap::Parser; +use serde::Serialize; +use tokio::sync::{mpsc, oneshot}; + +use config::*; +use waterfall::prelude::*; + +#[derive(Serialize)] +struct SimpleError { + error: String, +} + +async fn get_resources(data: web::Data) -> impl Responder { + HttpResponse::Ok().json(data.resources.clone()) +} + +async fn submit_task( + details: web::Json, + data: web::Data, +) -> impl Responder { + let (response, mut rx) = mpsc::unbounded_channel(); + + let trx = data.tracker.clone(); + + data.executor + .send(ExecutorMessage::ExecuteTask { + details: details.into_inner(), + output_options: TaskOutputOptions::default(), + tracker: trx, + response, + }) + .unwrap(); + + match rx.recv().await.unwrap() { + RunnerMessage::ExecutionReport { attempt, .. } => HttpResponse::Ok().json(attempt), + other => HttpResponse::BadRequest().json(SimpleError { + error: format!("Unexpected message {:?}", other), + }), + } +} + +async fn stop_task( + path: web::Path<(RunID, TaskID)>, + data: web::Data, +) -> impl Responder { + let (run_id, task_id) = path.into_inner(); + let (response, rx) = oneshot::channel(); + + data.executor + .send(ExecutorMessage::StopTask { + run_id, + task_id, + response, + }) + .unwrap(); + + rx.await.unwrap(); + HttpResponse::Ok() +} + +async fn ready() -> impl Responder { + HttpResponse::Ok() +} + +fn init(config_file: &str) -> GlobalConfig { + let spec: GlobalConfigSpec = if config_file.is_empty() { + GlobalConfigSpec::default() + } else { + let json = std::fs::read_to_string(config_file) + .unwrap_or_else(|_| panic!("Unable to open {} for reading", config_file)); + serde_json::from_str(&json).expect("Error parsing config json") + }; + + GlobalConfig::new(&spec) +} + +#[derive(Parser, Debug)] +#[clap(author, version, about)] +struct Args { + /// Configuration File + #[clap(short, long, default_value = "")] + config: String, + + /// Enable verbose logging + #[clap(short, long)] + verbose: bool, +} + +#[actix_web::main] +async fn main() -> std::io::Result<()> { + let args = Args::parse(); + + let data = web::Data::new(init(args.config.as_ref())); + let config = data.clone(); + + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); + let res = HttpServer::new(move || { + let cors = Cors::default() + .allow_any_header() + .allow_any_method() + .allow_any_origin() + .send_wildcard(); + + let json_config = web::JsonConfig::default() + .limit(1048576) + .error_handler(|err, _req| { + use actix_web::error::JsonPayloadError; + let payload = match &err { + JsonPayloadError::OverflowKnownLength { length, limit } => SimpleError { + error: format!("Payload too big ({} > {})", length, limit), + }, + JsonPayloadError::Overflow { limit } => SimpleError { + error: format!("Payload too big (> {})", limit), + }, + JsonPayloadError::ContentType => SimpleError { + error: "Unsupported Content-Type".to_owned(), + }, + JsonPayloadError::Deserialize(e) => SimpleError { + error: format!("Parsing error: {}", e), + }, + JsonPayloadError::Serialize(e) => SimpleError { + error: format!("JSON Generation error: {}", e), + }, + JsonPayloadError::Payload(payload) => SimpleError { + error: format!("Payload error: {}", payload), + }, + _ => SimpleError { + error: "Unknown error".to_owned(), + }, + }; + + error::InternalError::from_response(err, HttpResponse::Conflict().json(payload)) + .into() + }); + + App::new() + .wrap(cors) + .app_data(data.clone()) + .wrap(Logger::new( + r#"%a "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T"#, + )) + .app_data(json_config) + .route("/ready", web::get().to(ready)) + .service( + web::scope("/api/v1") + .route("/resources", web::get().to(get_resources)) + .route("/{run_id}/{task_id}", web::post().to(submit_task)) + .route("/{run_id}/{task_id}", web::delete().to(stop_task)), + ) + }) + .bind(config.listen_spec())? + .run() + .await; + + config.executor.send(ExecutorMessage::Stop {}).unwrap(); + config.tracker.send(TrackerMessage::Stop {}).unwrap(); + + res +} diff --git a/src/bin/wf/main.rs b/src/bin/wf/main.rs index f8f8098..0380084 100644 --- a/src/bin/wf/main.rs +++ b/src/bin/wf/main.rs @@ -2,6 +2,7 @@ use clap::Parser; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot}; +use waterfall; use waterfall::prelude::*; #[derive(Serialize, Deserialize, Debug)] @@ -19,9 +20,10 @@ impl StorageConfig { ) { let (tx, rx) = mpsc::unbounded_channel(); match self { - StorageConfig::Redis { url, prefix } => { - (tx, redis_store::start(rx, url.clone(), prefix.clone())) - } + StorageConfig::Redis { url, prefix } => ( + tx, + waterfall::storage::redis::start(rx, url.clone(), prefix.clone()), + ), } } } diff --git a/src/executors/agent_executor.rs b/src/executors/agent_executor.rs index bcf78b5..43e8b97 100644 --- a/src/executors/agent_executor.rs +++ b/src/executors/agent_executor.rs @@ -183,12 +183,9 @@ async fn start_agent_executor( }); } ExecuteTask { - task_name, - interval, details, varmap, output_options, - storage, response, kill, } => { @@ -214,14 +211,7 @@ async fn start_agent_executor( ) .await; let rc = attempt.succeeded; - storage - .send(StorageMessage::StoreAttempt { - task_name, - interval, - attempt, - }) - .unwrap(); - response.send(rc).unwrap(); + response.send(attempt).unwrap(); (tid, resources, rc) })); break; diff --git a/src/executors/local_executor.rs b/src/executors/local_executor.rs index a507fa0..f81803d 100644 --- a/src/executors/local_executor.rs +++ b/src/executors/local_executor.rs @@ -255,12 +255,9 @@ pub async fn start_local_executor( }); } ExecuteTask { - task_name, - interval, details, varmap, output_options, - storage, response, kill, } => { @@ -277,15 +274,7 @@ pub async fn start_local_executor( ..TaskAttempt::new() }, }; - let rc = attempt.succeeded; - storage - .send(StorageMessage::StoreAttempt { - task_name, - interval, - attempt, - }) - .unwrap(); - response.send(rc).unwrap(); + response.send(attempt).unwrap(); })); } Stop {} => { diff --git a/src/executors/mod.rs b/src/executors/mod.rs index 17ceb0d..7998530 100644 --- a/src/executors/mod.rs +++ b/src/executors/mod.rs @@ -17,13 +17,10 @@ pub enum ExecutorMessage { /// Errors /// Will return `Err` if the tasks are invalid, according to the executor ExecuteTask { - task_name: String, - interval: Interval, details: serde_json::Value, varmap: VarMap, output_options: TaskOutputOptions, - storage: mpsc::UnboundedSender, - response: oneshot::Sender, + response: oneshot::Sender, kill: oneshot::Receiver<()>, }, Stop {}, diff --git a/src/prelude.rs b/src/prelude.rs index 907474b..14dffe9 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -2,5 +2,5 @@ pub use crate::calendar::Calendar; pub use crate::executors::*; pub use crate::runner::Runner; pub use crate::storage::*; -pub use crate::task::TaskDefinition; +pub use crate::task::{TaskDefinition, TaskResources}; pub use crate::world::WorldDefinition; diff --git a/src/runner.rs b/src/runner.rs index d1dff6c..105f90c 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -99,23 +99,29 @@ async fn run_task( let (response, response_rx) = oneshot::channel(); executor .send(ExecutorMessage::ExecuteTask { - task_name, - interval, details, output_options: output_options.clone(), varmap: varmap.clone(), response, kill, - storage, }) .unwrap(); - response_rx.await.unwrap() + let attempt = response_rx.await.unwrap(); + let rc = attempt.succeeded; + storage + .send(StorageMessage::StoreAttempt { + task_name, + interval, + attempt: attempt.clone(), + }) + .unwrap(); + rc } async fn up_task( task_name: String, interval: Interval, - kill: oneshot::Receiver<()>, + _kill: oneshot::Receiver<()>, varmap: VarMap, up: TaskDetails, check: Option, @@ -124,7 +130,7 @@ async fn up_task( storage: mpsc::UnboundedSender, ) -> RunnerEvent { if let Some(check_cmd) = check.clone() { - let (subkill, subkill_rx) = oneshot::channel(); + let (_subkill, subkill_rx) = oneshot::channel(); let succeeded = run_task( task_name.clone(), interval, @@ -147,7 +153,7 @@ async fn up_task( } // UP - let (subkill, subkill_rx) = oneshot::channel(); + let (_subkill, subkill_rx) = oneshot::channel(); let succeeded = run_task( task_name.clone(), interval, @@ -168,7 +174,7 @@ async fn up_task( // recheck if let Some(check_cmd) = check { - let (subkill, subkill_rx) = oneshot::channel(); + let (_subkill, subkill_rx) = oneshot::channel(); let succeeded = run_task( task_name.clone(), interval, diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 06ea17f..bef9a63 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -25,4 +25,5 @@ pub enum StorageMessage { Stop {}, } -pub mod redis_store; +pub mod noop; +pub mod redis; diff --git a/src/storage/noop.rs b/src/storage/noop.rs new file mode 100644 index 0000000..cbc3aaf --- /dev/null +++ b/src/storage/noop.rs @@ -0,0 +1,29 @@ +use super::*; + +/// The mpsc channel can be sized to fit max parallelism +pub async fn start_storage(mut msgs: mpsc::UnboundedReceiver) -> Result<()> { + let mut current_state = ResourceInterval::new(); + while let Some(msg) = msgs.recv().await { + use StorageMessage::*; + match msg { + StoreAttempt { .. } => {} + StoreState { state } => { + current_state = state; + } + LoadState { response } => { + response.send(current_state.clone()).unwrap(); + } + Stop {} => { + break; + } + } + } + + Ok(()) +} + +pub fn start(msgs: mpsc::UnboundedReceiver) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + start_storage(msgs).await.expect("Unable to start storage"); + }) +} diff --git a/src/storage/redis_store.rs b/src/storage/redis.rs similarity index 98% rename from src/storage/redis_store.rs rename to src/storage/redis.rs index 78968f2..814afbe 100644 --- a/src/storage/redis_store.rs +++ b/src/storage/redis.rs @@ -1,5 +1,7 @@ use super::*; +extern crate redis; + use futures::prelude::*; use redis::AsyncCommands;