From 2dcb2203e57929a30d7e2f6a1dce0fb3037f447f Mon Sep 17 00:00:00 2001 From: Kinesin Data Technologies Incorporated <93931750+kinesintech@users.noreply.github.com> Date: Mon, 3 Oct 2022 16:27:43 -0300 Subject: [PATCH] Adding runner and world definition --- src/interval.rs | 7 + src/lib.rs | 4 + src/requirement.rs | 40 ++-- src/resource_interval.rs | 16 +- src/runner.rs | 446 +++++++++++++++++++++++++++++++++++++++ src/schedule.rs | 18 +- src/task.rs | 10 +- src/task_set.rs | 43 ++-- src/world.rs | 45 ++++ 9 files changed, 548 insertions(+), 81 deletions(-) create mode 100644 src/runner.rs create mode 100644 src/world.rs diff --git a/src/interval.rs b/src/interval.rs index a416ddd..d9b33e6 100644 --- a/src/interval.rs +++ b/src/interval.rs @@ -1,4 +1,5 @@ use super::*; +use std::fmt::Display; use std::ops::{Add, BitAnd, BitOr, Sub}; /* @@ -66,6 +67,12 @@ impl Interval { } } +impl Display for Interval { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "({}, {}]", self.start, self.end) + } +} + impl BitAnd for Interval { type Output = Interval; fn bitand(self, other: Interval) -> Self::Output { diff --git a/src/lib.rs b/src/lib.rs index 6bd6afc..738bb0a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,7 +18,9 @@ use crate::resource_interval::*; use crate::schedule::*; use crate::storage::*; use crate::task::*; +use crate::task_set::*; use crate::varmap::*; +use crate::world::*; const MAX_TIME: DateTime = chrono::DateTime::::MAX_UTC; const MIN_TIME: DateTime = chrono::DateTime::::MIN_UTC; @@ -32,11 +34,13 @@ pub mod interval; pub mod interval_set; pub mod requirement; pub mod resource_interval; +pub mod runner; pub mod schedule; pub mod storage; pub mod task; pub mod task_set; pub mod varmap; +pub mod world; /* TODO: diff --git a/src/requirement.rs b/src/requirement.rs index 0844058..4edd901 100644 --- a/src/requirement.rs +++ b/src/requirement.rs @@ -5,7 +5,7 @@ pub trait Satisfiable { /// Returns true if the requirement is satisfied now fn is_satisfied( &self, - time: &DateTime, + interval: Interval, schedule: &Schedule, available: &HashMap, ) -> bool; @@ -14,7 +14,7 @@ pub trait Satisfiable { /// in time fn can_be_satisfied( &self, - time: &DateTime, + interval: Interval, schedule: &Schedule, available: &HashMap, ) -> bool; @@ -31,39 +31,39 @@ pub enum AggregateRequirement { impl Satisfiable for AggregateRequirement { fn is_satisfied( &self, - time: &DateTime, + interval: Interval, schedule: &Schedule, available: &HashMap, ) -> bool { match self { AggregateRequirement::All(reqs) => reqs .iter() - .all(|x| x.is_satisfied(time, schedule, available)), + .all(|x| x.is_satisfied(interval, schedule, available)), AggregateRequirement::Any(reqs) => reqs .iter() - .any(|x| x.is_satisfied(time, schedule, available)), + .any(|x| x.is_satisfied(interval, schedule, available)), AggregateRequirement::None(reqs) => !reqs .iter() - .any(|x| x.is_satisfied(time, schedule, available)), + .any(|x| x.is_satisfied(interval, schedule, available)), } } fn can_be_satisfied( &self, - time: &DateTime, + interval: Interval, schedule: &Schedule, available: &HashMap, ) -> bool { match self { AggregateRequirement::All(reqs) => reqs .iter() - .all(|x| x.can_be_satisfied(time, schedule, available)), + .all(|x| x.can_be_satisfied(interval, schedule, available)), AggregateRequirement::Any(reqs) => reqs .iter() - .any(|x| x.can_be_satisfied(time, schedule, available)), + .any(|x| x.can_be_satisfied(interval, schedule, available)), AggregateRequirement::None(reqs) => !reqs .iter() - .any(|x| x.can_be_satisfied(time, schedule, available)), + .any(|x| x.can_be_satisfied(interval, schedule, available)), } } } @@ -78,14 +78,14 @@ pub enum SingleRequirement { impl Satisfiable for SingleRequirement { fn is_satisfied( &self, - time: &DateTime, + interval: Interval, schedule: &Schedule, available: &HashMap, ) -> bool { match self { //SingleRequirement::ResourceInterval { .. } => true, SingleRequirement::Offset { resource, offset } => { - let intv = schedule.interval(*time, *offset); + let intv = schedule.interval(interval.end, *offset); match available.get(resource) { Some(is) => is.has_subset(intv), None => false, @@ -97,13 +97,13 @@ impl Satisfiable for SingleRequirement { fn can_be_satisfied( &self, - time: &DateTime, + interval: Interval, schedule: &Schedule, available: &HashMap, ) -> bool { match self { SingleRequirement::Offset { resource, offset } => { - let intv = schedule.interval(*time, *offset); + let intv = schedule.interval(interval.end, *offset); match available.get(resource) { Some(is) => is.has_subset(intv), None => false, @@ -124,25 +124,25 @@ pub enum Requirement { impl Satisfiable for Requirement { fn is_satisfied( &self, - time: &DateTime, + interval: Interval, schedule: &Schedule, available: &HashMap, ) -> bool { match self { - Requirement::One(req) => req.is_satisfied(time, schedule, available), - Requirement::Group(req) => req.is_satisfied(time, schedule, available), + Requirement::One(req) => req.is_satisfied(interval, schedule, available), + Requirement::Group(req) => req.is_satisfied(interval, schedule, available), } } fn can_be_satisfied( &self, - time: &DateTime, + interval: Interval, schedule: &Schedule, available: &HashMap, ) -> bool { match self { - Requirement::One(req) => req.can_be_satisfied(time, schedule, available), - Requirement::Group(req) => req.can_be_satisfied(time, schedule, available), + Requirement::One(req) => req.can_be_satisfied(interval, schedule, available), + Requirement::Group(req) => req.can_be_satisfied(interval, schedule, available), } } } diff --git a/src/resource_interval.rs b/src/resource_interval.rs index 2c0d049..00a9e26 100644 --- a/src/resource_interval.rs +++ b/src/resource_interval.rs @@ -5,7 +5,7 @@ use std::ops::{Add, Deref, DerefMut, Sub}; /// represent where a resource is available, or where it's required /// Resources are independent, so overlaps between the /// interval sets are possible. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub struct ResourceInterval(HashMap); impl ResourceInterval { @@ -61,9 +61,9 @@ impl From<&HashMap> for ResourceInterval { } } -impl Add for ResourceInterval { +impl<'a, 'b> Add<&'b ResourceInterval> for &'a ResourceInterval { type Output = ResourceInterval; - fn add(self, other: ResourceInterval) -> Self::Output { + fn add(self, other: &'b ResourceInterval) -> Self::Output { let res: HashMap = other.0.iter().fold(self.0.clone(), |mut acc, (res, is)| { acc.entry(res.clone()) @@ -75,9 +75,9 @@ impl Add for ResourceInterval { } } -impl Sub for ResourceInterval { +impl<'a, 'b> Sub<&'b ResourceInterval> for &'a ResourceInterval { type Output = ResourceInterval; - fn sub(self, other: ResourceInterval) -> Self::Output { + fn sub(self, other: &'b ResourceInterval) -> Self::Output { let res: HashMap = self .0 .iter() @@ -124,17 +124,17 @@ mod tests { fn test_addition() { let a = ri!("alpha", (13, 15)); - assert_eq!(a + ri!("alpha", (15, 18)), ri!("alpha", (13, 18))); + assert_eq!(&a + &ri!("alpha", (15, 18)), ri!("alpha", (13, 18))); } #[test] fn test_subtraction() { assert_eq!( - ri!("alpha", (13, 18)) - ri!("alpha", (15, 16)), + &ri!("alpha", (13, 18)) - &ri!("alpha", (15, 16)), ri!("alpha", (13, 15), (16, 18)) ); assert_eq!( - ri!("alpha", (13, 18)) - ResourceInterval::new(), + &ri!("alpha", (13, 18)) - &ResourceInterval::new(), ri!("alpha", (13, 18)) ); } diff --git a/src/runner.rs b/src/runner.rs new file mode 100644 index 0000000..5b5362e --- /dev/null +++ b/src/runner.rs @@ -0,0 +1,446 @@ +use super::*; +use futures::stream::futures_unordered::FuturesUnordered; +use futures::StreamExt; + +/* + Runner is responsible for taking a TaskSet and a varmap and + iteratively taking steps to converge the current state to + be the target state. + + The runner will continue to execute until: + - A Stop message is sent + - current = TaskSet::coverage (the theoretical) +*/ + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum ActionState { + Queued, + Running, + Errored, + Completed, +} + +#[derive(Debug, Clone)] +pub struct Action { + task: String, + interval: Interval, + state: ActionState, + // kill: Option>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum WorldEvent { + Start, + TaskFailed { + task_name: String, + interval: Interval, + }, + TaskCompleted { + task_name: String, + interval: Interval, + }, + Timeout, + Stop, +} + +// Takes a definition, and runs it to completion +pub struct Runner { + tasks: TaskSet, + vars: VarMap, + output_options: TaskOutputOptions, + + target: ResourceInterval, + current: ResourceInterval, + + queue: Vec, + qidx: usize, + + events: FuturesUnordered>, + + last_horizon: DateTime, + executor: mpsc::UnboundedSender, +} + +fn gen_timeout(timeout: i64) -> tokio::task::JoinHandle { + tokio::spawn(async move { + tokio::time::sleep(Duration::seconds(timeout).to_std().unwrap()).await; + WorldEvent::Timeout + }) +} + +async fn validate_cmd( + executor: mpsc::UnboundedSender, + cmd: serde_json::Value, +) -> Result<()> { + let (response, rx) = oneshot::channel(); + executor + .send(ExecutorMessage::ValidateTask { + details: cmd, + response, + }) + .unwrap(); + rx.await? +} + +async fn run_task( + details: serde_json::Value, + executor: mpsc::UnboundedSender, + kill: oneshot::Receiver<()>, + output_options: &TaskOutputOptions, + varmap: &VarMap, +) -> bool { + let (response, response_rx) = oneshot::channel(); + executor + .send(ExecutorMessage::ExecuteTask { + details, + output_options: output_options.clone(), + varmap: varmap.clone(), + response, + kill, + }) + .unwrap(); + response_rx.await.unwrap() +} + +async fn up_task( + task_name: String, + interval: Interval, + kill: oneshot::Receiver<()>, + varmap: VarMap, + up: TaskDetails, + check: Option, + output_options: TaskOutputOptions, + executor: mpsc::UnboundedSender, +) -> WorldEvent { + if let Some(check_cmd) = check.clone() { + let (subkill, subkill_rx) = oneshot::channel(); + let succeeded = run_task( + check_cmd.clone(), + executor.clone(), + subkill_rx, + &output_options, + &varmap, + ) + .await; + + // If check succeeded, resources are up + if succeeded { + return WorldEvent::TaskCompleted { + task_name, + interval, + }; + } + } + + // UP + let (subkill, subkill_rx) = oneshot::channel(); + let succeeded = run_task(up, executor.clone(), subkill_rx, &output_options, &varmap).await; + if !succeeded { + return WorldEvent::TaskFailed { + task_name, + interval, + }; + } + + // recheck + if let Some(check_cmd) = check { + let (subkill, subkill_rx) = oneshot::channel(); + let succeeded = run_task( + check_cmd.clone(), + executor.clone(), + subkill_rx, + &output_options, + &varmap, + ) + .await; + + // If check succeeded, resources are up + if succeeded { + WorldEvent::TaskCompleted { + task_name, + interval, + } + } else { + WorldEvent::TaskFailed { + task_name, + interval, + } + } + } else { + WorldEvent::TaskCompleted { + task_name, + interval, + } + } +} + +impl Runner { + pub async fn new( + tasks: TaskSet, + vars: VarMap, + executor: mpsc::UnboundedSender, + output_options: TaskOutputOptions, + ) -> Result { + for tdef in tasks.values() { + validate_cmd(executor.clone(), tdef.up.clone()).await?; + if let Some(cmd) = &tdef.down { + validate_cmd(executor.clone(), cmd.clone()).await?; + } + if let Some(cmd) = &tdef.check { + validate_cmd(executor.clone(), cmd.clone()).await?; + } + } + + let target = tasks.get_state(Utc::now())?; + + let mut runner = Runner { + tasks, + vars, + output_options, + target, + current: ResourceInterval::new(), + queue: Vec::new(), + qidx: 0, + events: FuturesUnordered::new(), + last_horizon: DateTime::::MIN_UTC, + executor, + }; + + // Create queue + let required = &runner.target - &runner.current; + runner.queue = runner + .tasks + .iter() + .fold(Vec::new(), |mut acc, (name, task)| { + let res: Vec = task + .generate_intervals(&required) + .unwrap() + .into_iter() + .map({ + |interval| Action { + task: name.clone(), + interval, + state: ActionState::Queued, + } + }) + .collect(); + acc.extend(res); + acc + }); + + let unsatisfied = runner + .queue + .iter() + .filter(|act| { + !runner + .tasks + .get(&act.task) + .unwrap() + .can_be_satisfied(act.interval, &runner.target) + }) + .fold(HashSet::new(), |mut acc, a| { + println!("INVALID: {:?}", a); + acc.insert(a.task.clone()); + acc + }); + + if unsatisfied.is_empty() { + Ok(runner) + } else { + Err(anyhow!("Tasks {:?} cannot complete as the target state does not provide required resources", unsatisfied)) + } + } + + // We'll be using channels for running + pub async fn run(&mut self, stop: oneshot::Receiver) { + self.events.push(tokio::spawn(async move { + stop.await.expect("Unable to get stop"); + WorldEvent::Stop + })); + self.queue_actions(); + + // Loop while we can make progress + while !self.is_done() { + println!( + "At the top:\nTARGET: {:?}\nCURRENT: {:?}", + self.target, self.current + ); + + match self.events.next().await { + Some(Ok(WorldEvent::Start)) => { + println!("START"); + self.queue_actions(); + } + Some(Ok(WorldEvent::Stop)) => { + println!("Stop"); + break; + } + Some(Ok(WorldEvent::Timeout)) => { + println!("Timeout"); + self.queue_actions(); + } + Some(Ok(WorldEvent::TaskFailed { + task_name, + interval, + })) => { + println!("FAILED: {} / {}", task_name, interval); + println!("Well that sucks"); + } + Some(Ok(WorldEvent::TaskCompleted { + task_name, + interval, + })) => { + let action = self + .queue + .iter_mut() + .find(|x| x.task == task_name && x.interval == interval) + .unwrap(); + let task = self.tasks.get(&task_name).unwrap(); + action.state = ActionState::Completed; + for res in &task.provides { + self.current.get_mut(res).unwrap().insert(action.interval); + } + self.queue_actions(); + } + Some(Err(e)) => { + panic!("Something went wrong: {:?}", e) + } + None => { + // No pending actions waiting + // Can probably wait to the next event + continue; + } + } + // Log stuff + } + } + + fn queue_actions(&mut self) { + let now = Utc::now(); + + // Collect any outstanding futures + for action in self.queue[self.qidx..] + .iter_mut() + .filter(|x| x.state == ActionState::Queued && x.interval.end <= now) + { + let task = self.tasks.get(&action.task).unwrap(); + if !task.can_run(action.interval, &self.current) { + continue; + } + let (kill_tx, kill) = oneshot::channel(); + let varmap: VarMap = VarMap::from_interval(&action.interval, task.timezone) + .iter() + .chain(self.vars.iter()) + .collect(); + let task_name = action.task.clone(); + let interval = action.interval; + let up = task.up.clone(); + let check = task.check.clone(); + let output_options = self.output_options.clone(); + let exe = self.executor.clone(); + self.events.push(tokio::spawn(async move { + up_task( + task_name.clone(), + interval, + kill, + varmap, + up, + check, + output_options, + exe, + ) + .await + })); + // action.response = Some(response_rx); + // action.kill = Some(kill_tx); + action.state = ActionState::Running; + } + } + + fn is_done(&self) -> bool { + self.target == self.current + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::executors::local_executor; + + #[tokio::test] + async fn test_runner() { + let json_runner = r#"{ + "variables": { + "HOME": "/tmp/world_test" + }, + "calendars": { + "std": { "mask": [ "Mon", "Tue", "Wed", "Thu", "Fri" ] } + }, + "tasks": { + "task_a": { + "up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}" }, + "down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}" }, + "check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}" }, + + "provides": [ "task_a" ], + + "calendar_name": "std", + "times": [ "09:00:00", "12:00:00"], + "timezone": "America/New_York", + + "valid_from": "2022-01-01T09:00:00", + "valid_to": "2022-01-08T09:00:00" + }, + "task_b": { + "up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}" }, + "down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}" }, + "check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}" }, + + "provides": [ "task_b" ], + "requires": [ { "resource": "task_a", "offset": 0 } ], + + "calendar_name": "std", + "times": [ "17:00:00" ], + "timezone": "America/New_York", + + "valid_from": "2022-01-04T09:00:00", + "valid_to": "2022-01-07T00:00:00" + } + } + }"#; + + /* + task_a: + declared: [2022-01-01T09:00:00, 2022-01-08T09:00:00] + actual: [2021-12-31T12:00:00, 2022-01-07T12:00:00] + task_b: + declared: [2022-01-02T09:00:00, 2022-01-07T13:00:00] + actual: [2021-12-31T17:00:00, 2022-01-07T17:00:00] + */ + + // Some Deserializer. + let world_def: WorldDefinition = serde_json::from_str(json_runner).unwrap(); + + let tasks = world_def.taskset().unwrap(); + + // Executor + let (tx, rx) = mpsc::unbounded_channel(); + let executor = local_executor::start(10, rx); + + let mut runner = Runner::new( + tasks, + world_def.variables, + tx.clone(), + world_def.output_options, + ) + .await + .unwrap(); + + let (wtx, wrx) = oneshot::channel(); + runner.run(wrx).await; + + tx.send(ExecutorMessage::Stop {}).unwrap(); + + assert_eq!(1, 1); + } +} diff --git a/src/schedule.rs b/src/schedule.rs index 4c12c95..b064522 100644 --- a/src/schedule.rs +++ b/src/schedule.rs @@ -58,23 +58,7 @@ impl Schedule { times } - pub fn interval_utc(&self, dt: DateTime, offset: i32) -> Interval { - // Need to get the current interval, then offset it - let at = dt.with_timezone(&self.timezone); - let rt = if self.times.iter().any(|x| *x == at.time()) { - at - } else { - self.prev_time(at) - }; - - let start = self.offset(rt, offset); - Interval::new( - start.with_timezone(&Utc), - self.next_time(start).with_timezone(&Utc), - ) - } - - pub fn interval(&self, dt: DateTime, offset: i32) -> Interval { + pub fn interval(&self, dt: DateTime, offset: i32) -> Interval { // Need to get the current interval, then offset it let at = dt.with_timezone(&self.timezone); let rt = if self.times.iter().any(|x| *x == at.time()) { diff --git a/src/task.rs b/src/task.rs index f6f6585..4d8e3c5 100644 --- a/src/task.rs +++ b/src/task.rs @@ -159,18 +159,16 @@ impl Task { } /// Returns true if all requirements are satisfied - pub fn can_run(&self, time: DateTime, available: &ResourceInterval) -> bool { - let local_time = time.with_timezone(&self.timezone); + pub fn can_run(&self, interval: Interval, available: &ResourceInterval) -> bool { self.requires .iter() - .all(|req| req.is_satisfied(&local_time, &self.schedule, available)) + .all(|req| req.is_satisfied(interval, &self.schedule, available)) } - pub fn can_be_satisfied(&self, time: DateTime, available: &ResourceInterval) -> bool { - let local_time = time.with_timezone(&self.timezone); + pub fn can_be_satisfied(&self, interval: Interval, available: &ResourceInterval) -> bool { self.requires .iter() - .all(|req| req.can_be_satisfied(&local_time, &self.schedule, available)) + .all(|req| req.can_be_satisfied(interval, &self.schedule, available)) } pub fn up(&self, interval: &Interval) -> Result> { diff --git a/src/task_set.rs b/src/task_set.rs index 0cf6963..52a8860 100644 --- a/src/task_set.rs +++ b/src/task_set.rs @@ -1,19 +1,8 @@ use super::*; +use std::convert::From; use std::ops::{Deref, DerefMut}; -pub enum ActionState { - Queued, - Running, - Errored, - Completed, -} - -pub struct Action { - task: String, - interval: Interval, - state: ActionState, -} - +#[derive(Clone, Debug)] pub struct TaskSet(HashMap); impl TaskSet { @@ -25,6 +14,11 @@ impl TaskSet { self.get_state(MAX_TIME) } + pub fn validate(&self) -> Result<()> { + self.get_state(MAX_TIME)?; + Ok(()) + } + pub fn get_state(&self, time: DateTime) -> Result { let mut res = ResourceInterval::new(); @@ -49,23 +43,6 @@ impl TaskSet { Ok(res) } - - pub fn get_actions(&self, required: &ResourceInterval) -> Result> { - let mut actions = Vec::new(); - for (name, task) in self.iter() { - let new_actions: Vec = task - .generate_intervals(required)? - .into_iter() - .map(|interval| Action { - task: name.clone(), - interval, - state: ActionState::Queued, - }) - .collect(); - actions.extend(new_actions); - } - Ok(actions) - } } impl Deref for TaskSet { @@ -80,3 +57,9 @@ impl DerefMut for TaskSet { &mut self.0 } } + +impl From> for TaskSet { + fn from(data: HashMap) -> Self { + Self(data) + } +} diff --git a/src/world.rs b/src/world.rs new file mode 100644 index 0000000..205e108 --- /dev/null +++ b/src/world.rs @@ -0,0 +1,45 @@ +use super::*; + +// A struct used for serializing / deserializing world +#[derive(Debug, Serialize, Deserialize)] +pub struct WorldDefinition { + pub tasks: HashMap, + + pub calendars: HashMap, + + #[serde(default)] + pub variables: VarMap, + + #[serde(default)] + pub output_options: TaskOutputOptions, +} + +impl WorldDefinition { + pub fn taskset(&self) -> Result { + // Ensure all tasks reference a valid calendar + for (name, def) in self.tasks.iter() { + if !self.calendars.contains_key(&def.calendar_name) { + return Err(anyhow!( + "Task {} references calendar {}, which is not defined", + name, + def.calendar_name + )); + } + } + let tasks: HashMap = self + .tasks + .iter() + .map(|(tn, td)| { + ( + tn.clone(), + td.to_task(self.calendars.get(&td.calendar_name).unwrap()), + ) + }) + .collect(); + let ts = TaskSet::from(tasks); + + ts.validate()?; + + Ok(ts) + } +}