Adding in redis storage

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-10-04 15:13:24 -03:00
parent fb1b6bc807
commit 08fcab41d3
6 changed files with 123 additions and 36 deletions
+17 -3
View File
@@ -255,9 +255,12 @@ pub async fn start_local_executor(
});
}
ExecuteTask {
task_name,
interval,
details,
varmap,
output_options,
storage,
response,
kill,
} => {
@@ -274,7 +277,15 @@ pub async fn start_local_executor(
..TaskAttempt::new()
},
};
response.send(attempt.succeeded).unwrap();
let rc = attempt.succeeded;
storage
.send(StorageMessage::StoreAttempt {
task_name,
interval,
attempt,
})
.unwrap();
response.send(rc).unwrap();
}));
}
Stop {} => {
@@ -284,8 +295,11 @@ pub async fn start_local_executor(
}
}
pub fn start(max_parallel: usize, msgs: mpsc::UnboundedReceiver<ExecutorMessage>) {
pub fn start(
max_parallel: usize,
msgs: mpsc::UnboundedReceiver<ExecutorMessage>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
start_local_executor(max_parallel, msgs).await;
});
})
}
+3
View File
@@ -16,9 +16,12 @@ pub enum ExecutorMessage {
/// Errors
/// Will return `Err` if the tasks are invalid, according to the executor
ExecuteTask {
task_name: String,
interval: Interval,
details: serde_json::Value,
varmap: VarMap,
output_options: TaskOutputOptions,
storage: mpsc::UnboundedSender<StorageMessage>,
response: oneshot::Sender<bool>,
kill: oneshot::Receiver<()>,
},
+1 -1
View File
@@ -5,7 +5,7 @@ use std::ops::{Add, Deref, DerefMut, Sub};
/// represent where a resource is available, or where it's required
/// Resources are independent, so overlaps between the
/// interval sets are possible.
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct ResourceInterval(HashMap<Resource, IntervalSet>);
impl ResourceInterval {
+76 -24
View File
@@ -29,7 +29,7 @@ pub struct Action {
}
#[derive(Debug, Serialize, Deserialize)]
pub enum WorldEvent {
pub enum RunnerEvent {
Start,
TaskFailed {
task_name: String,
@@ -57,16 +57,17 @@ pub struct Runner {
queue: Vec<Action>,
qidx: usize,
events: FuturesUnordered<tokio::task::JoinHandle<WorldEvent>>,
events: FuturesUnordered<tokio::task::JoinHandle<RunnerEvent>>,
last_horizon: DateTime<Utc>,
executor: mpsc::UnboundedSender<ExecutorMessage>,
storage: mpsc::UnboundedSender<StorageMessage>,
}
fn gen_timeout(timeout: i64) -> tokio::task::JoinHandle<WorldEvent> {
fn gen_timeout(timeout: i64) -> tokio::task::JoinHandle<RunnerEvent> {
tokio::spawn(async move {
tokio::time::sleep(Duration::seconds(timeout).to_std().unwrap()).await;
WorldEvent::Timeout
RunnerEvent::Timeout
})
}
@@ -85,20 +86,27 @@ async fn validate_cmd(
}
async fn run_task(
task_name: String,
interval: Interval,
details: serde_json::Value,
executor: mpsc::UnboundedSender<ExecutorMessage>,
storage: mpsc::UnboundedSender<StorageMessage>,
kill: oneshot::Receiver<()>,
output_options: &TaskOutputOptions,
varmap: &VarMap,
) -> bool {
println!("Running {}/{}", task_name, interval);
let (response, response_rx) = oneshot::channel();
executor
.send(ExecutorMessage::ExecuteTask {
task_name,
interval,
details,
output_options: output_options.clone(),
varmap: varmap.clone(),
response,
kill,
storage,
})
.unwrap();
response_rx.await.unwrap()
@@ -113,12 +121,16 @@ async fn up_task(
check: Option<TaskDetails>,
output_options: TaskOutputOptions,
executor: mpsc::UnboundedSender<ExecutorMessage>,
) -> WorldEvent {
storage: mpsc::UnboundedSender<StorageMessage>,
) -> RunnerEvent {
if let Some(check_cmd) = check.clone() {
let (subkill, subkill_rx) = oneshot::channel();
let succeeded = run_task(
task_name.clone(),
interval,
check_cmd.clone(),
executor.clone(),
storage.clone(),
subkill_rx,
&output_options,
&varmap,
@@ -127,7 +139,7 @@ async fn up_task(
// If check succeeded, resources are up
if succeeded {
return WorldEvent::TaskCompleted {
return RunnerEvent::TaskCompleted {
task_name,
interval,
};
@@ -136,9 +148,19 @@ async fn up_task(
// UP
let (subkill, subkill_rx) = oneshot::channel();
let succeeded = run_task(up, executor.clone(), subkill_rx, &output_options, &varmap).await;
let succeeded = run_task(
task_name.clone(),
interval,
up,
executor.clone(),
storage.clone(),
subkill_rx,
&output_options,
&varmap,
)
.await;
if !succeeded {
return WorldEvent::TaskFailed {
return RunnerEvent::TaskFailed {
task_name,
interval,
};
@@ -148,8 +170,11 @@ async fn up_task(
if let Some(check_cmd) = check {
let (subkill, subkill_rx) = oneshot::channel();
let succeeded = run_task(
task_name.clone(),
interval,
check_cmd.clone(),
executor.clone(),
storage.clone(),
subkill_rx,
&output_options,
&varmap,
@@ -158,18 +183,18 @@ async fn up_task(
// If check succeeded, resources are up
if succeeded {
WorldEvent::TaskCompleted {
RunnerEvent::TaskCompleted {
task_name,
interval,
}
} else {
WorldEvent::TaskFailed {
RunnerEvent::TaskFailed {
task_name,
interval,
}
}
} else {
WorldEvent::TaskCompleted {
RunnerEvent::TaskCompleted {
task_name,
interval,
}
@@ -181,6 +206,7 @@ impl Runner {
tasks: TaskSet,
vars: VarMap,
executor: mpsc::UnboundedSender<ExecutorMessage>,
storage: mpsc::UnboundedSender<StorageMessage>,
output_options: TaskOutputOptions,
) -> Result<Self> {
for tdef in tasks.values() {
@@ -193,6 +219,12 @@ impl Runner {
}
}
let (response, rx) = oneshot::channel();
storage
.send(StorageMessage::LoadState { response })
.unwrap();
let current = rx.await.unwrap();
let end_state = tasks.coverage()?;
let mut runner = Runner {
tasks,
@@ -200,12 +232,13 @@ impl Runner {
output_options,
end_state,
target: ResourceInterval::new(),
current: ResourceInterval::new(),
current,
queue: Vec::new(),
qidx: 0,
events: FuturesUnordered::new(),
last_horizon: DateTime::<Utc>::MIN_UTC,
executor,
storage,
};
runner.tick()?;
@@ -218,6 +251,8 @@ impl Runner {
// Create queue
let required = target.difference(&self.current);
println!("CUR: {:?}", self.current);
println!("REQ: {:?}", required);
self.queue = self.tasks.iter().fold(Vec::new(), |mut acc, (name, task)| {
let res: Vec<Action> = task
.generate_intervals(&required)
@@ -247,7 +282,6 @@ impl Runner {
.can_be_satisfied(act.interval, &target)
})
.fold(HashSet::new(), |mut acc, a| {
println!("Task cannot be satisfied: {:?}", a);
acc.insert(a.task.clone());
acc
});
@@ -279,39 +313,37 @@ impl Runner {
}
// We'll be using channels for running
pub async fn run(&mut self, stop: oneshot::Receiver<WorldEvent>) {
pub async fn run(&mut self, stop: oneshot::Receiver<RunnerEvent>) {
self.events.push(tokio::spawn(async move {
stop.await.expect("Unable to get stop");
WorldEvent::Stop
RunnerEvent::Stop
}));
self.queue_actions();
// Loop while we can make progress
while !self.is_done() {
match self.events.next().await {
Some(Ok(WorldEvent::Start)) => {
println!("START");
Some(Ok(RunnerEvent::Start)) => {
self.queue_actions();
}
Some(Ok(WorldEvent::Stop)) => {
println!("Stop");
Some(Ok(RunnerEvent::Stop)) => {
break;
}
Some(Ok(WorldEvent::Timeout)) => {
println!("Timeout");
Some(Ok(RunnerEvent::Timeout)) => {
self.queue_actions();
}
Some(Ok(WorldEvent::TaskFailed {
Some(Ok(RunnerEvent::TaskFailed {
task_name,
interval,
})) => {
println!("FAILED: {} / {}", task_name, interval);
println!("Well that sucks");
}
Some(Ok(WorldEvent::TaskCompleted {
Some(Ok(RunnerEvent::TaskCompleted {
task_name,
interval,
})) => {
println!("Completing {}/{}", task_name, interval);
let action = self
.queue
.iter_mut()
@@ -325,6 +357,11 @@ impl Runner {
.or_insert(IntervalSet::new())
.insert(action.interval);
}
self.storage
.send(StorageMessage::StoreState {
state: self.current.clone(),
})
.unwrap();
self.queue_actions();
}
Some(Err(e)) => {
@@ -363,6 +400,7 @@ impl Runner {
let check = task.check.clone();
let output_options = self.output_options.clone();
let exe = self.executor.clone();
let storage = self.storage.clone();
self.events.push(tokio::spawn(async move {
up_task(
task_name.clone(),
@@ -373,6 +411,7 @@ impl Runner {
check,
output_options,
exe,
storage,
)
.await
}));
@@ -441,12 +480,21 @@ mod tests {
// Executor
let (tx, rx) = mpsc::unbounded_channel();
local_executor::start(10, rx);
let executor = local_executor::start(10, rx);
// Storage
let (storage_tx, storage_rx) = mpsc::unbounded_channel();
let storage = redis_store::start(
storage_rx,
"redis://localhost".to_owned(),
"world_test".to_owned(),
);
let mut runner = Runner::new(
tasks,
world_def.variables,
tx.clone(),
storage_tx.clone(),
world_def.output_options,
)
.await
@@ -456,6 +504,10 @@ mod tests {
runner.run(wrx).await;
tx.send(ExecutorMessage::Stop {}).unwrap();
executor.await.unwrap();
storage_tx.send(StorageMessage::Stop {}).unwrap();
storage.await.unwrap();
assert_eq!(1, 1);
}
+6
View File
@@ -9,6 +9,12 @@ pub enum StorageMessage {
interval: Interval,
attempt: TaskAttempt,
},
StoreState {
state: ResourceInterval,
},
LoadState {
response: oneshot::Sender<ResourceInterval>,
},
Stop {},
}
+20 -8
View File
@@ -13,7 +13,7 @@ pub async fn start_redis_storage(
let mut conn = client.get_async_connection().await?;
while let Some(msg) = msgs.recv().await {
use StorageMessage::{Stop, StoreAttempt};
use StorageMessage::*;
match msg {
StoreAttempt {
task_name,
@@ -21,11 +21,19 @@ pub async fn start_redis_storage(
attempt,
} => {
let tag = format!("{}_{}_{}", prefix, task_name, interval.end);
redis::cmd("PUSH")
.arg(&[&tag, &serde_json::to_string(&attempt).unwrap()])
.query_async(&mut conn)
.await
.unwrap_or(());
let payload = serde_json::to_string(&attempt).unwrap();
conn.rpush(&tag, &payload).await?;
}
StoreState { state } => {
let tag = format!("{}_state", prefix);
let payload = serde_json::to_string(&state).unwrap();
conn.set(&tag, &payload).await?;
}
LoadState { response } => {
let tag = format!("{}_state", prefix);
let payload: String = conn.get(&tag).await.unwrap_or("{}".to_owned());
let is: ResourceInterval = serde_json::from_str(&payload).unwrap();
response.send(is).unwrap();
}
Stop {} => {
break;
@@ -36,10 +44,14 @@ pub async fn start_redis_storage(
Ok(())
}
pub fn start(msgs: mpsc::UnboundedReceiver<StorageMessage>, url: String, prefix: String) {
pub fn start(
msgs: mpsc::UnboundedReceiver<StorageMessage>,
url: String,
prefix: String,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
start_redis_storage(msgs, url, prefix)
.await
.expect("Unable to start redis storage");
});
})
}