From f7d276e825c9a676549e7edfdf2fea7f283b5ac1 Mon Sep 17 00:00:00 2001 From: Kinesin Data Technologies Incorporated <93931750+kinesintech@users.noreply.github.com> Date: Tue, 4 Oct 2022 16:54:24 -0300 Subject: [PATCH] checkpointing work for the night --- src/bin/wf/config.rs | 137 +++++++++++++++++++++++++++++++++++++++++++ src/bin/wf/main.rs | 7 +-- src/lib.rs | 1 + src/prelude.rs | 6 ++ src/storage/mod.rs | 5 ++ 5 files changed, 151 insertions(+), 5 deletions(-) create mode 100644 src/bin/wf/config.rs create mode 100644 src/prelude.rs diff --git a/src/bin/wf/config.rs b/src/bin/wf/config.rs new file mode 100644 index 0000000..894fc12 --- /dev/null +++ b/src/bin/wf/config.rs @@ -0,0 +1,137 @@ +pub use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fmt::Debug; +use sysinfo::{RefreshKind, System, SystemExt}; +use tokio::sync::mpsc; +use waterfall::prelude::*; + +fn default_workers() -> usize { + let system = System::new_with_specifics(RefreshKind::new().with_cpu()); + let workers = system.processors().len(); + if workers > 2 { + workers - 2 + } else { + workers + } +} + +#[derive(Deserialize, Debug, Clone)] +#[serde(tag = "executor", rename_all = "lowercase")] +pub enum PoolConfig { + Local { + #[serde(default = "default_workers")] + workers: usize, + }, +} + +fn default_pools() -> HashMap { + HashMap::from([( + "default".to_owned(), + PoolConfig::Local { + workers: default_workers(), + }, + )]) +} + +#[derive(Deserialize, Debug, Clone)] +#[serde(tag = "storage")] +pub enum StorageConfig { + Redis, +} + +impl Default for StorageConfig { + fn default() -> Self { + StorageConfig::Redis + } +} + +#[derive(Deserialize, Debug, Clone)] +pub struct GlobalConfigSpec { + #[serde(default = "default_pools")] + pub pools: HashMap, + + #[serde(default)] + pub tracker: StorageConfig, + + #[serde(default)] + pub default_pool: String, +} + +#[derive(Clone)] +pub struct GlobalConfig { + pub pools: HashMap>, + pub tracker: mpsc::UnboundedSender, + pub runner: mpsc::UnboundedSender, + pub default_pool: String, + pub spec: GlobalConfigSpec, +} + +impl GlobalConfig { + pub async fn new(spec: &GlobalConfigSpec) -> Self { + let mut pools = HashMap::new(); + + use PoolConfig::*; + for (pool, pool_spec) in spec.pools.iter() { + let (tx, rx) = mpsc::unbounded_channel(); + match pool_spec { + Local { workers } => { + local_executor::start(*workers, rx); + } + + Ssh { targets } => { + ssh_executor::start(targets.clone(), rx); + } + + Agent { targets } => { + agent_executor::start(targets.clone(), rx); + } + + #[cfg(feature = "slurm")] + Slurm { base_url } => { + slurm_executor::start(base_url.clone(), rx); + } + } + pools.insert(pool.clone(), tx); + } + + // Storage + let (tracker, trx) = mpsc::unbounded_channel(); + use StorageConfig::*; + match spec.tracker { + Memory => memory_tracker::start(trx), + } + + // Runner + let (runner, rrx) = mpsc::unbounded_channel(); + let rtx = runner.clone(); + runner::start(rtx, rrx); + + let default_pool = if spec.default_pool.is_empty() { + pools.keys().next().unwrap().clone() + } else { + spec.default_pool.clone() + }; + + GlobalConfig { + server: spec.server.clone(), + pools, + tracker, + runner, + default_pool, + spec: spec.clone(), + } + } + + pub fn listen_spec(&self) -> String { + format!("{}:{}", self.server.ip, self.server.port) + } +} + +impl Debug for GlobalConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GlobalConfig") + .field("spec", &self.spec) + .field("default_pool", &self.default_pool) + .finish() + } +} diff --git a/src/bin/wf/main.rs b/src/bin/wf/main.rs index b2784b9..e1f550d 100644 --- a/src/bin/wf/main.rs +++ b/src/bin/wf/main.rs @@ -1,10 +1,6 @@ use clap::Parser; -use serde::Serialize; -use waterfall::executors::local_executor; -use waterfall::runner::Runner; -use waterfall::storage::redis_store; -use waterfall::world::WorldDefinition; +use waterfall::prelude::*; #[derive(Parser, Debug)] #[clap(author, version, about)] @@ -27,6 +23,7 @@ async fn main() -> std::io::Result<()> { let args = Args::parse(); env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); + // Parse the config let json = std::fs::read_to_string(&args.config) .expect(&format!("Unable to open {} for reading", args.config)); diff --git a/src/lib.rs b/src/lib.rs index 738bb0a..9ad9023 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ pub mod calendar; pub mod executors; pub mod interval; pub mod interval_set; +pub mod prelude; pub mod requirement; pub mod resource_interval; pub mod runner; diff --git a/src/prelude.rs b/src/prelude.rs new file mode 100644 index 0000000..907474b --- /dev/null +++ b/src/prelude.rs @@ -0,0 +1,6 @@ +pub use crate::calendar::Calendar; +pub use crate::executors::*; +pub use crate::runner::Runner; +pub use crate::storage::*; +pub use crate::task::TaskDefinition; +pub use crate::world::WorldDefinition; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 2638461..25d5e25 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -15,6 +15,11 @@ pub enum StorageMessage { LoadState { response: oneshot::Sender, }, + GetAttempts { + task_name: String, + interval: Interval, + response: oneshot::Sender, + }, Stop {}, }