From 838f2754cb1daf67b285af77bfcd50d58d019510 Mon Sep 17 00:00:00 2001 From: Kinesin Data Technologies Incorporated <93931750+kinesintech@users.noreply.github.com> Date: Thu, 29 Sep 2022 10:17:22 -0300 Subject: [PATCH] Adding in storage --- Cargo.toml | 1 + src/lib.rs | 3 +++ src/storage/mod.rs | 15 ++++++++++++ src/storage/redis_store.rs | 47 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 66 insertions(+) create mode 100644 src/storage/mod.rs create mode 100644 src/storage/redis_store.rs diff --git a/Cargo.toml b/Cargo.toml index ebe9958..214f3e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,4 @@ tokio = { version = "1", features = ["full"] } users = { version = "0.11", optional = true } psutil = { version = "3.2", features = ["process"] } sysinfo = "0.23" +redis = { version = "*", features = ["aio", "tokio-comp"] } diff --git a/src/lib.rs b/src/lib.rs index 4cf2c6b..79945dd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,10 +10,12 @@ use std::collections::{HashMap, HashSet}; use tokio::sync::{mpsc, oneshot}; use crate::calendar::*; +use crate::executors::*; use crate::interval::*; use crate::interval_set::*; use crate::requirement::*; use crate::schedule::*; +use crate::storage::*; use crate::task::*; use crate::varmap::*; @@ -26,5 +28,6 @@ pub mod interval; pub mod interval_set; pub mod requirement; pub mod schedule; +pub mod storage; pub mod task; pub mod varmap; diff --git a/src/storage/mod.rs b/src/storage/mod.rs new file mode 100644 index 0000000..222eaba --- /dev/null +++ b/src/storage/mod.rs @@ -0,0 +1,15 @@ +use super::*; +use crate::executors::TaskAttempt; + +/// Messages for interacting with an Executor +#[derive(Debug)] +pub enum StorageMessage { + StoreAttempt { + task_name: String, + interval: Interval, + attempt: TaskAttempt, + }, + Stop {}, +} + +pub mod redis_store; diff --git a/src/storage/redis_store.rs b/src/storage/redis_store.rs new file mode 100644 index 0000000..229e5a0 --- /dev/null +++ b/src/storage/redis_store.rs @@ -0,0 +1,47 @@ +use super::*; + +extern crate redis; + +use futures::prelude::*; +use redis::AsyncCommands; + +/// The mpsc channel can be sized to fit max parallelism +pub async fn start_redis_storage( + mut msgs: mpsc::UnboundedReceiver, + url: String, + prefix: String, +) -> Result<()> { + let client = redis::Client::open(url)?; + let mut conn = client.get_async_connection().await?; + + while let Some(msg) = msgs.recv().await { + use StorageMessage::{Stop, StoreAttempt}; + match msg { + StoreAttempt { + task_name, + interval, + attempt, + } => { + let tag = format!("{}_{}_{}", prefix, task_name, interval.end); + redis::cmd("PUSH") + .arg(&[&tag, &serde_json::to_string(&attempt).unwrap()]) + .query_async(&mut conn) + .await + .unwrap_or(()); + } + Stop {} => { + break; + } + } + } + + Ok(()) +} + +pub fn start(msgs: mpsc::UnboundedReceiver, url: String, prefix: String) { + tokio::spawn(async move { + start_redis_storage(msgs, url, prefix) + .await + .expect("Unable to start redis storage"); + }); +}