diff --git a/src/bin/wfd/main.rs b/src/bin/wfd/main.rs index 050e72a..6ddb0c4 100644 --- a/src/bin/wfd/main.rs +++ b/src/bin/wfd/main.rs @@ -146,16 +146,28 @@ struct TimelineGroup { data: Vec, } +#[derive(Serialize, Deserialize)] +struct DetailedTimelineOptions { + #[serde(default)] + max_intervals: Option, +} + async fn get_detailed_timeline( + options: web::Query, span: web::Json, state: web::Data, ) -> impl Responder { let interval = span.into_inner(); + let max_intervals = options.into_inner().max_intervals; let (response, rx) = oneshot::channel(); state .runner_tx - .send(RunnerMessage::GetResourceStateDetails { interval, response }) + .send(RunnerMessage::GetResourceStateDetails { + interval, + response, + max_intervals, + }) .unwrap(); match rx.await { @@ -201,32 +213,34 @@ async fn get_detailed_timeline( /// What resources it relies on /// Last attempt (if any) async fn get_segment_details( + max_intervals: web::Query>, span: web::Json, state: web::Data, ) -> impl Responder { + /* let interval = span.into_inner(); let (response, rx) = oneshot::channel(); state .runner_tx - .send(RunnerMessage::GetResourceStateDetails { interval, response }) + .send(RunnerMessage::GetResourceStateDetails { + interval, + response, + max_intervals: max_intervals.into_inner(), + }) .unwrap(); match rx.await { Ok(actions) => { let mut timeline = Vec::new(); - info!( - "Querying for actions over {}, got {} responses.", - interval, - actions.len() - ); - for (resource, tasks) in actions { let mut group = TimelineGroup { group: resource.clone(), data: Vec::new(), }; - for (task_name, intervals) in tasks.into_iter() { + for (task_name, mut intervals) in tasks.into_iter() { + // Collapse intervals + if intervals.len() > 50 {} let data = intervals .into_iter() .map(|a| TimelineInterval { @@ -249,6 +263,8 @@ async fn get_segment_details( error: format!("{:?}", error), }), } + */ + HttpResponse::Ok() } /* diff --git a/src/lib.rs b/src/lib.rs index 584f7d0..575d84c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #![allow(unused_imports)] #![allow(dead_code)] +#![feature(slice_group_by)] use anyhow::{anyhow, Result}; use chrono::prelude::*; diff --git a/src/runner.rs b/src/runner.rs index b22cdf7..0fb50e5 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -1,6 +1,7 @@ use super::*; use futures::stream::futures_unordered::FuturesUnordered; use futures::StreamExt; +use std::cmp::Ordering; use std::collections::VecDeque; /* @@ -13,7 +14,7 @@ use std::collections::VecDeque; - current = TaskSet::coverage (the theoretical) */ -#[derive(Debug, Clone, Copy, PartialEq, Serialize)] +#[derive(Debug, Clone, Copy, PartialEq, Serialize, PartialOrd)] pub enum ActionState { Queued, Running, @@ -67,6 +68,7 @@ pub enum RunnerMessage { GetResourceStateDetails { interval: Interval, response: oneshot::Sender, + max_intervals: Option, }, Stop, } @@ -237,6 +239,40 @@ fn delayed_event(delay: Duration, event: RunnerMessage) -> tokio::task::JoinHand }) } +// Coalesces adjascent actions +fn coalesce_actions(mut actions: Vec) -> Vec { + if actions.is_empty() { + return actions; + } + + actions.sort_unstable_by(|a, b| { + let ord = a.task.partial_cmp(&b.task).unwrap(); + if ord == Ordering::Equal { + a.state.partial_cmp(&b.state).unwrap() + } else { + ord + } + }); + + let mut res: Vec = Vec::new(); + for group in actions.group_by(|a, b| a.task == b.task && a.state == b.state) { + let intervals: Vec = group.iter().map(|x| x.interval).collect(); + let is = IntervalSet::from(intervals); + let task = group.first().unwrap().task; + let state = group.first().unwrap().state; + + for interval in is.iter() { + res.push(Action { + task: task, + state: state, + interval: *interval, + }) + } + } + + res +} + impl Runner { pub async fn new( tasks: TaskSet, @@ -375,6 +411,7 @@ impl Runner { &self, interval: Interval, response: oneshot::Sender, + max_intervals: Option, ) { // HashMap, DateTime, ActionState)>>>; let mut res: ResourceStateDetails = HashMap::new(); @@ -396,13 +433,19 @@ impl Runner { res.insert(resource.clone(), res_ints); } - let actions: Vec = self + let mut actions: Vec = self .actions .iter() .filter(|x| interval.is_contiguous(x.interval)) .cloned() .collect(); + if let Some(max_intv) = max_intervals { + if actions.len() > max_intv { + actions = coalesce_actions(actions); + } + } + info!( "Filtered {} actions down to {}", self.actions.len(), @@ -444,8 +487,12 @@ impl Runner { Some(Ok(RunnerMessage::Tick)) => { self.tick(); } - Some(Ok(RunnerMessage::GetResourceStateDetails { interval, response })) => { - self.get_resource_state_details(interval, response); + Some(Ok(RunnerMessage::GetResourceStateDetails { + interval, + response, + max_intervals, + })) => { + self.get_resource_state_details(interval, response, max_intervals); } Some(Ok(RunnerMessage::ForceUp { resources, diff --git a/webui/src/App.vue b/webui/src/App.vue index e26509c..f9a619b 100644 --- a/webui/src/App.vue +++ b/webui/src/App.vue @@ -9,6 +9,7 @@ export default { refreshSeconds: 15, // How often to refresh waterfallURL: 'http://localhost:2503', activeSegment: null, + maxDisplayIntervals: 500, } }, @@ -19,6 +20,10 @@ export default { updateRefreshInterval(interval) { this.refreshSeconds = interval; }, + updateMaxDisplayIntervals(cnt) { + this.maxDisplayIntervals = cnt; + }, + setActiveSegment(segment) { this.activeSegment = segment; }, @@ -41,14 +46,17 @@ input { max-width: 25%; }
diff --git a/webui/src/components/GlobalSettings.vue b/webui/src/components/GlobalSettings.vue index 222b78c..f81a51f 100644 --- a/webui/src/components/GlobalSettings.vue +++ b/webui/src/components/GlobalSettings.vue @@ -1,17 +1,21 @@