Adding task_set

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-09-29 15:17:07 -03:00
parent 1746470367
commit 5d0ec03804
3 changed files with 94 additions and 5 deletions
+10
View File
@@ -20,6 +20,9 @@ use crate::storage::*;
use crate::task::*; use crate::task::*;
use crate::varmap::*; use crate::varmap::*;
const MAX_TIME: DateTime<Utc> = chrono::DateTime::<Utc>::MAX_UTC;
const MIN_TIME: DateTime<Utc> = chrono::DateTime::<Utc>::MIN_UTC;
pub type Resource = String; pub type Resource = String;
pub type TaskDetails = serde_json::Value; pub type TaskDetails = serde_json::Value;
@@ -32,4 +35,11 @@ pub mod resource_interval;
pub mod schedule; pub mod schedule;
pub mod storage; pub mod storage;
pub mod task; pub mod task;
pub mod task_set;
pub mod varmap; pub mod varmap;
/*
TODO:
target_state -> TaskSet.coverage()
current state
*/
+2 -5
View File
@@ -54,7 +54,7 @@ impl TaskDefinition {
let end = match self.valid_to { let end = match self.valid_to {
Some(nt) => self.timezone.from_local_datetime(&nt).unwrap(), Some(nt) => self.timezone.from_local_datetime(&nt).unwrap(),
None => DateTime::<Utc>::MAX_UTC.with_timezone(&self.timezone), None => MAX_TIME.with_timezone(&self.timezone),
}; };
let actual_end = schedule.interval(end, 0).start; let actual_end = schedule.interval(end, 0).start;
@@ -148,10 +148,7 @@ impl Task {
let end_dt = time.with_timezone(&Utc); let end_dt = time.with_timezone(&Utc);
let horizon_is = self let horizon_is = self
.valid_over .valid_over
.difference(&IntervalSet::from(vec![Interval::new( .difference(&IntervalSet::from(vec![Interval::new(end_dt, MAX_TIME)]));
end_dt,
DateTime::<Utc>::MAX_UTC,
)]));
self.provides.iter().all(|res| { self.provides.iter().all(|res| {
if let Some(is) = available.get(res) { if let Some(is) = available.get(res) {
!(horizon_is.difference(is)).is_empty() !(horizon_is.difference(is)).is_empty()
+82
View File
@@ -0,0 +1,82 @@
use super::*;
use std::ops::{Deref, DerefMut};
pub enum ActionState {
Queued,
Running,
Errored,
Completed,
}
pub struct Action {
task: String,
interval: Interval,
state: ActionState,
}
pub struct TaskSet(HashMap<String, Task>);
impl TaskSet {
pub fn new() -> Self {
TaskSet(HashMap::new())
}
pub fn coverage(&self) -> Result<ResourceInterval> {
self.get_state(MAX_TIME)
}
pub fn get_state<T: TimeZone>(&self, time: DateTime<T>) -> Result<ResourceInterval> {
let mut res = ResourceInterval::new();
let timeline = IntervalSet::from(Interval::new(MIN_TIME, time.with_timezone(&Utc)));
// Insert all of the covered items
for task in self.values() {
let task_timeline = task.valid_over.intersection(&timeline);
for resource in &task.provides {
let ris = res.entry(resource.clone()).or_insert(IntervalSet::new());
let already_provided = ris.intersection(&task_timeline);
if !already_provided.is_empty() {
return Err(anyhow!(
"Task set invalid: multiple tasks provide resource {} on the intervals {:?}",
resource,
already_provided
));
}
ris.merge(&task_timeline);
}
}
Ok(res)
}
pub fn get_actions(&self, required: &ResourceInterval) -> Result<Vec<Action>> {
let mut actions = Vec::new();
for (name, task) in self.iter() {
let new_actions: Vec<Action> = task
.generate_intervals(required)?
.into_iter()
.map(|interval| Action {
task: name.clone(),
interval,
state: ActionState::Queued,
})
.collect();
actions.extend(new_actions);
}
Ok(actions)
}
}
impl Deref for TaskSet {
type Target = HashMap<String, Task>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for TaskSet {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}