Adding more examples, fixing an issue in the generation of state at time T

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-10-05 17:23:53 -03:00
parent 1201e93169
commit d6ced6db50
10 changed files with 128 additions and 14 deletions
+25
View File
@@ -0,0 +1,25 @@
{
"storage": {
"type": "redis",
"url": "redis://localhost",
"prefix": "world"
},
"executor": {
"type": "agent",
"targets": [
{
"base_url": "http://localhost:2504/api/v1",
"resources": { "cores": 1 }
},
{
"base_url": "http://localhost:2505/api/v1",
"resources": { "cores": 1 }
},
{
"base_url": "http://localhost:2506/api/v1",
"resources": { "cores": 1 }
}
]
}
}
+39
View File
@@ -0,0 +1,39 @@
{
"variables": {
"HOME": "/tmp/world_test"
},
"calendars": {
"std": { "mask": [ "Mon", "Tue", "Wed", "Thu", "Fri" ] }
},
"tasks": {
"task_a": {
"up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"provides": [ "task_a" ],
"calendar_name": "std",
"times": [ "09:00:00", "12:00:00"],
"timezone": "America/New_York",
"valid_from": "2022-01-01T09:00:00",
"valid_to": "2023-01-08T09:00:00"
},
"task_b": {
"up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"provides": [ "task_b" ],
"requires": [ { "resource": "task_a", "offset": 0 } ],
"calendar_name": "std",
"times": [ "17:00:00" ],
"timezone": "America/New_York",
"valid_from": "2022-01-04T09:00:00",
"valid_to": "2023-01-07T00:00:00"
}
}
}
+3
View File
@@ -1,5 +1,6 @@
use clap::Parser;
use log::*;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
use waterfall;
@@ -120,6 +121,8 @@ async fn main() -> std::io::Result<()> {
let tasks = world_def.taskset().unwrap();
debug!("Config: {:?}", args);
let mut runner = Runner::new(
tasks,
world_def.variables,
+23 -1
View File
@@ -89,6 +89,14 @@ struct Args {
/// Enable verbose logging
#[clap(short, long)]
verbose: bool,
/// Configuration File
#[clap(short, long)]
host: Option<String>,
/// Configuration File
#[clap(short, long)]
port: Option<u32>,
}
#[actix_web::main]
@@ -98,6 +106,20 @@ async fn main() -> std::io::Result<()> {
let data = web::Data::new(init(args.config.as_ref()));
let config = data.clone();
let host = if let Some(h) = args.host {
h
} else {
config.ip.clone()
};
let port = if let Some(p) = args.port {
p
} else {
config.port
};
let listen_spec = format!("{}:{}", host, port);
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let res = HttpServer::new(move || {
let cors = Cors::default()
@@ -152,7 +174,7 @@ async fn main() -> std::io::Result<()> {
.route("/run", web::post().to(submit_task)),
)
})
.bind(config.listen_spec())?
.bind(listen_spec)?
.run()
.await;
+1 -1
View File
@@ -229,7 +229,7 @@ async fn start_agent_executor(
None => {
// Give the outstanding tasks a chance to complete or agents
// recover
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
// Refresh any disabled targets
for (tid, target) in targets.iter_mut().enumerate() {
+2
View File
@@ -97,6 +97,8 @@ async fn run_task(
let (program, args) = cmd.split_first().unwrap();
attempt.executor.push(format!("{:?}\n", details));
debug!("Running command {:?}", cmd);
let mut command = Command::new(program);
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
+1 -1
View File
@@ -5,7 +5,7 @@ use anyhow::{anyhow, Result};
use chrono::prelude::*;
use chrono::{Duration, TimeZone};
use chrono_tz::Tz;
use log::{error, info, warn};
use log::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use tokio::sync::{mpsc, oneshot};
+21 -7
View File
@@ -74,9 +74,9 @@ pub struct Runner {
storage: mpsc::UnboundedSender<StorageMessage>,
}
fn gen_timeout(timeout: i64) -> tokio::task::JoinHandle<RunnerMessage> {
fn gen_timeout(duration: Duration) -> tokio::task::JoinHandle<RunnerMessage> {
tokio::spawn(async move {
tokio::time::sleep(Duration::seconds(timeout).to_std().unwrap()).await;
tokio::time::sleep(duration.to_std().unwrap()).await;
RunnerMessage::Timeout
})
}
@@ -237,13 +237,16 @@ impl Runner {
}
let current = if force_check {
info!("Force re-check set, starting with empty current state.");
ResourceInterval::new()
} else {
info!("Pulling last state from storage");
let (response, rx) = oneshot::channel();
storage
.send(StorageMessage::LoadState { response })
.unwrap();
rx.await.unwrap()
} else {
ResourceInterval::new()
let res = rx.await.unwrap();
res
};
let end_state = tasks.coverage()?;
@@ -317,7 +320,7 @@ impl Runner {
}
if result_state != target {
return Err(anyhow!(
"Actions generated produce\n\t{:?}\nExpected\n\t{:?}",
"Actions generated produced\n\t{:?}\nExpected\n\t{:?}",
result_state,
target
));
@@ -348,6 +351,17 @@ impl Runner {
.iter()
.all(|action| action.state == ActionState::Completed)
{
let now = Utc::now();
let next_time = self
.tasks
.values()
.map(|t| t.schedule.next_time(now))
.min()
.unwrap()
.with_timezone(&Utc);
let sleep_duration = next_time - now;
info!("Sleeping for {} until next task", sleep_duration);
self.events.push(gen_timeout(sleep_duration));
self.tick().unwrap();
}
match self.events.next().await {
@@ -371,7 +385,6 @@ impl Runner {
task_name,
interval,
})) => {
println!("Completing {}/{}", task_name, interval);
let action = self
.queue
.iter_mut()
@@ -385,6 +398,7 @@ impl Runner {
.or_insert(IntervalSet::new())
.insert(action.interval);
}
info!("Updating State");
self.storage
.send(StorageMessage::StoreState {
state: self.current.clone(),
+2 -2
View File
@@ -82,7 +82,7 @@ impl Schedule {
)
}
pub fn next_time(&self, dt: DateTime<Tz>) -> DateTime<Tz> {
pub fn next_time<T: TimeZone>(&self, dt: DateTime<T>) -> DateTime<Tz> {
let st = dt.with_timezone(&self.timezone);
let mut date = st.date().naive_local();
@@ -108,7 +108,7 @@ impl Schedule {
}
/// Given a time, generate the preceding interval according to the schedule
pub fn prev_time(&self, dt: DateTime<Tz>) -> DateTime<Tz> {
pub fn prev_time<T: TimeZone>(&self, dt: DateTime<T>) -> DateTime<Tz> {
let st = dt.with_timezone(&self.timezone);
let mut date = st.date().naive_local();
+11 -2
View File
@@ -22,10 +22,19 @@ impl TaskSet {
pub fn get_state<T: TimeZone>(&self, time: DateTime<T>) -> Result<ResourceInterval> {
let mut res = ResourceInterval::new();
let timeline = IntervalSet::from(Interval::new(MIN_TIME, time.with_timezone(&Utc)));
// Insert all of the covered items
for task in self.values() {
// TODO Need to align each of these intervals with a scheduled time
let timeline = if time < MAX_TIME {
let cur_intv = task.schedule.interval(time.clone(), 0);
if cur_intv.end > time {
IntervalSet::from(Interval::new(MIN_TIME, cur_intv.start))
} else {
IntervalSet::from(Interval::new(MIN_TIME, cur_intv.end))
}
} else {
IntervalSet::from(Interval::new(MIN_TIME, time.with_timezone(&Utc)))
};
let task_timeline = task.valid_over.intersection(&timeline);
for resource in &task.provides {
let ris = res.entry(resource.clone()).or_insert(IntervalSet::new());