From 811057ecad8ea6080e25a4852d2aec8b5200f6a1 Mon Sep 17 00:00:00 2001 From: Kinesin Data Technologies Incorporated <93931750+kinesintech@users.noreply.github.com> Date: Wed, 28 Sep 2022 21:06:18 -0300 Subject: [PATCH] Pass three with right-half intervals --- .gitignore | 2 + Cargo.toml | 19 ++ src/calendar.rs | 80 +++++++++ src/interval.rs | 172 ++++++++++++++++++ src/interval_set.rs | 247 ++++++++++++++++++++++++++ src/lib.rs | 27 +++ src/requirement.rs | 179 +++++++++++++++++++ src/schedule.rs | 371 +++++++++++++++++++++++++++++++++++++++ src/task.rs | 413 ++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 1510 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 src/calendar.rs create mode 100644 src/interval.rs create mode 100644 src/interval_set.rs create mode 100644 src/lib.rs create mode 100644 src/requirement.rs create mode 100644 src/schedule.rs create mode 100644 src/task.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4fffb2f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +/Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..ebe9958 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "waterfall" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1" +chrono = { version = "0.4", features = ["serde"] } +chrono-tz = { version = "0.6", features = ["serde"] } +futures = "0.3" +reqwest = { version = "0.11", features = ["json"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1", features = ["full"] } +users = { version = "0.11", optional = true } +psutil = { version = "3.2", features = ["process"] } +sysinfo = "0.23" diff --git a/src/calendar.rs b/src/calendar.rs new file mode 100644 index 0000000..6d03ebf --- /dev/null +++ b/src/calendar.rs @@ -0,0 +1,80 @@ +use super::*; +use std::collections::HashSet; + +pub fn default_dow_set() -> HashSet { + use Weekday::*; + HashSet::from([Mon, Tue, Wed, Thu, Fri]) +} + +// TODO +// - Make sure include and exclude are disjoint + +/// Maintains a list of days that are considered active +#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct Calendar { + /// Day of Week Mask + #[serde(default = "default_dow_set")] + pub mask: HashSet, + + /// Dates to explicitly include + #[serde(default)] + pub exclude: HashSet, + + /// Dates to explicitly include + #[serde(default)] + pub include: HashSet, +} + +impl Calendar { + pub fn new() -> Self { + Calendar { + mask: default_dow_set(), + ..Calendar::default() + } + } + + pub fn includes(&self, date: NaiveDate) -> bool { + if self.exclude.contains(&date) { + false + } else if self.include.contains(&date) { + true + } else { + self.mask.contains(&date.weekday()) + } + } + + pub fn next(&self, date: NaiveDate) -> NaiveDate { + self.offset(date, 1) + } + + pub fn prev(&self, date: NaiveDate) -> NaiveDate { + self.offset(date, -1) + } + + pub fn offset(&self, mut date: NaiveDate, mut offset: i64) -> NaiveDate { + let incr = if offset < 0 { 1 } else { -1 }; + while offset != 0 { + date = date + Duration::days(-1 * incr); + while !self.includes(date) { + date = date + Duration::days(-1 * incr); + } + offset += incr; + } + date + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn check_next() { + let cal = Calendar::new(); + assert_eq!( + cal.next(NaiveDate::from_ymd(2022, 1, 1)), + NaiveDate::from_ymd(2022, 1, 3) + ); + } +} diff --git a/src/interval.rs b/src/interval.rs new file mode 100644 index 0000000..a416ddd --- /dev/null +++ b/src/interval.rs @@ -0,0 +1,172 @@ +use super::*; +use std::ops::{Add, BitAnd, BitOr, Sub}; + +/* + These intervals are all half-open on the left, so: + (start, end) + + This makes the end included in the interval for which it's + in charge of +*/ + +#[derive(Copy, Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Ord, PartialOrd)] +pub struct Interval { + pub start: DateTime, + pub end: DateTime, +} + +impl Interval { + pub fn new(start: DateTime, end: DateTime) -> Self { + let start = start.with_timezone(&Utc); + let end = end.with_timezone(&Utc); + if start > end { + Interval { end, start } + } else { + Interval { start, end } + } + } + + pub fn is_empty(&self) -> bool { + return self.start == self.end; + } + + pub fn len(&self) -> Duration { + self.end - self.start + } + + pub fn contains(&self, dt: DateTime) -> bool { + return self.start < dt && dt <= self.end; + } + + /// True if `other` is a subset of this interval + pub fn has_subset(&self, other: Interval) -> bool { + return self.start <= other.start && other.end <= self.end; + } + + /// True if `other` overlaps or is immediately adjascent to self + pub fn is_contiguous(&self, other: Interval) -> bool { + return (self.start <= other.start && other.start <= self.end) + || (other.start <= self.start && self.start <= other.end); + } + + /// True if self intersection other is an empty set + pub fn is_disjoint(&self, other: Interval) -> bool { + return self.end <= other.start || other.end <= self.start; + } + + pub fn intersection(&self, other: Interval) -> Interval { + if self.is_disjoint(other) { + Interval::new(self.start, self.start) + } else { + Interval { + start: std::cmp::max(self.start, other.start), + end: std::cmp::min(self.end, other.end), + } + } + } +} + +impl BitAnd for Interval { + type Output = Interval; + fn bitand(self, other: Interval) -> Self::Output { + self.intersection(other) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + macro_rules! dt { + ( $x:literal ) => { + Utc.ymd(2022, 1, 1).and_hms($x, 0, 0) + }; + } + + macro_rules! intv { + ( $x:literal, $y:literal ) => { + Interval::new( + Utc.ymd(2022, 1, 1).and_hms($x, 0, 0), + Utc.ymd(2022, 1, 1).and_hms($y, 0, 0), + ) + }; + } + + /* + Intervals + */ + + #[test] + fn test_interval_contains() { + let intv = intv!(2, 5); + + // Ensure the interval is half-open on the right + assert!(!intv.contains(dt!(0))); + assert!(!intv.contains(dt!(1))); + assert!(!intv.contains(dt!(2))); + assert!(intv.contains(dt!(3))); + assert!(intv.contains(dt!(4))); + assert!(intv.contains(dt!(5))); + assert!(!intv.contains(dt!(6))); + assert!(!intv.contains(dt!(7))); + } + + #[test] + fn test_interval_ordering() { + assert!(intv!(1, 2) < intv!(2, 3)); + assert!(intv!(1, 3) < intv!(2, 4)); + assert!(intv!(1, 3) < intv!(4, 5)); + assert!(intv!(1, 3) < intv!(4, 6)); + } + + #[test] + fn test_is_disjoint() { + let int = intv!(2, 5); + + assert!(int.is_disjoint(intv!(1, 2))); + assert!(!int.is_disjoint(intv!(1, 3))); + assert!(int.is_disjoint(intv!(5, 6))); + } + + #[test] + fn test_is_contiguous() { + let int = intv!(3, 4); + + assert!(!int.is_contiguous(intv!(1, 2))); + assert!(int.is_contiguous(intv!(2, 3))); + assert!(int.is_contiguous(intv!(1, 3))); + assert!(int.is_contiguous(intv!(4, 6))); + assert!(int.is_contiguous(intv!(1, 6))); + assert!(!int.is_contiguous(intv!(5, 6))); + } + + #[test] + fn test_has_subset() { + let int = intv!(2, 5); + + // Contains itself + assert!(int.has_subset(int)); + + assert!(int.has_subset(intv!(3, 4))); // Contains inner interval + assert!(!int.has_subset(intv!(1, 2))); // Left contiguous + assert!(!int.has_subset(intv!(1, 3))); // Left overlap + assert!(!int.has_subset(intv!(4, 6))); // Right overlap + assert!(!int.has_subset(intv!(5, 6))); // Right contiguous + assert!(!int.has_subset(intv!(1, 6))); // Outer scope + } + + #[test] + fn test_intersection() { + let int = intv!(2, 5); + + assert_eq!(int.intersection(int), int); // Union with itself + assert_eq!(int.intersection(intv!(1, 6)), int); // Union with itself + assert!(int.intersection(intv!(1, 2)).is_empty()); // Left + assert_eq!(int.intersection(intv!(1, 3)), intv!(2, 3)); // Left Overlap + assert_eq!(int.intersection(intv!(2, 3)), intv!(2, 3)); // Inner left + assert_eq!(int.intersection(intv!(3, 4)), intv!(3, 4)); // Inner + assert_eq!(int.intersection(intv!(4, 5)), intv!(4, 5)); // Right Inner + assert_eq!(int.intersection(intv!(4, 6)), intv!(4, 5)); // Inner + assert!(int.intersection(intv!(5, 6)).is_empty()); // Right + } +} diff --git a/src/interval_set.rs b/src/interval_set.rs new file mode 100644 index 0000000..6a059e3 --- /dev/null +++ b/src/interval_set.rs @@ -0,0 +1,247 @@ +use super::*; +use std::convert::From; +use std::ops::{Add, BitAnd, BitOr, Deref, DerefMut, Not, Sub}; + +/// A coalescing set of intervals +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd)] +pub struct IntervalSet(Vec); + +impl IntervalSet { + pub fn new() -> Self { + IntervalSet(Vec::new()) + } + + pub fn start(&self) -> Option> { + if let Some(intv) = self.first() { + Some(intv.start) + } else { + None + } + } + + pub fn end(&self) -> Option> { + if let Some(intv) = self.last() { + Some(intv.end) + } else { + None + } + } + + /// Returns true if interval is a subset + pub fn has_subset(&self, interval: Interval) -> bool { + self.0.iter().any(|x| x.has_subset(interval)) + } + + pub fn contains(&self, dt: DateTime) -> bool { + self.0.iter().any(|x| x.contains(dt.with_timezone(&Utc))) + } + + // Naive O(n^2) implementation + pub fn is_disjoint(&self, other: &IntervalSet) -> bool { + self.0 + .iter() + .all(|x| other.iter().all(|y| x.is_disjoint(*y))) + } + + pub fn intersection(&self, other: &IntervalSet) -> Self { + let mut res = IntervalSet(self.0.iter().fold(Vec::::new(), |mut acc, x| { + let new_intervals: Vec = other + .iter() + .map(|y| x.intersection(*y)) + .filter(|x| !x.is_empty()) + .collect(); + acc.extend(new_intervals); + acc + })); + res.coalesce(); + res + } + + pub fn complement(&self) -> Self { + if self.is_empty() { + IntervalSet(vec![Interval::new( + DateTime::::MIN_UTC, + DateTime::::MAX_UTC, + )]) + } else { + // Need to build the start of the range + let mut acc = Vec::new(); + let mut last_end = DateTime::::MIN_UTC; + for intv in &self.0 { + if intv.start == DateTime::::MIN_UTC { + last_end = intv.end; + } else { + acc.push(Interval::new(last_end, intv.start)); + last_end = intv.end; + } + } + if last_end != DateTime::::MAX_UTC { + acc.push(Interval::new(last_end, DateTime::::MAX_UTC)); + } + IntervalSet(acc) + } + } + + pub fn insert(&mut self, interval: Interval) { + let should_coalesce = self.0.iter().any(|intv| intv.is_contiguous(interval)); + self.0.push(interval); + if should_coalesce { + self.coalesce(); + } + } + + pub fn merge(&mut self, other: &IntervalSet) { + self.0.extend(other.0.clone()); + self.coalesce(); + } + + pub fn coalesce(&mut self) { + self.0.sort_unstable(); + self.0 = self + .0 + .iter() + .filter(|x| !x.is_empty()) + .fold(Vec::new(), |mut acc, int| { + if let Some(lst) = acc.last_mut() { + if !lst.is_contiguous(*int) { + acc.push(*int) + } else { + lst.end = int.end + } + } else { + acc.push(*int); + } + + acc + }); + } + + pub fn union(&self, other: &IntervalSet) -> Self { + let mut is = IntervalSet(self.0.iter().chain(other.0.iter()).copied().collect()); + is.coalesce(); + is + } + + /// Subtract all intervals in `other` from self + /// both sides must be sorted + pub fn difference(&self, other: &Self) -> Self { + self.intersection(&other.complement()) + } +} +impl Deref for IntervalSet { + type Target = Vec; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl DerefMut for IntervalSet { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} +impl From for IntervalSet { + fn from(interval: Interval) -> Self { + IntervalSet(vec![interval]) + } +} +impl From> for IntervalSet { + fn from(intervals: Vec) -> Self { + let mut is = IntervalSet(intervals); + is.coalesce(); + is + } +} +impl From<&[Interval]> for IntervalSet { + fn from(intervals: &[Interval]) -> Self { + let mut is = IntervalSet(intervals.to_vec()); + is.coalesce(); + is + } +} + +impl Not for &IntervalSet { + type Output = IntervalSet; + fn not(self) -> Self::Output { + self.complement() + } +} +impl Add for &IntervalSet { + type Output = IntervalSet; + fn add(self, other: &IntervalSet) -> Self::Output { + self.union(other) + } +} +impl Sub for &IntervalSet { + type Output = IntervalSet; + fn sub(self, other: &IntervalSet) -> Self::Output { + self.difference(other) + } +} +impl BitOr for &IntervalSet { + type Output = IntervalSet; + fn bitor(self, other: &IntervalSet) -> Self::Output { + self.union(other) + } +} +impl BitAnd for &IntervalSet { + type Output = IntervalSet; + fn bitand(self, other: &IntervalSet) -> Self::Output { + self.intersection(other) + } +} +impl Not for IntervalSet { + type Output = IntervalSet; + fn not(self) -> Self::Output { + self.complement() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + macro_rules! intv { + ( $x:literal, $y:literal ) => { + Interval::new( + Utc.ymd(2022, 1, 1).and_hms($x, 0, 0), + Utc.ymd(2022, 1, 1).and_hms($y, 0, 0), + ) + }; + } + + /* + Interval Set + */ + + #[test] + fn test_intervalset_difference() { + let isa = IntervalSet(vec![intv!(1, 3), intv!(5, 6)]); + + // Removing the entire span + let full = IntervalSet(vec![intv!(1, 6)]); + assert_eq!(isa.difference(&full), IntervalSet(vec![])); + assert_eq!( + isa.difference(&IntervalSet(vec![intv!(2, 5)])), + IntervalSet(vec![intv!(1, 2), intv!(5, 6)]) + ); + + // TODO need more tests here + } + + #[test] + fn test_intervalset_complement() { + // Complement's complement is the same + let is = IntervalSet(vec![intv!(2, 5), intv!(8, 20)]); + assert_eq!(is.complement().complement(), is); + + // Complement with one end at min time + let is = IntervalSet(vec![ + Interval::new( + DateTime::::MIN_UTC, + Utc.ymd(2021, 12, 1).and_hms(0, 0, 0), + ), + intv!(8, 20), + ]); + assert_eq!(is.complement().complement(), is); + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..72fb508 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,27 @@ +#![allow(unused_imports)] +#![allow(dead_code)] + +use anyhow::{anyhow, Result}; +use chrono::prelude::*; +use chrono::{Duration, TimeZone}; +use chrono_tz::Tz; +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; +use tokio::sync::{mpsc, oneshot}; + +use crate::calendar::*; +use crate::interval::*; +use crate::interval_set::*; +use crate::requirement::*; +use crate::schedule::*; +use crate::task::*; + +pub type Resource = String; +pub type TaskDetails = serde_json::Value; + +pub mod calendar; +pub mod interval; +pub mod interval_set; +pub mod requirement; +pub mod schedule; +pub mod task; diff --git a/src/requirement.rs b/src/requirement.rs new file mode 100644 index 0000000..0844058 --- /dev/null +++ b/src/requirement.rs @@ -0,0 +1,179 @@ +use super::*; +use std::path::Path; + +pub trait Satisfiable { + /// Returns true if the requirement is satisfied now + fn is_satisfied( + &self, + time: &DateTime, + schedule: &Schedule, + available: &HashMap, + ) -> bool; + + /// Returns true if the requirement could be satisfied at some point + /// in time + fn can_be_satisfied( + &self, + time: &DateTime, + schedule: &Schedule, + available: &HashMap, + ) -> bool; +} + +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum AggregateRequirement { + All(Vec>), + Any(Vec>), + None(Vec>), +} + +impl Satisfiable for AggregateRequirement { + fn is_satisfied( + &self, + time: &DateTime, + schedule: &Schedule, + available: &HashMap, + ) -> bool { + match self { + AggregateRequirement::All(reqs) => reqs + .iter() + .all(|x| x.is_satisfied(time, schedule, available)), + AggregateRequirement::Any(reqs) => reqs + .iter() + .any(|x| x.is_satisfied(time, schedule, available)), + AggregateRequirement::None(reqs) => !reqs + .iter() + .any(|x| x.is_satisfied(time, schedule, available)), + } + } + + fn can_be_satisfied( + &self, + time: &DateTime, + schedule: &Schedule, + available: &HashMap, + ) -> bool { + match self { + AggregateRequirement::All(reqs) => reqs + .iter() + .all(|x| x.can_be_satisfied(time, schedule, available)), + AggregateRequirement::Any(reqs) => reqs + .iter() + .any(|x| x.can_be_satisfied(time, schedule, available)), + AggregateRequirement::None(reqs) => !reqs + .iter() + .any(|x| x.can_be_satisfied(time, schedule, available)), + } + } +} + +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] +#[serde(rename_all = "snake_case", untagged)] +pub enum SingleRequirement { + Offset { resource: String, offset: i32 }, + File { path: String }, +} + +impl Satisfiable for SingleRequirement { + fn is_satisfied( + &self, + time: &DateTime, + schedule: &Schedule, + available: &HashMap, + ) -> bool { + match self { + //SingleRequirement::ResourceInterval { .. } => true, + SingleRequirement::Offset { resource, offset } => { + let intv = schedule.interval(*time, *offset); + match available.get(resource) { + Some(is) => is.has_subset(intv), + None => false, + } + } + SingleRequirement::File { path } => Path::new(path).exists(), + } + } + + fn can_be_satisfied( + &self, + time: &DateTime, + schedule: &Schedule, + available: &HashMap, + ) -> bool { + match self { + SingleRequirement::Offset { resource, offset } => { + let intv = schedule.interval(*time, *offset); + match available.get(resource) { + Some(is) => is.has_subset(intv), + None => false, + } + } + SingleRequirement::File { .. } => true, + } + } +} + +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] +#[serde(untagged)] +pub enum Requirement { + One(SingleRequirement), + Group(AggregateRequirement), +} + +impl Satisfiable for Requirement { + fn is_satisfied( + &self, + time: &DateTime, + schedule: &Schedule, + available: &HashMap, + ) -> bool { + match self { + Requirement::One(req) => req.is_satisfied(time, schedule, available), + Requirement::Group(req) => req.is_satisfied(time, schedule, available), + } + } + + fn can_be_satisfied( + &self, + time: &DateTime, + schedule: &Schedule, + available: &HashMap, + ) -> bool { + match self { + Requirement::One(req) => req.can_be_satisfied(time, schedule, available), + Requirement::Group(req) => req.can_be_satisfied(time, schedule, available), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn check_complex_parse() { + let json = r#"{ + "any": [ + { "all": [ + { "resource": "resource_a", "offset": -1 }, + { "resource": "resource_b", "offset": -1 } + ] + }, + { "type": "file", "path": "/mnt/test/data_${yyyy}{$mm}{$dd}" } + ] + }"#; + let res: serde_json::Result = serde_json::from_str(json); + assert!(res.is_ok()); + } + + #[test] + fn check_simple_parse() { + let json = r#"{ "type": "file", "path": "/mnt/test/data_${yyyy}{$mm}{$dd}" }"#; + let res: serde_json::Result = serde_json::from_str(json); + println!("{:?}", res); + assert!(res.is_ok()); + } + + // TODO Add tests for satisfies +} diff --git a/src/schedule.rs b/src/schedule.rs new file mode 100644 index 0000000..6fce3f1 --- /dev/null +++ b/src/schedule.rs @@ -0,0 +1,371 @@ +use super::*; +use std::collections::HashSet; + +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct Schedule { + calendar: Calendar, + times: Vec, + timezone: Tz, +} + +impl Schedule { + pub fn new(calendar: Calendar, times: Vec, timezone: Tz) -> Self { + let uniq: HashSet = HashSet::from_iter(times.iter().cloned()); + let mut times = Vec::from_iter(uniq.iter().cloned()); + times.sort(); + Schedule { + calendar, + times, + timezone, + } + } + + pub fn generate(&self, start: DateTime, end: DateTime) -> Vec> { + let st = start.with_timezone(&self.timezone); + let et = end.with_timezone(&self.timezone); + + let mut date = st.date().naive_local(); + let end_date = et.date().succ().naive_local(); + + let mut times = Vec::new(); + while date < end_date { + if self.calendar.includes(date) { + for time in &self.times { + let dt = self + .timezone + .from_local_datetime(&date.and_time(*time)) + .unwrap(); + if dt > start && dt <= end { + times.push(dt.with_timezone(&Utc)); + } else if end < dt { + break; + } + } + } + date = date.succ(); + } + + times + } + + pub fn interval_utc(&self, dt: DateTime, offset: i32) -> Interval { + // Need to get the current interval, then offset it + let at = dt.with_timezone(&self.timezone); + let rt = if self.times.iter().any(|x| *x == at.time()) { + at + } else { + self.prev_time(at) + }; + + let start = self.offset(rt, offset); + Interval::new( + start.with_timezone(&Utc), + self.next_time(start).with_timezone(&Utc), + ) + } + + pub fn interval(&self, dt: DateTime, offset: i32) -> Interval { + // Need to get the current interval, then offset it + let at = dt.with_timezone(&self.timezone); + let rt = if self.times.iter().any(|x| *x == at.time()) { + at + } else { + self.prev_time(at) + }; + + let start = self.offset(rt, offset); + Interval::new( + start.with_timezone(&Utc), + self.next_time(start).with_timezone(&Utc), + ) + } + + pub fn next_time(&self, dt: DateTime) -> DateTime { + let st = dt.with_timezone(&self.timezone); + + let mut date = st.date().naive_local(); + let mut time = st.time(); + + // Handle case where we're not on a valid date + if !self.calendar.includes(date) { + date = self.calendar.next(date); + time = self.times[0] - Duration::milliseconds(1); + } + + // Figure out the time slot + let time = match self.times.iter().find(|x| **x > time) { + Some(t) => date.and_time(*t), + None => self + .calendar + .next(date) + .and_time(*self.times.first().unwrap()), + }; + + // Cast into a timezone + self.timezone.from_local_datetime(&time).unwrap() + } + + /// Given a time, generate the preceding interval according to the schedule + pub fn prev_time(&self, dt: DateTime) -> DateTime { + let st = dt.with_timezone(&self.timezone); + + let mut date = st.date().naive_local(); + let mut time = st.time(); + + // Handle case where we're not on a valid date + if !self.calendar.includes(date) { + date = self.calendar.prev(date); + time = *self.times.last().unwrap() + Duration::milliseconds(1); + } + + // Figure out the time slot + let time = match self.times.iter().rev().find(|x| **x < time) { + Some(t) => date.and_time(*t), + None => self + .calendar + .prev(date) + .and_time(*self.times.last().unwrap()), + }; + + // Cast into a timezone + self.timezone.from_local_datetime(&time).unwrap() + } + + /// Given a timestamp, return the scheduled time `offset` + pub fn offset(&self, mut dt: DateTime, offset: i32) -> DateTime { + if offset > 0 { + for _ in 0..offset { + dt = self.next_time(dt); + } + } else { + for _ in offset..0 { + dt = self.prev_time(dt); + } + } + dt + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn check_simple_generation() { + let timezone = chrono_tz::America::Halifax; + let sched = Schedule { + calendar: Calendar::new(), + times: vec![ + NaiveTime::from_hms(10, 30, 0), + NaiveTime::from_hms(11, 30, 0), + ], + timezone, + }; + + // Simple generation + let times = sched.generate( + timezone + .ymd(2022, 1, 3) + .and_hms(11, 0, 0) + .with_timezone(&Utc), + timezone + .ymd(2022, 1, 3) + .and_hms(12, 0, 0) + .with_timezone(&Utc), + ); + + assert_eq!(times.len(), 1); + assert_eq!( + times, + vec![timezone + .ymd(2022, 1, 3) + .and_hms(11, 30, 0) + .with_timezone(&Utc),] + ); + + // Generating scheduled times over a timerange + assert_eq!( + sched.generate( + timezone + .ymd(2021, 12, 31) + .and_hms(0, 0, 0) + .with_timezone(&Utc), + timezone + .ymd(2022, 1, 5) + .and_hms(0, 0, 0) + .with_timezone(&Utc), + ), + vec![ + timezone + .ymd(2021, 12, 31) + .and_hms(10, 30, 0) + .with_timezone(&Utc), + timezone + .ymd(2021, 12, 31) + .and_hms(11, 30, 0) + .with_timezone(&Utc), + timezone + .ymd(2022, 1, 3) + .and_hms(10, 30, 0) + .with_timezone(&Utc), + timezone + .ymd(2022, 1, 3) + .and_hms(11, 30, 0) + .with_timezone(&Utc), + timezone + .ymd(2022, 1, 4) + .and_hms(10, 30, 0) + .with_timezone(&Utc), + timezone + .ymd(2022, 1, 4) + .and_hms(11, 30, 0) + .with_timezone(&Utc), + ] + ); + } + + #[test] + fn check_prev() { + let timezone = chrono_tz::America::Halifax; + let sched = Schedule { + calendar: Calendar::new(), + times: vec![ + NaiveTime::from_hms(10, 30, 0), + NaiveTime::from_hms(11, 30, 0), + ], + timezone, + }; + + assert_eq!( + sched.prev_time(timezone.ymd(2022, 1, 3).and_hms(11, 0, 0)), + timezone.ymd(2022, 1, 3).and_hms(10, 30, 0) + ); + assert_eq!( + sched.prev_time(timezone.ymd(2022, 1, 3).and_hms(11, 30, 0)), + timezone.ymd(2022, 1, 3).and_hms(10, 30, 0) + ); + } + + #[test] + fn check_offset() { + let timezone = chrono_tz::America::Halifax; + let sched = Schedule { + calendar: Calendar::new(), + times: vec![ + NaiveTime::from_hms(10, 30, 0), + NaiveTime::from_hms(11, 30, 0), + ], + timezone, + }; + + // Asking for no offset should yield the same time + assert_eq!( + sched.offset(timezone.ymd(2022, 1, 3).and_hms(11, 0, 0), 0), + timezone.ymd(2022, 1, 3).and_hms(11, 0, 0) + ); + + // -1 is equivalent to prev + let test_time = timezone.ymd(2022, 1, 3).and_hms(11, 0, 0); + assert_eq!(sched.offset(test_time, -1), sched.prev_time(test_time)); + assert_eq!(sched.offset(test_time, 1), sched.next_time(test_time)); + } + + #[test] + fn check_next() { + let timezone = chrono_tz::America::Halifax; + let sched = Schedule { + calendar: Calendar::new(), + times: vec![ + NaiveTime::from_hms(10, 30, 0), + NaiveTime::from_hms(11, 30, 0), + ], + timezone, + }; + + assert_eq!( + sched.next_time(timezone.ymd(2022, 1, 3).and_hms(11, 0, 0)), + timezone.ymd(2022, 1, 3).and_hms(11, 30, 0) + ); + assert_eq!( + sched.next_time(timezone.ymd(2022, 1, 3).and_hms(11, 30, 0)), + timezone.ymd(2022, 1, 4).and_hms(10, 30, 0) + ); + } + + #[test] + fn check_transivity() { + let timezone = chrono_tz::America::Halifax; + let sched = Schedule { + calendar: Calendar::new(), + times: vec![ + NaiveTime::from_hms(10, 30, 0), + NaiveTime::from_hms(11, 30, 0), + ], + timezone, + }; + + // prev and next are reversible + let dt = sched.prev_time(timezone.ymd(2022, 1, 3).and_hms(11, 0, 0)); // 10:30 -> 11:30 + assert_eq!(dt, sched.prev_time(sched.next_time(dt))); + } + + #[test] + fn check_interval() { + let timezone = chrono_tz::America::Halifax; + let sched = Schedule { + calendar: Calendar::new(), + times: vec![ + NaiveTime::from_hms(10, 30, 0), + NaiveTime::from_hms(11, 30, 0), + ], + timezone, + }; + + // prev and next are reversible + let dt = timezone.ymd(2022, 1, 3).and_hms(11, 0, 0); + assert_eq!( + sched.interval(dt, 0), + Interval::new( + timezone + .ymd(2022, 1, 3) + .and_hms(10, 30, 0) + .with_timezone(&Utc), + timezone + .ymd(2022, 1, 3) + .and_hms(11, 30, 0) + .with_timezone(&Utc) + ) + ); + + // Previous + assert_eq!( + sched.interval(dt, -1), + Interval::new( + timezone + .ymd(2021, 12, 31) + .and_hms(11, 30, 0) + .with_timezone(&Utc), + timezone + .ymd(2022, 1, 3) + .and_hms(10, 30, 0) + .with_timezone(&Utc) + ) + ); + + // Next + assert_eq!( + sched.interval(dt, 1), + Interval::new( + timezone + .ymd(2022, 1, 3) + .and_hms(11, 30, 0) + .with_timezone(&Utc), + timezone + .ymd(2022, 1, 4) + .and_hms(10, 30, 0) + .with_timezone(&Utc) + ) + ); + } +} diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 0000000..1c5e8b4 --- /dev/null +++ b/src/task.rs @@ -0,0 +1,413 @@ +use super::*; + +fn default_bytes() -> usize { + 20480 +} + +/// Options in how to handle task output. Some tasks can be quite +/// verbose, and the output may not be needed. +#[derive(Clone, Serialize, Deserialize, Copy, Debug, PartialEq, Hash, Eq)] +#[serde(deny_unknown_fields)] +pub struct TaskOutputOptions { + /// If true, output from successful tasks is discarded entirely, in + /// keeping with the UNIX philosophy of no news is good news + #[serde(default)] + pub discard_successful: bool, + + /// If true, and output is not discarded, truncate the output of + /// each task to a maximum of the first / last `preserve` kb of + /// data + #[serde(default)] + pub truncate: bool, + + /// Number of KB of output to preserve at the beginning of the ouptut + #[serde(default = "default_bytes")] + pub head_bytes: usize, + + /// Number of KB of output to preserve at the end of the outut + #[serde(default = "default_bytes")] + pub tail_bytes: usize, +} + +impl Default for TaskOutputOptions { + fn default() -> Self { + TaskOutputOptions { + discard_successful: true, + truncate: true, + head_bytes: default_bytes(), + tail_bytes: default_bytes(), + } + } +} + +/// Defines the struct to parse for tasks +#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)] +#[serde(deny_unknown_fields)] +pub struct TaskDefinition { + /// Command to run to generate the resources for the given interval + pub up: TaskDetails, + + /// Command to run to remove the resource for the given interval + /// If None, no additional action will happen when an interval goes stale + #[serde(default)] + pub down: Option, + + /// Command to run to verify the resources exist and are correct. + /// Run before `up` to see if needed, and after `up` to verify output + /// If None, no check is run to see if up needs to run, and no post-up check occurs + /// to verify up succeeded + #[serde(default)] + pub check: Option, + + /// Number of seconds + #[serde(default)] + pub alert_delay_seconds: Option, + + #[serde(default)] + pub provides: HashSet, + + #[serde(default)] + pub requires: Vec, + + pub calendar_name: String, + pub times: Vec, + pub timezone: Tz, + + pub valid_from: NaiveDateTime, + + #[serde(default)] + pub valid_to: Option, +} + +impl TaskDefinition { + pub fn to_task(&self, calendar: &Calendar) -> Task { + let schedule = Schedule::new(calendar.clone(), self.times.clone(), self.timezone); + /* + The valid_{from,to} interval must be aligned to the actual schedule + */ + let start = schedule + .interval( + self.timezone.from_local_datetime(&self.valid_from).unwrap(), + 0, + ) + .start; + + let end = match self.valid_to { + Some(nt) => self.timezone.from_local_datetime(&nt).unwrap(), + None => DateTime::::MAX_UTC.with_timezone(&self.timezone), + }; + + let actual_end = schedule.interval(end, 0).start; + + Task { + up: self.up.clone(), + down: self.down.clone(), + check: self.check.clone(), + + provides: self.provides.clone(), + requires: self.requires.clone(), + + schedule: schedule, + valid_over: IntervalSet::from(vec![Interval::new(start, actual_end)]), + timezone: self.timezone, + } + } +} + +/* + No need for serialize / deserialize here, since we don't + need to transmit it anywhere. It is reconstituted by the + definition +*/ +#[derive(Clone, Serialize, Debug)] +pub struct Task { + pub up: TaskDetails, + pub down: Option, + pub check: Option, + + pub provides: HashSet, + pub requires: Vec, + + pub schedule: Schedule, + pub valid_over: IntervalSet, + pub timezone: Tz, +} + +// Really need to rethink this valid_over and scheduling times. When generating + +impl Task { + pub fn generate_times( + &self, + required: &HashMap, + ) -> Result>> { + // Ensure that all intervals that are required are provided by this instance + let reqs: Vec = self + .provides + .iter() + .map(|res| { + if let Some(is) = required.get(res) { + is.intersection(&self.valid_over) + } else { + IntervalSet::new() + } + }) + .collect(); + + if reqs.is_empty() { + Ok(Vec::new()) + } else { + let ris = &reqs[0]; + // Ensure that all intervals are the same + if !reqs[1..].iter().all(|is| is == ris) { + Err(anyhow!( + "Task produces multiple resources, but intervals are not consistent across needs" + )) + } else { + Ok(ris.iter().fold(Vec::new(), |mut acc, intv| { + acc.extend(self.schedule.generate( + std::cmp::max(intv.start, self.valid_over.start().unwrap()), + std::cmp::min(intv.end, self.valid_over.end().unwrap()), + )); + acc + })) + } + } + } + + pub fn validity(&self, max_time: DateTime) -> IntervalSet { + if self.valid_over.is_empty() { + IntervalSet::new() + } else { + let timeline = + IntervalSet::from(vec![Interval::new(self.valid_over[0].start, max_time)]); + self.valid_over.intersection(&timeline) + } + } + + /// Returns true if this task can provide any resource that isn't currently available + /// as of the specified time + pub fn is_needed(&self, time: &DateTime, available: &HashMap) -> bool { + let end_dt = time.with_timezone(&Utc); + let horizon_is = self + .valid_over + .difference(&IntervalSet::from(vec![Interval::new( + end_dt, + DateTime::::MAX_UTC, + )])); + self.provides.iter().all(|res| { + if let Some(is) = available.get(res) { + !(&horizon_is - is).is_empty() + } else { + false + } + }) + } + + /// Returns true if all requirements are satisfied + pub fn can_run(&self, time: DateTime, available: &HashMap) -> bool { + let local_time = time.with_timezone(&self.timezone); + self.requires + .iter() + .all(|req| req.is_satisfied(&local_time, &self.schedule, available)) + } + + pub fn can_be_satisfied( + &self, + time: DateTime, + available: &HashMap, + ) -> bool { + let local_time = time.with_timezone(&self.timezone); + self.requires + .iter() + .all(|req| req.can_be_satisfied(&local_time, &self.schedule, available)) + } + + pub fn up(&self, interval: &Interval) -> Result> { + if self.check(interval) { + Ok(self.provides.clone()) + } else { + Ok(HashSet::new()) + } + } + + pub fn check(&self, _interval: &Interval) -> bool { + true + } + + pub fn down(&self, _interval: &Interval) -> Result> { + Ok(HashSet::new()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono_tz::America::Halifax; + + macro_rules! isv { + ( $x:literal, $y:literal ) => { + IntervalSet::from(vec![Interval::new( + Utc.ymd(2022, 1, $x).and_hms(0, 0, 0), + Utc.ymd(2022, 1, $y).and_hms(0, 0, 0), + )]) + }; + } + + #[test] + fn check_task_can_parse() { + // Spans a weekend + let task_json = r#" + { + "up": "/usr/bin/touch /tmp/a_${yyyymmdd}_${hhmmss}", + "down": "/usr/bin/rm /tmp/a_${yyyymmdd}_${hhmmss}", + "check": "/usr/bin/test -e /tmp/a_${yyyymmdd}_${hhmmss}", + "provides": [ + "resource_a", + "resource_b" + ], + "requires": [ + { "resource": "alpha", "offset": 0 }, + { "resource": "beta", "offset": -1 } + ], + "calendar_name": "std", + "times": [ "09:00:00", "13:00:00", "15:00:00" ], + "timezone": "America/Halifax", + "valid_from": "2022-01-05T12:30:00", + "valid_to": "2022-01-11T00:00:00" + } + "#; + + let task_def: TaskDefinition = serde_json::from_str(task_json).unwrap(); + + // Produces a std + let cal = Calendar::new(); + + let task = task_def.to_task(&cal); + + // Assert the valid interval is correct + assert_eq!( + task.valid_over, + IntervalSet::from(vec![Interval::new( + Halifax.ymd(2022, 1, 5).and_hms(9, 0, 0), + Halifax.ymd(2022, 1, 10).and_hms(15, 0, 0) + )]) + ); + + // No times when out of validity + let times = task + .generate_times(&HashMap::from([ + ("resource_a".to_owned(), isv!(13, 20)), + ("resource_b".to_owned(), isv!(13, 20)), + ])) + .unwrap(); + assert!(times.is_empty()); + + // Requiring within a valid time range generates times + let times = task + .generate_times(&HashMap::from([ + ("resource_a".to_owned(), isv!(6, 8)), + ("resource_b".to_owned(), isv!(6, 8)), + ])) + .unwrap(); + assert_eq!(times.len(), 6); + + // Raise error if unequal requirements + let res = task.generate_times(&HashMap::from([ + ("resource_a".to_owned(), isv!(6, 7)), + ("resource_b".to_owned(), isv!(6, 8)), + ])); + assert!(res.is_err()); + + // Require that all times generated be within the + // valid_over + let res = task.generate_times(&HashMap::from([ + ("resource_a".to_owned(), isv!(1, 30)), + ("resource_b".to_owned(), isv!(1, 30)), + ])); + assert!(res.is_ok()); + let times = res.unwrap(); + + assert!(times.iter().all(|time| task.valid_over.contains(*time))); + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct TaskAttempt { + #[serde(default)] + pub task_name: String, + + #[serde(default = "chrono::Utc::now")] + pub scheduled_time: DateTime, + + #[serde(default = "chrono::Utc::now")] + pub start_time: DateTime, + + #[serde(default = "chrono::Utc::now")] + pub stop_time: DateTime, + + #[serde(default)] + pub succeeded: bool, + + #[serde(default)] + pub killed: bool, + + #[serde(default)] + pub infra_failure: bool, + + #[serde(default)] + pub output: String, + + #[serde(default)] + pub error: String, + + #[serde(default)] + pub executor: Vec, + + #[serde(default)] + pub exit_code: i32, + + /// as a percentage + #[serde(default)] + pub max_cpu: f32, + + /// as a percentage + #[serde(default)] + pub avg_cpu: f32, + + /// In bytes + #[serde(default)] + pub max_rss: u64, + + /// In bytes + #[serde(default)] + pub avg_rss: f32, +} + +impl Default for TaskAttempt { + fn default() -> Self { + TaskAttempt { + task_name: String::new(), + scheduled_time: Utc::now(), + start_time: Utc::now(), + stop_time: Utc::now(), + succeeded: false, + killed: false, + infra_failure: false, + output: "".to_owned(), + error: "".to_owned(), + executor: Vec::new(), + exit_code: 0i32, + max_cpu: 0.0, + avg_cpu: 0.0, + max_rss: 0, + avg_rss: 0.0, + } + } +} + +impl TaskAttempt { + #[must_use] + pub fn new() -> Self { + TaskAttempt::default() + } +}