Checkpointing work to add JS endpoint

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-10-06 17:13:12 -03:00
parent f5ca3315f0
commit 6f5f890b1e
4 changed files with 145 additions and 34 deletions
+1 -1
View File
@@ -136,7 +136,7 @@ async fn main() -> std::io::Result<()> {
.await
.unwrap();
runner.run().await;
runner.run(false).await;
exe_tx.send(ExecutorMessage::Stop {}).unwrap();
exe_handle.await.unwrap();
+20 -3
View File
@@ -93,8 +93,25 @@ async fn get_state(state: web::Data<AppState>) -> impl Responder {
let (response, rx) = oneshot::channel();
state
.storage_tx
.send(StorageMessage::LoadState { response })
.runner_tx
.send(RunnerMessage::GetState { response })
.unwrap();
match rx.await {
Ok(world) => HttpResponse::Ok().json(world),
Err(error) => HttpResponse::BadRequest().json(SimpleError {
error: format!("{:?}", error),
}),
}
}
async fn timeline(span: web::Query<Interval>, state: web::Data<AppState>) -> impl Responder {
let interval = span.into_inner();
let (response, rx) = oneshot::channel();
state
.runner_tx
.send(RunnerMessage::GetResourceStateDetails { interval, response })
.unwrap();
match rx.await {
@@ -191,7 +208,7 @@ async fn main() -> std::io::Result<()> {
.unwrap();
let runner_handle = tokio::spawn(async move {
runner.run().await;
runner.run(true).await;
});
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
+1
View File
@@ -1,5 +1,6 @@
pub use crate::calendar::Calendar;
pub use crate::executors::*;
pub use crate::interval::Interval;
pub use crate::runner::{Runner, RunnerMessage};
pub use crate::storage::*;
pub use crate::task::{TaskDefinition, TaskResources};
+123 -30
View File
@@ -30,8 +30,20 @@ pub struct Action {
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RunnerState {
coverage: ResourceInterval,
current: ResourceInterval,
}
// Eventually we want to coerce the data into this format for timelines-chart
// Resource (group) -> Task (label) -> data [ { "timeRange": [date,date], "val": state } ]
pub type ResourceStateDetails =
HashMap<Resource, HashMap<String, Vec<(DateTime<Utc>, DateTime<Utc>, ActionState)>>>;
#[derive(Debug)]
pub enum RunnerMessage {
Tick,
PollMessages,
ActionCompleted {
action_id: usize,
succeeded: bool,
@@ -50,6 +62,13 @@ pub enum RunnerMessage {
resources: HashSet<String>,
interval: Interval,
},
GetState {
response: oneshot::Sender<RunnerState>,
},
GetResourceStateDetails {
interval: Interval,
response: oneshot::Sender<ResourceStateDetails>,
},
Stop,
}
@@ -274,14 +293,13 @@ impl Runner {
storage,
};
runner.tick();
runner.queue_actions();
runner.update_target();
Ok(runner)
}
// Generate a new target state and generate any required actions
pub fn tick(&mut self) {
pub fn update_target(&mut self) {
let new_target = self.tasks.get_state(Utc::now() + Duration::days(1));
let new_required = new_target.difference(&self.target);
let mut new_actions =
@@ -310,35 +328,108 @@ impl Runner {
self.actions.extend(new_actions);
}
pub async fn run(&mut self) {
self.events
.push(delayed_event(Duration::seconds(1), RunnerMessage::Tick));
fn tick(&mut self) {
debug!("Tick");
// Enqueue new messages
while let Ok(msg) = self.messages.try_recv() {
self.events.push(delayed_event(Duration::seconds(0), msg));
}
match self.actions.last() {
Some(action) => {
if action.interval.end <= Utc::now() {
self.tick()
}
}
None => self.tick(),
}
// Perform maintenance
self.queue_actions();
self.events.push(delayed_event(
Duration::milliseconds(250),
RunnerMessage::Tick,
));
}
fn poll_messages(&mut self) {
while let Ok(msg) = self.messages.try_recv() {
self.events.push(delayed_event(Duration::seconds(0), msg));
}
self.events.push(delayed_event(
Duration::milliseconds(10),
RunnerMessage::PollMessages,
));
}
fn get_resource_state_details(
&self,
interval: Interval,
response: oneshot::Sender<ResourceStateDetails>,
) {
// HashMap<Resource, HashMap<String, Vec<(DateTime<Utc>, DateTime<Utc>, ActionState)>>>;
let mut res : ResourceStateDetails = HashMap::new();
let all_resources : HashSet<Resource> = self.tasks.iter().fold(HashSet::new(), |mut acc, t| {
acc.extend(t.provides.clone());
acc
});
// Build out the hash
for resource in all_resources {
let mut res_ints = HashMap::new();
for task in self.tasks.iter() {
if task.provides.contains(&resource) {
res_ints.insert(task.name.clone(), Vec::new());
}
}
res.insert(resource.clone(), res_ints);
}
let actions = self
.actions
.iter()
.filter(|x| interval.is_contiguous(x.interval))
.collect();
for action in actions {
let task = &self.tasks[action.task];
for resource in task.provides {
res.entry(resource.clone()).or_insert(HashMap::new()).entry(task.name).or_insert(Hash
}
}
for task in &self.tasks {
for resource in &task.provides {}
}
response.send(res);
}
pub async fn run(&mut self, mut stay_up: bool) {
self.tick();
self.poll_messages();
// Need to incorporate the ability to receive messages
//
// Loop until the current state matches the end state
while !self.is_done() {
while stay_up || !self.is_done() {
match self.events.next().await {
Some(Ok(RunnerMessage::GetState { response })) => {
response
.send(RunnerState {
current: self.current.clone(),
coverage: self.end_state.clone(),
})
.unwrap_or(());
}
Some(Ok(RunnerMessage::PollMessages)) => {
self.poll_messages();
}
Some(Ok(RunnerMessage::Tick)) => {
debug!("Tick");
// Enqueue new messages
while let Ok(msg) = self.messages.try_recv() {
self.events.push(delayed_event(Duration::seconds(0), msg));
}
match self.actions.last() {
Some(action) => {
if action.interval.end <= Utc::now() {
self.tick()
}
}
None => self.tick(),
}
// Perform maintenance
self.queue_actions();
self.events
.push(delayed_event(Duration::seconds(5), RunnerMessage::Tick));
self.tick();
}
Some(Ok(RunnerMessage::GetResourceStateDetails { interval, response })) => {
self.get_resource_state_details(interval, response);
}
Some(Ok(RunnerMessage::ForceUp {
resources,
@@ -386,6 +477,7 @@ impl Runner {
}
Some(Ok(RunnerMessage::Stop)) => {
info!("Stopping");
stay_up = false;
break;
}
Some(Ok(RunnerMessage::RetryAction { action_id })) => {
@@ -410,8 +502,8 @@ impl Runner {
fn complete_task(&mut self, action_id: usize, succeeded: bool) {
info!("Completing action {}", action_id);
let action = &mut self.actions[action_id];
if succeeded {
let action = &mut self.actions[action_id];
let task = self.tasks.get(action.task).unwrap();
action.state = ActionState::Completed;
for res in &task.provides {
@@ -423,6 +515,7 @@ impl Runner {
self.store_state();
self.queue_actions();
} else {
action.state = ActionState::Errored;
self.events.push(delayed_event(
Duration::seconds(30),
RunnerMessage::RetryAction { action_id },
@@ -567,7 +660,7 @@ mod tests {
.await
.unwrap();
runner.run().await;
runner.run(false).await;
tx.send(ExecutorMessage::Stop {}).unwrap();
executor.await.unwrap();