summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/datatype/config.rs441
-rw-r--r--src/datatype/error.rs2
-rw-r--r--src/datatype/mod.rs8
-rw-r--r--src/datatype/network.rs (renamed from src/datatype/url.rs)59
-rw-r--r--src/datatype/shell.rs12
-rw-r--r--src/datatype/system_info.rs41
-rw-r--r--src/gateway/http.rs15
-rw-r--r--src/http/auth_client.rs2
-rw-r--r--src/interpreter.rs19
-rw-r--r--src/main.rs185
-rw-r--r--src/rvi/edge.rs16
11 files changed, 562 insertions, 238 deletions
diff --git a/src/datatype/config.rs b/src/datatype/config.rs
index c9905f5..ad80c42 100644
--- a/src/datatype/config.rs
+++ b/src/datatype/config.rs
@@ -6,13 +6,13 @@ use std::os::unix::fs::PermissionsExt;
use std::io::prelude::*;
use std::path::Path;
use toml;
-use toml::{Decoder, Parser, Table, Value};
+use toml::{Decoder, Parser, Table};
-use datatype::{Error, SystemInfo, Url};
+use datatype::{Error, SocketAddr, Url};
use package_manager::PackageManager;
-/// An aggregation of all the configuration options parsed at startup.
+/// A container for all parsed configs.
#[derive(Default, PartialEq, Eq, Debug, Clone)]
pub struct Config {
pub auth: Option<AuthConfig>,
@@ -25,6 +25,8 @@ pub struct Config {
}
impl Config {
+ /// Read in a toml configuration file using default values for missing
+ /// sections or fields.
pub fn load(path: &str) -> Result<Config, Error> {
info!("Loading config file: {}", path);
let mut file = try!(File::open(path).map_err(Error::Io));
@@ -33,36 +35,34 @@ impl Config {
Config::parse(&toml)
}
+ /// Parse a toml configuration string using default values for missing
+ /// sections or fields while retaining backwards compatibility.
pub fn parse(toml: &str) -> Result<Config, Error> {
let table = try!(parse_table(&toml));
- let auth_cfg = if let Some(auth) = table.get("auth") {
- let parsed = try!(decode_section(auth.clone()));
- Some(try!(bootstrap_credentials(parsed)))
- } else {
- None
- };
-
- let dbus_cfg = if let Some(dbus) = table.get("dbus") {
- Some(try!(decode_section(dbus.clone())))
- } else {
- None
- };
-
- let rvi_cfg = if let Some(rvi) = table.get("rvi") {
- Some(try!(decode_section(rvi.clone())))
- } else {
- None
- };
+ let mut auth: Option<ParsedAuthConfig> = try!(maybe_parse_section(&table, "auth"));
+ let mut core: ParsedCoreConfig = try!(parse_section(&table, "core"));
+ let mut dbus: Option<ParsedDBusConfig> = try!(maybe_parse_section(&table, "dbus"));
+ let mut device: ParsedDeviceConfig = try!(parse_section(&table, "device"));
+ let mut gateway: ParsedGatewayConfig = try!(parse_section(&table, "gateway"));
+ let mut network: ParsedNetworkConfig = try!(parse_section(&table, "network"));
+ let mut rvi: Option<ParsedRviConfig> = try!(maybe_parse_section(&table, "rvi"));
+
+ if let Some(cfg) = auth {
+ auth = Some(try!(bootstrap_credentials(cfg)));
+ }
+
+ try!(apply_transformations(&mut auth, &mut core, &mut dbus, &mut device,
+ &mut gateway, &mut network, &mut rvi));
Ok(Config {
- auth: auth_cfg,
- core: try!(read_section(&table, "core")),
- dbus: dbus_cfg,
- device: try!(read_section(&table, "device")),
- gateway: try!(read_section(&table, "gateway")),
- network: try!(read_section(&table, "network")),
- rvi: rvi_cfg,
+ auth: auth.map(|mut cfg| cfg.defaultify()),
+ core: core.defaultify(),
+ dbus: dbus.map(|mut cfg| cfg.defaultify()),
+ device: device.defaultify(),
+ gateway: gateway.defaultify(),
+ network: network.defaultify(),
+ rvi: rvi.map(|mut cfg| cfg.defaultify())
})
}
}
@@ -72,15 +72,15 @@ fn parse_table(toml: &str) -> Result<Table, Error> {
Ok(try!(parser.parse().ok_or_else(move || parser.errors)))
}
-fn read_section<T: Decodable>(table: &Table, section: &str) -> Result<T, Error> {
- let part = try!(table.get(section)
- .ok_or_else(|| Error::Parse(format!("invalid section: {}", section))));
- decode_section(part.clone())
+fn parse_section<T: Decodable + Default>(table: &Table, section: &str) -> Result<T, Error> {
+ Ok(try!(maybe_parse_section(table, section)).unwrap_or(T::default()))
}
-fn decode_section<T: Decodable>(section: Value) -> Result<T, Error> {
- let mut decoder = Decoder::new(section);
- Ok(try!(T::decode(&mut decoder)))
+fn maybe_parse_section<T: Decodable>(table: &Table, section: &str) -> Result<Option<T>, Error> {
+ table.get(section).map_or(Ok(None), |sect| {
+ let mut decoder = Decoder::new(sect.clone());
+ Ok(Some(try!(T::decode(&mut decoder))))
+ })
}
@@ -92,8 +92,8 @@ struct CredentialsFile {
// Read AuthConfig values from the credentials file if it exists, or write the
// current AuthConfig values to a new credentials file otherwise.
-fn bootstrap_credentials(auth_cfg: AuthConfig) -> Result<AuthConfig, Error> {
- let creds = auth_cfg.credentials_file.clone();
+fn bootstrap_credentials(auth: ParsedAuthConfig) -> Result<ParsedAuthConfig, Error> {
+ let creds = auth.credentials_file.expect("couldn't get credentials_file");
let path = Path::new(&creds);
debug!("bootstrap_credentials: {:?}", path);
@@ -102,14 +102,16 @@ fn bootstrap_credentials(auth_cfg: AuthConfig) -> Result<AuthConfig, Error> {
let mut text = String::new();
try!(file.read_to_string(&mut text));
let table = try!(parse_table(&text));
- try!(read_section::<CredentialsFile>(&table, "auth"))
+ let auth = try!(table.get("auth").ok_or(Error::Config("no [auth] section".to_string())));
+ let mut decoder = Decoder::new(auth.clone());
+ try!(CredentialsFile::decode(&mut decoder))
}
Err(ref err) if err.kind() == ErrorKind::NotFound => {
let mut table = Table::new();
let credentials = CredentialsFile {
- client_id: auth_cfg.client_id,
- client_secret: auth_cfg.client_secret
+ client_id: auth.client_id.expect("expected client_id"),
+ client_secret: auth.client_secret.expect("expected client_secret")
};
table.insert("auth".to_string(), toml::encode(&credentials));
@@ -127,16 +129,52 @@ fn bootstrap_credentials(auth_cfg: AuthConfig) -> Result<AuthConfig, Error> {
Err(err) => return Err(Error::Io(err))
};
- Ok(AuthConfig {
- server: auth_cfg.server,
- client_id: credentials.client_id,
- client_secret: credentials.client_secret,
- credentials_file: auth_cfg.credentials_file,
+ Ok(ParsedAuthConfig {
+ server: auth.server,
+ client_id: Some(credentials.client_id),
+ client_secret: Some(credentials.client_secret),
+ credentials_file: Some(creds.clone()),
})
}
-/// A parsed representation of the [auth] configuration section.
+// Apply transformations from old to new config fields for backwards compatibility.
+fn apply_transformations(_: &mut Option<ParsedAuthConfig>,
+ core: &mut ParsedCoreConfig,
+ _: &mut Option<ParsedDBusConfig>,
+ device: &mut ParsedDeviceConfig,
+ _: &mut ParsedGatewayConfig,
+ _: &mut ParsedNetworkConfig,
+ _: &mut Option<ParsedRviConfig>) -> Result<(), Error> {
+
+ match (device.polling_interval, core.polling_sec) {
+ (Some(_), Some(_)) => {
+ return Err(Error::Config("core.polling_sec and device.polling_interval both set".to_string()))
+ }
+
+ (Some(interval), None) => {
+ if interval > 0 {
+ core.polling = Some(true);
+ core.polling_sec = Some(interval);
+ } else {
+ core.polling = Some(false);
+ }
+ }
+
+ _ => ()
+ }
+
+ Ok(())
+}
+
+
+/// Trait used to overwrite any `None` fields in a config with its default value.
+trait Defaultify<T: Default> {
+ fn defaultify(&mut self) -> T;
+}
+
+
+/// The [auth] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct AuthConfig {
pub server: Url,
@@ -146,7 +184,7 @@ pub struct AuthConfig {
}
impl Default for AuthConfig {
- fn default() -> AuthConfig {
+ fn default() -> Self {
AuthConfig {
server: "http://127.0.0.1:9001".parse().unwrap(),
client_id: "client-id".to_string(),
@@ -156,23 +194,86 @@ impl Default for AuthConfig {
}
}
+#[derive(RustcDecodable)]
+struct ParsedAuthConfig {
+ server: Option<Url>,
+ client_id: Option<String>,
+ client_secret: Option<String>,
+ credentials_file: Option<String>,
+}
+
+impl Default for ParsedAuthConfig {
+ fn default() -> Self {
+ ParsedAuthConfig {
+ server: None,
+ client_id: None,
+ client_secret: None,
+ credentials_file: None
+ }
+ }
+}
+
+impl Defaultify<AuthConfig> for ParsedAuthConfig {
+ fn defaultify(&mut self) -> AuthConfig {
+ let default = AuthConfig::default();
+ AuthConfig {
+ server: self.server.take().unwrap_or(default.server),
+ client_id: self.client_id.take().unwrap_or(default.client_id),
+ client_secret: self.client_secret.take().unwrap_or(default.client_secret),
+ credentials_file: self.credentials_file.take().unwrap_or(default.credentials_file)
+ }
+ }
+}
+
-/// A parsed representation of the [core] configuration section.
+/// The [core] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct CoreConfig {
- pub server: Url
+ pub server: Url,
+ pub polling: bool,
+ pub polling_sec: u64
}
impl Default for CoreConfig {
fn default() -> CoreConfig {
CoreConfig {
- server: "http://127.0.0.1:8080".parse().unwrap()
+ server: "http://127.0.0.1:8080".parse().unwrap(),
+ polling: true,
+ polling_sec: 10
}
}
}
+#[derive(RustcDecodable)]
+struct ParsedCoreConfig {
+ server: Option<Url>,
+ polling: Option<bool>,
+ polling_sec: Option<u64>
+}
+
+impl Default for ParsedCoreConfig {
+ fn default() -> Self {
+ ParsedCoreConfig {
+ server: None,
+ polling: None,
+ polling_sec: None
+ }
+ }
+}
+
+impl Defaultify<CoreConfig> for ParsedCoreConfig {
+ fn defaultify(&mut self) -> CoreConfig {
+ let default = CoreConfig::default();
+ CoreConfig {
+ server: self.server.take().unwrap_or(default.server),
+ polling: self.polling.take().unwrap_or(default.polling),
+ polling_sec: self.polling_sec.take().unwrap_or(default.polling_sec)
+ }
+ }
+}
-/// A parsed representation of the [dbus] configuration section.
+
+/// The [dbus] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct DBusConfig {
pub name: String,
@@ -180,7 +281,7 @@ pub struct DBusConfig {
pub interface: String,
pub software_manager: String,
pub software_manager_path: String,
- pub timeout: i32, // dbus-rs expects a signed int
+ pub timeout: i32,
}
impl Default for DBusConfig {
@@ -196,17 +297,53 @@ impl Default for DBusConfig {
}
}
+#[derive(RustcDecodable)]
+struct ParsedDBusConfig {
+ name: Option<String>,
+ path: Option<String>,
+ interface: Option<String>,
+ software_manager: Option<String>,
+ software_manager_path: Option<String>,
+ timeout: Option<i32>,
+}
+
+impl Default for ParsedDBusConfig {
+ fn default() -> Self {
+ ParsedDBusConfig {
+ name: None,
+ path: None,
+ interface: None,
+ software_manager: None,
+ software_manager_path: None,
+ timeout: None
+ }
+ }
+}
+
+impl Defaultify<DBusConfig> for ParsedDBusConfig {
+ fn defaultify(&mut self) -> DBusConfig {
+ let default = DBusConfig::default();
+ DBusConfig {
+ name: self.name.take().unwrap_or(default.name),
+ path: self.path.take().unwrap_or(default.path),
+ interface: self.interface.take().unwrap_or(default.interface),
+ software_manager: self.software_manager.take().unwrap_or(default.software_manager),
+ software_manager_path: self.software_manager_path.take().unwrap_or(default.software_manager_path),
+ timeout: self.timeout.take().unwrap_or(default.timeout)
+ }
+ }
+}
+
-/// A parsed representation of the [device] configuration section.
+/// The [device] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct DeviceConfig {
pub uuid: String,
pub vin: String,
pub packages_dir: String,
pub package_manager: PackageManager,
- pub system_info: Option<SystemInfo>,
- pub polling_interval: u64,
pub certificates_path: String,
+ pub system_info: Option<String>,
}
impl Default for DeviceConfig {
@@ -215,16 +352,54 @@ impl Default for DeviceConfig {
uuid: "123e4567-e89b-12d3-a456-426655440000".to_string(),
vin: "V1234567890123456".to_string(),
packages_dir: "/tmp/".to_string(),
- package_manager: PackageManager::Deb,
- system_info: Some(SystemInfo::default()),
- polling_interval: 10,
- certificates_path: "/tmp/sota_certificates".to_string()
+ package_manager: PackageManager::Off,
+ certificates_path: "/tmp/sota_certificates".to_string(),
+ system_info: Some("system_info.sh".to_string())
}
}
}
+#[derive(RustcDecodable)]
+struct ParsedDeviceConfig {
+ pub uuid: Option<String>,
+ pub vin: Option<String>,
+ pub packages_dir: Option<String>,
+ pub package_manager: Option<PackageManager>,
+ pub polling_interval: Option<u64>,
+ pub certificates_path: Option<String>,
+ pub system_info: Option<String>,
+}
-/// A parsed representation of the [gateway] configuration section.
+impl Default for ParsedDeviceConfig {
+ fn default() -> Self {
+ ParsedDeviceConfig {
+ uuid: None,
+ vin: None,
+ packages_dir: None,
+ package_manager: None,
+ polling_interval: None,
+ certificates_path: None,
+ system_info: None,
+ }
+ }
+}
+
+impl Defaultify<DeviceConfig> for ParsedDeviceConfig {
+ fn defaultify(&mut self) -> DeviceConfig {
+ let default = DeviceConfig::default();
+ DeviceConfig {
+ uuid: self.uuid.take().unwrap_or(default.uuid),
+ vin: self.vin.take().unwrap_or(default.vin),
+ packages_dir: self.packages_dir.take().unwrap_or(default.packages_dir),
+ package_manager: self.package_manager.take().unwrap_or(default.package_manager),
+ certificates_path: self.certificates_path.take().unwrap_or(default.certificates_path),
+ system_info: self.system_info.take().or(default.system_info),
+ }
+ }
+}
+
+
+/// The [gateway] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct GatewayConfig {
pub console: bool,
@@ -243,17 +418,54 @@ impl Default for GatewayConfig {
http: false,
rvi: false,
socket: false,
- websocket: true,
+ websocket: false,
}
}
}
+#[derive(RustcDecodable)]
+struct ParsedGatewayConfig {
+ console: Option<bool>,
+ dbus: Option<bool>,
+ http: Option<bool>,
+ rvi: Option<bool>,
+ socket: Option<bool>,
+ websocket: Option<bool>,
+}
-/// A parsed representation of the [network] configuration section.
+impl Default for ParsedGatewayConfig {
+ fn default() -> Self {
+ ParsedGatewayConfig {
+ console: None,
+ dbus: None,
+ http: None,
+ rvi: None,
+ socket: None,
+ websocket: None
+ }
+ }
+}
+
+impl Defaultify<GatewayConfig> for ParsedGatewayConfig {
+ fn defaultify(&mut self) -> GatewayConfig {
+ let default = GatewayConfig::default();
+ GatewayConfig {
+ console: self.console.take().unwrap_or(default.console),
+ dbus: self.dbus.take().unwrap_or(default.dbus),
+ http: self.http.take().unwrap_or(default.http),
+ rvi: self.rvi.take().unwrap_or(default.rvi),
+ socket: self.socket.take().unwrap_or(default.socket),
+ websocket: self.websocket.take().unwrap_or(default.websocket)
+ }
+ }
+}
+
+
+/// The [network] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct NetworkConfig {
- pub http_server: String,
- pub rvi_edge_server: String,
+ pub http_server: SocketAddr,
+ pub rvi_edge_server: SocketAddr,
pub socket_commands_path: String,
pub socket_events_path: String,
pub websocket_server: String
@@ -262,8 +474,8 @@ pub struct NetworkConfig {
impl Default for NetworkConfig {
fn default() -> NetworkConfig {
NetworkConfig {
- http_server: "http://127.0.0.1:8888".to_string(),
- rvi_edge_server: "http://127.0.0.1:9080".to_string(),
+ http_server: "127.0.0.1:8888".parse().unwrap(),
+ rvi_edge_server: "127.0.0.1:9080".parse().unwrap(),
socket_commands_path: "/tmp/sota-commands.socket".to_string(),
socket_events_path: "/tmp/sota-events.socket".to_string(),
websocket_server: "127.0.0.1:3012".to_string()
@@ -271,8 +483,42 @@ impl Default for NetworkConfig {
}
}
+#[derive(RustcDecodable)]
+struct ParsedNetworkConfig {
+ http_server: Option<SocketAddr>,
+ rvi_edge_server: Option<SocketAddr>,
+ socket_commands_path: Option<String>,
+ socket_events_path: Option<String>,
+ websocket_server: Option<String>
+}
+
+impl Default for ParsedNetworkConfig {
+ fn default() -> Self {
+ ParsedNetworkConfig {
+ http_server: None,
+ rvi_edge_server: None,
+ socket_commands_path: None,
+ socket_events_path: None,
+ websocket_server: None
+ }
+ }
+}
+
+impl Defaultify<NetworkConfig> for ParsedNetworkConfig {
+ fn defaultify(&mut self) -> NetworkConfig {
+ let default = NetworkConfig::default();
+ NetworkConfig {
+ http_server: self.http_server.take().unwrap_or(default.http_server),
+ rvi_edge_server: self.rvi_edge_server.take().unwrap_or(default.rvi_edge_server),
+ socket_commands_path: self.socket_commands_path.take().unwrap_or(default.socket_commands_path),
+ socket_events_path: self.socket_events_path.take().unwrap_or(default.socket_events_path),
+ websocket_server: self.websocket_server.take().unwrap_or(default.websocket_server)
+ }
+ }
+}
-/// A parsed representation of the [rvi] configuration section.
+
+/// The [rvi] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct RviConfig {
pub client: Url,
@@ -285,7 +531,35 @@ impl Default for RviConfig {
RviConfig {
client: "http://127.0.0.1:8901".parse().unwrap(),
storage_dir: "/var/sota".to_string(),
- timeout: Some(20),
+ timeout: None,
+ }
+ }
+}
+
+#[derive(RustcDecodable)]
+struct ParsedRviConfig {
+ client: Option<Url>,
+ storage_dir: Option<String>,
+ timeout: Option<i64>,
+}
+
+impl Default for ParsedRviConfig {
+ fn default() -> Self {
+ ParsedRviConfig {
+ client: None,
+ storage_dir: None,
+ timeout: None
+ }
+ }
+}
+
+impl Defaultify<RviConfig> for ParsedRviConfig {
+ fn defaultify(&mut self) -> RviConfig {
+ let default = RviConfig::default();
+ RviConfig {
+ client: self.client.take().unwrap_or(default.client),
+ storage_dir: self.storage_dir.take().unwrap_or(default.storage_dir),
+ timeout: self.timeout.take().or(default.timeout)
}
}
}
@@ -309,6 +583,8 @@ mod tests {
r#"
[core]
server = "http://127.0.0.1:8080"
+ polling = true
+ polling_sec = 10
"#;
const DBUS_CONFIG: &'static str =
@@ -327,11 +603,10 @@ mod tests {
[device]
uuid = "123e4567-e89b-12d3-a456-426655440000"
vin = "V1234567890123456"
- system_info = "./system_info.sh"
- polling_interval = 10
packages_dir = "/tmp/"
- package_manager = "deb"
+ package_manager = "off"
certificates_path = "/tmp/sota_certificates"
+ system_info = "system_info.sh"
"#;
const GATEWAY_CONFIG: &'static str =
@@ -342,14 +617,14 @@ mod tests {
http = false
rvi = false
socket = false
- websocket = true
+ websocket = false
"#;
const NETWORK_CONFIG: &'static str =
r#"
[network]
- http_server = "http://127.0.0.1:8888"
- rvi_edge_server = "http://127.0.0.1:9080"
+ http_server = "127.0.0.1:8888"
+ rvi_edge_server = "127.0.0.1:9080"
socket_commands_path = "/tmp/sota-commands.socket"
socket_events_path = "/tmp/sota-events.socket"
websocket_server = "127.0.0.1:3012"
@@ -365,7 +640,12 @@ mod tests {
#[test]
- fn parse_default_config() {
+ fn empty_config() {
+ assert_eq!(Config::parse("").unwrap(), Config::default());
+ }
+
+ #[test]
+ fn basic_config() {
let config = String::new()
+ CORE_CONFIG
+ DEVICE_CONFIG
@@ -375,7 +655,7 @@ mod tests {
}
#[test]
- fn parse_example_config() {
+ fn default_config() {
let config = String::new()
+ AUTH_CONFIG
+ CORE_CONFIG
@@ -384,6 +664,13 @@ mod tests {
+ GATEWAY_CONFIG
+ NETWORK_CONFIG
+ RVI_CONFIG;
- assert_eq!(Config::load("tests/sota.toml").unwrap(), Config::parse(&config).unwrap());
+ assert_eq!(Config::load("tests/toml/default.toml").unwrap(), Config::parse(&config).unwrap());
+ }
+
+ #[test]
+ fn backwards_compatible_config() {
+ let config = Config::load("tests/toml/old.toml").unwrap();
+ assert_eq!(config.core.polling, true);
+ assert_eq!(config.core.polling_sec, 10);
}
}
diff --git a/src/datatype/error.rs b/src/datatype/error.rs
index bb0ab4e..9503e2c 100644
--- a/src/datatype/error.rs
+++ b/src/datatype/error.rs
@@ -23,6 +23,7 @@ use ws::Error as WebsocketError;
pub enum Error {
Client(String),
Command(String),
+ Config(String),
FromUtf8(FromUtf8Error),
Http(ResponseData),
HttpAuth(ResponseData),
@@ -95,6 +96,7 @@ impl Display for Error {
let inner: String = match *self {
Error::Client(ref s) => format!("Http client error: {}", s.clone()),
Error::Command(ref e) => format!("Unknown Command: {}", e.clone()),
+ Error::Config(ref s) => format!("Bad Config: {}", s.clone()),
Error::FromUtf8(ref e) => format!("From utf8 error: {}", e.clone()),
Error::Http(ref r) => format!("HTTP client error: {}", r.clone()),
Error::HttpAuth(ref r) => format!("HTTP authorization error: {}", r.clone()),
diff --git a/src/datatype/mod.rs b/src/datatype/mod.rs
index 017868c..71e7e0a 100644
--- a/src/datatype/mod.rs
+++ b/src/datatype/mod.rs
@@ -5,10 +5,10 @@ pub mod dbus;
pub mod error;
pub mod event;
pub mod json_rpc;
-pub mod system_info;
+pub mod network;
+pub mod shell;
pub mod update_report;
pub mod update_request;
-pub mod url;
pub use self::auth::{AccessToken, Auth, ClientCredentials};
pub use self::command::Command;
@@ -17,11 +17,11 @@ pub use self::config::{AuthConfig, CoreConfig, Config, DBusConfig, DeviceConfig,
pub use self::error::Error;
pub use self::event::Event;
pub use self::json_rpc::{RpcRequest, RpcOk, RpcErr};
-pub use self::system_info::SystemInfo;
+pub use self::network::{Method, SocketAddr, Url};
+pub use self::shell::system_info;
pub use self::update_report::{DeviceReport, InstalledFirmware, InstalledPackage,
InstalledSoftware, OperationResult, UpdateResultCode,
UpdateReport};
pub use self::update_request::{ChunkReceived, DownloadComplete, DownloadFailed,
DownloadStarted, Package, UpdateAvailable,
UpdateRequest, UpdateRequestId, UpdateRequestStatus};
-pub use self::url::{Method, Url};
diff --git a/src/datatype/url.rs b/src/datatype/network.rs
index 5a9c97a..56f88ae 100644
--- a/src/datatype/url.rs
+++ b/src/datatype/network.rs
@@ -2,17 +2,53 @@ use hyper::method;
use rustc_serialize::{Decoder, Decodable};
use std::borrow::Cow;
use std::fmt::{Display, Formatter, Result as FmtResult};
-use std::io;
-use std::net::ToSocketAddrs;
+use std::net::{SocketAddr as StdSocketAddr};
+use std::ops::Deref;
use std::str::FromStr;
use url;
-use url::SocketAddrs;
use datatype::Error;
-/// Encapsulate a single crate URL with additional methods and traits.
-#[derive(PartialEq, Eq, Clone, Debug)]
+/// Encapsulate a socket address for implementing additional traits.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct SocketAddr(pub StdSocketAddr);
+
+impl Decodable for SocketAddr {
+ fn decode<D: Decoder>(d: &mut D) -> Result<SocketAddr, D::Error> {
+ let addr = try!(d.read_str());
+ addr.parse().or_else(|err| Err(d.error(&format!("{}", err))))
+ }
+}
+
+impl FromStr for SocketAddr {
+ type Err = Error;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match StdSocketAddr::from_str(s) {
+ Ok(addr) => Ok(SocketAddr(addr)),
+ Err(err) => Err(Error::Parse(format!("couldn't parse SocketAddr: {}", err)))
+ }
+ }
+}
+
+impl Deref for SocketAddr {
+ type Target = StdSocketAddr;
+
+ fn deref(&self) -> &StdSocketAddr {
+ &self.0
+ }
+}
+
+impl Display for SocketAddr {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ write!(f, "{}", self.0)
+ }
+}
+
+
+/// Encapsulate a url with additional methods and traits.
+#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Url(pub url::Url);
impl Url {
@@ -21,11 +57,6 @@ impl Url {
let url = try!(self.0.join(suffix));
Ok(Url(url))
}
-
- /// Return the encapsulated crate URL.
- pub fn inner(&self) -> url::Url {
- self.0.clone()
- }
}
impl<'a> Into<Cow<'a, Url>> for Url {
@@ -50,11 +81,11 @@ impl Decodable for Url {
}
}
-impl ToSocketAddrs for Url {
- type Iter = SocketAddrs;
+impl Deref for Url {
+ type Target = url::Url;
- fn to_socket_addrs(&self) -> io::Result<Self::Iter> {
- self.0.to_socket_addrs()
+ fn deref(&self) -> &url::Url {
+ &self.0
}
}
diff --git a/src/datatype/shell.rs b/src/datatype/shell.rs
new file mode 100644
index 0000000..d7cd4af
--- /dev/null
+++ b/src/datatype/shell.rs
@@ -0,0 +1,12 @@
+use std::process::Command;
+
+use datatype::Error;
+
+
+/// Generate a new system information report.
+pub fn system_info(cmd: &str) -> Result<String, Error> {
+ Command::new(cmd)
+ .output()
+ .map_err(|err| Error::SystemInfo(err.to_string()))
+ .and_then(|info| String::from_utf8(info.stdout).map_err(Error::FromUtf8))
+}
diff --git a/src/datatype/system_info.rs b/src/datatype/system_info.rs
deleted file mode 100644
index 987da3b..0000000
--- a/src/datatype/system_info.rs
+++ /dev/null
@@ -1,41 +0,0 @@
-use rustc_serialize::{Decoder, Decodable};
-use std::process::Command;
-
-use datatype::Error;
-
-
-/// A reference to the command that will report on the system information.
-#[derive(PartialEq, Eq, Debug, Clone)]
-pub struct SystemInfo {
- command: String
-}
-
-impl SystemInfo {
- /// Instantiate a new type to report on the system information.
- pub fn new(command: &str) -> Option<SystemInfo> {
- if command == "" {
- None
- } else {
- Some(SystemInfo { command: command.to_string() })
- }
- }
-
- /// Generate a new report of the system information.
- pub fn report(&self) -> Result<String, Error> {
- Command::new(&self.command)
- .output().map_err(|err| Error::SystemInfo(err.to_string()))
- .and_then(|info| String::from_utf8(info.stdout).map_err(Error::FromUtf8))
- }
-}
-
-impl Default for SystemInfo {
- fn default() -> SystemInfo {
- SystemInfo::new("./system_info.sh").expect("couldn't build command")
- }
-}
-
-impl Decodable for SystemInfo {
- fn decode<D: Decoder>(d: &mut D) -> Result<SystemInfo, D::Error> {
- d.read_str().and_then(|s| SystemInfo::new(&s).ok_or(d.error("bad SystemInfo command path")))
- }
-}
diff --git a/src/gateway/http.rs b/src/gateway/http.rs
index f397630..52a271c 100644
--- a/src/gateway/http.rs
+++ b/src/gateway/http.rs
@@ -4,6 +4,7 @@ use hyper::StatusCode;
use hyper::net::{HttpStream, Transport};
use hyper::server::{Server as HyperServer, Request as HyperRequest};
use rustc_serialize::json;
+use std::net::SocketAddr;
use std::thread;
use std::sync::{Arc, Mutex};
@@ -14,16 +15,16 @@ use http::{Server, ServerHandler};
/// The `Http` gateway parses `Command`s from the body of incoming requests.
pub struct Http {
- pub server: String,
+ pub server: SocketAddr
}
impl Gateway for Http {
fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> {
- let itx = Arc::new(Mutex::new(itx));
- let server = match HyperServer::http(&self.server.parse().expect("couldn't parse http address")) {
- Ok(server) => server,
- Err(err) => return Err(format!("couldn't start http gateway: {}", err))
- };
+ let itx = Arc::new(Mutex::new(itx));
+ let server = try!(HyperServer::http(&self.server).map_err(|err| {
+ format!("couldn't start http gateway: {}", err)
+ }));
+
thread::spawn(move || {
let (_, server) = server.handle(move |_| HttpHandler::new(itx.clone())).unwrap();
server.run();
@@ -101,7 +102,7 @@ mod tests {
let (etx, erx) = chan::sync::<Event>(0);
let (itx, irx) = chan::sync::<Interpret>(0);
- thread::spawn(move || Http { server: "127.0.0.1:8888".to_string() }.start(itx, erx));
+ thread::spawn(move || Http { server: "127.0.0.1:8888".parse().unwrap() }.start(itx, erx));
thread::spawn(move || {
let _ = etx; // move into this scope
loop {
diff --git a/src/http/auth_client.rs b/src/http/auth_client.rs
index 7d10e5b..3121c21 100644
--- a/src/http/auth_client.rs
+++ b/src/http/auth_client.rs
@@ -51,7 +51,7 @@ impl AuthClient {
impl Client for AuthClient {
fn chan_request(&self, req: Request, resp_tx: Sender<Response>) {
info!("{} {}", req.method, req.url);
- let _ = self.client.request(req.url.inner(), AuthHandler {
+ let _ = self.client.request((*req.url).clone(), AuthHandler {
auth: self.auth.clone(),
req: req,
timeout: Duration::from_secs(20),
diff --git a/src/interpreter.rs b/src/interpreter.rs
index b15c37e..d2d3f99 100644
--- a/src/interpreter.rs
+++ b/src/interpreter.rs
@@ -6,7 +6,8 @@ use std::time::Duration;
use time;
use datatype::{AccessToken, Auth, ClientCredentials, Command, Config, Error, Event,
- Package, UpdateReport, UpdateRequestStatus as Status, UpdateResultCode};
+ Package, UpdateReport, UpdateRequestStatus as Status, UpdateResultCode,
+ system_info};
use gateway::Interpret;
use http::{AuthClient, Client};
use oauth2::authenticate;
@@ -42,8 +43,8 @@ pub trait Interpreter<I, O> {
/// The `EventInterpreter` listens for `Event`s and optionally responds with
/// `Command`s that may be sent to the `CommandInterpreter`.
pub struct EventInterpreter {
- pub pacman: PackageManager,
- pub send_sysinfo: bool,
+ pub pacman: PackageManager,
+ pub sysinfo: Option<String>,
}
impl Interpreter<Event, Command> for EventInterpreter {
@@ -58,9 +59,7 @@ impl Interpreter<Event, Command> for EventInterpreter {
}).unwrap_or_else(|err| error!("couldn't send a list of packages: {}", err));
}
- if self.send_sysinfo {
- ctx.send(Command::SendSystemInfo);
- }
+ self.sysinfo.as_ref().map(|_| ctx.send(Command::SendSystemInfo));
}
Event::NotAuthenticated => {
@@ -205,8 +204,8 @@ impl<'t> GlobalInterpreter<'t> {
}
Command::ListSystemInfo => {
- let sysinfo = self.config.device.system_info.as_ref().expect("SystemInfo command not set");
- etx.send(Event::FoundSystemInfo(try!(sysinfo.report())));
+ let cmd = self.config.device.system_info.as_ref().expect("system_info command not set");
+ etx.send(Event::FoundSystemInfo(try!(system_info(&cmd))));
}
Command::SendInstalledPackages(packages) => {
@@ -222,8 +221,8 @@ impl<'t> GlobalInterpreter<'t> {
}
Command::SendSystemInfo => {
- let sysinfo = self.config.device.system_info.as_ref().expect("SystemInfo command not set");
- try!(sota.send_system_info(&try!(sysinfo.report())));
+ let cmd = self.config.device.system_info.as_ref().expect("system_info command not set");
+ try!(sota.send_system_info(&try!(system_info(&cmd))));
etx.send(Event::SystemInfoSent);
}
diff --git a/src/main.rs b/src/main.rs
index fc001b7..9821b2d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -14,13 +14,13 @@ use chan_signal::Signal;
use env_logger::LogBuilder;
use getopts::Options;
use log::{LogLevelFilter, LogRecord};
-use std::{env, thread};
+use std::{env, process, thread};
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use sota::datatype::{Command, Config, Event, SystemInfo};
+use sota::datatype::{Command, Config, Event};
use sota::gateway::{Console, DBus, Gateway, Interpret, Http, Socket, Websocket};
use sota::broadcast::Broadcast;
use sota::http::{AuthClient, set_ca_certificates};
@@ -29,55 +29,44 @@ use sota::rvi::{Edge, Services};
macro_rules! exit {
- ($fmt:expr, $($arg:tt)*) => {{
+ ($code:expr, $fmt:expr, $($arg:tt)*) => {{
print!(concat!($fmt, "\n"), $($arg)*);
- std::process::exit(1);
+ process::exit($code);
}}
}
-fn start_signal_handler(signals: Receiver<Signal>) {
- loop {
- match signals.recv() {
- Some(Signal::INT) | Some(Signal::TERM) => std::process::exit(0),
- _ => ()
- }
- }
-}
-
-fn start_update_poller(interval: u64, itx: Sender<Interpret>, wg: WaitGroup) {
- info!("Polling for new updates every {} seconds.", interval);
- let (etx, erx) = chan::async::<Event>();
- let wait = Duration::from_secs(interval);
- loop {
- wg.wait(); // wait until not busy
- thread::sleep(wait); // then wait `interval` seconds
- itx.send(Interpret {
- command: Command::GetUpdateRequests,
- response_tx: Some(Arc::new(Mutex::new(etx.clone())))
- }); // then request new updates
- let _ = erx.recv(); // then wait for the response
- }
-}
-
fn main() {
- setup_logging();
- let config = build_config();
+ let version = start_logging();
+ let config = build_config(&version);
+
set_ca_certificates(Path::new(&config.device.certificates_path));
let (etx, erx) = chan::async::<Event>();
let (ctx, crx) = chan::async::<Command>();
let (itx, irx) = chan::async::<Interpret>();
+
let mut broadcast = Broadcast::new(erx);
let wg = WaitGroup::new();
ctx.send(Command::Authenticate(None));
crossbeam::scope(|scope| {
- // Must subscribe to the signal before spawning ANY other threads
+ // subscribe to signals first
let signals = chan_signal::notify(&[Signal::INT, Signal::TERM]);
scope.spawn(move || start_signal_handler(signals));
+ if config.core.polling {
+ let poll_tick = config.core.polling_sec;
+ let poll_itx = itx.clone();
+ let poll_wg = wg.clone();
+ scope.spawn(move || start_update_poller(poll_tick, poll_itx, poll_wg));
+ }
+
+ //
+ // start gateways
+ //
+
if config.gateway.console {
let cons_itx = itx.clone();
let cons_sub = broadcast.subscribe();
@@ -85,7 +74,7 @@ fn main() {
}
if config.gateway.dbus {
- let dbus_cfg = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for dbus gateway"));
+ let dbus_cfg = config.dbus.as_ref().unwrap_or_else(|| exit!(1, "{}", "dbus config required for dbus gateway"));
let dbus_itx = itx.clone();
let dbus_sub = broadcast.subscribe();
let mut dbus = DBus { dbus_cfg: dbus_cfg.clone(), itx: itx.clone() };
@@ -95,23 +84,19 @@ fn main() {
if config.gateway.http {
let http_itx = itx.clone();
let http_sub = broadcast.subscribe();
- let mut http = Http { server: config.network.http_server.clone() };
+ let mut http = Http { server: *config.network.http_server };
scope.spawn(move || http.start(http_itx, http_sub));
}
let rvi_services = if config.gateway.rvi {
- let _ = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for rvi gateway"));
- let rvi_cfg = config.rvi.as_ref().unwrap_or_else(|| exit!("{}", "rvi config required for rvi gateway"));
+ let _ = config.dbus.as_ref().unwrap_or_else(|| exit!(1, "{}", "dbus config required for rvi gateway"));
+ let rvi_cfg = config.rvi.as_ref().unwrap_or_else(|| exit!(1, "{}", "rvi config required for rvi gateway"));
let rvi_edge = config.network.rvi_edge_server.clone();
let services = Services::new(rvi_cfg.clone(), config.device.uuid.clone(), etx.clone());
let mut edge = Edge::new(services.clone(), rvi_edge, rvi_cfg.client.clone());
scope.spawn(move || edge.start());
Some(services)
} else {
- let poll_tick = config.device.polling_interval;
- let poll_itx = itx.clone();
- let poll_wg = wg.clone();
- scope.spawn(move || start_update_poller(poll_tick, poll_itx, poll_wg));
None
};
@@ -133,14 +118,18 @@ fn main() {
scope.spawn(move || ws.start(ws_itx, ws_sub));
}
+ //
+ // start interpreters
+ //
+
let event_sub = broadcast.subscribe();
let event_ctx = ctx.clone();
let event_mgr = config.device.package_manager.clone();
- let event_sys = config.device.system_info.is_some();
+ let event_sys = config.device.system_info.clone();
let event_wg = wg.clone();
scope.spawn(move || EventInterpreter {
- pacman: event_mgr,
- send_sysinfo: event_sys,
+ pacman: event_mgr,
+ sysinfo: event_sys,
}.run(event_sub, event_ctx, event_wg));
let cmd_itx = itx.clone();
@@ -158,26 +147,54 @@ fn main() {
});
}
-fn setup_logging() {
- let version = option_env!("SOTA_VERSION").unwrap_or("?");
+fn start_logging() -> String {
+ let version = option_env!("SOTA_VERSION").unwrap_or("unknown");
+
let mut builder = LogBuilder::new();
builder.format(move |record: &LogRecord| {
let timestamp = format!("{}", time::now_utc().rfc3339());
format!("{} ({}): {} - {}", timestamp, version, record.level(), record.args())
});
builder.filter(Some("hyper"), LogLevelFilter::Info);
+ builder.parse(&env::var("RUST_LOG").unwrap_or("INFO".to_string()));
+ builder.init().expect("builder already initialized");
+
+ version.to_string()
+}
- let _ = env::var("RUST_LOG").map(|level| builder.parse(&level));
- builder.init().expect("env_logger::init() called twice, blame the programmers.");
+fn start_signal_handler(signals: Receiver<Signal>) {
+ loop {
+ match signals.recv() {
+ Some(Signal::INT) | Some(Signal::TERM) => process::exit(0),
+ _ => ()
+ }
+ }
}
-fn build_config() -> Config {
+fn start_update_poller(interval: u64, itx: Sender<Interpret>, wg: WaitGroup) {
+ info!("Polling for new updates every {} seconds.", interval);
+ let (etx, erx) = chan::async::<Event>();
+ let wait = Duration::from_secs(interval);
+ loop {
+ wg.wait(); // wait until not busy
+ thread::sleep(wait); // then wait `interval` seconds
+ itx.send(Interpret {
+ command: Command::GetUpdateRequests,
+ response_tx: Some(Arc::new(Mutex::new(etx.clone())))
+ }); // then request new updates
+ let _ = erx.recv(); // then wait for the response
+ }
+}
+
+fn build_config(version: &str) -> Config {
let args = env::args().collect::<Vec<String>>();
let program = args[0].clone();
let mut opts = Options::new();
- opts.optflag("h", "help", "print this help menu");
- opts.optopt("", "config", "change config path", "PATH");
+ opts.optflag("h", "help", "print this help menu then quit");
+ opts.optflag("p", "print", "print the parsed config then quit");
+ opts.optflag("v", "version", "print the version then quit");
+ opts.optopt("c", "config", "change config path", "PATH");
opts.optopt("", "auth-server", "change the auth server", "URL");
opts.optopt("", "auth-client-id", "change the auth client id", "ID");
@@ -185,6 +202,8 @@ fn build_config() -> Config {
opts.optopt("", "auth-credentials-file", "change the auth credentials file", "PATH");
opts.optopt("", "core-server", "change the core server", "URL");
+ opts.optopt("", "core-polling", "toggle polling the core server for updates", "BOOL");
+ opts.optopt("", "core-polling-sec", "change the core polling interval", "SECONDS");
opts.optopt("", "dbus-name", "change the dbus registration name", "NAME");
opts.optopt("", "dbus-path", "change the dbus path", "PATH");
@@ -197,7 +216,6 @@ fn build_config() -> Config {
opts.optopt("", "device-vin", "change the device vin", "VIN");
opts.optopt("", "device-packages-dir", "change downloaded directory for packages", "PATH");
opts.optopt("", "device-package-manager", "change the package manager", "MANAGER");
- opts.optopt("", "device-polling-interval", "change the package polling interval", "INTERVAL");
opts.optopt("", "device-certificates-path", "change the OpenSSL CA certificates file", "PATH");
opts.optopt("", "device-system-info", "change the system information command", "PATH");
@@ -219,25 +237,37 @@ fn build_config() -> Config {
opts.optopt("", "rvi-timeout", "change the rvi timeout", "TIMEOUT");
let matches = opts.parse(&args[1..]).unwrap_or_else(|err| panic!(err.to_string()));
- if matches.opt_present("h") {
- exit!("{}", opts.usage(&format!("Usage: {} [options]", program)));
+
+ if matches.opt_present("help") {
+ exit!(0, "{}", opts.usage(&format!("Usage: {} [options]", program)));
+ } else if matches.opt_present("version") {
+ exit!(0, "{}", version);
}
- let config_file = matches.opt_str("config").unwrap_or_else(|| {
- env::var("SOTA_CONFIG").unwrap_or_else(|_| exit!("{}", "No config file provided."))
- });
- let mut config = Config::load(&config_file).unwrap_or_else(|err| exit!("{}", err));
+ let mut config = match matches.opt_str("config").or(env::var("SOTA_CONFIG").ok()) {
+ Some(file) => Config::load(&file).unwrap_or_else(|err| exit!(1, "{}", err)),
+ None => {
+ warn!("No config file given. Falling back to defaults.");
+ Config::default()
+ }
+ };
config.auth.as_mut().map(|auth_cfg| {
matches.opt_str("auth-client-id").map(|id| auth_cfg.client_id = id);
matches.opt_str("auth-client-secret").map(|secret| auth_cfg.client_secret = secret);
matches.opt_str("auth-server").map(|text| {
- auth_cfg.server = text.parse().unwrap_or_else(|err| exit!("Invalid auth-server URL: {}", err));
+ auth_cfg.server = text.parse().unwrap_or_else(|err| exit!(1, "Invalid auth-server URL: {}", err));
});
});
matches.opt_str("core-server").map(|text| {
- config.core.server = text.parse().unwrap_or_else(|err| exit!("Invalid core-server URL: {}", err));
+ config.core.server = text.parse().unwrap_or_else(|err| exit!(1, "Invalid core-server URL: {}", err));
+ });
+ matches.opt_str("core-polling").map(|polling| {
+ config.core.polling = polling.parse().unwrap_or_else(|err| exit!(1, "Invalid core-polling boolean: {}", err));
+ });
+ matches.opt_str("core-polling-sec").map(|secs| {
+ config.core.polling_sec = secs.parse().unwrap_or_else(|err| exit!(1, "Invalid core-polling-sec: {}", err));
});
config.dbus.as_mut().map(|dbus_cfg| {
@@ -247,7 +277,7 @@ fn build_config() -> Config {
matches.opt_str("dbus-software-manager").map(|mgr| dbus_cfg.software_manager = mgr);
matches.opt_str("dbus-software-manager-path").map(|mgr_path| dbus_cfg.software_manager_path = mgr_path);
matches.opt_str("dbus-timeout").map(|timeout| {
- dbus_cfg.timeout = timeout.parse().unwrap_or_else(|err| exit!("Invalid dbus timeout: {}", err));
+ dbus_cfg.timeout = timeout.parse().unwrap_or_else(|err| exit!(1, "Invalid dbus-timeout: {}", err));
});
});
@@ -255,48 +285,55 @@ fn build_config() -> Config {
matches.opt_str("device-vin").map(|vin| config.device.vin = vin);
matches.opt_str("device-packages-dir").map(|path| config.device.packages_dir = path);
matches.opt_str("device-package-manager").map(|text| {
- config.device.package_manager = text.parse().unwrap_or_else(|err| exit!("Invalid device package manager: {}", err));
- });
- matches.opt_str("device-polling-interval").map(|interval| {
- config.device.polling_interval = interval.parse().unwrap_or_else(|err| exit!("Invalid device polling interval: {}", err));
+ config.device.package_manager = text.parse().unwrap_or_else(|err| exit!(1, "Invalid device-package-manager: {}", err));
});
matches.opt_str("device-certificates-path").map(|certs| config.device.certificates_path = certs);
- matches.opt_str("device-system-info").map(|cmd| config.device.system_info = SystemInfo::new(&cmd));
+ matches.opt_str("device-system-info").map(|cmd| {
+ config.device.system_info = if cmd.len() > 0 { Some(cmd) } else { None }
+ });
matches.opt_str("gateway-console").map(|console| {
- config.gateway.console = console.parse().unwrap_or_else(|err| exit!("Invalid console gateway boolean: {}", err));
+ config.gateway.console = console.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-console boolean: {}", err));
});
matches.opt_str("gateway-dbus").map(|dbus| {
- config.gateway.dbus = dbus.parse().unwrap_or_else(|err| exit!("Invalid dbus gateway boolean: {}", err));
+ config.gateway.dbus = dbus.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-dbus boolean: {}", err));
});
matches.opt_str("gateway-http").map(|http| {
- config.gateway.http = http.parse().unwrap_or_else(|err| exit!("Invalid http gateway boolean: {}", err));
+ config.gateway.http = http.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-http boolean: {}", err));
});
matches.opt_str("gateway-rvi").map(|rvi| {
- config.gateway.rvi = rvi.parse().unwrap_or_else(|err| exit!("Invalid rvi gateway boolean: {}", err));
+ config.gateway.rvi = rvi.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-rvi boolean: {}", err));
});
matches.opt_str("gateway-socket").map(|socket| {
- config.gateway.socket = socket.parse().unwrap_or_else(|err| exit!("Invalid socket gateway boolean: {}", err));
+ config.gateway.socket = socket.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-socket boolean: {}", err));
});
matches.opt_str("gateway-websocket").map(|websocket| {
- config.gateway.websocket = websocket.parse().unwrap_or_else(|err| exit!("Invalid websocket gateway boolean: {}", err));
+ config.gateway.websocket = websocket.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-websocket boolean: {}", err));
});
- matches.opt_str("network-http-server").map(|server| config.network.http_server = server);
- matches.opt_str("network-rvi-edge-server").map(|server| config.network.rvi_edge_server = server);
+ matches.opt_str("network-http-server").map(|addr| {
+ config.network.http_server = addr.parse().unwrap_or_else(|err| exit!(1, "Invalid network-http-server: {}", err));
+ });
+ matches.opt_str("network-rvi-edge-server").map(|addr| {
+ config.network.rvi_edge_server = addr.parse().unwrap_or_else(|err| exit!(1, "Invalid network-rvi-edge-server: {}", err));
+ });
matches.opt_str("network-socket-commands-path").map(|path| config.network.socket_commands_path = path);
matches.opt_str("network-socket-events-path").map(|path| config.network.socket_events_path = path);
matches.opt_str("network-websocket-server").map(|server| config.network.websocket_server = server);
config.rvi.as_mut().map(|rvi_cfg| {
matches.opt_str("rvi-client").map(|url| {
- rvi_cfg.client = url.parse().unwrap_or_else(|err| exit!("Invalid rvi-client URL: {}", err));
+ rvi_cfg.client = url.parse().unwrap_or_else(|err| exit!(1, "Invalid rvi-client URL: {}", err));
});
matches.opt_str("rvi-storage-dir").map(|dir| rvi_cfg.storage_dir = dir);
matches.opt_str("rvi-timeout").map(|timeout| {
- rvi_cfg.timeout = Some(timeout.parse().unwrap_or_else(|err| exit!("Invalid rvi timeout: {}", err)));
+ rvi_cfg.timeout = Some(timeout.parse().unwrap_or_else(|err| exit!(1, "Invalid rvi-timeout: {}", err)));
});
});
+ if matches.opt_present("print") {
+ exit!(0, "{:#?}", config);
+ }
+
config
}
diff --git a/src/rvi/edge.rs b/src/rvi/edge.rs
index cadea74..85d46df 100644
--- a/src/rvi/edge.rs
+++ b/src/rvi/edge.rs
@@ -4,25 +4,24 @@ use hyper::server::{Server as HyperServer, Request as HyperRequest};
use rustc_serialize::json;
use rustc_serialize::json::Json;
use std::{mem, str};
-use std::net::ToSocketAddrs;
-use datatype::{RpcRequest, RpcOk, RpcErr, Url};
+use datatype::{RpcRequest, RpcOk, RpcErr, SocketAddr, Url};
use http::{Server, ServerHandler};
use super::services::Services;
/// The HTTP server endpoint for `RVI` client communication.
pub struct Edge {
- rvi_edge: Url,
+ rvi_edge: SocketAddr,
services: Services,
}
impl Edge {
/// Create a new `Edge` by registering each `RVI` service.
- pub fn new(mut services: Services, rvi_edge: String, rvi_client: Url) -> Self {
+ pub fn new(mut services: Services, rvi_edge: SocketAddr, rvi_client: Url) -> Self {
services.register_services(|service| {
let req = RpcRequest::new("register_service", RegisterServiceRequest {
- network_address: rvi_edge.clone(),
+ network_address: format!("http://{}", rvi_edge),
service: service.to_string(),
});
let resp = req.send(rvi_client.clone())
@@ -32,14 +31,12 @@ impl Edge {
rpc_ok.result.expect("expected rpc_ok result").service
});
- Edge { rvi_edge: rvi_edge.parse().expect("couldn't parse edge server as url"), services: services }
+ Edge { rvi_edge: rvi_edge, services: services }
}
/// Start the HTTP server listening for incoming RVI client connections.
pub fn start(&mut self) {
- let mut addrs = self.rvi_edge.to_socket_addrs()
- .unwrap_or_else(|err| panic!("couldn't parse edge url: {}", err));
- let server = HyperServer::http(&addrs.next().expect("no SocketAddr found"))
+ let server = HyperServer::http(&*self.rvi_edge)
.unwrap_or_else(|err| panic!("couldn't start rvi edge server: {}", err));
let (addr, server) = server.handle(move |_| EdgeHandler::new(self.services.clone())).unwrap();
info!("RVI server edge listening at http://{}.", addr);
@@ -61,7 +58,6 @@ struct RegisterServiceResponse {
}
-
struct EdgeHandler {
services: Services,
resp_code: StatusCode,