summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/broadcast.rs60
-rw-r--r--src/configuration/client.rs177
-rw-r--r--src/configuration/common.rs128
-rw-r--r--src/configuration/configuration.rs176
-rw-r--r--src/configuration/dbus.rs155
-rw-r--r--src/configuration/mod.rs12
-rw-r--r--src/datatype/auth.rs49
-rw-r--r--src/datatype/command.rs306
-rw-r--r--src/datatype/config.rs389
-rw-r--r--src/datatype/dbus.rs81
-rw-r--r--src/datatype/error.rs119
-rw-r--r--src/datatype/event.rs56
-rw-r--r--src/datatype/json_rpc.rs107
-rw-r--r--src/datatype/mod.rs26
-rw-r--r--src/datatype/package.rs79
-rw-r--r--src/datatype/system_info.rs46
-rw-r--r--src/datatype/update_report.rs182
-rw-r--r--src/datatype/url.rs106
-rw-r--r--src/event/inbound.rs28
-rw-r--r--src/event/mod.rs9
-rw-r--r--src/event/outbound.rs68
-rw-r--r--src/gateway/console.rs46
-rw-r--r--src/gateway/dbus.rs170
-rw-r--r--src/gateway/gateway.rs32
-rw-r--r--src/gateway/http.rs135
-rw-r--r--src/gateway/mod.rs13
-rw-r--r--src/gateway/socket.rs161
-rw-r--r--src/gateway/websocket.rs169
-rw-r--r--src/genivi/dbus.rs121
-rw-r--r--src/genivi/mod.rs4
-rw-r--r--src/genivi/sc.rs126
-rw-r--r--src/genivi/start.rs72
-rw-r--r--src/genivi/swm.rs63
-rw-r--r--src/http/auth_client.rs278
-rw-r--r--src/http/http_client.rs43
-rw-r--r--src/http/http_server.rs113
-rw-r--r--src/http/mod.rs11
-rw-r--r--src/http/openssl.rs47
-rw-r--r--src/http/test_client.rs35
-rw-r--r--src/interpreter.rs364
-rw-r--r--src/lib.rs82
-rw-r--r--src/main.rs361
-rw-r--r--src/oauth2.rs53
-rw-r--r--src/package_manager/deb.rs48
-rw-r--r--src/package_manager/mod.rs8
-rw-r--r--src/package_manager/otb.rs53
-rw-r--r--src/package_manager/package_manager.rs135
-rw-r--r--src/package_manager/rpm.rs46
-rw-r--r--src/package_manager/tpm.rs162
-rw-r--r--src/remote/dw.rs581
-rw-r--r--src/remote/jsonrpc.rs150
-rw-r--r--src/remote/mod.rs5
-rw-r--r--src/remote/parm.rs189
-rw-r--r--src/remote/rvi/edge.rs162
-rw-r--r--src/remote/rvi/message.rs61
-rw-r--r--src/remote/rvi/mod.rs18
-rw-r--r--src/remote/rvi/send.rs61
-rw-r--r--src/remote/svc.rs272
-rw-r--r--src/rvi/edge.rs127
-rw-r--r--src/rvi/mod.rs9
-rw-r--r--src/rvi/parameters.rs136
-rw-r--r--src/rvi/services.rs185
-rw-r--r--src/rvi/transfers.rs278
-rw-r--r--src/sota.rs141
-rw-r--r--src/test_library.rs72
65 files changed, 4918 insertions, 2839 deletions
diff --git a/src/broadcast.rs b/src/broadcast.rs
new file mode 100644
index 0000000..5389b0c
--- /dev/null
+++ b/src/broadcast.rs
@@ -0,0 +1,60 @@
+use chan;
+use chan::{Sender, Receiver};
+
+
+/// Retain a list of all peers that should receive the incoming message.
+pub struct Broadcast<A: Clone> {
+ peers: Vec<Sender<A>>,
+ rx: Receiver<A>
+}
+
+impl<A: Clone> Broadcast<A> {
+ /// Instantiate a new broadcaster for the given `Receiver`.
+ pub fn new(rx: Receiver<A>) -> Broadcast<A> {
+ Broadcast { peers: vec![], rx: rx }
+ }
+
+ /// Start receiving broadcasting messages and forwarding each to the list
+ /// of peers.
+ pub fn start(&self) {
+ loop {
+ self.rx.recv().map(|a| {
+ for subscriber in &self.peers {
+ subscriber.send(a.clone());
+ }
+ });
+ }
+ }
+
+ /// Add a new subscriber to the list of peers that will receive the broadcast
+ /// messages.
+ pub fn subscribe(&mut self) -> Receiver<A> {
+ let (tx, rx) = chan::sync::<A>(0);
+ self.peers.push(tx);
+ rx
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use chan;
+ use std::thread;
+
+ use super::*;
+
+
+ #[test]
+ fn test_broadcasts_events() {
+ let (tx, rx) = chan::sync(0);
+ let mut broadcast = Broadcast::new(rx);
+
+ let a = broadcast.subscribe();
+ let b = broadcast.subscribe();
+ thread::spawn(move || broadcast.start());
+
+ tx.send(123);
+ assert_eq!(123, a.recv().unwrap());
+ assert_eq!(123, b.recv().unwrap());
+ }
+}
diff --git a/src/configuration/client.rs b/src/configuration/client.rs
deleted file mode 100644
index 625f4e9..0000000
--- a/src/configuration/client.rs
+++ /dev/null
@@ -1,177 +0,0 @@
-//! Handles the `client` section of the configuration file.
-
-use toml;
-
-use super::common::{get_required_key, get_optional_key, ConfTreeParser, Result};
-
-/// Type to encode allowed keys for the `client` section of the configuration.
-#[derive(Clone)]
-pub struct ClientConfiguration {
- /// Directory where chunks and packages will be stored.
- pub storage_dir: String,
- /// The full URL where RVI can be reached.
- pub rvi_url: Option<String>,
- /// The `host:port` combination where the client should bind and listen for incoming RVI calls.
- pub edge_url: Option<String>,
- /// How long to wait for further server messages before the `Transfer` will be dropped.
- pub timeout: Option<i64>,
- /// Index of the RVI service URL, that holds the VIN for this device.
- pub vin_match: i32
-}
-
-impl ConfTreeParser<ClientConfiguration> for ClientConfiguration {
- fn parse(tree: &toml::Table) -> Result<ClientConfiguration> {
- let client_tree = try!(tree.get("client")
- .ok_or("Missing required subgroup \"client\""));
-
- let storage_dir = try!(get_required_key(client_tree, "storage_dir", "client"));
- let rvi_url = try!(get_optional_key(client_tree, "rvi_url", "client"));
- let edge_url = try!(get_optional_key(client_tree, "edge_url", "client"));
- let timeout = try!(get_optional_key(client_tree, "timeout", "client"));
- let vin_match = try!(get_optional_key(client_tree, "vin_match", "client"));
-
- Ok(ClientConfiguration {
- storage_dir: storage_dir,
- rvi_url: rvi_url,
- edge_url: edge_url,
- timeout: timeout,
- vin_match: vin_match.unwrap_or(2)
- })
- }
-}
-
-#[cfg(test)] static STORAGE: &'static str = "/var/sota";
-#[cfg(test)] static RVI: &'static str = "/http://localhost:8901";
-#[cfg(test)] static EDGE: &'static str = "localhost:9080";
-#[cfg(test)] static TIMEOUT: i64 = 10;
-#[cfg(test)] static VIN: i32 = 3;
-
-#[cfg(test)]
-pub fn gen_valid_conf() -> String {
- format!(r#"
- [client]
- storage_dir = "{}"
- rvi_url = "{}"
- edge_url = "{}"
- timeout = {}
- vin_match = {}
- "#, STORAGE, RVI, EDGE, TIMEOUT, VIN)
-}
-
-#[cfg(test)]
-pub fn assert_conf(configuration: &ClientConfiguration) -> bool {
- assert_eq!(&configuration.storage_dir, STORAGE);
- assert_eq!(&configuration.rvi_url.clone().unwrap(), RVI);
- assert_eq!(&configuration.edge_url.clone().unwrap(), EDGE);
- assert_eq!(configuration.timeout.unwrap(), TIMEOUT);
- assert_eq!(configuration.vin_match, VIN);
- true
-}
-
-#[cfg(test)]
-pub mod test {
- use super::*;
- use super::{STORAGE, RVI, EDGE, TIMEOUT, VIN};
- use configuration::common::{ConfTreeParser, read_tree};
-
- #[test]
- fn it_requires_the_storage_dir_key() {
- test_init!();
- let data = format!(r#"
- [client]
- rvi_url = "{}"
- edge_url = "{}"
- timeout = {}
- vin_match = {}
- "#, RVI, EDGE, TIMEOUT, VIN);
-
- let tree = read_tree(&data).unwrap();
- match ClientConfiguration::parse(&tree) {
- Ok(..) => panic!("Accepted invalid configuration!"),
- Err(e) => {
- assert_eq!(e,
- "Missing required key \"storage_dir\" in \"client\""
- .to_string());
- }
- };
- }
-
- #[test]
- fn it_doesnt_require_the_rvi_url_key() {
- test_init!();
- let data = format!(r#"
- [client]
- storage_dir = "{}"
- edge_url = "{}"
- timeout = {}
- vin_match = {}
- "#, STORAGE, EDGE, TIMEOUT, VIN);
-
- let tree = read_tree(&data).unwrap();
- let configuration = ClientConfiguration::parse(&tree).unwrap();
- assert_eq!(&configuration.storage_dir, STORAGE);
- assert_eq!(configuration.rvi_url, None);
- assert_eq!(&configuration.edge_url.unwrap(), EDGE);
- assert_eq!(configuration.timeout.unwrap(), TIMEOUT);
- assert_eq!(configuration.vin_match, VIN);
- }
-
- #[test]
- fn it_doesnt_require_the_edge_url_key() {
- test_init!();
- let data = format!(r#"
- [client]
- storage_dir = "{}"
- rvi_url = "{}"
- timeout = {}
- vin_match = {}
- "#, STORAGE, RVI, TIMEOUT, VIN);
-
- let tree = read_tree(&data).unwrap();
- let configuration = ClientConfiguration::parse(&tree).unwrap();
- assert_eq!(&configuration.storage_dir, STORAGE);
- assert_eq!(&configuration.rvi_url.unwrap(), RVI);
- assert_eq!(configuration.edge_url, None);
- assert_eq!(configuration.vin_match, VIN);
- }
-
- #[test]
- fn it_doesnt_require_the_timeout_key() {
- test_init!();
- let data = format!(r#"
- [client]
- storage_dir = "{}"
- rvi_url = "{}"
- edge_url = "{}"
- vin_match = {}
- "#, STORAGE, RVI, EDGE, VIN);
-
- let tree = read_tree(&data).unwrap();
- let configuration = ClientConfiguration::parse(&tree).unwrap();
- assert_eq!(&configuration.storage_dir, STORAGE);
- assert_eq!(&configuration.rvi_url.unwrap(), RVI);
- assert_eq!(&configuration.edge_url.unwrap(), EDGE);
- assert_eq!(configuration.timeout, None);
- assert_eq!(configuration.vin_match, VIN);
- }
-
- #[test]
- fn it_doesnt_require_the_vin_match_key_and_uses_a_default() {
- test_init!();
- let data = format!(r#"
- [client]
- storage_dir = "{}"
- rvi_url = "{}"
- edge_url = "{}"
- timeout = {}
- "#, STORAGE, RVI, EDGE, TIMEOUT);
-
- let tree = read_tree(&data).unwrap();
- let configuration = ClientConfiguration::parse(&tree).unwrap();
- assert_eq!(&configuration.storage_dir, STORAGE);
- assert_eq!(&configuration.rvi_url.unwrap(), RVI);
- assert_eq!(&configuration.edge_url.unwrap(), EDGE);
- assert_eq!(configuration.timeout.unwrap(), TIMEOUT);
- assert_eq!(configuration.vin_match, 2);
- }
-}
diff --git a/src/configuration/common.rs b/src/configuration/common.rs
deleted file mode 100644
index e6b0fcd..0000000
--- a/src/configuration/common.rs
+++ /dev/null
@@ -1,128 +0,0 @@
-//! Helper functions and traits for all configuration sections.
-
-use toml;
-use std::result;
-use std::fmt;
-
-/// `Result` type used throughout the configuration parser.
-pub type Result<T> = result::Result<T, String>;
-
-/// Trait that provides a interface for parsing a (sub-) tree of the configuration.
-pub trait ConfTreeParser<C> {
- /// Try to parse the given `tree` into the type this trait is implemented for.
- /// Returns the parsed object or a error message, with the first error encountered while
- /// parsing the `tree`.
- ///
- /// # Arguments
- /// * `tree`: The `toml` tree to parse
- fn parse(tree: &toml::Table) -> Result<C>;
-}
-
-/// Parse a required key, returning a appropriate error message, if the key can't be found in the
-/// configuration.
-///
-/// # Arguments
-/// * `subtree`: The `toml` tree to parse.
-/// * `key`: The key to look for.
-/// * `group`: The group, this (sub-) tree is associated with.
-pub fn get_required_key<D>(subtree: &toml::Value, key: &str, group: &str)
- -> Result<D> where D: ParseTomlValue {
- let value = try!(subtree.lookup(key)
- .ok_or(format!("Missing required key \"{}\" in \"{}\"",
- key, group)));
- ParseTomlValue::parse(value, key, group)
-}
-
-/// Parse a optional key, returning None if it can't be found.
-///
-/// This basically does a `Option<Result>` to `Result<Option>` translation
-///
-/// # Arguments
-/// * `subtree`: The `toml` tree to parse.
-/// * `key`: The key to look for.
-/// * `group`: The group, this (sub-) tree is associated with.
-pub fn get_optional_key<D>(subtree: &toml::Value, key: &str, group: &str)
- -> Result<Option<D>> where D: ParseTomlValue {
- match subtree.lookup(key) {
- Some(val) => {
- Ok(Some(try!(ParseTomlValue::parse(val, key, group))))
- },
- None => Ok(None)
- }
-}
-
-/// Trait that provides a interface for parsing a single `toml` value.
-pub trait ParseTomlValue {
- /// Parse a `String` from a `toml` value. Returns the parsed value on success or a error
- /// message on failure.
- ///
- /// # Arguments
- /// * `val`: The `toml` value to parse.
- /// * `key`: The key this value is associated with.
- /// * `group`: The group, this (sub-) tree is associated with.
- fn parse(val: &toml::Value, key: &str, group: &str) -> Result<Self> where Self: Sized;
-}
-
-impl ParseTomlValue for String {
- fn parse(val: &toml::Value, key: &str, group: &str)
- -> Result<String> {
- val.as_str().map(|s| s.to_string())
- .ok_or(format!("Key \"{}\" in \"{}\" is not a string", key, group))
- }
-}
-
-impl ParseTomlValue for i32 {
- fn parse(val: &toml::Value, key: &str, group: &str)
- -> Result<i32> {
- val.as_integer().map(|i| i as i32)
- .ok_or(format!("Key \"{}\" in \"{}\" is not a integer", key, group))
- }
-}
-
-impl ParseTomlValue for i64 {
- fn parse(val: &toml::Value, key: &str, group: &str)
- -> Result<i64> {
- val.as_integer()
- .ok_or(format!("Key \"{}\" in \"{}\" is not a integer", key, group))
- }
-}
-
-/// Helper function to format a `toml::Parser` error message to the format used in this
-/// implementation. This is only safe to call if the `parser` is associated with a *real* file on
-/// disk.
-///
-/// # Arguments
-/// * `parser`: Pointer to the `toml::Parser`, that produced a error.
-#[cfg(not(test))]
-pub fn format_parser_error(parser: &toml::Parser) -> String {
- let linecol = parser.to_linecol(0);
- format!("parse error: {}:{}: {:?}", linecol.0, linecol.1, parser.errors)
-}
-
-/// Helper function to format a `toml::Parser` error message to the format used in this
-/// implementation. This version is always safe to call, but doesn't print the line and column
-/// where the error was encountered.
-///
-/// # Arguments
-/// * `parser`: Pointer to the `toml::Parser`, that produced a error.
-#[cfg(test)]
-pub fn format_parser_error(parser: &toml::Parser) -> String {
- format!("parse error: {:?}", parser.errors)
-}
-
-/// Helper function to copy anything that implements `Display` to a `String`.
-pub fn stringify<T>(e: T) -> String
- where T: fmt::Display {
- format!("{}", e)
-}
-
-/// Reads the provided `tree` as a `toml::Table`. Returns the `toml::Table` on success or a error
-/// message on failure.
-///
-/// # Arguments
-/// * `tree`: Pointer to a `str`, that holds a toml configuration.
-#[cfg(test)]
-pub fn read_tree(tree: &str) -> Result<toml::Table> {
- let mut parser = toml::Parser::new(tree);
- parser.parse().ok_or(format_parser_error(&parser))
-}
diff --git a/src/configuration/configuration.rs b/src/configuration/configuration.rs
deleted file mode 100644
index c2c0416..0000000
--- a/src/configuration/configuration.rs
+++ /dev/null
@@ -1,176 +0,0 @@
-//! Main logic for parsing the configuration file
-
-use toml;
-use std::io::prelude::*;
-use std::path::PathBuf;
-use std::fs::OpenOptions;
-use std::env;
-
-use super::common::{ConfTreeParser, format_parser_error, stringify, Result};
-use super::client::ClientConfiguration;
-use super::dbus::DBusConfiguration;
-
-/// Type to encode the full configuration.
-#[derive(Clone)]
-pub struct Configuration {
- /// The `client` section of the configuration
- pub client: ClientConfiguration,
- /// The `dbus` section of the configuration
- pub dbus: DBusConfiguration
-}
-
-impl Configuration {
- /// Try to read the configuration from the provided path and parse it into a `Configuration`
- /// object. Returns the parsed `Configuration` on success or the first error message
- /// encountered while reading or parsing the configuration file.
- ///
- /// # Arguments
- /// * `path`: Path to the location of the configuration file.
- pub fn read(path: &str) -> Result<Configuration> {
- let path = PathBuf::from(path);
- let mut f = try!(OpenOptions::new().open(path).map_err(stringify));
- let mut buf = Vec::new();
- try!(f.read_to_end(&mut buf).map_err(stringify));
- let data = try!(String::from_utf8(buf).map_err(stringify));
- Configuration::parse(&data)
- }
-
- /// Try to parse the given string to a `Configuration`.
- ///
- /// # Arguments
- /// * `conf`: The configuration to parse.
- pub fn parse(conf: &str) -> Result<Configuration> {
- let mut parser = toml::Parser::new(conf);
- let tree = try!(parser.parse().ok_or(format_parser_error(&parser)));
-
- let client = try!(ClientConfiguration::parse(&tree));
- let dbus = try!(DBusConfiguration::parse(&tree));
-
- Ok(Configuration {
- client: client,
- dbus: dbus
- })
- }
-
- /// Try to find the configuration file in different paths. First
- /// `$XDG_CONFIG_HOME/sota/client.toml` is tried, then `$HOME/.sota/client.toml` returns
- /// `$PWD/.sota/client.toml` if none of the above can be found. The case where this file also
- /// doesn't exist should be handled by [`read`](#method.read).
- pub fn default_path() -> String {
- match env::var_os("XDG_CONFIG_HOME")
- .and_then(|s| s.into_string().ok()) {
- Some(val) => { return val + "/sota/client.toml"; },
- None => { error!("$XDG_CONFIG_HOME is not set"); }
- }
-
- match env::var_os("HOME").and_then(|s| s.into_string().ok()) {
- Some(val) => {
- warn!("Falling back to $HOME/.config");
- return val + "/.sota/client.toml";
- },
- None => { error!("$HOME is not set"); }
- }
-
- warn!("Falling back to $PWD");
- ".sota/client.toml".to_string()
- }
-}
-
-#[cfg(test)]
-mod test {
- use super::*;
- use std::env;
- use configuration::client;
- use configuration::dbus;
-
- #[test]
- fn it_uses_fallbacks_for_its_configuration() {
- test_init!();
- env::remove_var("XDG_CONFIG_HOME");
- env::set_var("XDG_CONFIG_HOME", "/some/thing");
- assert_eq!(Configuration::default_path(),
- "/some/thing/sota/client.toml".to_string());
- env::remove_var("XDG_CONFIG_HOME");
- env::remove_var("HOME");
- env::set_var("HOME", "/some/thing");
- assert_eq!(Configuration::default_path(),
- "/some/thing/.sota/client.toml".to_string());
- env::remove_var("XDG_CONFIG_HOME");
- env::remove_var("HOME");
- assert_eq!(Configuration::default_path(),
- ".sota/client.toml".to_string());
- }
-
- #[test]
- fn it_correctly_parses_a_valid_configuration() {
- test_init!();
- let data = format!("{}\n{}",
- client::gen_valid_conf(),
- dbus::gen_valid_conf());
-
- let configuration = Configuration::parse(&data).unwrap();
- assert!(client::assert_conf(&configuration.client));
- assert!(dbus::assert_conf(&configuration.dbus));
- }
-
- #[test]
- fn it_ignores_extra_keys() {
- test_init!();
- let data = format!(r#"
- {}
- test_key = "hello world"
-
- {}
- test_key = "see ya world"
- "#, client::gen_valid_conf(),
- dbus::gen_valid_conf());
-
- let configuration = Configuration::parse(&data).unwrap();
- assert!(client::assert_conf(&configuration.client));
- assert!(dbus::assert_conf(&configuration.dbus));
- }
-
- #[test]
- fn it_ignores_extra_groups() {
- test_init!();
- let data = format!(r#"
- {}
-
- {}
-
- [test]
- test_key = "hello world"
- "#, client::gen_valid_conf(),
- dbus::gen_valid_conf());
-
- let configuration = Configuration::parse(&data).unwrap();
- assert!(client::assert_conf(&configuration.client));
- assert!(dbus::assert_conf(&configuration.dbus));
- }
-
- #[test]
- fn it_requires_the_client_group() {
- test_init!();
- let data = format!("{}", dbus::gen_valid_conf());
- match Configuration::parse(&data) {
- Ok(..) => panic!("Accepted invalid configuration!"),
- Err(e) => {
- assert_eq!(e, "Missing required subgroup \"client\""
- .to_string());
- }
- };
- }
-
- #[test]
- fn it_requires_the_dbus_group() {
- test_init!();
- let data = format!("{}", client::gen_valid_conf());
- match Configuration::parse(&data) {
- Ok(..) => panic!("Accepted invalid configuration!"),
- Err(e) => {
- assert_eq!(e, "Missing required subgroup \"dbus\""
- .to_string());
- }
- };
- }
-}
diff --git a/src/configuration/dbus.rs b/src/configuration/dbus.rs
deleted file mode 100644
index 83a06dc..0000000
--- a/src/configuration/dbus.rs
+++ /dev/null
@@ -1,155 +0,0 @@
-//! Handles the `dbus` section of the configuration file.
-
-use toml;
-
-use super::common::{get_required_key, get_optional_key, ConfTreeParser, Result};
-
-/// Type to encode allowed keys for the `dbus` section of the configuration.
-#[derive(Clone)]
-pub struct DBusConfiguration {
- /// The DBus name that sota_client registers.
- pub name: String,
- /// The DBus path that sota_client registers.
- pub path: String,
- /// The interface name that sota_client provides.
- pub interface: String,
- /// The name and interface, where the software loading manager can be reached.
- pub software_manager: String,
- /// The name and interface, where the software loading manager can be reached.
- pub software_manager_path: String,
- /// Time to wait for installation of a package before it is considered a failure. In seconds.
- pub timeout: i32 // dbus-rs expects a signed int
-}
-
-#[cfg(test)]
-impl DBusConfiguration {
- /// Generate a test configuration.
- pub fn gen_test() -> DBusConfiguration {
- DBusConfiguration {
- name: "org.test.test".to_string(),
- path: "org.test.test".to_string(),
- interface: "org.test.test".to_string(),
- software_manager: "org.test.software_manager".to_string(),
- software_manager_path: "org.test.software_manager".to_string(),
- timeout: 20
- }
- }
-}
-
-impl ConfTreeParser<DBusConfiguration> for DBusConfiguration {
- fn parse(tree: &toml::Table) -> Result<DBusConfiguration> {
- let dbus_tree = try!(tree.get("dbus")
- .ok_or("Missing required subgroup \"dbus\""));
- let name = try!(get_required_key(dbus_tree, "name", "dbus"));
- let path = try!(get_required_key(dbus_tree, "path", "dbus"));
- let interface = try!(get_required_key(dbus_tree, "interface", "dbus"));
- let software_manager = try!(get_required_key(dbus_tree, "software_manager", "dbus"));
- let software_manager_path = try!(get_required_key(dbus_tree, "software_manager_path", "dbus"));
- let timeout = try!(get_optional_key(dbus_tree, "timeout", "dbus"));
-
- Ok(DBusConfiguration {
- name: name,
- path: path,
- interface: interface,
- software_manager: software_manager,
- software_manager_path: software_manager_path,
- timeout: timeout.unwrap_or(60) * 1000
- })
- }
-}
-
-#[cfg(test)] static NAME: &'static str = "org.genivi.sota_client";
-#[cfg(test)] static PATH: &'static str = "/org/genivi/sota_client";
-#[cfg(test)] static INTERFACE: &'static str = "org.genivi.software_manager";
-#[cfg(test)] static SOFTWARE_MANAGER: &'static str = "org.genivi.software_manager";
-#[cfg(test)] static SOFTWARE_MANAGER_PATH: &'static str = "/org/genivi/software_manager";
-
-#[cfg(test)]
-pub fn gen_valid_conf() -> String {
- format!(r#"
- [dbus]
- name = "{}"
- path = "{}"
- interface = "{}"
- software_manager = "{}"
- software_manager_path = "{}"
- "#, NAME, PATH, INTERFACE, SOFTWARE_MANAGER, SOFTWARE_MANAGER_PATH)
-}
-
-#[cfg(test)]
-pub fn assert_conf(conf: &DBusConfiguration) -> bool {
- assert_eq!(&conf.name, NAME);
- assert_eq!(&conf.path, PATH);
- assert_eq!(&conf.interface, INTERFACE);
- assert_eq!(&conf.software_manager, SOFTWARE_MANAGER);
- assert_eq!(&conf.software_manager_path, SOFTWARE_MANAGER_PATH);
- true
-}
-
-#[cfg(test)]
-mod test {
- use super::*;
- use super::{NAME, PATH, INTERFACE, SOFTWARE_MANAGER};
- use configuration::common::{ConfTreeParser, read_tree};
-
- #[test]
- fn it_requires_the_dbus_name_key() {
- test_init!();
- let data = format!(r#"
- [dbus]
- interface = "{}"
- software_manager = "{}"
- "#, INTERFACE, SOFTWARE_MANAGER);
-
- let tree = read_tree(&data).unwrap();
- match DBusConfiguration::parse(&tree) {
- Ok(..) => panic!("Accepted invalid configuration!"),
- Err(e) => {
- assert_eq!(e,
- "Missing required key \"name\" in \"dbus\""
- .to_string());
- }
- };
- }
-
- #[test]
- fn it_requires_the_dbus_interface_key() {
- test_init!();
- let data = format!(r#"
- [dbus]
- name = "{}"
- path = "{}"
- software_manager = "{}"
- "#, NAME, PATH, SOFTWARE_MANAGER);
-
- let tree = read_tree(&data).unwrap();
- match DBusConfiguration::parse(&tree) {
- Ok(..) => panic!("Accepted invalid configuration!"),
- Err(e) => {
- assert_eq!(e,
- "Missing required key \"interface\" in \"dbus\""
- .to_string());
- }
- };
- }
-
- #[test]
- fn it_requires_the_dbus_software_manager_key() {
- test_init!();
- let data = format!(r#"
- [dbus]
- name = "{}"
- path = "{}"
- interface = "{}"
- "#, NAME, PATH, INTERFACE);
-
- let tree = read_tree(&data).unwrap();
- match DBusConfiguration::parse(&tree) {
- Ok(..) => panic!("Accepted invalid configuration!"),
- Err(e) => {
- assert_eq!(e, "Missing required key \"software_manager\" \
- in \"dbus\"".to_string());
- }
- };
- }
-}
diff --git a/src/configuration/mod.rs b/src/configuration/mod.rs
deleted file mode 100644
index e2a986f..0000000
--- a/src/configuration/mod.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-//! Parsing of the configuration file of `sota_client`.
-//!
-//! Also see the documentation for [`toml`](../../toml/index.html).
-
-mod configuration;
-mod common;
-mod client;
-mod dbus;
-
-pub use self::configuration::Configuration;
-pub use self::client::ClientConfiguration;
-pub use self::dbus::DBusConfiguration;
diff --git a/src/datatype/auth.rs b/src/datatype/auth.rs
new file mode 100644
index 0000000..cbfd097
--- /dev/null
+++ b/src/datatype/auth.rs
@@ -0,0 +1,49 @@
+use std::borrow::Cow;
+
+
+/// The available authentication types for communicating with the Auth server.
+#[derive(Clone, Debug)]
+pub enum Auth {
+ None,
+ Credentials(ClientId, ClientSecret),
+ Token(AccessToken),
+}
+
+impl<'a> Into<Cow<'a, Auth>> for Auth {
+ fn into(self) -> Cow<'a, Auth> {
+ Cow::Owned(self)
+ }
+}
+
+
+/// For storage of the returned access token data following a successful
+/// authentication.
+#[derive(RustcDecodable, Debug, PartialEq, Clone, Default)]
+pub struct AccessToken {
+ pub access_token: String,
+ pub token_type: String,
+ pub expires_in: i32,
+ pub scope: String
+}
+
+impl<'a> Into<Cow<'a, AccessToken>> for AccessToken {
+ fn into(self) -> Cow<'a, AccessToken> {
+ Cow::Owned(self)
+ }
+}
+
+
+/// Encapsulates a `String` type for use in `Auth::Credentials`
+#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)]
+pub struct ClientId(pub String);
+
+/// Encapsulates a `String` type for use in `Auth::Credentials`
+#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)]
+pub struct ClientSecret(pub String);
+
+/// Encapsulates the client id and secret used during authentication.
+#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)]
+pub struct ClientCredentials {
+ pub client_id: ClientId,
+ pub client_secret: ClientSecret,
+}
diff --git a/src/datatype/command.rs b/src/datatype/command.rs
new file mode 100644
index 0000000..d449bb6
--- /dev/null
+++ b/src/datatype/command.rs
@@ -0,0 +1,306 @@
+use std::fmt::{Display, Formatter, Result as FmtResult};
+use std::str;
+use std::str::FromStr;
+
+use nom::{IResult, space, eof};
+use datatype::{ClientCredentials, ClientId, ClientSecret, DownloadComplete, Error,
+ InstalledSoftware, Package, UpdateReport, UpdateRequestId,
+ UpdateResultCode};
+
+
+/// System-wide commands that are sent to the interpreter.
+#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)]
+pub enum Command {
+ /// Authenticate with the auth server.
+ Authenticate(Option<ClientCredentials>),
+ /// Shutdown the client immediately.
+ Shutdown,
+
+ /// Check for any new updates.
+ GetNewUpdates,
+ /// List the installed packages on the system.
+ ListInstalledPackages,
+ /// Get the latest system information, and optionally publish it to Core.
+ RefreshSystemInfo(bool),
+
+ /// Start downloading one or more updates.
+ StartDownload(Vec<UpdateRequestId>),
+ /// Start installing an update
+ StartInstall(DownloadComplete),
+
+ /// Send a list of packages to the Core server.
+ SendInstalledPackages(Vec<Package>),
+ /// Send a list of all packages and firmware to the Core server.
+ SendInstalledSoftware(InstalledSoftware),
+ /// Send a package update report to the Core server.
+ SendUpdateReport(UpdateReport),
+}
+
+impl Display for Command {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ write!(f, "{:?}", self)
+ }
+}
+
+impl FromStr for Command {
+ type Err = Error;
+
+ fn from_str(s: &str) -> Result<Command, Error> {
+ match command(s.as_bytes()) {
+ IResult::Done(_, cmd) => parse_arguments(cmd.0, cmd.1.clone()),
+ _ => Err(Error::Command(format!("bad command: {}", s)))
+ }
+ }
+}
+
+
+named!(command <(Command, Vec<&str>)>, chain!(
+ space?
+ ~ cmd: alt!(
+ alt_complete!(tag!("Authenticate") | tag!("auth"))
+ => { |_| Command::Authenticate(None) }
+ | alt_complete!(tag!("GetNewUpdates") | tag!("new"))
+ => { |_| Command::GetNewUpdates }
+ | alt_complete!(tag!("ListInstalledPackages") | tag!("ls"))
+ => { |_| Command::ListInstalledPackages }
+ | alt_complete!(tag!("RefreshSystemInfo") | tag!("info"))
+ => { |_| Command::RefreshSystemInfo(false) }
+ | alt_complete!(tag!("Shutdown") | tag!("shutdown"))
+ => { |_| Command::Shutdown }
+ | alt_complete!(tag!("SendInstalledPackages") | tag!("sendpack"))
+ => { |_| Command::SendInstalledPackages(Vec::new()) }
+ | alt_complete!(tag!("SendInstalledSoftware") | tag!("sendinst"))
+ => { |_| Command::SendInstalledSoftware(InstalledSoftware::default()) }
+ | alt_complete!(tag!("SendUpdateReport") | tag!("sendup"))
+ => { |_| Command::SendUpdateReport(UpdateReport::default()) }
+ | alt_complete!(tag!("StartDownload") | tag!("dl"))
+ => { |_| Command::StartDownload(Vec::new()) }
+ | alt_complete!(tag!("StartInstall") | tag!("inst"))
+ => { |_| Command::StartInstall(DownloadComplete::default()) }
+ )
+ ~ args: arguments
+ ~ alt!(eof | tag!("\r") | tag!("\n") | tag!(";")),
+ move || { (cmd, args) }
+));
+
+named!(arguments <&[u8], Vec<&str> >, chain!(
+ args: many0!(chain!(
+ space?
+ ~ text: map_res!(is_not!(" \t\r\n;"), str::from_utf8)
+ ~ space?,
+ || { text }
+ )),
+ move || {
+ args.into_iter()
+ .filter(|arg| arg.len() > 0)
+ .collect()
+ }
+));
+
+fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> {
+ match cmd {
+ Command::Authenticate(_) => match args.len() {
+ 0 => Ok(Command::Authenticate(None)),
+ 1 => Err(Error::Command("usage: auth <client-id> <client-secret>".to_string())),
+ 2 => Ok(Command::Authenticate(Some(ClientCredentials {
+ client_id: ClientId(args[0].to_string()),
+ client_secret: ClientSecret(args[1].to_string())}))),
+ _ => Err(Error::Command(format!("unexpected Authenticate args: {:?}", args))),
+ },
+
+ Command::GetNewUpdates => match args.len() {
+ 0 => Ok(Command::GetNewUpdates),
+ _ => Err(Error::Command(format!("unexpected GetNewUpdates args: {:?}", args))),
+ },
+
+ Command::ListInstalledPackages => match args.len() {
+ 0 => Ok(Command::ListInstalledPackages),
+ _ => Err(Error::Command(format!("unexpected ListInstalledPackages args: {:?}", args))),
+ },
+
+ Command::RefreshSystemInfo(_) => match args.len() {
+ 0 => Ok(Command::RefreshSystemInfo(false)),
+ 1 => {
+ if let Ok(b) = args[0].parse::<bool>() {
+ Ok(Command::RefreshSystemInfo(b))
+ } else {
+ Err(Error::Command("couldn't parse 1st argument as boolean".to_string()))
+ }
+ }
+ _ => Err(Error::Command(format!("unexpected RefreshSystemInfo args: {:?}", args))),
+ },
+
+ Command::SendInstalledPackages(_) => match args.len() {
+ 0 | 1 => Err(Error::Command("usage: sendpack (<name> <version> )+".to_string())),
+ n if n % 2 == 0 => {
+ let (names, versions): (Vec<(_, &str)>, Vec<(_, &str)>) =
+ args.into_iter().enumerate().partition(|&(n, _)| n % 2 == 0);
+ let packages = names.into_iter().zip(versions.into_iter())
+ .map(|((_, name), (_, version))| Package {
+ name: name.to_string(),
+ version: version.to_string()
+ }).collect::<Vec<Package>>();
+ Ok(Command::SendInstalledPackages(packages))
+ }
+ _ => Err(Error::Command(format!("SendInstalledPackages expects an even number of 'name version' pairs"))),
+ },
+
+ Command::SendInstalledSoftware(_) => match args.len() {
+ // FIXME(PRO-1160): args
+ _ => Err(Error::Command(format!("unexpected SendInstalledSoftware args: {:?}", args))),
+ },
+
+ Command::SendUpdateReport(_) => match args.len() {
+ 0 | 1 => Err(Error::Command("usage: sendup <update-id> <result-code>".to_string())),
+ 2 => {
+ if let Ok(code) = args[1].parse::<UpdateResultCode>() {
+ Ok(Command::SendUpdateReport(UpdateReport::single(args[0].to_string(), code, "".to_string())))
+ } else {
+ Err(Error::Command("couldn't parse 2nd argument as an UpdateResultCode".to_string()))
+ }
+ }
+ _ => Err(Error::Command(format!("unexpected SendUpdateReport args: {:?}", args))),
+ },
+
+ Command::Shutdown => match args.len() {
+ 0 => Ok(Command::Shutdown),
+ _ => Err(Error::Command(format!("unexpected Shutdown args: {:?}", args))),
+ },
+
+ Command::StartDownload(_) => match args.len() {
+ 0 => Err(Error::Command("usage: dl [<id>]".to_string())),
+ _ => Ok(Command::StartDownload(args.iter().map(|arg| String::from(*arg)).collect())),
+ },
+
+ Command::StartInstall(_) => match args.len() {
+ // FIXME(PRO-1160): args
+ _ => Err(Error::Command(format!("unexpected StartInstall args: {:?}", args))),
+ },
+
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use super::{command, arguments};
+ use datatype::{Command, ClientCredentials, ClientId, ClientSecret, Package,
+ UpdateReport, UpdateResultCode};
+ use nom::IResult;
+
+
+ #[test]
+ fn parse_command_test() {
+ assert_eq!(command(&b"auth foo bar"[..]),
+ IResult::Done(&b""[..], (Command::Authenticate(None), vec!["foo", "bar"])));
+ assert_eq!(command(&b"dl 1"[..]),
+ IResult::Done(&b""[..], (Command::StartDownload(Vec::new()), vec!["1"])));
+ assert_eq!(command(&b"ls;\n"[..]),
+ IResult::Done(&b"\n"[..], (Command::ListInstalledPackages, Vec::new())));
+ }
+
+ #[test]
+ fn parse_arguments_test() {
+ assert_eq!(arguments(&b"one"[..]), IResult::Done(&b""[..], vec!["one"]));
+ assert_eq!(arguments(&b"foo bar"[..]), IResult::Done(&b""[..], vec!["foo", "bar"]));
+ assert_eq!(arguments(&b"n=5"[..]), IResult::Done(&b""[..], vec!["n=5"]));
+ assert_eq!(arguments(&b""[..]), IResult::Done(&b""[..], Vec::new()));
+ assert_eq!(arguments(&b" \t some"[..]), IResult::Done(&b""[..], vec!["some"]));
+ assert_eq!(arguments(&b";"[..]), IResult::Done(&b";"[..], Vec::new()));
+ }
+
+
+ #[test]
+ fn authenticate_test() {
+ assert_eq!("Authenticate".parse::<Command>().unwrap(), Command::Authenticate(None));
+ assert_eq!("auth".parse::<Command>().unwrap(), Command::Authenticate(None));
+ assert_eq!("auth user pass".parse::<Command>().unwrap(),
+ Command::Authenticate(Some(ClientCredentials {
+ client_id: ClientId("user".to_string()),
+ client_secret: ClientSecret("pass".to_string()),
+ })));
+ assert!("auth one".parse::<Command>().is_err());
+ assert!("auth one two three".parse::<Command>().is_err());
+ }
+
+ #[test]
+ fn get_new_updates_test() {
+ assert_eq!("GetNewUpdates".parse::<Command>().unwrap(), Command::GetNewUpdates);
+ assert_eq!("new".parse::<Command>().unwrap(), Command::GetNewUpdates);
+ assert!("new old".parse::<Command>().is_err());
+ }
+
+ #[test]
+ fn list_installed_test() {
+ assert_eq!("ListInstalledPackages".parse::<Command>().unwrap(), Command::ListInstalledPackages);
+ assert_eq!("ls".parse::<Command>().unwrap(), Command::ListInstalledPackages);
+ assert!("ls some".parse::<Command>().is_err());
+ }
+
+ #[test]
+ fn refresh_system_info_test() {
+ assert_eq!("RefreshSystemInfo true".parse::<Command>().unwrap(), Command::RefreshSystemInfo(true));
+ assert_eq!("info".parse::<Command>().unwrap(), Command::RefreshSystemInfo(false));
+ assert!("RefreshSystemInfo 1 2".parse::<Command>().is_err());
+ assert!("info please".parse::<Command>().is_err());
+ }
+
+ #[test]
+ fn send_installed_packages_test() {
+ assert_eq!("SendInstalledPackages myname myversion".parse::<Command>().unwrap(),
+ Command::SendInstalledPackages(vec![Package {
+ name: "myname".to_string(),
+ version: "myversion".to_string()
+ }]));
+ assert_eq!("sendpack n1 v1 n2 v2".parse::<Command>().unwrap(),
+ Command::SendInstalledPackages(vec![Package {
+ name: "n1".to_string(),
+ version: "v1".to_string()
+ }, Package {
+ name: "n2".to_string(),
+ version: "v2".to_string()
+ }]));
+ assert!("SendInstalledPackages some".parse::<Command>().is_err());
+ assert!("sendpack 1 2 3".parse::<Command>().is_err());
+ }
+
+ #[test]
+ fn send_installed_software_test() {
+ assert!("SendInstalledSoftware".parse::<Command>().is_err());
+ assert!("sendsoft some".parse::<Command>().is_err());
+ }
+
+ #[test]
+ fn send_update_report_test() {
+ assert_eq!("SendUpdateReport myid OK".parse::<Command>().unwrap(), Command::SendUpdateReport(
+ UpdateReport::single("myid".to_string(), UpdateResultCode::OK, "".to_string())));
+ assert_eq!("sendup myid 19".parse::<Command>().unwrap(), Command::SendUpdateReport(
+ UpdateReport::single("myid".to_string(), UpdateResultCode::GENERAL_ERROR, "".to_string())));
+ assert!("sendup myid 20".parse::<Command>().is_err());
+ assert!("SendInstalledPackages".parse::<Command>().is_err());
+ assert!("sendup 1 2 3".parse::<Command>().is_err());
+ }
+
+ #[test]
+ fn shutdown_test() {
+ assert_eq!("Shutdown".parse::<Command>().unwrap(), Command::Shutdown);
+ assert_eq!("shutdown".parse::<Command>().unwrap(), Command::Shutdown);
+ assert!("Shutdown 1 2".parse::<Command>().is_err());
+ assert!("shutdown now".parse::<Command>().is_err());
+ }
+
+ #[test]
+ fn start_download_test() {
+ assert_eq!("StartDownload this".parse::<Command>().unwrap(),
+ Command::StartDownload(vec!["this".to_string()]));
+ assert_eq!("dl some more".parse::<Command>().unwrap(),
+ Command::StartDownload(vec!["some".to_string(), "more".to_string()]));
+ assert!("dl".parse::<Command>().is_err());
+ }
+
+ #[test]
+ fn start_install_test() {
+ assert!("StartInstall".parse::<Command>().is_err());
+ assert!("inst more than one".parse::<Command>().is_err());
+ }
+}
diff --git a/src/datatype/config.rs b/src/datatype/config.rs
new file mode 100644
index 0000000..5db0f12
--- /dev/null
+++ b/src/datatype/config.rs
@@ -0,0 +1,389 @@
+use rustc_serialize::Decodable;
+use std::fs;
+use std::fs::File;
+use std::io::ErrorKind;
+use std::os::unix::fs::PermissionsExt;
+use std::io::prelude::*;
+use std::path::Path;
+use toml;
+use toml::{Decoder, Parser, Table, Value};
+
+use datatype::{Error, SystemInfo, Url};
+use package_manager::PackageManager;
+
+
+/// An aggregation of all the configuration options parsed at startup.
+#[derive(Default, PartialEq, Eq, Debug, Clone)]
+pub struct Config {
+ pub auth: Option<AuthConfig>,
+ pub core: CoreConfig,
+ pub dbus: Option<DBusConfig>,
+ pub device: DeviceConfig,
+ pub gateway: GatewayConfig,
+ pub network: NetworkConfig,
+ pub rvi: Option<RviConfig>,
+}
+
+impl Config {
+ pub fn load(path: &str) -> Result<Config, Error> {
+ info!("Loading config file: {}", path);
+ let mut file = try!(File::open(path).map_err(Error::Io));
+ let mut toml = String::new();
+ try!(file.read_to_string(&mut toml));
+ Config::parse(&toml)
+ }
+
+ pub fn parse(toml: &str) -> Result<Config, Error> {
+ let table = try!(parse_table(&toml));
+
+ let auth_cfg = if let Some(auth) = table.get("auth") {
+ let parsed = try!(decode_section(auth.clone()));
+ Some(try!(bootstrap_credentials(parsed)))
+ } else {
+ None
+ };
+
+ let dbus_cfg = if let Some(dbus) = table.get("dbus") {
+ Some(try!(decode_section(dbus.clone())))
+ } else {
+ None
+ };
+
+ let rvi_cfg = if let Some(rvi) = table.get("rvi") {
+ Some(try!(decode_section(rvi.clone())))
+ } else {
+ None
+ };
+
+ Ok(Config {
+ auth: auth_cfg,
+ core: try!(read_section(&table, "core")),
+ dbus: dbus_cfg,
+ device: try!(read_section(&table, "device")),
+ gateway: try!(read_section(&table, "gateway")),
+ network: try!(read_section(&table, "network")),
+ rvi: rvi_cfg,
+ })
+ }
+}
+
+fn parse_table(toml: &str) -> Result<Table, Error> {
+ let mut parser = Parser::new(toml);
+ Ok(try!(parser.parse().ok_or_else(move || parser.errors)))
+}
+
+fn read_section<T: Decodable>(table: &Table, section: &str) -> Result<T, Error> {
+ let part = try!(table.get(section)
+ .ok_or_else(|| Error::Parse(format!("invalid section: {}", section))));
+ decode_section(part.clone())
+}
+
+fn decode_section<T: Decodable>(section: Value) -> Result<T, Error> {
+ let mut decoder = Decoder::new(section);
+ Ok(try!(T::decode(&mut decoder)))
+}
+
+
+#[derive(RustcEncodable, RustcDecodable)]
+struct CredentialsFile {
+ pub client_id: String,
+ pub client_secret: String,
+}
+
+// Read AuthConfig values from the credentials file if it exists, or write the
+// current AuthConfig values to a new credentials file otherwise.
+fn bootstrap_credentials(auth_cfg: AuthConfig) -> Result<AuthConfig, Error> {
+ let creds = auth_cfg.credentials_file.clone();
+ let path = Path::new(&creds);
+ debug!("bootstrap_credentials: {:?}", path);
+
+ let credentials = match File::open(path) {
+ Ok(mut file) => {
+ let mut text = String::new();
+ try!(file.read_to_string(&mut text));
+ let table = try!(parse_table(&text));
+ try!(read_section::<CredentialsFile>(&table, "auth"))
+ }
+
+ Err(ref err) if err.kind() == ErrorKind::NotFound => {
+ let mut table = Table::new();
+ let credentials = CredentialsFile {
+ client_id: auth_cfg.client_id,
+ client_secret: auth_cfg.client_secret
+ };
+ table.insert("auth".to_string(), toml::encode(&credentials));
+
+ let dir = try!(path.parent().ok_or(Error::Parse("Invalid credentials file path".to_string())));
+ try!(fs::create_dir_all(&dir));
+ let mut file = try!(File::create(path));
+ let mut perms = try!(file.metadata()).permissions();
+ perms.set_mode(0o600);
+ try!(fs::set_permissions(path, perms));
+ try!(file.write_all(&toml::encode_str(&table).into_bytes()));
+
+ credentials
+ }
+
+ Err(err) => return Err(Error::Io(err))
+ };
+
+ Ok(AuthConfig {
+ server: auth_cfg.server,
+ client_id: credentials.client_id,
+ client_secret: credentials.client_secret,
+ credentials_file: auth_cfg.credentials_file,
+ })
+}
+
+
+/// A parsed representation of the [auth] configuration section.
+#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
+pub struct AuthConfig {
+ pub server: Url,
+ pub client_id: String,
+ pub client_secret: String,
+ pub credentials_file: String,
+}
+
+impl Default for AuthConfig {
+ fn default() -> AuthConfig {
+ AuthConfig {
+ server: "http://127.0.0.1:9001".parse().unwrap(),
+ client_id: "client-id".to_string(),
+ client_secret: "client-secret".to_string(),
+ credentials_file: "/tmp/sota_credentials.toml".to_string(),
+ }
+ }
+}
+
+
+/// A parsed representation of the [core] configuration section.
+#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
+pub struct CoreConfig {
+ pub server: Url
+}
+
+impl Default for CoreConfig {
+ fn default() -> CoreConfig {
+ CoreConfig {
+ server: "http://127.0.0.1:8080".parse().unwrap()
+ }
+ }
+}
+
+
+/// A parsed representation of the [dbus] configuration section.
+#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
+pub struct DBusConfig {
+ pub name: String,
+ pub path: String,
+ pub interface: String,
+ pub software_manager: String,
+ pub software_manager_path: String,
+ pub timeout: i32, // dbus-rs expects a signed int
+}
+
+impl Default for DBusConfig {
+ fn default() -> DBusConfig {
+ DBusConfig {
+ name: "org.genivi.SotaClient".to_string(),
+ path: "/org/genivi/SotaClient".to_string(),
+ interface: "org.genivi.SotaClient".to_string(),
+ software_manager: "org.genivi.SoftwareLoadingManager".to_string(),
+ software_manager_path: "/org/genivi/SoftwareLoadingManager".to_string(),
+ timeout: 60
+ }
+ }
+}
+
+
+/// A parsed representation of the [device] configuration section.
+#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
+pub struct DeviceConfig {
+ pub uuid: String,
+ pub vin: String,
+ pub packages_dir: String,
+ pub package_manager: PackageManager,
+ pub system_info: SystemInfo,
+ pub polling_interval: u64,
+ pub certificates_path: String,
+}
+
+impl Default for DeviceConfig {
+ fn default() -> DeviceConfig {
+ DeviceConfig {
+ uuid: "123e4567-e89b-12d3-a456-426655440000".to_string(),
+ vin: "V1234567890123456".to_string(),
+ packages_dir: "/tmp/".to_string(),
+ package_manager: PackageManager::Deb,
+ system_info: SystemInfo::default(),
+ polling_interval: 10,
+ certificates_path: "/tmp/sota_certificates".to_string()
+ }
+ }
+}
+
+
+/// A parsed representation of the [gateway] configuration section.
+#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
+pub struct GatewayConfig {
+ pub console: bool,
+ pub dbus: bool,
+ pub http: bool,
+ pub rvi: bool,
+ pub socket: bool,
+ pub websocket: bool,
+}
+
+impl Default for GatewayConfig {
+ fn default() -> GatewayConfig {
+ GatewayConfig {
+ console: false,
+ dbus: false,
+ http: false,
+ rvi: false,
+ socket: false,
+ websocket: true,
+ }
+ }
+}
+
+
+/// A parsed representation of the [network] configuration section.
+#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
+pub struct NetworkConfig {
+ pub http_server: String,
+ pub rvi_edge_server: String,
+ pub socket_commands_path: String,
+ pub socket_events_path: String,
+ pub websocket_server: String
+}
+
+impl Default for NetworkConfig {
+ fn default() -> NetworkConfig {
+ NetworkConfig {
+ http_server: "http://127.0.0.1:8888".to_string(),
+ rvi_edge_server: "http://127.0.0.1:9080".to_string(),
+ socket_commands_path: "/tmp/sota-commands.socket".to_string(),
+ socket_events_path: "/tmp/sota-events.socket".to_string(),
+ websocket_server: "ws://127.0.0.1:3012".to_string()
+ }
+ }
+}
+
+
+/// A parsed representation of the [rvi] configuration section.
+#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
+pub struct RviConfig {
+ pub client: Url,
+ pub storage_dir: String,
+ pub timeout: Option<i64>,
+}
+
+impl Default for RviConfig {
+ fn default() -> RviConfig {
+ RviConfig {
+ client: "http://127.0.0.1:8901".parse().unwrap(),
+ storage_dir: "/var/sota".to_string(),
+ timeout: Some(20),
+ }
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+
+ const AUTH_CONFIG: &'static str =
+ r#"
+ [auth]
+ server = "http://127.0.0.1:9001"
+ client_id = "client-id"
+ client_secret = "client-secret"
+ credentials_file = "/tmp/sota_credentials.toml"
+ "#;
+
+ const CORE_CONFIG: &'static str =
+ r#"
+ [core]
+ server = "http://127.0.0.1:8080"
+ "#;
+
+ const DBUS_CONFIG: &'static str =
+ r#"
+ [dbus]
+ name = "org.genivi.SotaClient"
+ path = "/org/genivi/SotaClient"
+ interface = "org.genivi.SotaClient"
+ software_manager = "org.genivi.SoftwareLoadingManager"
+ software_manager_path = "/org/genivi/SoftwareLoadingManager"
+ timeout = 60
+ "#;
+
+ const DEVICE_CONFIG: &'static str =
+ r#"
+ [device]
+ uuid = "123e4567-e89b-12d3-a456-426655440000"
+ vin = "V1234567890123456"
+ system_info = "system_info.sh"
+ polling_interval = 10
+ packages_dir = "/tmp/"
+ package_manager = "deb"
+ certificates_path = "/tmp/sota_certificates"
+ "#;
+
+ const GATEWAY_CONFIG: &'static str =
+ r#"
+ [gateway]
+ console = false
+ dbus = false
+ http = false
+ rvi = false
+ socket = false
+ websocket = true
+ "#;
+
+ const NETWORK_CONFIG: &'static str =
+ r#"
+ [network]
+ http_server = "http://127.0.0.1:8888"
+ rvi_edge_server = "http://127.0.0.1:9080"
+ socket_commands_path = "/tmp/sota-commands.socket"
+ socket_events_path = "/tmp/sota-events.socket"
+ websocket_server = "ws://127.0.0.1:3012"
+ "#;
+
+ const RVI_CONFIG: &'static str =
+ r#"
+ [rvi]
+ client = "http://127.0.0.1:8901"
+ storage_dir = "/var/sota"
+ timeout = 20
+ "#;
+
+
+ #[test]
+ fn parse_default_config() {
+ let config = String::new()
+ + CORE_CONFIG
+ + DEVICE_CONFIG
+ + GATEWAY_CONFIG
+ + NETWORK_CONFIG;
+ assert_eq!(Config::parse(&config).unwrap(), Config::default());
+ }
+
+ #[test]
+ fn parse_example_config() {
+ let config = String::new()
+ + AUTH_CONFIG
+ + CORE_CONFIG
+ + DBUS_CONFIG
+ + DEVICE_CONFIG
+ + GATEWAY_CONFIG
+ + NETWORK_CONFIG
+ + RVI_CONFIG;
+ assert_eq!(Config::load("tests/sota.toml").unwrap(), Config::parse(&config).unwrap());
+ }
+}
diff --git a/src/datatype/dbus.rs b/src/datatype/dbus.rs
new file mode 100644
index 0000000..a038568
--- /dev/null
+++ b/src/datatype/dbus.rs
@@ -0,0 +1,81 @@
+use dbus::{FromMessageItem, MessageItem};
+use toml::{decode, Table, Value};
+
+use datatype::update_report::{InstalledFirmware, InstalledPackage, OperationResult};
+
+
+static MISSING_ARG: &'static str = "Error.MissingArgument";
+static MALFORMED_ARG: &'static str = "Error.MalformedArgument";
+
+/// Format a `DBus` error message indicating a missing argument.
+pub fn missing_arg() -> (&'static str, String) {
+ (MISSING_ARG, "Missing argument".to_string())
+}
+
+/// Format a `DBus` error message indicating a malformed argument.
+pub fn malformed_arg() -> (&'static str, String) {
+ (MALFORMED_ARG, "Malformed argument".to_string())
+}
+
+
+struct DecodedValue(pub Value);
+
+impl<'m> FromMessageItem<'m> for DecodedValue {
+ fn from(m: &'m MessageItem) -> Result<Self, ()> {
+ match *m {
+ MessageItem::Str(ref b) => Ok(DecodedValue(Value::String(b.clone()))),
+ MessageItem::Bool(ref b) => Ok(DecodedValue(Value::Boolean(*b))),
+ MessageItem::Byte(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))),
+ MessageItem::Int16(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))),
+ MessageItem::Int32(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))),
+ MessageItem::Int64(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))),
+ MessageItem::UInt16(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))),
+ MessageItem::UInt32(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))),
+ MessageItem::UInt64(ref b) => Ok(DecodedValue(Value::Integer(*b as i64))),
+ MessageItem::Variant(ref b) => FromMessageItem::from(&**b),
+ _ => Err(())
+ }
+ }
+}
+
+
+struct DecodedStruct(pub Value);
+
+impl<'m> FromMessageItem<'m> for DecodedStruct {
+ fn from(item: &'m MessageItem) -> Result<Self, ()> {
+ let items: &Vec<MessageItem> = try!(FromMessageItem::from(item));
+ items.iter().map(|entry| {
+ let entry: Result<(&MessageItem, &MessageItem), ()> = FromMessageItem::from(entry);
+ entry.and_then(|(key, val)| {
+ let key: Result<&String,()> = FromMessageItem::from(key);
+ key.and_then(|key| {
+ let val: Result<DecodedValue,()> = FromMessageItem::from(val);
+ val.map(|val| (key.clone(), val.0))
+ })
+ })
+ }).collect::<Result<Vec<(_, _)>, ()>>()
+ .map(|arr| DecodedStruct(Value::Table(arr.into_iter().collect::<Table>())))
+ }
+}
+
+
+impl<'m> FromMessageItem<'m> for OperationResult {
+ fn from(item: &'m MessageItem) -> Result<Self, ()> {
+ let item: DecodedStruct = try!(FromMessageItem::from(item));
+ decode::<OperationResult>(item.0).ok_or(())
+ }
+}
+
+impl<'m> FromMessageItem<'m> for InstalledPackage {
+ fn from(item: &'m MessageItem) -> Result<Self, ()> {
+ let item: DecodedStruct = try!(FromMessageItem::from(item));
+ decode::<InstalledPackage>(item.0).ok_or(())
+ }
+}
+
+impl<'m> FromMessageItem<'m> for InstalledFirmware {
+ fn from(item: &'m MessageItem) -> Result<Self, ()> {
+ let item: DecodedStruct = try!(FromMessageItem::from(item));
+ decode::<InstalledFirmware>(item.0).ok_or(())
+ }
+}
diff --git a/src/datatype/error.rs b/src/datatype/error.rs
new file mode 100644
index 0000000..8267234
--- /dev/null
+++ b/src/datatype/error.rs
@@ -0,0 +1,119 @@
+use hyper::error::Error as HyperError;
+use hyper::client::ClientError as HyperClientError;
+use rustc_serialize::json::{EncoderError as JsonEncoderError,
+ DecoderError as JsonDecoderError,
+ ParserError as JsonParserError};
+use std::convert::From;
+use std::fmt::{Display, Formatter, Result as FmtResult};
+use std::io::Error as IoError;
+use std::string::FromUtf8Error;
+use std::sync::PoisonError;
+use std::sync::mpsc::{SendError, RecvError};
+use toml::{ParserError as TomlParserError, DecodeError as TomlDecodeError};
+use url::ParseError as UrlParseError;
+
+use datatype::Event;
+use http::auth_client::AuthHandler;
+use gateway::Interpret;
+use ws::Error as WebsocketError;
+
+
+/// System-wide errors that are returned from `Result` type failures.
+#[derive(Debug)]
+pub enum Error {
+ Authorization(String),
+ Client(String),
+ Command(String),
+ FromUtf8(FromUtf8Error),
+ Hyper(HyperError),
+ HyperClient(HyperClientError<AuthHandler>),
+ Io(IoError),
+ JsonDecoder(JsonDecoderError),
+ JsonEncoder(JsonEncoderError),
+ JsonParser(JsonParserError),
+ Poison(String),
+ Package(String),
+ Parse(String),
+ Recv(RecvError),
+ SendEvent(SendError<Event>),
+ SendInterpret(SendError<Interpret>),
+ Socket(String),
+ SystemInfo(String),
+ TomlParser(Vec<TomlParserError>),
+ TomlDecode(TomlDecodeError),
+ UrlParse(UrlParseError),
+ Websocket(WebsocketError),
+}
+
+impl<E> From<PoisonError<E>> for Error {
+ fn from(e: PoisonError<E>) -> Error {
+ Error::Poison(format!("{}", e))
+ }
+}
+
+macro_rules! derive_from {
+ ([ $( $from: ident => $to: ident ),* ]) => {
+ $(impl From<$from> for Error {
+ fn from(e: $from) -> Error {
+ Error::$to(e)
+ }
+ })*
+ };
+
+ ([ $( $error: ident < $ty: ty > => $to: ident),* ]) => {
+ $(impl From<$error<$ty>> for Error {
+ fn from(e: $error<$ty>) -> Error {
+ Error::$to(e)
+ }
+ })*
+ }
+}
+
+derive_from!([
+ FromUtf8Error => FromUtf8,
+ HyperError => Hyper,
+ IoError => Io,
+ JsonEncoderError => JsonEncoder,
+ JsonDecoderError => JsonDecoder,
+ RecvError => Recv,
+ TomlDecodeError => TomlDecode,
+ UrlParseError => UrlParse,
+ WebsocketError => Websocket
+]);
+
+derive_from!([
+ HyperClientError<AuthHandler> => HyperClient,
+ SendError<Event> => SendEvent,
+ SendError<Interpret> => SendInterpret,
+ Vec<TomlParserError> => TomlParser
+]);
+
+impl Display for Error {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ let inner: String = match *self {
+ Error::Client(ref s) => format!("Http client error: {}", s.clone()),
+ Error::Authorization(ref s) => format!("Http client authorization error: {}", s.clone()),
+ Error::Command(ref e) => format!("Unknown Command: {}", e.clone()),
+ Error::FromUtf8(ref e) => format!("From utf8 error: {}", e.clone()),
+ Error::Hyper(ref e) => format!("Hyper error: {}", e.clone()),
+ Error::HyperClient(ref e) => format!("Hyper client error: {}", e.clone()),
+ Error::Io(ref e) => format!("IO error: {}", e.clone()),
+ Error::JsonDecoder(ref e) => format!("Failed to decode JSON: {}", e.clone()),
+ Error::JsonEncoder(ref e) => format!("Failed to encode JSON: {}", e.clone()),
+ Error::JsonParser(ref e) => format!("Failed to parse JSON: {}", e.clone()),
+ Error::Poison(ref e) => format!("Poison error: {}", e.clone()),
+ Error::Package(ref s) => format!("Package error: {}", s.clone()),
+ Error::Parse(ref s) => format!("Parse error: {}", s.clone()),
+ Error::Recv(ref s) => format!("Recv error: {}", s.clone()),
+ Error::SendEvent(ref s) => format!("Send error for Event: {}", s.clone()),
+ Error::SendInterpret(ref s) => format!("Send error for Interpret: {}", s.clone()),
+ Error::Socket(ref s) => format!("Unix Domain Socket error: {}", s.clone()),
+ Error::SystemInfo(ref s) => format!("System info error: {}", s.clone()),
+ Error::TomlDecode(ref e) => format!("Toml decode error: {}", e.clone()),
+ Error::TomlParser(ref e) => format!("Toml parser errors: {:?}", e.clone()),
+ Error::UrlParse(ref s) => format!("Url parse error: {}", s.clone()),
+ Error::Websocket(ref e) => format!("Websocket Error: {:?}", e.clone()),
+ };
+ write!(f, "{}", inner)
+ }
+}
diff --git a/src/datatype/event.rs b/src/datatype/event.rs
new file mode 100644
index 0000000..e3f84ca
--- /dev/null
+++ b/src/datatype/event.rs
@@ -0,0 +1,56 @@
+use std::fmt::{Display, Formatter, Result as FmtResult};
+
+use datatype::{DownloadComplete, Package, UpdateAvailable, UpdateReport,
+ UpdateRequestId};
+
+
+/// System-wide events that are broadcast to all interested parties.
+#[derive(RustcEncodable, RustcDecodable, Debug, Clone, PartialEq, Eq)]
+pub enum Event {
+ /// General error event with a printable representation for debugging.
+ Error(String),
+
+ /// Authentication was successful.
+ Authenticated,
+ /// An operation failed because we are not currently authenticated.
+ NotAuthenticated,
+
+ /// There are new updates available.
+ NewUpdatesReceived(Vec<UpdateRequestId>),
+ /// A notification from RVI of a new update.
+ NewUpdateAvailable(UpdateAvailable),
+ /// There are no new updates available.
+ NoNewUpdates,
+
+ /// The following packages are installed on the device.
+ FoundInstalledPackages(Vec<Package>),
+ /// An update on the system information was received.
+ FoundSystemInfo(String),
+ /// A list of installed packages was sent to the Core server.
+ InstalledPackagesSent,
+ /// An update report was sent to the Core server.
+ UpdateReportSent,
+
+ /// Downloading an update.
+ DownloadingUpdate(UpdateRequestId),
+ /// An update was downloaded.
+ DownloadComplete(DownloadComplete),
+ /// Downloading an update failed.
+ DownloadFailed(UpdateRequestId, String),
+
+ /// Installing an update.
+ InstallingUpdate(UpdateRequestId),
+ /// An update was installed.
+ InstallComplete(UpdateReport),
+ /// The installation of an update failed.
+ InstallFailed(UpdateReport),
+
+ /// A broadcast event requesting an update on externally installed software.
+ InstalledSoftwareNeeded,
+}
+
+impl Display for Event {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ write!(f, "{:?}", self)
+ }
+}
diff --git a/src/datatype/json_rpc.rs b/src/datatype/json_rpc.rs
new file mode 100644
index 0000000..3eed9a2
--- /dev/null
+++ b/src/datatype/json_rpc.rs
@@ -0,0 +1,107 @@
+use rustc_serialize::{json, Decodable, Encodable};
+use time;
+
+use http::{AuthClient, Client};
+use super::Url;
+
+
+/// Encode the body of a JSON-RPC call.
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct RpcRequest<E: Encodable> {
+ pub jsonrpc: String,
+ pub id: u64,
+ pub method: String,
+ pub params: E
+}
+
+impl<E: Encodable> RpcRequest<E> {
+ /// Instantiate a new `RpcRequest` with the default version (2.0) and an id
+ /// generated from the current time.
+ pub fn new(method: &str, params: E) -> RpcRequest<E> {
+ RpcRequest {
+ jsonrpc: "2.0".to_string(),
+ id: time::precise_time_ns(),
+ method: method.to_string(),
+ params: params
+ }
+ }
+
+ /// Send a JSON-RPC POST request to the specified URL.
+ pub fn send(&self, url: Url) -> Result<String, String> {
+ let client = AuthClient::default();
+ let body = json::encode(self).expect("couldn't encode RpcRequest");
+ let resp_rx = client.post(url, Some(body.into_bytes()));
+ let resp = resp_rx.recv().expect("no RpcRequest response received");
+ let data = try!(resp.map_err(|err| format!("{}", err)));
+ String::from_utf8(data).map_err(|err| format!("{}", err))
+ }
+}
+
+
+/// Encapsulates a successful JSON-RPC response.
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct RpcOk<D: Decodable> {
+ pub jsonrpc: String,
+ pub id: u64,
+ pub result: Option<D>
+}
+
+impl<D: Decodable> RpcOk<D> {
+ /// Instantiate a new successful JSON-RPC response type.
+ pub fn new(id: u64, result: Option<D>) -> RpcOk<D> {
+ RpcOk {
+ jsonrpc: "2.0".to_string(),
+ id: id,
+ result: result
+ }
+ }
+}
+
+
+/// The error code as [specified by jsonrpc](http://www.jsonrpc.org/specification#error_object).
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct ErrorCode {
+ pub code: i32,
+ pub message: String,
+ pub data: String
+}
+
+/// Encapsulates a failed JSON-RPC response.
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct RpcErr {
+ pub jsonrpc: String,
+ pub id: u64,
+ pub error: ErrorCode
+}
+
+impl RpcErr {
+ /// Instantiate a new `RpcErr` type with the default JSON-RPC version (2.0).
+ pub fn new(id: u64, error: ErrorCode) -> Self {
+ RpcErr { jsonrpc: "2.0".to_string(), id: id, error: error }
+ }
+
+ /// Create a new `RpcErr` with a reason of "Invalid Request".
+ pub fn invalid_request(id: u64, data: String) -> Self {
+ Self::new(id, ErrorCode { code: -32600, message: "Invalid Request".to_string(), data: data })
+ }
+
+ /// Create a new `RpcErr` with a reason of "Method not found".
+ pub fn method_not_found(id: u64, data: String) -> Self {
+ Self::new(id, ErrorCode { code: -32601, message: "Method not found".to_string(), data: data })
+ }
+
+ /// Create a new `RpcErr` with a reason of "Parse error".
+ pub fn parse_error(data: String) -> Self {
+ Self::new(0, ErrorCode { code: -32700, message: "Parse error".to_string(), data: data })
+ }
+
+ /// Create a new `RpcErr` with a reason of "Invalid params".
+ pub fn invalid_params(id: u64, data: String) -> Self {
+ Self::new(id, ErrorCode { code: -32602, message: "Invalid params".to_string(), data: data })
+ }
+
+ /// Create a new `RpcErr` with a reason of "Couldn't handle request".
+ pub fn unspecified(id: u64, data: String) -> Self {
+ Self::new(id, ErrorCode { code: -32100, message: "Couldn't handle request".to_string(), data: data })
+ }
+}
diff --git a/src/datatype/mod.rs b/src/datatype/mod.rs
new file mode 100644
index 0000000..8a9ca4e
--- /dev/null
+++ b/src/datatype/mod.rs
@@ -0,0 +1,26 @@
+pub mod auth;
+pub mod command;
+pub mod config;
+pub mod dbus;
+pub mod error;
+pub mod event;
+pub mod json_rpc;
+pub mod package;
+pub mod system_info;
+pub mod update_report;
+pub mod url;
+
+pub use self::auth::{AccessToken, Auth, ClientId, ClientSecret, ClientCredentials};
+pub use self::command::Command;
+pub use self::config::{AuthConfig, CoreConfig, Config, DBusConfig, DeviceConfig,
+ GatewayConfig, RviConfig};
+pub use self::error::Error;
+pub use self::event::Event;
+pub use self::json_rpc::{RpcRequest, RpcOk, RpcErr};
+pub use self::package::{ChunkReceived, DownloadStarted, DownloadComplete, Package,
+ PendingUpdateRequest, UpdateAvailable, UpdateRequestId};
+pub use self::system_info::SystemInfo;
+pub use self::update_report::{DeviceReport, InstalledFirmware, InstalledPackage,
+ InstalledSoftware, OperationResult, UpdateResultCode,
+ UpdateReport};
+pub use self::url::{Method, Url};
diff --git a/src/datatype/package.rs b/src/datatype/package.rs
new file mode 100644
index 0000000..146ff06
--- /dev/null
+++ b/src/datatype/package.rs
@@ -0,0 +1,79 @@
+use std::fmt::{Display, Formatter, Result as FmtResult};
+
+use rvi::services::LocalServices;
+
+
+/// Encapsulate a `String` type used to represent the `Package` version.
+pub type Version = String;
+
+/// Encodes the name and version of a specific package.
+#[derive(Debug, PartialEq, Eq, RustcEncodable, RustcDecodable, Clone)]
+pub struct Package {
+ pub name: String,
+ pub version: Version
+}
+
+impl Display for Package {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ write!(f, "{} {}", self.name, self.version)
+ }
+}
+
+
+/// Encapsulate a `String` type as the id of a specific update request.
+pub type UpdateRequestId = String;
+
+/// A single pending update request to be installed by the client.
+#[allow(non_snake_case)]
+#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)]
+pub struct PendingUpdateRequest {
+ pub requestId: UpdateRequestId,
+ pub installPos: i32,
+ pub packageId: Package,
+ pub createdAt: String
+}
+
+/// A notification from RVI that a new update is available.
+#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)]
+pub struct UpdateAvailable {
+ pub update_id: String,
+ pub signature: String,
+ pub description: String,
+ pub request_confirmation: bool,
+ pub size: u64
+}
+
+/// A JSON-RPC request type to notify RVI that a new package download has started.
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct DownloadStarted {
+ pub device: String,
+ pub update_id: UpdateRequestId,
+ pub services: LocalServices,
+}
+
+/// A JSON-RPC request type to notify RVI that a new package chunk was received.
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct ChunkReceived {
+ pub device: String,
+ pub update_id: UpdateRequestId,
+ pub chunks: Vec<u64>,
+}
+
+/// A notification to indicate to any external package manager that the package
+/// download has successfully completed.
+#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)]
+pub struct DownloadComplete {
+ pub update_id: String,
+ pub update_image: String,
+ pub signature: String
+}
+
+impl Default for DownloadComplete {
+ fn default() -> Self {
+ DownloadComplete {
+ update_id: "".to_string(),
+ update_image: "".to_string(),
+ signature: "".to_string()
+ }
+ }
+}
diff --git a/src/datatype/system_info.rs b/src/datatype/system_info.rs
new file mode 100644
index 0000000..2d8fff2
--- /dev/null
+++ b/src/datatype/system_info.rs
@@ -0,0 +1,46 @@
+use rustc_serialize::{Decoder, Decodable};
+use std::process::Command;
+use std::str::FromStr;
+
+use datatype::Error;
+
+
+/// A reference to the command that will report on the system information.
+#[derive(PartialEq, Eq, Debug, Clone)]
+pub struct SystemInfo {
+ command: String
+}
+
+impl SystemInfo {
+ /// Instantiate a new type to report on the system information.
+ pub fn new(command: String) -> SystemInfo {
+ SystemInfo { command: command }
+ }
+
+ /// Generate a new report of the system information.
+ pub fn report(&self) -> Result<String, Error> {
+ Command::new(&self.command)
+ .output().map_err(|err| Error::SystemInfo(err.to_string()))
+ .and_then(|info| String::from_utf8(info.stdout).map_err(Error::FromUtf8))
+ }
+}
+
+impl Default for SystemInfo {
+ fn default() -> SystemInfo {
+ SystemInfo::new("system_info.sh".to_string())
+ }
+}
+
+impl FromStr for SystemInfo {
+ type Err = Error;
+
+ fn from_str(s: &str) -> Result<SystemInfo, Error> {
+ Ok(SystemInfo::new(s.to_string()))
+ }
+}
+
+impl Decodable for SystemInfo {
+ fn decode<D: Decoder>(d: &mut D) -> Result<SystemInfo, D::Error> {
+ d.read_str().and_then(|s| Ok(s.parse::<SystemInfo>().unwrap()))
+ }
+}
diff --git a/src/datatype/update_report.rs b/src/datatype/update_report.rs
new file mode 100644
index 0000000..f81fbc4
--- /dev/null
+++ b/src/datatype/update_report.rs
@@ -0,0 +1,182 @@
+use rustc_serialize::{Encodable, Encoder};
+use std::str::FromStr;
+
+use datatype::{Error, UpdateRequestId};
+
+
+/// An encodable report of the installation outcome.
+#[derive(RustcDecodable, RustcEncodable, Clone, Debug, PartialEq, Eq)]
+pub struct UpdateReport {
+ pub update_id: UpdateRequestId,
+ pub operation_results: Vec<OperationResult>
+}
+
+impl UpdateReport {
+ /// Instantiate a new report with a vector of installation outcomes.
+ pub fn new(update_id: String, results: Vec<OperationResult>) -> UpdateReport {
+ UpdateReport { update_id: update_id, operation_results: results }
+ }
+
+ /// Instantiate a new report with a single installation outcome.
+ pub fn single(update_id: UpdateRequestId, result_code: UpdateResultCode, result_text: String) -> UpdateReport {
+ let result = OperationResult {
+ id: update_id.clone(),
+ result_code: result_code,
+ result_text: result_text
+ };
+ UpdateReport { update_id: update_id, operation_results: vec![result] }
+ }
+}
+
+impl Default for UpdateReport {
+ fn default() -> Self {
+ UpdateReport { update_id: "".to_string(), operation_results: Vec::new() }
+ }
+}
+
+
+/// Bind the installation outcome report to a specific device.
+#[derive(RustcEncodable, Clone, Debug)]
+pub struct DeviceReport<'d, 'r> {
+ pub device: &'d str,
+ pub update_report: &'r UpdateReport
+}
+
+impl<'d, 'r> DeviceReport<'d, 'r> {
+ /// Instantiate a new installation outcome report for a specific device.
+ pub fn new(device: &'d str, update_report: &'r UpdateReport) -> DeviceReport<'d, 'r> {
+ DeviceReport { device: device, update_report: update_report }
+ }
+}
+
+
+/// Enumerate the possible outcomes when trying to install a package.
+#[allow(non_camel_case_types)]
+#[derive(RustcDecodable, Clone, Debug, PartialEq, Eq)]
+pub enum UpdateResultCode {
+ /// Operation executed successfully
+ OK = 0,
+ /// Operation has already been processed
+ ALREADY_PROCESSED,
+ /// Dependency failure during package install, upgrade, or removal
+ DEPENDENCY_FAILURE,
+ /// Update image integrity has been compromised
+ VALIDATION_FAILED,
+ /// Package installation failed
+ INSTALL_FAILED,
+ /// Package upgrade failed
+ UPGRADE_FAILED,
+ /// Package removal failed
+ REMOVAL_FAILED,
+ /// The module loader could not flash its managed module
+ FLASH_FAILED,
+ /// Partition creation failed
+ CREATE_PARTITION_FAILED,
+ /// Partition deletion failed
+ DELETE_PARTITION_FAILED,
+ /// Partition resize failed
+ RESIZE_PARTITION_FAILED,
+ /// Partition write failed
+ WRITE_PARTITION_FAILED,
+ /// Partition patching failed
+ PATCH_PARTITION_FAILED,
+ /// User declined the update
+ USER_DECLINED,
+ /// Software was blacklisted
+ SOFTWARE_BLACKLISTED,
+ /// Ran out of disk space
+ DISK_FULL,
+ /// Software package not found
+ NOT_FOUND,
+ /// Tried to downgrade to older version
+ OLD_VERSION,
+ /// SWM Internal integrity error
+ INTERNAL_ERROR,
+ /// Other error
+ GENERAL_ERROR,
+}
+
+impl FromStr for UpdateResultCode {
+ type Err = Error;
+
+ fn from_str(s: &str) -> Result<UpdateResultCode, Error> {
+ match &*s.to_uppercase() {
+ "0" | "OK" => Ok(UpdateResultCode::OK),
+ "1" | "ALREADY_PROCESSED" => Ok(UpdateResultCode::ALREADY_PROCESSED),
+ "2" | "DEPENDENCY_FAILURE" => Ok(UpdateResultCode::DEPENDENCY_FAILURE),
+ "3" | "VALIDATION_FAILED" => Ok(UpdateResultCode::VALIDATION_FAILED),
+ "4" | "INSTALL_FAILED" => Ok(UpdateResultCode::INSTALL_FAILED),
+ "5" | "UPGRADE_FAILED" => Ok(UpdateResultCode::UPGRADE_FAILED),
+ "6" | "REMOVAL_FAILED" => Ok(UpdateResultCode::REMOVAL_FAILED),
+ "7" | "FLASH_FAILED" => Ok(UpdateResultCode::FLASH_FAILED),
+ "8" | "CREATE_PARTITION_FAILED" => Ok(UpdateResultCode::CREATE_PARTITION_FAILED),
+ "9" | "DELETE_PARTITION_FAILED" => Ok(UpdateResultCode::DELETE_PARTITION_FAILED),
+ "10" | "RESIZE_PARTITION_FAILED" => Ok(UpdateResultCode::RESIZE_PARTITION_FAILED),
+ "11" | "WRITE_PARTITION_FAILED" => Ok(UpdateResultCode::WRITE_PARTITION_FAILED),
+ "12" | "PATCH_PARTITION_FAILED" => Ok(UpdateResultCode::PATCH_PARTITION_FAILED),
+ "13" | "USER_DECLINED" => Ok(UpdateResultCode::USER_DECLINED),
+ "14" | "SOFTWARE_BLACKLISTED" => Ok(UpdateResultCode::SOFTWARE_BLACKLISTED),
+ "15" | "DISK_FULL" => Ok(UpdateResultCode::DISK_FULL),
+ "16" | "NOT_FOUND" => Ok(UpdateResultCode::NOT_FOUND),
+ "17" | "OLD_VERSION" => Ok(UpdateResultCode::OLD_VERSION),
+ "18" | "INTERNAL_ERROR" => Ok(UpdateResultCode::INTERNAL_ERROR),
+ "19" | "GENERAL_ERROR" => Ok(UpdateResultCode::GENERAL_ERROR),
+ _ => Err(Error::Parse(format!("unknown UpdateResultCode: {}", s)))
+ }
+ }
+}
+
+impl Encodable for UpdateResultCode {
+ fn encode<S: Encoder>(&self, s: &mut S) -> Result<(), S::Error> {
+ s.emit_u64(self.clone() as u64)
+ }
+}
+
+
+/// An encodable response of the installation outcome for a particular update ID.
+#[derive(RustcDecodable, RustcEncodable, Clone, Debug, PartialEq, Eq)]
+pub struct OperationResult {
+ pub id: String,
+ pub result_code: UpdateResultCode,
+ pub result_text: String,
+}
+
+
+/// Encapsulates a single firmware installed on the device.
+#[derive(RustcDecodable, RustcEncodable, Clone, Debug, PartialEq, Eq)]
+pub struct InstalledFirmware {
+ pub module: String,
+ pub firmware_id: String,
+ pub last_modified: u64
+}
+
+
+/// Encapsulates a single package installed on the device.
+#[derive(RustcDecodable, RustcEncodable, Clone, Debug, PartialEq, Eq)]
+pub struct InstalledPackage {
+ pub package_id: String,
+ pub name: String,
+ pub description: String,
+ pub last_modified: u64
+}
+
+
+/// An encodable list of packages and firmwares to send to RVI.
+#[derive(RustcDecodable, RustcEncodable, Clone, Debug, PartialEq, Eq)]
+pub struct InstalledSoftware {
+ pub packages: Vec<InstalledPackage>,
+ pub firmwares: Vec<InstalledFirmware>
+}
+
+impl InstalledSoftware {
+ /// Instantiate a new list of the software installed on the device.
+ pub fn new(packages: Vec<InstalledPackage>, firmwares: Vec<InstalledFirmware>) -> InstalledSoftware {
+ InstalledSoftware { packages: packages, firmwares: firmwares }
+ }
+}
+
+impl Default for InstalledSoftware {
+ fn default() -> Self {
+ InstalledSoftware { packages: Vec::new(), firmwares: Vec::new() }
+ }
+}
diff --git a/src/datatype/url.rs b/src/datatype/url.rs
new file mode 100644
index 0000000..5a9c97a
--- /dev/null
+++ b/src/datatype/url.rs
@@ -0,0 +1,106 @@
+use hyper::method;
+use rustc_serialize::{Decoder, Decodable};
+use std::borrow::Cow;
+use std::fmt::{Display, Formatter, Result as FmtResult};
+use std::io;
+use std::net::ToSocketAddrs;
+use std::str::FromStr;
+use url;
+use url::SocketAddrs;
+
+use datatype::Error;
+
+
+/// Encapsulate a single crate URL with additional methods and traits.
+#[derive(PartialEq, Eq, Clone, Debug)]
+pub struct Url(pub url::Url);
+
+impl Url {
+ /// Append the string suffix to this URL.
+ pub fn join(&self, suffix: &str) -> Result<Url, Error> {
+ let url = try!(self.0.join(suffix));
+ Ok(Url(url))
+ }
+
+ /// Return the encapsulated crate URL.
+ pub fn inner(&self) -> url::Url {
+ self.0.clone()
+ }
+}
+
+impl<'a> Into<Cow<'a, Url>> for Url {
+ fn into(self) -> Cow<'a, Url> {
+ Cow::Owned(self)
+ }
+}
+
+impl FromStr for Url {
+ type Err = Error;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let url = try!(url::Url::parse(s));
+ Ok(Url(url))
+ }
+}
+
+impl Decodable for Url {
+ fn decode<D: Decoder>(d: &mut D) -> Result<Url, D::Error> {
+ let s = try!(d.read_str());
+ s.parse().map_err(|e: Error| d.error(&e.to_string()))
+ }
+}
+
+impl ToSocketAddrs for Url {
+ type Iter = SocketAddrs;
+
+ fn to_socket_addrs(&self) -> io::Result<Self::Iter> {
+ self.0.to_socket_addrs()
+ }
+}
+
+impl Display for Url {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ let host = self.0.host_str().unwrap_or("localhost");
+ if let Some(port) = self.0.port() {
+ write!(f, "{}://{}:{}{}", self.0.scheme(), host, port, self.0.path())
+ } else {
+ write!(f, "{}://{}{}", self.0.scheme(), host, self.0.path())
+ }
+ }
+}
+
+
+/// Enumerate the supported HTTP methods.
+#[derive(Clone, Debug)]
+pub enum Method {
+ Get,
+ Post,
+ Put,
+}
+
+impl Into<method::Method> for Method {
+ fn into(self) -> method::Method {
+ match self {
+ Method::Get => method::Method::Get,
+ Method::Post => method::Method::Post,
+ Method::Put => method::Method::Put,
+ }
+ }
+}
+
+impl<'a> Into<Cow<'a, Method>> for Method {
+ fn into(self) -> Cow<'a, Method> {
+ Cow::Owned(self)
+ }
+}
+
+impl Display for Method {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ let method = match *self {
+ Method::Get => "GET".to_string(),
+ Method::Post => "POST".to_string(),
+ Method::Put => "PUT".to_string(),
+ };
+ write!(f, "{}", method)
+ }
+}
diff --git a/src/event/inbound.rs b/src/event/inbound.rs
deleted file mode 100644
index 41a757c..0000000
--- a/src/event/inbound.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-
-#[derive(RustcDecodable, Clone)]
-pub struct UpdateAvailable {
- pub update_id: String,
- pub signature: String,
- pub description: String,
- pub request_confirmation: bool,
- pub size: u64
-}
-
-#[derive(RustcDecodable, Clone)]
-pub struct DownloadComplete {
- pub update_id: String,
- pub update_image: String,
- pub signature: String
-}
-
-#[derive(RustcDecodable, Clone)]
-pub struct GetInstalledSoftware {
- pub include_packages: bool,
- pub include_module_firmware: bool
-}
-
-pub enum InboundEvent {
- UpdateAvailable(UpdateAvailable),
- DownloadComplete(DownloadComplete),
- GetInstalledSoftware(GetInstalledSoftware)
-}
diff --git a/src/event/mod.rs b/src/event/mod.rs
deleted file mode 100644
index b69d876..0000000
--- a/src/event/mod.rs
+++ /dev/null
@@ -1,9 +0,0 @@
-pub mod inbound;
-pub mod outbound;
-
-pub type UpdateId = String;
-
-pub enum Event {
- Inbound(inbound::InboundEvent),
- OutBound(outbound::OutBoundEvent)
-}
diff --git a/src/event/outbound.rs b/src/event/outbound.rs
deleted file mode 100644
index 4fb6fce..0000000
--- a/src/event/outbound.rs
+++ /dev/null
@@ -1,68 +0,0 @@
-use super::UpdateId;
-
-#[derive(RustcDecodable, RustcEncodable, Clone)]
-pub struct InstalledFirmware {
- pub module: String,
- pub firmware_id: String,
- pub last_modified: u64
-}
-
-#[derive(RustcDecodable, RustcEncodable, Clone)]
-pub struct InstalledFirmwares(pub Vec<InstalledFirmware>);
-
-#[derive(RustcDecodable, RustcEncodable, Clone)]
-pub struct InstalledPackage {
- pub package_id: String,
- pub name: String,
- pub description: String,
- pub last_modified: u64
-}
-
-#[derive(RustcDecodable, RustcEncodable, Clone)]
-pub struct InstalledPackages(pub Vec<InstalledPackage>);
-
-#[derive(RustcDecodable, RustcEncodable, Clone)]
-pub struct InstalledSoftware {
- pub packages: Vec<InstalledPackage>,
- pub firmware: Vec<InstalledFirmware>
-}
-
-impl InstalledSoftware {
- pub fn new(p: InstalledPackages, f: InstalledFirmwares) -> InstalledSoftware {
- InstalledSoftware {
- packages: p.0,
- firmware: f.0
- }
- }
-}
-
-#[derive(RustcDecodable, RustcEncodable, Clone)]
-pub struct OperationResult {
- pub id: String,
- pub result_code: u32,
- pub result_text: String
-}
-
-#[derive(RustcDecodable, RustcEncodable, Clone)]
-pub struct OperationResults(pub Vec<OperationResult>);
-
-#[derive(RustcDecodable, RustcEncodable, Clone)]
-pub struct UpdateReport {
- pub update_id: String,
- pub operation_results: Vec<OperationResult>
-}
-
-impl UpdateReport {
- pub fn new(id: String, res: OperationResults) -> UpdateReport {
- UpdateReport {
- update_id: id,
- operation_results: res.0
- }
- }
-}
-
-pub enum OutBoundEvent {
- InitiateDownload(UpdateId),
- AbortDownload(UpdateId),
- UpdateReport(UpdateReport)
-}
diff --git a/src/gateway/console.rs b/src/gateway/console.rs
new file mode 100644
index 0000000..da8ba80
--- /dev/null
+++ b/src/gateway/console.rs
@@ -0,0 +1,46 @@
+use chan;
+use chan::Sender;
+use std::{io, thread};
+use std::io::Write;
+use std::string::ToString;
+use std::sync::{Arc, Mutex};
+
+use datatype::{Command, Error, Event};
+use super::gateway::{Gateway, Interpret};
+
+
+/// The console gateway is used for REPL-style interaction with the client.
+pub struct Console;
+
+impl Gateway for Console {
+ fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> {
+ let (etx, erx) = chan::sync::<Event>(0);
+ let etx = Arc::new(Mutex::new(etx));
+
+ thread::spawn(move || {
+ loop {
+ match get_input() {
+ Ok(cmd) => itx.send(Interpret { command: cmd, response_tx: Some(etx.clone()) }),
+ Err(err) => error!("Console Error: {:?}", err)
+ }
+ }
+ });
+
+ thread::spawn(move || {
+ loop {
+ let e = erx.recv().expect("all console event transmitters are closed");
+ info!("Console Response: {}", e.to_string());
+ }
+ });
+
+ Ok(info!("Console gateway started."))
+ }
+}
+
+fn get_input() -> Result<Command, Error> {
+ let mut input = String::new();
+ let _ = io::stdout().write(b"> ");
+ io::stdout().flush().expect("couldn't flush console stdout buffer");
+ let _ = io::stdin().read_line(&mut input);
+ input.parse()
+}
diff --git a/src/gateway/dbus.rs b/src/gateway/dbus.rs
new file mode 100644
index 0000000..3bd41ea
--- /dev/null
+++ b/src/gateway/dbus.rs
@@ -0,0 +1,170 @@
+use chan::Sender;
+use dbus::{Connection, BusType, ConnectionItem, FromMessageItem,
+ Message, MessageItem, NameFlag};
+use dbus::obj::{Argument, Interface, Method, MethodResult, ObjectPath};
+use std::thread;
+use std::convert::From;
+
+use datatype::{Command, DBusConfig, Event, InstalledFirmware, InstalledPackage,
+ InstalledSoftware, OperationResult, UpdateReport};
+use datatype::dbus;
+use super::{Gateway, Interpret};
+
+
+/// The `DBus` gateway is used with the RVI module for communicating with the
+/// system session bus.
+pub struct DBus {
+ pub dbus_cfg: DBusConfig,
+ pub itx: Sender<Interpret>,
+}
+
+impl Gateway for DBus {
+ fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> {
+ let dbus_cfg = self.dbus_cfg.clone();
+
+ thread::spawn(move || {
+ let conn = Connection::get_private(BusType::Session).expect("couldn't get dbus session");
+ conn.register_name(&dbus_cfg.name, NameFlag::ReplaceExisting as u32).unwrap();
+
+ let mut obj_path = ObjectPath::new(&conn, &dbus_cfg.path, true);
+ obj_path.insert_interface(&dbus_cfg.interface, default_interface(itx));
+ obj_path.set_registered(true).expect("couldn't set registration status");
+
+ loop {
+ for item in conn.iter(1000) {
+ if let ConnectionItem::MethodCall(mut msg) = item {
+ info!("DBus method call: {:?}", msg);
+ obj_path.handle_message(&mut msg).map(|result| {
+ let _ = result.map_err(|_| error!("dbus method call failed: {:?}", msg));
+ });
+ }
+ }
+ }
+ });
+
+ Ok(info!("DBus gateway started."))
+ }
+
+ fn pulse(&self, event: Event) {
+ match event {
+ Event::NewUpdateAvailable(avail) => {
+ let msg = self.new_message("updateAvailable", &[
+ MessageItem::from(avail.update_id),
+ MessageItem::from(avail.signature),
+ MessageItem::from(avail.description),
+ MessageItem::from(avail.request_confirmation)
+ ]);
+ let conn = Connection::get_private(BusType::Session).expect("couldn't get dbus session");
+ let _ = conn.send(msg).map_err(|_| error!("couldn't send updateAvailable message"));
+ }
+
+ Event::DownloadComplete(comp) => {
+ let msg = self.new_message("downloadComplete", &[
+ MessageItem::from(comp.update_image),
+ MessageItem::from(comp.signature)
+ ]);
+ let conn = Connection::get_private(BusType::Session).expect("couldn't get dbus session");
+ let _ = conn.send(msg).map_err(|_| error!("couldn't send downloadComplete message"));
+ }
+
+ Event::InstalledSoftwareNeeded => {
+ let msg = self.new_message("getInstalledPackages", &[
+ MessageItem::from(true), // include packages?
+ MessageItem::from(false) // include firmware?
+ ]);
+ let conn = Connection::get_private(BusType::Session).expect("couldn't get dbus session");
+ let reply = conn.send_with_reply_and_block(msg, self.dbus_cfg.timeout).unwrap();
+
+ let _ = || -> Result<InstalledSoftware, ()> {
+ let mut args = reply.get_items().into_iter();
+
+ let pkg_arg = try!(args.next().ok_or(()));
+ let msgs: &Vec<MessageItem> = try!(FromMessageItem::from(&pkg_arg));
+ let packages = try!(msgs.into_iter()
+ .map(|item| -> Result<InstalledPackage, ()> {
+ FromMessageItem::from(item)
+ }).collect::<Result<Vec<InstalledPackage>, ()>>());
+
+ let firm_arg = try!(args.next().ok_or(()));
+ let msgs: &Vec<MessageItem> = try!(FromMessageItem::from(&firm_arg));
+ let firmwares = try!(msgs.into_iter()
+ .map(|item| -> Result<InstalledFirmware, ()> {
+ FromMessageItem::from(item)
+ }).collect::<Result<Vec<InstalledFirmware>, ()>>());
+
+ Ok(InstalledSoftware::new(packages, firmwares))
+ }().map(|inst| send(&self.itx, Command::SendInstalledSoftware(inst)))
+ .map_err(|_| error!("unable to ReportInstalledSoftware"));
+ }
+
+ _ => ()
+ }
+ }
+}
+
+impl DBus {
+ fn new_message(&self, method: &str, args: &[MessageItem]) -> Message {
+ let mgr = self.dbus_cfg.software_manager.clone();
+ let path = self.dbus_cfg.software_manager_path.clone();
+ let result = Message::new_method_call(&mgr, &path, &mgr, method);
+ let mut msg = result.expect("couldn't create dbus message");
+ msg.append_items(args);
+ msg
+ }
+}
+
+fn default_interface<'i>(itx: Sender<Interpret>) -> Interface<'i> {
+ let initiate_itx = itx.clone();
+ let initiate_download = Method::new(
+ "initiateDownload",
+ vec![Argument::new("update_id", "s")],
+ vec![],
+ Box::new(move |msg| handle_initiate_download(&initiate_itx, msg))
+ );
+
+ let update_itx = itx.clone();
+ let update_report = Method::new(
+ "updateReport",
+ vec![Argument::new("update_id", "s"), Argument::new("operations_results", "aa{sv}")],
+ vec![],
+ Box::new(move |msg| handle_update_report(&update_itx, msg))
+ );
+
+ Interface::new(vec![initiate_download, update_report], vec![], vec![])
+}
+
+fn send(itx: &Sender<Interpret>, cmd: Command) {
+ itx.send(Interpret { command: cmd, response_tx: None });
+}
+
+fn handle_initiate_download(itx: &Sender<Interpret>, msg: &mut Message) -> MethodResult {
+ let sender = try!(msg.sender().map(|s| s.to_string()).ok_or(dbus::missing_arg()));
+ debug!("handle_initiate_download: sender={:?}, msg={:?}", sender, msg);
+
+ let mut args = msg.get_items().into_iter();
+ let arg_id = try!(args.next().ok_or(dbus::missing_arg()));
+ let update_id: &String = try!(FromMessageItem::from(&arg_id).or(Err(dbus::malformed_arg())));
+ send(itx, Command::StartDownload(vec![update_id.clone()]));
+
+ Ok(vec![])
+}
+
+fn handle_update_report(itx: &Sender<Interpret>, msg: &mut Message) -> MethodResult {
+ let sender = try!(msg.sender().map(|s| s.to_string()).ok_or(dbus::missing_arg()));
+ debug!("handle_update_report: sender ={:?}, msg ={:?}", sender, msg);
+ let mut args = msg.get_items().into_iter();
+
+ let id_arg = try!(args.next().ok_or(dbus::missing_arg()));
+ let update_id: &String = try!(FromMessageItem::from(&id_arg).or(Err(dbus::malformed_arg())));
+
+ let results_arg = try!(args.next().ok_or(dbus::missing_arg()));
+ let msgs: &Vec<MessageItem> = try!(FromMessageItem::from(&results_arg).or(Err(dbus::malformed_arg())));
+ let results = try!(msgs.into_iter()
+ .map(|item| -> Result<OperationResult, ()> { FromMessageItem::from(item) })
+ .collect::<Result<Vec<OperationResult>, ()>>()
+ .or(Err(dbus::malformed_arg()))
+ );
+ send(itx, Command::SendUpdateReport(UpdateReport::new(update_id.clone(), results)));
+
+ Ok(vec![])
+}
diff --git a/src/gateway/gateway.rs b/src/gateway/gateway.rs
new file mode 100644
index 0000000..96fef8d
--- /dev/null
+++ b/src/gateway/gateway.rs
@@ -0,0 +1,32 @@
+use chan::{Sender, Receiver};
+use std::process;
+use std::sync::{Arc, Mutex};
+
+use datatype::{Command, Event};
+
+
+/// Encapsulates a `Command` to be sent to the `GlobalInterpreter` for processing,
+/// with an optional channel to receive the outcome `Event`.
+pub struct Interpret {
+ pub command: Command,
+ pub response_tx: Option<Arc<Mutex<Sender<Event>>>>,
+}
+
+/// A `Gateway` may send `Command`s to the `GlobalInterpreter`, as well as listen
+/// to the system-wide `Event` messages.
+pub trait Gateway {
+ fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String>;
+
+ fn start(&mut self, itx: Sender<Interpret>, erx: Receiver<Event>) {
+ self.initialize(itx).unwrap_or_else(|err| {
+ error!("couldn't start gateway: {}", err);
+ process::exit(1);
+ });
+
+ loop {
+ self.pulse(erx.recv().expect("all gateway event transmitters are closed"));
+ }
+ }
+
+ fn pulse(&self, _: Event) {} // ignore global events by default
+}
diff --git a/src/gateway/http.rs b/src/gateway/http.rs
new file mode 100644
index 0000000..990a1fc
--- /dev/null
+++ b/src/gateway/http.rs
@@ -0,0 +1,135 @@
+use chan;
+use chan::{Sender, Receiver};
+use hyper::StatusCode;
+use hyper::net::{HttpStream, Transport};
+use hyper::server::{Server as HyperServer, Request as HyperRequest};
+use rustc_serialize::json;
+use std::thread;
+use std::sync::{Arc, Mutex};
+
+use datatype::{Command, Event};
+use gateway::{Gateway, Interpret};
+use http::{Server, ServerHandler};
+
+
+/// The `Http` gateway parses `Command`s from the body of incoming requests.
+pub struct Http {
+ pub server: String,
+}
+
+impl Gateway for Http {
+ fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> {
+ let itx = Arc::new(Mutex::new(itx));
+ let server = match HyperServer::http(&self.server.parse().expect("couldn't parse http address")) {
+ Ok(server) => server,
+ Err(err) => return Err(format!("couldn't start http gateway: {}", err))
+ };
+ thread::spawn(move || {
+ let (_, server) = server.handle(move |_| HttpHandler::new(itx.clone())).unwrap();
+ server.run();
+ });
+
+ Ok(info!("HTTP gateway listening at http://{}", self.server))
+ }
+}
+
+
+struct HttpHandler {
+ itx: Arc<Mutex<Sender<Interpret>>>,
+ response_rx: Option<Receiver<Event>>
+}
+
+impl HttpHandler {
+ fn new(itx: Arc<Mutex<Sender<Interpret>>>) -> ServerHandler<HttpStream> {
+ ServerHandler::new(Box::new(HttpHandler { itx: itx, response_rx: None }))
+ }
+}
+
+impl<T: Transport> Server<T> for HttpHandler {
+ fn headers(&mut self, _: HyperRequest<T>) {}
+
+ fn request(&mut self, body: Vec<u8>) {
+ String::from_utf8(body).map(|body| {
+ json::decode::<Command>(&body).map(|cmd| {
+ info!("Incoming HTTP request command: {}", cmd);
+ let (etx, erx) = chan::async::<Event>();
+ self.response_rx = Some(erx);
+ self.itx.lock().unwrap().send(Interpret {
+ command: cmd,
+ response_tx: Some(Arc::new(Mutex::new(etx))),
+ });
+ }).unwrap_or_else(|err| error!("http request parse json: {}", err))
+ }).unwrap_or_else(|err| error!("http request parse string: {}", err))
+ }
+
+ fn response(&mut self) -> (StatusCode, Option<Vec<u8>>) {
+ self.response_rx.as_ref().map_or((StatusCode::BadRequest, None), |rx| {
+ rx.recv().map_or_else(|| {
+ error!("on_response receiver error");
+ (StatusCode::InternalServerError, None)
+ }, |event| {
+ json::encode(&event).map(|body| {
+ (StatusCode::Ok, Some(body.into_bytes()))
+ }).unwrap_or_else(|err| {
+ error!("on_response encoding json: {:?}", err);
+ (StatusCode::InternalServerError, None)
+ })
+ })
+ })
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use chan;
+ use crossbeam;
+ use rustc_serialize::json;
+ use std::path::Path;
+ use std::thread;
+
+ use super::*;
+ use gateway::{Gateway, Interpret};
+ use datatype::{Command, Event};
+ use http::{AuthClient, Client, set_ca_certificates};
+
+
+ #[test]
+ fn http_connections() {
+ set_ca_certificates(&Path::new("run/sota_certificates"));
+
+ let (etx, erx) = chan::sync::<Event>(0);
+ let (itx, irx) = chan::sync::<Interpret>(0);
+
+ thread::spawn(move || Http { server: "127.0.0.1:8888".to_string() }.start(itx, erx));
+ thread::spawn(move || {
+ let _ = etx; // move into this scope
+ loop {
+ let interpret = irx.recv().expect("itx is closed");
+ match interpret.command {
+ Command::StartDownload(ids) => {
+ let tx = interpret.response_tx.unwrap();
+ tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned()));
+ }
+ _ => panic!("expected AcceptUpdates"),
+ }
+ }
+ });
+
+ crossbeam::scope(|scope| {
+ for id in 0..10 {
+ scope.spawn(move || {
+ let cmd = Command::StartDownload(vec!(format!("{}", id)));
+ let client = AuthClient::default();
+ let url = "http://127.0.0.1:8888".parse().unwrap();
+ let body = json::encode(&cmd).unwrap();
+ let resp_rx = client.post(url, Some(body.into_bytes()));
+ let resp = resp_rx.recv().unwrap().unwrap();
+ let text = String::from_utf8(resp).unwrap();
+ assert_eq!(json::decode::<Event>(&text).unwrap(),
+ Event::FoundSystemInfo(format!("{}", id)));
+ });
+ }
+ });
+ }
+}
diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs
new file mode 100644
index 0000000..027ba51
--- /dev/null
+++ b/src/gateway/mod.rs
@@ -0,0 +1,13 @@
+pub mod console;
+pub mod dbus;
+pub mod gateway;
+pub mod http;
+pub mod socket;
+pub mod websocket;
+
+pub use self::console::Console;
+pub use self::dbus::DBus;
+pub use self::gateway::{Gateway, Interpret};
+pub use self::http::Http;
+pub use self::socket::Socket;
+pub use self::websocket::Websocket;
diff --git a/src/gateway/socket.rs b/src/gateway/socket.rs
new file mode 100644
index 0000000..f252e7c
--- /dev/null
+++ b/src/gateway/socket.rs
@@ -0,0 +1,161 @@
+use chan;
+use chan::Sender;
+use rustc_serialize::json;
+use std::io::{BufReader, Read, Write};
+use std::net::Shutdown;
+use std::sync::{Arc, Mutex};
+use std::{fs, thread};
+
+use datatype::{Command, Error, Event};
+use super::{Gateway, Interpret};
+use unix_socket::{UnixListener, UnixStream};
+
+
+/// The `Socket` gateway is used for communication via Unix Domain Sockets.
+pub struct Socket {
+ pub commands_path: String,
+ pub events_path: String,
+}
+
+impl Gateway for Socket {
+ fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> {
+ let _ = fs::remove_file(&self.commands_path);
+ let commands = match UnixListener::bind(&self.commands_path) {
+ Ok(sock) => sock,
+ Err(err) => return Err(format!("couldn't open commands socket: {}", err))
+ };
+
+ let itx = Arc::new(Mutex::new(itx));
+ thread::spawn(move || {
+ for conn in commands.incoming() {
+ if let Err(err) = conn {
+ error!("couldn't get commands socket connection: {}", err);
+ continue
+ }
+ let mut stream = conn.unwrap();
+ let itx = itx.clone();
+
+ thread::spawn(move || {
+ let resp = handle_client(&mut stream, itx)
+ .map(|ev| json::encode(&ev).expect("couldn't encode Event").into_bytes())
+ .unwrap_or_else(|err| format!("{}", err).into_bytes());
+
+ stream.write_all(&resp)
+ .unwrap_or_else(|err| error!("couldn't write to commands socket: {}", err));
+ stream.shutdown(Shutdown::Write)
+ .unwrap_or_else(|err| error!("couldn't close commands socket: {}", err));
+ });
+ }
+ });
+
+ Ok(info!("Socket listening for commands at {} and sending events to {}.",
+ self.commands_path, self.events_path))
+ }
+
+ fn pulse(&self, event: Event) {
+ match event {
+ Event::DownloadComplete(dl) => {
+ let _ = UnixStream::connect(&self.events_path).map(|mut stream| {
+ stream.write_all(&json::encode(&dl).expect("couldn't encode Event").into_bytes())
+ .unwrap_or_else(|err| error!("couldn't write to events socket: {}", err));
+ stream.shutdown(Shutdown::Write)
+ .unwrap_or_else(|err| error!("couldn't close events socket: {}", err));
+ }).map_err(|err| error!("couldn't open events socket: {}", err));
+ }
+
+ _ => ()
+ }
+ }
+}
+
+fn handle_client(stream: &mut UnixStream, itx: Arc<Mutex<Sender<Interpret>>>) -> Result<Event, Error> {
+ info!("New domain socket connection");
+ let mut reader = BufReader::new(stream);
+ let mut input = String::new();
+ try!(reader.read_to_string(&mut input));
+ debug!("socket input: {}", input);
+
+ let cmd = try!(input.parse::<Command>());
+ let (etx, erx) = chan::async::<Event>();
+ itx.lock().unwrap().send(Interpret {
+ command: cmd,
+ response_tx: Some(Arc::new(Mutex::new(etx))),
+ });
+ erx.recv().ok_or(Error::Socket("internal receiver error".to_string()))
+}
+
+
+#[cfg(test)]
+mod tests {
+ use chan;
+ use crossbeam;
+ use rustc_serialize::json;
+ use std::{fs, thread};
+ use std::io::{Read, Write};
+ use std::net::Shutdown;
+ use std::time::Duration;
+
+ use datatype::{Command, DownloadComplete, Event};
+ use gateway::{Gateway, Interpret};
+ use super::*;
+ use unix_socket::{UnixListener, UnixStream};
+
+
+ #[test]
+ fn socket_commands_and_events() {
+ let (etx, erx) = chan::sync::<Event>(0);
+ let (itx, irx) = chan::sync::<Interpret>(0);
+
+ thread::spawn(move || Socket {
+ commands_path: "/tmp/sota-commands.socket".to_string(),
+ events_path: "/tmp/sota-events.socket".to_string(),
+ }.start(itx, erx));
+ thread::sleep(Duration::from_millis(100)); // wait until socket gateway is created
+
+ let path = "/tmp/sota-events.socket";
+ let _ = fs::remove_file(&path);
+ let server = UnixListener::bind(&path).expect("couldn't create events socket for testing");
+
+ let send = DownloadComplete {
+ update_id: "1".to_string(),
+ update_image: "/foo/bar".to_string(),
+ signature: "abc".to_string()
+ };
+ etx.send(Event::DownloadComplete(send.clone()));
+
+ let (mut stream, _) = server.accept().expect("couldn't read from events socket");
+ let mut text = String::new();
+ stream.read_to_string(&mut text).unwrap();
+ let receive: DownloadComplete = json::decode(&text).expect("couldn't decode DownloadComplete message");
+ assert_eq!(send, receive);
+
+ thread::spawn(move || {
+ let _ = etx; // move into this scope
+ loop {
+ let interpret = irx.recv().expect("gtx is closed");
+ match interpret.command {
+ Command::StartDownload(ids) => {
+ let tx = interpret.response_tx.unwrap();
+ tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned()));
+ }
+ _ => panic!("expected AcceptUpdates"),
+ }
+ }
+ });
+
+ crossbeam::scope(|scope| {
+ for id in 0..10 {
+ scope.spawn(move || {
+ let mut stream = UnixStream::connect("/tmp/sota-commands.socket").expect("couldn't connect to socket");
+ let _ = stream.write_all(&format!("dl {}", id).into_bytes()).expect("couldn't write to stream");
+ stream.shutdown(Shutdown::Write).expect("couldn't shut down writing");
+
+ let mut resp = String::new();
+ stream.read_to_string(&mut resp).expect("couldn't read from stream");
+ let ev: Event = json::decode(&resp).expect("couldn't decode json event");
+ assert_eq!(ev, Event::FoundSystemInfo(format!("{}", id)));
+ });
+ }
+ });
+ }
+}
diff --git a/src/gateway/websocket.rs b/src/gateway/websocket.rs
new file mode 100644
index 0000000..eb5e040
--- /dev/null
+++ b/src/gateway/websocket.rs
@@ -0,0 +1,169 @@
+use chan;
+use chan::Sender;
+use rustc_serialize::json;
+use std::thread;
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use ws;
+use ws::{listen, CloseCode, Handler, Handshake, Message, Sender as WsSender};
+use ws::util::Token;
+
+use datatype::{Command, Error, Event};
+use super::gateway::{Gateway, Interpret};
+
+
+/// The `Websocket` gateway allows connected clients to listen to `Event`s that
+/// happen in the SOTA client.
+pub struct Websocket {
+ pub server: String,
+ pub clients: Arc<Mutex<HashMap<Token, WsSender>>>
+}
+
+impl Gateway for Websocket {
+ fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> {
+ let clients = self.clients.clone();
+ let addr = self.server.clone();
+ info!("Opening websocket listener at {}", addr);
+
+ thread::spawn(move || {
+ listen(&addr as &str, |out| {
+ WebsocketHandler {
+ out: out,
+ itx: itx.clone(),
+ clients: clients.clone()
+ }
+ }).expect("couldn't start websocket listener");
+ });
+
+ thread::sleep(Duration::from_secs(1)); // FIXME: ugly hack for blocking listen call
+ Ok(info!("Websocket gateway started."))
+ }
+
+ fn pulse(&self, event: Event) {
+ let json = encode(event);
+ for (_, out) in self.clients.lock().unwrap().iter() {
+ let _ = out.send(Message::Text(json.clone()));
+ }
+ }
+}
+
+
+pub struct WebsocketHandler {
+ out: WsSender,
+ itx: Sender<Interpret>,
+ clients: Arc<Mutex<HashMap<Token, WsSender>>>
+}
+
+impl Handler for WebsocketHandler {
+ fn on_message(&mut self, msg: Message) -> ws::Result<()> {
+ debug!("received websocket message: {:?}", msg);
+ msg.as_text().or_else(|err| {
+ error!("websocket on_message text error: {}", err);
+ Err(err)
+ }).and_then(|msg| match decode(msg) {
+ Ok(cmd) => Ok(self.forward_command(cmd)),
+
+ Err(Error::Websocket(err)) => {
+ error!("websocket on_message error: {}", err);
+ Err(err)
+ }
+
+ Err(_) => unreachable!()
+ })
+ }
+
+ fn on_open(&mut self, _: Handshake) -> ws::Result<()> {
+ let _ = self.clients.lock().unwrap().insert(self.out.token(), self.out.clone());
+ Ok(debug!("new websocket client: {:?}", self.out.token()))
+ }
+
+ fn on_close(&mut self, code: CloseCode, _: &str) {
+ let _ = self.clients.lock().unwrap().remove(&self.out.token());
+ debug!("closing websocket client {:?}: {:?}", self.out.token(), code);
+ }
+
+ fn on_error(&mut self, err: ws::Error) {
+ error!("websocket error: {:?}", err);
+ }
+}
+
+impl WebsocketHandler {
+ fn forward_command(&self, cmd: Command) {
+ let (etx, erx) = chan::sync::<Event>(0);
+ let etx = Arc::new(Mutex::new(etx.clone()));
+ self.itx.send(Interpret { command: cmd, response_tx: Some(etx) });
+
+ let e = erx.recv().expect("websocket response_tx is closed");
+ let _ = self.out.send(Message::Text(encode(e)));
+ }
+}
+
+fn encode(event: Event) -> String {
+ json::encode(&event).expect("Error encoding event into JSON")
+}
+
+fn decode(s: &str) -> Result<Command, Error> {
+ Ok(try!(json::decode::<Command>(s)))
+}
+
+
+#[cfg(test)]
+mod tests {
+ use chan;
+ use crossbeam;
+ use rustc_serialize::json;
+ use std::thread;
+ use std::collections::HashMap;
+ use std::sync::{Arc, Mutex};
+ use ws;
+ use ws::{connect, CloseCode};
+
+ use datatype::{Command, Event};
+ use gateway::{Gateway, Interpret};
+ use super::*;
+
+
+ #[test]
+ fn websocket_connections() {
+ let (etx, erx) = chan::sync::<Event>(0);
+ let (itx, irx) = chan::sync::<Interpret>(0);
+
+ thread::spawn(move || {
+ Websocket {
+ server: "localhost:3012".to_string(),
+ clients: Arc::new(Mutex::new(HashMap::new()))
+ }.start(itx, erx);
+ });
+ thread::spawn(move || {
+ let _ = etx; // move into this scope
+ loop {
+ let interpret = irx.recv().expect("gtx is closed");
+ match interpret.command {
+ Command::StartDownload(ids) => {
+ let tx = interpret.response_tx.unwrap();
+ tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned()));
+ }
+ _ => panic!("expected AcceptUpdates"),
+ }
+ }
+ });
+
+ crossbeam::scope(|scope| {
+ for id in 0..10 {
+ scope.spawn(move || {
+ connect("ws://localhost:3012", |out| {
+ out.send(format!(r#"{{ "variant": "StartDownload", "fields": [["{}"]] }}"#, id))
+ .expect("couldn't write to websocket");
+
+ move |msg: ws::Message| {
+ let ev: Event = json::decode(&format!("{}", msg)).unwrap();
+ assert_eq!(ev, Event::FoundSystemInfo(format!("{}", id)));
+ out.close(CloseCode::Normal)
+ }
+ }).expect("couldn't connect to websocket");
+ });
+ }
+ });
+ }
+}
diff --git a/src/genivi/dbus.rs b/src/genivi/dbus.rs
deleted file mode 100644
index 7b89590..0000000
--- a/src/genivi/dbus.rs
+++ /dev/null
@@ -1,121 +0,0 @@
-use dbus::{FromMessageItem, MessageItem};
-use toml::{decode, Table, Value};
-
-/// DBus error string to indicate a missing argument.
-static MISSING_ARG: &'static str = "Error.MissingArgument";
-/// DBus error string to indicate a malformed argument.
-static MALFORMED_ARG: &'static str = "Error.MalformedArgument";
-
-/// Format a DBus error message indicating a missing argument.
-pub fn missing_arg() -> (&'static str, String) {
- (MISSING_ARG, "Missing argument".to_string())
-}
-
-/// Format a DBus error message indicating a malformed argument.
-pub fn malformed_arg() -> (&'static str, String) {
- (MALFORMED_ARG, "Malformed argument".to_string())
-}
-
-
-struct DecodableValue(Value);
-
-impl<'a> FromMessageItem<'a> for DecodableValue {
- fn from(m: &'a MessageItem) -> Result<Self, ()> {
- match m {
- &MessageItem::Str(ref b) => Ok(DecodableValue(Value::String(b.clone()))),
- &MessageItem::Bool(ref b) => Ok(DecodableValue(Value::Boolean(*b))),
- &MessageItem::Byte(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))),
- &MessageItem::Int16(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))),
- &MessageItem::Int32(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))),
- &MessageItem::Int64(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))),
- &MessageItem::UInt16(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))),
- &MessageItem::UInt32(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))),
- &MessageItem::UInt64(ref b) => Ok(DecodableValue(Value::Integer(*b as i64))),
- &MessageItem::Variant(ref b) => FromMessageItem::from(&**b),
- _ => Err(())
- }
- }
-}
-
-pub struct DecodableStruct(pub Value);
-
-impl<'a> FromMessageItem<'a> for DecodableStruct {
- fn from(m: &'a MessageItem) -> Result<Self, ()> {
- let arr: &Vec<MessageItem> = try!(FromMessageItem::from(m));
- arr.iter()
- .map(|entry| {
- let v: Result<(&MessageItem, &MessageItem), ()> = FromMessageItem::from(entry);
- v.and_then(|(k, v)| {
- let k: Result<&String,()> = FromMessageItem::from(k);
- k.and_then(|k| {
- let v: Result<DecodableValue,()> = FromMessageItem::from(v);
- v.map(|v| (k.clone(), v.0)) }) }) })
- .collect::<Result<Vec<(_, _)>, ()>>()
- .map(|arr| DecodableStruct(Value::Table(arr.into_iter().collect::<Table>())))
- }
-}
-
-
-use event::outbound::{OperationResult, OperationResults};
-
-impl<'a> FromMessageItem<'a> for OperationResult {
- fn from(m: &'a MessageItem) -> Result<Self, ()> {
- let m: DecodableStruct = try!(FromMessageItem::from(m));
- decode::<OperationResult>(m.0).ok_or(())
- }
-}
-
-impl<'a> FromMessageItem<'a> for OperationResults {
- fn from(m: &'a MessageItem) -> Result<Self, ()> {
- let arr: &Vec<MessageItem> = try!(FromMessageItem::from(m));
- arr.into_iter()
- .map(|i| {
- let i: Result<OperationResult, ()> = FromMessageItem::from(i);
- i })
- .collect::<Result<Vec<_>, ()>>()
- .map(|a| OperationResults(a))
- }
-}
-
-use event::outbound::{InstalledPackage, InstalledPackages};
-
-impl<'a> FromMessageItem<'a> for InstalledPackage {
- fn from(m: &'a MessageItem) -> Result<Self, ()> {
- let m: DecodableStruct = try!(FromMessageItem::from(m));
- decode::<InstalledPackage>(m.0).ok_or(())
- }
-}
-
-impl<'a> FromMessageItem<'a> for InstalledPackages {
- fn from(m: &'a MessageItem) -> Result<Self, ()> {
- let arr: &Vec<MessageItem> = try!(FromMessageItem::from(m));
- arr.into_iter()
- .map(|i| {
- let i: Result<InstalledPackage, ()> = FromMessageItem::from(i);
- i })
- .collect::<Result<Vec<_>, ()>>()
- .map(|a| InstalledPackages(a))
- }
-}
-
-use event::outbound::{InstalledFirmware, InstalledFirmwares};
-
-impl<'a> FromMessageItem<'a> for InstalledFirmware {
- fn from(m: &'a MessageItem) -> Result<Self, ()> {
- let m: DecodableStruct = try!(FromMessageItem::from(m));
- decode::<InstalledFirmware>(m.0).ok_or(())
- }
-}
-
-impl<'a> FromMessageItem<'a> for InstalledFirmwares {
- fn from(m: &'a MessageItem) -> Result<Self, ()> {
- let arr: &Vec<MessageItem> = try!(FromMessageItem::from(m));
- arr.into_iter()
- .map(|i| {
- let i: Result<InstalledFirmware, ()> = FromMessageItem::from(i);
- i })
- .collect::<Result<Vec<_>, ()>>()
- .map(|a| InstalledFirmwares(a))
- }
-}
-
diff --git a/src/genivi/mod.rs b/src/genivi/mod.rs
deleted file mode 100644
index b7ae755..0000000
--- a/src/genivi/mod.rs
+++ /dev/null
@@ -1,4 +0,0 @@
-pub mod dbus;
-pub mod swm;
-pub mod sc;
-pub mod start;
diff --git a/src/genivi/sc.rs b/src/genivi/sc.rs
deleted file mode 100644
index 7c86edc..0000000
--- a/src/genivi/sc.rs
+++ /dev/null
@@ -1,126 +0,0 @@
-//! Receiving side of the DBus interface.
-
-use std::sync::mpsc::Sender;
-
-use dbus::{Connection, NameFlag, BusType, ConnectionItem, Message, FromMessageItem};
-use dbus::obj::*;
-
-use configuration::DBusConfiguration;
-use event::Event;
-use event::outbound::{OutBoundEvent, OperationResults, UpdateReport};
-use genivi::dbus::*;
-
-
-/// Encodes the state that is needed to accept incoming DBus messages.
-pub struct Receiver {
- /// The configuration for the DBus interface.
- config: DBusConfiguration,
- /// A sender to forward incoming messages.
- sender: Sender<Event>
-}
-
-impl Receiver {
- /// Create a new `Receiver`.
- ///
- /// # Arguments
- /// * `c`: The configuration for the DBus interface.
- /// * `s`: A sender to forward incoming messages.
- pub fn new(c: DBusConfiguration, s: Sender<Event>) -> Receiver {
- Receiver {
- config: c,
- sender: s
- }
- }
-
- /// Start the listener. It will register in DBus according to the configuration, wait for
- /// incoming messages and forward them via the internal `Sender`.
- pub fn start(&self) {
- let conn = Connection::get_private(BusType::Session).unwrap();
- conn.register_name(&self.config.name, NameFlag::ReplaceExisting as u32).unwrap();
-
- let initiate_download = Method::new(
- "initiateDownload",
- vec!(Argument::new("update_id", "s")),
- vec!(),
- Box::new(|msg| self.handle_initiate_download(msg)));
- let abort_download = Method::new(
- "abortDownload",
- vec!(Argument::new("update_id", "s")),
- vec!(),
- Box::new(|msg| self.handle_abort_download(msg)));
- let update_report = Method::new(
- "updateReport",
- vec!(Argument::new("update_id", "s"), Argument::new("operations_results", "aa{sv}")),
- vec!(),
- Box::new(|msg| self.handle_update_report(msg)));
- let interface = Interface::new(vec!(initiate_download, abort_download, update_report), vec!(), vec!());
-
- let mut object_path = ObjectPath::new(&conn, &self.config.path, true);
- object_path.insert_interface(&self.config.interface, interface);
- object_path.set_registered(true).unwrap();
-
- for n in conn.iter(1000) {
- match n {
- ConnectionItem::MethodCall(mut m) => {
- object_path.handle_message(&mut m);
- },
- _ => {}
- }
- }
- }
-
- /// Handles incoming "Initiate Download" messages.
- ///
- /// Parses the message and forwards it to the internal `Sender`.
- ///
- /// # Arguments
- /// * `msg`: The message to handle.
- fn handle_initiate_download(&self, msg: &mut Message) -> MethodResult {
- let sender = try!(get_sender(msg).ok_or(missing_arg()));
- trace!("sender: {:?}", sender);
- trace!("msg: {:?}", msg);
-
- let mut args = msg.get_items().into_iter();
- let arg = try!(args.next().ok_or(missing_arg()));
- let update_id: &String = try!(FromMessageItem::from(&arg).or(Err(malformed_arg())));
- let _ = self.sender.send(
- Event::OutBound(OutBoundEvent::InitiateDownload(update_id.clone())));
-
- Ok(vec!())
- }
-
- fn handle_abort_download(&self, msg: &mut Message) -> MethodResult {
- let sender = try!(get_sender(msg).ok_or(missing_arg()));
- trace!("sender: {:?}", sender);
- trace!("msg: {:?}", msg);
-
- let mut args = msg.get_items().into_iter();
- let arg = try!(args.next().ok_or(missing_arg()));
- let update_id: &String = try!(FromMessageItem::from(&arg).or(Err(malformed_arg())));
- let _ = self.sender.send(
- Event::OutBound(OutBoundEvent::AbortDownload(update_id.clone())));
-
- Ok(vec!())
- }
-
- fn handle_update_report(&self, msg: &mut Message) -> MethodResult {
- let sender = try!(get_sender(msg).ok_or(missing_arg()));
- trace!("sender: {:?}", sender);
- trace!("msg: {:?}", msg);
-
- let mut args = msg.get_items().into_iter();
- let arg = try!(args.next().ok_or(missing_arg()));
- let update_id: &String = try!(FromMessageItem::from(&arg).or(Err(malformed_arg())));
-
- let arg = try!(args.next().ok_or(missing_arg()));
- let operation_results: OperationResults = try!(FromMessageItem::from(&arg).or(Err(malformed_arg())));
-
- let report = UpdateReport::new(update_id.clone(), operation_results);
- let _ = self.sender.send(
- Event::OutBound(OutBoundEvent::UpdateReport(report)));
-
- Ok(vec!())
- }
-}
-
-fn get_sender(msg: &Message) -> Option<String> { msg.sender() }
diff --git a/src/genivi/start.rs b/src/genivi/start.rs
deleted file mode 100644
index 55bef27..0000000
--- a/src/genivi/start.rs
+++ /dev/null
@@ -1,72 +0,0 @@
-//! Main loop, starting the worker threads and wiring up communication channels between them.
-
-use std::sync::{Arc, Mutex};
-use std::sync::mpsc::{channel, Receiver};
-use std::thread;
-
-use configuration::Configuration;
-use configuration::DBusConfiguration;
-use event::Event;
-use event::inbound::InboundEvent;
-use event::outbound::OutBoundEvent;
-use remote::svc::{RemoteServices, ServiceHandler};
-use remote::rvi;
-
-pub fn handle(cfg: &DBusConfiguration, rx: Receiver<Event>, remote_svcs: Arc<Mutex<RemoteServices>>) {
- loop {
- match rx.recv().unwrap() {
- Event::Inbound(i) => match i {
- InboundEvent::UpdateAvailable(e) => {
- info!("UpdateAvailable");
- super::swm::send_update_available(&cfg, e);
- },
- InboundEvent::DownloadComplete(e) => {
- info!("DownloadComplete");
- super::swm::send_download_complete(&cfg, e);
- },
- InboundEvent::GetInstalledSoftware(e) => {
- info!("GetInstalledSoftware");
- let _ = super::swm::send_get_installed_software(&cfg, e)
- .and_then(|e| {
- remote_svcs.lock().unwrap().send_installed_software(e)
- .map_err(|e| error!("{}", e)) });
- }
- },
- Event::OutBound(o) => match o {
- OutBoundEvent::InitiateDownload(e) => {
- info!("InitiateDownload");
- let _ = remote_svcs.lock().unwrap().send_start_download(e);
- },
- OutBoundEvent::AbortDownload(_) => info!("AbortDownload"),
- OutBoundEvent::UpdateReport(e) => {
- info!("UpdateReport");
- let _ = remote_svcs.lock().unwrap().send_update_report(e);
- }
- }
- }
- }
-}
-
-/// Main loop, starting the worker threads and wiring up communication channels between them.
-///
-/// # Arguments
-/// * `conf`: A pointer to a `Configuration` object see the [documentation of the configuration
-/// crate](../configuration/index.html).
-/// * `rvi_url`: The URL, where RVI can be found, with the protocol.
-/// * `edge_url`: The `host:port` combination where the client should bind and listen for incoming
-/// RVI calls.
-pub fn start(conf: &Configuration, rvi_url: String, edge_url: String) {
- // Main message channel from RVI and DBUS
- let (tx, rx) = channel();
-
- // RVI edge handler
- let remote_svcs = Arc::new(Mutex::new(RemoteServices::new(rvi_url.clone())));
- let handler = ServiceHandler::new(tx.clone(), remote_svcs.clone(), conf.client.clone());
- let rvi_edge = rvi::ServiceEdge::new(rvi_url.clone(), edge_url, handler);
- rvi_edge.start();
-
- // DBUS handler
- let dbus_receiver = super::sc::Receiver::new(conf.dbus.clone(), tx);
- thread::spawn(move || dbus_receiver.start());
- handle(&conf.dbus, rx, remote_svcs);
-}
diff --git a/src/genivi/swm.rs b/src/genivi/swm.rs
deleted file mode 100644
index e913e1e..0000000
--- a/src/genivi/swm.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-//! Sending side of the DBus interface.
-
-use std::convert::From;
-
-use dbus::{Connection, BusType, MessageItem, Message, FromMessageItem};
-
-use configuration::DBusConfiguration;
-use event::inbound::{UpdateAvailable, DownloadComplete, GetInstalledSoftware};
-use event::outbound::{InstalledFirmwares, InstalledPackages, InstalledSoftware};
-
-pub fn send_update_available(config: &DBusConfiguration, e: UpdateAvailable) {
- let args = [
- MessageItem::from(e.update_id),
- MessageItem::from(e.signature),
- MessageItem::from(e.description),
- MessageItem::from(e.request_confirmation)];
- let mut message = Message::new_method_call(
- &config.software_manager, &config.software_manager_path,
- &config.software_manager, "updateAvailable").unwrap();
- message.append_items(&args);
-
- let conn = Connection::get_private(BusType::Session).unwrap();
- let _ = conn.send(message)
- .map_err(|_| error!("Couldn't forward message to D-Bus"));
-}
-
-pub fn send_download_complete(config: &DBusConfiguration, e: DownloadComplete) {
- let args = [
- MessageItem::from(e.update_image),
- MessageItem::from(e.signature)];
- let mut message = Message::new_method_call(
- &config.software_manager, &config.software_manager_path,
- &config.software_manager, "downloadComplete").unwrap();
- message.append_items(&args);
-
- let conn = Connection::get_private(BusType::Session).unwrap();
- let _ = conn.send(message)
- .map_err(|_| error!("Couldn't forward message to D-Bus"));
-}
-
-pub fn send_get_installed_software(config: &DBusConfiguration, e: GetInstalledSoftware)
- -> Result<InstalledSoftware, ()> {
- let args = [
- MessageItem::from(e.include_packages),
- MessageItem::from(e.include_module_firmware)];
- let mut message = Message::new_method_call(
- &config.software_manager, &config.software_manager_path,
- &config.software_manager, "getInstalledPackages").unwrap();
- message.append_items(&args);
-
- let conn = Connection::get_private(BusType::Session).unwrap();
- let msg = conn.send_with_reply_and_block(message, config.timeout).unwrap();
-
- let mut args = msg.get_items().into_iter();
- let arg = try!(args.next().ok_or(()));
- let installed_packages: InstalledPackages = try!(FromMessageItem::from(&arg));
-
- let arg = try!(args.next().ok_or(()));
- let installed_firmware: InstalledFirmwares = try!(FromMessageItem::from(&arg));
-
- Ok(InstalledSoftware::new(installed_packages, installed_firmware))
-}
-
diff --git a/src/http/auth_client.rs b/src/http/auth_client.rs
new file mode 100644
index 0000000..f4ad38b
--- /dev/null
+++ b/src/http/auth_client.rs
@@ -0,0 +1,278 @@
+use chan::Sender;
+use hyper;
+use hyper::{Encoder, Decoder, Next};
+use hyper::client::{Client as HyperClient, Handler, HttpsConnector,
+ Request as HyperRequest, Response as HyperResponse};
+use hyper::header::{Authorization, Basic, Bearer, ContentLength, ContentType, Location};
+use hyper::mime::{Attr, Mime, TopLevel, SubLevel, Value};
+use hyper::net::{HttpStream, HttpsStream, OpensslStream};
+use hyper::status::StatusCode;
+use std::{io, mem};
+use std::io::{ErrorKind, Write};
+use std::str;
+use std::time::Duration;
+use time;
+
+use datatype::{Auth, Error};
+use http::{Client, get_openssl, Request, Response};
+
+
+/// The `AuthClient` will attach an `Authentication` header to each outgoing
+/// HTTP request.
+#[derive(Clone)]
+pub struct AuthClient {
+ auth: Auth,
+ client: HyperClient<AuthHandler>,
+}
+
+impl Default for AuthClient {
+ fn default() -> Self {
+ Self::from(Auth::None)
+ }
+}
+
+impl AuthClient {
+ /// Instantiates a new client ready to make requests for the given `Auth` type.
+ pub fn from(auth: Auth) -> Self {
+ let client = HyperClient::<AuthHandler>::configure()
+ .keep_alive(true)
+ .max_sockets(1024)
+ .connector(HttpsConnector::new(get_openssl()))
+ .build()
+ .expect("unable to create a new hyper Client");
+
+ AuthClient {
+ auth: auth,
+ client: client,
+ }
+ }
+}
+
+impl Client for AuthClient {
+ fn chan_request(&self, req: Request, resp_tx: Sender<Response>) {
+ info!("{} {}", req.method, req.url);
+ let _ = self.client.request(req.url.inner(), AuthHandler {
+ auth: self.auth.clone(),
+ req: req,
+ timeout: Duration::from_secs(20),
+ started: None,
+ written: 0,
+ response: Vec::new(),
+ resp_tx: resp_tx.clone(),
+ }).map_err(|err| resp_tx.send(Err(Error::from(err))));
+ }
+}
+
+
+/// The async handler for outgoing HTTP requests.
+// FIXME: uncomment when yocto is at 1.8.0: #[derive(Debug)]
+pub struct AuthHandler {
+ auth: Auth,
+ req: Request,
+ timeout: Duration,
+ started: Option<u64>,
+ written: usize,
+ response: Vec<u8>,
+ resp_tx: Sender<Response>,
+}
+
+// FIXME: required for building on 1.7.0 only
+impl ::std::fmt::Debug for AuthHandler {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
+ write!(f, "unimplemented")
+ }
+}
+
+impl AuthHandler {
+ fn redirect_request(&mut self, resp: HyperResponse) {
+ match resp.headers().get::<Location>() {
+ Some(&Location(ref loc)) => self.req.url.join(loc).map(|url| {
+ debug!("redirecting to {}", url);
+ // drop Authentication Header on redirect
+ let client = AuthClient::default();
+ let resp_rx = client.send_request(Request {
+ url: url,
+ method: self.req.method.clone(),
+ body: mem::replace(&mut self.req.body, None),
+ });
+ match resp_rx.recv().expect("no redirect_request response") {
+ Ok(data) => self.resp_tx.send(Ok(data)),
+ Err(err) => self.resp_tx.send(Err(Error::from(err)))
+ }
+ }).unwrap_or_else(|err| self.resp_tx.send(Err(Error::from(err)))),
+
+ None => self.resp_tx.send(Err(Error::Client("redirect missing Location header".to_string())))
+ }
+ }
+}
+
+/// The `AuthClient` may be used for both HTTP and HTTPS connections.
+pub type Stream = HttpsStream<OpensslStream<HttpStream>>;
+
+impl Handler<Stream> for AuthHandler {
+ fn on_request(&mut self, req: &mut HyperRequest) -> Next {
+ req.set_method(self.req.method.clone().into());
+ self.started = Some(time::precise_time_ns());
+ let mut headers = req.headers_mut();
+
+ // empty Charset to keep RVI happy
+ let mime_json = Mime(TopLevel::Application, SubLevel::Json, vec![]);
+ let mime_form = Mime(TopLevel::Application, SubLevel::WwwFormUrlEncoded,
+ vec![(Attr::Charset, Value::Utf8)]);
+
+ match self.auth {
+ Auth::None => {
+ headers.set(ContentType(mime_json));
+ }
+
+ Auth::Credentials(_, _) if self.req.body.is_some() => {
+ panic!("no request body expected for Auth::Credentials");
+ }
+
+ Auth::Credentials(ref id, ref secret) => {
+ headers.set(Authorization(Basic { username: id.0.clone(),
+ password: Some(secret.0.clone()) }));
+ headers.set(ContentType(mime_form));
+ self.req.body = Some(br#"grant_type=client_credentials"#.to_vec());
+ }
+
+ Auth::Token(ref token) => {
+ headers.set(Authorization(Bearer { token: token.access_token.clone() }));
+ headers.set(ContentType(mime_json));
+ }
+ };
+
+ self.req.body.as_ref().map_or(Next::read().timeout(self.timeout), |body| {
+ headers.set(ContentLength(body.len() as u64));
+ Next::write()
+ })
+ }
+
+ fn on_request_writable(&mut self, encoder: &mut Encoder<Stream>) -> Next {
+ let body = self.req.body.as_ref().expect("on_request_writable expects a body");
+
+ match encoder.write(&body[self.written..]) {
+ Ok(0) => {
+ info!("Request length: {} bytes", body.len());
+ if let Ok(body) = str::from_utf8(body) {
+ debug!("body:\n{}", body);
+ }
+ Next::read().timeout(self.timeout)
+ },
+
+ Ok(n) => {
+ self.written += n;
+ trace!("{} bytes written to request body", n);
+ Next::write()
+ }
+
+ Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
+ trace!("retry on_request_writable");
+ Next::write()
+ }
+
+ Err(err) => {
+ error!("unable to write request body: {}", err);
+ self.resp_tx.send(Err(Error::from(err)));
+ Next::remove()
+ }
+ }
+ }
+
+ fn on_response(&mut self, resp: HyperResponse) -> Next {
+ info!("Response status: {}", resp.status());
+ debug!("on_response headers:\n{}", resp.headers());
+ let started = self.started.expect("expected start time");
+ let latency = time::precise_time_ns() as f64 - started as f64;
+ debug!("on_response latency: {}ms", (latency / 1e6) as u32);
+
+ if resp.status().is_success() {
+ if let Some(len) = resp.headers().get::<ContentLength>() {
+ if **len > 0 {
+ return Next::read();
+ }
+ }
+ self.resp_tx.send(Ok(Vec::new()));
+ Next::end()
+ } else if resp.status().is_redirection() {
+ self.redirect_request(resp);
+ Next::end()
+ } else if resp.status() == &StatusCode::Forbidden {
+ self.resp_tx.send(Err(Error::Authorization(format!("{}", resp.status()))));
+ Next::end()
+ } else {
+ self.resp_tx.send(Err(Error::Client(format!("{}", resp.status()))));
+ Next::end()
+ }
+ }
+
+ fn on_response_readable(&mut self, decoder: &mut Decoder<Stream>) -> Next {
+ match io::copy(decoder, &mut self.response) {
+ Ok(0) => {
+ debug!("on_response_readable bytes read: {}", self.response.len());
+ self.resp_tx.send(Ok(mem::replace(&mut self.response, Vec::new())));
+ Next::end()
+ }
+
+ Ok(n) => {
+ trace!("{} more response bytes read", n);
+ Next::read()
+ }
+
+ Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
+ trace!("retry on_response_readable");
+ Next::read()
+ }
+
+ Err(err) => {
+ error!("unable to read response body: {}", err);
+ self.resp_tx.send(Err(Error::from(err)));
+ Next::end()
+ }
+ }
+ }
+
+ fn on_error(&mut self, err: hyper::Error) -> Next {
+ error!("on_error: {}", err);
+ self.resp_tx.send(Err(Error::from(err)));
+ Next::remove()
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use rustc_serialize::json::Json;
+ use std::path::Path;
+
+ use super::*;
+ use http::{Client, set_ca_certificates};
+
+
+ fn get_client() -> AuthClient {
+ set_ca_certificates(&Path::new("run/sota_certificates"));
+ AuthClient::default()
+ }
+
+ #[test]
+ fn test_send_get_request() {
+ let client = get_client();
+ let url = "http://eu.httpbin.org/bytes/16?seed=123".parse().unwrap();
+ let resp_rx = client.get(url, None);
+ let data = resp_rx.recv().unwrap().unwrap();
+ assert_eq!(data, vec![13, 22, 104, 27, 230, 9, 137, 85, 218, 40, 86, 85, 62, 0, 111, 22]);
+ }
+
+ #[test]
+ fn test_send_post_request() {
+ let client = get_client();
+ let url = "https://eu.httpbin.org/post".parse().unwrap();
+ let resp_rx = client.post(url, Some(br#"foo"#.to_vec()));
+ let body = resp_rx.recv().unwrap().unwrap();
+ let resp = String::from_utf8(body).unwrap();
+ let json = Json::from_str(&resp).unwrap();
+ let obj = json.as_object().unwrap();
+ let data = obj.get("data").unwrap().as_string().unwrap();
+ assert_eq!(data, "foo");
+ }
+}
diff --git a/src/http/http_client.rs b/src/http/http_client.rs
new file mode 100644
index 0000000..492166c
--- /dev/null
+++ b/src/http/http_client.rs
@@ -0,0 +1,43 @@
+use chan;
+use chan::{Sender, Receiver};
+
+use datatype::{Error, Method, Url};
+
+
+/// Abstracts a particular HTTP Client implementation with the basic methods
+/// for sending `Request`s and receiving asynchronous `Response`s via a channel.
+pub trait Client {
+ fn chan_request(&self, req: Request, resp_tx: Sender<Response>);
+
+ fn send_request(&self, req: Request) -> Receiver<Response> {
+ let (resp_tx, resp_rx) = chan::async::<Response>();
+ self.chan_request(req, resp_tx);
+ resp_rx
+ }
+
+ fn get(&self, url: Url, body: Option<Vec<u8>>) -> Receiver<Response> {
+ self.send_request(Request { method: Method::Get, url: url, body: body })
+ }
+
+ fn post(&self, url: Url, body: Option<Vec<u8>>) -> Receiver<Response> {
+ self.send_request(Request { method: Method::Post, url: url, body: body })
+ }
+
+ fn put(&self, url: Url, body: Option<Vec<u8>>) -> Receiver<Response> {
+ self.send_request(Request { method: Method::Put, url: url, body: body })
+ }
+
+ fn is_testing(&self) -> bool { false }
+}
+
+
+/// A simplified representation of an HTTP request for use in the client.
+#[derive(Debug)]
+pub struct Request {
+ pub method: Method,
+ pub url: Url,
+ pub body: Option<Vec<u8>>
+}
+
+/// Return the body of an HTTP response on success, or an `Error` otherwise.
+pub type Response = Result<Vec<u8>, Error>;
diff --git a/src/http/http_server.rs b/src/http/http_server.rs
new file mode 100644
index 0000000..2ecd7a2
--- /dev/null
+++ b/src/http/http_server.rs
@@ -0,0 +1,113 @@
+use hyper::{Decoder, Encoder, Next, StatusCode};
+use hyper::header::{ContentLength, ContentType};
+use hyper::mime::{Attr, Mime, TopLevel, SubLevel, Value};
+use hyper::net::Transport;
+use hyper::server::{Handler, Request as HyperRequest, Response as HyperResponse};
+use std::{mem, io};
+use std::io::{ErrorKind, Write};
+use std::time::Duration;
+
+
+/// An HTTP server handles the incoming headers and request body as well as the
+/// setting the response status and body. Other concerns regarding the asynchronous
+/// event loop handlers for writing to buffers are abstracted away.
+pub trait Server<T: Transport>: Send {
+ fn headers(&mut self, req: HyperRequest<T>);
+ fn request(&mut self, body: Vec<u8>);
+ fn response(&mut self) -> (StatusCode, Option<Vec<u8>>);
+}
+
+
+/// This implements the `hyper::server::Handler` trait so that it can be used
+/// to handle incoming HTTP connections with `hyper::server::Server`.
+pub struct ServerHandler<T: Transport> {
+ server: Box<Server<T>>,
+ req_body: Vec<u8>,
+ resp_body: Vec<u8>,
+ written: usize
+}
+
+impl<T: Transport> ServerHandler<T> {
+ /// Instantiate a new `ServerHandler` by passing a `Box<Server<T>` reference.
+ pub fn new(server: Box<Server<T>>) -> Self {
+ ServerHandler {
+ server: server,
+ req_body: Vec::new(),
+ resp_body: Vec::new(),
+ written: 0
+ }
+ }
+}
+
+impl<T: Transport> Handler<T> for ServerHandler<T> {
+ fn on_request(&mut self, req: HyperRequest<T>) -> Next {
+ info!("on_request: {} {}", req.method(), req.uri());
+ self.server.headers(req);
+ Next::read()
+ }
+
+ fn on_request_readable(&mut self, transport: &mut Decoder<T>) -> Next {
+ match io::copy(transport, &mut self.req_body) {
+ Ok(0) => {
+ debug!("on_request_readable bytes read: {}", self.req_body.len());
+ self.server.request(mem::replace(&mut self.req_body, Vec::new()));
+ Next::write().timeout(Duration::from_secs(20))
+ }
+
+ Ok(n) => {
+ trace!("{} more request bytes read", n);
+ Next::read()
+ }
+
+ Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
+ trace!("retry on_request_readable");
+ Next::read()
+ }
+
+ Err(err) => {
+ error!("unable to read request body: {}", err);
+ Next::remove()
+ }
+ }
+ }
+
+ fn on_response(&mut self, resp: &mut HyperResponse) -> Next {
+ let (status, body) = self.server.response();
+ resp.set_status(status);
+ info!("on_response: status {}", resp.status());
+
+ let mut headers = resp.headers_mut();
+ headers.set(ContentType(Mime(TopLevel::Application, SubLevel::Json,
+ vec![(Attr::Charset, Value::Utf8)])));
+ body.map_or_else(Next::end, |body| {
+ headers.set(ContentLength(body.len() as u64));
+ self.resp_body = body;
+ Next::write()
+ })
+ }
+
+ fn on_response_writable(&mut self, transport: &mut Encoder<T>) -> Next {
+ match transport.write(&self.resp_body[self.written..]) {
+ Ok(0) => {
+ debug!("{} bytes written to response body", self.written);
+ Next::end()
+ }
+
+ Ok(n) => {
+ self.written += n;
+ trace!("{} bytes written to response body", n);
+ Next::write()
+ }
+
+ Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
+ trace!("retry on_response_writable");
+ Next::write()
+ }
+
+ Err(err) => {
+ error!("unable to write response body: {}", err);
+ Next::remove()
+ }
+ }
+ }
+}
diff --git a/src/http/mod.rs b/src/http/mod.rs
new file mode 100644
index 0000000..5e990a3
--- /dev/null
+++ b/src/http/mod.rs
@@ -0,0 +1,11 @@
+pub mod auth_client;
+pub mod http_client;
+pub mod http_server;
+pub mod openssl;
+pub mod test_client;
+
+pub use self::auth_client::{AuthClient, AuthHandler};
+pub use self::http_client::{Client, Request, Response};
+pub use self::http_server::{Server, ServerHandler};
+pub use self::openssl::{get_openssl, set_ca_certificates};
+pub use self::test_client::TestClient;
diff --git a/src/http/openssl.rs b/src/http/openssl.rs
new file mode 100644
index 0000000..378b078
--- /dev/null
+++ b/src/http/openssl.rs
@@ -0,0 +1,47 @@
+use hyper::net::Openssl;
+use openssl::ssl::{SSL_OP_NO_SSLV2, SSL_OP_NO_SSLV3};
+use openssl::ssl::{SslContext, SslMethod};
+use std::path::Path;
+use std::sync::{Arc, Mutex};
+
+
+lazy_static! {
+ static ref OPENSSL: Arc<Mutex<Option<Openssl>>> = Arc::new(Mutex::new(None));
+}
+
+// default cipher list taken from the Servo project:
+// https://github.com/servo/servo/blob/master/components/net/connector.rs#L18
+const DEFAULT_CIPHERS: &'static str = concat!(
+ "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:",
+ "ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:",
+ "DHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-SHA256:",
+ "ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:",
+ "ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES256-SHA:",
+ "ECDHE-RSA-AES256-SHA:DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:",
+ "DHE-RSA-AES256-SHA256:DHE-RSA-AES256-SHA:ECDHE-RSA-DES-CBC3-SHA:",
+ "ECDHE-ECDSA-DES-CBC3-SHA:AES128-GCM-SHA256:AES256-GCM-SHA384:",
+ "AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA"
+);
+
+/// This function *must* be called before any call is made to `get_openssl()`
+pub fn set_ca_certificates(path: &Path) {
+ info!("Setting OpenSSL CA certificates path to {:?}", path);
+ let mut openssl = OPENSSL.lock().unwrap();
+ let mut context = SslContext::new(SslMethod::Sslv23).unwrap();
+ context.set_CA_file(path).unwrap_or_else(|err| {
+ panic!("couldn't set CA certificates: {}", err);
+ });
+ context.set_cipher_list(DEFAULT_CIPHERS).unwrap();
+ context.set_options(SSL_OP_NO_SSLV2 | SSL_OP_NO_SSLV3);
+ *openssl = Some(Openssl { context: context });
+}
+
+/// This function will return a clone of `Openssl` where the CA certificates
+/// have been bound with `set_ca_certificates()`.
+pub fn get_openssl() -> Openssl {
+ if let Some(ref openssl) = *OPENSSL.lock().unwrap() {
+ openssl.clone()
+ } else {
+ panic!("CA certificates not set")
+ }
+}
diff --git a/src/http/test_client.rs b/src/http/test_client.rs
new file mode 100644
index 0000000..7857e0f
--- /dev/null
+++ b/src/http/test_client.rs
@@ -0,0 +1,35 @@
+use chan::Sender;
+use std::cell::RefCell;
+
+use datatype::Error;
+use http::{Client, Request, Response};
+
+
+/// The `TestClient` will return HTTP responses from an existing list of strings.
+pub struct TestClient {
+ responses: RefCell<Vec<String>>
+}
+
+impl Default for TestClient {
+ fn default() -> Self {
+ TestClient { responses: RefCell::new(Vec::new()) }
+ }
+}
+
+impl TestClient {
+ /// Create a new `TestClient` that will return these responses.
+ pub fn from(responses: Vec<String>) -> TestClient {
+ TestClient { responses: RefCell::new(responses) }
+ }
+}
+
+impl Client for TestClient {
+ fn chan_request(&self, req: Request, resp_tx: Sender<Response>) {
+ match self.responses.borrow_mut().pop() {
+ Some(body) => resp_tx.send(Ok(body.as_bytes().to_vec())),
+ None => resp_tx.send(Err(Error::Client(req.url.to_string())))
+ }
+ }
+
+ fn is_testing(&self) -> bool { true }
+}
diff --git a/src/interpreter.rs b/src/interpreter.rs
new file mode 100644
index 0000000..b286ba5
--- /dev/null
+++ b/src/interpreter.rs
@@ -0,0 +1,364 @@
+use chan;
+use chan::{Sender, Receiver};
+use std;
+use std::borrow::Cow;
+
+use datatype::{AccessToken, Auth, ClientId, ClientSecret, Command, Config,
+ Error, Event, Package, UpdateRequestId};
+use gateway::Interpret;
+use http::{AuthClient, Client};
+use oauth2::authenticate;
+use package_manager::PackageManager;
+use rvi::Services;
+use sota::Sota;
+
+
+/// An `Interpreter` loops over any incoming values, on receipt of which it
+/// delegates to the `interpret` function which will respond with output values.
+pub trait Interpreter<I, O> {
+ fn interpret(&mut self, input: I, otx: &Sender<O>);
+
+ fn run(&mut self, irx: Receiver<I>, otx: Sender<O>) {
+ loop {
+ self.interpret(irx.recv().expect("interpreter sender closed"), &otx);
+ }
+ }
+}
+
+
+/// The `EventInterpreter` listens for `Event`s and optionally responds with
+/// `Command`s that may be sent to the `CommandInterpreter`.
+pub struct EventInterpreter {
+ pub package_manager: PackageManager
+}
+
+impl Interpreter<Event, Command> for EventInterpreter {
+ fn interpret(&mut self, event: Event, ctx: &Sender<Command>) {
+ info!("Event received: {}", event);
+ match event {
+ Event::NotAuthenticated => {
+ info!("Trying to authenticate again...");
+ ctx.send(Command::Authenticate(None));
+ }
+
+ Event::NewUpdatesReceived(ids) => {
+ ctx.send(Command::StartDownload(ids));
+ }
+
+ Event::DownloadComplete(dl) => {
+ if self.package_manager != PackageManager::Off {
+ ctx.send(Command::StartInstall(dl));
+ }
+ }
+
+ Event::InstallComplete(report) => {
+ ctx.send(Command::SendUpdateReport(report));
+ }
+
+ Event::UpdateReportSent => {
+ if self.package_manager != PackageManager::Off {
+ self.package_manager.installed_packages().map(|packages| {
+ ctx.send(Command::SendInstalledPackages(packages));
+ }).unwrap_or_else(|err| error!("couldn't send a list of packages: {}", err));
+ }
+ }
+
+ _ => ()
+ }
+ }
+}
+
+
+/// The `CommandInterpreter` wraps each incoming `Command` inside an `Interpret`
+/// type with no response channel for sending to the `GlobalInterpreter`.
+pub struct CommandInterpreter;
+
+impl Interpreter<Command, Interpret> for CommandInterpreter {
+ fn interpret(&mut self, cmd: Command, itx: &Sender<Interpret>) {
+ info!("Command received: {}", cmd);
+ itx.send(Interpret { command: cmd, response_tx: None });
+ }
+}
+
+
+/// The `GlobalInterpreter` interprets the `Command` inside incoming `Interpret`
+/// messages, broadcasting `Event`s globally and (optionally) sending the final
+/// outcome `Event` to the `Interpret` response channel.
+pub struct GlobalInterpreter<'t> {
+ pub config: Config,
+ pub token: Option<Cow<'t, AccessToken>>,
+ pub http_client: Box<Client>,
+ pub rvi: Option<Services>,
+ pub loopback_tx: Sender<Interpret>,
+}
+
+impl<'t> Interpreter<Interpret, Event> for GlobalInterpreter<'t> {
+ fn interpret(&mut self, interpret: Interpret, etx: &Sender<Event>) {
+ info!("Interpreter started: {}", interpret.command);
+
+ let (multi_tx, multi_rx) = chan::async::<Event>();
+ let outcome = match (self.token.as_ref(), self.config.auth.is_none()) {
+ (Some(_), _) | (_, true) => self.authenticated(interpret.command, multi_tx),
+ _ => self.unauthenticated(interpret.command, multi_tx)
+ };
+
+ let mut response_ev: Option<Event> = None;
+ match outcome {
+ Ok(_) => {
+ for ev in multi_rx {
+ etx.send(ev.clone());
+ response_ev = Some(ev);
+ }
+ info!("Interpreter finished.");
+ }
+
+ Err(Error::Authorization(_)) => {
+ let ev = Event::NotAuthenticated;
+ etx.send(ev.clone());
+ response_ev = Some(ev);
+ error!("Interpreter authentication failed");
+ }
+
+ Err(err) => {
+ let ev = Event::Error(format!("{}", err));
+ etx.send(ev.clone());
+ response_ev = Some(ev);
+ error!("Interpreter failed: {}", err);
+ }
+ }
+
+ let ev = response_ev.expect("no response event to send back");
+ interpret.response_tx.map(|tx| tx.lock().unwrap().send(ev));
+ }
+}
+
+impl<'t> GlobalInterpreter<'t> {
+ fn authenticated(&self, cmd: Command, etx: Sender<Event>) -> Result<(), Error> {
+ let mut sota = Sota::new(&self.config, self.http_client.as_ref());
+
+ // always send at least one Event response
+ match cmd {
+ Command::Authenticate(_) => etx.send(Event::Authenticated),
+
+ Command::GetNewUpdates => {
+ let mut updates = try!(sota.get_pending_updates());
+ if updates.is_empty() {
+ etx.send(Event::NoNewUpdates);
+ } else {
+ updates.sort_by_key(|u| u.installPos);
+ let ids = updates.iter().map(|u| u.requestId.clone()).collect::<Vec<UpdateRequestId>>();
+ etx.send(Event::NewUpdatesReceived(ids))
+ }
+ }
+
+ Command::ListInstalledPackages => {
+ let mut packages: Vec<Package> = Vec::new();
+ if self.config.device.package_manager != PackageManager::Off {
+ packages = try!(self.config.device.package_manager.installed_packages());
+ }
+ etx.send(Event::FoundInstalledPackages(packages));
+ }
+
+ Command::RefreshSystemInfo(post) => {
+ let info = try!(self.config.device.system_info.report());
+ etx.send(Event::FoundSystemInfo(info.clone()));
+ if post {
+ let _ = sota.send_system_info(&info)
+ .map_err(|err| etx.send(Event::Error(format!("{}", err))));
+ }
+ }
+
+ Command::SendInstalledPackages(packages) => {
+ let _ = sota.send_installed_packages(&packages)
+ .map_err(|err| error!("couldn't send installed packages: {}", err));
+ etx.send(Event::InstalledPackagesSent);
+ }
+
+ Command::SendInstalledSoftware(sw) => {
+ if let Some(ref rvi) = self.rvi {
+ let _ = rvi.remote.lock().unwrap().send_installed_software(sw);
+ }
+ }
+
+ Command::SendUpdateReport(report) => {
+ if let Some(ref rvi) = self.rvi {
+ let _ = rvi.remote.lock().unwrap().send_update_report(report);
+ } else {
+ let _ = sota.send_update_report(&report)
+ .map_err(|err| error!("couldn't send update report: {}", err));
+ }
+ etx.send(Event::UpdateReportSent);
+ }
+
+ Command::StartDownload(ref ids) => {
+ for id in ids {
+ etx.send(Event::DownloadingUpdate(id.clone()));
+
+ if let Some(ref rvi) = self.rvi {
+ let _ = rvi.remote.lock().unwrap().send_download_started(id.clone());
+ } else {
+ let _ = sota.download_update(id.clone())
+ .map(|dl| etx.send(Event::DownloadComplete(dl)))
+ .map_err(|err| etx.send(Event::DownloadFailed(id.clone(), format!("{}", err))));
+ }
+ }
+ }
+
+ Command::StartInstall(dl) => {
+ let _ = sota.install_update(dl)
+ .map(|report| etx.send(Event::InstallComplete(report)))
+ .map_err(|report| etx.send(Event::InstallFailed(report)));
+ }
+
+ Command::Shutdown => std::process::exit(0),
+ }
+
+ Ok(())
+ }
+
+ fn unauthenticated(&mut self, cmd: Command, etx: Sender<Event>) -> Result<(), Error> {
+ match cmd {
+ Command::Authenticate(_) => {
+ let config = self.config.auth.clone().expect("trying to authenticate without auth config");
+ self.set_client(Auth::Credentials(ClientId(config.client_id),
+ ClientSecret(config.client_secret)));
+ let server = config.server.join("/token").expect("couldn't build authentication url");
+ let token = try!(authenticate(server, self.http_client.as_ref()));
+ self.set_client(Auth::Token(token.clone()));
+ self.token = Some(token.into());
+ etx.send(Event::Authenticated);
+ }
+
+ Command::GetNewUpdates |
+ Command::ListInstalledPackages |
+ Command::RefreshSystemInfo(_) |
+ Command::SendInstalledPackages(_) |
+ Command::SendInstalledSoftware(_) |
+ Command::SendUpdateReport(_) |
+ Command::StartDownload(_) |
+ Command::StartInstall(_) => etx.send(Event::NotAuthenticated),
+
+ Command::Shutdown => std::process::exit(0),
+ }
+
+ Ok(())
+ }
+
+ fn set_client(&mut self, auth: Auth) {
+ if !self.http_client.is_testing() {
+ self.http_client = Box::new(AuthClient::from(auth));
+ }
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use chan;
+ use chan::{Sender, Receiver};
+ use std::thread;
+
+ use super::*;
+ use datatype::{AccessToken, Command, Config, DownloadComplete, Event,
+ UpdateReport, UpdateResultCode};
+ use gateway::Interpret;
+ use http::test_client::TestClient;
+ use package_manager::PackageManager;
+ use package_manager::tpm::assert_rx;
+
+
+ fn new_interpreter(replies: Vec<String>, pkg_mgr: PackageManager) -> (Sender<Command>, Receiver<Event>) {
+ let (etx, erx) = chan::sync::<Event>(0);
+ let (ctx, crx) = chan::sync::<Command>(0);
+ let (itx, _) = chan::sync::<Interpret>(0);
+
+ thread::spawn(move || {
+ let mut gi = GlobalInterpreter {
+ config: Config::default(),
+ token: Some(AccessToken::default().into()),
+ http_client: Box::new(TestClient::from(replies)),
+ rvi: None,
+ loopback_tx: itx,
+ };
+ gi.config.device.package_manager = pkg_mgr;
+
+ loop {
+ match crx.recv() {
+ Some(cmd) => gi.interpret(Interpret { command: cmd, response_tx: None }, &etx),
+ None => break
+ }
+ }
+ });
+
+ (ctx, erx)
+ }
+
+ #[test]
+ fn already_authenticated() {
+ let replies = Vec::new();
+ let pkg_mgr = PackageManager::new_tpm(true);
+ let (ctx, erx) = new_interpreter(replies, pkg_mgr);
+
+ ctx.send(Command::Authenticate(None));
+ assert_rx(erx, &[Event::Authenticated]);
+ }
+
+ #[test]
+ fn download_updates() {
+ let replies = vec!["[]".to_string(); 10];
+ let pkg_mgr = PackageManager::new_tpm(true);
+ let (ctx, erx) = new_interpreter(replies, pkg_mgr);
+
+ ctx.send(Command::StartDownload(vec!["1".to_string(), "2".to_string()]));
+ assert_rx(erx, &[
+ Event::DownloadingUpdate("1".to_string()),
+ Event::DownloadComplete(DownloadComplete {
+ update_id: "1".to_string(),
+ update_image: "/tmp/1".to_string(),
+ signature: "".to_string()
+ }),
+ Event::DownloadingUpdate("2".to_string()),
+ Event::DownloadComplete(DownloadComplete {
+ update_id: "2".to_string(),
+ update_image: "/tmp/2".to_string(),
+ signature: "".to_string()
+ }),
+ ]);
+ }
+
+ #[test]
+ fn install_update() {
+ let replies = vec!["[]".to_string(); 10];
+ let pkg_mgr = PackageManager::new_tpm(true);
+ let (ctx, erx) = new_interpreter(replies, pkg_mgr);
+
+ ctx.send(Command::StartInstall(DownloadComplete {
+ update_id: "1".to_string(),
+ update_image: "/tmp/1".to_string(),
+ signature: "".to_string()
+ }));
+ assert_rx(erx, &[
+ Event::InstallComplete(
+ UpdateReport::single("1".to_string(), UpdateResultCode::OK, "".to_string())
+ )
+ ]);
+ }
+
+ #[test]
+ fn failed_installation() {
+ let replies = vec!["[]".to_string(); 10];
+ let pkg_mgr = PackageManager::new_tpm(false);
+ let (ctx, erx) = new_interpreter(replies, pkg_mgr);
+
+ ctx.send(Command::StartInstall(DownloadComplete {
+ update_id: "1".to_string(),
+ update_image: "/tmp/1".to_string(),
+ signature: "".to_string()
+ }));
+ assert_rx(erx, &[
+ Event::InstallFailed(
+ UpdateReport::single("1".to_string(), UpdateResultCode::INSTALL_FAILED, "failed".to_string())
+ )
+ ]);
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 62b445c..ec6d3af 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,65 +1,27 @@
-//! This is the client in-vehicle portion of the SOTA project. See the [main SOTA Server
-//! project](https://github.com/advancedtelematic/rvi_sota_server) and [associated architecture
-//! document](http://advancedtelematic.github.io/rvi_sota_server/dev/architecture.html) for more
-//! information.
+#[macro_use] extern crate nom; // use before log to avoid error!() macro conflict
+#[macro_use] extern crate chan;
+extern crate crossbeam;
+extern crate crypto;
+extern crate dbus;
extern crate hyper;
+extern crate openssl;
+#[macro_use] extern crate lazy_static;
+#[macro_use] extern crate log;
+extern crate rand;
extern crate rustc_serialize;
extern crate time;
-extern crate url;
-extern crate crypto;
extern crate toml;
-extern crate dbus;
-
-#[macro_use] extern crate log;
-extern crate env_logger;
-
-#[cfg(test)] extern crate rand;
-
-#[cfg(test)]
-#[macro_use]
-mod test_library;
-
-/// Try to unwrap or log the error and run the second argument
-///
-/// # Arguments
-/// 1. Expression to evaluate, needs to return a `Result<T, E> where E: Display` type.
-/// 2. Expression to run on errors, after logging the error as error message.
-#[macro_export]
-macro_rules! try_or {
- ($expr:expr, $finalize:expr) => {
- match $expr {
- Ok(val) => val,
- Err(e) => {
- error!("{}", e);
- $finalize;
- }
- }
- }
-}
-
-/// Try to unwrap or log the provided message and run the third argument
-///
-/// # Arguments
-/// 1. Expression to evaluate, needs to return a `Result<T, E>` type.
-/// 2. Expression that returns a Object implementing the `Display` trait. This object will be
-/// logged as a error message with `error!()`
-/// 3. Expression to run on errors, after printing a error message.
-#[macro_export]
-macro_rules! try_msg_or {
- ($expr:expr, $msg:expr, $finalize:expr) => {
- match $expr {
- Ok(val) => val,
- Err(..) => {
- error!("{}", $msg);
- $finalize;
- }
- }
- }
-}
-
-mod event;
-mod remote;
-
-pub mod configuration;
-pub mod genivi;
+extern crate unix_socket;
+extern crate url;
+extern crate ws;
+
+pub mod broadcast;
+pub mod datatype;
+pub mod gateway;
+pub mod http;
+pub mod interpreter;
+pub mod oauth2;
+pub mod package_manager;
+pub mod rvi;
+pub mod sota;
diff --git a/src/main.rs b/src/main.rs
index c55a339..61fa02a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,85 +1,308 @@
-//! Logic for starting and configuring the sota_client from the command line.
-
-extern crate sota_client;
-#[macro_use] extern crate log;
+#[macro_use] extern crate chan;
+extern crate chan_signal;
+extern crate crossbeam;
extern crate env_logger;
extern crate getopts;
+extern crate hyper;
+#[macro_use] extern crate log;
+extern crate rustc_serialize;
+#[macro_use] extern crate sota;
+extern crate time;
+use chan::{Sender, Receiver};
+use chan_signal::Signal;
+use env_logger::LogBuilder;
+use getopts::Options;
+use log::{LogLevelFilter, LogRecord};
use std::env;
-use getopts::{Options, Matches};
-use sota_client::configuration::Configuration;
-use sota_client::genivi;
-
-/// Helper function to print usage information to stdout.
-///
-/// # Arguments
-/// * `program`: The invoking path or name of the executable
-/// * `opts`: A pointer to a `Options` object, which generates the actual documentation. See the
-/// [getopts documentation](https://doc.rust-lang.org/getopts/getopts/index.html) for details.
-#[cfg_attr(test, allow(dead_code))]
-fn print_usage(program: &str, opts: &Options) {
- let brief = format!("Usage: {} [options]", program);
- print!("{}", opts.usage(&brief));
+use std::collections::HashMap;
+use std::path::Path;
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+use sota::datatype::{Command, Config, Event, SystemInfo};
+use sota::gateway::{Console, DBus, Gateway, Interpret, Http, Socket, Websocket};
+use sota::broadcast::Broadcast;
+use sota::http::{AuthClient, set_ca_certificates};
+use sota::interpreter::{EventInterpreter, CommandInterpreter, Interpreter, GlobalInterpreter};
+use sota::package_manager::PackageManager;
+use sota::rvi::{Edge, Services};
+
+
+macro_rules! exit {
+ ($fmt:expr, $($arg:tt)*) => {{
+ print!(concat!($fmt, "\n"), $($arg)*);
+ std::process::exit(1);
+ }}
}
-/// Parses the command line and matches it against accepted flags and options. Returns a `Matches`
-/// object. See the [getopts documentation](https://doc.rust-lang.org/getopts/getopts/index.html)
-/// for details.
-///
-/// # Arguments
-/// * `args`: A pointer to a Array of Strings. This is supposed to be the commandline as returned
-/// by [`env::args().collect()`](https://doc.rust-lang.org/stable/std/env/fn.args.html).
-/// * `program`: The invoking path or name of the executable
-fn match_args(args: &[String], program: &str) -> Matches {
- let mut options = Options::new();
- options.optflag("h", "help", "print this help message");
- options.optopt("c", "config", "change the path where the configuration \
- is expected", "FILE");
- options.optopt("r", "rvi", "explicitly set the URL, where RVI can be \
- reached", "URL");
- options.optopt("e", "edge", "explicitly set the host and port, where the \
- client should listen for connections from RVI", "HOST:PORT");
-
- let matches = match options.parse(args) {
- Ok(m) => { m }
- Err(f) => {
- error!("{}", f.to_string());
- print_usage(program, &options);
- std::process::exit(1);
+
+fn start_signal_handler(signals: Receiver<Signal>) {
+ loop {
+ match signals.recv() {
+ Some(Signal::INT) | Some(Signal::TERM) => std::process::exit(0),
+ _ => ()
}
- };
+ }
+}
- if matches.opt_present("h") {
- print_usage(program, &options);
- std::process::exit(0);
+fn start_update_poller(interval: u64, itx: Sender<Interpret>) {
+ let (etx, erx) = chan::async::<Event>();
+ let tick = chan::tick(Duration::from_secs(interval));
+ loop {
+ let _ = tick.recv();
+ itx.send(Interpret {
+ command: Command::GetNewUpdates,
+ response_tx: Some(Arc::new(Mutex::new(etx.clone())))
+ });
+ let _ = erx.recv();
+ }
+}
+
+fn send_startup_commands(config: &Config, ctx: &Sender<Command>) {
+ ctx.send(Command::Authenticate(None));
+ ctx.send(Command::RefreshSystemInfo(true));
+
+ if config.device.package_manager != PackageManager::Off {
+ config.device.package_manager.installed_packages().map(|packages| {
+ ctx.send(Command::SendInstalledPackages(packages));
+ }).unwrap_or_else(|err| exit!("Couldn't get list of packages: {}", err));
}
- matches
}
-/// Program entrypoint. Parses command line arguments and starts the main loop accordingly.
-#[cfg_attr(test, allow(dead_code))]
fn main() {
- env_logger::init().unwrap();
- let args: Vec<String> = env::args().collect();
- let program: &str = &args[0];
- let matches = match_args(&args[1..], program);
-
- let conf_file = matches.opt_str("c")
- .unwrap_or(Configuration::default_path());
- let configuration = match Configuration::read(&conf_file) {
- Ok(value) => value,
- Err(e) => {
- error!("Couldn't parse configuration file at {}: {}", conf_file, e);
- std::process::exit(126);
+ setup_logging();
+
+ let config = build_config();
+ set_ca_certificates(Path::new(&config.device.certificates_path));
+
+ let (etx, erx) = chan::async::<Event>();
+ let (ctx, crx) = chan::async::<Command>();
+ let (itx, irx) = chan::async::<Interpret>();
+
+ let mut broadcast = Broadcast::new(erx);
+ send_startup_commands(&config, &ctx);
+
+ crossbeam::scope(|scope| {
+ // Must subscribe to the signal before spawning ANY other threads
+ let signals = chan_signal::notify(&[Signal::INT, Signal::TERM]);
+ scope.spawn(move || start_signal_handler(signals));
+
+ let poll_tick = config.device.polling_interval;
+ let poll_itx = itx.clone();
+ scope.spawn(move || start_update_poller(poll_tick, poll_itx));
+
+ if config.gateway.console {
+ let cons_itx = itx.clone();
+ let cons_sub = broadcast.subscribe();
+ scope.spawn(move || Console.start(cons_itx, cons_sub));
+ }
+
+ if config.gateway.dbus {
+ let dbus_cfg = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for dbus gateway"));
+ let dbus_itx = itx.clone();
+ let dbus_sub = broadcast.subscribe();
+ let mut dbus = DBus { dbus_cfg: dbus_cfg.clone(), itx: itx.clone() };
+ scope.spawn(move || dbus.start(dbus_itx, dbus_sub));
}
- };
- let rvi_url: String = matches.opt_str("r")
- .unwrap_or(configuration.client.rvi_url.clone()
- .unwrap_or("http://localhost:8901".to_string()));
- let edge_url: String = matches.opt_str("e")
- .unwrap_or(configuration.client.edge_url.clone()
- .unwrap_or("localhost:9080".to_string()));
+ if config.gateway.http {
+ let http_itx = itx.clone();
+ let http_sub = broadcast.subscribe();
+ let mut http = Http { server: config.network.http_server.clone() };
+ scope.spawn(move || http.start(http_itx, http_sub));
+ }
+
+ let mut rvi = None;
+ if config.gateway.rvi {
+ let _ = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for rvi gateway"));
+ let rvi_cfg = config.rvi.as_ref().unwrap_or_else(|| exit!("{}", "rvi config required for rvi gateway"));
+ let rvi_edge = config.network.rvi_edge_server.clone();
+ let services = Services::new(rvi_cfg.clone(), config.device.uuid.clone(), etx.clone());
+ let mut edge = Edge::new(services.clone(), rvi_edge, rvi_cfg.client.clone());
+ scope.spawn(move || edge.start());
+ rvi = Some(services);
+ }
+
+ if config.gateway.socket {
+ let socket_itx = itx.clone();
+ let socket_sub = broadcast.subscribe();
+ let mut socket = Socket {
+ commands_path: config.network.socket_commands_path.clone(),
+ events_path: config.network.socket_events_path.clone()
+ };
+ scope.spawn(move || socket.start(socket_itx, socket_sub));
+ }
+
+ if config.gateway.websocket {
+ let ws_server = config.network.websocket_server.clone();
+ let ws_itx = itx.clone();
+ let ws_sub = broadcast.subscribe();
+ let mut ws = Websocket { server: ws_server, clients: Arc::new(Mutex::new(HashMap::new())) };
+ scope.spawn(move || ws.start(ws_itx, ws_sub));
+ }
+
+ let event_sub = broadcast.subscribe();
+ let event_ctx = ctx.clone();
+ let event_mgr = config.device.package_manager.clone();
+ scope.spawn(move || EventInterpreter {
+ package_manager: event_mgr
+ }.run(event_sub, event_ctx));
+
+ let cmd_itx = itx.clone();
+ scope.spawn(move || CommandInterpreter.run(crx, cmd_itx));
+
+ scope.spawn(move || GlobalInterpreter {
+ config: config,
+ token: None,
+ http_client: Box::new(AuthClient::default()),
+ rvi: rvi,
+ loopback_tx: itx,
+ }.run(irx, etx));
+
+ scope.spawn(move || broadcast.start());
+ });
+}
+
+fn setup_logging() {
+ let version = option_env!("SOTA_VERSION").unwrap_or("?");
+ let mut builder = LogBuilder::new();
+ builder.format(move |record: &LogRecord| {
+ let timestamp = format!("{}", time::now_utc().rfc3339());
+ format!("{} ({}): {} - {}", timestamp, version, record.level(), record.args())
+ });
+ builder.filter(Some("hyper"), LogLevelFilter::Info);
+
+ let _ = env::var("RUST_LOG").map(|level| builder.parse(&level));
+ builder.init().expect("env_logger::init() called twice, blame the programmers.");
+}
+
+fn build_config() -> Config {
+ let args = env::args().collect::<Vec<String>>();
+ let program = args[0].clone();
+ let mut opts = Options::new();
+
+ opts.optflag("h", "help", "print this help menu");
+ opts.optopt("", "config", "change config path", "PATH");
+
+ opts.optopt("", "auth-server", "change the auth server", "URL");
+ opts.optopt("", "auth-client-id", "change the auth client id", "ID");
+ opts.optopt("", "auth-client-secret", "change the auth client secret", "SECRET");
+ opts.optopt("", "auth-credentials-file", "change the auth credentials file", "PATH");
+
+ opts.optopt("", "core-server", "change the core server", "URL");
+
+ opts.optopt("", "dbus-name", "change the dbus registration name", "NAME");
+ opts.optopt("", "dbus-path", "change the dbus path", "PATH");
+ opts.optopt("", "dbus-interface", "change the dbus interface name", "INTERFACE");
+ opts.optopt("", "dbus-software-manager", "change the dbus software manager name", "NAME");
+ opts.optopt("", "dbus-software-manager-path", "change the dbus software manager path", "PATH");
+ opts.optopt("", "dbus-timeout", "change the dbus installation timeout", "TIMEOUT");
+
+ opts.optopt("", "device-uuid", "change the device uuid", "UUID");
+ opts.optopt("", "device-vin", "change the device vin", "VIN");
+ opts.optopt("", "device-packages-dir", "change downloaded directory for packages", "PATH");
+ opts.optopt("", "device-package-manager", "change the package manager", "MANAGER");
+ opts.optopt("", "device-polling-interval", "change the package polling interval", "INTERVAL");
+ opts.optopt("", "device-certificates-path", "change the OpenSSL CA certificates file", "PATH");
+ opts.optopt("", "device-system-info", "change the system information command", "PATH");
+
+ opts.optopt("", "gateway-console", "toggle the console gateway", "BOOL");
+ opts.optopt("", "gateway-dbus", "toggle the dbus gateway", "BOOL");
+ opts.optopt("", "gateway-http", "toggle the http gateway", "BOOL");
+ opts.optopt("", "gateway-rvi", "toggle the rvi gateway", "BOOL");
+ opts.optopt("", "gateway-socket", "toggle the unix domain socket gateway", "BOOL");
+ opts.optopt("", "gateway-websocket", "toggle the websocket gateway", "BOOL");
+
+ opts.optopt("", "network-http-server", "change the http server gateway address", "ADDR");
+ opts.optopt("", "network-rvi-edge-server", "change the rvi edge server gateway address", "ADDR");
+ opts.optopt("", "network-socket-commands-path", "change the socket path for reading commands", "PATH");
+ opts.optopt("", "network-socket-events-path", "change the socket path for sending events", "PATH");
+ opts.optopt("", "network-websocket-server", "change the websocket gateway address", "ADDR");
+
+ opts.optopt("", "rvi-client", "change the rvi client URL", "URL");
+ opts.optopt("", "rvi-storage-dir", "change the rvi storage directory", "PATH");
+ opts.optopt("", "rvi-timeout", "change the rvi timeout", "TIMEOUT");
+
+ let matches = opts.parse(&args[1..]).unwrap_or_else(|err| panic!(err.to_string()));
+ if matches.opt_present("h") {
+ exit!("{}", opts.usage(&format!("Usage: {} [options]", program)));
+ }
+
+ let config_file = matches.opt_str("config").unwrap_or_else(|| {
+ env::var("SOTA_CONFIG").unwrap_or_else(|_| exit!("{}", "No config file provided."))
+ });
+ let mut config = Config::load(&config_file).unwrap_or_else(|err| exit!("{}", err));
+
+ config.auth.as_mut().map(|auth_cfg| {
+ matches.opt_str("auth-client-id").map(|id| auth_cfg.client_id = id);
+ matches.opt_str("auth-client-secret").map(|secret| auth_cfg.client_secret = secret);
+ matches.opt_str("auth-server").map(|text| {
+ auth_cfg.server = text.parse().unwrap_or_else(|err| exit!("Invalid auth-server URL: {}", err));
+ });
+ });
+
+ matches.opt_str("core-server").map(|text| {
+ config.core.server = text.parse().unwrap_or_else(|err| exit!("Invalid core-server URL: {}", err));
+ });
+
+ config.dbus.as_mut().map(|dbus_cfg| {
+ matches.opt_str("dbus-name").map(|name| dbus_cfg.name = name);
+ matches.opt_str("dbus-path").map(|path| dbus_cfg.path = path);
+ matches.opt_str("dbus-interface").map(|interface| dbus_cfg.interface = interface);
+ matches.opt_str("dbus-software-manager").map(|mgr| dbus_cfg.software_manager = mgr);
+ matches.opt_str("dbus-software-manager-path").map(|mgr_path| dbus_cfg.software_manager_path = mgr_path);
+ matches.opt_str("dbus-timeout").map(|timeout| {
+ dbus_cfg.timeout = timeout.parse().unwrap_or_else(|err| exit!("Invalid dbus timeout: {}", err));
+ });
+ });
+
+ matches.opt_str("device-uuid").map(|uuid| config.device.uuid = uuid);
+ matches.opt_str("device-vin").map(|vin| config.device.vin = vin);
+ matches.opt_str("device-packages-dir").map(|path| config.device.packages_dir = path);
+ matches.opt_str("device-package-manager").map(|text| {
+ config.device.package_manager = text.parse().unwrap_or_else(|err| exit!("Invalid device package manager: {}", err));
+ });
+ matches.opt_str("device-polling-interval").map(|interval| {
+ config.device.polling_interval = interval.parse().unwrap_or_else(|err| exit!("Invalid device polling interval: {}", err));
+ });
+ matches.opt_str("device-certificates-path").map(|certs| config.device.certificates_path = certs);
+ matches.opt_str("device-system-info").map(|cmd| config.device.system_info = SystemInfo::new(cmd));
+
+ matches.opt_str("gateway-console").map(|console| {
+ config.gateway.console = console.parse().unwrap_or_else(|err| exit!("Invalid console gateway boolean: {}", err));
+ });
+ matches.opt_str("gateway-dbus").map(|dbus| {
+ config.gateway.dbus = dbus.parse().unwrap_or_else(|err| exit!("Invalid dbus gateway boolean: {}", err));
+ });
+ matches.opt_str("gateway-http").map(|http| {
+ config.gateway.http = http.parse().unwrap_or_else(|err| exit!("Invalid http gateway boolean: {}", err));
+ });
+ matches.opt_str("gateway-rvi").map(|rvi| {
+ config.gateway.rvi = rvi.parse().unwrap_or_else(|err| exit!("Invalid rvi gateway boolean: {}", err));
+ });
+ matches.opt_str("gateway-socket").map(|socket| {
+ config.gateway.socket = socket.parse().unwrap_or_else(|err| exit!("Invalid socket gateway boolean: {}", err));
+ });
+ matches.opt_str("gateway-websocket").map(|websocket| {
+ config.gateway.websocket = websocket.parse().unwrap_or_else(|err| exit!("Invalid websocket gateway boolean: {}", err));
+ });
+
+ matches.opt_str("network-http-server").map(|server| config.network.http_server = server);
+ matches.opt_str("network-rvi-edge-server").map(|server| config.network.rvi_edge_server = server);
+ matches.opt_str("network-socket-commands-path").map(|path| config.network.socket_commands_path = path);
+ matches.opt_str("network-socket-events-path").map(|path| config.network.socket_events_path = path);
+ matches.opt_str("network-websocket-server").map(|server| config.network.websocket_server = server);
+
+ config.rvi.as_mut().map(|rvi_cfg| {
+ matches.opt_str("rvi-client").map(|url| {
+ rvi_cfg.client = url.parse().unwrap_or_else(|err| exit!("Invalid rvi-client URL: {}", err));
+ });
+ matches.opt_str("rvi-storage-dir").map(|dir| rvi_cfg.storage_dir = dir);
+ matches.opt_str("rvi-timeout").map(|timeout| {
+ rvi_cfg.timeout = Some(timeout.parse().unwrap_or_else(|err| exit!("Invalid rvi timeout: {}", err)));
+ });
+ });
- genivi::start::start(&configuration, rvi_url, edge_url);
+ config
}
diff --git a/src/oauth2.rs b/src/oauth2.rs
new file mode 100644
index 0000000..0c5f152
--- /dev/null
+++ b/src/oauth2.rs
@@ -0,0 +1,53 @@
+use rustc_serialize::json;
+
+use datatype::{AccessToken, Error, Url};
+use http::Client;
+
+
+/// Authenticate with the specified OAuth2 server to retrieve a new `AccessToken`.
+pub fn authenticate(server: Url, client: &Client) -> Result<AccessToken, Error> {
+ debug!("authenticating at {}", server);
+ let resp_rx = client.post(server, None);
+ let resp = resp_rx.recv().expect("no authenticate response received");
+ let data = try!(resp);
+ let body = try!(String::from_utf8(data));
+ Ok(try!(json::decode(&body)))
+}
+
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use datatype::{AccessToken, Url};
+ use http::TestClient;
+
+
+ fn test_server() -> Url {
+ "http://localhost:8000".parse().unwrap()
+ }
+
+ #[test]
+ fn test_authenticate() {
+ let token = r#"{
+ "access_token": "token",
+ "token_type": "type",
+ "expires_in": 10,
+ "scope": "scope1 scope2"
+ }"#;
+ let client = TestClient::from(vec![token.to_string()]);
+ let expect = AccessToken {
+ access_token: "token".to_string(),
+ token_type: "type".to_string(),
+ expires_in: 10,
+ scope: "scope1 scope2".to_string()
+ };
+ assert_eq!(expect, authenticate(test_server(), &client).unwrap());
+ }
+
+ #[test]
+ fn test_authenticate_bad_json() {
+ let client = TestClient::from(vec![r#"{"apa": 1}"#.to_string()]);
+ let expect = r#"Failed to decode JSON: MissingFieldError("access_token")"#;
+ assert_eq!(expect, format!("{}", authenticate(test_server(), &client).unwrap_err()));
+ }
+}
diff --git a/src/package_manager/deb.rs b/src/package_manager/deb.rs
new file mode 100644
index 0000000..bba86e6
--- /dev/null
+++ b/src/package_manager/deb.rs
@@ -0,0 +1,48 @@
+use std::process::Command;
+
+use datatype::{Error, Package, UpdateResultCode};
+use package_manager::package_manager::{InstallOutcome, parse_package};
+
+
+/// Returns a list of installed DEB packages with
+/// `dpkg-query -f='${Package} ${Version}\n -W`.
+pub fn installed_packages() -> Result<Vec<Package>, Error> {
+ Command::new("dpkg-query").arg("-f='${Package} ${Version}\n'").arg("-W")
+ .output()
+ .map_err(|e| Error::Package(format!("Error fetching packages: {}", e)))
+ .and_then(|c| {
+ String::from_utf8(c.stdout)
+ .map_err(|e| Error::Parse(format!("Error parsing package: {}", e)))
+ .map(|s| s.lines().map(String::from).collect::<Vec<String>>())
+ })
+ .and_then(|lines| {
+ lines.iter()
+ .map(|line| parse_package(line))
+ .filter(|pkg| pkg.is_ok())
+ .collect::<Result<Vec<Package>, _>>()
+ })
+}
+
+/// Installs a new DEB package.
+pub fn install_package(path: &str) -> Result<InstallOutcome, InstallOutcome> {
+ let output = try!(Command::new("dpkg").arg("-E").arg("-i").arg(path)
+ .output()
+ .map_err(|e| (UpdateResultCode::GENERAL_ERROR, format!("{:?}", e))));
+
+ let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
+ let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
+
+ match output.status.code() {
+ Some(0) => {
+ if (&stdout).contains("already installed") {
+ Ok((UpdateResultCode::ALREADY_PROCESSED, stdout))
+ } else {
+ Ok((UpdateResultCode::OK, stdout))
+ }
+ }
+ _ => {
+ let out = format!("stdout: {}\nstderr: {}", stdout, stderr);
+ Err((UpdateResultCode::INSTALL_FAILED, out))
+ }
+ }
+}
diff --git a/src/package_manager/mod.rs b/src/package_manager/mod.rs
new file mode 100644
index 0000000..6596686
--- /dev/null
+++ b/src/package_manager/mod.rs
@@ -0,0 +1,8 @@
+pub mod deb;
+pub mod package_manager;
+pub mod rpm;
+pub mod tpm;
+pub mod otb;
+
+pub use self::package_manager::PackageManager;
+pub use self::tpm::{assert_rx, TestDir};
diff --git a/src/package_manager/otb.rs b/src/package_manager/otb.rs
new file mode 100644
index 0000000..fece27b
--- /dev/null
+++ b/src/package_manager/otb.rs
@@ -0,0 +1,53 @@
+use std::process::Command;
+
+use datatype::{Error, Package, UpdateResultCode};
+use package_manager::package_manager::{InstallOutcome, parse_package};
+
+
+/// Returns a list of installed `OSTree` packages with
+/// `otbpkg --repo=${repodir} --query`.
+pub fn installed_packages(repodir: &str) -> Result<Vec<Package>, Error> {
+ Command::new("otbpkg")
+ .arg(format!{"--repo={}", repodir})
+ .arg("--query")
+ .output()
+ .map_err(|e| Error::Package(format!("Error fetching packages: {}", e)))
+ .and_then(|c| {
+ String::from_utf8(c.stdout)
+ .map_err(|e| Error::Parse(format!("Error parsing package: {}", e)))
+ .map(|s| s.lines().map(String::from).collect::<Vec<String>>())
+ })
+ .and_then(|lines| {
+ lines.iter()
+ .map(|line| parse_package(line))
+ .filter(|pkg| pkg.is_ok())
+ .collect::<Result<Vec<Package>, _>>()
+ })
+}
+
+/// Installs a new `OSTree` package.
+pub fn install_package(repodir: &str, path: &str) -> Result<InstallOutcome, InstallOutcome> {
+ let output = try!(Command::new("otbpkg")
+ .arg("--install")
+ .arg(format!("--repo={}", repodir))
+ .arg(path)
+ .output()
+ .map_err(|e| (UpdateResultCode::GENERAL_ERROR, format!("{:?}", e))));
+
+ let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
+ let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
+
+ match output.status.code() {
+ Some(0) => {
+ if (&stdout).contains("already installed") {
+ Ok((UpdateResultCode::ALREADY_PROCESSED, stdout))
+ } else {
+ Ok((UpdateResultCode::OK, stdout))
+ }
+ }
+ _ => {
+ let out = format!("stdout: {}\nstderr: {}", stdout, stderr);
+ Err((UpdateResultCode::INSTALL_FAILED, out))
+ }
+ }
+}
diff --git a/src/package_manager/package_manager.rs b/src/package_manager/package_manager.rs
new file mode 100644
index 0000000..09556a0
--- /dev/null
+++ b/src/package_manager/package_manager.rs
@@ -0,0 +1,135 @@
+use rustc_serialize::{Decoder, Decodable};
+use std::str::FromStr;
+
+use datatype::{Error, Package, UpdateResultCode};
+use package_manager::{deb, otb, rpm, tpm};
+
+
+/// The outcome when installing a package as a tuple of the `UpdateResultCode`
+/// and any stdout/stderr output.
+pub type InstallOutcome = (UpdateResultCode, String);
+
+/// An enumeration of the available package managers for querying and installing
+/// new packages.
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum PackageManager {
+ Off,
+ Deb,
+ Rpm,
+ File { filename: String, succeeds: bool },
+ OSTree { repodir: String }
+}
+
+impl PackageManager {
+ /// Delegates to the package manager specific function for returning a list
+ /// of installed packages.
+ pub fn installed_packages(&self) -> Result<Vec<Package>, Error> {
+ match *self {
+ PackageManager::Off => panic!("no package manager"),
+ PackageManager::Deb => deb::installed_packages(),
+ PackageManager::Rpm => rpm::installed_packages(),
+ PackageManager::File { ref filename, .. } => tpm::installed_packages(filename),
+ PackageManager::OSTree { ref repodir } => otb::installed_packages(repodir),
+ }
+ }
+
+ /// Delegates to the package manager specific function for installing a new
+ /// package on the device.
+ pub fn install_package(&self, path: &str) -> Result<InstallOutcome, InstallOutcome> {
+ match *self {
+ PackageManager::Off => panic!("no package manager"),
+ PackageManager::Deb => deb::install_package(path),
+ PackageManager::Rpm => rpm::install_package(path),
+ PackageManager::File { ref filename, succeeds } => {
+ tpm::install_package(filename, path, succeeds)
+ }
+ PackageManager::OSTree { ref repodir } => {
+ otb::install_package(repodir, path)
+ }
+ }
+ }
+
+ /// Returns a string representation of the package manager's extension.
+ pub fn extension(&self) -> String {
+ match *self {
+ PackageManager::Off => panic!("no package manager"),
+ PackageManager::Deb => "deb".to_string(),
+ PackageManager::Rpm => "rpm".to_string(),
+ PackageManager::File { ref filename, .. } => filename.to_string(),
+ PackageManager::OSTree {..} => "otb".to_string(),
+ }
+ }
+}
+
+impl FromStr for PackageManager {
+ type Err = Error;
+
+ fn from_str(s: &str) -> Result<PackageManager, Error> {
+ match s.to_lowercase().as_str() {
+ "off" => Ok(PackageManager::Off),
+ "deb" => Ok(PackageManager::Deb),
+ "rpm" => Ok(PackageManager::Rpm),
+
+ file if file.len() > 5 && file[..5].as_bytes() == b"file:" => {
+ Ok(PackageManager::File { filename: file[5..].to_string(), succeeds: true })
+ },
+
+ repo if repo.len() > 4 && repo[..4].as_bytes() == b"otb:" => {
+ Ok(PackageManager::OSTree { repodir: repo[4..].to_string() })
+ }
+
+ _ => Err(Error::Parse(format!("unknown package manager: {}", s)))
+ }
+ }
+}
+
+impl Decodable for PackageManager {
+ fn decode<D: Decoder>(d: &mut D) -> Result<PackageManager, D::Error> {
+ d.read_str().and_then(|s| Ok(s.parse::<PackageManager>().expect("couldn't parse PackageManager")))
+ }
+}
+
+pub fn parse_package(line: &str) -> Result<Package, Error> {
+ match line.splitn(2, ' ').collect::<Vec<_>>() {
+ ref parts if parts.len() == 2 => {
+ // HACK: strip left single quotes from stdout
+ Ok(Package {
+ name: String::from(parts[0].trim_left_matches('\'')),
+ version: String::from(parts[1])
+ })
+ },
+ _ => Err(Error::Parse(format!("Couldn't parse package: {}", line)))
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use datatype::Package;
+
+
+ #[test]
+ fn test_parses_normal_package() {
+ assert_eq!(parse_package("uuid-runtime 2.20.1-5.1ubuntu20.7").unwrap(),
+ Package {
+ name: "uuid-runtime".to_string(),
+ version: "2.20.1-5.1ubuntu20.7".to_string()
+ });
+ }
+
+ #[test]
+ fn test_separates_name_and_version_correctly() {
+ assert_eq!(parse_package("vim 2.1 foobar").unwrap(),
+ Package {
+ name: "vim".to_string(),
+ version: "2.1 foobar".to_string()
+ });
+ }
+
+ #[test]
+ fn test_rejects_bogus_input() {
+ assert_eq!(format!("{}", parse_package("foobar").unwrap_err()),
+ "Parse error: Couldn't parse package: foobar".to_string());
+ }
+}
diff --git a/src/package_manager/rpm.rs b/src/package_manager/rpm.rs
new file mode 100644
index 0000000..99aacbf
--- /dev/null
+++ b/src/package_manager/rpm.rs
@@ -0,0 +1,46 @@
+use std::process::Command;
+
+use datatype::{Error, Package, UpdateResultCode};
+use package_manager::package_manager::{InstallOutcome, parse_package};
+
+
+/// Returns a list of installed RPM packages with
+/// `rpm -qa ==queryformat ${NAME} ${VERSION}\n`.
+pub fn installed_packages() -> Result<Vec<Package>, Error> {
+ Command::new("rpm").arg("-qa").arg("--queryformat").arg("%{NAME} %{VERSION}\n")
+ .output()
+ .map_err(|e| Error::Package(format!("Error fetching packages: {}", e)))
+ .and_then(|c| {
+ String::from_utf8(c.stdout)
+ .map_err(|e| Error::Parse(format!("Error parsing package: {}", e)))
+ .map(|s| s.lines().map(String::from).collect::<Vec<String>>())
+ })
+ .and_then(|lines| {
+ lines.iter()
+ .map(|line| parse_package(line))
+ .filter(|item| item.is_ok())
+ .collect::<Result<Vec<Package>, _>>()
+ })
+}
+
+/// Installs a new RPM package.
+pub fn install_package(path: &str) -> Result<InstallOutcome, InstallOutcome> {
+ let output = try!(Command::new("rpm").arg("-Uvh").arg("--force").arg(path)
+ .output()
+ .map_err(|e| (UpdateResultCode::GENERAL_ERROR, format!("{:?}", e))));
+
+ let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
+ let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
+
+ match output.status.code() {
+ Some(0) => Ok((UpdateResultCode::OK, stdout)),
+ _ => {
+ let out = format!("stdout: {}\nstderr: {}", stdout, stderr);
+ if (&stderr).contains("already installed") {
+ Ok((UpdateResultCode::ALREADY_PROCESSED, out))
+ } else {
+ Err((UpdateResultCode::INSTALL_FAILED, out))
+ }
+ }
+ }
+}
diff --git a/src/package_manager/tpm.rs b/src/package_manager/tpm.rs
new file mode 100644
index 0000000..feafe0a
--- /dev/null
+++ b/src/package_manager/tpm.rs
@@ -0,0 +1,162 @@
+use chan::Receiver;
+use std::fmt::Debug;
+use std::fs;
+use std::fs::File;
+use std::fs::OpenOptions;
+use std::io::BufReader;
+use std::io::prelude::*;
+use time;
+
+use datatype::{Error, Package, UpdateResultCode};
+use package_manager::package_manager::{InstallOutcome, PackageManager};
+
+
+impl PackageManager {
+ /// Creates a new Test Package Manager that writes to a temporary file.
+ pub fn new_tpm(succeeds: bool) -> Self {
+ let name = format!("/tmp/sota-tpm-{}", time::precise_time_ns().to_string());
+ if succeeds {
+ let _ = File::create(name.clone()).expect("couldn't create Test Package Manager file");
+ }
+ PackageManager::File { filename: name, succeeds: succeeds }
+ }
+}
+
+
+/// Encapsulate a directory whose contents will be destroyed when it drops out of scope.
+pub struct TestDir(pub String);
+
+impl TestDir {
+ /// Create a new test directory that will be destroyed when it drops out of scope.
+ pub fn new(reason: &str) -> TestDir {
+ let dir = format!("/tmp/{}-{}", reason, time::precise_time_ns().to_string());
+ fs::create_dir_all(dir.clone()).expect("couldn't create TempDir");
+ TestDir(dir)
+ }
+}
+
+impl Drop for TestDir {
+ fn drop(&mut self) {
+ fs::remove_dir_all(&self.0.clone()).expect("couldn't remove TempDir");
+ }
+}
+
+
+/// For each item in the list, assert that it equals the next `Receiver` value.
+pub fn assert_rx<X: PartialEq + Debug>(rx: Receiver<X>, xs: &[X]) {
+ let n = xs.len();
+ let mut xs = xs.iter();
+ for _ in 0..n {
+ let val = rx.recv().expect("assert_rx expected another val");
+ let x = xs.next().expect(&format!("assert_rx: no match for val: {:?}", val));
+ assert_eq!(val, *x);
+ }
+}
+
+
+/// Returns a list of installed packages from a format of `<name> <version>`.
+pub fn installed_packages(path: &str) -> Result<Vec<Package>, Error> {
+ let f = try!(File::open(path));
+ let reader = BufReader::new(f);
+ let mut pkgs = Vec::new();
+
+ for line in reader.lines() {
+ let line = try!(line);
+ let parts = line.split(' ');
+
+ if parts.clone().count() == 2 {
+ if let Some(name) = parts.clone().nth(0) {
+ if let Some(version) = parts.clone().nth(1) {
+ pkgs.push(Package {
+ name: name.to_string(),
+ version: version.to_string()
+ });
+ }
+ }
+ }
+ }
+
+ Ok(pkgs)
+}
+
+/// Installs a package to the specified path when succeeds is true, or fails otherwise.
+pub fn install_package(path: &str, pkg: &str, succeeds: bool) -> Result<InstallOutcome, InstallOutcome> {
+ if !succeeds {
+ return Err((UpdateResultCode::INSTALL_FAILED, "failed".to_string()))
+ }
+
+ let outcome = || -> Result<(), Error> {
+ let mut f = OpenOptions::new().create(true).write(true).append(true).open(path).unwrap();
+ try!(f.write(pkg.as_bytes()));
+ try!(f.write(b"\n"));
+ Ok(())
+ }();
+
+ match outcome {
+ Ok(_) => Ok((UpdateResultCode::OK, "".to_string())),
+ Err(err) => Err((UpdateResultCode::INSTALL_FAILED, format!("{:?}", err)))
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use std::fs::File;
+ use std::io::prelude::*;
+
+ use super::*;
+ use datatype::Package;
+
+
+ fn pkg1() -> Package {
+ Package {
+ name: "apa".to_string(),
+ version: "0.0.0".to_string()
+ }
+ }
+
+ fn pkg2() -> Package {
+ Package {
+ name: "bepa".to_string(),
+ version: "1.0.0".to_string()
+ }
+ }
+
+
+ #[test]
+ fn get_installed_packages() {
+ let dir = TestDir::new("sota-tpm-test-1");
+ let path = format!("{}/tpm", dir.0);
+ let mut f = File::create(path.clone()).unwrap();
+ f.write(b"apa 0.0.0\n").unwrap();
+ f.write(b"bepa 1.0.0").unwrap();
+ assert_eq!(installed_packages(&path).unwrap(), vec![pkg1(), pkg2()]);
+ }
+
+ #[test]
+ fn ignore_bad_installed_packages() {
+ let dir = TestDir::new("sota-tpm-test-2");
+ let path = format!("{}/tpm", dir.0);
+ let mut f = File::create(path.clone()).unwrap();
+ f.write(b"cepa-2.0.0\n").unwrap();
+ assert_eq!(installed_packages(&path).unwrap(), Vec::new());
+ }
+
+ #[test]
+ fn install_packages() {
+ let dir = TestDir::new("sota-tpm-test-3");
+ let path = format!("{}/tpm", dir.0);
+ install_package(&path, "apa 0.0.0", true).unwrap();
+ install_package(&path, "bepa 1.0.0", true).unwrap();
+ assert_eq!(installed_packages(&path).unwrap(), vec![pkg1(), pkg2()]);
+ }
+
+ #[test]
+ fn failed_installation() {
+ let dir = TestDir::new("sota-tpm-test-4");
+ let path = format!("{}/tpm", dir.0);
+ assert!(install_package(&path, "apa 0.0.0", false).is_err());
+ install_package(&path, "bepa 1.0.0", true).unwrap();
+ assert_eq!(installed_packages(&path).unwrap(), vec![pkg2()]);
+ }
+}
diff --git a/src/remote/dw.rs b/src/remote/dw.rs
deleted file mode 100644
index 0fd8b8e..0000000
--- a/src/remote/dw.rs
+++ /dev/null
@@ -1,581 +0,0 @@
-//! Handles caching and storage on disk for in-progress transfers and the assembly and verification
-//! of finished transfers
-
-use std::fs;
-use std::fs::{OpenOptions, DirEntry, File};
-use std::io::prelude::*;
-use std::path::PathBuf;
-use std::vec::Vec;
-use std::str::FromStr;
-
-use time;
-
-#[cfg(test)] use rand;
-#[cfg(test)] use rand::Rng;
-#[cfg(test)] use test_library::PathPrefix;
-
-use crypto::sha1::Sha1;
-use crypto::digest::Digest;
-
-use rustc_serialize::base64::FromBase64;
-
-use event::UpdateId;
-
-/// Type for storing the metadata of a in-progress transfer, which is defined as one package.
-/// Will clear out the chunks on disk when freed.
-pub struct Transfer {
- pub update_id: UpdateId,
- /// SHA1 checksum of the fully assembled package.
- pub checksum: String,
- /// `Vector` of transferred chunks.
- pub transferred_chunks: Vec<u64>,
- /// Path to the directory, where chunks will be cached and finished packages will be stored.
- pub prefix_dir: String,
- /// Timestamp, when the last chunk was received. Given as a unix epoch timestamp.
- pub last_chunk_received: i64
-}
-
-impl Transfer {
- /// Return a new `Transfer`
- ///
- /// # Arguments
- /// * `prefix`: Path where transferred chunks and assembled package will be stored.
- /// * `package`: [`PackageId`](../message/struct.PackageId.html) of this transfer.
- /// * `checksum`: SHA1 checksum of the fully assembled package.
- pub fn new(prefix: String, id: UpdateId, checksum: String)
- -> Transfer {
- Transfer {
- update_id: id,
- checksum: checksum,
- transferred_chunks: Vec::new(),
- prefix_dir: prefix,
- last_chunk_received: time::get_time().sec
- }
- }
-
- /// Create a transfer with empty values. To be used in tests.
- ///
- /// # Arguments
- /// * `prefix`: Path where transferred chunks and assembled package will be stored. This should
- /// be a temporary directory for tests.
- #[cfg(test)]
- pub fn new_test(prefix: &PathPrefix) -> Transfer {
- Transfer {
- update_id: UpdateId::new(),
- checksum: "".to_string(),
- transferred_chunks: Vec::new(),
- prefix_dir: prefix.to_string(),
- last_chunk_received: time::get_time().sec
- }
- }
-
- /// Randomize a existing transfer, by creating a random
- /// [`PackageId`](../message/struct.PackageId.html). Returns the created `PackageId`, so it can
- /// be used in assertions.
- ///
- /// # Arguments
- /// * `i`: Size of the name and version strings.
- #[cfg(test)]
- pub fn randomize(&mut self, i: usize) -> UpdateId {
- let update_id = rand::thread_rng()
- .gen_ascii_chars().take(i).collect::<String>();
-
- trace!("Testing with:");
- trace!(" update_id: {}", update_id);
- self.update_id = update_id.clone();
- update_id
- }
-
- /// Write a transferred chunk to disk. Returns false and logs an error if something goes wrong.
- ///
- /// # Arguments
- /// * `msg`: Base64 encoded data of this chunk.
- /// * `index`: Index of this chunk
- pub fn write_chunk(&mut self,
- msg: &str,
- index: u64) -> bool {
- let success = msg.from_base64().map_err(|e| {
- error!("Could not decode chunk {} for update_id {}", index, self.update_id);
- error!("{}", e)
- }).and_then(|msg| self.get_chunk_path(index).map_err(|e| {
- error!("Could not get path for chunk {}", index);
- error!("{}", e)
- }).map(|path| {
- trace!("Saving chunk to {}", path.display());
- if write_new_file(&path, &msg) {
- self.transferred_chunks.push(index);
- self.transferred_chunks.sort();
- self.transferred_chunks.dedup();
- true
- } else {
- error!("Couldn't write chunk {} for update_id {}", index, self.update_id);
- false
- }
- })).unwrap_or(false);
-
- self.last_chunk_received = time::get_time().sec;
- success
- }
-
- /// Assemble the transferred chunks to a package and verify it with the provided checksum.
- /// Returns `false` and prints a error message if either the package can't be assembled or the
- /// checksum doesn't match.
- pub fn assemble_package(&self) -> Result<PathBuf, String> {
- trace!("Finalizing package {}", self.update_id);
- self.assemble_chunks().
- and_then(|_| {
- if self.checksum() {
- self.get_package_path()
- } else {
- Err(format!("Cannot assemble_package for update_id: {}", self.update_id))
- }
- })
- }
-
- /// Collect all chunks and concatenate them into one file. Returns a `String` with a error
- /// message, should something go wrong.
- fn assemble_chunks(&self) -> Result<(), String> {
- let package_path = try!(self.get_package_path());
-
- trace!("Saving update_id {} to {}", self.update_id, package_path.display());
-
- let mut file = try!(OpenOptions::new()
- .write(true).append(true)
- .create(true).truncate(true)
- .open(package_path)
- .map_err(|x| format!("Couldn't open file: {}", x)));
-
- let path: PathBuf = try!(self.get_chunk_dir());
-
- // Make sure all indices are valid and sort them
- let mut indices = Vec::new();
- for entry in try!(read_dir(&path)) {
- let entry = try!(entry.map_err(|x| format!("No entries: {}", x)));
- indices.push(try!(parse_index(entry)));
- }
- indices.sort();
-
- // Append indices to the final file
- for index in indices {
- try!(self.copy_chunk(&path, index, &mut file));
- }
- Ok(())
- }
-
- /// Read a chunk file file and append it to a package file. Returns a `String` with a error
- /// message should something go wrong.
- ///
- /// # Arguments
- /// * `path`: Pointer to a [`PathBuf`]
- /// (https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) where the chunks are
- /// cached.
- /// * `index`: Index of the chunk to append.
- /// * `file`: Pointer to a `File` where the chunk should be appended. Should be created with
- /// `OpenOptions` and the append only option. See the documentation for [`OpenOptions`]
- /// (https://doc.rust-lang.org/stable/std/fs/struct.OpenOptions.html), [`File`]
- /// (https://doc.rust-lang.org/stable/std/fs/struct.File.html), and the implementation of
- /// [`assemble_chunks`](#method.assemble_chunks) for details.
- fn copy_chunk(&self, path: &PathBuf, index: u64, file: &mut File)
- -> Result<(), String> {
- let name = index.to_string();
- let mut chunk_path = path.clone();
- chunk_path.push(&name);
- let mut chunk =
- try!(OpenOptions::new().open(chunk_path)
- .map_err(|x| format!("Couldn't open file: {}", x)));
-
- let mut buf = Vec::new();
- try!(chunk.read_to_end(&mut buf)
- .map_err(|x| format!("Couldn't read file {}: {}", name, x)));
- try!(file.write(&mut buf)
- .map_err(|x| format!("Couldn't write chunk {} to file {}: {}",
- name, self.update_id, x)));
-
- trace!("Wrote chunk {} to update_id {}", name, self.update_id);
- Ok(())
- }
-
- /// Verify the checksum of this transfer. Assumes the package was already assembled. Prints a
- /// error message showing the mismatched checksums and returns false on errors.
- fn checksum(&self) -> bool {
- let path = try_or!(self.get_package_path(), return false);
- let mut file = try_or!(OpenOptions::new().open(path), return false);
- let mut data = Vec::new();
-
- // TODO: avoid reading in the whole file at once
- try_msg_or!(file.read_to_end(&mut data),
- "Couldn't read file to check",
- return false);
-
- let mut hasher = Sha1::new();
- hasher.input(&data);
- let hash = hasher.result_str();
-
- if hash == self.checksum {
- true
- } else {
- error!("Checksums didn't match for update_id {}", self.update_id);
- error!(" Expected: {}", self.checksum);
- error!(" Got: {}", hash);
- false
- }
- }
-
- /// Get the full path for the specified chunk index. Returns a
- /// [`PathBuf`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) on success or a
- /// `String` on errors detailing what went wrong.
- ///
- /// # Arguments
- /// * `index`: The index for which the path should be constructed
- fn get_chunk_path(&self, index: u64) -> Result<PathBuf, String> {
- let mut path = try!(self.get_chunk_dir());
- let filename = index.to_string();
-
- trace!("Using filename {}", filename);
- path.push(filename);
- Ok(path)
- }
-
- /// Get the full path for the package of this `Transfer`. Returns a
- /// [`PathBuf`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) on success or a
- /// `String` on errors detailing what went wrong.
- fn get_package_path(&self) -> Result<PathBuf, String> {
- let mut path = try!(self.get_package_dir());
- path.push(format!("{}.spkg", self.update_id));
- Ok(path)
- }
-
- /// Get the directory, where this `Transfer` caches chunks. Returns a
- /// [`PathBuf`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) on success or a
- /// `String` on errors detailing what went wrong.
- fn get_chunk_dir(&self) -> Result<PathBuf, String> {
- let mut path = PathBuf::from(&self.prefix_dir);
- path.push("downloads");
- path.push(format!("{}", self.update_id));
-
- fs::create_dir_all(&path).map_err(|e| {
- let path_str = path.to_str().unwrap_or("unknown");
- format!("Couldn't create chunk dir at '{}': {}", path_str, e)
- }).map(|_| path)
- }
-
- /// Get the directory, where this `Transfer` stores the assembled package. Returns a
- /// [`PathBuf`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) on success or a
- /// `String` on errors detailing what went wrong.
- fn get_package_dir(&self) -> Result<PathBuf, String> {
- let mut path = PathBuf::from(&self.prefix_dir);
- path.push("packages");
-
- fs::create_dir_all(&path).map_err(|e| {
- let path_str = path.to_str().unwrap_or("unknown");
- format!("Couldn't create packges dir at '{}': {}", path_str, e)
- }).map(|_| path)
- }
-}
-
-impl Drop for Transfer {
- /// When a `Transfer` is freed it will also clear out the associated chunk cache on disk.
- fn drop(&mut self) {
- let dir = try_or!(self.get_chunk_dir(), return);
- trace!("Dropping transfer for package {}", self.update_id);
-
- for entry in try_or!(read_dir(&dir), return) {
- let entry = try_or!(entry, continue);
- let _ = entry.file_name().into_string().map_err(|_|
- error!("Found a malformed entry!")
- ).map(|name| {
- trace!("Dropping chunk file {}", name);
- try_or!(fs::remove_file(entry.path()), return);
- });
- }
-
- try_or!(fs::remove_dir(dir), return);
- }
-}
-
-/// Write the provided `data` to the file at `path`. Will create the file if it doesn't exist and
-/// overwrite existing files. Returns `false` on errors, after logging a error message.
-///
-/// # Arguments
-/// * `path`: Pointer to a [`PathBuf`]
-/// (https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) where the data will be
-/// written to. Needs to point to a (possibly nonexistent) file.
-/// * `data`: The data to be written to disk.
-fn write_new_file(path: &PathBuf, data: &Vec<u8>) -> bool {
- let mut file = try_or!(OpenOptions::new()
- .write(true).create(true)
- .truncate(true).open(path),
- return false);
-
- try_or!(file.write_all(data), return false);
- try_or!(file.flush(), return false);
- true
-}
-
-/// Read the contents of a directory. Returns a
-/// [`ReadDir`](https://doc.rust-lang.org/stable/std/fs/struct.ReadDir.html) iterator on success or
-/// a `String` with a detailed error message on failure.
-fn read_dir(path: &PathBuf) -> Result<fs::ReadDir, String> {
- fs::read_dir(path).map_err(|e| {
- let path_str = path.to_str().unwrap_or("unknown");
- format!("Couldn't read dir at '{}': {}", path_str, e)
- })
-}
-
-/// Parse a [`DirEntry`](https://doc.rust-lang.org/stable/std/fs/struct.DirEntry.html) to a `u64`.
-/// Returns the parsed number on success or a `String` with a detailed error message on failure.
-///
-/// # Arguments
-/// * `entry`: `DirEntry` to be parsed.
-fn parse_index(entry: DirEntry) -> Result<u64, String> {
- let name = entry.file_name().into_string()
- .unwrap_or("unknown".to_string());
- u64::from_str(&name)
- .map_err(|_| "Couldn't parse chunk index from filename".to_string())
-}
-
-use std::collections::HashMap;
-
-/// Type alias to hide the internal `HashMap`, that is used to store
-/// [`Transfer`](../persistence/struct.Transfer.html)s.
-pub struct Transfers {
- items: HashMap<UpdateId, Transfer>,
- storage_dir: String
-}
-
-impl Transfers {
- pub fn new(dir: String) -> Transfers {
- Transfers {
- items: HashMap::new(),
- storage_dir: dir
- }
- }
-
- pub fn get(&self, pkg: &UpdateId) -> Option<&Transfer> {
- self.items.get(pkg)
- }
-
- pub fn get_mut(&mut self, pkg: &UpdateId) -> Option<&mut Transfer> {
- self.items.get_mut(pkg)
- }
-
- pub fn push(&mut self, pkg: UpdateId, cksum: String) {
- self.items.insert(
- pkg.clone(),
- Transfer::new(self.storage_dir.to_string(), pkg, cksum));
- }
-
- #[cfg(test)]
- pub fn push_test(&mut self, tr: Transfer) {
- self.items.insert(tr.update_id.clone(), tr);
- }
-
- #[cfg(test)]
- pub fn is_empty(&self) -> bool {
- self.items.is_empty()
- }
-
- pub fn remove(&mut self, pkg: &UpdateId) {
- self.items.remove(pkg);
- }
-
- pub fn clear(&mut self) {
- self.items.clear();
- }
-
- pub fn prune(&mut self, now: i64, timeout: i64) {
- self.items.iter()
- .filter(|&(_, v)| now - v.last_chunk_received > timeout)
- .map(|(k, _)| k.clone())
- .collect::<Vec<UpdateId>>()
- .iter().map(|k| {
- self.items.remove(k);
- info!("Transfer for update_id {} timed out after {} ms", k, timeout)})
- .collect::<Vec<()>>();
- }
-}
-
-
-#[cfg(test)]
-mod test {
- use super::*;
- use test_library::*;
-
- use std::path::PathBuf;
- use std::fs;
- use std::fs::OpenOptions;
- use std::io::prelude::*;
-
- use rand;
- use rand::Rng;
- use rustc_serialize::base64;
- use rustc_serialize::base64::ToBase64;
-
- fn create_tmp_directories(prefix: &PathPrefix) {
- for i in 1..20 {
- let mut transfer = Transfer::new_test(prefix);
- let update_id = transfer.randomize(i);
- let chunk_dir: PathBuf = transfer.get_chunk_dir().unwrap();
- let path = format!("{}/downloads/{}", prefix, update_id);
- assert_eq!(chunk_dir.to_str().unwrap(), path);
-
- let path = PathBuf::from(path);
- // This also makes sure it's a directory
- let dir = fs::read_dir(&path).unwrap();
-
- for _ in dir {
- panic!("Found non-empty directory!");
- }
- }
- }
-
- #[test]
- fn it_creates_a_tmp_directory() {
- test_init!();
- let prefix = PathPrefix::new();
- create_tmp_directories(&prefix);
- }
-
- #[test]
- fn it_cleans_up_the_tmp_directories() {
- test_init!();
- let prefix = PathPrefix::new();
- create_tmp_directories(&prefix);
- let path = PathBuf::from(format!("{}/downloads/", prefix));
- let dir = fs::read_dir(&path).unwrap();
-
- for _ in dir {
- panic!("Found non-empty directory!");
- }
- }
-
- #[test]
- fn it_creates_a_persistent_directory_per_package() {
- test_init!();
- let prefix = PathPrefix::new();
- for i in 1..20 {
- let mut transfer = Transfer::new_test(&prefix);
- let update_id = transfer.randomize(i);
-
- let chunk_dir: PathBuf = transfer.get_package_path().unwrap();
- let path = format!("{}/packages/{}.spkg", prefix, update_id);
- assert_eq!(chunk_dir.to_str().unwrap(), path);
- }
- }
-
- macro_rules! assert_chunk_written {
- ($transfer:ident,
- $prefix:ident,
- $update_id:ident,
- $index:ident,
- $data:ident) => {{
- trace!("Testing with: {}", $data);
-
- let b64_data = $data.as_bytes().to_base64(
- base64::Config {
- char_set: base64::CharacterSet::UrlSafe,
- newline: base64::Newline::LF,
- pad: true,
- line_length: None
- });
-
- trace!("Encoded as: {}", b64_data);
-
- $transfer.write_chunk(&b64_data, $index as u64);
-
- let path = format!("{}/downloads/{}/{}", $prefix, $update_id, $index);
-
- trace!("Expecting file at: {}", path);
-
- let mut from_disk = Vec::new();
- OpenOptions::new()
- .open(PathBuf::from(path))
- .unwrap()
- .read_to_end(&mut from_disk)
- .unwrap();
-
- assert_eq!($data.into_bytes(), from_disk);
- }}
- }
-
- #[test]
- fn it_writes_decoded_data_to_disk() {
- test_init!();
- let prefix = PathPrefix::new();
- for i in 1..20 {
- let mut transfer = Transfer::new_test(&prefix);
- let update_id = transfer.randomize(i);
- for i in 1..20 {
- let data = rand::thread_rng()
- .gen_ascii_chars().take(i).collect::<String>();
- assert_chunk_written!(transfer, prefix, update_id, i, data);
- }
- }
- }
-
- #[test]
- fn it_correctly_assembles_stored_chunks() {
- test_init!();
- let prefix = PathPrefix::new();
- for i in 1..20 {
- let mut transfer = Transfer::new_test(&prefix);
- let update_id = transfer.randomize(i);
- let mut full_data = String::new();
- for i in 1..20 {
- let data = rand::thread_rng()
- .gen_ascii_chars().take(i).collect::<String>();
- full_data.push_str(&data);
-
- assert_chunk_written!(transfer, prefix, update_id, i, data);
- }
-
- transfer.assemble_chunks().unwrap();
-
- let path = format!("{}/packages/{}.spkg", prefix, update_id);
-
- trace!("Expecting assembled file at: {}", path);
-
- let mut from_disk = Vec::new();
- OpenOptions::new()
- .open(PathBuf::from(path))
- .unwrap()
- .read_to_end(&mut from_disk)
- .unwrap();
-
- assert_eq!(full_data.into_bytes(), from_disk);
- }
- }
-
- fn checksum_matching(data: String, checksum: String) -> bool {
- let prefix = PathPrefix::new();
- let mut transfer = Transfer::new_test(&prefix);
- let update_id = transfer.randomize(20);
- let index = 0;
- assert_chunk_written!(transfer, prefix, update_id, index, data);
- transfer.assemble_chunks().unwrap();
-
- transfer.checksum = checksum;
- transfer.checksum()
- }
-
- #[test]
- fn it_returns_true_for_correct_checksums() {
- test_init!();
- assert!(checksum_matching("test\n".to_string(),
- "4e1243bd22c66e76c2ba9eddc1f91394e57f9f83".to_string()));
- }
-
- #[test]
- fn it_returns_false_for_incorrect_checksums() {
- test_init!();
- assert!(!checksum_matching("test\n".to_string(),
- "fa7c4d75bae3a641d1f9ab5df028175bfb8a69ca".to_string()));
- }
-
- #[test]
- fn it_returns_false_for_invalid_checksums() {
- test_init!();
- assert!(!checksum_matching("test\n".to_string(),
- "invalid".to_string()));
- }
-}
diff --git a/src/remote/jsonrpc.rs b/src/remote/jsonrpc.rs
deleted file mode 100644
index 5676a37..0000000
--- a/src/remote/jsonrpc.rs
+++ /dev/null
@@ -1,150 +0,0 @@
-//! RVI specific implementation of the jsonrpc protocol
-use time;
-
-/// Type to encode a generic jsonrpc call.
-#[derive(RustcDecodable,RustcEncodable,Debug)]
-pub struct Request<T> {
- /// The version of jsonrpc to use, has to be set to `"2.0"`.
- pub jsonrpc: String,
- /// The identifier of the request. Only unsigned numbers are accepted
- // TODO: id can be any type
- pub id: u64,
- /// The method to call on the receiving side.
- pub method: String,
- /// Arguments for the method.
- pub params: T
-}
-
-impl<T> Request<T> {
- /// Returns a new `Request`.
- ///
- /// # Arguments
- /// * `s`: The name of the method to call.
- /// * `p`: The arguments of said method.
- pub fn new(s: &str, p: T) -> Request<T> {
- Request::<T> {
- jsonrpc: "2.0".to_string(),
- id: time::precise_time_ns(),
- method: s.to_string(),
- params: p
- }
- }
-}
-
-/// Response to a jsonrpc call, indicating a successful method call.
-#[derive(RustcDecodable,RustcEncodable)]
-pub struct OkResponse<T> {
- /// The version of jsonrpc to use, has to be set to `"2.0"`.
- pub jsonrpc: String,
- /// The identifier of the jsonrpc call this response belongs to. Only unsigned numbers are
- /// accepted
- // TODO: id can be any type
- pub id: u64,
- /// The result of the method call, if any.
- pub result: Option<T>
-}
-
-impl<T> OkResponse<T> {
- /// Returns a new `OkResponse`
- ///
- /// # Arguments
- /// * `id`: The identifier of the jsonrpc call the returned response belongs to.
- /// * `result`: The result of the method call, if any.
- pub fn new(id: u64, result: Option<T>) -> OkResponse<T> {
- OkResponse {
- jsonrpc: "2.0".to_string(),
- id: id,
- result: result
- }
- }
-}
-
-/// Response to a jsonrpc call, indicating failure.
-#[derive(RustcDecodable,RustcEncodable)]
-pub struct ErrResponse {
- /// The version of jsonrpc to use, has to be set to `"2.0"`.
- pub jsonrpc: String,
- /// The identifier of the jsonrpc call this response belongs to. Only unsigned numbers are
- /// accepted
- // TODO: id can be any type
- pub id: u64,
- /// The error code and message.
- pub error: ErrorCode
-}
-
-impl ErrResponse {
- /// Returns a new `ErrResponse`
- ///
- /// # Arguments
- /// * `id`: The identifier of the jsonrpc call the returned response belongs to.
- /// * `error`: The error code and message. See [`ErrorCode`](./struct.ErrorCode.html).
- pub fn new(id: u64, error: ErrorCode) -> ErrResponse {
- ErrResponse {
- jsonrpc: "2.0".to_string(),
- id: id,
- error: error
- }
- }
-
- /// Returns a new `ErrResponse`, indicating a ["Invalid
- /// Request"](http://www.jsonrpc.org/specification#error_object) error.
- pub fn invalid_request(id: u64) -> ErrResponse {
- ErrResponse::new(id,
- ErrorCode {
- code: -32600,
- message: "Invalid Request".to_string()
- })
- }
-
- /// Returns a new `ErrResponse`, indicating a ["Method not
- /// found"](http://www.jsonrpc.org/specification#error_object) error.
- pub fn method_not_found(id: u64) -> ErrResponse {
- ErrResponse::new(id,
- ErrorCode {
- code: -32601,
- message: "Method not found".to_string()
- })
- }
-
- /// Returns a new `ErrResponse`, indicating a ["Parse
- /// error"](http://www.jsonrpc.org/specification#error_object) error.
- pub fn parse_error() -> ErrResponse {
- ErrResponse::new(0,
- ErrorCode {
- code: -32700,
- message: "Parse error".to_string()
- })
- }
-
- /// Returns a new `ErrResponse`, indicating a ["Invalid
- /// params"](http://www.jsonrpc.org/specification#error_object) error.
- pub fn invalid_params(id: u64) -> ErrResponse {
- ErrResponse::new(
- id,
- ErrorCode {
- code: -32602,
- message: "Invalid params".to_string()
- })
- }
-
- /// Returns a new `ErrResponse`, indicating a unspecified error.
- pub fn unspecified(id: u64) -> ErrResponse {
- ErrResponse::new(
- id,
- ErrorCode {
- code: -32100,
- message: "Couldn't handle request".to_string()
- })
- }
-}
-
-/// Type to encode a jsonrpc error.
-#[derive(RustcDecodable,RustcEncodable)]
-pub struct ErrorCode {
- /// The error code as [specified by
- /// jsonrpc](http://www.jsonrpc.org/specification#error_object).
- pub code: i32,
- /// The error message as [specified by
- /// jsonrpc](http://www.jsonrpc.org/specification#error_object).
- pub message: String
-}
diff --git a/src/remote/mod.rs b/src/remote/mod.rs
deleted file mode 100644
index 42a2c7b..0000000
--- a/src/remote/mod.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-mod dw;
-mod jsonrpc;
-mod parm;
-pub mod rvi;
-pub mod svc;
diff --git a/src/remote/parm.rs b/src/remote/parm.rs
deleted file mode 100644
index 5c2db8b..0000000
--- a/src/remote/parm.rs
+++ /dev/null
@@ -1,189 +0,0 @@
-use event::UpdateId;
-use event::inbound::{InboundEvent, UpdateAvailable, GetInstalledSoftware, DownloadComplete};
-
-use super::dw::Transfers;
-use super::svc::{BackendServices, RemoteServices};
-
-use std::result;
-use std::sync::Mutex;
-
-#[derive(Debug)]
-pub enum Error {
- UnknownPackage,
- IoFailure,
- SendFailure
-}
-pub type Result = result::Result<Option<InboundEvent>, Error>;
-
-/// Trait that every message handler needs to implement.
-pub trait ParamHandler {
- /// Handle the message.
- ///
- /// Return a [`Event`](../message/enum.Event.html) to be passed to the
- /// [`main_loop`](../main_loop/index.html) if apropriate.
- fn handle(&self,
- services: &Mutex<RemoteServices>,
- transfers: &Mutex<Transfers>)
- -> Result;
-}
-
-
-/// Type for "Notify" messages.
-#[derive(RustcDecodable, Clone)]
-pub struct NotifyParams {
- /// A `Vector` of packages, that are available for download.
- pub update_available: UpdateAvailable,
- /// The service URLs, that the SOTA server supports.
- pub services: BackendServices,
-}
-
-impl ParamHandler for NotifyParams {
- fn handle(&self,
- services: &Mutex<RemoteServices>,
- _: &Mutex<Transfers>) -> Result {
- let mut services = services.lock().unwrap();
- services.set(self.services.clone());
-
- Ok(Some(InboundEvent::UpdateAvailable(self.update_available.clone())))
- }
-}
-
-
-/// Type for "Start Transfer" messages.
-#[derive(RustcDecodable)]
-pub struct StartParams {
- pub update_id: UpdateId,
- /// The amount of chunks this `Transfer` will have.
- pub chunkscount: u64,
- /// The SHA1 checksum of the assembled package.
- pub checksum: String
-}
-
-impl ParamHandler for StartParams {
- fn handle(&self,
- services: &Mutex<RemoteServices>,
- transfers: &Mutex<Transfers>) -> Result {
- let services = services.lock().unwrap();
- let mut transfers = transfers.lock().unwrap();
-
- info!("Starting transfer for update_id {}", self.update_id);
-
- transfers.push(self.update_id.clone(), self.checksum.clone());
- services.send_chunk_received(
- ChunkReceived {
- update_id: self.update_id.clone(),
- chunks: Vec::new(),
- vin: services.vin.clone() })
- .map_err(|e| {
- error!("Error on sending start ACK: {}", e);
- Error::SendFailure })
- .map(|_| None)
- }
-}
-
-/// Encodes the "Chunk Received" message, indicating that a chunk was successfully transferred.
-#[derive(RustcEncodable)]
-pub struct ChunkReceived {
- /// The transfer to which the transferred chunk belongs.
- pub update_id: UpdateId,
- /// A list of the successfully transferred chunks.
- pub chunks: Vec<u64>,
- /// The VIN of this device.
- pub vin: String
-}
-
-
-/// Type for messages transferring single chunks.
-#[derive(RustcDecodable)]
-pub struct ChunkParams {
- /// The package transfer this chunk belongs to.
- pub update_id: UpdateId,
- /// The data of the transferred chunk.
- pub bytes: String,
- /// The index of this chunk.
- pub index: u64
-}
-
-impl ParamHandler for ChunkParams {
- fn handle(&self,
- services: &Mutex<RemoteServices>,
- transfers: &Mutex<Transfers>) -> Result {
- let services = services.lock().unwrap();
- let mut transfers = transfers.lock().unwrap();
- transfers.get_mut(&self.update_id).map(|t| {
- if t.write_chunk(&self.bytes, self.index) {
- info!("Wrote chunk {} for package {}", self.index, self.update_id);
- services.send_chunk_received(
- ChunkReceived {
- update_id: self.update_id.clone(),
- chunks: t.transferred_chunks.clone(),
- vin: services.vin.clone() })
- .map_err(|e| {
- error!("Error on sending ChunkReceived: {}", e);
- Error::SendFailure })
- .map(|_| None)
- } else {
- Err(Error::IoFailure)
- }
- }).unwrap_or_else(|| {
- error!("Couldn't find transfer for update_id {}", self.update_id);
- Err(Error::UnknownPackage)
- })
- }
-}
-
-
-/// Type for "Finish Transfer" messages.
-#[derive(RustcDecodable)]
-pub struct FinishParams {
- /// The package transfer to finalize.
- pub update_id: UpdateId,
- pub signature: String
-}
-
-impl ParamHandler for FinishParams {
- fn handle(&self,
- _: &Mutex<RemoteServices>,
- transfers: &Mutex<Transfers>) -> Result {
- let mut transfers = transfers.lock().unwrap();
- transfers.get(&self.update_id).ok_or(Error::UnknownPackage)
- .and_then(|t| {
- t.assemble_package().map_err(|_| Error::IoFailure) })
- .and_then(|p| {
- p.into_os_string().into_string().map_err(|_| Error::IoFailure) })
- .map(|p| {
- transfers.remove(&self.update_id);
- info!("Finished transfer of {}", self.update_id);
- Some(InboundEvent::DownloadComplete(DownloadComplete {
- update_id: self.update_id.clone(),
- update_image: p,
- signature: self.signature.clone() })) })
- }
-}
-
-
-/// Type for "Abort Transfer" messages.
-#[derive(RustcDecodable)]
-pub struct AbortParams;
-
-impl ParamHandler for AbortParams {
- fn handle(&self,
- _: &Mutex<RemoteServices>,
- transfers: &Mutex<Transfers>) -> Result {
- let mut transfers = transfers.lock().unwrap();
- transfers.clear();
- Ok(None)
- }
-}
-
-
-/// Type for "Get All Packages" messages.
-pub type ReportParams = GetInstalledSoftware;
-
-impl ParamHandler for ReportParams {
- fn handle(&self,
- _: &Mutex<RemoteServices>,
- _: &Mutex<Transfers>) -> Result {
- Ok(Some(InboundEvent::GetInstalledSoftware(self.clone())))
- }
-}
diff --git a/src/remote/rvi/edge.rs b/src/remote/rvi/edge.rs
deleted file mode 100644
index 6eec01f..0000000
--- a/src/remote/rvi/edge.rs
+++ /dev/null
@@ -1,162 +0,0 @@
-//! Implements the RVI facing webservice.
-
-use std::io::{Read, Write};
-use std::thread;
-use hyper::Server;
-use hyper::server::{Handler, Request, Response};
-use rustc_serialize::json;
-use rustc_serialize::json::Json;
-
-use remote::jsonrpc;
-use remote::jsonrpc::{OkResponse, ErrResponse};
-
-use remote::rvi::send;
-use remote::rvi::message::{RegisterServiceRequest, RegisterServiceResponse};
-
-pub trait ServiceHandler: Sync + Send {
- fn handle_service(&self, id: u64, service: &str, message: &str)
- -> Result<OkResponse<i32>, ErrResponse>;
- fn register_services<F: Fn(&str) -> String>(&self, reg: F);
-}
-
-/// Encodes the service edge of the webservice.
-pub struct ServiceEdge<H: ServiceHandler + 'static> {
- /// The full URL where RVI can be reached.
- rvi_url: String,
- /// The `host:port` to bind and listen for incoming RVI messages.
- edge_url: String,
- hdlr: H
-}
-
-impl<H: ServiceHandler + 'static> ServiceEdge<H> {
- /// Create a new service edge.
- ///
- /// # Arguments
- /// * `r`: The full URL where RVI can be reached.
- /// * `e`: The `host:port` combination where the edge should bind.
- /// * `s`: A sender to communicate back the service URLs.
- pub fn new(r: String, e: String, h: H) -> ServiceEdge<H> {
- ServiceEdge {
- rvi_url: r,
- edge_url: e,
- hdlr: h
- }
- }
-
- /// Register a service. Returns the full service URL as provided by RVI. Panics if the
- /// registration in RVI failed. This can be handled by starting the RVI edge in a separate
- /// thread.
- ///
- /// # Arguments
- /// * `s`: The service to register. Will get prepended with the device identifier by RVI.
- pub fn register_service(&self, s: &str) -> String {
- let json_rpc = jsonrpc::Request::new(
- "register_service",
- RegisterServiceRequest {
- network_address: self.edge_url.to_string(),
- service: s.to_string()
- });
-
- let resp = send(&self.rvi_url, &json_rpc)
- .map_err(|e| error!("Couldn't send registration to RVI\n{}", e))
- .and_then(|r| json::decode::<jsonrpc::OkResponse<RegisterServiceResponse>>(&r)
- .map_err(|e| error!("Couldn't parse response when registering in RVI\n{}", e)))
- .unwrap();
-
- resp.result
- .expect("Didn't get full service name when registering")
- .service
- }
-
- /// Starts the service edge.
- ///
- /// It binds on the provided `host:port` combination, registers all services and then waits for
- /// incoming RVI messages. On incoming messages it forks another thread and passes the message
- /// to the provided `Handler`. For details about how to implement a `Handler` see the
- /// [`hyper`](../../hyper/index.html) documentation and the [reference
- /// implementation](../handler/index.html).
- ///
- /// Panics if it can't reach or register in RVI.
- ///
- /// # Arguments
- /// * `h`: The `Handler` all messages are passed to.
- /// * `s`: A `Vector` of service strings to register in RVI.
- pub fn start(self) {
- let url = self.edge_url.clone();
- self.hdlr.register_services(|s| self.register_service(s));
- thread::spawn(move || {
- Server::http(&*url).and_then(|srv| {
- info!("Ready to accept connections.");
- srv.handle(self) })
- .map_err(|e| error!("Couldn't start server\n{}", e))
- .unwrap()
- });
- }
-
- /// Try to parse the type of a message and forward it to the appropriate message handler.
- /// Returns the result of the message handling or a `jsonrpc` result indicating a parser error.
- ///
- /// Needs to be extended to support new services.
- ///
- /// # Arguments
- /// * `message`: The message that will be parsed.
- fn handle_message(&self, message: &str)
- -> Result<OkResponse<i32>, ErrResponse> {
-
- let data = try!(
- Json::from_str(message)
- .map_err(|_| ErrResponse::parse_error()));
- let obj = try!(
- data.as_object().ok_or(ErrResponse::parse_error()));
- let rpc_id = try!(
- obj.get("id").and_then(|x| x.as_u64())
- .ok_or(ErrResponse::parse_error()));
-
- let method = try!(
- obj.get("method").and_then(|x| x.as_string())
- .ok_or(ErrResponse::invalid_request(rpc_id)));
-
- if method == "services_available" {
- Ok(OkResponse::new(rpc_id, None))
- }
- else if method != "message" {
- Err(ErrResponse::method_not_found(rpc_id))
- } else {
- let service = try!(obj.get("params")
- .and_then(|x| x.as_object())
- .and_then(|x| x.get("service_name"))
- .and_then(|x| x.as_string())
- .ok_or(ErrResponse::invalid_request(rpc_id)));
-
- self.hdlr.handle_service(rpc_id, service, message)
- }
- }
-}
-
-impl<H: ServiceHandler + 'static> Handler for ServiceEdge<H> {
- fn handle(&self, mut req: Request, resp: Response) {
- let mut rbody = String::new();
- try_or!(req.read_to_string(&mut rbody), return);
- debug!(">>> Received Message: {}", rbody);
- let mut resp = try_or!(resp.start(), return);
-
- macro_rules! send_response {
- ($rtype:ty, $resp:ident) => {
- match json::encode::<$rtype>(&$resp) {
- Ok(decoded_msg) => {
- try_or!(resp.write_all(decoded_msg.as_bytes()), return);
- debug!("<<< Sent Response: {}", decoded_msg);
- },
- Err(p) => { error!("{}", p); }
- }
- };
- }
-
- match self.handle_message(&rbody) {
- Ok(msg) => send_response!(OkResponse<i32>, msg),
- Err(msg) => send_response!(ErrResponse, msg)
- }
-
- try_or!(resp.end(), return);
- }
-}
diff --git a/src/remote/rvi/message.rs b/src/remote/rvi/message.rs
deleted file mode 100644
index 8adf596..0000000
--- a/src/remote/rvi/message.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-//! Wrappers for messages exchanged with RVI.
-
-use std::vec::Vec;
-use time;
-
-/// A generic incoming message.
-#[derive(RustcDecodable, RustcEncodable)]
-pub struct Message<T> {
- /// The service that got called.
- pub service_name: String,
- /// The paramaters to the service call.
- pub parameters: Vec<T>
-}
-
-/// A generic outgoing message.
-#[derive(RustcDecodable, RustcEncodable)]
-pub struct RVIMessage<T> {
- /// The service name to call.
- pub service_name: String,
- /// A timestamp when this message should expire. In UTC UNIX epoch.
- pub timeout: i64,
- /// The parameters to the service call.
- pub parameters: Vec<T>
-}
-
-impl<T> RVIMessage<T> {
- /// Create a new outgoing RVI message.
- ///
- /// # Arguments
- /// * `service`: The service name to call.
- /// * `parameters`: The parameters to the service call.
- /// * `tdelta`: Amount of seconds before the message will expire.
- pub fn new(service: &str,
- parameters: Vec<T>,
- tdelta: i64) -> RVIMessage<T> {
- let timeout = time::Duration::seconds(tdelta);
- RVIMessage {
- timeout: (time::get_time() + timeout).sec,
- service_name: service.to_string(),
- parameters: parameters
- }
- }
-}
-
-/// Encodes a registration request.
-#[derive(RustcEncodable)]
-pub struct RegisterServiceRequest {
- /// The network address where RVI can be reached.
- pub network_address: String,
- /// The service (short name) to register.
- pub service: String
-}
-
-/// Encodes a registration response.
-#[derive(RustcDecodable)]
-pub struct RegisterServiceResponse {
- /// Status number indicating success or failure. See the RVI documentation for details.
- pub status: i32,
- /// The full service URL, that RVI assigned.
- pub service: String
-}
diff --git a/src/remote/rvi/mod.rs b/src/remote/rvi/mod.rs
deleted file mode 100644
index a6791bc..0000000
--- a/src/remote/rvi/mod.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-//! RVI bindings for Rust.
-//!
-//! RVI - Remote Vehicle Interaction - is the next generation of connected vehicle services. Based
-//! on the discussions inside and outside the Automotive Grade Linux expert group.
-//!
-//! This module implements Rust bindings to simplify the interaction with it.
-//!
-//! It is intended to be split out into a separate crate at some point in the future.
-
-mod edge;
-mod send;
-mod message;
-
-// Export public interface
-pub use super::rvi::edge::{ServiceEdge, ServiceHandler};
-pub use super::rvi::send::send;
-pub use super::rvi::send::send_message;
-pub use super::rvi::message::Message;
diff --git a/src/remote/rvi/send.rs b/src/remote/rvi/send.rs
deleted file mode 100644
index 8308203..0000000
--- a/src/remote/rvi/send.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-//! Helper functions for sending messages to RVI.
-
-use std::io::Read;
-use hyper::Client;
-use rustc_serialize::{json, Encodable};
-
-use remote::jsonrpc;
-use remote::rvi::message::RVIMessage;
-
-/// Send a object to RVI. Either returns the full response from RVI or a error message.
-///
-/// The object will get encoded to json. Apart from that no sanity checks are made. You usually
-/// don't need this function.
-///
-/// # Arguments
-/// * `url`: The full URL where RVI can be reached.
-/// * `b`: The object to encode and send to RVI.
-pub fn send<E: Encodable>(url: &str, b: &E) -> Result<String, String> {
- let client = Client::new();
-
- let mut resp = try!(json::encode(b)
- .map_err(|e| format!("{}", e))
- .and_then(|j| {
- debug!("<<< Sent Message: {}", j);
- client.post(url).body(&j).send()
- .map_err(|e| format!("{}", e))
- }));
-
- let mut rbody = String::new();
- try!(resp.read_to_string(&mut rbody)
- .map_err(|e| format!("{}", e)));
- debug!(">>> Received Response: {}", rbody);
- Ok(rbody)
-}
-
-/// Prepare a message and send it to RVI. Returns the full response from RVI on success or a error
-/// message on failure.
-///
-/// This wraps the provided object into a proper RVI message and encodes it to json. You usually
-/// should call this function.
-///
-/// **NOTE:** This currently implements a workaround for RVI, that will get fixed in the upcoming
-/// RVI version `0.5.0`, which will break this current implementation. For the new protocol you
-/// don't have to wrap the `params` in a one element `Vector` any more.
-///
-/// # Arguments
-/// * `url`: The full URL where RVI can be reached.
-/// * `b`: The object to wrap into a RVI Message, encode and send to RVI.
-/// * `addr`: The full RVI address (service URL) where this message should be sent to.
-#[cfg(not(test))]
-pub fn send_message<E: Encodable>(url: &str, b: E, addr: &str) -> Result<String, String> {
- let mut params = Vec::new();
- params.push(b);
- let message = RVIMessage::<E>::new(addr, params, 90);
- let json_rpc = jsonrpc::Request::new("message", message);
- send(url, &json_rpc)
-}
-#[cfg(test)]
-pub fn send_message<E: Encodable>(url: &str, _: E, addr: &str) -> Result<String, String> {
- Ok(format!("Faked sending to RVI: {}, {}", url, addr))
-}
diff --git a/src/remote/svc.rs b/src/remote/svc.rs
deleted file mode 100644
index 873cb80..0000000
--- a/src/remote/svc.rs
+++ /dev/null
@@ -1,272 +0,0 @@
-//! The main service handler
-//!
-//! Parses incoming messages and delegates them to the appropriate individual message handlers,
-//! passing on the results to the [`main_loop`](../main_loop/index.html)
-
-use std::ops::Deref;
-use std::sync::{Arc, Mutex};
-use std::sync::mpsc::Sender;
-use std::thread;
-use std::thread::sleep_ms;
-
-use rustc_serialize::{json, Decodable};
-use time;
-
-use event::{Event, UpdateId};
-use event::inbound::InboundEvent;
-use event::outbound::{UpdateReport, InstalledSoftware};
-
-use super::parm::{NotifyParams, StartParams, ChunkParams, ChunkReceived, FinishParams};
-use super::parm::{ReportParams, AbortParams, ParamHandler};
-use super::dw::Transfers;
-
-use super::jsonrpc;
-use super::jsonrpc::{OkResponse, ErrResponse};
-use super::rvi;
-
-use configuration::ClientConfiguration;
-
-/// Encodes the list of service URLs the client registered.
-///
-/// Needs to be extended to introduce new services.
-#[derive(RustcEncodable, Clone)]
-pub struct LocalServices {
- /// "Start Download" URL.
- pub start: String,
- /// "Chunk" URL.
- pub chunk: String,
- /// "Abort Download" URL.
- pub abort: String,
- /// "Finish Download" URL.
- pub finish: String,
- /// "Get All Packages" URL.
- pub getpackages: String,
-}
-
-impl LocalServices {
- /// Returns the VIN of this device.
- ///
- /// # Arguments
- /// * `vin_match`: The index, where to look for the VIN in the service URL.
- pub fn get_vin(&self, vin_match: i32) -> String {
- self.start.split("/").nth(vin_match as usize).unwrap().to_string()
- }
-}
-
-/// Encodes the service URLs, that the server provides.
-#[derive(RustcDecodable, Clone)]
-pub struct BackendServices {
- /// URL for the "Start Download" call.
- pub start: String,
- /// URL for the "Chunk Received" call.
- pub ack: String,
- /// URL for the "Installation Report" call.
- pub report: String,
- /// URL for the "Get All Packages" call.
- pub packages: String
-}
-
-#[derive(RustcEncodable, Clone)]
-struct StartDownload {
- vin: String,
- update_id: UpdateId,
- services: LocalServices,
-}
-
-#[derive(RustcEncodable, Clone)]
-struct UpdateResult {
- vin: String,
- update_report: UpdateReport
-}
-
-#[derive(RustcEncodable, Clone)]
-struct InstalledSoftwareResult {
- vin: String,
- installed_software: InstalledSoftware
-}
-
-pub struct RemoteServices {
- pub vin: String,
- url: String,
- local_svcs: Option<LocalServices>,
- svcs: Option<BackendServices>
-}
-
-impl RemoteServices {
- pub fn new(url: String) -> RemoteServices {
- RemoteServices {
- vin: String::new(),
- url: url,
- local_svcs: None,
- svcs: None
- }
- }
-
- pub fn set_remote(&mut self, vin: String, svcs: LocalServices) {
- self.vin = vin;
- self.local_svcs = Some(svcs);
- }
-
- pub fn set(&mut self, svcs: BackendServices) {
- self.svcs = Some(svcs);
- }
-
- pub fn send_chunk_received(&self, m: ChunkReceived) -> Result<String, String> {
- self.svcs.iter().next().ok_or(format!("RemoteServices not set"))
- .and_then(|ref svcs| rvi::send_message(&self.url, m, &svcs.ack))
- }
-
- fn make_start_download(&self, id: UpdateId) -> StartDownload {
- StartDownload {
- vin: self.vin.clone(),
- services: self.local_svcs.iter().next().cloned().unwrap(),
- update_id: id
- }
- }
-
- pub fn send_start_download(&self, id: UpdateId) -> Result<String, String> {
- self.svcs.iter().next().ok_or(format!("RemoteServices not set"))
- .and_then(|ref svcs| rvi::send_message(
- &self.url,
- self.make_start_download(id),
- &svcs.start))
- }
-
- pub fn send_update_report(&self, m: UpdateReport) -> Result<String, String> {
- self.svcs.iter().next().ok_or(format!("RemoteServices not set"))
- .and_then(|ref svcs| rvi::send_message(
- &self.url,
- UpdateResult {
- vin: self.vin.clone(),
- update_report: m },
- &svcs.report))
- }
-
- pub fn send_installed_software(&self, m: InstalledSoftware) -> Result<String, String> {
- self.svcs.iter().next().ok_or(format!("RemoteServices not set"))
- .and_then(|ref svcs| rvi::send_message(
- &self.url,
- InstalledSoftwareResult {
- vin: self.vin.clone(),
- installed_software: m },
- &svcs.packages))
- }
-}
-
-
-/// Type that encodes a single service handler.
-///
-/// Holds the necessary state, like in-progress transfers, that are needed for handling incoming
-/// messages and sending replies to RVI. Needs to be thread safe as
-/// [`hyper`](../../../hyper/index.html) handles requests asynchronously.
-pub struct ServiceHandler {
- /// A `Sender` that connects the handlers with the `main_loop`.
- sender: Mutex<Sender<Event>>,
- /// The currently in-progress `Transfer`s.
- transfers: Arc<Mutex<Transfers>>,
- /// The service URLs that the SOTA server advertised.
- remote_services: Arc<Mutex<RemoteServices>>,
- /// The full `Configuration` of sota_client.
- conf: ClientConfiguration
-}
-
-impl ServiceHandler {
- /// Create a new `ServiceHandler`.
- ///
- /// # Arguments
- /// * `transfers`: A `Transfers` object to store the in-progress `Transfer`s.
- /// * `sender`: A `Sender` to call back into the `main_loop`.
- /// * `url`: The full URL, where RVI can be reached.
- /// * `c`: The full `Configuration` of sota_client.
- pub fn new(sender: Sender<Event>,
- r: Arc<Mutex<RemoteServices>>,
- c: ClientConfiguration) -> ServiceHandler {
- let transfers = Arc::new(Mutex::new(Transfers::new(c.storage_dir.clone())));
- let tc = transfers.clone();
- c.timeout
- .map(|t| {
- let _ = thread::spawn(move || ServiceHandler::start_timer(tc.deref(), t));
- info!("Transfers timeout after {}", t)})
- .unwrap_or(info!("No timeout configured, transfers will never time out."));
-
- ServiceHandler {
- sender: Mutex::new(sender),
- transfers: transfers,
- remote_services: r,
- conf: c
- }
- }
-
- /// Starts a infinite loop to expire timed out transfers. Checks once a second for timed out
- /// transfers.
- ///
- /// # Arguments
- /// * `transfers`: Pointer to a `Transfers` object, that stores the transfers to be checked for
- /// expired timeouts.
- /// * `timeout`: The timeout in seconds.
- pub fn start_timer(transfers: &Mutex<Transfers>,
- timeout: i64) {
- loop {
- sleep_ms(1000);
- let mut transfers = transfers.lock().unwrap();
- transfers.prune(time::get_time().sec, timeout);
- }
- }
-
- /// Helper function to send a `Event` to the `main_loop`.
- ///
- /// # Arguments
- /// * `e`: `Event` to send.
- fn push_notify(&self, e: InboundEvent) {
- try_or!(self.sender.lock().unwrap().send(Event::Inbound(e)), return);
- }
-
- /// Create a message handler `D`, and let it process the `message`. If it returns a
- /// Event, forward it to the `main_loop`. Returns a `jsonrpc` response indicating
- /// success or failure.
- ///
- /// # Arguments
- /// * `message`: The message, that should be handled.
- fn handle_message_params<D>(&self, id: u64, message: &str)
- -> Result<OkResponse<i32>, ErrResponse>
- where D: Decodable + ParamHandler {
- json::decode::<jsonrpc::Request<rvi::Message<D>>>(&message)
- .map_err(|_| ErrResponse::invalid_params(id))
- .and_then(|p| {
- let handler = &p.params.parameters[0];
- handler.handle(&self.remote_services, &self.transfers)
- .map_err(|_| ErrResponse::unspecified(p.id))
- .map(|r| {
- r.map(|m| self.push_notify(m));
- OkResponse::new(p.id, None) })
- })
- }
-}
-
-impl rvi::ServiceHandler for ServiceHandler {
- fn handle_service(&self, id: u64, service: &str, message: &str)
- -> Result<OkResponse<i32>, ErrResponse> {
- match service {
- "/sota/notify" => self.handle_message_params::<NotifyParams>(id, message),
- "/sota/start" => self.handle_message_params::<StartParams>(id, message),
- "/sota/chunk" => self.handle_message_params::<ChunkParams>(id, message),
- "/sota/finish" => self.handle_message_params::<FinishParams>(id, message),
- "/sota/abort" => self.handle_message_params::<AbortParams>(id, message),
- "/sota/getpackages" => self.handle_message_params::<ReportParams>(id, message),
- _ => Err(ErrResponse::invalid_request(id))
- }
- }
-
- fn register_services<F: Fn(&str) -> String>(&self, reg: F) {
- reg("/sota/notify");
- let mut remote_svcs = self.remote_services.lock().unwrap();
- let svcs = LocalServices {
- start: reg("/sota/start"),
- chunk: reg("/sota/chunk"),
- abort: reg("/sota/abort"),
- finish: reg("/sota/finish"),
- getpackages: reg("/sota/getpackages")
- };
- remote_svcs.set_remote(svcs.get_vin(self.conf.vin_match), svcs);
- }
-}
diff --git a/src/rvi/edge.rs b/src/rvi/edge.rs
new file mode 100644
index 0000000..cadea74
--- /dev/null
+++ b/src/rvi/edge.rs
@@ -0,0 +1,127 @@
+use hyper::StatusCode;
+use hyper::net::{HttpStream, Transport};
+use hyper::server::{Server as HyperServer, Request as HyperRequest};
+use rustc_serialize::json;
+use rustc_serialize::json::Json;
+use std::{mem, str};
+use std::net::ToSocketAddrs;
+
+use datatype::{RpcRequest, RpcOk, RpcErr, Url};
+use http::{Server, ServerHandler};
+use super::services::Services;
+
+
+/// The HTTP server endpoint for `RVI` client communication.
+pub struct Edge {
+ rvi_edge: Url,
+ services: Services,
+}
+
+impl Edge {
+ /// Create a new `Edge` by registering each `RVI` service.
+ pub fn new(mut services: Services, rvi_edge: String, rvi_client: Url) -> Self {
+ services.register_services(|service| {
+ let req = RpcRequest::new("register_service", RegisterServiceRequest {
+ network_address: rvi_edge.clone(),
+ service: service.to_string(),
+ });
+ let resp = req.send(rvi_client.clone())
+ .unwrap_or_else(|err| panic!("RegisterServiceRequest failed: {}", err));
+ let rpc_ok = json::decode::<RpcOk<RegisterServiceResponse>>(&resp)
+ .unwrap_or_else(|err| panic!("couldn't decode RegisterServiceResponse: {}", err));
+ rpc_ok.result.expect("expected rpc_ok result").service
+ });
+
+ Edge { rvi_edge: rvi_edge.parse().expect("couldn't parse edge server as url"), services: services }
+ }
+
+ /// Start the HTTP server listening for incoming RVI client connections.
+ pub fn start(&mut self) {
+ let mut addrs = self.rvi_edge.to_socket_addrs()
+ .unwrap_or_else(|err| panic!("couldn't parse edge url: {}", err));
+ let server = HyperServer::http(&addrs.next().expect("no SocketAddr found"))
+ .unwrap_or_else(|err| panic!("couldn't start rvi edge server: {}", err));
+ let (addr, server) = server.handle(move |_| EdgeHandler::new(self.services.clone())).unwrap();
+ info!("RVI server edge listening at http://{}.", addr);
+ server.run();
+ }
+}
+
+
+#[derive(RustcEncodable)]
+struct RegisterServiceRequest {
+ pub network_address: String,
+ pub service: String,
+}
+
+#[derive(RustcDecodable)]
+struct RegisterServiceResponse {
+ pub service: String,
+ pub status: i32,
+}
+
+
+
+struct EdgeHandler {
+ services: Services,
+ resp_code: StatusCode,
+ resp_body: Option<Vec<u8>>
+}
+
+impl EdgeHandler {
+ fn new(services: Services) -> ServerHandler<HttpStream> {
+ ServerHandler::new(Box::new(EdgeHandler {
+ services: services,
+ resp_code: StatusCode::InternalServerError,
+ resp_body: None,
+ }))
+ }
+}
+
+impl<T: Transport> Server<T> for EdgeHandler {
+ fn headers(&mut self, _: HyperRequest<T>) {}
+
+ fn request(&mut self, body: Vec<u8>) {
+ let outcome = || -> Result<RpcOk<i32>, RpcErr> {
+ let text = try!(str::from_utf8(&body).map_err(|err| RpcErr::parse_error(err.to_string())));
+ let data = try!(Json::from_str(text).map_err(|err| RpcErr::parse_error(err.to_string())));
+ let object = try!(data.as_object().ok_or(RpcErr::parse_error("not an object".to_string())));
+ let id = try!(object.get("id").and_then(|x| x.as_u64())
+ .ok_or(RpcErr::parse_error("expected id".to_string())));
+ let method = try!(object.get("method").and_then(|x| x.as_string())
+ .ok_or(RpcErr::invalid_request(id, "expected method".to_string())));
+
+ match method {
+ "services_available" => Ok(RpcOk::new(id, None)),
+
+ "message" => {
+ let params = try!(object.get("params").and_then(|p| p.as_object())
+ .ok_or(RpcErr::invalid_request(id, "expected params".to_string())));
+ let service = try!(params.get("service_name").and_then(|s| s.as_string())
+ .ok_or(RpcErr::invalid_request(id, "expected params.service_name".to_string())));
+ self.services.handle_service(service, id, text)
+ }
+
+ _ => Err(RpcErr::method_not_found(id, format!("unknown method: {}", method)))
+ }
+ }();
+
+ match outcome {
+ Ok(msg) => {
+ let body = json::encode::<RpcOk<i32>>(&msg).expect("couldn't encode RpcOk response");
+ self.resp_code = StatusCode::Ok;
+ self.resp_body = Some(body.into_bytes());
+ }
+
+ Err(err) => {
+ let body = json::encode::<RpcErr>(&err).expect("couldn't encode RpcErr response");
+ self.resp_code = StatusCode::BadRequest;
+ self.resp_body = Some(body.into_bytes());
+ }
+ }
+ }
+
+ fn response(&mut self) -> (StatusCode, Option<Vec<u8>>) {
+ (self.resp_code, mem::replace(&mut self.resp_body, None))
+ }
+}
diff --git a/src/rvi/mod.rs b/src/rvi/mod.rs
new file mode 100644
index 0000000..ba1d40a
--- /dev/null
+++ b/src/rvi/mod.rs
@@ -0,0 +1,9 @@
+pub mod edge;
+pub mod parameters;
+pub mod services;
+pub mod transfers;
+
+pub use self::edge::Edge;
+pub use self::parameters::Parameter;
+pub use self::services::{RemoteServices, Services};
+pub use self::transfers::Transfer;
diff --git a/src/rvi/parameters.rs b/src/rvi/parameters.rs
new file mode 100644
index 0000000..1fa1a87
--- /dev/null
+++ b/src/rvi/parameters.rs
@@ -0,0 +1,136 @@
+use std::str;
+use std::sync::Mutex;
+
+use datatype::{ChunkReceived, Event, DownloadComplete, UpdateRequestId, UpdateAvailable};
+use super::services::{BackendServices, RemoteServices};
+use super::transfers::Transfers;
+
+
+/// Each `Parameter` implementation handles a specific kind of RVI client request,
+/// optionally responding with an `Event` on completion.
+pub trait Parameter {
+ fn handle(&self, remote: &Mutex<RemoteServices>, transfers: &Mutex<Transfers>)
+ -> Result<Option<Event>, String>;
+}
+
+
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct Notify {
+ update_available: UpdateAvailable,
+ services: BackendServices
+}
+
+impl Parameter for Notify {
+ fn handle(&self, remote: &Mutex<RemoteServices>, _: &Mutex<Transfers>) -> Result<Option<Event>, String> {
+ remote.lock().unwrap().backend = Some(self.services.clone());
+ Ok(Some(Event::NewUpdateAvailable(self.update_available.clone())))
+ }
+}
+
+
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct Start {
+ update_id: UpdateRequestId,
+ chunkscount: u64,
+ checksum: String
+}
+
+impl Parameter for Start {
+ fn handle(&self, remote: &Mutex<RemoteServices>, transfers: &Mutex<Transfers>) -> Result<Option<Event>, String> {
+ info!("Starting transfer for update_id {}", self.update_id);
+ let mut transfers = transfers.lock().unwrap();
+ transfers.push(self.update_id.clone(), self.checksum.clone());
+
+ let remote = remote.lock().unwrap();
+ let chunk = ChunkReceived {
+ device: remote.device_id.clone(),
+ update_id: self.update_id.clone(),
+ chunks: Vec::new()
+ };
+ remote.send_chunk_received(chunk)
+ .map(|_| None)
+ .map_err(|err| format!("error sending start ack: {}", err))
+ }
+}
+
+
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct Chunk {
+ update_id: UpdateRequestId,
+ bytes: String,
+ index: u64
+}
+
+impl Parameter for Chunk {
+ fn handle(&self, remote: &Mutex<RemoteServices>, transfers: &Mutex<Transfers>) -> Result<Option<Event>, String> {
+ let remote = remote.lock().unwrap();
+
+ let mut transfers = transfers.lock().unwrap();
+ let transfer = try!(transfers.get_mut(self.update_id.clone())
+ .ok_or(format!("couldn't find transfer for update_id {}", self.update_id)));
+ transfer.write_chunk(&self.bytes, self.index)
+ .map_err(|err| format!("couldn't write chunk: {}", err))
+ .and_then(|_| {
+ trace!("wrote chunk {} for package {}", self.index, self.update_id);
+ let chunk = ChunkReceived {
+ device: remote.device_id.clone(),
+ update_id: self.update_id.clone(),
+ chunks: transfer.transferred_chunks.clone(),
+ };
+ remote.send_chunk_received(chunk)
+ .map(|_| None)
+ .map_err(|err| format!("error sending ChunkReceived: {}", err))
+ })
+ }
+}
+
+
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct Finish {
+ update_id: UpdateRequestId,
+ signature: String
+}
+
+impl Parameter for Finish {
+ fn handle(&self, _: &Mutex<RemoteServices>, transfers: &Mutex<Transfers>) -> Result<Option<Event>, String> {
+ let mut transfers = transfers.lock().unwrap();
+ let image = {
+ let transfer = try!(transfers.get(self.update_id.clone())
+ .ok_or(format!("unknown package: {}", self.update_id)));
+ let package = try!(transfer.assemble_package()
+ .map_err(|err| format!("couldn't assemble package: {}", err)));
+ try!(package.into_os_string().into_string()
+ .map_err(|err| format!("couldn't get image: {:?}", err)))
+ };
+ transfers.remove(self.update_id.clone());
+ info!("Finished transfer of {}", self.update_id);
+
+ let complete = DownloadComplete {
+ update_id: self.update_id.clone(),
+ update_image: image,
+ signature: self.signature.clone()
+ };
+ Ok(Some(Event::DownloadComplete(complete)))
+ }
+}
+
+
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct Report;
+
+impl Parameter for Report {
+ fn handle(&self, _: &Mutex<RemoteServices>, _: &Mutex<Transfers>) -> Result<Option<Event>, String> {
+ Ok(Some(Event::InstalledSoftwareNeeded))
+ }
+}
+
+
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct Abort;
+
+impl Parameter for Abort {
+ fn handle(&self, _: &Mutex<RemoteServices>, transfers: &Mutex<Transfers>) -> Result<Option<Event>, String> {
+ transfers.lock().unwrap().clear();
+ Ok(None)
+ }
+}
diff --git a/src/rvi/services.rs b/src/rvi/services.rs
new file mode 100644
index 0000000..fd5a640
--- /dev/null
+++ b/src/rvi/services.rs
@@ -0,0 +1,185 @@
+use chan;
+use chan::Sender;
+use rustc_serialize::{json, Decodable, Encodable};
+use std::thread;
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use time;
+
+use datatype::{ChunkReceived, DownloadStarted, Event, InstalledSoftware,
+ RpcRequest, RpcOk, RpcErr, RviConfig, UpdateReport, UpdateRequestId,
+ Url};
+use super::parameters::{Abort, Chunk, Finish, Notify, Parameter, Report, Start};
+use super::transfers::Transfers;
+
+
+/// Hold references to RVI service endpoints, currently active `Transfers`, and
+/// where to broadcast outcome `Event`s to.
+#[derive(Clone)]
+pub struct Services {
+ pub remote: Arc<Mutex<RemoteServices>>,
+ pub sender: Arc<Mutex<Sender<Event>>>,
+ pub transfers: Arc<Mutex<Transfers>>,
+}
+
+impl Services {
+ /// Set up a new RVI service handler, pruning any inactive `Transfer`s each second.
+ pub fn new(rvi_cfg: RviConfig, device_id: String, sender: Sender<Event>) -> Self {
+ let transfers = Arc::new(Mutex::new(Transfers::new(rvi_cfg.storage_dir)));
+ rvi_cfg.timeout.map_or_else(|| info!("Transfers will never time out."), |timeout| {
+ info!("Transfers timeout after {} seconds.", timeout);
+ let transfers = transfers.clone();
+ thread::spawn(move || {
+ let tick = chan::tick(Duration::from_secs(1));
+ loop {
+ let _ = tick.recv();
+ let mut transfers = transfers.lock().unwrap();
+ transfers.prune(time::get_time().sec, timeout);
+ }
+ });
+ });
+
+ Services {
+ remote: Arc::new(Mutex::new(RemoteServices::new(device_id, rvi_cfg.client))),
+ sender: Arc::new(Mutex::new(sender)),
+ transfers: transfers,
+ }
+ }
+
+ /// Register each RVI endpoint with the provided registration function which
+ /// should return a `String` representation of the URL used to contact that
+ /// service.
+ pub fn register_services<F: Fn(&str) -> String>(&mut self, register: F) {
+ let _ = register("/sota/notify");
+ let mut remote = self.remote.lock().unwrap();
+ remote.local = Some(LocalServices {
+ start: register("/sota/start"),
+ chunk: register("/sota/chunk"),
+ abort: register("/sota/abort"),
+ finish: register("/sota/finish"),
+ getpackages: register("/sota/getpackages")
+ });
+ }
+
+ /// Handle an incoming message for a specific service endpoint.
+ pub fn handle_service(&self, service: &str, id: u64, msg: &str) -> Result<RpcOk<i32>, RpcErr> {
+ match service {
+ "/sota/notify" => self.handle_message::<Notify>(id, msg),
+ "/sota/start" => self.handle_message::<Start>(id, msg),
+ "/sota/chunk" => self.handle_message::<Chunk>(id, msg),
+ "/sota/finish" => self.handle_message::<Finish>(id, msg),
+ "/sota/getpackages" => self.handle_message::<Report>(id, msg),
+ "/sota/abort" => self.handle_message::<Abort>(id, msg),
+ _ => Err(RpcErr::invalid_request(id, format!("unknown service: {}", service)))
+ }
+ }
+
+ /// Parse the message as an `RpcRequest<RviMessage<Parameter>>` then delegate
+ /// to the specific `Parameter.handle()` function, forwarding any returned
+ /// `Event` to the `Services` sender.
+ fn handle_message<P>(&self, id: u64, msg: &str) -> Result<RpcOk<i32>, RpcErr>
+ where P: Parameter + Encodable + Decodable
+ {
+ let request = try!(json::decode::<RpcRequest<RviMessage<P>>>(&msg).map_err(|err| {
+ error!("couldn't decode message: {}", err);
+ RpcErr::invalid_params(id, format!("couldn't decode message: {}", err))
+ }));
+ let event = try!(request.params.parameters[0].handle(&self.remote, &self.transfers).map_err(|err| {
+ error!("couldn't handle parameters: {}", err);
+ RpcErr::unspecified(request.id, format!("couldn't handle parameters: {}", err))
+ }));
+ event.map(|ev| self.sender.lock().unwrap().send(ev));
+ Ok(RpcOk::new(request.id, None))
+ }
+}
+
+
+pub struct RemoteServices {
+ pub device_id: String,
+ pub rvi_client: Url,
+ pub local: Option<LocalServices>,
+ pub backend: Option<BackendServices>
+}
+
+impl RemoteServices {
+ pub fn new(device_id: String, rvi_client: Url) -> RemoteServices {
+ RemoteServices { device_id: device_id, rvi_client: rvi_client, local: None, backend: None }
+ }
+
+ fn send_message<E: Encodable>(&self, body: E, addr: &str) -> Result<String, String> {
+ RpcRequest::new("message", RviMessage::new(addr, vec![body], 60)).send(self.rvi_client.clone())
+ }
+
+ pub fn send_download_started(&self, update_id: UpdateRequestId) -> Result<String, String> {
+ let backend = try!(self.backend.as_ref().ok_or("BackendServices not set"));
+ let local = try!(self.local.as_ref().ok_or("LocalServices not set"));
+ let start = DownloadStarted { device: self.device_id.clone(), update_id: update_id, services: local.clone() };
+ self.send_message(start, &backend.start)
+ }
+
+ pub fn send_chunk_received(&self, chunk: ChunkReceived) -> Result<String, String> {
+ let backend = try!(self.backend.as_ref().ok_or("BackendServices not set"));
+ self.send_message(chunk, &backend.ack)
+ }
+
+ pub fn send_update_report(&self, report: UpdateReport) -> Result<String, String> {
+ let backend = try!(self.backend.as_ref().ok_or("BackendServices not set"));
+ let result = UpdateReportResult { device: self.device_id.clone(), update_report: report };
+ self.send_message(result, &backend.report)
+ }
+
+ pub fn send_installed_software(&self, installed: InstalledSoftware) -> Result<String, String> {
+ let backend = try!(self.backend.as_ref().ok_or("BackendServices not set"));
+ let result = InstalledSoftwareResult { device_id: self.device_id.clone(), installed: installed };
+ self.send_message(result, &backend.packages)
+ }
+}
+
+
+#[derive(Clone, RustcDecodable, RustcEncodable)]
+pub struct LocalServices {
+ pub start: String,
+ pub abort: String,
+ pub chunk: String,
+ pub finish: String,
+ pub getpackages: String,
+}
+
+#[derive(Clone, RustcDecodable, RustcEncodable)]
+pub struct BackendServices {
+ pub start: String,
+ pub ack: String,
+ pub report: String,
+ pub packages: String
+}
+
+
+#[derive(RustcDecodable, RustcEncodable)]
+struct UpdateReportResult {
+ pub device: String,
+ pub update_report: UpdateReport
+}
+
+#[derive(RustcDecodable, RustcEncodable)]
+struct InstalledSoftwareResult {
+ device_id: String,
+ installed: InstalledSoftware
+}
+
+
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct RviMessage<E: Encodable> {
+ pub service_name: String,
+ pub parameters: Vec<E>,
+ pub timeout: Option<i64>
+}
+
+impl<E: Encodable> RviMessage<E> {
+ pub fn new(service: &str, parameters: Vec<E>, expire_in: i64) -> RviMessage<E> {
+ RviMessage {
+ service_name: service.to_string(),
+ parameters: parameters,
+ timeout: Some((time::get_time() + time::Duration::seconds(expire_in)).sec)
+ }
+ }
+}
diff --git a/src/rvi/transfers.rs b/src/rvi/transfers.rs
new file mode 100644
index 0000000..859c2c1
--- /dev/null
+++ b/src/rvi/transfers.rs
@@ -0,0 +1,278 @@
+use crypto::digest::Digest;
+use crypto::sha1::Sha1;
+use rustc_serialize::base64::FromBase64;
+use std::fs;
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::prelude::*;
+use std::path::PathBuf;
+use std::str::FromStr;
+use std::vec::Vec;
+use time;
+
+use datatype::UpdateRequestId;
+
+
+/// Holds all currently active transfers where each is referenced by `UpdateRequestId`.
+pub struct Transfers {
+ items: HashMap<UpdateRequestId, Transfer>,
+ storage_dir: String
+}
+
+impl Transfers {
+ pub fn new(storage_dir: String) -> Transfers {
+ Transfers { items: HashMap::new(), storage_dir: storage_dir }
+ }
+
+ pub fn get(&self, update_id: UpdateRequestId) -> Option<&Transfer> {
+ self.items.get(&update_id)
+ }
+
+ pub fn get_mut(&mut self, update_id: UpdateRequestId) -> Option<&mut Transfer> {
+ self.items.get_mut(&update_id)
+ }
+
+ pub fn push(&mut self, update_id: UpdateRequestId, checksum: String) {
+ let transfer = Transfer::new(self.storage_dir.to_string(), update_id.clone(), checksum);
+ self.items.insert(update_id, transfer);
+ }
+
+ pub fn remove(&mut self, update_id: UpdateRequestId) {
+ self.items.remove(&update_id);
+ }
+
+ pub fn clear(&mut self) {
+ self.items.clear();
+ }
+
+ pub fn prune(&mut self, now: i64, timeout: i64) {
+ let mut timeouts = Vec::new();
+ for (id, transfer) in &mut self.items {
+ if now - transfer.last_chunk_received > timeout {
+ timeouts.push(id.clone());
+ }
+ }
+
+ for id in timeouts {
+ self.items.remove(&id);
+ info!("Transfer for update_id {} timed out.", id)
+ }
+ }
+}
+
+
+/// Holds the details of the transferred chunks relating to an `UpdateRequestId`.
+pub struct Transfer {
+ pub update_id: UpdateRequestId,
+ pub checksum: String,
+ pub transferred_chunks: Vec<u64>,
+ pub storage_dir: String,
+ pub last_chunk_received: i64
+}
+
+impl Transfer {
+ /// Prepare for the transfer of a new package.
+ pub fn new(storage_dir: String, update_id: UpdateRequestId, checksum: String) -> Transfer {
+ Transfer {
+ update_id: update_id,
+ checksum: checksum,
+ transferred_chunks: Vec::new(),
+ storage_dir: storage_dir,
+ last_chunk_received: time::get_time().sec
+ }
+ }
+
+ /// Write the received chunk to disk and store metadata inside `Transfer`.
+ pub fn write_chunk(&mut self, data: &str, index: u64) -> Result<(), String> {
+ self.last_chunk_received = time::get_time().sec;
+ let mut path = try!(self.get_chunk_dir().map_err(|err| format!("couldn't get chunk dir: {}", err)));
+ path.push(index.to_string());
+ let mut file = try!(File::create(path).map_err(|err| format!("couldn't open chunk file: {}", err)));
+
+ let data = try!(data.from_base64().map_err(|err| format!("couldn't decode chunk {}: {}", index, err)));
+ try!(file.write_all(&data)
+ .map_err(|err| format!("couldn't write chunk {} for update_id {}: {}", index, self.update_id, err)));
+ try!(file.flush().map_err(|err| format!("couldn't flush file: {}", err)));
+
+ self.transferred_chunks.push(index);
+ self.transferred_chunks.sort();
+ self.transferred_chunks.dedup();
+ Ok(())
+ }
+
+ /// Assemble all received chunks into a complete package.
+ pub fn assemble_package(&self) -> Result<PathBuf, String> {
+ debug!("finalizing package {}", self.update_id);
+ try!(self.assemble_chunks());
+ self.verify()
+ .and_then(|_| self.get_package_path())
+ .map_err(|err| format!("couldn't assemble_package for update_id {}: {}", self.update_id, err))
+ }
+
+ fn assemble_chunks(&self) -> Result<(), String> {
+ let pkg_path = try!(self.get_package_path());
+ debug!("saving update_id {} to {}", self.update_id, pkg_path.display());
+ let mut file = try!(File::create(pkg_path).map_err(|err| format!("couldn't open package file: {}", err)));
+
+ let chunk_dir = try!(self.get_chunk_dir());
+ let entries = try!(fs::read_dir(chunk_dir.clone()).map_err(|err| format!("couldn't read dir: {}", err)));
+ let mut indices = Vec::new();
+ for entry in entries {
+ let entry = try!(entry.map_err(|err| format!("bad entry: {}", err)));
+ let name = try!(entry.file_name().into_string().map_err(|err| format!("bad entry name: {:?}", err)));
+ let index = try!(u64::from_str(&name).map_err(|err| format!("couldn't parse chunk index: {}", err)));
+ indices.push(index);
+ }
+ indices.sort();
+
+ for index in indices {
+ try!(self.append_chunk(&mut file, chunk_dir.clone(), index));
+ }
+ Ok(debug!("assembled chunks for update_id {}", self.update_id))
+ }
+
+ fn append_chunk(&self, file: &mut File, mut chunk_dir: PathBuf, index: u64) -> Result<(), String> {
+ chunk_dir.push(&index.to_string());
+ let mut chunk = try!(File::open(chunk_dir).map_err(|err| format!("couldn't open chunk: {}", err)));
+ let mut buf = Vec::new();
+ try!(chunk.read_to_end(&mut buf).map_err(|err| format!("couldn't read file {}: {}", index, err)));
+ try!(file.write(&buf).map_err(|err| format!("couldn't write chunk {}: {}", index, err)));
+ Ok(trace!("wrote chunk {} for update_id {}", index, self.update_id))
+ }
+
+ fn verify(&self) -> Result<(), String> {
+ let path = try!(self.get_package_path());
+ let mut file = try!(File::open(path).map_err(|err| format!("couldn't open package path: {}", err)));
+ let mut data = Vec::new();
+ try!(file.read_to_end(&mut data).map_err(|err| format!("couldn't read file: {}", err)));
+
+ let mut hash = Sha1::new();
+ hash.input(&data);
+ if hash.result_str() == self.checksum {
+ Ok(())
+ } else {
+ Err(format!("update_id {} checksum failed: expected {}, got {}", self.update_id, self.checksum, hash.result_str()))
+ }
+ }
+
+ fn get_chunk_dir(&self) -> Result<PathBuf, String> {
+ let mut path = PathBuf::from(&self.storage_dir);
+ path.push("downloads");
+ path.push(self.update_id.clone());
+ fs::create_dir_all(&path)
+ .map(|_| path)
+ .map_err(|err| format!("couldn't create chunk dir: {}", err))
+ }
+
+ fn get_package_path(&self) -> Result<PathBuf, String> {
+ let mut path = PathBuf::from(&self.storage_dir);
+ path.push("packages");
+ try!(fs::create_dir_all(&path)
+ .map_err(|err| format!("couldn't create package dir {:?}: {}", path, err)));
+ path.push(format!("{}.spkg", self.update_id));
+ Ok(path)
+ }
+}
+
+impl Drop for Transfer {
+ fn drop(&mut self) {
+ let _ = self.get_chunk_dir().map(|dir| {
+ fs::read_dir(&dir)
+ .or_else(|err| Err(error!("couldn't read dir {:?}: {}", &dir, err)))
+ .and_then(|entries| {
+ for entry in entries {
+ let _ = entry.map(|entry| fs::remove_file(entry.path()))
+ .map_err(|err| error!("found a malformed entry: {}", err));
+ }
+ Ok(fs::remove_dir(dir).map_err(|err| error!("couldn't remove dir: {}", err)))
+ })
+ });
+ }
+}
+
+
+#[cfg(test)]
+mod test {
+ use rand;
+ use rand::Rng;
+ use rustc_serialize::base64;
+ use rustc_serialize::base64::ToBase64;
+ use std::path::PathBuf;
+ use std::fs::File;
+ use std::io::prelude::*;
+ use time;
+
+ use super::*;
+ use package_manager::TestDir;
+
+
+ impl Transfer {
+ pub fn new_test(test_dir: &TestDir) -> Transfer {
+ Transfer {
+ update_id: rand::thread_rng().gen_ascii_chars().take(10).collect::<String>(),
+ checksum: "".to_string(),
+ transferred_chunks: Vec::new(),
+ storage_dir: test_dir.0.clone(),
+ last_chunk_received: time::get_time().sec
+ }
+ }
+
+ pub fn assert_chunk_written(&mut self, test_dir: &TestDir, index: u64, data: &[u8]) {
+ let encoded = data.to_base64(base64::Config {
+ char_set: base64::CharacterSet::UrlSafe,
+ newline: base64::Newline::LF,
+ pad: true,
+ line_length: None
+ });
+ self.write_chunk(encoded.as_ref(), index).expect("couldn't write chunk");
+
+ let path = PathBuf::from(format!("{}/downloads/{}/{}", test_dir.0.clone(), self.update_id, index));
+ let mut file = File::open(path).map_err(|err| panic!("couldn't open file: {}", err)).unwrap();
+ let mut buf = Vec::new();
+ let _ = file.read_to_end(&mut buf).expect("couldn't read file");
+ assert_eq!(data.to_vec(), buf);
+ }
+ }
+
+
+ #[test]
+ fn test_package_directory_created() {
+ let test_dir = TestDir::new("sota-test-transfers");
+ let transfer = Transfer::new_test(&test_dir);
+ let chunk_dir = transfer.get_package_path().unwrap();
+ let path = format!("{}/packages/{}.spkg", test_dir.0, transfer.update_id);
+ assert_eq!(chunk_dir.to_str().unwrap(), path);
+ }
+
+ #[test]
+ fn test_checksum() {
+ let test_dir = TestDir::new("sota-test-transfers");
+ let mut transfer = Transfer::new_test(&test_dir);
+ transfer.assert_chunk_written(&test_dir, 0, "test\n".to_string().as_bytes());
+ transfer.assemble_chunks().expect("couldn't assemble chunks");
+
+ transfer.checksum = "4e1243bd22c66e76c2ba9eddc1f91394e57f9f83".to_string();
+ assert!(transfer.verify().is_ok());
+
+ transfer.checksum = "invalid".to_string();
+ assert!(!transfer.verify().is_ok());
+ }
+
+ #[test]
+ fn test_assemble_chunks() {
+ let test_dir = TestDir::new("sota-test-transfers");
+ let mut transfer = Transfer::new_test(&test_dir);
+ let mut assembly = String::new();
+ for index in 1..20 {
+ let data = rand::thread_rng().gen_ascii_chars().take(index).collect::<String>();
+ assembly.push_str(&data);
+ transfer.assert_chunk_written(&test_dir, index as u64, data.as_bytes());
+ }
+
+ transfer.assemble_chunks().expect("couldn't assemble chunks");
+ let path = format!("{}/packages/{}.spkg", test_dir.0, transfer.update_id);
+ let mut buf = Vec::new();
+ let _ = File::open(PathBuf::from(path)).unwrap().read_to_end(&mut buf).unwrap();
+ assert_eq!(assembly.into_bytes(), buf);
+ }
+}
diff --git a/src/sota.rs b/src/sota.rs
new file mode 100644
index 0000000..3dec33d
--- /dev/null
+++ b/src/sota.rs
@@ -0,0 +1,141 @@
+use rustc_serialize::json;
+use std::fs::File;
+use std::io;
+use std::path::PathBuf;
+
+use datatype::{Config, DeviceReport, DownloadComplete, Error, Package,
+ PendingUpdateRequest, UpdateRequestId, UpdateReport, Url};
+use http::Client;
+
+
+/// Encapsulate the client configuration and HTTP client used for
+/// software-over-the-air updates.
+pub struct Sota<'c, 'h> {
+ config: &'c Config,
+ client: &'h Client,
+}
+
+impl<'c, 'h> Sota<'c, 'h> {
+ /// Creates a new instance for Sota communication.
+ pub fn new(config: &'c Config, client: &'h Client) -> Sota<'c, 'h> {
+ Sota { config: config, client: client }
+ }
+
+ /// Takes a path and returns a new endpoint of the format
+ /// `<Core server>/api/v1/device_updates/<uuid>/<path>`.
+ pub fn endpoint(&self, path: &str) -> Url {
+ let endpoint = if path.is_empty() {
+ format!("/api/v1/device_updates/{}", self.config.device.uuid)
+ } else {
+ format!("/api/v1/device_updates/{}/{}", self.config.device.uuid, path)
+ };
+ self.config.core.server.join(&endpoint).expect("couldn't build endpoint url")
+ }
+
+ /// Query the Core server to identify any new package updates available.
+ pub fn get_pending_updates(&mut self) -> Result<Vec<PendingUpdateRequest>, Error> {
+ let resp_rx = self.client.get(self.endpoint(""), None);
+ let resp = resp_rx.recv().expect("no get_package_updates response received");
+ let data = try!(resp);
+ let text = try!(String::from_utf8(data));
+ Ok(try!(json::decode::<Vec<PendingUpdateRequest>>(&text)))
+ }
+
+ /// Download a specific update from the Core server.
+ pub fn download_update(&mut self, id: UpdateRequestId) -> Result<DownloadComplete, Error> {
+ let resp_rx = self.client.get(self.endpoint(&format!("{}/download", id)), None);
+ let resp = resp_rx.recv().expect("no download_package_update response received");
+ let data = try!(resp);
+
+ let mut path = PathBuf::new();
+ path.push(&self.config.device.packages_dir);
+ path.push(id.clone()); // TODO: Use Content-Disposition filename from request?
+ let mut file = try!(File::create(path.as_path()));
+
+ let _ = io::copy(&mut &*data, &mut file);
+ let path = try!(path.to_str().ok_or(Error::Parse(format!("Path is not valid UTF-8: {:?}", path))));
+
+ Ok(DownloadComplete {
+ update_id: id,
+ update_image: path.to_string(),
+ signature: "".to_string()
+ })
+ }
+
+ /// Install an update using the package manager.
+ pub fn install_update(&mut self, download: DownloadComplete) -> Result<UpdateReport, UpdateReport> {
+ let ref pacman = self.config.device.package_manager;
+ pacman.install_package(&download.update_image).and_then(|(code, output)| {
+ Ok(UpdateReport::single(download.update_id.clone(), code, output))
+ }).or_else(|(code, output)| {
+ Err(UpdateReport::single(download.update_id.clone(), code, output))
+ })
+ }
+
+ /// Get a list of the currently installed packages from the package manager.
+ pub fn get_installed_packages(&mut self) -> Result<Vec<Package>, Error> {
+ Ok(try!(self.config.device.package_manager.installed_packages()))
+ }
+
+ /// Send a list of the currently installed packages to the Core server.
+ pub fn send_installed_packages(&mut self, packages: &Vec<Package>) -> Result<(), Error> {
+ let body = try!(json::encode(packages));
+ let resp_rx = self.client.put(self.endpoint("installed"), Some(body.into_bytes()));
+ let _ = resp_rx.recv().expect("no update_installed_packages response received")
+ .map_err(|err| error!("update_installed_packages failed: {}", err));
+ Ok(())
+ }
+
+ /// Send the outcome of a package update to the Core server.
+ pub fn send_update_report(&mut self, update_report: &UpdateReport) -> Result<(), Error> {
+ let report = DeviceReport::new(&self.config.device.uuid, update_report);
+ let body = try!(json::encode(&report));
+ let url = self.endpoint(report.device);
+ let resp_rx = self.client.post(url, Some(body.into_bytes()));
+ let resp = resp_rx.recv().expect("no send_install_report response received");
+ let _ = try!(resp);
+ Ok(())
+ }
+
+ /// Send system information from the device to the Core server.
+ pub fn send_system_info(&mut self, body: &str) -> Result<(), Error> {
+ let resp_rx = self.client.put(self.endpoint("system_info"), Some(body.as_bytes().to_vec()));
+ let resp = resp_rx.recv().expect("no send_system_info response received");
+ let _ = try!(resp);
+ Ok(())
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use rustc_serialize::json;
+
+ use super::*;
+ use datatype::{Config, Package, PendingUpdateRequest};
+ use http::TestClient;
+
+
+ #[test]
+ fn test_get_pending_updates() {
+ let pending_update = PendingUpdateRequest {
+ requestId: "someid".to_string(),
+ installPos: 0,
+ packageId: Package {
+ name: "fake-pkg".to_string(),
+ version: "0.1.1".to_string()
+ },
+ createdAt: "2010-01-01".to_string()
+ };
+
+ let json = format!("[{}]", json::encode(&pending_update).unwrap());
+ let mut sota = Sota {
+ config: &Config::default(),
+ client: &mut TestClient::from(vec![json.to_string()]),
+ };
+
+ let updates: Vec<PendingUpdateRequest> = sota.get_pending_updates().unwrap();
+ let ids: Vec<String> = updates.iter().map(|p| p.requestId.clone()).collect();
+ assert_eq!(ids, vec!["someid".to_string()])
+ }
+}
diff --git a/src/test_library.rs b/src/test_library.rs
deleted file mode 100644
index b5af298..0000000
--- a/src/test_library.rs
+++ /dev/null
@@ -1,72 +0,0 @@
-//! Helper functions for testing `sota_client`.
-
-use std::path::PathBuf;
-use std::fmt;
-use std::fs;
-
-use time;
-use log;
-use log::{LogRecord, LogLevel, LogMetadata};
-
-/// Initiates logging in tests. Can safely be called multiple times.
-macro_rules! test_init {
- () => {
- use test_library::SimpleLogger;
- use log::LogLevelFilter;
- use log;
- match log::set_logger(|max_log_level| {
- max_log_level.set(LogLevelFilter::Trace);
- Box::new(SimpleLogger)
- }) {
- Ok(..) => {},
- Err(..) => {}
- }
- }
-}
-
-/// Implements a simple logger printing all log messages to stdout.
-pub struct SimpleLogger;
-
-impl log::Log for SimpleLogger {
- fn enabled(&self, metadata: &LogMetadata) -> bool {
- metadata.level() <= LogLevel::Info
- }
-
- fn log(&self, record: &LogRecord) {
- if self.enabled(record.metadata()) {
- println!("{} - {}", record.level(), record.args());
- }
- }
-}
-
-/// Wrapper for storing test data in a temporary directory. The created directory will be deleted,
-/// when dropped.
-pub struct PathPrefix { prefix: String }
-
-impl PathPrefix {
- pub fn new() -> PathPrefix {
- PathPrefix {
- prefix: format!("/tmp/rust-test-{}",
- time::precise_time_ns()
- .to_string())
- }
- }
-
- pub fn to_string(&self) -> String {
- return self.prefix.clone();
- }
-}
-
-impl Drop for PathPrefix {
- fn drop(&mut self) {
- let dir = PathBuf::from(&self.prefix);
- fs::remove_dir_all(dir).unwrap();
- }
-}
-
-impl fmt::Display for PathPrefix {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "{}", self.prefix)
- }
-}
-