First pass at web interface is working

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-10-07 10:49:30 -03:00
parent 4d2a71d028
commit 6b49038db6
6 changed files with 145 additions and 14 deletions
+23 -7
View File
@@ -3,7 +3,8 @@
"HOME": "/tmp/world_test"
},
"calendars": {
"std": { "mask": [ "Mon", "Tue", "Wed", "Thu", "Fri" ] }
"std": { "mask": [ "Mon", "Tue", "Wed", "Thu", "Fri" ] },
"weekly": { "mask": [ "Fri" ] }
},
"tasks": {
"task_a": {
@@ -11,24 +12,39 @@
"down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"provides": [ "task_a" ],
"provides": [ "alpha" ],
"calendar_name": "std",
"times": [ "09:00:00", "12:00:00"],
"timezone": "America/New_York",
"valid_from": "2022-01-01T09:00:00",
"valid_to": "2023-01-08T09:00:00"
"valid_from": "2021-01-01T09:00:00",
"valid_to": "2022-06-01T09:00:00"
},
"task_a_new": {
"up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"provides": [ "alpha" ],
"calendar_name": "std",
"times": [ "09:00:00", "12:00:00"],
"timezone": "America/New_York",
"valid_from": "2022-06-01T09:00:00",
"valid_to": "2023-05-01T09:00:00"
},
"task_b": {
"up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"provides": [ "task_b" ],
"requires": [ { "resource": "task_a", "offset": 0 } ],
"provides": [ "beta" ],
"requires": [ { "resource": "alpha", "offset": 0 } ],
"calendar_name": "std",
"calendar_name": "weekly",
"times": [ "17:00:00" ],
"timezone": "America/New_York",
+46 -3
View File
@@ -1,6 +1,7 @@
use actix_cors::Cors;
use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, Responder};
use clap::Parser;
use log::*;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
@@ -126,22 +127,29 @@ async fn get_state(state: web::Data<AppState>) -> impl Responder {
]
*/
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct TimelineInterval {
time_range: [DateTime<Utc>; 2],
val: ActionState,
}
#[derive(Serialize)]
struct TimelineLabel {
label: String,
data: Vec<TimelineInterval>,
}
#[derive(Serialize)]
struct TimelineGroup {
group: String,
data: Vec<TimelineLabel>,
}
async fn timeline(span: web::Query<Interval>, state: web::Data<AppState>) -> impl Responder {
async fn get_detailed_timeline(
span: web::Json<Interval>,
state: web::Data<AppState>,
) -> impl Responder {
let interval = span.into_inner();
let (response, rx) = oneshot::channel();
@@ -151,7 +159,38 @@ async fn timeline(span: web::Query<Interval>, state: web::Data<AppState>) -> imp
.unwrap();
match rx.await {
Ok(world) => HttpResponse::Ok().json(world),
Ok(actions) => {
let mut timeline = Vec::new();
info!(
"Querying for actions over {}, got {} responses.",
interval,
actions.len()
);
for (resource, tasks) in actions {
let mut group = TimelineGroup {
group: resource.clone(),
data: Vec::new(),
};
for (task_name, intervals) in tasks.into_iter() {
let data = intervals
.into_iter()
.map(|a| TimelineInterval {
time_range: [a.interval.start, a.interval.end],
val: a.state,
})
.collect();
group.data.push(TimelineLabel {
label: task_name,
data,
});
}
timeline.push(group);
}
HttpResponse::Ok().json(timeline)
}
Err(error) => HttpResponse::BadRequest().json(SimpleError {
error: format!("{:?}", error),
}),
@@ -295,7 +334,11 @@ async fn main() -> std::io::Result<()> {
))
.app_data(json_config)
.route("/ready", web::get().to(ready))
.service(web::scope("/api/v1/").route("", web::get().to(get_state)))
.service(
web::scope("/api/v1")
.route("/state", web::get().to(get_state))
.route("/details", web::post().to(get_detailed_timeline)),
)
})
.bind(config.server.listen_spec())?
.run()
+20 -4
View File
@@ -24,8 +24,8 @@ pub enum ActionState {
#[derive(Debug, Clone, Copy, Serialize)]
pub struct Action {
task: usize,
interval: Interval,
state: ActionState,
pub interval: Interval,
pub state: ActionState,
// kill: Option<oneshot::Receiver<()>>,
}
@@ -273,7 +273,8 @@ impl Runner {
let res = rx.await.unwrap();
res
};
let target = current.clone();
// let target = current.clone();
let target = ResourceInterval::new();
let end_state = tasks.coverage();
let mut runner = Runner {
@@ -306,6 +307,15 @@ impl Runner {
.iter()
.enumerate()
.fold(Vec::new(), |mut acc, (idx, task)| {
let get_state = |intv: Interval| {
if task.provides.iter().all(|res| {
self.current.contains_key(res) && self.current[res].has_subset(intv)
}) {
ActionState::Completed
} else {
ActionState::Queued
}
};
let res: Vec<Action> = task
.generate_intervals(&new_required)
.unwrap()
@@ -314,7 +324,7 @@ impl Runner {
|interval| Action {
task: idx,
interval,
state: ActionState::Queued,
state: get_state(interval),
}
})
.collect();
@@ -393,6 +403,12 @@ impl Runner {
.cloned()
.collect();
info!(
"Filtered {} actions down to {}",
self.actions.len(),
actions.len()
);
for action in actions {
let task = &self.tasks[action.task];
for resource in &task.provides {
+1
View File
@@ -1,5 +1,6 @@
use super::*;
use crate::executors::TaskAttempt;
use crate::runner::ActionState;
/// Messages for interacting with an Executor
#[derive(Debug)]
+12
View File
@@ -39,6 +39,18 @@ pub async fn start_redis_storage(
let payload = serde_json::to_string(&attempt).unwrap();
conn.rpush(&tag, &payload).await?;
}
/*
SetTaskIntervalState {
task_name,
interval,
state,
} => {
let map = format!("{}:task_interval_states", prefix);
let key = format!("{}_{}-{}", task_name, interval.start, interval.end);
let value = serde_json::to_string(&state).unwrap();
conn.hset(&map, &key, &value).await?;
}
*/
StoreState { state } => {
let tag = format!("{}:state", prefix);
let payload = serde_json::to_string(&state).unwrap();
+43
View File
@@ -0,0 +1,43 @@
<html>
<script src="https://unpkg.com/timelines-chart"></script>
</html>
<body>
<div id="timeline"></div>
<script>
fetch("http://localhost:2503/api/v1/details",
{
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: '{ "start": "2021-09-01T00:00:00Z", "end": "2022-10-01T00:00:00Z" }'
}
)
.then((response) => {
if (!response.ok) {
throw new Error('Network response was not OK');
}
return response.json();
})
.then((payload) => {
console.log(payload);
payload.map((group) => {
Object.values(group.data).map((label) => {
label.data.map((interval) => {
interval.timeRange = interval.timeRange.map((t) => new Date(t));
})
})
});
TimelinesChart()
(document.body)
.timeFormat("%Y-%m-%dT%H:%M:%S.%LZ") // ISO 8601 format
.zScaleLabel('State')
.zQualitative(true)
.useUtc(false)
.data(payload)
}
)
.catch(err => { throw err });
</script>
</body>
</html>