Adding a Clear option for storage

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-10-06 15:12:36 -03:00
parent 5d4ff2bb8b
commit eb590c848e
3 changed files with 20 additions and 3 deletions
+1
View File
@@ -4,6 +4,7 @@ use crate::executors::TaskAttempt;
/// Messages for interacting with an Executor /// Messages for interacting with an Executor
#[derive(Debug)] #[derive(Debug)]
pub enum StorageMessage { pub enum StorageMessage {
Clear {},
StoreAttempt { StoreAttempt {
task_name: String, task_name: String,
interval: Interval, interval: Interval,
+3
View File
@@ -6,6 +6,9 @@ pub async fn start_storage(mut msgs: mpsc::UnboundedReceiver<StorageMessage>) ->
while let Some(msg) = msgs.recv().await { while let Some(msg) = msgs.recv().await {
use StorageMessage::*; use StorageMessage::*;
match msg { match msg {
Clear {} => {
current_state = ResourceInterval::new();
}
StoreAttempt { .. } => {} StoreAttempt { .. } => {}
StoreState { state } => { StoreState { state } => {
current_state = state; current_state = state;
+16 -3
View File
@@ -17,22 +17,35 @@ pub async fn start_redis_storage(
while let Some(msg) = msgs.recv().await { while let Some(msg) = msgs.recv().await {
use StorageMessage::*; use StorageMessage::*;
match msg { match msg {
Clear {} => {
let mut keys = Vec::new();
{
let mut iter: redis::AsyncIter<String> =
conn.scan_match(format!("{}:*", prefix)).await?;
while let Some(key) = iter.next_item().await {
keys.push(key);
}
}
for key in keys {
conn.del(key).await?;
}
}
StoreAttempt { StoreAttempt {
task_name, task_name,
interval, interval,
attempt, attempt,
} => { } => {
let tag = format!("{}_{}_{}", prefix, task_name, interval.end); let tag = format!("{}:{}_{}", prefix, task_name, interval.end);
let payload = serde_json::to_string(&attempt).unwrap(); let payload = serde_json::to_string(&attempt).unwrap();
conn.rpush(&tag, &payload).await?; conn.rpush(&tag, &payload).await?;
} }
StoreState { state } => { StoreState { state } => {
let tag = format!("{}_state", prefix); let tag = format!("{}:state", prefix);
let payload = serde_json::to_string(&state).unwrap(); let payload = serde_json::to_string(&state).unwrap();
conn.set(&tag, &payload).await?; conn.set(&tag, &payload).await?;
} }
LoadState { response } => { LoadState { response } => {
let tag = format!("{}_state", prefix); let tag = format!("{}:state", prefix);
let payload: String = conn.get(&tag).await.unwrap_or("{}".to_owned()); let payload: String = conn.get(&tag).await.unwrap_or("{}".to_owned());
let is: ResourceInterval = serde_json::from_str(&payload).unwrap(); let is: ResourceInterval = serde_json::from_str(&payload).unwrap();
response.send(is).unwrap(); response.send(is).unwrap();