Compare commits
31 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ca6f65bae2 | |||
| 6042507ab7 | |||
| 304e04cca9 | |||
| 2e89c1bac2 | |||
| e711b3249b | |||
| 6b49038db6 | |||
| 4d2a71d028 | |||
| 6f5f890b1e | |||
| f5ca3315f0 | |||
| eb590c848e | |||
| 5d4ff2bb8b | |||
| 03412bb79d | |||
| 2c96b16ec8 | |||
| ce621dc9d5 | |||
| 7c2c6cd8c7 | |||
| d6ced6db50 | |||
| 1201e93169 | |||
| d4ae655b4e | |||
| 4f957e91da | |||
| 8c29665962 | |||
| bbce4c208c | |||
| bb3c0d3972 | |||
| 8f6e96e989 | |||
| 56771a5f47 | |||
| 923025bc49 | |||
| d82b000f9b | |||
| 834b0f2c9c | |||
| 5a4c5034a3 | |||
| ca9a32c032 | |||
| 0d6cea4152 | |||
| 779852022a |
@@ -20,3 +20,6 @@ sysinfo = "0.23"
|
||||
redis = { version = "*", features = ["aio", "tokio-comp"] }
|
||||
clap = { version = "3.1", features = ["derive"] }
|
||||
env_logger = "0.9"
|
||||
log = "0.4"
|
||||
actix-web = "4"
|
||||
actix-cors = "0.6"
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
# Waterfall
|
||||
|
||||
Waterfall is a declarative task execution framework.
|
||||
|
||||
# Why Another Execution Framework
|
||||
|
||||
There are many, many execution frameworks out there that support defining
|
||||
tasks with inter-task dependencies. Most of them only partially include
|
||||
scheduling in their design.
|
||||
|
||||
# Building and Running
|
||||
|
||||
```bash
|
||||
cargo build
|
||||
|
||||
# wf is a cli for running worlds directly
|
||||
|
||||
# A redis instance is required for storage
|
||||
|
||||
# Run using the local executor
|
||||
cargo run --bin wf -- --config examples/config.json --world examples/world.json
|
||||
|
||||
# Starting an agent
|
||||
# wfw is a (W)ater(F)low (W)orker
|
||||
cargo run --bin wfw
|
||||
cargo run --bin wf -- --config examples/config_wfw.json --world examples/world.json
|
||||
```
|
||||
|
||||
# Overview
|
||||
|
||||
## Example
|
||||
|
||||
## Resources
|
||||
|
||||
Resources are at the heart of Waterfall. They are simple things: labels
|
||||
with an associated set of time intervals. Tasks produce resources for
|
||||
given intervals.
|
||||
|
||||
## Tasks
|
||||
|
||||
Tasks are commands that run on a set schedule. Each task produces one or
|
||||
more `Resource`. The run schedule naturally breaks up the timeline into
|
||||
intervals. When a task runs at time `T_n`, it will make make each resource
|
||||
it provides available over the interval `(T_{n-1},T]`.
|
||||
|
||||
### Commands
|
||||
|
||||
A task has three commands defined:
|
||||
|
||||
- **check** - Command used to run an out-of-band verification of the data. Should have no side effects.
|
||||
- **up** - Command run to create resources.
|
||||
- **down** - Command run when removing resources.
|
||||
|
||||
### Dependencies
|
||||
|
||||
Tasks will run at their scheduled time (or immediately if their scheduled time
|
||||
has passed already).
|
||||
|
||||
It's possible to define additional constraints on launching, though. Some tasks
|
||||
may need resources produced by other tasks before it can start.
|
||||
@@ -0,0 +1,10 @@
|
||||
- core
|
||||
- [ ] Convert Runner to be more like daggyr runner
|
||||
- [ ] Implement task kill / stop
|
||||
- [ ] Implement force resources up/down with cascade
|
||||
- wfd
|
||||
- [ ] Implement shutdown when world is complete
|
||||
- [ ] Implement endpoints
|
||||
- Get state over interval
|
||||
- Store (interval, {UP,QUEUED,ERRORED, ETC})
|
||||
- Get attempts
|
||||
@@ -0,0 +1,17 @@
|
||||
{
|
||||
"storage": {
|
||||
"type": "redis",
|
||||
"url": "redis://localhost",
|
||||
"prefix": "world"
|
||||
},
|
||||
"executor": {
|
||||
"type": "agent",
|
||||
"targets": [
|
||||
{
|
||||
"base_url": "http://localhost:2504/api/v1",
|
||||
"resources": { "cores": 10 }
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
{
|
||||
"storage": {
|
||||
"type": "redis",
|
||||
"url": "redis://localhost",
|
||||
"prefix": "world"
|
||||
},
|
||||
"executor": {
|
||||
"type": "agent",
|
||||
"targets": [
|
||||
{
|
||||
"base_url": "http://localhost:2504/api/v1",
|
||||
"resources": { "cores": 1 }
|
||||
},
|
||||
{
|
||||
"base_url": "http://localhost:2505/api/v1",
|
||||
"resources": { "cores": 1 }
|
||||
},
|
||||
{
|
||||
"base_url": "http://localhost:2506/api/v1",
|
||||
"resources": { "cores": 1 }
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
{
|
||||
"variables": {
|
||||
"HOME": "/tmp/world_test"
|
||||
},
|
||||
"calendars": {
|
||||
"std": { "mask": [ "Mon", "Tue", "Wed", "Thu", "Fri" ] },
|
||||
"weekly": { "mask": [ "Fri" ] }
|
||||
},
|
||||
"tasks": {
|
||||
"task_a": {
|
||||
"up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
"down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
"check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
|
||||
"provides": [ "alpha" ],
|
||||
|
||||
"calendar_name": "std",
|
||||
"times": [ "09:00:00", "12:00:00"],
|
||||
"timezone": "America/New_York",
|
||||
|
||||
"valid_from": "2021-01-01T09:00:00",
|
||||
"valid_to": "2022-06-01T09:00:00"
|
||||
},
|
||||
"task_a_new": {
|
||||
"up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
"down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
"check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
|
||||
"provides": [ "alpha" ],
|
||||
|
||||
"calendar_name": "std",
|
||||
"times": [ "09:00:00", "12:00:00"],
|
||||
"timezone": "America/New_York",
|
||||
|
||||
"valid_from": "2022-06-01T09:00:00",
|
||||
"valid_to": "2023-05-01T09:00:00"
|
||||
},
|
||||
|
||||
"task_b": {
|
||||
"up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
"down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
"check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
|
||||
"provides": [ "beta" ],
|
||||
"requires": [ { "resource": "alpha", "offset": 0 } ],
|
||||
|
||||
"calendar_name": "weekly",
|
||||
"times": [ "17:00:00" ],
|
||||
"timezone": "America/New_York",
|
||||
|
||||
"valid_from": "2022-01-04T09:00:00",
|
||||
"valid_to": "2023-01-07T00:00:00"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
{
|
||||
"server": {
|
||||
"ip": "127.0.0.1",
|
||||
"port": 2503
|
||||
},
|
||||
"storage": {
|
||||
"type": "redis",
|
||||
"url": "redis://localhost",
|
||||
"prefix": "another"
|
||||
},
|
||||
"executor": {
|
||||
"type": "local",
|
||||
"workers": 10
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,9 +7,9 @@
|
||||
},
|
||||
"tasks": {
|
||||
"task_a": {
|
||||
"up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}" },
|
||||
"down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}" },
|
||||
"check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}" },
|
||||
"up": { "command": "/usr//bin/touch ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
"down": { "command": "/bin/rm ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
"check": { "command": "/bin/test -e ${HOME}/task_a_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
|
||||
"provides": [ "task_a" ],
|
||||
|
||||
@@ -21,9 +21,9 @@
|
||||
"valid_to": "2022-01-08T09:00:00"
|
||||
},
|
||||
"task_b": {
|
||||
"up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}" },
|
||||
"down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}" },
|
||||
"check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}" },
|
||||
"up": { "command": "/usr//bin/touch ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
"down": { "command": "/bin/rm ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
"check": { "command": "/bin/test -e ${HOME}/task_b_${yyyymmdd}${hhmmss}", "resources": { "cores": 1 } },
|
||||
|
||||
"provides": [ "task_b" ],
|
||||
"requires": [ { "resource": "task_a", "offset": 0 } ],
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use clap::Parser;
|
||||
|
||||
use log::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use waterfall;
|
||||
use waterfall::prelude::*;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
@@ -19,9 +21,10 @@ impl StorageConfig {
|
||||
) {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
match self {
|
||||
StorageConfig::Redis { url, prefix } => {
|
||||
(tx, redis_store::start(rx, url.clone(), prefix.clone()))
|
||||
}
|
||||
StorageConfig::Redis { url, prefix } => (
|
||||
tx,
|
||||
waterfall::storage::redis::start(rx, url.clone(), prefix.clone()),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -29,7 +32,12 @@ impl StorageConfig {
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")]
|
||||
enum ExecutorConfig {
|
||||
Local { workers: usize },
|
||||
Local {
|
||||
workers: usize,
|
||||
},
|
||||
Agent {
|
||||
targets: Vec<agent_executor::AgentTarget>,
|
||||
},
|
||||
}
|
||||
|
||||
impl ExecutorConfig {
|
||||
@@ -42,6 +50,7 @@ impl ExecutorConfig {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
match self {
|
||||
ExecutorConfig::Local { workers } => (tx, local_executor::start(*workers, rx)),
|
||||
ExecutorConfig::Agent { targets } => (tx, agent_executor::start(targets.clone(), rx)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -67,6 +76,10 @@ struct Args {
|
||||
/// Enable verbose logging
|
||||
#[clap(short, long)]
|
||||
verbose: bool,
|
||||
|
||||
/// Force a full re-check
|
||||
#[clap(short, long)]
|
||||
force_recheck: bool,
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -108,18 +121,22 @@ async fn main() -> std::io::Result<()> {
|
||||
|
||||
let tasks = world_def.taskset().unwrap();
|
||||
|
||||
debug!("Config: {:?}", args);
|
||||
|
||||
let (_runner_tx, runner_rx) = mpsc::unbounded_channel();
|
||||
let mut runner = Runner::new(
|
||||
tasks,
|
||||
world_def.variables,
|
||||
runner_rx,
|
||||
exe_tx.clone(),
|
||||
storage_tx.clone(),
|
||||
world_def.output_options,
|
||||
args.force_recheck,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (wtx, wrx) = oneshot::channel();
|
||||
runner.run(wrx).await;
|
||||
runner.run(false).await;
|
||||
|
||||
exe_tx.send(ExecutorMessage::Stop {}).unwrap();
|
||||
exe_handle.await.unwrap();
|
||||
|
||||
@@ -0,0 +1,426 @@
|
||||
use actix_cors::Cors;
|
||||
use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, Responder};
|
||||
use clap::Parser;
|
||||
use log::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use waterfall::prelude::*;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct ServerConfig {
|
||||
pub ip: String,
|
||||
pub port: u32,
|
||||
}
|
||||
|
||||
impl ServerConfig {
|
||||
fn listen_spec(&self) -> String {
|
||||
format!("{}:{}", self.ip, self.port)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ServerConfig {
|
||||
fn default() -> Self {
|
||||
ServerConfig {
|
||||
ip: String::from("127.0.0.1"),
|
||||
port: 2503,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")]
|
||||
enum StorageConfig {
|
||||
Redis { url: String, prefix: String },
|
||||
}
|
||||
|
||||
impl StorageConfig {
|
||||
fn start(
|
||||
&self,
|
||||
) -> (
|
||||
mpsc::UnboundedSender<StorageMessage>,
|
||||
tokio::task::JoinHandle<()>,
|
||||
) {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
match self {
|
||||
StorageConfig::Redis { url, prefix } => (
|
||||
tx,
|
||||
waterfall::storage::redis::start(rx, url.clone(), prefix.clone()),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")]
|
||||
enum ExecutorConfig {
|
||||
Local {
|
||||
workers: usize,
|
||||
},
|
||||
Agent {
|
||||
targets: Vec<agent_executor::AgentTarget>,
|
||||
},
|
||||
}
|
||||
|
||||
impl ExecutorConfig {
|
||||
fn start(
|
||||
&self,
|
||||
) -> (
|
||||
mpsc::UnboundedSender<ExecutorMessage>,
|
||||
tokio::task::JoinHandle<()>,
|
||||
) {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
match self {
|
||||
ExecutorConfig::Local { workers } => (tx, local_executor::start(*workers, rx)),
|
||||
ExecutorConfig::Agent { targets } => (tx, agent_executor::start(targets.clone(), rx)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
struct Config {
|
||||
storage: StorageConfig,
|
||||
executor: ExecutorConfig,
|
||||
server: ServerConfig,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct SimpleError {
|
||||
error: String,
|
||||
}
|
||||
|
||||
async fn get_state(state: web::Data<AppState>) -> impl Responder {
|
||||
let (response, rx) = oneshot::channel();
|
||||
|
||||
state
|
||||
.runner_tx
|
||||
.send(RunnerMessage::GetState { response })
|
||||
.unwrap();
|
||||
|
||||
match rx.await {
|
||||
Ok(world) => HttpResponse::Ok().json(world),
|
||||
Err(error) => HttpResponse::BadRequest().json(SimpleError {
|
||||
error: format!("{:?}", error),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Generates the data structure for [timelines-chart](https://github.com/vasturiano/timelines-chart)
|
||||
|
||||
[
|
||||
{
|
||||
"group": "resource",
|
||||
"data": [
|
||||
{
|
||||
label: "task_name",
|
||||
"data": [
|
||||
{
|
||||
"timeRange": [ "start", "end" ],
|
||||
"val": "State"
|
||||
},
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
*/
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct TimelineInterval {
|
||||
time_range: [DateTime<Utc>; 2],
|
||||
val: ActionState,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct TimelineLabel {
|
||||
label: String,
|
||||
data: Vec<TimelineInterval>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct TimelineGroup {
|
||||
group: String,
|
||||
data: Vec<TimelineLabel>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct DetailedTimelineOptions {
|
||||
#[serde(default)]
|
||||
max_intervals: Option<usize>,
|
||||
}
|
||||
|
||||
async fn get_detailed_timeline(
|
||||
options: web::Query<DetailedTimelineOptions>,
|
||||
span: web::Json<Interval>,
|
||||
state: web::Data<AppState>,
|
||||
) -> impl Responder {
|
||||
let interval = span.into_inner();
|
||||
let max_intervals = options.into_inner().max_intervals;
|
||||
|
||||
let (response, rx) = oneshot::channel();
|
||||
state
|
||||
.runner_tx
|
||||
.send(RunnerMessage::GetResourceStateDetails {
|
||||
interval,
|
||||
response,
|
||||
max_intervals,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
match rx.await {
|
||||
Ok(actions) => {
|
||||
let mut timeline = Vec::new();
|
||||
info!(
|
||||
"Querying for actions over {}, got {} responses.",
|
||||
interval,
|
||||
actions.len()
|
||||
);
|
||||
|
||||
for (resource, tasks) in actions {
|
||||
let mut group = TimelineGroup {
|
||||
group: resource.clone(),
|
||||
data: Vec::new(),
|
||||
};
|
||||
for (task_name, intervals) in tasks.into_iter() {
|
||||
let data = intervals
|
||||
.into_iter()
|
||||
.map(|a| TimelineInterval {
|
||||
time_range: [a.interval.start, a.interval.end],
|
||||
val: a.state,
|
||||
})
|
||||
.collect();
|
||||
|
||||
group.data.push(TimelineLabel {
|
||||
label: task_name,
|
||||
data,
|
||||
});
|
||||
}
|
||||
timeline.push(group);
|
||||
}
|
||||
|
||||
HttpResponse::Ok().json(timeline)
|
||||
}
|
||||
Err(error) => HttpResponse::BadRequest().json(SimpleError {
|
||||
error: format!("{:?}", error),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve all data about a segment, including:
|
||||
/// What resources it relies on
|
||||
/// Last attempt (if any)
|
||||
async fn get_segment_details(
|
||||
max_intervals: web::Query<Option<usize>>,
|
||||
span: web::Json<Interval>,
|
||||
state: web::Data<AppState>,
|
||||
) -> impl Responder {
|
||||
/*
|
||||
let interval = span.into_inner();
|
||||
|
||||
let (response, rx) = oneshot::channel();
|
||||
state
|
||||
.runner_tx
|
||||
.send(RunnerMessage::GetResourceStateDetails {
|
||||
interval,
|
||||
response,
|
||||
max_intervals: max_intervals.into_inner(),
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
match rx.await {
|
||||
Ok(actions) => {
|
||||
let mut timeline = Vec::new();
|
||||
for (resource, tasks) in actions {
|
||||
let mut group = TimelineGroup {
|
||||
group: resource.clone(),
|
||||
data: Vec::new(),
|
||||
};
|
||||
for (task_name, mut intervals) in tasks.into_iter() {
|
||||
// Collapse intervals
|
||||
if intervals.len() > 50 {}
|
||||
let data = intervals
|
||||
.into_iter()
|
||||
.map(|a| TimelineInterval {
|
||||
time_range: [a.interval.start, a.interval.end],
|
||||
val: a.state,
|
||||
})
|
||||
.collect();
|
||||
|
||||
group.data.push(TimelineLabel {
|
||||
label: task_name,
|
||||
data,
|
||||
});
|
||||
}
|
||||
timeline.push(group);
|
||||
}
|
||||
|
||||
HttpResponse::Ok().json(timeline)
|
||||
}
|
||||
Err(error) => HttpResponse::BadRequest().json(SimpleError {
|
||||
error: format!("{:?}", error),
|
||||
}),
|
||||
}
|
||||
*/
|
||||
HttpResponse::Ok()
|
||||
}
|
||||
|
||||
/*
|
||||
async fn stop_run(path: web::Path<RunID>, state: web::Data<AppState>) -> impl Responder {
|
||||
let run_id = path.into_inner();
|
||||
let (response, rx) = oneshot::channel();
|
||||
|
||||
state
|
||||
.config
|
||||
.runner
|
||||
.send(RunnerMessage::StopRun { run_id, response })
|
||||
.unwrap();
|
||||
|
||||
rx.await.unwrap();
|
||||
HttpResponse::Ok()
|
||||
}
|
||||
*/
|
||||
|
||||
async fn ready() -> impl Responder {
|
||||
HttpResponse::Ok()
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author, version, about)]
|
||||
struct Args {
|
||||
/// Configuration File
|
||||
#[clap(short, long, default_value = "")]
|
||||
config: String,
|
||||
|
||||
/// Configuration File
|
||||
#[clap(short, long, default_value = "")]
|
||||
world: String,
|
||||
|
||||
/// Enable verbose logging
|
||||
#[clap(short, long)]
|
||||
verbose: bool,
|
||||
|
||||
/// Force a full re-check
|
||||
#[clap(short, long)]
|
||||
force_recheck: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct AppState {
|
||||
storage_tx: mpsc::UnboundedSender<StorageMessage>,
|
||||
runner_tx: mpsc::UnboundedSender<RunnerMessage>,
|
||||
}
|
||||
|
||||
#[actix_web::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
let args = Args::parse();
|
||||
|
||||
// Parse the config
|
||||
let world_json = std::fs::read_to_string(&args.world)
|
||||
.expect(&format!("Unable to open {} for reading", args.config));
|
||||
let world_def: WorldDefinition =
|
||||
serde_json::from_str(&world_json).expect("Unable to parse world definition");
|
||||
|
||||
// Parse the config
|
||||
let config_json = std::fs::read_to_string(&args.config)
|
||||
.expect(&format!("Unable to open {} for reading", args.config));
|
||||
let config: Config =
|
||||
serde_json::from_str(&config_json).expect("Unable to parse config definition");
|
||||
|
||||
// Start the workers
|
||||
let (exe_tx, exe_handle) = config.executor.start();
|
||||
let (storage_tx, storage_handle) = config.storage.start();
|
||||
let (runner_tx, runner_rx) = mpsc::unbounded_channel();
|
||||
|
||||
let data = web::Data::new(AppState {
|
||||
storage_tx: storage_tx.clone(),
|
||||
runner_tx: runner_tx.clone(),
|
||||
});
|
||||
|
||||
let tasks = world_def.taskset().unwrap();
|
||||
let mut runner = Runner::new(
|
||||
tasks,
|
||||
world_def.variables,
|
||||
runner_rx,
|
||||
exe_tx.clone(),
|
||||
storage_tx.clone(),
|
||||
world_def.output_options,
|
||||
args.force_recheck,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runner_handle = tokio::spawn(async move {
|
||||
runner.run(true).await;
|
||||
});
|
||||
|
||||
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
|
||||
let res = HttpServer::new(move || {
|
||||
let cors = Cors::default()
|
||||
.allow_any_header()
|
||||
.allow_any_method()
|
||||
.allow_any_origin()
|
||||
.send_wildcard();
|
||||
|
||||
let json_config = web::JsonConfig::default()
|
||||
.limit(1048576)
|
||||
.error_handler(|err, _req| {
|
||||
use actix_web::error::JsonPayloadError;
|
||||
let payload = match &err {
|
||||
JsonPayloadError::OverflowKnownLength { length, limit } => SimpleError {
|
||||
error: format!("Payload too big ({} > {})", length, limit),
|
||||
},
|
||||
JsonPayloadError::Overflow { limit } => SimpleError {
|
||||
error: format!("Payload too big (> {})", limit),
|
||||
},
|
||||
JsonPayloadError::ContentType => SimpleError {
|
||||
error: "Unsupported Content-Type".to_owned(),
|
||||
},
|
||||
JsonPayloadError::Deserialize(e) => SimpleError {
|
||||
error: format!("Parsing error: {}", e),
|
||||
},
|
||||
JsonPayloadError::Serialize(e) => SimpleError {
|
||||
error: format!("JSON Generation error: {}", e),
|
||||
},
|
||||
JsonPayloadError::Payload(payload) => SimpleError {
|
||||
error: format!("Payload error: {}", payload),
|
||||
},
|
||||
_ => SimpleError {
|
||||
error: "Unknown error".to_owned(),
|
||||
},
|
||||
};
|
||||
|
||||
error::InternalError::from_response(err, HttpResponse::Conflict().json(payload))
|
||||
.into()
|
||||
});
|
||||
|
||||
App::new()
|
||||
.wrap(cors)
|
||||
.app_data(data.clone())
|
||||
.wrap(Logger::new(
|
||||
r#"%a "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T"#,
|
||||
))
|
||||
.app_data(json_config)
|
||||
.route("/ready", web::get().to(ready))
|
||||
.service(
|
||||
web::scope("/api/v1")
|
||||
.route("/state", web::get().to(get_state))
|
||||
.route("/details", web::post().to(get_detailed_timeline)),
|
||||
)
|
||||
})
|
||||
.bind(config.server.listen_spec())?
|
||||
.run()
|
||||
.await;
|
||||
|
||||
// Shutdown the runner
|
||||
runner_tx.send(RunnerMessage::Stop {}).unwrap();
|
||||
runner_handle.await.unwrap();
|
||||
exe_tx.send(ExecutorMessage::Stop {}).unwrap();
|
||||
exe_handle.await.unwrap();
|
||||
storage_tx.send(StorageMessage::Stop {}).unwrap();
|
||||
storage_handle.await.unwrap();
|
||||
|
||||
res
|
||||
}
|
||||
@@ -0,0 +1,84 @@
|
||||
pub use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Debug;
|
||||
use sysinfo::{RefreshKind, System, SystemExt};
|
||||
use tokio::sync::mpsc;
|
||||
use waterfall::prelude::*;
|
||||
|
||||
fn default_resources() -> TaskResources {
|
||||
let system = System::new_with_specifics(RefreshKind::new().with_cpu().with_memory());
|
||||
let cores = (system.processors().len() as i64) - 2;
|
||||
let free_memory = (system.total_memory() - system.used_memory()) as f64;
|
||||
let memory_mb = ((free_memory * 0.8) as i64) / 1024;
|
||||
|
||||
let mut resources = TaskResources::new();
|
||||
resources.insert("cores".to_owned(), cores);
|
||||
resources.insert("memory_mb".to_owned(), memory_mb);
|
||||
resources
|
||||
}
|
||||
|
||||
fn default_ip() -> String {
|
||||
"127.0.0.1".to_owned()
|
||||
}
|
||||
|
||||
fn default_port() -> u32 {
|
||||
2504
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct GlobalConfigSpec {
|
||||
#[serde(default = "default_ip")]
|
||||
pub ip: String,
|
||||
|
||||
#[serde(default = "default_port")]
|
||||
pub port: u32,
|
||||
|
||||
#[serde(default = "default_resources")]
|
||||
pub resources: TaskResources,
|
||||
}
|
||||
|
||||
impl Default for GlobalConfigSpec {
|
||||
fn default() -> Self {
|
||||
GlobalConfigSpec {
|
||||
ip: String::from("127.0.0.1"),
|
||||
port: default_port(),
|
||||
resources: default_resources(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GlobalConfig {
|
||||
pub ip: String,
|
||||
pub port: u32,
|
||||
pub resources: TaskResources,
|
||||
pub storage: mpsc::UnboundedSender<StorageMessage>,
|
||||
pub executor: mpsc::UnboundedSender<ExecutorMessage>,
|
||||
}
|
||||
|
||||
impl GlobalConfig {
|
||||
pub fn new(spec: &GlobalConfigSpec) -> Self {
|
||||
let def_res = default_resources();
|
||||
let cores = def_res.get("cores").unwrap();
|
||||
|
||||
let workers = spec.resources.get("cores").unwrap_or(cores);
|
||||
|
||||
let (executor, exe_rx) = mpsc::unbounded_channel();
|
||||
local_executor::start(*workers as usize, exe_rx);
|
||||
|
||||
// Tracker
|
||||
let (storage, trx) = mpsc::unbounded_channel();
|
||||
waterfall::storage::noop::start(trx);
|
||||
|
||||
GlobalConfig {
|
||||
ip: spec.ip.clone(),
|
||||
port: spec.port,
|
||||
resources: spec.resources.clone(),
|
||||
storage,
|
||||
executor,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn listen_spec(&self) -> String {
|
||||
format!("{}:{}", self.ip, self.port)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,185 @@
|
||||
mod config;
|
||||
|
||||
use actix_cors::Cors;
|
||||
use actix_web::{error, middleware::Logger, web, App, HttpResponse, HttpServer, Responder};
|
||||
use clap::Parser;
|
||||
use serde::Serialize;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use config::*;
|
||||
use waterfall::executors::agent_executor::TaskSubmission;
|
||||
use waterfall::prelude::*;
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct SimpleError {
|
||||
error: String,
|
||||
}
|
||||
|
||||
async fn get_resources(data: web::Data<GlobalConfig>) -> impl Responder {
|
||||
HttpResponse::Ok().json(data.resources.clone())
|
||||
}
|
||||
|
||||
async fn submit_task(
|
||||
details: web::Json<TaskSubmission>,
|
||||
data: web::Data<GlobalConfig>,
|
||||
) -> impl Responder {
|
||||
let (response, rx) = oneshot::channel();
|
||||
|
||||
let submission = details.into_inner();
|
||||
|
||||
// Need to keep this unused, otherwise the LE will kill it immediately
|
||||
let (kill_tx, kill) = oneshot::channel();
|
||||
data.executor
|
||||
.send(ExecutorMessage::ExecuteTask {
|
||||
details: submission.details,
|
||||
output_options: submission.output_options,
|
||||
varmap: submission.varmap,
|
||||
response,
|
||||
kill,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
HttpResponse::Ok().json(rx.await.unwrap())
|
||||
}
|
||||
|
||||
/*
|
||||
async fn stop_task(
|
||||
path: web::Path<(RunID, TaskID)>,
|
||||
data: web::Data<GlobalConfig>,
|
||||
) -> impl Responder {
|
||||
let (run_id, task_id) = path.into_inner();
|
||||
let (response, rx) = oneshot::channel();
|
||||
|
||||
data.executor
|
||||
.send(ExecutorMessage::StopTask {
|
||||
run_id,
|
||||
task_id,
|
||||
response,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
rx.await.unwrap();
|
||||
HttpResponse::Ok()
|
||||
}
|
||||
*/
|
||||
|
||||
async fn ready() -> impl Responder {
|
||||
HttpResponse::Ok()
|
||||
}
|
||||
|
||||
fn init(config_file: &str) -> GlobalConfig {
|
||||
let spec: GlobalConfigSpec = if config_file.is_empty() {
|
||||
GlobalConfigSpec::default()
|
||||
} else {
|
||||
let json = std::fs::read_to_string(config_file)
|
||||
.unwrap_or_else(|_| panic!("Unable to open {} for reading", config_file));
|
||||
serde_json::from_str(&json).expect("Error parsing config json")
|
||||
};
|
||||
|
||||
GlobalConfig::new(&spec)
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author, version, about)]
|
||||
struct Args {
|
||||
/// Configuration File
|
||||
#[clap(short, long, default_value = "")]
|
||||
config: String,
|
||||
|
||||
/// Enable verbose logging
|
||||
#[clap(short, long)]
|
||||
verbose: bool,
|
||||
|
||||
/// Configuration File
|
||||
#[clap(short, long)]
|
||||
host: Option<String>,
|
||||
|
||||
/// Configuration File
|
||||
#[clap(short, long)]
|
||||
port: Option<u32>,
|
||||
}
|
||||
|
||||
#[actix_web::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
let args = Args::parse();
|
||||
|
||||
let data = web::Data::new(init(args.config.as_ref()));
|
||||
let config = data.clone();
|
||||
|
||||
let host = if let Some(h) = args.host {
|
||||
h
|
||||
} else {
|
||||
config.ip.clone()
|
||||
};
|
||||
|
||||
let port = if let Some(p) = args.port {
|
||||
p
|
||||
} else {
|
||||
config.port
|
||||
};
|
||||
|
||||
let listen_spec = format!("{}:{}", host, port);
|
||||
|
||||
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
|
||||
let res = HttpServer::new(move || {
|
||||
let cors = Cors::default()
|
||||
.allow_any_header()
|
||||
.allow_any_method()
|
||||
.allow_any_origin()
|
||||
.send_wildcard();
|
||||
|
||||
let json_config = web::JsonConfig::default()
|
||||
.limit(1048576)
|
||||
.error_handler(|err, _req| {
|
||||
use actix_web::error::JsonPayloadError;
|
||||
let payload = match &err {
|
||||
JsonPayloadError::OverflowKnownLength { length, limit } => SimpleError {
|
||||
error: format!("Payload too big ({} > {})", length, limit),
|
||||
},
|
||||
JsonPayloadError::Overflow { limit } => SimpleError {
|
||||
error: format!("Payload too big (> {})", limit),
|
||||
},
|
||||
JsonPayloadError::ContentType => SimpleError {
|
||||
error: "Unsupported Content-Type".to_owned(),
|
||||
},
|
||||
JsonPayloadError::Deserialize(e) => SimpleError {
|
||||
error: format!("Parsing error: {}", e),
|
||||
},
|
||||
JsonPayloadError::Serialize(e) => SimpleError {
|
||||
error: format!("JSON Generation error: {}", e),
|
||||
},
|
||||
JsonPayloadError::Payload(payload) => SimpleError {
|
||||
error: format!("Payload error: {}", payload),
|
||||
},
|
||||
_ => SimpleError {
|
||||
error: "Unknown error".to_owned(),
|
||||
},
|
||||
};
|
||||
|
||||
error::InternalError::from_response(err, HttpResponse::Conflict().json(payload))
|
||||
.into()
|
||||
});
|
||||
|
||||
App::new()
|
||||
.wrap(cors)
|
||||
.app_data(data.clone())
|
||||
.wrap(Logger::new(
|
||||
r#"%a "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T"#,
|
||||
))
|
||||
.app_data(json_config)
|
||||
.route("/ready", web::get().to(ready))
|
||||
.service(
|
||||
web::scope("/api/v1")
|
||||
.route("/resources", web::get().to(get_resources))
|
||||
.route("/run", web::post().to(submit_task)),
|
||||
)
|
||||
})
|
||||
.bind(listen_spec)?
|
||||
.run()
|
||||
.await;
|
||||
|
||||
config.executor.send(ExecutorMessage::Stop {}).unwrap();
|
||||
config.storage.send(StorageMessage::Stop {}).unwrap();
|
||||
|
||||
res
|
||||
}
|
||||
@@ -0,0 +1,286 @@
|
||||
//! The Agent executor is essentially a wrapped version of the local executor.
|
||||
//! It dispatches tasks to remote hosts
|
||||
|
||||
use super::*;
|
||||
use futures::stream::futures_unordered::FuturesUnordered;
|
||||
use log::{info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
use futures::StreamExt;
|
||||
|
||||
fn default_as_true() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct AgentTarget {
|
||||
pub base_url: String,
|
||||
|
||||
#[serde(default)]
|
||||
pub resources: TaskResources,
|
||||
|
||||
#[serde(default)]
|
||||
pub current_resources: TaskResources,
|
||||
|
||||
#[serde(default)]
|
||||
pub enabled: bool,
|
||||
}
|
||||
|
||||
impl AgentTarget {
|
||||
fn new(base_url: String, resources: TaskResources) -> Self {
|
||||
AgentTarget {
|
||||
base_url,
|
||||
resources: resources.clone(),
|
||||
current_resources: resources,
|
||||
enabled: true,
|
||||
}
|
||||
}
|
||||
|
||||
async fn refresh_resources(&mut self, client: &reqwest::Client) {
|
||||
let resource_url = format!("{}/resources", self.base_url);
|
||||
let disabled = match client.get(resource_url).send().await {
|
||||
Ok(result) => {
|
||||
if result.status() == reqwest::StatusCode::OK {
|
||||
self.resources = result.json().await.unwrap();
|
||||
self.current_resources = self.resources.clone();
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
Err(_) => true,
|
||||
};
|
||||
if self.enabled && disabled {
|
||||
warn!("Disabling {}: unable to refresh resources", self.base_url);
|
||||
}
|
||||
self.enabled = !disabled;
|
||||
}
|
||||
|
||||
async fn ping(&mut self, client: &reqwest::Client) -> Result<()> {
|
||||
let resource_url = format!("{}/ready", self.base_url);
|
||||
let result = client.get(resource_url).send().await?;
|
||||
self.enabled = result.status() == reqwest::StatusCode::OK;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Contains specifics on how to run a local task
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
struct AgentTaskDetail {
|
||||
/// The command and all arguments to run
|
||||
command: Cmd,
|
||||
|
||||
/// Environment variables to set
|
||||
#[serde(default)]
|
||||
environment: HashMap<String, String>,
|
||||
|
||||
/// Timeout in seconds
|
||||
#[serde(default)]
|
||||
timeout: i64,
|
||||
|
||||
/// resources required by the task
|
||||
resources: TaskResources,
|
||||
}
|
||||
|
||||
fn extract_details(details: &TaskDetails) -> Result<AgentTaskDetail, serde_json::Error> {
|
||||
serde_json::from_value::<AgentTaskDetail>(details.clone())
|
||||
}
|
||||
|
||||
fn validate_task(details: &TaskDetails, max_capacities: &[TaskResources]) -> Result<()> {
|
||||
let parsed = extract_details(details)?;
|
||||
if max_capacities.is_empty()
|
||||
|| max_capacities.iter().all(|x| x.values().all(|x| *x == 0))
|
||||
|| max_capacities
|
||||
.iter()
|
||||
.any(|x| x.can_satisfy(&parsed.resources))
|
||||
{
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow!("No Agent target satisfies the required resources"))
|
||||
}
|
||||
}
|
||||
|
||||
/// Contains specifics on how to run a local task
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct TaskSubmission {
|
||||
pub details: TaskDetails,
|
||||
pub varmap: VarMap,
|
||||
pub output_options: TaskOutputOptions,
|
||||
}
|
||||
|
||||
async fn submit_task(
|
||||
base_url: String,
|
||||
details: TaskDetails,
|
||||
output_options: TaskOutputOptions,
|
||||
client: reqwest::Client,
|
||||
varmap: VarMap,
|
||||
) -> Result<TaskAttempt> {
|
||||
let submit_url = format!("{}/run", base_url);
|
||||
let submission = TaskSubmission {
|
||||
details,
|
||||
varmap,
|
||||
output_options,
|
||||
};
|
||||
match client.post(submit_url).json(&submission).send().await {
|
||||
Ok(result) => {
|
||||
if result.status() == reqwest::StatusCode::OK {
|
||||
let mut attempt: TaskAttempt = result.json().await.unwrap();
|
||||
attempt
|
||||
.executor
|
||||
.push(format!("Executed on agent at {}", base_url));
|
||||
Ok(attempt)
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"Unable to dispatch to agent at {}: {:?}",
|
||||
base_url,
|
||||
result.text().await.unwrap()
|
||||
))
|
||||
}
|
||||
}
|
||||
Err(e) => Err(anyhow!(
|
||||
"Unable to dispatch to agent at {}: {:?}",
|
||||
base_url,
|
||||
e
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
// async fn select_target() -> Option<usize> {}
|
||||
|
||||
struct RunningTask {
|
||||
resources: TaskResources,
|
||||
target_id: usize,
|
||||
}
|
||||
|
||||
/// The mpsc channel can be sized to fit max parallelism
|
||||
async fn start_agent_executor(
|
||||
mut targets: Vec<AgentTarget>,
|
||||
mut exe_msgs: mpsc::UnboundedReceiver<ExecutorMessage>,
|
||||
) {
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
for target in &mut targets {
|
||||
target.refresh_resources(&client).await;
|
||||
}
|
||||
let mut max_caps: Vec<TaskResources> = targets.iter().map(|x| x.resources.clone()).collect();
|
||||
|
||||
// Set up the local executor
|
||||
let (le_tx, le_rx) = mpsc::unbounded_channel();
|
||||
local_executor::start(1, le_rx);
|
||||
|
||||
// Tasks waiting to release resources
|
||||
let mut running = FuturesUnordered::new();
|
||||
|
||||
while let Some(msg) = exe_msgs.recv().await {
|
||||
use ExecutorMessage::*;
|
||||
match msg {
|
||||
ValidateTask { details, response } => {
|
||||
let ltx = le_tx.clone();
|
||||
let caps = max_caps.clone();
|
||||
tokio::spawn(async move {
|
||||
let result = validate_task(&details, &caps);
|
||||
if result.is_err() {
|
||||
response.send(result).unwrap_or(());
|
||||
} else {
|
||||
ltx.send(ValidateTask { details, response }).unwrap_or(());
|
||||
}
|
||||
});
|
||||
}
|
||||
ExecuteTask {
|
||||
details,
|
||||
varmap,
|
||||
output_options,
|
||||
response,
|
||||
kill,
|
||||
} => {
|
||||
let task = extract_details(&details).unwrap();
|
||||
let resources = task.resources.clone();
|
||||
|
||||
loop {
|
||||
match targets.iter_mut().enumerate().find(|(_, x)| {
|
||||
x.enabled && x.current_resources.can_satisfy(&task.resources)
|
||||
}) {
|
||||
// There is a remote agent with capacity
|
||||
Some((tid, target)) => {
|
||||
info!("Dispatching job to {}", target.base_url);
|
||||
target.current_resources.sub(&resources).unwrap();
|
||||
let base_url = target.base_url.clone();
|
||||
let submit_client = client.clone();
|
||||
running.push(tokio::spawn(async move {
|
||||
let res = submit_task(
|
||||
base_url,
|
||||
details,
|
||||
output_options,
|
||||
submit_client,
|
||||
varmap,
|
||||
)
|
||||
.await;
|
||||
let mut rc = false;
|
||||
if let Ok(attempt) = res {
|
||||
response.send(attempt).unwrap();
|
||||
rc = true;
|
||||
}
|
||||
(tid, resources, rc)
|
||||
}));
|
||||
break;
|
||||
}
|
||||
// No agent has capacity
|
||||
None => {
|
||||
// Give the outstanding tasks a chance to complete or agents
|
||||
// recover
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
|
||||
|
||||
// Refresh any disabled targets
|
||||
for (tid, target) in targets.iter_mut().enumerate() {
|
||||
if target.enabled {
|
||||
continue;
|
||||
}
|
||||
target.refresh_resources(&client).await;
|
||||
if target.enabled {
|
||||
max_caps[tid] = target.resources.clone();
|
||||
info!("{} is now enabled.", target.base_url);
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the next item
|
||||
if !running.is_empty() {
|
||||
let result: Result<
|
||||
(usize, TaskResources, bool),
|
||||
tokio::task::JoinError,
|
||||
> = running.next().await.unwrap();
|
||||
|
||||
let (tid, resources, submit_ok) = result.unwrap();
|
||||
if !submit_ok {
|
||||
warn!(
|
||||
"Disabling agent at {} due to incomplete submission.",
|
||||
targets[tid].base_url
|
||||
);
|
||||
targets[tid].enabled = false;
|
||||
}
|
||||
targets[tid].current_resources.add(&resources);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/*
|
||||
msg @ StopTask { .. } => {
|
||||
le_tx.send(msg).unwrap_or(());
|
||||
}
|
||||
*/
|
||||
Stop {} => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(
|
||||
targets: Vec<AgentTarget>,
|
||||
msgs: mpsc::UnboundedReceiver<ExecutorMessage>,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
start_agent_executor(targets, msgs).await;
|
||||
})
|
||||
}
|
||||
@@ -13,24 +13,6 @@ use tokio::io::AsyncReadExt;
|
||||
|
||||
type Environment = HashMap<String, Option<String>>;
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(untagged)]
|
||||
enum Cmd {
|
||||
Simple(String),
|
||||
Split(Vec<String>),
|
||||
}
|
||||
|
||||
impl Cmd {
|
||||
fn generate(&self, varmap: &VarMap) -> Vec<String> {
|
||||
let cmd = match self {
|
||||
Cmd::Simple(s) => s.split_whitespace().map(|x| x.to_string()).collect(),
|
||||
Cmd::Split(v) => v.clone(),
|
||||
};
|
||||
|
||||
cmd.into_iter().map(|x| varmap.apply_to(&x)).collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// Contains specifics on how to run a local task
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
struct LocalTaskDetail {
|
||||
@@ -115,6 +97,8 @@ async fn run_task(
|
||||
let (program, args) = cmd.split_first().unwrap();
|
||||
attempt.executor.push(format!("{:?}\n", details));
|
||||
|
||||
debug!("Running command {:?}", cmd);
|
||||
|
||||
let mut command = Command::new(program);
|
||||
command.stdout(Stdio::piped());
|
||||
command.stderr(Stdio::piped());
|
||||
@@ -255,12 +239,9 @@ pub async fn start_local_executor(
|
||||
});
|
||||
}
|
||||
ExecuteTask {
|
||||
task_name,
|
||||
interval,
|
||||
details,
|
||||
varmap,
|
||||
output_options,
|
||||
storage,
|
||||
response,
|
||||
kill,
|
||||
} => {
|
||||
@@ -277,15 +258,7 @@ pub async fn start_local_executor(
|
||||
..TaskAttempt::new()
|
||||
},
|
||||
};
|
||||
let rc = attempt.succeeded;
|
||||
storage
|
||||
.send(StorageMessage::StoreAttempt {
|
||||
task_name,
|
||||
interval,
|
||||
attempt,
|
||||
})
|
||||
.unwrap();
|
||||
response.send(rc).unwrap();
|
||||
response.send(attempt).unwrap();
|
||||
}));
|
||||
}
|
||||
Stop {} => {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
pub mod agent_executor;
|
||||
pub mod local_executor;
|
||||
|
||||
/// Messages for interacting with an Executor
|
||||
@@ -16,13 +17,10 @@ pub enum ExecutorMessage {
|
||||
/// Errors
|
||||
/// Will return `Err` if the tasks are invalid, according to the executor
|
||||
ExecuteTask {
|
||||
task_name: String,
|
||||
interval: Interval,
|
||||
details: serde_json::Value,
|
||||
varmap: VarMap,
|
||||
output_options: TaskOutputOptions,
|
||||
storage: mpsc::UnboundedSender<StorageMessage>,
|
||||
response: oneshot::Sender<bool>,
|
||||
response: oneshot::Sender<TaskAttempt>,
|
||||
kill: oneshot::Receiver<()>,
|
||||
},
|
||||
Stop {},
|
||||
@@ -32,6 +30,24 @@ fn default_bytes() -> usize {
|
||||
20480
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(untagged)]
|
||||
pub enum Cmd {
|
||||
Simple(String),
|
||||
Split(Vec<String>),
|
||||
}
|
||||
|
||||
impl Cmd {
|
||||
pub fn generate(&self, varmap: &VarMap) -> Vec<String> {
|
||||
let cmd = match self {
|
||||
Cmd::Simple(s) => s.split_whitespace().map(|x| x.to_string()).collect(),
|
||||
Cmd::Split(v) => v.clone(),
|
||||
};
|
||||
|
||||
cmd.into_iter().map(|x| varmap.apply_to(&x)).collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// Options in how to handle task output. Some tasks can be quite
|
||||
/// verbose, and the output may not be needed.
|
||||
#[derive(Clone, Serialize, Deserialize, Copy, Debug, PartialEq, Hash, Eq)]
|
||||
|
||||
@@ -126,6 +126,12 @@ impl IntervalSet {
|
||||
pub fn difference(&self, other: &Self) -> Self {
|
||||
self.intersection(&other.complement())
|
||||
}
|
||||
|
||||
/// Subtract all intervals in `other` from self
|
||||
/// both sides must be sorted
|
||||
pub fn subtract(&mut self, other: &Self) {
|
||||
self.0 = self.difference(other).0;
|
||||
}
|
||||
}
|
||||
impl Deref for IntervalSet {
|
||||
type Target = Vec<Interval>;
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
#![allow(unused_imports)]
|
||||
#![allow(dead_code)]
|
||||
#![feature(slice_group_by)]
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use chrono::prelude::*;
|
||||
use chrono::{Duration, TimeZone};
|
||||
use chrono_tz::Tz;
|
||||
use log::{debug, error, info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
@@ -42,9 +44,3 @@ pub mod task;
|
||||
pub mod task_set;
|
||||
pub mod varmap;
|
||||
pub mod world;
|
||||
|
||||
/*
|
||||
TODO:
|
||||
target_state -> TaskSet.coverage()
|
||||
current state
|
||||
*/
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
pub use chrono::prelude::*;
|
||||
pub use chrono_tz::*;
|
||||
|
||||
pub use crate::calendar::Calendar;
|
||||
pub use crate::executors::*;
|
||||
pub use crate::runner::Runner;
|
||||
pub use crate::interval::Interval;
|
||||
pub use crate::runner::{ActionState, Runner, RunnerMessage};
|
||||
pub use crate::storage::*;
|
||||
pub use crate::task::TaskDefinition;
|
||||
pub use crate::task::{TaskDefinition, TaskResources};
|
||||
pub use crate::world::WorldDefinition;
|
||||
|
||||
@@ -18,6 +18,8 @@ pub trait Satisfiable {
|
||||
schedule: &Schedule,
|
||||
available: &HashMap<String, IntervalSet>,
|
||||
) -> bool;
|
||||
|
||||
fn resources(&self) -> HashSet<Resource>;
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
|
||||
@@ -29,6 +31,23 @@ pub enum AggregateRequirement {
|
||||
}
|
||||
|
||||
impl Satisfiable for AggregateRequirement {
|
||||
fn resources(&self) -> HashSet<Resource> {
|
||||
match self {
|
||||
AggregateRequirement::All(reqs) => reqs.iter().fold(HashSet::new(), |mut acc, req| {
|
||||
acc.extend(req.resources());
|
||||
acc
|
||||
}),
|
||||
AggregateRequirement::Any(reqs) => reqs.iter().fold(HashSet::new(), |mut acc, req| {
|
||||
acc.extend(req.resources());
|
||||
acc
|
||||
}),
|
||||
AggregateRequirement::None(reqs) => reqs.iter().fold(HashSet::new(), |mut acc, req| {
|
||||
acc.extend(req.resources());
|
||||
acc
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn is_satisfied(
|
||||
&self,
|
||||
interval: Interval,
|
||||
@@ -76,6 +95,13 @@ pub enum SingleRequirement {
|
||||
}
|
||||
|
||||
impl Satisfiable for SingleRequirement {
|
||||
fn resources(&self) -> HashSet<Resource> {
|
||||
match self {
|
||||
SingleRequirement::Offset { resource, .. } => HashSet::from([resource.to_owned()]),
|
||||
SingleRequirement::File { path } => HashSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn is_satisfied(
|
||||
&self,
|
||||
interval: Interval,
|
||||
@@ -145,6 +171,13 @@ impl Satisfiable for Requirement {
|
||||
Requirement::Group(req) => req.can_be_satisfied(interval, schedule, available),
|
||||
}
|
||||
}
|
||||
|
||||
fn resources(&self) -> HashSet<Resource> {
|
||||
match self {
|
||||
Requirement::One(req) => req.resources(),
|
||||
Requirement::Group(req) => req.resources(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use super::*;
|
||||
use futures::stream::futures_unordered::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
/*
|
||||
Runner is responsible for taking a TaskSet and a varmap and
|
||||
@@ -12,7 +14,7 @@ use futures::StreamExt;
|
||||
- current = TaskSet::coverage (the theoretical)
|
||||
*/
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Serialize, PartialOrd)]
|
||||
pub enum ActionState {
|
||||
Queued,
|
||||
Running,
|
||||
@@ -20,26 +22,54 @@ pub enum ActionState {
|
||||
Completed,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Copy, Serialize)]
|
||||
pub struct Action {
|
||||
task: String,
|
||||
interval: Interval,
|
||||
state: ActionState,
|
||||
task: usize,
|
||||
pub interval: Interval,
|
||||
pub state: ActionState,
|
||||
// kill: Option<oneshot::Receiver<()>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum RunnerEvent {
|
||||
Start,
|
||||
TaskFailed {
|
||||
task_name: String,
|
||||
pub struct RunnerState {
|
||||
coverage: ResourceInterval,
|
||||
current: ResourceInterval,
|
||||
}
|
||||
|
||||
// Eventually we want to coerce the data into this format for timelines-chart
|
||||
// Resource (group) -> Task (label) -> data [ { "timeRange": [date,date], "val": state } ]
|
||||
pub type ResourceStateDetails = HashMap<Resource, HashMap<String, Vec<Action>>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum RunnerMessage {
|
||||
Tick,
|
||||
PollMessages,
|
||||
ActionCompleted {
|
||||
action_id: usize,
|
||||
succeeded: bool,
|
||||
},
|
||||
RetryAction {
|
||||
action_id: usize,
|
||||
},
|
||||
/// Marks all resources in the set available over the interval
|
||||
ForceUp {
|
||||
resources: HashSet<String>,
|
||||
interval: Interval,
|
||||
},
|
||||
TaskCompleted {
|
||||
task_name: String,
|
||||
/// Marks all resources in the set as down over _at least_ the interval.
|
||||
/// Will cause a re-check / re-gen
|
||||
ForceDown {
|
||||
resources: HashSet<String>,
|
||||
interval: Interval,
|
||||
},
|
||||
Timeout,
|
||||
GetState {
|
||||
response: oneshot::Sender<RunnerState>,
|
||||
},
|
||||
GetResourceStateDetails {
|
||||
interval: Interval,
|
||||
response: oneshot::Sender<ResourceStateDetails>,
|
||||
max_intervals: Option<usize>,
|
||||
},
|
||||
Stop,
|
||||
}
|
||||
|
||||
@@ -54,23 +84,17 @@ pub struct Runner {
|
||||
target: ResourceInterval,
|
||||
current: ResourceInterval,
|
||||
|
||||
queue: Vec<Action>,
|
||||
actions: Vec<Action>,
|
||||
qidx: usize,
|
||||
|
||||
events: FuturesUnordered<tokio::task::JoinHandle<RunnerEvent>>,
|
||||
events: FuturesUnordered<tokio::task::JoinHandle<RunnerMessage>>,
|
||||
|
||||
last_horizon: DateTime<Utc>,
|
||||
messages: mpsc::UnboundedReceiver<RunnerMessage>,
|
||||
executor: mpsc::UnboundedSender<ExecutorMessage>,
|
||||
storage: mpsc::UnboundedSender<StorageMessage>,
|
||||
}
|
||||
|
||||
fn gen_timeout(timeout: i64) -> tokio::task::JoinHandle<RunnerEvent> {
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::seconds(timeout).to_std().unwrap()).await;
|
||||
RunnerEvent::Timeout
|
||||
})
|
||||
}
|
||||
|
||||
async fn validate_cmd(
|
||||
executor: mpsc::UnboundedSender<ExecutorMessage>,
|
||||
cmd: serde_json::Value,
|
||||
@@ -95,36 +119,43 @@ async fn run_task(
|
||||
output_options: &TaskOutputOptions,
|
||||
varmap: &VarMap,
|
||||
) -> bool {
|
||||
println!("Running {}/{}", task_name, interval);
|
||||
info!("Running {}/{}", task_name, interval);
|
||||
let (response, response_rx) = oneshot::channel();
|
||||
executor
|
||||
.send(ExecutorMessage::ExecuteTask {
|
||||
task_name,
|
||||
interval,
|
||||
details,
|
||||
output_options: output_options.clone(),
|
||||
varmap: varmap.clone(),
|
||||
response,
|
||||
kill,
|
||||
storage,
|
||||
})
|
||||
.unwrap();
|
||||
response_rx.await.unwrap()
|
||||
let attempt = response_rx.await.unwrap();
|
||||
let rc = attempt.succeeded;
|
||||
storage
|
||||
.send(StorageMessage::StoreAttempt {
|
||||
task_name,
|
||||
interval,
|
||||
attempt: attempt.clone(),
|
||||
})
|
||||
.unwrap();
|
||||
rc
|
||||
}
|
||||
|
||||
async fn up_task(
|
||||
action_id: usize,
|
||||
task_name: String,
|
||||
interval: Interval,
|
||||
kill: oneshot::Receiver<()>,
|
||||
_kill: oneshot::Receiver<()>,
|
||||
varmap: VarMap,
|
||||
up: TaskDetails,
|
||||
check: Option<TaskDetails>,
|
||||
output_options: TaskOutputOptions,
|
||||
executor: mpsc::UnboundedSender<ExecutorMessage>,
|
||||
storage: mpsc::UnboundedSender<StorageMessage>,
|
||||
) -> RunnerEvent {
|
||||
) -> RunnerMessage {
|
||||
if let Some(check_cmd) = check.clone() {
|
||||
let (subkill, subkill_rx) = oneshot::channel();
|
||||
let (_subkill, subkill_rx) = oneshot::channel();
|
||||
let succeeded = run_task(
|
||||
task_name.clone(),
|
||||
interval,
|
||||
@@ -139,15 +170,15 @@ async fn up_task(
|
||||
|
||||
// If check succeeded, resources are up
|
||||
if succeeded {
|
||||
return RunnerEvent::TaskCompleted {
|
||||
task_name,
|
||||
interval,
|
||||
return RunnerMessage::ActionCompleted {
|
||||
action_id,
|
||||
succeeded: true,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// UP
|
||||
let (subkill, subkill_rx) = oneshot::channel();
|
||||
let (_subkill, subkill_rx) = oneshot::channel();
|
||||
let succeeded = run_task(
|
||||
task_name.clone(),
|
||||
interval,
|
||||
@@ -160,15 +191,15 @@ async fn up_task(
|
||||
)
|
||||
.await;
|
||||
if !succeeded {
|
||||
return RunnerEvent::TaskFailed {
|
||||
task_name,
|
||||
interval,
|
||||
return RunnerMessage::ActionCompleted {
|
||||
action_id,
|
||||
succeeded: false,
|
||||
};
|
||||
}
|
||||
|
||||
// recheck
|
||||
if let Some(check_cmd) = check {
|
||||
let (subkill, subkill_rx) = oneshot::channel();
|
||||
let (_subkill, subkill_rx) = oneshot::channel();
|
||||
let succeeded = run_task(
|
||||
task_name.clone(),
|
||||
interval,
|
||||
@@ -183,33 +214,79 @@ async fn up_task(
|
||||
|
||||
// If check succeeded, resources are up
|
||||
if succeeded {
|
||||
RunnerEvent::TaskCompleted {
|
||||
task_name,
|
||||
interval,
|
||||
}
|
||||
return RunnerMessage::ActionCompleted {
|
||||
action_id,
|
||||
succeeded: true,
|
||||
};
|
||||
} else {
|
||||
RunnerEvent::TaskFailed {
|
||||
task_name,
|
||||
interval,
|
||||
}
|
||||
return RunnerMessage::ActionCompleted {
|
||||
action_id,
|
||||
succeeded: false,
|
||||
};
|
||||
}
|
||||
} else {
|
||||
RunnerEvent::TaskCompleted {
|
||||
task_name,
|
||||
interval,
|
||||
return RunnerMessage::ActionCompleted {
|
||||
action_id,
|
||||
succeeded: true,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn delayed_event(delay: Duration, event: RunnerMessage) -> tokio::task::JoinHandle<RunnerMessage> {
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(delay.to_std().unwrap()).await;
|
||||
event
|
||||
})
|
||||
}
|
||||
|
||||
// Coalesces adjascent actions
|
||||
fn coalesce_actions(mut actions: Vec<Action>) -> Vec<Action> {
|
||||
if actions.is_empty() {
|
||||
return actions;
|
||||
}
|
||||
|
||||
actions.sort_unstable_by(|a, b| {
|
||||
let ord = a.task.partial_cmp(&b.task).unwrap();
|
||||
if ord == Ordering::Equal {
|
||||
a.state.partial_cmp(&b.state).unwrap()
|
||||
} else {
|
||||
ord
|
||||
}
|
||||
});
|
||||
|
||||
let mut res: Vec<Action> = Vec::new();
|
||||
for group in actions.group_by(|a, b| a.task == b.task && a.state == b.state) {
|
||||
let intervals: Vec<Interval> = group.iter().map(|x| x.interval).collect();
|
||||
let is = IntervalSet::from(intervals);
|
||||
let task = group.first().unwrap().task;
|
||||
let state = group.first().unwrap().state;
|
||||
|
||||
for interval in is.iter() {
|
||||
res.push(Action {
|
||||
task: task,
|
||||
state: state,
|
||||
interval: *interval,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
impl Runner {
|
||||
pub async fn new(
|
||||
tasks: TaskSet,
|
||||
vars: VarMap,
|
||||
messages: mpsc::UnboundedReceiver<RunnerMessage>,
|
||||
executor: mpsc::UnboundedSender<ExecutorMessage>,
|
||||
storage: mpsc::UnboundedSender<StorageMessage>,
|
||||
output_options: TaskOutputOptions,
|
||||
force_check: bool,
|
||||
) -> Result<Self> {
|
||||
for tdef in tasks.values() {
|
||||
tasks.validate()?;
|
||||
|
||||
// Validate the task commands can run on the executor
|
||||
for tdef in tasks.iter() {
|
||||
validate_cmd(executor.clone(), tdef.up.clone()).await?;
|
||||
if let Some(cmd) = &tdef.down {
|
||||
validate_cmd(executor.clone(), cmd.clone()).await?;
|
||||
@@ -219,172 +296,315 @@ impl Runner {
|
||||
}
|
||||
}
|
||||
|
||||
let (response, rx) = oneshot::channel();
|
||||
storage
|
||||
.send(StorageMessage::LoadState { response })
|
||||
.unwrap();
|
||||
let current = rx.await.unwrap();
|
||||
// Load last-known state
|
||||
let current = if force_check {
|
||||
info!("Force re-check set, starting with empty current state.");
|
||||
ResourceInterval::new()
|
||||
} else {
|
||||
info!("Pulling last state from storage");
|
||||
let (response, rx) = oneshot::channel();
|
||||
storage
|
||||
.send(StorageMessage::LoadState { response })
|
||||
.unwrap();
|
||||
let res = rx.await.unwrap();
|
||||
res
|
||||
};
|
||||
// let target = current.clone();
|
||||
let target = ResourceInterval::new();
|
||||
|
||||
let end_state = tasks.coverage()?;
|
||||
let end_state = tasks.coverage();
|
||||
let mut runner = Runner {
|
||||
tasks,
|
||||
vars,
|
||||
output_options,
|
||||
end_state,
|
||||
target: ResourceInterval::new(),
|
||||
target,
|
||||
current,
|
||||
queue: Vec::new(),
|
||||
actions: Vec::new(),
|
||||
qidx: 0,
|
||||
events: FuturesUnordered::new(),
|
||||
last_horizon: DateTime::<Utc>::MIN_UTC,
|
||||
messages,
|
||||
executor,
|
||||
storage,
|
||||
};
|
||||
|
||||
runner.tick()?;
|
||||
runner.update_target();
|
||||
|
||||
Ok(runner)
|
||||
}
|
||||
|
||||
pub fn tick(&mut self) -> Result<()> {
|
||||
let target = self.tasks.get_state(Utc::now())?;
|
||||
// Generate a new target state and generate any required actions
|
||||
pub fn update_target(&mut self) {
|
||||
let new_target = self.tasks.get_state(Utc::now() + Duration::days(1));
|
||||
let new_required = new_target.difference(&self.target);
|
||||
let mut new_actions =
|
||||
self.tasks
|
||||
.iter()
|
||||
.enumerate()
|
||||
.fold(Vec::new(), |mut acc, (idx, task)| {
|
||||
let get_state = |intv: Interval| {
|
||||
if task.provides.iter().all(|res| {
|
||||
self.current.contains_key(res) && self.current[res].has_subset(intv)
|
||||
}) {
|
||||
ActionState::Completed
|
||||
} else {
|
||||
ActionState::Queued
|
||||
}
|
||||
};
|
||||
let res: Vec<Action> = task
|
||||
.generate_intervals(&new_required)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map({
|
||||
|interval| Action {
|
||||
task: idx,
|
||||
interval,
|
||||
state: get_state(interval),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
acc.extend(res);
|
||||
acc
|
||||
});
|
||||
new_actions.sort_unstable_by(|a, b| a.interval.end.partial_cmp(&b.interval.end).unwrap());
|
||||
|
||||
// Create queue
|
||||
let required = target.difference(&self.current);
|
||||
self.queue = self.tasks.iter().fold(Vec::new(), |mut acc, (name, task)| {
|
||||
let res: Vec<Action> = task
|
||||
.generate_intervals(&required)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map({
|
||||
|interval| Action {
|
||||
task: name.clone(),
|
||||
interval,
|
||||
state: ActionState::Queued,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
acc.extend(res);
|
||||
acc
|
||||
});
|
||||
info!("Tick: Generated {} new actions", new_actions.len());
|
||||
self.actions.extend(new_actions);
|
||||
}
|
||||
|
||||
// Ensure that all actions can be satisfied
|
||||
let unsatisfied = self
|
||||
.queue
|
||||
.iter()
|
||||
.filter(|act| {
|
||||
!self
|
||||
.tasks
|
||||
.get(&act.task)
|
||||
.unwrap()
|
||||
.can_be_satisfied(act.interval, &target)
|
||||
})
|
||||
.fold(HashSet::new(), |mut acc, a| {
|
||||
acc.insert(a.task.clone());
|
||||
fn tick(&mut self) {
|
||||
debug!("Tick");
|
||||
// Enqueue new messages
|
||||
while let Ok(msg) = self.messages.try_recv() {
|
||||
self.events.push(delayed_event(Duration::seconds(0), msg));
|
||||
}
|
||||
match self.actions.last() {
|
||||
Some(action) => {
|
||||
if action.interval.end <= Utc::now() {
|
||||
self.tick()
|
||||
}
|
||||
}
|
||||
None => self.tick(),
|
||||
}
|
||||
|
||||
// Perform maintenance
|
||||
self.queue_actions();
|
||||
|
||||
self.events.push(delayed_event(
|
||||
Duration::milliseconds(250),
|
||||
RunnerMessage::Tick,
|
||||
));
|
||||
}
|
||||
|
||||
fn poll_messages(&mut self) {
|
||||
while let Ok(msg) = self.messages.try_recv() {
|
||||
self.events.push(delayed_event(Duration::seconds(0), msg));
|
||||
}
|
||||
self.events.push(delayed_event(
|
||||
Duration::milliseconds(10),
|
||||
RunnerMessage::PollMessages,
|
||||
));
|
||||
}
|
||||
|
||||
fn get_resource_state_details(
|
||||
&self,
|
||||
interval: Interval,
|
||||
response: oneshot::Sender<ResourceStateDetails>,
|
||||
max_intervals: Option<usize>,
|
||||
) {
|
||||
// HashMap<Resource, HashMap<String, Vec<(DateTime<Utc>, DateTime<Utc>, ActionState)>>>;
|
||||
let mut res: ResourceStateDetails = HashMap::new();
|
||||
|
||||
let all_resources: HashSet<Resource> =
|
||||
self.tasks.iter().fold(HashSet::new(), |mut acc, t| {
|
||||
acc.extend(t.provides.clone());
|
||||
acc
|
||||
});
|
||||
|
||||
// Ensure current +
|
||||
let mut result_state = self.current.clone();
|
||||
for action in &self.queue {
|
||||
for res in &self.tasks.get(&action.task).unwrap().provides {
|
||||
result_state
|
||||
.entry(res.clone())
|
||||
.or_insert(IntervalSet::new())
|
||||
.insert(action.interval);
|
||||
// Build out the hash
|
||||
for resource in all_resources {
|
||||
let mut res_ints = HashMap::new();
|
||||
for task in self.tasks.iter() {
|
||||
if task.provides.contains(&resource) {
|
||||
res_ints.insert(task.name.clone(), Vec::new());
|
||||
}
|
||||
}
|
||||
res.insert(resource.clone(), res_ints);
|
||||
}
|
||||
|
||||
let mut actions: Vec<Action> = self
|
||||
.actions
|
||||
.iter()
|
||||
.filter(|x| interval.is_contiguous(x.interval))
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
if let Some(max_intv) = max_intervals {
|
||||
if actions.len() > max_intv {
|
||||
actions = coalesce_actions(actions);
|
||||
}
|
||||
}
|
||||
if result_state != target {
|
||||
return Err(anyhow!(
|
||||
"Actions generated produce\n\t{:?}\nExpected\n\t{:?}",
|
||||
result_state,
|
||||
target
|
||||
));
|
||||
|
||||
info!(
|
||||
"Filtered {} actions down to {}",
|
||||
self.actions.len(),
|
||||
actions.len()
|
||||
);
|
||||
|
||||
for action in actions {
|
||||
let task = &self.tasks[action.task];
|
||||
for resource in &task.provides {
|
||||
res.get_mut(resource)
|
||||
.unwrap()
|
||||
.get_mut(&task.name)
|
||||
.unwrap()
|
||||
.push(action);
|
||||
}
|
||||
}
|
||||
|
||||
if unsatisfied.is_empty() {
|
||||
self.target = target;
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow!("Tasks {:?} cannot complete as the target state does not provide required resources", unsatisfied))
|
||||
}
|
||||
response.send(res).unwrap();
|
||||
}
|
||||
|
||||
// We'll be using channels for running
|
||||
pub async fn run(&mut self, stop: oneshot::Receiver<RunnerEvent>) {
|
||||
self.events.push(tokio::spawn(async move {
|
||||
// This recv will fail if the channel is shutdown, so just ignore it.
|
||||
stop.await.unwrap_or(RunnerEvent::Stop);
|
||||
RunnerEvent::Stop
|
||||
}));
|
||||
self.queue_actions();
|
||||
pub async fn run(&mut self, mut stay_up: bool) {
|
||||
self.tick();
|
||||
self.poll_messages();
|
||||
|
||||
// Loop while we can make progress
|
||||
while !self.is_done() {
|
||||
// Loop until the current state matches the end state
|
||||
while stay_up || !self.is_done() {
|
||||
match self.events.next().await {
|
||||
Some(Ok(RunnerEvent::Start)) => {
|
||||
self.queue_actions();
|
||||
Some(Ok(RunnerMessage::GetState { response })) => {
|
||||
response
|
||||
.send(RunnerState {
|
||||
current: self.current.clone(),
|
||||
coverage: self.end_state.clone(),
|
||||
})
|
||||
.unwrap_or(());
|
||||
}
|
||||
Some(Ok(RunnerEvent::Stop)) => {
|
||||
Some(Ok(RunnerMessage::PollMessages)) => {
|
||||
self.poll_messages();
|
||||
}
|
||||
Some(Ok(RunnerMessage::Tick)) => {
|
||||
self.tick();
|
||||
}
|
||||
Some(Ok(RunnerMessage::GetResourceStateDetails {
|
||||
interval,
|
||||
response,
|
||||
max_intervals,
|
||||
})) => {
|
||||
self.get_resource_state_details(interval, response, max_intervals);
|
||||
}
|
||||
Some(Ok(RunnerMessage::ForceUp {
|
||||
resources,
|
||||
interval,
|
||||
})) => {
|
||||
for (tid, task) in self.tasks.iter().enumerate() {
|
||||
if task.provides.is_subset(&resources) {
|
||||
let aligned_is =
|
||||
IntervalSet::from(task.schedule.align_interval(interval));
|
||||
for resource in &task.provides {
|
||||
self.current.get_mut(resource).unwrap().merge(&aligned_is);
|
||||
}
|
||||
for action in &mut self.actions {
|
||||
if action.task == tid && aligned_is.has_subset(action.interval) {
|
||||
action.state = ActionState::Completed;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
self.store_state();
|
||||
}
|
||||
Some(Ok(RunnerMessage::ForceDown {
|
||||
resources,
|
||||
interval,
|
||||
})) => {
|
||||
// Use the interval to identify
|
||||
for (tid, task) in self.tasks.iter().enumerate() {
|
||||
if task.provides.is_subset(&resources) {
|
||||
let aligned_is =
|
||||
IntervalSet::from(task.schedule.align_interval(interval));
|
||||
for resource in &task.provides {
|
||||
self.current
|
||||
.get_mut(resource)
|
||||
.unwrap()
|
||||
.subtract(&aligned_is);
|
||||
}
|
||||
for action in &mut self.actions {
|
||||
if action.task == tid && aligned_is.has_subset(action.interval) {
|
||||
action.state = ActionState::Queued;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
self.store_state();
|
||||
}
|
||||
Some(Ok(RunnerMessage::Stop)) => {
|
||||
info!("Stopping");
|
||||
stay_up = false;
|
||||
break;
|
||||
}
|
||||
Some(Ok(RunnerEvent::Timeout)) => {
|
||||
self.queue_actions();
|
||||
Some(Ok(RunnerMessage::RetryAction { action_id })) => {
|
||||
info!("Retrying action {}", action_id);
|
||||
let action = &mut self.actions[action_id];
|
||||
action.state = ActionState::Queued;
|
||||
}
|
||||
Some(Ok(RunnerEvent::TaskFailed {
|
||||
task_name,
|
||||
interval,
|
||||
Some(Ok(RunnerMessage::ActionCompleted {
|
||||
action_id,
|
||||
succeeded,
|
||||
})) => {
|
||||
println!("FAILED: {} / {}", task_name, interval);
|
||||
println!("Well that sucks");
|
||||
}
|
||||
Some(Ok(RunnerEvent::TaskCompleted {
|
||||
task_name,
|
||||
interval,
|
||||
})) => {
|
||||
println!("Completing {}/{}", task_name, interval);
|
||||
let action = self
|
||||
.queue
|
||||
.iter_mut()
|
||||
.find(|x| x.task == task_name && x.interval == interval)
|
||||
.unwrap();
|
||||
let task = self.tasks.get(&task_name).unwrap();
|
||||
action.state = ActionState::Completed;
|
||||
for res in &task.provides {
|
||||
self.current
|
||||
.entry(res.clone())
|
||||
.or_insert(IntervalSet::new())
|
||||
.insert(action.interval);
|
||||
}
|
||||
self.storage
|
||||
.send(StorageMessage::StoreState {
|
||||
state: self.current.clone(),
|
||||
})
|
||||
.unwrap();
|
||||
self.queue_actions();
|
||||
self.complete_task(action_id, succeeded);
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
panic!("Something went wrong: {:?}", e)
|
||||
}
|
||||
None => {
|
||||
// No pending actions waiting
|
||||
// Can probably wait to the next event
|
||||
continue;
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
// Log stuff
|
||||
}
|
||||
}
|
||||
|
||||
fn complete_task(&mut self, action_id: usize, succeeded: bool) {
|
||||
info!("Completing action {}", action_id);
|
||||
let action = &mut self.actions[action_id];
|
||||
if succeeded {
|
||||
let task = self.tasks.get(action.task).unwrap();
|
||||
action.state = ActionState::Completed;
|
||||
for res in &task.provides {
|
||||
self.current
|
||||
.entry(res.clone())
|
||||
.or_insert(IntervalSet::new())
|
||||
.insert(action.interval);
|
||||
}
|
||||
self.store_state();
|
||||
self.queue_actions();
|
||||
} else {
|
||||
action.state = ActionState::Errored;
|
||||
self.events.push(delayed_event(
|
||||
Duration::seconds(30),
|
||||
RunnerMessage::RetryAction { action_id },
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
fn store_state(&self) {
|
||||
self.storage
|
||||
.send(StorageMessage::StoreState {
|
||||
state: self.current.clone(),
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn queue_actions(&mut self) {
|
||||
let now = Utc::now();
|
||||
|
||||
// Collect any outstanding futures
|
||||
for action in self.queue[self.qidx..]
|
||||
// Submit any elligible jobs
|
||||
for (action_id, action) in self
|
||||
.actions
|
||||
.iter_mut()
|
||||
.filter(|x| x.state == ActionState::Queued && x.interval.end <= now)
|
||||
.enumerate()
|
||||
.filter(|(_, x)| x.state == ActionState::Queued && x.interval.end <= now)
|
||||
{
|
||||
let task = self.tasks.get(&action.task).unwrap();
|
||||
let task = self.tasks.get(action.task).unwrap();
|
||||
if !task.can_run(action.interval, &self.current) {
|
||||
continue;
|
||||
}
|
||||
@@ -393,7 +613,7 @@ impl Runner {
|
||||
.iter()
|
||||
.chain(self.vars.iter())
|
||||
.collect();
|
||||
let task_name = action.task.clone();
|
||||
let task_name = task.name.clone();
|
||||
let interval = action.interval;
|
||||
let up = task.up.clone();
|
||||
let check = task.check.clone();
|
||||
@@ -402,6 +622,7 @@ impl Runner {
|
||||
let storage = self.storage.clone();
|
||||
self.events.push(tokio::spawn(async move {
|
||||
up_task(
|
||||
action_id,
|
||||
task_name.clone(),
|
||||
interval,
|
||||
kill,
|
||||
@@ -483,24 +704,26 @@ mod tests {
|
||||
|
||||
// Storage
|
||||
let (storage_tx, storage_rx) = mpsc::unbounded_channel();
|
||||
let storage = redis_store::start(
|
||||
let storage = storage::redis::start(
|
||||
storage_rx,
|
||||
"redis://localhost".to_owned(),
|
||||
"world_test".to_owned(),
|
||||
);
|
||||
|
||||
let (runner_tx, runner_rx) = mpsc::unbounded_channel();
|
||||
let mut runner = Runner::new(
|
||||
tasks,
|
||||
world_def.variables,
|
||||
runner_rx,
|
||||
tx.clone(),
|
||||
storage_tx.clone(),
|
||||
world_def.output_options,
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (wtx, wrx) = oneshot::channel();
|
||||
runner.run(wrx).await;
|
||||
runner.run(false).await;
|
||||
|
||||
tx.send(ExecutorMessage::Stop {}).unwrap();
|
||||
executor.await.unwrap();
|
||||
|
||||
@@ -21,6 +21,32 @@ impl Schedule {
|
||||
}
|
||||
}
|
||||
|
||||
fn is_end_time<T: TimeZone>(&self, dt: DateTime<T>) -> bool {
|
||||
// Need to get the current interval, then offset it
|
||||
let at = dt.with_timezone(&self.timezone);
|
||||
self.times.iter().any(|x| *x == at.time())
|
||||
&& self.calendar.includes(at.date().naive_local())
|
||||
}
|
||||
|
||||
/// Given an interval I, return the interval J that is the smallest
|
||||
/// set of schedule intervals that completely contain I.
|
||||
/// If the given interval is bounded by MIN_TIME or MAX_TIME, then the
|
||||
/// returned interval will be likewise bounded
|
||||
pub fn align_interval(&self, interval: Interval) -> Interval {
|
||||
let st = if interval.start == MIN_TIME {
|
||||
self.next_time(interval.start).with_timezone(&Utc)
|
||||
} else {
|
||||
interval.start
|
||||
};
|
||||
let et = if interval.end == MAX_TIME {
|
||||
self.prev_time(interval.end).with_timezone(&Utc)
|
||||
} else {
|
||||
interval.end
|
||||
};
|
||||
|
||||
Interval::new(self.interval(st, 0).start, self.interval(et, 0).end)
|
||||
}
|
||||
|
||||
pub fn generate(&self, interval: Interval) -> Vec<Interval> {
|
||||
if self.times.is_empty() {
|
||||
return Vec::new();
|
||||
@@ -67,9 +93,7 @@ impl Schedule {
|
||||
let at = dt.with_timezone(&self.timezone);
|
||||
|
||||
// If the time is at an edge
|
||||
let rt = if self.times.iter().any(|x| *x == at.time())
|
||||
&& self.calendar.includes(at.date().naive_local())
|
||||
{
|
||||
let rt = if self.is_end_time(at) {
|
||||
at
|
||||
} else {
|
||||
self.next_time(at)
|
||||
@@ -82,7 +106,7 @@ impl Schedule {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn next_time(&self, dt: DateTime<Tz>) -> DateTime<Tz> {
|
||||
pub fn next_time<T: TimeZone>(&self, dt: DateTime<T>) -> DateTime<Tz> {
|
||||
let st = dt.with_timezone(&self.timezone);
|
||||
|
||||
let mut date = st.date().naive_local();
|
||||
@@ -108,7 +132,7 @@ impl Schedule {
|
||||
}
|
||||
|
||||
/// Given a time, generate the preceding interval according to the schedule
|
||||
pub fn prev_time(&self, dt: DateTime<Tz>) -> DateTime<Tz> {
|
||||
pub fn prev_time<T: TimeZone>(&self, dt: DateTime<T>) -> DateTime<Tz> {
|
||||
let st = dt.with_timezone(&self.timezone);
|
||||
|
||||
let mut date = st.date().naive_local();
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
use super::*;
|
||||
use crate::executors::TaskAttempt;
|
||||
use crate::runner::ActionState;
|
||||
|
||||
/// Messages for interacting with an Executor
|
||||
#[derive(Debug)]
|
||||
pub enum StorageMessage {
|
||||
Clear {},
|
||||
StoreAttempt {
|
||||
task_name: String,
|
||||
interval: Interval,
|
||||
@@ -25,4 +27,5 @@ pub enum StorageMessage {
|
||||
Stop {},
|
||||
}
|
||||
|
||||
pub mod redis_store;
|
||||
pub mod noop;
|
||||
pub mod redis;
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
use super::*;
|
||||
|
||||
/// The mpsc channel can be sized to fit max parallelism
|
||||
pub async fn start_storage(mut msgs: mpsc::UnboundedReceiver<StorageMessage>) -> Result<()> {
|
||||
let mut current_state = ResourceInterval::new();
|
||||
while let Some(msg) = msgs.recv().await {
|
||||
use StorageMessage::*;
|
||||
match msg {
|
||||
Clear {} => {
|
||||
current_state = ResourceInterval::new();
|
||||
}
|
||||
StoreAttempt { .. } => {}
|
||||
StoreState { state } => {
|
||||
current_state = state;
|
||||
}
|
||||
LoadState { response } => {
|
||||
response.send(current_state.clone()).unwrap();
|
||||
}
|
||||
Stop {} => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn start(msgs: mpsc::UnboundedReceiver<StorageMessage>) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
start_storage(msgs).await.expect("Unable to start storage");
|
||||
})
|
||||
}
|
||||
@@ -1,5 +1,7 @@
|
||||
use super::*;
|
||||
|
||||
extern crate redis;
|
||||
|
||||
use futures::prelude::*;
|
||||
use redis::AsyncCommands;
|
||||
|
||||
@@ -15,22 +17,47 @@ pub async fn start_redis_storage(
|
||||
while let Some(msg) = msgs.recv().await {
|
||||
use StorageMessage::*;
|
||||
match msg {
|
||||
Clear {} => {
|
||||
let mut keys = Vec::new();
|
||||
{
|
||||
let mut iter: redis::AsyncIter<String> =
|
||||
conn.scan_match(format!("{}:*", prefix)).await?;
|
||||
while let Some(key) = iter.next_item().await {
|
||||
keys.push(key);
|
||||
}
|
||||
}
|
||||
for key in keys {
|
||||
conn.del(key).await?;
|
||||
}
|
||||
}
|
||||
StoreAttempt {
|
||||
task_name,
|
||||
interval,
|
||||
attempt,
|
||||
} => {
|
||||
let tag = format!("{}_{}_{}", prefix, task_name, interval.end);
|
||||
let tag = format!("{}:{}_{}", prefix, task_name, interval.end);
|
||||
let payload = serde_json::to_string(&attempt).unwrap();
|
||||
conn.rpush(&tag, &payload).await?;
|
||||
}
|
||||
/*
|
||||
SetTaskIntervalState {
|
||||
task_name,
|
||||
interval,
|
||||
state,
|
||||
} => {
|
||||
let map = format!("{}:task_interval_states", prefix);
|
||||
let key = format!("{}_{}-{}", task_name, interval.start, interval.end);
|
||||
let value = serde_json::to_string(&state).unwrap();
|
||||
conn.hset(&map, &key, &value).await?;
|
||||
}
|
||||
*/
|
||||
StoreState { state } => {
|
||||
let tag = format!("{}_state", prefix);
|
||||
let tag = format!("{}:state", prefix);
|
||||
let payload = serde_json::to_string(&state).unwrap();
|
||||
conn.set(&tag, &payload).await?;
|
||||
}
|
||||
LoadState { response } => {
|
||||
let tag = format!("{}_state", prefix);
|
||||
let tag = format!("{}:state", prefix);
|
||||
let payload: String = conn.get(&tag).await.unwrap_or("{}".to_owned());
|
||||
let is: ResourceInterval = serde_json::from_str(&payload).unwrap();
|
||||
response.send(is).unwrap();
|
||||
@@ -1,4 +1,63 @@
|
||||
use super::*;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
|
||||
pub struct TaskResources(HashMap<String, i64>);
|
||||
|
||||
impl Deref for TaskResources {
|
||||
type Target = HashMap<String, i64>;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for TaskResources {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl TaskResources {
|
||||
#[must_use]
|
||||
pub fn new() -> Self {
|
||||
TaskResources(HashMap::new())
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn can_satisfy(&self, requirements: &TaskResources) -> bool {
|
||||
requirements
|
||||
.iter()
|
||||
.all(|(k, v)| self.contains_key(k) && self[k] >= *v)
|
||||
}
|
||||
|
||||
/// Subtracts resources from available resources.
|
||||
/// # Errors
|
||||
/// Returns an `Err` if the requested resources cannot be fulfilled
|
||||
/// # Panics
|
||||
/// It doesn't, keys are checked for ahead-of-time
|
||||
pub fn sub(&mut self, resources: &TaskResources) -> Result<()> {
|
||||
if self.can_satisfy(resources) {
|
||||
for (k, v) in resources.iter() {
|
||||
*self.get_mut(k).unwrap() -= v;
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow!("Cannot satisfy requested resources"))
|
||||
}
|
||||
}
|
||||
|
||||
/// # Panics
|
||||
/// It doesn't, keys are checked for ahead-of-time
|
||||
pub fn add(&mut self, resources: &TaskResources) {
|
||||
for (k, v) in resources.iter() {
|
||||
if self.contains_key(k) {
|
||||
*self.get_mut(k).unwrap() += *v;
|
||||
} else {
|
||||
self.insert(k.clone(), *v);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Defines the struct to parse for tasks
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
|
||||
@@ -40,7 +99,7 @@ pub struct TaskDefinition {
|
||||
}
|
||||
|
||||
impl TaskDefinition {
|
||||
pub fn to_task(&self, calendar: &Calendar) -> Task {
|
||||
pub fn to_task(&self, name: &str, calendar: &Calendar) -> Task {
|
||||
let schedule = Schedule::new(calendar.clone(), self.times.clone(), self.timezone);
|
||||
/*
|
||||
The valid_{from,to} interval must be aligned to the actual schedule.
|
||||
@@ -53,6 +112,12 @@ impl TaskDefinition {
|
||||
)
|
||||
.start;
|
||||
|
||||
let provides = if self.provides.is_empty() {
|
||||
HashSet::from([name.to_owned()])
|
||||
} else {
|
||||
self.provides.clone()
|
||||
};
|
||||
|
||||
let end = match self.valid_to {
|
||||
Some(nt) => self.timezone.from_local_datetime(&nt).unwrap(),
|
||||
None => MAX_TIME.with_timezone(&self.timezone),
|
||||
@@ -61,11 +126,12 @@ impl TaskDefinition {
|
||||
let actual_end = schedule.interval(end, 0).start;
|
||||
|
||||
Task {
|
||||
name: name.to_owned(),
|
||||
up: self.up.clone(),
|
||||
down: self.down.clone(),
|
||||
check: self.check.clone(),
|
||||
|
||||
provides: self.provides.clone(),
|
||||
provides,
|
||||
requires: self.requires.clone(),
|
||||
|
||||
schedule: schedule,
|
||||
@@ -82,6 +148,7 @@ impl TaskDefinition {
|
||||
*/
|
||||
#[derive(Clone, Serialize, Debug)]
|
||||
pub struct Task {
|
||||
pub name: String,
|
||||
pub up: TaskDetails,
|
||||
pub down: Option<TaskDetails>,
|
||||
pub check: Option<TaskDetails>,
|
||||
@@ -173,6 +240,13 @@ impl Task {
|
||||
.all(|req| req.can_be_satisfied(interval, &self.schedule, available))
|
||||
}
|
||||
|
||||
pub fn requires_resources(&self) -> HashSet<Resource> {
|
||||
self.requires.iter().fold(HashSet::new(), |mut acc, req| {
|
||||
acc.extend(req.resources());
|
||||
acc
|
||||
})
|
||||
}
|
||||
|
||||
pub fn up(&self, interval: &Interval) -> Result<HashSet<String>> {
|
||||
if self.check(interval) {
|
||||
Ok(self.provides.clone())
|
||||
@@ -250,7 +324,7 @@ mod tests {
|
||||
// Produces a std
|
||||
let cal = Calendar::new();
|
||||
|
||||
let task = task_def.to_task(&cal);
|
||||
let task = task_def.to_task("test", &cal);
|
||||
|
||||
// Assert the valid interval is correct
|
||||
assert_eq!(
|
||||
@@ -334,7 +408,7 @@ mod tests {
|
||||
let cal = Calendar::new();
|
||||
{
|
||||
let task_def: TaskDefinition = serde_json::from_str(task_json).unwrap();
|
||||
let task = task_def.to_task(&cal);
|
||||
let task = task_def.to_task("task", &cal);
|
||||
|
||||
// Assert the valid interval is correct
|
||||
assert_eq!(
|
||||
@@ -354,7 +428,7 @@ mod tests {
|
||||
task_def.valid_from = NaiveDate::from_ymd(2022, 1, 1).and_hms(9, 0, 0);
|
||||
task_def.valid_to = Some(NaiveDate::from_ymd(2022, 1, 7).and_hms(17, 0, 0));
|
||||
|
||||
let task = task_def.to_task(&cal);
|
||||
let task = task_def.to_task("task", &cal);
|
||||
|
||||
// Assert the valid interval is correct
|
||||
assert_eq!(
|
||||
|
||||
@@ -3,50 +3,94 @@ use std::convert::From;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TaskSet(HashMap<String, Task>);
|
||||
pub struct TaskSet(Vec<Task>);
|
||||
|
||||
impl TaskSet {
|
||||
pub fn new() -> Self {
|
||||
TaskSet(HashMap::new())
|
||||
TaskSet(Vec::new())
|
||||
}
|
||||
|
||||
pub fn coverage(&self) -> Result<ResourceInterval> {
|
||||
pub fn coverage(&self) -> ResourceInterval {
|
||||
self.get_state(MAX_TIME)
|
||||
}
|
||||
|
||||
pub fn validate(&self) -> Result<()> {
|
||||
self.get_state(MAX_TIME)?;
|
||||
Ok(())
|
||||
}
|
||||
let state = self.coverage();
|
||||
|
||||
pub fn get_state<T: TimeZone>(&self, time: DateTime<T>) -> Result<ResourceInterval> {
|
||||
let mut res = ResourceInterval::new();
|
||||
|
||||
let timeline = IntervalSet::from(Interval::new(MIN_TIME, time.with_timezone(&Utc)));
|
||||
|
||||
// Insert all of the covered items
|
||||
for task in self.values() {
|
||||
let task_timeline = task.valid_over.intersection(&timeline);
|
||||
for resource in &task.provides {
|
||||
let ris = res.entry(resource.clone()).or_insert(IntervalSet::new());
|
||||
let already_provided = ris.intersection(&task_timeline);
|
||||
if !already_provided.is_empty() {
|
||||
// Ensures that all requirements are met
|
||||
for task in &self.0 {
|
||||
for resource in task.requires_resources() {
|
||||
if !state.contains_key(&resource) {
|
||||
return Err(anyhow!(
|
||||
"Task set invalid: multiple tasks provide resource {} on the intervals {:?}",
|
||||
resource,
|
||||
already_provided
|
||||
"Task {} requires resource {}, which isn't produced.",
|
||||
task.name,
|
||||
resource
|
||||
));
|
||||
}
|
||||
ris.merge(&task_timeline);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(res)
|
||||
// TODO Ensure that all resources will be produced over the valid_over interval
|
||||
|
||||
// validate that no task generates the same resource on overlapping times
|
||||
let providers: HashMap<Resource, Vec<usize>> =
|
||||
self.0
|
||||
.iter()
|
||||
.enumerate()
|
||||
.fold(HashMap::new(), |mut acc, (idx, t)| {
|
||||
for res in &t.provides {
|
||||
acc.entry(res.clone()).or_insert(Vec::new()).push(idx)
|
||||
}
|
||||
acc
|
||||
});
|
||||
for (res, tids) in providers {
|
||||
let mut is = IntervalSet::new();
|
||||
for tid in tids {
|
||||
let already_provided = is.intersection(&self.0[tid].valid_over);
|
||||
if !already_provided.is_empty() {
|
||||
return Err(anyhow!(
|
||||
"Task set invalid: multiple tasks provide resource {} on the intervals {:?}",
|
||||
res,
|
||||
already_provided
|
||||
));
|
||||
}
|
||||
is.merge(&self.0[tid].valid_over);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_state<T: TimeZone>(&self, time: DateTime<T>) -> ResourceInterval {
|
||||
let mut res = ResourceInterval::new();
|
||||
|
||||
// Insert all of the covered items
|
||||
for task in &self.0 {
|
||||
// Need to align each of these intervals with a scheduled time
|
||||
let timeline = if time < MAX_TIME {
|
||||
let cur_intv = task.schedule.interval(time.clone(), 0);
|
||||
if cur_intv.end > time {
|
||||
IntervalSet::from(Interval::new(MIN_TIME, cur_intv.start))
|
||||
} else {
|
||||
IntervalSet::from(Interval::new(MIN_TIME, cur_intv.end))
|
||||
}
|
||||
} else {
|
||||
IntervalSet::from(Interval::new(MIN_TIME, time.with_timezone(&Utc)))
|
||||
};
|
||||
let task_timeline = task.valid_over.intersection(&timeline);
|
||||
for resource in &task.provides {
|
||||
res.entry(resource.clone())
|
||||
.or_insert(IntervalSet::new())
|
||||
.merge(&task_timeline);
|
||||
}
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for TaskSet {
|
||||
type Target = HashMap<String, Task>;
|
||||
type Target = Vec<Task>;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
@@ -58,8 +102,8 @@ impl DerefMut for TaskSet {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<HashMap<String, Task>> for TaskSet {
|
||||
fn from(data: HashMap<String, Task>) -> Self {
|
||||
impl From<Vec<Task>> for TaskSet {
|
||||
fn from(data: Vec<Task>) -> Self {
|
||||
Self(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,15 +26,10 @@ impl WorldDefinition {
|
||||
));
|
||||
}
|
||||
}
|
||||
let tasks: HashMap<String, Task> = self
|
||||
let tasks: Vec<Task> = self
|
||||
.tasks
|
||||
.iter()
|
||||
.map(|(tn, td)| {
|
||||
(
|
||||
tn.clone(),
|
||||
td.to_task(self.calendars.get(&td.calendar_name).unwrap()),
|
||||
)
|
||||
})
|
||||
.map(|(tn, td)| td.to_task(tn, self.calendars.get(&td.calendar_name).unwrap()))
|
||||
.collect();
|
||||
let ts = TaskSet::from(tasks);
|
||||
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
/* eslint-env node */
|
||||
require('@rushstack/eslint-patch/modern-module-resolution')
|
||||
|
||||
module.exports = {
|
||||
root: true,
|
||||
'extends': [
|
||||
'plugin:vue/vue3-essential',
|
||||
'eslint:recommended',
|
||||
'@vue/eslint-config-prettier'
|
||||
],
|
||||
parserOptions: {
|
||||
ecmaVersion: 'latest'
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
# Logs
|
||||
logs
|
||||
*.log
|
||||
npm-debug.log*
|
||||
yarn-debug.log*
|
||||
yarn-error.log*
|
||||
pnpm-debug.log*
|
||||
lerna-debug.log*
|
||||
|
||||
node_modules
|
||||
.DS_Store
|
||||
dist
|
||||
dist-ssr
|
||||
coverage
|
||||
*.local
|
||||
|
||||
/cypress/videos/
|
||||
/cypress/screenshots/
|
||||
|
||||
# Editor directories and files
|
||||
.vscode/*
|
||||
!.vscode/extensions.json
|
||||
.idea
|
||||
*.suo
|
||||
*.ntvs*
|
||||
*.njsproj
|
||||
*.sln
|
||||
*.sw?
|
||||
@@ -0,0 +1 @@
|
||||
{}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"recommendations": ["Vue.volar", "Vue.vscode-typescript-vue-plugin"]
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
# waterfall
|
||||
|
||||
This template should help get you started developing with Vue 3 in Vite.
|
||||
|
||||
## Recommended IDE Setup
|
||||
|
||||
[VSCode](https://code.visualstudio.com/) + [Volar](https://marketplace.visualstudio.com/items?itemName=Vue.volar) (and disable Vetur) + [TypeScript Vue Plugin (Volar)](https://marketplace.visualstudio.com/items?itemName=Vue.vscode-typescript-vue-plugin).
|
||||
|
||||
## Customize configuration
|
||||
|
||||
See [Vite Configuration Reference](https://vitejs.dev/config/).
|
||||
|
||||
## Project Setup
|
||||
|
||||
```sh
|
||||
npm install
|
||||
```
|
||||
|
||||
### Compile and Hot-Reload for Development
|
||||
|
||||
```sh
|
||||
npm run dev
|
||||
```
|
||||
|
||||
### Compile and Minify for Production
|
||||
|
||||
```sh
|
||||
npm run build
|
||||
```
|
||||
|
||||
### Lint with [ESLint](https://eslint.org/)
|
||||
|
||||
```sh
|
||||
npm run lint
|
||||
```
|
||||
@@ -0,0 +1,15 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8" />
|
||||
<link rel="icon" href="/favicon.ico" />
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
||||
<script src="https://unpkg.com/timelines-chart"></script>
|
||||
<link rel="stylesheet" href="/waterfall.css">
|
||||
<title>Waterfall</title>
|
||||
</head>
|
||||
<body background="#FFF">
|
||||
<div id="app"></div>
|
||||
<script type="module" src="/src/main.js"></script>
|
||||
</body>
|
||||
</html>
|
||||
@@ -0,0 +1,22 @@
|
||||
{
|
||||
"name": "waterfall",
|
||||
"version": "0.0.0",
|
||||
"scripts": {
|
||||
"dev": "vite",
|
||||
"build": "vite build",
|
||||
"preview": "vite preview --port 4173",
|
||||
"lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs --fix --ignore-path .gitignore"
|
||||
},
|
||||
"dependencies": {
|
||||
"vue": "^3.2.38"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@rushstack/eslint-patch": "^1.1.4",
|
||||
"@vitejs/plugin-vue": "^3.0.3",
|
||||
"@vue/eslint-config-prettier": "^7.0.0",
|
||||
"eslint": "^8.22.0",
|
||||
"eslint-plugin-vue": "^9.3.0",
|
||||
"prettier": "^2.7.1",
|
||||
"vite": "^3.0.9"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
function renderDetails(segment) {
|
||||
document.getElementById("details").innerHTML = JSON.stringify(segment);
|
||||
}
|
||||
|
||||
fetch("http://localhost:2503/api/v1/details",
|
||||
{
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: '{ "start": "2021-09-01T00:00:00Z", "end": "2022-10-01T00:00:00Z" }'
|
||||
}
|
||||
)
|
||||
.then((response) => {
|
||||
if (!response.ok) {
|
||||
throw new Error('Network response was not OK');
|
||||
}
|
||||
return response.json();
|
||||
})
|
||||
.then((payload) => {
|
||||
console.log(payload);
|
||||
payload.map((group) => {
|
||||
Object.values(group.data).map((label) => {
|
||||
label.data.map((interval) => {
|
||||
interval.timeRange = interval.timeRange.map((t) => new Date(t));
|
||||
})
|
||||
})
|
||||
});
|
||||
TimelinesChart()
|
||||
(document.getElementById("timeline"))
|
||||
.timeFormat("%Y-%m-%dT%H:%M:%S.%LZ") // ISO 8601 format
|
||||
.zScaleLabel('State')
|
||||
.zQualitative(true)
|
||||
.useUtc(false)
|
||||
.onSegmentClick(renderDetails)
|
||||
.data(payload)
|
||||
}
|
||||
)
|
||||
.catch(err => { throw err });
|
||||
@@ -0,0 +1,9 @@
|
||||
<html>
|
||||
<script src="https://unpkg.com/timelines-chart"></script>
|
||||
</html>
|
||||
<body>
|
||||
<div id="timeline"></div>
|
||||
<div id="details"></div>
|
||||
<script src="app.js"></script>
|
||||
</body>
|
||||
</html>
|
||||
|
After Width: | Height: | Size: 4.2 KiB |
@@ -0,0 +1 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" class="icon-launch"><g><path class="primary" d="M14.57 6.96a2 2 0 0 1 2.47 2.47c.29.17.5.47.5.86v7.07a1 1 0 0 1-.3.71L13 22.31a1 1 0 0 1-1.7-.7v-3.58l-.49.19a1 1 0 0 1-1.17-.37 14.1 14.1 0 0 0-3.5-3.5 1 1 0 0 1-.36-1.16l.19-.48H2.39A1 1 0 0 1 1.7 11l4.24-4.24a1 1 0 0 1 .7-.3h7.08c.39 0 .7.21.86.5zM13.19 9.4l-2.15 2.15a3 3 0 0 1 .84.57 3 3 0 0 1 .57.84l2.15-2.15A2 2 0 0 1 13.2 9.4zm6.98-6.61a1 1 0 0 1 1.04 1.04c-.03.86-.13 1.71-.3 2.55-.47-.6-1.99-.19-2.55-.74-.55-.56-.14-2.08-.74-2.55.84-.17 1.7-.27 2.55-.3z"/><path class="secondary" d="M7.23 10.26A16.05 16.05 0 0 1 17.62 3.1a19.2 19.2 0 0 1 3.29 3.29 15.94 15.94 0 0 1-7.17 10.4 19.05 19.05 0 0 0-6.51-6.52zm-.86 5.5a16.2 16.2 0 0 1 1.87 1.87 1 1 0 0 1-.47 1.6c-.79.25-1.6.42-2.4.54a1 1 0 0 1-1.14-1.13c.12-.82.3-1.62.53-2.41a1 1 0 0 1 1.6-.47zm7.34-5.47a2 2 0 1 0 2.83-2.83 2 2 0 0 0-2.83 2.83z"/></g></svg>
|
||||
|
After Width: | Height: | Size: 926 B |
@@ -0,0 +1 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" class="icon-order-vertical"><path class="secondary" d="M7 18.59V9a1 1 0 0 1 2 0v9.59l2.3-2.3a1 1 0 0 1 1.4 1.42l-4 4a1 1 0 0 1-1.4 0l-4-4a1 1 0 1 1 1.4-1.42L7 18.6z"/><path class="primary" d="M17 5.41V15a1 1 0 1 1-2 0V5.41l-2.3 2.3a1 1 0 1 1-1.4-1.42l4-4a1 1 0 0 1 1.4 0l4 4a1 1 0 0 1-1.4 1.42L17 5.4z"/></svg>
|
||||
|
After Width: | Height: | Size: 370 B |
@@ -0,0 +1 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" class="icon-search"><circle cx="10" cy="10" r="7" class="primary"/><path class="secondary" d="M16.32 14.9l1.1 1.1c.4-.02.83.13 1.14.44l3 3a1.5 1.5 0 0 1-2.12 2.12l-3-3a1.5 1.5 0 0 1-.44-1.14l-1.1-1.1a8 8 0 1 1 1.41-1.41zM10 16a6 6 0 1 0 0-12 6 6 0 0 0 0 12z"/></svg>
|
||||
|
After Width: | Height: | Size: 326 B |
@@ -0,0 +1 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" class="icon-sort-ascending"><path class="secondary" d="M18 13v7a1 1 0 0 1-2 0v-7h-3a1 1 0 0 1-.7-1.7l4-4a1 1 0 0 1 1.4 0l4 4A1 1 0 0 1 21 13h-3z"/><path class="primary" d="M3 3h13a1 1 0 0 1 0 2H3a1 1 0 1 1 0-2zm0 4h9a1 1 0 0 1 0 2H3a1 1 0 1 1 0-2zm0 4h5a1 1 0 0 1 0 2H3a1 1 0 0 1 0-2z"/></svg>
|
||||
|
After Width: | Height: | Size: 353 B |
@@ -0,0 +1 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" class="icon-sort-decending"><path class="secondary" d="M6 11V4a1 1 0 1 1 2 0v7h3a1 1 0 0 1 .7 1.7l-4 4a1 1 0 0 1-1.4 0l-4-4A1 1 0 0 1 3 11h3z"/><path class="primary" d="M21 21H8a1 1 0 0 1 0-2h13a1 1 0 0 1 0 2zm0-4h-9a1 1 0 0 1 0-2h9a1 1 0 0 1 0 2zm0-4h-5a1 1 0 0 1 0-2h5a1 1 0 0 1 0 2z"/></svg>
|
||||
|
After Width: | Height: | Size: 354 B |
@@ -0,0 +1 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" class="icon-trash"><path class="primary" d="M5 5h14l-.89 15.12a2 2 0 0 1-2 1.88H7.9a2 2 0 0 1-2-1.88L5 5zm5 5a1 1 0 0 0-1 1v6a1 1 0 0 0 2 0v-6a1 1 0 0 0-1-1zm4 0a1 1 0 0 0-1 1v6a1 1 0 0 0 2 0v-6a1 1 0 0 0-1-1z"/><path class="secondary" d="M8.59 4l1.7-1.7A1 1 0 0 1 11 2h2a1 1 0 0 1 .7.3L15.42 4H19a1 1 0 0 1 0 2H5a1 1 0 1 1 0-2h3.59z"/></svg>
|
||||
|
After Width: | Height: | Size: 402 B |
@@ -0,0 +1,23 @@
|
||||
:root {
|
||||
--base-fontsize: 1.0rem;
|
||||
}
|
||||
|
||||
body {
|
||||
max-width: 100%;
|
||||
padding: 0px;
|
||||
margin: 0px;
|
||||
}
|
||||
|
||||
pre {
|
||||
font-size: 1.0rem;
|
||||
}
|
||||
|
||||
#app {
|
||||
flex-flow: column wrap;
|
||||
}
|
||||
|
||||
.svgicon {
|
||||
height: 1em;
|
||||
width: auto;
|
||||
padding: 2px;
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
<script>
|
||||
import Timeline from './components/Timeline.vue'
|
||||
import GlobalSettings from './components/GlobalSettings.vue'
|
||||
import SegmentDetails from './components/SegmentDetails.vue'
|
||||
|
||||
export default {
|
||||
data() {
|
||||
return {
|
||||
refreshSeconds: 1, // How often to refresh
|
||||
waterfallURL: 'http://localhost:2503',
|
||||
activeSegment: null,
|
||||
maxDisplayIntervals: 500,
|
||||
}
|
||||
},
|
||||
|
||||
methods: {
|
||||
updateURL(url) {
|
||||
this.waterfallURL = url;
|
||||
},
|
||||
updateRefreshInterval(interval) {
|
||||
this.refreshSeconds = interval;
|
||||
},
|
||||
updateMaxDisplayIntervals(cnt) {
|
||||
this.maxDisplayIntervals = cnt;
|
||||
},
|
||||
|
||||
setActiveSegment(segment) {
|
||||
this.activeSegment = segment;
|
||||
},
|
||||
},
|
||||
|
||||
components: {
|
||||
GlobalSettings,
|
||||
Timeline,
|
||||
SegmentDetails,
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
<style>
|
||||
select { max-width: 25%; }
|
||||
input { max-width: 25%; }
|
||||
</style>
|
||||
|
||||
<template>
|
||||
<GlobalSettings
|
||||
:waterfallURL="waterfallURL"
|
||||
:refreshSeconds="refreshSeconds"
|
||||
:maxDisplayIntervals="maxDisplayIntervals"
|
||||
@update-refresh-interval="(interval) => this.updateRefreshInterval(interval)"
|
||||
@update-waterfall-url="(url) => this.updateURL(url)"
|
||||
@update-max-display-intervals="(cnt) => this.updateMaxDisplayIntervals(cnt)"
|
||||
/>
|
||||
<br/>
|
||||
<div>
|
||||
<Timeline
|
||||
:waterfallURL="waterfallURL"
|
||||
:refreshSeconds="refreshSeconds"
|
||||
:maxDisplayIntervals="maxDisplayIntervals"
|
||||
@update-active-segment="(segment) => this.setActiveSegment(segment)"
|
||||
/>
|
||||
</div>
|
||||
<br/>
|
||||
<SegmentDetails
|
||||
v-if="this.activeSegment !== null"
|
||||
:activeSegment="activeSegment"
|
||||
/>
|
||||
</template>
|
||||
@@ -0,0 +1,74 @@
|
||||
/* color palette from <https://github.com/vuejs/theme> */
|
||||
:root {
|
||||
--vt-c-white: #ffffff;
|
||||
--vt-c-white-soft: #f8f8f8;
|
||||
--vt-c-white-mute: #f2f2f2;
|
||||
|
||||
--vt-c-black: #181818;
|
||||
--vt-c-black-soft: #222222;
|
||||
--vt-c-black-mute: #282828;
|
||||
|
||||
--vt-c-indigo: #2c3e50;
|
||||
|
||||
--vt-c-divider-light-1: rgba(60, 60, 60, 0.29);
|
||||
--vt-c-divider-light-2: rgba(60, 60, 60, 0.12);
|
||||
--vt-c-divider-dark-1: rgba(84, 84, 84, 0.65);
|
||||
--vt-c-divider-dark-2: rgba(84, 84, 84, 0.48);
|
||||
|
||||
--vt-c-text-light-1: var(--vt-c-indigo);
|
||||
--vt-c-text-light-2: rgba(60, 60, 60, 0.66);
|
||||
--vt-c-text-dark-1: var(--vt-c-white);
|
||||
--vt-c-text-dark-2: rgba(235, 235, 235, 0.64);
|
||||
}
|
||||
|
||||
/* semantic color variables for this project */
|
||||
:root {
|
||||
--color-background: var(--vt-c-white);
|
||||
--color-background-soft: var(--vt-c-white-soft);
|
||||
--color-background-mute: var(--vt-c-white-mute);
|
||||
|
||||
--color-border: var(--vt-c-divider-light-2);
|
||||
--color-border-hover: var(--vt-c-divider-light-1);
|
||||
|
||||
--color-heading: var(--vt-c-text-light-1);
|
||||
--color-text: var(--vt-c-text-light-1);
|
||||
|
||||
--section-gap: 160px;
|
||||
}
|
||||
|
||||
@media (prefers-color-scheme: dark) {
|
||||
:root {
|
||||
--color-background: var(--vt-c-black);
|
||||
--color-background-soft: var(--vt-c-black-soft);
|
||||
--color-background-mute: var(--vt-c-black-mute);
|
||||
|
||||
--color-border: var(--vt-c-divider-dark-2);
|
||||
--color-border-hover: var(--vt-c-divider-dark-1);
|
||||
|
||||
--color-heading: var(--vt-c-text-dark-1);
|
||||
--color-text: var(--vt-c-text-dark-2);
|
||||
}
|
||||
}
|
||||
|
||||
*,
|
||||
*::before,
|
||||
*::after {
|
||||
box-sizing: border-box;
|
||||
margin: 0;
|
||||
position: relative;
|
||||
font-weight: normal;
|
||||
}
|
||||
|
||||
body {
|
||||
min-height: 100vh;
|
||||
color: var(--color-text);
|
||||
background: var(--color-background);
|
||||
transition: color 0.5s, background-color 0.5s;
|
||||
line-height: 1.6;
|
||||
font-family: Inter, -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu,
|
||||
Cantarell, 'Fira Sans', 'Droid Sans', 'Helvetica Neue', sans-serif;
|
||||
font-size: 15px;
|
||||
text-rendering: optimizeLegibility;
|
||||
-webkit-font-smoothing: antialiased;
|
||||
-moz-osx-font-smoothing: grayscale;
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 261.76 226.69" xmlns:v="https://vecta.io/nano"><path d="M161.096.001l-30.225 52.351L100.647.001H-.005l130.877 226.688L261.749.001z" fill="#41b883"/><path d="M161.096.001l-30.225 52.351L100.647.001H52.346l78.526 136.01L209.398.001z" fill="#34495e"/></svg>
|
||||
|
After Width: | Height: | Size: 308 B |
@@ -0,0 +1,35 @@
|
||||
@import "./base.css";
|
||||
|
||||
#app {
|
||||
max-width: 1280px;
|
||||
margin: 0 auto;
|
||||
padding: 2rem;
|
||||
|
||||
font-weight: normal;
|
||||
}
|
||||
|
||||
a,
|
||||
.green {
|
||||
text-decoration: none;
|
||||
color: hsla(160, 100%, 37%, 1);
|
||||
transition: 0.4s;
|
||||
}
|
||||
|
||||
@media (hover: hover) {
|
||||
a:hover {
|
||||
background-color: hsla(160, 100%, 37%, 0.2);
|
||||
}
|
||||
}
|
||||
|
||||
@media (min-width: 1024px) {
|
||||
body {
|
||||
display: flex;
|
||||
place-items: center;
|
||||
}
|
||||
|
||||
#app {
|
||||
display: grid;
|
||||
grid-template-columns: 1fr 1fr;
|
||||
padding: 0 2rem;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
<script>
|
||||
export default {
|
||||
props: ['refreshSeconds', 'waterfallURL', 'maxDisplayIntervals'],
|
||||
data() {
|
||||
return {
|
||||
interval: this.refreshSeconds,
|
||||
url: this.waterfallURL,
|
||||
max_display_intervals: this.maxDisplayIntervals,
|
||||
};
|
||||
},
|
||||
emits: ['update-refresh-interval', 'update-waterfall-url', 'update-max-intervals'],
|
||||
computed: {
|
||||
validRefreshIntervals() {
|
||||
return [1, 5, 10, 15, 30, 60, 300, 600];
|
||||
},
|
||||
validDisplayIntervals() {
|
||||
return [0, 100, 250, 500, 1000, 1500];
|
||||
},
|
||||
isSelected(interval) {
|
||||
return (interval === this.refreshSeconds ? 'selected' : 'unselected');
|
||||
},
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
<template>
|
||||
<details>
|
||||
<summary>Global Settings</summary>
|
||||
<label>
|
||||
Waterfall Base URL
|
||||
<input @change="$emit('update-waterfall-url', url)" v-model="url"/>
|
||||
</label>
|
||||
<label>
|
||||
Refresh Interval (seconds)
|
||||
<select @change="$emit('update-refresh-interval', interval)" v-model="interval">
|
||||
<option v-for="interval in validRefreshIntervals"
|
||||
:key="interval"
|
||||
:value="interval"
|
||||
>
|
||||
{{ interval }} Seconds
|
||||
</option>
|
||||
</select>
|
||||
</label>
|
||||
<label>
|
||||
Max Display Intervals
|
||||
<select @change="$emit('update-max-display-intervals', max_display_intervals)" v-model="max_display_intervals">
|
||||
<option v-for="cnt in validDisplayIntervals"
|
||||
:key="cnt"
|
||||
:value="cnt"
|
||||
>
|
||||
{{ cnt }} Segments
|
||||
</option>
|
||||
</select>
|
||||
</label>
|
||||
|
||||
</details>
|
||||
</template>
|
||||
@@ -0,0 +1,35 @@
|
||||
<script>
|
||||
export default {
|
||||
props: ['activeSegment']
|
||||
};
|
||||
</script>
|
||||
|
||||
<template>
|
||||
<div id="segment-details" v-if="activeSegment !== null">
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Resource</th>
|
||||
<th>Task Name</th>
|
||||
<th>Interval</th>
|
||||
<th>State</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<tr>
|
||||
<td>{{activeSegment.group}}</td>
|
||||
<td>{{activeSegment.label}}</td>
|
||||
<td>{{activeSegment.timeRange}}</td>
|
||||
<td>{{activeSegment.val}}</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<style>
|
||||
.svgicon {
|
||||
height: 1em;
|
||||
width: auto;
|
||||
}
|
||||
</style>
|
||||
@@ -0,0 +1,117 @@
|
||||
<script>
|
||||
// import TimelinesChart from 'timelines-chart';
|
||||
|
||||
function lexsort(a, b, keyfunc) {
|
||||
const ka = keyfunc(a);
|
||||
const kb = keyfunc(b);
|
||||
|
||||
|
||||
let ret = 0;
|
||||
if (ka < kb) { ret = -1; }
|
||||
if (ka > kb) { ret = 1; }
|
||||
return ret;
|
||||
}
|
||||
|
||||
const MIN_TIME="1970-01-01T00:00:00Z";
|
||||
const MAX_TIME="2099-01-01T00:00:00Z";
|
||||
|
||||
export default {
|
||||
props: ['waterfallURL', 'refreshSeconds', 'maxDisplayIntervals'],
|
||||
data() {
|
||||
return {
|
||||
chart: null,
|
||||
data: {},
|
||||
start: MIN_TIME,
|
||||
end: MAX_TIME,
|
||||
}
|
||||
},
|
||||
|
||||
watch: {
|
||||
refreshSeconds() {
|
||||
this.fetchTimeline();
|
||||
},
|
||||
waterfallURL() {
|
||||
this.fetchTimeline();
|
||||
},
|
||||
maxDisplayIntervals() {
|
||||
this.fetchTimeline();
|
||||
},
|
||||
|
||||
},
|
||||
|
||||
methods: {
|
||||
async fetchTimeline() {
|
||||
fetch(`${this.waterfallURL}/api/v1/details?max_intervals=${this.maxDisplayIntervals}`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: `{ "start": "${this.start}", "end": "${this.end}" }`
|
||||
})
|
||||
.then((response) => {
|
||||
if (!response.ok) {
|
||||
throw new Error('Network response was not OK');
|
||||
}
|
||||
return response.json();
|
||||
})
|
||||
.then((payload) => {
|
||||
payload.map((group) => {
|
||||
group.data.sort((a, b) => lexsort(a, b, (v) => v.label));
|
||||
Object.values(group.data).map((label) => {
|
||||
label.data.map((interval) => {
|
||||
interval.timeRange = interval.timeRange.map((t) => new Date(t));
|
||||
})
|
||||
})
|
||||
});
|
||||
payload.sort((a, b) => lexsort(a, b, (v) => v.group));
|
||||
|
||||
this.data = payload;
|
||||
this.chart.data(payload);
|
||||
this.chart.reresh();
|
||||
})
|
||||
.catch(err => { throw err });
|
||||
},
|
||||
|
||||
update() {
|
||||
this.fetchTimeline();
|
||||
setTimeout(() => {
|
||||
this.update();
|
||||
}, this.refreshSeconds * 1000);
|
||||
},
|
||||
|
||||
setVisibleRange(dateRange) {
|
||||
if (dateRange === null) {
|
||||
this.start = MIN_TIME;
|
||||
this.end = MAX_TIME;
|
||||
} else {
|
||||
this.start = dateRange[0].toISOString();
|
||||
this.end = dateRange[1].toISOString();
|
||||
}
|
||||
this.fetchTimeline();
|
||||
}
|
||||
},
|
||||
|
||||
mounted() {
|
||||
this.chart = TimelinesChart()(document.getElementById("timeline-graph"))
|
||||
.timeFormat("%Y-%m-%dT%H:%M:%S.%LZ") // ISO 8601 format
|
||||
.zScaleLabel('State')
|
||||
.zQualitative(true)
|
||||
.useUtc(false)
|
||||
.onZoom((dateRange, _) => this.setVisibleRange(dateRange))
|
||||
.onSegmentClick((segment) => this.$emit('updateActiveSegment', segment) );
|
||||
this.update();
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
<template>
|
||||
<div id="timeline-graph"></div>
|
||||
</template>
|
||||
|
||||
<style>
|
||||
.svgicon {
|
||||
height: 1em;
|
||||
width: auto;
|
||||
}
|
||||
</style>
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
import { reactive } from 'vue';
|
||||
|
||||
export const ALL_STATES = [
|
||||
{ name: 'QUEUED', display: 'Queued' },
|
||||
{ name: 'RUNNING', display: 'Running' },
|
||||
{ name: 'ERRORED', display: 'Errored' },
|
||||
{ name: 'COMPLETED', display: 'Completed' },
|
||||
{ name: 'KILLED', display: 'Killed' },
|
||||
];
|
||||
|
||||
export const defaultCountHandler = {
|
||||
get(target, name) {
|
||||
return name in target ? target[name] : 0;
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
import { createApp } from 'vue'
|
||||
import App from './App.vue'
|
||||
|
||||
import './assets/main.css'
|
||||
|
||||
createApp(App).mount('#app')
|
||||
@@ -0,0 +1,14 @@
|
||||
import { fileURLToPath, URL } from 'node:url'
|
||||
|
||||
import { defineConfig } from 'vite'
|
||||
import vue from '@vitejs/plugin-vue'
|
||||
|
||||
// https://vitejs.dev/config/
|
||||
export default defineConfig({
|
||||
plugins: [vue()],
|
||||
resolve: {
|
||||
alias: {
|
||||
'@': fileURLToPath(new URL('./src', import.meta.url))
|
||||
}
|
||||
}
|
||||
})
|
||||