From 08fcab41d315120725d00382543a5e78135d6bce Mon Sep 17 00:00:00 2001 From: Kinesin Data Technologies Incorporated <93931750+kinesintech@users.noreply.github.com> Date: Tue, 4 Oct 2022 15:13:24 -0300 Subject: [PATCH] Adding in redis storage --- src/executors/local_executor.rs | 20 ++++++- src/executors/mod.rs | 3 + src/resource_interval.rs | 2 +- src/runner.rs | 100 ++++++++++++++++++++++++-------- src/storage/mod.rs | 6 ++ src/storage/redis_store.rs | 28 ++++++--- 6 files changed, 123 insertions(+), 36 deletions(-) diff --git a/src/executors/local_executor.rs b/src/executors/local_executor.rs index 5efd5a7..a507fa0 100644 --- a/src/executors/local_executor.rs +++ b/src/executors/local_executor.rs @@ -255,9 +255,12 @@ pub async fn start_local_executor( }); } ExecuteTask { + task_name, + interval, details, varmap, output_options, + storage, response, kill, } => { @@ -274,7 +277,15 @@ pub async fn start_local_executor( ..TaskAttempt::new() }, }; - response.send(attempt.succeeded).unwrap(); + let rc = attempt.succeeded; + storage + .send(StorageMessage::StoreAttempt { + task_name, + interval, + attempt, + }) + .unwrap(); + response.send(rc).unwrap(); })); } Stop {} => { @@ -284,8 +295,11 @@ pub async fn start_local_executor( } } -pub fn start(max_parallel: usize, msgs: mpsc::UnboundedReceiver) { +pub fn start( + max_parallel: usize, + msgs: mpsc::UnboundedReceiver, +) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { start_local_executor(max_parallel, msgs).await; - }); + }) } diff --git a/src/executors/mod.rs b/src/executors/mod.rs index f0a9e9f..5058392 100644 --- a/src/executors/mod.rs +++ b/src/executors/mod.rs @@ -16,9 +16,12 @@ pub enum ExecutorMessage { /// Errors /// Will return `Err` if the tasks are invalid, according to the executor ExecuteTask { + task_name: String, + interval: Interval, details: serde_json::Value, varmap: VarMap, output_options: TaskOutputOptions, + storage: mpsc::UnboundedSender, response: oneshot::Sender, kill: oneshot::Receiver<()>, }, diff --git a/src/resource_interval.rs b/src/resource_interval.rs index 6942157..69353c9 100644 --- a/src/resource_interval.rs +++ b/src/resource_interval.rs @@ -5,7 +5,7 @@ use std::ops::{Add, Deref, DerefMut, Sub}; /// represent where a resource is available, or where it's required /// Resources are independent, so overlaps between the /// interval sets are possible. -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] pub struct ResourceInterval(HashMap); impl ResourceInterval { diff --git a/src/runner.rs b/src/runner.rs index 433c267..932ab05 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -29,7 +29,7 @@ pub struct Action { } #[derive(Debug, Serialize, Deserialize)] -pub enum WorldEvent { +pub enum RunnerEvent { Start, TaskFailed { task_name: String, @@ -57,16 +57,17 @@ pub struct Runner { queue: Vec, qidx: usize, - events: FuturesUnordered>, + events: FuturesUnordered>, last_horizon: DateTime, executor: mpsc::UnboundedSender, + storage: mpsc::UnboundedSender, } -fn gen_timeout(timeout: i64) -> tokio::task::JoinHandle { +fn gen_timeout(timeout: i64) -> tokio::task::JoinHandle { tokio::spawn(async move { tokio::time::sleep(Duration::seconds(timeout).to_std().unwrap()).await; - WorldEvent::Timeout + RunnerEvent::Timeout }) } @@ -85,20 +86,27 @@ async fn validate_cmd( } async fn run_task( + task_name: String, + interval: Interval, details: serde_json::Value, executor: mpsc::UnboundedSender, + storage: mpsc::UnboundedSender, kill: oneshot::Receiver<()>, output_options: &TaskOutputOptions, varmap: &VarMap, ) -> bool { + println!("Running {}/{}", task_name, interval); let (response, response_rx) = oneshot::channel(); executor .send(ExecutorMessage::ExecuteTask { + task_name, + interval, details, output_options: output_options.clone(), varmap: varmap.clone(), response, kill, + storage, }) .unwrap(); response_rx.await.unwrap() @@ -113,12 +121,16 @@ async fn up_task( check: Option, output_options: TaskOutputOptions, executor: mpsc::UnboundedSender, -) -> WorldEvent { + storage: mpsc::UnboundedSender, +) -> RunnerEvent { if let Some(check_cmd) = check.clone() { let (subkill, subkill_rx) = oneshot::channel(); let succeeded = run_task( + task_name.clone(), + interval, check_cmd.clone(), executor.clone(), + storage.clone(), subkill_rx, &output_options, &varmap, @@ -127,7 +139,7 @@ async fn up_task( // If check succeeded, resources are up if succeeded { - return WorldEvent::TaskCompleted { + return RunnerEvent::TaskCompleted { task_name, interval, }; @@ -136,9 +148,19 @@ async fn up_task( // UP let (subkill, subkill_rx) = oneshot::channel(); - let succeeded = run_task(up, executor.clone(), subkill_rx, &output_options, &varmap).await; + let succeeded = run_task( + task_name.clone(), + interval, + up, + executor.clone(), + storage.clone(), + subkill_rx, + &output_options, + &varmap, + ) + .await; if !succeeded { - return WorldEvent::TaskFailed { + return RunnerEvent::TaskFailed { task_name, interval, }; @@ -148,8 +170,11 @@ async fn up_task( if let Some(check_cmd) = check { let (subkill, subkill_rx) = oneshot::channel(); let succeeded = run_task( + task_name.clone(), + interval, check_cmd.clone(), executor.clone(), + storage.clone(), subkill_rx, &output_options, &varmap, @@ -158,18 +183,18 @@ async fn up_task( // If check succeeded, resources are up if succeeded { - WorldEvent::TaskCompleted { + RunnerEvent::TaskCompleted { task_name, interval, } } else { - WorldEvent::TaskFailed { + RunnerEvent::TaskFailed { task_name, interval, } } } else { - WorldEvent::TaskCompleted { + RunnerEvent::TaskCompleted { task_name, interval, } @@ -181,6 +206,7 @@ impl Runner { tasks: TaskSet, vars: VarMap, executor: mpsc::UnboundedSender, + storage: mpsc::UnboundedSender, output_options: TaskOutputOptions, ) -> Result { for tdef in tasks.values() { @@ -193,6 +219,12 @@ impl Runner { } } + let (response, rx) = oneshot::channel(); + storage + .send(StorageMessage::LoadState { response }) + .unwrap(); + let current = rx.await.unwrap(); + let end_state = tasks.coverage()?; let mut runner = Runner { tasks, @@ -200,12 +232,13 @@ impl Runner { output_options, end_state, target: ResourceInterval::new(), - current: ResourceInterval::new(), + current, queue: Vec::new(), qidx: 0, events: FuturesUnordered::new(), last_horizon: DateTime::::MIN_UTC, executor, + storage, }; runner.tick()?; @@ -218,6 +251,8 @@ impl Runner { // Create queue let required = target.difference(&self.current); + println!("CUR: {:?}", self.current); + println!("REQ: {:?}", required); self.queue = self.tasks.iter().fold(Vec::new(), |mut acc, (name, task)| { let res: Vec = task .generate_intervals(&required) @@ -247,7 +282,6 @@ impl Runner { .can_be_satisfied(act.interval, &target) }) .fold(HashSet::new(), |mut acc, a| { - println!("Task cannot be satisfied: {:?}", a); acc.insert(a.task.clone()); acc }); @@ -279,39 +313,37 @@ impl Runner { } // We'll be using channels for running - pub async fn run(&mut self, stop: oneshot::Receiver) { + pub async fn run(&mut self, stop: oneshot::Receiver) { self.events.push(tokio::spawn(async move { stop.await.expect("Unable to get stop"); - WorldEvent::Stop + RunnerEvent::Stop })); self.queue_actions(); // Loop while we can make progress while !self.is_done() { match self.events.next().await { - Some(Ok(WorldEvent::Start)) => { - println!("START"); + Some(Ok(RunnerEvent::Start)) => { self.queue_actions(); } - Some(Ok(WorldEvent::Stop)) => { - println!("Stop"); + Some(Ok(RunnerEvent::Stop)) => { break; } - Some(Ok(WorldEvent::Timeout)) => { - println!("Timeout"); + Some(Ok(RunnerEvent::Timeout)) => { self.queue_actions(); } - Some(Ok(WorldEvent::TaskFailed { + Some(Ok(RunnerEvent::TaskFailed { task_name, interval, })) => { println!("FAILED: {} / {}", task_name, interval); println!("Well that sucks"); } - Some(Ok(WorldEvent::TaskCompleted { + Some(Ok(RunnerEvent::TaskCompleted { task_name, interval, })) => { + println!("Completing {}/{}", task_name, interval); let action = self .queue .iter_mut() @@ -325,6 +357,11 @@ impl Runner { .or_insert(IntervalSet::new()) .insert(action.interval); } + self.storage + .send(StorageMessage::StoreState { + state: self.current.clone(), + }) + .unwrap(); self.queue_actions(); } Some(Err(e)) => { @@ -363,6 +400,7 @@ impl Runner { let check = task.check.clone(); let output_options = self.output_options.clone(); let exe = self.executor.clone(); + let storage = self.storage.clone(); self.events.push(tokio::spawn(async move { up_task( task_name.clone(), @@ -373,6 +411,7 @@ impl Runner { check, output_options, exe, + storage, ) .await })); @@ -441,12 +480,21 @@ mod tests { // Executor let (tx, rx) = mpsc::unbounded_channel(); - local_executor::start(10, rx); + let executor = local_executor::start(10, rx); + + // Storage + let (storage_tx, storage_rx) = mpsc::unbounded_channel(); + let storage = redis_store::start( + storage_rx, + "redis://localhost".to_owned(), + "world_test".to_owned(), + ); let mut runner = Runner::new( tasks, world_def.variables, tx.clone(), + storage_tx.clone(), world_def.output_options, ) .await @@ -456,6 +504,10 @@ mod tests { runner.run(wrx).await; tx.send(ExecutorMessage::Stop {}).unwrap(); + executor.await.unwrap(); + + storage_tx.send(StorageMessage::Stop {}).unwrap(); + storage.await.unwrap(); assert_eq!(1, 1); } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 222eaba..2638461 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -9,6 +9,12 @@ pub enum StorageMessage { interval: Interval, attempt: TaskAttempt, }, + StoreState { + state: ResourceInterval, + }, + LoadState { + response: oneshot::Sender, + }, Stop {}, } diff --git a/src/storage/redis_store.rs b/src/storage/redis_store.rs index fdde230..78968f2 100644 --- a/src/storage/redis_store.rs +++ b/src/storage/redis_store.rs @@ -13,7 +13,7 @@ pub async fn start_redis_storage( let mut conn = client.get_async_connection().await?; while let Some(msg) = msgs.recv().await { - use StorageMessage::{Stop, StoreAttempt}; + use StorageMessage::*; match msg { StoreAttempt { task_name, @@ -21,11 +21,19 @@ pub async fn start_redis_storage( attempt, } => { let tag = format!("{}_{}_{}", prefix, task_name, interval.end); - redis::cmd("PUSH") - .arg(&[&tag, &serde_json::to_string(&attempt).unwrap()]) - .query_async(&mut conn) - .await - .unwrap_or(()); + let payload = serde_json::to_string(&attempt).unwrap(); + conn.rpush(&tag, &payload).await?; + } + StoreState { state } => { + let tag = format!("{}_state", prefix); + let payload = serde_json::to_string(&state).unwrap(); + conn.set(&tag, &payload).await?; + } + LoadState { response } => { + let tag = format!("{}_state", prefix); + let payload: String = conn.get(&tag).await.unwrap_or("{}".to_owned()); + let is: ResourceInterval = serde_json::from_str(&payload).unwrap(); + response.send(is).unwrap(); } Stop {} => { break; @@ -36,10 +44,14 @@ pub async fn start_redis_storage( Ok(()) } -pub fn start(msgs: mpsc::UnboundedReceiver, url: String, prefix: String) { +pub fn start( + msgs: mpsc::UnboundedReceiver, + url: String, + prefix: String, +) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { start_redis_storage(msgs, url, prefix) .await .expect("Unable to start redis storage"); - }); + }) }