diff options
author | Arthur Taylor <codders@octomonkey.org.uk> | 2016-11-22 10:56:20 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-11-22 10:56:20 +0100 |
commit | 8e9d234dcfe03a24409829ddd31b51bd8f840345 (patch) | |
tree | d1d833d732faf4c8709e975a4bb6129acc3d0f76 /src | |
parent | 0167dce98692f707b74395977c478c2ca44fa0c7 (diff) | |
parent | 4b50e1cb0945adbbcc07dfcb65a9252e7523105d (diff) | |
download | rvi_sota_client-master.tar.gz |
Merge latest stable advancedtelematic
Diffstat (limited to 'src')
28 files changed, 1134 insertions, 664 deletions
diff --git a/src/datatype/auth.rs b/src/datatype/auth.rs index cbfd097..83c872a 100644 --- a/src/datatype/auth.rs +++ b/src/datatype/auth.rs @@ -5,7 +5,7 @@ use std::borrow::Cow; #[derive(Clone, Debug)] pub enum Auth { None, - Credentials(ClientId, ClientSecret), + Credentials(ClientCredentials), Token(AccessToken), } @@ -16,8 +16,15 @@ impl<'a> Into<Cow<'a, Auth>> for Auth { } -/// For storage of the returned access token data following a successful -/// authentication. +/// Encapsulates the client id and secret used during authentication. +#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)] +pub struct ClientCredentials { + pub client_id: String, + pub client_secret: String, +} + + +/// Stores the returned access token data following a successful authentication. #[derive(RustcDecodable, Debug, PartialEq, Clone, Default)] pub struct AccessToken { pub access_token: String, @@ -31,19 +38,3 @@ impl<'a> Into<Cow<'a, AccessToken>> for AccessToken { Cow::Owned(self) } } - - -/// Encapsulates a `String` type for use in `Auth::Credentials` -#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)] -pub struct ClientId(pub String); - -/// Encapsulates a `String` type for use in `Auth::Credentials` -#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)] -pub struct ClientSecret(pub String); - -/// Encapsulates the client id and secret used during authentication. -#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)] -pub struct ClientCredentials { - pub client_id: ClientId, - pub client_secret: ClientSecret, -} diff --git a/src/datatype/command.rs b/src/datatype/command.rs index d449bb6..c88e8d5 100644 --- a/src/datatype/command.rs +++ b/src/datatype/command.rs @@ -3,9 +3,8 @@ use std::str; use std::str::FromStr; use nom::{IResult, space, eof}; -use datatype::{ClientCredentials, ClientId, ClientSecret, DownloadComplete, Error, - InstalledSoftware, Package, UpdateReport, UpdateRequestId, - UpdateResultCode}; +use datatype::{ClientCredentials, Error, InstalledSoftware, Package, UpdateReport, + UpdateRequestId, UpdateResultCode}; /// System-wide commands that are sent to the interpreter. @@ -16,29 +15,36 @@ pub enum Command { /// Shutdown the client immediately. Shutdown, - /// Check for any new updates. - GetNewUpdates, + /// Check for any pending or in-flight updates. + GetUpdateRequests, + /// List the installed packages on the system. ListInstalledPackages, - /// Get the latest system information, and optionally publish it to Core. - RefreshSystemInfo(bool), + /// List the system information. + ListSystemInfo, - /// Start downloading one or more updates. - StartDownload(Vec<UpdateRequestId>), - /// Start installing an update - StartInstall(DownloadComplete), + /// Start downloading an update. + StartDownload(UpdateRequestId), + /// Start installing an update. + StartInstall(UpdateRequestId), /// Send a list of packages to the Core server. SendInstalledPackages(Vec<Package>), /// Send a list of all packages and firmware to the Core server. SendInstalledSoftware(InstalledSoftware), + /// Send the system information to the Core server. + SendSystemInfo, /// Send a package update report to the Core server. SendUpdateReport(UpdateReport), } impl Display for Command { fn fmt(&self, f: &mut Formatter) -> FmtResult { - write!(f, "{:?}", self) + let text = match *self { + Command::SendInstalledPackages(_) => "SendInstalledPackages(...)".to_string(), + _ => format!("{:?}", self) + }; + write!(f, "{}", text) } } @@ -59,24 +65,26 @@ named!(command <(Command, Vec<&str>)>, chain!( ~ cmd: alt!( alt_complete!(tag!("Authenticate") | tag!("auth")) => { |_| Command::Authenticate(None) } - | alt_complete!(tag!("GetNewUpdates") | tag!("new")) - => { |_| Command::GetNewUpdates } + | alt_complete!(tag!("GetUpdateRequests") | tag!("getreq")) + => { |_| Command::GetUpdateRequests } | alt_complete!(tag!("ListInstalledPackages") | tag!("ls")) => { |_| Command::ListInstalledPackages } - | alt_complete!(tag!("RefreshSystemInfo") | tag!("info")) - => { |_| Command::RefreshSystemInfo(false) } + | alt_complete!(tag!("ListSystemInfo") | tag!("info")) + => { |_| Command::ListSystemInfo } | alt_complete!(tag!("Shutdown") | tag!("shutdown")) => { |_| Command::Shutdown } | alt_complete!(tag!("SendInstalledPackages") | tag!("sendpack")) => { |_| Command::SendInstalledPackages(Vec::new()) } | alt_complete!(tag!("SendInstalledSoftware") | tag!("sendinst")) => { |_| Command::SendInstalledSoftware(InstalledSoftware::default()) } + | alt_complete!(tag!("SendSystemInfo") | tag!("sendinfo")) + => { |_| Command::SendSystemInfo } | alt_complete!(tag!("SendUpdateReport") | tag!("sendup")) => { |_| Command::SendUpdateReport(UpdateReport::default()) } | alt_complete!(tag!("StartDownload") | tag!("dl")) - => { |_| Command::StartDownload(Vec::new()) } + => { |_| Command::StartDownload("".to_string()) } | alt_complete!(tag!("StartInstall") | tag!("inst")) - => { |_| Command::StartInstall(DownloadComplete::default()) } + => { |_| Command::StartInstall("".to_string()) } ) ~ args: arguments ~ alt!(eof | tag!("\r") | tag!("\n") | tag!(";")), @@ -103,14 +111,15 @@ fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> { 0 => Ok(Command::Authenticate(None)), 1 => Err(Error::Command("usage: auth <client-id> <client-secret>".to_string())), 2 => Ok(Command::Authenticate(Some(ClientCredentials { - client_id: ClientId(args[0].to_string()), - client_secret: ClientSecret(args[1].to_string())}))), + client_id: args[0].to_string(), + client_secret: args[1].to_string() + }))), _ => Err(Error::Command(format!("unexpected Authenticate args: {:?}", args))), }, - Command::GetNewUpdates => match args.len() { - 0 => Ok(Command::GetNewUpdates), - _ => Err(Error::Command(format!("unexpected GetNewUpdates args: {:?}", args))), + Command::GetUpdateRequests => match args.len() { + 0 => Ok(Command::GetUpdateRequests), + _ => Err(Error::Command(format!("unexpected GetUpdateRequests args: {:?}", args))), }, Command::ListInstalledPackages => match args.len() { @@ -118,16 +127,9 @@ fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> { _ => Err(Error::Command(format!("unexpected ListInstalledPackages args: {:?}", args))), }, - Command::RefreshSystemInfo(_) => match args.len() { - 0 => Ok(Command::RefreshSystemInfo(false)), - 1 => { - if let Ok(b) = args[0].parse::<bool>() { - Ok(Command::RefreshSystemInfo(b)) - } else { - Err(Error::Command("couldn't parse 1st argument as boolean".to_string())) - } - } - _ => Err(Error::Command(format!("unexpected RefreshSystemInfo args: {:?}", args))), + Command::ListSystemInfo => match args.len() { + 0 => Ok(Command::ListSystemInfo), + _ => Err(Error::Command(format!("unexpected ListSystemInfo args: {:?}", args))), }, Command::SendInstalledPackages(_) => match args.len() { @@ -150,6 +152,11 @@ fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> { _ => Err(Error::Command(format!("unexpected SendInstalledSoftware args: {:?}", args))), }, + Command::SendSystemInfo => match args.len() { + 0 => Ok(Command::SendSystemInfo), + _ => Err(Error::Command(format!("unexpected SendSystemInfo args: {:?}", args))), + }, + Command::SendUpdateReport(_) => match args.len() { 0 | 1 => Err(Error::Command("usage: sendup <update-id> <result-code>".to_string())), 2 => { @@ -168,12 +175,14 @@ fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> { }, Command::StartDownload(_) => match args.len() { - 0 => Err(Error::Command("usage: dl [<id>]".to_string())), - _ => Ok(Command::StartDownload(args.iter().map(|arg| String::from(*arg)).collect())), + 0 => Err(Error::Command("usage: dl <id>".to_string())), + 1 => Ok(Command::StartDownload(args[0].to_string())), + _ => Err(Error::Command(format!("unexpected StartInstall args: {:?}", args))), }, Command::StartInstall(_) => match args.len() { - // FIXME(PRO-1160): args + 0 => Err(Error::Command("usage: inst <id>".to_string())), + 1 => Ok(Command::StartInstall(args[0].to_string())), _ => Err(Error::Command(format!("unexpected StartInstall args: {:?}", args))), }, @@ -184,8 +193,7 @@ fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> { #[cfg(test)] mod tests { use super::{command, arguments}; - use datatype::{Command, ClientCredentials, ClientId, ClientSecret, Package, - UpdateReport, UpdateResultCode}; + use datatype::{Command, ClientCredentials, Package, UpdateReport, UpdateResultCode}; use nom::IResult; @@ -194,7 +202,7 @@ mod tests { assert_eq!(command(&b"auth foo bar"[..]), IResult::Done(&b""[..], (Command::Authenticate(None), vec!["foo", "bar"]))); assert_eq!(command(&b"dl 1"[..]), - IResult::Done(&b""[..], (Command::StartDownload(Vec::new()), vec!["1"]))); + IResult::Done(&b""[..], (Command::StartDownload("".to_string()), vec!["1"]))); assert_eq!(command(&b"ls;\n"[..]), IResult::Done(&b"\n"[..], (Command::ListInstalledPackages, Vec::new()))); } @@ -216,18 +224,18 @@ mod tests { assert_eq!("auth".parse::<Command>().unwrap(), Command::Authenticate(None)); assert_eq!("auth user pass".parse::<Command>().unwrap(), Command::Authenticate(Some(ClientCredentials { - client_id: ClientId("user".to_string()), - client_secret: ClientSecret("pass".to_string()), + client_id: "user".to_string(), + client_secret: "pass".to_string(), }))); assert!("auth one".parse::<Command>().is_err()); assert!("auth one two three".parse::<Command>().is_err()); } #[test] - fn get_new_updates_test() { - assert_eq!("GetNewUpdates".parse::<Command>().unwrap(), Command::GetNewUpdates); - assert_eq!("new".parse::<Command>().unwrap(), Command::GetNewUpdates); - assert!("new old".parse::<Command>().is_err()); + fn get_update_requests_test() { + assert_eq!("GetUpdateRequests".parse::<Command>().unwrap(), Command::GetUpdateRequests); + assert_eq!("getreq".parse::<Command>().unwrap(), Command::GetUpdateRequests); + assert!("getreq now".parse::<Command>().is_err()); } #[test] @@ -238,10 +246,10 @@ mod tests { } #[test] - fn refresh_system_info_test() { - assert_eq!("RefreshSystemInfo true".parse::<Command>().unwrap(), Command::RefreshSystemInfo(true)); - assert_eq!("info".parse::<Command>().unwrap(), Command::RefreshSystemInfo(false)); - assert!("RefreshSystemInfo 1 2".parse::<Command>().is_err()); + fn list_system_info_test() { + assert_eq!("ListSystemInfo".parse::<Command>().unwrap(), Command::ListSystemInfo); + assert_eq!("info".parse::<Command>().unwrap(), Command::ListSystemInfo); + assert!("ListSystemInfo 1 2".parse::<Command>().is_err()); assert!("info please".parse::<Command>().is_err()); } @@ -271,6 +279,14 @@ mod tests { } #[test] + fn send_system_info_test() { + assert_eq!("SendSystemInfo".parse::<Command>().unwrap(), Command::SendSystemInfo); + assert_eq!("sendinfo".parse::<Command>().unwrap(), Command::SendSystemInfo); + assert!("SendSystemInfo 1 2".parse::<Command>().is_err()); + assert!("sendinfo please".parse::<Command>().is_err()); + } + + #[test] fn send_update_report_test() { assert_eq!("SendUpdateReport myid OK".parse::<Command>().unwrap(), Command::SendUpdateReport( UpdateReport::single("myid".to_string(), UpdateResultCode::OK, "".to_string()))); @@ -291,15 +307,16 @@ mod tests { #[test] fn start_download_test() { - assert_eq!("StartDownload this".parse::<Command>().unwrap(), - Command::StartDownload(vec!["this".to_string()])); - assert_eq!("dl some more".parse::<Command>().unwrap(), - Command::StartDownload(vec!["some".to_string(), "more".to_string()])); + assert_eq!("StartDownload this".parse::<Command>().unwrap(), Command::StartDownload("this".to_string())); + assert_eq!("dl that".parse::<Command>().unwrap(), Command::StartDownload("that".to_string())); + assert!("StartDownload this and that".parse::<Command>().is_err()); assert!("dl".parse::<Command>().is_err()); } #[test] fn start_install_test() { + assert_eq!("StartInstall 123".parse::<Command>().unwrap(), Command::StartInstall("123".to_string())); + assert_eq!("inst this".parse::<Command>().unwrap(), Command::StartInstall("this".to_string())); assert!("StartInstall".parse::<Command>().is_err()); assert!("inst more than one".parse::<Command>().is_err()); } diff --git a/src/datatype/config.rs b/src/datatype/config.rs index 5db0f12..ad80c42 100644 --- a/src/datatype/config.rs +++ b/src/datatype/config.rs @@ -6,13 +6,13 @@ use std::os::unix::fs::PermissionsExt; use std::io::prelude::*; use std::path::Path; use toml; -use toml::{Decoder, Parser, Table, Value}; +use toml::{Decoder, Parser, Table}; -use datatype::{Error, SystemInfo, Url}; +use datatype::{Error, SocketAddr, Url}; use package_manager::PackageManager; -/// An aggregation of all the configuration options parsed at startup. +/// A container for all parsed configs. #[derive(Default, PartialEq, Eq, Debug, Clone)] pub struct Config { pub auth: Option<AuthConfig>, @@ -25,6 +25,8 @@ pub struct Config { } impl Config { + /// Read in a toml configuration file using default values for missing + /// sections or fields. pub fn load(path: &str) -> Result<Config, Error> { info!("Loading config file: {}", path); let mut file = try!(File::open(path).map_err(Error::Io)); @@ -33,36 +35,34 @@ impl Config { Config::parse(&toml) } + /// Parse a toml configuration string using default values for missing + /// sections or fields while retaining backwards compatibility. pub fn parse(toml: &str) -> Result<Config, Error> { let table = try!(parse_table(&toml)); - let auth_cfg = if let Some(auth) = table.get("auth") { - let parsed = try!(decode_section(auth.clone())); - Some(try!(bootstrap_credentials(parsed))) - } else { - None - }; - - let dbus_cfg = if let Some(dbus) = table.get("dbus") { - Some(try!(decode_section(dbus.clone()))) - } else { - None - }; - - let rvi_cfg = if let Some(rvi) = table.get("rvi") { - Some(try!(decode_section(rvi.clone()))) - } else { - None - }; + let mut auth: Option<ParsedAuthConfig> = try!(maybe_parse_section(&table, "auth")); + let mut core: ParsedCoreConfig = try!(parse_section(&table, "core")); + let mut dbus: Option<ParsedDBusConfig> = try!(maybe_parse_section(&table, "dbus")); + let mut device: ParsedDeviceConfig = try!(parse_section(&table, "device")); + let mut gateway: ParsedGatewayConfig = try!(parse_section(&table, "gateway")); + let mut network: ParsedNetworkConfig = try!(parse_section(&table, "network")); + let mut rvi: Option<ParsedRviConfig> = try!(maybe_parse_section(&table, "rvi")); + + if let Some(cfg) = auth { + auth = Some(try!(bootstrap_credentials(cfg))); + } + + try!(apply_transformations(&mut auth, &mut core, &mut dbus, &mut device, + &mut gateway, &mut network, &mut rvi)); Ok(Config { - auth: auth_cfg, - core: try!(read_section(&table, "core")), - dbus: dbus_cfg, - device: try!(read_section(&table, "device")), - gateway: try!(read_section(&table, "gateway")), - network: try!(read_section(&table, "network")), - rvi: rvi_cfg, + auth: auth.map(|mut cfg| cfg.defaultify()), + core: core.defaultify(), + dbus: dbus.map(|mut cfg| cfg.defaultify()), + device: device.defaultify(), + gateway: gateway.defaultify(), + network: network.defaultify(), + rvi: rvi.map(|mut cfg| cfg.defaultify()) }) } } @@ -72,15 +72,15 @@ fn parse_table(toml: &str) -> Result<Table, Error> { Ok(try!(parser.parse().ok_or_else(move || parser.errors))) } -fn read_section<T: Decodable>(table: &Table, section: &str) -> Result<T, Error> { - let part = try!(table.get(section) - .ok_or_else(|| Error::Parse(format!("invalid section: {}", section)))); - decode_section(part.clone()) +fn parse_section<T: Decodable + Default>(table: &Table, section: &str) -> Result<T, Error> { + Ok(try!(maybe_parse_section(table, section)).unwrap_or(T::default())) } -fn decode_section<T: Decodable>(section: Value) -> Result<T, Error> { - let mut decoder = Decoder::new(section); - Ok(try!(T::decode(&mut decoder))) +fn maybe_parse_section<T: Decodable>(table: &Table, section: &str) -> Result<Option<T>, Error> { + table.get(section).map_or(Ok(None), |sect| { + let mut decoder = Decoder::new(sect.clone()); + Ok(Some(try!(T::decode(&mut decoder)))) + }) } @@ -92,8 +92,8 @@ struct CredentialsFile { // Read AuthConfig values from the credentials file if it exists, or write the // current AuthConfig values to a new credentials file otherwise. -fn bootstrap_credentials(auth_cfg: AuthConfig) -> Result<AuthConfig, Error> { - let creds = auth_cfg.credentials_file.clone(); +fn bootstrap_credentials(auth: ParsedAuthConfig) -> Result<ParsedAuthConfig, Error> { + let creds = auth.credentials_file.expect("couldn't get credentials_file"); let path = Path::new(&creds); debug!("bootstrap_credentials: {:?}", path); @@ -102,14 +102,16 @@ fn bootstrap_credentials(auth_cfg: AuthConfig) -> Result<AuthConfig, Error> { let mut text = String::new(); try!(file.read_to_string(&mut text)); let table = try!(parse_table(&text)); - try!(read_section::<CredentialsFile>(&table, "auth")) + let auth = try!(table.get("auth").ok_or(Error::Config("no [auth] section".to_string()))); + let mut decoder = Decoder::new(auth.clone()); + try!(CredentialsFile::decode(&mut decoder)) } Err(ref err) if err.kind() == ErrorKind::NotFound => { let mut table = Table::new(); let credentials = CredentialsFile { - client_id: auth_cfg.client_id, - client_secret: auth_cfg.client_secret + client_id: auth.client_id.expect("expected client_id"), + client_secret: auth.client_secret.expect("expected client_secret") }; table.insert("auth".to_string(), toml::encode(&credentials)); @@ -127,16 +129,52 @@ fn bootstrap_credentials(auth_cfg: AuthConfig) -> Result<AuthConfig, Error> { Err(err) => return Err(Error::Io(err)) }; - Ok(AuthConfig { - server: auth_cfg.server, - client_id: credentials.client_id, - client_secret: credentials.client_secret, - credentials_file: auth_cfg.credentials_file, + Ok(ParsedAuthConfig { + server: auth.server, + client_id: Some(credentials.client_id), + client_secret: Some(credentials.client_secret), + credentials_file: Some(creds.clone()), }) } -/// A parsed representation of the [auth] configuration section. +// Apply transformations from old to new config fields for backwards compatibility. +fn apply_transformations(_: &mut Option<ParsedAuthConfig>, + core: &mut ParsedCoreConfig, + _: &mut Option<ParsedDBusConfig>, + device: &mut ParsedDeviceConfig, + _: &mut ParsedGatewayConfig, + _: &mut ParsedNetworkConfig, + _: &mut Option<ParsedRviConfig>) -> Result<(), Error> { + + match (device.polling_interval, core.polling_sec) { + (Some(_), Some(_)) => { + return Err(Error::Config("core.polling_sec and device.polling_interval both set".to_string())) + } + + (Some(interval), None) => { + if interval > 0 { + core.polling = Some(true); + core.polling_sec = Some(interval); + } else { + core.polling = Some(false); + } + } + + _ => () + } + + Ok(()) +} + + +/// Trait used to overwrite any `None` fields in a config with its default value. +trait Defaultify<T: Default> { + fn defaultify(&mut self) -> T; +} + + +/// The [auth] configuration section. #[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] pub struct AuthConfig { pub server: Url, @@ -146,7 +184,7 @@ pub struct AuthConfig { } impl Default for AuthConfig { - fn default() -> AuthConfig { + fn default() -> Self { AuthConfig { server: "http://127.0.0.1:9001".parse().unwrap(), client_id: "client-id".to_string(), @@ -156,23 +194,86 @@ impl Default for AuthConfig { } } +#[derive(RustcDecodable)] +struct ParsedAuthConfig { + server: Option<Url>, + client_id: Option<String>, + client_secret: Option<String>, + credentials_file: Option<String>, +} -/// A parsed representation of the [core] configuration section. +impl Default for ParsedAuthConfig { + fn default() -> Self { + ParsedAuthConfig { + server: None, + client_id: None, + client_secret: None, + credentials_file: None + } + } +} + +impl Defaultify<AuthConfig> for ParsedAuthConfig { + fn defaultify(&mut self) -> AuthConfig { + let default = AuthConfig::default(); + AuthConfig { + server: self.server.take().unwrap_or(default.server), + client_id: self.client_id.take().unwrap_or(default.client_id), + client_secret: self.client_secret.take().unwrap_or(default.client_secret), + credentials_file: self.credentials_file.take().unwrap_or(default.credentials_file) + } + } +} + + +/// The [core] configuration section. #[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] pub struct CoreConfig { - pub server: Url + pub server: Url, + pub polling: bool, + pub polling_sec: u64 } impl Default for CoreConfig { fn default() -> CoreConfig { CoreConfig { - server: "http://127.0.0.1:8080".parse().unwrap() + server: "http://127.0.0.1:8080".parse().unwrap(), + polling: true, + polling_sec: 10 + } + } +} + +#[derive(RustcDecodable)] +struct ParsedCoreConfig { + server: Option<Url>, + polling: Option<bool>, + polling_sec: Option<u64> +} + +impl Default for ParsedCoreConfig { + fn default() -> Self { + ParsedCoreConfig { + server: None, + polling: None, + polling_sec: None + } + } +} + +impl Defaultify<CoreConfig> for ParsedCoreConfig { + fn defaultify(&mut self) -> CoreConfig { + let default = CoreConfig::default(); + CoreConfig { + server: self.server.take().unwrap_or(default.server), + polling: self.polling.take().unwrap_or(default.polling), + polling_sec: self.polling_sec.take().unwrap_or(default.polling_sec) } } } -/// A parsed representation of the [dbus] configuration section. +/// The [dbus] configuration section. #[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] pub struct DBusConfig { pub name: String, @@ -180,7 +281,7 @@ pub struct DBusConfig { pub interface: String, pub software_manager: String, pub software_manager_path: String, - pub timeout: i32, // dbus-rs expects a signed int + pub timeout: i32, } impl Default for DBusConfig { @@ -196,17 +297,53 @@ impl Default for DBusConfig { } } +#[derive(RustcDecodable)] +struct ParsedDBusConfig { + name: Option<String>, + path: Option<String>, + interface: Option<String>, + software_manager: Option<String>, + software_manager_path: Option<String>, + timeout: Option<i32>, +} + +impl Default for ParsedDBusConfig { + fn default() -> Self { + ParsedDBusConfig { + name: None, + path: None, + interface: None, + software_manager: None, + software_manager_path: None, + timeout: None + } + } +} + +impl Defaultify<DBusConfig> for ParsedDBusConfig { + fn defaultify(&mut self) -> DBusConfig { + let default = DBusConfig::default(); + DBusConfig { + name: self.name.take().unwrap_or(default.name), + path: self.path.take().unwrap_or(default.path), + interface: self.interface.take().unwrap_or(default.interface), + software_manager: self.software_manager.take().unwrap_or(default.software_manager), + software_manager_path: self.software_manager_path.take().unwrap_or(default.software_manager_path), + timeout: self.timeout.take().unwrap_or(default.timeout) + } + } +} + -/// A parsed representation of the [device] configuration section. +/// The [device] configuration section. #[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] pub struct DeviceConfig { pub uuid: String, pub vin: String, pub packages_dir: String, pub package_manager: PackageManager, - pub system_info: SystemInfo, - pub polling_interval: u64, pub certificates_path: String, + pub system_info: Option<String>, } impl Default for DeviceConfig { @@ -215,16 +352,54 @@ impl Default for DeviceConfig { uuid: "123e4567-e89b-12d3-a456-426655440000".to_string(), vin: "V1234567890123456".to_string(), packages_dir: "/tmp/".to_string(), - package_manager: PackageManager::Deb, - system_info: SystemInfo::default(), - polling_interval: 10, - certificates_path: "/tmp/sota_certificates".to_string() + package_manager: PackageManager::Off, + certificates_path: "/tmp/sota_certificates".to_string(), + system_info: Some("system_info.sh".to_string()) + } + } +} + +#[derive(RustcDecodable)] +struct ParsedDeviceConfig { + pub uuid: Option<String>, + pub vin: Option<String>, + pub packages_dir: Option<String>, + pub package_manager: Option<PackageManager>, + pub polling_interval: Option<u64>, + pub certificates_path: Option<String>, + pub system_info: Option<String>, +} + +impl Default for ParsedDeviceConfig { + fn default() -> Self { + ParsedDeviceConfig { + uuid: None, + vin: None, + packages_dir: None, + package_manager: None, + polling_interval: None, + certificates_path: None, + system_info: None, + } + } +} + +impl Defaultify<DeviceConfig> for ParsedDeviceConfig { + fn defaultify(&mut self) -> DeviceConfig { + let default = DeviceConfig::default(); + DeviceConfig { + uuid: self.uuid.take().unwrap_or(default.uuid), + vin: self.vin.take().unwrap_or(default.vin), + packages_dir: self.packages_dir.take().unwrap_or(default.packages_dir), + package_manager: self.package_manager.take().unwrap_or(default.package_manager), + certificates_path: self.certificates_path.take().unwrap_or(default.certificates_path), + system_info: self.system_info.take().or(default.system_info), } } } -/// A parsed representation of the [gateway] configuration section. +/// The [gateway] configuration section. #[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] pub struct GatewayConfig { pub console: bool, @@ -243,17 +418,54 @@ impl Default for GatewayConfig { http: false, rvi: false, socket: false, - websocket: true, + websocket: false, + } + } +} + +#[derive(RustcDecodable)] +struct ParsedGatewayConfig { + console: Option<bool>, + dbus: Option<bool>, + http: Option<bool>, + rvi: Option<bool>, + socket: Option<bool>, + websocket: Option<bool>, +} + +impl Default for ParsedGatewayConfig { + fn default() -> Self { + ParsedGatewayConfig { + console: None, + dbus: None, + http: None, + rvi: None, + socket: None, + websocket: None } } } +impl Defaultify<GatewayConfig> for ParsedGatewayConfig { + fn defaultify(&mut self) -> GatewayConfig { + let default = GatewayConfig::default(); + GatewayConfig { + console: self.console.take().unwrap_or(default.console), + dbus: self.dbus.take().unwrap_or(default.dbus), + http: self.http.take().unwrap_or(default.http), + rvi: self.rvi.take().unwrap_or(default.rvi), + socket: self.socket.take().unwrap_or(default.socket), + websocket: self.websocket.take().unwrap_or(default.websocket) + } + } +} -/// A parsed representation of the [network] configuration section. + +/// The [network] configuration section. #[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] pub struct NetworkConfig { - pub http_server: String, - pub rvi_edge_server: String, + pub http_server: SocketAddr, + pub rvi_edge_server: SocketAddr, pub socket_commands_path: String, pub socket_events_path: String, pub websocket_server: String @@ -262,17 +474,51 @@ pub struct NetworkConfig { impl Default for NetworkConfig { fn default() -> NetworkConfig { NetworkConfig { - http_server: "http://127.0.0.1:8888".to_string(), - rvi_edge_server: "http://127.0.0.1:9080".to_string(), + http_server: "127.0.0.1:8888".parse().unwrap(), + rvi_edge_server: "127.0.0.1:9080".parse().unwrap(), socket_commands_path: "/tmp/sota-commands.socket".to_string(), socket_events_path: "/tmp/sota-events.socket".to_string(), - websocket_server: "ws://127.0.0.1:3012".to_string() + websocket_server: "127.0.0.1:3012".to_string() } } } +#[derive(RustcDecodable)] +struct ParsedNetworkConfig { + http_server: Option<SocketAddr>, + rvi_edge_server: Option<SocketAddr>, + socket_commands_path: Option<String>, + socket_events_path: Option<String>, + websocket_server: Option<String> +} + +impl Default for ParsedNetworkConfig { + fn default() -> Self { + ParsedNetworkConfig { + http_server: None, + rvi_edge_server: None, + socket_commands_path: None, + socket_events_path: None, + websocket_server: None + } + } +} -/// A parsed representation of the [rvi] configuration section. +impl Defaultify<NetworkConfig> for ParsedNetworkConfig { + fn defaultify(&mut self) -> NetworkConfig { + let default = NetworkConfig::default(); + NetworkConfig { + http_server: self.http_server.take().unwrap_or(default.http_server), + rvi_edge_server: self.rvi_edge_server.take().unwrap_or(default.rvi_edge_server), + socket_commands_path: self.socket_commands_path.take().unwrap_or(default.socket_commands_path), + socket_events_path: self.socket_events_path.take().unwrap_or(default.socket_events_path), + websocket_server: self.websocket_server.take().unwrap_or(default.websocket_server) + } + } +} + + +/// The [rvi] configuration section. #[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] pub struct RviConfig { pub client: Url, @@ -285,7 +531,35 @@ impl Default for RviConfig { RviConfig { client: "http://127.0.0.1:8901".parse().unwrap(), storage_dir: "/var/sota".to_string(), - timeout: Some(20), + timeout: None, + } + } +} + +#[derive(RustcDecodable)] +struct ParsedRviConfig { + client: Option<Url>, + storage_dir: Option<String>, + timeout: Option<i64>, +} + +impl Default for ParsedRviConfig { + fn default() -> Self { + ParsedRviConfig { + client: None, + storage_dir: None, + timeout: None + } + } +} + +impl Defaultify<RviConfig> for ParsedRviConfig { + fn defaultify(&mut self) -> RviConfig { + let default = RviConfig::default(); + RviConfig { + client: self.client.take().unwrap_or(default.client), + storage_dir: self.storage_dir.take().unwrap_or(default.storage_dir), + timeout: self.timeout.take().or(default.timeout) } } } @@ -309,6 +583,8 @@ mod tests { r#" [core] server = "http://127.0.0.1:8080" + polling = true + polling_sec = 10 "#; const DBUS_CONFIG: &'static str = @@ -327,11 +603,10 @@ mod tests { [device] uuid = "123e4567-e89b-12d3-a456-426655440000" vin = "V1234567890123456" - system_info = "system_info.sh" - polling_interval = 10 packages_dir = "/tmp/" - package_manager = "deb" + package_manager = "off" certificates_path = "/tmp/sota_certificates" + system_info = "system_info.sh" "#; const GATEWAY_CONFIG: &'static str = @@ -342,17 +617,17 @@ mod tests { http = false rvi = false socket = false - websocket = true + websocket = false "#; const NETWORK_CONFIG: &'static str = r#" [network] - http_server = "http://127.0.0.1:8888" - rvi_edge_server = "http://127.0.0.1:9080" + http_server = "127.0.0.1:8888" + rvi_edge_server = "127.0.0.1:9080" socket_commands_path = "/tmp/sota-commands.socket" socket_events_path = "/tmp/sota-events.socket" - websocket_server = "ws://127.0.0.1:3012" + websocket_server = "127.0.0.1:3012" "#; const RVI_CONFIG: &'static str = @@ -365,7 +640,12 @@ mod tests { #[test] - fn parse_default_config() { + fn empty_config() { + assert_eq!(Config::parse("").unwrap(), Config::default()); + } + + #[test] + fn basic_config() { let config = String::new() + CORE_CONFIG + DEVICE_CONFIG @@ -375,7 +655,7 @@ mod tests { } #[test] - fn parse_example_config() { + fn default_config() { let config = String::new() + AUTH_CONFIG + CORE_CONFIG @@ -384,6 +664,13 @@ mod tests { + GATEWAY_CONFIG + NETWORK_CONFIG + RVI_CONFIG; - assert_eq!(Config::load("tests/sota.toml").unwrap(), Config::parse(&config).unwrap()); + assert_eq!(Config::load("tests/toml/default.toml").unwrap(), Config::parse(&config).unwrap()); + } + + #[test] + fn backwards_compatible_config() { + let config = Config::load("tests/toml/old.toml").unwrap(); + assert_eq!(config.core.polling, true); + assert_eq!(config.core.polling_sec, 10); } } diff --git a/src/datatype/error.rs b/src/datatype/error.rs index 8267234..9503e2c 100644 --- a/src/datatype/error.rs +++ b/src/datatype/error.rs @@ -13,7 +13,7 @@ use toml::{ParserError as TomlParserError, DecodeError as TomlDecodeError}; use url::ParseError as UrlParseError; use datatype::Event; -use http::auth_client::AuthHandler; +use http::{AuthHandler, ResponseData}; use gateway::Interpret; use ws::Error as WebsocketError; @@ -21,10 +21,12 @@ use ws::Error as WebsocketError; /// System-wide errors that are returned from `Result` type failures. #[derive(Debug)] pub enum Error { - Authorization(String), Client(String), Command(String), + Config(String), FromUtf8(FromUtf8Error), + Http(ResponseData), + HttpAuth(ResponseData), Hyper(HyperError), HyperClient(HyperClientError<AuthHandler>), Io(IoError), @@ -76,6 +78,7 @@ derive_from!([ JsonEncoderError => JsonEncoder, JsonDecoderError => JsonDecoder, RecvError => Recv, + ResponseData => Http, TomlDecodeError => TomlDecode, UrlParseError => UrlParse, WebsocketError => Websocket @@ -92,9 +95,11 @@ impl Display for Error { fn fmt(&self, f: &mut Formatter) -> FmtResult { let inner: String = match *self { Error::Client(ref s) => format!("Http client error: {}", s.clone()), - Error::Authorization(ref s) => format!("Http client authorization error: {}", s.clone()), Error::Command(ref e) => format!("Unknown Command: {}", e.clone()), + Error::Config(ref s) => format!("Bad Config: {}", s.clone()), Error::FromUtf8(ref e) => format!("From utf8 error: {}", e.clone()), + Error::Http(ref r) => format!("HTTP client error: {}", r.clone()), + Error::HttpAuth(ref r) => format!("HTTP authorization error: {}", r.clone()), Error::Hyper(ref e) => format!("Hyper error: {}", e.clone()), Error::HyperClient(ref e) => format!("Hyper client error: {}", e.clone()), Error::Io(ref e) => format!("IO error: {}", e.clone()), diff --git a/src/datatype/event.rs b/src/datatype/event.rs index e3f84ca..93ed4ec 100644 --- a/src/datatype/event.rs +++ b/src/datatype/event.rs @@ -1,7 +1,7 @@ use std::fmt::{Display, Formatter, Result as FmtResult}; use datatype::{DownloadComplete, Package, UpdateAvailable, UpdateReport, - UpdateRequestId}; + UpdateRequest, UpdateRequestId}; /// System-wide events that are broadcast to all interested parties. @@ -14,22 +14,20 @@ pub enum Event { Authenticated, /// An operation failed because we are not currently authenticated. NotAuthenticated, + /// Nothing was done as we are already authenticated. + AlreadyAuthenticated, - /// There are new updates available. - NewUpdatesReceived(Vec<UpdateRequestId>), - /// A notification from RVI of a new update. - NewUpdateAvailable(UpdateAvailable), - /// There are no new updates available. - NoNewUpdates, + /// A notification from Core of pending or in-flight updates. + UpdatesReceived(Vec<UpdateRequest>), + /// A notification from RVI of a pending update. + UpdateAvailable(UpdateAvailable), + /// There are no outstanding update requests. + NoUpdateRequests, /// The following packages are installed on the device. FoundInstalledPackages(Vec<Package>), /// An update on the system information was received. FoundSystemInfo(String), - /// A list of installed packages was sent to the Core server. - InstalledPackagesSent, - /// An update report was sent to the Core server. - UpdateReportSent, /// Downloading an update. DownloadingUpdate(UpdateRequestId), @@ -45,6 +43,15 @@ pub enum Event { /// The installation of an update failed. InstallFailed(UpdateReport), + /// An update report was sent to the Core server. + UpdateReportSent, + /// A list of installed packages was sent to the Core server. + InstalledPackagesSent, + /// A list of installed software was sent to the Core server. + InstalledSoftwareSent, + /// The system information was sent to the Core server. + SystemInfoSent, + /// A broadcast event requesting an update on externally installed software. InstalledSoftwareNeeded, } diff --git a/src/datatype/json_rpc.rs b/src/datatype/json_rpc.rs index 3eed9a2..e3a046a 100644 --- a/src/datatype/json_rpc.rs +++ b/src/datatype/json_rpc.rs @@ -1,7 +1,7 @@ use rustc_serialize::{json, Decodable, Encodable}; use time; -use http::{AuthClient, Client}; +use http::{AuthClient, Client, Response}; use super::Url; @@ -32,8 +32,12 @@ impl<E: Encodable> RpcRequest<E> { let body = json::encode(self).expect("couldn't encode RpcRequest"); let resp_rx = client.post(url, Some(body.into_bytes())); let resp = resp_rx.recv().expect("no RpcRequest response received"); - let data = try!(resp.map_err(|err| format!("{}", err))); - String::from_utf8(data).map_err(|err| format!("{}", err)) + + match resp { + Response::Success(data) => String::from_utf8(data.body).or_else(|err| Err(format!("{}", err))), + Response::Failed(data) => Err(format!("{}", data)), + Response::Error(err) => Err(format!("{}", err)) + } } } diff --git a/src/datatype/mod.rs b/src/datatype/mod.rs index 8a9ca4e..71e7e0a 100644 --- a/src/datatype/mod.rs +++ b/src/datatype/mod.rs @@ -5,22 +5,23 @@ pub mod dbus; pub mod error; pub mod event; pub mod json_rpc; -pub mod package; -pub mod system_info; +pub mod network; +pub mod shell; pub mod update_report; -pub mod url; +pub mod update_request; -pub use self::auth::{AccessToken, Auth, ClientId, ClientSecret, ClientCredentials}; +pub use self::auth::{AccessToken, Auth, ClientCredentials}; pub use self::command::Command; pub use self::config::{AuthConfig, CoreConfig, Config, DBusConfig, DeviceConfig, GatewayConfig, RviConfig}; pub use self::error::Error; pub use self::event::Event; pub use self::json_rpc::{RpcRequest, RpcOk, RpcErr}; -pub use self::package::{ChunkReceived, DownloadStarted, DownloadComplete, Package, - PendingUpdateRequest, UpdateAvailable, UpdateRequestId}; -pub use self::system_info::SystemInfo; +pub use self::network::{Method, SocketAddr, Url}; +pub use self::shell::system_info; pub use self::update_report::{DeviceReport, InstalledFirmware, InstalledPackage, InstalledSoftware, OperationResult, UpdateResultCode, UpdateReport}; -pub use self::url::{Method, Url}; +pub use self::update_request::{ChunkReceived, DownloadComplete, DownloadFailed, + DownloadStarted, Package, UpdateAvailable, + UpdateRequest, UpdateRequestId, UpdateRequestStatus}; diff --git a/src/datatype/url.rs b/src/datatype/network.rs index 5a9c97a..56f88ae 100644 --- a/src/datatype/url.rs +++ b/src/datatype/network.rs @@ -2,17 +2,53 @@ use hyper::method; use rustc_serialize::{Decoder, Decodable}; use std::borrow::Cow; use std::fmt::{Display, Formatter, Result as FmtResult}; -use std::io; -use std::net::ToSocketAddrs; +use std::net::{SocketAddr as StdSocketAddr}; +use std::ops::Deref; use std::str::FromStr; use url; -use url::SocketAddrs; use datatype::Error; -/// Encapsulate a single crate URL with additional methods and traits. -#[derive(PartialEq, Eq, Clone, Debug)] +/// Encapsulate a socket address for implementing additional traits. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct SocketAddr(pub StdSocketAddr); + +impl Decodable for SocketAddr { + fn decode<D: Decoder>(d: &mut D) -> Result<SocketAddr, D::Error> { + let addr = try!(d.read_str()); + addr.parse().or_else(|err| Err(d.error(&format!("{}", err)))) + } +} + +impl FromStr for SocketAddr { + type Err = Error; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match StdSocketAddr::from_str(s) { + Ok(addr) => Ok(SocketAddr(addr)), + Err(err) => Err(Error::Parse(format!("couldn't parse SocketAddr: {}", err))) + } + } +} + +impl Deref for SocketAddr { + type Target = StdSocketAddr; + + fn deref(&self) -> &StdSocketAddr { + &self.0 + } +} + +impl Display for SocketAddr { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!(f, "{}", self.0) + } +} + + +/// Encapsulate a url with additional methods and traits. +#[derive(Clone, Debug, Eq, PartialEq)] pub struct Url(pub url::Url); impl Url { @@ -21,11 +57,6 @@ impl Url { let url = try!(self.0.join(suffix)); Ok(Url(url)) } - - /// Return the encapsulated crate URL. - pub fn inner(&self) -> url::Url { - self.0.clone() - } } impl<'a> Into<Cow<'a, Url>> for Url { @@ -50,11 +81,11 @@ impl Decodable for Url { } } -impl ToSocketAddrs for Url { - type Iter = SocketAddrs; +impl Deref for Url { + type Target = url::Url; - fn to_socket_addrs(&self) -> io::Result<Self::Iter> { - self.0.to_socket_addrs() + fn deref(&self) -> &url::Url { + &self.0 } } diff --git a/src/datatype/shell.rs b/src/datatype/shell.rs new file mode 100644 index 0000000..d7cd4af --- /dev/null +++ b/src/datatype/shell.rs @@ -0,0 +1,12 @@ +use std::process::Command; + +use datatype::Error; + + +/// Generate a new system information report. +pub fn system_info(cmd: &str) -> Result<String, Error> { + Command::new(cmd) + .output() + .map_err(|err| Error::SystemInfo(err.to_string())) + .and_then(|info| String::from_utf8(info.stdout).map_err(Error::FromUtf8)) +} diff --git a/src/datatype/system_info.rs b/src/datatype/system_info.rs deleted file mode 100644 index 2d8fff2..0000000 --- a/src/datatype/system_info.rs +++ /dev/null @@ -1,46 +0,0 @@ -use rustc_serialize::{Decoder, Decodable}; -use std::process::Command; -use std::str::FromStr; - -use datatype::Error; - - -/// A reference to the command that will report on the system information. -#[derive(PartialEq, Eq, Debug, Clone)] -pub struct SystemInfo { - command: String -} - -impl SystemInfo { - /// Instantiate a new type to report on the system information. - pub fn new(command: String) -> SystemInfo { - SystemInfo { command: command } - } - - /// Generate a new report of the system information. - pub fn report(&self) -> Result<String, Error> { - Command::new(&self.command) - .output().map_err(|err| Error::SystemInfo(err.to_string())) - .and_then(|info| String::from_utf8(info.stdout).map_err(Error::FromUtf8)) - } -} - -impl Default for SystemInfo { - fn default() -> SystemInfo { - SystemInfo::new("system_info.sh".to_string()) - } -} - -impl FromStr for SystemInfo { - type Err = Error; - - fn from_str(s: &str) -> Result<SystemInfo, Error> { - Ok(SystemInfo::new(s.to_string())) - } -} - -impl Decodable for SystemInfo { - fn decode<D: Decoder>(d: &mut D) -> Result<SystemInfo, D::Error> { - d.read_str().and_then(|s| Ok(s.parse::<SystemInfo>().unwrap())) - } -} diff --git a/src/datatype/package.rs b/src/datatype/update_request.rs index 146ff06..7398848 100644 --- a/src/datatype/package.rs +++ b/src/datatype/update_request.rs @@ -3,14 +3,36 @@ use std::fmt::{Display, Formatter, Result as FmtResult}; use rvi::services::LocalServices; -/// Encapsulate a `String` type used to represent the `Package` version. -pub type Version = String; +/// Encapsulate a `String` type as the id of a specific update request. +pub type UpdateRequestId = String; + +/// A device update request from Core to be installed by the client. +#[allow(non_snake_case)] +#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)] +pub struct UpdateRequest { + pub requestId: UpdateRequestId, + pub status: UpdateRequestStatus, + pub packageId: Package, + pub installPos: i32, + pub createdAt: String, +} + +/// The status of an `UpdateRequest` from Core. +#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)] +pub enum UpdateRequestStatus { + Pending, + InFlight, + Canceled, + Failed, + Finished +} + /// Encodes the name and version of a specific package. -#[derive(Debug, PartialEq, Eq, RustcEncodable, RustcDecodable, Clone)] +#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)] pub struct Package { pub name: String, - pub version: Version + pub version: String } impl Display for Package { @@ -20,19 +42,6 @@ impl Display for Package { } -/// Encapsulate a `String` type as the id of a specific update request. -pub type UpdateRequestId = String; - -/// A single pending update request to be installed by the client. -#[allow(non_snake_case)] -#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)] -pub struct PendingUpdateRequest { - pub requestId: UpdateRequestId, - pub installPos: i32, - pub packageId: Package, - pub createdAt: String -} - /// A notification from RVI that a new update is available. #[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)] pub struct UpdateAvailable { @@ -59,8 +68,7 @@ pub struct ChunkReceived { pub chunks: Vec<u64>, } -/// A notification to indicate to any external package manager that the package -/// download has successfully completed. +/// A notification to an external package manager that the package was downloaded. #[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)] pub struct DownloadComplete { pub update_id: String, @@ -68,12 +76,9 @@ pub struct DownloadComplete { pub signature: String } -impl Default for DownloadComplete { - fn default() -> Self { - DownloadComplete { - update_id: "".to_string(), - update_image: "".to_string(), - signature: "".to_string() - } - } +/// A notification to an external package manager that the package download failed. +#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)] +pub struct DownloadFailed { + pub update_id: String, + pub reason: String } diff --git a/src/gateway/dbus.rs b/src/gateway/dbus.rs index 3bd41ea..321e262 100644 --- a/src/gateway/dbus.rs +++ b/src/gateway/dbus.rs @@ -24,7 +24,7 @@ impl Gateway for DBus { thread::spawn(move || { let conn = Connection::get_private(BusType::Session).expect("couldn't get dbus session"); - conn.register_name(&dbus_cfg.name, NameFlag::ReplaceExisting as u32).unwrap(); + conn.register_name(&dbus_cfg.name, NameFlag::ReplaceExisting as u32).expect("couldn't register name"); let mut obj_path = ObjectPath::new(&conn, &dbus_cfg.path, true); obj_path.insert_interface(&dbus_cfg.interface, default_interface(itx)); @@ -33,10 +33,11 @@ impl Gateway for DBus { loop { for item in conn.iter(1000) { if let ConnectionItem::MethodCall(mut msg) = item { - info!("DBus method call: {:?}", msg); - obj_path.handle_message(&mut msg).map(|result| { - let _ = result.map_err(|_| error!("dbus method call failed: {:?}", msg)); - }); + match obj_path.handle_message(&mut msg) { + Some(Ok(())) => info!("DBus message sent: {:?}", msg), + Some(Err(())) => error!("DBus message send failed: {:?}", msg), + None => debug!("unhandled dbus message: {:?}", msg) + } } } } @@ -47,8 +48,8 @@ impl Gateway for DBus { fn pulse(&self, event: Event) { match event { - Event::NewUpdateAvailable(avail) => { - let msg = self.new_message("updateAvailable", &[ + Event::UpdateAvailable(avail) => { + let msg = self.new_swm_message("updateAvailable", &[ MessageItem::from(avail.update_id), MessageItem::from(avail.signature), MessageItem::from(avail.description), @@ -59,7 +60,7 @@ impl Gateway for DBus { } Event::DownloadComplete(comp) => { - let msg = self.new_message("downloadComplete", &[ + let msg = self.new_swm_message("downloadComplete", &[ MessageItem::from(comp.update_image), MessageItem::from(comp.signature) ]); @@ -68,7 +69,7 @@ impl Gateway for DBus { } Event::InstalledSoftwareNeeded => { - let msg = self.new_message("getInstalledPackages", &[ + let msg = self.new_swm_message("getInstalledPackages", &[ MessageItem::from(true), // include packages? MessageItem::from(false) // include firmware? ]); @@ -103,7 +104,7 @@ impl Gateway for DBus { } impl DBus { - fn new_message(&self, method: &str, args: &[MessageItem]) -> Message { + fn new_swm_message(&self, method: &str, args: &[MessageItem]) -> Message { let mgr = self.dbus_cfg.software_manager.clone(); let path = self.dbus_cfg.software_manager_path.clone(); let result = Message::new_method_call(&mgr, &path, &mgr, method); @@ -139,19 +140,19 @@ fn send(itx: &Sender<Interpret>, cmd: Command) { fn handle_initiate_download(itx: &Sender<Interpret>, msg: &mut Message) -> MethodResult { let sender = try!(msg.sender().map(|s| s.to_string()).ok_or(dbus::missing_arg())); - debug!("handle_initiate_download: sender={:?}, msg={:?}", sender, msg); + debug!("dbus handle_initiate_download: sender={:?}, msg={:?}", sender, msg); let mut args = msg.get_items().into_iter(); let arg_id = try!(args.next().ok_or(dbus::missing_arg())); let update_id: &String = try!(FromMessageItem::from(&arg_id).or(Err(dbus::malformed_arg()))); - send(itx, Command::StartDownload(vec![update_id.clone()])); + send(itx, Command::StartDownload(update_id.clone())); Ok(vec![]) } fn handle_update_report(itx: &Sender<Interpret>, msg: &mut Message) -> MethodResult { let sender = try!(msg.sender().map(|s| s.to_string()).ok_or(dbus::missing_arg())); - debug!("handle_update_report: sender ={:?}, msg ={:?}", sender, msg); + debug!("dbus handle_update_report: sender={:?}, msg={:?}", sender, msg); let mut args = msg.get_items().into_iter(); let id_arg = try!(args.next().ok_or(dbus::missing_arg())); diff --git a/src/gateway/http.rs b/src/gateway/http.rs index 990a1fc..52a271c 100644 --- a/src/gateway/http.rs +++ b/src/gateway/http.rs @@ -4,6 +4,7 @@ use hyper::StatusCode; use hyper::net::{HttpStream, Transport}; use hyper::server::{Server as HyperServer, Request as HyperRequest}; use rustc_serialize::json; +use std::net::SocketAddr; use std::thread; use std::sync::{Arc, Mutex}; @@ -14,16 +15,16 @@ use http::{Server, ServerHandler}; /// The `Http` gateway parses `Command`s from the body of incoming requests. pub struct Http { - pub server: String, + pub server: SocketAddr } impl Gateway for Http { fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> { - let itx = Arc::new(Mutex::new(itx)); - let server = match HyperServer::http(&self.server.parse().expect("couldn't parse http address")) { - Ok(server) => server, - Err(err) => return Err(format!("couldn't start http gateway: {}", err)) - }; + let itx = Arc::new(Mutex::new(itx)); + let server = try!(HyperServer::http(&self.server).map_err(|err| { + format!("couldn't start http gateway: {}", err) + })); + thread::spawn(move || { let (_, server) = server.handle(move |_| HttpHandler::new(itx.clone())).unwrap(); server.run(); @@ -91,7 +92,7 @@ mod tests { use super::*; use gateway::{Gateway, Interpret}; use datatype::{Command, Event}; - use http::{AuthClient, Client, set_ca_certificates}; + use http::{AuthClient, Client, Response, set_ca_certificates}; #[test] @@ -101,15 +102,15 @@ mod tests { let (etx, erx) = chan::sync::<Event>(0); let (itx, irx) = chan::sync::<Interpret>(0); - thread::spawn(move || Http { server: "127.0.0.1:8888".to_string() }.start(itx, erx)); + thread::spawn(move || Http { server: "127.0.0.1:8888".parse().unwrap() }.start(itx, erx)); thread::spawn(move || { let _ = etx; // move into this scope loop { let interpret = irx.recv().expect("itx is closed"); match interpret.command { - Command::StartDownload(ids) => { + Command::StartDownload(id) => { let tx = interpret.response_tx.unwrap(); - tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned())); + tx.lock().unwrap().send(Event::FoundSystemInfo(id)); } _ => panic!("expected AcceptUpdates"), } @@ -119,13 +120,17 @@ mod tests { crossbeam::scope(|scope| { for id in 0..10 { scope.spawn(move || { - let cmd = Command::StartDownload(vec!(format!("{}", id))); + let cmd = Command::StartDownload(format!("{}", id)); let client = AuthClient::default(); let url = "http://127.0.0.1:8888".parse().unwrap(); let body = json::encode(&cmd).unwrap(); let resp_rx = client.post(url, Some(body.into_bytes())); - let resp = resp_rx.recv().unwrap().unwrap(); - let text = String::from_utf8(resp).unwrap(); + let resp = resp_rx.recv().unwrap(); + let text = match resp { + Response::Success(data) => String::from_utf8(data.body).unwrap(), + Response::Failed(data) => panic!("failed response: {}", data), + Response::Error(err) => panic!("error response: {}", err) + }; assert_eq!(json::decode::<Event>(&text).unwrap(), Event::FoundSystemInfo(format!("{}", id))); }); diff --git a/src/gateway/socket.rs b/src/gateway/socket.rs index f252e7c..571e34c 100644 --- a/src/gateway/socket.rs +++ b/src/gateway/socket.rs @@ -1,12 +1,12 @@ use chan; use chan::Sender; -use rustc_serialize::json; +use rustc_serialize::{Encodable, json}; use std::io::{BufReader, Read, Write}; use std::net::Shutdown; use std::sync::{Arc, Mutex}; use std::{fs, thread}; -use datatype::{Command, Error, Event}; +use datatype::{Command, DownloadFailed, Error, Event}; use super::{Gateway, Interpret}; use unix_socket::{UnixListener, UnixStream}; @@ -53,18 +53,32 @@ impl Gateway for Socket { } fn pulse(&self, event: Event) { - match event { + let output = match event { Event::DownloadComplete(dl) => { - let _ = UnixStream::connect(&self.events_path).map(|mut stream| { - stream.write_all(&json::encode(&dl).expect("couldn't encode Event").into_bytes()) - .unwrap_or_else(|err| error!("couldn't write to events socket: {}", err)); - stream.shutdown(Shutdown::Write) - .unwrap_or_else(|err| error!("couldn't close events socket: {}", err)); - }).map_err(|err| error!("couldn't open events socket: {}", err)); + json::encode(&EventWrapper { + version: "0.1".to_string(), + event: "DownloadComplete".to_string(), + data: dl + }).expect("couldn't encode DownloadComplete event") + } + + Event::DownloadFailed(id, reason) => { + json::encode(&EventWrapper { + version: "0.1".to_string(), + event: "DownloadFailed".to_string(), + data: DownloadFailed { update_id: id, reason: reason } + }).expect("couldn't encode DownloadFailed event") } - _ => () - } + _ => return + }; + + let _ = UnixStream::connect(&self.events_path).map(|mut stream| { + stream.write_all(&output.into_bytes()) + .unwrap_or_else(|err| error!("couldn't write to events socket: {}", err)); + stream.shutdown(Shutdown::Write) + .unwrap_or_else(|err| error!("couldn't close events socket: {}", err)); + }).map_err(|err| debug!("couldn't open events socket: {}", err)); } } @@ -85,6 +99,15 @@ fn handle_client(stream: &mut UnixStream, itx: Arc<Mutex<Sender<Interpret>>>) -> } +// FIXME(PRO-1322): create a proper JSON api +#[derive(RustcEncodable, RustcDecodable, PartialEq, Eq, Debug)] +pub struct EventWrapper<E: Encodable> { + pub version: String, + pub event: String, + pub data: E +} + + #[cfg(test)] mod tests { use chan; @@ -126,17 +149,19 @@ mod tests { let (mut stream, _) = server.accept().expect("couldn't read from events socket"); let mut text = String::new(); stream.read_to_string(&mut text).unwrap(); - let receive: DownloadComplete = json::decode(&text).expect("couldn't decode DownloadComplete message"); - assert_eq!(send, receive); + let receive: EventWrapper<DownloadComplete> = json::decode(&text).expect("couldn't decode Event"); + assert_eq!(receive.version, "0.1".to_string()); + assert_eq!(receive.event, "DownloadComplete".to_string()); + assert_eq!(receive.data, send); thread::spawn(move || { let _ = etx; // move into this scope loop { let interpret = irx.recv().expect("gtx is closed"); match interpret.command { - Command::StartDownload(ids) => { + Command::StartDownload(id) => { let tx = interpret.response_tx.unwrap(); - tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned())); + tx.lock().unwrap().send(Event::FoundSystemInfo(id)); } _ => panic!("expected AcceptUpdates"), } diff --git a/src/gateway/websocket.rs b/src/gateway/websocket.rs index eb5e040..8c597b9 100644 --- a/src/gateway/websocket.rs +++ b/src/gateway/websocket.rs @@ -1,12 +1,11 @@ use chan; use chan::Sender; use rustc_serialize::json; -use std::thread; use std::collections::HashMap; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::thread; use ws; -use ws::{listen, CloseCode, Handler, Handshake, Message, Sender as WsSender}; +use ws::{CloseCode, Handler, Handshake, Message, Sender as WsSender}; use ws::util::Token; use datatype::{Command, Error, Event}; @@ -24,10 +23,9 @@ impl Gateway for Websocket { fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> { let clients = self.clients.clone(); let addr = self.server.clone(); - info!("Opening websocket listener at {}", addr); thread::spawn(move || { - listen(&addr as &str, |out| { + ws::listen(&addr as &str, |out| { WebsocketHandler { out: out, itx: itx.clone(), @@ -36,8 +34,7 @@ impl Gateway for Websocket { }).expect("couldn't start websocket listener"); }); - thread::sleep(Duration::from_secs(1)); // FIXME: ugly hack for blocking listen call - Ok(info!("Websocket gateway started.")) + Ok(info!("Websocket gateway started at {}.", self.server)) } fn pulse(&self, event: Event) { @@ -69,7 +66,7 @@ impl Handler for WebsocketHandler { Err(err) } - Err(_) => unreachable!() + Err(err) => panic!("unexpected websocket on_message error: {}", err) }) } @@ -117,7 +114,7 @@ mod tests { use std::collections::HashMap; use std::sync::{Arc, Mutex}; use ws; - use ws::{connect, CloseCode}; + use ws::CloseCode; use datatype::{Command, Event}; use gateway::{Gateway, Interpret}; @@ -125,6 +122,7 @@ mod tests { #[test] + #[ignore] // FIXME: wait for https://github.com/housleyjk/ws-rs/issues/64 fn websocket_connections() { let (etx, erx) = chan::sync::<Event>(0); let (itx, irx) = chan::sync::<Interpret>(0); @@ -140,9 +138,9 @@ mod tests { loop { let interpret = irx.recv().expect("gtx is closed"); match interpret.command { - Command::StartDownload(ids) => { + Command::StartDownload(id) => { let tx = interpret.response_tx.unwrap(); - tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned())); + tx.lock().unwrap().send(Event::FoundSystemInfo(id)); } _ => panic!("expected AcceptUpdates"), } @@ -152,9 +150,9 @@ mod tests { crossbeam::scope(|scope| { for id in 0..10 { scope.spawn(move || { - connect("ws://localhost:3012", |out| { - out.send(format!(r#"{{ "variant": "StartDownload", "fields": [["{}"]] }}"#, id)) - .expect("couldn't write to websocket"); + ws::connect("ws://localhost:3012", |out| { + let msg = format!(r#"{{ "variant": "StartDownload", "fields": [["{}"]] }}"#, id); + out.send(msg).expect("couldn't write to websocket"); move |msg: ws::Message| { let ev: Event = json::decode(&format!("{}", msg)).unwrap(); diff --git a/src/http/auth_client.rs b/src/http/auth_client.rs index f4ad38b..3121c21 100644 --- a/src/http/auth_client.rs +++ b/src/http/auth_client.rs @@ -14,7 +14,7 @@ use std::time::Duration; use time; use datatype::{Auth, Error}; -use http::{Client, get_openssl, Request, Response}; +use http::{Client, get_openssl, Request, Response, ResponseData}; /// The `AuthClient` will attach an `Authentication` header to each outgoing @@ -51,59 +51,31 @@ impl AuthClient { impl Client for AuthClient { fn chan_request(&self, req: Request, resp_tx: Sender<Response>) { info!("{} {}", req.method, req.url); - let _ = self.client.request(req.url.inner(), AuthHandler { - auth: self.auth.clone(), - req: req, - timeout: Duration::from_secs(20), - started: None, - written: 0, - response: Vec::new(), - resp_tx: resp_tx.clone(), - }).map_err(|err| resp_tx.send(Err(Error::from(err)))); + let _ = self.client.request((*req.url).clone(), AuthHandler { + auth: self.auth.clone(), + req: req, + timeout: Duration::from_secs(20), + started: None, + written: 0, + resp_code: StatusCode::InternalServerError, + resp_body: Vec::new(), + resp_tx: resp_tx.clone(), + }).map_err(|err| resp_tx.send(Response::Error(Error::from(err)))); } } /// The async handler for outgoing HTTP requests. -// FIXME: uncomment when yocto is at 1.8.0: #[derive(Debug)] +#[derive(Debug)] pub struct AuthHandler { - auth: Auth, - req: Request, - timeout: Duration, - started: Option<u64>, - written: usize, - response: Vec<u8>, - resp_tx: Sender<Response>, -} - -// FIXME: required for building on 1.7.0 only -impl ::std::fmt::Debug for AuthHandler { - fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - write!(f, "unimplemented") - } -} - -impl AuthHandler { - fn redirect_request(&mut self, resp: HyperResponse) { - match resp.headers().get::<Location>() { - Some(&Location(ref loc)) => self.req.url.join(loc).map(|url| { - debug!("redirecting to {}", url); - // drop Authentication Header on redirect - let client = AuthClient::default(); - let resp_rx = client.send_request(Request { - url: url, - method: self.req.method.clone(), - body: mem::replace(&mut self.req.body, None), - }); - match resp_rx.recv().expect("no redirect_request response") { - Ok(data) => self.resp_tx.send(Ok(data)), - Err(err) => self.resp_tx.send(Err(Error::from(err))) - } - }).unwrap_or_else(|err| self.resp_tx.send(Err(Error::from(err)))), - - None => self.resp_tx.send(Err(Error::Client("redirect missing Location header".to_string()))) - } - } + auth: Auth, + req: Request, + timeout: Duration, + started: Option<u64>, + written: usize, + resp_code: StatusCode, + resp_body: Vec<u8>, + resp_tx: Sender<Response>, } /// The `AuthClient` may be used for both HTTP and HTTPS connections. @@ -125,15 +97,12 @@ impl Handler<Stream> for AuthHandler { headers.set(ContentType(mime_json)); } - Auth::Credentials(_, _) if self.req.body.is_some() => { - panic!("no request body expected for Auth::Credentials"); - } - - Auth::Credentials(ref id, ref secret) => { - headers.set(Authorization(Basic { username: id.0.clone(), - password: Some(secret.0.clone()) })); + Auth::Credentials(ref cred) => { + headers.set(Authorization(Basic { + username: cred.client_id.clone(), + password: Some(cred.client_secret.clone()) + })); headers.set(ContentType(mime_form)); - self.req.body = Some(br#"grant_type=client_credentials"#.to_vec()); } Auth::Token(ref token) => { @@ -173,7 +142,7 @@ impl Handler<Stream> for AuthHandler { Err(err) => { error!("unable to write request body: {}", err); - self.resp_tx.send(Err(Error::from(err))); + self.resp_tx.send(Response::Error(Error::from(err))); Next::remove() } } @@ -186,31 +155,25 @@ impl Handler<Stream> for AuthHandler { let latency = time::precise_time_ns() as f64 - started as f64; debug!("on_response latency: {}ms", (latency / 1e6) as u32); - if resp.status().is_success() { - if let Some(len) = resp.headers().get::<ContentLength>() { - if **len > 0 { - return Next::read(); - } - } - self.resp_tx.send(Ok(Vec::new())); - Next::end() - } else if resp.status().is_redirection() { + if resp.status().is_redirection() { self.redirect_request(resp); Next::end() - } else if resp.status() == &StatusCode::Forbidden { - self.resp_tx.send(Err(Error::Authorization(format!("{}", resp.status())))); + } else if let None = resp.headers().get::<ContentLength>() { + self.send_response(ResponseData { code: *resp.status(), body: Vec::new() }); Next::end() } else { - self.resp_tx.send(Err(Error::Client(format!("{}", resp.status())))); - Next::end() + self.resp_code = *resp.status(); + Next::read() } } fn on_response_readable(&mut self, decoder: &mut Decoder<Stream>) -> Next { - match io::copy(decoder, &mut self.response) { + match io::copy(decoder, &mut self.resp_body) { Ok(0) => { - debug!("on_response_readable bytes read: {}", self.response.len()); - self.resp_tx.send(Ok(mem::replace(&mut self.response, Vec::new()))); + debug!("on_response_readable body size: {}", self.resp_body.len()); + let code = self.resp_code.clone(); + let body = mem::replace(&mut self.resp_body, Vec::new()); + self.send_response(ResponseData { code: code, body: body }); Next::end() } @@ -226,7 +189,7 @@ impl Handler<Stream> for AuthHandler { Err(err) => { error!("unable to read response body: {}", err); - self.resp_tx.send(Err(Error::from(err))); + self.resp_tx.send(Response::Error(Error::from(err))); Next::end() } } @@ -234,11 +197,41 @@ impl Handler<Stream> for AuthHandler { fn on_error(&mut self, err: hyper::Error) -> Next { error!("on_error: {}", err); - self.resp_tx.send(Err(Error::from(err))); + self.resp_tx.send(Response::Error(Error::from(err))); Next::remove() } } +impl AuthHandler { + fn send_response(&mut self, resp: ResponseData) { + if resp.code == StatusCode::Unauthorized || resp.code == StatusCode::Forbidden { + self.resp_tx.send(Response::Error(Error::HttpAuth(resp))); + } else if resp.code.is_success() { + self.resp_tx.send(Response::Success(resp)); + } else { + self.resp_tx.send(Response::Failed(resp)); + } + } + + fn redirect_request(&mut self, resp: HyperResponse) { + match resp.headers().get::<Location>() { + Some(&Location(ref loc)) => self.req.url.join(loc).map(|url| { + debug!("redirecting to {}", url); + // drop Authorization Header on redirect + let client = AuthClient::default(); + let resp_rx = client.send_request(Request { + url: url, + method: self.req.method.clone(), + body: mem::replace(&mut self.req.body, None), + }); + self.resp_tx.send(resp_rx.recv().expect("no redirect_request response")) + }).unwrap_or_else(|err| self.resp_tx.send(Response::Error(Error::from(err)))), + + None => self.resp_tx.send(Response::Error((Error::Client("redirect missing Location header".to_string())))) + } + } +} + #[cfg(test)] mod tests { @@ -246,7 +239,7 @@ mod tests { use std::path::Path; use super::*; - use http::{Client, set_ca_certificates}; + use http::{Client, Response, set_ca_certificates}; fn get_client() -> AuthClient { @@ -259,8 +252,13 @@ mod tests { let client = get_client(); let url = "http://eu.httpbin.org/bytes/16?seed=123".parse().unwrap(); let resp_rx = client.get(url, None); - let data = resp_rx.recv().unwrap().unwrap(); - assert_eq!(data, vec![13, 22, 104, 27, 230, 9, 137, 85, 218, 40, 86, 85, 62, 0, 111, 22]); + let resp = resp_rx.recv().unwrap(); + let expect = vec![13, 22, 104, 27, 230, 9, 137, 85, 218, 40, 86, 85, 62, 0, 111, 22]; + match resp { + Response::Success(data) => assert_eq!(data.body, expect), + Response::Failed(data) => panic!("failed response: {}", data), + Response::Error(err) => panic!("error response: {}", err) + }; } #[test] @@ -268,9 +266,13 @@ mod tests { let client = get_client(); let url = "https://eu.httpbin.org/post".parse().unwrap(); let resp_rx = client.post(url, Some(br#"foo"#.to_vec())); - let body = resp_rx.recv().unwrap().unwrap(); - let resp = String::from_utf8(body).unwrap(); - let json = Json::from_str(&resp).unwrap(); + let resp = resp_rx.recv().unwrap(); + let body = match resp { + Response::Success(data) => String::from_utf8(data.body).unwrap(), + Response::Failed(data) => panic!("failed response: {}", data), + Response::Error(err) => panic!("error response: {}", err) + }; + let json = Json::from_str(&body).unwrap(); let obj = json.as_object().unwrap(); let data = obj.get("data").unwrap().as_string().unwrap(); assert_eq!(data, "foo"); diff --git a/src/http/http_client.rs b/src/http/http_client.rs index 492166c..b911b8d 100644 --- a/src/http/http_client.rs +++ b/src/http/http_client.rs @@ -1,5 +1,8 @@ use chan; use chan::{Sender, Receiver}; +use hyper::status::StatusCode; +use std::fmt::{Display, Formatter, Result as FmtResult}; +use std::str; use datatype::{Error, Method, Url}; @@ -39,5 +42,42 @@ pub struct Request { pub body: Option<Vec<u8>> } -/// Return the body of an HTTP response on success, or an `Error` otherwise. -pub type Response = Result<Vec<u8>, Error>; + +/// A Response enumerates between a successful (e.g. 2xx) HTTP response, a failed +/// (e.g. 4xx/5xx) response, or an Error before receiving any response. +#[derive(Debug)] +pub enum Response { + Success(ResponseData), + Failed(ResponseData), + Error(Error) +} + +impl Display for Response { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + match *self { + Response::Success(ref data) => write!(f, "{}", data), + Response::Failed(ref data) => write!(f, "{}", data), + Response::Error(ref err) => write!(f, "{}", err), + } + } +} + + +/// Wraps the HTTP Status Code as well as any returned body. +#[derive(Debug)] +pub struct ResponseData { + pub code: StatusCode, + pub body: Vec<u8> +} + +impl Display for ResponseData { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + match self.body.len() { + 0 => write!(f, "Response Code: {}", self.code), + n => match str::from_utf8(&self.body) { + Ok(text) => write!(f, "Response Code: {}, Body:\n{}", self.code, text), + Err(_) => write!(f, "Response Code: {}, Body: {} bytes", self.code, n), + } + } + } +} diff --git a/src/http/mod.rs b/src/http/mod.rs index 5e990a3..11b1e3a 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -5,7 +5,7 @@ pub mod openssl; pub mod test_client; pub use self::auth_client::{AuthClient, AuthHandler}; -pub use self::http_client::{Client, Request, Response}; +pub use self::http_client::{Client, Request, Response, ResponseData}; pub use self::http_server::{Server, ServerHandler}; pub use self::openssl::{get_openssl, set_ca_certificates}; pub use self::test_client::TestClient; diff --git a/src/http/test_client.rs b/src/http/test_client.rs index 7857e0f..1886fdf 100644 --- a/src/http/test_client.rs +++ b/src/http/test_client.rs @@ -1,8 +1,9 @@ use chan::Sender; +use hyper::status::StatusCode; use std::cell::RefCell; use datatype::Error; -use http::{Client, Request, Response}; +use http::{Client, Request, Response, ResponseData}; /// The `TestClient` will return HTTP responses from an existing list of strings. @@ -26,8 +27,11 @@ impl TestClient { impl Client for TestClient { fn chan_request(&self, req: Request, resp_tx: Sender<Response>) { match self.responses.borrow_mut().pop() { - Some(body) => resp_tx.send(Ok(body.as_bytes().to_vec())), - None => resp_tx.send(Err(Error::Client(req.url.to_string()))) + Some(body) => resp_tx.send(Response::Success(ResponseData { + code: StatusCode::Ok, + body: body.as_bytes().to_vec() + })), + None => resp_tx.send(Response::Error(Error::Client(req.url.to_string()))) } } diff --git a/src/interpreter.rs b/src/interpreter.rs index b286ba5..d2d3f99 100644 --- a/src/interpreter.rs +++ b/src/interpreter.rs @@ -1,10 +1,13 @@ use chan; -use chan::{Sender, Receiver}; -use std; +use chan::{Sender, Receiver, WaitGroup}; +use std::{process, thread}; use std::borrow::Cow; +use std::time::Duration; +use time; -use datatype::{AccessToken, Auth, ClientId, ClientSecret, Command, Config, - Error, Event, Package, UpdateRequestId}; +use datatype::{AccessToken, Auth, ClientCredentials, Command, Config, Error, Event, + Package, UpdateReport, UpdateRequestStatus as Status, UpdateResultCode, + system_info}; use gateway::Interpret; use http::{AuthClient, Client}; use oauth2::authenticate; @@ -18,9 +21,20 @@ use sota::Sota; pub trait Interpreter<I, O> { fn interpret(&mut self, input: I, otx: &Sender<O>); - fn run(&mut self, irx: Receiver<I>, otx: Sender<O>) { + fn run(&mut self, irx: Receiver<I>, otx: Sender<O>, wg: WaitGroup) { + let cooldown = Duration::from_millis(100); + loop { - self.interpret(irx.recv().expect("interpreter sender closed"), &otx); + let input = irx.recv().expect("interpreter sender closed"); + let started = time::precise_time_ns(); + + wg.add(1); + trace!("interpreter starting: {}", started); + self.interpret(input, &otx); + + thread::sleep(cooldown); // let any further work commence + trace!("interpreter stopping: {}", started); + wg.done(); } } } @@ -29,35 +43,68 @@ pub trait Interpreter<I, O> { /// The `EventInterpreter` listens for `Event`s and optionally responds with /// `Command`s that may be sent to the `CommandInterpreter`. pub struct EventInterpreter { - pub package_manager: PackageManager + pub pacman: PackageManager, + pub sysinfo: Option<String>, } impl Interpreter<Event, Command> for EventInterpreter { fn interpret(&mut self, event: Event, ctx: &Sender<Command>) { - info!("Event received: {}", event); + info!("EventInterpreter received: {}", event); + match event { + Event::Authenticated => { + if self.pacman != PackageManager::Off { + self.pacman.installed_packages().map(|packages| { + ctx.send(Command::SendInstalledPackages(packages)); + }).unwrap_or_else(|err| error!("couldn't send a list of packages: {}", err)); + } + + self.sysinfo.as_ref().map(|_| ctx.send(Command::SendSystemInfo)); + } + Event::NotAuthenticated => { info!("Trying to authenticate again..."); ctx.send(Command::Authenticate(None)); } - Event::NewUpdatesReceived(ids) => { - ctx.send(Command::StartDownload(ids)); + Event::UpdatesReceived(requests) => { + for request in requests { + let id = request.requestId.clone(); + match request.status { + Status::Pending => ctx.send(Command::StartDownload(id)), + + Status::InFlight if self.pacman != PackageManager::Off => { + if self.pacman.is_installed(&request.packageId) { + let report = UpdateReport::single(id, UpdateResultCode::OK, "".to_string()); + ctx.send(Command::SendUpdateReport(report)); + } else { + ctx.send(Command::StartDownload(id)); + } + } + + _ => () + } + } } Event::DownloadComplete(dl) => { - if self.package_manager != PackageManager::Off { - ctx.send(Command::StartInstall(dl)); + if self.pacman != PackageManager::Off { + ctx.send(Command::StartInstall(dl.update_id.clone())); } } - Event::InstallComplete(report) => { + Event::DownloadFailed(id, reason) => { + let report = UpdateReport::single(id, UpdateResultCode::GENERAL_ERROR, reason); + ctx.send(Command::SendUpdateReport(report)); + } + + Event::InstallComplete(report) | Event::InstallFailed(report) => { ctx.send(Command::SendUpdateReport(report)); } Event::UpdateReportSent => { - if self.package_manager != PackageManager::Off { - self.package_manager.installed_packages().map(|packages| { + if self.pacman != PackageManager::Off { + self.pacman.installed_packages().map(|packages| { ctx.send(Command::SendInstalledPackages(packages)); }).unwrap_or_else(|err| error!("couldn't send a list of packages: {}", err)); } @@ -75,7 +122,7 @@ pub struct CommandInterpreter; impl Interpreter<Command, Interpret> for CommandInterpreter { fn interpret(&mut self, cmd: Command, itx: &Sender<Interpret>) { - info!("Command received: {}", cmd); + info!("CommandInterpreter received: {}", cmd); itx.send(Interpret { command: cmd, response_tx: None }); } } @@ -88,13 +135,12 @@ pub struct GlobalInterpreter<'t> { pub config: Config, pub token: Option<Cow<'t, AccessToken>>, pub http_client: Box<Client>, - pub rvi: Option<Services>, - pub loopback_tx: Sender<Interpret>, + pub rvi: Option<Services> } impl<'t> Interpreter<Interpret, Event> for GlobalInterpreter<'t> { fn interpret(&mut self, interpret: Interpret, etx: &Sender<Event>) { - info!("Interpreter started: {}", interpret.command); + info!("GlobalInterpreter received: {}", interpret.command); let (multi_tx, multi_rx) = chan::async::<Event>(); let outcome = match (self.token.as_ref(), self.config.auth.is_none()) { @@ -109,21 +155,20 @@ impl<'t> Interpreter<Interpret, Event> for GlobalInterpreter<'t> { etx.send(ev.clone()); response_ev = Some(ev); } - info!("Interpreter finished."); } - Err(Error::Authorization(_)) => { + Err(Error::HttpAuth(resp)) => { + error!("HTTP authorization failed: {}", resp); + self.token = None; let ev = Event::NotAuthenticated; etx.send(ev.clone()); response_ev = Some(ev); - error!("Interpreter authentication failed"); } Err(err) => { let ev = Event::Error(format!("{}", err)); etx.send(ev.clone()); response_ev = Some(ev); - error!("Interpreter failed: {}", err); } } @@ -138,16 +183,15 @@ impl<'t> GlobalInterpreter<'t> { // always send at least one Event response match cmd { - Command::Authenticate(_) => etx.send(Event::Authenticated), + Command::Authenticate(_) => etx.send(Event::AlreadyAuthenticated), - Command::GetNewUpdates => { - let mut updates = try!(sota.get_pending_updates()); + Command::GetUpdateRequests => { + let mut updates = try!(sota.get_update_requests()); if updates.is_empty() { - etx.send(Event::NoNewUpdates); + etx.send(Event::NoUpdateRequests); } else { updates.sort_by_key(|u| u.installPos); - let ids = updates.iter().map(|u| u.requestId.clone()).collect::<Vec<UpdateRequestId>>(); - etx.send(Event::NewUpdatesReceived(ids)) + etx.send(Event::UpdatesReceived(updates)); } } @@ -159,18 +203,13 @@ impl<'t> GlobalInterpreter<'t> { etx.send(Event::FoundInstalledPackages(packages)); } - Command::RefreshSystemInfo(post) => { - let info = try!(self.config.device.system_info.report()); - etx.send(Event::FoundSystemInfo(info.clone())); - if post { - let _ = sota.send_system_info(&info) - .map_err(|err| etx.send(Event::Error(format!("{}", err)))); - } + Command::ListSystemInfo => { + let cmd = self.config.device.system_info.as_ref().expect("system_info command not set"); + etx.send(Event::FoundSystemInfo(try!(system_info(&cmd)))); } Command::SendInstalledPackages(packages) => { - let _ = sota.send_installed_packages(&packages) - .map_err(|err| error!("couldn't send installed packages: {}", err)); + try!(sota.send_installed_packages(&packages)); etx.send(Event::InstalledPackagesSent); } @@ -178,39 +217,43 @@ impl<'t> GlobalInterpreter<'t> { if let Some(ref rvi) = self.rvi { let _ = rvi.remote.lock().unwrap().send_installed_software(sw); } + etx.send(Event::InstalledSoftwareSent); + } + + Command::SendSystemInfo => { + let cmd = self.config.device.system_info.as_ref().expect("system_info command not set"); + try!(sota.send_system_info(&try!(system_info(&cmd)))); + etx.send(Event::SystemInfoSent); } Command::SendUpdateReport(report) => { if let Some(ref rvi) = self.rvi { let _ = rvi.remote.lock().unwrap().send_update_report(report); } else { - let _ = sota.send_update_report(&report) - .map_err(|err| error!("couldn't send update report: {}", err)); + try!(sota.send_update_report(&report)); } etx.send(Event::UpdateReportSent); } - Command::StartDownload(ref ids) => { - for id in ids { - etx.send(Event::DownloadingUpdate(id.clone())); - - if let Some(ref rvi) = self.rvi { - let _ = rvi.remote.lock().unwrap().send_download_started(id.clone()); - } else { - let _ = sota.download_update(id.clone()) - .map(|dl| etx.send(Event::DownloadComplete(dl))) - .map_err(|err| etx.send(Event::DownloadFailed(id.clone(), format!("{}", err)))); - } + Command::StartDownload(id) => { + etx.send(Event::DownloadingUpdate(id.clone())); + if let Some(ref rvi) = self.rvi { + let _ = rvi.remote.lock().unwrap().send_download_started(id); + } else { + let _ = sota.download_update(id.clone()) + .map(|dl| etx.send(Event::DownloadComplete(dl))) + .map_err(|err| etx.send(Event::DownloadFailed(id, format!("{}", err)))); } } - Command::StartInstall(dl) => { - let _ = sota.install_update(dl) + Command::StartInstall(id) => { + etx.send(Event::InstallingUpdate(id.clone())); + let _ = sota.install_update(id) .map(|report| etx.send(Event::InstallComplete(report))) .map_err(|report| etx.send(Event::InstallFailed(report))); } - Command::Shutdown => std::process::exit(0), + Command::Shutdown => process::exit(0), } Ok(()) @@ -220,8 +263,10 @@ impl<'t> GlobalInterpreter<'t> { match cmd { Command::Authenticate(_) => { let config = self.config.auth.clone().expect("trying to authenticate without auth config"); - self.set_client(Auth::Credentials(ClientId(config.client_id), - ClientSecret(config.client_secret))); + self.set_client(Auth::Credentials(ClientCredentials { + client_id: config.client_id, + client_secret: config.client_secret, + })); let server = config.server.join("/token").expect("couldn't build authentication url"); let token = try!(authenticate(server, self.http_client.as_ref())); self.set_client(Auth::Token(token.clone())); @@ -229,16 +274,9 @@ impl<'t> GlobalInterpreter<'t> { etx.send(Event::Authenticated); } - Command::GetNewUpdates | - Command::ListInstalledPackages | - Command::RefreshSystemInfo(_) | - Command::SendInstalledPackages(_) | - Command::SendInstalledSoftware(_) | - Command::SendUpdateReport(_) | - Command::StartDownload(_) | - Command::StartInstall(_) => etx.send(Event::NotAuthenticated), + Command::Shutdown => process::exit(0), - Command::Shutdown => std::process::exit(0), + _ => etx.send(Event::NotAuthenticated) } Ok(()) @@ -270,15 +308,13 @@ mod tests { fn new_interpreter(replies: Vec<String>, pkg_mgr: PackageManager) -> (Sender<Command>, Receiver<Event>) { let (etx, erx) = chan::sync::<Event>(0); let (ctx, crx) = chan::sync::<Command>(0); - let (itx, _) = chan::sync::<Interpret>(0); thread::spawn(move || { let mut gi = GlobalInterpreter { config: Config::default(), token: Some(AccessToken::default().into()), http_client: Box::new(TestClient::from(replies)), - rvi: None, - loopback_tx: itx, + rvi: None }; gi.config.device.package_manager = pkg_mgr; @@ -300,7 +336,7 @@ mod tests { let (ctx, erx) = new_interpreter(replies, pkg_mgr); ctx.send(Command::Authenticate(None)); - assert_rx(erx, &[Event::Authenticated]); + assert_rx(erx, &[Event::AlreadyAuthenticated]); } #[test] @@ -309,35 +345,26 @@ mod tests { let pkg_mgr = PackageManager::new_tpm(true); let (ctx, erx) = new_interpreter(replies, pkg_mgr); - ctx.send(Command::StartDownload(vec!["1".to_string(), "2".to_string()])); + ctx.send(Command::StartDownload("1".to_string())); assert_rx(erx, &[ Event::DownloadingUpdate("1".to_string()), Event::DownloadComplete(DownloadComplete { update_id: "1".to_string(), update_image: "/tmp/1".to_string(), signature: "".to_string() - }), - Event::DownloadingUpdate("2".to_string()), - Event::DownloadComplete(DownloadComplete { - update_id: "2".to_string(), - update_image: "/tmp/2".to_string(), - signature: "".to_string() - }), + }) ]); } #[test] - fn install_update() { + fn install_update_success() { let replies = vec!["[]".to_string(); 10]; let pkg_mgr = PackageManager::new_tpm(true); let (ctx, erx) = new_interpreter(replies, pkg_mgr); - ctx.send(Command::StartInstall(DownloadComplete { - update_id: "1".to_string(), - update_image: "/tmp/1".to_string(), - signature: "".to_string() - })); + ctx.send(Command::StartInstall("1".to_string())); assert_rx(erx, &[ + Event::InstallingUpdate("1".to_string()), Event::InstallComplete( UpdateReport::single("1".to_string(), UpdateResultCode::OK, "".to_string()) ) @@ -345,17 +372,14 @@ mod tests { } #[test] - fn failed_installation() { + fn install_update_failed() { let replies = vec!["[]".to_string(); 10]; let pkg_mgr = PackageManager::new_tpm(false); let (ctx, erx) = new_interpreter(replies, pkg_mgr); - ctx.send(Command::StartInstall(DownloadComplete { - update_id: "1".to_string(), - update_image: "/tmp/1".to_string(), - signature: "".to_string() - })); + ctx.send(Command::StartInstall("1".to_string())); assert_rx(erx, &[ + Event::InstallingUpdate("1".to_string()), Event::InstallFailed( UpdateReport::single("1".to_string(), UpdateResultCode::INSTALL_FAILED, "failed".to_string()) ) diff --git a/src/main.rs b/src/main.rs index 61fa02a..9821b2d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,71 +9,37 @@ extern crate rustc_serialize; #[macro_use] extern crate sota; extern crate time; -use chan::{Sender, Receiver}; +use chan::{Sender, Receiver, WaitGroup}; use chan_signal::Signal; use env_logger::LogBuilder; use getopts::Options; use log::{LogLevelFilter, LogRecord}; -use std::env; +use std::{env, process, thread}; use std::collections::HashMap; use std::path::Path; use std::sync::{Arc, Mutex}; use std::time::Duration; -use sota::datatype::{Command, Config, Event, SystemInfo}; +use sota::datatype::{Command, Config, Event}; use sota::gateway::{Console, DBus, Gateway, Interpret, Http, Socket, Websocket}; use sota::broadcast::Broadcast; use sota::http::{AuthClient, set_ca_certificates}; use sota::interpreter::{EventInterpreter, CommandInterpreter, Interpreter, GlobalInterpreter}; -use sota::package_manager::PackageManager; use sota::rvi::{Edge, Services}; macro_rules! exit { - ($fmt:expr, $($arg:tt)*) => {{ + ($code:expr, $fmt:expr, $($arg:tt)*) => {{ print!(concat!($fmt, "\n"), $($arg)*); - std::process::exit(1); + process::exit($code); }} } -fn start_signal_handler(signals: Receiver<Signal>) { - loop { - match signals.recv() { - Some(Signal::INT) | Some(Signal::TERM) => std::process::exit(0), - _ => () - } - } -} - -fn start_update_poller(interval: u64, itx: Sender<Interpret>) { - let (etx, erx) = chan::async::<Event>(); - let tick = chan::tick(Duration::from_secs(interval)); - loop { - let _ = tick.recv(); - itx.send(Interpret { - command: Command::GetNewUpdates, - response_tx: Some(Arc::new(Mutex::new(etx.clone()))) - }); - let _ = erx.recv(); - } -} - -fn send_startup_commands(config: &Config, ctx: &Sender<Command>) { - ctx.send(Command::Authenticate(None)); - ctx.send(Command::RefreshSystemInfo(true)); - - if config.device.package_manager != PackageManager::Off { - config.device.package_manager.installed_packages().map(|packages| { - ctx.send(Command::SendInstalledPackages(packages)); - }).unwrap_or_else(|err| exit!("Couldn't get list of packages: {}", err)); - } -} - fn main() { - setup_logging(); + let version = start_logging(); + let config = build_config(&version); - let config = build_config(); set_ca_certificates(Path::new(&config.device.certificates_path)); let (etx, erx) = chan::async::<Event>(); @@ -81,16 +47,25 @@ fn main() { let (itx, irx) = chan::async::<Interpret>(); let mut broadcast = Broadcast::new(erx); - send_startup_commands(&config, &ctx); + let wg = WaitGroup::new(); + + ctx.send(Command::Authenticate(None)); crossbeam::scope(|scope| { - // Must subscribe to the signal before spawning ANY other threads + // subscribe to signals first let signals = chan_signal::notify(&[Signal::INT, Signal::TERM]); scope.spawn(move || start_signal_handler(signals)); - let poll_tick = config.device.polling_interval; - let poll_itx = itx.clone(); - scope.spawn(move || start_update_poller(poll_tick, poll_itx)); + if config.core.polling { + let poll_tick = config.core.polling_sec; + let poll_itx = itx.clone(); + let poll_wg = wg.clone(); + scope.spawn(move || start_update_poller(poll_tick, poll_itx, poll_wg)); + } + + // + // start gateways + // if config.gateway.console { let cons_itx = itx.clone(); @@ -99,7 +74,7 @@ fn main() { } if config.gateway.dbus { - let dbus_cfg = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for dbus gateway")); + let dbus_cfg = config.dbus.as_ref().unwrap_or_else(|| exit!(1, "{}", "dbus config required for dbus gateway")); let dbus_itx = itx.clone(); let dbus_sub = broadcast.subscribe(); let mut dbus = DBus { dbus_cfg: dbus_cfg.clone(), itx: itx.clone() }; @@ -109,20 +84,21 @@ fn main() { if config.gateway.http { let http_itx = itx.clone(); let http_sub = broadcast.subscribe(); - let mut http = Http { server: config.network.http_server.clone() }; + let mut http = Http { server: *config.network.http_server }; scope.spawn(move || http.start(http_itx, http_sub)); } - let mut rvi = None; - if config.gateway.rvi { - let _ = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for rvi gateway")); - let rvi_cfg = config.rvi.as_ref().unwrap_or_else(|| exit!("{}", "rvi config required for rvi gateway")); + let rvi_services = if config.gateway.rvi { + let _ = config.dbus.as_ref().unwrap_or_else(|| exit!(1, "{}", "dbus config required for rvi gateway")); + let rvi_cfg = config.rvi.as_ref().unwrap_or_else(|| exit!(1, "{}", "rvi config required for rvi gateway")); let rvi_edge = config.network.rvi_edge_server.clone(); let services = Services::new(rvi_cfg.clone(), config.device.uuid.clone(), etx.clone()); let mut edge = Edge::new(services.clone(), rvi_edge, rvi_cfg.client.clone()); scope.spawn(move || edge.start()); - rvi = Some(services); - } + Some(services) + } else { + None + }; if config.gateway.socket { let socket_itx = itx.clone(); @@ -142,48 +118,83 @@ fn main() { scope.spawn(move || ws.start(ws_itx, ws_sub)); } + // + // start interpreters + // + let event_sub = broadcast.subscribe(); let event_ctx = ctx.clone(); let event_mgr = config.device.package_manager.clone(); + let event_sys = config.device.system_info.clone(); + let event_wg = wg.clone(); scope.spawn(move || EventInterpreter { - package_manager: event_mgr - }.run(event_sub, event_ctx)); + pacman: event_mgr, + sysinfo: event_sys, + }.run(event_sub, event_ctx, event_wg)); let cmd_itx = itx.clone(); - scope.spawn(move || CommandInterpreter.run(crx, cmd_itx)); + let cmd_wg = wg.clone(); + scope.spawn(move || CommandInterpreter.run(crx, cmd_itx, cmd_wg)); scope.spawn(move || GlobalInterpreter { config: config, token: None, http_client: Box::new(AuthClient::default()), - rvi: rvi, - loopback_tx: itx, - }.run(irx, etx)); + rvi: rvi_services, + }.run(irx, etx, wg)); scope.spawn(move || broadcast.start()); }); } -fn setup_logging() { - let version = option_env!("SOTA_VERSION").unwrap_or("?"); +fn start_logging() -> String { + let version = option_env!("SOTA_VERSION").unwrap_or("unknown"); + let mut builder = LogBuilder::new(); builder.format(move |record: &LogRecord| { let timestamp = format!("{}", time::now_utc().rfc3339()); format!("{} ({}): {} - {}", timestamp, version, record.level(), record.args()) }); builder.filter(Some("hyper"), LogLevelFilter::Info); + builder.parse(&env::var("RUST_LOG").unwrap_or("INFO".to_string())); + builder.init().expect("builder already initialized"); + + version.to_string() +} - let _ = env::var("RUST_LOG").map(|level| builder.parse(&level)); - builder.init().expect("env_logger::init() called twice, blame the programmers."); +fn start_signal_handler(signals: Receiver<Signal>) { + loop { + match signals.recv() { + Some(Signal::INT) | Some(Signal::TERM) => process::exit(0), + _ => () + } + } } -fn build_config() -> Config { +fn start_update_poller(interval: u64, itx: Sender<Interpret>, wg: WaitGroup) { + info!("Polling for new updates every {} seconds.", interval); + let (etx, erx) = chan::async::<Event>(); + let wait = Duration::from_secs(interval); + loop { + wg.wait(); // wait until not busy + thread::sleep(wait); // then wait `interval` seconds + itx.send(Interpret { + command: Command::GetUpdateRequests, + response_tx: Some(Arc::new(Mutex::new(etx.clone()))) + }); // then request new updates + let _ = erx.recv(); // then wait for the response + } +} + +fn build_config(version: &str) -> Config { let args = env::args().collect::<Vec<String>>(); let program = args[0].clone(); let mut opts = Options::new(); - opts.optflag("h", "help", "print this help menu"); - opts.optopt("", "config", "change config path", "PATH"); + opts.optflag("h", "help", "print this help menu then quit"); + opts.optflag("p", "print", "print the parsed config then quit"); + opts.optflag("v", "version", "print the version then quit"); + opts.optopt("c", "config", "change config path", "PATH"); opts.optopt("", "auth-server", "change the auth server", "URL"); opts.optopt("", "auth-client-id", "change the auth client id", "ID"); @@ -191,6 +202,8 @@ fn build_config() -> Config { opts.optopt("", "auth-credentials-file", "change the auth credentials file", "PATH"); opts.optopt("", "core-server", "change the core server", "URL"); + opts.optopt("", "core-polling", "toggle polling the core server for updates", "BOOL"); + opts.optopt("", "core-polling-sec", "change the core polling interval", "SECONDS"); opts.optopt("", "dbus-name", "change the dbus registration name", "NAME"); opts.optopt("", "dbus-path", "change the dbus path", "PATH"); @@ -203,7 +216,6 @@ fn build_config() -> Config { opts.optopt("", "device-vin", "change the device vin", "VIN"); opts.optopt("", "device-packages-dir", "change downloaded directory for packages", "PATH"); opts.optopt("", "device-package-manager", "change the package manager", "MANAGER"); - opts.optopt("", "device-polling-interval", "change the package polling interval", "INTERVAL"); opts.optopt("", "device-certificates-path", "change the OpenSSL CA certificates file", "PATH"); opts.optopt("", "device-system-info", "change the system information command", "PATH"); @@ -225,25 +237,37 @@ fn build_config() -> Config { opts.optopt("", "rvi-timeout", "change the rvi timeout", "TIMEOUT"); let matches = opts.parse(&args[1..]).unwrap_or_else(|err| panic!(err.to_string())); - if matches.opt_present("h") { - exit!("{}", opts.usage(&format!("Usage: {} [options]", program))); + + if matches.opt_present("help") { + exit!(0, "{}", opts.usage(&format!("Usage: {} [options]", program))); + } else if matches.opt_present("version") { + exit!(0, "{}", version); } - let config_file = matches.opt_str("config").unwrap_or_else(|| { - env::var("SOTA_CONFIG").unwrap_or_else(|_| exit!("{}", "No config file provided.")) - }); - let mut config = Config::load(&config_file).unwrap_or_else(|err| exit!("{}", err)); + let mut config = match matches.opt_str("config").or(env::var("SOTA_CONFIG").ok()) { + Some(file) => Config::load(&file).unwrap_or_else(|err| exit!(1, "{}", err)), + None => { + warn!("No config file given. Falling back to defaults."); + Config::default() + } + }; config.auth.as_mut().map(|auth_cfg| { matches.opt_str("auth-client-id").map(|id| auth_cfg.client_id = id); matches.opt_str("auth-client-secret").map(|secret| auth_cfg.client_secret = secret); matches.opt_str("auth-server").map(|text| { - auth_cfg.server = text.parse().unwrap_or_else(|err| exit!("Invalid auth-server URL: {}", err)); + auth_cfg.server = text.parse().unwrap_or_else(|err| exit!(1, "Invalid auth-server URL: {}", err)); }); }); matches.opt_str("core-server").map(|text| { - config.core.server = text.parse().unwrap_or_else(|err| exit!("Invalid core-server URL: {}", err)); + config.core.server = text.parse().unwrap_or_else(|err| exit!(1, "Invalid core-server URL: {}", err)); + }); + matches.opt_str("core-polling").map(|polling| { + config.core.polling = polling.parse().unwrap_or_else(|err| exit!(1, "Invalid core-polling boolean: {}", err)); + }); + matches.opt_str("core-polling-sec").map(|secs| { + config.core.polling_sec = secs.parse().unwrap_or_else(|err| exit!(1, "Invalid core-polling-sec: {}", err)); }); config.dbus.as_mut().map(|dbus_cfg| { @@ -253,7 +277,7 @@ fn build_config() -> Config { matches.opt_str("dbus-software-manager").map(|mgr| dbus_cfg.software_manager = mgr); matches.opt_str("dbus-software-manager-path").map(|mgr_path| dbus_cfg.software_manager_path = mgr_path); matches.opt_str("dbus-timeout").map(|timeout| { - dbus_cfg.timeout = timeout.parse().unwrap_or_else(|err| exit!("Invalid dbus timeout: {}", err)); + dbus_cfg.timeout = timeout.parse().unwrap_or_else(|err| exit!(1, "Invalid dbus-timeout: {}", err)); }); }); @@ -261,48 +285,55 @@ fn build_config() -> Config { matches.opt_str("device-vin").map(|vin| config.device.vin = vin); matches.opt_str("device-packages-dir").map(|path| config.device.packages_dir = path); matches.opt_str("device-package-manager").map(|text| { - config.device.package_manager = text.parse().unwrap_or_else(|err| exit!("Invalid device package manager: {}", err)); - }); - matches.opt_str("device-polling-interval").map(|interval| { - config.device.polling_interval = interval.parse().unwrap_or_else(|err| exit!("Invalid device polling interval: {}", err)); + config.device.package_manager = text.parse().unwrap_or_else(|err| exit!(1, "Invalid device-package-manager: {}", err)); }); matches.opt_str("device-certificates-path").map(|certs| config.device.certificates_path = certs); - matches.opt_str("device-system-info").map(|cmd| config.device.system_info = SystemInfo::new(cmd)); + matches.opt_str("device-system-info").map(|cmd| { + config.device.system_info = if cmd.len() > 0 { Some(cmd) } else { None } + }); matches.opt_str("gateway-console").map(|console| { - config.gateway.console = console.parse().unwrap_or_else(|err| exit!("Invalid console gateway boolean: {}", err)); + config.gateway.console = console.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-console boolean: {}", err)); }); matches.opt_str("gateway-dbus").map(|dbus| { - config.gateway.dbus = dbus.parse().unwrap_or_else(|err| exit!("Invalid dbus gateway boolean: {}", err)); + config.gateway.dbus = dbus.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-dbus boolean: {}", err)); }); matches.opt_str("gateway-http").map(|http| { - config.gateway.http = http.parse().unwrap_or_else(|err| exit!("Invalid http gateway boolean: {}", err)); + config.gateway.http = http.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-http boolean: {}", err)); }); matches.opt_str("gateway-rvi").map(|rvi| { - config.gateway.rvi = rvi.parse().unwrap_or_else(|err| exit!("Invalid rvi gateway boolean: {}", err)); + config.gateway.rvi = rvi.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-rvi boolean: {}", err)); }); matches.opt_str("gateway-socket").map(|socket| { - config.gateway.socket = socket.parse().unwrap_or_else(|err| exit!("Invalid socket gateway boolean: {}", err)); + config.gateway.socket = socket.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-socket boolean: {}", err)); }); matches.opt_str("gateway-websocket").map(|websocket| { - config.gateway.websocket = websocket.parse().unwrap_or_else(|err| exit!("Invalid websocket gateway boolean: {}", err)); + config.gateway.websocket = websocket.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-websocket boolean: {}", err)); }); - matches.opt_str("network-http-server").map(|server| config.network.http_server = server); - matches.opt_str("network-rvi-edge-server").map(|server| config.network.rvi_edge_server = server); + matches.opt_str("network-http-server").map(|addr| { + config.network.http_server = addr.parse().unwrap_or_else(|err| exit!(1, "Invalid network-http-server: {}", err)); + }); + matches.opt_str("network-rvi-edge-server").map(|addr| { + config.network.rvi_edge_server = addr.parse().unwrap_or_else(|err| exit!(1, "Invalid network-rvi-edge-server: {}", err)); + }); matches.opt_str("network-socket-commands-path").map(|path| config.network.socket_commands_path = path); matches.opt_str("network-socket-events-path").map(|path| config.network.socket_events_path = path); matches.opt_str("network-websocket-server").map(|server| config.network.websocket_server = server); config.rvi.as_mut().map(|rvi_cfg| { matches.opt_str("rvi-client").map(|url| { - rvi_cfg.client = url.parse().unwrap_or_else(|err| exit!("Invalid rvi-client URL: {}", err)); + rvi_cfg.client = url.parse().unwrap_or_else(|err| exit!(1, "Invalid rvi-client URL: {}", err)); }); matches.opt_str("rvi-storage-dir").map(|dir| rvi_cfg.storage_dir = dir); matches.opt_str("rvi-timeout").map(|timeout| { - rvi_cfg.timeout = Some(timeout.parse().unwrap_or_else(|err| exit!("Invalid rvi timeout: {}", err))); + rvi_cfg.timeout = Some(timeout.parse().unwrap_or_else(|err| exit!(1, "Invalid rvi-timeout: {}", err))); }); }); + if matches.opt_present("print") { + exit!(0, "{:#?}", config); + } + config } diff --git a/src/oauth2.rs b/src/oauth2.rs index 0c5f152..e34e4c2 100644 --- a/src/oauth2.rs +++ b/src/oauth2.rs @@ -1,16 +1,19 @@ use rustc_serialize::json; use datatype::{AccessToken, Error, Url}; -use http::Client; +use http::{Client, Response}; /// Authenticate with the specified OAuth2 server to retrieve a new `AccessToken`. pub fn authenticate(server: Url, client: &Client) -> Result<AccessToken, Error> { debug!("authenticating at {}", server); - let resp_rx = client.post(server, None); + let resp_rx = client.post(server, Some(br#"grant_type=client_credentials"#.to_vec())); let resp = resp_rx.recv().expect("no authenticate response received"); - let data = try!(resp); - let body = try!(String::from_utf8(data)); + let body = match resp { + Response::Success(data) => try!(String::from_utf8(data.body)), + Response::Failed(data) => return Err(Error::from(data)), + Response::Error(err) => return Err(err) + }; Ok(try!(json::decode(&body))) } diff --git a/src/package_manager/deb.rs b/src/package_manager/deb.rs index bba86e6..845c2b8 100644 --- a/src/package_manager/deb.rs +++ b/src/package_manager/deb.rs @@ -5,7 +5,7 @@ use package_manager::package_manager::{InstallOutcome, parse_package}; /// Returns a list of installed DEB packages with -/// `dpkg-query -f='${Package} ${Version}\n -W`. +/// `dpkg-query -f='${Package} ${Version}\n' -W`. pub fn installed_packages() -> Result<Vec<Package>, Error> { Command::new("dpkg-query").arg("-f='${Package} ${Version}\n'").arg("-W") .output() diff --git a/src/package_manager/package_manager.rs b/src/package_manager/package_manager.rs index 09556a0..173a636 100644 --- a/src/package_manager/package_manager.rs +++ b/src/package_manager/package_manager.rs @@ -49,6 +49,12 @@ impl PackageManager { } } + /// Indicates whether a specific package is installed on the device. + pub fn is_installed(&self, package: &Package) -> bool { + self.installed_packages().map(|packages| packages.contains(package)) + .unwrap_or_else(|err| { error!("couldn't get a list of packages: {}", err); false }) + } + /// Returns a string representation of the package manager's extension. pub fn extension(&self) -> String { match *self { diff --git a/src/package_manager/rpm.rs b/src/package_manager/rpm.rs index 99aacbf..f2c8f2a 100644 --- a/src/package_manager/rpm.rs +++ b/src/package_manager/rpm.rs @@ -5,7 +5,7 @@ use package_manager::package_manager::{InstallOutcome, parse_package}; /// Returns a list of installed RPM packages with -/// `rpm -qa ==queryformat ${NAME} ${VERSION}\n`. +/// `rpm -qa --queryformat ${NAME} ${VERSION}\n`. pub fn installed_packages() -> Result<Vec<Package>, Error> { Command::new("rpm").arg("-qa").arg("--queryformat").arg("%{NAME} %{VERSION}\n") .output() @@ -23,17 +23,22 @@ pub fn installed_packages() -> Result<Vec<Package>, Error> { }) } -/// Installs a new RPM package. +/// Installs a new RPM package with `rpm -Uvh --force <package-path>`. pub fn install_package(path: &str) -> Result<InstallOutcome, InstallOutcome> { let output = try!(Command::new("rpm").arg("-Uvh").arg("--force").arg(path) .output() - .map_err(|e| (UpdateResultCode::GENERAL_ERROR, format!("{:?}", e)))); + .map_err(|err| (UpdateResultCode::GENERAL_ERROR, format!("{:?}", err)))); let stdout = String::from_utf8_lossy(&output.stdout).into_owned(); let stderr = String::from_utf8_lossy(&output.stderr).into_owned(); match output.status.code() { - Some(0) => Ok((UpdateResultCode::OK, stdout)), + Some(0) => { + let _ = Command::new("sync").status() + .map_err(|err| error!("couldn't run 'sync': {}", err)); + Ok((UpdateResultCode::OK, stdout)) + } + _ => { let out = format!("stdout: {}\nstderr: {}", stdout, stderr); if (&stderr).contains("already installed") { diff --git a/src/rvi/edge.rs b/src/rvi/edge.rs index cadea74..85d46df 100644 --- a/src/rvi/edge.rs +++ b/src/rvi/edge.rs @@ -4,25 +4,24 @@ use hyper::server::{Server as HyperServer, Request as HyperRequest}; use rustc_serialize::json; use rustc_serialize::json::Json; use std::{mem, str}; -use std::net::ToSocketAddrs; -use datatype::{RpcRequest, RpcOk, RpcErr, Url}; +use datatype::{RpcRequest, RpcOk, RpcErr, SocketAddr, Url}; use http::{Server, ServerHandler}; use super::services::Services; /// The HTTP server endpoint for `RVI` client communication. pub struct Edge { - rvi_edge: Url, + rvi_edge: SocketAddr, services: Services, } impl Edge { /// Create a new `Edge` by registering each `RVI` service. - pub fn new(mut services: Services, rvi_edge: String, rvi_client: Url) -> Self { + pub fn new(mut services: Services, rvi_edge: SocketAddr, rvi_client: Url) -> Self { services.register_services(|service| { let req = RpcRequest::new("register_service", RegisterServiceRequest { - network_address: rvi_edge.clone(), + network_address: format!("http://{}", rvi_edge), service: service.to_string(), }); let resp = req.send(rvi_client.clone()) @@ -32,14 +31,12 @@ impl Edge { rpc_ok.result.expect("expected rpc_ok result").service }); - Edge { rvi_edge: rvi_edge.parse().expect("couldn't parse edge server as url"), services: services } + Edge { rvi_edge: rvi_edge, services: services } } /// Start the HTTP server listening for incoming RVI client connections. pub fn start(&mut self) { - let mut addrs = self.rvi_edge.to_socket_addrs() - .unwrap_or_else(|err| panic!("couldn't parse edge url: {}", err)); - let server = HyperServer::http(&addrs.next().expect("no SocketAddr found")) + let server = HyperServer::http(&*self.rvi_edge) .unwrap_or_else(|err| panic!("couldn't start rvi edge server: {}", err)); let (addr, server) = server.handle(move |_| EdgeHandler::new(self.services.clone())).unwrap(); info!("RVI server edge listening at http://{}.", addr); @@ -61,7 +58,6 @@ struct RegisterServiceResponse { } - struct EdgeHandler { services: Services, resp_code: StatusCode, diff --git a/src/rvi/parameters.rs b/src/rvi/parameters.rs index 1fa1a87..e23ce9d 100644 --- a/src/rvi/parameters.rs +++ b/src/rvi/parameters.rs @@ -1,4 +1,3 @@ -use std::str; use std::sync::Mutex; use datatype::{ChunkReceived, Event, DownloadComplete, UpdateRequestId, UpdateAvailable}; @@ -23,7 +22,7 @@ pub struct Notify { impl Parameter for Notify { fn handle(&self, remote: &Mutex<RemoteServices>, _: &Mutex<Transfers>) -> Result<Option<Event>, String> { remote.lock().unwrap().backend = Some(self.services.clone()); - Ok(Some(Event::NewUpdateAvailable(self.update_available.clone()))) + Ok(Some(Event::UpdateAvailable(self.update_available.clone()))) } } diff --git a/src/sota.rs b/src/sota.rs index 3dec33d..64abacd 100644 --- a/src/sota.rs +++ b/src/sota.rs @@ -1,11 +1,11 @@ use rustc_serialize::json; +use std::{fs, io}; use std::fs::File; -use std::io; use std::path::PathBuf; -use datatype::{Config, DeviceReport, DownloadComplete, Error, Package, - PendingUpdateRequest, UpdateRequestId, UpdateReport, Url}; -use http::Client; +use datatype::{Config, DownloadComplete, Error, Package, + UpdateReport, UpdateRequest, UpdateRequestId, Url}; +use http::{Client, Response}; /// Encapsulate the client configuration and HTTP client used for @@ -22,39 +22,47 @@ impl<'c, 'h> Sota<'c, 'h> { } /// Takes a path and returns a new endpoint of the format - /// `<Core server>/api/v1/device_updates/<uuid>/<path>`. - pub fn endpoint(&self, path: &str) -> Url { - let endpoint = if path.is_empty() { - format!("/api/v1/device_updates/{}", self.config.device.uuid) - } else { - format!("/api/v1/device_updates/{}/{}", self.config.device.uuid, path) - }; + /// `<Core server>/api/v1/mydevice/<device-id>$path`. + fn endpoint(&self, path: &str) -> Url { + let endpoint = format!("/api/v1/mydevice/{}{}", self.config.device.uuid, path); self.config.core.server.join(&endpoint).expect("couldn't build endpoint url") } - /// Query the Core server to identify any new package updates available. - pub fn get_pending_updates(&mut self) -> Result<Vec<PendingUpdateRequest>, Error> { - let resp_rx = self.client.get(self.endpoint(""), None); - let resp = resp_rx.recv().expect("no get_package_updates response received"); - let data = try!(resp); - let text = try!(String::from_utf8(data)); - Ok(try!(json::decode::<Vec<PendingUpdateRequest>>(&text))) + /// Returns the path to a package on the device. + fn package_path(&self, id: UpdateRequestId) -> Result<String, Error> { + let mut path = PathBuf::new(); + path.push(&self.config.device.packages_dir); + path.push(id); + Ok(try!(path.to_str().ok_or(Error::Parse(format!("Path is not valid UTF-8: {:?}", path)))).to_string()) } - /// Download a specific update from the Core server. - pub fn download_update(&mut self, id: UpdateRequestId) -> Result<DownloadComplete, Error> { - let resp_rx = self.client.get(self.endpoint(&format!("{}/download", id)), None); - let resp = resp_rx.recv().expect("no download_package_update response received"); - let data = try!(resp); + /// Query the Core server for any pending or in-flight package updates. + pub fn get_update_requests(&mut self) -> Result<Vec<UpdateRequest>, Error> { + let resp_rx = self.client.get(self.endpoint("/updates"), None); + let resp = try!(resp_rx.recv().ok_or(Error::Client("couldn't get new updates".to_string()))); + let data = match resp { + Response::Success(data) => data, + Response::Failed(data) => return Err(Error::from(data)), + Response::Error(err) => return Err(err) + }; - let mut path = PathBuf::new(); - path.push(&self.config.device.packages_dir); - path.push(id.clone()); // TODO: Use Content-Disposition filename from request? - let mut file = try!(File::create(path.as_path())); + let text = try!(String::from_utf8(data.body)); + Ok(try!(json::decode::<Vec<UpdateRequest>>(&text))) + } - let _ = io::copy(&mut &*data, &mut file); - let path = try!(path.to_str().ok_or(Error::Parse(format!("Path is not valid UTF-8: {:?}", path)))); + /// Download a specific update from the Core server. + pub fn download_update(&mut self, id: UpdateRequestId) -> Result<DownloadComplete, Error> { + let resp_rx = self.client.get(self.endpoint(&format!("/updates/{}/download", id)), None); + let resp = try!(resp_rx.recv().ok_or(Error::Client("couldn't download update".to_string()))); + let data = match resp { + Response::Success(data) => data, + Response::Failed(data) => return Err(Error::from(data)), + Response::Error(err) => return Err(err) + }; + let path = try!(self.package_path(id.clone())); + let mut file = try!(File::create(&path)); + let _ = io::copy(&mut &*data.body, &mut file); Ok(DownloadComplete { update_id: id, update_image: path.to_string(), @@ -63,46 +71,54 @@ impl<'c, 'h> Sota<'c, 'h> { } /// Install an update using the package manager. - pub fn install_update(&mut self, download: DownloadComplete) -> Result<UpdateReport, UpdateReport> { + pub fn install_update(&mut self, id: UpdateRequestId) -> Result<UpdateReport, UpdateReport> { let ref pacman = self.config.device.package_manager; - pacman.install_package(&download.update_image).and_then(|(code, output)| { - Ok(UpdateReport::single(download.update_id.clone(), code, output)) + let path = self.package_path(id.clone()).expect("install_update expects a valid path"); + pacman.install_package(&path).and_then(|(code, output)| { + let _ = fs::remove_file(&path).unwrap_or_else(|err| error!("couldn't remove installed package: {}", err)); + Ok(UpdateReport::single(id.clone(), code, output)) }).or_else(|(code, output)| { - Err(UpdateReport::single(download.update_id.clone(), code, output)) + Err(UpdateReport::single(id.clone(), code, output)) }) } - /// Get a list of the currently installed packages from the package manager. - pub fn get_installed_packages(&mut self) -> Result<Vec<Package>, Error> { - Ok(try!(self.config.device.package_manager.installed_packages())) - } - /// Send a list of the currently installed packages to the Core server. pub fn send_installed_packages(&mut self, packages: &Vec<Package>) -> Result<(), Error> { let body = try!(json::encode(packages)); - let resp_rx = self.client.put(self.endpoint("installed"), Some(body.into_bytes())); - let _ = resp_rx.recv().expect("no update_installed_packages response received") - .map_err(|err| error!("update_installed_packages failed: {}", err)); - Ok(()) + let resp_rx = self.client.put(self.endpoint("/installed"), Some(body.into_bytes())); + let resp = try!(resp_rx.recv().ok_or(Error::Client("couldn't send installed packages".to_string()))); + + match resp { + Response::Success(_) => Ok(()), + Response::Failed(data) => Err(Error::from(data)), + Response::Error(err) => Err(err) + } } /// Send the outcome of a package update to the Core server. pub fn send_update_report(&mut self, update_report: &UpdateReport) -> Result<(), Error> { - let report = DeviceReport::new(&self.config.device.uuid, update_report); - let body = try!(json::encode(&report)); - let url = self.endpoint(report.device); + let body = try!(json::encode(&update_report.operation_results)); + let url = self.endpoint(&format!("/updates/{}", update_report.update_id)); let resp_rx = self.client.post(url, Some(body.into_bytes())); - let resp = resp_rx.recv().expect("no send_install_report response received"); - let _ = try!(resp); - Ok(()) + let resp = try!(resp_rx.recv().ok_or(Error::Client("couldn't send update report".to_string()))); + + match resp { + Response::Success(_) => Ok(()), + Response::Failed(data) => Err(Error::from(data)), + Response::Error(err) => Err(err) + } } /// Send system information from the device to the Core server. pub fn send_system_info(&mut self, body: &str) -> Result<(), Error> { - let resp_rx = self.client.put(self.endpoint("system_info"), Some(body.as_bytes().to_vec())); - let resp = resp_rx.recv().expect("no send_system_info response received"); - let _ = try!(resp); - Ok(()) + let resp_rx = self.client.put(self.endpoint("/system_info"), Some(body.as_bytes().to_vec())); + let resp = try!(resp_rx.recv().ok_or(Error::Client("couldn't send system info".to_string()))); + + match resp { + Response::Success(_) => Ok(()), + Response::Failed(data) => Err(Error::from(data)), + Response::Error(err) => Err(err) + } } } @@ -112,19 +128,20 @@ mod tests { use rustc_serialize::json; use super::*; - use datatype::{Config, Package, PendingUpdateRequest}; + use datatype::{Config, Package, UpdateRequest, UpdateRequestStatus}; use http::TestClient; #[test] - fn test_get_pending_updates() { - let pending_update = PendingUpdateRequest { + fn test_get_update_requests() { + let pending_update = UpdateRequest { requestId: "someid".to_string(), - installPos: 0, + status: UpdateRequestStatus::Pending, packageId: Package { name: "fake-pkg".to_string(), version: "0.1.1".to_string() }, + installPos: 0, createdAt: "2010-01-01".to_string() }; @@ -134,7 +151,7 @@ mod tests { client: &mut TestClient::from(vec![json.to_string()]), }; - let updates: Vec<PendingUpdateRequest> = sota.get_pending_updates().unwrap(); + let updates: Vec<UpdateRequest> = sota.get_update_requests().unwrap(); let ids: Vec<String> = updates.iter().map(|p| p.requestId.clone()).collect(); assert_eq!(ids, vec!["someid".to_string()]) } |