diff options
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 215 |
1 files changed, 123 insertions, 92 deletions
diff --git a/src/main.rs b/src/main.rs index 61fa02a..9821b2d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,71 +9,37 @@ extern crate rustc_serialize; #[macro_use] extern crate sota; extern crate time; -use chan::{Sender, Receiver}; +use chan::{Sender, Receiver, WaitGroup}; use chan_signal::Signal; use env_logger::LogBuilder; use getopts::Options; use log::{LogLevelFilter, LogRecord}; -use std::env; +use std::{env, process, thread}; use std::collections::HashMap; use std::path::Path; use std::sync::{Arc, Mutex}; use std::time::Duration; -use sota::datatype::{Command, Config, Event, SystemInfo}; +use sota::datatype::{Command, Config, Event}; use sota::gateway::{Console, DBus, Gateway, Interpret, Http, Socket, Websocket}; use sota::broadcast::Broadcast; use sota::http::{AuthClient, set_ca_certificates}; use sota::interpreter::{EventInterpreter, CommandInterpreter, Interpreter, GlobalInterpreter}; -use sota::package_manager::PackageManager; use sota::rvi::{Edge, Services}; macro_rules! exit { - ($fmt:expr, $($arg:tt)*) => {{ + ($code:expr, $fmt:expr, $($arg:tt)*) => {{ print!(concat!($fmt, "\n"), $($arg)*); - std::process::exit(1); + process::exit($code); }} } -fn start_signal_handler(signals: Receiver<Signal>) { - loop { - match signals.recv() { - Some(Signal::INT) | Some(Signal::TERM) => std::process::exit(0), - _ => () - } - } -} - -fn start_update_poller(interval: u64, itx: Sender<Interpret>) { - let (etx, erx) = chan::async::<Event>(); - let tick = chan::tick(Duration::from_secs(interval)); - loop { - let _ = tick.recv(); - itx.send(Interpret { - command: Command::GetNewUpdates, - response_tx: Some(Arc::new(Mutex::new(etx.clone()))) - }); - let _ = erx.recv(); - } -} - -fn send_startup_commands(config: &Config, ctx: &Sender<Command>) { - ctx.send(Command::Authenticate(None)); - ctx.send(Command::RefreshSystemInfo(true)); - - if config.device.package_manager != PackageManager::Off { - config.device.package_manager.installed_packages().map(|packages| { - ctx.send(Command::SendInstalledPackages(packages)); - }).unwrap_or_else(|err| exit!("Couldn't get list of packages: {}", err)); - } -} - fn main() { - setup_logging(); + let version = start_logging(); + let config = build_config(&version); - let config = build_config(); set_ca_certificates(Path::new(&config.device.certificates_path)); let (etx, erx) = chan::async::<Event>(); @@ -81,16 +47,25 @@ fn main() { let (itx, irx) = chan::async::<Interpret>(); let mut broadcast = Broadcast::new(erx); - send_startup_commands(&config, &ctx); + let wg = WaitGroup::new(); + + ctx.send(Command::Authenticate(None)); crossbeam::scope(|scope| { - // Must subscribe to the signal before spawning ANY other threads + // subscribe to signals first let signals = chan_signal::notify(&[Signal::INT, Signal::TERM]); scope.spawn(move || start_signal_handler(signals)); - let poll_tick = config.device.polling_interval; - let poll_itx = itx.clone(); - scope.spawn(move || start_update_poller(poll_tick, poll_itx)); + if config.core.polling { + let poll_tick = config.core.polling_sec; + let poll_itx = itx.clone(); + let poll_wg = wg.clone(); + scope.spawn(move || start_update_poller(poll_tick, poll_itx, poll_wg)); + } + + // + // start gateways + // if config.gateway.console { let cons_itx = itx.clone(); @@ -99,7 +74,7 @@ fn main() { } if config.gateway.dbus { - let dbus_cfg = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for dbus gateway")); + let dbus_cfg = config.dbus.as_ref().unwrap_or_else(|| exit!(1, "{}", "dbus config required for dbus gateway")); let dbus_itx = itx.clone(); let dbus_sub = broadcast.subscribe(); let mut dbus = DBus { dbus_cfg: dbus_cfg.clone(), itx: itx.clone() }; @@ -109,20 +84,21 @@ fn main() { if config.gateway.http { let http_itx = itx.clone(); let http_sub = broadcast.subscribe(); - let mut http = Http { server: config.network.http_server.clone() }; + let mut http = Http { server: *config.network.http_server }; scope.spawn(move || http.start(http_itx, http_sub)); } - let mut rvi = None; - if config.gateway.rvi { - let _ = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for rvi gateway")); - let rvi_cfg = config.rvi.as_ref().unwrap_or_else(|| exit!("{}", "rvi config required for rvi gateway")); + let rvi_services = if config.gateway.rvi { + let _ = config.dbus.as_ref().unwrap_or_else(|| exit!(1, "{}", "dbus config required for rvi gateway")); + let rvi_cfg = config.rvi.as_ref().unwrap_or_else(|| exit!(1, "{}", "rvi config required for rvi gateway")); let rvi_edge = config.network.rvi_edge_server.clone(); let services = Services::new(rvi_cfg.clone(), config.device.uuid.clone(), etx.clone()); let mut edge = Edge::new(services.clone(), rvi_edge, rvi_cfg.client.clone()); scope.spawn(move || edge.start()); - rvi = Some(services); - } + Some(services) + } else { + None + }; if config.gateway.socket { let socket_itx = itx.clone(); @@ -142,48 +118,83 @@ fn main() { scope.spawn(move || ws.start(ws_itx, ws_sub)); } + // + // start interpreters + // + let event_sub = broadcast.subscribe(); let event_ctx = ctx.clone(); let event_mgr = config.device.package_manager.clone(); + let event_sys = config.device.system_info.clone(); + let event_wg = wg.clone(); scope.spawn(move || EventInterpreter { - package_manager: event_mgr - }.run(event_sub, event_ctx)); + pacman: event_mgr, + sysinfo: event_sys, + }.run(event_sub, event_ctx, event_wg)); let cmd_itx = itx.clone(); - scope.spawn(move || CommandInterpreter.run(crx, cmd_itx)); + let cmd_wg = wg.clone(); + scope.spawn(move || CommandInterpreter.run(crx, cmd_itx, cmd_wg)); scope.spawn(move || GlobalInterpreter { config: config, token: None, http_client: Box::new(AuthClient::default()), - rvi: rvi, - loopback_tx: itx, - }.run(irx, etx)); + rvi: rvi_services, + }.run(irx, etx, wg)); scope.spawn(move || broadcast.start()); }); } -fn setup_logging() { - let version = option_env!("SOTA_VERSION").unwrap_or("?"); +fn start_logging() -> String { + let version = option_env!("SOTA_VERSION").unwrap_or("unknown"); + let mut builder = LogBuilder::new(); builder.format(move |record: &LogRecord| { let timestamp = format!("{}", time::now_utc().rfc3339()); format!("{} ({}): {} - {}", timestamp, version, record.level(), record.args()) }); builder.filter(Some("hyper"), LogLevelFilter::Info); + builder.parse(&env::var("RUST_LOG").unwrap_or("INFO".to_string())); + builder.init().expect("builder already initialized"); + + version.to_string() +} - let _ = env::var("RUST_LOG").map(|level| builder.parse(&level)); - builder.init().expect("env_logger::init() called twice, blame the programmers."); +fn start_signal_handler(signals: Receiver<Signal>) { + loop { + match signals.recv() { + Some(Signal::INT) | Some(Signal::TERM) => process::exit(0), + _ => () + } + } } -fn build_config() -> Config { +fn start_update_poller(interval: u64, itx: Sender<Interpret>, wg: WaitGroup) { + info!("Polling for new updates every {} seconds.", interval); + let (etx, erx) = chan::async::<Event>(); + let wait = Duration::from_secs(interval); + loop { + wg.wait(); // wait until not busy + thread::sleep(wait); // then wait `interval` seconds + itx.send(Interpret { + command: Command::GetUpdateRequests, + response_tx: Some(Arc::new(Mutex::new(etx.clone()))) + }); // then request new updates + let _ = erx.recv(); // then wait for the response + } +} + +fn build_config(version: &str) -> Config { let args = env::args().collect::<Vec<String>>(); let program = args[0].clone(); let mut opts = Options::new(); - opts.optflag("h", "help", "print this help menu"); - opts.optopt("", "config", "change config path", "PATH"); + opts.optflag("h", "help", "print this help menu then quit"); + opts.optflag("p", "print", "print the parsed config then quit"); + opts.optflag("v", "version", "print the version then quit"); + opts.optopt("c", "config", "change config path", "PATH"); opts.optopt("", "auth-server", "change the auth server", "URL"); opts.optopt("", "auth-client-id", "change the auth client id", "ID"); @@ -191,6 +202,8 @@ fn build_config() -> Config { opts.optopt("", "auth-credentials-file", "change the auth credentials file", "PATH"); opts.optopt("", "core-server", "change the core server", "URL"); + opts.optopt("", "core-polling", "toggle polling the core server for updates", "BOOL"); + opts.optopt("", "core-polling-sec", "change the core polling interval", "SECONDS"); opts.optopt("", "dbus-name", "change the dbus registration name", "NAME"); opts.optopt("", "dbus-path", "change the dbus path", "PATH"); @@ -203,7 +216,6 @@ fn build_config() -> Config { opts.optopt("", "device-vin", "change the device vin", "VIN"); opts.optopt("", "device-packages-dir", "change downloaded directory for packages", "PATH"); opts.optopt("", "device-package-manager", "change the package manager", "MANAGER"); - opts.optopt("", "device-polling-interval", "change the package polling interval", "INTERVAL"); opts.optopt("", "device-certificates-path", "change the OpenSSL CA certificates file", "PATH"); opts.optopt("", "device-system-info", "change the system information command", "PATH"); @@ -225,25 +237,37 @@ fn build_config() -> Config { opts.optopt("", "rvi-timeout", "change the rvi timeout", "TIMEOUT"); let matches = opts.parse(&args[1..]).unwrap_or_else(|err| panic!(err.to_string())); - if matches.opt_present("h") { - exit!("{}", opts.usage(&format!("Usage: {} [options]", program))); + + if matches.opt_present("help") { + exit!(0, "{}", opts.usage(&format!("Usage: {} [options]", program))); + } else if matches.opt_present("version") { + exit!(0, "{}", version); } - let config_file = matches.opt_str("config").unwrap_or_else(|| { - env::var("SOTA_CONFIG").unwrap_or_else(|_| exit!("{}", "No config file provided.")) - }); - let mut config = Config::load(&config_file).unwrap_or_else(|err| exit!("{}", err)); + let mut config = match matches.opt_str("config").or(env::var("SOTA_CONFIG").ok()) { + Some(file) => Config::load(&file).unwrap_or_else(|err| exit!(1, "{}", err)), + None => { + warn!("No config file given. Falling back to defaults."); + Config::default() + } + }; config.auth.as_mut().map(|auth_cfg| { matches.opt_str("auth-client-id").map(|id| auth_cfg.client_id = id); matches.opt_str("auth-client-secret").map(|secret| auth_cfg.client_secret = secret); matches.opt_str("auth-server").map(|text| { - auth_cfg.server = text.parse().unwrap_or_else(|err| exit!("Invalid auth-server URL: {}", err)); + auth_cfg.server = text.parse().unwrap_or_else(|err| exit!(1, "Invalid auth-server URL: {}", err)); }); }); matches.opt_str("core-server").map(|text| { - config.core.server = text.parse().unwrap_or_else(|err| exit!("Invalid core-server URL: {}", err)); + config.core.server = text.parse().unwrap_or_else(|err| exit!(1, "Invalid core-server URL: {}", err)); + }); + matches.opt_str("core-polling").map(|polling| { + config.core.polling = polling.parse().unwrap_or_else(|err| exit!(1, "Invalid core-polling boolean: {}", err)); + }); + matches.opt_str("core-polling-sec").map(|secs| { + config.core.polling_sec = secs.parse().unwrap_or_else(|err| exit!(1, "Invalid core-polling-sec: {}", err)); }); config.dbus.as_mut().map(|dbus_cfg| { @@ -253,7 +277,7 @@ fn build_config() -> Config { matches.opt_str("dbus-software-manager").map(|mgr| dbus_cfg.software_manager = mgr); matches.opt_str("dbus-software-manager-path").map(|mgr_path| dbus_cfg.software_manager_path = mgr_path); matches.opt_str("dbus-timeout").map(|timeout| { - dbus_cfg.timeout = timeout.parse().unwrap_or_else(|err| exit!("Invalid dbus timeout: {}", err)); + dbus_cfg.timeout = timeout.parse().unwrap_or_else(|err| exit!(1, "Invalid dbus-timeout: {}", err)); }); }); @@ -261,48 +285,55 @@ fn build_config() -> Config { matches.opt_str("device-vin").map(|vin| config.device.vin = vin); matches.opt_str("device-packages-dir").map(|path| config.device.packages_dir = path); matches.opt_str("device-package-manager").map(|text| { - config.device.package_manager = text.parse().unwrap_or_else(|err| exit!("Invalid device package manager: {}", err)); - }); - matches.opt_str("device-polling-interval").map(|interval| { - config.device.polling_interval = interval.parse().unwrap_or_else(|err| exit!("Invalid device polling interval: {}", err)); + config.device.package_manager = text.parse().unwrap_or_else(|err| exit!(1, "Invalid device-package-manager: {}", err)); }); matches.opt_str("device-certificates-path").map(|certs| config.device.certificates_path = certs); - matches.opt_str("device-system-info").map(|cmd| config.device.system_info = SystemInfo::new(cmd)); + matches.opt_str("device-system-info").map(|cmd| { + config.device.system_info = if cmd.len() > 0 { Some(cmd) } else { None } + }); matches.opt_str("gateway-console").map(|console| { - config.gateway.console = console.parse().unwrap_or_else(|err| exit!("Invalid console gateway boolean: {}", err)); + config.gateway.console = console.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-console boolean: {}", err)); }); matches.opt_str("gateway-dbus").map(|dbus| { - config.gateway.dbus = dbus.parse().unwrap_or_else(|err| exit!("Invalid dbus gateway boolean: {}", err)); + config.gateway.dbus = dbus.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-dbus boolean: {}", err)); }); matches.opt_str("gateway-http").map(|http| { - config.gateway.http = http.parse().unwrap_or_else(|err| exit!("Invalid http gateway boolean: {}", err)); + config.gateway.http = http.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-http boolean: {}", err)); }); matches.opt_str("gateway-rvi").map(|rvi| { - config.gateway.rvi = rvi.parse().unwrap_or_else(|err| exit!("Invalid rvi gateway boolean: {}", err)); + config.gateway.rvi = rvi.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-rvi boolean: {}", err)); }); matches.opt_str("gateway-socket").map(|socket| { - config.gateway.socket = socket.parse().unwrap_or_else(|err| exit!("Invalid socket gateway boolean: {}", err)); + config.gateway.socket = socket.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-socket boolean: {}", err)); }); matches.opt_str("gateway-websocket").map(|websocket| { - config.gateway.websocket = websocket.parse().unwrap_or_else(|err| exit!("Invalid websocket gateway boolean: {}", err)); + config.gateway.websocket = websocket.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-websocket boolean: {}", err)); }); - matches.opt_str("network-http-server").map(|server| config.network.http_server = server); - matches.opt_str("network-rvi-edge-server").map(|server| config.network.rvi_edge_server = server); + matches.opt_str("network-http-server").map(|addr| { + config.network.http_server = addr.parse().unwrap_or_else(|err| exit!(1, "Invalid network-http-server: {}", err)); + }); + matches.opt_str("network-rvi-edge-server").map(|addr| { + config.network.rvi_edge_server = addr.parse().unwrap_or_else(|err| exit!(1, "Invalid network-rvi-edge-server: {}", err)); + }); matches.opt_str("network-socket-commands-path").map(|path| config.network.socket_commands_path = path); matches.opt_str("network-socket-events-path").map(|path| config.network.socket_events_path = path); matches.opt_str("network-websocket-server").map(|server| config.network.websocket_server = server); config.rvi.as_mut().map(|rvi_cfg| { matches.opt_str("rvi-client").map(|url| { - rvi_cfg.client = url.parse().unwrap_or_else(|err| exit!("Invalid rvi-client URL: {}", err)); + rvi_cfg.client = url.parse().unwrap_or_else(|err| exit!(1, "Invalid rvi-client URL: {}", err)); }); matches.opt_str("rvi-storage-dir").map(|dir| rvi_cfg.storage_dir = dir); matches.opt_str("rvi-timeout").map(|timeout| { - rvi_cfg.timeout = Some(timeout.parse().unwrap_or_else(|err| exit!("Invalid rvi timeout: {}", err))); + rvi_cfg.timeout = Some(timeout.parse().unwrap_or_else(|err| exit!(1, "Invalid rvi-timeout: {}", err))); }); }); + if matches.opt_present("print") { + exit!(0, "{:#?}", config); + } + config } |