From ca906e75b3b7f86c69ef573eb55085557b681f99 Mon Sep 17 00:00:00 2001 From: Kinesin Data Technologies Incorporated <93931750+kinesintech@users.noreply.github.com> Date: Wed, 5 Oct 2022 08:01:17 -0300 Subject: [PATCH] wf cli tool running --- examples/config.json | 12 +++++ examples/world.json | 39 +++++++++++++++++ src/bin/wf/config.rs | 2 +- src/bin/wf/main.rs | 102 +++++++++++++++++++++++++++++++++++++++++-- src/runner.rs | 5 +-- src/storage/mod.rs | 2 + 6 files changed, 154 insertions(+), 8 deletions(-) create mode 100644 examples/config.json create mode 100644 examples/world.json diff --git a/examples/config.json b/examples/config.json new file mode 100644 index 0000000..2a233b3 --- /dev/null +++ b/examples/config.json @@ -0,0 +1,12 @@ +{ + "storage": { + "type": "redis", + "url": "redis://localhost", + "prefix": "world" + }, + "executor": { + "type": "local", + "workers": 10 + } +} + diff --git a/examples/world.json b/examples/world.json new file mode 100644 index 0000000..31ca5b1 --- /dev/null +++ b/examples/world.json @@ -0,0 +1,39 @@ +{ + "variables": { + "HOME": "/tmp/world_test" + }, + "calendars": { + "std": { "mask": [ "Mon", "Tue", "Wed", "Thu", "Fri" ] } + }, + "tasks": { + "task_a": { + "up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}" }, + "down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}" }, + "check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}" }, + + "provides": [ "task_a" ], + + "calendar_name": "std", + "times": [ "09:00:00", "12:00:00"], + "timezone": "America/New_York", + + "valid_from": "2022-01-01T09:00:00", + "valid_to": "2022-01-08T09:00:00" + }, + "task_b": { + "up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}" }, + "down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}" }, + "check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}" }, + + "provides": [ "task_b" ], + "requires": [ { "resource": "task_a", "offset": 0 } ], + + "calendar_name": "std", + "times": [ "17:00:00" ], + "timezone": "America/New_York", + + "valid_from": "2022-01-04T09:00:00", + "valid_to": "2022-01-07T00:00:00" + } + } +} diff --git a/src/bin/wf/config.rs b/src/bin/wf/config.rs index 894fc12..848e0ff 100644 --- a/src/bin/wf/config.rs +++ b/src/bin/wf/config.rs @@ -60,7 +60,7 @@ pub struct GlobalConfigSpec { #[derive(Clone)] pub struct GlobalConfig { pub pools: HashMap>, - pub tracker: mpsc::UnboundedSender, + pub storage: mpsc::UnboundedSender, pub runner: mpsc::UnboundedSender, pub default_pool: String, pub spec: GlobalConfigSpec, diff --git a/src/bin/wf/main.rs b/src/bin/wf/main.rs index e1f550d..18ef9bb 100644 --- a/src/bin/wf/main.rs +++ b/src/bin/wf/main.rs @@ -1,7 +1,58 @@ use clap::Parser; +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc, oneshot}; use waterfall::prelude::*; +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")] +enum StorageConfig { + Redis { url: String, prefix: String }, +} + +impl StorageConfig { + fn start( + &self, + ) -> ( + mpsc::UnboundedSender, + tokio::task::JoinHandle<()>, + ) { + let (tx, rx) = mpsc::unbounded_channel(); + match self { + StorageConfig::Redis { url, prefix } => { + (tx, redis_store::start(rx, url.clone(), prefix.clone())) + } + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")] +enum ExecutorConfig { + Local { workers: usize }, +} + +impl ExecutorConfig { + fn start( + &self, + ) -> ( + mpsc::UnboundedSender, + tokio::task::JoinHandle<()>, + ) { + let (tx, rx) = mpsc::unbounded_channel(); + match self { + ExecutorConfig::Local { workers } => (tx, local_executor::start(*workers, rx)), + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(deny_unknown_fields)] +struct Config { + storage: StorageConfig, + executor: ExecutorConfig, +} + #[derive(Parser, Debug)] #[clap(author, version, about)] struct Args { @@ -18,20 +69,63 @@ struct Args { verbose: bool, } +/* + Sample config + + { + "storage": { + "type": "redis", + "url": "redis://localhost", + "prefix": "world" + }, + "executor": { + "type": "local", + "workers": 10, + } + } +*/ + #[tokio::main] async fn main() -> std::io::Result<()> { let args = Args::parse(); env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); // Parse the config - let json = std::fs::read_to_string(&args.config) + let world_json = std::fs::read_to_string(&args.world) .expect(&format!("Unable to open {} for reading", args.config)); - - // Some Deserializer. let world_def: WorldDefinition = - serde_json::from_str(&json).expect("Unable to parse world definition"); + serde_json::from_str(&world_json).expect("Unable to parse world definition"); + + // Parse the config + let config_json = std::fs::read_to_string(&args.config) + .expect(&format!("Unable to open {} for reading", args.config)); + let config: Config = + serde_json::from_str(&config_json).expect("Unable to parse config definition"); + + // Start the config + let (exe_tx, exe_handle) = config.executor.start(); + let (storage_tx, storage_handle) = config.storage.start(); let tasks = world_def.taskset().unwrap(); + let mut runner = Runner::new( + tasks, + world_def.variables, + exe_tx.clone(), + storage_tx.clone(), + world_def.output_options, + ) + .await + .unwrap(); + + let (wtx, wrx) = oneshot::channel(); + runner.run(wrx).await; + + exe_tx.send(ExecutorMessage::Stop {}).unwrap(); + exe_handle.await.unwrap(); + + storage_tx.send(StorageMessage::Stop {}).unwrap(); + storage_handle.await.unwrap(); + Ok(()) } diff --git a/src/runner.rs b/src/runner.rs index 932ab05..d1dff6c 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -251,8 +251,6 @@ impl Runner { // Create queue let required = target.difference(&self.current); - println!("CUR: {:?}", self.current); - println!("REQ: {:?}", required); self.queue = self.tasks.iter().fold(Vec::new(), |mut acc, (name, task)| { let res: Vec = task .generate_intervals(&required) @@ -315,7 +313,8 @@ impl Runner { // We'll be using channels for running pub async fn run(&mut self, stop: oneshot::Receiver) { self.events.push(tokio::spawn(async move { - stop.await.expect("Unable to get stop"); + // This recv will fail if the channel is shutdown, so just ignore it. + stop.await.unwrap_or(RunnerEvent::Stop); RunnerEvent::Stop })); self.queue_actions(); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 25d5e25..06ea17f 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -15,11 +15,13 @@ pub enum StorageMessage { LoadState { response: oneshot::Sender, }, + /* GetAttempts { task_name: String, interval: Interval, response: oneshot::Sender, }, + */ Stop {}, }