diff --git a/examples/long_world.json b/examples/long_world.json index a27da8e..c3023b2 100644 --- a/examples/long_world.json +++ b/examples/long_world.json @@ -3,7 +3,8 @@ "HOME": "/tmp/world_test" }, "calendars": { - "std": { "mask": [ "Mon", "Tue", "Wed", "Thu", "Fri" ] } + "std": { "mask": [ "Mon", "Tue", "Wed", "Thu", "Fri" ] }, + "weekly": { "mask": [ "Fri" ] } }, "tasks": { "task_a": { @@ -11,24 +12,39 @@ "down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, "check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, - "provides": [ "task_a" ], + "provides": [ "alpha" ], "calendar_name": "std", "times": [ "09:00:00", "12:00:00"], "timezone": "America/New_York", - "valid_from": "2022-01-01T09:00:00", - "valid_to": "2023-01-08T09:00:00" + "valid_from": "2021-01-01T09:00:00", + "valid_to": "2022-06-01T09:00:00" }, + "task_a_new": { + "up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, + "down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, + "check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, + + "provides": [ "alpha" ], + + "calendar_name": "std", + "times": [ "09:00:00", "12:00:00"], + "timezone": "America/New_York", + + "valid_from": "2022-06-01T09:00:00", + "valid_to": "2023-05-01T09:00:00" + }, + "task_b": { "up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, "down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, "check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } }, - "provides": [ "task_b" ], - "requires": [ { "resource": "task_a", "offset": 0 } ], + "provides": [ "beta" ], + "requires": [ { "resource": "alpha", "offset": 0 } ], - "calendar_name": "std", + "calendar_name": "weekly", "times": [ "17:00:00" ], "timezone": "America/New_York", diff --git a/src/bin/wfd/main.rs b/src/bin/wfd/main.rs index edf2b57..ac8f531 100644 --- a/src/bin/wfd/main.rs +++ b/src/bin/wfd/main.rs @@ -1,6 +1,7 @@ use actix_cors::Cors; use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, Responder}; use clap::Parser; +use log::*; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot}; @@ -126,22 +127,29 @@ async fn get_state(state: web::Data) -> impl Responder { ] */ +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] struct TimelineInterval { time_range: [DateTime; 2], val: ActionState, } +#[derive(Serialize)] struct TimelineLabel { label: String, data: Vec, } +#[derive(Serialize)] struct TimelineGroup { group: String, data: Vec, } -async fn timeline(span: web::Query, state: web::Data) -> impl Responder { +async fn get_detailed_timeline( + span: web::Json, + state: web::Data, +) -> impl Responder { let interval = span.into_inner(); let (response, rx) = oneshot::channel(); @@ -151,7 +159,38 @@ async fn timeline(span: web::Query, state: web::Data) -> imp .unwrap(); match rx.await { - Ok(world) => HttpResponse::Ok().json(world), + Ok(actions) => { + let mut timeline = Vec::new(); + info!( + "Querying for actions over {}, got {} responses.", + interval, + actions.len() + ); + + for (resource, tasks) in actions { + let mut group = TimelineGroup { + group: resource.clone(), + data: Vec::new(), + }; + for (task_name, intervals) in tasks.into_iter() { + let data = intervals + .into_iter() + .map(|a| TimelineInterval { + time_range: [a.interval.start, a.interval.end], + val: a.state, + }) + .collect(); + + group.data.push(TimelineLabel { + label: task_name, + data, + }); + } + timeline.push(group); + } + + HttpResponse::Ok().json(timeline) + } Err(error) => HttpResponse::BadRequest().json(SimpleError { error: format!("{:?}", error), }), @@ -295,7 +334,11 @@ async fn main() -> std::io::Result<()> { )) .app_data(json_config) .route("/ready", web::get().to(ready)) - .service(web::scope("/api/v1/").route("", web::get().to(get_state))) + .service( + web::scope("/api/v1") + .route("/state", web::get().to(get_state)) + .route("/details", web::post().to(get_detailed_timeline)), + ) }) .bind(config.server.listen_spec())? .run() diff --git a/src/runner.rs b/src/runner.rs index 82d2b3c..b22cdf7 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -24,8 +24,8 @@ pub enum ActionState { #[derive(Debug, Clone, Copy, Serialize)] pub struct Action { task: usize, - interval: Interval, - state: ActionState, + pub interval: Interval, + pub state: ActionState, // kill: Option>, } @@ -273,7 +273,8 @@ impl Runner { let res = rx.await.unwrap(); res }; - let target = current.clone(); + // let target = current.clone(); + let target = ResourceInterval::new(); let end_state = tasks.coverage(); let mut runner = Runner { @@ -306,6 +307,15 @@ impl Runner { .iter() .enumerate() .fold(Vec::new(), |mut acc, (idx, task)| { + let get_state = |intv: Interval| { + if task.provides.iter().all(|res| { + self.current.contains_key(res) && self.current[res].has_subset(intv) + }) { + ActionState::Completed + } else { + ActionState::Queued + } + }; let res: Vec = task .generate_intervals(&new_required) .unwrap() @@ -314,7 +324,7 @@ impl Runner { |interval| Action { task: idx, interval, - state: ActionState::Queued, + state: get_state(interval), } }) .collect(); @@ -393,6 +403,12 @@ impl Runner { .cloned() .collect(); + info!( + "Filtered {} actions down to {}", + self.actions.len(), + actions.len() + ); + for action in actions { let task = &self.tasks[action.task]; for resource in &task.provides { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 6418111..42e2c8c 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,5 +1,6 @@ use super::*; use crate::executors::TaskAttempt; +use crate::runner::ActionState; /// Messages for interacting with an Executor #[derive(Debug)] diff --git a/src/storage/redis.rs b/src/storage/redis.rs index 1a8dc71..fa0efa8 100644 --- a/src/storage/redis.rs +++ b/src/storage/redis.rs @@ -39,6 +39,18 @@ pub async fn start_redis_storage( let payload = serde_json::to_string(&attempt).unwrap(); conn.rpush(&tag, &payload).await?; } + /* + SetTaskIntervalState { + task_name, + interval, + state, + } => { + let map = format!("{}:task_interval_states", prefix); + let key = format!("{}_{}-{}", task_name, interval.start, interval.end); + let value = serde_json::to_string(&state).unwrap(); + conn.hset(&map, &key, &value).await?; + } + */ StoreState { state } => { let tag = format!("{}:state", prefix); let payload = serde_json::to_string(&state).unwrap(); diff --git a/webui/index.html b/webui/index.html new file mode 100644 index 0000000..4566ab1 --- /dev/null +++ b/webui/index.html @@ -0,0 +1,43 @@ + + + + +
+ + +