diff --git a/examples/config_wfw_multiple.json b/examples/config_wfw_multiple.json new file mode 100644 index 0000000..b06861b --- /dev/null +++ b/examples/config_wfw_multiple.json @@ -0,0 +1,25 @@ +{ + "storage": { + "type": "redis", + "url": "redis://localhost", + "prefix": "world" + }, + "executor": { + "type": "agent", + "targets": [ + { + "base_url": "http://localhost:2504/api/v1", + "resources": { "cores": 1 } + }, + { + "base_url": "http://localhost:2505/api/v1", + "resources": { "cores": 1 } + }, + { + "base_url": "http://localhost:2506/api/v1", + "resources": { "cores": 1 } + } + ] + } +} + diff --git a/examples/long_world.json b/examples/long_world.json new file mode 100644 index 0000000..a27da8e --- /dev/null +++ b/examples/long_world.json @@ -0,0 +1,39 @@ +{ + "variables": { + "HOME": "/tmp/world_test" + }, + "calendars": { + "std": { "mask": [ "Mon", "Tue", "Wed", "Thu", "Fri" ] } + }, + "tasks": { + "task_a": { + "up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, + "down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, + "check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, + + "provides": [ "task_a" ], + + "calendar_name": "std", + "times": [ "09:00:00", "12:00:00"], + "timezone": "America/New_York", + + "valid_from": "2022-01-01T09:00:00", + "valid_to": "2023-01-08T09:00:00" + }, + "task_b": { + "up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, + "down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, + "check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, + + "provides": [ "task_b" ], + "requires": [ { "resource": "task_a", "offset": 0 } ], + + "calendar_name": "std", + "times": [ "17:00:00" ], + "timezone": "America/New_York", + + "valid_from": "2022-01-04T09:00:00", + "valid_to": "2023-01-07T00:00:00" + } + } +} diff --git a/src/bin/wf/main.rs b/src/bin/wf/main.rs index 77631d6..5a1b5f3 100644 --- a/src/bin/wf/main.rs +++ b/src/bin/wf/main.rs @@ -1,5 +1,6 @@ use clap::Parser; +use log::*; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot}; use waterfall; @@ -120,6 +121,8 @@ async fn main() -> std::io::Result<()> { let tasks = world_def.taskset().unwrap(); + debug!("Config: {:?}", args); + let mut runner = Runner::new( tasks, world_def.variables, diff --git a/src/bin/wfw/main.rs b/src/bin/wfw/main.rs index ddaed98..2d05777 100644 --- a/src/bin/wfw/main.rs +++ b/src/bin/wfw/main.rs @@ -89,6 +89,14 @@ struct Args { /// Enable verbose logging #[clap(short, long)] verbose: bool, + + /// Configuration File + #[clap(short, long)] + host: Option, + + /// Configuration File + #[clap(short, long)] + port: Option, } #[actix_web::main] @@ -98,6 +106,20 @@ async fn main() -> std::io::Result<()> { let data = web::Data::new(init(args.config.as_ref())); let config = data.clone(); + let host = if let Some(h) = args.host { + h + } else { + config.ip.clone() + }; + + let port = if let Some(p) = args.port { + p + } else { + config.port + }; + + let listen_spec = format!("{}:{}", host, port); + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); let res = HttpServer::new(move || { let cors = Cors::default() @@ -152,7 +174,7 @@ async fn main() -> std::io::Result<()> { .route("/run", web::post().to(submit_task)), ) }) - .bind(config.listen_spec())? + .bind(listen_spec)? .run() .await; diff --git a/src/executors/agent_executor.rs b/src/executors/agent_executor.rs index f2ed7f1..ac144d3 100644 --- a/src/executors/agent_executor.rs +++ b/src/executors/agent_executor.rs @@ -229,7 +229,7 @@ async fn start_agent_executor( None => { // Give the outstanding tasks a chance to complete or agents // recover - tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(5)).await; // Refresh any disabled targets for (tid, target) in targets.iter_mut().enumerate() { diff --git a/src/executors/local_executor.rs b/src/executors/local_executor.rs index 0ed8fc8..e0fc0c2 100644 --- a/src/executors/local_executor.rs +++ b/src/executors/local_executor.rs @@ -97,6 +97,8 @@ async fn run_task( let (program, args) = cmd.split_first().unwrap(); attempt.executor.push(format!("{:?}\n", details)); + debug!("Running command {:?}", cmd); + let mut command = Command::new(program); command.stdout(Stdio::piped()); command.stderr(Stdio::piped()); diff --git a/src/lib.rs b/src/lib.rs index 11f2b32..b405405 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,7 @@ use anyhow::{anyhow, Result}; use chrono::prelude::*; use chrono::{Duration, TimeZone}; use chrono_tz::Tz; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use tokio::sync::{mpsc, oneshot}; diff --git a/src/runner.rs b/src/runner.rs index b61bdce..d45c54d 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -74,9 +74,9 @@ pub struct Runner { storage: mpsc::UnboundedSender, } -fn gen_timeout(timeout: i64) -> tokio::task::JoinHandle { +fn gen_timeout(duration: Duration) -> tokio::task::JoinHandle { tokio::spawn(async move { - tokio::time::sleep(Duration::seconds(timeout).to_std().unwrap()).await; + tokio::time::sleep(duration.to_std().unwrap()).await; RunnerMessage::Timeout }) } @@ -237,13 +237,16 @@ impl Runner { } let current = if force_check { + info!("Force re-check set, starting with empty current state."); + ResourceInterval::new() + } else { + info!("Pulling last state from storage"); let (response, rx) = oneshot::channel(); storage .send(StorageMessage::LoadState { response }) .unwrap(); - rx.await.unwrap() - } else { - ResourceInterval::new() + let res = rx.await.unwrap(); + res }; let end_state = tasks.coverage()?; @@ -317,7 +320,7 @@ impl Runner { } if result_state != target { return Err(anyhow!( - "Actions generated produce\n\t{:?}\nExpected\n\t{:?}", + "Actions generated produced\n\t{:?}\nExpected\n\t{:?}", result_state, target )); @@ -348,6 +351,17 @@ impl Runner { .iter() .all(|action| action.state == ActionState::Completed) { + let now = Utc::now(); + let next_time = self + .tasks + .values() + .map(|t| t.schedule.next_time(now)) + .min() + .unwrap() + .with_timezone(&Utc); + let sleep_duration = next_time - now; + info!("Sleeping for {} until next task", sleep_duration); + self.events.push(gen_timeout(sleep_duration)); self.tick().unwrap(); } match self.events.next().await { @@ -371,7 +385,6 @@ impl Runner { task_name, interval, })) => { - println!("Completing {}/{}", task_name, interval); let action = self .queue .iter_mut() @@ -385,6 +398,7 @@ impl Runner { .or_insert(IntervalSet::new()) .insert(action.interval); } + info!("Updating State"); self.storage .send(StorageMessage::StoreState { state: self.current.clone(), diff --git a/src/schedule.rs b/src/schedule.rs index 9add410..a4972ee 100644 --- a/src/schedule.rs +++ b/src/schedule.rs @@ -82,7 +82,7 @@ impl Schedule { ) } - pub fn next_time(&self, dt: DateTime) -> DateTime { + pub fn next_time(&self, dt: DateTime) -> DateTime { let st = dt.with_timezone(&self.timezone); let mut date = st.date().naive_local(); @@ -108,7 +108,7 @@ impl Schedule { } /// Given a time, generate the preceding interval according to the schedule - pub fn prev_time(&self, dt: DateTime) -> DateTime { + pub fn prev_time(&self, dt: DateTime) -> DateTime { let st = dt.with_timezone(&self.timezone); let mut date = st.date().naive_local(); diff --git a/src/task_set.rs b/src/task_set.rs index 52a8860..155788c 100644 --- a/src/task_set.rs +++ b/src/task_set.rs @@ -22,10 +22,19 @@ impl TaskSet { pub fn get_state(&self, time: DateTime) -> Result { let mut res = ResourceInterval::new(); - let timeline = IntervalSet::from(Interval::new(MIN_TIME, time.with_timezone(&Utc))); - // Insert all of the covered items for task in self.values() { + // TODO Need to align each of these intervals with a scheduled time + let timeline = if time < MAX_TIME { + let cur_intv = task.schedule.interval(time.clone(), 0); + if cur_intv.end > time { + IntervalSet::from(Interval::new(MIN_TIME, cur_intv.start)) + } else { + IntervalSet::from(Interval::new(MIN_TIME, cur_intv.end)) + } + } else { + IntervalSet::from(Interval::new(MIN_TIME, time.with_timezone(&Utc))) + }; let task_timeline = task.valid_over.intersection(&timeline); for resource in &task.provides { let ris = res.entry(resource.clone()).or_insert(IntervalSet::new());