diff --git a/src/resource_interval.rs b/src/resource_interval.rs index 2138a72..6942157 100644 --- a/src/resource_interval.rs +++ b/src/resource_interval.rs @@ -31,7 +31,7 @@ impl ResourceInterval { ResourceInterval(res) } - pub fn difference(&mut self, other: &ResourceInterval) -> Self { + pub fn difference(&self, other: &ResourceInterval) -> Self { let res: HashMap = self .0 .iter() @@ -71,45 +71,6 @@ impl From<&HashMap> for ResourceInterval { } } -/* -impl Add for &ResourceInterval { - type Output = ResourceInterval; - fn add(self, other: &ResourceInterval) -> Self::Output { - let res: HashMap = - other.0.iter().fold(self.0.clone(), |mut acc, (res, is)| { - acc.entry(res.clone()) - .or_insert(IntervalSet::new()) - .merge(is); - acc - }); - ResourceInterval(res) - } -} - -impl Sub for &ResourceInterval { - type Output = ResourceInterval; - fn sub(self, other: &ResourceInterval) -> Self::Output { - let res: HashMap = self - .0 - .iter() - .map(|(res, is)| { - ( - res.clone(), - is.difference(other.get(res).unwrap_or(&IntervalSet::new())), - ) - }) - .collect(); - ResourceInterval(res) - } -} - -impl AsRef for ResourceInterval { - fn as_ref(&self) -> &Self { - self - } -} -*/ - #[cfg(test)] mod tests { use super::*; diff --git a/src/runner.rs b/src/runner.rs index b133370..a1442a5 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -49,6 +49,8 @@ pub struct Runner { vars: VarMap, output_options: TaskOutputOptions, + // States + end_state: ResourceInterval, target: ResourceInterval, current: ResourceInterval, @@ -191,13 +193,13 @@ impl Runner { } } - let target = tasks.get_state(Utc::now())?; - + let end_state = tasks.coverage()?; let mut runner = Runner { tasks, vars, output_options, - target, + end_state, + target: ResourceInterval::new(), current: ResourceInterval::new(), queue: Vec::new(), qidx: 0, @@ -206,46 +208,76 @@ impl Runner { executor, }; - // Create queue - let required = runner.target.difference(&runner.current); - runner.queue = runner - .tasks - .iter() - .fold(Vec::new(), |mut acc, (name, task)| { - let res: Vec = task - .generate_intervals(&required) - .unwrap() - .into_iter() - .map({ - |interval| Action { - task: name.clone(), - interval, - state: ActionState::Queued, - } - }) - .collect(); - acc.extend(res); - acc - }); + runner.tick()?; - let unsatisfied = runner + Ok(runner) + } + + pub fn tick(&mut self) -> Result<()> { + let target = self.tasks.get_state(Utc::now())?; + + // Create queue + let required = target.difference(&self.current); + println!("REQ {:?}", required); + for (name, task) in self.tasks.iter() { + let res = IntervalSet::from(task.generate_intervals(&required).unwrap()); + println!("GEN ({}): {:?}", name, res); + } + self.queue = self.tasks.iter().fold(Vec::new(), |mut acc, (name, task)| { + let res: Vec = task + .generate_intervals(&required) + .unwrap() + .into_iter() + .map({ + |interval| Action { + task: name.clone(), + interval, + state: ActionState::Queued, + } + }) + .collect(); + acc.extend(res); + acc + }); + + // Ensure that all actions can be satisfied + let unsatisfied = self .queue .iter() .filter(|act| { - !runner + !self .tasks .get(&act.task) .unwrap() - .can_be_satisfied(act.interval, &runner.target) + .can_be_satisfied(act.interval, &target) }) .fold(HashSet::new(), |mut acc, a| { - println!("INVALID: {:?}", a); + println!("Task cannot be satisfied: {:?}", a); acc.insert(a.task.clone()); acc }); + // Ensure current + + let mut result_state = self.current.clone(); + for action in &self.queue { + for res in &self.tasks.get(&action.task).unwrap().provides { + result_state + .entry(res.clone()) + .or_insert(IntervalSet::new()) + .insert(action.interval); + } + } + if result_state != target { + return Err(anyhow!( + "Actions generated produce\n\t{:?}\nExpected\n\t{:?}", + result_state, + target + )); + } + if unsatisfied.is_empty() { - Ok(runner) + self.target = target; + Ok(()) } else { Err(anyhow!("Tasks {:?} cannot complete as the target state does not provide required resources", unsatisfied)) } @@ -361,7 +393,7 @@ impl Runner { } fn is_done(&self) -> bool { - self.target == self.current + self.end_state == self.current } } diff --git a/src/schedule.rs b/src/schedule.rs index b064522..9add410 100644 --- a/src/schedule.rs +++ b/src/schedule.rs @@ -26,8 +26,11 @@ impl Schedule { return Vec::new(); } - let st = interval.start.with_timezone(&self.timezone); - let et = interval.end.with_timezone(&self.timezone); + let st = self.interval(interval.start, 0).start; + let et = self.interval(interval.end, 0).end; + + //let st = interval.start.with_timezone(&self.timezone); + //let et = interval.end.with_timezone(&self.timezone); let mut date = self.calendar.prev(st.date().naive_local()); let end_date = self.calendar.next(et.date().succ().naive_local()); @@ -58,19 +61,24 @@ impl Schedule { times } + /// Given a timestamp, return the interval that contains it pub fn interval(&self, dt: DateTime, offset: i32) -> Interval { // Need to get the current interval, then offset it let at = dt.with_timezone(&self.timezone); - let rt = if self.times.iter().any(|x| *x == at.time()) { + + // If the time is at an edge + let rt = if self.times.iter().any(|x| *x == at.time()) + && self.calendar.includes(at.date().naive_local()) + { at } else { - self.prev_time(at) + self.next_time(at) }; - let start = self.offset(rt, offset); + let end = self.offset(rt, offset); Interval::new( - start.with_timezone(&Utc), - self.next_time(start).with_timezone(&Utc), + self.prev_time(end).with_timezone(&Utc), + end.with_timezone(&Utc), ) } @@ -125,8 +133,9 @@ impl Schedule { self.timezone.from_local_datetime(&time).unwrap() } - /// Given a timestamp, return the scheduled time `offset` - pub fn offset(&self, mut dt: DateTime, offset: i32) -> DateTime { + // Given a timestamp, return the scheduled time `offset` + // A bit dangerous, providing an offset of 0 + fn offset(&self, mut dt: DateTime, offset: i32) -> DateTime { if offset > 0 { for _ in 0..offset { dt = self.next_time(dt); @@ -357,6 +366,21 @@ mod tests { timezone, }; + // Weekends are correct + assert_eq!( + sched.interval(timezone.ymd(2022, 1, 1).and_hms(9, 0, 0), 0), + Interval::new( + timezone + .ymd(2021, 12, 31) + .and_hms(11, 30, 0) + .with_timezone(&Utc), + timezone + .ymd(2022, 1, 3) + .and_hms(10, 30, 0) + .with_timezone(&Utc) + ) + ); + // prev and next are reversible let dt = timezone.ymd(2022, 1, 3).and_hms(11, 0, 0); assert_eq!( diff --git a/src/task.rs b/src/task.rs index 4d8e3c5..8628445 100644 --- a/src/task.rs +++ b/src/task.rs @@ -43,7 +43,8 @@ impl TaskDefinition { pub fn to_task(&self, calendar: &Calendar) -> Task { let schedule = Schedule::new(calendar.clone(), self.times.clone(), self.timezone); /* - The valid_{from,to} interval must be aligned to the actual schedule + The valid_{from,to} interval must be aligned to the actual schedule. + They will be adjusted to include any interval who's */ let start = schedule .interval( @@ -110,7 +111,7 @@ impl Task { }) .collect(); - if reqs.is_empty() { + let res = if reqs.is_empty() { Ok(Vec::new()) } else { let ris = &reqs[0]; @@ -129,7 +130,8 @@ impl Task { acc })) } - } + }; + res } pub fn validity(&self, max_time: DateTime) -> IntervalSet { @@ -191,7 +193,7 @@ impl Task { #[cfg(test)] mod tests { use super::*; - use chrono_tz::America::Halifax; + use chrono_tz::America::{Halifax, New_York}; macro_rules! intv { ( $x:literal, $y:literal ) => { @@ -289,6 +291,14 @@ mod tests { } }; + // Require that all times generated be within the + // valid_over + let mut exact = ResourceInterval::new(); + exact.insert(&"resource_a".to_owned(), &task.valid_over.clone()); + exact.insert(&"resource_b".to_owned(), &task.valid_over.clone()); + let res = IntervalSet::from(task.generate_intervals(&exact).unwrap()); + assert_eq!(res, task.valid_over); + // Ensure that the intervals generated over the valid period // exactly cover the valid period let mut theoretical = ResourceInterval::new(); @@ -297,4 +307,63 @@ mod tests { let generated = IntervalSet::from(task.generate_intervals(&theoretical).unwrap()); assert_eq!(task.valid_over, generated); } + + #[test] + fn check_task_valid_over() { + let task_json = r#" + { + "up": "/usr/bin/touch /tmp/a_${yyyymmdd}_${hhmmss}", + "down": "/usr/bin/rm /tmp/a_${yyyymmdd}_${hhmmss}", + "check": "/usr/bin/test -e /tmp/a_${yyyymmdd}_${hhmmss}", + "provides": [ + "resource_a", + "resource_b" + ], + "requires": [ + { "resource": "alpha", "offset": 0 }, + { "resource": "beta", "offset": -1 } + ], + "calendar_name": "std", + "times": [ "17:00:00" ], + "timezone": "America/New_York", + "valid_from": "2022-01-04T09:00:00", + "valid_to": "2022-01-07T00:00:00" + } + "#; + + let cal = Calendar::new(); + { + let task_def: TaskDefinition = serde_json::from_str(task_json).unwrap(); + let task = task_def.to_task(&cal); + + // Assert the valid interval is correct + assert_eq!( + task.valid_over, + IntervalSet::from(vec![Interval::new( + New_York.ymd(2022, 1, 3).and_hms(17, 0, 0), + New_York.ymd(2022, 1, 6).and_hms(17, 0, 0) + )]) + ); + } + + // Another test with different times + { + let mut task_def: TaskDefinition = serde_json::from_str(task_json).unwrap(); + + task_def.times = vec![NaiveTime::from_hms(9, 0, 0), NaiveTime::from_hms(12, 0, 0)]; + task_def.valid_from = NaiveDate::from_ymd(2022, 1, 1).and_hms(9, 0, 0); + task_def.valid_to = Some(NaiveDate::from_ymd(2022, 1, 7).and_hms(17, 0, 0)); + + let task = task_def.to_task(&cal); + + // Assert the valid interval is correct + assert_eq!( + task.valid_over, + IntervalSet::from(vec![Interval::new( + New_York.ymd(2021, 12, 31).and_hms(12, 0, 0), + New_York.ymd(2022, 1, 7).and_hms(12, 0, 0) + )]) + ); + } + } }