summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/datatype/auth.rs29
-rw-r--r--src/datatype/command.rs123
-rw-r--r--src/datatype/config.rs445
-rw-r--r--src/datatype/error.rs11
-rw-r--r--src/datatype/event.rs29
-rw-r--r--src/datatype/json_rpc.rs10
-rw-r--r--src/datatype/mod.rs17
-rw-r--r--src/datatype/network.rs (renamed from src/datatype/url.rs)59
-rw-r--r--src/datatype/shell.rs12
-rw-r--r--src/datatype/system_info.rs46
-rw-r--r--src/datatype/update_request.rs (renamed from src/datatype/package.rs)59
-rw-r--r--src/gateway/dbus.rs27
-rw-r--r--src/gateway/http.rs31
-rw-r--r--src/gateway/socket.rs55
-rw-r--r--src/gateway/websocket.rs26
-rw-r--r--src/http/auth_client.rs164
-rw-r--r--src/http/http_client.rs44
-rw-r--r--src/http/mod.rs2
-rw-r--r--src/http/test_client.rs10
-rw-r--r--src/interpreter.rs202
-rw-r--r--src/main.rs215
-rw-r--r--src/oauth2.rs11
-rw-r--r--src/package_manager/deb.rs2
-rw-r--r--src/package_manager/package_manager.rs6
-rw-r--r--src/package_manager/rpm.rs13
-rw-r--r--src/rvi/edge.rs16
-rw-r--r--src/rvi/parameters.rs3
-rw-r--r--src/sota.rs131
28 files changed, 1134 insertions, 664 deletions
diff --git a/src/datatype/auth.rs b/src/datatype/auth.rs
index cbfd097..83c872a 100644
--- a/src/datatype/auth.rs
+++ b/src/datatype/auth.rs
@@ -5,7 +5,7 @@ use std::borrow::Cow;
#[derive(Clone, Debug)]
pub enum Auth {
None,
- Credentials(ClientId, ClientSecret),
+ Credentials(ClientCredentials),
Token(AccessToken),
}
@@ -16,8 +16,15 @@ impl<'a> Into<Cow<'a, Auth>> for Auth {
}
-/// For storage of the returned access token data following a successful
-/// authentication.
+/// Encapsulates the client id and secret used during authentication.
+#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)]
+pub struct ClientCredentials {
+ pub client_id: String,
+ pub client_secret: String,
+}
+
+
+/// Stores the returned access token data following a successful authentication.
#[derive(RustcDecodable, Debug, PartialEq, Clone, Default)]
pub struct AccessToken {
pub access_token: String,
@@ -31,19 +38,3 @@ impl<'a> Into<Cow<'a, AccessToken>> for AccessToken {
Cow::Owned(self)
}
}
-
-
-/// Encapsulates a `String` type for use in `Auth::Credentials`
-#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)]
-pub struct ClientId(pub String);
-
-/// Encapsulates a `String` type for use in `Auth::Credentials`
-#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)]
-pub struct ClientSecret(pub String);
-
-/// Encapsulates the client id and secret used during authentication.
-#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)]
-pub struct ClientCredentials {
- pub client_id: ClientId,
- pub client_secret: ClientSecret,
-}
diff --git a/src/datatype/command.rs b/src/datatype/command.rs
index d449bb6..c88e8d5 100644
--- a/src/datatype/command.rs
+++ b/src/datatype/command.rs
@@ -3,9 +3,8 @@ use std::str;
use std::str::FromStr;
use nom::{IResult, space, eof};
-use datatype::{ClientCredentials, ClientId, ClientSecret, DownloadComplete, Error,
- InstalledSoftware, Package, UpdateReport, UpdateRequestId,
- UpdateResultCode};
+use datatype::{ClientCredentials, Error, InstalledSoftware, Package, UpdateReport,
+ UpdateRequestId, UpdateResultCode};
/// System-wide commands that are sent to the interpreter.
@@ -16,29 +15,36 @@ pub enum Command {
/// Shutdown the client immediately.
Shutdown,
- /// Check for any new updates.
- GetNewUpdates,
+ /// Check for any pending or in-flight updates.
+ GetUpdateRequests,
+
/// List the installed packages on the system.
ListInstalledPackages,
- /// Get the latest system information, and optionally publish it to Core.
- RefreshSystemInfo(bool),
+ /// List the system information.
+ ListSystemInfo,
- /// Start downloading one or more updates.
- StartDownload(Vec<UpdateRequestId>),
- /// Start installing an update
- StartInstall(DownloadComplete),
+ /// Start downloading an update.
+ StartDownload(UpdateRequestId),
+ /// Start installing an update.
+ StartInstall(UpdateRequestId),
/// Send a list of packages to the Core server.
SendInstalledPackages(Vec<Package>),
/// Send a list of all packages and firmware to the Core server.
SendInstalledSoftware(InstalledSoftware),
+ /// Send the system information to the Core server.
+ SendSystemInfo,
/// Send a package update report to the Core server.
SendUpdateReport(UpdateReport),
}
impl Display for Command {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
- write!(f, "{:?}", self)
+ let text = match *self {
+ Command::SendInstalledPackages(_) => "SendInstalledPackages(...)".to_string(),
+ _ => format!("{:?}", self)
+ };
+ write!(f, "{}", text)
}
}
@@ -59,24 +65,26 @@ named!(command <(Command, Vec<&str>)>, chain!(
~ cmd: alt!(
alt_complete!(tag!("Authenticate") | tag!("auth"))
=> { |_| Command::Authenticate(None) }
- | alt_complete!(tag!("GetNewUpdates") | tag!("new"))
- => { |_| Command::GetNewUpdates }
+ | alt_complete!(tag!("GetUpdateRequests") | tag!("getreq"))
+ => { |_| Command::GetUpdateRequests }
| alt_complete!(tag!("ListInstalledPackages") | tag!("ls"))
=> { |_| Command::ListInstalledPackages }
- | alt_complete!(tag!("RefreshSystemInfo") | tag!("info"))
- => { |_| Command::RefreshSystemInfo(false) }
+ | alt_complete!(tag!("ListSystemInfo") | tag!("info"))
+ => { |_| Command::ListSystemInfo }
| alt_complete!(tag!("Shutdown") | tag!("shutdown"))
=> { |_| Command::Shutdown }
| alt_complete!(tag!("SendInstalledPackages") | tag!("sendpack"))
=> { |_| Command::SendInstalledPackages(Vec::new()) }
| alt_complete!(tag!("SendInstalledSoftware") | tag!("sendinst"))
=> { |_| Command::SendInstalledSoftware(InstalledSoftware::default()) }
+ | alt_complete!(tag!("SendSystemInfo") | tag!("sendinfo"))
+ => { |_| Command::SendSystemInfo }
| alt_complete!(tag!("SendUpdateReport") | tag!("sendup"))
=> { |_| Command::SendUpdateReport(UpdateReport::default()) }
| alt_complete!(tag!("StartDownload") | tag!("dl"))
- => { |_| Command::StartDownload(Vec::new()) }
+ => { |_| Command::StartDownload("".to_string()) }
| alt_complete!(tag!("StartInstall") | tag!("inst"))
- => { |_| Command::StartInstall(DownloadComplete::default()) }
+ => { |_| Command::StartInstall("".to_string()) }
)
~ args: arguments
~ alt!(eof | tag!("\r") | tag!("\n") | tag!(";")),
@@ -103,14 +111,15 @@ fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> {
0 => Ok(Command::Authenticate(None)),
1 => Err(Error::Command("usage: auth <client-id> <client-secret>".to_string())),
2 => Ok(Command::Authenticate(Some(ClientCredentials {
- client_id: ClientId(args[0].to_string()),
- client_secret: ClientSecret(args[1].to_string())}))),
+ client_id: args[0].to_string(),
+ client_secret: args[1].to_string()
+ }))),
_ => Err(Error::Command(format!("unexpected Authenticate args: {:?}", args))),
},
- Command::GetNewUpdates => match args.len() {
- 0 => Ok(Command::GetNewUpdates),
- _ => Err(Error::Command(format!("unexpected GetNewUpdates args: {:?}", args))),
+ Command::GetUpdateRequests => match args.len() {
+ 0 => Ok(Command::GetUpdateRequests),
+ _ => Err(Error::Command(format!("unexpected GetUpdateRequests args: {:?}", args))),
},
Command::ListInstalledPackages => match args.len() {
@@ -118,16 +127,9 @@ fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> {
_ => Err(Error::Command(format!("unexpected ListInstalledPackages args: {:?}", args))),
},
- Command::RefreshSystemInfo(_) => match args.len() {
- 0 => Ok(Command::RefreshSystemInfo(false)),
- 1 => {
- if let Ok(b) = args[0].parse::<bool>() {
- Ok(Command::RefreshSystemInfo(b))
- } else {
- Err(Error::Command("couldn't parse 1st argument as boolean".to_string()))
- }
- }
- _ => Err(Error::Command(format!("unexpected RefreshSystemInfo args: {:?}", args))),
+ Command::ListSystemInfo => match args.len() {
+ 0 => Ok(Command::ListSystemInfo),
+ _ => Err(Error::Command(format!("unexpected ListSystemInfo args: {:?}", args))),
},
Command::SendInstalledPackages(_) => match args.len() {
@@ -150,6 +152,11 @@ fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> {
_ => Err(Error::Command(format!("unexpected SendInstalledSoftware args: {:?}", args))),
},
+ Command::SendSystemInfo => match args.len() {
+ 0 => Ok(Command::SendSystemInfo),
+ _ => Err(Error::Command(format!("unexpected SendSystemInfo args: {:?}", args))),
+ },
+
Command::SendUpdateReport(_) => match args.len() {
0 | 1 => Err(Error::Command("usage: sendup <update-id> <result-code>".to_string())),
2 => {
@@ -168,12 +175,14 @@ fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> {
},
Command::StartDownload(_) => match args.len() {
- 0 => Err(Error::Command("usage: dl [<id>]".to_string())),
- _ => Ok(Command::StartDownload(args.iter().map(|arg| String::from(*arg)).collect())),
+ 0 => Err(Error::Command("usage: dl <id>".to_string())),
+ 1 => Ok(Command::StartDownload(args[0].to_string())),
+ _ => Err(Error::Command(format!("unexpected StartInstall args: {:?}", args))),
},
Command::StartInstall(_) => match args.len() {
- // FIXME(PRO-1160): args
+ 0 => Err(Error::Command("usage: inst <id>".to_string())),
+ 1 => Ok(Command::StartInstall(args[0].to_string())),
_ => Err(Error::Command(format!("unexpected StartInstall args: {:?}", args))),
},
@@ -184,8 +193,7 @@ fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> {
#[cfg(test)]
mod tests {
use super::{command, arguments};
- use datatype::{Command, ClientCredentials, ClientId, ClientSecret, Package,
- UpdateReport, UpdateResultCode};
+ use datatype::{Command, ClientCredentials, Package, UpdateReport, UpdateResultCode};
use nom::IResult;
@@ -194,7 +202,7 @@ mod tests {
assert_eq!(command(&b"auth foo bar"[..]),
IResult::Done(&b""[..], (Command::Authenticate(None), vec!["foo", "bar"])));
assert_eq!(command(&b"dl 1"[..]),
- IResult::Done(&b""[..], (Command::StartDownload(Vec::new()), vec!["1"])));
+ IResult::Done(&b""[..], (Command::StartDownload("".to_string()), vec!["1"])));
assert_eq!(command(&b"ls;\n"[..]),
IResult::Done(&b"\n"[..], (Command::ListInstalledPackages, Vec::new())));
}
@@ -216,18 +224,18 @@ mod tests {
assert_eq!("auth".parse::<Command>().unwrap(), Command::Authenticate(None));
assert_eq!("auth user pass".parse::<Command>().unwrap(),
Command::Authenticate(Some(ClientCredentials {
- client_id: ClientId("user".to_string()),
- client_secret: ClientSecret("pass".to_string()),
+ client_id: "user".to_string(),
+ client_secret: "pass".to_string(),
})));
assert!("auth one".parse::<Command>().is_err());
assert!("auth one two three".parse::<Command>().is_err());
}
#[test]
- fn get_new_updates_test() {
- assert_eq!("GetNewUpdates".parse::<Command>().unwrap(), Command::GetNewUpdates);
- assert_eq!("new".parse::<Command>().unwrap(), Command::GetNewUpdates);
- assert!("new old".parse::<Command>().is_err());
+ fn get_update_requests_test() {
+ assert_eq!("GetUpdateRequests".parse::<Command>().unwrap(), Command::GetUpdateRequests);
+ assert_eq!("getreq".parse::<Command>().unwrap(), Command::GetUpdateRequests);
+ assert!("getreq now".parse::<Command>().is_err());
}
#[test]
@@ -238,10 +246,10 @@ mod tests {
}
#[test]
- fn refresh_system_info_test() {
- assert_eq!("RefreshSystemInfo true".parse::<Command>().unwrap(), Command::RefreshSystemInfo(true));
- assert_eq!("info".parse::<Command>().unwrap(), Command::RefreshSystemInfo(false));
- assert!("RefreshSystemInfo 1 2".parse::<Command>().is_err());
+ fn list_system_info_test() {
+ assert_eq!("ListSystemInfo".parse::<Command>().unwrap(), Command::ListSystemInfo);
+ assert_eq!("info".parse::<Command>().unwrap(), Command::ListSystemInfo);
+ assert!("ListSystemInfo 1 2".parse::<Command>().is_err());
assert!("info please".parse::<Command>().is_err());
}
@@ -271,6 +279,14 @@ mod tests {
}
#[test]
+ fn send_system_info_test() {
+ assert_eq!("SendSystemInfo".parse::<Command>().unwrap(), Command::SendSystemInfo);
+ assert_eq!("sendinfo".parse::<Command>().unwrap(), Command::SendSystemInfo);
+ assert!("SendSystemInfo 1 2".parse::<Command>().is_err());
+ assert!("sendinfo please".parse::<Command>().is_err());
+ }
+
+ #[test]
fn send_update_report_test() {
assert_eq!("SendUpdateReport myid OK".parse::<Command>().unwrap(), Command::SendUpdateReport(
UpdateReport::single("myid".to_string(), UpdateResultCode::OK, "".to_string())));
@@ -291,15 +307,16 @@ mod tests {
#[test]
fn start_download_test() {
- assert_eq!("StartDownload this".parse::<Command>().unwrap(),
- Command::StartDownload(vec!["this".to_string()]));
- assert_eq!("dl some more".parse::<Command>().unwrap(),
- Command::StartDownload(vec!["some".to_string(), "more".to_string()]));
+ assert_eq!("StartDownload this".parse::<Command>().unwrap(), Command::StartDownload("this".to_string()));
+ assert_eq!("dl that".parse::<Command>().unwrap(), Command::StartDownload("that".to_string()));
+ assert!("StartDownload this and that".parse::<Command>().is_err());
assert!("dl".parse::<Command>().is_err());
}
#[test]
fn start_install_test() {
+ assert_eq!("StartInstall 123".parse::<Command>().unwrap(), Command::StartInstall("123".to_string()));
+ assert_eq!("inst this".parse::<Command>().unwrap(), Command::StartInstall("this".to_string()));
assert!("StartInstall".parse::<Command>().is_err());
assert!("inst more than one".parse::<Command>().is_err());
}
diff --git a/src/datatype/config.rs b/src/datatype/config.rs
index 5db0f12..ad80c42 100644
--- a/src/datatype/config.rs
+++ b/src/datatype/config.rs
@@ -6,13 +6,13 @@ use std::os::unix::fs::PermissionsExt;
use std::io::prelude::*;
use std::path::Path;
use toml;
-use toml::{Decoder, Parser, Table, Value};
+use toml::{Decoder, Parser, Table};
-use datatype::{Error, SystemInfo, Url};
+use datatype::{Error, SocketAddr, Url};
use package_manager::PackageManager;
-/// An aggregation of all the configuration options parsed at startup.
+/// A container for all parsed configs.
#[derive(Default, PartialEq, Eq, Debug, Clone)]
pub struct Config {
pub auth: Option<AuthConfig>,
@@ -25,6 +25,8 @@ pub struct Config {
}
impl Config {
+ /// Read in a toml configuration file using default values for missing
+ /// sections or fields.
pub fn load(path: &str) -> Result<Config, Error> {
info!("Loading config file: {}", path);
let mut file = try!(File::open(path).map_err(Error::Io));
@@ -33,36 +35,34 @@ impl Config {
Config::parse(&toml)
}
+ /// Parse a toml configuration string using default values for missing
+ /// sections or fields while retaining backwards compatibility.
pub fn parse(toml: &str) -> Result<Config, Error> {
let table = try!(parse_table(&toml));
- let auth_cfg = if let Some(auth) = table.get("auth") {
- let parsed = try!(decode_section(auth.clone()));
- Some(try!(bootstrap_credentials(parsed)))
- } else {
- None
- };
-
- let dbus_cfg = if let Some(dbus) = table.get("dbus") {
- Some(try!(decode_section(dbus.clone())))
- } else {
- None
- };
-
- let rvi_cfg = if let Some(rvi) = table.get("rvi") {
- Some(try!(decode_section(rvi.clone())))
- } else {
- None
- };
+ let mut auth: Option<ParsedAuthConfig> = try!(maybe_parse_section(&table, "auth"));
+ let mut core: ParsedCoreConfig = try!(parse_section(&table, "core"));
+ let mut dbus: Option<ParsedDBusConfig> = try!(maybe_parse_section(&table, "dbus"));
+ let mut device: ParsedDeviceConfig = try!(parse_section(&table, "device"));
+ let mut gateway: ParsedGatewayConfig = try!(parse_section(&table, "gateway"));
+ let mut network: ParsedNetworkConfig = try!(parse_section(&table, "network"));
+ let mut rvi: Option<ParsedRviConfig> = try!(maybe_parse_section(&table, "rvi"));
+
+ if let Some(cfg) = auth {
+ auth = Some(try!(bootstrap_credentials(cfg)));
+ }
+
+ try!(apply_transformations(&mut auth, &mut core, &mut dbus, &mut device,
+ &mut gateway, &mut network, &mut rvi));
Ok(Config {
- auth: auth_cfg,
- core: try!(read_section(&table, "core")),
- dbus: dbus_cfg,
- device: try!(read_section(&table, "device")),
- gateway: try!(read_section(&table, "gateway")),
- network: try!(read_section(&table, "network")),
- rvi: rvi_cfg,
+ auth: auth.map(|mut cfg| cfg.defaultify()),
+ core: core.defaultify(),
+ dbus: dbus.map(|mut cfg| cfg.defaultify()),
+ device: device.defaultify(),
+ gateway: gateway.defaultify(),
+ network: network.defaultify(),
+ rvi: rvi.map(|mut cfg| cfg.defaultify())
})
}
}
@@ -72,15 +72,15 @@ fn parse_table(toml: &str) -> Result<Table, Error> {
Ok(try!(parser.parse().ok_or_else(move || parser.errors)))
}
-fn read_section<T: Decodable>(table: &Table, section: &str) -> Result<T, Error> {
- let part = try!(table.get(section)
- .ok_or_else(|| Error::Parse(format!("invalid section: {}", section))));
- decode_section(part.clone())
+fn parse_section<T: Decodable + Default>(table: &Table, section: &str) -> Result<T, Error> {
+ Ok(try!(maybe_parse_section(table, section)).unwrap_or(T::default()))
}
-fn decode_section<T: Decodable>(section: Value) -> Result<T, Error> {
- let mut decoder = Decoder::new(section);
- Ok(try!(T::decode(&mut decoder)))
+fn maybe_parse_section<T: Decodable>(table: &Table, section: &str) -> Result<Option<T>, Error> {
+ table.get(section).map_or(Ok(None), |sect| {
+ let mut decoder = Decoder::new(sect.clone());
+ Ok(Some(try!(T::decode(&mut decoder))))
+ })
}
@@ -92,8 +92,8 @@ struct CredentialsFile {
// Read AuthConfig values from the credentials file if it exists, or write the
// current AuthConfig values to a new credentials file otherwise.
-fn bootstrap_credentials(auth_cfg: AuthConfig) -> Result<AuthConfig, Error> {
- let creds = auth_cfg.credentials_file.clone();
+fn bootstrap_credentials(auth: ParsedAuthConfig) -> Result<ParsedAuthConfig, Error> {
+ let creds = auth.credentials_file.expect("couldn't get credentials_file");
let path = Path::new(&creds);
debug!("bootstrap_credentials: {:?}", path);
@@ -102,14 +102,16 @@ fn bootstrap_credentials(auth_cfg: AuthConfig) -> Result<AuthConfig, Error> {
let mut text = String::new();
try!(file.read_to_string(&mut text));
let table = try!(parse_table(&text));
- try!(read_section::<CredentialsFile>(&table, "auth"))
+ let auth = try!(table.get("auth").ok_or(Error::Config("no [auth] section".to_string())));
+ let mut decoder = Decoder::new(auth.clone());
+ try!(CredentialsFile::decode(&mut decoder))
}
Err(ref err) if err.kind() == ErrorKind::NotFound => {
let mut table = Table::new();
let credentials = CredentialsFile {
- client_id: auth_cfg.client_id,
- client_secret: auth_cfg.client_secret
+ client_id: auth.client_id.expect("expected client_id"),
+ client_secret: auth.client_secret.expect("expected client_secret")
};
table.insert("auth".to_string(), toml::encode(&credentials));
@@ -127,16 +129,52 @@ fn bootstrap_credentials(auth_cfg: AuthConfig) -> Result<AuthConfig, Error> {
Err(err) => return Err(Error::Io(err))
};
- Ok(AuthConfig {
- server: auth_cfg.server,
- client_id: credentials.client_id,
- client_secret: credentials.client_secret,
- credentials_file: auth_cfg.credentials_file,
+ Ok(ParsedAuthConfig {
+ server: auth.server,
+ client_id: Some(credentials.client_id),
+ client_secret: Some(credentials.client_secret),
+ credentials_file: Some(creds.clone()),
})
}
-/// A parsed representation of the [auth] configuration section.
+// Apply transformations from old to new config fields for backwards compatibility.
+fn apply_transformations(_: &mut Option<ParsedAuthConfig>,
+ core: &mut ParsedCoreConfig,
+ _: &mut Option<ParsedDBusConfig>,
+ device: &mut ParsedDeviceConfig,
+ _: &mut ParsedGatewayConfig,
+ _: &mut ParsedNetworkConfig,
+ _: &mut Option<ParsedRviConfig>) -> Result<(), Error> {
+
+ match (device.polling_interval, core.polling_sec) {
+ (Some(_), Some(_)) => {
+ return Err(Error::Config("core.polling_sec and device.polling_interval both set".to_string()))
+ }
+
+ (Some(interval), None) => {
+ if interval > 0 {
+ core.polling = Some(true);
+ core.polling_sec = Some(interval);
+ } else {
+ core.polling = Some(false);
+ }
+ }
+
+ _ => ()
+ }
+
+ Ok(())
+}
+
+
+/// Trait used to overwrite any `None` fields in a config with its default value.
+trait Defaultify<T: Default> {
+ fn defaultify(&mut self) -> T;
+}
+
+
+/// The [auth] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct AuthConfig {
pub server: Url,
@@ -146,7 +184,7 @@ pub struct AuthConfig {
}
impl Default for AuthConfig {
- fn default() -> AuthConfig {
+ fn default() -> Self {
AuthConfig {
server: "http://127.0.0.1:9001".parse().unwrap(),
client_id: "client-id".to_string(),
@@ -156,23 +194,86 @@ impl Default for AuthConfig {
}
}
+#[derive(RustcDecodable)]
+struct ParsedAuthConfig {
+ server: Option<Url>,
+ client_id: Option<String>,
+ client_secret: Option<String>,
+ credentials_file: Option<String>,
+}
-/// A parsed representation of the [core] configuration section.
+impl Default for ParsedAuthConfig {
+ fn default() -> Self {
+ ParsedAuthConfig {
+ server: None,
+ client_id: None,
+ client_secret: None,
+ credentials_file: None
+ }
+ }
+}
+
+impl Defaultify<AuthConfig> for ParsedAuthConfig {
+ fn defaultify(&mut self) -> AuthConfig {
+ let default = AuthConfig::default();
+ AuthConfig {
+ server: self.server.take().unwrap_or(default.server),
+ client_id: self.client_id.take().unwrap_or(default.client_id),
+ client_secret: self.client_secret.take().unwrap_or(default.client_secret),
+ credentials_file: self.credentials_file.take().unwrap_or(default.credentials_file)
+ }
+ }
+}
+
+
+/// The [core] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct CoreConfig {
- pub server: Url
+ pub server: Url,
+ pub polling: bool,
+ pub polling_sec: u64
}
impl Default for CoreConfig {
fn default() -> CoreConfig {
CoreConfig {
- server: "http://127.0.0.1:8080".parse().unwrap()
+ server: "http://127.0.0.1:8080".parse().unwrap(),
+ polling: true,
+ polling_sec: 10
+ }
+ }
+}
+
+#[derive(RustcDecodable)]
+struct ParsedCoreConfig {
+ server: Option<Url>,
+ polling: Option<bool>,
+ polling_sec: Option<u64>
+}
+
+impl Default for ParsedCoreConfig {
+ fn default() -> Self {
+ ParsedCoreConfig {
+ server: None,
+ polling: None,
+ polling_sec: None
+ }
+ }
+}
+
+impl Defaultify<CoreConfig> for ParsedCoreConfig {
+ fn defaultify(&mut self) -> CoreConfig {
+ let default = CoreConfig::default();
+ CoreConfig {
+ server: self.server.take().unwrap_or(default.server),
+ polling: self.polling.take().unwrap_or(default.polling),
+ polling_sec: self.polling_sec.take().unwrap_or(default.polling_sec)
}
}
}
-/// A parsed representation of the [dbus] configuration section.
+/// The [dbus] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct DBusConfig {
pub name: String,
@@ -180,7 +281,7 @@ pub struct DBusConfig {
pub interface: String,
pub software_manager: String,
pub software_manager_path: String,
- pub timeout: i32, // dbus-rs expects a signed int
+ pub timeout: i32,
}
impl Default for DBusConfig {
@@ -196,17 +297,53 @@ impl Default for DBusConfig {
}
}
+#[derive(RustcDecodable)]
+struct ParsedDBusConfig {
+ name: Option<String>,
+ path: Option<String>,
+ interface: Option<String>,
+ software_manager: Option<String>,
+ software_manager_path: Option<String>,
+ timeout: Option<i32>,
+}
+
+impl Default for ParsedDBusConfig {
+ fn default() -> Self {
+ ParsedDBusConfig {
+ name: None,
+ path: None,
+ interface: None,
+ software_manager: None,
+ software_manager_path: None,
+ timeout: None
+ }
+ }
+}
+
+impl Defaultify<DBusConfig> for ParsedDBusConfig {
+ fn defaultify(&mut self) -> DBusConfig {
+ let default = DBusConfig::default();
+ DBusConfig {
+ name: self.name.take().unwrap_or(default.name),
+ path: self.path.take().unwrap_or(default.path),
+ interface: self.interface.take().unwrap_or(default.interface),
+ software_manager: self.software_manager.take().unwrap_or(default.software_manager),
+ software_manager_path: self.software_manager_path.take().unwrap_or(default.software_manager_path),
+ timeout: self.timeout.take().unwrap_or(default.timeout)
+ }
+ }
+}
+
-/// A parsed representation of the [device] configuration section.
+/// The [device] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct DeviceConfig {
pub uuid: String,
pub vin: String,
pub packages_dir: String,
pub package_manager: PackageManager,
- pub system_info: SystemInfo,
- pub polling_interval: u64,
pub certificates_path: String,
+ pub system_info: Option<String>,
}
impl Default for DeviceConfig {
@@ -215,16 +352,54 @@ impl Default for DeviceConfig {
uuid: "123e4567-e89b-12d3-a456-426655440000".to_string(),
vin: "V1234567890123456".to_string(),
packages_dir: "/tmp/".to_string(),
- package_manager: PackageManager::Deb,
- system_info: SystemInfo::default(),
- polling_interval: 10,
- certificates_path: "/tmp/sota_certificates".to_string()
+ package_manager: PackageManager::Off,
+ certificates_path: "/tmp/sota_certificates".to_string(),
+ system_info: Some("system_info.sh".to_string())
+ }
+ }
+}
+
+#[derive(RustcDecodable)]
+struct ParsedDeviceConfig {
+ pub uuid: Option<String>,
+ pub vin: Option<String>,
+ pub packages_dir: Option<String>,
+ pub package_manager: Option<PackageManager>,
+ pub polling_interval: Option<u64>,
+ pub certificates_path: Option<String>,
+ pub system_info: Option<String>,
+}
+
+impl Default for ParsedDeviceConfig {
+ fn default() -> Self {
+ ParsedDeviceConfig {
+ uuid: None,
+ vin: None,
+ packages_dir: None,
+ package_manager: None,
+ polling_interval: None,
+ certificates_path: None,
+ system_info: None,
+ }
+ }
+}
+
+impl Defaultify<DeviceConfig> for ParsedDeviceConfig {
+ fn defaultify(&mut self) -> DeviceConfig {
+ let default = DeviceConfig::default();
+ DeviceConfig {
+ uuid: self.uuid.take().unwrap_or(default.uuid),
+ vin: self.vin.take().unwrap_or(default.vin),
+ packages_dir: self.packages_dir.take().unwrap_or(default.packages_dir),
+ package_manager: self.package_manager.take().unwrap_or(default.package_manager),
+ certificates_path: self.certificates_path.take().unwrap_or(default.certificates_path),
+ system_info: self.system_info.take().or(default.system_info),
}
}
}
-/// A parsed representation of the [gateway] configuration section.
+/// The [gateway] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct GatewayConfig {
pub console: bool,
@@ -243,17 +418,54 @@ impl Default for GatewayConfig {
http: false,
rvi: false,
socket: false,
- websocket: true,
+ websocket: false,
+ }
+ }
+}
+
+#[derive(RustcDecodable)]
+struct ParsedGatewayConfig {
+ console: Option<bool>,
+ dbus: Option<bool>,
+ http: Option<bool>,
+ rvi: Option<bool>,
+ socket: Option<bool>,
+ websocket: Option<bool>,
+}
+
+impl Default for ParsedGatewayConfig {
+ fn default() -> Self {
+ ParsedGatewayConfig {
+ console: None,
+ dbus: None,
+ http: None,
+ rvi: None,
+ socket: None,
+ websocket: None
}
}
}
+impl Defaultify<GatewayConfig> for ParsedGatewayConfig {
+ fn defaultify(&mut self) -> GatewayConfig {
+ let default = GatewayConfig::default();
+ GatewayConfig {
+ console: self.console.take().unwrap_or(default.console),
+ dbus: self.dbus.take().unwrap_or(default.dbus),
+ http: self.http.take().unwrap_or(default.http),
+ rvi: self.rvi.take().unwrap_or(default.rvi),
+ socket: self.socket.take().unwrap_or(default.socket),
+ websocket: self.websocket.take().unwrap_or(default.websocket)
+ }
+ }
+}
-/// A parsed representation of the [network] configuration section.
+
+/// The [network] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct NetworkConfig {
- pub http_server: String,
- pub rvi_edge_server: String,
+ pub http_server: SocketAddr,
+ pub rvi_edge_server: SocketAddr,
pub socket_commands_path: String,
pub socket_events_path: String,
pub websocket_server: String
@@ -262,17 +474,51 @@ pub struct NetworkConfig {
impl Default for NetworkConfig {
fn default() -> NetworkConfig {
NetworkConfig {
- http_server: "http://127.0.0.1:8888".to_string(),
- rvi_edge_server: "http://127.0.0.1:9080".to_string(),
+ http_server: "127.0.0.1:8888".parse().unwrap(),
+ rvi_edge_server: "127.0.0.1:9080".parse().unwrap(),
socket_commands_path: "/tmp/sota-commands.socket".to_string(),
socket_events_path: "/tmp/sota-events.socket".to_string(),
- websocket_server: "ws://127.0.0.1:3012".to_string()
+ websocket_server: "127.0.0.1:3012".to_string()
}
}
}
+#[derive(RustcDecodable)]
+struct ParsedNetworkConfig {
+ http_server: Option<SocketAddr>,
+ rvi_edge_server: Option<SocketAddr>,
+ socket_commands_path: Option<String>,
+ socket_events_path: Option<String>,
+ websocket_server: Option<String>
+}
+
+impl Default for ParsedNetworkConfig {
+ fn default() -> Self {
+ ParsedNetworkConfig {
+ http_server: None,
+ rvi_edge_server: None,
+ socket_commands_path: None,
+ socket_events_path: None,
+ websocket_server: None
+ }
+ }
+}
-/// A parsed representation of the [rvi] configuration section.
+impl Defaultify<NetworkConfig> for ParsedNetworkConfig {
+ fn defaultify(&mut self) -> NetworkConfig {
+ let default = NetworkConfig::default();
+ NetworkConfig {
+ http_server: self.http_server.take().unwrap_or(default.http_server),
+ rvi_edge_server: self.rvi_edge_server.take().unwrap_or(default.rvi_edge_server),
+ socket_commands_path: self.socket_commands_path.take().unwrap_or(default.socket_commands_path),
+ socket_events_path: self.socket_events_path.take().unwrap_or(default.socket_events_path),
+ websocket_server: self.websocket_server.take().unwrap_or(default.websocket_server)
+ }
+ }
+}
+
+
+/// The [rvi] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct RviConfig {
pub client: Url,
@@ -285,7 +531,35 @@ impl Default for RviConfig {
RviConfig {
client: "http://127.0.0.1:8901".parse().unwrap(),
storage_dir: "/var/sota".to_string(),
- timeout: Some(20),
+ timeout: None,
+ }
+ }
+}
+
+#[derive(RustcDecodable)]
+struct ParsedRviConfig {
+ client: Option<Url>,
+ storage_dir: Option<String>,
+ timeout: Option<i64>,
+}
+
+impl Default for ParsedRviConfig {
+ fn default() -> Self {
+ ParsedRviConfig {
+ client: None,
+ storage_dir: None,
+ timeout: None
+ }
+ }
+}
+
+impl Defaultify<RviConfig> for ParsedRviConfig {
+ fn defaultify(&mut self) -> RviConfig {
+ let default = RviConfig::default();
+ RviConfig {
+ client: self.client.take().unwrap_or(default.client),
+ storage_dir: self.storage_dir.take().unwrap_or(default.storage_dir),
+ timeout: self.timeout.take().or(default.timeout)
}
}
}
@@ -309,6 +583,8 @@ mod tests {
r#"
[core]
server = "http://127.0.0.1:8080"
+ polling = true
+ polling_sec = 10
"#;
const DBUS_CONFIG: &'static str =
@@ -327,11 +603,10 @@ mod tests {
[device]
uuid = "123e4567-e89b-12d3-a456-426655440000"
vin = "V1234567890123456"
- system_info = "system_info.sh"
- polling_interval = 10
packages_dir = "/tmp/"
- package_manager = "deb"
+ package_manager = "off"
certificates_path = "/tmp/sota_certificates"
+ system_info = "system_info.sh"
"#;
const GATEWAY_CONFIG: &'static str =
@@ -342,17 +617,17 @@ mod tests {
http = false
rvi = false
socket = false
- websocket = true
+ websocket = false
"#;
const NETWORK_CONFIG: &'static str =
r#"
[network]
- http_server = "http://127.0.0.1:8888"
- rvi_edge_server = "http://127.0.0.1:9080"
+ http_server = "127.0.0.1:8888"
+ rvi_edge_server = "127.0.0.1:9080"
socket_commands_path = "/tmp/sota-commands.socket"
socket_events_path = "/tmp/sota-events.socket"
- websocket_server = "ws://127.0.0.1:3012"
+ websocket_server = "127.0.0.1:3012"
"#;
const RVI_CONFIG: &'static str =
@@ -365,7 +640,12 @@ mod tests {
#[test]
- fn parse_default_config() {
+ fn empty_config() {
+ assert_eq!(Config::parse("").unwrap(), Config::default());
+ }
+
+ #[test]
+ fn basic_config() {
let config = String::new()
+ CORE_CONFIG
+ DEVICE_CONFIG
@@ -375,7 +655,7 @@ mod tests {
}
#[test]
- fn parse_example_config() {
+ fn default_config() {
let config = String::new()
+ AUTH_CONFIG
+ CORE_CONFIG
@@ -384,6 +664,13 @@ mod tests {
+ GATEWAY_CONFIG
+ NETWORK_CONFIG
+ RVI_CONFIG;
- assert_eq!(Config::load("tests/sota.toml").unwrap(), Config::parse(&config).unwrap());
+ assert_eq!(Config::load("tests/toml/default.toml").unwrap(), Config::parse(&config).unwrap());
+ }
+
+ #[test]
+ fn backwards_compatible_config() {
+ let config = Config::load("tests/toml/old.toml").unwrap();
+ assert_eq!(config.core.polling, true);
+ assert_eq!(config.core.polling_sec, 10);
}
}
diff --git a/src/datatype/error.rs b/src/datatype/error.rs
index 8267234..9503e2c 100644
--- a/src/datatype/error.rs
+++ b/src/datatype/error.rs
@@ -13,7 +13,7 @@ use toml::{ParserError as TomlParserError, DecodeError as TomlDecodeError};
use url::ParseError as UrlParseError;
use datatype::Event;
-use http::auth_client::AuthHandler;
+use http::{AuthHandler, ResponseData};
use gateway::Interpret;
use ws::Error as WebsocketError;
@@ -21,10 +21,12 @@ use ws::Error as WebsocketError;
/// System-wide errors that are returned from `Result` type failures.
#[derive(Debug)]
pub enum Error {
- Authorization(String),
Client(String),
Command(String),
+ Config(String),
FromUtf8(FromUtf8Error),
+ Http(ResponseData),
+ HttpAuth(ResponseData),
Hyper(HyperError),
HyperClient(HyperClientError<AuthHandler>),
Io(IoError),
@@ -76,6 +78,7 @@ derive_from!([
JsonEncoderError => JsonEncoder,
JsonDecoderError => JsonDecoder,
RecvError => Recv,
+ ResponseData => Http,
TomlDecodeError => TomlDecode,
UrlParseError => UrlParse,
WebsocketError => Websocket
@@ -92,9 +95,11 @@ impl Display for Error {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
let inner: String = match *self {
Error::Client(ref s) => format!("Http client error: {}", s.clone()),
- Error::Authorization(ref s) => format!("Http client authorization error: {}", s.clone()),
Error::Command(ref e) => format!("Unknown Command: {}", e.clone()),
+ Error::Config(ref s) => format!("Bad Config: {}", s.clone()),
Error::FromUtf8(ref e) => format!("From utf8 error: {}", e.clone()),
+ Error::Http(ref r) => format!("HTTP client error: {}", r.clone()),
+ Error::HttpAuth(ref r) => format!("HTTP authorization error: {}", r.clone()),
Error::Hyper(ref e) => format!("Hyper error: {}", e.clone()),
Error::HyperClient(ref e) => format!("Hyper client error: {}", e.clone()),
Error::Io(ref e) => format!("IO error: {}", e.clone()),
diff --git a/src/datatype/event.rs b/src/datatype/event.rs
index e3f84ca..93ed4ec 100644
--- a/src/datatype/event.rs
+++ b/src/datatype/event.rs
@@ -1,7 +1,7 @@
use std::fmt::{Display, Formatter, Result as FmtResult};
use datatype::{DownloadComplete, Package, UpdateAvailable, UpdateReport,
- UpdateRequestId};
+ UpdateRequest, UpdateRequestId};
/// System-wide events that are broadcast to all interested parties.
@@ -14,22 +14,20 @@ pub enum Event {
Authenticated,
/// An operation failed because we are not currently authenticated.
NotAuthenticated,
+ /// Nothing was done as we are already authenticated.
+ AlreadyAuthenticated,
- /// There are new updates available.
- NewUpdatesReceived(Vec<UpdateRequestId>),
- /// A notification from RVI of a new update.
- NewUpdateAvailable(UpdateAvailable),
- /// There are no new updates available.
- NoNewUpdates,
+ /// A notification from Core of pending or in-flight updates.
+ UpdatesReceived(Vec<UpdateRequest>),
+ /// A notification from RVI of a pending update.
+ UpdateAvailable(UpdateAvailable),
+ /// There are no outstanding update requests.
+ NoUpdateRequests,
/// The following packages are installed on the device.
FoundInstalledPackages(Vec<Package>),
/// An update on the system information was received.
FoundSystemInfo(String),
- /// A list of installed packages was sent to the Core server.
- InstalledPackagesSent,
- /// An update report was sent to the Core server.
- UpdateReportSent,
/// Downloading an update.
DownloadingUpdate(UpdateRequestId),
@@ -45,6 +43,15 @@ pub enum Event {
/// The installation of an update failed.
InstallFailed(UpdateReport),
+ /// An update report was sent to the Core server.
+ UpdateReportSent,
+ /// A list of installed packages was sent to the Core server.
+ InstalledPackagesSent,
+ /// A list of installed software was sent to the Core server.
+ InstalledSoftwareSent,
+ /// The system information was sent to the Core server.
+ SystemInfoSent,
+
/// A broadcast event requesting an update on externally installed software.
InstalledSoftwareNeeded,
}
diff --git a/src/datatype/json_rpc.rs b/src/datatype/json_rpc.rs
index 3eed9a2..e3a046a 100644
--- a/src/datatype/json_rpc.rs
+++ b/src/datatype/json_rpc.rs
@@ -1,7 +1,7 @@
use rustc_serialize::{json, Decodable, Encodable};
use time;
-use http::{AuthClient, Client};
+use http::{AuthClient, Client, Response};
use super::Url;
@@ -32,8 +32,12 @@ impl<E: Encodable> RpcRequest<E> {
let body = json::encode(self).expect("couldn't encode RpcRequest");
let resp_rx = client.post(url, Some(body.into_bytes()));
let resp = resp_rx.recv().expect("no RpcRequest response received");
- let data = try!(resp.map_err(|err| format!("{}", err)));
- String::from_utf8(data).map_err(|err| format!("{}", err))
+
+ match resp {
+ Response::Success(data) => String::from_utf8(data.body).or_else(|err| Err(format!("{}", err))),
+ Response::Failed(data) => Err(format!("{}", data)),
+ Response::Error(err) => Err(format!("{}", err))
+ }
}
}
diff --git a/src/datatype/mod.rs b/src/datatype/mod.rs
index 8a9ca4e..71e7e0a 100644
--- a/src/datatype/mod.rs
+++ b/src/datatype/mod.rs
@@ -5,22 +5,23 @@ pub mod dbus;
pub mod error;
pub mod event;
pub mod json_rpc;
-pub mod package;
-pub mod system_info;
+pub mod network;
+pub mod shell;
pub mod update_report;
-pub mod url;
+pub mod update_request;
-pub use self::auth::{AccessToken, Auth, ClientId, ClientSecret, ClientCredentials};
+pub use self::auth::{AccessToken, Auth, ClientCredentials};
pub use self::command::Command;
pub use self::config::{AuthConfig, CoreConfig, Config, DBusConfig, DeviceConfig,
GatewayConfig, RviConfig};
pub use self::error::Error;
pub use self::event::Event;
pub use self::json_rpc::{RpcRequest, RpcOk, RpcErr};
-pub use self::package::{ChunkReceived, DownloadStarted, DownloadComplete, Package,
- PendingUpdateRequest, UpdateAvailable, UpdateRequestId};
-pub use self::system_info::SystemInfo;
+pub use self::network::{Method, SocketAddr, Url};
+pub use self::shell::system_info;
pub use self::update_report::{DeviceReport, InstalledFirmware, InstalledPackage,
InstalledSoftware, OperationResult, UpdateResultCode,
UpdateReport};
-pub use self::url::{Method, Url};
+pub use self::update_request::{ChunkReceived, DownloadComplete, DownloadFailed,
+ DownloadStarted, Package, UpdateAvailable,
+ UpdateRequest, UpdateRequestId, UpdateRequestStatus};
diff --git a/src/datatype/url.rs b/src/datatype/network.rs
index 5a9c97a..56f88ae 100644
--- a/src/datatype/url.rs
+++ b/src/datatype/network.rs
@@ -2,17 +2,53 @@ use hyper::method;
use rustc_serialize::{Decoder, Decodable};
use std::borrow::Cow;
use std::fmt::{Display, Formatter, Result as FmtResult};
-use std::io;
-use std::net::ToSocketAddrs;
+use std::net::{SocketAddr as StdSocketAddr};
+use std::ops::Deref;
use std::str::FromStr;
use url;
-use url::SocketAddrs;
use datatype::Error;
-/// Encapsulate a single crate URL with additional methods and traits.
-#[derive(PartialEq, Eq, Clone, Debug)]
+/// Encapsulate a socket address for implementing additional traits.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct SocketAddr(pub StdSocketAddr);
+
+impl Decodable for SocketAddr {
+ fn decode<D: Decoder>(d: &mut D) -> Result<SocketAddr, D::Error> {
+ let addr = try!(d.read_str());
+ addr.parse().or_else(|err| Err(d.error(&format!("{}", err))))
+ }
+}
+
+impl FromStr for SocketAddr {
+ type Err = Error;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match StdSocketAddr::from_str(s) {
+ Ok(addr) => Ok(SocketAddr(addr)),
+ Err(err) => Err(Error::Parse(format!("couldn't parse SocketAddr: {}", err)))
+ }
+ }
+}
+
+impl Deref for SocketAddr {
+ type Target = StdSocketAddr;
+
+ fn deref(&self) -> &StdSocketAddr {
+ &self.0
+ }
+}
+
+impl Display for SocketAddr {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ write!(f, "{}", self.0)
+ }
+}
+
+
+/// Encapsulate a url with additional methods and traits.
+#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Url(pub url::Url);
impl Url {
@@ -21,11 +57,6 @@ impl Url {
let url = try!(self.0.join(suffix));
Ok(Url(url))
}
-
- /// Return the encapsulated crate URL.
- pub fn inner(&self) -> url::Url {
- self.0.clone()
- }
}
impl<'a> Into<Cow<'a, Url>> for Url {
@@ -50,11 +81,11 @@ impl Decodable for Url {
}
}
-impl ToSocketAddrs for Url {
- type Iter = SocketAddrs;
+impl Deref for Url {
+ type Target = url::Url;
- fn to_socket_addrs(&self) -> io::Result<Self::Iter> {
- self.0.to_socket_addrs()
+ fn deref(&self) -> &url::Url {
+ &self.0
}
}
diff --git a/src/datatype/shell.rs b/src/datatype/shell.rs
new file mode 100644
index 0000000..d7cd4af
--- /dev/null
+++ b/src/datatype/shell.rs
@@ -0,0 +1,12 @@
+use std::process::Command;
+
+use datatype::Error;
+
+
+/// Generate a new system information report.
+pub fn system_info(cmd: &str) -> Result<String, Error> {
+ Command::new(cmd)
+ .output()
+ .map_err(|err| Error::SystemInfo(err.to_string()))
+ .and_then(|info| String::from_utf8(info.stdout).map_err(Error::FromUtf8))
+}
diff --git a/src/datatype/system_info.rs b/src/datatype/system_info.rs
deleted file mode 100644
index 2d8fff2..0000000
--- a/src/datatype/system_info.rs
+++ /dev/null
@@ -1,46 +0,0 @@
-use rustc_serialize::{Decoder, Decodable};
-use std::process::Command;
-use std::str::FromStr;
-
-use datatype::Error;
-
-
-/// A reference to the command that will report on the system information.
-#[derive(PartialEq, Eq, Debug, Clone)]
-pub struct SystemInfo {
- command: String
-}
-
-impl SystemInfo {
- /// Instantiate a new type to report on the system information.
- pub fn new(command: String) -> SystemInfo {
- SystemInfo { command: command }
- }
-
- /// Generate a new report of the system information.
- pub fn report(&self) -> Result<String, Error> {
- Command::new(&self.command)
- .output().map_err(|err| Error::SystemInfo(err.to_string()))
- .and_then(|info| String::from_utf8(info.stdout).map_err(Error::FromUtf8))
- }
-}
-
-impl Default for SystemInfo {
- fn default() -> SystemInfo {
- SystemInfo::new("system_info.sh".to_string())
- }
-}
-
-impl FromStr for SystemInfo {
- type Err = Error;
-
- fn from_str(s: &str) -> Result<SystemInfo, Error> {
- Ok(SystemInfo::new(s.to_string()))
- }
-}
-
-impl Decodable for SystemInfo {
- fn decode<D: Decoder>(d: &mut D) -> Result<SystemInfo, D::Error> {
- d.read_str().and_then(|s| Ok(s.parse::<SystemInfo>().unwrap()))
- }
-}
diff --git a/src/datatype/package.rs b/src/datatype/update_request.rs
index 146ff06..7398848 100644
--- a/src/datatype/package.rs
+++ b/src/datatype/update_request.rs
@@ -3,14 +3,36 @@ use std::fmt::{Display, Formatter, Result as FmtResult};
use rvi::services::LocalServices;
-/// Encapsulate a `String` type used to represent the `Package` version.
-pub type Version = String;
+/// Encapsulate a `String` type as the id of a specific update request.
+pub type UpdateRequestId = String;
+
+/// A device update request from Core to be installed by the client.
+#[allow(non_snake_case)]
+#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)]
+pub struct UpdateRequest {
+ pub requestId: UpdateRequestId,
+ pub status: UpdateRequestStatus,
+ pub packageId: Package,
+ pub installPos: i32,
+ pub createdAt: String,
+}
+
+/// The status of an `UpdateRequest` from Core.
+#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)]
+pub enum UpdateRequestStatus {
+ Pending,
+ InFlight,
+ Canceled,
+ Failed,
+ Finished
+}
+
/// Encodes the name and version of a specific package.
-#[derive(Debug, PartialEq, Eq, RustcEncodable, RustcDecodable, Clone)]
+#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)]
pub struct Package {
pub name: String,
- pub version: Version
+ pub version: String
}
impl Display for Package {
@@ -20,19 +42,6 @@ impl Display for Package {
}
-/// Encapsulate a `String` type as the id of a specific update request.
-pub type UpdateRequestId = String;
-
-/// A single pending update request to be installed by the client.
-#[allow(non_snake_case)]
-#[derive(Clone, PartialEq, Eq, Debug, RustcEncodable, RustcDecodable)]
-pub struct PendingUpdateRequest {
- pub requestId: UpdateRequestId,
- pub installPos: i32,
- pub packageId: Package,
- pub createdAt: String
-}
-
/// A notification from RVI that a new update is available.
#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)]
pub struct UpdateAvailable {
@@ -59,8 +68,7 @@ pub struct ChunkReceived {
pub chunks: Vec<u64>,
}
-/// A notification to indicate to any external package manager that the package
-/// download has successfully completed.
+/// A notification to an external package manager that the package was downloaded.
#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)]
pub struct DownloadComplete {
pub update_id: String,
@@ -68,12 +76,9 @@ pub struct DownloadComplete {
pub signature: String
}
-impl Default for DownloadComplete {
- fn default() -> Self {
- DownloadComplete {
- update_id: "".to_string(),
- update_image: "".to_string(),
- signature: "".to_string()
- }
- }
+/// A notification to an external package manager that the package download failed.
+#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)]
+pub struct DownloadFailed {
+ pub update_id: String,
+ pub reason: String
}
diff --git a/src/gateway/dbus.rs b/src/gateway/dbus.rs
index 3bd41ea..321e262 100644
--- a/src/gateway/dbus.rs
+++ b/src/gateway/dbus.rs
@@ -24,7 +24,7 @@ impl Gateway for DBus {
thread::spawn(move || {
let conn = Connection::get_private(BusType::Session).expect("couldn't get dbus session");
- conn.register_name(&dbus_cfg.name, NameFlag::ReplaceExisting as u32).unwrap();
+ conn.register_name(&dbus_cfg.name, NameFlag::ReplaceExisting as u32).expect("couldn't register name");
let mut obj_path = ObjectPath::new(&conn, &dbus_cfg.path, true);
obj_path.insert_interface(&dbus_cfg.interface, default_interface(itx));
@@ -33,10 +33,11 @@ impl Gateway for DBus {
loop {
for item in conn.iter(1000) {
if let ConnectionItem::MethodCall(mut msg) = item {
- info!("DBus method call: {:?}", msg);
- obj_path.handle_message(&mut msg).map(|result| {
- let _ = result.map_err(|_| error!("dbus method call failed: {:?}", msg));
- });
+ match obj_path.handle_message(&mut msg) {
+ Some(Ok(())) => info!("DBus message sent: {:?}", msg),
+ Some(Err(())) => error!("DBus message send failed: {:?}", msg),
+ None => debug!("unhandled dbus message: {:?}", msg)
+ }
}
}
}
@@ -47,8 +48,8 @@ impl Gateway for DBus {
fn pulse(&self, event: Event) {
match event {
- Event::NewUpdateAvailable(avail) => {
- let msg = self.new_message("updateAvailable", &[
+ Event::UpdateAvailable(avail) => {
+ let msg = self.new_swm_message("updateAvailable", &[
MessageItem::from(avail.update_id),
MessageItem::from(avail.signature),
MessageItem::from(avail.description),
@@ -59,7 +60,7 @@ impl Gateway for DBus {
}
Event::DownloadComplete(comp) => {
- let msg = self.new_message("downloadComplete", &[
+ let msg = self.new_swm_message("downloadComplete", &[
MessageItem::from(comp.update_image),
MessageItem::from(comp.signature)
]);
@@ -68,7 +69,7 @@ impl Gateway for DBus {
}
Event::InstalledSoftwareNeeded => {
- let msg = self.new_message("getInstalledPackages", &[
+ let msg = self.new_swm_message("getInstalledPackages", &[
MessageItem::from(true), // include packages?
MessageItem::from(false) // include firmware?
]);
@@ -103,7 +104,7 @@ impl Gateway for DBus {
}
impl DBus {
- fn new_message(&self, method: &str, args: &[MessageItem]) -> Message {
+ fn new_swm_message(&self, method: &str, args: &[MessageItem]) -> Message {
let mgr = self.dbus_cfg.software_manager.clone();
let path = self.dbus_cfg.software_manager_path.clone();
let result = Message::new_method_call(&mgr, &path, &mgr, method);
@@ -139,19 +140,19 @@ fn send(itx: &Sender<Interpret>, cmd: Command) {
fn handle_initiate_download(itx: &Sender<Interpret>, msg: &mut Message) -> MethodResult {
let sender = try!(msg.sender().map(|s| s.to_string()).ok_or(dbus::missing_arg()));
- debug!("handle_initiate_download: sender={:?}, msg={:?}", sender, msg);
+ debug!("dbus handle_initiate_download: sender={:?}, msg={:?}", sender, msg);
let mut args = msg.get_items().into_iter();
let arg_id = try!(args.next().ok_or(dbus::missing_arg()));
let update_id: &String = try!(FromMessageItem::from(&arg_id).or(Err(dbus::malformed_arg())));
- send(itx, Command::StartDownload(vec![update_id.clone()]));
+ send(itx, Command::StartDownload(update_id.clone()));
Ok(vec![])
}
fn handle_update_report(itx: &Sender<Interpret>, msg: &mut Message) -> MethodResult {
let sender = try!(msg.sender().map(|s| s.to_string()).ok_or(dbus::missing_arg()));
- debug!("handle_update_report: sender ={:?}, msg ={:?}", sender, msg);
+ debug!("dbus handle_update_report: sender={:?}, msg={:?}", sender, msg);
let mut args = msg.get_items().into_iter();
let id_arg = try!(args.next().ok_or(dbus::missing_arg()));
diff --git a/src/gateway/http.rs b/src/gateway/http.rs
index 990a1fc..52a271c 100644
--- a/src/gateway/http.rs
+++ b/src/gateway/http.rs
@@ -4,6 +4,7 @@ use hyper::StatusCode;
use hyper::net::{HttpStream, Transport};
use hyper::server::{Server as HyperServer, Request as HyperRequest};
use rustc_serialize::json;
+use std::net::SocketAddr;
use std::thread;
use std::sync::{Arc, Mutex};
@@ -14,16 +15,16 @@ use http::{Server, ServerHandler};
/// The `Http` gateway parses `Command`s from the body of incoming requests.
pub struct Http {
- pub server: String,
+ pub server: SocketAddr
}
impl Gateway for Http {
fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> {
- let itx = Arc::new(Mutex::new(itx));
- let server = match HyperServer::http(&self.server.parse().expect("couldn't parse http address")) {
- Ok(server) => server,
- Err(err) => return Err(format!("couldn't start http gateway: {}", err))
- };
+ let itx = Arc::new(Mutex::new(itx));
+ let server = try!(HyperServer::http(&self.server).map_err(|err| {
+ format!("couldn't start http gateway: {}", err)
+ }));
+
thread::spawn(move || {
let (_, server) = server.handle(move |_| HttpHandler::new(itx.clone())).unwrap();
server.run();
@@ -91,7 +92,7 @@ mod tests {
use super::*;
use gateway::{Gateway, Interpret};
use datatype::{Command, Event};
- use http::{AuthClient, Client, set_ca_certificates};
+ use http::{AuthClient, Client, Response, set_ca_certificates};
#[test]
@@ -101,15 +102,15 @@ mod tests {
let (etx, erx) = chan::sync::<Event>(0);
let (itx, irx) = chan::sync::<Interpret>(0);
- thread::spawn(move || Http { server: "127.0.0.1:8888".to_string() }.start(itx, erx));
+ thread::spawn(move || Http { server: "127.0.0.1:8888".parse().unwrap() }.start(itx, erx));
thread::spawn(move || {
let _ = etx; // move into this scope
loop {
let interpret = irx.recv().expect("itx is closed");
match interpret.command {
- Command::StartDownload(ids) => {
+ Command::StartDownload(id) => {
let tx = interpret.response_tx.unwrap();
- tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned()));
+ tx.lock().unwrap().send(Event::FoundSystemInfo(id));
}
_ => panic!("expected AcceptUpdates"),
}
@@ -119,13 +120,17 @@ mod tests {
crossbeam::scope(|scope| {
for id in 0..10 {
scope.spawn(move || {
- let cmd = Command::StartDownload(vec!(format!("{}", id)));
+ let cmd = Command::StartDownload(format!("{}", id));
let client = AuthClient::default();
let url = "http://127.0.0.1:8888".parse().unwrap();
let body = json::encode(&cmd).unwrap();
let resp_rx = client.post(url, Some(body.into_bytes()));
- let resp = resp_rx.recv().unwrap().unwrap();
- let text = String::from_utf8(resp).unwrap();
+ let resp = resp_rx.recv().unwrap();
+ let text = match resp {
+ Response::Success(data) => String::from_utf8(data.body).unwrap(),
+ Response::Failed(data) => panic!("failed response: {}", data),
+ Response::Error(err) => panic!("error response: {}", err)
+ };
assert_eq!(json::decode::<Event>(&text).unwrap(),
Event::FoundSystemInfo(format!("{}", id)));
});
diff --git a/src/gateway/socket.rs b/src/gateway/socket.rs
index f252e7c..571e34c 100644
--- a/src/gateway/socket.rs
+++ b/src/gateway/socket.rs
@@ -1,12 +1,12 @@
use chan;
use chan::Sender;
-use rustc_serialize::json;
+use rustc_serialize::{Encodable, json};
use std::io::{BufReader, Read, Write};
use std::net::Shutdown;
use std::sync::{Arc, Mutex};
use std::{fs, thread};
-use datatype::{Command, Error, Event};
+use datatype::{Command, DownloadFailed, Error, Event};
use super::{Gateway, Interpret};
use unix_socket::{UnixListener, UnixStream};
@@ -53,18 +53,32 @@ impl Gateway for Socket {
}
fn pulse(&self, event: Event) {
- match event {
+ let output = match event {
Event::DownloadComplete(dl) => {
- let _ = UnixStream::connect(&self.events_path).map(|mut stream| {
- stream.write_all(&json::encode(&dl).expect("couldn't encode Event").into_bytes())
- .unwrap_or_else(|err| error!("couldn't write to events socket: {}", err));
- stream.shutdown(Shutdown::Write)
- .unwrap_or_else(|err| error!("couldn't close events socket: {}", err));
- }).map_err(|err| error!("couldn't open events socket: {}", err));
+ json::encode(&EventWrapper {
+ version: "0.1".to_string(),
+ event: "DownloadComplete".to_string(),
+ data: dl
+ }).expect("couldn't encode DownloadComplete event")
+ }
+
+ Event::DownloadFailed(id, reason) => {
+ json::encode(&EventWrapper {
+ version: "0.1".to_string(),
+ event: "DownloadFailed".to_string(),
+ data: DownloadFailed { update_id: id, reason: reason }
+ }).expect("couldn't encode DownloadFailed event")
}
- _ => ()
- }
+ _ => return
+ };
+
+ let _ = UnixStream::connect(&self.events_path).map(|mut stream| {
+ stream.write_all(&output.into_bytes())
+ .unwrap_or_else(|err| error!("couldn't write to events socket: {}", err));
+ stream.shutdown(Shutdown::Write)
+ .unwrap_or_else(|err| error!("couldn't close events socket: {}", err));
+ }).map_err(|err| debug!("couldn't open events socket: {}", err));
}
}
@@ -85,6 +99,15 @@ fn handle_client(stream: &mut UnixStream, itx: Arc<Mutex<Sender<Interpret>>>) ->
}
+// FIXME(PRO-1322): create a proper JSON api
+#[derive(RustcEncodable, RustcDecodable, PartialEq, Eq, Debug)]
+pub struct EventWrapper<E: Encodable> {
+ pub version: String,
+ pub event: String,
+ pub data: E
+}
+
+
#[cfg(test)]
mod tests {
use chan;
@@ -126,17 +149,19 @@ mod tests {
let (mut stream, _) = server.accept().expect("couldn't read from events socket");
let mut text = String::new();
stream.read_to_string(&mut text).unwrap();
- let receive: DownloadComplete = json::decode(&text).expect("couldn't decode DownloadComplete message");
- assert_eq!(send, receive);
+ let receive: EventWrapper<DownloadComplete> = json::decode(&text).expect("couldn't decode Event");
+ assert_eq!(receive.version, "0.1".to_string());
+ assert_eq!(receive.event, "DownloadComplete".to_string());
+ assert_eq!(receive.data, send);
thread::spawn(move || {
let _ = etx; // move into this scope
loop {
let interpret = irx.recv().expect("gtx is closed");
match interpret.command {
- Command::StartDownload(ids) => {
+ Command::StartDownload(id) => {
let tx = interpret.response_tx.unwrap();
- tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned()));
+ tx.lock().unwrap().send(Event::FoundSystemInfo(id));
}
_ => panic!("expected AcceptUpdates"),
}
diff --git a/src/gateway/websocket.rs b/src/gateway/websocket.rs
index eb5e040..8c597b9 100644
--- a/src/gateway/websocket.rs
+++ b/src/gateway/websocket.rs
@@ -1,12 +1,11 @@
use chan;
use chan::Sender;
use rustc_serialize::json;
-use std::thread;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
-use std::time::Duration;
+use std::thread;
use ws;
-use ws::{listen, CloseCode, Handler, Handshake, Message, Sender as WsSender};
+use ws::{CloseCode, Handler, Handshake, Message, Sender as WsSender};
use ws::util::Token;
use datatype::{Command, Error, Event};
@@ -24,10 +23,9 @@ impl Gateway for Websocket {
fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> {
let clients = self.clients.clone();
let addr = self.server.clone();
- info!("Opening websocket listener at {}", addr);
thread::spawn(move || {
- listen(&addr as &str, |out| {
+ ws::listen(&addr as &str, |out| {
WebsocketHandler {
out: out,
itx: itx.clone(),
@@ -36,8 +34,7 @@ impl Gateway for Websocket {
}).expect("couldn't start websocket listener");
});
- thread::sleep(Duration::from_secs(1)); // FIXME: ugly hack for blocking listen call
- Ok(info!("Websocket gateway started."))
+ Ok(info!("Websocket gateway started at {}.", self.server))
}
fn pulse(&self, event: Event) {
@@ -69,7 +66,7 @@ impl Handler for WebsocketHandler {
Err(err)
}
- Err(_) => unreachable!()
+ Err(err) => panic!("unexpected websocket on_message error: {}", err)
})
}
@@ -117,7 +114,7 @@ mod tests {
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use ws;
- use ws::{connect, CloseCode};
+ use ws::CloseCode;
use datatype::{Command, Event};
use gateway::{Gateway, Interpret};
@@ -125,6 +122,7 @@ mod tests {
#[test]
+ #[ignore] // FIXME: wait for https://github.com/housleyjk/ws-rs/issues/64
fn websocket_connections() {
let (etx, erx) = chan::sync::<Event>(0);
let (itx, irx) = chan::sync::<Interpret>(0);
@@ -140,9 +138,9 @@ mod tests {
loop {
let interpret = irx.recv().expect("gtx is closed");
match interpret.command {
- Command::StartDownload(ids) => {
+ Command::StartDownload(id) => {
let tx = interpret.response_tx.unwrap();
- tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned()));
+ tx.lock().unwrap().send(Event::FoundSystemInfo(id));
}
_ => panic!("expected AcceptUpdates"),
}
@@ -152,9 +150,9 @@ mod tests {
crossbeam::scope(|scope| {
for id in 0..10 {
scope.spawn(move || {
- connect("ws://localhost:3012", |out| {
- out.send(format!(r#"{{ "variant": "StartDownload", "fields": [["{}"]] }}"#, id))
- .expect("couldn't write to websocket");
+ ws::connect("ws://localhost:3012", |out| {
+ let msg = format!(r#"{{ "variant": "StartDownload", "fields": [["{}"]] }}"#, id);
+ out.send(msg).expect("couldn't write to websocket");
move |msg: ws::Message| {
let ev: Event = json::decode(&format!("{}", msg)).unwrap();
diff --git a/src/http/auth_client.rs b/src/http/auth_client.rs
index f4ad38b..3121c21 100644
--- a/src/http/auth_client.rs
+++ b/src/http/auth_client.rs
@@ -14,7 +14,7 @@ use std::time::Duration;
use time;
use datatype::{Auth, Error};
-use http::{Client, get_openssl, Request, Response};
+use http::{Client, get_openssl, Request, Response, ResponseData};
/// The `AuthClient` will attach an `Authentication` header to each outgoing
@@ -51,59 +51,31 @@ impl AuthClient {
impl Client for AuthClient {
fn chan_request(&self, req: Request, resp_tx: Sender<Response>) {
info!("{} {}", req.method, req.url);
- let _ = self.client.request(req.url.inner(), AuthHandler {
- auth: self.auth.clone(),
- req: req,
- timeout: Duration::from_secs(20),
- started: None,
- written: 0,
- response: Vec::new(),
- resp_tx: resp_tx.clone(),
- }).map_err(|err| resp_tx.send(Err(Error::from(err))));
+ let _ = self.client.request((*req.url).clone(), AuthHandler {
+ auth: self.auth.clone(),
+ req: req,
+ timeout: Duration::from_secs(20),
+ started: None,
+ written: 0,
+ resp_code: StatusCode::InternalServerError,
+ resp_body: Vec::new(),
+ resp_tx: resp_tx.clone(),
+ }).map_err(|err| resp_tx.send(Response::Error(Error::from(err))));
}
}
/// The async handler for outgoing HTTP requests.
-// FIXME: uncomment when yocto is at 1.8.0: #[derive(Debug)]
+#[derive(Debug)]
pub struct AuthHandler {
- auth: Auth,
- req: Request,
- timeout: Duration,
- started: Option<u64>,
- written: usize,
- response: Vec<u8>,
- resp_tx: Sender<Response>,
-}
-
-// FIXME: required for building on 1.7.0 only
-impl ::std::fmt::Debug for AuthHandler {
- fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
- write!(f, "unimplemented")
- }
-}
-
-impl AuthHandler {
- fn redirect_request(&mut self, resp: HyperResponse) {
- match resp.headers().get::<Location>() {
- Some(&Location(ref loc)) => self.req.url.join(loc).map(|url| {
- debug!("redirecting to {}", url);
- // drop Authentication Header on redirect
- let client = AuthClient::default();
- let resp_rx = client.send_request(Request {
- url: url,
- method: self.req.method.clone(),
- body: mem::replace(&mut self.req.body, None),
- });
- match resp_rx.recv().expect("no redirect_request response") {
- Ok(data) => self.resp_tx.send(Ok(data)),
- Err(err) => self.resp_tx.send(Err(Error::from(err)))
- }
- }).unwrap_or_else(|err| self.resp_tx.send(Err(Error::from(err)))),
-
- None => self.resp_tx.send(Err(Error::Client("redirect missing Location header".to_string())))
- }
- }
+ auth: Auth,
+ req: Request,
+ timeout: Duration,
+ started: Option<u64>,
+ written: usize,
+ resp_code: StatusCode,
+ resp_body: Vec<u8>,
+ resp_tx: Sender<Response>,
}
/// The `AuthClient` may be used for both HTTP and HTTPS connections.
@@ -125,15 +97,12 @@ impl Handler<Stream> for AuthHandler {
headers.set(ContentType(mime_json));
}
- Auth::Credentials(_, _) if self.req.body.is_some() => {
- panic!("no request body expected for Auth::Credentials");
- }
-
- Auth::Credentials(ref id, ref secret) => {
- headers.set(Authorization(Basic { username: id.0.clone(),
- password: Some(secret.0.clone()) }));
+ Auth::Credentials(ref cred) => {
+ headers.set(Authorization(Basic {
+ username: cred.client_id.clone(),
+ password: Some(cred.client_secret.clone())
+ }));
headers.set(ContentType(mime_form));
- self.req.body = Some(br#"grant_type=client_credentials"#.to_vec());
}
Auth::Token(ref token) => {
@@ -173,7 +142,7 @@ impl Handler<Stream> for AuthHandler {
Err(err) => {
error!("unable to write request body: {}", err);
- self.resp_tx.send(Err(Error::from(err)));
+ self.resp_tx.send(Response::Error(Error::from(err)));
Next::remove()
}
}
@@ -186,31 +155,25 @@ impl Handler<Stream> for AuthHandler {
let latency = time::precise_time_ns() as f64 - started as f64;
debug!("on_response latency: {}ms", (latency / 1e6) as u32);
- if resp.status().is_success() {
- if let Some(len) = resp.headers().get::<ContentLength>() {
- if **len > 0 {
- return Next::read();
- }
- }
- self.resp_tx.send(Ok(Vec::new()));
- Next::end()
- } else if resp.status().is_redirection() {
+ if resp.status().is_redirection() {
self.redirect_request(resp);
Next::end()
- } else if resp.status() == &StatusCode::Forbidden {
- self.resp_tx.send(Err(Error::Authorization(format!("{}", resp.status()))));
+ } else if let None = resp.headers().get::<ContentLength>() {
+ self.send_response(ResponseData { code: *resp.status(), body: Vec::new() });
Next::end()
} else {
- self.resp_tx.send(Err(Error::Client(format!("{}", resp.status()))));
- Next::end()
+ self.resp_code = *resp.status();
+ Next::read()
}
}
fn on_response_readable(&mut self, decoder: &mut Decoder<Stream>) -> Next {
- match io::copy(decoder, &mut self.response) {
+ match io::copy(decoder, &mut self.resp_body) {
Ok(0) => {
- debug!("on_response_readable bytes read: {}", self.response.len());
- self.resp_tx.send(Ok(mem::replace(&mut self.response, Vec::new())));
+ debug!("on_response_readable body size: {}", self.resp_body.len());
+ let code = self.resp_code.clone();
+ let body = mem::replace(&mut self.resp_body, Vec::new());
+ self.send_response(ResponseData { code: code, body: body });
Next::end()
}
@@ -226,7 +189,7 @@ impl Handler<Stream> for AuthHandler {
Err(err) => {
error!("unable to read response body: {}", err);
- self.resp_tx.send(Err(Error::from(err)));
+ self.resp_tx.send(Response::Error(Error::from(err)));
Next::end()
}
}
@@ -234,11 +197,41 @@ impl Handler<Stream> for AuthHandler {
fn on_error(&mut self, err: hyper::Error) -> Next {
error!("on_error: {}", err);
- self.resp_tx.send(Err(Error::from(err)));
+ self.resp_tx.send(Response::Error(Error::from(err)));
Next::remove()
}
}
+impl AuthHandler {
+ fn send_response(&mut self, resp: ResponseData) {
+ if resp.code == StatusCode::Unauthorized || resp.code == StatusCode::Forbidden {
+ self.resp_tx.send(Response::Error(Error::HttpAuth(resp)));
+ } else if resp.code.is_success() {
+ self.resp_tx.send(Response::Success(resp));
+ } else {
+ self.resp_tx.send(Response::Failed(resp));
+ }
+ }
+
+ fn redirect_request(&mut self, resp: HyperResponse) {
+ match resp.headers().get::<Location>() {
+ Some(&Location(ref loc)) => self.req.url.join(loc).map(|url| {
+ debug!("redirecting to {}", url);
+ // drop Authorization Header on redirect
+ let client = AuthClient::default();
+ let resp_rx = client.send_request(Request {
+ url: url,
+ method: self.req.method.clone(),
+ body: mem::replace(&mut self.req.body, None),
+ });
+ self.resp_tx.send(resp_rx.recv().expect("no redirect_request response"))
+ }).unwrap_or_else(|err| self.resp_tx.send(Response::Error(Error::from(err)))),
+
+ None => self.resp_tx.send(Response::Error((Error::Client("redirect missing Location header".to_string()))))
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
@@ -246,7 +239,7 @@ mod tests {
use std::path::Path;
use super::*;
- use http::{Client, set_ca_certificates};
+ use http::{Client, Response, set_ca_certificates};
fn get_client() -> AuthClient {
@@ -259,8 +252,13 @@ mod tests {
let client = get_client();
let url = "http://eu.httpbin.org/bytes/16?seed=123".parse().unwrap();
let resp_rx = client.get(url, None);
- let data = resp_rx.recv().unwrap().unwrap();
- assert_eq!(data, vec![13, 22, 104, 27, 230, 9, 137, 85, 218, 40, 86, 85, 62, 0, 111, 22]);
+ let resp = resp_rx.recv().unwrap();
+ let expect = vec![13, 22, 104, 27, 230, 9, 137, 85, 218, 40, 86, 85, 62, 0, 111, 22];
+ match resp {
+ Response::Success(data) => assert_eq!(data.body, expect),
+ Response::Failed(data) => panic!("failed response: {}", data),
+ Response::Error(err) => panic!("error response: {}", err)
+ };
}
#[test]
@@ -268,9 +266,13 @@ mod tests {
let client = get_client();
let url = "https://eu.httpbin.org/post".parse().unwrap();
let resp_rx = client.post(url, Some(br#"foo"#.to_vec()));
- let body = resp_rx.recv().unwrap().unwrap();
- let resp = String::from_utf8(body).unwrap();
- let json = Json::from_str(&resp).unwrap();
+ let resp = resp_rx.recv().unwrap();
+ let body = match resp {
+ Response::Success(data) => String::from_utf8(data.body).unwrap(),
+ Response::Failed(data) => panic!("failed response: {}", data),
+ Response::Error(err) => panic!("error response: {}", err)
+ };
+ let json = Json::from_str(&body).unwrap();
let obj = json.as_object().unwrap();
let data = obj.get("data").unwrap().as_string().unwrap();
assert_eq!(data, "foo");
diff --git a/src/http/http_client.rs b/src/http/http_client.rs
index 492166c..b911b8d 100644
--- a/src/http/http_client.rs
+++ b/src/http/http_client.rs
@@ -1,5 +1,8 @@
use chan;
use chan::{Sender, Receiver};
+use hyper::status::StatusCode;
+use std::fmt::{Display, Formatter, Result as FmtResult};
+use std::str;
use datatype::{Error, Method, Url};
@@ -39,5 +42,42 @@ pub struct Request {
pub body: Option<Vec<u8>>
}
-/// Return the body of an HTTP response on success, or an `Error` otherwise.
-pub type Response = Result<Vec<u8>, Error>;
+
+/// A Response enumerates between a successful (e.g. 2xx) HTTP response, a failed
+/// (e.g. 4xx/5xx) response, or an Error before receiving any response.
+#[derive(Debug)]
+pub enum Response {
+ Success(ResponseData),
+ Failed(ResponseData),
+ Error(Error)
+}
+
+impl Display for Response {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ match *self {
+ Response::Success(ref data) => write!(f, "{}", data),
+ Response::Failed(ref data) => write!(f, "{}", data),
+ Response::Error(ref err) => write!(f, "{}", err),
+ }
+ }
+}
+
+
+/// Wraps the HTTP Status Code as well as any returned body.
+#[derive(Debug)]
+pub struct ResponseData {
+ pub code: StatusCode,
+ pub body: Vec<u8>
+}
+
+impl Display for ResponseData {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ match self.body.len() {
+ 0 => write!(f, "Response Code: {}", self.code),
+ n => match str::from_utf8(&self.body) {
+ Ok(text) => write!(f, "Response Code: {}, Body:\n{}", self.code, text),
+ Err(_) => write!(f, "Response Code: {}, Body: {} bytes", self.code, n),
+ }
+ }
+ }
+}
diff --git a/src/http/mod.rs b/src/http/mod.rs
index 5e990a3..11b1e3a 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -5,7 +5,7 @@ pub mod openssl;
pub mod test_client;
pub use self::auth_client::{AuthClient, AuthHandler};
-pub use self::http_client::{Client, Request, Response};
+pub use self::http_client::{Client, Request, Response, ResponseData};
pub use self::http_server::{Server, ServerHandler};
pub use self::openssl::{get_openssl, set_ca_certificates};
pub use self::test_client::TestClient;
diff --git a/src/http/test_client.rs b/src/http/test_client.rs
index 7857e0f..1886fdf 100644
--- a/src/http/test_client.rs
+++ b/src/http/test_client.rs
@@ -1,8 +1,9 @@
use chan::Sender;
+use hyper::status::StatusCode;
use std::cell::RefCell;
use datatype::Error;
-use http::{Client, Request, Response};
+use http::{Client, Request, Response, ResponseData};
/// The `TestClient` will return HTTP responses from an existing list of strings.
@@ -26,8 +27,11 @@ impl TestClient {
impl Client for TestClient {
fn chan_request(&self, req: Request, resp_tx: Sender<Response>) {
match self.responses.borrow_mut().pop() {
- Some(body) => resp_tx.send(Ok(body.as_bytes().to_vec())),
- None => resp_tx.send(Err(Error::Client(req.url.to_string())))
+ Some(body) => resp_tx.send(Response::Success(ResponseData {
+ code: StatusCode::Ok,
+ body: body.as_bytes().to_vec()
+ })),
+ None => resp_tx.send(Response::Error(Error::Client(req.url.to_string())))
}
}
diff --git a/src/interpreter.rs b/src/interpreter.rs
index b286ba5..d2d3f99 100644
--- a/src/interpreter.rs
+++ b/src/interpreter.rs
@@ -1,10 +1,13 @@
use chan;
-use chan::{Sender, Receiver};
-use std;
+use chan::{Sender, Receiver, WaitGroup};
+use std::{process, thread};
use std::borrow::Cow;
+use std::time::Duration;
+use time;
-use datatype::{AccessToken, Auth, ClientId, ClientSecret, Command, Config,
- Error, Event, Package, UpdateRequestId};
+use datatype::{AccessToken, Auth, ClientCredentials, Command, Config, Error, Event,
+ Package, UpdateReport, UpdateRequestStatus as Status, UpdateResultCode,
+ system_info};
use gateway::Interpret;
use http::{AuthClient, Client};
use oauth2::authenticate;
@@ -18,9 +21,20 @@ use sota::Sota;
pub trait Interpreter<I, O> {
fn interpret(&mut self, input: I, otx: &Sender<O>);
- fn run(&mut self, irx: Receiver<I>, otx: Sender<O>) {
+ fn run(&mut self, irx: Receiver<I>, otx: Sender<O>, wg: WaitGroup) {
+ let cooldown = Duration::from_millis(100);
+
loop {
- self.interpret(irx.recv().expect("interpreter sender closed"), &otx);
+ let input = irx.recv().expect("interpreter sender closed");
+ let started = time::precise_time_ns();
+
+ wg.add(1);
+ trace!("interpreter starting: {}", started);
+ self.interpret(input, &otx);
+
+ thread::sleep(cooldown); // let any further work commence
+ trace!("interpreter stopping: {}", started);
+ wg.done();
}
}
}
@@ -29,35 +43,68 @@ pub trait Interpreter<I, O> {
/// The `EventInterpreter` listens for `Event`s and optionally responds with
/// `Command`s that may be sent to the `CommandInterpreter`.
pub struct EventInterpreter {
- pub package_manager: PackageManager
+ pub pacman: PackageManager,
+ pub sysinfo: Option<String>,
}
impl Interpreter<Event, Command> for EventInterpreter {
fn interpret(&mut self, event: Event, ctx: &Sender<Command>) {
- info!("Event received: {}", event);
+ info!("EventInterpreter received: {}", event);
+
match event {
+ Event::Authenticated => {
+ if self.pacman != PackageManager::Off {
+ self.pacman.installed_packages().map(|packages| {
+ ctx.send(Command::SendInstalledPackages(packages));
+ }).unwrap_or_else(|err| error!("couldn't send a list of packages: {}", err));
+ }
+
+ self.sysinfo.as_ref().map(|_| ctx.send(Command::SendSystemInfo));
+ }
+
Event::NotAuthenticated => {
info!("Trying to authenticate again...");
ctx.send(Command::Authenticate(None));
}
- Event::NewUpdatesReceived(ids) => {
- ctx.send(Command::StartDownload(ids));
+ Event::UpdatesReceived(requests) => {
+ for request in requests {
+ let id = request.requestId.clone();
+ match request.status {
+ Status::Pending => ctx.send(Command::StartDownload(id)),
+
+ Status::InFlight if self.pacman != PackageManager::Off => {
+ if self.pacman.is_installed(&request.packageId) {
+ let report = UpdateReport::single(id, UpdateResultCode::OK, "".to_string());
+ ctx.send(Command::SendUpdateReport(report));
+ } else {
+ ctx.send(Command::StartDownload(id));
+ }
+ }
+
+ _ => ()
+ }
+ }
}
Event::DownloadComplete(dl) => {
- if self.package_manager != PackageManager::Off {
- ctx.send(Command::StartInstall(dl));
+ if self.pacman != PackageManager::Off {
+ ctx.send(Command::StartInstall(dl.update_id.clone()));
}
}
- Event::InstallComplete(report) => {
+ Event::DownloadFailed(id, reason) => {
+ let report = UpdateReport::single(id, UpdateResultCode::GENERAL_ERROR, reason);
+ ctx.send(Command::SendUpdateReport(report));
+ }
+
+ Event::InstallComplete(report) | Event::InstallFailed(report) => {
ctx.send(Command::SendUpdateReport(report));
}
Event::UpdateReportSent => {
- if self.package_manager != PackageManager::Off {
- self.package_manager.installed_packages().map(|packages| {
+ if self.pacman != PackageManager::Off {
+ self.pacman.installed_packages().map(|packages| {
ctx.send(Command::SendInstalledPackages(packages));
}).unwrap_or_else(|err| error!("couldn't send a list of packages: {}", err));
}
@@ -75,7 +122,7 @@ pub struct CommandInterpreter;
impl Interpreter<Command, Interpret> for CommandInterpreter {
fn interpret(&mut self, cmd: Command, itx: &Sender<Interpret>) {
- info!("Command received: {}", cmd);
+ info!("CommandInterpreter received: {}", cmd);
itx.send(Interpret { command: cmd, response_tx: None });
}
}
@@ -88,13 +135,12 @@ pub struct GlobalInterpreter<'t> {
pub config: Config,
pub token: Option<Cow<'t, AccessToken>>,
pub http_client: Box<Client>,
- pub rvi: Option<Services>,
- pub loopback_tx: Sender<Interpret>,
+ pub rvi: Option<Services>
}
impl<'t> Interpreter<Interpret, Event> for GlobalInterpreter<'t> {
fn interpret(&mut self, interpret: Interpret, etx: &Sender<Event>) {
- info!("Interpreter started: {}", interpret.command);
+ info!("GlobalInterpreter received: {}", interpret.command);
let (multi_tx, multi_rx) = chan::async::<Event>();
let outcome = match (self.token.as_ref(), self.config.auth.is_none()) {
@@ -109,21 +155,20 @@ impl<'t> Interpreter<Interpret, Event> for GlobalInterpreter<'t> {
etx.send(ev.clone());
response_ev = Some(ev);
}
- info!("Interpreter finished.");
}
- Err(Error::Authorization(_)) => {
+ Err(Error::HttpAuth(resp)) => {
+ error!("HTTP authorization failed: {}", resp);
+ self.token = None;
let ev = Event::NotAuthenticated;
etx.send(ev.clone());
response_ev = Some(ev);
- error!("Interpreter authentication failed");
}
Err(err) => {
let ev = Event::Error(format!("{}", err));
etx.send(ev.clone());
response_ev = Some(ev);
- error!("Interpreter failed: {}", err);
}
}
@@ -138,16 +183,15 @@ impl<'t> GlobalInterpreter<'t> {
// always send at least one Event response
match cmd {
- Command::Authenticate(_) => etx.send(Event::Authenticated),
+ Command::Authenticate(_) => etx.send(Event::AlreadyAuthenticated),
- Command::GetNewUpdates => {
- let mut updates = try!(sota.get_pending_updates());
+ Command::GetUpdateRequests => {
+ let mut updates = try!(sota.get_update_requests());
if updates.is_empty() {
- etx.send(Event::NoNewUpdates);
+ etx.send(Event::NoUpdateRequests);
} else {
updates.sort_by_key(|u| u.installPos);
- let ids = updates.iter().map(|u| u.requestId.clone()).collect::<Vec<UpdateRequestId>>();
- etx.send(Event::NewUpdatesReceived(ids))
+ etx.send(Event::UpdatesReceived(updates));
}
}
@@ -159,18 +203,13 @@ impl<'t> GlobalInterpreter<'t> {
etx.send(Event::FoundInstalledPackages(packages));
}
- Command::RefreshSystemInfo(post) => {
- let info = try!(self.config.device.system_info.report());
- etx.send(Event::FoundSystemInfo(info.clone()));
- if post {
- let _ = sota.send_system_info(&info)
- .map_err(|err| etx.send(Event::Error(format!("{}", err))));
- }
+ Command::ListSystemInfo => {
+ let cmd = self.config.device.system_info.as_ref().expect("system_info command not set");
+ etx.send(Event::FoundSystemInfo(try!(system_info(&cmd))));
}
Command::SendInstalledPackages(packages) => {
- let _ = sota.send_installed_packages(&packages)
- .map_err(|err| error!("couldn't send installed packages: {}", err));
+ try!(sota.send_installed_packages(&packages));
etx.send(Event::InstalledPackagesSent);
}
@@ -178,39 +217,43 @@ impl<'t> GlobalInterpreter<'t> {
if let Some(ref rvi) = self.rvi {
let _ = rvi.remote.lock().unwrap().send_installed_software(sw);
}
+ etx.send(Event::InstalledSoftwareSent);
+ }
+
+ Command::SendSystemInfo => {
+ let cmd = self.config.device.system_info.as_ref().expect("system_info command not set");
+ try!(sota.send_system_info(&try!(system_info(&cmd))));
+ etx.send(Event::SystemInfoSent);
}
Command::SendUpdateReport(report) => {
if let Some(ref rvi) = self.rvi {
let _ = rvi.remote.lock().unwrap().send_update_report(report);
} else {
- let _ = sota.send_update_report(&report)
- .map_err(|err| error!("couldn't send update report: {}", err));
+ try!(sota.send_update_report(&report));
}
etx.send(Event::UpdateReportSent);
}
- Command::StartDownload(ref ids) => {
- for id in ids {
- etx.send(Event::DownloadingUpdate(id.clone()));
-
- if let Some(ref rvi) = self.rvi {
- let _ = rvi.remote.lock().unwrap().send_download_started(id.clone());
- } else {
- let _ = sota.download_update(id.clone())
- .map(|dl| etx.send(Event::DownloadComplete(dl)))
- .map_err(|err| etx.send(Event::DownloadFailed(id.clone(), format!("{}", err))));
- }
+ Command::StartDownload(id) => {
+ etx.send(Event::DownloadingUpdate(id.clone()));
+ if let Some(ref rvi) = self.rvi {
+ let _ = rvi.remote.lock().unwrap().send_download_started(id);
+ } else {
+ let _ = sota.download_update(id.clone())
+ .map(|dl| etx.send(Event::DownloadComplete(dl)))
+ .map_err(|err| etx.send(Event::DownloadFailed(id, format!("{}", err))));
}
}
- Command::StartInstall(dl) => {
- let _ = sota.install_update(dl)
+ Command::StartInstall(id) => {
+ etx.send(Event::InstallingUpdate(id.clone()));
+ let _ = sota.install_update(id)
.map(|report| etx.send(Event::InstallComplete(report)))
.map_err(|report| etx.send(Event::InstallFailed(report)));
}
- Command::Shutdown => std::process::exit(0),
+ Command::Shutdown => process::exit(0),
}
Ok(())
@@ -220,8 +263,10 @@ impl<'t> GlobalInterpreter<'t> {
match cmd {
Command::Authenticate(_) => {
let config = self.config.auth.clone().expect("trying to authenticate without auth config");
- self.set_client(Auth::Credentials(ClientId(config.client_id),
- ClientSecret(config.client_secret)));
+ self.set_client(Auth::Credentials(ClientCredentials {
+ client_id: config.client_id,
+ client_secret: config.client_secret,
+ }));
let server = config.server.join("/token").expect("couldn't build authentication url");
let token = try!(authenticate(server, self.http_client.as_ref()));
self.set_client(Auth::Token(token.clone()));
@@ -229,16 +274,9 @@ impl<'t> GlobalInterpreter<'t> {
etx.send(Event::Authenticated);
}
- Command::GetNewUpdates |
- Command::ListInstalledPackages |
- Command::RefreshSystemInfo(_) |
- Command::SendInstalledPackages(_) |
- Command::SendInstalledSoftware(_) |
- Command::SendUpdateReport(_) |
- Command::StartDownload(_) |
- Command::StartInstall(_) => etx.send(Event::NotAuthenticated),
+ Command::Shutdown => process::exit(0),
- Command::Shutdown => std::process::exit(0),
+ _ => etx.send(Event::NotAuthenticated)
}
Ok(())
@@ -270,15 +308,13 @@ mod tests {
fn new_interpreter(replies: Vec<String>, pkg_mgr: PackageManager) -> (Sender<Command>, Receiver<Event>) {
let (etx, erx) = chan::sync::<Event>(0);
let (ctx, crx) = chan::sync::<Command>(0);
- let (itx, _) = chan::sync::<Interpret>(0);
thread::spawn(move || {
let mut gi = GlobalInterpreter {
config: Config::default(),
token: Some(AccessToken::default().into()),
http_client: Box::new(TestClient::from(replies)),
- rvi: None,
- loopback_tx: itx,
+ rvi: None
};
gi.config.device.package_manager = pkg_mgr;
@@ -300,7 +336,7 @@ mod tests {
let (ctx, erx) = new_interpreter(replies, pkg_mgr);
ctx.send(Command::Authenticate(None));
- assert_rx(erx, &[Event::Authenticated]);
+ assert_rx(erx, &[Event::AlreadyAuthenticated]);
}
#[test]
@@ -309,35 +345,26 @@ mod tests {
let pkg_mgr = PackageManager::new_tpm(true);
let (ctx, erx) = new_interpreter(replies, pkg_mgr);
- ctx.send(Command::StartDownload(vec!["1".to_string(), "2".to_string()]));
+ ctx.send(Command::StartDownload("1".to_string()));
assert_rx(erx, &[
Event::DownloadingUpdate("1".to_string()),
Event::DownloadComplete(DownloadComplete {
update_id: "1".to_string(),
update_image: "/tmp/1".to_string(),
signature: "".to_string()
- }),
- Event::DownloadingUpdate("2".to_string()),
- Event::DownloadComplete(DownloadComplete {
- update_id: "2".to_string(),
- update_image: "/tmp/2".to_string(),
- signature: "".to_string()
- }),
+ })
]);
}
#[test]
- fn install_update() {
+ fn install_update_success() {
let replies = vec!["[]".to_string(); 10];
let pkg_mgr = PackageManager::new_tpm(true);
let (ctx, erx) = new_interpreter(replies, pkg_mgr);
- ctx.send(Command::StartInstall(DownloadComplete {
- update_id: "1".to_string(),
- update_image: "/tmp/1".to_string(),
- signature: "".to_string()
- }));
+ ctx.send(Command::StartInstall("1".to_string()));
assert_rx(erx, &[
+ Event::InstallingUpdate("1".to_string()),
Event::InstallComplete(
UpdateReport::single("1".to_string(), UpdateResultCode::OK, "".to_string())
)
@@ -345,17 +372,14 @@ mod tests {
}
#[test]
- fn failed_installation() {
+ fn install_update_failed() {
let replies = vec!["[]".to_string(); 10];
let pkg_mgr = PackageManager::new_tpm(false);
let (ctx, erx) = new_interpreter(replies, pkg_mgr);
- ctx.send(Command::StartInstall(DownloadComplete {
- update_id: "1".to_string(),
- update_image: "/tmp/1".to_string(),
- signature: "".to_string()
- }));
+ ctx.send(Command::StartInstall("1".to_string()));
assert_rx(erx, &[
+ Event::InstallingUpdate("1".to_string()),
Event::InstallFailed(
UpdateReport::single("1".to_string(), UpdateResultCode::INSTALL_FAILED, "failed".to_string())
)
diff --git a/src/main.rs b/src/main.rs
index 61fa02a..9821b2d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,71 +9,37 @@ extern crate rustc_serialize;
#[macro_use] extern crate sota;
extern crate time;
-use chan::{Sender, Receiver};
+use chan::{Sender, Receiver, WaitGroup};
use chan_signal::Signal;
use env_logger::LogBuilder;
use getopts::Options;
use log::{LogLevelFilter, LogRecord};
-use std::env;
+use std::{env, process, thread};
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use sota::datatype::{Command, Config, Event, SystemInfo};
+use sota::datatype::{Command, Config, Event};
use sota::gateway::{Console, DBus, Gateway, Interpret, Http, Socket, Websocket};
use sota::broadcast::Broadcast;
use sota::http::{AuthClient, set_ca_certificates};
use sota::interpreter::{EventInterpreter, CommandInterpreter, Interpreter, GlobalInterpreter};
-use sota::package_manager::PackageManager;
use sota::rvi::{Edge, Services};
macro_rules! exit {
- ($fmt:expr, $($arg:tt)*) => {{
+ ($code:expr, $fmt:expr, $($arg:tt)*) => {{
print!(concat!($fmt, "\n"), $($arg)*);
- std::process::exit(1);
+ process::exit($code);
}}
}
-fn start_signal_handler(signals: Receiver<Signal>) {
- loop {
- match signals.recv() {
- Some(Signal::INT) | Some(Signal::TERM) => std::process::exit(0),
- _ => ()
- }
- }
-}
-
-fn start_update_poller(interval: u64, itx: Sender<Interpret>) {
- let (etx, erx) = chan::async::<Event>();
- let tick = chan::tick(Duration::from_secs(interval));
- loop {
- let _ = tick.recv();
- itx.send(Interpret {
- command: Command::GetNewUpdates,
- response_tx: Some(Arc::new(Mutex::new(etx.clone())))
- });
- let _ = erx.recv();
- }
-}
-
-fn send_startup_commands(config: &Config, ctx: &Sender<Command>) {
- ctx.send(Command::Authenticate(None));
- ctx.send(Command::RefreshSystemInfo(true));
-
- if config.device.package_manager != PackageManager::Off {
- config.device.package_manager.installed_packages().map(|packages| {
- ctx.send(Command::SendInstalledPackages(packages));
- }).unwrap_or_else(|err| exit!("Couldn't get list of packages: {}", err));
- }
-}
-
fn main() {
- setup_logging();
+ let version = start_logging();
+ let config = build_config(&version);
- let config = build_config();
set_ca_certificates(Path::new(&config.device.certificates_path));
let (etx, erx) = chan::async::<Event>();
@@ -81,16 +47,25 @@ fn main() {
let (itx, irx) = chan::async::<Interpret>();
let mut broadcast = Broadcast::new(erx);
- send_startup_commands(&config, &ctx);
+ let wg = WaitGroup::new();
+
+ ctx.send(Command::Authenticate(None));
crossbeam::scope(|scope| {
- // Must subscribe to the signal before spawning ANY other threads
+ // subscribe to signals first
let signals = chan_signal::notify(&[Signal::INT, Signal::TERM]);
scope.spawn(move || start_signal_handler(signals));
- let poll_tick = config.device.polling_interval;
- let poll_itx = itx.clone();
- scope.spawn(move || start_update_poller(poll_tick, poll_itx));
+ if config.core.polling {
+ let poll_tick = config.core.polling_sec;
+ let poll_itx = itx.clone();
+ let poll_wg = wg.clone();
+ scope.spawn(move || start_update_poller(poll_tick, poll_itx, poll_wg));
+ }
+
+ //
+ // start gateways
+ //
if config.gateway.console {
let cons_itx = itx.clone();
@@ -99,7 +74,7 @@ fn main() {
}
if config.gateway.dbus {
- let dbus_cfg = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for dbus gateway"));
+ let dbus_cfg = config.dbus.as_ref().unwrap_or_else(|| exit!(1, "{}", "dbus config required for dbus gateway"));
let dbus_itx = itx.clone();
let dbus_sub = broadcast.subscribe();
let mut dbus = DBus { dbus_cfg: dbus_cfg.clone(), itx: itx.clone() };
@@ -109,20 +84,21 @@ fn main() {
if config.gateway.http {
let http_itx = itx.clone();
let http_sub = broadcast.subscribe();
- let mut http = Http { server: config.network.http_server.clone() };
+ let mut http = Http { server: *config.network.http_server };
scope.spawn(move || http.start(http_itx, http_sub));
}
- let mut rvi = None;
- if config.gateway.rvi {
- let _ = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for rvi gateway"));
- let rvi_cfg = config.rvi.as_ref().unwrap_or_else(|| exit!("{}", "rvi config required for rvi gateway"));
+ let rvi_services = if config.gateway.rvi {
+ let _ = config.dbus.as_ref().unwrap_or_else(|| exit!(1, "{}", "dbus config required for rvi gateway"));
+ let rvi_cfg = config.rvi.as_ref().unwrap_or_else(|| exit!(1, "{}", "rvi config required for rvi gateway"));
let rvi_edge = config.network.rvi_edge_server.clone();
let services = Services::new(rvi_cfg.clone(), config.device.uuid.clone(), etx.clone());
let mut edge = Edge::new(services.clone(), rvi_edge, rvi_cfg.client.clone());
scope.spawn(move || edge.start());
- rvi = Some(services);
- }
+ Some(services)
+ } else {
+ None
+ };
if config.gateway.socket {
let socket_itx = itx.clone();
@@ -142,48 +118,83 @@ fn main() {
scope.spawn(move || ws.start(ws_itx, ws_sub));
}
+ //
+ // start interpreters
+ //
+
let event_sub = broadcast.subscribe();
let event_ctx = ctx.clone();
let event_mgr = config.device.package_manager.clone();
+ let event_sys = config.device.system_info.clone();
+ let event_wg = wg.clone();
scope.spawn(move || EventInterpreter {
- package_manager: event_mgr
- }.run(event_sub, event_ctx));
+ pacman: event_mgr,
+ sysinfo: event_sys,
+ }.run(event_sub, event_ctx, event_wg));
let cmd_itx = itx.clone();
- scope.spawn(move || CommandInterpreter.run(crx, cmd_itx));
+ let cmd_wg = wg.clone();
+ scope.spawn(move || CommandInterpreter.run(crx, cmd_itx, cmd_wg));
scope.spawn(move || GlobalInterpreter {
config: config,
token: None,
http_client: Box::new(AuthClient::default()),
- rvi: rvi,
- loopback_tx: itx,
- }.run(irx, etx));
+ rvi: rvi_services,
+ }.run(irx, etx, wg));
scope.spawn(move || broadcast.start());
});
}
-fn setup_logging() {
- let version = option_env!("SOTA_VERSION").unwrap_or("?");
+fn start_logging() -> String {
+ let version = option_env!("SOTA_VERSION").unwrap_or("unknown");
+
let mut builder = LogBuilder::new();
builder.format(move |record: &LogRecord| {
let timestamp = format!("{}", time::now_utc().rfc3339());
format!("{} ({}): {} - {}", timestamp, version, record.level(), record.args())
});
builder.filter(Some("hyper"), LogLevelFilter::Info);
+ builder.parse(&env::var("RUST_LOG").unwrap_or("INFO".to_string()));
+ builder.init().expect("builder already initialized");
+
+ version.to_string()
+}
- let _ = env::var("RUST_LOG").map(|level| builder.parse(&level));
- builder.init().expect("env_logger::init() called twice, blame the programmers.");
+fn start_signal_handler(signals: Receiver<Signal>) {
+ loop {
+ match signals.recv() {
+ Some(Signal::INT) | Some(Signal::TERM) => process::exit(0),
+ _ => ()
+ }
+ }
}
-fn build_config() -> Config {
+fn start_update_poller(interval: u64, itx: Sender<Interpret>, wg: WaitGroup) {
+ info!("Polling for new updates every {} seconds.", interval);
+ let (etx, erx) = chan::async::<Event>();
+ let wait = Duration::from_secs(interval);
+ loop {
+ wg.wait(); // wait until not busy
+ thread::sleep(wait); // then wait `interval` seconds
+ itx.send(Interpret {
+ command: Command::GetUpdateRequests,
+ response_tx: Some(Arc::new(Mutex::new(etx.clone())))
+ }); // then request new updates
+ let _ = erx.recv(); // then wait for the response
+ }
+}
+
+fn build_config(version: &str) -> Config {
let args = env::args().collect::<Vec<String>>();
let program = args[0].clone();
let mut opts = Options::new();
- opts.optflag("h", "help", "print this help menu");
- opts.optopt("", "config", "change config path", "PATH");
+ opts.optflag("h", "help", "print this help menu then quit");
+ opts.optflag("p", "print", "print the parsed config then quit");
+ opts.optflag("v", "version", "print the version then quit");
+ opts.optopt("c", "config", "change config path", "PATH");
opts.optopt("", "auth-server", "change the auth server", "URL");
opts.optopt("", "auth-client-id", "change the auth client id", "ID");
@@ -191,6 +202,8 @@ fn build_config() -> Config {
opts.optopt("", "auth-credentials-file", "change the auth credentials file", "PATH");
opts.optopt("", "core-server", "change the core server", "URL");
+ opts.optopt("", "core-polling", "toggle polling the core server for updates", "BOOL");
+ opts.optopt("", "core-polling-sec", "change the core polling interval", "SECONDS");
opts.optopt("", "dbus-name", "change the dbus registration name", "NAME");
opts.optopt("", "dbus-path", "change the dbus path", "PATH");
@@ -203,7 +216,6 @@ fn build_config() -> Config {
opts.optopt("", "device-vin", "change the device vin", "VIN");
opts.optopt("", "device-packages-dir", "change downloaded directory for packages", "PATH");
opts.optopt("", "device-package-manager", "change the package manager", "MANAGER");
- opts.optopt("", "device-polling-interval", "change the package polling interval", "INTERVAL");
opts.optopt("", "device-certificates-path", "change the OpenSSL CA certificates file", "PATH");
opts.optopt("", "device-system-info", "change the system information command", "PATH");
@@ -225,25 +237,37 @@ fn build_config() -> Config {
opts.optopt("", "rvi-timeout", "change the rvi timeout", "TIMEOUT");
let matches = opts.parse(&args[1..]).unwrap_or_else(|err| panic!(err.to_string()));
- if matches.opt_present("h") {
- exit!("{}", opts.usage(&format!("Usage: {} [options]", program)));
+
+ if matches.opt_present("help") {
+ exit!(0, "{}", opts.usage(&format!("Usage: {} [options]", program)));
+ } else if matches.opt_present("version") {
+ exit!(0, "{}", version);
}
- let config_file = matches.opt_str("config").unwrap_or_else(|| {
- env::var("SOTA_CONFIG").unwrap_or_else(|_| exit!("{}", "No config file provided."))
- });
- let mut config = Config::load(&config_file).unwrap_or_else(|err| exit!("{}", err));
+ let mut config = match matches.opt_str("config").or(env::var("SOTA_CONFIG").ok()) {
+ Some(file) => Config::load(&file).unwrap_or_else(|err| exit!(1, "{}", err)),
+ None => {
+ warn!("No config file given. Falling back to defaults.");
+ Config::default()
+ }
+ };
config.auth.as_mut().map(|auth_cfg| {
matches.opt_str("auth-client-id").map(|id| auth_cfg.client_id = id);
matches.opt_str("auth-client-secret").map(|secret| auth_cfg.client_secret = secret);
matches.opt_str("auth-server").map(|text| {
- auth_cfg.server = text.parse().unwrap_or_else(|err| exit!("Invalid auth-server URL: {}", err));
+ auth_cfg.server = text.parse().unwrap_or_else(|err| exit!(1, "Invalid auth-server URL: {}", err));
});
});
matches.opt_str("core-server").map(|text| {
- config.core.server = text.parse().unwrap_or_else(|err| exit!("Invalid core-server URL: {}", err));
+ config.core.server = text.parse().unwrap_or_else(|err| exit!(1, "Invalid core-server URL: {}", err));
+ });
+ matches.opt_str("core-polling").map(|polling| {
+ config.core.polling = polling.parse().unwrap_or_else(|err| exit!(1, "Invalid core-polling boolean: {}", err));
+ });
+ matches.opt_str("core-polling-sec").map(|secs| {
+ config.core.polling_sec = secs.parse().unwrap_or_else(|err| exit!(1, "Invalid core-polling-sec: {}", err));
});
config.dbus.as_mut().map(|dbus_cfg| {
@@ -253,7 +277,7 @@ fn build_config() -> Config {
matches.opt_str("dbus-software-manager").map(|mgr| dbus_cfg.software_manager = mgr);
matches.opt_str("dbus-software-manager-path").map(|mgr_path| dbus_cfg.software_manager_path = mgr_path);
matches.opt_str("dbus-timeout").map(|timeout| {
- dbus_cfg.timeout = timeout.parse().unwrap_or_else(|err| exit!("Invalid dbus timeout: {}", err));
+ dbus_cfg.timeout = timeout.parse().unwrap_or_else(|err| exit!(1, "Invalid dbus-timeout: {}", err));
});
});
@@ -261,48 +285,55 @@ fn build_config() -> Config {
matches.opt_str("device-vin").map(|vin| config.device.vin = vin);
matches.opt_str("device-packages-dir").map(|path| config.device.packages_dir = path);
matches.opt_str("device-package-manager").map(|text| {
- config.device.package_manager = text.parse().unwrap_or_else(|err| exit!("Invalid device package manager: {}", err));
- });
- matches.opt_str("device-polling-interval").map(|interval| {
- config.device.polling_interval = interval.parse().unwrap_or_else(|err| exit!("Invalid device polling interval: {}", err));
+ config.device.package_manager = text.parse().unwrap_or_else(|err| exit!(1, "Invalid device-package-manager: {}", err));
});
matches.opt_str("device-certificates-path").map(|certs| config.device.certificates_path = certs);
- matches.opt_str("device-system-info").map(|cmd| config.device.system_info = SystemInfo::new(cmd));
+ matches.opt_str("device-system-info").map(|cmd| {
+ config.device.system_info = if cmd.len() > 0 { Some(cmd) } else { None }
+ });
matches.opt_str("gateway-console").map(|console| {
- config.gateway.console = console.parse().unwrap_or_else(|err| exit!("Invalid console gateway boolean: {}", err));
+ config.gateway.console = console.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-console boolean: {}", err));
});
matches.opt_str("gateway-dbus").map(|dbus| {
- config.gateway.dbus = dbus.parse().unwrap_or_else(|err| exit!("Invalid dbus gateway boolean: {}", err));
+ config.gateway.dbus = dbus.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-dbus boolean: {}", err));
});
matches.opt_str("gateway-http").map(|http| {
- config.gateway.http = http.parse().unwrap_or_else(|err| exit!("Invalid http gateway boolean: {}", err));
+ config.gateway.http = http.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-http boolean: {}", err));
});
matches.opt_str("gateway-rvi").map(|rvi| {
- config.gateway.rvi = rvi.parse().unwrap_or_else(|err| exit!("Invalid rvi gateway boolean: {}", err));
+ config.gateway.rvi = rvi.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-rvi boolean: {}", err));
});
matches.opt_str("gateway-socket").map(|socket| {
- config.gateway.socket = socket.parse().unwrap_or_else(|err| exit!("Invalid socket gateway boolean: {}", err));
+ config.gateway.socket = socket.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-socket boolean: {}", err));
});
matches.opt_str("gateway-websocket").map(|websocket| {
- config.gateway.websocket = websocket.parse().unwrap_or_else(|err| exit!("Invalid websocket gateway boolean: {}", err));
+ config.gateway.websocket = websocket.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-websocket boolean: {}", err));
});
- matches.opt_str("network-http-server").map(|server| config.network.http_server = server);
- matches.opt_str("network-rvi-edge-server").map(|server| config.network.rvi_edge_server = server);
+ matches.opt_str("network-http-server").map(|addr| {
+ config.network.http_server = addr.parse().unwrap_or_else(|err| exit!(1, "Invalid network-http-server: {}", err));
+ });
+ matches.opt_str("network-rvi-edge-server").map(|addr| {
+ config.network.rvi_edge_server = addr.parse().unwrap_or_else(|err| exit!(1, "Invalid network-rvi-edge-server: {}", err));
+ });
matches.opt_str("network-socket-commands-path").map(|path| config.network.socket_commands_path = path);
matches.opt_str("network-socket-events-path").map(|path| config.network.socket_events_path = path);
matches.opt_str("network-websocket-server").map(|server| config.network.websocket_server = server);
config.rvi.as_mut().map(|rvi_cfg| {
matches.opt_str("rvi-client").map(|url| {
- rvi_cfg.client = url.parse().unwrap_or_else(|err| exit!("Invalid rvi-client URL: {}", err));
+ rvi_cfg.client = url.parse().unwrap_or_else(|err| exit!(1, "Invalid rvi-client URL: {}", err));
});
matches.opt_str("rvi-storage-dir").map(|dir| rvi_cfg.storage_dir = dir);
matches.opt_str("rvi-timeout").map(|timeout| {
- rvi_cfg.timeout = Some(timeout.parse().unwrap_or_else(|err| exit!("Invalid rvi timeout: {}", err)));
+ rvi_cfg.timeout = Some(timeout.parse().unwrap_or_else(|err| exit!(1, "Invalid rvi-timeout: {}", err)));
});
});
+ if matches.opt_present("print") {
+ exit!(0, "{:#?}", config);
+ }
+
config
}
diff --git a/src/oauth2.rs b/src/oauth2.rs
index 0c5f152..e34e4c2 100644
--- a/src/oauth2.rs
+++ b/src/oauth2.rs
@@ -1,16 +1,19 @@
use rustc_serialize::json;
use datatype::{AccessToken, Error, Url};
-use http::Client;
+use http::{Client, Response};
/// Authenticate with the specified OAuth2 server to retrieve a new `AccessToken`.
pub fn authenticate(server: Url, client: &Client) -> Result<AccessToken, Error> {
debug!("authenticating at {}", server);
- let resp_rx = client.post(server, None);
+ let resp_rx = client.post(server, Some(br#"grant_type=client_credentials"#.to_vec()));
let resp = resp_rx.recv().expect("no authenticate response received");
- let data = try!(resp);
- let body = try!(String::from_utf8(data));
+ let body = match resp {
+ Response::Success(data) => try!(String::from_utf8(data.body)),
+ Response::Failed(data) => return Err(Error::from(data)),
+ Response::Error(err) => return Err(err)
+ };
Ok(try!(json::decode(&body)))
}
diff --git a/src/package_manager/deb.rs b/src/package_manager/deb.rs
index bba86e6..845c2b8 100644
--- a/src/package_manager/deb.rs
+++ b/src/package_manager/deb.rs
@@ -5,7 +5,7 @@ use package_manager::package_manager::{InstallOutcome, parse_package};
/// Returns a list of installed DEB packages with
-/// `dpkg-query -f='${Package} ${Version}\n -W`.
+/// `dpkg-query -f='${Package} ${Version}\n' -W`.
pub fn installed_packages() -> Result<Vec<Package>, Error> {
Command::new("dpkg-query").arg("-f='${Package} ${Version}\n'").arg("-W")
.output()
diff --git a/src/package_manager/package_manager.rs b/src/package_manager/package_manager.rs
index 09556a0..173a636 100644
--- a/src/package_manager/package_manager.rs
+++ b/src/package_manager/package_manager.rs
@@ -49,6 +49,12 @@ impl PackageManager {
}
}
+ /// Indicates whether a specific package is installed on the device.
+ pub fn is_installed(&self, package: &Package) -> bool {
+ self.installed_packages().map(|packages| packages.contains(package))
+ .unwrap_or_else(|err| { error!("couldn't get a list of packages: {}", err); false })
+ }
+
/// Returns a string representation of the package manager's extension.
pub fn extension(&self) -> String {
match *self {
diff --git a/src/package_manager/rpm.rs b/src/package_manager/rpm.rs
index 99aacbf..f2c8f2a 100644
--- a/src/package_manager/rpm.rs
+++ b/src/package_manager/rpm.rs
@@ -5,7 +5,7 @@ use package_manager::package_manager::{InstallOutcome, parse_package};
/// Returns a list of installed RPM packages with
-/// `rpm -qa ==queryformat ${NAME} ${VERSION}\n`.
+/// `rpm -qa --queryformat ${NAME} ${VERSION}\n`.
pub fn installed_packages() -> Result<Vec<Package>, Error> {
Command::new("rpm").arg("-qa").arg("--queryformat").arg("%{NAME} %{VERSION}\n")
.output()
@@ -23,17 +23,22 @@ pub fn installed_packages() -> Result<Vec<Package>, Error> {
})
}
-/// Installs a new RPM package.
+/// Installs a new RPM package with `rpm -Uvh --force <package-path>`.
pub fn install_package(path: &str) -> Result<InstallOutcome, InstallOutcome> {
let output = try!(Command::new("rpm").arg("-Uvh").arg("--force").arg(path)
.output()
- .map_err(|e| (UpdateResultCode::GENERAL_ERROR, format!("{:?}", e))));
+ .map_err(|err| (UpdateResultCode::GENERAL_ERROR, format!("{:?}", err))));
let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
match output.status.code() {
- Some(0) => Ok((UpdateResultCode::OK, stdout)),
+ Some(0) => {
+ let _ = Command::new("sync").status()
+ .map_err(|err| error!("couldn't run 'sync': {}", err));
+ Ok((UpdateResultCode::OK, stdout))
+ }
+
_ => {
let out = format!("stdout: {}\nstderr: {}", stdout, stderr);
if (&stderr).contains("already installed") {
diff --git a/src/rvi/edge.rs b/src/rvi/edge.rs
index cadea74..85d46df 100644
--- a/src/rvi/edge.rs
+++ b/src/rvi/edge.rs
@@ -4,25 +4,24 @@ use hyper::server::{Server as HyperServer, Request as HyperRequest};
use rustc_serialize::json;
use rustc_serialize::json::Json;
use std::{mem, str};
-use std::net::ToSocketAddrs;
-use datatype::{RpcRequest, RpcOk, RpcErr, Url};
+use datatype::{RpcRequest, RpcOk, RpcErr, SocketAddr, Url};
use http::{Server, ServerHandler};
use super::services::Services;
/// The HTTP server endpoint for `RVI` client communication.
pub struct Edge {
- rvi_edge: Url,
+ rvi_edge: SocketAddr,
services: Services,
}
impl Edge {
/// Create a new `Edge` by registering each `RVI` service.
- pub fn new(mut services: Services, rvi_edge: String, rvi_client: Url) -> Self {
+ pub fn new(mut services: Services, rvi_edge: SocketAddr, rvi_client: Url) -> Self {
services.register_services(|service| {
let req = RpcRequest::new("register_service", RegisterServiceRequest {
- network_address: rvi_edge.clone(),
+ network_address: format!("http://{}", rvi_edge),
service: service.to_string(),
});
let resp = req.send(rvi_client.clone())
@@ -32,14 +31,12 @@ impl Edge {
rpc_ok.result.expect("expected rpc_ok result").service
});
- Edge { rvi_edge: rvi_edge.parse().expect("couldn't parse edge server as url"), services: services }
+ Edge { rvi_edge: rvi_edge, services: services }
}
/// Start the HTTP server listening for incoming RVI client connections.
pub fn start(&mut self) {
- let mut addrs = self.rvi_edge.to_socket_addrs()
- .unwrap_or_else(|err| panic!("couldn't parse edge url: {}", err));
- let server = HyperServer::http(&addrs.next().expect("no SocketAddr found"))
+ let server = HyperServer::http(&*self.rvi_edge)
.unwrap_or_else(|err| panic!("couldn't start rvi edge server: {}", err));
let (addr, server) = server.handle(move |_| EdgeHandler::new(self.services.clone())).unwrap();
info!("RVI server edge listening at http://{}.", addr);
@@ -61,7 +58,6 @@ struct RegisterServiceResponse {
}
-
struct EdgeHandler {
services: Services,
resp_code: StatusCode,
diff --git a/src/rvi/parameters.rs b/src/rvi/parameters.rs
index 1fa1a87..e23ce9d 100644
--- a/src/rvi/parameters.rs
+++ b/src/rvi/parameters.rs
@@ -1,4 +1,3 @@
-use std::str;
use std::sync::Mutex;
use datatype::{ChunkReceived, Event, DownloadComplete, UpdateRequestId, UpdateAvailable};
@@ -23,7 +22,7 @@ pub struct Notify {
impl Parameter for Notify {
fn handle(&self, remote: &Mutex<RemoteServices>, _: &Mutex<Transfers>) -> Result<Option<Event>, String> {
remote.lock().unwrap().backend = Some(self.services.clone());
- Ok(Some(Event::NewUpdateAvailable(self.update_available.clone())))
+ Ok(Some(Event::UpdateAvailable(self.update_available.clone())))
}
}
diff --git a/src/sota.rs b/src/sota.rs
index 3dec33d..64abacd 100644
--- a/src/sota.rs
+++ b/src/sota.rs
@@ -1,11 +1,11 @@
use rustc_serialize::json;
+use std::{fs, io};
use std::fs::File;
-use std::io;
use std::path::PathBuf;
-use datatype::{Config, DeviceReport, DownloadComplete, Error, Package,
- PendingUpdateRequest, UpdateRequestId, UpdateReport, Url};
-use http::Client;
+use datatype::{Config, DownloadComplete, Error, Package,
+ UpdateReport, UpdateRequest, UpdateRequestId, Url};
+use http::{Client, Response};
/// Encapsulate the client configuration and HTTP client used for
@@ -22,39 +22,47 @@ impl<'c, 'h> Sota<'c, 'h> {
}
/// Takes a path and returns a new endpoint of the format
- /// `<Core server>/api/v1/device_updates/<uuid>/<path>`.
- pub fn endpoint(&self, path: &str) -> Url {
- let endpoint = if path.is_empty() {
- format!("/api/v1/device_updates/{}", self.config.device.uuid)
- } else {
- format!("/api/v1/device_updates/{}/{}", self.config.device.uuid, path)
- };
+ /// `<Core server>/api/v1/mydevice/<device-id>$path`.
+ fn endpoint(&self, path: &str) -> Url {
+ let endpoint = format!("/api/v1/mydevice/{}{}", self.config.device.uuid, path);
self.config.core.server.join(&endpoint).expect("couldn't build endpoint url")
}
- /// Query the Core server to identify any new package updates available.
- pub fn get_pending_updates(&mut self) -> Result<Vec<PendingUpdateRequest>, Error> {
- let resp_rx = self.client.get(self.endpoint(""), None);
- let resp = resp_rx.recv().expect("no get_package_updates response received");
- let data = try!(resp);
- let text = try!(String::from_utf8(data));
- Ok(try!(json::decode::<Vec<PendingUpdateRequest>>(&text)))
+ /// Returns the path to a package on the device.
+ fn package_path(&self, id: UpdateRequestId) -> Result<String, Error> {
+ let mut path = PathBuf::new();
+ path.push(&self.config.device.packages_dir);
+ path.push(id);
+ Ok(try!(path.to_str().ok_or(Error::Parse(format!("Path is not valid UTF-8: {:?}", path)))).to_string())
}
- /// Download a specific update from the Core server.
- pub fn download_update(&mut self, id: UpdateRequestId) -> Result<DownloadComplete, Error> {
- let resp_rx = self.client.get(self.endpoint(&format!("{}/download", id)), None);
- let resp = resp_rx.recv().expect("no download_package_update response received");
- let data = try!(resp);
+ /// Query the Core server for any pending or in-flight package updates.
+ pub fn get_update_requests(&mut self) -> Result<Vec<UpdateRequest>, Error> {
+ let resp_rx = self.client.get(self.endpoint("/updates"), None);
+ let resp = try!(resp_rx.recv().ok_or(Error::Client("couldn't get new updates".to_string())));
+ let data = match resp {
+ Response::Success(data) => data,
+ Response::Failed(data) => return Err(Error::from(data)),
+ Response::Error(err) => return Err(err)
+ };
- let mut path = PathBuf::new();
- path.push(&self.config.device.packages_dir);
- path.push(id.clone()); // TODO: Use Content-Disposition filename from request?
- let mut file = try!(File::create(path.as_path()));
+ let text = try!(String::from_utf8(data.body));
+ Ok(try!(json::decode::<Vec<UpdateRequest>>(&text)))
+ }
- let _ = io::copy(&mut &*data, &mut file);
- let path = try!(path.to_str().ok_or(Error::Parse(format!("Path is not valid UTF-8: {:?}", path))));
+ /// Download a specific update from the Core server.
+ pub fn download_update(&mut self, id: UpdateRequestId) -> Result<DownloadComplete, Error> {
+ let resp_rx = self.client.get(self.endpoint(&format!("/updates/{}/download", id)), None);
+ let resp = try!(resp_rx.recv().ok_or(Error::Client("couldn't download update".to_string())));
+ let data = match resp {
+ Response::Success(data) => data,
+ Response::Failed(data) => return Err(Error::from(data)),
+ Response::Error(err) => return Err(err)
+ };
+ let path = try!(self.package_path(id.clone()));
+ let mut file = try!(File::create(&path));
+ let _ = io::copy(&mut &*data.body, &mut file);
Ok(DownloadComplete {
update_id: id,
update_image: path.to_string(),
@@ -63,46 +71,54 @@ impl<'c, 'h> Sota<'c, 'h> {
}
/// Install an update using the package manager.
- pub fn install_update(&mut self, download: DownloadComplete) -> Result<UpdateReport, UpdateReport> {
+ pub fn install_update(&mut self, id: UpdateRequestId) -> Result<UpdateReport, UpdateReport> {
let ref pacman = self.config.device.package_manager;
- pacman.install_package(&download.update_image).and_then(|(code, output)| {
- Ok(UpdateReport::single(download.update_id.clone(), code, output))
+ let path = self.package_path(id.clone()).expect("install_update expects a valid path");
+ pacman.install_package(&path).and_then(|(code, output)| {
+ let _ = fs::remove_file(&path).unwrap_or_else(|err| error!("couldn't remove installed package: {}", err));
+ Ok(UpdateReport::single(id.clone(), code, output))
}).or_else(|(code, output)| {
- Err(UpdateReport::single(download.update_id.clone(), code, output))
+ Err(UpdateReport::single(id.clone(), code, output))
})
}
- /// Get a list of the currently installed packages from the package manager.
- pub fn get_installed_packages(&mut self) -> Result<Vec<Package>, Error> {
- Ok(try!(self.config.device.package_manager.installed_packages()))
- }
-
/// Send a list of the currently installed packages to the Core server.
pub fn send_installed_packages(&mut self, packages: &Vec<Package>) -> Result<(), Error> {
let body = try!(json::encode(packages));
- let resp_rx = self.client.put(self.endpoint("installed"), Some(body.into_bytes()));
- let _ = resp_rx.recv().expect("no update_installed_packages response received")
- .map_err(|err| error!("update_installed_packages failed: {}", err));
- Ok(())
+ let resp_rx = self.client.put(self.endpoint("/installed"), Some(body.into_bytes()));
+ let resp = try!(resp_rx.recv().ok_or(Error::Client("couldn't send installed packages".to_string())));
+
+ match resp {
+ Response::Success(_) => Ok(()),
+ Response::Failed(data) => Err(Error::from(data)),
+ Response::Error(err) => Err(err)
+ }
}
/// Send the outcome of a package update to the Core server.
pub fn send_update_report(&mut self, update_report: &UpdateReport) -> Result<(), Error> {
- let report = DeviceReport::new(&self.config.device.uuid, update_report);
- let body = try!(json::encode(&report));
- let url = self.endpoint(report.device);
+ let body = try!(json::encode(&update_report.operation_results));
+ let url = self.endpoint(&format!("/updates/{}", update_report.update_id));
let resp_rx = self.client.post(url, Some(body.into_bytes()));
- let resp = resp_rx.recv().expect("no send_install_report response received");
- let _ = try!(resp);
- Ok(())
+ let resp = try!(resp_rx.recv().ok_or(Error::Client("couldn't send update report".to_string())));
+
+ match resp {
+ Response::Success(_) => Ok(()),
+ Response::Failed(data) => Err(Error::from(data)),
+ Response::Error(err) => Err(err)
+ }
}
/// Send system information from the device to the Core server.
pub fn send_system_info(&mut self, body: &str) -> Result<(), Error> {
- let resp_rx = self.client.put(self.endpoint("system_info"), Some(body.as_bytes().to_vec()));
- let resp = resp_rx.recv().expect("no send_system_info response received");
- let _ = try!(resp);
- Ok(())
+ let resp_rx = self.client.put(self.endpoint("/system_info"), Some(body.as_bytes().to_vec()));
+ let resp = try!(resp_rx.recv().ok_or(Error::Client("couldn't send system info".to_string())));
+
+ match resp {
+ Response::Success(_) => Ok(()),
+ Response::Failed(data) => Err(Error::from(data)),
+ Response::Error(err) => Err(err)
+ }
}
}
@@ -112,19 +128,20 @@ mod tests {
use rustc_serialize::json;
use super::*;
- use datatype::{Config, Package, PendingUpdateRequest};
+ use datatype::{Config, Package, UpdateRequest, UpdateRequestStatus};
use http::TestClient;
#[test]
- fn test_get_pending_updates() {
- let pending_update = PendingUpdateRequest {
+ fn test_get_update_requests() {
+ let pending_update = UpdateRequest {
requestId: "someid".to_string(),
- installPos: 0,
+ status: UpdateRequestStatus::Pending,
packageId: Package {
name: "fake-pkg".to_string(),
version: "0.1.1".to_string()
},
+ installPos: 0,
createdAt: "2010-01-01".to_string()
};
@@ -134,7 +151,7 @@ mod tests {
client: &mut TestClient::from(vec![json.to_string()]),
};
- let updates: Vec<PendingUpdateRequest> = sota.get_pending_updates().unwrap();
+ let updates: Vec<UpdateRequest> = sota.get_update_requests().unwrap();
let ids: Vec<String> = updates.iter().map(|p| p.requestId.clone()).collect();
assert_eq!(ids, vec!["someid".to_string()])
}