Small code cleanup, getting ready to convert runner to a separate process

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-10-05 15:32:24 -03:00
parent d4ae655b4e
commit 1201e93169
5 changed files with 41 additions and 25 deletions
+1 -1
View File
@@ -3,7 +3,7 @@ use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, R
use clap::Parser; use clap::Parser;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot}; use tokio::sync::mpsc;
use waterfall::prelude::*; use waterfall::prelude::*;
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
+1 -3
View File
@@ -4,14 +4,12 @@ use actix_cors::Cors;
use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, Responder}; use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, Responder};
use clap::Parser; use clap::Parser;
use serde::Serialize; use serde::Serialize;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::oneshot;
use config::*; use config::*;
use waterfall::executors::agent_executor::TaskSubmission; use waterfall::executors::agent_executor::TaskSubmission;
use waterfall::prelude::*; use waterfall::prelude::*;
type TaskDetails = serde_json::Value;
#[derive(Serialize)] #[derive(Serialize)]
struct SimpleError { struct SimpleError {
error: String, error: String,
+1 -2
View File
@@ -117,7 +117,6 @@ async fn submit_task(
varmap: VarMap, varmap: VarMap,
) -> Result<TaskAttempt> { ) -> Result<TaskAttempt> {
let submit_url = format!("{}/run", base_url); let submit_url = format!("{}/run", base_url);
let mut attempt = TaskAttempt::new();
let submission = TaskSubmission { let submission = TaskSubmission {
details, details,
varmap, varmap,
@@ -126,7 +125,7 @@ async fn submit_task(
match client.post(submit_url).json(&submission).send().await { match client.post(submit_url).json(&submission).send().await {
Ok(result) => { Ok(result) => {
if result.status() == reqwest::StatusCode::OK { if result.status() == reqwest::StatusCode::OK {
attempt = result.json().await.unwrap(); let mut attempt: TaskAttempt = result.json().await.unwrap();
attempt attempt
.executor .executor
.push(format!("Executed on agent at {}", base_url)); .push(format!("Executed on agent at {}", base_url));
+1
View File
@@ -5,6 +5,7 @@ use anyhow::{anyhow, Result};
use chrono::prelude::*; use chrono::prelude::*;
use chrono::{Duration, TimeZone}; use chrono::{Duration, TimeZone};
use chrono_tz::Tz; use chrono_tz::Tz;
use log::{error, info, warn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
+37 -19
View File
@@ -29,7 +29,7 @@ pub struct Action {
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum RunnerEvent { pub enum RunnerMessage {
Start, Start,
TaskFailed { TaskFailed {
task_name: String, task_name: String,
@@ -39,6 +39,16 @@ pub enum RunnerEvent {
task_name: String, task_name: String,
interval: Interval, interval: Interval,
}, },
/*
ForceUp {
resources: HashSet<String>,
interval: Interval,
},
ForceDown {
resources: HashSet<String>,
interval: Interval,
},
*/
Timeout, Timeout,
Stop, Stop,
} }
@@ -57,17 +67,17 @@ pub struct Runner {
queue: Vec<Action>, queue: Vec<Action>,
qidx: usize, qidx: usize,
events: FuturesUnordered<tokio::task::JoinHandle<RunnerEvent>>, events: FuturesUnordered<tokio::task::JoinHandle<RunnerMessage>>,
last_horizon: DateTime<Utc>, last_horizon: DateTime<Utc>,
executor: mpsc::UnboundedSender<ExecutorMessage>, executor: mpsc::UnboundedSender<ExecutorMessage>,
storage: mpsc::UnboundedSender<StorageMessage>, storage: mpsc::UnboundedSender<StorageMessage>,
} }
fn gen_timeout(timeout: i64) -> tokio::task::JoinHandle<RunnerEvent> { fn gen_timeout(timeout: i64) -> tokio::task::JoinHandle<RunnerMessage> {
tokio::spawn(async move { tokio::spawn(async move {
tokio::time::sleep(Duration::seconds(timeout).to_std().unwrap()).await; tokio::time::sleep(Duration::seconds(timeout).to_std().unwrap()).await;
RunnerEvent::Timeout RunnerMessage::Timeout
}) })
} }
@@ -95,7 +105,7 @@ async fn run_task(
output_options: &TaskOutputOptions, output_options: &TaskOutputOptions,
varmap: &VarMap, varmap: &VarMap,
) -> bool { ) -> bool {
println!("Running {}/{}", task_name, interval); info!("Running {}/{}", task_name, interval);
let (response, response_rx) = oneshot::channel(); let (response, response_rx) = oneshot::channel();
executor executor
.send(ExecutorMessage::ExecuteTask { .send(ExecutorMessage::ExecuteTask {
@@ -128,7 +138,7 @@ async fn up_task(
output_options: TaskOutputOptions, output_options: TaskOutputOptions,
executor: mpsc::UnboundedSender<ExecutorMessage>, executor: mpsc::UnboundedSender<ExecutorMessage>,
storage: mpsc::UnboundedSender<StorageMessage>, storage: mpsc::UnboundedSender<StorageMessage>,
) -> RunnerEvent { ) -> RunnerMessage {
if let Some(check_cmd) = check.clone() { if let Some(check_cmd) = check.clone() {
let (_subkill, subkill_rx) = oneshot::channel(); let (_subkill, subkill_rx) = oneshot::channel();
let succeeded = run_task( let succeeded = run_task(
@@ -145,7 +155,7 @@ async fn up_task(
// If check succeeded, resources are up // If check succeeded, resources are up
if succeeded { if succeeded {
return RunnerEvent::TaskCompleted { return RunnerMessage::TaskCompleted {
task_name, task_name,
interval, interval,
}; };
@@ -166,7 +176,7 @@ async fn up_task(
) )
.await; .await;
if !succeeded { if !succeeded {
return RunnerEvent::TaskFailed { return RunnerMessage::TaskFailed {
task_name, task_name,
interval, interval,
}; };
@@ -189,18 +199,18 @@ async fn up_task(
// If check succeeded, resources are up // If check succeeded, resources are up
if succeeded { if succeeded {
RunnerEvent::TaskCompleted { RunnerMessage::TaskCompleted {
task_name, task_name,
interval, interval,
} }
} else { } else {
RunnerEvent::TaskFailed { RunnerMessage::TaskFailed {
task_name, task_name,
interval, interval,
} }
} }
} else { } else {
RunnerEvent::TaskCompleted { RunnerMessage::TaskCompleted {
task_name, task_name,
interval, interval,
} }
@@ -322,34 +332,42 @@ impl Runner {
} }
// We'll be using channels for running // We'll be using channels for running
pub async fn run(&mut self, stop: oneshot::Receiver<RunnerEvent>) { pub async fn run(&mut self, stop: oneshot::Receiver<RunnerMessage>) {
self.events.push(tokio::spawn(async move { self.events.push(tokio::spawn(async move {
// This recv will fail if the channel is shutdown, so just ignore it. // This recv will fail if the channel is shutdown, so just ignore it.
stop.await.unwrap_or(RunnerEvent::Stop); stop.await.unwrap_or(RunnerMessage::Stop);
RunnerEvent::Stop RunnerMessage::Stop
})); }));
self.queue_actions(); self.queue_actions();
// Loop while we can make progress // Loop while we can make progress
while !self.is_done() { while !self.is_done() {
// Queue up tasks
if self
.queue
.iter()
.all(|action| action.state == ActionState::Completed)
{
self.tick().unwrap();
}
match self.events.next().await { match self.events.next().await {
Some(Ok(RunnerEvent::Start)) => { Some(Ok(RunnerMessage::Start)) => {
self.queue_actions(); self.queue_actions();
} }
Some(Ok(RunnerEvent::Stop)) => { Some(Ok(RunnerMessage::Stop)) => {
break; break;
} }
Some(Ok(RunnerEvent::Timeout)) => { Some(Ok(RunnerMessage::Timeout)) => {
self.queue_actions(); self.queue_actions();
} }
Some(Ok(RunnerEvent::TaskFailed { Some(Ok(RunnerMessage::TaskFailed {
task_name, task_name,
interval, interval,
})) => { })) => {
println!("FAILED: {} / {}", task_name, interval); println!("FAILED: {} / {}", task_name, interval);
println!("Well that sucks"); println!("Well that sucks");
} }
Some(Ok(RunnerEvent::TaskCompleted { Some(Ok(RunnerMessage::TaskCompleted {
task_name, task_name,
interval, interval,
})) => { })) => {