diff --git a/src/storage/mod.rs b/src/storage/mod.rs index bef9a63..6418111 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -4,6 +4,7 @@ use crate::executors::TaskAttempt; /// Messages for interacting with an Executor #[derive(Debug)] pub enum StorageMessage { + Clear {}, StoreAttempt { task_name: String, interval: Interval, diff --git a/src/storage/noop.rs b/src/storage/noop.rs index cbc3aaf..0293b75 100644 --- a/src/storage/noop.rs +++ b/src/storage/noop.rs @@ -6,6 +6,9 @@ pub async fn start_storage(mut msgs: mpsc::UnboundedReceiver) -> while let Some(msg) = msgs.recv().await { use StorageMessage::*; match msg { + Clear {} => { + current_state = ResourceInterval::new(); + } StoreAttempt { .. } => {} StoreState { state } => { current_state = state; diff --git a/src/storage/redis.rs b/src/storage/redis.rs index 814afbe..1a8dc71 100644 --- a/src/storage/redis.rs +++ b/src/storage/redis.rs @@ -17,22 +17,35 @@ pub async fn start_redis_storage( while let Some(msg) = msgs.recv().await { use StorageMessage::*; match msg { + Clear {} => { + let mut keys = Vec::new(); + { + let mut iter: redis::AsyncIter = + conn.scan_match(format!("{}:*", prefix)).await?; + while let Some(key) = iter.next_item().await { + keys.push(key); + } + } + for key in keys { + conn.del(key).await?; + } + } StoreAttempt { task_name, interval, attempt, } => { - let tag = format!("{}_{}_{}", prefix, task_name, interval.end); + let tag = format!("{}:{}_{}", prefix, task_name, interval.end); let payload = serde_json::to_string(&attempt).unwrap(); conn.rpush(&tag, &payload).await?; } StoreState { state } => { - let tag = format!("{}_state", prefix); + let tag = format!("{}:state", prefix); let payload = serde_json::to_string(&state).unwrap(); conn.set(&tag, &payload).await?; } LoadState { response } => { - let tag = format!("{}_state", prefix); + let tag = format!("{}:state", prefix); let payload: String = conn.get(&tag).await.unwrap_or("{}".to_owned()); let is: ResourceInterval = serde_json::from_str(&payload).unwrap(); response.send(is).unwrap();