Fixing issues with schedule generation

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-10-04 12:18:24 -03:00
parent 26aa4dd4ea
commit 3d680bf5da
4 changed files with 169 additions and 83 deletions
+1 -40
View File
@@ -31,7 +31,7 @@ impl ResourceInterval {
ResourceInterval(res) ResourceInterval(res)
} }
pub fn difference(&mut self, other: &ResourceInterval) -> Self { pub fn difference(&self, other: &ResourceInterval) -> Self {
let res: HashMap<Resource, IntervalSet> = self let res: HashMap<Resource, IntervalSet> = self
.0 .0
.iter() .iter()
@@ -71,45 +71,6 @@ impl From<&HashMap<Resource, IntervalSet>> for ResourceInterval {
} }
} }
/*
impl Add for &ResourceInterval {
type Output = ResourceInterval;
fn add(self, other: &ResourceInterval) -> Self::Output {
let res: HashMap<Resource, IntervalSet> =
other.0.iter().fold(self.0.clone(), |mut acc, (res, is)| {
acc.entry(res.clone())
.or_insert(IntervalSet::new())
.merge(is);
acc
});
ResourceInterval(res)
}
}
impl Sub for &ResourceInterval {
type Output = ResourceInterval;
fn sub(self, other: &ResourceInterval) -> Self::Output {
let res: HashMap<Resource, IntervalSet> = self
.0
.iter()
.map(|(res, is)| {
(
res.clone(),
is.difference(other.get(res).unwrap_or(&IntervalSet::new())),
)
})
.collect();
ResourceInterval(res)
}
}
impl AsRef<Self> for ResourceInterval {
fn as_ref(&self) -> &Self {
self
}
}
*/
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
+46 -14
View File
@@ -49,6 +49,8 @@ pub struct Runner {
vars: VarMap, vars: VarMap,
output_options: TaskOutputOptions, output_options: TaskOutputOptions,
// States
end_state: ResourceInterval,
target: ResourceInterval, target: ResourceInterval,
current: ResourceInterval, current: ResourceInterval,
@@ -191,13 +193,13 @@ impl Runner {
} }
} }
let target = tasks.get_state(Utc::now())?; let end_state = tasks.coverage()?;
let mut runner = Runner { let mut runner = Runner {
tasks, tasks,
vars, vars,
output_options, output_options,
target, end_state,
target: ResourceInterval::new(),
current: ResourceInterval::new(), current: ResourceInterval::new(),
queue: Vec::new(), queue: Vec::new(),
qidx: 0, qidx: 0,
@@ -206,12 +208,22 @@ impl Runner {
executor, executor,
}; };
runner.tick()?;
Ok(runner)
}
pub fn tick(&mut self) -> Result<()> {
let target = self.tasks.get_state(Utc::now())?;
// Create queue // Create queue
let required = runner.target.difference(&runner.current); let required = target.difference(&self.current);
runner.queue = runner println!("REQ {:?}", required);
.tasks for (name, task) in self.tasks.iter() {
.iter() let res = IntervalSet::from(task.generate_intervals(&required).unwrap());
.fold(Vec::new(), |mut acc, (name, task)| { println!("GEN ({}): {:?}", name, res);
}
self.queue = self.tasks.iter().fold(Vec::new(), |mut acc, (name, task)| {
let res: Vec<Action> = task let res: Vec<Action> = task
.generate_intervals(&required) .generate_intervals(&required)
.unwrap() .unwrap()
@@ -228,24 +240,44 @@ impl Runner {
acc acc
}); });
let unsatisfied = runner // Ensure that all actions can be satisfied
let unsatisfied = self
.queue .queue
.iter() .iter()
.filter(|act| { .filter(|act| {
!runner !self
.tasks .tasks
.get(&act.task) .get(&act.task)
.unwrap() .unwrap()
.can_be_satisfied(act.interval, &runner.target) .can_be_satisfied(act.interval, &target)
}) })
.fold(HashSet::new(), |mut acc, a| { .fold(HashSet::new(), |mut acc, a| {
println!("INVALID: {:?}", a); println!("Task cannot be satisfied: {:?}", a);
acc.insert(a.task.clone()); acc.insert(a.task.clone());
acc acc
}); });
// Ensure current +
let mut result_state = self.current.clone();
for action in &self.queue {
for res in &self.tasks.get(&action.task).unwrap().provides {
result_state
.entry(res.clone())
.or_insert(IntervalSet::new())
.insert(action.interval);
}
}
if result_state != target {
return Err(anyhow!(
"Actions generated produce\n\t{:?}\nExpected\n\t{:?}",
result_state,
target
));
}
if unsatisfied.is_empty() { if unsatisfied.is_empty() {
Ok(runner) self.target = target;
Ok(())
} else { } else {
Err(anyhow!("Tasks {:?} cannot complete as the target state does not provide required resources", unsatisfied)) Err(anyhow!("Tasks {:?} cannot complete as the target state does not provide required resources", unsatisfied))
} }
@@ -361,7 +393,7 @@ impl Runner {
} }
fn is_done(&self) -> bool { fn is_done(&self) -> bool {
self.target == self.current self.end_state == self.current
} }
} }
+33 -9
View File
@@ -26,8 +26,11 @@ impl Schedule {
return Vec::new(); return Vec::new();
} }
let st = interval.start.with_timezone(&self.timezone); let st = self.interval(interval.start, 0).start;
let et = interval.end.with_timezone(&self.timezone); let et = self.interval(interval.end, 0).end;
//let st = interval.start.with_timezone(&self.timezone);
//let et = interval.end.with_timezone(&self.timezone);
let mut date = self.calendar.prev(st.date().naive_local()); let mut date = self.calendar.prev(st.date().naive_local());
let end_date = self.calendar.next(et.date().succ().naive_local()); let end_date = self.calendar.next(et.date().succ().naive_local());
@@ -58,19 +61,24 @@ impl Schedule {
times times
} }
/// Given a timestamp, return the interval that contains it
pub fn interval<T: TimeZone>(&self, dt: DateTime<T>, offset: i32) -> Interval { pub fn interval<T: TimeZone>(&self, dt: DateTime<T>, offset: i32) -> Interval {
// Need to get the current interval, then offset it // Need to get the current interval, then offset it
let at = dt.with_timezone(&self.timezone); let at = dt.with_timezone(&self.timezone);
let rt = if self.times.iter().any(|x| *x == at.time()) {
// If the time is at an edge
let rt = if self.times.iter().any(|x| *x == at.time())
&& self.calendar.includes(at.date().naive_local())
{
at at
} else { } else {
self.prev_time(at) self.next_time(at)
}; };
let start = self.offset(rt, offset); let end = self.offset(rt, offset);
Interval::new( Interval::new(
start.with_timezone(&Utc), self.prev_time(end).with_timezone(&Utc),
self.next_time(start).with_timezone(&Utc), end.with_timezone(&Utc),
) )
} }
@@ -125,8 +133,9 @@ impl Schedule {
self.timezone.from_local_datetime(&time).unwrap() self.timezone.from_local_datetime(&time).unwrap()
} }
/// Given a timestamp, return the scheduled time `offset` // Given a timestamp, return the scheduled time `offset`
pub fn offset(&self, mut dt: DateTime<Tz>, offset: i32) -> DateTime<Tz> { // A bit dangerous, providing an offset of 0
fn offset(&self, mut dt: DateTime<Tz>, offset: i32) -> DateTime<Tz> {
if offset > 0 { if offset > 0 {
for _ in 0..offset { for _ in 0..offset {
dt = self.next_time(dt); dt = self.next_time(dt);
@@ -357,6 +366,21 @@ mod tests {
timezone, timezone,
}; };
// Weekends are correct
assert_eq!(
sched.interval(timezone.ymd(2022, 1, 1).and_hms(9, 0, 0), 0),
Interval::new(
timezone
.ymd(2021, 12, 31)
.and_hms(11, 30, 0)
.with_timezone(&Utc),
timezone
.ymd(2022, 1, 3)
.and_hms(10, 30, 0)
.with_timezone(&Utc)
)
);
// prev and next are reversible // prev and next are reversible
let dt = timezone.ymd(2022, 1, 3).and_hms(11, 0, 0); let dt = timezone.ymd(2022, 1, 3).and_hms(11, 0, 0);
assert_eq!( assert_eq!(
+73 -4
View File
@@ -43,7 +43,8 @@ impl TaskDefinition {
pub fn to_task(&self, calendar: &Calendar) -> Task { pub fn to_task(&self, calendar: &Calendar) -> Task {
let schedule = Schedule::new(calendar.clone(), self.times.clone(), self.timezone); let schedule = Schedule::new(calendar.clone(), self.times.clone(), self.timezone);
/* /*
The valid_{from,to} interval must be aligned to the actual schedule The valid_{from,to} interval must be aligned to the actual schedule.
They will be adjusted to include any interval who's
*/ */
let start = schedule let start = schedule
.interval( .interval(
@@ -110,7 +111,7 @@ impl Task {
}) })
.collect(); .collect();
if reqs.is_empty() { let res = if reqs.is_empty() {
Ok(Vec::new()) Ok(Vec::new())
} else { } else {
let ris = &reqs[0]; let ris = &reqs[0];
@@ -129,7 +130,8 @@ impl Task {
acc acc
})) }))
} }
} };
res
} }
pub fn validity(&self, max_time: DateTime<Utc>) -> IntervalSet { pub fn validity(&self, max_time: DateTime<Utc>) -> IntervalSet {
@@ -191,7 +193,7 @@ impl Task {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use chrono_tz::America::Halifax; use chrono_tz::America::{Halifax, New_York};
macro_rules! intv { macro_rules! intv {
( $x:literal, $y:literal ) => { ( $x:literal, $y:literal ) => {
@@ -289,6 +291,14 @@ mod tests {
} }
}; };
// Require that all times generated be within the
// valid_over
let mut exact = ResourceInterval::new();
exact.insert(&"resource_a".to_owned(), &task.valid_over.clone());
exact.insert(&"resource_b".to_owned(), &task.valid_over.clone());
let res = IntervalSet::from(task.generate_intervals(&exact).unwrap());
assert_eq!(res, task.valid_over);
// Ensure that the intervals generated over the valid period // Ensure that the intervals generated over the valid period
// exactly cover the valid period // exactly cover the valid period
let mut theoretical = ResourceInterval::new(); let mut theoretical = ResourceInterval::new();
@@ -297,4 +307,63 @@ mod tests {
let generated = IntervalSet::from(task.generate_intervals(&theoretical).unwrap()); let generated = IntervalSet::from(task.generate_intervals(&theoretical).unwrap());
assert_eq!(task.valid_over, generated); assert_eq!(task.valid_over, generated);
} }
#[test]
fn check_task_valid_over() {
let task_json = r#"
{
"up": "/usr/bin/touch /tmp/a_${yyyymmdd}_${hhmmss}",
"down": "/usr/bin/rm /tmp/a_${yyyymmdd}_${hhmmss}",
"check": "/usr/bin/test -e /tmp/a_${yyyymmdd}_${hhmmss}",
"provides": [
"resource_a",
"resource_b"
],
"requires": [
{ "resource": "alpha", "offset": 0 },
{ "resource": "beta", "offset": -1 }
],
"calendar_name": "std",
"times": [ "17:00:00" ],
"timezone": "America/New_York",
"valid_from": "2022-01-04T09:00:00",
"valid_to": "2022-01-07T00:00:00"
}
"#;
let cal = Calendar::new();
{
let task_def: TaskDefinition = serde_json::from_str(task_json).unwrap();
let task = task_def.to_task(&cal);
// Assert the valid interval is correct
assert_eq!(
task.valid_over,
IntervalSet::from(vec![Interval::new(
New_York.ymd(2022, 1, 3).and_hms(17, 0, 0),
New_York.ymd(2022, 1, 6).and_hms(17, 0, 0)
)])
);
}
// Another test with different times
{
let mut task_def: TaskDefinition = serde_json::from_str(task_json).unwrap();
task_def.times = vec![NaiveTime::from_hms(9, 0, 0), NaiveTime::from_hms(12, 0, 0)];
task_def.valid_from = NaiveDate::from_ymd(2022, 1, 1).and_hms(9, 0, 0);
task_def.valid_to = Some(NaiveDate::from_ymd(2022, 1, 7).and_hms(17, 0, 0));
let task = task_def.to_task(&cal);
// Assert the valid interval is correct
assert_eq!(
task.valid_over,
IntervalSet::from(vec![Interval::new(
New_York.ymd(2021, 12, 31).and_hms(12, 0, 0),
New_York.ymd(2022, 1, 7).and_hms(12, 0, 0)
)])
);
}
}
} }