diff --git a/src/bin/wf/main.rs b/src/bin/wf/main.rs index 1962286..e8da2c1 100644 --- a/src/bin/wf/main.rs +++ b/src/bin/wf/main.rs @@ -136,7 +136,7 @@ async fn main() -> std::io::Result<()> { .await .unwrap(); - runner.run().await; + runner.run(false).await; exe_tx.send(ExecutorMessage::Stop {}).unwrap(); exe_handle.await.unwrap(); diff --git a/src/bin/wfd/main.rs b/src/bin/wfd/main.rs index b5af41e..4843c34 100644 --- a/src/bin/wfd/main.rs +++ b/src/bin/wfd/main.rs @@ -93,8 +93,25 @@ async fn get_state(state: web::Data) -> impl Responder { let (response, rx) = oneshot::channel(); state - .storage_tx - .send(StorageMessage::LoadState { response }) + .runner_tx + .send(RunnerMessage::GetState { response }) + .unwrap(); + + match rx.await { + Ok(world) => HttpResponse::Ok().json(world), + Err(error) => HttpResponse::BadRequest().json(SimpleError { + error: format!("{:?}", error), + }), + } +} + +async fn timeline(span: web::Query, state: web::Data) -> impl Responder { + let interval = span.into_inner(); + + let (response, rx) = oneshot::channel(); + state + .runner_tx + .send(RunnerMessage::GetResourceStateDetails { interval, response }) .unwrap(); match rx.await { @@ -191,7 +208,7 @@ async fn main() -> std::io::Result<()> { .unwrap(); let runner_handle = tokio::spawn(async move { - runner.run().await; + runner.run(true).await; }); env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); diff --git a/src/prelude.rs b/src/prelude.rs index 7a5e578..9c100cf 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,5 +1,6 @@ pub use crate::calendar::Calendar; pub use crate::executors::*; +pub use crate::interval::Interval; pub use crate::runner::{Runner, RunnerMessage}; pub use crate::storage::*; pub use crate::task::{TaskDefinition, TaskResources}; diff --git a/src/runner.rs b/src/runner.rs index 26b7f7d..3092941 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -30,8 +30,20 @@ pub struct Action { } #[derive(Debug, Serialize, Deserialize)] +pub struct RunnerState { + coverage: ResourceInterval, + current: ResourceInterval, +} + +// Eventually we want to coerce the data into this format for timelines-chart +// Resource (group) -> Task (label) -> data [ { "timeRange": [date,date], "val": state } ] +pub type ResourceStateDetails = + HashMap, DateTime, ActionState)>>>; + +#[derive(Debug)] pub enum RunnerMessage { Tick, + PollMessages, ActionCompleted { action_id: usize, succeeded: bool, @@ -50,6 +62,13 @@ pub enum RunnerMessage { resources: HashSet, interval: Interval, }, + GetState { + response: oneshot::Sender, + }, + GetResourceStateDetails { + interval: Interval, + response: oneshot::Sender, + }, Stop, } @@ -274,14 +293,13 @@ impl Runner { storage, }; - runner.tick(); - runner.queue_actions(); + runner.update_target(); Ok(runner) } // Generate a new target state and generate any required actions - pub fn tick(&mut self) { + pub fn update_target(&mut self) { let new_target = self.tasks.get_state(Utc::now() + Duration::days(1)); let new_required = new_target.difference(&self.target); let mut new_actions = @@ -310,35 +328,108 @@ impl Runner { self.actions.extend(new_actions); } - pub async fn run(&mut self) { - self.events - .push(delayed_event(Duration::seconds(1), RunnerMessage::Tick)); + fn tick(&mut self) { + debug!("Tick"); + // Enqueue new messages + while let Ok(msg) = self.messages.try_recv() { + self.events.push(delayed_event(Duration::seconds(0), msg)); + } + match self.actions.last() { + Some(action) => { + if action.interval.end <= Utc::now() { + self.tick() + } + } + None => self.tick(), + } + + // Perform maintenance + self.queue_actions(); + + self.events.push(delayed_event( + Duration::milliseconds(250), + RunnerMessage::Tick, + )); + } + + fn poll_messages(&mut self) { + while let Ok(msg) = self.messages.try_recv() { + self.events.push(delayed_event(Duration::seconds(0), msg)); + } + self.events.push(delayed_event( + Duration::milliseconds(10), + RunnerMessage::PollMessages, + )); + } + + fn get_resource_state_details( + &self, + interval: Interval, + response: oneshot::Sender, + ) { + // HashMap, DateTime, ActionState)>>>; + let mut res : ResourceStateDetails = HashMap::new(); + + let all_resources : HashSet = self.tasks.iter().fold(HashSet::new(), |mut acc, t| { + acc.extend(t.provides.clone()); + acc + }); + + // Build out the hash + for resource in all_resources { + let mut res_ints = HashMap::new(); + for task in self.tasks.iter() { + if task.provides.contains(&resource) { + res_ints.insert(task.name.clone(), Vec::new()); + } + } + res.insert(resource.clone(), res_ints); + } + + let actions = self + .actions + .iter() + .filter(|x| interval.is_contiguous(x.interval)) + .collect(); + + + for action in actions { + let task = &self.tasks[action.task]; + for resource in task.provides { + res.entry(resource.clone()).or_insert(HashMap::new()).entry(task.name).or_insert(Hash + } + } + + for task in &self.tasks { + for resource in &task.provides {} + } + + response.send(res); + } + + pub async fn run(&mut self, mut stay_up: bool) { + self.tick(); + self.poll_messages(); - // Need to incorporate the ability to receive messages - // // Loop until the current state matches the end state - while !self.is_done() { + while stay_up || !self.is_done() { match self.events.next().await { + Some(Ok(RunnerMessage::GetState { response })) => { + response + .send(RunnerState { + current: self.current.clone(), + coverage: self.end_state.clone(), + }) + .unwrap_or(()); + } + Some(Ok(RunnerMessage::PollMessages)) => { + self.poll_messages(); + } Some(Ok(RunnerMessage::Tick)) => { - debug!("Tick"); - // Enqueue new messages - while let Ok(msg) = self.messages.try_recv() { - self.events.push(delayed_event(Duration::seconds(0), msg)); - } - match self.actions.last() { - Some(action) => { - if action.interval.end <= Utc::now() { - self.tick() - } - } - None => self.tick(), - } - - // Perform maintenance - self.queue_actions(); - - self.events - .push(delayed_event(Duration::seconds(5), RunnerMessage::Tick)); + self.tick(); + } + Some(Ok(RunnerMessage::GetResourceStateDetails { interval, response })) => { + self.get_resource_state_details(interval, response); } Some(Ok(RunnerMessage::ForceUp { resources, @@ -386,6 +477,7 @@ impl Runner { } Some(Ok(RunnerMessage::Stop)) => { info!("Stopping"); + stay_up = false; break; } Some(Ok(RunnerMessage::RetryAction { action_id })) => { @@ -410,8 +502,8 @@ impl Runner { fn complete_task(&mut self, action_id: usize, succeeded: bool) { info!("Completing action {}", action_id); + let action = &mut self.actions[action_id]; if succeeded { - let action = &mut self.actions[action_id]; let task = self.tasks.get(action.task).unwrap(); action.state = ActionState::Completed; for res in &task.provides { @@ -423,6 +515,7 @@ impl Runner { self.store_state(); self.queue_actions(); } else { + action.state = ActionState::Errored; self.events.push(delayed_event( Duration::seconds(30), RunnerMessage::RetryAction { action_id }, @@ -567,7 +660,7 @@ mod tests { .await .unwrap(); - runner.run().await; + runner.run(false).await; tx.send(ExecutorMessage::Stop {}).unwrap(); executor.await.unwrap();