wf cli tool running
This commit is contained in:
parent
f7d276e825
commit
ca906e75b3
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"storage": {
|
||||
"type": "redis",
|
||||
"url": "redis://localhost",
|
||||
"prefix": "world"
|
||||
},
|
||||
"executor": {
|
||||
"type": "local",
|
||||
"workers": 10
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
{
|
||||
"variables": {
|
||||
"HOME": "/tmp/world_test"
|
||||
},
|
||||
"calendars": {
|
||||
"std": { "mask": [ "Mon", "Tue", "Wed", "Thu", "Fri" ] }
|
||||
},
|
||||
"tasks": {
|
||||
"task_a": {
|
||||
"up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}" },
|
||||
"down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}" },
|
||||
"check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}" },
|
||||
|
||||
"provides": [ "task_a" ],
|
||||
|
||||
"calendar_name": "std",
|
||||
"times": [ "09:00:00", "12:00:00"],
|
||||
"timezone": "America/New_York",
|
||||
|
||||
"valid_from": "2022-01-01T09:00:00",
|
||||
"valid_to": "2022-01-08T09:00:00"
|
||||
},
|
||||
"task_b": {
|
||||
"up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}" },
|
||||
"down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}" },
|
||||
"check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}" },
|
||||
|
||||
"provides": [ "task_b" ],
|
||||
"requires": [ { "resource": "task_a", "offset": 0 } ],
|
||||
|
||||
"calendar_name": "std",
|
||||
"times": [ "17:00:00" ],
|
||||
"timezone": "America/New_York",
|
||||
|
||||
"valid_from": "2022-01-04T09:00:00",
|
||||
"valid_to": "2022-01-07T00:00:00"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -60,7 +60,7 @@ pub struct GlobalConfigSpec {
|
||||
#[derive(Clone)]
|
||||
pub struct GlobalConfig {
|
||||
pub pools: HashMap<String, mpsc::UnboundedSender<ExecutorMessage>>,
|
||||
pub tracker: mpsc::UnboundedSender<StorageMessage>,
|
||||
pub storage: mpsc::UnboundedSender<StorageMessage>,
|
||||
pub runner: mpsc::UnboundedSender<RunnerMessage>,
|
||||
pub default_pool: String,
|
||||
pub spec: GlobalConfigSpec,
|
||||
|
||||
+98
-4
@@ -1,7 +1,58 @@
|
||||
use clap::Parser;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use waterfall::prelude::*;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")]
|
||||
enum StorageConfig {
|
||||
Redis { url: String, prefix: String },
|
||||
}
|
||||
|
||||
impl StorageConfig {
|
||||
fn start(
|
||||
&self,
|
||||
) -> (
|
||||
mpsc::UnboundedSender<StorageMessage>,
|
||||
tokio::task::JoinHandle<()>,
|
||||
) {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
match self {
|
||||
StorageConfig::Redis { url, prefix } => {
|
||||
(tx, redis_store::start(rx, url.clone(), prefix.clone()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")]
|
||||
enum ExecutorConfig {
|
||||
Local { workers: usize },
|
||||
}
|
||||
|
||||
impl ExecutorConfig {
|
||||
fn start(
|
||||
&self,
|
||||
) -> (
|
||||
mpsc::UnboundedSender<ExecutorMessage>,
|
||||
tokio::task::JoinHandle<()>,
|
||||
) {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
match self {
|
||||
ExecutorConfig::Local { workers } => (tx, local_executor::start(*workers, rx)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
struct Config {
|
||||
storage: StorageConfig,
|
||||
executor: ExecutorConfig,
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author, version, about)]
|
||||
struct Args {
|
||||
@@ -18,20 +69,63 @@ struct Args {
|
||||
verbose: bool,
|
||||
}
|
||||
|
||||
/*
|
||||
Sample config
|
||||
|
||||
{
|
||||
"storage": {
|
||||
"type": "redis",
|
||||
"url": "redis://localhost",
|
||||
"prefix": "world"
|
||||
},
|
||||
"executor": {
|
||||
"type": "local",
|
||||
"workers": 10,
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
let args = Args::parse();
|
||||
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
|
||||
|
||||
// Parse the config
|
||||
let json = std::fs::read_to_string(&args.config)
|
||||
let world_json = std::fs::read_to_string(&args.world)
|
||||
.expect(&format!("Unable to open {} for reading", args.config));
|
||||
|
||||
// Some Deserializer.
|
||||
let world_def: WorldDefinition =
|
||||
serde_json::from_str(&json).expect("Unable to parse world definition");
|
||||
serde_json::from_str(&world_json).expect("Unable to parse world definition");
|
||||
|
||||
// Parse the config
|
||||
let config_json = std::fs::read_to_string(&args.config)
|
||||
.expect(&format!("Unable to open {} for reading", args.config));
|
||||
let config: Config =
|
||||
serde_json::from_str(&config_json).expect("Unable to parse config definition");
|
||||
|
||||
// Start the config
|
||||
let (exe_tx, exe_handle) = config.executor.start();
|
||||
let (storage_tx, storage_handle) = config.storage.start();
|
||||
|
||||
let tasks = world_def.taskset().unwrap();
|
||||
|
||||
let mut runner = Runner::new(
|
||||
tasks,
|
||||
world_def.variables,
|
||||
exe_tx.clone(),
|
||||
storage_tx.clone(),
|
||||
world_def.output_options,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (wtx, wrx) = oneshot::channel();
|
||||
runner.run(wrx).await;
|
||||
|
||||
exe_tx.send(ExecutorMessage::Stop {}).unwrap();
|
||||
exe_handle.await.unwrap();
|
||||
|
||||
storage_tx.send(StorageMessage::Stop {}).unwrap();
|
||||
storage_handle.await.unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
+2
-3
@@ -251,8 +251,6 @@ impl Runner {
|
||||
|
||||
// Create queue
|
||||
let required = target.difference(&self.current);
|
||||
println!("CUR: {:?}", self.current);
|
||||
println!("REQ: {:?}", required);
|
||||
self.queue = self.tasks.iter().fold(Vec::new(), |mut acc, (name, task)| {
|
||||
let res: Vec<Action> = task
|
||||
.generate_intervals(&required)
|
||||
@@ -315,7 +313,8 @@ impl Runner {
|
||||
// We'll be using channels for running
|
||||
pub async fn run(&mut self, stop: oneshot::Receiver<RunnerEvent>) {
|
||||
self.events.push(tokio::spawn(async move {
|
||||
stop.await.expect("Unable to get stop");
|
||||
// This recv will fail if the channel is shutdown, so just ignore it.
|
||||
stop.await.unwrap_or(RunnerEvent::Stop);
|
||||
RunnerEvent::Stop
|
||||
}));
|
||||
self.queue_actions();
|
||||
|
||||
@@ -15,11 +15,13 @@ pub enum StorageMessage {
|
||||
LoadState {
|
||||
response: oneshot::Sender<ResourceInterval>,
|
||||
},
|
||||
/*
|
||||
GetAttempts {
|
||||
task_name: String,
|
||||
interval: Interval,
|
||||
response: oneshot::Sender<TaskAttempt>,
|
||||
},
|
||||
*/
|
||||
Stop {},
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user