diff --git a/src/bin/wf/main.rs b/src/bin/wf/main.rs index 5a1b5f3..1962286 100644 --- a/src/bin/wf/main.rs +++ b/src/bin/wf/main.rs @@ -123,9 +123,11 @@ async fn main() -> std::io::Result<()> { debug!("Config: {:?}", args); + let (_runner_tx, runner_rx) = mpsc::unbounded_channel(); let mut runner = Runner::new( tasks, world_def.variables, + runner_rx, exe_tx.clone(), storage_tx.clone(), world_def.output_options, @@ -134,8 +136,7 @@ async fn main() -> std::io::Result<()> { .await .unwrap(); - let (wtx, wrx) = oneshot::channel(); - runner.run(wrx).await; + runner.run().await; exe_tx.send(ExecutorMessage::Stop {}).unwrap(); exe_handle.await.unwrap(); diff --git a/src/lib.rs b/src/lib.rs index b405405..584f7d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,9 +43,3 @@ pub mod task; pub mod task_set; pub mod varmap; pub mod world; - -/* - TODO: - target_state -> TaskSet.coverage() - current state -*/ diff --git a/src/requirement.rs b/src/requirement.rs index 4edd901..85a6b9c 100644 --- a/src/requirement.rs +++ b/src/requirement.rs @@ -18,6 +18,8 @@ pub trait Satisfiable { schedule: &Schedule, available: &HashMap, ) -> bool; + + fn resources(&self) -> HashSet; } #[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] @@ -29,6 +31,23 @@ pub enum AggregateRequirement { } impl Satisfiable for AggregateRequirement { + fn resources(&self) -> HashSet { + match self { + AggregateRequirement::All(reqs) => reqs.iter().fold(HashSet::new(), |mut acc, req| { + acc.extend(req.resources()); + acc + }), + AggregateRequirement::Any(reqs) => reqs.iter().fold(HashSet::new(), |mut acc, req| { + acc.extend(req.resources()); + acc + }), + AggregateRequirement::None(reqs) => reqs.iter().fold(HashSet::new(), |mut acc, req| { + acc.extend(req.resources()); + acc + }), + } + } + fn is_satisfied( &self, interval: Interval, @@ -76,6 +95,13 @@ pub enum SingleRequirement { } impl Satisfiable for SingleRequirement { + fn resources(&self) -> HashSet { + match self { + SingleRequirement::Offset { resource, .. } => HashSet::from([resource.to_owned()]), + SingleRequirement::File { path } => HashSet::new(), + } + } + fn is_satisfied( &self, interval: Interval, @@ -145,6 +171,13 @@ impl Satisfiable for Requirement { Requirement::Group(req) => req.can_be_satisfied(interval, schedule, available), } } + + fn resources(&self) -> HashSet { + match self { + Requirement::One(req) => req.resources(), + Requirement::Group(req) => req.resources(), + } + } } #[cfg(test)] diff --git a/src/runner.rs b/src/runner.rs index d45c54d..3582214 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -1,6 +1,7 @@ use super::*; use futures::stream::futures_unordered::FuturesUnordered; use futures::StreamExt; +use std::collections::VecDeque; /* Runner is responsible for taking a TaskSet and a varmap and @@ -22,7 +23,7 @@ pub enum ActionState { #[derive(Debug, Clone)] pub struct Action { - task: String, + task: usize, interval: Interval, state: ActionState, // kill: Option>, @@ -30,15 +31,9 @@ pub struct Action { #[derive(Debug, Serialize, Deserialize)] pub enum RunnerMessage { - Start, - TaskFailed { - task_name: String, - interval: Interval, - }, - TaskCompleted { - task_name: String, - interval: Interval, - }, + Tick, + ActionCompleted { action_id: usize, succeeded: bool }, + RetryAction { action_id: usize }, /* ForceUp { resources: HashSet, @@ -49,7 +44,6 @@ pub enum RunnerMessage { interval: Interval, }, */ - Timeout, Stop, } @@ -64,23 +58,17 @@ pub struct Runner { target: ResourceInterval, current: ResourceInterval, - queue: Vec, + actions: Vec, qidx: usize, events: FuturesUnordered>, last_horizon: DateTime, + messages: mpsc::UnboundedReceiver, executor: mpsc::UnboundedSender, storage: mpsc::UnboundedSender, } -fn gen_timeout(duration: Duration) -> tokio::task::JoinHandle { - tokio::spawn(async move { - tokio::time::sleep(duration.to_std().unwrap()).await; - RunnerMessage::Timeout - }) -} - async fn validate_cmd( executor: mpsc::UnboundedSender, cmd: serde_json::Value, @@ -129,6 +117,7 @@ async fn run_task( } async fn up_task( + action_id: usize, task_name: String, interval: Interval, _kill: oneshot::Receiver<()>, @@ -155,9 +144,9 @@ async fn up_task( // If check succeeded, resources are up if succeeded { - return RunnerMessage::TaskCompleted { - task_name, - interval, + return RunnerMessage::ActionCompleted { + action_id, + succeeded: true, }; } } @@ -176,9 +165,9 @@ async fn up_task( ) .await; if !succeeded { - return RunnerMessage::TaskFailed { - task_name, - interval, + return RunnerMessage::ActionCompleted { + action_id, + succeeded: false, }; } @@ -199,34 +188,45 @@ async fn up_task( // If check succeeded, resources are up if succeeded { - RunnerMessage::TaskCompleted { - task_name, - interval, - } + return RunnerMessage::ActionCompleted { + action_id, + succeeded: true, + }; } else { - RunnerMessage::TaskFailed { - task_name, - interval, - } + return RunnerMessage::ActionCompleted { + action_id, + succeeded: false, + }; } } else { - RunnerMessage::TaskCompleted { - task_name, - interval, - } + return RunnerMessage::ActionCompleted { + action_id, + succeeded: true, + }; } } +fn delayed_event(delay: Duration, event: RunnerMessage) -> tokio::task::JoinHandle { + tokio::spawn(async move { + tokio::time::sleep(delay.to_std().unwrap()).await; + event + }) +} + impl Runner { pub async fn new( tasks: TaskSet, vars: VarMap, + messages: mpsc::UnboundedReceiver, executor: mpsc::UnboundedSender, storage: mpsc::UnboundedSender, output_options: TaskOutputOptions, force_check: bool, ) -> Result { - for tdef in tasks.values() { + tasks.validate()?; + + // Validate the task commands can run on the executor + for tdef in tasks.iter() { validate_cmd(executor.clone(), tdef.up.clone()).await?; if let Some(cmd) = &tdef.down { validate_cmd(executor.clone(), cmd.clone()).await?; @@ -236,6 +236,7 @@ impl Runner { } } + // Load last-known state let current = if force_check { info!("Force re-check set, starting with empty current state."); ResourceInterval::new() @@ -248,172 +249,132 @@ impl Runner { let res = rx.await.unwrap(); res }; + let target = current.clone(); - let end_state = tasks.coverage()?; + let end_state = tasks.coverage(); let mut runner = Runner { tasks, vars, output_options, end_state, - target: ResourceInterval::new(), + target, current, - queue: Vec::new(), + actions: Vec::new(), qidx: 0, events: FuturesUnordered::new(), last_horizon: DateTime::::MIN_UTC, + messages, executor, storage, }; - runner.tick()?; + runner.tick(); + runner.queue_actions(); Ok(runner) } - pub fn tick(&mut self) -> Result<()> { - let target = self.tasks.get_state(Utc::now())?; + // Generate a new target state and generate any required actions + pub fn tick(&mut self) { + let new_target = self.tasks.get_state(Utc::now() + Duration::days(1)); + let new_required = new_target.difference(&self.target); + let mut new_actions = + self.tasks + .iter() + .enumerate() + .fold(Vec::new(), |mut acc, (idx, task)| { + let res: Vec = task + .generate_intervals(&new_required) + .unwrap() + .into_iter() + .map({ + |interval| Action { + task: idx, + interval, + state: ActionState::Queued, + } + }) + .collect(); + acc.extend(res); + acc + }); + new_actions.sort_unstable_by(|a, b| a.interval.end.partial_cmp(&b.interval.end).unwrap()); - // Create queue - let required = target.difference(&self.current); - self.queue = self.tasks.iter().fold(Vec::new(), |mut acc, (name, task)| { - let res: Vec = task - .generate_intervals(&required) - .unwrap() - .into_iter() - .map({ - |interval| Action { - task: name.clone(), - interval, - state: ActionState::Queued, - } - }) - .collect(); - acc.extend(res); - acc - }); - - // Ensure that all actions can be satisfied - let unsatisfied = self - .queue - .iter() - .filter(|act| { - !self - .tasks - .get(&act.task) - .unwrap() - .can_be_satisfied(act.interval, &target) - }) - .fold(HashSet::new(), |mut acc, a| { - acc.insert(a.task.clone()); - acc - }); - - // Ensure current + - let mut result_state = self.current.clone(); - for action in &self.queue { - for res in &self.tasks.get(&action.task).unwrap().provides { - result_state - .entry(res.clone()) - .or_insert(IntervalSet::new()) - .insert(action.interval); - } - } - if result_state != target { - return Err(anyhow!( - "Actions generated produced\n\t{:?}\nExpected\n\t{:?}", - result_state, - target - )); - } - - if unsatisfied.is_empty() { - self.target = target; - Ok(()) - } else { - Err(anyhow!("Tasks {:?} cannot complete as the target state does not provide required resources", unsatisfied)) - } + info!("Tick: Generated {} new actions", new_actions.len()); + self.actions.extend(new_actions); } - // We'll be using channels for running - pub async fn run(&mut self, stop: oneshot::Receiver) { - self.events.push(tokio::spawn(async move { - // This recv will fail if the channel is shutdown, so just ignore it. - stop.await.unwrap_or(RunnerMessage::Stop); - RunnerMessage::Stop - })); - self.queue_actions(); + pub async fn run(&mut self) { + self.events + .push(delayed_event(Duration::seconds(1), RunnerMessage::Tick)); - // Loop while we can make progress + // Need to incorporate the ability to receive messages + // + // Loop until the current state matches the end state while !self.is_done() { - // Queue up tasks - if self - .queue - .iter() - .all(|action| action.state == ActionState::Completed) - { - let now = Utc::now(); - let next_time = self - .tasks - .values() - .map(|t| t.schedule.next_time(now)) - .min() - .unwrap() - .with_timezone(&Utc); - let sleep_duration = next_time - now; - info!("Sleeping for {} until next task", sleep_duration); - self.events.push(gen_timeout(sleep_duration)); - self.tick().unwrap(); - } match self.events.next().await { - Some(Ok(RunnerMessage::Start)) => { + Some(Ok(RunnerMessage::Tick)) => { + debug!("Tick"); + // Enqueue new messages + while let Ok(msg) = self.messages.try_recv() { + self.events.push(delayed_event(Duration::seconds(0), msg)); + } + match self.actions.last() { + Some(action) => { + if action.interval.end <= Utc::now() { + self.tick() + } + } + None => self.tick(), + } + + // Perform maintenance self.queue_actions(); + + self.events + .push(delayed_event(Duration::seconds(5), RunnerMessage::Tick)); } Some(Ok(RunnerMessage::Stop)) => { + info!("Stopping"); break; } - Some(Ok(RunnerMessage::Timeout)) => { - self.queue_actions(); + Some(Ok(RunnerMessage::RetryAction { action_id })) => { + info!("Retrying action {}", action_id); + let action = &mut self.actions[action_id]; + action.state = ActionState::Queued; } - Some(Ok(RunnerMessage::TaskFailed { - task_name, - interval, + Some(Ok(RunnerMessage::ActionCompleted { + action_id, + succeeded, })) => { - println!("FAILED: {} / {}", task_name, interval); - println!("Well that sucks"); - } - Some(Ok(RunnerMessage::TaskCompleted { - task_name, - interval, - })) => { - let action = self - .queue - .iter_mut() - .find(|x| x.task == task_name && x.interval == interval) - .unwrap(); - let task = self.tasks.get(&task_name).unwrap(); - action.state = ActionState::Completed; - for res in &task.provides { - self.current - .entry(res.clone()) - .or_insert(IntervalSet::new()) - .insert(action.interval); + info!("Completing action {}", action_id); + if succeeded { + let action = &mut self.actions[action_id]; + let task = self.tasks.get(action.task).unwrap(); + action.state = ActionState::Completed; + for res in &task.provides { + self.current + .entry(res.clone()) + .or_insert(IntervalSet::new()) + .insert(action.interval); + } + self.storage + .send(StorageMessage::StoreState { + state: self.current.clone(), + }) + .unwrap(); + self.queue_actions(); + } else { + self.events.push(delayed_event( + Duration::seconds(30), + RunnerMessage::RetryAction { action_id }, + )); } - info!("Updating State"); - self.storage - .send(StorageMessage::StoreState { - state: self.current.clone(), - }) - .unwrap(); - self.queue_actions(); } Some(Err(e)) => { panic!("Something went wrong: {:?}", e) } - None => { - // No pending actions waiting - // Can probably wait to the next event - continue; - } + None => {} } // Log stuff } @@ -422,12 +383,14 @@ impl Runner { fn queue_actions(&mut self) { let now = Utc::now(); - // Collect any outstanding futures - for action in self.queue[self.qidx..] + // Submit any elligible jobs + for (action_id, action) in self + .actions .iter_mut() - .filter(|x| x.state == ActionState::Queued && x.interval.end <= now) + .enumerate() + .filter(|(_, x)| x.state == ActionState::Queued && x.interval.end <= now) { - let task = self.tasks.get(&action.task).unwrap(); + let task = self.tasks.get(action.task).unwrap(); if !task.can_run(action.interval, &self.current) { continue; } @@ -436,7 +399,7 @@ impl Runner { .iter() .chain(self.vars.iter()) .collect(); - let task_name = action.task.clone(); + let task_name = task.name.clone(); let interval = action.interval; let up = task.up.clone(); let check = task.check.clone(); @@ -445,6 +408,7 @@ impl Runner { let storage = self.storage.clone(); self.events.push(tokio::spawn(async move { up_task( + action_id, task_name.clone(), interval, kill, @@ -532,9 +496,11 @@ mod tests { "world_test".to_owned(), ); + let (runner_tx, runner_rx) = mpsc::unbounded_channel(); let mut runner = Runner::new( tasks, world_def.variables, + runner_rx, tx.clone(), storage_tx.clone(), world_def.output_options, @@ -543,8 +509,7 @@ mod tests { .await .unwrap(); - let (wtx, wrx) = oneshot::channel(); - runner.run(wrx).await; + runner.run().await; tx.send(ExecutorMessage::Stop {}).unwrap(); executor.await.unwrap(); diff --git a/src/task.rs b/src/task.rs index 8cc1d3c..5a277a2 100644 --- a/src/task.rs +++ b/src/task.rs @@ -126,6 +126,7 @@ impl TaskDefinition { let actual_end = schedule.interval(end, 0).start; Task { + name: name.to_owned(), up: self.up.clone(), down: self.down.clone(), check: self.check.clone(), @@ -147,6 +148,7 @@ impl TaskDefinition { */ #[derive(Clone, Serialize, Debug)] pub struct Task { + pub name: String, pub up: TaskDetails, pub down: Option, pub check: Option, @@ -238,6 +240,13 @@ impl Task { .all(|req| req.can_be_satisfied(interval, &self.schedule, available)) } + pub fn requires_resources(&self) -> HashSet { + self.requires.iter().fold(HashSet::new(), |mut acc, req| { + acc.extend(req.resources()); + acc + }) + } + pub fn up(&self, interval: &Interval) -> Result> { if self.check(interval) { Ok(self.provides.clone()) @@ -315,7 +324,7 @@ mod tests { // Produces a std let cal = Calendar::new(); - let task = task_def.to_task(&cal); + let task = task_def.to_task("test", &cal); // Assert the valid interval is correct assert_eq!( diff --git a/src/task_set.rs b/src/task_set.rs index 155788c..2765212 100644 --- a/src/task_set.rs +++ b/src/task_set.rs @@ -3,28 +3,67 @@ use std::convert::From; use std::ops::{Deref, DerefMut}; #[derive(Clone, Debug)] -pub struct TaskSet(HashMap); +pub struct TaskSet(Vec); impl TaskSet { pub fn new() -> Self { - TaskSet(HashMap::new()) + TaskSet(Vec::new()) } - pub fn coverage(&self) -> Result { + pub fn coverage(&self) -> ResourceInterval { self.get_state(MAX_TIME) } pub fn validate(&self) -> Result<()> { - self.get_state(MAX_TIME)?; + let state = self.coverage(); + + for task in &self.0 { + for resource in task.requires_resources() { + if !state.contains_key(&resource) { + return Err(anyhow!( + "Task {} requires resource {}, which isn't produced.", + task.name, + resource + )); + } + } + } + + // validate that no task generates the same resource on overlapping times + let providers: HashMap> = + self.0 + .iter() + .enumerate() + .fold(HashMap::new(), |mut acc, (idx, t)| { + for res in &t.provides { + acc.entry(res.clone()).or_insert(Vec::new()).push(idx) + } + acc + }); + for (res, tids) in providers { + let mut is = IntervalSet::new(); + for tid in tids { + let already_provided = is.intersection(&self.0[tid].valid_over); + if !already_provided.is_empty() { + return Err(anyhow!( + "Task set invalid: multiple tasks provide resource {} on the intervals {:?}", + res, + already_provided + )); + } + is.merge(&self.0[tid].valid_over); + } + } + Ok(()) } - pub fn get_state(&self, time: DateTime) -> Result { + pub fn get_state(&self, time: DateTime) -> ResourceInterval { let mut res = ResourceInterval::new(); // Insert all of the covered items - for task in self.values() { - // TODO Need to align each of these intervals with a scheduled time + for task in &self.0 { + // Need to align each of these intervals with a scheduled time let timeline = if time < MAX_TIME { let cur_intv = task.schedule.interval(time.clone(), 0); if cur_intv.end > time { @@ -37,25 +76,18 @@ impl TaskSet { }; let task_timeline = task.valid_over.intersection(&timeline); for resource in &task.provides { - let ris = res.entry(resource.clone()).or_insert(IntervalSet::new()); - let already_provided = ris.intersection(&task_timeline); - if !already_provided.is_empty() { - return Err(anyhow!( - "Task set invalid: multiple tasks provide resource {} on the intervals {:?}", - resource, - already_provided - )); - } - ris.merge(&task_timeline); + res.entry(resource.clone()) + .or_insert(IntervalSet::new()) + .merge(&task_timeline); } } - Ok(res) + res } } impl Deref for TaskSet { - type Target = HashMap; + type Target = Vec; fn deref(&self) -> &Self::Target { &self.0 } @@ -67,8 +99,8 @@ impl DerefMut for TaskSet { } } -impl From> for TaskSet { - fn from(data: HashMap) -> Self { +impl From> for TaskSet { + fn from(data: Vec) -> Self { Self(data) } } diff --git a/src/world.rs b/src/world.rs index c852038..39abd2a 100644 --- a/src/world.rs +++ b/src/world.rs @@ -26,15 +26,10 @@ impl WorldDefinition { )); } } - let tasks: HashMap = self + let tasks: Vec = self .tasks .iter() - .map(|(tn, td)| { - ( - tn.clone(), - td.to_task(tn, self.calendars.get(&td.calendar_name).unwrap()), - ) - }) + .map(|(tn, td)| td.to_task(tn, self.calendars.get(&td.calendar_name).unwrap())) .collect(); let ts = TaskSet::from(tasks);