Simplifying action and task reference a bit, fixing some logic errors, and adding in runner message queue polling

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-10-06 14:20:23 -03:00
parent ce621dc9d5
commit 2c96b16ec8
7 changed files with 241 additions and 212 deletions
+3 -2
View File
@@ -123,9 +123,11 @@ async fn main() -> std::io::Result<()> {
debug!("Config: {:?}", args);
let (_runner_tx, runner_rx) = mpsc::unbounded_channel();
let mut runner = Runner::new(
tasks,
world_def.variables,
runner_rx,
exe_tx.clone(),
storage_tx.clone(),
world_def.output_options,
@@ -134,8 +136,7 @@ async fn main() -> std::io::Result<()> {
.await
.unwrap();
let (wtx, wrx) = oneshot::channel();
runner.run(wrx).await;
runner.run().await;
exe_tx.send(ExecutorMessage::Stop {}).unwrap();
exe_handle.await.unwrap();
-6
View File
@@ -43,9 +43,3 @@ pub mod task;
pub mod task_set;
pub mod varmap;
pub mod world;
/*
TODO:
target_state -> TaskSet.coverage()
current state
*/
+33
View File
@@ -18,6 +18,8 @@ pub trait Satisfiable {
schedule: &Schedule,
available: &HashMap<String, IntervalSet>,
) -> bool;
fn resources(&self) -> HashSet<Resource>;
}
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
@@ -29,6 +31,23 @@ pub enum AggregateRequirement {
}
impl Satisfiable for AggregateRequirement {
/// Collects every resource referenced by any of the nested requirements.
///
/// All three aggregate variants yield the same thing here: the union of the
/// resources reported by each child requirement. They therefore share a
/// single match arm instead of three duplicated ones.
fn resources(&self) -> HashSet<Resource> {
    match self {
        AggregateRequirement::All(reqs)
        | AggregateRequirement::Any(reqs)
        | AggregateRequirement::None(reqs) => {
            // Each child returns its own set; flatten them into one union.
            reqs.iter().flat_map(|req| req.resources()).collect()
        }
    }
}
fn is_satisfied(
&self,
interval: Interval,
@@ -76,6 +95,13 @@ pub enum SingleRequirement {
}
impl Satisfiable for SingleRequirement {
/// Returns the resources this requirement refers to.
///
/// An `Offset` requirement names exactly one resource, which is returned as
/// a single-element set. A `File` requirement names no resource, so it
/// yields an empty set.
fn resources(&self) -> HashSet<Resource> {
    match self {
        SingleRequirement::Offset { resource, .. } => HashSet::from([resource.to_owned()]),
        // The path plays no part in the result; `..` avoids binding an
        // unused variable.
        SingleRequirement::File { .. } => HashSet::new(),
    }
}
fn is_satisfied(
&self,
interval: Interval,
@@ -145,6 +171,13 @@ impl Satisfiable for Requirement {
Requirement::Group(req) => req.can_be_satisfied(interval, schedule, available),
}
}
fn resources(&self) -> HashSet<Resource> {
match self {
Requirement::One(req) => req.resources(),
Requirement::Group(req) => req.resources(),
}
}
}
#[cfg(test)]
+116 -151
View File
@@ -1,6 +1,7 @@
use super::*;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::StreamExt;
use std::collections::VecDeque;
/*
Runner is responsible for taking a TaskSet and a varmap and
@@ -22,7 +23,7 @@ pub enum ActionState {
#[derive(Debug, Clone)]
pub struct Action {
task: String,
task: usize,
interval: Interval,
state: ActionState,
// kill: Option<oneshot::Receiver<()>>,
@@ -30,15 +31,9 @@ pub struct Action {
#[derive(Debug, Serialize, Deserialize)]
pub enum RunnerMessage {
Start,
TaskFailed {
task_name: String,
interval: Interval,
},
TaskCompleted {
task_name: String,
interval: Interval,
},
Tick,
ActionCompleted { action_id: usize, succeeded: bool },
RetryAction { action_id: usize },
/*
ForceUp {
resources: HashSet<String>,
@@ -49,7 +44,6 @@ pub enum RunnerMessage {
interval: Interval,
},
*/
Timeout,
Stop,
}
@@ -64,23 +58,17 @@ pub struct Runner {
target: ResourceInterval,
current: ResourceInterval,
queue: Vec<Action>,
actions: Vec<Action>,
qidx: usize,
events: FuturesUnordered<tokio::task::JoinHandle<RunnerMessage>>,
last_horizon: DateTime<Utc>,
messages: mpsc::UnboundedReceiver<RunnerMessage>,
executor: mpsc::UnboundedSender<ExecutorMessage>,
storage: mpsc::UnboundedSender<StorageMessage>,
}
/// Spawns a task that sleeps for `duration` and then resolves to
/// `RunnerMessage::Timeout`.
///
/// A negative `duration` cannot be converted to a std duration. It is
/// treated as zero, so the timeout fires immediately instead of panicking
/// inside the spawned task.
fn gen_timeout(duration: Duration) -> tokio::task::JoinHandle<RunnerMessage> {
    // Convert before spawning; `to_std` fails for negative durations, in
    // which case we fall back to a zero-length sleep.
    let wait = duration.to_std().unwrap_or_default();
    tokio::spawn(async move {
        tokio::time::sleep(wait).await;
        RunnerMessage::Timeout
    })
}
async fn validate_cmd(
executor: mpsc::UnboundedSender<ExecutorMessage>,
cmd: serde_json::Value,
@@ -129,6 +117,7 @@ async fn run_task(
}
async fn up_task(
action_id: usize,
task_name: String,
interval: Interval,
_kill: oneshot::Receiver<()>,
@@ -155,9 +144,9 @@ async fn up_task(
// If check succeeded, resources are up
if succeeded {
return RunnerMessage::TaskCompleted {
task_name,
interval,
return RunnerMessage::ActionCompleted {
action_id,
succeeded: true,
};
}
}
@@ -176,9 +165,9 @@ async fn up_task(
)
.await;
if !succeeded {
return RunnerMessage::TaskFailed {
task_name,
interval,
return RunnerMessage::ActionCompleted {
action_id,
succeeded: false,
};
}
@@ -199,34 +188,45 @@ async fn up_task(
// If check succeeded, resources are up
if succeeded {
RunnerMessage::TaskCompleted {
task_name,
interval,
return RunnerMessage::ActionCompleted {
action_id,
succeeded: true,
};
} else {
return RunnerMessage::ActionCompleted {
action_id,
succeeded: false,
};
}
} else {
RunnerMessage::TaskFailed {
task_name,
interval,
}
}
} else {
RunnerMessage::TaskCompleted {
task_name,
interval,
}
return RunnerMessage::ActionCompleted {
action_id,
succeeded: true,
};
}
}
/// Spawns a task that sleeps for `delay` and then resolves to `event`.
///
/// The returned handle yields `event` unchanged once the sleep finishes.
/// A negative `delay` cannot be converted to a std duration. It is treated
/// as zero, so the event is delivered immediately instead of panicking
/// inside the spawned task.
fn delayed_event(delay: Duration, event: RunnerMessage) -> tokio::task::JoinHandle<RunnerMessage> {
    // Convert before spawning; `to_std` fails for negative durations, in
    // which case we fall back to a zero-length sleep.
    let wait = delay.to_std().unwrap_or_default();
    tokio::spawn(async move {
        tokio::time::sleep(wait).await;
        event
    })
}
impl Runner {
pub async fn new(
tasks: TaskSet,
vars: VarMap,
messages: mpsc::UnboundedReceiver<RunnerMessage>,
executor: mpsc::UnboundedSender<ExecutorMessage>,
storage: mpsc::UnboundedSender<StorageMessage>,
output_options: TaskOutputOptions,
force_check: bool,
) -> Result<Self> {
for tdef in tasks.values() {
tasks.validate()?;
// Validate the task commands can run on the executor
for tdef in tasks.iter() {
validate_cmd(executor.clone(), tdef.up.clone()).await?;
if let Some(cmd) = &tdef.down {
validate_cmd(executor.clone(), cmd.clone()).await?;
@@ -236,6 +236,7 @@ impl Runner {
}
}
// Load last-known state
let current = if force_check {
info!("Force re-check set, starting with empty current state.");
ResourceInterval::new()
@@ -248,41 +249,47 @@ impl Runner {
let res = rx.await.unwrap();
res
};
let target = current.clone();
let end_state = tasks.coverage()?;
let end_state = tasks.coverage();
let mut runner = Runner {
tasks,
vars,
output_options,
end_state,
target: ResourceInterval::new(),
target,
current,
queue: Vec::new(),
actions: Vec::new(),
qidx: 0,
events: FuturesUnordered::new(),
last_horizon: DateTime::<Utc>::MIN_UTC,
messages,
executor,
storage,
};
runner.tick()?;
runner.tick();
runner.queue_actions();
Ok(runner)
}
pub fn tick(&mut self) -> Result<()> {
let target = self.tasks.get_state(Utc::now())?;
// Create queue
let required = target.difference(&self.current);
self.queue = self.tasks.iter().fold(Vec::new(), |mut acc, (name, task)| {
// Generate a new target state and generate any required actions
pub fn tick(&mut self) {
let new_target = self.tasks.get_state(Utc::now() + Duration::days(1));
let new_required = new_target.difference(&self.target);
let mut new_actions =
self.tasks
.iter()
.enumerate()
.fold(Vec::new(), |mut acc, (idx, task)| {
let res: Vec<Action> = task
.generate_intervals(&required)
.generate_intervals(&new_required)
.unwrap()
.into_iter()
.map({
|interval| Action {
task: name.clone(),
task: idx,
interval,
state: ActionState::Queued,
}
@@ -291,106 +298,59 @@ impl Runner {
acc.extend(res);
acc
});
new_actions.sort_unstable_by(|a, b| a.interval.end.partial_cmp(&b.interval.end).unwrap());
// Ensure that all actions can be satisfied
let unsatisfied = self
.queue
.iter()
.filter(|act| {
!self
.tasks
.get(&act.task)
.unwrap()
.can_be_satisfied(act.interval, &target)
})
.fold(HashSet::new(), |mut acc, a| {
acc.insert(a.task.clone());
acc
});
// Ensure current +
let mut result_state = self.current.clone();
for action in &self.queue {
for res in &self.tasks.get(&action.task).unwrap().provides {
result_state
.entry(res.clone())
.or_insert(IntervalSet::new())
.insert(action.interval);
}
}
if result_state != target {
return Err(anyhow!(
"Actions generated produced\n\t{:?}\nExpected\n\t{:?}",
result_state,
target
));
info!("Tick: Generated {} new actions", new_actions.len());
self.actions.extend(new_actions);
}
if unsatisfied.is_empty() {
self.target = target;
Ok(())
} else {
Err(anyhow!("Tasks {:?} cannot complete as the target state does not provide required resources", unsatisfied))
}
}
pub async fn run(&mut self) {
self.events
.push(delayed_event(Duration::seconds(1), RunnerMessage::Tick));
// We'll be using channels for running
pub async fn run(&mut self, stop: oneshot::Receiver<RunnerMessage>) {
self.events.push(tokio::spawn(async move {
// This recv will fail if the channel is shutdown, so just ignore it.
stop.await.unwrap_or(RunnerMessage::Stop);
RunnerMessage::Stop
}));
self.queue_actions();
// Loop while we can make progress
// Need to incorporate the ability to receive messages
//
// Loop until the current state matches the end state
while !self.is_done() {
// Queue up tasks
if self
.queue
.iter()
.all(|action| action.state == ActionState::Completed)
{
let now = Utc::now();
let next_time = self
.tasks
.values()
.map(|t| t.schedule.next_time(now))
.min()
.unwrap()
.with_timezone(&Utc);
let sleep_duration = next_time - now;
info!("Sleeping for {} until next task", sleep_duration);
self.events.push(gen_timeout(sleep_duration));
self.tick().unwrap();
}
match self.events.next().await {
Some(Ok(RunnerMessage::Start)) => {
Some(Ok(RunnerMessage::Tick)) => {
debug!("Tick");
// Enqueue new messages
while let Ok(msg) = self.messages.try_recv() {
self.events.push(delayed_event(Duration::seconds(0), msg));
}
match self.actions.last() {
Some(action) => {
if action.interval.end <= Utc::now() {
self.tick()
}
}
None => self.tick(),
}
// Perform maintenance
self.queue_actions();
self.events
.push(delayed_event(Duration::seconds(5), RunnerMessage::Tick));
}
Some(Ok(RunnerMessage::Stop)) => {
info!("Stopping");
break;
}
Some(Ok(RunnerMessage::Timeout)) => {
self.queue_actions();
Some(Ok(RunnerMessage::RetryAction { action_id })) => {
info!("Retrying action {}", action_id);
let action = &mut self.actions[action_id];
action.state = ActionState::Queued;
}
Some(Ok(RunnerMessage::TaskFailed {
task_name,
interval,
Some(Ok(RunnerMessage::ActionCompleted {
action_id,
succeeded,
})) => {
println!("FAILED: {} / {}", task_name, interval);
println!("Well that sucks");
}
Some(Ok(RunnerMessage::TaskCompleted {
task_name,
interval,
})) => {
let action = self
.queue
.iter_mut()
.find(|x| x.task == task_name && x.interval == interval)
.unwrap();
let task = self.tasks.get(&task_name).unwrap();
info!("Completing action {}", action_id);
if succeeded {
let action = &mut self.actions[action_id];
let task = self.tasks.get(action.task).unwrap();
action.state = ActionState::Completed;
for res in &task.provides {
self.current
@@ -398,22 +358,23 @@ impl Runner {
.or_insert(IntervalSet::new())
.insert(action.interval);
}
info!("Updating State");
self.storage
.send(StorageMessage::StoreState {
state: self.current.clone(),
})
.unwrap();
self.queue_actions();
} else {
self.events.push(delayed_event(
Duration::seconds(30),
RunnerMessage::RetryAction { action_id },
));
}
}
Some(Err(e)) => {
panic!("Something went wrong: {:?}", e)
}
None => {
// No pending actions waiting
// Can probably wait to the next event
continue;
}
None => {}
}
// Log stuff
}
@@ -422,12 +383,14 @@ impl Runner {
fn queue_actions(&mut self) {
let now = Utc::now();
// Collect any outstanding futures
for action in self.queue[self.qidx..]
// Submit any eligible jobs
for (action_id, action) in self
.actions
.iter_mut()
.filter(|x| x.state == ActionState::Queued && x.interval.end <= now)
.enumerate()
.filter(|(_, x)| x.state == ActionState::Queued && x.interval.end <= now)
{
let task = self.tasks.get(&action.task).unwrap();
let task = self.tasks.get(action.task).unwrap();
if !task.can_run(action.interval, &self.current) {
continue;
}
@@ -436,7 +399,7 @@ impl Runner {
.iter()
.chain(self.vars.iter())
.collect();
let task_name = action.task.clone();
let task_name = task.name.clone();
let interval = action.interval;
let up = task.up.clone();
let check = task.check.clone();
@@ -445,6 +408,7 @@ impl Runner {
let storage = self.storage.clone();
self.events.push(tokio::spawn(async move {
up_task(
action_id,
task_name.clone(),
interval,
kill,
@@ -532,9 +496,11 @@ mod tests {
"world_test".to_owned(),
);
let (runner_tx, runner_rx) = mpsc::unbounded_channel();
let mut runner = Runner::new(
tasks,
world_def.variables,
runner_rx,
tx.clone(),
storage_tx.clone(),
world_def.output_options,
@@ -543,8 +509,7 @@ mod tests {
.await
.unwrap();
let (wtx, wrx) = oneshot::channel();
runner.run(wrx).await;
runner.run().await;
tx.send(ExecutorMessage::Stop {}).unwrap();
executor.await.unwrap();
+10 -1
View File
@@ -126,6 +126,7 @@ impl TaskDefinition {
let actual_end = schedule.interval(end, 0).start;
Task {
name: name.to_owned(),
up: self.up.clone(),
down: self.down.clone(),
check: self.check.clone(),
@@ -147,6 +148,7 @@ impl TaskDefinition {
*/
#[derive(Clone, Serialize, Debug)]
pub struct Task {
pub name: String,
pub up: TaskDetails,
pub down: Option<TaskDetails>,
pub check: Option<TaskDetails>,
@@ -238,6 +240,13 @@ impl Task {
.all(|req| req.can_be_satisfied(interval, &self.schedule, available))
}
/// Union of the resources referenced by all of this task's requirements.
///
/// Each entry in `self.requires` reports its own resource set; the sets are
/// flattened into one, so a resource named by several requirements appears
/// once.
pub fn requires_resources(&self) -> HashSet<Resource> {
    self.requires
        .iter()
        .flat_map(|requirement| requirement.resources())
        .collect()
}
pub fn up(&self, interval: &Interval) -> Result<HashSet<String>> {
if self.check(interval) {
Ok(self.provides.clone())
@@ -315,7 +324,7 @@ mod tests {
// Produces a std
let cal = Calendar::new();
let task = task_def.to_task(&cal);
let task = task_def.to_task("test", &cal);
// Assert the valid interval is correct
assert_eq!(
+53 -21
View File
@@ -3,28 +3,67 @@ use std::convert::From;
use std::ops::{Deref, DerefMut};
#[derive(Clone, Debug)]
pub struct TaskSet(HashMap<String, Task>);
pub struct TaskSet(Vec<Task>);
impl TaskSet {
pub fn new() -> Self {
TaskSet(HashMap::new())
TaskSet(Vec::new())
}
pub fn coverage(&self) -> Result<ResourceInterval> {
pub fn coverage(&self) -> ResourceInterval {
self.get_state(MAX_TIME)
}
pub fn validate(&self) -> Result<()> {
self.get_state(MAX_TIME)?;
let state = self.coverage();
for task in &self.0 {
for resource in task.requires_resources() {
if !state.contains_key(&resource) {
return Err(anyhow!(
"Task {} requires resource {}, which isn't produced.",
task.name,
resource
));
}
}
}
// validate that no task generates the same resource on overlapping times
let providers: HashMap<Resource, Vec<usize>> =
self.0
.iter()
.enumerate()
.fold(HashMap::new(), |mut acc, (idx, t)| {
for res in &t.provides {
acc.entry(res.clone()).or_insert(Vec::new()).push(idx)
}
acc
});
for (res, tids) in providers {
let mut is = IntervalSet::new();
for tid in tids {
let already_provided = is.intersection(&self.0[tid].valid_over);
if !already_provided.is_empty() {
return Err(anyhow!(
"Task set invalid: multiple tasks provide resource {} on the intervals {:?}",
res,
already_provided
));
}
is.merge(&self.0[tid].valid_over);
}
}
Ok(())
}
pub fn get_state<T: TimeZone>(&self, time: DateTime<T>) -> Result<ResourceInterval> {
pub fn get_state<T: TimeZone>(&self, time: DateTime<T>) -> ResourceInterval {
let mut res = ResourceInterval::new();
// Insert all of the covered items
for task in self.values() {
// TODO Need to align each of these intervals with a scheduled time
for task in &self.0 {
// Need to align each of these intervals with a scheduled time
let timeline = if time < MAX_TIME {
let cur_intv = task.schedule.interval(time.clone(), 0);
if cur_intv.end > time {
@@ -37,25 +76,18 @@ impl TaskSet {
};
let task_timeline = task.valid_over.intersection(&timeline);
for resource in &task.provides {
let ris = res.entry(resource.clone()).or_insert(IntervalSet::new());
let already_provided = ris.intersection(&task_timeline);
if !already_provided.is_empty() {
return Err(anyhow!(
"Task set invalid: multiple tasks provide resource {} on the intervals {:?}",
resource,
already_provided
));
}
ris.merge(&task_timeline);
res.entry(resource.clone())
.or_insert(IntervalSet::new())
.merge(&task_timeline);
}
}
Ok(res)
res
}
}
impl Deref for TaskSet {
type Target = HashMap<String, Task>;
type Target = Vec<Task>;
fn deref(&self) -> &Self::Target {
&self.0
}
@@ -67,8 +99,8 @@ impl DerefMut for TaskSet {
}
}
impl From<HashMap<String, Task>> for TaskSet {
fn from(data: HashMap<String, Task>) -> Self {
impl From<Vec<Task>> for TaskSet {
fn from(data: Vec<Task>) -> Self {
Self(data)
}
}
+2 -7
View File
@@ -26,15 +26,10 @@ impl WorldDefinition {
));
}
}
let tasks: HashMap<String, Task> = self
let tasks: Vec<Task> = self
.tasks
.iter()
.map(|(tn, td)| {
(
tn.clone(),
td.to_task(tn, self.calendars.get(&td.calendar_name).unwrap()),
)
})
.map(|(tn, td)| td.to_task(tn, self.calendars.get(&td.calendar_name).unwrap()))
.collect();
let ts = TaskSet::from(tasks);