31 Commits

Author SHA1 Message Date
Kinesin Data Technologies Incorporated ca6f65bae2 Changing refresh intervals 2022-10-07 17:53:23 -03:00
Kinesin Data Technologies Incorporated 6042507ab7 Making zoom for timeline sticky, and adaptively coalesce intervals to limit payload sizes 2022-10-07 17:37:25 -03:00
Kinesin Data Technologies Incorporated 304e04cca9 Checkpointing work, need to add resolutions for actions 2022-10-07 16:40:31 -03:00
Kinesin Data Technologies Incorporated 2e89c1bac2 Updating UI 2022-10-07 15:54:30 -03:00
Kinesin Data Technologies Incorporated e711b3249b Adding vue copy/paste from daggy 2022-10-07 12:25:41 -03:00
Kinesin Data Technologies Incorporated 6b49038db6 First pass at web interface is working 2022-10-07 10:49:30 -03:00
Kinesin Data Technologies Incorporated 4d2a71d028 Adding in structure for expected payload 2022-10-06 21:55:18 -03:00
Kinesin Data Technologies Incorporated 6f5f890b1e Checkpointing work to add JS endpoint 2022-10-06 17:13:12 -03:00
Kinesin Data Technologies Incorporated f5ca3315f0 Adding wfd config, endpoint to get state 2022-10-06 15:32:05 -03:00
Kinesin Data Technologies Incorporated eb590c848e Adding a Clear option for storage 2022-10-06 15:12:36 -03:00
Kinesin Data Technologies Incorporated 5d4ff2bb8b Adding ForceUp message 2022-10-06 14:52:54 -03:00
Kinesin Data Technologies Incorporated 03412bb79d Adding force-down option 2022-10-06 14:51:06 -03:00
Kinesin Data Technologies Incorporated 2c96b16ec8 Simplifying action and task reference a bit, fixing some logic errors, and adding in runner message queue polling 2022-10-06 14:20:23 -03:00
Kinesin Data Technologies Incorporated ce621dc9d5 updating todo 2022-10-06 08:49:26 -03:00
Kinesin Data Technologies Incorporated 7c2c6cd8c7 Updating TODO 2022-10-05 17:25:02 -03:00
Kinesin Data Technologies Incorporated d6ced6db50 Adding more examples, fixing an issue in the generation of state at time T 2022-10-05 17:23:53 -03:00
Kinesin Data Technologies Incorporated 1201e93169 Small code cleanup, getting ready to convert runner to a separate process 2022-10-05 15:32:24 -03:00
Kinesin Data Technologies Incorporated d4ae655b4e Adding TODO 2022-10-05 13:42:13 -03:00
Kinesin Data Technologies Incorporated 4f957e91da Adding world daemon 2022-10-05 13:34:22 -03:00
Kinesin Data Technologies Incorporated 8c29665962 Uopdating readme 2022-10-05 13:03:04 -03:00
Kinesin Data Technologies Incorporated bbce4c208c Adding documentation 2022-10-05 12:51:07 -03:00
Kinesin Data Technologies Incorporated bb3c0d3972 Adding default provides of the task name 2022-10-05 12:48:29 -03:00
Kinesin Data Technologies Incorporated 8f6e96e989 Adding in a force-recheck option 2022-10-05 12:29:20 -03:00
Kinesin Data Technologies Incorporated 56771a5f47 changing waterfallw to wfw (waterfall worker) 2022-10-05 10:43:52 -03:00
Kinesin Data Technologies Incorporated 923025bc49 Adding example for agent execution 2022-10-05 10:35:48 -03:00
Kinesin Data Technologies Incorporated d82b000f9b Remote agent working now 2022-10-05 10:20:54 -03:00
Kinesin Data Technologies Incorporated 834b0f2c9c Renaming agent to waterfallw 2022-10-05 09:25:49 -03:00
Kinesin Data Technologies Incorporated 5a4c5034a3 Agent compiling 2022-10-05 09:25:11 -03:00
Kinesin Data Technologies Incorporated ca9a32c032 Adapting the agent 2022-10-05 09:19:07 -03:00
Kinesin Data Technologies Incorporated 0d6cea4152 Adding remote_agent 2022-10-05 08:50:40 -03:00
Kinesin Data Technologies Incorporated 779852022a Adding agent executor 2022-10-05 08:36:16 -03:00
56 changed files with 6297 additions and 270 deletions
+3
View File
@@ -20,3 +20,6 @@ sysinfo = "0.23"
redis = { version = "*", features = ["aio", "tokio-comp"] }
clap = { version = "3.1", features = ["derive"] }
env_logger = "0.9"
log = "0.4"
actix-web = "4"
actix-cors = "0.6"
+60
View File
@@ -0,0 +1,60 @@
# Waterfall
Waterfall is a declarative task execution framework.
# Why Another Execution Framework
There are many, many execution frameworks out there that support defining
tasks with inter-task dependencies. Most of them only partially include
scheduling in their design.
# Building and Running
```bash
cargo build
# wf is a cli for running worlds directly
# A redis instance is required for storage
# Run using the local executor
cargo run --bin wf -- --config examples/config.json --world examples/world.json
# Starting an agent
# wfw is a (W)ater(F)low (W)orker
cargo run --bin wfw
cargo run --bin wf -- --config examples/config_wfw.json --world examples/world.json
```
# Overview
## Example
## Resources
Resources are at the heart of Waterfall. They are simple things: labels
with an associated set of time intervals. Tasks produce resources for
given intervals.
## Tasks
Tasks are commands that run on a set schedule. Each task produces one or
more `Resource`. The run schedule naturally breaks up the timeline into
intervals. When a task runs at time `T_n`, it will make make each resource
it provides available over the interval `(T_{n-1},T]`.
### Commands
A task has three commands defined:
- **check** - Command used to run an out-of-band verification of the data. Should have no side effects.
- **up** - Command run to create resources.
- **down** - Command run when removing resources.
### Dependencies
Tasks will run at their scheduled time (or immediately if their scheduled time
has passed already).
It's possible to define additional constraints on launching, though. Some tasks
may need resources produced by other tasks before it can start.
+10
View File
@@ -0,0 +1,10 @@
- core
- [ ] Convert Runner to be more like daggyr runner
- [ ] Implement task kill / stop
- [ ] Implement force resources up/down with cascade
- wfd
- [ ] Implement shutdown when world is complete
- [ ] Implement endpoints
- Get state over interval
- Store (interval, {UP,QUEUED,ERRORED, ETC})
- Get attempts
+17
View File
@@ -0,0 +1,17 @@
{
"storage": {
"type": "redis",
"url": "redis://localhost",
"prefix": "world"
},
"executor": {
"type": "agent",
"targets": [
{
"base_url": "http://localhost:2504/api/v1",
"resources": { "cores": 10 }
}
]
}
}
+25
View File
@@ -0,0 +1,25 @@
{
"storage": {
"type": "redis",
"url": "redis://localhost",
"prefix": "world"
},
"executor": {
"type": "agent",
"targets": [
{
"base_url": "http://localhost:2504/api/v1",
"resources": { "cores": 1 }
},
{
"base_url": "http://localhost:2505/api/v1",
"resources": { "cores": 1 }
},
{
"base_url": "http://localhost:2506/api/v1",
"resources": { "cores": 1 }
}
]
}
}
+55
View File
@@ -0,0 +1,55 @@
{
"variables": {
"HOME": "/tmp/world_test"
},
"calendars": {
"std": { "mask": [ "Mon", "Tue", "Wed", "Thu", "Fri" ] },
"weekly": { "mask": [ "Fri" ] }
},
"tasks": {
"task_a": {
"up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"provides": [ "alpha" ],
"calendar_name": "std",
"times": [ "09:00:00", "12:00:00"],
"timezone": "America/New_York",
"valid_from": "2021-01-01T09:00:00",
"valid_to": "2022-06-01T09:00:00"
},
"task_a_new": {
"up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"provides": [ "alpha" ],
"calendar_name": "std",
"times": [ "09:00:00", "12:00:00"],
"timezone": "America/New_York",
"valid_from": "2022-06-01T09:00:00",
"valid_to": "2023-05-01T09:00:00"
},
"task_b": {
"up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"provides": [ "beta" ],
"requires": [ { "resource": "alpha", "offset": 0 } ],
"calendar_name": "weekly",
"times": [ "17:00:00" ],
"timezone": "America/New_York",
"valid_from": "2022-01-04T09:00:00",
"valid_to": "2023-01-07T00:00:00"
}
}
}
+17
View File
@@ -0,0 +1,17 @@
{
"server": {
"ip": "127.0.0.1",
"port": 2503
},
"storage": {
"type": "redis",
"url": "redis://localhost",
"prefix": "another"
},
"executor": {
"type": "local",
"workers": 10
}
}
+6 -6
View File
@@ -7,9 +7,9 @@
},
"tasks": {
"task_a": {
"up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}" },
"down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}" },
"check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}" },
"up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"provides": [ "task_a" ],
@@ -21,9 +21,9 @@
"valid_to": "2022-01-08T09:00:00"
},
"task_b": {
"up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}" },
"down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}" },
"check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}" },
"up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
"provides": [ "task_b" ],
"requires": [ { "resource": "task_a", "offset": 0 } ],
+23 -6
View File
@@ -1,7 +1,9 @@
use clap::Parser;
use log::*;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
use waterfall;
use waterfall::prelude::*;
#[derive(Serialize, Deserialize, Debug)]
@@ -19,9 +21,10 @@ impl StorageConfig {
) {
let (tx, rx) = mpsc::unbounded_channel();
match self {
StorageConfig::Redis { url, prefix } => {
(tx, redis_store::start(rx, url.clone(), prefix.clone()))
}
StorageConfig::Redis { url, prefix } => (
tx,
waterfall::storage::redis::start(rx, url.clone(), prefix.clone()),
),
}
}
}
@@ -29,7 +32,12 @@ impl StorageConfig {
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")]
enum ExecutorConfig {
Local { workers: usize },
Local {
workers: usize,
},
Agent {
targets: Vec<agent_executor::AgentTarget>,
},
}
impl ExecutorConfig {
@@ -42,6 +50,7 @@ impl ExecutorConfig {
let (tx, rx) = mpsc::unbounded_channel();
match self {
ExecutorConfig::Local { workers } => (tx, local_executor::start(*workers, rx)),
ExecutorConfig::Agent { targets } => (tx, agent_executor::start(targets.clone(), rx)),
}
}
}
@@ -67,6 +76,10 @@ struct Args {
/// Enable verbose logging
#[clap(short, long)]
verbose: bool,
/// Force a full re-check
#[clap(short, long)]
force_recheck: bool,
}
/*
@@ -108,18 +121,22 @@ async fn main() -> std::io::Result<()> {
let tasks = world_def.taskset().unwrap();
debug!("Config: {:?}", args);
let (_runner_tx, runner_rx) = mpsc::unbounded_channel();
let mut runner = Runner::new(
tasks,
world_def.variables,
runner_rx,
exe_tx.clone(),
storage_tx.clone(),
world_def.output_options,
args.force_recheck,
)
.await
.unwrap();
let (wtx, wrx) = oneshot::channel();
runner.run(wrx).await;
runner.run(false).await;
exe_tx.send(ExecutorMessage::Stop {}).unwrap();
exe_handle.await.unwrap();
+426
View File
@@ -0,0 +1,426 @@
use actix_cors::Cors;
use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, Responder};
use clap::Parser;
use log::*;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
use waterfall::prelude::*;
#[derive(Serialize, Deserialize, Debug)]
pub struct ServerConfig {
pub ip: String,
pub port: u32,
}
impl ServerConfig {
fn listen_spec(&self) -> String {
format!("{}:{}", self.ip, self.port)
}
}
impl Default for ServerConfig {
fn default() -> Self {
ServerConfig {
ip: String::from("127.0.0.1"),
port: 2503,
}
}
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")]
enum StorageConfig {
Redis { url: String, prefix: String },
}
impl StorageConfig {
fn start(
&self,
) -> (
mpsc::UnboundedSender<StorageMessage>,
tokio::task::JoinHandle<()>,
) {
let (tx, rx) = mpsc::unbounded_channel();
match self {
StorageConfig::Redis { url, prefix } => (
tx,
waterfall::storage::redis::start(rx, url.clone(), prefix.clone()),
),
}
}
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")]
enum ExecutorConfig {
Local {
workers: usize,
},
Agent {
targets: Vec<agent_executor::AgentTarget>,
},
}
impl ExecutorConfig {
fn start(
&self,
) -> (
mpsc::UnboundedSender<ExecutorMessage>,
tokio::task::JoinHandle<()>,
) {
let (tx, rx) = mpsc::unbounded_channel();
match self {
ExecutorConfig::Local { workers } => (tx, local_executor::start(*workers, rx)),
ExecutorConfig::Agent { targets } => (tx, agent_executor::start(targets.clone(), rx)),
}
}
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
struct Config {
storage: StorageConfig,
executor: ExecutorConfig,
server: ServerConfig,
}
#[derive(Serialize)]
struct SimpleError {
error: String,
}
async fn get_state(state: web::Data<AppState>) -> impl Responder {
let (response, rx) = oneshot::channel();
state
.runner_tx
.send(RunnerMessage::GetState { response })
.unwrap();
match rx.await {
Ok(world) => HttpResponse::Ok().json(world),
Err(error) => HttpResponse::BadRequest().json(SimpleError {
error: format!("{:?}", error),
}),
}
}
/*
Generates the data structure for [timelines-chart](https://github.com/vasturiano/timelines-chart)
[
{
"group": "resource",
"data": [
{
label: "task_name",
"data": [
{
"timeRange": [ "start", "end" ],
"val": "State"
},
]
}
]
}
]
*/
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct TimelineInterval {
time_range: [DateTime<Utc>; 2],
val: ActionState,
}
#[derive(Serialize)]
struct TimelineLabel {
label: String,
data: Vec<TimelineInterval>,
}
#[derive(Serialize)]
struct TimelineGroup {
group: String,
data: Vec<TimelineLabel>,
}
#[derive(Serialize, Deserialize)]
struct DetailedTimelineOptions {
#[serde(default)]
max_intervals: Option<usize>,
}
async fn get_detailed_timeline(
options: web::Query<DetailedTimelineOptions>,
span: web::Json<Interval>,
state: web::Data<AppState>,
) -> impl Responder {
let interval = span.into_inner();
let max_intervals = options.into_inner().max_intervals;
let (response, rx) = oneshot::channel();
state
.runner_tx
.send(RunnerMessage::GetResourceStateDetails {
interval,
response,
max_intervals,
})
.unwrap();
match rx.await {
Ok(actions) => {
let mut timeline = Vec::new();
info!(
"Querying for actions over {}, got {} responses.",
interval,
actions.len()
);
for (resource, tasks) in actions {
let mut group = TimelineGroup {
group: resource.clone(),
data: Vec::new(),
};
for (task_name, intervals) in tasks.into_iter() {
let data = intervals
.into_iter()
.map(|a| TimelineInterval {
time_range: [a.interval.start, a.interval.end],
val: a.state,
})
.collect();
group.data.push(TimelineLabel {
label: task_name,
data,
});
}
timeline.push(group);
}
HttpResponse::Ok().json(timeline)
}
Err(error) => HttpResponse::BadRequest().json(SimpleError {
error: format!("{:?}", error),
}),
}
}
/// Retrieve all data about a segment, including:
/// What resources it relies on
/// Last attempt (if any)
async fn get_segment_details(
max_intervals: web::Query<Option<usize>>,
span: web::Json<Interval>,
state: web::Data<AppState>,
) -> impl Responder {
/*
let interval = span.into_inner();
let (response, rx) = oneshot::channel();
state
.runner_tx
.send(RunnerMessage::GetResourceStateDetails {
interval,
response,
max_intervals: max_intervals.into_inner(),
})
.unwrap();
match rx.await {
Ok(actions) => {
let mut timeline = Vec::new();
for (resource, tasks) in actions {
let mut group = TimelineGroup {
group: resource.clone(),
data: Vec::new(),
};
for (task_name, mut intervals) in tasks.into_iter() {
// Collapse intervals
if intervals.len() > 50 {}
let data = intervals
.into_iter()
.map(|a| TimelineInterval {
time_range: [a.interval.start, a.interval.end],
val: a.state,
})
.collect();
group.data.push(TimelineLabel {
label: task_name,
data,
});
}
timeline.push(group);
}
HttpResponse::Ok().json(timeline)
}
Err(error) => HttpResponse::BadRequest().json(SimpleError {
error: format!("{:?}", error),
}),
}
*/
HttpResponse::Ok()
}
/*
async fn stop_run(path: web::Path<RunID>, state: web::Data<AppState>) -> impl Responder {
let run_id = path.into_inner();
let (response, rx) = oneshot::channel();
state
.config
.runner
.send(RunnerMessage::StopRun { run_id, response })
.unwrap();
rx.await.unwrap();
HttpResponse::Ok()
}
*/
async fn ready() -> impl Responder {
HttpResponse::Ok()
}
#[derive(Parser, Debug)]
#[clap(author, version, about)]
struct Args {
/// Configuration File
#[clap(short, long, default_value = "")]
config: String,
/// Configuration File
#[clap(short, long, default_value = "")]
world: String,
/// Enable verbose logging
#[clap(short, long)]
verbose: bool,
/// Force a full re-check
#[clap(short, long)]
force_recheck: bool,
}
#[derive(Clone)]
struct AppState {
storage_tx: mpsc::UnboundedSender<StorageMessage>,
runner_tx: mpsc::UnboundedSender<RunnerMessage>,
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let args = Args::parse();
// Parse the config
let world_json = std::fs::read_to_string(&args.world)
.expect(&format!("Unable to open {} for reading", args.config));
let world_def: WorldDefinition =
serde_json::from_str(&world_json).expect("Unable to parse world definition");
// Parse the config
let config_json = std::fs::read_to_string(&args.config)
.expect(&format!("Unable to open {} for reading", args.config));
let config: Config =
serde_json::from_str(&config_json).expect("Unable to parse config definition");
// Start the workers
let (exe_tx, exe_handle) = config.executor.start();
let (storage_tx, storage_handle) = config.storage.start();
let (runner_tx, runner_rx) = mpsc::unbounded_channel();
let data = web::Data::new(AppState {
storage_tx: storage_tx.clone(),
runner_tx: runner_tx.clone(),
});
let tasks = world_def.taskset().unwrap();
let mut runner = Runner::new(
tasks,
world_def.variables,
runner_rx,
exe_tx.clone(),
storage_tx.clone(),
world_def.output_options,
args.force_recheck,
)
.await
.unwrap();
let runner_handle = tokio::spawn(async move {
runner.run(true).await;
});
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let res = HttpServer::new(move || {
let cors = Cors::default()
.allow_any_header()
.allow_any_method()
.allow_any_origin()
.send_wildcard();
let json_config = web::JsonConfig::default()
.limit(1048576)
.error_handler(|err, _req| {
use actix_web::error::JsonPayloadError;
let payload = match &err {
JsonPayloadError::OverflowKnownLength { length, limit } => SimpleError {
error: format!("Payload too big ({} > {})", length, limit),
},
JsonPayloadError::Overflow { limit } => SimpleError {
error: format!("Payload too big (> {})", limit),
},
JsonPayloadError::ContentType => SimpleError {
error: "Unsupported Content-Type".to_owned(),
},
JsonPayloadError::Deserialize(e) => SimpleError {
error: format!("Parsing error: {}", e),
},
JsonPayloadError::Serialize(e) => SimpleError {
error: format!("JSON Generation error: {}", e),
},
JsonPayloadError::Payload(payload) => SimpleError {
error: format!("Payload error: {}", payload),
},
_ => SimpleError {
error: "Unknown error".to_owned(),
},
};
error::InternalError::from_response(err, HttpResponse::Conflict().json(payload))
.into()
});
App::new()
.wrap(cors)
.app_data(data.clone())
.wrap(Logger::new(
r#"%a "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T"#,
))
.app_data(json_config)
.route("/ready", web::get().to(ready))
.service(
web::scope("/api/v1")
.route("/state", web::get().to(get_state))
.route("/details", web::post().to(get_detailed_timeline)),
)
})
.bind(config.server.listen_spec())?
.run()
.await;
// Shutdown the runner
runner_tx.send(RunnerMessage::Stop {}).unwrap();
runner_handle.await.unwrap();
exe_tx.send(ExecutorMessage::Stop {}).unwrap();
exe_handle.await.unwrap();
storage_tx.send(StorageMessage::Stop {}).unwrap();
storage_handle.await.unwrap();
res
}
+84
View File
@@ -0,0 +1,84 @@
pub use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::sync::mpsc;
use waterfall::prelude::*;
fn default_resources() -> TaskResources {
let system = System::new_with_specifics(RefreshKind::new().with_cpu().with_memory());
let cores = (system.processors().len() as i64) - 2;
let free_memory = (system.total_memory() - system.used_memory()) as f64;
let memory_mb = ((free_memory * 0.8) as i64) / 1024;
let mut resources = TaskResources::new();
resources.insert("cores".to_owned(), cores);
resources.insert("memory_mb".to_owned(), memory_mb);
resources
}
fn default_ip() -> String {
"127.0.0.1".to_owned()
}
fn default_port() -> u32 {
2504
}
#[derive(Deserialize, Debug, Clone)]
pub struct GlobalConfigSpec {
#[serde(default = "default_ip")]
pub ip: String,
#[serde(default = "default_port")]
pub port: u32,
#[serde(default = "default_resources")]
pub resources: TaskResources,
}
impl Default for GlobalConfigSpec {
fn default() -> Self {
GlobalConfigSpec {
ip: String::from("127.0.0.1"),
port: default_port(),
resources: default_resources(),
}
}
}
#[derive(Clone)]
pub struct GlobalConfig {
pub ip: String,
pub port: u32,
pub resources: TaskResources,
pub storage: mpsc::UnboundedSender<StorageMessage>,
pub executor: mpsc::UnboundedSender<ExecutorMessage>,
}
impl GlobalConfig {
pub fn new(spec: &GlobalConfigSpec) -> Self {
let def_res = default_resources();
let cores = def_res.get("cores").unwrap();
let workers = spec.resources.get("cores").unwrap_or(cores);
let (executor, exe_rx) = mpsc::unbounded_channel();
local_executor::start(*workers as usize, exe_rx);
// Tracker
let (storage, trx) = mpsc::unbounded_channel();
waterfall::storage::noop::start(trx);
GlobalConfig {
ip: spec.ip.clone(),
port: spec.port,
resources: spec.resources.clone(),
storage,
executor,
}
}
pub fn listen_spec(&self) -> String {
format!("{}:{}", self.ip, self.port)
}
}
+185
View File
@@ -0,0 +1,185 @@
mod config;
use actix_cors::Cors;
use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, Responder};
use clap::Parser;
use serde::Serialize;
use tokio::sync::oneshot;
use config::*;
use waterfall::executors::agent_executor::TaskSubmission;
use waterfall::prelude::*;
#[derive(Serialize)]
struct SimpleError {
error: String,
}
async fn get_resources(data: web::Data<GlobalConfig>) -> impl Responder {
HttpResponse::Ok().json(data.resources.clone())
}
async fn submit_task(
details: web::Json<TaskSubmission>,
data: web::Data<GlobalConfig>,
) -> impl Responder {
let (response, rx) = oneshot::channel();
let submission = details.into_inner();
// Need to keep this unused, otherwise the LE will kill it immediately
let (kill_tx, kill) = oneshot::channel();
data.executor
.send(ExecutorMessage::ExecuteTask {
details: submission.details,
output_options: submission.output_options,
varmap: submission.varmap,
response,
kill,
})
.unwrap();
HttpResponse::Ok().json(rx.await.unwrap())
}
/*
async fn stop_task(
path: web::Path<(RunID, TaskID)>,
data: web::Data<GlobalConfig>,
) -> impl Responder {
let (run_id, task_id) = path.into_inner();
let (response, rx) = oneshot::channel();
data.executor
.send(ExecutorMessage::StopTask {
run_id,
task_id,
response,
})
.unwrap();
rx.await.unwrap();
HttpResponse::Ok()
}
*/
async fn ready() -> impl Responder {
HttpResponse::Ok()
}
fn init(config_file: &str) -> GlobalConfig {
let spec: GlobalConfigSpec = if config_file.is_empty() {
GlobalConfigSpec::default()
} else {
let json = std::fs::read_to_string(config_file)
.unwrap_or_else(|_| panic!("Unable to open {} for reading", config_file));
serde_json::from_str(&json).expect("Error parsing config json")
};
GlobalConfig::new(&spec)
}
#[derive(Parser, Debug)]
#[clap(author, version, about)]
struct Args {
/// Configuration File
#[clap(short, long, default_value = "")]
config: String,
/// Enable verbose logging
#[clap(short, long)]
verbose: bool,
/// Configuration File
#[clap(short, long)]
host: Option<String>,
/// Configuration File
#[clap(short, long)]
port: Option<u32>,
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let args = Args::parse();
let data = web::Data::new(init(args.config.as_ref()));
let config = data.clone();
let host = if let Some(h) = args.host {
h
} else {
config.ip.clone()
};
let port = if let Some(p) = args.port {
p
} else {
config.port
};
let listen_spec = format!("{}:{}", host, port);
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let res = HttpServer::new(move || {
let cors = Cors::default()
.allow_any_header()
.allow_any_method()
.allow_any_origin()
.send_wildcard();
let json_config = web::JsonConfig::default()
.limit(1048576)
.error_handler(|err, _req| {
use actix_web::error::JsonPayloadError;
let payload = match &err {
JsonPayloadError::OverflowKnownLength { length, limit } => SimpleError {
error: format!("Payload too big ({} > {})", length, limit),
},
JsonPayloadError::Overflow { limit } => SimpleError {
error: format!("Payload too big (> {})", limit),
},
JsonPayloadError::ContentType => SimpleError {
error: "Unsupported Content-Type".to_owned(),
},
JsonPayloadError::Deserialize(e) => SimpleError {
error: format!("Parsing error: {}", e),
},
JsonPayloadError::Serialize(e) => SimpleError {
error: format!("JSON Generation error: {}", e),
},
JsonPayloadError::Payload(payload) => SimpleError {
error: format!("Payload error: {}", payload),
},
_ => SimpleError {
error: "Unknown error".to_owned(),
},
};
error::InternalError::from_response(err, HttpResponse::Conflict().json(payload))
.into()
});
App::new()
.wrap(cors)
.app_data(data.clone())
.wrap(Logger::new(
r#"%a "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T"#,
))
.app_data(json_config)
.route("/ready", web::get().to(ready))
.service(
web::scope("/api/v1")
.route("/resources", web::get().to(get_resources))
.route("/run", web::post().to(submit_task)),
)
})
.bind(listen_spec)?
.run()
.await;
config.executor.send(ExecutorMessage::Stop {}).unwrap();
config.storage.send(StorageMessage::Stop {}).unwrap();
res
}
+286
View File
@@ -0,0 +1,286 @@
//! The Agent executor is essentially a wrapped version of the local executor.
//! It dispatches tasks to remote hosts
use super::*;
use futures::stream::futures_unordered::FuturesUnordered;
use log::{info, warn};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
use futures::StreamExt;
fn default_as_true() -> bool {
true
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AgentTarget {
pub base_url: String,
#[serde(default)]
pub resources: TaskResources,
#[serde(default)]
pub current_resources: TaskResources,
#[serde(default)]
pub enabled: bool,
}
impl AgentTarget {
fn new(base_url: String, resources: TaskResources) -> Self {
AgentTarget {
base_url,
resources: resources.clone(),
current_resources: resources,
enabled: true,
}
}
async fn refresh_resources(&mut self, client: &reqwest::Client) {
let resource_url = format!("{}/resources", self.base_url);
let disabled = match client.get(resource_url).send().await {
Ok(result) => {
if result.status() == reqwest::StatusCode::OK {
self.resources = result.json().await.unwrap();
self.current_resources = self.resources.clone();
false
} else {
true
}
}
Err(_) => true,
};
if self.enabled && disabled {
warn!("Disabling {}: unable to refresh resources", self.base_url);
}
self.enabled = !disabled;
}
async fn ping(&mut self, client: &reqwest::Client) -> Result<()> {
let resource_url = format!("{}/ready", self.base_url);
let result = client.get(resource_url).send().await?;
self.enabled = result.status() == reqwest::StatusCode::OK;
Ok(())
}
}
/// Contains specifics on how to run a local task
#[derive(Serialize, Deserialize, Clone, Debug)]
struct AgentTaskDetail {
/// The command and all arguments to run
command: Cmd,
/// Environment variables to set
#[serde(default)]
environment: HashMap<String, String>,
/// Timeout in seconds
#[serde(default)]
timeout: i64,
/// resources required by the task
resources: TaskResources,
}
fn extract_details(details: &TaskDetails) -> Result<AgentTaskDetail, serde_json::Error> {
serde_json::from_value::<AgentTaskDetail>(details.clone())
}
fn validate_task(details: &TaskDetails, max_capacities: &[TaskResources]) -> Result<()> {
let parsed = extract_details(details)?;
if max_capacities.is_empty()
|| max_capacities.iter().all(|x| x.values().all(|x| *x == 0))
|| max_capacities
.iter()
.any(|x| x.can_satisfy(&parsed.resources))
{
Ok(())
} else {
Err(anyhow!("No Agent target satisfies the required resources"))
}
}
/// Contains specifics on how to run a local task
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TaskSubmission {
pub details: TaskDetails,
pub varmap: VarMap,
pub output_options: TaskOutputOptions,
}
async fn submit_task(
base_url: String,
details: TaskDetails,
output_options: TaskOutputOptions,
client: reqwest::Client,
varmap: VarMap,
) -> Result<TaskAttempt> {
let submit_url = format!("{}/run", base_url);
let submission = TaskSubmission {
details,
varmap,
output_options,
};
match client.post(submit_url).json(&submission).send().await {
Ok(result) => {
if result.status() == reqwest::StatusCode::OK {
let mut attempt: TaskAttempt = result.json().await.unwrap();
attempt
.executor
.push(format!("Executed on agent at {}", base_url));
Ok(attempt)
} else {
Err(anyhow!(
"Unable to dispatch to agent at {}: {:?}",
base_url,
result.text().await.unwrap()
))
}
}
Err(e) => Err(anyhow!(
"Unable to dispatch to agent at {}: {:?}",
base_url,
e
)),
}
}
// async fn select_target() -> Option<usize> {}
struct RunningTask {
resources: TaskResources,
target_id: usize,
}
/// The mpsc channel can be sized to fit max parallelism
async fn start_agent_executor(
mut targets: Vec<AgentTarget>,
mut exe_msgs: mpsc::UnboundedReceiver<ExecutorMessage>,
) {
let client = reqwest::Client::new();
for target in &mut targets {
target.refresh_resources(&client).await;
}
let mut max_caps: Vec<TaskResources> = targets.iter().map(|x| x.resources.clone()).collect();
// Set up the local executor
let (le_tx, le_rx) = mpsc::unbounded_channel();
local_executor::start(1, le_rx);
// Tasks waiting to release resources
let mut running = FuturesUnordered::new();
while let Some(msg) = exe_msgs.recv().await {
use ExecutorMessage::*;
match msg {
ValidateTask { details, response } => {
let ltx = le_tx.clone();
let caps = max_caps.clone();
tokio::spawn(async move {
let result = validate_task(&details, &caps);
if result.is_err() {
response.send(result).unwrap_or(());
} else {
ltx.send(ValidateTask { details, response }).unwrap_or(());
}
});
}
ExecuteTask {
details,
varmap,
output_options,
response,
kill,
} => {
let task = extract_details(&details).unwrap();
let resources = task.resources.clone();
loop {
match targets.iter_mut().enumerate().find(|(_, x)| {
x.enabled && x.current_resources.can_satisfy(&task.resources)
}) {
// There is a remote agent with capacity
Some((tid, target)) => {
info!("Dispatching job to {}", target.base_url);
target.current_resources.sub(&resources).unwrap();
let base_url = target.base_url.clone();
let submit_client = client.clone();
running.push(tokio::spawn(async move {
let res = submit_task(
base_url,
details,
output_options,
submit_client,
varmap,
)
.await;
let mut rc = false;
if let Ok(attempt) = res {
response.send(attempt).unwrap();
rc = true;
}
(tid, resources, rc)
}));
break;
}
// No agent has capacity
None => {
// Give the outstanding tasks a chance to complete or agents
// recover
tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
// Refresh any disabled targets
for (tid, target) in targets.iter_mut().enumerate() {
if target.enabled {
continue;
}
target.refresh_resources(&client).await;
if target.enabled {
max_caps[tid] = target.resources.clone();
info!("{} is now enabled.", target.base_url);
}
}
// Wait for the next item
if !running.is_empty() {
let result: Result<
(usize, TaskResources, bool),
tokio::task::JoinError,
> = running.next().await.unwrap();
let (tid, resources, submit_ok) = result.unwrap();
if !submit_ok {
warn!(
"Disabling agent at {} due to incomplete submission.",
targets[tid].base_url
);
targets[tid].enabled = false;
}
targets[tid].current_resources.add(&resources);
}
}
}
}
}
/*
msg @ StopTask { .. } => {
le_tx.send(msg).unwrap_or(());
}
*/
Stop {} => {
break;
}
}
}
}
pub fn start(
targets: Vec<AgentTarget>,
msgs: mpsc::UnboundedReceiver<ExecutorMessage>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
start_agent_executor(targets, msgs).await;
})
}
+3 -30
View File
@@ -13,24 +13,6 @@ use tokio::io::AsyncReadExt;
type Environment = HashMap<String, Option<String>>;
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(untagged)]
enum Cmd {
Simple(String),
Split(Vec<String>),
}
impl Cmd {
fn generate(&self, varmap: &VarMap) -> Vec<String> {
let cmd = match self {
Cmd::Simple(s) => s.split_whitespace().map(|x| x.to_string()).collect(),
Cmd::Split(v) => v.clone(),
};
cmd.into_iter().map(|x| varmap.apply_to(&x)).collect()
}
}
/// Contains specifics on how to run a local task
#[derive(Serialize, Deserialize, Clone, Debug)]
struct LocalTaskDetail {
@@ -115,6 +97,8 @@ async fn run_task(
let (program, args) = cmd.split_first().unwrap();
attempt.executor.push(format!("{:?}\n", details));
debug!("Running command {:?}", cmd);
let mut command = Command::new(program);
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
@@ -255,12 +239,9 @@ pub async fn start_local_executor(
});
}
ExecuteTask {
task_name,
interval,
details,
varmap,
output_options,
storage,
response,
kill,
} => {
@@ -277,15 +258,7 @@ pub async fn start_local_executor(
..TaskAttempt::new()
},
};
let rc = attempt.succeeded;
storage
.send(StorageMessage::StoreAttempt {
task_name,
interval,
attempt,
})
.unwrap();
response.send(rc).unwrap();
response.send(attempt).unwrap();
}));
}
Stop {} => {
+20 -4
View File
@@ -1,4 +1,5 @@
use super::*;
pub mod agent_executor;
pub mod local_executor;
/// Messages for interacting with an Executor
@@ -16,13 +17,10 @@ pub enum ExecutorMessage {
/// Errors
/// Will return `Err` if the tasks are invalid, according to the executor
ExecuteTask {
task_name: String,
interval: Interval,
details: serde_json::Value,
varmap: VarMap,
output_options: TaskOutputOptions,
storage: mpsc::UnboundedSender<StorageMessage>,
response: oneshot::Sender<bool>,
response: oneshot::Sender<TaskAttempt>,
kill: oneshot::Receiver<()>,
},
Stop {},
@@ -32,6 +30,24 @@ fn default_bytes() -> usize {
20480
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(untagged)]
pub enum Cmd {
Simple(String),
Split(Vec<String>),
}
impl Cmd {
pub fn generate(&self, varmap: &VarMap) -> Vec<String> {
let cmd = match self {
Cmd::Simple(s) => s.split_whitespace().map(|x| x.to_string()).collect(),
Cmd::Split(v) => v.clone(),
};
cmd.into_iter().map(|x| varmap.apply_to(&x)).collect()
}
}
/// Options in how to handle task output. Some tasks can be quite
/// verbose, and the output may not be needed.
#[derive(Clone, Serialize, Deserialize, Copy, Debug, PartialEq, Hash, Eq)]
+6
View File
@@ -126,6 +126,12 @@ impl IntervalSet {
pub fn difference(&self, other: &Self) -> Self {
self.intersection(&other.complement())
}
/// Subtract all intervals in `other` from self
/// both sides must be sorted
pub fn subtract(&mut self, other: &Self) {
self.0 = self.difference(other).0;
}
}
impl Deref for IntervalSet {
type Target = Vec<Interval>;
+2 -6
View File
@@ -1,10 +1,12 @@
#![allow(unused_imports)]
#![allow(dead_code)]
#![feature(slice_group_by)]
use anyhow::{anyhow, Result};
use chrono::prelude::*;
use chrono::{Duration, TimeZone};
use chrono_tz::Tz;
use log::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use tokio::sync::{mpsc, oneshot};
@@ -42,9 +44,3 @@ pub mod task;
pub mod task_set;
pub mod varmap;
pub mod world;
/*
TODO:
target_state -> TaskSet.coverage()
current state
*/
+6 -2
View File
@@ -1,6 +1,10 @@
pub use chrono::prelude::*;
pub use chrono_tz::*;
pub use crate::calendar::Calendar;
pub use crate::executors::*;
pub use crate::runner::Runner;
pub use crate::interval::Interval;
pub use crate::runner::{ActionState, Runner, RunnerMessage};
pub use crate::storage::*;
pub use crate::task::TaskDefinition;
pub use crate::task::{TaskDefinition, TaskResources};
pub use crate::world::WorldDefinition;
+33
View File
@@ -18,6 +18,8 @@ pub trait Satisfiable {
schedule: &Schedule,
available: &HashMap<String, IntervalSet>,
) -> bool;
fn resources(&self) -> HashSet<Resource>;
}
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
@@ -29,6 +31,23 @@ pub enum AggregateRequirement {
}
impl Satisfiable for AggregateRequirement {
fn resources(&self) -> HashSet<Resource> {
match self {
AggregateRequirement::All(reqs) => reqs.iter().fold(HashSet::new(), |mut acc, req| {
acc.extend(req.resources());
acc
}),
AggregateRequirement::Any(reqs) => reqs.iter().fold(HashSet::new(), |mut acc, req| {
acc.extend(req.resources());
acc
}),
AggregateRequirement::None(reqs) => reqs.iter().fold(HashSet::new(), |mut acc, req| {
acc.extend(req.resources());
acc
}),
}
}
fn is_satisfied(
&self,
interval: Interval,
@@ -76,6 +95,13 @@ pub enum SingleRequirement {
}
impl Satisfiable for SingleRequirement {
fn resources(&self) -> HashSet<Resource> {
match self {
SingleRequirement::Offset { resource, .. } => HashSet::from([resource.to_owned()]),
SingleRequirement::File { path } => HashSet::new(),
}
}
fn is_satisfied(
&self,
interval: Interval,
@@ -145,6 +171,13 @@ impl Satisfiable for Requirement {
Requirement::Group(req) => req.can_be_satisfied(interval, schedule, available),
}
}
fn resources(&self) -> HashSet<Resource> {
match self {
Requirement::One(req) => req.resources(),
Requirement::Group(req) => req.resources(),
}
}
}
#[cfg(test)]
+392 -169
View File
@@ -1,6 +1,8 @@
use super::*;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::StreamExt;
use std::cmp::Ordering;
use std::collections::VecDeque;
/*
Runner is responsible for taking a TaskSet and a varmap and
@@ -12,7 +14,7 @@ use futures::StreamExt;
- current = TaskSet::coverage (the theoretical)
*/
#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq, Serialize, PartialOrd)]
pub enum ActionState {
Queued,
Running,
@@ -20,26 +22,54 @@ pub enum ActionState {
Completed,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy, Serialize)]
pub struct Action {
task: String,
interval: Interval,
state: ActionState,
task: usize,
pub interval: Interval,
pub state: ActionState,
// kill: Option<oneshot::Receiver<()>>,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum RunnerEvent {
Start,
TaskFailed {
task_name: String,
pub struct RunnerState {
coverage: ResourceInterval,
current: ResourceInterval,
}
// Eventually we want to coerce the data into this format for timelines-chart
// Resource (group) -> Task (label) -> data [ { "timeRange": [date,date], "val": state } ]
pub type ResourceStateDetails = HashMap<Resource, HashMap<String, Vec<Action>>>;
#[derive(Debug)]
pub enum RunnerMessage {
Tick,
PollMessages,
ActionCompleted {
action_id: usize,
succeeded: bool,
},
RetryAction {
action_id: usize,
},
/// Marks all resources in the set available over the interval
ForceUp {
resources: HashSet<String>,
interval: Interval,
},
TaskCompleted {
task_name: String,
/// Marks all resources in the set as down over _at least_ the interval.
/// Will cause a re-check / re-gen
ForceDown {
resources: HashSet<String>,
interval: Interval,
},
Timeout,
GetState {
response: oneshot::Sender<RunnerState>,
},
GetResourceStateDetails {
interval: Interval,
response: oneshot::Sender<ResourceStateDetails>,
max_intervals: Option<usize>,
},
Stop,
}
@@ -54,23 +84,17 @@ pub struct Runner {
target: ResourceInterval,
current: ResourceInterval,
queue: Vec<Action>,
actions: Vec<Action>,
qidx: usize,
events: FuturesUnordered<tokio::task::JoinHandle<RunnerEvent>>,
events: FuturesUnordered<tokio::task::JoinHandle<RunnerMessage>>,
last_horizon: DateTime<Utc>,
messages: mpsc::UnboundedReceiver<RunnerMessage>,
executor: mpsc::UnboundedSender<ExecutorMessage>,
storage: mpsc::UnboundedSender<StorageMessage>,
}
fn gen_timeout(timeout: i64) -> tokio::task::JoinHandle<RunnerEvent> {
tokio::spawn(async move {
tokio::time::sleep(Duration::seconds(timeout).to_std().unwrap()).await;
RunnerEvent::Timeout
})
}
async fn validate_cmd(
executor: mpsc::UnboundedSender<ExecutorMessage>,
cmd: serde_json::Value,
@@ -95,36 +119,43 @@ async fn run_task(
output_options: &TaskOutputOptions,
varmap: &VarMap,
) -> bool {
println!("Running {}/{}", task_name, interval);
info!("Running {}/{}", task_name, interval);
let (response, response_rx) = oneshot::channel();
executor
.send(ExecutorMessage::ExecuteTask {
task_name,
interval,
details,
output_options: output_options.clone(),
varmap: varmap.clone(),
response,
kill,
storage,
})
.unwrap();
response_rx.await.unwrap()
let attempt = response_rx.await.unwrap();
let rc = attempt.succeeded;
storage
.send(StorageMessage::StoreAttempt {
task_name,
interval,
attempt: attempt.clone(),
})
.unwrap();
rc
}
async fn up_task(
action_id: usize,
task_name: String,
interval: Interval,
kill: oneshot::Receiver<()>,
_kill: oneshot::Receiver<()>,
varmap: VarMap,
up: TaskDetails,
check: Option<TaskDetails>,
output_options: TaskOutputOptions,
executor: mpsc::UnboundedSender<ExecutorMessage>,
storage: mpsc::UnboundedSender<StorageMessage>,
) -> RunnerEvent {
) -> RunnerMessage {
if let Some(check_cmd) = check.clone() {
let (subkill, subkill_rx) = oneshot::channel();
let (_subkill, subkill_rx) = oneshot::channel();
let succeeded = run_task(
task_name.clone(),
interval,
@@ -139,15 +170,15 @@ async fn up_task(
// If check succeeded, resources are up
if succeeded {
return RunnerEvent::TaskCompleted {
task_name,
interval,
return RunnerMessage::ActionCompleted {
action_id,
succeeded: true,
};
}
}
// UP
let (subkill, subkill_rx) = oneshot::channel();
let (_subkill, subkill_rx) = oneshot::channel();
let succeeded = run_task(
task_name.clone(),
interval,
@@ -160,15 +191,15 @@ async fn up_task(
)
.await;
if !succeeded {
return RunnerEvent::TaskFailed {
task_name,
interval,
return RunnerMessage::ActionCompleted {
action_id,
succeeded: false,
};
}
// recheck
if let Some(check_cmd) = check {
let (subkill, subkill_rx) = oneshot::channel();
let (_subkill, subkill_rx) = oneshot::channel();
let succeeded = run_task(
task_name.clone(),
interval,
@@ -183,33 +214,79 @@ async fn up_task(
// If check succeeded, resources are up
if succeeded {
RunnerEvent::TaskCompleted {
task_name,
interval,
}
return RunnerMessage::ActionCompleted {
action_id,
succeeded: true,
};
} else {
RunnerEvent::TaskFailed {
task_name,
interval,
}
return RunnerMessage::ActionCompleted {
action_id,
succeeded: false,
};
}
} else {
RunnerEvent::TaskCompleted {
task_name,
interval,
return RunnerMessage::ActionCompleted {
action_id,
succeeded: true,
};
}
}
fn delayed_event(delay: Duration, event: RunnerMessage) -> tokio::task::JoinHandle<RunnerMessage> {
tokio::spawn(async move {
tokio::time::sleep(delay.to_std().unwrap()).await;
event
})
}
// Coalesces adjascent actions
fn coalesce_actions(mut actions: Vec<Action>) -> Vec<Action> {
if actions.is_empty() {
return actions;
}
actions.sort_unstable_by(|a, b| {
let ord = a.task.partial_cmp(&b.task).unwrap();
if ord == Ordering::Equal {
a.state.partial_cmp(&b.state).unwrap()
} else {
ord
}
});
let mut res: Vec<Action> = Vec::new();
for group in actions.group_by(|a, b| a.task == b.task && a.state == b.state) {
let intervals: Vec<Interval> = group.iter().map(|x| x.interval).collect();
let is = IntervalSet::from(intervals);
let task = group.first().unwrap().task;
let state = group.first().unwrap().state;
for interval in is.iter() {
res.push(Action {
task: task,
state: state,
interval: *interval,
})
}
}
res
}
impl Runner {
pub async fn new(
tasks: TaskSet,
vars: VarMap,
messages: mpsc::UnboundedReceiver<RunnerMessage>,
executor: mpsc::UnboundedSender<ExecutorMessage>,
storage: mpsc::UnboundedSender<StorageMessage>,
output_options: TaskOutputOptions,
force_check: bool,
) -> Result<Self> {
for tdef in tasks.values() {
tasks.validate()?;
// Validate the task commands can run on the executor
for tdef in tasks.iter() {
validate_cmd(executor.clone(), tdef.up.clone()).await?;
if let Some(cmd) = &tdef.down {
validate_cmd(executor.clone(), cmd.clone()).await?;
@@ -219,172 +296,315 @@ impl Runner {
}
}
let (response, rx) = oneshot::channel();
storage
.send(StorageMessage::LoadState { response })
.unwrap();
let current = rx.await.unwrap();
// Load last-known state
let current = if force_check {
info!("Force re-check set, starting with empty current state.");
ResourceInterval::new()
} else {
info!("Pulling last state from storage");
let (response, rx) = oneshot::channel();
storage
.send(StorageMessage::LoadState { response })
.unwrap();
let res = rx.await.unwrap();
res
};
// let target = current.clone();
let target = ResourceInterval::new();
let end_state = tasks.coverage()?;
let end_state = tasks.coverage();
let mut runner = Runner {
tasks,
vars,
output_options,
end_state,
target: ResourceInterval::new(),
target,
current,
queue: Vec::new(),
actions: Vec::new(),
qidx: 0,
events: FuturesUnordered::new(),
last_horizon: DateTime::<Utc>::MIN_UTC,
messages,
executor,
storage,
};
runner.tick()?;
runner.update_target();
Ok(runner)
}
pub fn tick(&mut self) -> Result<()> {
let target = self.tasks.get_state(Utc::now())?;
// Generate a new target state and generate any required actions
pub fn update_target(&mut self) {
let new_target = self.tasks.get_state(Utc::now() + Duration::days(1));
let new_required = new_target.difference(&self.target);
let mut new_actions =
self.tasks
.iter()
.enumerate()
.fold(Vec::new(), |mut acc, (idx, task)| {
let get_state = |intv: Interval| {
if task.provides.iter().all(|res| {
self.current.contains_key(res) && self.current[res].has_subset(intv)
}) {
ActionState::Completed
} else {
ActionState::Queued
}
};
let res: Vec<Action> = task
.generate_intervals(&new_required)
.unwrap()
.into_iter()
.map({
|interval| Action {
task: idx,
interval,
state: get_state(interval),
}
})
.collect();
acc.extend(res);
acc
});
new_actions.sort_unstable_by(|a, b| a.interval.end.partial_cmp(&b.interval.end).unwrap());
// Create queue
let required = target.difference(&self.current);
self.queue = self.tasks.iter().fold(Vec::new(), |mut acc, (name, task)| {
let res: Vec<Action> = task
.generate_intervals(&required)
.unwrap()
.into_iter()
.map({
|interval| Action {
task: name.clone(),
interval,
state: ActionState::Queued,
}
})
.collect();
acc.extend(res);
acc
});
info!("Tick: Generated {} new actions", new_actions.len());
self.actions.extend(new_actions);
}
// Ensure that all actions can be satisfied
let unsatisfied = self
.queue
.iter()
.filter(|act| {
!self
.tasks
.get(&act.task)
.unwrap()
.can_be_satisfied(act.interval, &target)
})
.fold(HashSet::new(), |mut acc, a| {
acc.insert(a.task.clone());
fn tick(&mut self) {
debug!("Tick");
// Enqueue new messages
while let Ok(msg) = self.messages.try_recv() {
self.events.push(delayed_event(Duration::seconds(0), msg));
}
match self.actions.last() {
Some(action) => {
if action.interval.end <= Utc::now() {
self.tick()
}
}
None => self.tick(),
}
// Perform maintenance
self.queue_actions();
self.events.push(delayed_event(
Duration::milliseconds(250),
RunnerMessage::Tick,
));
}
fn poll_messages(&mut self) {
while let Ok(msg) = self.messages.try_recv() {
self.events.push(delayed_event(Duration::seconds(0), msg));
}
self.events.push(delayed_event(
Duration::milliseconds(10),
RunnerMessage::PollMessages,
));
}
fn get_resource_state_details(
&self,
interval: Interval,
response: oneshot::Sender<ResourceStateDetails>,
max_intervals: Option<usize>,
) {
// HashMap<Resource, HashMap<String, Vec<(DateTime<Utc>, DateTime<Utc>, ActionState)>>>;
let mut res: ResourceStateDetails = HashMap::new();
let all_resources: HashSet<Resource> =
self.tasks.iter().fold(HashSet::new(), |mut acc, t| {
acc.extend(t.provides.clone());
acc
});
// Ensure current +
let mut result_state = self.current.clone();
for action in &self.queue {
for res in &self.tasks.get(&action.task).unwrap().provides {
result_state
.entry(res.clone())
.or_insert(IntervalSet::new())
.insert(action.interval);
// Build out the hash
for resource in all_resources {
let mut res_ints = HashMap::new();
for task in self.tasks.iter() {
if task.provides.contains(&resource) {
res_ints.insert(task.name.clone(), Vec::new());
}
}
res.insert(resource.clone(), res_ints);
}
let mut actions: Vec<Action> = self
.actions
.iter()
.filter(|x| interval.is_contiguous(x.interval))
.cloned()
.collect();
if let Some(max_intv) = max_intervals {
if actions.len() > max_intv {
actions = coalesce_actions(actions);
}
}
if result_state != target {
return Err(anyhow!(
"Actions generated produce\n\t{:?}\nExpected\n\t{:?}",
result_state,
target
));
info!(
"Filtered {} actions down to {}",
self.actions.len(),
actions.len()
);
for action in actions {
let task = &self.tasks[action.task];
for resource in &task.provides {
res.get_mut(resource)
.unwrap()
.get_mut(&task.name)
.unwrap()
.push(action);
}
}
if unsatisfied.is_empty() {
self.target = target;
Ok(())
} else {
Err(anyhow!("Tasks {:?} cannot complete as the target state does not provide required resources", unsatisfied))
}
response.send(res).unwrap();
}
// We'll be using channels for running
pub async fn run(&mut self, stop: oneshot::Receiver<RunnerEvent>) {
self.events.push(tokio::spawn(async move {
// This recv will fail if the channel is shutdown, so just ignore it.
stop.await.unwrap_or(RunnerEvent::Stop);
RunnerEvent::Stop
}));
self.queue_actions();
pub async fn run(&mut self, mut stay_up: bool) {
self.tick();
self.poll_messages();
// Loop while we can make progress
while !self.is_done() {
// Loop until the current state matches the end state
while stay_up || !self.is_done() {
match self.events.next().await {
Some(Ok(RunnerEvent::Start)) => {
self.queue_actions();
Some(Ok(RunnerMessage::GetState { response })) => {
response
.send(RunnerState {
current: self.current.clone(),
coverage: self.end_state.clone(),
})
.unwrap_or(());
}
Some(Ok(RunnerEvent::Stop)) => {
Some(Ok(RunnerMessage::PollMessages)) => {
self.poll_messages();
}
Some(Ok(RunnerMessage::Tick)) => {
self.tick();
}
Some(Ok(RunnerMessage::GetResourceStateDetails {
interval,
response,
max_intervals,
})) => {
self.get_resource_state_details(interval, response, max_intervals);
}
Some(Ok(RunnerMessage::ForceUp {
resources,
interval,
})) => {
for (tid, task) in self.tasks.iter().enumerate() {
if task.provides.is_subset(&resources) {
let aligned_is =
IntervalSet::from(task.schedule.align_interval(interval));
for resource in &task.provides {
self.current.get_mut(resource).unwrap().merge(&aligned_is);
}
for action in &mut self.actions {
if action.task == tid && aligned_is.has_subset(action.interval) {
action.state = ActionState::Completed;
}
}
}
}
self.store_state();
}
Some(Ok(RunnerMessage::ForceDown {
resources,
interval,
})) => {
// Use the interval to identify
for (tid, task) in self.tasks.iter().enumerate() {
if task.provides.is_subset(&resources) {
let aligned_is =
IntervalSet::from(task.schedule.align_interval(interval));
for resource in &task.provides {
self.current
.get_mut(resource)
.unwrap()
.subtract(&aligned_is);
}
for action in &mut self.actions {
if action.task == tid && aligned_is.has_subset(action.interval) {
action.state = ActionState::Queued;
}
}
}
}
self.store_state();
}
Some(Ok(RunnerMessage::Stop)) => {
info!("Stopping");
stay_up = false;
break;
}
Some(Ok(RunnerEvent::Timeout)) => {
self.queue_actions();
Some(Ok(RunnerMessage::RetryAction { action_id })) => {
info!("Retrying action {}", action_id);
let action = &mut self.actions[action_id];
action.state = ActionState::Queued;
}
Some(Ok(RunnerEvent::TaskFailed {
task_name,
interval,
Some(Ok(RunnerMessage::ActionCompleted {
action_id,
succeeded,
})) => {
println!("FAILED: {} / {}", task_name, interval);
println!("Well that sucks");
}
Some(Ok(RunnerEvent::TaskCompleted {
task_name,
interval,
})) => {
println!("Completing {}/{}", task_name, interval);
let action = self
.queue
.iter_mut()
.find(|x| x.task == task_name && x.interval == interval)
.unwrap();
let task = self.tasks.get(&task_name).unwrap();
action.state = ActionState::Completed;
for res in &task.provides {
self.current
.entry(res.clone())
.or_insert(IntervalSet::new())
.insert(action.interval);
}
self.storage
.send(StorageMessage::StoreState {
state: self.current.clone(),
})
.unwrap();
self.queue_actions();
self.complete_task(action_id, succeeded);
}
Some(Err(e)) => {
panic!("Something went wrong: {:?}", e)
}
None => {
// No pending actions waiting
// Can probably wait to the next event
continue;
}
None => {}
}
// Log stuff
}
}
fn complete_task(&mut self, action_id: usize, succeeded: bool) {
info!("Completing action {}", action_id);
let action = &mut self.actions[action_id];
if succeeded {
let task = self.tasks.get(action.task).unwrap();
action.state = ActionState::Completed;
for res in &task.provides {
self.current
.entry(res.clone())
.or_insert(IntervalSet::new())
.insert(action.interval);
}
self.store_state();
self.queue_actions();
} else {
action.state = ActionState::Errored;
self.events.push(delayed_event(
Duration::seconds(30),
RunnerMessage::RetryAction { action_id },
));
}
}
fn store_state(&self) {
self.storage
.send(StorageMessage::StoreState {
state: self.current.clone(),
})
.unwrap();
}
fn queue_actions(&mut self) {
let now = Utc::now();
// Collect any outstanding futures
for action in self.queue[self.qidx..]
// Submit any elligible jobs
for (action_id, action) in self
.actions
.iter_mut()
.filter(|x| x.state == ActionState::Queued && x.interval.end <= now)
.enumerate()
.filter(|(_, x)| x.state == ActionState::Queued && x.interval.end <= now)
{
let task = self.tasks.get(&action.task).unwrap();
let task = self.tasks.get(action.task).unwrap();
if !task.can_run(action.interval, &self.current) {
continue;
}
@@ -393,7 +613,7 @@ impl Runner {
.iter()
.chain(self.vars.iter())
.collect();
let task_name = action.task.clone();
let task_name = task.name.clone();
let interval = action.interval;
let up = task.up.clone();
let check = task.check.clone();
@@ -402,6 +622,7 @@ impl Runner {
let storage = self.storage.clone();
self.events.push(tokio::spawn(async move {
up_task(
action_id,
task_name.clone(),
interval,
kill,
@@ -483,24 +704,26 @@ mod tests {
// Storage
let (storage_tx, storage_rx) = mpsc::unbounded_channel();
let storage = redis_store::start(
let storage = storage::redis::start(
storage_rx,
"redis://localhost".to_owned(),
"world_test".to_owned(),
);
let (runner_tx, runner_rx) = mpsc::unbounded_channel();
let mut runner = Runner::new(
tasks,
world_def.variables,
runner_rx,
tx.clone(),
storage_tx.clone(),
world_def.output_options,
true,
)
.await
.unwrap();
let (wtx, wrx) = oneshot::channel();
runner.run(wrx).await;
runner.run(false).await;
tx.send(ExecutorMessage::Stop {}).unwrap();
executor.await.unwrap();
+29 -5
View File
@@ -21,6 +21,32 @@ impl Schedule {
}
}
fn is_end_time<T: TimeZone>(&self, dt: DateTime<T>) -> bool {
// Need to get the current interval, then offset it
let at = dt.with_timezone(&self.timezone);
self.times.iter().any(|x| *x == at.time())
&& self.calendar.includes(at.date().naive_local())
}
/// Given an interval I, return the interval J that is the smallest
/// set of schedule intervals that completely contain I.
/// If the given interval is bounded by MIN_TIME or MAX_TIME, then the
/// returned interval will be likewise bounded
pub fn align_interval(&self, interval: Interval) -> Interval {
let st = if interval.start == MIN_TIME {
self.next_time(interval.start).with_timezone(&Utc)
} else {
interval.start
};
let et = if interval.end == MAX_TIME {
self.prev_time(interval.end).with_timezone(&Utc)
} else {
interval.end
};
Interval::new(self.interval(st, 0).start, self.interval(et, 0).end)
}
pub fn generate(&self, interval: Interval) -> Vec<Interval> {
if self.times.is_empty() {
return Vec::new();
@@ -67,9 +93,7 @@ impl Schedule {
let at = dt.with_timezone(&self.timezone);
// If the time is at an edge
let rt = if self.times.iter().any(|x| *x == at.time())
&& self.calendar.includes(at.date().naive_local())
{
let rt = if self.is_end_time(at) {
at
} else {
self.next_time(at)
@@ -82,7 +106,7 @@ impl Schedule {
)
}
pub fn next_time(&self, dt: DateTime<Tz>) -> DateTime<Tz> {
pub fn next_time<T: TimeZone>(&self, dt: DateTime<T>) -> DateTime<Tz> {
let st = dt.with_timezone(&self.timezone);
let mut date = st.date().naive_local();
@@ -108,7 +132,7 @@ impl Schedule {
}
/// Given a time, generate the preceding interval according to the schedule
pub fn prev_time(&self, dt: DateTime<Tz>) -> DateTime<Tz> {
pub fn prev_time<T: TimeZone>(&self, dt: DateTime<T>) -> DateTime<Tz> {
let st = dt.with_timezone(&self.timezone);
let mut date = st.date().naive_local();
+4 -1
View File
@@ -1,9 +1,11 @@
use super::*;
use crate::executors::TaskAttempt;
use crate::runner::ActionState;
/// Messages for interacting with an Executor
#[derive(Debug)]
pub enum StorageMessage {
Clear {},
StoreAttempt {
task_name: String,
interval: Interval,
@@ -25,4 +27,5 @@ pub enum StorageMessage {
Stop {},
}
pub mod redis_store;
pub mod noop;
pub mod redis;
+32
View File
@@ -0,0 +1,32 @@
use super::*;
/// The mpsc channel can be sized to fit max parallelism
pub async fn start_storage(mut msgs: mpsc::UnboundedReceiver<StorageMessage>) -> Result<()> {
let mut current_state = ResourceInterval::new();
while let Some(msg) = msgs.recv().await {
use StorageMessage::*;
match msg {
Clear {} => {
current_state = ResourceInterval::new();
}
StoreAttempt { .. } => {}
StoreState { state } => {
current_state = state;
}
LoadState { response } => {
response.send(current_state.clone()).unwrap();
}
Stop {} => {
break;
}
}
}
Ok(())
}
pub fn start(msgs: mpsc::UnboundedReceiver<StorageMessage>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
start_storage(msgs).await.expect("Unable to start storage");
})
}
@@ -1,5 +1,7 @@
use super::*;
extern crate redis;
use futures::prelude::*;
use redis::AsyncCommands;
@@ -15,22 +17,47 @@ pub async fn start_redis_storage(
while let Some(msg) = msgs.recv().await {
use StorageMessage::*;
match msg {
Clear {} => {
let mut keys = Vec::new();
{
let mut iter: redis::AsyncIter<String> =
conn.scan_match(format!("{}:*", prefix)).await?;
while let Some(key) = iter.next_item().await {
keys.push(key);
}
}
for key in keys {
conn.del(key).await?;
}
}
StoreAttempt {
task_name,
interval,
attempt,
} => {
let tag = format!("{}_{}_{}", prefix, task_name, interval.end);
let tag = format!("{}:{}_{}", prefix, task_name, interval.end);
let payload = serde_json::to_string(&attempt).unwrap();
conn.rpush(&tag, &payload).await?;
}
/*
SetTaskIntervalState {
task_name,
interval,
state,
} => {
let map = format!("{}:task_interval_states", prefix);
let key = format!("{}_{}-{}", task_name, interval.start, interval.end);
let value = serde_json::to_string(&state).unwrap();
conn.hset(&map, &key, &value).await?;
}
*/
StoreState { state } => {
let tag = format!("{}_state", prefix);
let tag = format!("{}:state", prefix);
let payload = serde_json::to_string(&state).unwrap();
conn.set(&tag, &payload).await?;
}
LoadState { response } => {
let tag = format!("{}_state", prefix);
let tag = format!("{}:state", prefix);
let payload: String = conn.get(&tag).await.unwrap_or("{}".to_owned());
let is: ResourceInterval = serde_json::from_str(&payload).unwrap();
response.send(is).unwrap();
+79 -5
View File
@@ -1,4 +1,63 @@
use super::*;
use std::ops::{Deref, DerefMut};
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct TaskResources(HashMap<String, i64>);
impl Deref for TaskResources {
type Target = HashMap<String, i64>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for TaskResources {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl TaskResources {
#[must_use]
pub fn new() -> Self {
TaskResources(HashMap::new())
}
#[must_use]
pub fn can_satisfy(&self, requirements: &TaskResources) -> bool {
requirements
.iter()
.all(|(k, v)| self.contains_key(k) && self[k] >= *v)
}
/// Subtracts resources from available resources.
/// # Errors
/// Returns an `Err` if the requested resources cannot be fulfilled
/// # Panics
/// It doesn't, keys are checked for ahead-of-time
pub fn sub(&mut self, resources: &TaskResources) -> Result<()> {
if self.can_satisfy(resources) {
for (k, v) in resources.iter() {
*self.get_mut(k).unwrap() -= v;
}
Ok(())
} else {
Err(anyhow!("Cannot satisfy requested resources"))
}
}
/// # Panics
/// It doesn't, keys are checked for ahead-of-time
pub fn add(&mut self, resources: &TaskResources) {
for (k, v) in resources.iter() {
if self.contains_key(k) {
*self.get_mut(k).unwrap() += *v;
} else {
self.insert(k.clone(), *v);
}
}
}
}
/// Defines the struct to parse for tasks
#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
@@ -40,7 +99,7 @@ pub struct TaskDefinition {
}
impl TaskDefinition {
pub fn to_task(&self, calendar: &Calendar) -> Task {
pub fn to_task(&self, name: &str, calendar: &Calendar) -> Task {
let schedule = Schedule::new(calendar.clone(), self.times.clone(), self.timezone);
/*
The valid_{from,to} interval must be aligned to the actual schedule.
@@ -53,6 +112,12 @@ impl TaskDefinition {
)
.start;
let provides = if self.provides.is_empty() {
HashSet::from([name.to_owned()])
} else {
self.provides.clone()
};
let end = match self.valid_to {
Some(nt) => self.timezone.from_local_datetime(&nt).unwrap(),
None => MAX_TIME.with_timezone(&self.timezone),
@@ -61,11 +126,12 @@ impl TaskDefinition {
let actual_end = schedule.interval(end, 0).start;
Task {
name: name.to_owned(),
up: self.up.clone(),
down: self.down.clone(),
check: self.check.clone(),
provides: self.provides.clone(),
provides,
requires: self.requires.clone(),
schedule: schedule,
@@ -82,6 +148,7 @@ impl TaskDefinition {
*/
#[derive(Clone, Serialize, Debug)]
pub struct Task {
pub name: String,
pub up: TaskDetails,
pub down: Option<TaskDetails>,
pub check: Option<TaskDetails>,
@@ -173,6 +240,13 @@ impl Task {
.all(|req| req.can_be_satisfied(interval, &self.schedule, available))
}
pub fn requires_resources(&self) -> HashSet<Resource> {
self.requires.iter().fold(HashSet::new(), |mut acc, req| {
acc.extend(req.resources());
acc
})
}
pub fn up(&self, interval: &Interval) -> Result<HashSet<String>> {
if self.check(interval) {
Ok(self.provides.clone())
@@ -250,7 +324,7 @@ mod tests {
// Produces a std
let cal = Calendar::new();
let task = task_def.to_task(&cal);
let task = task_def.to_task("test", &cal);
// Assert the valid interval is correct
assert_eq!(
@@ -334,7 +408,7 @@ mod tests {
let cal = Calendar::new();
{
let task_def: TaskDefinition = serde_json::from_str(task_json).unwrap();
let task = task_def.to_task(&cal);
let task = task_def.to_task("task", &cal);
// Assert the valid interval is correct
assert_eq!(
@@ -354,7 +428,7 @@ mod tests {
task_def.valid_from = NaiveDate::from_ymd(2022, 1, 1).and_hms(9, 0, 0);
task_def.valid_to = Some(NaiveDate::from_ymd(2022, 1, 7).and_hms(17, 0, 0));
let task = task_def.to_task(&cal);
let task = task_def.to_task("task", &cal);
// Assert the valid interval is correct
assert_eq!(
+70 -26
View File
@@ -3,50 +3,94 @@ use std::convert::From;
use std::ops::{Deref, DerefMut};
#[derive(Clone, Debug)]
pub struct TaskSet(HashMap<String, Task>);
pub struct TaskSet(Vec<Task>);
impl TaskSet {
pub fn new() -> Self {
TaskSet(HashMap::new())
TaskSet(Vec::new())
}
pub fn coverage(&self) -> Result<ResourceInterval> {
pub fn coverage(&self) -> ResourceInterval {
self.get_state(MAX_TIME)
}
pub fn validate(&self) -> Result<()> {
self.get_state(MAX_TIME)?;
Ok(())
}
let state = self.coverage();
pub fn get_state<T: TimeZone>(&self, time: DateTime<T>) -> Result<ResourceInterval> {
let mut res = ResourceInterval::new();
let timeline = IntervalSet::from(Interval::new(MIN_TIME, time.with_timezone(&Utc)));
// Insert all of the covered items
for task in self.values() {
let task_timeline = task.valid_over.intersection(&timeline);
for resource in &task.provides {
let ris = res.entry(resource.clone()).or_insert(IntervalSet::new());
let already_provided = ris.intersection(&task_timeline);
if !already_provided.is_empty() {
// Ensures that all requirements are met
for task in &self.0 {
for resource in task.requires_resources() {
if !state.contains_key(&resource) {
return Err(anyhow!(
"Task set invalid: multiple tasks provide resource {} on the intervals {:?}",
resource,
already_provided
"Task {} requires resource {}, which isn't produced.",
task.name,
resource
));
}
ris.merge(&task_timeline);
}
}
Ok(res)
// TODO Ensure that all resources will be produced over the valid_over interval
// validate that no task generates the same resource on overlapping times
let providers: HashMap<Resource, Vec<usize>> =
self.0
.iter()
.enumerate()
.fold(HashMap::new(), |mut acc, (idx, t)| {
for res in &t.provides {
acc.entry(res.clone()).or_insert(Vec::new()).push(idx)
}
acc
});
for (res, tids) in providers {
let mut is = IntervalSet::new();
for tid in tids {
let already_provided = is.intersection(&self.0[tid].valid_over);
if !already_provided.is_empty() {
return Err(anyhow!(
"Task set invalid: multiple tasks provide resource {} on the intervals {:?}",
res,
already_provided
));
}
is.merge(&self.0[tid].valid_over);
}
}
Ok(())
}
pub fn get_state<T: TimeZone>(&self, time: DateTime<T>) -> ResourceInterval {
let mut res = ResourceInterval::new();
// Insert all of the covered items
for task in &self.0 {
// Need to align each of these intervals with a scheduled time
let timeline = if time < MAX_TIME {
let cur_intv = task.schedule.interval(time.clone(), 0);
if cur_intv.end > time {
IntervalSet::from(Interval::new(MIN_TIME, cur_intv.start))
} else {
IntervalSet::from(Interval::new(MIN_TIME, cur_intv.end))
}
} else {
IntervalSet::from(Interval::new(MIN_TIME, time.with_timezone(&Utc)))
};
let task_timeline = task.valid_over.intersection(&timeline);
for resource in &task.provides {
res.entry(resource.clone())
.or_insert(IntervalSet::new())
.merge(&task_timeline);
}
}
res
}
}
impl Deref for TaskSet {
type Target = HashMap<String, Task>;
type Target = Vec<Task>;
fn deref(&self) -> &Self::Target {
&self.0
}
@@ -58,8 +102,8 @@ impl DerefMut for TaskSet {
}
}
impl From<HashMap<String, Task>> for TaskSet {
fn from(data: HashMap<String, Task>) -> Self {
impl From<Vec<Task>> for TaskSet {
fn from(data: Vec<Task>) -> Self {
Self(data)
}
}
+2 -7
View File
@@ -26,15 +26,10 @@ impl WorldDefinition {
));
}
}
let tasks: HashMap<String, Task> = self
let tasks: Vec<Task> = self
.tasks
.iter()
.map(|(tn, td)| {
(
tn.clone(),
td.to_task(self.calendars.get(&td.calendar_name).unwrap()),
)
})
.map(|(tn, td)| td.to_task(tn, self.calendars.get(&td.calendar_name).unwrap()))
.collect();
let ts = TaskSet::from(tasks);
+14
View File
@@ -0,0 +1,14 @@
/* eslint-env node */
require('@rushstack/eslint-patch/modern-module-resolution')
module.exports = {
root: true,
'extends': [
'plugin:vue/vue3-essential',
'eslint:recommended',
'@vue/eslint-config-prettier'
],
parserOptions: {
ecmaVersion: 'latest'
}
}
+28
View File
@@ -0,0 +1,28 @@
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
pnpm-debug.log*
lerna-debug.log*
node_modules
.DS_Store
dist
dist-ssr
coverage
*.local
/cypress/videos/
/cypress/screenshots/
# Editor directories and files
.vscode/*
!.vscode/extensions.json
.idea
*.suo
*.ntvs*
*.njsproj
*.sln
*.sw?
+1
View File
@@ -0,0 +1 @@
{}
+3
View File
@@ -0,0 +1,3 @@
{
"recommendations": ["Vue.volar", "Vue.vscode-typescript-vue-plugin"]
}
+35
View File
@@ -0,0 +1,35 @@
# waterfall
This template should help get you started developing with Vue 3 in Vite.
## Recommended IDE Setup
[VSCode](https://code.visualstudio.com/) + [Volar](https://marketplace.visualstudio.com/items?itemName=Vue.volar) (and disable Vetur) + [TypeScript Vue Plugin (Volar)](https://marketplace.visualstudio.com/items?itemName=Vue.vscode-typescript-vue-plugin).
## Customize configuration
See [Vite Configuration Reference](https://vitejs.dev/config/).
## Project Setup
```sh
npm install
```
### Compile and Hot-Reload for Development
```sh
npm run dev
```
### Compile and Minify for Production
```sh
npm run build
```
### Lint with [ESLint](https://eslint.org/)
```sh
npm run lint
```
+15
View File
@@ -0,0 +1,15 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<link rel="icon" href="/favicon.ico" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<script src="https://unpkg.com/timelines-chart"></script>
<link rel="stylesheet" href="/waterfall.css">
<title>Waterfall</title>
</head>
<body background="#FFF">
<div id="app"></div>
<script type="module" src="/src/main.js"></script>
</body>
</html>
+3772
View File
File diff suppressed because it is too large Load Diff
+22
View File
@@ -0,0 +1,22 @@
{
"name": "waterfall",
"version": "0.0.0",
"scripts": {
"dev": "vite",
"build": "vite build",
"preview": "vite preview --port 4173",
"lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs --fix --ignore-path .gitignore"
},
"dependencies": {
"vue": "^3.2.38"
},
"devDependencies": {
"@rushstack/eslint-patch": "^1.1.4",
"@vitejs/plugin-vue": "^3.0.3",
"@vue/eslint-config-prettier": "^7.0.0",
"eslint": "^8.22.0",
"eslint-plugin-vue": "^9.3.0",
"prettier": "^2.7.1",
"vite": "^3.0.9"
}
}
+39
View File
@@ -0,0 +1,39 @@
function renderDetails(segment) {
document.getElementById("details").innerHTML = JSON.stringify(segment);
}
fetch("http://localhost:2503/api/v1/details",
{
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: '{ "start": "2021-09-01T00:00:00Z", "end": "2022-10-01T00:00:00Z" }'
}
)
.then((response) => {
if (!response.ok) {
throw new Error('Network response was not OK');
}
return response.json();
})
.then((payload) => {
console.log(payload);
payload.map((group) => {
Object.values(group.data).map((label) => {
label.data.map((interval) => {
interval.timeRange = interval.timeRange.map((t) => new Date(t));
})
})
});
TimelinesChart()
(document.getElementById("timeline"))
.timeFormat("%Y-%m-%dT%H:%M:%S.%LZ") // ISO 8601 format
.zScaleLabel('State')
.zQualitative(true)
.useUtc(false)
.onSegmentClick(renderDetails)
.data(payload)
}
)
.catch(err => { throw err });
+9
View File
@@ -0,0 +1,9 @@
<html>
<script src="https://unpkg.com/timelines-chart"></script>
</html>
<body>
<div id="timeline"></div>
<div id="details"></div>
<script src="app.js"></script>
</body>
</html>
Binary file not shown.

After

Width:  |  Height:  |  Size: 4.2 KiB

+1
View File
@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" class="icon-launch"><g><path class="primary" d="M14.57 6.96a2 2 0 0 1 2.47 2.47c.29.17.5.47.5.86v7.07a1 1 0 0 1-.3.71L13 22.31a1 1 0 0 1-1.7-.7v-3.58l-.49.19a1 1 0 0 1-1.17-.37 14.1 14.1 0 0 0-3.5-3.5 1 1 0 0 1-.36-1.16l.19-.48H2.39A1 1 0 0 1 1.7 11l4.24-4.24a1 1 0 0 1 .7-.3h7.08c.39 0 .7.21.86.5zM13.19 9.4l-2.15 2.15a3 3 0 0 1 .84.57 3 3 0 0 1 .57.84l2.15-2.15A2 2 0 0 1 13.2 9.4zm6.98-6.61a1 1 0 0 1 1.04 1.04c-.03.86-.13 1.71-.3 2.55-.47-.6-1.99-.19-2.55-.74-.55-.56-.14-2.08-.74-2.55.84-.17 1.7-.27 2.55-.3z"/><path class="secondary" d="M7.23 10.26A16.05 16.05 0 0 1 17.62 3.1a19.2 19.2 0 0 1 3.29 3.29 15.94 15.94 0 0 1-7.17 10.4 19.05 19.05 0 0 0-6.51-6.52zm-.86 5.5a16.2 16.2 0 0 1 1.87 1.87 1 1 0 0 1-.47 1.6c-.79.25-1.6.42-2.4.54a1 1 0 0 1-1.14-1.13c.12-.82.3-1.62.53-2.41a1 1 0 0 1 1.6-.47zm7.34-5.47a2 2 0 1 0 2.83-2.83 2 2 0 0 0-2.83 2.83z"/></g></svg>

After

Width:  |  Height:  |  Size: 926 B

+1
View File
@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" class="icon-order-vertical"><path class="secondary" d="M7 18.59V9a1 1 0 0 1 2 0v9.59l2.3-2.3a1 1 0 0 1 1.4 1.42l-4 4a1 1 0 0 1-1.4 0l-4-4a1 1 0 1 1 1.4-1.42L7 18.6z"/><path class="primary" d="M17 5.41V15a1 1 0 1 1-2 0V5.41l-2.3 2.3a1 1 0 1 1-1.4-1.42l4-4a1 1 0 0 1 1.4 0l4 4a1 1 0 0 1-1.4 1.42L17 5.4z"/></svg>

After

Width:  |  Height:  |  Size: 370 B

+1
View File
@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" class="icon-search"><circle cx="10" cy="10" r="7" class="primary"/><path class="secondary" d="M16.32 14.9l1.1 1.1c.4-.02.83.13 1.14.44l3 3a1.5 1.5 0 0 1-2.12 2.12l-3-3a1.5 1.5 0 0 1-.44-1.14l-1.1-1.1a8 8 0 1 1 1.41-1.41zM10 16a6 6 0 1 0 0-12 6 6 0 0 0 0 12z"/></svg>

After

Width:  |  Height:  |  Size: 326 B

+1
View File
@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" class="icon-sort-ascending"><path class="secondary" d="M18 13v7a1 1 0 0 1-2 0v-7h-3a1 1 0 0 1-.7-1.7l4-4a1 1 0 0 1 1.4 0l4 4A1 1 0 0 1 21 13h-3z"/><path class="primary" d="M3 3h13a1 1 0 0 1 0 2H3a1 1 0 1 1 0-2zm0 4h9a1 1 0 0 1 0 2H3a1 1 0 1 1 0-2zm0 4h5a1 1 0 0 1 0 2H3a1 1 0 0 1 0-2z"/></svg>

After

Width:  |  Height:  |  Size: 353 B

+1
View File
@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" class="icon-sort-decending"><path class="secondary" d="M6 11V4a1 1 0 1 1 2 0v7h3a1 1 0 0 1 .7 1.7l-4 4a1 1 0 0 1-1.4 0l-4-4A1 1 0 0 1 3 11h3z"/><path class="primary" d="M21 21H8a1 1 0 0 1 0-2h13a1 1 0 0 1 0 2zm0-4h-9a1 1 0 0 1 0-2h9a1 1 0 0 1 0 2zm0-4h-5a1 1 0 0 1 0-2h5a1 1 0 0 1 0 2z"/></svg>

After

Width:  |  Height:  |  Size: 354 B

+1
View File
@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" class="icon-trash"><path class="primary" d="M5 5h14l-.89 15.12a2 2 0 0 1-2 1.88H7.9a2 2 0 0 1-2-1.88L5 5zm5 5a1 1 0 0 0-1 1v6a1 1 0 0 0 2 0v-6a1 1 0 0 0-1-1zm4 0a1 1 0 0 0-1 1v6a1 1 0 0 0 2 0v-6a1 1 0 0 0-1-1z"/><path class="secondary" d="M8.59 4l1.7-1.7A1 1 0 0 1 11 2h2a1 1 0 0 1 .7.3L15.42 4H19a1 1 0 0 1 0 2H5a1 1 0 1 1 0-2h3.59z"/></svg>

After

Width:  |  Height:  |  Size: 402 B

+1
View File
File diff suppressed because one or more lines are too long
+23
View File
@@ -0,0 +1,23 @@
:root {
--base-fontsize: 1.0rem;
}
body {
max-width: 100%;
padding: 0px;
margin: 0px;
}
pre {
font-size: 1.0rem;
}
#app {
flex-flow: column wrap;
}
.svgicon {
height: 1em;
width: auto;
padding: 2px;
}
+68
View File
@@ -0,0 +1,68 @@
<script>
import Timeline from './components/Timeline.vue'
import GlobalSettings from './components/GlobalSettings.vue'
import SegmentDetails from './components/SegmentDetails.vue'
export default {
data() {
return {
refreshSeconds: 1, // How often to refresh
waterfallURL: 'http://localhost:2503',
activeSegment: null,
maxDisplayIntervals: 500,
}
},
methods: {
updateURL(url) {
this.waterfallURL = url;
},
updateRefreshInterval(interval) {
this.refreshSeconds = interval;
},
updateMaxDisplayIntervals(cnt) {
this.maxDisplayIntervals = cnt;
},
setActiveSegment(segment) {
this.activeSegment = segment;
},
},
components: {
GlobalSettings,
Timeline,
SegmentDetails,
},
};
</script>
<style>
select { max-width: 25%; }
input { max-width: 25%; }
</style>
<template>
<GlobalSettings
:waterfallURL="waterfallURL"
:refreshSeconds="refreshSeconds"
:maxDisplayIntervals="maxDisplayIntervals"
@update-refresh-interval="(interval) => this.updateRefreshInterval(interval)"
@update-waterfall-url="(url) => this.updateURL(url)"
@update-max-display-intervals="(cnt) => this.updateMaxDisplayIntervals(cnt)"
/>
<br/>
<div>
<Timeline
:waterfallURL="waterfallURL"
:refreshSeconds="refreshSeconds"
:maxDisplayIntervals="maxDisplayIntervals"
@update-active-segment="(segment) => this.setActiveSegment(segment)"
/>
</div>
<br/>
<SegmentDetails
v-if="this.activeSegment !== null"
:activeSegment="activeSegment"
/>
</template>
+74
View File
@@ -0,0 +1,74 @@
/* color palette from <https://github.com/vuejs/theme> */
:root {
--vt-c-white: #ffffff;
--vt-c-white-soft: #f8f8f8;
--vt-c-white-mute: #f2f2f2;
--vt-c-black: #181818;
--vt-c-black-soft: #222222;
--vt-c-black-mute: #282828;
--vt-c-indigo: #2c3e50;
--vt-c-divider-light-1: rgba(60, 60, 60, 0.29);
--vt-c-divider-light-2: rgba(60, 60, 60, 0.12);
--vt-c-divider-dark-1: rgba(84, 84, 84, 0.65);
--vt-c-divider-dark-2: rgba(84, 84, 84, 0.48);
--vt-c-text-light-1: var(--vt-c-indigo);
--vt-c-text-light-2: rgba(60, 60, 60, 0.66);
--vt-c-text-dark-1: var(--vt-c-white);
--vt-c-text-dark-2: rgba(235, 235, 235, 0.64);
}
/* semantic color variables for this project */
:root {
--color-background: var(--vt-c-white);
--color-background-soft: var(--vt-c-white-soft);
--color-background-mute: var(--vt-c-white-mute);
--color-border: var(--vt-c-divider-light-2);
--color-border-hover: var(--vt-c-divider-light-1);
--color-heading: var(--vt-c-text-light-1);
--color-text: var(--vt-c-text-light-1);
--section-gap: 160px;
}
@media (prefers-color-scheme: dark) {
:root {
--color-background: var(--vt-c-black);
--color-background-soft: var(--vt-c-black-soft);
--color-background-mute: var(--vt-c-black-mute);
--color-border: var(--vt-c-divider-dark-2);
--color-border-hover: var(--vt-c-divider-dark-1);
--color-heading: var(--vt-c-text-dark-1);
--color-text: var(--vt-c-text-dark-2);
}
}
*,
*::before,
*::after {
box-sizing: border-box;
margin: 0;
position: relative;
font-weight: normal;
}
body {
min-height: 100vh;
color: var(--color-text);
background: var(--color-background);
transition: color 0.5s, background-color 0.5s;
line-height: 1.6;
font-family: Inter, -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu,
Cantarell, 'Fira Sans', 'Droid Sans', 'Helvetica Neue', sans-serif;
font-size: 15px;
text-rendering: optimizeLegibility;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
}
+1
View File
@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 261.76 226.69" xmlns:v="https://vecta.io/nano"><path d="M161.096.001l-30.225 52.351L100.647.001H-.005l130.877 226.688L261.749.001z" fill="#41b883"/><path d="M161.096.001l-30.225 52.351L100.647.001H52.346l78.526 136.01L209.398.001z" fill="#34495e"/></svg>

After

Width:  |  Height:  |  Size: 308 B

+35
View File
@@ -0,0 +1,35 @@
@import "./base.css";
#app {
max-width: 1280px;
margin: 0 auto;
padding: 2rem;
font-weight: normal;
}
a,
.green {
text-decoration: none;
color: hsla(160, 100%, 37%, 1);
transition: 0.4s;
}
@media (hover: hover) {
a:hover {
background-color: hsla(160, 100%, 37%, 0.2);
}
}
@media (min-width: 1024px) {
body {
display: flex;
place-items: center;
}
#app {
display: grid;
grid-template-columns: 1fr 1fr;
padding: 0 2rem;
}
}
+57
View File
@@ -0,0 +1,57 @@
<script>
export default {
props: ['refreshSeconds', 'waterfallURL', 'maxDisplayIntervals'],
data() {
return {
interval: this.refreshSeconds,
url: this.waterfallURL,
max_display_intervals: this.maxDisplayIntervals,
};
},
emits: ['update-refresh-interval', 'update-waterfall-url', 'update-max-intervals'],
computed: {
validRefreshIntervals() {
return [1, 5, 10, 15, 30, 60, 300, 600];
},
validDisplayIntervals() {
return [0, 100, 250, 500, 1000, 1500];
},
isSelected(interval) {
return (interval === this.refreshSeconds ? 'selected' : 'unselected');
},
},
};
</script>
<template>
<details>
<summary>Global Settings</summary>
<label>
Waterfall Base URL
<input @change="$emit('update-waterfall-url', url)" v-model="url"/>
</label>
<label>
Refresh Interval (seconds)
<select @change="$emit('update-refresh-interval', interval)" v-model="interval">
<option v-for="interval in validRefreshIntervals"
:key="interval"
:value="interval"
>
{{ interval }} Seconds
</option>
</select>
</label>
<label>
Max Display Intervals
<select @change="$emit('update-max-display-intervals', max_display_intervals)" v-model="max_display_intervals">
<option v-for="cnt in validDisplayIntervals"
:key="cnt"
:value="cnt"
>
{{ cnt }} Segments
</option>
</select>
</label>
</details>
</template>
+35
View File
@@ -0,0 +1,35 @@
<script>
export default {
props: ['activeSegment']
};
</script>
<template>
<div id="segment-details" v-if="activeSegment !== null">
<table>
<thead>
<tr>
<th>Resource</th>
<th>Task Name</th>
<th>Interval</th>
<th>State</th>
</tr>
</thead>
<tbody>
<tr>
<td>{{activeSegment.group}}</td>
<td>{{activeSegment.label}}</td>
<td>{{activeSegment.timeRange}}</td>
<td>{{activeSegment.val}}</td>
</tr>
</tbody>
</table>
</div>
</template>
<style>
.svgicon {
height: 1em;
width: auto;
}
</style>
+117
View File
@@ -0,0 +1,117 @@
<script>
// import TimelinesChart from 'timelines-chart';
function lexsort(a, b, keyfunc) {
const ka = keyfunc(a);
const kb = keyfunc(b);
let ret = 0;
if (ka < kb) { ret = -1; }
if (ka > kb) { ret = 1; }
return ret;
}
const MIN_TIME="1970-01-01T00:00:00Z";
const MAX_TIME="2099-01-01T00:00:00Z";
export default {
props: ['waterfallURL', 'refreshSeconds', 'maxDisplayIntervals'],
data() {
return {
chart: null,
data: {},
start: MIN_TIME,
end: MAX_TIME,
}
},
watch: {
refreshSeconds() {
this.fetchTimeline();
},
waterfallURL() {
this.fetchTimeline();
},
maxDisplayIntervals() {
this.fetchTimeline();
},
},
methods: {
async fetchTimeline() {
fetch(`${this.waterfallURL}/api/v1/details?max_intervals=${this.maxDisplayIntervals}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: `{ "start": "${this.start}", "end": "${this.end}" }`
})
.then((response) => {
if (!response.ok) {
throw new Error('Network response was not OK');
}
return response.json();
})
.then((payload) => {
payload.map((group) => {
group.data.sort((a, b) => lexsort(a, b, (v) => v.label));
Object.values(group.data).map((label) => {
label.data.map((interval) => {
interval.timeRange = interval.timeRange.map((t) => new Date(t));
})
})
});
payload.sort((a, b) => lexsort(a, b, (v) => v.group));
this.data = payload;
this.chart.data(payload);
this.chart.reresh();
})
.catch(err => { throw err });
},
update() {
this.fetchTimeline();
setTimeout(() => {
this.update();
}, this.refreshSeconds * 1000);
},
setVisibleRange(dateRange) {
if (dateRange === null) {
this.start = MIN_TIME;
this.end = MAX_TIME;
} else {
this.start = dateRange[0].toISOString();
this.end = dateRange[1].toISOString();
}
this.fetchTimeline();
}
},
mounted() {
this.chart = TimelinesChart()(document.getElementById("timeline-graph"))
.timeFormat("%Y-%m-%dT%H:%M:%S.%LZ") // ISO 8601 format
.zScaleLabel('State')
.zQualitative(true)
.useUtc(false)
.onZoom((dateRange, _) => this.setVisibleRange(dateRange))
.onSegmentClick((segment) => this.$emit('updateActiveSegment', segment) );
this.update();
},
};
</script>
<template>
<div id="timeline-graph"></div>
</template>
<style>
.svgicon {
height: 1em;
width: auto;
}
</style>
+17
View File
@@ -0,0 +1,17 @@
import { reactive } from 'vue';
export const ALL_STATES = [
{ name: 'QUEUED', display: 'Queued' },
{ name: 'RUNNING', display: 'Running' },
{ name: 'ERRORED', display: 'Errored' },
{ name: 'COMPLETED', display: 'Completed' },
{ name: 'KILLED', display: 'Killed' },
];
export const defaultCountHandler = {
get(target, name) {
return name in target ? target[name] : 0;
},
};
+6
View File
@@ -0,0 +1,6 @@
import { createApp } from 'vue'
import App from './App.vue'
import './assets/main.css'
createApp(App).mount('#app')
+14
View File
@@ -0,0 +1,14 @@
import { fileURLToPath, URL } from 'node:url'
import { defineConfig } from 'vite'
import vue from '@vitejs/plugin-vue'
// https://vitejs.dev/config/
export default defineConfig({
plugins: [vue()],
resolve: {
alias: {
'@': fileURLToPath(new URL('./src', import.meta.url))
}
}
})