diff options
Diffstat (limited to 'src')
65 files changed, 4918 insertions, 2839 deletions
diff --git a/src/broadcast.rs b/src/broadcast.rs new file mode 100644 index 0000000..5389b0c --- /dev/null +++ b/src/broadcast.rs @@ -0,0 +1,60 @@ +use chan; +use chan::{Sender, Receiver}; + + +/// Retain a list of all peers that should receive the incoming message. +pub struct Broadcast<A: Clone> { + peers: Vec<Sender<A>>, + rx: Receiver<A> +} + +impl<A: Clone> Broadcast<A> { + /// Instantiate a new broadcaster for the given `Receiver`. + pub fn new(rx: Receiver<A>) -> Broadcast<A> { + Broadcast { peers: vec![], rx: rx } + } + + /// Start receiving broadcasting messages and forwarding each to the list + /// of peers. + pub fn start(&self) { + loop { + self.rx.recv().map(|a| { + for subscriber in &self.peers { + subscriber.send(a.clone()); + } + }); + } + } + + /// Add a new subscriber to the list of peers that will receive the broadcast + /// messages. + pub fn subscribe(&mut self) -> Receiver<A> { + let (tx, rx) = chan::sync::<A>(0); + self.peers.push(tx); + rx + } +} + + +#[cfg(test)] +mod tests { + use chan; + use std::thread; + + use super::*; + + + #[test] + fn test_broadcasts_events() { + let (tx, rx) = chan::sync(0); + let mut broadcast = Broadcast::new(rx); + + let a = broadcast.subscribe(); + let b = broadcast.subscribe(); + thread::spawn(move || broadcast.start()); + + tx.send(123); + assert_eq!(123, a.recv().unwrap()); + assert_eq!(123, b.recv().unwrap()); + } +} diff --git a/src/configuration/client.rs b/src/configuration/client.rs deleted file mode 100644 index 625f4e9..0000000 --- a/src/configuration/client.rs +++ /dev/null @@ -1,177 +0,0 @@ -//! Handles the `client` section of the configuration file. - -use toml; - -use super::common::{get_required_key, get_optional_key, ConfTreeParser, Result}; - -/// Type to encode allowed keys for the `client` section of the configuration. -#[derive(Clone)] -pub struct ClientConfiguration { - /// Directory where chunks and packages will be stored. - pub storage_dir: String, - /// The full URL where RVI can be reached. - pub rvi_url: Option<String>, - /// The `host:port` combination where the client should bind and listen for incoming RVI calls. - pub edge_url: Option<String>, - /// How long to wait for further server messages before the `Transfer` will be dropped. - pub timeout: Option<i64>, - /// Index of the RVI service URL, that holds the VIN for this device. - pub vin_match: i32 -} - -impl ConfTreeParser<ClientConfiguration> for ClientConfiguration { - fn parse(tree: &toml::Table) -> Result<ClientConfiguration> { - let client_tree = try!(tree.get("client") - .ok_or("Missing required subgroup \"client\"")); - - let storage_dir = try!(get_required_key(client_tree, "storage_dir", "client")); - let rvi_url = try!(get_optional_key(client_tree, "rvi_url", "client")); - let edge_url = try!(get_optional_key(client_tree, "edge_url", "client")); - let timeout = try!(get_optional_key(client_tree, "timeout", "client")); - let vin_match = try!(get_optional_key(client_tree, "vin_match", "client")); - - Ok(ClientConfiguration { - storage_dir: storage_dir, - rvi_url: rvi_url, - edge_url: edge_url, - timeout: timeout, - vin_match: vin_match.unwrap_or(2) - }) - } -} - -#[cfg(test)] static STORAGE: &'static str = "/var/sota"; -#[cfg(test)] static RVI: &'static str = "/http://localhost:8901"; -#[cfg(test)] static EDGE: &'static str = "localhost:9080"; -#[cfg(test)] static TIMEOUT: i64 = 10; -#[cfg(test)] static VIN: i32 = 3; - -#[cfg(test)] -pub fn gen_valid_conf() -> String { - format!(r#" - [client] - storage_dir = "{}" - rvi_url = "{}" - edge_url = "{}" - timeout = {} - vin_match = {} - "#, STORAGE, RVI, EDGE, TIMEOUT, VIN) -} - -#[cfg(test)] -pub fn assert_conf(configuration: &ClientConfiguration) -> bool { - assert_eq!(&configuration.storage_dir, STORAGE); - assert_eq!(&configuration.rvi_url.clone().unwrap(), RVI); - assert_eq!(&configuration.edge_url.clone().unwrap(), EDGE); - assert_eq!(configuration.timeout.unwrap(), TIMEOUT); - assert_eq!(configuration.vin_match, VIN); - true -} - -#[cfg(test)] -pub mod test { - use super::*; - use super::{STORAGE, RVI, EDGE, TIMEOUT, VIN}; - use configuration::common::{ConfTreeParser, read_tree}; - - #[test] - fn it_requires_the_storage_dir_key() { - test_init!(); - let data = format!(r#" - [client] - rvi_url = "{}" - edge_url = "{}" - timeout = {} - vin_match = {} - "#, RVI, EDGE, TIMEOUT, VIN); - - let tree = read_tree(&data).unwrap(); - match ClientConfiguration::parse(&tree) { - Ok(..) => panic!("Accepted invalid configuration!"), - Err(e) => { - assert_eq!(e, - "Missing required key \"storage_dir\" in \"client\"" - .to_string()); - } - }; - } - - #[test] - fn it_doesnt_require_the_rvi_url_key() { - test_init!(); - let data = format!(r#" - [client] - storage_dir = "{}" - edge_url = "{}" - timeout = {} - vin_match = {} - "#, STORAGE, EDGE, TIMEOUT, VIN); - - let tree = read_tree(&data).unwrap(); - let configuration = ClientConfiguration::parse(&tree).unwrap(); - assert_eq!(&configuration.storage_dir, STORAGE); - assert_eq!(configuration.rvi_url, None); - assert_eq!(&configuration.edge_url.unwrap(), EDGE); - assert_eq!(configuration.timeout.unwrap(), TIMEOUT); - assert_eq!(configuration.vin_match, VIN); - } - - #[test] - fn it_doesnt_require_the_edge_url_key() { - test_init!(); - let data = format!(r#" - [client] - storage_dir = "{}" - rvi_url = "{}" - timeout = {} - vin_match = {} - "#, STORAGE, RVI, TIMEOUT, VIN); - - let tree = read_tree(&data).unwrap(); - let configuration = ClientConfiguration::parse(&tree).unwrap(); - assert_eq!(&configuration.storage_dir, STORAGE); - assert_eq!(&configuration.rvi_url.unwrap(), RVI); - assert_eq!(configuration.edge_url, None); - assert_eq!(configuration.vin_match, VIN); - } - - #[test] - fn it_doesnt_require_the_timeout_key() { - test_init!(); - let data = format!(r#" - [client] - storage_dir = "{}" - rvi_url = "{}" - edge_url = "{}" - vin_match = {} - "#, STORAGE, RVI, EDGE, VIN); - - let tree = read_tree(&data).unwrap(); - let configuration = ClientConfiguration::parse(&tree).unwrap(); - assert_eq!(&configuration.storage_dir, STORAGE); - assert_eq!(&configuration.rvi_url.unwrap(), RVI); - assert_eq!(&configuration.edge_url.unwrap(), EDGE); - assert_eq!(configuration.timeout, None); - assert_eq!(configuration.vin_match, VIN); - } - - #[test] - fn it_doesnt_require_the_vin_match_key_and_uses_a_default() { - test_init!(); - let data = format!(r#" - [client] - storage_dir = "{}" - rvi_url = "{}" - edge_url = "{}" - timeout = {} - "#, STORAGE, RVI, EDGE, TIMEOUT); - - let tree = read_tree(&data).unwrap(); - let configuration = ClientConfiguration::parse(&tree).unwrap(); - assert_eq!(&configuration.storage_dir, STORAGE); - assert_eq!(&configuration.rvi_url.unwrap(), RVI); - assert_eq!(&configuration.edge_url.unwrap(), EDGE); - assert_eq!(configuration.timeout.unwrap(), TIMEOUT); - assert_eq!(configuration.vin_match, 2); - } -} diff --git a/src/configuration/common.rs b/src/configuration/common.rs deleted file mode 100644 index e6b0fcd..0000000 --- a/src/configuration/common.rs +++ /dev/null @@ -1,128 +0,0 @@ -//! Helper functions and traits for all configuration sections. - -use toml; -use std::result; -use std::fmt; - -/// `Result` type used throughout the configuration parser. -pub type Result<T> = result::Result<T, String>; - -/// Trait that provides a interface for parsing a (sub-) tree of the configuration. -pub trait ConfTreeParser<C> { - /// Try to parse the given `tree` into the type this trait is implemented for. - /// Returns the parsed object or a error message, with the first error encountered while - /// parsing the `tree`. - /// - /// # Arguments - /// * `tree`: The `toml` tree to parse - fn parse(tree: &toml::Table) -> Result<C>; -} - -/// Parse a required key, returning a appropriate error message, if the key can't be found in the -/// configuration. -/// -/// # Arguments -/// * `subtree`: The `toml` tree to parse. -/// * `key`: The key to look for. -/// * `group`: The group, this (sub-) tree is associated with. -pub fn get_required_key<D>(subtree: &toml::Value, key: &str, group: &str) - -> Result<D> where D: ParseTomlValue { - let value = try!(subtree.lookup(key) - .ok_or(format!("Missing required key \"{}\" in \"{}\"", - key, group))); - ParseTomlValue::parse(value, key, group) -} - -/// Parse a optional key, returning None if it can't be found. -/// -/// This basically does a `Option<Result>` to `Result<Option>` translation -/// -/// # Arguments -/// * `subtree`: The `toml` tree to parse. -/// * `key`: The key to look for. -/// * `group`: The group, this (sub-) tree is associated with. -pub fn get_optional_key<D>(subtree: &toml::Value, key: &str, group: &str) - -> Result<Option<D>> where D: ParseTomlValue { - match subtree.lookup(key) { - Some(val) => { - Ok(Some(try!(ParseTomlValue::parse(val, key, group)))) - }, - None => Ok(None) - } -} - -/// Trait that provides a interface for parsing a single `toml` value. -pub trait ParseTomlValue { - /// Parse a `String` from a `toml` value. Returns the parsed value on success or a error - /// message on failure. - /// - /// # Arguments - /// * `val`: The `toml` value to parse. - /// * `key`: The key this value is associated with. - /// * `group`: The group, this (sub-) tree is associated with. - fn parse(val: &toml::Value, key: &str, group: &str) -> Result<Self> where Self: Sized; -} - -impl ParseTomlValue for String { - fn parse(val: &toml::Value, key: &str, group: &str) - -> Result<String> { - val.as_str().map(|s| s.to_string()) - .ok_or(format!("Key \"{}\" in \"{}\" is not a string", key, group)) - } -} - -impl ParseTomlValue for i32 { - fn parse(val: &toml::Value, key: &str, group: &str) - -> Result<i32> { - val.as_integer().map(|i| i as i32) - .ok_or(format!("Key \"{}\" in \"{}\" is not a integer", key, group)) - } -} - -impl ParseTomlValue for i64 { - fn parse(val: &toml::Value, key: &str, group: &str) - -> Result<i64> { - val.as_integer() - .ok_or(format!("Key \"{}\" in \"{}\" is not a integer", key, group)) - } -} - -/// Helper function to format a `toml::Parser` error message to the format used in this -/// implementation. This is only safe to call if the `parser` is associated with a *real* file on -/// disk. -/// -/// # Arguments -/// * `parser`: Pointer to the `toml::Parser`, that produced a error. -#[cfg(not(test))] -pub fn format_parser_error(parser: &toml::Parser) -> String { - let linecol = parser.to_linecol(0); - format!("parse error: {}:{}: {:?}", linecol.0, linecol.1, parser.errors) -} - -/// Helper function to format a `toml::Parser` error message to the format used in this -/// implementation. This version is always safe to call, but doesn't print the line and column -/// where the error was encountered. -/// -/// # Arguments -/// * `parser`: Pointer to the `toml::Parser`, that produced a error. -#[cfg(test)] -pub fn format_parser_error(parser: &toml::Parser) -> String { - format!("parse error: {:?}", parser.errors) -} - -/// Helper function to copy anything that implements `Display` to a `String`. -pub fn stringify<T>(e: T) -> String - where T: fmt::Display { - format!("{}", e) -} - -/// Reads the provided `tree` as a `toml::Table`. Returns the `toml::Table` on success or a error -/// message on failure. -/// -/// # Arguments -/// * `tree`: Pointer to a `str`, that holds a toml configuration. -#[cfg(test)] -pub fn read_tree(tree: &str) -> Result<toml::Table> { - let mut parser = toml::Parser::new(tree); - parser.parse().ok_or(format_parser_error(&parser)) -} diff --git a/src/configuration/configuration.rs b/src/configuration/configuration.rs deleted file mode 100644 index c2c0416..0000000 --- a/src/configuration/configuration.rs +++ /dev/null @@ -1,176 +0,0 @@ -//! Main logic for parsing the configuration file - -use toml; -use std::io::prelude::*; -use std::path::PathBuf; -use std::fs::OpenOptions; -use std::env; - -use super::common::{ConfTreeParser, format_parser_error, stringify, Result}; -use super::client::ClientConfiguration; -use super::dbus::DBusConfiguration; - -/// Type to encode the full configuration. -#[derive(Clone)] -pub struct Configuration { - /// The `client` section of the configuration - pub client: ClientConfiguration, - /// The `dbus` section of the configuration - pub dbus: DBusConfiguration -} - -impl Configuration { - /// Try to read the configuration from the provided path and parse it into a `Configuration` - /// object. Returns the parsed `Configuration` on success or the first error message - /// encountered while reading or parsing the configuration file. - /// - /// # Arguments - /// * `path`: Path to the location of the configuration file. - pub fn read(path: &str) -> Result<Configuration> { - let path = PathBuf::from(path); - let mut f = try!(OpenOptions::new().open(path).map_err(stringify)); - let mut buf = Vec::new(); - try!(f.read_to_end(&mut buf).map_err(stringify)); - let data = try!(String::from_utf8(buf).map_err(stringify)); - Configuration::parse(&data) - } - - /// Try to parse the given string to a `Configuration`. - /// - /// # Arguments - /// * `conf`: The configuration to parse. - pub fn parse(conf: &str) -> Result<Configuration> { - let mut parser = toml::Parser::new(conf); - let tree = try!(parser.parse().ok_or(format_parser_error(&parser))); - - let client = try!(ClientConfiguration::parse(&tree)); - let dbus = try!(DBusConfiguration::parse(&tree)); - - Ok(Configuration { - client: client, - dbus: dbus - }) - } - - /// Try to find the configuration file in different paths. First - /// `$XDG_CONFIG_HOME/sota/client.toml` is tried, then `$HOME/.sota/client.toml` returns - /// `$PWD/.sota/client.toml` if none of the above can be found. The case where this file also - /// doesn't exist should be handled by [`read`](#method.read). - pub fn default_path() -> String { - match env::var_os("XDG_CONFIG_HOME") - .and_then(|s| s.into_string().ok()) { - Some(val) => { return val + "/sota/client.toml"; }, - None => { error!("$XDG_CONFIG_HOME is not set"); } - } - - match env::var_os("HOME").and_then(|s| s.into_string().ok()) { - Some(val) => { - warn!("Falling back to $HOME/.config"); - return val + "/.sota/client.toml"; - }, - None => { error!("$HOME is not set"); } - } - - warn!("Falling back to $PWD"); - ".sota/client.toml".to_string() - } -} - -#[cfg(test)] -mod test { - use super::*; - use std::env; - use configuration::client; - use configuration::dbus; - - #[test] - fn it_uses_fallbacks_for_its_configuration() { - test_init!(); - env::remove_var("XDG_CONFIG_HOME"); - env::set_var("XDG_CONFIG_HOME", "/some/thing"); - assert_eq!(Configuration::default_path(), - "/some/thing/sota/client.toml".to_string()); - env::remove_var("XDG_CONFIG_HOME"); - env::remove_var("HOME"); - env::set_var("HOME", "/some/thing"); - assert_eq!(Configuration::default_path(), - "/some/thing/.sota/client.toml".to_string()); - env::remove_var("XDG_CONFIG_HOME"); - env::remove_var("HOME"); - assert_eq!(Configuration::default_path(), - ".sota/client.toml".to_string()); - } - - #[test] - fn it_correctly_parses_a_valid_configuration() { - test_init!(); - let data = format!("{}\n{}", - client::gen_valid_conf(), - dbus::gen_valid_conf()); - - let configuration = Configuration::parse(&data).unwrap(); - assert!(client::assert_conf(&configuration.client)); - assert!(dbus::assert_conf(&configuration.dbus)); - } - - #[test] - fn it_ignores_extra_keys() { - test_init!(); - let data = format!(r#" - {} - test_key = "hello world" - - {} - test_key = "see ya world" - "#, client::gen_valid_conf(), - dbus::gen_valid_conf()); - - let configuration = Configuration::parse(&data).unwrap(); - assert!(client::assert_conf(&configuration.client)); - assert!(dbus::assert_conf(&configuration.dbus)); - } - - #[test] - fn it_ignores_extra_groups() { - test_init!(); - let data = format!(r#" - {} - - {} - - [test] - test_key = "hello world" - "#, client::gen_valid_conf(), - dbus::gen_valid_conf()); - - let configuration = Configuration::parse(&data).unwrap(); - assert!(client::assert_conf(&configuration.client)); - assert!(dbus::assert_conf(&configuration.dbus)); - } - - #[test] - fn it_requires_the_client_group() { - test_init!(); - let data = format!("{}", dbus::gen_valid_conf()); - match Configuration::parse(&data) { - Ok(..) => panic!("Accepted invalid configuration!"), - Err(e) => { - assert_eq!(e, "Missing required subgroup \"client\"" - .to_string()); - } - }; - } - - #[test] - fn it_requires_the_dbus_group() { - test_init!(); - let data = format!("{}", client::gen_valid_conf()); - match Configuration::parse(&data) { - Ok(..) => panic!("Accepted invalid configuration!"), - Err(e) => { - assert_eq!(e, "Missing required subgroup \"dbus\"" - .to_string()); - } - }; - } -} diff --git a/src/configuration/dbus.rs b/src/configuration/dbus.rs deleted file mode 100644 index 83a06dc..0000000 --- a/src/configuration/dbus.rs +++ /dev/null @@ -1,155 +0,0 @@ -//! Handles the `dbus` section of the configuration file. - -use toml; - -use super::common::{get_required_key, get_optional_key, ConfTreeParser, Result}; - -/// Type to encode allowed keys for the `dbus` section of the configuration. -#[derive(Clone)] -pub struct DBusConfiguration { - /// The DBus name that sota_client registers. - pub name: String, - /// The DBus path that sota_client registers. - pub path: String, - /// The interface name that sota_client provides. - pub interface: String, - /// The name and interface, where the software loading manager can be reached. - pub software_manager: String, - /// The name and interface, where the software loading manager can be reached. - pub software_manager_path: String, - /// Time to wait for installation of a package before it is considered a failure. In seconds. - pub timeout: i32 // dbus-rs expects a signed int -} - -#[cfg(test)] -impl DBusConfiguration { - /// Generate a test configuration. - pub fn gen_test() -> DBusConfiguration { - DBusConfiguration { - name: "org.test.test".to_string(), - path: "org.test.test".to_string(), - interface: "org.test.test".to_string(), - software_manager: "org.test.software_manager".to_string(), - software_manager_path: "org.test.software_manager".to_string(), - timeout: 20 - } - } -} - -impl ConfTreeParser<DBusConfiguration> for DBusConfiguration { - fn parse(tree: &toml::Table) -> Result<DBusConfiguration> { - let dbus_tree = try!(tree.get("dbus") - .ok_or("Missing required subgroup \"dbus\"")); - let name = try!(get_required_key(dbus_tree, "name", "dbus")); - let path = try!(get_required_key(dbus_tree, "path", "dbus")); - let interface = try!(get_required_key(dbus_tree, "interface", "dbus")); - let software_manager = try!(get_required_key(dbus_tree, "software_manager", "dbus")); - let software_manager_path = try!(get_required_key(dbus_tree, "software_manager_path", "dbus")); - let timeout = try!(get_optional_key(dbus_tree, "timeout", "dbus")); - - Ok(DBusConfiguration { - name: name, - path: path, - interface: interface, - software_manager: software_manager, - software_manager_path: software_manager_path, - timeout: timeout.unwrap_or(60) * 1000 - }) - } -} - -#[cfg(test)] static NAME: &'static str = "org.genivi.sota_client"; -#[cfg(test)] static PATH: &'static str = "/org/genivi/sota_client"; -#[cfg(test)] static INTERFACE: &'static str = "org.genivi.software_manager"; -#[cfg(test)] static SOFTWARE_MANAGER: &'static str = "org.genivi.software_manager"; -#[cfg(test)] static SOFTWARE_MANAGER_PATH: &'static str = "/org/genivi/software_manager"; - -#[cfg(test)] -pub fn gen_valid_conf() -> String { - format!(r#" - [dbus] - name = "{}" - path = "{}" - interface = "{}" - software_manager = "{}" - software_manager_path = "{}" - "#, NAME, PATH, INTERFACE, SOFTWARE_MANAGER, SOFTWARE_MANAGER_PATH) -} - -#[cfg(test)] -pub fn assert_conf(conf: &DBusConfiguration) -> bool { - assert_eq!(&conf.name, NAME); - assert_eq!(&conf.path, PATH); - assert_eq!(&conf.interface, INTERFACE); - assert_eq!(&conf.software_manager, SOFTWARE_MANAGER); - assert_eq!(&conf.software_manager_path, SOFTWARE_MANAGER_PATH); - true -} - -#[cfg(test)] -mod test { - use super::*; - use super::{NAME, PATH, INTERFACE, SOFTWARE_MANAGER}; - use configuration::common::{ConfTreeParser, read_tree}; - - #[test] - fn it_requires_the_dbus_name_key() { - test_init!(); - let data = format!(r#" - [dbus] - interface = "{}" - software_manager = "{}" - "#, INTERFACE, SOFTWARE_MANAGER); - - let tree = read_tree(&data).unwrap(); - match DBusConfiguration::parse(&tree) { - Ok(..) => panic!("Accepted invalid configuration!"), - Err(e) => { - assert_eq!(e, - "Missing required key \"name\" in \"dbus\"" - .to_string()); - } - }; - } - - #[test] - fn it_requires_the_dbus_interface_key() { - test_init!(); - let data = format!(r#" - [dbus] - name = "{}" - path = "{}" - software_manager = "{}" - "#, NAME, PATH, SOFTWARE_MANAGER); - - let tree = read_tree(&data).unwrap(); - match DBusConfiguration::parse(&tree) { - Ok(..) => panic!("Accepted invalid configuration!"), - Err(e) => { - assert_eq!(e, - "Missing required key \"interface\" in \"dbus\"" - .to_string()); - } - }; - } - - #[test] - fn it_requires_the_dbus_software_manager_key() { - test_init!(); - let data = format!(r#" - [dbus] - name = "{}" - path = "{}" - interface = "{}" - "#, NAME, PATH, INTERFACE); - - let tree = read_tree(&data).unwrap(); - match DBusConfiguration::parse(&tree) { - Ok(..) => panic!("Accepted invalid configuration!"), - Err(e) => { - assert_eq!(e, "Missing required key \"software_manager\" \ - in \"dbus\"".to_string()); - } - }; - } -} diff --git a/src/configuration/mod.rs b/src/configuration/mod.rs deleted file mode 100644 index e2a986f..0000000 --- a/src/configuration/mod.rs +++ /dev/null @@ -1,12 +0,0 @@ -//! Parsing of the configuration file of `sota_client`. -//! -//! Also see the documentation for [`toml`](../../toml/index.html). - -mod configuration; -mod common; -mod client; -mod dbus; - -pub use self::configuration::Configuration; -pub use self::client::ClientConfiguration; -pub use self::dbus::DBusConfiguration; diff --git a/src/datatype/auth.rs b/src/datatype/auth.rs new file mode 100644 index 0000000..cbfd097 --- /dev/null +++ b/src/datatype/auth.rs @@ -0,0 +1,49 @@ +use std::borrow::Cow; + + +/// The available authentication types for communicating with the Auth server. +#[derive(Clone, Debug)] +pub enum Auth { + None, + Credentials(ClientId, ClientSecret), + Token(AccessToken), +} + +impl<'a> Into<Cow<'a, Auth>> for Auth { + fn into(self) -> Cow<'a, Auth> { + Cow::Owned(self) + } +} + + +/// For storage of the returned access token data following a successful +/// authentication. +#[derive(RustcDecodable, Debug, PartialEq, Clone, Default)] +pub struct AccessToken { + pub access_token: String, + pub token_type: String, + pub expires_in: i32, + pub scope: String +} + +impl<'a> Into<Cow<'a, AccessToken>> for AccessToken { + fn into(self) -> Cow<'a, AccessToken> { + Cow::Owned(self) + } +} + + +/// Encapsulates a `String` type for use in `Auth::Credentials` +#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)] +pub struct ClientId(pub String); + +/// Encapsulates a `String` type for use in `Auth::Credentials` +#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)] +pub struct ClientSecret(pub String); + +/// Encapsulates the client id and secret used during authentication. +#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)] +pub struct ClientCredentials { + pub client_id: ClientId, + pub client_secret: ClientSecret, +} diff --git a/src/datatype/command.rs b/src/datatype/command.rs new file mode 100644 index 0000000..d449bb6 --- /dev/null +++ b/src/datatype/command.rs @@ -0,0 +1,306 @@ +use std::fmt::{Display, Formatter, Result as FmtResult}; +use std::str; +use std::str::FromStr; + +use nom::{IResult, space, eof}; +use datatype::{ClientCredentials, ClientId, ClientSecret, DownloadComplete, Error, + InstalledSoftware, Package, UpdateReport, UpdateRequestId, + UpdateResultCode}; + + +/// System-wide commands that are sent to the interpreter. +#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)] +pub enum Command { + /// Authenticate with the auth server. + Authenticate(Option<ClientCredentials>), + /// Shutdown the client immediately. + Shutdown, + + /// Check for any new updates. + GetNewUpdates, + /// List the installed packages on the system. + ListInstalledPackages, + /// Get the latest system information, and optionally publish it to Core. + RefreshSystemInfo(bool), + + /// Start downloading one or more updates. + StartDownload(Vec<UpdateRequestId>), + /// Start installing an update + StartInstall(DownloadComplete), + + /// Send a list of packages to the Core server. + SendInstalledPackages(Vec<Package>), + /// Send a list of all packages and firmware to the Core server. + SendInstalledSoftware(InstalledSoftware), + /// Send a package update report to the Core server. + SendUpdateReport(UpdateReport), +} + +impl Display for Command { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!(f, "{:?}", self) + } +} + +impl FromStr for Command { + type Err = Error; + + fn from_str(s: &str) -> Result<Command, Error> { + match command(s.as_bytes()) { + IResult::Done(_, cmd) => parse_arguments(cmd.0, cmd.1.clone()), + _ => Err(Error::Command(format!("bad command: {}", s))) + } + } +} + + +named!(command <(Command, Vec<&str>)>, chain!( + space? + ~ cmd: alt!( + alt_complete!(tag!("Authenticate") | tag!("auth")) + => { |_| Command::Authenticate(None) } + | alt_complete!(tag!("GetNewUpdates") | tag!("new")) + => { |_| Command::GetNewUpdates } + | alt_complete!(tag!("ListInstalledPackages") | tag!("ls")) + => { |_| Command::ListInstalledPackages } + | alt_complete!(tag!("RefreshSystemInfo") | tag!("info")) + => { |_| Command::RefreshSystemInfo(false) } + | alt_complete!(tag!("Shutdown") | tag!("shutdown")) + => { |_| Command::Shutdown } + | alt_complete!(tag!("SendInstalledPackages") | tag!("sendpack")) + => { |_| Command::SendInstalledPackages(Vec::new()) } + | alt_complete!(tag!("SendInstalledSoftware") | tag!("sendinst")) + => { |_| Command::SendInstalledSoftware(InstalledSoftware::default()) } + | alt_complete!(tag!("SendUpdateReport") | tag!("sendup")) + => { |_| Command::SendUpdateReport(UpdateReport::default()) } + | alt_complete!(tag!("StartDownload") | tag!("dl")) + => { |_| Command::StartDownload(Vec::new()) } + | alt_complete!(tag!("StartInstall") | tag!("inst")) + => { |_| Command::StartInstall(DownloadComplete::default()) } + ) + ~ args: arguments + ~ alt!(eof | tag!("\r") | tag!("\n") | tag!(";")), + move || { (cmd, args) } +)); + +named!(arguments <&[u8], Vec<&str> >, chain!( + args: many0!(chain!( + space? + ~ text: map_res!(is_not!(" \t\r\n;"), str::from_utf8) + ~ space?, + || { text } + )), + move || { + args.into_iter() + .filter(|arg| arg.len() > 0) + .collect() + } +)); + +fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> { + match cmd { + Command::Authenticate(_) => match args.len() { + 0 => Ok(Command::Authenticate(None)), + 1 => Err(Error::Command("usage: auth <client-id> <client-secret>".to_string())), + 2 => Ok(Command::Authenticate(Some(ClientCredentials { + client_id: ClientId(args[0].to_string()), + client_secret: ClientSecret(args[1].to_string())}))), + _ => Err(Error::Command(format!("unexpected Authenticate args: {:?}", args))), + }, + + Command::GetNewUpdates => match args.len() { + 0 => Ok(Command::GetNewUpdates), + _ => Err(Error::Command(format!("unexpected GetNewUpdates args: {:?}", args))), + }, + + Command::ListInstalledPackages => match args.len() { + 0 => Ok(Command::ListInstalledPackages), + _ => Err(Error::Command(format!("unexpected ListInstalledPackages args: {:?}", args))), + }, + + Command::RefreshSystemInfo(_) => match args.len() { + 0 => Ok(Command::RefreshSystemInfo(false)), + 1 => { + if let Ok(b) = args[0].parse::<bool>() { + Ok(Command::RefreshSystemInfo(b)) + } else { + Err(Error::Command("couldn't parse 1st argument as boolean".to_string())) + } + } + _ => Err(Error::Command(format!("unexpected RefreshSystemInfo args: {:?}", args))), + }, + + Command::SendInstalledPackages(_) => match args.len() { + 0 | 1 => Err(Error::Command("usage: sendpack (<name> <version> )+".to_string())), + n if n % 2 == 0 => { + let (names, versions): (Vec<(_, &str)>, Vec<(_, &str)>) = + args.into_iter().enumerate().partition(|&(n, _)| n % 2 == 0); + let packages = names.into_iter().zip(versions.into_iter()) + .map(|((_, name), (_, version))| Package { + name: name.to_string(), + version: version.to_string() + }).collect::<Vec<Package>>(); + Ok(Command::SendInstalledPackages(packages)) + } + _ => Err(Error::Command(format!("SendInstalledPackages expects an even number of 'name version' pairs"))), + }, + + Command::SendInstalledSoftware(_) => match args.len() { + // FIXME(PRO-1160): args + _ => Err(Error::Command(format!("unexpected SendInstalledSoftware args: {:?}", args))), + }, + + Command::SendUpdateReport(_) => match args.len() { + 0 | 1 => Err(Error::Command("usage: sendup <update-id> <result-code>".to_string())), + 2 => { + if let Ok(code) = args[1].parse::<UpdateResultCode>() { + Ok(Command::SendUpdateReport(UpdateReport::single(args[0].to_string(), code, "".to_string()))) + } else { + Err(Error::Command("couldn't parse 2nd argument as an UpdateResultCode".to_string())) + } + } + _ => Err(Error::Command(format!("unexpected SendUpdateReport args: {:?}", args))), + }, + + Command::Shutdown => match args.len() { + 0 => Ok(Command::Shutdown), + _ => Err(Error::Command(format!("unexpected Shutdown args: {:?}", args))), + }, + + Command::StartDownload(_) => match args.len() { + 0 => Err(Error::Command("usage: dl [<id>]".to_string())), + _ => Ok(Command::StartDownload(args.iter().map(|arg| String::from(*arg)).collect())), + }, + + Command::StartInstall(_) => match args.len() { + // FIXME(PRO-1160): args + _ => Err(Error::Command(format!("unexpected StartInstall args: {:?}", args))), + }, + + } +} + + +#[cfg(test)] +mod tests { + use super::{command, arguments}; + use datatype::{Command, ClientCredentials, ClientId, ClientSecret, Package, + UpdateReport, UpdateResultCode}; + use nom::IResult; + + + #[test] + fn parse_command_test() { + assert_eq!(command(&b"auth foo bar"[..]), + IResult::Done(&b""[..], (Command::Authenticate(None), vec!["foo", "bar"]))); + assert_eq!(command(&b"dl 1"[..]), + IResult::Done(&b""[..], (Command::StartDownload(Vec::new()), vec!["1"]))); + assert_eq!(command(&b"ls;\n"[..]), + IResult::Done(&b"\n"[..], (Command::ListInstalledPackages, Vec::new()))); + } + + #[test] + fn parse_arguments_test() { + assert_eq!(arguments(&b"one"[..]), IResult::Done(&b""[..], vec!["one"])); + assert_eq!(arguments(&b"foo bar"[..]), IResult::Done(&b""[..], vec!["foo", "bar"])); + assert_eq!(arguments(&b"n=5"[..]), IResult::Done(&b""[..], vec!["n=5"])); + assert_eq!(arguments(&b""[..]), IResult::Done(&b""[..], Vec::new())); + assert_eq!(arguments(&b" \t some"[..]), IResult::Done(&b""[..], vec!["some"])); + assert_eq!(arguments(&b";"[..]), IResult::Done(&b";"[..], Vec::new())); + } + + + #[test] + fn authenticate_test() { + assert_eq!("Authenticate".parse::<Command>().unwrap(), Command::Authenticate(None)); + assert_eq!("auth".parse::<Command>().unwrap(), Command::Authenticate(None)); + assert_eq!("auth user pass".parse::<Command>().unwrap(), + Command::Authenticate(Some(ClientCredentials { + client_id: ClientId("user".to_string()), + client_secret: ClientSecret("pass".to_string()), + }))); + assert!("auth one".parse::<Command>().is_err()); + assert!("auth one two three".parse::<Command>().is_err()); + } + + #[test] + fn get_new_updates_test() { + assert_eq!("GetNewUpdates".parse::<Command>().unwrap(), Command::GetNewUpdates); + assert_eq!("new".parse::<Command>().unwrap(), Command::GetNewUpdates); + assert!("new old".parse::<Command>().is_err()); + } + + #[test] + fn list_installed_test() { + assert_eq!("ListInstalledPackages".parse::<Command>().unwrap(), Command::ListInstalledPackages); + assert_eq!("ls".parse::<Command>().unwrap(), Command::ListInstalledPackages); + assert!("ls some".parse::<Command>().is_err()); + } + + #[test] + fn refresh_system_info_test() { + assert_eq!("RefreshSystemInfo true".parse::<Command>().unwrap(), Command::RefreshSystemInfo(true)); + assert_eq!("info".parse::<Command>().unwrap(), Command::RefreshSystemInfo(false)); + assert!("RefreshSystemInfo 1 2".parse::<Command>().is_err()); + assert!("info please".parse::<Command>().is_err()); + } + + #[test] + fn send_installed_packages_test() { + assert_eq!("SendInstalledPackages myname myversion".parse::<Command>().unwrap(), + Command::SendInstalledPackages(vec![Package { + name: "myname".to_string(), + version: "myversion".to_string() + }])); + assert_eq!("sendpack n1 v1 n2 v2".parse::<Command>().unwrap(), + Command::SendInstalledPackages(vec![Package { + name: "n1".to_string(), + version: "v1".to_string() + }, Package { + name: "n2".to_string(), + version: "v2".to_string() + }])); + assert!("SendInstalledPackages some".parse::<Command>().is_err()); + assert!("sendpack 1 2 3".parse::<Command>().is_err()); + } + + #[test] + fn send_installed_software_test() { + assert!("SendInstalledSoftware".parse::<Command>().is_err()); + assert!("sendsoft some".parse::<Command>().is_err()); + } + + #[test] + fn send_update_report_test() { + assert_eq!("SendUpdateReport myid OK".parse::<Command>().unwrap(), Command::SendUpdateReport( + UpdateReport::single("myid".to_string(), UpdateResultCode::OK, "".to_string()))); + assert_eq!("sendup myid 19".parse::<Command>().unwrap(), Command::SendUpdateReport( + UpdateReport::single("myid".to_string(), UpdateResultCode::GENERAL_ERROR, "".to_string()))); + assert!("sendup myid 20".parse::<Command>().is_err()); + assert!("SendInstalledPackages".parse::<Command>().is_err()); + assert!("sendup 1 2 3".parse::<Command>().is_err()); + } + + #[test] + fn shutdown_test() { + assert_eq!("Shutdown".parse::<Command>().unwrap(), Command::Shutdown); + assert_eq!("shutdown".parse::<Command>().unwrap(), Command::Shutdown); + assert!("Shutdown 1 2".parse::<Command>().is_err()); + assert!("shutdown now".parse::<Command>().is_err()); + } + + #[test] + fn start_download_test() { + assert_eq!("StartDownload this".parse::<Command>().unwrap(), + Command::StartDownload(vec!["this".to_string()])); + assert_eq!("dl some more".parse::<Command>().unwrap(), + Command::StartDownload(vec!["some".to_string(), "more".to_string()])); + assert!("dl".parse::<Command>().is_err()); + } + + #[test] + fn start_install_test() { + assert!("StartInstall".parse::<Command>().is_err()); + assert!("inst more than one".parse::<Command>().is_err()); + } +} diff --git a/src/datatype/config.rs b/src/datatype/config.rs new file mode 100644 index 0000000..5db0f12 --- /dev/null +++ b/src/datatype/config.rs @@ -0,0 +1,389 @@ +use rustc_serialize::Decodable; +use std::fs; +use std::fs::File; +use std::io::ErrorKind; +use std::os::unix::fs::PermissionsExt; +use std::io::prelude::*; +use std::path::Path; +use toml; +use toml::{Decoder, Parser, Table, Value}; + +use datatype::{Error, SystemInfo, Url}; +use package_manager::PackageManager; + + +/// An aggregation of all the configuration options parsed at startup. +#[derive(Default, PartialEq, Eq, Debug, Clone)] +pub struct Config { + pub auth: Option<AuthConfig>, + pub core: CoreConfig, + pub dbus: Option<DBusConfig>, + pub device: DeviceConfig, + pub gateway: GatewayConfig, + pub network: NetworkConfig, + pub rvi: Option<RviConfig>, +} + +impl Config { + pub fn load(path: &str) -> Result<Config, Error> { + info!("Loading config file: {}", path); + let mut file = try!(File::open(path).map_err(Error::Io)); + let mut toml = String::new(); + try!(file.read_to_string(&mut toml)); + Config::parse(&toml) + } + + pub fn parse(toml: &str) -> Result<Config, Error> { + let table = try!(parse_table(&toml)); + + let auth_cfg = if let Some(auth) = table.get("auth") { + let parsed = try!(decode_section(auth.clone())); + Some(try!(bootstrap_credentials(parsed))) + } else { + None + }; + + let dbus_cfg = if let Some(dbus) = table.get("dbus") { + Some(try!(decode_section(dbus.clone()))) + } else { + None + }; + + let rvi_cfg = if let Some(rvi) = table.get("rvi") { + Some(try!(decode_section(rvi.clone()))) + } else { + None + }; + + Ok(Config { + auth: auth_cfg, + core: try!(read_section(&table, "core")), + dbus: dbus_cfg, + device: try!(read_section(&table, "device")), + gateway: try!(read_section(&table, "gateway")), + network: try!(read_section(&table, "network")), + rvi: rvi_cfg, + }) + } +} + +fn parse_table(toml: &str) -> Result<Table, Error> { + let mut parser = Parser::new(toml); + Ok(try!(parser.parse().ok_or_else(move || parser.errors))) +} + +fn read_section<T: Decodable>(table: &Table, section: &str) -> Result<T, Error> { + let part = try!(table.get(section) + .ok_or_else(|| Error::Parse(format!("invalid section: {}", section)))); + decode_section(part.clone()) +} + +fn decode_section<T: Decodable>(section: Value) -> Result<T, Error> { + let mut decoder = Decoder::new(section); + Ok(try!(T::decode(&mut decoder))) +} + + +#[derive(RustcEncodable, RustcDecodable)] +struct CredentialsFile { + pub client_id: String, + pub client_secret: String, +} + +// Read AuthConfig values from the credentials file if it exists, or write the +// current AuthConfig values to a new credentials file otherwise. +fn bootstrap_credentials(auth_cfg: AuthConfig) -> Result<AuthConfig, Error> { + let creds = auth_cfg.credentials_file.clone(); + let path = Path::new(&creds); + debug!("bootstrap_credentials: {:?}", path); + + let credentials = match File::open(path) { + Ok(mut file) => { + let mut text = String::new(); + try!(file.read_to_string(&mut text)); + let table = try!(parse_table(&text)); + try!(read_section::<CredentialsFile>(&table, "auth")) + } + + Err(ref err) if err.kind() == ErrorKind::NotFound => { + let mut table = Table::new(); + let credentials = CredentialsFile { + client_id: auth_cfg.client_id, + client_secret: auth_cfg.client_secret + }; + table.insert("auth".to_string(), toml::encode(&credentials)); + + let dir = try!(path.parent().ok_or(Error::Parse("Invalid credentials file path".to_string()))); + try!(fs::create_dir_all(&dir)); + let mut file = try!(File::create(path)); + let mut perms = try!(file.metadata()).permissions(); + perms.set_mode(0o600); + try!(fs::set_permissions(path, perms)); + try!(file.write_all(&toml::encode_str(&table).into_bytes())); + + credentials + } + + Err(err) => return Err(Error::Io(err)) + }; + + Ok(AuthConfig { + server: auth_cfg.server, + client_id: credentials.client_id, + client_secret: credentials.client_secret, + credentials_file: auth_cfg.credentials_file, + }) +} + + +/// A parsed representation of the [auth] configuration section. +#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] +pub struct AuthConfig { + pub server: Url, + pub client_id: String, + pub client_secret: String, + pub credentials_file: String, +} + +impl Default for AuthConfig { + fn default() -> AuthConfig { + AuthConfig { + server: "http://127.0.0.1:9001".parse().unwrap(), + client_id: "client-id".to_string(), + client_secret: "client-secret".to_string(), + credentials_file: "/tmp/sota_credentials.toml".to_string(), + } + } +} + + +/// A parsed representation of the [core] configuration section. +#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] +pub struct CoreConfig { + pub server: Url +} + +impl Default for CoreConfig { + fn default() -> CoreConfig { + CoreConfig { + server: "http://127.0.0.1:8080".parse().unwrap() + } + } +} + + +/// A parsed representation of the [dbus] configuration section. +#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] +pub struct DBusConfig { + pub name: String, + pub path: String, + pub interface: String, + pub software_manager: String, + pub software_manager_path: String, + pub timeout: i32, // dbus-rs expects a signed int +} + +impl Default for DBusConfig { + fn default() -> DBusConfig { + DBusConfig { + name: "org.genivi.SotaClient".to_string(), + path: "/org/genivi/SotaClient".to_string(), + interface: "org.genivi.SotaClient".to_string(), + software_manager: "org.genivi.SoftwareLoadingManager".to_string(), + software_manager_path: "/org/genivi/SoftwareLoadingManager".to_string(), + timeout: 60 + } + } +} + + +/// A parsed representation of the [device] configuration section. +#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] +pub struct DeviceConfig { + pub uuid: String, + pub vin: String, + pub packages_dir: String, + pub package_manager: PackageManager, + pub system_info: SystemInfo, + pub polling_interval: u64, + pub certificates_path: String, +} + +impl Default for DeviceConfig { + fn default() -> DeviceConfig { + DeviceConfig { + uuid: "123e4567-e89b-12d3-a456-426655440000".to_string(), + vin: "V1234567890123456".to_string(), + packages_dir: "/tmp/".to_string(), + package_manager: PackageManager::Deb, + system_info: SystemInfo::default(), + polling_interval: 10, + certificates_path: "/tmp/sota_certificates".to_string() + } + } +} + + +/// A parsed representation of the [gateway] configuration section. +#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] +pub struct GatewayConfig { + pub console: bool, + pub dbus: bool, + pub http: bool, + pub rvi: bool, + pub socket: bool, + pub websocket: bool, +} + +impl Default for GatewayConfig { + fn default() -> GatewayConfig { + GatewayConfig { + console: false, + dbus: false, + http: false, + rvi: false, + socket: false, + websocket: true, + } + } +} + + +/// A parsed representation of the [network] configuration section. +#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] +pub struct NetworkConfig { + pub http_server: String, + pub rvi_edge_server: String, + pub socket_commands_path: String, + pub socket_events_path: String, + pub websocket_server: String +} + +impl Default for NetworkConfig { + fn default() -> NetworkConfig { + NetworkConfig { + http_server: "http://127.0.0.1:8888".to_string(), + rvi_edge_server: "http://127.0.0.1:9080".to_string(), + socket_commands_path: "/tmp/sota-commands.socket".to_string(), + socket_events_path: "/tmp/sota-events.socket".to_string(), + websocket_server: "ws://127.0.0.1:3012".to_string() + } + } +} + + +/// A parsed representation of the [rvi] configuration section. +#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] +pub struct RviConfig { + pub client: Url, + pub storage_dir: String, + pub timeout: Option<i64>, +} + +impl Default for RviConfig { + fn default() -> RviConfig { + RviConfig { + client: "http://127.0.0.1:8901".parse().unwrap(), + storage_dir: "/var/sota".to_string(), + timeout: Some(20), + } + } +} + + +#[cfg(test)] +mod tests { + use super::*; + + + const AUTH_CONFIG: &'static str = + r#" + [auth] + server = "http://127.0.0.1:9001" + client_id = "client-id" + client_secret = "client-secret" + credentials_file = "/tmp/sota_credentials.toml" + "#; + + const CORE_CONFIG: &'static str = + r#" + [core] + server = "http://127.0.0.1:8080" + "#; + + const DBUS_CONFIG: &'static str = + r#" + [dbus] + name = "org.genivi.SotaClient" + path = "/org/genivi/SotaClient" + interface = "org.genivi.SotaClient" + software_manager = "org.genivi.SoftwareLoadingManager" + software_manager_path = "/org/genivi/SoftwareLoadingManager" + timeout = 60 + "#; + + const DEVICE_CONFIG: &'static str = + r#" + [device] + uuid = "123e4567-e89b-12d3-a456-426655440000" + vin = "V1234567890123456" + system_info = "system_info.sh" + polling_interval = 10 + packages_dir = "/tmp/" + package_manager = "deb" + certificates_path = "/tmp/sota_certificates" + "#; + + const GATEWAY_CONFIG: &'static str = + r#" + [gateway] + console = false + dbus = false + http = false + rvi = false + socket = false + websocket = true + "#; + + const NETWORK_CONFIG: &'static str = + r#" + [network] + http_server = "http://127.0.0.1:8888" + rvi_edge_server = "http://127.0.0.1:9080" + socket_commands_path = "/tmp/sota-commands.socket" + socket_events_path = "/tmp/sota-events.socket" + websocket_server = "ws://127.0.0.1:3012" + "#; + + const RVI_CONFIG: &'static str = + r#" + [rvi] + client = "http://127.0.0.1:8901" + storage_dir = "/var/sota" + timeout = 20 + "#; + + + #[test] + fn parse_default_config() { + let config = String::new() + + CORE_CONFIG + + DEVICE_CONFIG + + GATEWAY_CONFIG + + NETWORK_CONFIG; + assert_eq!(Config::parse(&config).unwrap(), Config::default()); + } + + #[test] + fn parse_example_config() { + let config = String::new() + + AUTH_CONFIG + + CORE_CONFIG + + DBUS_CONFIG + + DEVICE_CONFIG + + GATEWAY_CONFIG + + NETWORK_CONFIG + + RVI_CONFIG; + assert_eq!(Config::load("tests/sota.toml").unwrap(), Config::parse(&config).unwrap()); + } +} diff --git a/src/datatype/dbus.rs b/src/datatype/dbus.rs new file mode 100644 index 0000000..a038568 --- /dev/null +++ b/src/datatype/dbus.rs @@ -0,0 +1,81 @@ +use dbus::{FromMessageItem, MessageItem}; +use toml::{decode, Table, Value}; + +use datatype::update_report::{InstalledFirmware, InstalledPackage, OperationResult}; + + +static MISSING_ARG: &'static str = "Error.MissingArgument"; +static MALFORMED_ARG: &'static str = "Error.MalformedArgument"; + +/// Format a `DBus` error message indicating a missing argument. +pub fn missing_arg() -> (&'static str, String) { + (MISSING_ARG, "Missing argument".to_string()) +} + +/// Format a `DBus` error message indicating a malformed argument. +pub fn malformed_arg() -> (&'static str, String) { + (MALFORMED_ARG, "Malformed argument".to_string()) +} + + +struct DecodedValue(pub Value); + +impl<'m> FromMessageItem<'m> for DecodedValue { + fn from(m: &'m MessageItem) -> Result<Self, ()> { + match *m { + MessageItem::Str(ref b) => Ok(DecodedValue(Value::String(b.clone()))), + MessageItem::Bool(ref b) => Ok(DecodedValue(Value::Boolean(*b))), + MessageItem::Byte(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))), + MessageItem::Int16(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))), + MessageItem::Int32(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))), + MessageItem::Int64(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))), + MessageItem::UInt16(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))), + MessageItem::UInt32(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))), + MessageItem::UInt64(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))), + MessageItem::Variant(ref b) => FromMessageItem::from(&**b), + _ => Err(()) + } + } +} + + +struct DecodedStruct(pub Value); + +impl<'m> FromMessageItem<'m> for DecodedStruct { + fn from(item: &'m MessageItem) -> Result<Self, ()> { + let items: &Vec<MessageItem> = try!(FromMessageItem::from(item)); + items.iter().map(|entry| { + let entry: Result<(&MessageItem, &MessageItem), ()> = FromMessageItem::from(entry); + entry.and_then(|(key, val)| { + let key: Result<&String,()> = FromMessageItem::from(key); + key.and_then(|key| { + let val: Result<DecodedValue,()> = FromMessageItem::from(val); + val.map(|val| (key.clone(), val.0)) + }) + }) + }).collect::<Result<Vec<(_, _)>, ()>>() + .map(|arr| DecodedStruct(Value::Table(arr.into_iter().collect::<Table>()))) + } +} + + +impl<'m> FromMessageItem<'m> for OperationResult { + fn from(item: &'m MessageItem) -> Result<Self, ()> { + let item: DecodedStruct = try!(FromMessageItem::from(item)); + decode::<OperationResult>(item.0).ok_or(()) + } +} + +impl<'m> FromMessageItem<'m> for InstalledPackage { + fn from(item: &'m MessageItem) -> Result<Self, ()> { + let item: DecodedStruct = try!(FromMessageItem::from(item)); + decode::<InstalledPackage>(item.0).ok_or(()) + } +} + +impl<'m> FromMessageItem<'m> for InstalledFirmware { + fn from(item: &'m MessageItem) -> Result<Self, ()> { + let item: DecodedStruct = try!(FromMessageItem::from(item)); + decode::<InstalledFirmware>(item.0).ok_or(()) + } +} diff --git a/src/datatype/error.rs b/src/datatype/error.rs new file mode 100644 index 0000000..8267234 --- /dev/null +++ b/src/datatype/error.rs @@ -0,0 +1,119 @@ +use hyper::error::Error as HyperError; +use hyper::client::ClientError as HyperClientError; +use rustc_serialize::json::{EncoderError as JsonEncoderError, + DecoderError as JsonDecoderError, + ParserError as JsonParserError}; +use std::convert::From; +use std::fmt::{Display, Formatter, Result as FmtResult}; +use std::io::Error as IoError; +use std::string::FromUtf8Error; +use std::sync::PoisonError; +use std::sync::mpsc::{SendError, RecvError}; +use toml::{ParserError as TomlParserError, DecodeError as TomlDecodeError}; +use url::ParseError as UrlParseError; + +use datatype::Event; +use http::auth_client::AuthHandler; +use gateway::Interpret; +use ws::Error as WebsocketError; + + +/// System-wide errors that are returned from `Result` type failures. +#[derive(Debug)] +pub enum Error { + Authorization(String), + Client(String), + Command(String), + FromUtf8(FromUtf8Error), + Hyper(HyperError), + HyperClient(HyperClientError<AuthHandler>), + Io(IoError), + JsonDecoder(JsonDecoderError), + JsonEncoder(JsonEncoderError), + JsonParser(JsonParserError), + Poison(String), + Package(String), + Parse(String), + Recv(RecvError), + SendEvent(SendError<Event>), + SendInterpret(SendError<Interpret>), + Socket(String), + SystemInfo(String), + TomlParser(Vec<TomlParserError>), + TomlDecode(TomlDecodeError), + UrlParse(UrlParseError), + Websocket(WebsocketError), +} + +impl<E> From<PoisonError<E>> for Error { + fn from(e: PoisonError<E>) -> Error { + Error::Poison(format!("{}", e)) + } +} + +macro_rules! derive_from { + ([ $( $from: ident => $to: ident ),* ]) => { + $(impl From<$from> for Error { + fn from(e: $from) -> Error { + Error::$to(e) + } + })* + }; + + ([ $( $error: ident < $ty: ty > => $to: ident),* ]) => { + $(impl From<$error<$ty>> for Error { + fn from(e: $error<$ty>) -> Error { + Error::$to(e) + } + })* + } +} + +derive_from!([ + FromUtf8Error => FromUtf8, + HyperError => Hyper, + IoError => Io, + JsonEncoderError => JsonEncoder, + JsonDecoderError => JsonDecoder, + RecvError => Recv, + TomlDecodeError => TomlDecode, + UrlParseError => UrlParse, + WebsocketError => Websocket +]); + +derive_from!([ + HyperClientError<AuthHandler> => HyperClient, + SendError<Event> => SendEvent, + SendError<Interpret> => SendInterpret, + Vec<TomlParserError> => TomlParser +]); + +impl Display for Error { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + let inner: String = match *self { + Error::Client(ref s) => format!("Http client error: {}", s.clone()), + Error::Authorization(ref s) => format!("Http client authorization error: {}", s.clone()), + Error::Command(ref e) => format!("Unknown Command: {}", e.clone()), + Error::FromUtf8(ref e) => format!("From utf8 error: {}", e.clone()), + Error::Hyper(ref e) => format!("Hyper error: {}", e.clone()), + Error::HyperClient(ref e) => format!("Hyper client error: {}", e.clone()), + Error::Io(ref e) => format!("IO error: {}", e.clone()), + Error::JsonDecoder(ref e) => format!("Failed to decode JSON: {}", e.clone()), + Error::JsonEncoder(ref e) => format!("Failed to encode JSON: {}", e.clone()), + Error::JsonParser(ref e) => format!("Failed to parse JSON: {}", e.clone()), + Error::Poison(ref e) => format!("Poison error: {}", e.clone()), + Error::Package(ref s) => format!("Package error: {}", s.clone()), + Error::Parse(ref s) => format!("Parse error: {}", s.clone()), + Error::Recv(ref s) => format!("Recv error: {}", s.clone()), + Error::SendEvent(ref s) => format!("Send error for Event: {}", s.clone()), + Error::SendInterpret(ref s) => format!("Send error for Interpret: {}", s.clone()), + Error::Socket(ref s) => format!("Unix Domain Socket error: {}", s.clone()), + Error::SystemInfo(ref s) => format!("System info error: {}", s.clone()), + Error::TomlDecode(ref e) => format!("Toml decode error: {}", e.clone()), + Error::TomlParser(ref e) => format!("Toml parser errors: {:?}", e.clone()), + Error::UrlParse(ref s) => format!("Url parse error: {}", s.clone()), + Error::Websocket(ref e) => format!("Websocket Error: {:?}", e.clone()), + }; + write!(f, "{}", inner) + } +} diff --git a/src/datatype/event.rs b/src/datatype/event.rs new file mode 100644 index 0000000..e3f84ca --- /dev/null +++ b/src/datatype/event.rs @@ -0,0 +1,56 @@ +use std::fmt::{Display, Formatter, Result as FmtResult}; + +use datatype::{DownloadComplete, Package, UpdateAvailable, UpdateReport, + UpdateRequestId}; + + +/// System-wide events that are broadcast to all interested parties. +#[derive(RustcEncodable, RustcDecodable, Debug, Clone, PartialEq, Eq)] +pub enum Event { + /// General error event with a printable representation for debugging. + Error(String), + + /// Authentication was successful. + Authenticated, + /// An operation failed because we are not currently authenticated. + NotAuthenticated, + + /// There are new updates available. + NewUpdatesReceived(Vec<UpdateRequestId>), + /// A notification from RVI of a new update. + NewUpdateAvailable(UpdateAvailable), + /// There are no new updates available. + NoNewUpdates, + + /// The following packages are installed on the device. + FoundInstalledPackages(Vec<Package>), + /// An update on the system information was received. + FoundSystemInfo(String), + /// A list of installed packages was sent to the Core server. + InstalledPackagesSent, + /// An update report was sent to the Core server. + UpdateReportSent, + + /// Downloading an update. + DownloadingUpdate(UpdateRequestId), + /// An update was downloaded. + DownloadComplete(DownloadComplete), + /// Downloading an update failed. + DownloadFailed(UpdateRequestId, String), + + /// Installing an update. + InstallingUpdate(UpdateRequestId), + /// An update was installed. + InstallComplete(UpdateReport), + /// The installation of an update failed. + InstallFailed(UpdateReport), + + /// A broadcast event requesting an update on externally installed software. + InstalledSoftwareNeeded, +} + +impl Display for Event { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!(f, "{:?}", self) + } +} diff --git a/src/datatype/json_rpc.rs b/src/datatype/json_rpc.rs new file mode 100644 index 0000000..3eed9a2 --- /dev/null +++ b/src/datatype/json_rpc.rs @@ -0,0 +1,107 @@ +use rustc_serialize::{json, Decodable, Encodable}; +use time; + +use http::{AuthClient, Client}; +use super::Url; + + +/// Encode the body of a JSON-RPC call. +#[derive(RustcDecodable, RustcEncodable)] +pub struct RpcRequest<E: Encodable> { + pub jsonrpc: String, + pub id: u64, + pub method: String, + pub params: E +} + +impl<E: Encodable> RpcRequest<E> { + /// Instantiate a new `RpcRequest` with the default version (2.0) and an id + /// generated from the current time. + pub fn new(method: &str, params: E) -> RpcRequest<E> { + RpcRequest { + jsonrpc: "2.0".to_string(), + id: time::precise_time_ns(), + method: method.to_string(), + params: params + } + } + + /// Send a JSON-RPC POST request to the specified URL. + pub fn send(&self, url: Url) -> Result<String, String> { + let client = AuthClient::default(); + let body = json::encode(self).expect("couldn't encode RpcRequest"); + let resp_rx = client.post(url, Some(body.into_bytes())); + let resp = resp_rx.recv().expect("no RpcRequest response received"); + let data = try!(resp.map_err(|err| format!("{}", err))); + String::from_utf8(data).map_err(|err| format!("{}", err)) + } +} + + +/// Encapsulates a successful JSON-RPC response. +#[derive(RustcDecodable, RustcEncodable)] +pub struct RpcOk<D: Decodable> { + pub jsonrpc: String, + pub id: u64, + pub result: Option<D> +} + +impl<D: Decodable> RpcOk<D> { + /// Instantiate a new successful JSON-RPC response type. + pub fn new(id: u64, result: Option<D>) -> RpcOk<D> { + RpcOk { + jsonrpc: "2.0".to_string(), + id: id, + result: result + } + } +} + + +/// The error code as [specified by jsonrpc](http://www.jsonrpc.org/specification#error_object). +#[derive(RustcDecodable, RustcEncodable)] +pub struct ErrorCode { + pub code: i32, + pub message: String, + pub data: String +} + +/// Encapsulates a failed JSON-RPC response. +#[derive(RustcDecodable, RustcEncodable)] +pub struct RpcErr { + pub jsonrpc: String, + pub id: u64, + pub error: ErrorCode +} + +impl RpcErr { + /// Instantiate a new `RpcErr` type with the default JSON-RPC version (2.0). + pub fn new(id: u64, error: ErrorCode) -> Self { + RpcErr { jsonrpc: "2.0".to_string(), id: id, error: error } + } + + /// Create a new `RpcErr` with a reason of "Invalid Request". + pub fn invalid_request(id: u64, data: String) -> Self { + Self::new(id, ErrorCode { code: -32600, message: "Invalid Request".to_string(), data: data }) + } + + /// Create a new `RpcErr` with a reason of "Method not found". + pub fn method_not_found(id: u64, data: String) -> Self { + Self::new(id, ErrorCode { code: -32601, message: "Method not found".to_string(), data: data }) + } + + /// Create a new `RpcErr` with a reason of "Parse error". + pub fn parse_error(data: String) -> Self { + Self::new(0, ErrorCode { code: -32700, message: "Parse error".to_string(), data: data }) + } + + /// Create a new `RpcErr` with a reason of "Invalid params". + pub fn invalid_params(id: u64, data: String) -> Self { + Self::new(id, ErrorCode { code: -32602, message: "Invalid params".to_string(), data: data }) + } + + /// Create a new `RpcErr` with a reason of "Couldn't handle request". + pub fn unspecified(id: u64, data: String) -> Self { + Self::new(id, ErrorCode { code: -32100, message: "Couldn't handle request".to_string(), data: data }) + } +} diff --git a/src/datatype/mod.rs b/src/datatype/mod.rs new file mode 100644 index 0000000..8a9ca4e --- /dev/null +++ b/src/datatype/mod.rs @@ -0,0 +1,26 @@ +pub mod auth; +pub mod command; +pub mod config; +pub mod dbus; +pub mod error; +pub mod event; +pub mod json_rpc; +pub mod package; +pub mod system_info; +pub mod update_report; +pub mod url; + +pub use self::auth::{AccessToken, Auth, ClientId, ClientSecret, ClientCredentials}; +pub use self::command::Command; +pub use self::config::{AuthConfig, CoreConfig, Config, DBusConfig, DeviceConfig, + GatewayConfig, RviConfig}; +pub use self::error::Error; +pub use self::event::Event; +pub use self::json_rpc::{RpcRequest, RpcOk, RpcErr}; +pub use self::package::{ChunkReceived, DownloadStarted, DownloadComplete, Package, + PendingUpdateRequest, UpdateAvailable, UpdateRequestId}; +pub use self::system_info::SystemInfo; +pub use self::update_report::{DeviceReport, InstalledFirmware, InstalledPackage, + InstalledSoftware, OperationResult, UpdateResultCode, + UpdateReport}; +pub use self::url::{Method, Url}; diff --git a/src/datatype/package.rs b/src/datatype/package.rs new file mode 100644 index 0000000..146ff06 --- /dev/null +++ b/src/datatype/package.rs @@ -0,0 +1,79 @@ +use std::fmt::{Display, Formatter, Result as FmtResult}; + +use rvi::services::LocalServices; + + +/// Encapsulate a `String` type used to represent the `Package` version. +pub type Version = String; + +/// Encodes the name and version of a specific package. +#[derive(Debug, PartialEq, Eq, RustcEncodable, RustcDecodable, Clone)] +pub struct Package { + pub name: String, + pub version: Version +} + +impl Display for Package { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!(f, "{} {}", self.name, self.version) + } +} + + +/// Encapsulate a `String` type as the id of a specific update request. +pub type UpdateRequestId = String; + +/// A single pending update request to be installed by the client. +#[allow(non_snake_case)] +#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)] +pub struct PendingUpdateRequest { + pub requestId: UpdateRequestId, + pub installPos: i32, + pub packageId: Package, + pub createdAt: String +} + +/// A notification from RVI that a new update is available. +#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)] +pub struct UpdateAvailable { + pub update_id: String, + pub signature: String, + pub description: String, + pub request_confirmation: bool, + pub size: u64 +} + +/// A JSON-RPC request type to notify RVI that a new package download has started. +#[derive(RustcDecodable, RustcEncodable)] +pub struct DownloadStarted { + pub device: String, + pub update_id: UpdateRequestId, + pub services: LocalServices, +} + +/// A JSON-RPC request type to notify RVI that a new package chunk was received. +#[derive(RustcDecodable, RustcEncodable)] +pub struct ChunkReceived { + pub device: String, + pub update_id: UpdateRequestId, + pub chunks: Vec<u64>, +} + +/// A notification to indicate to any external package manager that the package +/// download has successfully completed. +#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)] +pub struct DownloadComplete { + pub update_id: String, + pub update_image: String, + pub signature: String +} + +impl Default for DownloadComplete { + fn default() -> Self { + DownloadComplete { + update_id: "".to_string(), + update_image: "".to_string(), + signature: "".to_string() + } + } +} diff --git a/src/datatype/system_info.rs b/src/datatype/system_info.rs new file mode 100644 index 0000000..2d8fff2 --- /dev/null +++ b/src/datatype/system_info.rs @@ -0,0 +1,46 @@ +use rustc_serialize::{Decoder, Decodable}; +use std::process::Command; +use std::str::FromStr; + +use datatype::Error; + + +/// A reference to the command that will report on the system information. +#[derive(PartialEq, Eq, Debug, Clone)] +pub struct SystemInfo { + command: String +} + +impl SystemInfo { + /// Instantiate a new type to report on the system information. + pub fn new(command: String) -> SystemInfo { + SystemInfo { command: command } + } + + /// Generate a new report of the system information. + pub fn report(&self) -> Result<String, Error> { + Command::new(&self.command) + .output().map_err(|err| Error::SystemInfo(err.to_string())) + .and_then(|info| String::from_utf8(info.stdout).map_err(Error::FromUtf8)) + } +} + +impl Default for SystemInfo { + fn default() -> SystemInfo { + SystemInfo::new("system_info.sh".to_string()) + } +} + +impl FromStr for SystemInfo { + type Err = Error; + + fn from_str(s: &str) -> Result<SystemInfo, Error> { + Ok(SystemInfo::new(s.to_string())) + } +} + +impl Decodable for SystemInfo { + fn decode<D: Decoder>(d: &mut D) -> Result<SystemInfo, D::Error> { + d.read_str().and_then(|s| Ok(s.parse::<SystemInfo>().unwrap())) + } +} diff --git a/src/datatype/update_report.rs b/src/datatype/update_report.rs new file mode 100644 index 0000000..f81fbc4 --- /dev/null +++ b/src/datatype/update_report.rs @@ -0,0 +1,182 @@ +use rustc_serialize::{Encodable, Encoder}; +use std::str::FromStr; + +use datatype::{Error, UpdateRequestId}; + + +/// An encodable report of the installation outcome. +#[derive(RustcDecodable, RustcEncodable, Clone, Debug, PartialEq, Eq)] +pub struct UpdateReport { + pub update_id: UpdateRequestId, + pub operation_results: Vec<OperationResult> +} + +impl UpdateReport { + /// Instantiate a new report with a vector of installation outcomes. + pub fn new(update_id: String, results: Vec<OperationResult>) -> UpdateReport { + UpdateReport { update_id: update_id, operation_results: results } + } + + /// Instantiate a new report with a single installation outcome. + pub fn single(update_id: UpdateRequestId, result_code: UpdateResultCode, result_text: String) -> UpdateReport { + let result = OperationResult { + id: update_id.clone(), + result_code: result_code, + result_text: result_text + }; + UpdateReport { update_id: update_id, operation_results: vec![result] } + } +} + +impl Default for UpdateReport { + fn default() -> Self { + UpdateReport { update_id: "".to_string(), operation_results: Vec::new() } + } +} + + +/// Bind the installation outcome report to a specific device. +#[derive(RustcEncodable, Clone, Debug)] +pub struct DeviceReport<'d, 'r> { + pub device: &'d str, + pub update_report: &'r UpdateReport +} + +impl<'d, 'r> DeviceReport<'d, 'r> { + /// Instantiate a new installation outcome report for a specific device. + pub fn new(device: &'d str, update_report: &'r UpdateReport) -> DeviceReport<'d, 'r> { + DeviceReport { device: device, update_report: update_report } + } +} + + +/// Enumerate the possible outcomes when trying to install a package. +#[allow(non_camel_case_types)] +#[derive(RustcDecodable, Clone, Debug, PartialEq, Eq)] +pub enum UpdateResultCode { + /// Operation executed successfully + OK = 0, + /// Operation has already been processed + ALREADY_PROCESSED, + /// Dependency failure during package install, upgrade, or removal + DEPENDENCY_FAILURE, + /// Update image integrity has been compromised + VALIDATION_FAILED, + /// Package installation failed + INSTALL_FAILED, + /// Package upgrade failed + UPGRADE_FAILED, + /// Package removal failed + REMOVAL_FAILED, + /// The module loader could not flash its managed module + FLASH_FAILED, + /// Partition creation failed + CREATE_PARTITION_FAILED, + /// Partition deletion failed + DELETE_PARTITION_FAILED, + /// Partition resize failed + RESIZE_PARTITION_FAILED, + /// Partition write failed + WRITE_PARTITION_FAILED, + /// Partition patching failed + PATCH_PARTITION_FAILED, + /// User declined the update + USER_DECLINED, + /// Software was blacklisted + SOFTWARE_BLACKLISTED, + /// Ran out of disk space + DISK_FULL, + /// Software package not found + NOT_FOUND, + /// Tried to downgrade to older version + OLD_VERSION, + /// SWM Internal integrity error + INTERNAL_ERROR, + /// Other error + GENERAL_ERROR, +} + +impl FromStr for UpdateResultCode { + type Err = Error; + + fn from_str(s: &str) -> Result<UpdateResultCode, Error> { + match &*s.to_uppercase() { + "0" | "OK" => Ok(UpdateResultCode::OK), + "1" | "ALREADY_PROCESSED" => Ok(UpdateResultCode::ALREADY_PROCESSED), + "2" | "DEPENDENCY_FAILURE" => Ok(UpdateResultCode::DEPENDENCY_FAILURE), + "3" | "VALIDATION_FAILED" => Ok(UpdateResultCode::VALIDATION_FAILED), + "4" | "INSTALL_FAILED" => Ok(UpdateResultCode::INSTALL_FAILED), + "5" | "UPGRADE_FAILED" => Ok(UpdateResultCode::UPGRADE_FAILED), + "6" | "REMOVAL_FAILED" => Ok(UpdateResultCode::REMOVAL_FAILED), + "7" | "FLASH_FAILED" => Ok(UpdateResultCode::FLASH_FAILED), + "8" | "CREATE_PARTITION_FAILED" => Ok(UpdateResultCode::CREATE_PARTITION_FAILED), + "9" | "DELETE_PARTITION_FAILED" => Ok(UpdateResultCode::DELETE_PARTITION_FAILED), + "10" | "RESIZE_PARTITION_FAILED" => Ok(UpdateResultCode::RESIZE_PARTITION_FAILED), + "11" | "WRITE_PARTITION_FAILED" => Ok(UpdateResultCode::WRITE_PARTITION_FAILED), + "12" | "PATCH_PARTITION_FAILED" => Ok(UpdateResultCode::PATCH_PARTITION_FAILED), + "13" | "USER_DECLINED" => Ok(UpdateResultCode::USER_DECLINED), + "14" | "SOFTWARE_BLACKLISTED" => Ok(UpdateResultCode::SOFTWARE_BLACKLISTED), + "15" | "DISK_FULL" => Ok(UpdateResultCode::DISK_FULL), + "16" | "NOT_FOUND" => Ok(UpdateResultCode::NOT_FOUND), + "17" | "OLD_VERSION" => Ok(UpdateResultCode::OLD_VERSION), + "18" | "INTERNAL_ERROR" => Ok(UpdateResultCode::INTERNAL_ERROR), + "19" | "GENERAL_ERROR" => Ok(UpdateResultCode::GENERAL_ERROR), + _ => Err(Error::Parse(format!("unknown UpdateResultCode: {}", s))) + } + } +} + +impl Encodable for UpdateResultCode { + fn encode<S: Encoder>(&self, s: &mut S) -> Result<(), S::Error> { + s.emit_u64(self.clone() as u64) + } +} + + +/// An encodable response of the installation outcome for a particular update ID. +#[derive(RustcDecodable, RustcEncodable, Clone, Debug, PartialEq, Eq)] +pub struct OperationResult { + pub id: String, + pub result_code: UpdateResultCode, + pub result_text: String, +} + + +/// Encapsulates a single firmware installed on the device. +#[derive(RustcDecodable, RustcEncodable, Clone, Debug, PartialEq, Eq)] +pub struct InstalledFirmware { + pub module: String, + pub firmware_id: String, + pub last_modified: u64 +} + + +/// Encapsulates a single package installed on the device. +#[derive(RustcDecodable, RustcEncodable, Clone, Debug, PartialEq, Eq)] +pub struct InstalledPackage { + pub package_id: String, + pub name: String, + pub description: String, + pub last_modified: u64 +} + + +/// An encodable list of packages and firmwares to send to RVI. +#[derive(RustcDecodable, RustcEncodable, Clone, Debug, PartialEq, Eq)] +pub struct InstalledSoftware { + pub packages: Vec<InstalledPackage>, + pub firmwares: Vec<InstalledFirmware> +} + +impl InstalledSoftware { + /// Instantiate a new list of the software installed on the device. + pub fn new(packages: Vec<InstalledPackage>, firmwares: Vec<InstalledFirmware>) -> InstalledSoftware { + InstalledSoftware { packages: packages, firmwares: firmwares } + } +} + +impl Default for InstalledSoftware { + fn default() -> Self { + InstalledSoftware { packages: Vec::new(), firmwares: Vec::new() } + } +} diff --git a/src/datatype/url.rs b/src/datatype/url.rs new file mode 100644 index 0000000..5a9c97a --- /dev/null +++ b/src/datatype/url.rs @@ -0,0 +1,106 @@ +use hyper::method; +use rustc_serialize::{Decoder, Decodable}; +use std::borrow::Cow; +use std::fmt::{Display, Formatter, Result as FmtResult}; +use std::io; +use std::net::ToSocketAddrs; +use std::str::FromStr; +use url; +use url::SocketAddrs; + +use datatype::Error; + + +/// Encapsulate a single crate URL with additional methods and traits. +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct Url(pub url::Url); + +impl Url { + /// Append the string suffix to this URL. + pub fn join(&self, suffix: &str) -> Result<Url, Error> { + let url = try!(self.0.join(suffix)); + Ok(Url(url)) + } + + /// Return the encapsulated crate URL. + pub fn inner(&self) -> url::Url { + self.0.clone() + } +} + +impl<'a> Into<Cow<'a, Url>> for Url { + fn into(self) -> Cow<'a, Url> { + Cow::Owned(self) + } +} + +impl FromStr for Url { + type Err = Error; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + let url = try!(url::Url::parse(s)); + Ok(Url(url)) + } +} + +impl Decodable for Url { + fn decode<D: Decoder>(d: &mut D) -> Result<Url, D::Error> { + let s = try!(d.read_str()); + s.parse().map_err(|e: Error| d.error(&e.to_string())) + } +} + +impl ToSocketAddrs for Url { + type Iter = SocketAddrs; + + fn to_socket_addrs(&self) -> io::Result<Self::Iter> { + self.0.to_socket_addrs() + } +} + +impl Display for Url { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + let host = self.0.host_str().unwrap_or("localhost"); + if let Some(port) = self.0.port() { + write!(f, "{}://{}:{}{}", self.0.scheme(), host, port, self.0.path()) + } else { + write!(f, "{}://{}{}", self.0.scheme(), host, self.0.path()) + } + } +} + + +/// Enumerate the supported HTTP methods. +#[derive(Clone, Debug)] +pub enum Method { + Get, + Post, + Put, +} + +impl Into<method::Method> for Method { + fn into(self) -> method::Method { + match self { + Method::Get => method::Method::Get, + Method::Post => method::Method::Post, + Method::Put => method::Method::Put, + } + } +} + +impl<'a> Into<Cow<'a, Method>> for Method { + fn into(self) -> Cow<'a, Method> { + Cow::Owned(self) + } +} + +impl Display for Method { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + let method = match *self { + Method::Get => "GET".to_string(), + Method::Post => "POST".to_string(), + Method::Put => "PUT".to_string(), + }; + write!(f, "{}", method) + } +} diff --git a/src/event/inbound.rs b/src/event/inbound.rs deleted file mode 100644 index 41a757c..0000000 --- a/src/event/inbound.rs +++ /dev/null @@ -1,28 +0,0 @@ - -#[derive(RustcDecodable, Clone)] -pub struct UpdateAvailable { - pub update_id: String, - pub signature: String, - pub description: String, - pub request_confirmation: bool, - pub size: u64 -} - -#[derive(RustcDecodable, Clone)] -pub struct DownloadComplete { - pub update_id: String, - pub update_image: String, - pub signature: String -} - -#[derive(RustcDecodable, Clone)] -pub struct GetInstalledSoftware { - pub include_packages: bool, - pub include_module_firmware: bool -} - -pub enum InboundEvent { - UpdateAvailable(UpdateAvailable), - DownloadComplete(DownloadComplete), - GetInstalledSoftware(GetInstalledSoftware) -} diff --git a/src/event/mod.rs b/src/event/mod.rs deleted file mode 100644 index b69d876..0000000 --- a/src/event/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -pub mod inbound; -pub mod outbound; - -pub type UpdateId = String; - -pub enum Event { - Inbound(inbound::InboundEvent), - OutBound(outbound::OutBoundEvent) -} diff --git a/src/event/outbound.rs b/src/event/outbound.rs deleted file mode 100644 index 4fb6fce..0000000 --- a/src/event/outbound.rs +++ /dev/null @@ -1,68 +0,0 @@ -use super::UpdateId; - -#[derive(RustcDecodable, RustcEncodable, Clone)] -pub struct InstalledFirmware { - pub module: String, - pub firmware_id: String, - pub last_modified: u64 -} - -#[derive(RustcDecodable, RustcEncodable, Clone)] -pub struct InstalledFirmwares(pub Vec<InstalledFirmware>); - -#[derive(RustcDecodable, RustcEncodable, Clone)] -pub struct InstalledPackage { - pub package_id: String, - pub name: String, - pub description: String, - pub last_modified: u64 -} - -#[derive(RustcDecodable, RustcEncodable, Clone)] -pub struct InstalledPackages(pub Vec<InstalledPackage>); - -#[derive(RustcDecodable, RustcEncodable, Clone)] -pub struct InstalledSoftware { - pub packages: Vec<InstalledPackage>, - pub firmware: Vec<InstalledFirmware> -} - -impl InstalledSoftware { - pub fn new(p: InstalledPackages, f: InstalledFirmwares) -> InstalledSoftware { - InstalledSoftware { - packages: p.0, - firmware: f.0 - } - } -} - -#[derive(RustcDecodable, RustcEncodable, Clone)] -pub struct OperationResult { - pub id: String, - pub result_code: u32, - pub result_text: String -} - -#[derive(RustcDecodable, RustcEncodable, Clone)] -pub struct OperationResults(pub Vec<OperationResult>); - -#[derive(RustcDecodable, RustcEncodable, Clone)] -pub struct UpdateReport { - pub update_id: String, - pub operation_results: Vec<OperationResult> -} - -impl UpdateReport { - pub fn new(id: String, res: OperationResults) -> UpdateReport { - UpdateReport { - update_id: id, - operation_results: res.0 - } - } -} - -pub enum OutBoundEvent { - InitiateDownload(UpdateId), - AbortDownload(UpdateId), - UpdateReport(UpdateReport) -} diff --git a/src/gateway/console.rs b/src/gateway/console.rs new file mode 100644 index 0000000..da8ba80 --- /dev/null +++ b/src/gateway/console.rs @@ -0,0 +1,46 @@ +use chan; +use chan::Sender; +use std::{io, thread}; +use std::io::Write; +use std::string::ToString; +use std::sync::{Arc, Mutex}; + +use datatype::{Command, Error, Event}; +use super::gateway::{Gateway, Interpret}; + + +/// The console gateway is used for REPL-style interaction with the client. +pub struct Console; + +impl Gateway for Console { + fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> { + let (etx, erx) = chan::sync::<Event>(0); + let etx = Arc::new(Mutex::new(etx)); + + thread::spawn(move || { + loop { + match get_input() { + Ok(cmd) => itx.send(Interpret { command: cmd, response_tx: Some(etx.clone()) }), + Err(err) => error!("Console Error: {:?}", err) + } + } + }); + + thread::spawn(move || { + loop { + let e = erx.recv().expect("all console event transmitters are closed"); + info!("Console Response: {}", e.to_string()); + } + }); + + Ok(info!("Console gateway started.")) + } +} + +fn get_input() -> Result<Command, Error> { + let mut input = String::new(); + let _ = io::stdout().write(b"> "); + io::stdout().flush().expect("couldn't flush console stdout buffer"); + let _ = io::stdin().read_line(&mut input); + input.parse() +} diff --git a/src/gateway/dbus.rs b/src/gateway/dbus.rs new file mode 100644 index 0000000..3bd41ea --- /dev/null +++ b/src/gateway/dbus.rs @@ -0,0 +1,170 @@ +use chan::Sender; +use dbus::{Connection, BusType, ConnectionItem, FromMessageItem, + Message, MessageItem, NameFlag}; +use dbus::obj::{Argument, Interface, Method, MethodResult, ObjectPath}; +use std::thread; +use std::convert::From; + +use datatype::{Command, DBusConfig, Event, InstalledFirmware, InstalledPackage, + InstalledSoftware, OperationResult, UpdateReport}; +use datatype::dbus; +use super::{Gateway, Interpret}; + + +/// The `DBus` gateway is used with the RVI module for communicating with the +/// system session bus. +pub struct DBus { + pub dbus_cfg: DBusConfig, + pub itx: Sender<Interpret>, +} + +impl Gateway for DBus { + fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> { + let dbus_cfg = self.dbus_cfg.clone(); + + thread::spawn(move || { + let conn = Connection::get_private(BusType::Session).expect("couldn't get dbus session"); + conn.register_name(&dbus_cfg.name, NameFlag::ReplaceExisting as u32).unwrap(); + + let mut obj_path = ObjectPath::new(&conn, &dbus_cfg.path, true); + obj_path.insert_interface(&dbus_cfg.interface, default_interface(itx)); + obj_path.set_registered(true).expect("couldn't set registration status"); + + loop { + for item in conn.iter(1000) { + if let ConnectionItem::MethodCall(mut msg) = item { + info!("DBus method call: {:?}", msg); + obj_path.handle_message(&mut msg).map(|result| { + let _ = result.map_err(|_| error!("dbus method call failed: {:?}", msg)); + }); + } + } + } + }); + + Ok(info!("DBus gateway started.")) + } + + fn pulse(&self, event: Event) { + match event { + Event::NewUpdateAvailable(avail) => { + let msg = self.new_message("updateAvailable", &[ + MessageItem::from(avail.update_id), + MessageItem::from(avail.signature), + MessageItem::from(avail.description), + MessageItem::from(avail.request_confirmation) + ]); + let conn = Connection::get_private(BusType::Session).expect("couldn't get dbus session"); + let _ = conn.send(msg).map_err(|_| error!("couldn't send updateAvailable message")); + } + + Event::DownloadComplete(comp) => { + let msg = self.new_message("downloadComplete", &[ + MessageItem::from(comp.update_image), + MessageItem::from(comp.signature) + ]); + let conn = Connection::get_private(BusType::Session).expect("couldn't get dbus session"); + let _ = conn.send(msg).map_err(|_| error!("couldn't send downloadComplete message")); + } + + Event::InstalledSoftwareNeeded => { + let msg = self.new_message("getInstalledPackages", &[ + MessageItem::from(true), // include packages? + MessageItem::from(false) // include firmware? + ]); + let conn = Connection::get_private(BusType::Session).expect("couldn't get dbus session"); + let reply = conn.send_with_reply_and_block(msg, self.dbus_cfg.timeout).unwrap(); + + let _ = || -> Result<InstalledSoftware, ()> { + let mut args = reply.get_items().into_iter(); + + let pkg_arg = try!(args.next().ok_or(())); + let msgs: &Vec<MessageItem> = try!(FromMessageItem::from(&pkg_arg)); + let packages = try!(msgs.into_iter() + .map(|item| -> Result<InstalledPackage, ()> { + FromMessageItem::from(item) + }).collect::<Result<Vec<InstalledPackage>, ()>>()); + + let firm_arg = try!(args.next().ok_or(())); + let msgs: &Vec<MessageItem> = try!(FromMessageItem::from(&firm_arg)); + let firmwares = try!(msgs.into_iter() + .map(|item| -> Result<InstalledFirmware, ()> { + FromMessageItem::from(item) + }).collect::<Result<Vec<InstalledFirmware>, ()>>()); + + Ok(InstalledSoftware::new(packages, firmwares)) + }().map(|inst| send(&self.itx, Command::SendInstalledSoftware(inst))) + .map_err(|_| error!("unable to ReportInstalledSoftware")); + } + + _ => () + } + } +} + +impl DBus { + fn new_message(&self, method: &str, args: &[MessageItem]) -> Message { + let mgr = self.dbus_cfg.software_manager.clone(); + let path = self.dbus_cfg.software_manager_path.clone(); + let result = Message::new_method_call(&mgr, &path, &mgr, method); + let mut msg = result.expect("couldn't create dbus message"); + msg.append_items(args); + msg + } +} + +fn default_interface<'i>(itx: Sender<Interpret>) -> Interface<'i> { + let initiate_itx = itx.clone(); + let initiate_download = Method::new( + "initiateDownload", + vec![Argument::new("update_id", "s")], + vec![], + Box::new(move |msg| handle_initiate_download(&initiate_itx, msg)) + ); + + let update_itx = itx.clone(); + let update_report = Method::new( + "updateReport", + vec![Argument::new("update_id", "s"), Argument::new("operations_results", "aa{sv}")], + vec![], + Box::new(move |msg| handle_update_report(&update_itx, msg)) + ); + + Interface::new(vec![initiate_download, update_report], vec![], vec![]) +} + +fn send(itx: &Sender<Interpret>, cmd: Command) { + itx.send(Interpret { command: cmd, response_tx: None }); +} + +fn handle_initiate_download(itx: &Sender<Interpret>, msg: &mut Message) -> MethodResult { + let sender = try!(msg.sender().map(|s| s.to_string()).ok_or(dbus::missing_arg())); + debug!("handle_initiate_download: sender={:?}, msg={:?}", sender, msg); + + let mut args = msg.get_items().into_iter(); + let arg_id = try!(args.next().ok_or(dbus::missing_arg())); + let update_id: &String = try!(FromMessageItem::from(&arg_id).or(Err(dbus::malformed_arg()))); + send(itx, Command::StartDownload(vec![update_id.clone()])); + + Ok(vec![]) +} + +fn handle_update_report(itx: &Sender<Interpret>, msg: &mut Message) -> MethodResult { + let sender = try!(msg.sender().map(|s| s.to_string()).ok_or(dbus::missing_arg())); + debug!("handle_update_report: sender ={:?}, msg ={:?}", sender, msg); + let mut args = msg.get_items().into_iter(); + + let id_arg = try!(args.next().ok_or(dbus::missing_arg())); + let update_id: &String = try!(FromMessageItem::from(&id_arg).or(Err(dbus::malformed_arg()))); + + let results_arg = try!(args.next().ok_or(dbus::missing_arg())); + let msgs: &Vec<MessageItem> = try!(FromMessageItem::from(&results_arg).or(Err(dbus::malformed_arg()))); + let results = try!(msgs.into_iter() + .map(|item| -> Result<OperationResult, ()> { FromMessageItem::from(item) }) + .collect::<Result<Vec<OperationResult>, ()>>() + .or(Err(dbus::malformed_arg())) + ); + send(itx, Command::SendUpdateReport(UpdateReport::new(update_id.clone(), results))); + + Ok(vec![]) +} diff --git a/src/gateway/gateway.rs b/src/gateway/gateway.rs new file mode 100644 index 0000000..96fef8d --- /dev/null +++ b/src/gateway/gateway.rs @@ -0,0 +1,32 @@ +use chan::{Sender, Receiver}; +use std::process; +use std::sync::{Arc, Mutex}; + +use datatype::{Command, Event}; + + +/// Encapsulates a `Command` to be sent to the `GlobalInterpreter` for processing, +/// with an optional channel to receive the outcome `Event`. +pub struct Interpret { + pub command: Command, + pub response_tx: Option<Arc<Mutex<Sender<Event>>>>, +} + +/// A `Gateway` may send `Command`s to the `GlobalInterpreter`, as well as listen +/// to the system-wide `Event` messages. +pub trait Gateway { + fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String>; + + fn start(&mut self, itx: Sender<Interpret>, erx: Receiver<Event>) { + self.initialize(itx).unwrap_or_else(|err| { + error!("couldn't start gateway: {}", err); + process::exit(1); + }); + + loop { + self.pulse(erx.recv().expect("all gateway event transmitters are closed")); + } + } + + fn pulse(&self, _: Event) {} // ignore global events by default +} diff --git a/src/gateway/http.rs b/src/gateway/http.rs new file mode 100644 index 0000000..990a1fc --- /dev/null +++ b/src/gateway/http.rs @@ -0,0 +1,135 @@ +use chan; +use chan::{Sender, Receiver}; +use hyper::StatusCode; +use hyper::net::{HttpStream, Transport}; +use hyper::server::{Server as HyperServer, Request as HyperRequest}; +use rustc_serialize::json; +use std::thread; +use std::sync::{Arc, Mutex}; + +use datatype::{Command, Event}; +use gateway::{Gateway, Interpret}; +use http::{Server, ServerHandler}; + + +/// The `Http` gateway parses `Command`s from the body of incoming requests. +pub struct Http { + pub server: String, +} + +impl Gateway for Http { + fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> { + let itx = Arc::new(Mutex::new(itx)); + let server = match HyperServer::http(&self.server.parse().expect("couldn't parse http address")) { + Ok(server) => server, + Err(err) => return Err(format!("couldn't start http gateway: {}", err)) + }; + thread::spawn(move || { + let (_, server) = server.handle(move |_| HttpHandler::new(itx.clone())).unwrap(); + server.run(); + }); + + Ok(info!("HTTP gateway listening at http://{}", self.server)) + } +} + + +struct HttpHandler { + itx: Arc<Mutex<Sender<Interpret>>>, + response_rx: Option<Receiver<Event>> +} + +impl HttpHandler { + fn new(itx: Arc<Mutex<Sender<Interpret>>>) -> ServerHandler<HttpStream> { + ServerHandler::new(Box::new(HttpHandler { itx: itx, response_rx: None })) + } +} + +impl<T: Transport> Server<T> for HttpHandler { + fn headers(&mut self, _: HyperRequest<T>) {} + + fn request(&mut self, body: Vec<u8>) { + String::from_utf8(body).map(|body| { + json::decode::<Command>(&body).map(|cmd| { + info!("Incoming HTTP request command: {}", cmd); + let (etx, erx) = chan::async::<Event>(); + self.response_rx = Some(erx); + self.itx.lock().unwrap().send(Interpret { + command: cmd, + response_tx: Some(Arc::new(Mutex::new(etx))), + }); + }).unwrap_or_else(|err| error!("http request parse json: {}", err)) + }).unwrap_or_else(|err| error!("http request parse string: {}", err)) + } + + fn response(&mut self) -> (StatusCode, Option<Vec<u8>>) { + self.response_rx.as_ref().map_or((StatusCode::BadRequest, None), |rx| { + rx.recv().map_or_else(|| { + error!("on_response receiver error"); + (StatusCode::InternalServerError, None) + }, |event| { + json::encode(&event).map(|body| { + (StatusCode::Ok, Some(body.into_bytes())) + }).unwrap_or_else(|err| { + error!("on_response encoding json: {:?}", err); + (StatusCode::InternalServerError, None) + }) + }) + }) + } +} + + +#[cfg(test)] +mod tests { + use chan; + use crossbeam; + use rustc_serialize::json; + use std::path::Path; + use std::thread; + + use super::*; + use gateway::{Gateway, Interpret}; + use datatype::{Command, Event}; + use http::{AuthClient, Client, set_ca_certificates}; + + + #[test] + fn http_connections() { + set_ca_certificates(&Path::new("run/sota_certificates")); + + let (etx, erx) = chan::sync::<Event>(0); + let (itx, irx) = chan::sync::<Interpret>(0); + + thread::spawn(move || Http { server: "127.0.0.1:8888".to_string() }.start(itx, erx)); + thread::spawn(move || { + let _ = etx; // move into this scope + loop { + let interpret = irx.recv().expect("itx is closed"); + match interpret.command { + Command::StartDownload(ids) => { + let tx = interpret.response_tx.unwrap(); + tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned())); + } + _ => panic!("expected AcceptUpdates"), + } + } + }); + + crossbeam::scope(|scope| { + for id in 0..10 { + scope.spawn(move || { + let cmd = Command::StartDownload(vec!(format!("{}", id))); + let client = AuthClient::default(); + let url = "http://127.0.0.1:8888".parse().unwrap(); + let body = json::encode(&cmd).unwrap(); + let resp_rx = client.post(url, Some(body.into_bytes())); + let resp = resp_rx.recv().unwrap().unwrap(); + let text = String::from_utf8(resp).unwrap(); + assert_eq!(json::decode::<Event>(&text).unwrap(), + Event::FoundSystemInfo(format!("{}", id))); + }); + } + }); + } +} diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs new file mode 100644 index 0000000..027ba51 --- /dev/null +++ b/src/gateway/mod.rs @@ -0,0 +1,13 @@ +pub mod console; +pub mod dbus; +pub mod gateway; +pub mod http; +pub mod socket; +pub mod websocket; + +pub use self::console::Console; +pub use self::dbus::DBus; +pub use self::gateway::{Gateway, Interpret}; +pub use self::http::Http; +pub use self::socket::Socket; +pub use self::websocket::Websocket; diff --git a/src/gateway/socket.rs b/src/gateway/socket.rs new file mode 100644 index 0000000..f252e7c --- /dev/null +++ b/src/gateway/socket.rs @@ -0,0 +1,161 @@ +use chan; +use chan::Sender; +use rustc_serialize::json; +use std::io::{BufReader, Read, Write}; +use std::net::Shutdown; +use std::sync::{Arc, Mutex}; +use std::{fs, thread}; + +use datatype::{Command, Error, Event}; +use super::{Gateway, Interpret}; +use unix_socket::{UnixListener, UnixStream}; + + +/// The `Socket` gateway is used for communication via Unix Domain Sockets. +pub struct Socket { + pub commands_path: String, + pub events_path: String, +} + +impl Gateway for Socket { + fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> { + let _ = fs::remove_file(&self.commands_path); + let commands = match UnixListener::bind(&self.commands_path) { + Ok(sock) => sock, + Err(err) => return Err(format!("couldn't open commands socket: {}", err)) + }; + + let itx = Arc::new(Mutex::new(itx)); + thread::spawn(move || { + for conn in commands.incoming() { + if let Err(err) = conn { + error!("couldn't get commands socket connection: {}", err); + continue + } + let mut stream = conn.unwrap(); + let itx = itx.clone(); + + thread::spawn(move || { + let resp = handle_client(&mut stream, itx) + .map(|ev| json::encode(&ev).expect("couldn't encode Event").into_bytes()) + .unwrap_or_else(|err| format!("{}", err).into_bytes()); + + stream.write_all(&resp) + .unwrap_or_else(|err| error!("couldn't write to commands socket: {}", err)); + stream.shutdown(Shutdown::Write) + .unwrap_or_else(|err| error!("couldn't close commands socket: {}", err)); + }); + } + }); + + Ok(info!("Socket listening for commands at {} and sending events to {}.", + self.commands_path, self.events_path)) + } + + fn pulse(&self, event: Event) { + match event { + Event::DownloadComplete(dl) => { + let _ = UnixStream::connect(&self.events_path).map(|mut stream| { + stream.write_all(&json::encode(&dl).expect("couldn't encode Event").into_bytes()) + .unwrap_or_else(|err| error!("couldn't write to events socket: {}", err)); + stream.shutdown(Shutdown::Write) + .unwrap_or_else(|err| error!("couldn't close events socket: {}", err)); + }).map_err(|err| error!("couldn't open events socket: {}", err)); + } + + _ => () + } + } +} + +fn handle_client(stream: &mut UnixStream, itx: Arc<Mutex<Sender<Interpret>>>) -> Result<Event, Error> { + info!("New domain socket connection"); + let mut reader = BufReader::new(stream); + let mut input = String::new(); + try!(reader.read_to_string(&mut input)); + debug!("socket input: {}", input); + + let cmd = try!(input.parse::<Command>()); + let (etx, erx) = chan::async::<Event>(); + itx.lock().unwrap().send(Interpret { + command: cmd, + response_tx: Some(Arc::new(Mutex::new(etx))), + }); + erx.recv().ok_or(Error::Socket("internal receiver error".to_string())) +} + + +#[cfg(test)] +mod tests { + use chan; + use crossbeam; + use rustc_serialize::json; + use std::{fs, thread}; + use std::io::{Read, Write}; + use std::net::Shutdown; + use std::time::Duration; + + use datatype::{Command, DownloadComplete, Event}; + use gateway::{Gateway, Interpret}; + use super::*; + use unix_socket::{UnixListener, UnixStream}; + + + #[test] + fn socket_commands_and_events() { + let (etx, erx) = chan::sync::<Event>(0); + let (itx, irx) = chan::sync::<Interpret>(0); + + thread::spawn(move || Socket { + commands_path: "/tmp/sota-commands.socket".to_string(), + events_path: "/tmp/sota-events.socket".to_string(), + }.start(itx, erx)); + thread::sleep(Duration::from_millis(100)); // wait until socket gateway is created + + let path = "/tmp/sota-events.socket"; + let _ = fs::remove_file(&path); + let server = UnixListener::bind(&path).expect("couldn't create events socket for testing"); + + let send = DownloadComplete { + update_id: "1".to_string(), + update_image: "/foo/bar".to_string(), + signature: "abc".to_string() + }; + etx.send(Event::DownloadComplete(send.clone())); + + let (mut stream, _) = server.accept().expect("couldn't read from events socket"); + let mut text = String::new(); + stream.read_to_string(&mut text).unwrap(); + let receive: DownloadComplete = json::decode(&text).expect("couldn't decode DownloadComplete message"); + assert_eq!(send, receive); + + thread::spawn(move || { + let _ = etx; // move into this scope + loop { + let interpret = irx.recv().expect("gtx is closed"); + match interpret.command { + Command::StartDownload(ids) => { + let tx = interpret.response_tx.unwrap(); + tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned())); + } + _ => panic!("expected AcceptUpdates"), + } + } + }); + + crossbeam::scope(|scope| { + for id in 0..10 { + scope.spawn(move || { + let mut stream = UnixStream::connect("/tmp/sota-commands.socket").expect("couldn't connect to socket"); + let _ = stream.write_all(&format!("dl {}", id).into_bytes()).expect("couldn't write to stream"); + stream.shutdown(Shutdown::Write).expect("couldn't shut down writing"); + + let mut resp = String::new(); + stream.read_to_string(&mut resp).expect("couldn't read from stream"); + let ev: Event = json::decode(&resp).expect("couldn't decode json event"); + assert_eq!(ev, Event::FoundSystemInfo(format!("{}", id))); + }); + } + }); + } +} diff --git a/src/gateway/websocket.rs b/src/gateway/websocket.rs new file mode 100644 index 0000000..eb5e040 --- /dev/null +++ b/src/gateway/websocket.rs @@ -0,0 +1,169 @@ +use chan; +use chan::Sender; +use rustc_serialize::json; +use std::thread; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use ws; +use ws::{listen, CloseCode, Handler, Handshake, Message, Sender as WsSender}; +use ws::util::Token; + +use datatype::{Command, Error, Event}; +use super::gateway::{Gateway, Interpret}; + + +/// The `Websocket` gateway allows connected clients to listen to `Event`s that +/// happen in the SOTA client. +pub struct Websocket { + pub server: String, + pub clients: Arc<Mutex<HashMap<Token, WsSender>>> +} + +impl Gateway for Websocket { + fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> { + let clients = self.clients.clone(); + let addr = self.server.clone(); + info!("Opening websocket listener at {}", addr); + + thread::spawn(move || { + listen(&addr as &str, |out| { + WebsocketHandler { + out: out, + itx: itx.clone(), + clients: clients.clone() + } + }).expect("couldn't start websocket listener"); + }); + + thread::sleep(Duration::from_secs(1)); // FIXME: ugly hack for blocking listen call + Ok(info!("Websocket gateway started.")) + } + + fn pulse(&self, event: Event) { + let json = encode(event); + for (_, out) in self.clients.lock().unwrap().iter() { + let _ = out.send(Message::Text(json.clone())); + } + } +} + + +pub struct WebsocketHandler { + out: WsSender, + itx: Sender<Interpret>, + clients: Arc<Mutex<HashMap<Token, WsSender>>> +} + +impl Handler for WebsocketHandler { + fn on_message(&mut self, msg: Message) -> ws::Result<()> { + debug!("received websocket message: {:?}", msg); + msg.as_text().or_else(|err| { + error!("websocket on_message text error: {}", err); + Err(err) + }).and_then(|msg| match decode(msg) { + Ok(cmd) => Ok(self.forward_command(cmd)), + + Err(Error::Websocket(err)) => { + error!("websocket on_message error: {}", err); + Err(err) + } + + Err(_) => unreachable!() + }) + } + + fn on_open(&mut self, _: Handshake) -> ws::Result<()> { + let _ = self.clients.lock().unwrap().insert(self.out.token(), self.out.clone()); + Ok(debug!("new websocket client: {:?}", self.out.token())) + } + + fn on_close(&mut self, code: CloseCode, _: &str) { + let _ = self.clients.lock().unwrap().remove(&self.out.token()); + debug!("closing websocket client {:?}: {:?}", self.out.token(), code); + } + + fn on_error(&mut self, err: ws::Error) { + error!("websocket error: {:?}", err); + } +} + +impl WebsocketHandler { + fn forward_command(&self, cmd: Command) { + let (etx, erx) = chan::sync::<Event>(0); + let etx = Arc::new(Mutex::new(etx.clone())); + self.itx.send(Interpret { command: cmd, response_tx: Some(etx) }); + + let e = erx.recv().expect("websocket response_tx is closed"); + let _ = self.out.send(Message::Text(encode(e))); + } +} + +fn encode(event: Event) -> String { + json::encode(&event).expect("Error encoding event into JSON") +} + +fn decode(s: &str) -> Result<Command, Error> { + Ok(try!(json::decode::<Command>(s))) +} + + +#[cfg(test)] +mod tests { + use chan; + use crossbeam; + use rustc_serialize::json; + use std::thread; + use std::collections::HashMap; + use std::sync::{Arc, Mutex}; + use ws; + use ws::{connect, CloseCode}; + + use datatype::{Command, Event}; + use gateway::{Gateway, Interpret}; + use super::*; + + + #[test] + fn websocket_connections() { + let (etx, erx) = chan::sync::<Event>(0); + let (itx, irx) = chan::sync::<Interpret>(0); + + thread::spawn(move || { + Websocket { + server: "localhost:3012".to_string(), + clients: Arc::new(Mutex::new(HashMap::new())) + }.start(itx, erx); + }); + thread::spawn(move || { + let _ = etx; // move into this scope + loop { + let interpret = irx.recv().expect("gtx is closed"); + match interpret.command { + Command::StartDownload(ids) => { + let tx = interpret.response_tx.unwrap(); + tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned())); + } + _ => panic!("expected AcceptUpdates"), + } + } + }); + + crossbeam::scope(|scope| { + for id in 0..10 { + scope.spawn(move || { + connect("ws://localhost:3012", |out| { + out.send(format!(r#"{{ "variant": "StartDownload", "fields": [["{}"]] }}"#, id)) + .expect("couldn't write to websocket"); + + move |msg: ws::Message| { + let ev: Event = json::decode(&format!("{}", msg)).unwrap(); + assert_eq!(ev, Event::FoundSystemInfo(format!("{}", id))); + out.close(CloseCode::Normal) + } + }).expect("couldn't connect to websocket"); + }); + } + }); + } +} diff --git a/src/genivi/dbus.rs b/src/genivi/dbus.rs deleted file mode 100644 index 7b89590..0000000 --- a/src/genivi/dbus.rs +++ /dev/null @@ -1,121 +0,0 @@ -use dbus::{FromMessageItem, MessageItem}; -use toml::{decode, Table, Value}; - -/// DBus error string to indicate a missing argument. -static MISSING_ARG: &'static str = "Error.MissingArgument"; -/// DBus error string to indicate a malformed argument. -static MALFORMED_ARG: &'static str = "Error.MalformedArgument"; - -/// Format a DBus error message indicating a missing argument. -pub fn missing_arg() -> (&'static str, String) { - (MISSING_ARG, "Missing argument".to_string()) -} - -/// Format a DBus error message indicating a malformed argument. -pub fn malformed_arg() -> (&'static str, String) { - (MALFORMED_ARG, "Malformed argument".to_string()) -} - - -struct DecodableValue(Value); - -impl<'a> FromMessageItem<'a> for DecodableValue { - fn from(m: &'a MessageItem) -> Result<Self, ()> { - match m { - &MessageItem::Str(ref b) => Ok(DecodableValue(Value::String(b.clone()))), - &MessageItem::Bool(ref b) => Ok(DecodableValue(Value::Boolean(*b))), - &MessageItem::Byte(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))), - &MessageItem::Int16(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))), - &MessageItem::Int32(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))), - &MessageItem::Int64(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))), - &MessageItem::UInt16(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))), - &MessageItem::UInt32(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))), - &MessageItem::UInt64(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))), - &MessageItem::Variant(ref b) => FromMessageItem::from(&**b), - _ => Err(()) - } - } -} - -pub struct DecodableStruct(pub Value); - -impl<'a> FromMessageItem<'a> for DecodableStruct { - fn from(m: &'a MessageItem) -> Result<Self, ()> { - let arr: &Vec<MessageItem> = try!(FromMessageItem::from(m)); - arr.iter() - .map(|entry| { - let v: Result<(&MessageItem, &MessageItem), ()> = FromMessageItem::from(entry); - v.and_then(|(k, v)| { - let k: Result<&String,()> = FromMessageItem::from(k); - k.and_then(|k| { - let v: Result<DecodableValue,()> = FromMessageItem::from(v); - v.map(|v| (k.clone(), v.0)) }) }) }) - .collect::<Result<Vec<(_, _)>, ()>>() - .map(|arr| DecodableStruct(Value::Table(arr.into_iter().collect::<Table>()))) - } -} - - -use event::outbound::{OperationResult, OperationResults}; - -impl<'a> FromMessageItem<'a> for OperationResult { - fn from(m: &'a MessageItem) -> Result<Self, ()> { - let m: DecodableStruct = try!(FromMessageItem::from(m)); - decode::<OperationResult>(m.0).ok_or(()) - } -} - -impl<'a> FromMessageItem<'a> for OperationResults { - fn from(m: &'a MessageItem) -> Result<Self, ()> { - let arr: &Vec<MessageItem> = try!(FromMessageItem::from(m)); - arr.into_iter() - .map(|i| { - let i: Result<OperationResult, ()> = FromMessageItem::from(i); - i }) - .collect::<Result<Vec<_>, ()>>() - .map(|a| OperationResults(a)) - } -} - -use event::outbound::{InstalledPackage, InstalledPackages}; - -impl<'a> FromMessageItem<'a> for InstalledPackage { - fn from(m: &'a MessageItem) -> Result<Self, ()> { - let m: DecodableStruct = try!(FromMessageItem::from(m)); - decode::<InstalledPackage>(m.0).ok_or(()) - } -} - -impl<'a> FromMessageItem<'a> for InstalledPackages { - fn from(m: &'a MessageItem) -> Result<Self, ()> { - let arr: &Vec<MessageItem> = try!(FromMessageItem::from(m)); - arr.into_iter() - .map(|i| { - let i: Result<InstalledPackage, ()> = FromMessageItem::from(i); - i }) - .collect::<Result<Vec<_>, ()>>() - .map(|a| InstalledPackages(a)) - } -} - -use event::outbound::{InstalledFirmware, InstalledFirmwares}; - -impl<'a> FromMessageItem<'a> for InstalledFirmware { - fn from(m: &'a MessageItem) -> Result<Self, ()> { - let m: DecodableStruct = try!(FromMessageItem::from(m)); - decode::<InstalledFirmware>(m.0).ok_or(()) - } -} - -impl<'a> FromMessageItem<'a> for InstalledFirmwares { - fn from(m: &'a MessageItem) -> Result<Self, ()> { - let arr: &Vec<MessageItem> = try!(FromMessageItem::from(m)); - arr.into_iter() - .map(|i| { - let i: Result<InstalledFirmware, ()> = FromMessageItem::from(i); - i }) - .collect::<Result<Vec<_>, ()>>() - .map(|a| InstalledFirmwares(a)) - } -} - diff --git a/src/genivi/mod.rs b/src/genivi/mod.rs deleted file mode 100644 index b7ae755..0000000 --- a/src/genivi/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod dbus; -pub mod swm; -pub mod sc; -pub mod start; diff --git a/src/genivi/sc.rs b/src/genivi/sc.rs deleted file mode 100644 index 7c86edc..0000000 --- a/src/genivi/sc.rs +++ /dev/null @@ -1,126 +0,0 @@ -//! Receiving side of the DBus interface. - -use std::sync::mpsc::Sender; - -use dbus::{Connection, NameFlag, BusType, ConnectionItem, Message, FromMessageItem}; -use dbus::obj::*; - -use configuration::DBusConfiguration; -use event::Event; -use event::outbound::{OutBoundEvent, OperationResults, UpdateReport}; -use genivi::dbus::*; - - -/// Encodes the state that is needed to accept incoming DBus messages. -pub struct Receiver { - /// The configuration for the DBus interface. - config: DBusConfiguration, - /// A sender to forward incoming messages. - sender: Sender<Event> -} - -impl Receiver { - /// Create a new `Receiver`. - /// - /// # Arguments - /// * `c`: The configuration for the DBus interface. - /// * `s`: A sender to forward incoming messages. - pub fn new(c: DBusConfiguration, s: Sender<Event>) -> Receiver { - Receiver { - config: c, - sender: s - } - } - - /// Start the listener. It will register in DBus according to the configuration, wait for - /// incoming messages and forward them via the internal `Sender`. - pub fn start(&self) { - let conn = Connection::get_private(BusType::Session).unwrap(); - conn.register_name(&self.config.name, NameFlag::ReplaceExisting as u32).unwrap(); - - let initiate_download = Method::new( - "initiateDownload", - vec!(Argument::new("update_id", "s")), - vec!(), - Box::new(|msg| self.handle_initiate_download(msg))); - let abort_download = Method::new( - "abortDownload", - vec!(Argument::new("update_id", "s")), - vec!(), - Box::new(|msg| self.handle_abort_download(msg))); - let update_report = Method::new( - "updateReport", - vec!(Argument::new("update_id", "s"), Argument::new("operations_results", "aa{sv}")), - vec!(), - Box::new(|msg| self.handle_update_report(msg))); - let interface = Interface::new(vec!(initiate_download, abort_download, update_report), vec!(), vec!()); - - let mut object_path = ObjectPath::new(&conn, &self.config.path, true); - object_path.insert_interface(&self.config.interface, interface); - object_path.set_registered(true).unwrap(); - - for n in conn.iter(1000) { - match n { - ConnectionItem::MethodCall(mut m) => { - object_path.handle_message(&mut m); - }, - _ => {} - } - } - } - - /// Handles incoming "Initiate Download" messages. - /// - /// Parses the message and forwards it to the internal `Sender`. - /// - /// # Arguments - /// * `msg`: The message to handle. - fn handle_initiate_download(&self, msg: &mut Message) -> MethodResult { - let sender = try!(get_sender(msg).ok_or(missing_arg())); - trace!("sender: {:?}", sender); - trace!("msg: {:?}", msg); - - let mut args = msg.get_items().into_iter(); - let arg = try!(args.next().ok_or(missing_arg())); - let update_id: &String = try!(FromMessageItem::from(&arg).or(Err(malformed_arg()))); - let _ = self.sender.send( - Event::OutBound(OutBoundEvent::InitiateDownload(update_id.clone()))); - - Ok(vec!()) - } - - fn handle_abort_download(&self, msg: &mut Message) -> MethodResult { - let sender = try!(get_sender(msg).ok_or(missing_arg())); - trace!("sender: {:?}", sender); - trace!("msg: {:?}", msg); - - let mut args = msg.get_items().into_iter(); - let arg = try!(args.next().ok_or(missing_arg())); - let update_id: &String = try!(FromMessageItem::from(&arg).or(Err(malformed_arg()))); - let _ = self.sender.send( - Event::OutBound(OutBoundEvent::AbortDownload(update_id.clone()))); - - Ok(vec!()) - } - - fn handle_update_report(&self, msg: &mut Message) -> MethodResult { - let sender = try!(get_sender(msg).ok_or(missing_arg())); - trace!("sender: {:?}", sender); - trace!("msg: {:?}", msg); - - let mut args = msg.get_items().into_iter(); - let arg = try!(args.next().ok_or(missing_arg())); - let update_id: &String = try!(FromMessageItem::from(&arg).or(Err(malformed_arg()))); - - let arg = try!(args.next().ok_or(missing_arg())); - let operation_results: OperationResults = try!(FromMessageItem::from(&arg).or(Err(malformed_arg()))); - - let report = UpdateReport::new(update_id.clone(), operation_results); - let _ = self.sender.send( - Event::OutBound(OutBoundEvent::UpdateReport(report))); - - Ok(vec!()) - } -} - -fn get_sender(msg: &Message) -> Option<String> { msg.sender() } diff --git a/src/genivi/start.rs b/src/genivi/start.rs deleted file mode 100644 index 55bef27..0000000 --- a/src/genivi/start.rs +++ /dev/null @@ -1,72 +0,0 @@ -//! Main loop, starting the worker threads and wiring up communication channels between them. - -use std::sync::{Arc, Mutex}; -use std::sync::mpsc::{channel, Receiver}; -use std::thread; - -use configuration::Configuration; -use configuration::DBusConfiguration; -use event::Event; -use event::inbound::InboundEvent; -use event::outbound::OutBoundEvent; -use remote::svc::{RemoteServices, ServiceHandler}; -use remote::rvi; - -pub fn handle(cfg: &DBusConfiguration, rx: Receiver<Event>, remote_svcs: Arc<Mutex<RemoteServices>>) { - loop { - match rx.recv().unwrap() { - Event::Inbound(i) => match i { - InboundEvent::UpdateAvailable(e) => { - info!("UpdateAvailable"); - super::swm::send_update_available(&cfg, e); - }, - InboundEvent::DownloadComplete(e) => { - info!("DownloadComplete"); - super::swm::send_download_complete(&cfg, e); - }, - InboundEvent::GetInstalledSoftware(e) => { - info!("GetInstalledSoftware"); - let _ = super::swm::send_get_installed_software(&cfg, e) - .and_then(|e| { - remote_svcs.lock().unwrap().send_installed_software(e) - .map_err(|e| error!("{}", e)) }); - } - }, - Event::OutBound(o) => match o { - OutBoundEvent::InitiateDownload(e) => { - info!("InitiateDownload"); - let _ = remote_svcs.lock().unwrap().send_start_download(e); - }, - OutBoundEvent::AbortDownload(_) => info!("AbortDownload"), - OutBoundEvent::UpdateReport(e) => { - info!("UpdateReport"); - let _ = remote_svcs.lock().unwrap().send_update_report(e); - } - } - } - } -} - -/// Main loop, starting the worker threads and wiring up communication channels between them. -/// -/// # Arguments -/// * `conf`: A pointer to a `Configuration` object see the [documentation of the configuration -/// crate](../configuration/index.html). -/// * `rvi_url`: The URL, where RVI can be found, with the protocol. -/// * `edge_url`: The `host:port` combination where the client should bind and listen for incoming -/// RVI calls. -pub fn start(conf: &Configuration, rvi_url: String, edge_url: String) { - // Main message channel from RVI and DBUS - let (tx, rx) = channel(); - - // RVI edge handler - let remote_svcs = Arc::new(Mutex::new(RemoteServices::new(rvi_url.clone()))); - let handler = ServiceHandler::new(tx.clone(), remote_svcs.clone(), conf.client.clone()); - let rvi_edge = rvi::ServiceEdge::new(rvi_url.clone(), edge_url, handler); - rvi_edge.start(); - - // DBUS handler - let dbus_receiver = super::sc::Receiver::new(conf.dbus.clone(), tx); - thread::spawn(move || dbus_receiver.start()); - handle(&conf.dbus, rx, remote_svcs); -} diff --git a/src/genivi/swm.rs b/src/genivi/swm.rs deleted file mode 100644 index e913e1e..0000000 --- a/src/genivi/swm.rs +++ /dev/null @@ -1,63 +0,0 @@ -//! Sending side of the DBus interface. - -use std::convert::From; - -use dbus::{Connection, BusType, MessageItem, Message, FromMessageItem}; - -use configuration::DBusConfiguration; -use event::inbound::{UpdateAvailable, DownloadComplete, GetInstalledSoftware}; -use event::outbound::{InstalledFirmwares, InstalledPackages, InstalledSoftware}; - -pub fn send_update_available(config: &DBusConfiguration, e: UpdateAvailable) { - let args = [ - MessageItem::from(e.update_id), - MessageItem::from(e.signature), - MessageItem::from(e.description), - MessageItem::from(e.request_confirmation)]; - let mut message = Message::new_method_call( - &config.software_manager, &config.software_manager_path, - &config.software_manager, "updateAvailable").unwrap(); - message.append_items(&args); - - let conn = Connection::get_private(BusType::Session).unwrap(); - let _ = conn.send(message) - .map_err(|_| error!("Couldn't forward message to D-Bus")); -} - -pub fn send_download_complete(config: &DBusConfiguration, e: DownloadComplete) { - let args = [ - MessageItem::from(e.update_image), - MessageItem::from(e.signature)]; - let mut message = Message::new_method_call( - &config.software_manager, &config.software_manager_path, - &config.software_manager, "downloadComplete").unwrap(); - message.append_items(&args); - - let conn = Connection::get_private(BusType::Session).unwrap(); - let _ = conn.send(message) - .map_err(|_| error!("Couldn't forward message to D-Bus")); -} - -pub fn send_get_installed_software(config: &DBusConfiguration, e: GetInstalledSoftware) - -> Result<InstalledSoftware, ()> { - let args = [ - MessageItem::from(e.include_packages), - MessageItem::from(e.include_module_firmware)]; - let mut message = Message::new_method_call( - &config.software_manager, &config.software_manager_path, - &config.software_manager, "getInstalledPackages").unwrap(); - message.append_items(&args); - - let conn = Connection::get_private(BusType::Session).unwrap(); - let msg = conn.send_with_reply_and_block(message, config.timeout).unwrap(); - - let mut args = msg.get_items().into_iter(); - let arg = try!(args.next().ok_or(())); - let installed_packages: InstalledPackages = try!(FromMessageItem::from(&arg)); - - let arg = try!(args.next().ok_or(())); - let installed_firmware: InstalledFirmwares = try!(FromMessageItem::from(&arg)); - - Ok(InstalledSoftware::new(installed_packages, installed_firmware)) -} - diff --git a/src/http/auth_client.rs b/src/http/auth_client.rs new file mode 100644 index 0000000..f4ad38b --- /dev/null +++ b/src/http/auth_client.rs @@ -0,0 +1,278 @@ +use chan::Sender; +use hyper; +use hyper::{Encoder, Decoder, Next}; +use hyper::client::{Client as HyperClient, Handler, HttpsConnector, + Request as HyperRequest, Response as HyperResponse}; +use hyper::header::{Authorization, Basic, Bearer, ContentLength, ContentType, Location}; +use hyper::mime::{Attr, Mime, TopLevel, SubLevel, Value}; +use hyper::net::{HttpStream, HttpsStream, OpensslStream}; +use hyper::status::StatusCode; +use std::{io, mem}; +use std::io::{ErrorKind, Write}; +use std::str; +use std::time::Duration; +use time; + +use datatype::{Auth, Error}; +use http::{Client, get_openssl, Request, Response}; + + +/// The `AuthClient` will attach an `Authentication` header to each outgoing +/// HTTP request. +#[derive(Clone)] +pub struct AuthClient { + auth: Auth, + client: HyperClient<AuthHandler>, +} + +impl Default for AuthClient { + fn default() -> Self { + Self::from(Auth::None) + } +} + +impl AuthClient { + /// Instantiates a new client ready to make requests for the given `Auth` type. + pub fn from(auth: Auth) -> Self { + let client = HyperClient::<AuthHandler>::configure() + .keep_alive(true) + .max_sockets(1024) + .connector(HttpsConnector::new(get_openssl())) + .build() + .expect("unable to create a new hyper Client"); + + AuthClient { + auth: auth, + client: client, + } + } +} + +impl Client for AuthClient { + fn chan_request(&self, req: Request, resp_tx: Sender<Response>) { + info!("{} {}", req.method, req.url); + let _ = self.client.request(req.url.inner(), AuthHandler { + auth: self.auth.clone(), + req: req, + timeout: Duration::from_secs(20), + started: None, + written: 0, + response: Vec::new(), + resp_tx: resp_tx.clone(), + }).map_err(|err| resp_tx.send(Err(Error::from(err)))); + } +} + + +/// The async handler for outgoing HTTP requests. +// FIXME: uncomment when yocto is at 1.8.0: #[derive(Debug)] +pub struct AuthHandler { + auth: Auth, + req: Request, + timeout: Duration, + started: Option<u64>, + written: usize, + response: Vec<u8>, + resp_tx: Sender<Response>, +} + +// FIXME: required for building on 1.7.0 only +impl ::std::fmt::Debug for AuthHandler { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "unimplemented") + } +} + +impl AuthHandler { + fn redirect_request(&mut self, resp: HyperResponse) { + match resp.headers().get::<Location>() { + Some(&Location(ref loc)) => self.req.url.join(loc).map(|url| { + debug!("redirecting to {}", url); + // drop Authentication Header on redirect + let client = AuthClient::default(); + let resp_rx = client.send_request(Request { + url: url, + method: self.req.method.clone(), + body: mem::replace(&mut self.req.body, None), + }); + match resp_rx.recv().expect("no redirect_request response") { + Ok(data) => self.resp_tx.send(Ok(data)), + Err(err) => self.resp_tx.send(Err(Error::from(err))) + } + }).unwrap_or_else(|err| self.resp_tx.send(Err(Error::from(err)))), + + None => self.resp_tx.send(Err(Error::Client("redirect missing Location header".to_string()))) + } + } +} + +/// The `AuthClient` may be used for both HTTP and HTTPS connections. +pub type Stream = HttpsStream<OpensslStream<HttpStream>>; + +impl Handler<Stream> for AuthHandler { + fn on_request(&mut self, req: &mut HyperRequest) -> Next { + req.set_method(self.req.method.clone().into()); + self.started = Some(time::precise_time_ns()); + let mut headers = req.headers_mut(); + + // empty Charset to keep RVI happy + let mime_json = Mime(TopLevel::Application, SubLevel::Json, vec![]); + let mime_form = Mime(TopLevel::Application, SubLevel::WwwFormUrlEncoded, + vec![(Attr::Charset, Value::Utf8)]); + + match self.auth { + Auth::None => { + headers.set(ContentType(mime_json)); + } + + Auth::Credentials(_, _) if self.req.body.is_some() => { + panic!("no request body expected for Auth::Credentials"); + } + + Auth::Credentials(ref id, ref secret) => { + headers.set(Authorization(Basic { username: id.0.clone(), + password: Some(secret.0.clone()) })); + headers.set(ContentType(mime_form)); + self.req.body = Some(br#"grant_type=client_credentials"#.to_vec()); + } + + Auth::Token(ref token) => { + headers.set(Authorization(Bearer { token: token.access_token.clone() })); + headers.set(ContentType(mime_json)); + } + }; + + self.req.body.as_ref().map_or(Next::read().timeout(self.timeout), |body| { + headers.set(ContentLength(body.len() as u64)); + Next::write() + }) + } + + fn on_request_writable(&mut self, encoder: &mut Encoder<Stream>) -> Next { + let body = self.req.body.as_ref().expect("on_request_writable expects a body"); + + match encoder.write(&body[self.written..]) { + Ok(0) => { + info!("Request length: {} bytes", body.len()); + if let Ok(body) = str::from_utf8(body) { + debug!("body:\n{}", body); + } + Next::read().timeout(self.timeout) + }, + + Ok(n) => { + self.written += n; + trace!("{} bytes written to request body", n); + Next::write() + } + + Err(ref err) if err.kind() == ErrorKind::WouldBlock => { + trace!("retry on_request_writable"); + Next::write() + } + + Err(err) => { + error!("unable to write request body: {}", err); + self.resp_tx.send(Err(Error::from(err))); + Next::remove() + } + } + } + + fn on_response(&mut self, resp: HyperResponse) -> Next { + info!("Response status: {}", resp.status()); + debug!("on_response headers:\n{}", resp.headers()); + let started = self.started.expect("expected start time"); + let latency = time::precise_time_ns() as f64 - started as f64; + debug!("on_response latency: {}ms", (latency / 1e6) as u32); + + if resp.status().is_success() { + if let Some(len) = resp.headers().get::<ContentLength>() { + if **len > 0 { + return Next::read(); + } + } + self.resp_tx.send(Ok(Vec::new())); + Next::end() + } else if resp.status().is_redirection() { + self.redirect_request(resp); + Next::end() + } else if resp.status() == &StatusCode::Forbidden { + self.resp_tx.send(Err(Error::Authorization(format!("{}", resp.status())))); + Next::end() + } else { + self.resp_tx.send(Err(Error::Client(format!("{}", resp.status())))); + Next::end() + } + } + + fn on_response_readable(&mut self, decoder: &mut Decoder<Stream>) -> Next { + match io::copy(decoder, &mut self.response) { + Ok(0) => { + debug!("on_response_readable bytes read: {}", self.response.len()); + self.resp_tx.send(Ok(mem::replace(&mut self.response, Vec::new()))); + Next::end() + } + + Ok(n) => { + trace!("{} more response bytes read", n); + Next::read() + } + + Err(ref err) if err.kind() == ErrorKind::WouldBlock => { + trace!("retry on_response_readable"); + Next::read() + } + + Err(err) => { + error!("unable to read response body: {}", err); + self.resp_tx.send(Err(Error::from(err))); + Next::end() + } + } + } + + fn on_error(&mut self, err: hyper::Error) -> Next { + error!("on_error: {}", err); + self.resp_tx.send(Err(Error::from(err))); + Next::remove() + } +} + + +#[cfg(test)] +mod tests { + use rustc_serialize::json::Json; + use std::path::Path; + + use super::*; + use http::{Client, set_ca_certificates}; + + + fn get_client() -> AuthClient { + set_ca_certificates(&Path::new("run/sota_certificates")); + AuthClient::default() + } + + #[test] + fn test_send_get_request() { + let client = get_client(); + let url = "http://eu.httpbin.org/bytes/16?seed=123".parse().unwrap(); + let resp_rx = client.get(url, None); + let data = resp_rx.recv().unwrap().unwrap(); + assert_eq!(data, vec![13, 22, 104, 27, 230, 9, 137, 85, 218, 40, 86, 85, 62, 0, 111, 22]); + } + + #[test] + fn test_send_post_request() { + let client = get_client(); + let url = "https://eu.httpbin.org/post".parse().unwrap(); + let resp_rx = client.post(url, Some(br#"foo"#.to_vec())); + let body = resp_rx.recv().unwrap().unwrap(); + let resp = String::from_utf8(body).unwrap(); + let json = Json::from_str(&resp).unwrap(); + let obj = json.as_object().unwrap(); + let data = obj.get("data").unwrap().as_string().unwrap(); + assert_eq!(data, "foo"); + } +} diff --git a/src/http/http_client.rs b/src/http/http_client.rs new file mode 100644 index 0000000..492166c --- /dev/null +++ b/src/http/http_client.rs @@ -0,0 +1,43 @@ +use chan; +use chan::{Sender, Receiver}; + +use datatype::{Error, Method, Url}; + + +/// Abstracts a particular HTTP Client implementation with the basic methods +/// for sending `Request`s and receiving asynchronous `Response`s via a channel. +pub trait Client { + fn chan_request(&self, req: Request, resp_tx: Sender<Response>); + + fn send_request(&self, req: Request) -> Receiver<Response> { + let (resp_tx, resp_rx) = chan::async::<Response>(); + self.chan_request(req, resp_tx); + resp_rx + } + + fn get(&self, url: Url, body: Option<Vec<u8>>) -> Receiver<Response> { + self.send_request(Request { method: Method::Get, url: url, body: body }) + } + + fn post(&self, url: Url, body: Option<Vec<u8>>) -> Receiver<Response> { + self.send_request(Request { method: Method::Post, url: url, body: body }) + } + + fn put(&self, url: Url, body: Option<Vec<u8>>) -> Receiver<Response> { + self.send_request(Request { method: Method::Put, url: url, body: body }) + } + + fn is_testing(&self) -> bool { false } +} + + +/// A simplified representation of an HTTP request for use in the client. +#[derive(Debug)] +pub struct Request { + pub method: Method, + pub url: Url, + pub body: Option<Vec<u8>> +} + +/// Return the body of an HTTP response on success, or an `Error` otherwise. +pub type Response = Result<Vec<u8>, Error>; diff --git a/src/http/http_server.rs b/src/http/http_server.rs new file mode 100644 index 0000000..2ecd7a2 --- /dev/null +++ b/src/http/http_server.rs @@ -0,0 +1,113 @@ +use hyper::{Decoder, Encoder, Next, StatusCode}; +use hyper::header::{ContentLength, ContentType}; +use hyper::mime::{Attr, Mime, TopLevel, SubLevel, Value}; +use hyper::net::Transport; +use hyper::server::{Handler, Request as HyperRequest, Response as HyperResponse}; +use std::{mem, io}; +use std::io::{ErrorKind, Write}; +use std::time::Duration; + + +/// An HTTP server handles the incoming headers and request body as well as the +/// setting the response status and body. Other concerns regarding the asynchronous +/// event loop handlers for writing to buffers are abstracted away. +pub trait Server<T: Transport>: Send { + fn headers(&mut self, req: HyperRequest<T>); + fn request(&mut self, body: Vec<u8>); + fn response(&mut self) -> (StatusCode, Option<Vec<u8>>); +} + + +/// This implements the `hyper::server::Handler` trait so that it can be used +/// to handle incoming HTTP connections with `hyper::server::Server`. +pub struct ServerHandler<T: Transport> { + server: Box<Server<T>>, + req_body: Vec<u8>, + resp_body: Vec<u8>, + written: usize +} + +impl<T: Transport> ServerHandler<T> { + /// Instantiate a new `ServerHandler` by passing a `Box<Server<T>` reference. + pub fn new(server: Box<Server<T>>) -> Self { + ServerHandler { + server: server, + req_body: Vec::new(), + resp_body: Vec::new(), + written: 0 + } + } +} + +impl<T: Transport> Handler<T> for ServerHandler<T> { + fn on_request(&mut self, req: HyperRequest<T>) -> Next { + info!("on_request: {} {}", req.method(), req.uri()); + self.server.headers(req); + Next::read() + } + + fn on_request_readable(&mut self, transport: &mut Decoder<T>) -> Next { + match io::copy(transport, &mut self.req_body) { + Ok(0) => { + debug!("on_request_readable bytes read: {}", self.req_body.len()); + self.server.request(mem::replace(&mut self.req_body, Vec::new())); + Next::write().timeout(Duration::from_secs(20)) + } + + Ok(n) => { + trace!("{} more request bytes read", n); + Next::read() + } + + Err(ref err) if err.kind() == ErrorKind::WouldBlock => { + trace!("retry on_request_readable"); + Next::read() + } + + Err(err) => { + error!("unable to read request body: {}", err); + Next::remove() + } + } + } + + fn on_response(&mut self, resp: &mut HyperResponse) -> Next { + let (status, body) = self.server.response(); + resp.set_status(status); + info!("on_response: status {}", resp.status()); + + let mut headers = resp.headers_mut(); + headers.set(ContentType(Mime(TopLevel::Application, SubLevel::Json, + vec![(Attr::Charset, Value::Utf8)]))); + body.map_or_else(Next::end, |body| { + headers.set(ContentLength(body.len() as u64)); + self.resp_body = body; + Next::write() + }) + } + + fn on_response_writable(&mut self, transport: &mut Encoder<T>) -> Next { + match transport.write(&self.resp_body[self.written..]) { + Ok(0) => { + debug!("{} bytes written to response body", self.written); + Next::end() + } + + Ok(n) => { + self.written += n; + trace!("{} bytes written to response body", n); + Next::write() + } + + Err(ref err) if err.kind() == ErrorKind::WouldBlock => { + trace!("retry on_response_writable"); + Next::write() + } + + Err(err) => { + error!("unable to write response body: {}", err); + Next::remove() + } + } + } +} diff --git a/src/http/mod.rs b/src/http/mod.rs new file mode 100644 index 0000000..5e990a3 --- /dev/null +++ b/src/http/mod.rs @@ -0,0 +1,11 @@ +pub mod auth_client; +pub mod http_client; +pub mod http_server; +pub mod openssl; +pub mod test_client; + +pub use self::auth_client::{AuthClient, AuthHandler}; +pub use self::http_client::{Client, Request, Response}; +pub use self::http_server::{Server, ServerHandler}; +pub use self::openssl::{get_openssl, set_ca_certificates}; +pub use self::test_client::TestClient; diff --git a/src/http/openssl.rs b/src/http/openssl.rs new file mode 100644 index 0000000..378b078 --- /dev/null +++ b/src/http/openssl.rs @@ -0,0 +1,47 @@ +use hyper::net::Openssl; +use openssl::ssl::{SSL_OP_NO_SSLV2, SSL_OP_NO_SSLV3}; +use openssl::ssl::{SslContext, SslMethod}; +use std::path::Path; +use std::sync::{Arc, Mutex}; + + +lazy_static! { + static ref OPENSSL: Arc<Mutex<Option<Openssl>>> = Arc::new(Mutex::new(None)); +} + +// default cipher list taken from the Servo project: +// https://github.com/servo/servo/blob/master/components/net/connector.rs#L18 +const DEFAULT_CIPHERS: &'static str = concat!( + "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:", + "ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:", + "DHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-SHA256:", + "ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:", + "ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES256-SHA:", + "ECDHE-RSA-AES256-SHA:DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:", + "DHE-RSA-AES256-SHA256:DHE-RSA-AES256-SHA:ECDHE-RSA-DES-CBC3-SHA:", + "ECDHE-ECDSA-DES-CBC3-SHA:AES128-GCM-SHA256:AES256-GCM-SHA384:", + "AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA" +); + +/// This function *must* be called before any call is made to `get_openssl()` +pub fn set_ca_certificates(path: &Path) { + info!("Setting OpenSSL CA certificates path to {:?}", path); + let mut openssl = OPENSSL.lock().unwrap(); + let mut context = SslContext::new(SslMethod::Sslv23).unwrap(); + context.set_CA_file(path).unwrap_or_else(|err| { + panic!("couldn't set CA certificates: {}", err); + }); + context.set_cipher_list(DEFAULT_CIPHERS).unwrap(); + context.set_options(SSL_OP_NO_SSLV2 | SSL_OP_NO_SSLV3); + *openssl = Some(Openssl { context: context }); +} + +/// This function will return a clone of `Openssl` where the CA certificates +/// have been bound with `set_ca_certificates()`. +pub fn get_openssl() -> Openssl { + if let Some(ref openssl) = *OPENSSL.lock().unwrap() { + openssl.clone() + } else { + panic!("CA certificates not set") + } +} diff --git a/src/http/test_client.rs b/src/http/test_client.rs new file mode 100644 index 0000000..7857e0f --- /dev/null +++ b/src/http/test_client.rs @@ -0,0 +1,35 @@ +use chan::Sender; +use std::cell::RefCell; + +use datatype::Error; +use http::{Client, Request, Response}; + + +/// The `TestClient` will return HTTP responses from an existing list of strings. +pub struct TestClient { + responses: RefCell<Vec<String>> +} + +impl Default for TestClient { + fn default() -> Self { + TestClient { responses: RefCell::new(Vec::new()) } + } +} + +impl TestClient { + /// Create a new `TestClient` that will return these responses. + pub fn from(responses: Vec<String>) -> TestClient { + TestClient { responses: RefCell::new(responses) } + } +} + +impl Client for TestClient { + fn chan_request(&self, req: Request, resp_tx: Sender<Response>) { + match self.responses.borrow_mut().pop() { + Some(body) => resp_tx.send(Ok(body.as_bytes().to_vec())), + None => resp_tx.send(Err(Error::Client(req.url.to_string()))) + } + } + + fn is_testing(&self) -> bool { true } +} diff --git a/src/interpreter.rs b/src/interpreter.rs new file mode 100644 index 0000000..b286ba5 --- /dev/null +++ b/src/interpreter.rs @@ -0,0 +1,364 @@ +use chan; +use chan::{Sender, Receiver}; +use std; +use std::borrow::Cow; + +use datatype::{AccessToken, Auth, ClientId, ClientSecret, Command, Config, + Error, Event, Package, UpdateRequestId}; +use gateway::Interpret; +use http::{AuthClient, Client}; +use oauth2::authenticate; +use package_manager::PackageManager; +use rvi::Services; +use sota::Sota; + + +/// An `Interpreter` loops over any incoming values, on receipt of which it +/// delegates to the `interpret` function which will respond with output values. +pub trait Interpreter<I, O> { + fn interpret(&mut self, input: I, otx: &Sender<O>); + + fn run(&mut self, irx: Receiver<I>, otx: Sender<O>) { + loop { + self.interpret(irx.recv().expect("interpreter sender closed"), &otx); + } + } +} + + +/// The `EventInterpreter` listens for `Event`s and optionally responds with +/// `Command`s that may be sent to the `CommandInterpreter`. +pub struct EventInterpreter { + pub package_manager: PackageManager +} + +impl Interpreter<Event, Command> for EventInterpreter { + fn interpret(&mut self, event: Event, ctx: &Sender<Command>) { + info!("Event received: {}", event); + match event { + Event::NotAuthenticated => { + info!("Trying to authenticate again..."); + ctx.send(Command::Authenticate(None)); + } + + Event::NewUpdatesReceived(ids) => { + ctx.send(Command::StartDownload(ids)); + } + + Event::DownloadComplete(dl) => { + if self.package_manager != PackageManager::Off { + ctx.send(Command::StartInstall(dl)); + } + } + + Event::InstallComplete(report) => { + ctx.send(Command::SendUpdateReport(report)); + } + + Event::UpdateReportSent => { + if self.package_manager != PackageManager::Off { + self.package_manager.installed_packages().map(|packages| { + ctx.send(Command::SendInstalledPackages(packages)); + }).unwrap_or_else(|err| error!("couldn't send a list of packages: {}", err)); + } + } + + _ => () + } + } +} + + +/// The `CommandInterpreter` wraps each incoming `Command` inside an `Interpret` +/// type with no response channel for sending to the `GlobalInterpreter`. +pub struct CommandInterpreter; + +impl Interpreter<Command, Interpret> for CommandInterpreter { + fn interpret(&mut self, cmd: Command, itx: &Sender<Interpret>) { + info!("Command received: {}", cmd); + itx.send(Interpret { command: cmd, response_tx: None }); + } +} + + +/// The `GlobalInterpreter` interprets the `Command` inside incoming `Interpret` +/// messages, broadcasting `Event`s globally and (optionally) sending the final +/// outcome `Event` to the `Interpret` response channel. +pub struct GlobalInterpreter<'t> { + pub config: Config, + pub token: Option<Cow<'t, AccessToken>>, + pub http_client: Box<Client>, + pub rvi: Option<Services>, + pub loopback_tx: Sender<Interpret>, +} + +impl<'t> Interpreter<Interpret, Event> for GlobalInterpreter<'t> { + fn interpret(&mut self, interpret: Interpret, etx: &Sender<Event>) { + info!("Interpreter started: {}", interpret.command); + + let (multi_tx, multi_rx) = chan::async::<Event>(); + let outcome = match (self.token.as_ref(), self.config.auth.is_none()) { + (Some(_), _) | (_, true) => self.authenticated(interpret.command, multi_tx), + _ => self.unauthenticated(interpret.command, multi_tx) + }; + + let mut response_ev: Option<Event> = None; + match outcome { + Ok(_) => { + for ev in multi_rx { + etx.send(ev.clone()); + response_ev = Some(ev); + } + info!("Interpreter finished."); + } + + Err(Error::Authorization(_)) => { + let ev = Event::NotAuthenticated; + etx.send(ev.clone()); + response_ev = Some(ev); + error!("Interpreter authentication failed"); + } + + Err(err) => { + let ev = Event::Error(format!("{}", err)); + etx.send(ev.clone()); + response_ev = Some(ev); + error!("Interpreter failed: {}", err); + } + } + + let ev = response_ev.expect("no response event to send back"); + interpret.response_tx.map(|tx| tx.lock().unwrap().send(ev)); + } +} + +impl<'t> GlobalInterpreter<'t> { + fn authenticated(&self, cmd: Command, etx: Sender<Event>) -> Result<(), Error> { + let mut sota = Sota::new(&self.config, self.http_client.as_ref()); + + // always send at least one Event response + match cmd { + Command::Authenticate(_) => etx.send(Event::Authenticated), + + Command::GetNewUpdates => { + let mut updates = try!(sota.get_pending_updates()); + if updates.is_empty() { + etx.send(Event::NoNewUpdates); + } else { + updates.sort_by_key(|u| u.installPos); + let ids = updates.iter().map(|u| u.requestId.clone()).collect::<Vec<UpdateRequestId>>(); + etx.send(Event::NewUpdatesReceived(ids)) + } + } + + Command::ListInstalledPackages => { + let mut packages: Vec<Package> = Vec::new(); + if self.config.device.package_manager != PackageManager::Off { + packages = try!(self.config.device.package_manager.installed_packages()); + } + etx.send(Event::FoundInstalledPackages(packages)); + } + + Command::RefreshSystemInfo(post) => { + let info = try!(self.config.device.system_info.report()); + etx.send(Event::FoundSystemInfo(info.clone())); + if post { + let _ = sota.send_system_info(&info) + .map_err(|err| etx.send(Event::Error(format!("{}", err)))); + } + } + + Command::SendInstalledPackages(packages) => { + let _ = sota.send_installed_packages(&packages) + .map_err(|err| error!("couldn't send installed packages: {}", err)); + etx.send(Event::InstalledPackagesSent); + } + + Command::SendInstalledSoftware(sw) => { + if let Some(ref rvi) = self.rvi { + let _ = rvi.remote.lock().unwrap().send_installed_software(sw); + } + } + + Command::SendUpdateReport(report) => { + if let Some(ref rvi) = self.rvi { + let _ = rvi.remote.lock().unwrap().send_update_report(report); + } else { + let _ = sota.send_update_report(&report) + .map_err(|err| error!("couldn't send update report: {}", err)); + } + etx.send(Event::UpdateReportSent); + } + + Command::StartDownload(ref ids) => { + for id in ids { + etx.send(Event::DownloadingUpdate(id.clone())); + + if let Some(ref rvi) = self.rvi { + let _ = rvi.remote.lock().unwrap().send_download_started(id.clone()); + } else { + let _ = sota.download_update(id.clone()) + .map(|dl| etx.send(Event::DownloadComplete(dl))) + .map_err(|err| etx.send(Event::DownloadFailed(id.clone(), format!("{}", err)))); + } + } + } + + Command::StartInstall(dl) => { + let _ = sota.install_update(dl) + .map(|report| etx.send(Event::InstallComplete(report))) + .map_err(|report| etx.send(Event::InstallFailed(report))); + } + + Command::Shutdown => std::process::exit(0), + } + + Ok(()) + } + + fn unauthenticated(&mut self, cmd: Command, etx: Sender<Event>) -> Result<(), Error> { + match cmd { + Command::Authenticate(_) => { + let config = self.config.auth.clone().expect("trying to authenticate without auth config"); + self.set_client(Auth::Credentials(ClientId(config.client_id), + ClientSecret(config.client_secret))); + let server = config.server.join("/token").expect("couldn't build authentication url"); + let token = try!(authenticate(server, self.http_client.as_ref())); + self.set_client(Auth::Token(token.clone())); + self.token = Some(token.into()); + etx.send(Event::Authenticated); + } + + Command::GetNewUpdates | + Command::ListInstalledPackages | + Command::RefreshSystemInfo(_) | + Command::SendInstalledPackages(_) | + Command::SendInstalledSoftware(_) | + Command::SendUpdateReport(_) | + Command::StartDownload(_) | + Command::StartInstall(_) => etx.send(Event::NotAuthenticated), + + Command::Shutdown => std::process::exit(0), + } + + Ok(()) + } + + fn set_client(&mut self, auth: Auth) { + if !self.http_client.is_testing() { + self.http_client = Box::new(AuthClient::from(auth)); + } + } +} + + +#[cfg(test)] +mod tests { + use chan; + use chan::{Sender, Receiver}; + use std::thread; + + use super::*; + use datatype::{AccessToken, Command, Config, DownloadComplete, Event, + UpdateReport, UpdateResultCode}; + use gateway::Interpret; + use http::test_client::TestClient; + use package_manager::PackageManager; + use package_manager::tpm::assert_rx; + + + fn new_interpreter(replies: Vec<String>, pkg_mgr: PackageManager) -> (Sender<Command>, Receiver<Event>) { + let (etx, erx) = chan::sync::<Event>(0); + let (ctx, crx) = chan::sync::<Command>(0); + let (itx, _) = chan::sync::<Interpret>(0); + + thread::spawn(move || { + let mut gi = GlobalInterpreter { + config: Config::default(), + token: Some(AccessToken::default().into()), + http_client: Box::new(TestClient::from(replies)), + rvi: None, + loopback_tx: itx, + }; + gi.config.device.package_manager = pkg_mgr; + + loop { + match crx.recv() { + Some(cmd) => gi.interpret(Interpret { command: cmd, response_tx: None }, &etx), + None => break + } + } + }); + + (ctx, erx) + } + + #[test] + fn already_authenticated() { + let replies = Vec::new(); + let pkg_mgr = PackageManager::new_tpm(true); + let (ctx, erx) = new_interpreter(replies, pkg_mgr); + + ctx.send(Command::Authenticate(None)); + assert_rx(erx, &[Event::Authenticated]); + } + + #[test] + fn download_updates() { + let replies = vec!["[]".to_string(); 10]; + let pkg_mgr = PackageManager::new_tpm(true); + let (ctx, erx) = new_interpreter(replies, pkg_mgr); + + ctx.send(Command::StartDownload(vec!["1".to_string(), "2".to_string()])); + assert_rx(erx, &[ + Event::DownloadingUpdate("1".to_string()), + Event::DownloadComplete(DownloadComplete { + update_id: "1".to_string(), + update_image: "/tmp/1".to_string(), + signature: "".to_string() + }), + Event::DownloadingUpdate("2".to_string()), + Event::DownloadComplete(DownloadComplete { + update_id: "2".to_string(), + update_image: "/tmp/2".to_string(), + signature: "".to_string() + }), + ]); + } + + #[test] + fn install_update() { + let replies = vec!["[]".to_string(); 10]; + let pkg_mgr = PackageManager::new_tpm(true); + let (ctx, erx) = new_interpreter(replies, pkg_mgr); + + ctx.send(Command::StartInstall(DownloadComplete { + update_id: "1".to_string(), + update_image: "/tmp/1".to_string(), + signature: "".to_string() + })); + assert_rx(erx, &[ + Event::InstallComplete( + UpdateReport::single("1".to_string(), UpdateResultCode::OK, "".to_string()) + ) + ]); + } + + #[test] + fn failed_installation() { + let replies = vec!["[]".to_string(); 10]; + let pkg_mgr = PackageManager::new_tpm(false); + let (ctx, erx) = new_interpreter(replies, pkg_mgr); + + ctx.send(Command::StartInstall(DownloadComplete { + update_id: "1".to_string(), + update_image: "/tmp/1".to_string(), + signature: "".to_string() + })); + assert_rx(erx, &[ + Event::InstallFailed( + UpdateReport::single("1".to_string(), UpdateResultCode::INSTALL_FAILED, "failed".to_string()) + ) + ]); + } +} @@ -1,65 +1,27 @@ -//! This is the client in-vehicle portion of the SOTA project. See the [main SOTA Server -//! project](https://github.com/advancedtelematic/rvi_sota_server) and [associated architecture -//! document](http://advancedtelematic.github.io/rvi_sota_server/dev/architecture.html) for more -//! information. +#[macro_use] extern crate nom; // use before log to avoid error!() macro conflict +#[macro_use] extern crate chan; +extern crate crossbeam; +extern crate crypto; +extern crate dbus; extern crate hyper; +extern crate openssl; +#[macro_use] extern crate lazy_static; +#[macro_use] extern crate log; +extern crate rand; extern crate rustc_serialize; extern crate time; -extern crate url; -extern crate crypto; extern crate toml; -extern crate dbus; - -#[macro_use] extern crate log; -extern crate env_logger; - -#[cfg(test)] extern crate rand; - -#[cfg(test)] -#[macro_use] -mod test_library; - -/// Try to unwrap or log the error and run the second argument -/// -/// # Arguments -/// 1. Expression to evaluate, needs to return a `Result<T, E> where E: Display` type. -/// 2. Expression to run on errors, after logging the error as error message. -#[macro_export] -macro_rules! try_or { - ($expr:expr, $finalize:expr) => { - match $expr { - Ok(val) => val, - Err(e) => { - error!("{}", e); - $finalize; - } - } - } -} - -/// Try to unwrap or log the provided message and run the third argument -/// -/// # Arguments -/// 1. Expression to evaluate, needs to return a `Result<T, E>` type. -/// 2. Expression that returns a Object implementing the `Display` trait. This object will be -/// logged as a error message with `error!()` -/// 3. Expression to run on errors, after printing a error message. -#[macro_export] -macro_rules! try_msg_or { - ($expr:expr, $msg:expr, $finalize:expr) => { - match $expr { - Ok(val) => val, - Err(..) => { - error!("{}", $msg); - $finalize; - } - } - } -} - -mod event; -mod remote; - -pub mod configuration; -pub mod genivi; +extern crate unix_socket; +extern crate url; +extern crate ws; + +pub mod broadcast; +pub mod datatype; +pub mod gateway; +pub mod http; +pub mod interpreter; +pub mod oauth2; +pub mod package_manager; +pub mod rvi; +pub mod sota; diff --git a/src/main.rs b/src/main.rs index c55a339..61fa02a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,85 +1,308 @@ -//! Logic for starting and configuring the sota_client from the command line. - -extern crate sota_client; -#[macro_use] extern crate log; +#[macro_use] extern crate chan; +extern crate chan_signal; +extern crate crossbeam; extern crate env_logger; extern crate getopts; +extern crate hyper; +#[macro_use] extern crate log; +extern crate rustc_serialize; +#[macro_use] extern crate sota; +extern crate time; +use chan::{Sender, Receiver}; +use chan_signal::Signal; +use env_logger::LogBuilder; +use getopts::Options; +use log::{LogLevelFilter, LogRecord}; use std::env; -use getopts::{Options, Matches}; -use sota_client::configuration::Configuration; -use sota_client::genivi; - -/// Helper function to print usage information to stdout. -/// -/// # Arguments -/// * `program`: The invoking path or name of the executable -/// * `opts`: A pointer to a `Options` object, which generates the actual documentation. See the -/// [getopts documentation](https://doc.rust-lang.org/getopts/getopts/index.html) for details. -#[cfg_attr(test, allow(dead_code))] -fn print_usage(program: &str, opts: &Options) { - let brief = format!("Usage: {} [options]", program); - print!("{}", opts.usage(&brief)); +use std::collections::HashMap; +use std::path::Path; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use sota::datatype::{Command, Config, Event, SystemInfo}; +use sota::gateway::{Console, DBus, Gateway, Interpret, Http, Socket, Websocket}; +use sota::broadcast::Broadcast; +use sota::http::{AuthClient, set_ca_certificates}; +use sota::interpreter::{EventInterpreter, CommandInterpreter, Interpreter, GlobalInterpreter}; +use sota::package_manager::PackageManager; +use sota::rvi::{Edge, Services}; + + +macro_rules! exit { + ($fmt:expr, $($arg:tt)*) => {{ + print!(concat!($fmt, "\n"), $($arg)*); + std::process::exit(1); + }} } -/// Parses the command line and matches it against accepted flags and options. Returns a `Matches` -/// object. See the [getopts documentation](https://doc.rust-lang.org/getopts/getopts/index.html) -/// for details. -/// -/// # Arguments -/// * `args`: A pointer to a Array of Strings. This is supposed to be the commandline as returned -/// by [`env::args().collect()`](https://doc.rust-lang.org/stable/std/env/fn.args.html). -/// * `program`: The invoking path or name of the executable -fn match_args(args: &[String], program: &str) -> Matches { - let mut options = Options::new(); - options.optflag("h", "help", "print this help message"); - options.optopt("c", "config", "change the path where the configuration \ - is expected", "FILE"); - options.optopt("r", "rvi", "explicitly set the URL, where RVI can be \ - reached", "URL"); - options.optopt("e", "edge", "explicitly set the host and port, where the \ - client should listen for connections from RVI", "HOST:PORT"); - - let matches = match options.parse(args) { - Ok(m) => { m } - Err(f) => { - error!("{}", f.to_string()); - print_usage(program, &options); - std::process::exit(1); + +fn start_signal_handler(signals: Receiver<Signal>) { + loop { + match signals.recv() { + Some(Signal::INT) | Some(Signal::TERM) => std::process::exit(0), + _ => () } - }; + } +} - if matches.opt_present("h") { - print_usage(program, &options); - std::process::exit(0); +fn start_update_poller(interval: u64, itx: Sender<Interpret>) { + let (etx, erx) = chan::async::<Event>(); + let tick = chan::tick(Duration::from_secs(interval)); + loop { + let _ = tick.recv(); + itx.send(Interpret { + command: Command::GetNewUpdates, + response_tx: Some(Arc::new(Mutex::new(etx.clone()))) + }); + let _ = erx.recv(); + } +} + +fn send_startup_commands(config: &Config, ctx: &Sender<Command>) { + ctx.send(Command::Authenticate(None)); + ctx.send(Command::RefreshSystemInfo(true)); + + if config.device.package_manager != PackageManager::Off { + config.device.package_manager.installed_packages().map(|packages| { + ctx.send(Command::SendInstalledPackages(packages)); + }).unwrap_or_else(|err| exit!("Couldn't get list of packages: {}", err)); } - matches } -/// Program entrypoint. Parses command line arguments and starts the main loop accordingly. -#[cfg_attr(test, allow(dead_code))] fn main() { - env_logger::init().unwrap(); - let args: Vec<String> = env::args().collect(); - let program: &str = &args[0]; - let matches = match_args(&args[1..], program); - - let conf_file = matches.opt_str("c") - .unwrap_or(Configuration::default_path()); - let configuration = match Configuration::read(&conf_file) { - Ok(value) => value, - Err(e) => { - error!("Couldn't parse configuration file at {}: {}", conf_file, e); - std::process::exit(126); + setup_logging(); + + let config = build_config(); + set_ca_certificates(Path::new(&config.device.certificates_path)); + + let (etx, erx) = chan::async::<Event>(); + let (ctx, crx) = chan::async::<Command>(); + let (itx, irx) = chan::async::<Interpret>(); + + let mut broadcast = Broadcast::new(erx); + send_startup_commands(&config, &ctx); + + crossbeam::scope(|scope| { + // Must subscribe to the signal before spawning ANY other threads + let signals = chan_signal::notify(&[Signal::INT, Signal::TERM]); + scope.spawn(move || start_signal_handler(signals)); + + let poll_tick = config.device.polling_interval; + let poll_itx = itx.clone(); + scope.spawn(move || start_update_poller(poll_tick, poll_itx)); + + if config.gateway.console { + let cons_itx = itx.clone(); + let cons_sub = broadcast.subscribe(); + scope.spawn(move || Console.start(cons_itx, cons_sub)); + } + + if config.gateway.dbus { + let dbus_cfg = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for dbus gateway")); + let dbus_itx = itx.clone(); + let dbus_sub = broadcast.subscribe(); + let mut dbus = DBus { dbus_cfg: dbus_cfg.clone(), itx: itx.clone() }; + scope.spawn(move || dbus.start(dbus_itx, dbus_sub)); } - }; - let rvi_url: String = matches.opt_str("r") - .unwrap_or(configuration.client.rvi_url.clone() - .unwrap_or("http://localhost:8901".to_string())); - let edge_url: String = matches.opt_str("e") - .unwrap_or(configuration.client.edge_url.clone() - .unwrap_or("localhost:9080".to_string())); + if config.gateway.http { + let http_itx = itx.clone(); + let http_sub = broadcast.subscribe(); + let mut http = Http { server: config.network.http_server.clone() }; + scope.spawn(move || http.start(http_itx, http_sub)); + } + + let mut rvi = None; + if config.gateway.rvi { + let _ = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for rvi gateway")); + let rvi_cfg = config.rvi.as_ref().unwrap_or_else(|| exit!("{}", "rvi config required for rvi gateway")); + let rvi_edge = config.network.rvi_edge_server.clone(); + let services = Services::new(rvi_cfg.clone(), config.device.uuid.clone(), etx.clone()); + let mut edge = Edge::new(services.clone(), rvi_edge, rvi_cfg.client.clone()); + scope.spawn(move || edge.start()); + rvi = Some(services); + } + + if config.gateway.socket { + let socket_itx = itx.clone(); + let socket_sub = broadcast.subscribe(); + let mut socket = Socket { + commands_path: config.network.socket_commands_path.clone(), + events_path: config.network.socket_events_path.clone() + }; + scope.spawn(move || socket.start(socket_itx, socket_sub)); + } + + if config.gateway.websocket { + let ws_server = config.network.websocket_server.clone(); + let ws_itx = itx.clone(); + let ws_sub = broadcast.subscribe(); + let mut ws = Websocket { server: ws_server, clients: Arc::new(Mutex::new(HashMap::new())) }; + scope.spawn(move || ws.start(ws_itx, ws_sub)); + } + + let event_sub = broadcast.subscribe(); + let event_ctx = ctx.clone(); + let event_mgr = config.device.package_manager.clone(); + scope.spawn(move || EventInterpreter { + package_manager: event_mgr + }.run(event_sub, event_ctx)); + + let cmd_itx = itx.clone(); + scope.spawn(move || CommandInterpreter.run(crx, cmd_itx)); + + scope.spawn(move || GlobalInterpreter { + config: config, + token: None, + http_client: Box::new(AuthClient::default()), + rvi: rvi, + loopback_tx: itx, + }.run(irx, etx)); + + scope.spawn(move || broadcast.start()); + }); +} + +fn setup_logging() { + let version = option_env!("SOTA_VERSION").unwrap_or("?"); + let mut builder = LogBuilder::new(); + builder.format(move |record: &LogRecord| { + let timestamp = format!("{}", time::now_utc().rfc3339()); + format!("{} ({}): {} - {}", timestamp, version, record.level(), record.args()) + }); + builder.filter(Some("hyper"), LogLevelFilter::Info); + + let _ = env::var("RUST_LOG").map(|level| builder.parse(&level)); + builder.init().expect("env_logger::init() called twice, blame the programmers."); +} + +fn build_config() -> Config { + let args = env::args().collect::<Vec<String>>(); + let program = args[0].clone(); + let mut opts = Options::new(); + + opts.optflag("h", "help", "print this help menu"); + opts.optopt("", "config", "change config path", "PATH"); + + opts.optopt("", "auth-server", "change the auth server", "URL"); + opts.optopt("", "auth-client-id", "change the auth client id", "ID"); + opts.optopt("", "auth-client-secret", "change the auth client secret", "SECRET"); + opts.optopt("", "auth-credentials-file", "change the auth credentials file", "PATH"); + + opts.optopt("", "core-server", "change the core server", "URL"); + + opts.optopt("", "dbus-name", "change the dbus registration name", "NAME"); + opts.optopt("", "dbus-path", "change the dbus path", "PATH"); + opts.optopt("", "dbus-interface", "change the dbus interface name", "INTERFACE"); + opts.optopt("", "dbus-software-manager", "change the dbus software manager name", "NAME"); + opts.optopt("", "dbus-software-manager-path", "change the dbus software manager path", "PATH"); + opts.optopt("", "dbus-timeout", "change the dbus installation timeout", "TIMEOUT"); + + opts.optopt("", "device-uuid", "change the device uuid", "UUID"); + opts.optopt("", "device-vin", "change the device vin", "VIN"); + opts.optopt("", "device-packages-dir", "change downloaded directory for packages", "PATH"); + opts.optopt("", "device-package-manager", "change the package manager", "MANAGER"); + opts.optopt("", "device-polling-interval", "change the package polling interval", "INTERVAL"); + opts.optopt("", "device-certificates-path", "change the OpenSSL CA certificates file", "PATH"); + opts.optopt("", "device-system-info", "change the system information command", "PATH"); + + opts.optopt("", "gateway-console", "toggle the console gateway", "BOOL"); + opts.optopt("", "gateway-dbus", "toggle the dbus gateway", "BOOL"); + opts.optopt("", "gateway-http", "toggle the http gateway", "BOOL"); + opts.optopt("", "gateway-rvi", "toggle the rvi gateway", "BOOL"); + opts.optopt("", "gateway-socket", "toggle the unix domain socket gateway", "BOOL"); + opts.optopt("", "gateway-websocket", "toggle the websocket gateway", "BOOL"); + + opts.optopt("", "network-http-server", "change the http server gateway address", "ADDR"); + opts.optopt("", "network-rvi-edge-server", "change the rvi edge server gateway address", "ADDR"); + opts.optopt("", "network-socket-commands-path", "change the socket path for reading commands", "PATH"); + opts.optopt("", "network-socket-events-path", "change the socket path for sending events", "PATH"); + opts.optopt("", "network-websocket-server", "change the websocket gateway address", "ADDR"); + + opts.optopt("", "rvi-client", "change the rvi client URL", "URL"); + opts.optopt("", "rvi-storage-dir", "change the rvi storage directory", "PATH"); + opts.optopt("", "rvi-timeout", "change the rvi timeout", "TIMEOUT"); + + let matches = opts.parse(&args[1..]).unwrap_or_else(|err| panic!(err.to_string())); + if matches.opt_present("h") { + exit!("{}", opts.usage(&format!("Usage: {} [options]", program))); + } + + let config_file = matches.opt_str("config").unwrap_or_else(|| { + env::var("SOTA_CONFIG").unwrap_or_else(|_| exit!("{}", "No config file provided.")) + }); + let mut config = Config::load(&config_file).unwrap_or_else(|err| exit!("{}", err)); + + config.auth.as_mut().map(|auth_cfg| { + matches.opt_str("auth-client-id").map(|id| auth_cfg.client_id = id); + matches.opt_str("auth-client-secret").map(|secret| auth_cfg.client_secret = secret); + matches.opt_str("auth-server").map(|text| { + auth_cfg.server = text.parse().unwrap_or_else(|err| exit!("Invalid auth-server URL: {}", err)); + }); + }); + + matches.opt_str("core-server").map(|text| { + config.core.server = text.parse().unwrap_or_else(|err| exit!("Invalid core-server URL: {}", err)); + }); + + config.dbus.as_mut().map(|dbus_cfg| { + matches.opt_str("dbus-name").map(|name| dbus_cfg.name = name); + matches.opt_str("dbus-path").map(|path| dbus_cfg.path = path); + matches.opt_str("dbus-interface").map(|interface| dbus_cfg.interface = interface); + matches.opt_str("dbus-software-manager").map(|mgr| dbus_cfg.software_manager = mgr); + matches.opt_str("dbus-software-manager-path").map(|mgr_path| dbus_cfg.software_manager_path = mgr_path); + matches.opt_str("dbus-timeout").map(|timeout| { + dbus_cfg.timeout = timeout.parse().unwrap_or_else(|err| exit!("Invalid dbus timeout: {}", err)); + }); + }); + + matches.opt_str("device-uuid").map(|uuid| config.device.uuid = uuid); + matches.opt_str("device-vin").map(|vin| config.device.vin = vin); + matches.opt_str("device-packages-dir").map(|path| config.device.packages_dir = path); + matches.opt_str("device-package-manager").map(|text| { + config.device.package_manager = text.parse().unwrap_or_else(|err| exit!("Invalid device package manager: {}", err)); + }); + matches.opt_str("device-polling-interval").map(|interval| { + config.device.polling_interval = interval.parse().unwrap_or_else(|err| exit!("Invalid device polling interval: {}", err)); + }); + matches.opt_str("device-certificates-path").map(|certs| config.device.certificates_path = certs); + matches.opt_str("device-system-info").map(|cmd| config.device.system_info = SystemInfo::new(cmd)); + + matches.opt_str("gateway-console").map(|console| { + config.gateway.console = console.parse().unwrap_or_else(|err| exit!("Invalid console gateway boolean: {}", err)); + }); + matches.opt_str("gateway-dbus").map(|dbus| { + config.gateway.dbus = dbus.parse().unwrap_or_else(|err| exit!("Invalid dbus gateway boolean: {}", err)); + }); + matches.opt_str("gateway-http").map(|http| { + config.gateway.http = http.parse().unwrap_or_else(|err| exit!("Invalid http gateway boolean: {}", err)); + }); + matches.opt_str("gateway-rvi").map(|rvi| { + config.gateway.rvi = rvi.parse().unwrap_or_else(|err| exit!("Invalid rvi gateway boolean: {}", err)); + }); + matches.opt_str("gateway-socket").map(|socket| { + config.gateway.socket = socket.parse().unwrap_or_else(|err| exit!("Invalid socket gateway boolean: {}", err)); + }); + matches.opt_str("gateway-websocket").map(|websocket| { + config.gateway.websocket = websocket.parse().unwrap_or_else(|err| exit!("Invalid websocket gateway boolean: {}", err)); + }); + + matches.opt_str("network-http-server").map(|server| config.network.http_server = server); + matches.opt_str("network-rvi-edge-server").map(|server| config.network.rvi_edge_server = server); + matches.opt_str("network-socket-commands-path").map(|path| config.network.socket_commands_path = path); + matches.opt_str("network-socket-events-path").map(|path| config.network.socket_events_path = path); + matches.opt_str("network-websocket-server").map(|server| config.network.websocket_server = server); + + config.rvi.as_mut().map(|rvi_cfg| { + matches.opt_str("rvi-client").map(|url| { + rvi_cfg.client = url.parse().unwrap_or_else(|err| exit!("Invalid rvi-client URL: {}", err)); + }); + matches.opt_str("rvi-storage-dir").map(|dir| rvi_cfg.storage_dir = dir); + matches.opt_str("rvi-timeout").map(|timeout| { + rvi_cfg.timeout = Some(timeout.parse().unwrap_or_else(|err| exit!("Invalid rvi timeout: {}", err))); + }); + }); - genivi::start::start(&configuration, rvi_url, edge_url); + config } diff --git a/src/oauth2.rs b/src/oauth2.rs new file mode 100644 index 0000000..0c5f152 --- /dev/null +++ b/src/oauth2.rs @@ -0,0 +1,53 @@ +use rustc_serialize::json; + +use datatype::{AccessToken, Error, Url}; +use http::Client; + + +/// Authenticate with the specified OAuth2 server to retrieve a new `AccessToken`. +pub fn authenticate(server: Url, client: &Client) -> Result<AccessToken, Error> { + debug!("authenticating at {}", server); + let resp_rx = client.post(server, None); + let resp = resp_rx.recv().expect("no authenticate response received"); + let data = try!(resp); + let body = try!(String::from_utf8(data)); + Ok(try!(json::decode(&body))) +} + + +#[cfg(test)] +mod tests { + use super::*; + use datatype::{AccessToken, Url}; + use http::TestClient; + + + fn test_server() -> Url { + "http://localhost:8000".parse().unwrap() + } + + #[test] + fn test_authenticate() { + let token = r#"{ + "access_token": "token", + "token_type": "type", + "expires_in": 10, + "scope": "scope1 scope2" + }"#; + let client = TestClient::from(vec![token.to_string()]); + let expect = AccessToken { + access_token: "token".to_string(), + token_type: "type".to_string(), + expires_in: 10, + scope: "scope1 scope2".to_string() + }; + assert_eq!(expect, authenticate(test_server(), &client).unwrap()); + } + + #[test] + fn test_authenticate_bad_json() { + let client = TestClient::from(vec![r#"{"apa": 1}"#.to_string()]); + let expect = r#"Failed to decode JSON: MissingFieldError("access_token")"#; + assert_eq!(expect, format!("{}", authenticate(test_server(), &client).unwrap_err())); + } +} diff --git a/src/package_manager/deb.rs b/src/package_manager/deb.rs new file mode 100644 index 0000000..bba86e6 --- /dev/null +++ b/src/package_manager/deb.rs @@ -0,0 +1,48 @@ +use std::process::Command; + +use datatype::{Error, Package, UpdateResultCode}; +use package_manager::package_manager::{InstallOutcome, parse_package}; + + +/// Returns a list of installed DEB packages with +/// `dpkg-query -f='${Package} ${Version}\n -W`. +pub fn installed_packages() -> Result<Vec<Package>, Error> { + Command::new("dpkg-query").arg("-f='${Package} ${Version}\n'").arg("-W") + .output() + .map_err(|e| Error::Package(format!("Error fetching packages: {}", e))) + .and_then(|c| { + String::from_utf8(c.stdout) + .map_err(|e| Error::Parse(format!("Error parsing package: {}", e))) + .map(|s| s.lines().map(String::from).collect::<Vec<String>>()) + }) + .and_then(|lines| { + lines.iter() + .map(|line| parse_package(line)) + .filter(|pkg| pkg.is_ok()) + .collect::<Result<Vec<Package>, _>>() + }) +} + +/// Installs a new DEB package. +pub fn install_package(path: &str) -> Result<InstallOutcome, InstallOutcome> { + let output = try!(Command::new("dpkg").arg("-E").arg("-i").arg(path) + .output() + .map_err(|e| (UpdateResultCode::GENERAL_ERROR, format!("{:?}", e)))); + + let stdout = String::from_utf8_lossy(&output.stdout).into_owned(); + let stderr = String::from_utf8_lossy(&output.stderr).into_owned(); + + match output.status.code() { + Some(0) => { + if (&stdout).contains("already installed") { + Ok((UpdateResultCode::ALREADY_PROCESSED, stdout)) + } else { + Ok((UpdateResultCode::OK, stdout)) + } + } + _ => { + let out = format!("stdout: {}\nstderr: {}", stdout, stderr); + Err((UpdateResultCode::INSTALL_FAILED, out)) + } + } +} diff --git a/src/package_manager/mod.rs b/src/package_manager/mod.rs new file mode 100644 index 0000000..6596686 --- /dev/null +++ b/src/package_manager/mod.rs @@ -0,0 +1,8 @@ +pub mod deb; +pub mod package_manager; +pub mod rpm; +pub mod tpm; +pub mod otb; + +pub use self::package_manager::PackageManager; +pub use self::tpm::{assert_rx, TestDir}; diff --git a/src/package_manager/otb.rs b/src/package_manager/otb.rs new file mode 100644 index 0000000..fece27b --- /dev/null +++ b/src/package_manager/otb.rs @@ -0,0 +1,53 @@ +use std::process::Command; + +use datatype::{Error, Package, UpdateResultCode}; +use package_manager::package_manager::{InstallOutcome, parse_package}; + + +/// Returns a list of installed `OSTree` packages with +/// `otbpkg --repo=${repodir} --query`. +pub fn installed_packages(repodir: &str) -> Result<Vec<Package>, Error> { + Command::new("otbpkg") + .arg(format!{"--repo={}", repodir}) + .arg("--query") + .output() + .map_err(|e| Error::Package(format!("Error fetching packages: {}", e))) + .and_then(|c| { + String::from_utf8(c.stdout) + .map_err(|e| Error::Parse(format!("Error parsing package: {}", e))) + .map(|s| s.lines().map(String::from).collect::<Vec<String>>()) + }) + .and_then(|lines| { + lines.iter() + .map(|line| parse_package(line)) + .filter(|pkg| pkg.is_ok()) + .collect::<Result<Vec<Package>, _>>() + }) +} + +/// Installs a new `OSTree` package. +pub fn install_package(repodir: &str, path: &str) -> Result<InstallOutcome, InstallOutcome> { + let output = try!(Command::new("otbpkg") + .arg("--install") + .arg(format!("--repo={}", repodir)) + .arg(path) + .output() + .map_err(|e| (UpdateResultCode::GENERAL_ERROR, format!("{:?}", e)))); + + let stdout = String::from_utf8_lossy(&output.stdout).into_owned(); + let stderr = String::from_utf8_lossy(&output.stderr).into_owned(); + + match output.status.code() { + Some(0) => { + if (&stdout).contains("already installed") { + Ok((UpdateResultCode::ALREADY_PROCESSED, stdout)) + } else { + Ok((UpdateResultCode::OK, stdout)) + } + } + _ => { + let out = format!("stdout: {}\nstderr: {}", stdout, stderr); + Err((UpdateResultCode::INSTALL_FAILED, out)) + } + } +} diff --git a/src/package_manager/package_manager.rs b/src/package_manager/package_manager.rs new file mode 100644 index 0000000..09556a0 --- /dev/null +++ b/src/package_manager/package_manager.rs @@ -0,0 +1,135 @@ +use rustc_serialize::{Decoder, Decodable}; +use std::str::FromStr; + +use datatype::{Error, Package, UpdateResultCode}; +use package_manager::{deb, otb, rpm, tpm}; + + +/// The outcome when installing a package as a tuple of the `UpdateResultCode` +/// and any stdout/stderr output. +pub type InstallOutcome = (UpdateResultCode, String); + +/// An enumeration of the available package managers for querying and installing +/// new packages. +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum PackageManager { + Off, + Deb, + Rpm, + File { filename: String, succeeds: bool }, + OSTree { repodir: String } +} + +impl PackageManager { + /// Delegates to the package manager specific function for returning a list + /// of installed packages. + pub fn installed_packages(&self) -> Result<Vec<Package>, Error> { + match *self { + PackageManager::Off => panic!("no package manager"), + PackageManager::Deb => deb::installed_packages(), + PackageManager::Rpm => rpm::installed_packages(), + PackageManager::File { ref filename, .. } => tpm::installed_packages(filename), + PackageManager::OSTree { ref repodir } => otb::installed_packages(repodir), + } + } + + /// Delegates to the package manager specific function for installing a new + /// package on the device. + pub fn install_package(&self, path: &str) -> Result<InstallOutcome, InstallOutcome> { + match *self { + PackageManager::Off => panic!("no package manager"), + PackageManager::Deb => deb::install_package(path), + PackageManager::Rpm => rpm::install_package(path), + PackageManager::File { ref filename, succeeds } => { + tpm::install_package(filename, path, succeeds) + } + PackageManager::OSTree { ref repodir } => { + otb::install_package(repodir, path) + } + } + } + + /// Returns a string representation of the package manager's extension. + pub fn extension(&self) -> String { + match *self { + PackageManager::Off => panic!("no package manager"), + PackageManager::Deb => "deb".to_string(), + PackageManager::Rpm => "rpm".to_string(), + PackageManager::File { ref filename, .. } => filename.to_string(), + PackageManager::OSTree {..} => "otb".to_string(), + } + } +} + +impl FromStr for PackageManager { + type Err = Error; + + fn from_str(s: &str) -> Result<PackageManager, Error> { + match s.to_lowercase().as_str() { + "off" => Ok(PackageManager::Off), + "deb" => Ok(PackageManager::Deb), + "rpm" => Ok(PackageManager::Rpm), + + file if file.len() > 5 && file[..5].as_bytes() == b"file:" => { + Ok(PackageManager::File { filename: file[5..].to_string(), succeeds: true }) + }, + + repo if repo.len() > 4 && repo[..4].as_bytes() == b"otb:" => { + Ok(PackageManager::OSTree { repodir: repo[4..].to_string() }) + } + + _ => Err(Error::Parse(format!("unknown package manager: {}", s))) + } + } +} + +impl Decodable for PackageManager { + fn decode<D: Decoder>(d: &mut D) -> Result<PackageManager, D::Error> { + d.read_str().and_then(|s| Ok(s.parse::<PackageManager>().expect("couldn't parse PackageManager"))) + } +} + +pub fn parse_package(line: &str) -> Result<Package, Error> { + match line.splitn(2, ' ').collect::<Vec<_>>() { + ref parts if parts.len() == 2 => { + // HACK: strip left single quotes from stdout + Ok(Package { + name: String::from(parts[0].trim_left_matches('\'')), + version: String::from(parts[1]) + }) + }, + _ => Err(Error::Parse(format!("Couldn't parse package: {}", line))) + } +} + + +#[cfg(test)] +mod tests { + use super::*; + use datatype::Package; + + + #[test] + fn test_parses_normal_package() { + assert_eq!(parse_package("uuid-runtime 2.20.1-5.1ubuntu20.7").unwrap(), + Package { + name: "uuid-runtime".to_string(), + version: "2.20.1-5.1ubuntu20.7".to_string() + }); + } + + #[test] + fn test_separates_name_and_version_correctly() { + assert_eq!(parse_package("vim 2.1 foobar").unwrap(), + Package { + name: "vim".to_string(), + version: "2.1 foobar".to_string() + }); + } + + #[test] + fn test_rejects_bogus_input() { + assert_eq!(format!("{}", parse_package("foobar").unwrap_err()), + "Parse error: Couldn't parse package: foobar".to_string()); + } +} diff --git a/src/package_manager/rpm.rs b/src/package_manager/rpm.rs new file mode 100644 index 0000000..99aacbf --- /dev/null +++ b/src/package_manager/rpm.rs @@ -0,0 +1,46 @@ +use std::process::Command; + +use datatype::{Error, Package, UpdateResultCode}; +use package_manager::package_manager::{InstallOutcome, parse_package}; + + +/// Returns a list of installed RPM packages with +/// `rpm -qa ==queryformat ${NAME} ${VERSION}\n`. +pub fn installed_packages() -> Result<Vec<Package>, Error> { + Command::new("rpm").arg("-qa").arg("--queryformat").arg("%{NAME} %{VERSION}\n") + .output() + .map_err(|e| Error::Package(format!("Error fetching packages: {}", e))) + .and_then(|c| { + String::from_utf8(c.stdout) + .map_err(|e| Error::Parse(format!("Error parsing package: {}", e))) + .map(|s| s.lines().map(String::from).collect::<Vec<String>>()) + }) + .and_then(|lines| { + lines.iter() + .map(|line| parse_package(line)) + .filter(|item| item.is_ok()) + .collect::<Result<Vec<Package>, _>>() + }) +} + +/// Installs a new RPM package. +pub fn install_package(path: &str) -> Result<InstallOutcome, InstallOutcome> { + let output = try!(Command::new("rpm").arg("-Uvh").arg("--force").arg(path) + .output() + .map_err(|e| (UpdateResultCode::GENERAL_ERROR, format!("{:?}", e)))); + + let stdout = String::from_utf8_lossy(&output.stdout).into_owned(); + let stderr = String::from_utf8_lossy(&output.stderr).into_owned(); + + match output.status.code() { + Some(0) => Ok((UpdateResultCode::OK, stdout)), + _ => { + let out = format!("stdout: {}\nstderr: {}", stdout, stderr); + if (&stderr).contains("already installed") { + Ok((UpdateResultCode::ALREADY_PROCESSED, out)) + } else { + Err((UpdateResultCode::INSTALL_FAILED, out)) + } + } + } +} diff --git a/src/package_manager/tpm.rs b/src/package_manager/tpm.rs new file mode 100644 index 0000000..feafe0a --- /dev/null +++ b/src/package_manager/tpm.rs @@ -0,0 +1,162 @@ +use chan::Receiver; +use std::fmt::Debug; +use std::fs; +use std::fs::File; +use std::fs::OpenOptions; +use std::io::BufReader; +use std::io::prelude::*; +use time; + +use datatype::{Error, Package, UpdateResultCode}; +use package_manager::package_manager::{InstallOutcome, PackageManager}; + + +impl PackageManager { + /// Creates a new Test Package Manager that writes to a temporary file. + pub fn new_tpm(succeeds: bool) -> Self { + let name = format!("/tmp/sota-tpm-{}", time::precise_time_ns().to_string()); + if succeeds { + let _ = File::create(name.clone()).expect("couldn't create Test Package Manager file"); + } + PackageManager::File { filename: name, succeeds: succeeds } + } +} + + +/// Encapsulate a directory whose contents will be destroyed when it drops out of scope. +pub struct TestDir(pub String); + +impl TestDir { + /// Create a new test directory that will be destroyed when it drops out of scope. + pub fn new(reason: &str) -> TestDir { + let dir = format!("/tmp/{}-{}", reason, time::precise_time_ns().to_string()); + fs::create_dir_all(dir.clone()).expect("couldn't create TempDir"); + TestDir(dir) + } +} + +impl Drop for TestDir { + fn drop(&mut self) { + fs::remove_dir_all(&self.0.clone()).expect("couldn't remove TempDir"); + } +} + + +/// For each item in the list, assert that it equals the next `Receiver` value. +pub fn assert_rx<X: PartialEq + Debug>(rx: Receiver<X>, xs: &[X]) { + let n = xs.len(); + let mut xs = xs.iter(); + for _ in 0..n { + let val = rx.recv().expect("assert_rx expected another val"); + let x = xs.next().expect(&format!("assert_rx: no match for val: {:?}", val)); + assert_eq!(val, *x); + } +} + + +/// Returns a list of installed packages from a format of `<name> <version>`. +pub fn installed_packages(path: &str) -> Result<Vec<Package>, Error> { + let f = try!(File::open(path)); + let reader = BufReader::new(f); + let mut pkgs = Vec::new(); + + for line in reader.lines() { + let line = try!(line); + let parts = line.split(' '); + + if parts.clone().count() == 2 { + if let Some(name) = parts.clone().nth(0) { + if let Some(version) = parts.clone().nth(1) { + pkgs.push(Package { + name: name.to_string(), + version: version.to_string() + }); + } + } + } + } + + Ok(pkgs) +} + +/// Installs a package to the specified path when succeeds is true, or fails otherwise. +pub fn install_package(path: &str, pkg: &str, succeeds: bool) -> Result<InstallOutcome, InstallOutcome> { + if !succeeds { + return Err((UpdateResultCode::INSTALL_FAILED, "failed".to_string())) + } + + let outcome = || -> Result<(), Error> { + let mut f = OpenOptions::new().create(true).write(true).append(true).open(path).unwrap(); + try!(f.write(pkg.as_bytes())); + try!(f.write(b"\n")); + Ok(()) + }(); + + match outcome { + Ok(_) => Ok((UpdateResultCode::OK, "".to_string())), + Err(err) => Err((UpdateResultCode::INSTALL_FAILED, format!("{:?}", err))) + } +} + + +#[cfg(test)] +mod tests { + use std::fs::File; + use std::io::prelude::*; + + use super::*; + use datatype::Package; + + + fn pkg1() -> Package { + Package { + name: "apa".to_string(), + version: "0.0.0".to_string() + } + } + + fn pkg2() -> Package { + Package { + name: "bepa".to_string(), + version: "1.0.0".to_string() + } + } + + + #[test] + fn get_installed_packages() { + let dir = TestDir::new("sota-tpm-test-1"); + let path = format!("{}/tpm", dir.0); + let mut f = File::create(path.clone()).unwrap(); + f.write(b"apa 0.0.0\n").unwrap(); + f.write(b"bepa 1.0.0").unwrap(); + assert_eq!(installed_packages(&path).unwrap(), vec![pkg1(), pkg2()]); + } + + #[test] + fn ignore_bad_installed_packages() { + let dir = TestDir::new("sota-tpm-test-2"); + let path = format!("{}/tpm", dir.0); + let mut f = File::create(path.clone()).unwrap(); + f.write(b"cepa-2.0.0\n").unwrap(); + assert_eq!(installed_packages(&path).unwrap(), Vec::new()); + } + + #[test] + fn install_packages() { + let dir = TestDir::new("sota-tpm-test-3"); + let path = format!("{}/tpm", dir.0); + install_package(&path, "apa 0.0.0", true).unwrap(); + install_package(&path, "bepa 1.0.0", true).unwrap(); + assert_eq!(installed_packages(&path).unwrap(), vec![pkg1(), pkg2()]); + } + + #[test] + fn failed_installation() { + let dir = TestDir::new("sota-tpm-test-4"); + let path = format!("{}/tpm", dir.0); + assert!(install_package(&path, "apa 0.0.0", false).is_err()); + install_package(&path, "bepa 1.0.0", true).unwrap(); + assert_eq!(installed_packages(&path).unwrap(), vec![pkg2()]); + } +} diff --git a/src/remote/dw.rs b/src/remote/dw.rs deleted file mode 100644 index 0fd8b8e..0000000 --- a/src/remote/dw.rs +++ /dev/null @@ -1,581 +0,0 @@ -//! Handles caching and storage on disk for in-progress transfers and the assembly and verification -//! of finished transfers - -use std::fs; -use std::fs::{OpenOptions, DirEntry, File}; -use std::io::prelude::*; -use std::path::PathBuf; -use std::vec::Vec; -use std::str::FromStr; - -use time; - -#[cfg(test)] use rand; -#[cfg(test)] use rand::Rng; -#[cfg(test)] use test_library::PathPrefix; - -use crypto::sha1::Sha1; -use crypto::digest::Digest; - -use rustc_serialize::base64::FromBase64; - -use event::UpdateId; - -/// Type for storing the metadata of a in-progress transfer, which is defined as one package. -/// Will clear out the chunks on disk when freed. -pub struct Transfer { - pub update_id: UpdateId, - /// SHA1 checksum of the fully assembled package. - pub checksum: String, - /// `Vector` of transferred chunks. - pub transferred_chunks: Vec<u64>, - /// Path to the directory, where chunks will be cached and finished packages will be stored. - pub prefix_dir: String, - /// Timestamp, when the last chunk was received. Given as a unix epoch timestamp. - pub last_chunk_received: i64 -} - -impl Transfer { - /// Return a new `Transfer` - /// - /// # Arguments - /// * `prefix`: Path where transferred chunks and assembled package will be stored. - /// * `package`: [`PackageId`](../message/struct.PackageId.html) of this transfer. - /// * `checksum`: SHA1 checksum of the fully assembled package. - pub fn new(prefix: String, id: UpdateId, checksum: String) - -> Transfer { - Transfer { - update_id: id, - checksum: checksum, - transferred_chunks: Vec::new(), - prefix_dir: prefix, - last_chunk_received: time::get_time().sec - } - } - - /// Create a transfer with empty values. To be used in tests. - /// - /// # Arguments - /// * `prefix`: Path where transferred chunks and assembled package will be stored. This should - /// be a temporary directory for tests. - #[cfg(test)] - pub fn new_test(prefix: &PathPrefix) -> Transfer { - Transfer { - update_id: UpdateId::new(), - checksum: "".to_string(), - transferred_chunks: Vec::new(), - prefix_dir: prefix.to_string(), - last_chunk_received: time::get_time().sec - } - } - - /// Randomize a existing transfer, by creating a random - /// [`PackageId`](../message/struct.PackageId.html). Returns the created `PackageId`, so it can - /// be used in assertions. - /// - /// # Arguments - /// * `i`: Size of the name and version strings. - #[cfg(test)] - pub fn randomize(&mut self, i: usize) -> UpdateId { - let update_id = rand::thread_rng() - .gen_ascii_chars().take(i).collect::<String>(); - - trace!("Testing with:"); - trace!(" update_id: {}", update_id); - self.update_id = update_id.clone(); - update_id - } - - /// Write a transferred chunk to disk. Returns false and logs an error if something goes wrong. - /// - /// # Arguments - /// * `msg`: Base64 encoded data of this chunk. - /// * `index`: Index of this chunk - pub fn write_chunk(&mut self, - msg: &str, - index: u64) -> bool { - let success = msg.from_base64().map_err(|e| { - error!("Could not decode chunk {} for update_id {}", index, self.update_id); - error!("{}", e) - }).and_then(|msg| self.get_chunk_path(index).map_err(|e| { - error!("Could not get path for chunk {}", index); - error!("{}", e) - }).map(|path| { - trace!("Saving chunk to {}", path.display()); - if write_new_file(&path, &msg) { - self.transferred_chunks.push(index); - self.transferred_chunks.sort(); - self.transferred_chunks.dedup(); - true - } else { - error!("Couldn't write chunk {} for update_id {}", index, self.update_id); - false - } - })).unwrap_or(false); - - self.last_chunk_received = time::get_time().sec; - success - } - - /// Assemble the transferred chunks to a package and verify it with the provided checksum. - /// Returns `false` and prints a error message if either the package can't be assembled or the - /// checksum doesn't match. - pub fn assemble_package(&self) -> Result<PathBuf, String> { - trace!("Finalizing package {}", self.update_id); - self.assemble_chunks(). - and_then(|_| { - if self.checksum() { - self.get_package_path() - } else { - Err(format!("Cannot assemble_package for update_id: {}", self.update_id)) - } - }) - } - - /// Collect all chunks and concatenate them into one file. Returns a `String` with a error - /// message, should something go wrong. - fn assemble_chunks(&self) -> Result<(), String> { - let package_path = try!(self.get_package_path()); - - trace!("Saving update_id {} to {}", self.update_id, package_path.display()); - - let mut file = try!(OpenOptions::new() - .write(true).append(true) - .create(true).truncate(true) - .open(package_path) - .map_err(|x| format!("Couldn't open file: {}", x))); - - let path: PathBuf = try!(self.get_chunk_dir()); - - // Make sure all indices are valid and sort them - let mut indices = Vec::new(); - for entry in try!(read_dir(&path)) { - let entry = try!(entry.map_err(|x| format!("No entries: {}", x))); - indices.push(try!(parse_index(entry))); - } - indices.sort(); - - // Append indices to the final file - for index in indices { - try!(self.copy_chunk(&path, index, &mut file)); - } - Ok(()) - } - - /// Read a chunk file file and append it to a package file. Returns a `String` with a error - /// message should something go wrong. - /// - /// # Arguments - /// * `path`: Pointer to a [`PathBuf`] - /// (https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) where the chunks are - /// cached. - /// * `index`: Index of the chunk to append. - /// * `file`: Pointer to a `File` where the chunk should be appended. Should be created with - /// `OpenOptions` and the append only option. See the documentation for [`OpenOptions`] - /// (https://doc.rust-lang.org/stable/std/fs/struct.OpenOptions.html), [`File`] - /// (https://doc.rust-lang.org/stable/std/fs/struct.File.html), and the implementation of - /// [`assemble_chunks`](#method.assemble_chunks) for details. - fn copy_chunk(&self, path: &PathBuf, index: u64, file: &mut File) - -> Result<(), String> { - let name = index.to_string(); - let mut chunk_path = path.clone(); - chunk_path.push(&name); - let mut chunk = - try!(OpenOptions::new().open(chunk_path) - .map_err(|x| format!("Couldn't open file: {}", x))); - - let mut buf = Vec::new(); - try!(chunk.read_to_end(&mut buf) - .map_err(|x| format!("Couldn't read file {}: {}", name, x))); - try!(file.write(&mut buf) - .map_err(|x| format!("Couldn't write chunk {} to file {}: {}", - name, self.update_id, x))); - - trace!("Wrote chunk {} to update_id {}", name, self.update_id); - Ok(()) - } - - /// Verify the checksum of this transfer. Assumes the package was already assembled. Prints a - /// error message showing the mismatched checksums and returns false on errors. - fn checksum(&self) -> bool { - let path = try_or!(self.get_package_path(), return false); - let mut file = try_or!(OpenOptions::new().open(path), return false); - let mut data = Vec::new(); - - // TODO: avoid reading in the whole file at once - try_msg_or!(file.read_to_end(&mut data), - "Couldn't read file to check", - return false); - - let mut hasher = Sha1::new(); - hasher.input(&data); - let hash = hasher.result_str(); - - if hash == self.checksum { - true - } else { - error!("Checksums didn't match for update_id {}", self.update_id); - error!(" Expected: {}", self.checksum); - error!(" Got: {}", hash); - false - } - } - - /// Get the full path for the specified chunk index. Returns a - /// [`PathBuf`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) on success or a - /// `String` on errors detailing what went wrong. - /// - /// # Arguments - /// * `index`: The index for which the path should be constructed - fn get_chunk_path(&self, index: u64) -> Result<PathBuf, String> { - let mut path = try!(self.get_chunk_dir()); - let filename = index.to_string(); - - trace!("Using filename {}", filename); - path.push(filename); - Ok(path) - } - - /// Get the full path for the package of this `Transfer`. Returns a - /// [`PathBuf`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) on success or a - /// `String` on errors detailing what went wrong. - fn get_package_path(&self) -> Result<PathBuf, String> { - let mut path = try!(self.get_package_dir()); - path.push(format!("{}.spkg", self.update_id)); - Ok(path) - } - - /// Get the directory, where this `Transfer` caches chunks. Returns a - /// [`PathBuf`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) on success or a - /// `String` on errors detailing what went wrong. - fn get_chunk_dir(&self) -> Result<PathBuf, String> { - let mut path = PathBuf::from(&self.prefix_dir); - path.push("downloads"); - path.push(format!("{}", self.update_id)); - - fs::create_dir_all(&path).map_err(|e| { - let path_str = path.to_str().unwrap_or("unknown"); - format!("Couldn't create chunk dir at '{}': {}", path_str, e) - }).map(|_| path) - } - - /// Get the directory, where this `Transfer` stores the assembled package. Returns a - /// [`PathBuf`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) on success or a - /// `String` on errors detailing what went wrong. - fn get_package_dir(&self) -> Result<PathBuf, String> { - let mut path = PathBuf::from(&self.prefix_dir); - path.push("packages"); - - fs::create_dir_all(&path).map_err(|e| { - let path_str = path.to_str().unwrap_or("unknown"); - format!("Couldn't create packges dir at '{}': {}", path_str, e) - }).map(|_| path) - } -} - -impl Drop for Transfer { - /// When a `Transfer` is freed it will also clear out the associated chunk cache on disk. - fn drop(&mut self) { - let dir = try_or!(self.get_chunk_dir(), return); - trace!("Dropping transfer for package {}", self.update_id); - - for entry in try_or!(read_dir(&dir), return) { - let entry = try_or!(entry, continue); - let _ = entry.file_name().into_string().map_err(|_| - error!("Found a malformed entry!") - ).map(|name| { - trace!("Dropping chunk file {}", name); - try_or!(fs::remove_file(entry.path()), return); - }); - } - - try_or!(fs::remove_dir(dir), return); - } -} - -/// Write the provided `data` to the file at `path`. Will create the file if it doesn't exist and -/// overwrite existing files. Returns `false` on errors, after logging a error message. -/// -/// # Arguments -/// * `path`: Pointer to a [`PathBuf`] -/// (https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) where the data will be -/// written to. Needs to point to a (possibly nonexistent) file. -/// * `data`: The data to be written to disk. -fn write_new_file(path: &PathBuf, data: &Vec<u8>) -> bool { - let mut file = try_or!(OpenOptions::new() - .write(true).create(true) - .truncate(true).open(path), - return false); - - try_or!(file.write_all(data), return false); - try_or!(file.flush(), return false); - true -} - -/// Read the contents of a directory. Returns a -/// [`ReadDir`](https://doc.rust-lang.org/stable/std/fs/struct.ReadDir.html) iterator on success or -/// a `String` with a detailed error message on failure. -fn read_dir(path: &PathBuf) -> Result<fs::ReadDir, String> { - fs::read_dir(path).map_err(|e| { - let path_str = path.to_str().unwrap_or("unknown"); - format!("Couldn't read dir at '{}': {}", path_str, e) - }) -} - -/// Parse a [`DirEntry`](https://doc.rust-lang.org/stable/std/fs/struct.DirEntry.html) to a `u64`. -/// Returns the parsed number on success or a `String` with a detailed error message on failure. -/// -/// # Arguments -/// * `entry`: `DirEntry` to be parsed. -fn parse_index(entry: DirEntry) -> Result<u64, String> { - let name = entry.file_name().into_string() - .unwrap_or("unknown".to_string()); - u64::from_str(&name) - .map_err(|_| "Couldn't parse chunk index from filename".to_string()) -} - -use std::collections::HashMap; - -/// Type alias to hide the internal `HashMap`, that is used to store -/// [`Transfer`](../persistence/struct.Transfer.html)s. -pub struct Transfers { - items: HashMap<UpdateId, Transfer>, - storage_dir: String -} - -impl Transfers { - pub fn new(dir: String) -> Transfers { - Transfers { - items: HashMap::new(), - storage_dir: dir - } - } - - pub fn get(&self, pkg: &UpdateId) -> Option<&Transfer> { - self.items.get(pkg) - } - - pub fn get_mut(&mut self, pkg: &UpdateId) -> Option<&mut Transfer> { - self.items.get_mut(pkg) - } - - pub fn push(&mut self, pkg: UpdateId, cksum: String) { - self.items.insert( - pkg.clone(), - Transfer::new(self.storage_dir.to_string(), pkg, cksum)); - } - - #[cfg(test)] - pub fn push_test(&mut self, tr: Transfer) { - self.items.insert(tr.update_id.clone(), tr); - } - - #[cfg(test)] - pub fn is_empty(&self) -> bool { - self.items.is_empty() - } - - pub fn remove(&mut self, pkg: &UpdateId) { - self.items.remove(pkg); - } - - pub fn clear(&mut self) { - self.items.clear(); - } - - pub fn prune(&mut self, now: i64, timeout: i64) { - self.items.iter() - .filter(|&(_, v)| now - v.last_chunk_received > timeout) - .map(|(k, _)| k.clone()) - .collect::<Vec<UpdateId>>() - .iter().map(|k| { - self.items.remove(k); - info!("Transfer for update_id {} timed out after {} ms", k, timeout)}) - .collect::<Vec<()>>(); - } -} - - -#[cfg(test)] -mod test { - use super::*; - use test_library::*; - - use std::path::PathBuf; - use std::fs; - use std::fs::OpenOptions; - use std::io::prelude::*; - - use rand; - use rand::Rng; - use rustc_serialize::base64; - use rustc_serialize::base64::ToBase64; - - fn create_tmp_directories(prefix: &PathPrefix) { - for i in 1..20 { - let mut transfer = Transfer::new_test(prefix); - let update_id = transfer.randomize(i); - let chunk_dir: PathBuf = transfer.get_chunk_dir().unwrap(); - let path = format!("{}/downloads/{}", prefix, update_id); - assert_eq!(chunk_dir.to_str().unwrap(), path); - - let path = PathBuf::from(path); - // This also makes sure it's a directory - let dir = fs::read_dir(&path).unwrap(); - - for _ in dir { - panic!("Found non-empty directory!"); - } - } - } - - #[test] - fn it_creates_a_tmp_directory() { - test_init!(); - let prefix = PathPrefix::new(); - create_tmp_directories(&prefix); - } - - #[test] - fn it_cleans_up_the_tmp_directories() { - test_init!(); - let prefix = PathPrefix::new(); - create_tmp_directories(&prefix); - let path = PathBuf::from(format!("{}/downloads/", prefix)); - let dir = fs::read_dir(&path).unwrap(); - - for _ in dir { - panic!("Found non-empty directory!"); - } - } - - #[test] - fn it_creates_a_persistent_directory_per_package() { - test_init!(); - let prefix = PathPrefix::new(); - for i in 1..20 { - let mut transfer = Transfer::new_test(&prefix); - let update_id = transfer.randomize(i); - - let chunk_dir: PathBuf = transfer.get_package_path().unwrap(); - let path = format!("{}/packages/{}.spkg", prefix, update_id); - assert_eq!(chunk_dir.to_str().unwrap(), path); - } - } - - macro_rules! assert_chunk_written { - ($transfer:ident, - $prefix:ident, - $update_id:ident, - $index:ident, - $data:ident) => {{ - trace!("Testing with: {}", $data); - - let b64_data = $data.as_bytes().to_base64( - base64::Config { - char_set: base64::CharacterSet::UrlSafe, - newline: base64::Newline::LF, - pad: true, - line_length: None - }); - - trace!("Encoded as: {}", b64_data); - - $transfer.write_chunk(&b64_data, $index as u64); - - let path = format!("{}/downloads/{}/{}", $prefix, $update_id, $index); - - trace!("Expecting file at: {}", path); - - let mut from_disk = Vec::new(); - OpenOptions::new() - .open(PathBuf::from(path)) - .unwrap() - .read_to_end(&mut from_disk) - .unwrap(); - - assert_eq!($data.into_bytes(), from_disk); - }} - } - - #[test] - fn it_writes_decoded_data_to_disk() { - test_init!(); - let prefix = PathPrefix::new(); - for i in 1..20 { - let mut transfer = Transfer::new_test(&prefix); - let update_id = transfer.randomize(i); - for i in 1..20 { - let data = rand::thread_rng() - .gen_ascii_chars().take(i).collect::<String>(); - assert_chunk_written!(transfer, prefix, update_id, i, data); - } - } - } - - #[test] - fn it_correctly_assembles_stored_chunks() { - test_init!(); - let prefix = PathPrefix::new(); - for i in 1..20 { - let mut transfer = Transfer::new_test(&prefix); - let update_id = transfer.randomize(i); - let mut full_data = String::new(); - for i in 1..20 { - let data = rand::thread_rng() - .gen_ascii_chars().take(i).collect::<String>(); - full_data.push_str(&data); - - assert_chunk_written!(transfer, prefix, update_id, i, data); - } - - transfer.assemble_chunks().unwrap(); - - let path = format!("{}/packages/{}.spkg", prefix, update_id); - - trace!("Expecting assembled file at: {}", path); - - let mut from_disk = Vec::new(); - OpenOptions::new() - .open(PathBuf::from(path)) - .unwrap() - .read_to_end(&mut from_disk) - .unwrap(); - - assert_eq!(full_data.into_bytes(), from_disk); - } - } - - fn checksum_matching(data: String, checksum: String) -> bool { - let prefix = PathPrefix::new(); - let mut transfer = Transfer::new_test(&prefix); - let update_id = transfer.randomize(20); - let index = 0; - assert_chunk_written!(transfer, prefix, update_id, index, data); - transfer.assemble_chunks().unwrap(); - - transfer.checksum = checksum; - transfer.checksum() - } - - #[test] - fn it_returns_true_for_correct_checksums() { - test_init!(); - assert!(checksum_matching("test\n".to_string(), - "4e1243bd22c66e76c2ba9eddc1f91394e57f9f83".to_string())); - } - - #[test] - fn it_returns_false_for_incorrect_checksums() { - test_init!(); - assert!(!checksum_matching("test\n".to_string(), - "fa7c4d75bae3a641d1f9ab5df028175bfb8a69ca".to_string())); - } - - #[test] - fn it_returns_false_for_invalid_checksums() { - test_init!(); - assert!(!checksum_matching("test\n".to_string(), - "invalid".to_string())); - } -} diff --git a/src/remote/jsonrpc.rs b/src/remote/jsonrpc.rs deleted file mode 100644 index 5676a37..0000000 --- a/src/remote/jsonrpc.rs +++ /dev/null @@ -1,150 +0,0 @@ -//! RVI specific implementation of the jsonrpc protocol -use time; - -/// Type to encode a generic jsonrpc call. -#[derive(RustcDecodable,RustcEncodable,Debug)] -pub struct Request<T> { - /// The version of jsonrpc to use, has to be set to `"2.0"`. - pub jsonrpc: String, - /// The identifier of the request. Only unsigned numbers are accepted - // TODO: id can be any type - pub id: u64, - /// The method to call on the receiving side. - pub method: String, - /// Arguments for the method. - pub params: T -} - -impl<T> Request<T> { - /// Returns a new `Request`. - /// - /// # Arguments - /// * `s`: The name of the method to call. - /// * `p`: The arguments of said method. - pub fn new(s: &str, p: T) -> Request<T> { - Request::<T> { - jsonrpc: "2.0".to_string(), - id: time::precise_time_ns(), - method: s.to_string(), - params: p - } - } -} - -/// Response to a jsonrpc call, indicating a successful method call. -#[derive(RustcDecodable,RustcEncodable)] -pub struct OkResponse<T> { - /// The version of jsonrpc to use, has to be set to `"2.0"`. - pub jsonrpc: String, - /// The identifier of the jsonrpc call this response belongs to. Only unsigned numbers are - /// accepted - // TODO: id can be any type - pub id: u64, - /// The result of the method call, if any. - pub result: Option<T> -} - -impl<T> OkResponse<T> { - /// Returns a new `OkResponse` - /// - /// # Arguments - /// * `id`: The identifier of the jsonrpc call the returned response belongs to. - /// * `result`: The result of the method call, if any. - pub fn new(id: u64, result: Option<T>) -> OkResponse<T> { - OkResponse { - jsonrpc: "2.0".to_string(), - id: id, - result: result - } - } -} - -/// Response to a jsonrpc call, indicating failure. -#[derive(RustcDecodable,RustcEncodable)] -pub struct ErrResponse { - /// The version of jsonrpc to use, has to be set to `"2.0"`. - pub jsonrpc: String, - /// The identifier of the jsonrpc call this response belongs to. Only unsigned numbers are - /// accepted - // TODO: id can be any type - pub id: u64, - /// The error code and message. - pub error: ErrorCode -} - -impl ErrResponse { - /// Returns a new `ErrResponse` - /// - /// # Arguments - /// * `id`: The identifier of the jsonrpc call the returned response belongs to. - /// * `error`: The error code and message. See [`ErrorCode`](./struct.ErrorCode.html). - pub fn new(id: u64, error: ErrorCode) -> ErrResponse { - ErrResponse { - jsonrpc: "2.0".to_string(), - id: id, - error: error - } - } - - /// Returns a new `ErrResponse`, indicating a ["Invalid - /// Request"](http://www.jsonrpc.org/specification#error_object) error. - pub fn invalid_request(id: u64) -> ErrResponse { - ErrResponse::new(id, - ErrorCode { - code: -32600, - message: "Invalid Request".to_string() - }) - } - - /// Returns a new `ErrResponse`, indicating a ["Method not - /// found"](http://www.jsonrpc.org/specification#error_object) error. - pub fn method_not_found(id: u64) -> ErrResponse { - ErrResponse::new(id, - ErrorCode { - code: -32601, - message: "Method not found".to_string() - }) - } - - /// Returns a new `ErrResponse`, indicating a ["Parse - /// error"](http://www.jsonrpc.org/specification#error_object) error. - pub fn parse_error() -> ErrResponse { - ErrResponse::new(0, - ErrorCode { - code: -32700, - message: "Parse error".to_string() - }) - } - - /// Returns a new `ErrResponse`, indicating a ["Invalid - /// params"](http://www.jsonrpc.org/specification#error_object) error. - pub fn invalid_params(id: u64) -> ErrResponse { - ErrResponse::new( - id, - ErrorCode { - code: -32602, - message: "Invalid params".to_string() - }) - } - - /// Returns a new `ErrResponse`, indicating a unspecified error. - pub fn unspecified(id: u64) -> ErrResponse { - ErrResponse::new( - id, - ErrorCode { - code: -32100, - message: "Couldn't handle request".to_string() - }) - } -} - -/// Type to encode a jsonrpc error. -#[derive(RustcDecodable,RustcEncodable)] -pub struct ErrorCode { - /// The error code as [specified by - /// jsonrpc](http://www.jsonrpc.org/specification#error_object). - pub code: i32, - /// The error message as [specified by - /// jsonrpc](http://www.jsonrpc.org/specification#error_object). - pub message: String -} diff --git a/src/remote/mod.rs b/src/remote/mod.rs deleted file mode 100644 index 42a2c7b..0000000 --- a/src/remote/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod dw; -mod jsonrpc; -mod parm; -pub mod rvi; -pub mod svc; diff --git a/src/remote/parm.rs b/src/remote/parm.rs deleted file mode 100644 index 5c2db8b..0000000 --- a/src/remote/parm.rs +++ /dev/null @@ -1,189 +0,0 @@ -use event::UpdateId; -use event::inbound::{InboundEvent, UpdateAvailable, GetInstalledSoftware, DownloadComplete}; - -use super::dw::Transfers; -use super::svc::{BackendServices, RemoteServices}; - -use std::result; -use std::sync::Mutex; - -#[derive(Debug)] -pub enum Error { - UnknownPackage, - IoFailure, - SendFailure -} -pub type Result = result::Result<Option<InboundEvent>, Error>; - -/// Trait that every message handler needs to implement. -pub trait ParamHandler { - /// Handle the message. - /// - /// Return a [`Event`](../message/enum.Event.html) to be passed to the - /// [`main_loop`](../main_loop/index.html) if apropriate. - fn handle(&self, - services: &Mutex<RemoteServices>, - transfers: &Mutex<Transfers>) - -> Result; -} - - -/// Type for "Notify" messages. -#[derive(RustcDecodable, Clone)] -pub struct NotifyParams { - /// A `Vector` of packages, that are available for download. - pub update_available: UpdateAvailable, - /// The service URLs, that the SOTA server supports. - pub services: BackendServices, -} - -impl ParamHandler for NotifyParams { - fn handle(&self, - services: &Mutex<RemoteServices>, - _: &Mutex<Transfers>) -> Result { - let mut services = services.lock().unwrap(); - services.set(self.services.clone()); - - Ok(Some(InboundEvent::UpdateAvailable(self.update_available.clone()))) - } -} - - -/// Type for "Start Transfer" messages. -#[derive(RustcDecodable)] -pub struct StartParams { - pub update_id: UpdateId, - /// The amount of chunks this `Transfer` will have. - pub chunkscount: u64, - /// The SHA1 checksum of the assembled package. - pub checksum: String -} - -impl ParamHandler for StartParams { - fn handle(&self, - services: &Mutex<RemoteServices>, - transfers: &Mutex<Transfers>) -> Result { - let services = services.lock().unwrap(); - let mut transfers = transfers.lock().unwrap(); - - info!("Starting transfer for update_id {}", self.update_id); - - transfers.push(self.update_id.clone(), self.checksum.clone()); - services.send_chunk_received( - ChunkReceived { - update_id: self.update_id.clone(), - chunks: Vec::new(), - vin: services.vin.clone() }) - .map_err(|e| { - error!("Error on sending start ACK: {}", e); - Error::SendFailure }) - .map(|_| None) - } -} - -/// Encodes the "Chunk Received" message, indicating that a chunk was successfully transferred. -#[derive(RustcEncodable)] -pub struct ChunkReceived { - /// The transfer to which the transferred chunk belongs. - pub update_id: UpdateId, - /// A list of the successfully transferred chunks. - pub chunks: Vec<u64>, - /// The VIN of this device. - pub vin: String -} - - -/// Type for messages transferring single chunks. -#[derive(RustcDecodable)] -pub struct ChunkParams { - /// The package transfer this chunk belongs to. - pub update_id: UpdateId, - /// The data of the transferred chunk. - pub bytes: String, - /// The index of this chunk. - pub index: u64 -} - -impl ParamHandler for ChunkParams { - fn handle(&self, - services: &Mutex<RemoteServices>, - transfers: &Mutex<Transfers>) -> Result { - let services = services.lock().unwrap(); - let mut transfers = transfers.lock().unwrap(); - transfers.get_mut(&self.update_id).map(|t| { - if t.write_chunk(&self.bytes, self.index) { - info!("Wrote chunk {} for package {}", self.index, self.update_id); - services.send_chunk_received( - ChunkReceived { - update_id: self.update_id.clone(), - chunks: t.transferred_chunks.clone(), - vin: services.vin.clone() }) - .map_err(|e| { - error!("Error on sending ChunkReceived: {}", e); - Error::SendFailure }) - .map(|_| None) - } else { - Err(Error::IoFailure) - } - }).unwrap_or_else(|| { - error!("Couldn't find transfer for update_id {}", self.update_id); - Err(Error::UnknownPackage) - }) - } -} - - -/// Type for "Finish Transfer" messages. -#[derive(RustcDecodable)] -pub struct FinishParams { - /// The package transfer to finalize. - pub update_id: UpdateId, - pub signature: String -} - -impl ParamHandler for FinishParams { - fn handle(&self, - _: &Mutex<RemoteServices>, - transfers: &Mutex<Transfers>) -> Result { - let mut transfers = transfers.lock().unwrap(); - transfers.get(&self.update_id).ok_or(Error::UnknownPackage) - .and_then(|t| { - t.assemble_package().map_err(|_| Error::IoFailure) }) - .and_then(|p| { - p.into_os_string().into_string().map_err(|_| Error::IoFailure) }) - .map(|p| { - transfers.remove(&self.update_id); - info!("Finished transfer of {}", self.update_id); - Some(InboundEvent::DownloadComplete(DownloadComplete { - update_id: self.update_id.clone(), - update_image: p, - signature: self.signature.clone() })) }) - } -} - - -/// Type for "Abort Transfer" messages. -#[derive(RustcDecodable)] -pub struct AbortParams; - -impl ParamHandler for AbortParams { - fn handle(&self, - _: &Mutex<RemoteServices>, - transfers: &Mutex<Transfers>) -> Result { - let mut transfers = transfers.lock().unwrap(); - transfers.clear(); - Ok(None) - } -} - - -/// Type for "Get All Packages" messages. -pub type ReportParams = GetInstalledSoftware; - -impl ParamHandler for ReportParams { - fn handle(&self, - _: &Mutex<RemoteServices>, - _: &Mutex<Transfers>) -> Result { - Ok(Some(InboundEvent::GetInstalledSoftware(self.clone()))) - } -} diff --git a/src/remote/rvi/edge.rs b/src/remote/rvi/edge.rs deleted file mode 100644 index 6eec01f..0000000 --- a/src/remote/rvi/edge.rs +++ /dev/null @@ -1,162 +0,0 @@ -//! Implements the RVI facing webservice. - -use std::io::{Read, Write}; -use std::thread; -use hyper::Server; -use hyper::server::{Handler, Request, Response}; -use rustc_serialize::json; -use rustc_serialize::json::Json; - -use remote::jsonrpc; -use remote::jsonrpc::{OkResponse, ErrResponse}; - -use remote::rvi::send; -use remote::rvi::message::{RegisterServiceRequest, RegisterServiceResponse}; - -pub trait ServiceHandler: Sync + Send { - fn handle_service(&self, id: u64, service: &str, message: &str) - -> Result<OkResponse<i32>, ErrResponse>; - fn register_services<F: Fn(&str) -> String>(&self, reg: F); -} - -/// Encodes the service edge of the webservice. -pub struct ServiceEdge<H: ServiceHandler + 'static> { - /// The full URL where RVI can be reached. - rvi_url: String, - /// The `host:port` to bind and listen for incoming RVI messages. - edge_url: String, - hdlr: H -} - -impl<H: ServiceHandler + 'static> ServiceEdge<H> { - /// Create a new service edge. - /// - /// # Arguments - /// * `r`: The full URL where RVI can be reached. - /// * `e`: The `host:port` combination where the edge should bind. - /// * `s`: A sender to communicate back the service URLs. - pub fn new(r: String, e: String, h: H) -> ServiceEdge<H> { - ServiceEdge { - rvi_url: r, - edge_url: e, - hdlr: h - } - } - - /// Register a service. Returns the full service URL as provided by RVI. Panics if the - /// registration in RVI failed. This can be handled by starting the RVI edge in a separate - /// thread. - /// - /// # Arguments - /// * `s`: The service to register. Will get prepended with the device identifier by RVI. - pub fn register_service(&self, s: &str) -> String { - let json_rpc = jsonrpc::Request::new( - "register_service", - RegisterServiceRequest { - network_address: self.edge_url.to_string(), - service: s.to_string() - }); - - let resp = send(&self.rvi_url, &json_rpc) - .map_err(|e| error!("Couldn't send registration to RVI\n{}", e)) - .and_then(|r| json::decode::<jsonrpc::OkResponse<RegisterServiceResponse>>(&r) - .map_err(|e| error!("Couldn't parse response when registering in RVI\n{}", e))) - .unwrap(); - - resp.result - .expect("Didn't get full service name when registering") - .service - } - - /// Starts the service edge. - /// - /// It binds on the provided `host:port` combination, registers all services and then waits for - /// incoming RVI messages. On incoming messages it forks another thread and passes the message - /// to the provided `Handler`. For details about how to implement a `Handler` see the - /// [`hyper`](../../hyper/index.html) documentation and the [reference - /// implementation](../handler/index.html). - /// - /// Panics if it can't reach or register in RVI. - /// - /// # Arguments - /// * `h`: The `Handler` all messages are passed to. - /// * `s`: A `Vector` of service strings to register in RVI. - pub fn start(self) { - let url = self.edge_url.clone(); - self.hdlr.register_services(|s| self.register_service(s)); - thread::spawn(move || { - Server::http(&*url).and_then(|srv| { - info!("Ready to accept connections."); - srv.handle(self) }) - .map_err(|e| error!("Couldn't start server\n{}", e)) - .unwrap() - }); - } - - /// Try to parse the type of a message and forward it to the appropriate message handler. - /// Returns the result of the message handling or a `jsonrpc` result indicating a parser error. - /// - /// Needs to be extended to support new services. - /// - /// # Arguments - /// * `message`: The message that will be parsed. - fn handle_message(&self, message: &str) - -> Result<OkResponse<i32>, ErrResponse> { - - let data = try!( - Json::from_str(message) - .map_err(|_| ErrResponse::parse_error())); - let obj = try!( - data.as_object().ok_or(ErrResponse::parse_error())); - let rpc_id = try!( - obj.get("id").and_then(|x| x.as_u64()) - .ok_or(ErrResponse::parse_error())); - - let method = try!( - obj.get("method").and_then(|x| x.as_string()) - .ok_or(ErrResponse::invalid_request(rpc_id))); - - if method == "services_available" { - Ok(OkResponse::new(rpc_id, None)) - } - else if method != "message" { - Err(ErrResponse::method_not_found(rpc_id)) - } else { - let service = try!(obj.get("params") - .and_then(|x| x.as_object()) - .and_then(|x| x.get("service_name")) - .and_then(|x| x.as_string()) - .ok_or(ErrResponse::invalid_request(rpc_id))); - - self.hdlr.handle_service(rpc_id, service, message) - } - } -} - -impl<H: ServiceHandler + 'static> Handler for ServiceEdge<H> { - fn handle(&self, mut req: Request, resp: Response) { - let mut rbody = String::new(); - try_or!(req.read_to_string(&mut rbody), return); - debug!(">>> Received Message: {}", rbody); - let mut resp = try_or!(resp.start(), return); - - macro_rules! send_response { - ($rtype:ty, $resp:ident) => { - match json::encode::<$rtype>(&$resp) { - Ok(decoded_msg) => { - try_or!(resp.write_all(decoded_msg.as_bytes()), return); - debug!("<<< Sent Response: {}", decoded_msg); - }, - Err(p) => { error!("{}", p); } - } - }; - } - - match self.handle_message(&rbody) { - Ok(msg) => send_response!(OkResponse<i32>, msg), - Err(msg) => send_response!(ErrResponse, msg) - } - - try_or!(resp.end(), return); - } -} diff --git a/src/remote/rvi/message.rs b/src/remote/rvi/message.rs deleted file mode 100644 index 8adf596..0000000 --- a/src/remote/rvi/message.rs +++ /dev/null @@ -1,61 +0,0 @@ -//! Wrappers for messages exchanged with RVI. - -use std::vec::Vec; -use time; - -/// A generic incoming message. -#[derive(RustcDecodable, RustcEncodable)] -pub struct Message<T> { - /// The service that got called. - pub service_name: String, - /// The paramaters to the service call. - pub parameters: Vec<T> -} - -/// A generic outgoing message. -#[derive(RustcDecodable, RustcEncodable)] -pub struct RVIMessage<T> { - /// The service name to call. - pub service_name: String, - /// A timestamp when this message should expire. In UTC UNIX epoch. - pub timeout: i64, - /// The parameters to the service call. - pub parameters: Vec<T> -} - -impl<T> RVIMessage<T> { - /// Create a new outgoing RVI message. - /// - /// # Arguments - /// * `service`: The service name to call. - /// * `parameters`: The parameters to the service call. - /// * `tdelta`: Amount of seconds before the message will expire. - pub fn new(service: &str, - parameters: Vec<T>, - tdelta: i64) -> RVIMessage<T> { - let timeout = time::Duration::seconds(tdelta); - RVIMessage { - timeout: (time::get_time() + timeout).sec, - service_name: service.to_string(), - parameters: parameters - } - } -} - -/// Encodes a registration request. -#[derive(RustcEncodable)] -pub struct RegisterServiceRequest { - /// The network address where RVI can be reached. - pub network_address: String, - /// The service (short name) to register. - pub service: String -} - -/// Encodes a registration response. -#[derive(RustcDecodable)] -pub struct RegisterServiceResponse { - /// Status number indicating success or failure. See the RVI documentation for details. - pub status: i32, - /// The full service URL, that RVI assigned. - pub service: String -} diff --git a/src/remote/rvi/mod.rs b/src/remote/rvi/mod.rs deleted file mode 100644 index a6791bc..0000000 --- a/src/remote/rvi/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -//! RVI bindings for Rust. -//! -//! RVI - Remote Vehicle Interaction - is the next generation of connected vehicle services. Based -//! on the discussions inside and outside the Automotive Grade Linux expert group. -//! -//! This module implements Rust bindings to simplify the interaction with it. -//! -//! It is intended to be split out into a separate crate at some point in the future. - -mod edge; -mod send; -mod message; - -// Export public interface -pub use super::rvi::edge::{ServiceEdge, ServiceHandler}; -pub use super::rvi::send::send; -pub use super::rvi::send::send_message; -pub use super::rvi::message::Message; diff --git a/src/remote/rvi/send.rs b/src/remote/rvi/send.rs deleted file mode 100644 index 8308203..0000000 --- a/src/remote/rvi/send.rs +++ /dev/null @@ -1,61 +0,0 @@ -//! Helper functions for sending messages to RVI. - -use std::io::Read; -use hyper::Client; -use rustc_serialize::{json, Encodable}; - -use remote::jsonrpc; -use remote::rvi::message::RVIMessage; - -/// Send a object to RVI. Either returns the full response from RVI or a error message. -/// -/// The object will get encoded to json. Apart from that no sanity checks are made. You usually -/// don't need this function. -/// -/// # Arguments -/// * `url`: The full URL where RVI can be reached. -/// * `b`: The object to encode and send to RVI. -pub fn send<E: Encodable>(url: &str, b: &E) -> Result<String, String> { - let client = Client::new(); - - let mut resp = try!(json::encode(b) - .map_err(|e| format!("{}", e)) - .and_then(|j| { - debug!("<<< Sent Message: {}", j); - client.post(url).body(&j).send() - .map_err(|e| format!("{}", e)) - })); - - let mut rbody = String::new(); - try!(resp.read_to_string(&mut rbody) - .map_err(|e| format!("{}", e))); - debug!(">>> Received Response: {}", rbody); - Ok(rbody) -} - -/// Prepare a message and send it to RVI. Returns the full response from RVI on success or a error -/// message on failure. -/// -/// This wraps the provided object into a proper RVI message and encodes it to json. You usually -/// should call this function. -/// -/// **NOTE:** This currently implements a workaround for RVI, that will get fixed in the upcoming -/// RVI version `0.5.0`, which will break this current implementation. For the new protocol you -/// don't have to wrap the `params` in a one element `Vector` any more. -/// -/// # Arguments -/// * `url`: The full URL where RVI can be reached. -/// * `b`: The object to wrap into a RVI Message, encode and send to RVI. -/// * `addr`: The full RVI address (service URL) where this message should be sent to. -#[cfg(not(test))] -pub fn send_message<E: Encodable>(url: &str, b: E, addr: &str) -> Result<String, String> { - let mut params = Vec::new(); - params.push(b); - let message = RVIMessage::<E>::new(addr, params, 90); - let json_rpc = jsonrpc::Request::new("message", message); - send(url, &json_rpc) -} -#[cfg(test)] -pub fn send_message<E: Encodable>(url: &str, _: E, addr: &str) -> Result<String, String> { - Ok(format!("Faked sending to RVI: {}, {}", url, addr)) -} diff --git a/src/remote/svc.rs b/src/remote/svc.rs deleted file mode 100644 index 873cb80..0000000 --- a/src/remote/svc.rs +++ /dev/null @@ -1,272 +0,0 @@ -//! The main service handler -//! -//! Parses incoming messages and delegates them to the appropriate individual message handlers, -//! passing on the results to the [`main_loop`](../main_loop/index.html) - -use std::ops::Deref; -use std::sync::{Arc, Mutex}; -use std::sync::mpsc::Sender; -use std::thread; -use std::thread::sleep_ms; - -use rustc_serialize::{json, Decodable}; -use time; - -use event::{Event, UpdateId}; -use event::inbound::InboundEvent; -use event::outbound::{UpdateReport, InstalledSoftware}; - -use super::parm::{NotifyParams, StartParams, ChunkParams, ChunkReceived, FinishParams}; -use super::parm::{ReportParams, AbortParams, ParamHandler}; -use super::dw::Transfers; - -use super::jsonrpc; -use super::jsonrpc::{OkResponse, ErrResponse}; -use super::rvi; - -use configuration::ClientConfiguration; - -/// Encodes the list of service URLs the client registered. -/// -/// Needs to be extended to introduce new services. -#[derive(RustcEncodable, Clone)] -pub struct LocalServices { - /// "Start Download" URL. - pub start: String, - /// "Chunk" URL. - pub chunk: String, - /// "Abort Download" URL. - pub abort: String, - /// "Finish Download" URL. - pub finish: String, - /// "Get All Packages" URL. - pub getpackages: String, -} - -impl LocalServices { - /// Returns the VIN of this device. - /// - /// # Arguments - /// * `vin_match`: The index, where to look for the VIN in the service URL. - pub fn get_vin(&self, vin_match: i32) -> String { - self.start.split("/").nth(vin_match as usize).unwrap().to_string() - } -} - -/// Encodes the service URLs, that the server provides. -#[derive(RustcDecodable, Clone)] -pub struct BackendServices { - /// URL for the "Start Download" call. - pub start: String, - /// URL for the "Chunk Received" call. - pub ack: String, - /// URL for the "Installation Report" call. - pub report: String, - /// URL for the "Get All Packages" call. - pub packages: String -} - -#[derive(RustcEncodable, Clone)] -struct StartDownload { - vin: String, - update_id: UpdateId, - services: LocalServices, -} - -#[derive(RustcEncodable, Clone)] -struct UpdateResult { - vin: String, - update_report: UpdateReport -} - -#[derive(RustcEncodable, Clone)] -struct InstalledSoftwareResult { - vin: String, - installed_software: InstalledSoftware -} - -pub struct RemoteServices { - pub vin: String, - url: String, - local_svcs: Option<LocalServices>, - svcs: Option<BackendServices> -} - -impl RemoteServices { - pub fn new(url: String) -> RemoteServices { - RemoteServices { - vin: String::new(), - url: url, - local_svcs: None, - svcs: None - } - } - - pub fn set_remote(&mut self, vin: String, svcs: LocalServices) { - self.vin = vin; - self.local_svcs = Some(svcs); - } - - pub fn set(&mut self, svcs: BackendServices) { - self.svcs = Some(svcs); - } - - pub fn send_chunk_received(&self, m: ChunkReceived) -> Result<String, String> { - self.svcs.iter().next().ok_or(format!("RemoteServices not set")) - .and_then(|ref svcs| rvi::send_message(&self.url, m, &svcs.ack)) - } - - fn make_start_download(&self, id: UpdateId) -> StartDownload { - StartDownload { - vin: self.vin.clone(), - services: self.local_svcs.iter().next().cloned().unwrap(), - update_id: id - } - } - - pub fn send_start_download(&self, id: UpdateId) -> Result<String, String> { - self.svcs.iter().next().ok_or(format!("RemoteServices not set")) - .and_then(|ref svcs| rvi::send_message( - &self.url, - self.make_start_download(id), - &svcs.start)) - } - - pub fn send_update_report(&self, m: UpdateReport) -> Result<String, String> { - self.svcs.iter().next().ok_or(format!("RemoteServices not set")) - .and_then(|ref svcs| rvi::send_message( - &self.url, - UpdateResult { - vin: self.vin.clone(), - update_report: m }, - &svcs.report)) - } - - pub fn send_installed_software(&self, m: InstalledSoftware) -> Result<String, String> { - self.svcs.iter().next().ok_or(format!("RemoteServices not set")) - .and_then(|ref svcs| rvi::send_message( - &self.url, - InstalledSoftwareResult { - vin: self.vin.clone(), - installed_software: m }, - &svcs.packages)) - } -} - - -/// Type that encodes a single service handler. -/// -/// Holds the necessary state, like in-progress transfers, that are needed for handling incoming -/// messages and sending replies to RVI. Needs to be thread safe as -/// [`hyper`](../../../hyper/index.html) handles requests asynchronously. -pub struct ServiceHandler { - /// A `Sender` that connects the handlers with the `main_loop`. - sender: Mutex<Sender<Event>>, - /// The currently in-progress `Transfer`s. - transfers: Arc<Mutex<Transfers>>, - /// The service URLs that the SOTA server advertised. - remote_services: Arc<Mutex<RemoteServices>>, - /// The full `Configuration` of sota_client. - conf: ClientConfiguration -} - -impl ServiceHandler { - /// Create a new `ServiceHandler`. - /// - /// # Arguments - /// * `transfers`: A `Transfers` object to store the in-progress `Transfer`s. - /// * `sender`: A `Sender` to call back into the `main_loop`. - /// * `url`: The full URL, where RVI can be reached. - /// * `c`: The full `Configuration` of sota_client. - pub fn new(sender: Sender<Event>, - r: Arc<Mutex<RemoteServices>>, - c: ClientConfiguration) -> ServiceHandler { - let transfers = Arc::new(Mutex::new(Transfers::new(c.storage_dir.clone()))); - let tc = transfers.clone(); - c.timeout - .map(|t| { - let _ = thread::spawn(move || ServiceHandler::start_timer(tc.deref(), t)); - info!("Transfers timeout after {}", t)}) - .unwrap_or(info!("No timeout configured, transfers will never time out.")); - - ServiceHandler { - sender: Mutex::new(sender), - transfers: transfers, - remote_services: r, - conf: c - } - } - - /// Starts a infinite loop to expire timed out transfers. Checks once a second for timed out - /// transfers. - /// - /// # Arguments - /// * `transfers`: Pointer to a `Transfers` object, that stores the transfers to be checked for - /// expired timeouts. - /// * `timeout`: The timeout in seconds. - pub fn start_timer(transfers: &Mutex<Transfers>, - timeout: i64) { - loop { - sleep_ms(1000); - let mut transfers = transfers.lock().unwrap(); - transfers.prune(time::get_time().sec, timeout); - } - } - - /// Helper function to send a `Event` to the `main_loop`. - /// - /// # Arguments - /// * `e`: `Event` to send. - fn push_notify(&self, e: InboundEvent) { - try_or!(self.sender.lock().unwrap().send(Event::Inbound(e)), return); - } - - /// Create a message handler `D`, and let it process the `message`. If it returns a - /// Event, forward it to the `main_loop`. Returns a `jsonrpc` response indicating - /// success or failure. - /// - /// # Arguments - /// * `message`: The message, that should be handled. - fn handle_message_params<D>(&self, id: u64, message: &str) - -> Result<OkResponse<i32>, ErrResponse> - where D: Decodable + ParamHandler { - json::decode::<jsonrpc::Request<rvi::Message<D>>>(&message) - .map_err(|_| ErrResponse::invalid_params(id)) - .and_then(|p| { - let handler = &p.params.parameters[0]; - handler.handle(&self.remote_services, &self.transfers) - .map_err(|_| ErrResponse::unspecified(p.id)) - .map(|r| { - r.map(|m| self.push_notify(m)); - OkResponse::new(p.id, None) }) - }) - } -} - -impl rvi::ServiceHandler for ServiceHandler { - fn handle_service(&self, id: u64, service: &str, message: &str) - -> Result<OkResponse<i32>, ErrResponse> { - match service { - "/sota/notify" => self.handle_message_params::<NotifyParams>(id, message), - "/sota/start" => self.handle_message_params::<StartParams>(id, message), - "/sota/chunk" => self.handle_message_params::<ChunkParams>(id, message), - "/sota/finish" => self.handle_message_params::<FinishParams>(id, message), - "/sota/abort" => self.handle_message_params::<AbortParams>(id, message), - "/sota/getpackages" => self.handle_message_params::<ReportParams>(id, message), - _ => Err(ErrResponse::invalid_request(id)) - } - } - - fn register_services<F: Fn(&str) -> String>(&self, reg: F) { - reg("/sota/notify"); - let mut remote_svcs = self.remote_services.lock().unwrap(); - let svcs = LocalServices { - start: reg("/sota/start"), - chunk: reg("/sota/chunk"), - abort: reg("/sota/abort"), - finish: reg("/sota/finish"), - getpackages: reg("/sota/getpackages") - }; - remote_svcs.set_remote(svcs.get_vin(self.conf.vin_match), svcs); - } -} diff --git a/src/rvi/edge.rs b/src/rvi/edge.rs new file mode 100644 index 0000000..cadea74 --- /dev/null +++ b/src/rvi/edge.rs @@ -0,0 +1,127 @@ +use hyper::StatusCode; +use hyper::net::{HttpStream, Transport}; +use hyper::server::{Server as HyperServer, Request as HyperRequest}; +use rustc_serialize::json; +use rustc_serialize::json::Json; +use std::{mem, str}; +use std::net::ToSocketAddrs; + +use datatype::{RpcRequest, RpcOk, RpcErr, Url}; +use http::{Server, ServerHandler}; +use super::services::Services; + + +/// The HTTP server endpoint for `RVI` client communication. +pub struct Edge { + rvi_edge: Url, + services: Services, +} + +impl Edge { + /// Create a new `Edge` by registering each `RVI` service. + pub fn new(mut services: Services, rvi_edge: String, rvi_client: Url) -> Self { + services.register_services(|service| { + let req = RpcRequest::new("register_service", RegisterServiceRequest { + network_address: rvi_edge.clone(), + service: service.to_string(), + }); + let resp = req.send(rvi_client.clone()) + .unwrap_or_else(|err| panic!("RegisterServiceRequest failed: {}", err)); + let rpc_ok = json::decode::<RpcOk<RegisterServiceResponse>>(&resp) + .unwrap_or_else(|err| panic!("couldn't decode RegisterServiceResponse: {}", err)); + rpc_ok.result.expect("expected rpc_ok result").service + }); + + Edge { rvi_edge: rvi_edge.parse().expect("couldn't parse edge server as url"), services: services } + } + + /// Start the HTTP server listening for incoming RVI client connections. + pub fn start(&mut self) { + let mut addrs = self.rvi_edge.to_socket_addrs() + .unwrap_or_else(|err| panic!("couldn't parse edge url: {}", err)); + let server = HyperServer::http(&addrs.next().expect("no SocketAddr found")) + .unwrap_or_else(|err| panic!("couldn't start rvi edge server: {}", err)); + let (addr, server) = server.handle(move |_| EdgeHandler::new(self.services.clone())).unwrap(); + info!("RVI server edge listening at http://{}.", addr); + server.run(); + } +} + + +#[derive(RustcEncodable)] +struct RegisterServiceRequest { + pub network_address: String, + pub service: String, +} + +#[derive(RustcDecodable)] +struct RegisterServiceResponse { + pub service: String, + pub status: i32, +} + + + +struct EdgeHandler { + services: Services, + resp_code: StatusCode, + resp_body: Option<Vec<u8>> +} + +impl EdgeHandler { + fn new(services: Services) -> ServerHandler<HttpStream> { + ServerHandler::new(Box::new(EdgeHandler { + services: services, + resp_code: StatusCode::InternalServerError, + resp_body: None, + })) + } +} + +impl<T: Transport> Server<T> for EdgeHandler { + fn headers(&mut self, _: HyperRequest<T>) {} + + fn request(&mut self, body: Vec<u8>) { + let outcome = || -> Result<RpcOk<i32>, RpcErr> { + let text = try!(str::from_utf8(&body).map_err(|err| RpcErr::parse_error(err.to_string()))); + let data = try!(Json::from_str(text).map_err(|err| RpcErr::parse_error(err.to_string()))); + let object = try!(data.as_object().ok_or(RpcErr::parse_error("not an object".to_string()))); + let id = try!(object.get("id").and_then(|x| x.as_u64()) + .ok_or(RpcErr::parse_error("expected id".to_string()))); + let method = try!(object.get("method").and_then(|x| x.as_string()) + .ok_or(RpcErr::invalid_request(id, "expected method".to_string()))); + + match method { + "services_available" => Ok(RpcOk::new(id, None)), + + "message" => { + let params = try!(object.get("params").and_then(|p| p.as_object()) + .ok_or(RpcErr::invalid_request(id, "expected params".to_string()))); + let service = try!(params.get("service_name").and_then(|s| s.as_string()) + .ok_or(RpcErr::invalid_request(id, "expected params.service_name".to_string()))); + self.services.handle_service(service, id, text) + } + + _ => Err(RpcErr::method_not_found(id, format!("unknown method: {}", method))) + } + }(); + + match outcome { + Ok(msg) => { + let body = json::encode::<RpcOk<i32>>(&msg).expect("couldn't encode RpcOk response"); + self.resp_code = StatusCode::Ok; + self.resp_body = Some(body.into_bytes()); + } + + Err(err) => { + let body = json::encode::<RpcErr>(&err).expect("couldn't encode RpcErr response"); + self.resp_code = StatusCode::BadRequest; + self.resp_body = Some(body.into_bytes()); + } + } + } + + fn response(&mut self) -> (StatusCode, Option<Vec<u8>>) { + (self.resp_code, mem::replace(&mut self.resp_body, None)) + } +} diff --git a/src/rvi/mod.rs b/src/rvi/mod.rs new file mode 100644 index 0000000..ba1d40a --- /dev/null +++ b/src/rvi/mod.rs @@ -0,0 +1,9 @@ +pub mod edge; +pub mod parameters; +pub mod services; +pub mod transfers; + +pub use self::edge::Edge; +pub use self::parameters::Parameter; +pub use self::services::{RemoteServices, Services}; +pub use self::transfers::Transfer; diff --git a/src/rvi/parameters.rs b/src/rvi/parameters.rs new file mode 100644 index 0000000..1fa1a87 --- /dev/null +++ b/src/rvi/parameters.rs @@ -0,0 +1,136 @@ +use std::str; +use std::sync::Mutex; + +use datatype::{ChunkReceived, Event, DownloadComplete, UpdateRequestId, UpdateAvailable}; +use super::services::{BackendServices, RemoteServices}; +use super::transfers::Transfers; + + +/// Each `Parameter` implementation handles a specific kind of RVI client request, +/// optionally responding with an `Event` on completion. +pub trait Parameter { + fn handle(&self, remote: &Mutex<RemoteServices>, transfers: &Mutex<Transfers>) + -> Result<Option<Event>, String>; +} + + +#[derive(RustcDecodable, RustcEncodable)] +pub struct Notify { + update_available: UpdateAvailable, + services: BackendServices +} + +impl Parameter for Notify { + fn handle(&self, remote: &Mutex<RemoteServices>, _: &Mutex<Transfers>) -> Result<Option<Event>, String> { + remote.lock().unwrap().backend = Some(self.services.clone()); + Ok(Some(Event::NewUpdateAvailable(self.update_available.clone()))) + } +} + + +#[derive(RustcDecodable, RustcEncodable)] +pub struct Start { + update_id: UpdateRequestId, + chunkscount: u64, + checksum: String +} + +impl Parameter for Start { + fn handle(&self, remote: &Mutex<RemoteServices>, transfers: &Mutex<Transfers>) -> Result<Option<Event>, String> { + info!("Starting transfer for update_id {}", self.update_id); + let mut transfers = transfers.lock().unwrap(); + transfers.push(self.update_id.clone(), self.checksum.clone()); + + let remote = remote.lock().unwrap(); + let chunk = ChunkReceived { + device: remote.device_id.clone(), + update_id: self.update_id.clone(), + chunks: Vec::new() + }; + remote.send_chunk_received(chunk) + .map(|_| None) + .map_err(|err| format!("error sending start ack: {}", err)) + } +} + + +#[derive(RustcDecodable, RustcEncodable)] +pub struct Chunk { + update_id: UpdateRequestId, + bytes: String, + index: u64 +} + +impl Parameter for Chunk { + fn handle(&self, remote: &Mutex<RemoteServices>, transfers: &Mutex<Transfers>) -> Result<Option<Event>, String> { + let remote = remote.lock().unwrap(); + + let mut transfers = transfers.lock().unwrap(); + let transfer = try!(transfers.get_mut(self.update_id.clone()) + .ok_or(format!("couldn't find transfer for update_id {}", self.update_id))); + transfer.write_chunk(&self.bytes, self.index) + .map_err(|err| format!("couldn't write chunk: {}", err)) + .and_then(|_| { + trace!("wrote chunk {} for package {}", self.index, self.update_id); + let chunk = ChunkReceived { + device: remote.device_id.clone(), + update_id: self.update_id.clone(), + chunks: transfer.transferred_chunks.clone(), + }; + remote.send_chunk_received(chunk) + .map(|_| None) + .map_err(|err| format!("error sending ChunkReceived: {}", err)) + }) + } +} + + +#[derive(RustcDecodable, RustcEncodable)] +pub struct Finish { + update_id: UpdateRequestId, + signature: String +} + +impl Parameter for Finish { + fn handle(&self, _: &Mutex<RemoteServices>, transfers: &Mutex<Transfers>) -> Result<Option<Event>, String> { + let mut transfers = transfers.lock().unwrap(); + let image = { + let transfer = try!(transfers.get(self.update_id.clone()) + .ok_or(format!("unknown package: {}", self.update_id))); + let package = try!(transfer.assemble_package() + .map_err(|err| format!("couldn't assemble package: {}", err))); + try!(package.into_os_string().into_string() + .map_err(|err| format!("couldn't get image: {:?}", err))) + }; + transfers.remove(self.update_id.clone()); + info!("Finished transfer of {}", self.update_id); + + let complete = DownloadComplete { + update_id: self.update_id.clone(), + update_image: image, + signature: self.signature.clone() + }; + Ok(Some(Event::DownloadComplete(complete))) + } +} + + +#[derive(RustcDecodable, RustcEncodable)] +pub struct Report; + +impl Parameter for Report { + fn handle(&self, _: &Mutex<RemoteServices>, _: &Mutex<Transfers>) -> Result<Option<Event>, String> { + Ok(Some(Event::InstalledSoftwareNeeded)) + } +} + + +#[derive(RustcDecodable, RustcEncodable)] +pub struct Abort; + +impl Parameter for Abort { + fn handle(&self, _: &Mutex<RemoteServices>, transfers: &Mutex<Transfers>) -> Result<Option<Event>, String> { + transfers.lock().unwrap().clear(); + Ok(None) + } +} diff --git a/src/rvi/services.rs b/src/rvi/services.rs new file mode 100644 index 0000000..fd5a640 --- /dev/null +++ b/src/rvi/services.rs @@ -0,0 +1,185 @@ +use chan; +use chan::Sender; +use rustc_serialize::{json, Decodable, Encodable}; +use std::thread; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use time; + +use datatype::{ChunkReceived, DownloadStarted, Event, InstalledSoftware, + RpcRequest, RpcOk, RpcErr, RviConfig, UpdateReport, UpdateRequestId, + Url}; +use super::parameters::{Abort, Chunk, Finish, Notify, Parameter, Report, Start}; +use super::transfers::Transfers; + + +/// Hold references to RVI service endpoints, currently active `Transfers`, and +/// where to broadcast outcome `Event`s to. +#[derive(Clone)] +pub struct Services { + pub remote: Arc<Mutex<RemoteServices>>, + pub sender: Arc<Mutex<Sender<Event>>>, + pub transfers: Arc<Mutex<Transfers>>, +} + +impl Services { + /// Set up a new RVI service handler, pruning any inactive `Transfer`s each second. + pub fn new(rvi_cfg: RviConfig, device_id: String, sender: Sender<Event>) -> Self { + let transfers = Arc::new(Mutex::new(Transfers::new(rvi_cfg.storage_dir))); + rvi_cfg.timeout.map_or_else(|| info!("Transfers will never time out."), |timeout| { + info!("Transfers timeout after {} seconds.", timeout); + let transfers = transfers.clone(); + thread::spawn(move || { + let tick = chan::tick(Duration::from_secs(1)); + loop { + let _ = tick.recv(); + let mut transfers = transfers.lock().unwrap(); + transfers.prune(time::get_time().sec, timeout); + } + }); + }); + + Services { + remote: Arc::new(Mutex::new(RemoteServices::new(device_id, rvi_cfg.client))), + sender: Arc::new(Mutex::new(sender)), + transfers: transfers, + } + } + + /// Register each RVI endpoint with the provided registration function which + /// should return a `String` representation of the URL used to contact that + /// service. + pub fn register_services<F: Fn(&str) -> String>(&mut self, register: F) { + let _ = register("/sota/notify"); + let mut remote = self.remote.lock().unwrap(); + remote.local = Some(LocalServices { + start: register("/sota/start"), + chunk: register("/sota/chunk"), + abort: register("/sota/abort"), + finish: register("/sota/finish"), + getpackages: register("/sota/getpackages") + }); + } + + /// Handle an incoming message for a specific service endpoint. + pub fn handle_service(&self, service: &str, id: u64, msg: &str) -> Result<RpcOk<i32>, RpcErr> { + match service { + "/sota/notify" => self.handle_message::<Notify>(id, msg), + "/sota/start" => self.handle_message::<Start>(id, msg), + "/sota/chunk" => self.handle_message::<Chunk>(id, msg), + "/sota/finish" => self.handle_message::<Finish>(id, msg), + "/sota/getpackages" => self.handle_message::<Report>(id, msg), + "/sota/abort" => self.handle_message::<Abort>(id, msg), + _ => Err(RpcErr::invalid_request(id, format!("unknown service: {}", service))) + } + } + + /// Parse the message as an `RpcRequest<RviMessage<Parameter>>` then delegate + /// to the specific `Parameter.handle()` function, forwarding any returned + /// `Event` to the `Services` sender. + fn handle_message<P>(&self, id: u64, msg: &str) -> Result<RpcOk<i32>, RpcErr> + where P: Parameter + Encodable + Decodable + { + let request = try!(json::decode::<RpcRequest<RviMessage<P>>>(&msg).map_err(|err| { + error!("couldn't decode message: {}", err); + RpcErr::invalid_params(id, format!("couldn't decode message: {}", err)) + })); + let event = try!(request.params.parameters[0].handle(&self.remote, &self.transfers).map_err(|err| { + error!("couldn't handle parameters: {}", err); + RpcErr::unspecified(request.id, format!("couldn't handle parameters: {}", err)) + })); + event.map(|ev| self.sender.lock().unwrap().send(ev)); + Ok(RpcOk::new(request.id, None)) + } +} + + +pub struct RemoteServices { + pub device_id: String, + pub rvi_client: Url, + pub local: Option<LocalServices>, + pub backend: Option<BackendServices> +} + +impl RemoteServices { + pub fn new(device_id: String, rvi_client: Url) -> RemoteServices { + RemoteServices { device_id: device_id, rvi_client: rvi_client, local: None, backend: None } + } + + fn send_message<E: Encodable>(&self, body: E, addr: &str) -> Result<String, String> { + RpcRequest::new("message", RviMessage::new(addr, vec![body], 60)).send(self.rvi_client.clone()) + } + + pub fn send_download_started(&self, update_id: UpdateRequestId) -> Result<String, String> { + let backend = try!(self.backend.as_ref().ok_or("BackendServices not set")); + let local = try!(self.local.as_ref().ok_or("LocalServices not set")); + let start = DownloadStarted { device: self.device_id.clone(), update_id: update_id, services: local.clone() }; + self.send_message(start, &backend.start) + } + + pub fn send_chunk_received(&self, chunk: ChunkReceived) -> Result<String, String> { + let backend = try!(self.backend.as_ref().ok_or("BackendServices not set")); + self.send_message(chunk, &backend.ack) + } + + pub fn send_update_report(&self, report: UpdateReport) -> Result<String, String> { + let backend = try!(self.backend.as_ref().ok_or("BackendServices not set")); + let result = UpdateReportResult { device: self.device_id.clone(), update_report: report }; + self.send_message(result, &backend.report) + } + + pub fn send_installed_software(&self, installed: InstalledSoftware) -> Result<String, String> { + let backend = try!(self.backend.as_ref().ok_or("BackendServices not set")); + let result = InstalledSoftwareResult { device_id: self.device_id.clone(), installed: installed }; + self.send_message(result, &backend.packages) + } +} + + +#[derive(Clone, RustcDecodable, RustcEncodable)] +pub struct LocalServices { + pub start: String, + pub abort: String, + pub chunk: String, + pub finish: String, + pub getpackages: String, +} + +#[derive(Clone, RustcDecodable, RustcEncodable)] +pub struct BackendServices { + pub start: String, + pub ack: String, + pub report: String, + pub packages: String +} + + +#[derive(RustcDecodable, RustcEncodable)] +struct UpdateReportResult { + pub device: String, + pub update_report: UpdateReport +} + +#[derive(RustcDecodable, RustcEncodable)] +struct InstalledSoftwareResult { + device_id: String, + installed: InstalledSoftware +} + + +#[derive(RustcDecodable, RustcEncodable)] +pub struct RviMessage<E: Encodable> { + pub service_name: String, + pub parameters: Vec<E>, + pub timeout: Option<i64> +} + +impl<E: Encodable> RviMessage<E> { + pub fn new(service: &str, parameters: Vec<E>, expire_in: i64) -> RviMessage<E> { + RviMessage { + service_name: service.to_string(), + parameters: parameters, + timeout: Some((time::get_time() + time::Duration::seconds(expire_in)).sec) + } + } +} diff --git a/src/rvi/transfers.rs b/src/rvi/transfers.rs new file mode 100644 index 0000000..859c2c1 --- /dev/null +++ b/src/rvi/transfers.rs @@ -0,0 +1,278 @@ +use crypto::digest::Digest; +use crypto::sha1::Sha1; +use rustc_serialize::base64::FromBase64; +use std::fs; +use std::collections::HashMap; +use std::fs::File; +use std::io::prelude::*; +use std::path::PathBuf; +use std::str::FromStr; +use std::vec::Vec; +use time; + +use datatype::UpdateRequestId; + + +/// Holds all currently active transfers where each is referenced by `UpdateRequestId`. +pub struct Transfers { + items: HashMap<UpdateRequestId, Transfer>, + storage_dir: String +} + +impl Transfers { + pub fn new(storage_dir: String) -> Transfers { + Transfers { items: HashMap::new(), storage_dir: storage_dir } + } + + pub fn get(&self, update_id: UpdateRequestId) -> Option<&Transfer> { + self.items.get(&update_id) + } + + pub fn get_mut(&mut self, update_id: UpdateRequestId) -> Option<&mut Transfer> { + self.items.get_mut(&update_id) + } + + pub fn push(&mut self, update_id: UpdateRequestId, checksum: String) { + let transfer = Transfer::new(self.storage_dir.to_string(), update_id.clone(), checksum); + self.items.insert(update_id, transfer); + } + + pub fn remove(&mut self, update_id: UpdateRequestId) { + self.items.remove(&update_id); + } + + pub fn clear(&mut self) { + self.items.clear(); + } + + pub fn prune(&mut self, now: i64, timeout: i64) { + let mut timeouts = Vec::new(); + for (id, transfer) in &mut self.items { + if now - transfer.last_chunk_received > timeout { + timeouts.push(id.clone()); + } + } + + for id in timeouts { + self.items.remove(&id); + info!("Transfer for update_id {} timed out.", id) + } + } +} + + +/// Holds the details of the transferred chunks relating to an `UpdateRequestId`. +pub struct Transfer { + pub update_id: UpdateRequestId, + pub checksum: String, + pub transferred_chunks: Vec<u64>, + pub storage_dir: String, + pub last_chunk_received: i64 +} + +impl Transfer { + /// Prepare for the transfer of a new package. + pub fn new(storage_dir: String, update_id: UpdateRequestId, checksum: String) -> Transfer { + Transfer { + update_id: update_id, + checksum: checksum, + transferred_chunks: Vec::new(), + storage_dir: storage_dir, + last_chunk_received: time::get_time().sec + } + } + + /// Write the received chunk to disk and store metadata inside `Transfer`. + pub fn write_chunk(&mut self, data: &str, index: u64) -> Result<(), String> { + self.last_chunk_received = time::get_time().sec; + let mut path = try!(self.get_chunk_dir().map_err(|err| format!("couldn't get chunk dir: {}", err))); + path.push(index.to_string()); + let mut file = try!(File::create(path).map_err(|err| format!("couldn't open chunk file: {}", err))); + + let data = try!(data.from_base64().map_err(|err| format!("couldn't decode chunk {}: {}", index, err))); + try!(file.write_all(&data) + .map_err(|err| format!("couldn't write chunk {} for update_id {}: {}", index, self.update_id, err))); + try!(file.flush().map_err(|err| format!("couldn't flush file: {}", err))); + + self.transferred_chunks.push(index); + self.transferred_chunks.sort(); + self.transferred_chunks.dedup(); + Ok(()) + } + + /// Assemble all received chunks into a complete package. + pub fn assemble_package(&self) -> Result<PathBuf, String> { + debug!("finalizing package {}", self.update_id); + try!(self.assemble_chunks()); + self.verify() + .and_then(|_| self.get_package_path()) + .map_err(|err| format!("couldn't assemble_package for update_id {}: {}", self.update_id, err)) + } + + fn assemble_chunks(&self) -> Result<(), String> { + let pkg_path = try!(self.get_package_path()); + debug!("saving update_id {} to {}", self.update_id, pkg_path.display()); + let mut file = try!(File::create(pkg_path).map_err(|err| format!("couldn't open package file: {}", err))); + + let chunk_dir = try!(self.get_chunk_dir()); + let entries = try!(fs::read_dir(chunk_dir.clone()).map_err(|err| format!("couldn't read dir: {}", err))); + let mut indices = Vec::new(); + for entry in entries { + let entry = try!(entry.map_err(|err| format!("bad entry: {}", err))); + let name = try!(entry.file_name().into_string().map_err(|err| format!("bad entry name: {:?}", err))); + let index = try!(u64::from_str(&name).map_err(|err| format!("couldn't parse chunk index: {}", err))); + indices.push(index); + } + indices.sort(); + + for index in indices { + try!(self.append_chunk(&mut file, chunk_dir.clone(), index)); + } + Ok(debug!("assembled chunks for update_id {}", self.update_id)) + } + + fn append_chunk(&self, file: &mut File, mut chunk_dir: PathBuf, index: u64) -> Result<(), String> { + chunk_dir.push(&index.to_string()); + let mut chunk = try!(File::open(chunk_dir).map_err(|err| format!("couldn't open chunk: {}", err))); + let mut buf = Vec::new(); + try!(chunk.read_to_end(&mut buf).map_err(|err| format!("couldn't read file {}: {}", index, err))); + try!(file.write(&buf).map_err(|err| format!("couldn't write chunk {}: {}", index, err))); + Ok(trace!("wrote chunk {} for update_id {}", index, self.update_id)) + } + + fn verify(&self) -> Result<(), String> { + let path = try!(self.get_package_path()); + let mut file = try!(File::open(path).map_err(|err| format!("couldn't open package path: {}", err))); + let mut data = Vec::new(); + try!(file.read_to_end(&mut data).map_err(|err| format!("couldn't read file: {}", err))); + + let mut hash = Sha1::new(); + hash.input(&data); + if hash.result_str() == self.checksum { + Ok(()) + } else { + Err(format!("update_id {} checksum failed: expected {}, got {}", self.update_id, self.checksum, hash.result_str())) + } + } + + fn get_chunk_dir(&self) -> Result<PathBuf, String> { + let mut path = PathBuf::from(&self.storage_dir); + path.push("downloads"); + path.push(self.update_id.clone()); + fs::create_dir_all(&path) + .map(|_| path) + .map_err(|err| format!("couldn't create chunk dir: {}", err)) + } + + fn get_package_path(&self) -> Result<PathBuf, String> { + let mut path = PathBuf::from(&self.storage_dir); + path.push("packages"); + try!(fs::create_dir_all(&path) + .map_err(|err| format!("couldn't create package dir {:?}: {}", path, err))); + path.push(format!("{}.spkg", self.update_id)); + Ok(path) + } +} + +impl Drop for Transfer { + fn drop(&mut self) { + let _ = self.get_chunk_dir().map(|dir| { + fs::read_dir(&dir) + .or_else(|err| Err(error!("couldn't read dir {:?}: {}", &dir, err))) + .and_then(|entries| { + for entry in entries { + let _ = entry.map(|entry| fs::remove_file(entry.path())) + .map_err(|err| error!("found a malformed entry: {}", err)); + } + Ok(fs::remove_dir(dir).map_err(|err| error!("couldn't remove dir: {}", err))) + }) + }); + } +} + + +#[cfg(test)] +mod test { + use rand; + use rand::Rng; + use rustc_serialize::base64; + use rustc_serialize::base64::ToBase64; + use std::path::PathBuf; + use std::fs::File; + use std::io::prelude::*; + use time; + + use super::*; + use package_manager::TestDir; + + + impl Transfer { + pub fn new_test(test_dir: &TestDir) -> Transfer { + Transfer { + update_id: rand::thread_rng().gen_ascii_chars().take(10).collect::<String>(), + checksum: "".to_string(), + transferred_chunks: Vec::new(), + storage_dir: test_dir.0.clone(), + last_chunk_received: time::get_time().sec + } + } + + pub fn assert_chunk_written(&mut self, test_dir: &TestDir, index: u64, data: &[u8]) { + let encoded = data.to_base64(base64::Config { + char_set: base64::CharacterSet::UrlSafe, + newline: base64::Newline::LF, + pad: true, + line_length: None + }); + self.write_chunk(encoded.as_ref(), index).expect("couldn't write chunk"); + + let path = PathBuf::from(format!("{}/downloads/{}/{}", test_dir.0.clone(), self.update_id, index)); + let mut file = File::open(path).map_err(|err| panic!("couldn't open file: {}", err)).unwrap(); + let mut buf = Vec::new(); + let _ = file.read_to_end(&mut buf).expect("couldn't read file"); + assert_eq!(data.to_vec(), buf); + } + } + + + #[test] + fn test_package_directory_created() { + let test_dir = TestDir::new("sota-test-transfers"); + let transfer = Transfer::new_test(&test_dir); + let chunk_dir = transfer.get_package_path().unwrap(); + let path = format!("{}/packages/{}.spkg", test_dir.0, transfer.update_id); + assert_eq!(chunk_dir.to_str().unwrap(), path); + } + + #[test] + fn test_checksum() { + let test_dir = TestDir::new("sota-test-transfers"); + let mut transfer = Transfer::new_test(&test_dir); + transfer.assert_chunk_written(&test_dir, 0, "test\n".to_string().as_bytes()); + transfer.assemble_chunks().expect("couldn't assemble chunks"); + + transfer.checksum = "4e1243bd22c66e76c2ba9eddc1f91394e57f9f83".to_string(); + assert!(transfer.verify().is_ok()); + + transfer.checksum = "invalid".to_string(); + assert!(!transfer.verify().is_ok()); + } + + #[test] + fn test_assemble_chunks() { + let test_dir = TestDir::new("sota-test-transfers"); + let mut transfer = Transfer::new_test(&test_dir); + let mut assembly = String::new(); + for index in 1..20 { + let data = rand::thread_rng().gen_ascii_chars().take(index).collect::<String>(); + assembly.push_str(&data); + transfer.assert_chunk_written(&test_dir, index as u64, data.as_bytes()); + } + + transfer.assemble_chunks().expect("couldn't assemble chunks"); + let path = format!("{}/packages/{}.spkg", test_dir.0, transfer.update_id); + let mut buf = Vec::new(); + let _ = File::open(PathBuf::from(path)).unwrap().read_to_end(&mut buf).unwrap(); + assert_eq!(assembly.into_bytes(), buf); + } +} diff --git a/src/sota.rs b/src/sota.rs new file mode 100644 index 0000000..3dec33d --- /dev/null +++ b/src/sota.rs @@ -0,0 +1,141 @@ +use rustc_serialize::json; +use std::fs::File; +use std::io; +use std::path::PathBuf; + +use datatype::{Config, DeviceReport, DownloadComplete, Error, Package, + PendingUpdateRequest, UpdateRequestId, UpdateReport, Url}; +use http::Client; + + +/// Encapsulate the client configuration and HTTP client used for +/// software-over-the-air updates. +pub struct Sota<'c, 'h> { + config: &'c Config, + client: &'h Client, +} + +impl<'c, 'h> Sota<'c, 'h> { + /// Creates a new instance for Sota communication. + pub fn new(config: &'c Config, client: &'h Client) -> Sota<'c, 'h> { + Sota { config: config, client: client } + } + + /// Takes a path and returns a new endpoint of the format + /// `<Core server>/api/v1/device_updates/<uuid>/<path>`. + pub fn endpoint(&self, path: &str) -> Url { + let endpoint = if path.is_empty() { + format!("/api/v1/device_updates/{}", self.config.device.uuid) + } else { + format!("/api/v1/device_updates/{}/{}", self.config.device.uuid, path) + }; + self.config.core.server.join(&endpoint).expect("couldn't build endpoint url") + } + + /// Query the Core server to identify any new package updates available. + pub fn get_pending_updates(&mut self) -> Result<Vec<PendingUpdateRequest>, Error> { + let resp_rx = self.client.get(self.endpoint(""), None); + let resp = resp_rx.recv().expect("no get_package_updates response received"); + let data = try!(resp); + let text = try!(String::from_utf8(data)); + Ok(try!(json::decode::<Vec<PendingUpdateRequest>>(&text))) + } + + /// Download a specific update from the Core server. + pub fn download_update(&mut self, id: UpdateRequestId) -> Result<DownloadComplete, Error> { + let resp_rx = self.client.get(self.endpoint(&format!("{}/download", id)), None); + let resp = resp_rx.recv().expect("no download_package_update response received"); + let data = try!(resp); + + let mut path = PathBuf::new(); + path.push(&self.config.device.packages_dir); + path.push(id.clone()); // TODO: Use Content-Disposition filename from request? + let mut file = try!(File::create(path.as_path())); + + let _ = io::copy(&mut &*data, &mut file); + let path = try!(path.to_str().ok_or(Error::Parse(format!("Path is not valid UTF-8: {:?}", path)))); + + Ok(DownloadComplete { + update_id: id, + update_image: path.to_string(), + signature: "".to_string() + }) + } + + /// Install an update using the package manager. + pub fn install_update(&mut self, download: DownloadComplete) -> Result<UpdateReport, UpdateReport> { + let ref pacman = self.config.device.package_manager; + pacman.install_package(&download.update_image).and_then(|(code, output)| { + Ok(UpdateReport::single(download.update_id.clone(), code, output)) + }).or_else(|(code, output)| { + Err(UpdateReport::single(download.update_id.clone(), code, output)) + }) + } + + /// Get a list of the currently installed packages from the package manager. + pub fn get_installed_packages(&mut self) -> Result<Vec<Package>, Error> { + Ok(try!(self.config.device.package_manager.installed_packages())) + } + + /// Send a list of the currently installed packages to the Core server. + pub fn send_installed_packages(&mut self, packages: &Vec<Package>) -> Result<(), Error> { + let body = try!(json::encode(packages)); + let resp_rx = self.client.put(self.endpoint("installed"), Some(body.into_bytes())); + let _ = resp_rx.recv().expect("no update_installed_packages response received") + .map_err(|err| error!("update_installed_packages failed: {}", err)); + Ok(()) + } + + /// Send the outcome of a package update to the Core server. + pub fn send_update_report(&mut self, update_report: &UpdateReport) -> Result<(), Error> { + let report = DeviceReport::new(&self.config.device.uuid, update_report); + let body = try!(json::encode(&report)); + let url = self.endpoint(report.device); + let resp_rx = self.client.post(url, Some(body.into_bytes())); + let resp = resp_rx.recv().expect("no send_install_report response received"); + let _ = try!(resp); + Ok(()) + } + + /// Send system information from the device to the Core server. + pub fn send_system_info(&mut self, body: &str) -> Result<(), Error> { + let resp_rx = self.client.put(self.endpoint("system_info"), Some(body.as_bytes().to_vec())); + let resp = resp_rx.recv().expect("no send_system_info response received"); + let _ = try!(resp); + Ok(()) + } +} + + +#[cfg(test)] +mod tests { + use rustc_serialize::json; + + use super::*; + use datatype::{Config, Package, PendingUpdateRequest}; + use http::TestClient; + + + #[test] + fn test_get_pending_updates() { + let pending_update = PendingUpdateRequest { + requestId: "someid".to_string(), + installPos: 0, + packageId: Package { + name: "fake-pkg".to_string(), + version: "0.1.1".to_string() + }, + createdAt: "2010-01-01".to_string() + }; + + let json = format!("[{}]", json::encode(&pending_update).unwrap()); + let mut sota = Sota { + config: &Config::default(), + client: &mut TestClient::from(vec![json.to_string()]), + }; + + let updates: Vec<PendingUpdateRequest> = sota.get_pending_updates().unwrap(); + let ids: Vec<String> = updates.iter().map(|p| p.requestId.clone()).collect(); + assert_eq!(ids, vec!["someid".to_string()]) + } +} diff --git a/src/test_library.rs b/src/test_library.rs deleted file mode 100644 index b5af298..0000000 --- a/src/test_library.rs +++ /dev/null @@ -1,72 +0,0 @@ -//! Helper functions for testing `sota_client`. - -use std::path::PathBuf; -use std::fmt; -use std::fs; - -use time; -use log; -use log::{LogRecord, LogLevel, LogMetadata}; - -/// Initiates logging in tests. Can safely be called multiple times. -macro_rules! test_init { - () => { - use test_library::SimpleLogger; - use log::LogLevelFilter; - use log; - match log::set_logger(|max_log_level| { - max_log_level.set(LogLevelFilter::Trace); - Box::new(SimpleLogger) - }) { - Ok(..) => {}, - Err(..) => {} - } - } -} - -/// Implements a simple logger printing all log messages to stdout. -pub struct SimpleLogger; - -impl log::Log for SimpleLogger { - fn enabled(&self, metadata: &LogMetadata) -> bool { - metadata.level() <= LogLevel::Info - } - - fn log(&self, record: &LogRecord) { - if self.enabled(record.metadata()) { - println!("{} - {}", record.level(), record.args()); - } - } -} - -/// Wrapper for storing test data in a temporary directory. The created directory will be deleted, -/// when dropped. -pub struct PathPrefix { prefix: String } - -impl PathPrefix { - pub fn new() -> PathPrefix { - PathPrefix { - prefix: format!("/tmp/rust-test-{}", - time::precise_time_ns() - .to_string()) - } - } - - pub fn to_string(&self) -> String { - return self.prefix.clone(); - } -} - -impl Drop for PathPrefix { - fn drop(&mut self) { - let dir = PathBuf::from(&self.prefix); - fs::remove_dir_all(dir).unwrap(); - } -} - -impl fmt::Display for PathPrefix { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.prefix) - } -} - |