Adding force-down option

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-10-06 14:51:06 -03:00
parent 2c96b16ec8
commit 03412bb79d
3 changed files with 106 additions and 29 deletions
+6
View File
@@ -126,6 +126,12 @@ impl IntervalSet {
pub fn difference(&self, other: &Self) -> Self { pub fn difference(&self, other: &Self) -> Self {
self.intersection(&other.complement()) self.intersection(&other.complement())
} }
/// Subtract all intervals in `other` from self
/// both sides must be sorted
pub fn subtract(&mut self, other: &Self) {
self.0 = self.difference(other).0;
}
} }
impl Deref for IntervalSet { impl Deref for IntervalSet {
type Target = Vec<Interval>; type Target = Vec<Interval>;
+62 -15
View File
@@ -32,18 +32,26 @@ pub struct Action {
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum RunnerMessage { pub enum RunnerMessage {
Tick, Tick,
ActionCompleted { action_id: usize, succeeded: bool }, ActionCompleted {
RetryAction { action_id: usize }, action_id: usize,
succeeded: bool,
},
RetryAction {
action_id: usize,
},
/* /*
/// Marks all resources in the set available over the interval
ForceUp { ForceUp {
resources: HashSet<String>, resources: HashSet<String>,
interval: Interval, interval: Interval,
}, },
*/
/// Marks all resources in the set as down over _at least_ the interval.
/// Will cause a re-check / re-gen
ForceDown { ForceDown {
resources: HashSet<String>, resources: HashSet<String>,
interval: Interval, interval: Interval,
}, },
*/
Stop, Stop,
} }
@@ -334,6 +342,37 @@ impl Runner {
self.events self.events
.push(delayed_event(Duration::seconds(5), RunnerMessage::Tick)); .push(delayed_event(Duration::seconds(5), RunnerMessage::Tick));
} }
/*
Some(Ok(RunnerMessage::ForceUp {
resources,
interval,
})) => {
}
*/
Some(Ok(RunnerMessage::ForceDown {
resources,
interval,
})) => {
// Use the interval to identify
for (tid, task) in self.tasks.iter().enumerate() {
if task.provides.is_subset(&resources) {
let aligned_is =
IntervalSet::from(task.schedule.align_interval(interval));
for resource in &task.provides {
self.current
.get_mut(resource)
.unwrap()
.subtract(&aligned_is);
}
for action in &mut self.actions {
if action.task == tid && aligned_is.has_subset(action.interval) {
action.state = ActionState::Queued;
}
}
}
}
self.store_state();
}
Some(Ok(RunnerMessage::Stop)) => { Some(Ok(RunnerMessage::Stop)) => {
info!("Stopping"); info!("Stopping");
break; break;
@@ -347,6 +386,18 @@ impl Runner {
action_id, action_id,
succeeded, succeeded,
})) => { })) => {
self.complete_task(action_id, succeeded);
}
Some(Err(e)) => {
panic!("Something went wrong: {:?}", e)
}
None => {}
}
// Log stuff
}
}
fn complete_task(&mut self, action_id: usize, succeeded: bool) {
info!("Completing action {}", action_id); info!("Completing action {}", action_id);
if succeeded { if succeeded {
let action = &mut self.actions[action_id]; let action = &mut self.actions[action_id];
@@ -358,11 +409,7 @@ impl Runner {
.or_insert(IntervalSet::new()) .or_insert(IntervalSet::new())
.insert(action.interval); .insert(action.interval);
} }
self.storage self.store_state();
.send(StorageMessage::StoreState {
state: self.current.clone(),
})
.unwrap();
self.queue_actions(); self.queue_actions();
} else { } else {
self.events.push(delayed_event( self.events.push(delayed_event(
@@ -371,13 +418,13 @@ impl Runner {
)); ));
} }
} }
Some(Err(e)) => {
panic!("Something went wrong: {:?}", e) fn store_state(&self) {
} self.storage
None => {} .send(StorageMessage::StoreState {
} state: self.current.clone(),
// Log stuff })
} .unwrap();
} }
fn queue_actions(&mut self) { fn queue_actions(&mut self) {
+27 -3
View File
@@ -21,6 +21,32 @@ impl Schedule {
} }
} }
fn is_end_time<T: TimeZone>(&self, dt: DateTime<T>) -> bool {
// Need to get the current interval, then offset it
let at = dt.with_timezone(&self.timezone);
self.times.iter().any(|x| *x == at.time())
&& self.calendar.includes(at.date().naive_local())
}
/// Given an interval I, return the interval J that is the smallest
/// set of schedule intervals that completely contain I.
/// If the given interval is bounded by MIN_TIME or MAX_TIME, then the
/// returned interval will be likewise bounded
pub fn align_interval(&self, interval: Interval) -> Interval {
let st = if interval.start == MIN_TIME {
self.next_time(interval.start).with_timezone(&Utc)
} else {
interval.start
};
let et = if interval.end == MAX_TIME {
self.prev_time(interval.end).with_timezone(&Utc)
} else {
interval.end
};
Interval::new(self.interval(st, 0).start, self.interval(et, 0).end)
}
pub fn generate(&self, interval: Interval) -> Vec<Interval> { pub fn generate(&self, interval: Interval) -> Vec<Interval> {
if self.times.is_empty() { if self.times.is_empty() {
return Vec::new(); return Vec::new();
@@ -67,9 +93,7 @@ impl Schedule {
let at = dt.with_timezone(&self.timezone); let at = dt.with_timezone(&self.timezone);
// If the time is at an edge // If the time is at an edge
let rt = if self.times.iter().any(|x| *x == at.time()) let rt = if self.is_end_time(at) {
&& self.calendar.includes(at.date().naive_local())
{
at at
} else { } else {
self.next_time(at) self.next_time(at)