summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShaun Taheri <shaun@advancedtelematic.com>2016-06-15 00:41:57 +0200
committerShaun Taheri <shaun@advancedtelematic.com>2016-06-17 18:35:41 +0200
commit9f56304fc2b3fdbacf5fcea971ee93d61a0d7e32 (patch)
treeed1078aa1864e8426ee88086ff19728fba534baa
parent23c3888851279494bfc52c11326d6bebe379b761 (diff)
downloadrvi_sota_client-9f56304fc2b3fdbacf5fcea971ee93d61a0d7e32.tar.gz
Variable arg support for AcceptUpdate Command
-rw-r--r--.gitignore2
-rw-r--r--src/datatype/command.rs144
-rw-r--r--src/datatype/error.rs9
-rw-r--r--src/datatype/event.rs2
-rw-r--r--src/http_client/http_client.rs2
-rw-r--r--src/http_client/test_client.rs2
-rw-r--r--src/interaction_library/broadcast.rs2
-rw-r--r--src/interaction_library/gateway.rs14
-rw-r--r--src/interaction_library/http.rs6
-rw-r--r--src/interpreter.rs260
-rw-r--r--src/main.rs48
-rw-r--r--src/ota_plus.rs36
-rw-r--r--src/package_manager/mod.rs1
-rw-r--r--src/package_manager/package_manager.rs21
-rw-r--r--src/package_manager/tpm.rs41
15 files changed, 326 insertions, 264 deletions
diff --git a/.gitignore b/.gitignore
index e140a79..58dd422 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,3 @@
target
pkg/ota_plus_client
-test_install_package_update_2
+.tmp*
diff --git a/src/datatype/command.rs b/src/datatype/command.rs
index e4ad023..3348c76 100644
--- a/src/datatype/command.rs
+++ b/src/datatype/command.rs
@@ -7,7 +7,7 @@ use datatype::{ClientCredentials, ClientId, ClientSecret, Error, UpdateRequestId
#[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)]
pub enum Command {
- AcceptUpdate(UpdateRequestId),
+ AcceptUpdates(Vec<UpdateRequestId>),
/* Add:
UpdateReport,
InstalledSoftware, // or reuse UpdateInstalledPackages
@@ -33,34 +33,18 @@ impl FromStr for Command {
named!(command <(Command, Vec<&str>)>, chain!(
space?
~ cmd: alt!(
- alt_complete!(tag!("Authenticate")
- | tag!("authenticate")
- | tag!("auth")
- ) => { |_| Command::Authenticate(None) }
-
- | alt_complete!(tag!("GetPendingUpdates")
- | tag!("getPendingUpdates")
- | tag!("pen")
- ) => { |_| Command::GetPendingUpdates }
-
- | alt_complete!(tag!("AcceptUpdate")
- | tag!("acceptUpdate")
- | tag!("acc")
- ) => { |_| Command::AcceptUpdate("".to_owned()) }
-
- | alt_complete!(tag!("ListInstalledPackages")
- | tag!("listInstalledPackages")
- | tag!("ls")
- ) => { |_| Command::ListInstalledPackages }
-
- | alt_complete!(tag!("Shutdown")
- | tag!("shutdown")
- ) => { |_| Command::Shutdown }
-
- | alt_complete!(tag!("UpdateInstalledPackages")
- | tag!("updateInstalledPackages")
- | tag!("up")
- ) => { |_| Command::UpdateInstalledPackages }
+ alt_complete!(tag!("AcceptUpdate") | tag!("acc"))
+ => { |_| Command::AcceptUpdates(Vec::new()) }
+ | alt_complete!(tag!("Authenticate") | tag!("auth"))
+ => { |_| Command::Authenticate(None) }
+ | alt_complete!(tag!("GetPendingUpdates") | tag!("pen"))
+ => { |_| Command::GetPendingUpdates }
+ | alt_complete!(tag!("ListInstalledPackages") | tag!("ls"))
+ => { |_| Command::ListInstalledPackages }
+ | alt_complete!(tag!("Shutdown") | tag!("shutdown"))
+ => { |_| Command::Shutdown }
+ | alt_complete!(tag!("UpdateInstalledPackages") | tag!("up"))
+ => { |_| Command::UpdateInstalledPackages }
)
~ args: arguments
~ alt!(eof | tag!("\r") | tag!("\n") | tag!(";")),
@@ -83,56 +67,39 @@ named!(arguments <&[u8], Vec<&str> >, chain!(
fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> {
match cmd {
- Command::AcceptUpdate(_) => {
- match args.len() {
- 0 => Err(Error::Command("usage: acc <id>".to_owned())),
- 1 => Ok(Command::AcceptUpdate(args[0].to_owned())),
- _ => Err(Error::Command(format!("unexpected acc args: {:?}", args))),
- }
- }
-
- Command::Authenticate(_) => {
- match args.len() {
- 0 => Ok(Command::Authenticate(None)),
- 1 => Err(Error::Command("usage: auth <user> <pass>".to_owned())),
- 2 => {
- let (user, pass) = (args[0].to_owned(), args[1].to_owned());
- Ok(Command::Authenticate(Some(ClientCredentials {
- id: ClientId(user),
- secret: ClientSecret(pass)
- })))
- }
- _ => Err(Error::Command(format!("unexpected auth args: {:?}", args))),
- }
- }
-
- Command::GetPendingUpdates => {
- match args.len() {
- 0 => Ok(Command::GetPendingUpdates),
- _ => Err(Error::Command(format!("unexpected pen args: {:?}", args))),
- }
- }
-
- Command::ListInstalledPackages => {
- match args.len() {
- 0 => Ok(Command::ListInstalledPackages),
- _ => Err(Error::Command(format!("unexpected ls args: {:?}", args))),
- }
- }
-
- Command::Shutdown => {
- match args.len() {
- 0 => Ok(Command::Shutdown),
- _ => Err(Error::Command(format!("unexpected shutdown args: {:?}", args))),
- }
- }
-
- Command::UpdateInstalledPackages => {
- match args.len() {
- 0 => Ok(Command::UpdateInstalledPackages),
- _ => Err(Error::Command(format!("unexpected up args: {:?}", args))),
- }
- }
+ Command::AcceptUpdates(_) => match args.len() {
+ 0 => Err(Error::Command("usage: acc [<id>]".to_string())),
+ _ => Ok(Command::AcceptUpdates(args.iter().map(|arg| String::from(*arg)).collect())),
+ },
+
+ Command::Authenticate(_) => match args.len() {
+ 0 => Ok(Command::Authenticate(None)),
+ 1 => Err(Error::Command("usage: auth <id> <secret>".to_string())),
+ 2 => Ok(Command::Authenticate(Some(ClientCredentials {
+ id: ClientId(args[0].to_string()),
+ secret: ClientSecret(args[1].to_string())}))),
+ _ => Err(Error::Command(format!("unexpected auth args: {:?}", args))),
+ },
+
+ Command::GetPendingUpdates => match args.len() {
+ 0 => Ok(Command::GetPendingUpdates),
+ _ => Err(Error::Command(format!("unexpected pen args: {:?}", args))),
+ },
+
+ Command::ListInstalledPackages => match args.len() {
+ 0 => Ok(Command::ListInstalledPackages),
+ _ => Err(Error::Command(format!("unexpected ls args: {:?}", args))),
+ },
+
+ Command::Shutdown => match args.len() {
+ 0 => Ok(Command::Shutdown),
+ _ => Err(Error::Command(format!("unexpected shutdown args: {:?}", args))),
+ },
+
+ Command::UpdateInstalledPackages => match args.len() {
+ 0 => Ok(Command::UpdateInstalledPackages),
+ _ => Err(Error::Command(format!("unexpected up args: {:?}", args))),
+ },
}
}
@@ -143,12 +110,13 @@ mod tests {
use datatype::{Command, ClientCredentials, ClientId, ClientSecret};
use nom::IResult;
+
#[test]
fn parse_command_test() {
assert_eq!(command(&b"auth foo bar"[..]),
IResult::Done(&b""[..], (Command::Authenticate(None), vec!["foo", "bar"])));
assert_eq!(command(&b"acc 1"[..]),
- IResult::Done(&b""[..], (Command::AcceptUpdate("".to_owned()), vec!["1"])));
+ IResult::Done(&b""[..], (Command::AcceptUpdates(Vec::new()), vec!["1"])));
assert_eq!(command(&b"ls;\n"[..]),
IResult::Done(&b"\n"[..], (Command::ListInstalledPackages, Vec::new())));
}
@@ -165,23 +133,20 @@ mod tests {
#[test]
fn accept_update_test() {
- assert_eq!("acc 1".parse::<Command>().unwrap(), Command::AcceptUpdate("1".to_owned()));
- assert_eq!("acceptUpdate 2".parse::<Command>().unwrap(), Command::AcceptUpdate("2".to_owned()));
- assert_eq!("AcceptUpdate 3".parse::<Command>().unwrap(), Command::AcceptUpdate("3".to_owned()));
- assert_eq!("acc some".parse::<Command>().unwrap(), Command::AcceptUpdate("some".to_owned()));
+ assert_eq!("acc 1".parse::<Command>().unwrap(), Command::AcceptUpdates(vec!["1".to_string()]));
+ assert_eq!("AcceptUpdate this".parse::<Command>().unwrap(), Command::AcceptUpdates(vec!["this".to_string()]));
+ assert_eq!("acc some more".parse::<Command>().unwrap(), Command::AcceptUpdates(vec!["some".to_string(), "more".to_string()]));
assert!("acc".parse::<Command>().is_err());
- assert!("acc more than one".parse::<Command>().is_err());
}
#[test]
fn authenticate_test() {
assert_eq!("auth".parse::<Command>().unwrap(), Command::Authenticate(None));
- assert_eq!("authenticate".parse::<Command>().unwrap(), Command::Authenticate(None));
assert_eq!("Authenticate".parse::<Command>().unwrap(), Command::Authenticate(None));
assert_eq!("auth user pass".parse::<Command>().unwrap(),
Command::Authenticate(Some(ClientCredentials {
- id: ClientId("user".to_owned()),
- secret: ClientSecret("pass".to_owned()),
+ id: ClientId("user".to_string()),
+ secret: ClientSecret("pass".to_string()),
})));
assert!("auth one".parse::<Command>().is_err());
assert!("auth one two three".parse::<Command>().is_err());
@@ -190,7 +155,6 @@ mod tests {
#[test]
fn get_pending_updates_test() {
assert_eq!("pen".parse::<Command>().unwrap(), Command::GetPendingUpdates);
- assert_eq!("getPendingUpdates".parse::<Command>().unwrap(), Command::GetPendingUpdates);
assert_eq!("GetPendingUpdates".parse::<Command>().unwrap(), Command::GetPendingUpdates);
assert!("pen some".parse::<Command>().is_err());
}
@@ -198,7 +162,6 @@ mod tests {
#[test]
fn list_installed_test() {
assert_eq!("ls".parse::<Command>().unwrap(), Command::ListInstalledPackages);
- assert_eq!("listInstalledPackages".parse::<Command>().unwrap(), Command::ListInstalledPackages);
assert_eq!("ListInstalledPackages".parse::<Command>().unwrap(), Command::ListInstalledPackages);
assert!("ls some".parse::<Command>().is_err());
}
@@ -214,7 +177,6 @@ mod tests {
#[test]
fn update_installed_test() {
assert_eq!("up".parse::<Command>().unwrap(), Command::UpdateInstalledPackages);
- assert_eq!("updateInstalledPackages".parse::<Command>().unwrap(), Command::UpdateInstalledPackages);
assert_eq!("UpdateInstalledPackages".parse::<Command>().unwrap(), Command::UpdateInstalledPackages);
assert!("up down".parse::<Command>().is_err());
}
diff --git a/src/datatype/error.rs b/src/datatype/error.rs
index 388ee9f..4ad3307 100644
--- a/src/datatype/error.rs
+++ b/src/datatype/error.rs
@@ -13,6 +13,7 @@ use datatype::Event;
use rustc_serialize::json::{EncoderError as JsonEncoderError, DecoderError as JsonDecoderError};
use ws::Error as WebsocketError;
use super::super::http_client::auth_client::AuthHandler;
+use super::super::interpreter::Wrapped;
#[derive(Debug)]
@@ -31,6 +32,7 @@ pub enum Error {
ParseError(String),
RecvError(RecvError),
SendErrorEvent(SendError<Event>),
+ SendErrorWrapped(SendError<Wrapped>),
TomlParserErrors(Vec<TomlParserError>),
TomlDecodeError(TomlDecodeError),
UrlParseError(UrlParseError),
@@ -43,6 +45,12 @@ impl From<SendError<Event>> for Error {
}
}
+impl From<SendError<Wrapped>> for Error {
+ fn from(e: SendError<Wrapped>) -> Error {
+ Error::SendErrorWrapped(e)
+ }
+}
+
impl From<RecvError> for Error {
fn from(e: RecvError) -> Error {
Error::RecvError(e)
@@ -111,6 +119,7 @@ impl Display for Error {
Error::ParseError(ref s) => s.clone(),
Error::RecvError(ref s) => format!("Recv error: {}", s.clone()),
Error::SendErrorEvent(ref s) => format!("Send error for Event: {}", s.clone()),
+ Error::SendErrorWrapped(ref s) => format!("Send error for Wrapped: {}", s.clone()),
Error::TomlDecodeError(ref e) => format!("Toml decode error: {}", e.clone()),
Error::TomlParserErrors(ref e) => format!("Toml parser errors: {:?}", e.clone()),
Error::UrlParseError(ref s) => format!("Url parse error: {}", s.clone()),
diff --git a/src/datatype/event.rs b/src/datatype/event.rs
index ac36697..3b7e7a8 100644
--- a/src/datatype/event.rs
+++ b/src/datatype/event.rs
@@ -6,8 +6,8 @@ use datatype::{UpdateRequestId, UpdateState, Package};
#[derive(RustcEncodable, RustcDecodable, Debug, Clone, PartialEq, Eq)]
pub enum Event {
Ok,
+ Authenticated,
NotAuthenticated,
- NewUpdateAvailable(UpdateRequestId),
/* TODO: Add:
DownloadComplete(UpdateRequestId),
GetInstalledSoftware,
diff --git a/src/http_client/http_client.rs b/src/http_client/http_client.rs
index 938f5c0..a2039bd 100644
--- a/src/http_client/http_client.rs
+++ b/src/http_client/http_client.rs
@@ -11,6 +11,8 @@ pub trait HttpClient {
}
fn chan_request(&self, req: HttpRequest, resp_tx: Sender<HttpResponse>);
+
+ fn is_testing(&self) -> bool { false }
}
#[derive(Debug)]
diff --git a/src/http_client/test_client.rs b/src/http_client/test_client.rs
index 7974144..a2a3017 100644
--- a/src/http_client/test_client.rs
+++ b/src/http_client/test_client.rs
@@ -26,4 +26,6 @@ impl HttpClient for TestHttpClient {
None => resp_tx.send(Err(Error::ClientError(req.url.to_string())))
}.map_err(|err| error!("couldn't send test chan_request response: {}", err));
}
+
+ fn is_testing(&self) -> bool { true }
}
diff --git a/src/interaction_library/broadcast.rs b/src/interaction_library/broadcast.rs
index 3c712bf..b35e3a9 100644
--- a/src/interaction_library/broadcast.rs
+++ b/src/interaction_library/broadcast.rs
@@ -23,7 +23,7 @@ impl<A: Clone> Broadcast<A> {
}
}
},
- Err(e) => error!("Error receiving: {}", e)
+ Err(e) => trace!("Error receiving: {}", e)
}
}
}
diff --git a/src/interaction_library/gateway.rs b/src/interaction_library/gateway.rs
index 3ecdaf6..ce58886 100644
--- a/src/interaction_library/gateway.rs
+++ b/src/interaction_library/gateway.rs
@@ -22,20 +22,16 @@ pub trait Gateway<C, E>: Sized + Send + Sync + 'static
thread::spawn(move || {
loop {
- gateway.next()
- .map(|i| {
- tx.send(i)
- .map_err(|err| error!("Error sending command: {:?}", err))
- });
+ gateway.next().map(|i| {
+ tx.send(i).map_err(|err| error!("Error sending command: {:?}", err))
+ });
}
});
thread::spawn(move || {
loop {
- match rx.recv() {
- Ok(e) => global.pulse(e),
- Err(err) => error!("Error receiving event: {:?}", err),
- }
+ let _ = rx.recv().map(|e| global.pulse(e))
+ .map_err(|err| trace!("Error receiving event: {:?}", err));
}
});
}
diff --git a/src/interaction_library/http.rs b/src/interaction_library/http.rs
index 6575710..f506fa0 100644
--- a/src/interaction_library/http.rs
+++ b/src/interaction_library/http.rs
@@ -221,8 +221,8 @@ mod tests {
loop {
let w = wrx.recv().unwrap();
match w.cmd {
- Command::AcceptUpdate(id) => {
- let ev = Event::Error(id);
+ Command::AcceptUpdates(ids) => {
+ let ev = Event::Error(ids.first().unwrap().to_owned());
match w.etx {
Some(etx) => etx.lock().unwrap().send(ev).unwrap(),
None => panic!("expected transmitter"),
@@ -238,7 +238,7 @@ mod tests {
for id in 0..10 {
scope.spawn(move || {
let client = AuthClient::new(Auth::None);
- let cmd = Command::AcceptUpdate(format!("{}", id));
+ let cmd = Command::AcceptUpdates(vec!(format!("{}", id)));
let req_body = json::encode(&cmd).unwrap();
let req = HttpRequest {
diff --git a/src/interpreter.rs b/src/interpreter.rs
index 9a76912..00e5adc 100644
--- a/src/interpreter.rs
+++ b/src/interpreter.rs
@@ -1,12 +1,11 @@
use std::borrow::Cow;
use std::process::exit;
-use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Sender, Receiver, channel};
use datatype::{AccessToken, Auth, ClientId, ClientSecret, Command, Config,
- Error, Event, UpdateState};
+ Error, Event, UpdateState, UpdateRequestId};
use datatype::Command::*;
-use http_client::AuthClient;
+use http_client::{AuthClient, HttpClient};
use interaction_library::gateway::Interpret;
use oauth2::authenticate;
use ota_plus::OTA;
@@ -36,10 +35,6 @@ impl Interpreter<Event, Command> for EventInterpreter {
ctx.send(Command::Authenticate(None))
}
- Event::NewUpdateAvailable(ref id) => {
- ctx.send(Command::AcceptUpdate(id.clone()))
- }
-
/* TODO: Handle PackageManger events
Event::DownloadComplete => {
env.config.ota.package_manager.install_package(p);
@@ -58,89 +53,64 @@ impl Interpreter<Event, Command> for EventInterpreter {
}
-pub type Wrapped = Interpret<Command, Event>;
+pub struct CommandInterpreter;
-pub struct WrappedInterpreter<'t> {
- pub config: Config,
- pub access_token: Option<Cow<'t, AccessToken>>,
- pub wtx: Sender<Wrapped>,
+impl Interpreter<Command, Wrapped> for CommandInterpreter {
+ fn interpret(&mut self, cmd: Command, wtx: &Sender<Wrapped>) {
+ info!("Command interpreter: {:?}", cmd);
+ let _ = wtx.send(Wrapped { cmd: cmd, etx: None })
+ .map_err(|err| panic!("couldn't forward command: {}", err));
+ }
}
-impl<'t> Interpreter<Wrapped, Event> for WrappedInterpreter<'t> {
- fn interpret(&mut self, w: Wrapped, global_tx: &Sender<Event>) {
- fn send_global(ev: Event, global_tx: &Sender<Event>) {
- let _ = global_tx.send(ev).map_err(|err| panic!("couldn't send global response: {}", err));
- }
-
- fn send_local(ev: Event, local_tx: Option<Arc<Mutex<Sender<Event>>>>) {
- if let Some(local) = local_tx {
- let _ = local.lock().unwrap().send(ev)
- .map_err(|err| panic!("couldn't send local response: {}", err));
- }
- }
-
- info!("Interpreting wrapped command: {:?}", w.cmd);
- let (multi_tx, multi_rx): (Sender<Event>, Receiver<Event>) = channel();
- match match self.access_token.to_owned() {
- Some(token) => self.authenticated(w.cmd.clone(), token.into_owned(), multi_tx),
- None => self.unauthenticated(w.cmd.clone(), multi_tx)
- }{
- Ok(_) => {
- let mut last_ev = None;
- for ev in multi_rx {
- send_global(ev.clone(), &global_tx);
- last_ev = Some(ev);
- }
- match last_ev {
- Some(ev) => send_local(ev, w.etx),
- None => panic!("no local event to send back")
- };
- }
- Err(Error::AuthorizationError(_)) => {
- debug!("retry authorization and request");
- let a = Wrapped { cmd: Command::Authenticate(None), etx: None };
- let _ = self.wtx.send(a).map_err(|err| panic!("couldn't retry authentication: {}", err));
- let _ = self.wtx.send(w).map_err(|err| panic!("couldn't retry request: {}", err));
- }
+pub type Wrapped = Interpret<Command, Event>;
- Err(err) => {
- let ev = Event::Error(format!("{}", err));
- send_global(ev.clone(), &global_tx);
- send_local(ev, w.etx);
- }
+impl Wrapped {
+ fn publish(&self, ev: Event) {
+ if let Some(ref etx) = self.etx {
+ let _ = etx.lock().unwrap().send(ev).map_err(|err| panic!("couldn't publish event: {}", err));
}
}
}
+pub struct WrappedInterpreter<'t> {
+ pub config: Config,
+ pub token: Option<Cow<'t, AccessToken>>,
+ pub client: Box<HttpClient>,
+ pub loopback: Sender<Wrapped>,
+}
+
impl<'t> WrappedInterpreter<'t> {
- fn authenticated(&self, cmd: Command, token: AccessToken, etx: Sender<Event>) -> Result<(), Error> {
- let client = AuthClient::new(Auth::Token(token));
- let mut ota = OTA::new(&self.config, &client);
+ fn authenticated(&self, cmd: Command, etx: Sender<Event>) -> Result<(), Error> {
+ let mut ota = OTA::new(&self.config, self.client.as_ref());
// always send at least one Event response
match cmd {
- Authenticate(_) => try!(etx.send(Event::Ok)),
-
- AcceptUpdate(ref id) => {
- try!(etx.send(Event::UpdateStateChanged(id.clone(), UpdateState::Downloading)));
- let report = try!(ota.install_package_update(&id, &etx));
- try!(ota.send_install_report(&report));
- info!("Update finished. Report sent: {:?}", report);
- try!(ota.update_installed_packages())
+ AcceptUpdates(ids) => {
+ for id in ids {
+ info!("Accepting id {}", id);
+ try!(etx.send(Event::UpdateStateChanged(id.clone(), UpdateState::Downloading)));
+ let report = try!(ota.install_package_update(&id, &etx));
+ try!(ota.send_install_report(&report));
+ info!("Install Report for {}: {:?}", id, report);
+ try!(ota.update_installed_packages())
+ }
}
+ Authenticate(_) => try!(etx.send(Event::Ok)),
+
GetPendingUpdates => {
let mut updates = try!(ota.get_package_updates());
- if updates.len() == 0 {
- return Ok(try!(etx.send(Event::Ok)));
- }
- updates.sort_by_key(|update| update.installPos);
- info!("New package updates available: {:?}", updates);
- for update in updates.iter() {
- try!(etx.send(Event::NewUpdateAvailable(update.requestId.clone())))
+ if updates.len() > 0 {
+ updates.sort_by_key(|u| u.installPos);
+ info!("New package updates available: {:?}", updates);
+ let ids: Vec<UpdateRequestId> = updates.iter().map(|u| u.requestId.clone()).collect();
+ let w = Wrapped { cmd: Command::AcceptUpdates(ids), etx: None };
+ try!(self.loopback.send(w))
}
+ try!(etx.send(Event::Ok));
}
ListInstalledPackages => {
@@ -148,13 +118,13 @@ impl<'t> WrappedInterpreter<'t> {
try!(etx.send(Event::FoundInstalledPackages(pkgs)))
}
+ Shutdown => exit(0),
+
UpdateInstalledPackages => {
try!(ota.update_installed_packages());
try!(etx.send(Event::Ok));
info!("Posted installed packages to the server.")
}
-
- Shutdown => exit(0),
}
Ok(())
@@ -163,15 +133,12 @@ impl<'t> WrappedInterpreter<'t> {
fn unauthenticated(&mut self, cmd: Command, etx: Sender<Event>) -> Result<(), Error> {
match cmd {
Authenticate(_) => {
- let client = AuthClient::new(Auth::Credentials(
- ClientId(self.config.auth.client_id.clone()),
- ClientSecret(self.config.auth.secret.clone())));
- let token = try!(authenticate(&self.config.auth, &client));
- self.access_token = Some(token.into());
- try!(etx.send(Event::Ok));
+ let token = try!(authenticate(&self.config.auth, self.client.as_ref()));
+ self.token = Some(token.into());
+ try!(etx.send(Event::Authenticated));
}
- AcceptUpdate(_) |
+ AcceptUpdates(_) |
GetPendingUpdates |
ListInstalledPackages |
UpdateInstalledPackages => try!(etx.send(Event::NotAuthenticated)),
@@ -182,3 +149,136 @@ impl<'t> WrappedInterpreter<'t> {
Ok(())
}
}
+
+impl<'t> Interpreter<Wrapped, Event> for WrappedInterpreter<'t> {
+ fn interpret(&mut self, w: Wrapped, global_tx: &Sender<Event>) {
+ info!("Wrapped interpreter: {:?}", w.cmd);
+ let broadcast = |ev: Event| {
+ let _ = global_tx.send(ev).map_err(|err| panic!("couldn't broadcast event: {}", err));
+ };
+
+ let (etx, erx): (Sender<Event>, Receiver<Event>) = channel();
+ let outcome = match self.token.to_owned() {
+ Some(token) => {
+ if !self.client.is_testing() {
+ self.client = Box::new(AuthClient::new(Auth::Token(token.into_owned())));
+ }
+ self.authenticated(w.cmd.clone(), etx)
+ }
+
+ None => {
+ if !self.client.is_testing() {
+ self.client = Box::new(AuthClient::new(Auth::Credentials(
+ ClientId(self.config.auth.client_id.clone()),
+ ClientSecret(self.config.auth.secret.clone()))));
+ }
+ self.unauthenticated(w.cmd.clone(), etx)
+ }
+ };
+
+ match outcome {
+ Ok(_) => {
+ let mut last_ev = None;
+ for ev in erx {
+ broadcast(ev.clone());
+ last_ev = Some(ev);
+ }
+ match last_ev {
+ Some(ev) => w.publish(ev),
+ None => panic!("no local event to send back")
+ };
+ }
+
+ Err(Error::AuthorizationError(_)) => {
+ debug!("retry authorization and request");
+ let a = Wrapped { cmd: Command::Authenticate(None), etx: None };
+ let _ = self.loopback.send(a).map_err(|err| panic!("couldn't retry authentication: {}", err));
+ let _ = self.loopback.send(w).map_err(|err| panic!("couldn't retry request: {}", err));
+ }
+
+ Err(err) => {
+ let ev = Event::Error(format!("{}", err));
+ broadcast(ev.clone());
+ w.publish(ev);
+ }
+ }
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use std::thread;
+ use std::sync::mpsc::{channel, Sender, Receiver};
+
+ use super::*;
+ use datatype::{AccessToken, Command, Config, Event, UpdateState};
+ use http_client::test_client::TestHttpClient;
+ use package_manager::PackageManager;
+ use package_manager::tpm::assert_rx;
+
+
+ fn new_interpreter(replies: Vec<String>, pkg_mgr: PackageManager) -> (Sender<Command>, Receiver<Event>) {
+ let (ctx, crx): (Sender<Command>, Receiver<Command>) = channel();
+ let (etx, erx): (Sender<Event>, Receiver<Event>) = channel();
+ let (wtx, _): (Sender<Wrapped>, Receiver<Wrapped>) = channel();
+
+ thread::spawn(move || {
+ let mut wi = WrappedInterpreter {
+ config: Config::default(),
+ token: Some(AccessToken::default().into()),
+ client: Box::new(TestHttpClient::from(replies)),
+ loopback: wtx,
+ };
+ wi.config.ota.package_manager = pkg_mgr;
+ loop {
+ match crx.recv().expect("couldn't receive cmd") {
+ Command::Shutdown => break,
+ cmd @ _ => wi.interpret(Wrapped { cmd: cmd, etx: None }, &etx)
+ }
+ }
+ });
+
+ (ctx, erx)
+ }
+
+ #[test]
+ fn already_authenticated() {
+ let (ctx, erx) = new_interpreter(Vec::new(), PackageManager::new_file(true));
+ ctx.send(Command::Authenticate(None)).unwrap();
+ for ev in erx.recv() {
+ assert_eq!(ev, Event::Ok);
+ }
+ ctx.send(Command::Shutdown).unwrap();
+ }
+
+ #[test]
+ fn accept_updates() {
+ let replies = vec!["[]".to_string(); 10];
+ let (ctx, erx) = new_interpreter(replies, PackageManager::new_file(true));
+
+ ctx.send(Command::AcceptUpdates(vec!["1".to_string(), "2".to_string()])).unwrap();
+ assert_rx(erx, &[
+ Event::UpdateStateChanged("1".to_string(), UpdateState::Downloading),
+ Event::UpdateStateChanged("1".to_string(), UpdateState::Installing),
+ Event::UpdateStateChanged("1".to_string(), UpdateState::Installed),
+ Event::UpdateStateChanged("2".to_string(), UpdateState::Downloading),
+ Event::UpdateStateChanged("2".to_string(), UpdateState::Installing),
+ Event::UpdateStateChanged("2".to_string(), UpdateState::Installed),
+ ]);
+ ctx.send(Command::Shutdown).unwrap();
+ }
+
+ #[test]
+ fn failed_updates() {
+ let replies = vec!["[]".to_string(); 10];
+ let pkg_mgr = PackageManager::new_file(false);
+ let (ctx, erx) = new_interpreter(replies, pkg_mgr);
+
+ ctx.send(Command::AcceptUpdates(vec!["1".to_string()])).unwrap();
+ assert_rx(erx, &[
+ Event::Error("IO error: No such file or directory (os error 2)".to_owned()),
+ ]);
+ ctx.send(Command::Shutdown).unwrap();
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index 039fc6a..bba5132 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,11 +4,11 @@ extern crate crossbeam;
extern crate env_logger;
extern crate getopts;
extern crate hyper;
+#[macro_use] extern crate libotaplus;
#[macro_use] extern crate log;
extern crate rustc_serialize;
extern crate time;
extern crate ws;
-#[macro_use] extern crate libotaplus;
use chan::Receiver as ChanReceiver;
use chan_signal::Signal;
@@ -20,11 +20,12 @@ use std::sync::mpsc::{Sender, Receiver, channel};
use std::thread;
use std::time::Duration;
-use libotaplus::datatype::{config, Command, Config, Event, Url};
+use libotaplus::datatype::{config, Auth, Command, Config, Event, Url};
+use libotaplus::http_client::AuthClient;
use libotaplus::interaction_library::{Console, Gateway, Http, Websocket};
use libotaplus::interaction_library::broadcast::Broadcast;
-use libotaplus::interaction_library::gateway::Interpret;
-use libotaplus::interpreter::{EventInterpreter, Interpreter, Wrapped, WrappedInterpreter};
+use libotaplus::interpreter::{EventInterpreter, CommandInterpreter, Interpreter,
+ Wrapped, WrappedInterpreter};
use libotaplus::package_manager::PackageManager;
@@ -41,16 +42,8 @@ fn spawn_signal_handler(signals: ChanReceiver<Signal>, ctx: Sender<Command>) {
fn spawn_update_poller(ctx: Sender<Command>, interval: u64) {
loop {
- let _ = ctx.send(Command::GetPendingUpdates);
thread::sleep(Duration::from_secs(interval));
- }
-}
-
-fn spawn_command_forwarder(crx: Receiver<Command>, wtx: Sender<Wrapped>) {
- loop {
- let _ = crx.recv()
- .map(|cmd| wtx.send(Interpret { cmd: cmd, etx: None }).unwrap())
- .map_err(|err| panic!("couldn't receive command to forward: {:?}", err));
+ let _ = ctx.send(Command::GetPendingUpdates);
}
}
@@ -80,21 +73,6 @@ fn main() {
let poll_interval = config.ota.polling_interval;
scope.spawn(move || spawn_update_poller(poll_ctx, poll_interval));
- let cmd_wtx = wtx.clone();
- scope.spawn(move || spawn_command_forwarder(crx, cmd_wtx));
-
- let event_sub = broadcast.subscribe();
- let event_ctx = ctx.clone();
- let mut event_int = EventInterpreter;
- scope.spawn(move || event_int.run(event_sub, event_ctx));
-
- let mut wrapped_int = WrappedInterpreter {
- config: config.clone(),
- access_token: None,
- wtx: wtx.clone(),
- };
- scope.spawn(move || wrapped_int.run(wrx, etx));
-
let ws_wtx = wtx.clone();
let ws_sub = broadcast.subscribe();
scope.spawn(move || Websocket::run(ws_wtx, ws_sub));
@@ -112,6 +90,20 @@ fn main() {
scope.spawn(move || Console::run(cons_wtx, cons_sub));
}
+ let event_sub = broadcast.subscribe();
+ let event_ctx = ctx.clone();
+ scope.spawn(move || EventInterpreter.run(event_sub, event_ctx));
+
+ let cmd_wtx = wtx.clone();
+ scope.spawn(move || CommandInterpreter.run(crx, cmd_wtx));
+
+ scope.spawn(move || WrappedInterpreter {
+ config: config,
+ token: None,
+ client: Box::new(AuthClient::new(Auth::None)),
+ loopback: wtx,
+ }.run(wrx, etx));
+
scope.spawn(move || broadcast.start());
});
}
diff --git a/src/ota_plus.rs b/src/ota_plus.rs
index 6d38da1..c139248 100644
--- a/src/ota_plus.rs
+++ b/src/ota_plus.rs
@@ -142,14 +142,14 @@ impl<'c, 'h> OTA<'c, 'h> {
#[cfg(test)]
mod tests {
- use std::fmt::Debug;
- use std::sync::mpsc::{channel, Receiver};
+ use std::sync::mpsc::channel;
use rustc_serialize::json;
use super::*;
use datatype::{Config, Event, Package, PendingUpdateRequest, UpdateResultCode, UpdateState};
use http_client::TestHttpClient;
use package_manager::PackageManager;
+ use package_manager::tpm::assert_rx;
#[test]
@@ -185,20 +185,6 @@ mod tests {
assert_eq!(expect, format!("{}", ota.download_package_update(&"0".to_string()).unwrap_err()));
}
- fn assert_receiver_eq<X: PartialEq + Debug>(rx: Receiver<X>, xs: &[X]) {
- let mut xs = xs.iter();
- while let Ok(x) = rx.try_recv() {
- if let Some(y) = xs.next() {
- assert_eq!(x, *y)
- } else {
- panic!("assert_receiver_eq: never nexted `{:?}`", x)
- }
- }
- if let Some(x) = xs.next() {
- panic!("assert_receiver_eq: never received `{:?}`", x)
- }
- }
-
#[test]
fn test_install_package_update_0() {
let mut ota = OTA {
@@ -211,19 +197,16 @@ mod tests {
UpdateResultCode::GENERAL_ERROR);
let expect = r#"ClientError("http://127.0.0.1:8080/api/v1/vehicle_updates/V1234567890123456/0/download")"#;
- assert_receiver_eq(rx, &[
+ assert_rx(rx, &[
Event::UpdateErrored("0".to_string(), String::from(expect))
]);
}
#[test]
fn test_install_package_update_1() {
- let mut config = Config::default();
+ let mut config = Config::default();
config.ota.packages_dir = "/tmp/".to_string();
- config.ota.package_manager = PackageManager::File {
- filename: "test_install_package_update_1".to_string(),
- succeeds: false
- };
+ config.ota.package_manager = PackageManager::new_file(false);
let mut ota = OTA {
config: &config,
@@ -234,7 +217,7 @@ mod tests {
assert_eq!(report.unwrap().operation_results.pop().unwrap().result_code,
UpdateResultCode::INSTALL_FAILED);
- assert_receiver_eq(rx, &[
+ assert_rx(rx, &[
Event::UpdateStateChanged("0".to_string(), UpdateState::Installing),
// XXX: Not very helpful message?
Event::UpdateErrored("0".to_string(), r#"INSTALL_FAILED: "failed""#.to_string())
@@ -245,10 +228,7 @@ mod tests {
fn test_install_package_update_2() {
let mut config = Config::default();
config.ota.packages_dir = "/tmp/".to_string();
- config.ota.package_manager = PackageManager::File {
- filename: "test_install_package_update_2".to_string(),
- succeeds: true
- };
+ config.ota.package_manager = PackageManager::new_file(true);
let replies = vec![
"[]".to_string(),
@@ -263,7 +243,7 @@ mod tests {
assert_eq!(report.unwrap().operation_results.pop().unwrap().result_code,
UpdateResultCode::OK);
- assert_receiver_eq(rx, &[
+ assert_rx(rx, &[
Event::UpdateStateChanged("0".to_string(), UpdateState::Installing),
Event::UpdateStateChanged("0".to_string(), UpdateState::Installed)
]);
diff --git a/src/package_manager/mod.rs b/src/package_manager/mod.rs
index b3db086..2ba31be 100644
--- a/src/package_manager/mod.rs
+++ b/src/package_manager/mod.rs
@@ -1,4 +1,5 @@
pub use self::package_manager::PackageManager;
+pub use self::tpm::assert_rx;
pub mod dpkg;
pub mod package_manager;
diff --git a/src/package_manager/package_manager.rs b/src/package_manager/package_manager.rs
index 104f18b..a31d303 100644
--- a/src/package_manager/package_manager.rs
+++ b/src/package_manager/package_manager.rs
@@ -1,8 +1,14 @@
+extern crate tempfile;
+
use rustc_serialize::{Decoder, Decodable};
+use std::env::temp_dir;
use datatype::{Error, Package, UpdateResultCode};
use package_manager::{dpkg, rpm, tpm};
+use tempfile::NamedTempFile;
+
+pub type InstallOutcome = (UpdateResultCode, String);
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum PackageManager {
@@ -11,9 +17,20 @@ pub enum PackageManager {
File { filename: String, succeeds: bool }
}
-pub type InstallOutcome = (UpdateResultCode, String);
-
impl PackageManager {
+ pub fn new_file(succeeds: bool) -> Self {
+ PackageManager::File {
+ filename: NamedTempFile::new_in(temp_dir()).expect("couldn't create temporary file")
+ .path().file_name().expect("couldn't get file name")
+ .to_str().expect("couldn't parse file name").to_string(),
+ succeeds: succeeds
+ }
+ }
+
+ pub fn from_file(filename: String, succeeds: bool) -> Self {
+ PackageManager::File { filename: filename, succeeds: succeeds }
+ }
+
pub fn installed_packages(&self) -> Result<Vec<Package>, Error> {
match *self {
PackageManager::Dpkg => dpkg::installed_packages(),
diff --git a/src/package_manager/tpm.rs b/src/package_manager/tpm.rs
index d9a67e0..aa17800 100644
--- a/src/package_manager/tpm.rs
+++ b/src/package_manager/tpm.rs
@@ -1,20 +1,21 @@
+use std::fmt::Debug;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::BufReader;
use std::io::BufWriter;
use std::io::prelude::*;
+use std::sync::mpsc::Receiver;
use datatype::{Error, Package, UpdateResultCode};
+use package_manager::package_manager::InstallOutcome;
pub fn installed_packages(path: &str) -> Result<Vec<Package>, Error> {
-
let f = try!(File::open(path));
let reader = BufReader::new(f);
let mut pkgs = Vec::new();
for line in reader.lines() {
-
let line = try!(line);
let parts = line.split(' ');
@@ -28,39 +29,39 @@ pub fn installed_packages(path: &str) -> Result<Vec<Package>, Error> {
}
}
}
-
}
- return Ok(pkgs)
-
+ Ok(pkgs)
}
-pub fn install_package(path: &str, pkg: &str, succeeds: bool) -> Result<(UpdateResultCode, String), (UpdateResultCode, String)> {
-
- fn install(path: &str, pkg: &str) -> Result<(), Error> {
- let f = try!(OpenOptions::new()
- .create(true)
- .write(true)
- .append(true)
- .open(path));
-
+pub fn install_package(path: &str, pkg: &str, succeeds: bool) -> Result<InstallOutcome, InstallOutcome> {
+ let install = || -> Result<(), Error> {
+ let f = OpenOptions::new().create(true).write(true).append(true).open(path)
+ .expect("couldn't open file for writing");
let mut writer = BufWriter::new(f);
-
try!(writer.write(pkg.as_bytes()));
try!(writer.write(b"\n"));
-
- return Ok(())
- }
+ Ok(())
+ };
if succeeds {
- match install(path, pkg) {
- Ok(_) => Ok((UpdateResultCode::OK, "".to_string())),
+ match install() {
+ Ok(_) => Ok((UpdateResultCode::OK, "".to_string())),
Err(e) => Err((UpdateResultCode::INSTALL_FAILED, format!("{:?}", e)))
}
} else {
Err((UpdateResultCode::INSTALL_FAILED, "failed".to_string()))
}
+}
+pub fn assert_rx<X: PartialEq + Debug>(rx: Receiver<X>, xs: &[X]) {
+ let n = xs.len();
+ let mut xs = xs.iter();
+ for _ in 0..n {
+ let val = rx.recv().expect("assert_rx expected another val");
+ let x = xs.next().expect(&format!("assert_rx: no match for val: {:?}", val));
+ assert_eq!(val, *x);
+ }
}