Adding in storage

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-09-29 10:17:22 -03:00
parent b436ea6da5
commit 838f2754cb
4 changed files with 66 additions and 0 deletions
+1
View File
@@ -17,3 +17,4 @@ tokio = { version = "1", features = ["full"] }
users = { version = "0.11", optional = true } users = { version = "0.11", optional = true }
psutil = { version = "3.2", features = ["process"] } psutil = { version = "3.2", features = ["process"] }
sysinfo = "0.23" sysinfo = "0.23"
redis = { version = "*", features = ["aio", "tokio-comp"] }
+3
View File
@@ -10,10 +10,12 @@ use std::collections::{HashMap, HashSet};
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use crate::calendar::*; use crate::calendar::*;
use crate::executors::*;
use crate::interval::*; use crate::interval::*;
use crate::interval_set::*; use crate::interval_set::*;
use crate::requirement::*; use crate::requirement::*;
use crate::schedule::*; use crate::schedule::*;
use crate::storage::*;
use crate::task::*; use crate::task::*;
use crate::varmap::*; use crate::varmap::*;
@@ -26,5 +28,6 @@ pub mod interval;
pub mod interval_set; pub mod interval_set;
pub mod requirement; pub mod requirement;
pub mod schedule; pub mod schedule;
pub mod storage;
pub mod task; pub mod task;
pub mod varmap; pub mod varmap;
+15
View File
@@ -0,0 +1,15 @@
use super::*;
use crate::executors::TaskAttempt;
/// Messages for interacting with an Executor
#[derive(Debug)]
pub enum StorageMessage {
StoreAttempt {
task_name: String,
interval: Interval,
attempt: TaskAttempt,
},
Stop {},
}
pub mod redis_store;
+47
View File
@@ -0,0 +1,47 @@
use super::*;
extern crate redis;
use futures::prelude::*;
use redis::AsyncCommands;
/// The mpsc channel can be sized to fit max parallelism
pub async fn start_redis_storage(
mut msgs: mpsc::UnboundedReceiver<StorageMessage>,
url: String,
prefix: String,
) -> Result<()> {
let client = redis::Client::open(url)?;
let mut conn = client.get_async_connection().await?;
while let Some(msg) = msgs.recv().await {
use StorageMessage::{Stop, StoreAttempt};
match msg {
StoreAttempt {
task_name,
interval,
attempt,
} => {
let tag = format!("{}_{}_{}", prefix, task_name, interval.end);
redis::cmd("PUSH")
.arg(&[&tag, &serde_json::to_string(&attempt).unwrap()])
.query_async(&mut conn)
.await
.unwrap_or(());
}
Stop {} => {
break;
}
}
}
Ok(())
}
pub fn start(msgs: mpsc::UnboundedReceiver<StorageMessage>, url: String, prefix: String) {
tokio::spawn(async move {
start_redis_storage(msgs, url, prefix)
.await
.expect("Unable to start redis storage");
});
}