Adding remote_agent

This commit is contained in:
Kinesin Data Technologies Incorporated
2022-10-05 08:50:40 -03:00
parent 779852022a
commit 0d6cea4152
12 changed files with 305 additions and 40 deletions
+2
View File
@@ -21,3 +21,5 @@ redis = { version = "*", features = ["aio", "tokio-comp"] }
clap = { version = "3.1", features = ["derive"] } clap = { version = "3.1", features = ["derive"] }
env_logger = "0.9" env_logger = "0.9"
log = "0.4" log = "0.4"
actix-web = "4"
actix-cors = "0.6"
+84
View File
@@ -0,0 +1,84 @@
pub use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::sync::mpsc;
use waterfall::prelude::*;
fn default_resources() -> TaskResources {
let system = System::new_with_specifics(RefreshKind::new().with_cpu().with_memory());
let cores = (system.processors().len() as i64) - 2;
let free_memory = (system.total_memory() - system.used_memory()) as f64;
let memory_mb = ((free_memory * 0.8) as i64) / 1024;
let mut resources = TaskResources::new();
resources.insert("cores".to_owned(), cores);
resources.insert("memory_mb".to_owned(), memory_mb);
resources
}
fn default_ip() -> String {
"127.0.0.1".to_owned()
}
fn default_port() -> u32 {
2504
}
#[derive(Deserialize, Debug, Clone)]
pub struct GlobalConfigSpec {
#[serde(default = "default_ip")]
pub ip: String,
#[serde(default = "default_port")]
pub port: u32,
#[serde(default = "default_resources")]
pub resources: TaskResources,
}
impl Default for GlobalConfigSpec {
fn default() -> Self {
GlobalConfigSpec {
ip: String::from("127.0.0.1"),
port: default_port(),
resources: default_resources(),
}
}
}
#[derive(Clone)]
pub struct GlobalConfig {
pub ip: String,
pub port: u32,
pub resources: TaskResources,
pub tracker: mpsc::UnboundedSender<TrackerMessage>,
pub executor: mpsc::UnboundedSender<ExecutorMessage>,
}
impl GlobalConfig {
pub fn new(spec: &GlobalConfigSpec) -> Self {
let def_res = default_resources();
let cores = def_res.get("cores").unwrap();
let workers = spec.resources.get("cores").unwrap_or(cores);
let (executor, exe_rx) = mpsc::unbounded_channel();
local_executor::start(*workers as usize, exe_rx);
// Tracker
let (tracker, trx) = mpsc::unbounded_channel();
noop_tracker::start(trx);
GlobalConfig {
ip: spec.ip.clone(),
port: spec.port,
resources: spec.resources.clone(),
tracker,
executor,
}
}
pub fn listen_spec(&self) -> String {
format!("{}:{}", self.ip, self.port)
}
}
+163
View File
@@ -0,0 +1,163 @@
mod config;
use actix_cors::Cors;
use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, Responder};
use clap::Parser;
use serde::Serialize;
use tokio::sync::{mpsc, oneshot};
use config::*;
use waterfall::prelude::*;
#[derive(Serialize)]
struct SimpleError {
error: String,
}
async fn get_resources(data: web::Data<GlobalConfig>) -> impl Responder {
HttpResponse::Ok().json(data.resources.clone())
}
async fn submit_task(
details: web::Json<TaskDetails>,
data: web::Data<GlobalConfig>,
) -> impl Responder {
let (response, mut rx) = mpsc::unbounded_channel();
let trx = data.tracker.clone();
data.executor
.send(ExecutorMessage::ExecuteTask {
details: details.into_inner(),
output_options: TaskOutputOptions::default(),
tracker: trx,
response,
})
.unwrap();
match rx.recv().await.unwrap() {
RunnerMessage::ExecutionReport { attempt, .. } => HttpResponse::Ok().json(attempt),
other => HttpResponse::BadRequest().json(SimpleError {
error: format!("Unexpected message {:?}", other),
}),
}
}
async fn stop_task(
path: web::Path<(RunID, TaskID)>,
data: web::Data<GlobalConfig>,
) -> impl Responder {
let (run_id, task_id) = path.into_inner();
let (response, rx) = oneshot::channel();
data.executor
.send(ExecutorMessage::StopTask {
run_id,
task_id,
response,
})
.unwrap();
rx.await.unwrap();
HttpResponse::Ok()
}
async fn ready() -> impl Responder {
HttpResponse::Ok()
}
fn init(config_file: &str) -> GlobalConfig {
let spec: GlobalConfigSpec = if config_file.is_empty() {
GlobalConfigSpec::default()
} else {
let json = std::fs::read_to_string(config_file)
.unwrap_or_else(|_| panic!("Unable to open {} for reading", config_file));
serde_json::from_str(&json).expect("Error parsing config json")
};
GlobalConfig::new(&spec)
}
#[derive(Parser, Debug)]
#[clap(author, version, about)]
struct Args {
/// Configuration File
#[clap(short, long, default_value = "")]
config: String,
/// Enable verbose logging
#[clap(short, long)]
verbose: bool,
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let args = Args::parse();
let data = web::Data::new(init(args.config.as_ref()));
let config = data.clone();
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let res = HttpServer::new(move || {
let cors = Cors::default()
.allow_any_header()
.allow_any_method()
.allow_any_origin()
.send_wildcard();
let json_config = web::JsonConfig::default()
.limit(1048576)
.error_handler(|err, _req| {
use actix_web::error::JsonPayloadError;
let payload = match &err {
JsonPayloadError::OverflowKnownLength { length, limit } => SimpleError {
error: format!("Payload too big ({} > {})", length, limit),
},
JsonPayloadError::Overflow { limit } => SimpleError {
error: format!("Payload too big (> {})", limit),
},
JsonPayloadError::ContentType => SimpleError {
error: "Unsupported Content-Type".to_owned(),
},
JsonPayloadError::Deserialize(e) => SimpleError {
error: format!("Parsing error: {}", e),
},
JsonPayloadError::Serialize(e) => SimpleError {
error: format!("JSON Generation error: {}", e),
},
JsonPayloadError::Payload(payload) => SimpleError {
error: format!("Payload error: {}", payload),
},
_ => SimpleError {
error: "Unknown error".to_owned(),
},
};
error::InternalError::from_response(err, HttpResponse::Conflict().json(payload))
.into()
});
App::new()
.wrap(cors)
.app_data(data.clone())
.wrap(Logger::new(
r#"%a "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T"#,
))
.app_data(json_config)
.route("/ready", web::get().to(ready))
.service(
web::scope("/api/v1")
.route("/resources", web::get().to(get_resources))
.route("/{run_id}/{task_id}", web::post().to(submit_task))
.route("/{run_id}/{task_id}", web::delete().to(stop_task)),
)
})
.bind(config.listen_spec())?
.run()
.await;
config.executor.send(ExecutorMessage::Stop {}).unwrap();
config.tracker.send(TrackerMessage::Stop {}).unwrap();
res
}
+5 -3
View File
@@ -2,6 +2,7 @@ use clap::Parser;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use waterfall;
use waterfall::prelude::*; use waterfall::prelude::*;
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@@ -19,9 +20,10 @@ impl StorageConfig {
) { ) {
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
match self { match self {
StorageConfig::Redis { url, prefix } => { StorageConfig::Redis { url, prefix } => (
(tx, redis_store::start(rx, url.clone(), prefix.clone())) tx,
} waterfall::storage::redis::start(rx, url.clone(), prefix.clone()),
),
} }
} }
} }
+1 -11
View File
@@ -183,12 +183,9 @@ async fn start_agent_executor(
}); });
} }
ExecuteTask { ExecuteTask {
task_name,
interval,
details, details,
varmap, varmap,
output_options, output_options,
storage,
response, response,
kill, kill,
} => { } => {
@@ -214,14 +211,7 @@ async fn start_agent_executor(
) )
.await; .await;
let rc = attempt.succeeded; let rc = attempt.succeeded;
storage response.send(attempt).unwrap();
.send(StorageMessage::StoreAttempt {
task_name,
interval,
attempt,
})
.unwrap();
response.send(rc).unwrap();
(tid, resources, rc) (tid, resources, rc)
})); }));
break; break;
+1 -12
View File
@@ -255,12 +255,9 @@ pub async fn start_local_executor(
}); });
} }
ExecuteTask { ExecuteTask {
task_name,
interval,
details, details,
varmap, varmap,
output_options, output_options,
storage,
response, response,
kill, kill,
} => { } => {
@@ -277,15 +274,7 @@ pub async fn start_local_executor(
..TaskAttempt::new() ..TaskAttempt::new()
}, },
}; };
let rc = attempt.succeeded; response.send(attempt).unwrap();
storage
.send(StorageMessage::StoreAttempt {
task_name,
interval,
attempt,
})
.unwrap();
response.send(rc).unwrap();
})); }));
} }
Stop {} => { Stop {} => {
+1 -4
View File
@@ -17,13 +17,10 @@ pub enum ExecutorMessage {
/// Errors /// Errors
/// Will return `Err` if the tasks are invalid, according to the executor /// Will return `Err` if the tasks are invalid, according to the executor
ExecuteTask { ExecuteTask {
task_name: String,
interval: Interval,
details: serde_json::Value, details: serde_json::Value,
varmap: VarMap, varmap: VarMap,
output_options: TaskOutputOptions, output_options: TaskOutputOptions,
storage: mpsc::UnboundedSender<StorageMessage>, response: oneshot::Sender<TaskAttempt>,
response: oneshot::Sender<bool>,
kill: oneshot::Receiver<()>, kill: oneshot::Receiver<()>,
}, },
Stop {}, Stop {},
+1 -1
View File
@@ -2,5 +2,5 @@ pub use crate::calendar::Calendar;
pub use crate::executors::*; pub use crate::executors::*;
pub use crate::runner::Runner; pub use crate::runner::Runner;
pub use crate::storage::*; pub use crate::storage::*;
pub use crate::task::TaskDefinition; pub use crate::task::{TaskDefinition, TaskResources};
pub use crate::world::WorldDefinition; pub use crate::world::WorldDefinition;
+14 -8
View File
@@ -99,23 +99,29 @@ async fn run_task(
let (response, response_rx) = oneshot::channel(); let (response, response_rx) = oneshot::channel();
executor executor
.send(ExecutorMessage::ExecuteTask { .send(ExecutorMessage::ExecuteTask {
task_name,
interval,
details, details,
output_options: output_options.clone(), output_options: output_options.clone(),
varmap: varmap.clone(), varmap: varmap.clone(),
response, response,
kill, kill,
storage,
}) })
.unwrap(); .unwrap();
response_rx.await.unwrap() let attempt = response_rx.await.unwrap();
let rc = attempt.succeeded;
storage
.send(StorageMessage::StoreAttempt {
task_name,
interval,
attempt: attempt.clone(),
})
.unwrap();
rc
} }
async fn up_task( async fn up_task(
task_name: String, task_name: String,
interval: Interval, interval: Interval,
kill: oneshot::Receiver<()>, _kill: oneshot::Receiver<()>,
varmap: VarMap, varmap: VarMap,
up: TaskDetails, up: TaskDetails,
check: Option<TaskDetails>, check: Option<TaskDetails>,
@@ -124,7 +130,7 @@ async fn up_task(
storage: mpsc::UnboundedSender<StorageMessage>, storage: mpsc::UnboundedSender<StorageMessage>,
) -> RunnerEvent { ) -> RunnerEvent {
if let Some(check_cmd) = check.clone() { if let Some(check_cmd) = check.clone() {
let (subkill, subkill_rx) = oneshot::channel(); let (_subkill, subkill_rx) = oneshot::channel();
let succeeded = run_task( let succeeded = run_task(
task_name.clone(), task_name.clone(),
interval, interval,
@@ -147,7 +153,7 @@ async fn up_task(
} }
// UP // UP
let (subkill, subkill_rx) = oneshot::channel(); let (_subkill, subkill_rx) = oneshot::channel();
let succeeded = run_task( let succeeded = run_task(
task_name.clone(), task_name.clone(),
interval, interval,
@@ -168,7 +174,7 @@ async fn up_task(
// recheck // recheck
if let Some(check_cmd) = check { if let Some(check_cmd) = check {
let (subkill, subkill_rx) = oneshot::channel(); let (_subkill, subkill_rx) = oneshot::channel();
let succeeded = run_task( let succeeded = run_task(
task_name.clone(), task_name.clone(),
interval, interval,
+2 -1
View File
@@ -25,4 +25,5 @@ pub enum StorageMessage {
Stop {}, Stop {},
} }
pub mod redis_store; pub mod noop;
pub mod redis;
+29
View File
@@ -0,0 +1,29 @@
use super::*;
/// The mpsc channel can be sized to fit max parallelism
pub async fn start_storage(mut msgs: mpsc::UnboundedReceiver<StorageMessage>) -> Result<()> {
let mut current_state = ResourceInterval::new();
while let Some(msg) = msgs.recv().await {
use StorageMessage::*;
match msg {
StoreAttempt { .. } => {}
StoreState { state } => {
current_state = state;
}
LoadState { response } => {
response.send(current_state.clone()).unwrap();
}
Stop {} => {
break;
}
}
}
Ok(())
}
pub fn start(msgs: mpsc::UnboundedReceiver<StorageMessage>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
start_storage(msgs).await.expect("Unable to start storage");
})
}
@@ -1,5 +1,7 @@
use super::*; use super::*;
extern crate redis;
use futures::prelude::*; use futures::prelude::*;
use redis::AsyncCommands; use redis::AsyncCommands;