From 1201e9316908f733ed3c48a13375ef774d00538a Mon Sep 17 00:00:00 2001 From: Kinesin Data Technologies Incorporated <93931750+kinesintech@users.noreply.github.com> Date: Wed, 5 Oct 2022 15:32:24 -0300 Subject: [PATCH] Small code cleanup, getting ready to convert runner to a separate process --- src/bin/wfd/main.rs | 2 +- src/bin/wfw/main.rs | 4 +-- src/executors/agent_executor.rs | 3 +- src/lib.rs | 1 + src/runner.rs | 56 ++++++++++++++++++++++----------- 5 files changed, 41 insertions(+), 25 deletions(-) diff --git a/src/bin/wfd/main.rs b/src/bin/wfd/main.rs index 314a283..b3d44f5 100644 --- a/src/bin/wfd/main.rs +++ b/src/bin/wfd/main.rs @@ -3,7 +3,7 @@ use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, R use clap::Parser; use serde::{Deserialize, Serialize}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use waterfall::prelude::*; #[derive(Serialize, Deserialize, Debug)] diff --git a/src/bin/wfw/main.rs b/src/bin/wfw/main.rs index 09dafa5..ddaed98 100644 --- a/src/bin/wfw/main.rs +++ b/src/bin/wfw/main.rs @@ -4,14 +4,12 @@ use actix_cors::Cors; use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, Responder}; use clap::Parser; use serde::Serialize; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; use config::*; use waterfall::executors::agent_executor::TaskSubmission; use waterfall::prelude::*; -type TaskDetails = serde_json::Value; - #[derive(Serialize)] struct SimpleError { error: String, diff --git a/src/executors/agent_executor.rs b/src/executors/agent_executor.rs index 2b1b71d..f2ed7f1 100644 --- a/src/executors/agent_executor.rs +++ b/src/executors/agent_executor.rs @@ -117,7 +117,6 @@ async fn submit_task( varmap: VarMap, ) -> Result { let submit_url = format!("{}/run", base_url); - let mut attempt = TaskAttempt::new(); let submission = TaskSubmission { details, varmap, @@ -126,7 +125,7 @@ async fn submit_task( match client.post(submit_url).json(&submission).send().await { Ok(result) => { if result.status() == reqwest::StatusCode::OK { - attempt = result.json().await.unwrap(); + let mut attempt: TaskAttempt = result.json().await.unwrap(); attempt .executor .push(format!("Executed on agent at {}", base_url)); diff --git a/src/lib.rs b/src/lib.rs index 9ad9023..11f2b32 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ use anyhow::{anyhow, Result}; use chrono::prelude::*; use chrono::{Duration, TimeZone}; use chrono_tz::Tz; +use log::{error, info, warn}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use tokio::sync::{mpsc, oneshot}; diff --git a/src/runner.rs b/src/runner.rs index c5a963e..b61bdce 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -29,7 +29,7 @@ pub struct Action { } #[derive(Debug, Serialize, Deserialize)] -pub enum RunnerEvent { +pub enum RunnerMessage { Start, TaskFailed { task_name: String, @@ -39,6 +39,16 @@ pub enum RunnerEvent { task_name: String, interval: Interval, }, + /* + ForceUp { + resources: HashSet, + interval: Interval, + }, + ForceDown { + resources: HashSet, + interval: Interval, + }, + */ Timeout, Stop, } @@ -57,17 +67,17 @@ pub struct Runner { queue: Vec, qidx: usize, - events: FuturesUnordered>, + events: FuturesUnordered>, last_horizon: DateTime, executor: mpsc::UnboundedSender, storage: mpsc::UnboundedSender, } -fn gen_timeout(timeout: i64) -> tokio::task::JoinHandle { +fn gen_timeout(timeout: i64) -> tokio::task::JoinHandle { tokio::spawn(async move { tokio::time::sleep(Duration::seconds(timeout).to_std().unwrap()).await; - RunnerEvent::Timeout + RunnerMessage::Timeout }) } @@ -95,7 +105,7 @@ async fn run_task( output_options: &TaskOutputOptions, varmap: &VarMap, ) -> bool { - println!("Running {}/{}", task_name, interval); + info!("Running {}/{}", task_name, interval); let (response, response_rx) = oneshot::channel(); executor .send(ExecutorMessage::ExecuteTask { @@ -128,7 +138,7 @@ async fn up_task( output_options: TaskOutputOptions, executor: mpsc::UnboundedSender, storage: mpsc::UnboundedSender, -) -> RunnerEvent { +) -> RunnerMessage { if let Some(check_cmd) = check.clone() { let (_subkill, subkill_rx) = oneshot::channel(); let succeeded = run_task( @@ -145,7 +155,7 @@ async fn up_task( // If check succeeded, resources are up if succeeded { - return RunnerEvent::TaskCompleted { + return RunnerMessage::TaskCompleted { task_name, interval, }; @@ -166,7 +176,7 @@ async fn up_task( ) .await; if !succeeded { - return RunnerEvent::TaskFailed { + return RunnerMessage::TaskFailed { task_name, interval, }; @@ -189,18 +199,18 @@ async fn up_task( // If check succeeded, resources are up if succeeded { - RunnerEvent::TaskCompleted { + RunnerMessage::TaskCompleted { task_name, interval, } } else { - RunnerEvent::TaskFailed { + RunnerMessage::TaskFailed { task_name, interval, } } } else { - RunnerEvent::TaskCompleted { + RunnerMessage::TaskCompleted { task_name, interval, } @@ -322,34 +332,42 @@ impl Runner { } // We'll be using channels for running - pub async fn run(&mut self, stop: oneshot::Receiver) { + pub async fn run(&mut self, stop: oneshot::Receiver) { self.events.push(tokio::spawn(async move { // This recv will fail if the channel is shutdown, so just ignore it. - stop.await.unwrap_or(RunnerEvent::Stop); - RunnerEvent::Stop + stop.await.unwrap_or(RunnerMessage::Stop); + RunnerMessage::Stop })); self.queue_actions(); // Loop while we can make progress while !self.is_done() { + // Queue up tasks + if self + .queue + .iter() + .all(|action| action.state == ActionState::Completed) + { + self.tick().unwrap(); + } match self.events.next().await { - Some(Ok(RunnerEvent::Start)) => { + Some(Ok(RunnerMessage::Start)) => { self.queue_actions(); } - Some(Ok(RunnerEvent::Stop)) => { + Some(Ok(RunnerMessage::Stop)) => { break; } - Some(Ok(RunnerEvent::Timeout)) => { + Some(Ok(RunnerMessage::Timeout)) => { self.queue_actions(); } - Some(Ok(RunnerEvent::TaskFailed { + Some(Ok(RunnerMessage::TaskFailed { task_name, interval, })) => { println!("FAILED: {} / {}", task_name, interval); println!("Well that sucks"); } - Some(Ok(RunnerEvent::TaskCompleted { + Some(Ok(RunnerMessage::TaskCompleted { task_name, interval, })) => {