Adding tests and ensuring that left-open intervals are consistent

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-09-29 08:55:01 -03:00
parent 811057ecad
commit b57aa7b858
3 changed files with 119 additions and 182 deletions
+1 -1
View File
@@ -14,7 +14,7 @@ use crate::interval::*;
use crate::interval_set::*;
use crate::requirement::*;
use crate::schedule::*;
use crate::task::*;
// use crate::task::*;
pub type Resource = String;
pub type TaskDetails = serde_json::Value;
+69 -18
View File
@@ -21,29 +21,38 @@ impl Schedule {
}
}
pub fn generate(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<DateTime<Utc>> {
let st = start.with_timezone(&self.timezone);
let et = end.with_timezone(&self.timezone);
pub fn generate(&self, interval: Interval) -> Vec<Interval> {
if self.times.is_empty() {
return Vec::new();
}
let mut date = st.date().naive_local();
let end_date = et.date().succ().naive_local();
let st = interval.start.with_timezone(&self.timezone);
let et = interval.end.with_timezone(&self.timezone);
let mut date = self.calendar.prev(st.date().naive_local());
let end_date = self.calendar.next(et.date().succ().naive_local());
let mut times = Vec::new();
let mut prev_time = self
.timezone
.from_local_datetime(&date.and_time(self.times[0]))
.unwrap()
.with_timezone(&Utc);
while date < end_date {
if self.calendar.includes(date) {
for time in &self.times {
let dt = self
.timezone
.from_local_datetime(&date.and_time(*time))
.unwrap();
if dt > start && dt <= end {
times.push(dt.with_timezone(&Utc));
} else if end < dt {
.unwrap()
.with_timezone(&Utc);
if dt > interval.start && dt <= interval.end {
times.push(Interval::new(prev_time, dt));
} else if interval.end < dt {
break;
}
prev_time = dt;
}
}
date = date.succ();
date = self.calendar.next(date);
}
times
@@ -164,7 +173,7 @@ mod tests {
};
// Simple generation
let times = sched.generate(
let times = sched.generate(Interval::new(
timezone
.ymd(2022, 1, 3)
.and_hms(11, 0, 0)
@@ -173,20 +182,26 @@ mod tests {
.ymd(2022, 1, 3)
.and_hms(12, 0, 0)
.with_timezone(&Utc),
);
));
assert_eq!(times.len(), 1);
assert_eq!(
times,
vec![timezone
vec![Interval::new(
timezone
.ymd(2022, 1, 3)
.and_hms(10, 30, 0)
.with_timezone(&Utc),
timezone
.ymd(2022, 1, 3)
.and_hms(11, 30, 0)
.with_timezone(&Utc),]
.with_timezone(&Utc),
)]
);
// Generating scheduled times over a timerange
assert_eq!(
sched.generate(
sched.generate(Interval::new(
timezone
.ymd(2021, 12, 31)
.and_hms(0, 0, 0)
@@ -195,8 +210,19 @@ mod tests {
.ymd(2022, 1, 5)
.and_hms(0, 0, 0)
.with_timezone(&Utc),
),
)),
vec![
Interval::new(
timezone
.ymd(2021, 12, 30)
.and_hms(11, 30, 0)
.with_timezone(&Utc),
timezone
.ymd(2021, 12, 31)
.and_hms(10, 30, 0)
.with_timezone(&Utc),
),
Interval::new(
timezone
.ymd(2021, 12, 31)
.and_hms(10, 30, 0)
@@ -205,6 +231,18 @@ mod tests {
.ymd(2021, 12, 31)
.and_hms(11, 30, 0)
.with_timezone(&Utc),
),
Interval::new(
timezone
.ymd(2021, 12, 31)
.and_hms(11, 30, 0)
.with_timezone(&Utc),
timezone
.ymd(2022, 1, 3)
.and_hms(10, 30, 0)
.with_timezone(&Utc),
),
Interval::new(
timezone
.ymd(2022, 1, 3)
.and_hms(10, 30, 0)
@@ -213,6 +251,18 @@ mod tests {
.ymd(2022, 1, 3)
.and_hms(11, 30, 0)
.with_timezone(&Utc),
),
Interval::new(
timezone
.ymd(2022, 1, 3)
.and_hms(11, 30, 0)
.with_timezone(&Utc),
timezone
.ymd(2022, 1, 4)
.and_hms(10, 30, 0)
.with_timezone(&Utc),
),
Interval::new(
timezone
.ymd(2022, 1, 4)
.and_hms(10, 30, 0)
@@ -221,6 +271,7 @@ mod tests {
.ymd(2022, 1, 4)
.and_hms(11, 30, 0)
.with_timezone(&Utc),
)
]
);
}
+18 -132
View File
@@ -1,45 +1,5 @@
use super::*;
fn default_bytes() -> usize {
20480
}
/// Options in how to handle task output. Some tasks can be quite
/// verbose, and the output may not be needed.
#[derive(Clone, Serialize, Deserialize, Copy, Debug, PartialEq, Hash, Eq)]
#[serde(deny_unknown_fields)]
pub struct TaskOutputOptions {
/// If true, output from successful tasks is discarded entirely, in
/// keeping with the UNIX philosophy of no news is good news
#[serde(default)]
pub discard_successful: bool,
/// If true, and output is not discarded, truncate the output of
/// each task to a maximum of the first / last `preserve` kb of
/// data
#[serde(default)]
pub truncate: bool,
/// Number of KB of output to preserve at the beginning of the ouptut
#[serde(default = "default_bytes")]
pub head_bytes: usize,
/// Number of KB of output to preserve at the end of the outut
#[serde(default = "default_bytes")]
pub tail_bytes: usize,
}
impl Default for TaskOutputOptions {
fn default() -> Self {
TaskOutputOptions {
discard_successful: true,
truncate: true,
head_bytes: default_bytes(),
tail_bytes: default_bytes(),
}
}
}
/// Defines the struct to parse for tasks
#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
#[serde(deny_unknown_fields)]
@@ -136,10 +96,10 @@ pub struct Task {
// Really need to rethink this valid_over and scheduling times. When generating
impl Task {
pub fn generate_times(
pub fn generate_intervals(
&self,
required: &HashMap<Resource, IntervalSet>,
) -> Result<Vec<DateTime<Utc>>> {
) -> Result<Vec<Interval>> {
// Ensure that all intervals that are required are provided by this instance
let reqs: Vec<IntervalSet> = self
.provides
@@ -164,10 +124,11 @@ impl Task {
))
} else {
Ok(ris.iter().fold(Vec::new(), |mut acc, intv| {
acc.extend(self.schedule.generate(
let mut new_intervals = self.schedule.generate(Interval::new(
std::cmp::max(intv.start, self.valid_over.start().unwrap()),
std::cmp::min(intv.end, self.valid_over.end().unwrap()),
));
acc.append(&mut new_intervals);
acc
}))
}
@@ -295,7 +256,7 @@ mod tests {
// No times when out of validity
let times = task
.generate_times(&HashMap::from([
.generate_intervals(&HashMap::from([
("resource_a".to_owned(), isv!(13, 20)),
("resource_b".to_owned(), isv!(13, 20)),
]))
@@ -304,7 +265,7 @@ mod tests {
// Requiring within a valid time range generates times
let times = task
.generate_times(&HashMap::from([
.generate_intervals(&HashMap::from([
("resource_a".to_owned(), isv!(6, 8)),
("resource_b".to_owned(), isv!(6, 8)),
]))
@@ -312,7 +273,7 @@ mod tests {
assert_eq!(times.len(), 6);
// Raise error if unequal requirements
let res = task.generate_times(&HashMap::from([
let res = task.generate_intervals(&HashMap::from([
("resource_a".to_owned(), isv!(6, 7)),
("resource_b".to_owned(), isv!(6, 8)),
]));
@@ -320,94 +281,19 @@ mod tests {
// Require that all times generated be within the
// valid_over
let res = task.generate_times(&HashMap::from([
let res = task.generate_intervals(&HashMap::from([
("resource_a".to_owned(), isv!(1, 30)),
("resource_b".to_owned(), isv!(1, 30)),
]));
assert!(res.is_ok());
let times = res.unwrap();
assert!(times.iter().all(|time| task.valid_over.contains(*time)));
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TaskAttempt {
#[serde(default)]
pub task_name: String,
#[serde(default = "chrono::Utc::now")]
pub scheduled_time: DateTime<Utc>,
#[serde(default = "chrono::Utc::now")]
pub start_time: DateTime<Utc>,
#[serde(default = "chrono::Utc::now")]
pub stop_time: DateTime<Utc>,
#[serde(default)]
pub succeeded: bool,
#[serde(default)]
pub killed: bool,
#[serde(default)]
pub infra_failure: bool,
#[serde(default)]
pub output: String,
#[serde(default)]
pub error: String,
#[serde(default)]
pub executor: Vec<String>,
#[serde(default)]
pub exit_code: i32,
/// as a percentage
#[serde(default)]
pub max_cpu: f32,
/// as a percentage
#[serde(default)]
pub avg_cpu: f32,
/// In bytes
#[serde(default)]
pub max_rss: u64,
/// In bytes
#[serde(default)]
pub avg_rss: f32,
}
impl Default for TaskAttempt {
fn default() -> Self {
TaskAttempt {
task_name: String::new(),
scheduled_time: Utc::now(),
start_time: Utc::now(),
stop_time: Utc::now(),
succeeded: false,
killed: false,
infra_failure: false,
output: "".to_owned(),
error: "".to_owned(),
executor: Vec::new(),
exit_code: 0i32,
max_cpu: 0.0,
avg_cpu: 0.0,
max_rss: 0,
avg_rss: 0.0,
}
}
}
impl TaskAttempt {
#[must_use]
pub fn new() -> Self {
TaskAttempt::default()
match res {
Ok(intervals) => {
assert!(intervals
.iter()
.all(|interval| task.valid_over.has_subset(*interval)));
}
Err(e) => {
panic!("{:?}", e);
}
};
}
}