diff --git a/src/interval_set.rs b/src/interval_set.rs index 111c3b1..21d1ac4 100644 --- a/src/interval_set.rs +++ b/src/interval_set.rs @@ -126,6 +126,12 @@ impl IntervalSet { pub fn difference(&self, other: &Self) -> Self { self.intersection(&other.complement()) } + + /// Subtract all intervals in `other` from self + /// both sides must be sorted + pub fn subtract(&mut self, other: &Self) { + self.0 = self.difference(other).0; + } } impl Deref for IntervalSet { type Target = Vec; diff --git a/src/runner.rs b/src/runner.rs index 3582214..78e290d 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -32,18 +32,26 @@ pub struct Action { #[derive(Debug, Serialize, Deserialize)] pub enum RunnerMessage { Tick, - ActionCompleted { action_id: usize, succeeded: bool }, - RetryAction { action_id: usize }, + ActionCompleted { + action_id: usize, + succeeded: bool, + }, + RetryAction { + action_id: usize, + }, /* + /// Marks all resources in the set available over the interval ForceUp { resources: HashSet, interval: Interval, }, + */ + /// Marks all resources in the set as down over _at least_ the interval. + /// Will cause a re-check / re-gen ForceDown { resources: HashSet, interval: Interval, }, - */ Stop, } @@ -334,6 +342,37 @@ impl Runner { self.events .push(delayed_event(Duration::seconds(5), RunnerMessage::Tick)); } + /* + Some(Ok(RunnerMessage::ForceUp { + resources, + interval, + })) => { + } + */ + Some(Ok(RunnerMessage::ForceDown { + resources, + interval, + })) => { + // Use the interval to identify + for (tid, task) in self.tasks.iter().enumerate() { + if task.provides.is_subset(&resources) { + let aligned_is = + IntervalSet::from(task.schedule.align_interval(interval)); + for resource in &task.provides { + self.current + .get_mut(resource) + .unwrap() + .subtract(&aligned_is); + } + for action in &mut self.actions { + if action.task == tid && aligned_is.has_subset(action.interval) { + action.state = ActionState::Queued; + } + } + } + } + self.store_state(); + } Some(Ok(RunnerMessage::Stop)) => { info!("Stopping"); break; @@ -347,29 +386,7 @@ impl Runner { action_id, succeeded, })) => { - info!("Completing action {}", action_id); - if succeeded { - let action = &mut self.actions[action_id]; - let task = self.tasks.get(action.task).unwrap(); - action.state = ActionState::Completed; - for res in &task.provides { - self.current - .entry(res.clone()) - .or_insert(IntervalSet::new()) - .insert(action.interval); - } - self.storage - .send(StorageMessage::StoreState { - state: self.current.clone(), - }) - .unwrap(); - self.queue_actions(); - } else { - self.events.push(delayed_event( - Duration::seconds(30), - RunnerMessage::RetryAction { action_id }, - )); - } + self.complete_task(action_id, succeeded); } Some(Err(e)) => { panic!("Something went wrong: {:?}", e) @@ -380,6 +397,36 @@ impl Runner { } } + fn complete_task(&mut self, action_id: usize, succeeded: bool) { + info!("Completing action {}", action_id); + if succeeded { + let action = &mut self.actions[action_id]; + let task = self.tasks.get(action.task).unwrap(); + action.state = ActionState::Completed; + for res in &task.provides { + self.current + .entry(res.clone()) + .or_insert(IntervalSet::new()) + .insert(action.interval); + } + self.store_state(); + self.queue_actions(); + } else { + self.events.push(delayed_event( + Duration::seconds(30), + RunnerMessage::RetryAction { action_id }, + )); + } + } + + fn store_state(&self) { + self.storage + .send(StorageMessage::StoreState { + state: self.current.clone(), + }) + .unwrap(); + } + fn queue_actions(&mut self) { let now = Utc::now(); diff --git a/src/schedule.rs b/src/schedule.rs index a4972ee..a1dea7a 100644 --- a/src/schedule.rs +++ b/src/schedule.rs @@ -21,6 +21,32 @@ impl Schedule { } } + fn is_end_time(&self, dt: DateTime) -> bool { + // Need to get the current interval, then offset it + let at = dt.with_timezone(&self.timezone); + self.times.iter().any(|x| *x == at.time()) + && self.calendar.includes(at.date().naive_local()) + } + + /// Given an interval I, return the interval J that is the smallest + /// set of schedule intervals that completely contain I. + /// If the given interval is bounded by MIN_TIME or MAX_TIME, then the + /// returned interval will be likewise bounded + pub fn align_interval(&self, interval: Interval) -> Interval { + let st = if interval.start == MIN_TIME { + self.next_time(interval.start).with_timezone(&Utc) + } else { + interval.start + }; + let et = if interval.end == MAX_TIME { + self.prev_time(interval.end).with_timezone(&Utc) + } else { + interval.end + }; + + Interval::new(self.interval(st, 0).start, self.interval(et, 0).end) + } + pub fn generate(&self, interval: Interval) -> Vec { if self.times.is_empty() { return Vec::new(); @@ -67,9 +93,7 @@ impl Schedule { let at = dt.with_timezone(&self.timezone); // If the time is at an edge - let rt = if self.times.iter().any(|x| *x == at.time()) - && self.calendar.includes(at.date().naive_local()) - { + let rt = if self.is_end_time(at) { at } else { self.next_time(at)