diff options
author | Shaun Taheri <shaun@advancedtelematic.com> | 2016-06-15 00:41:57 +0200 |
---|---|---|
committer | Shaun Taheri <shaun@advancedtelematic.com> | 2016-06-17 18:35:41 +0200 |
commit | 9f56304fc2b3fdbacf5fcea971ee93d61a0d7e32 (patch) | |
tree | ed1078aa1864e8426ee88086ff19728fba534baa | |
parent | 23c3888851279494bfc52c11326d6bebe379b761 (diff) | |
download | rvi_sota_client-9f56304fc2b3fdbacf5fcea971ee93d61a0d7e32.tar.gz |
Variable arg support for AcceptUpdate Command
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | src/datatype/command.rs | 144 | ||||
-rw-r--r-- | src/datatype/error.rs | 9 | ||||
-rw-r--r-- | src/datatype/event.rs | 2 | ||||
-rw-r--r-- | src/http_client/http_client.rs | 2 | ||||
-rw-r--r-- | src/http_client/test_client.rs | 2 | ||||
-rw-r--r-- | src/interaction_library/broadcast.rs | 2 | ||||
-rw-r--r-- | src/interaction_library/gateway.rs | 14 | ||||
-rw-r--r-- | src/interaction_library/http.rs | 6 | ||||
-rw-r--r-- | src/interpreter.rs | 260 | ||||
-rw-r--r-- | src/main.rs | 48 | ||||
-rw-r--r-- | src/ota_plus.rs | 36 | ||||
-rw-r--r-- | src/package_manager/mod.rs | 1 | ||||
-rw-r--r-- | src/package_manager/package_manager.rs | 21 | ||||
-rw-r--r-- | src/package_manager/tpm.rs | 41 |
15 files changed, 326 insertions, 264 deletions
@@ -1,3 +1,3 @@ target pkg/ota_plus_client -test_install_package_update_2 +.tmp* diff --git a/src/datatype/command.rs b/src/datatype/command.rs index e4ad023..3348c76 100644 --- a/src/datatype/command.rs +++ b/src/datatype/command.rs @@ -7,7 +7,7 @@ use datatype::{ClientCredentials, ClientId, ClientSecret, Error, UpdateRequestId #[derive(RustcDecodable, RustcEncodable, PartialEq, Eq, Debug, Clone)] pub enum Command { - AcceptUpdate(UpdateRequestId), + AcceptUpdates(Vec<UpdateRequestId>), /* Add: UpdateReport, InstalledSoftware, // or reuse UpdateInstalledPackages @@ -33,34 +33,18 @@ impl FromStr for Command { named!(command <(Command, Vec<&str>)>, chain!( space? ~ cmd: alt!( - alt_complete!(tag!("Authenticate") - | tag!("authenticate") - | tag!("auth") - ) => { |_| Command::Authenticate(None) } - - | alt_complete!(tag!("GetPendingUpdates") - | tag!("getPendingUpdates") - | tag!("pen") - ) => { |_| Command::GetPendingUpdates } - - | alt_complete!(tag!("AcceptUpdate") - | tag!("acceptUpdate") - | tag!("acc") - ) => { |_| Command::AcceptUpdate("".to_owned()) } - - | alt_complete!(tag!("ListInstalledPackages") - | tag!("listInstalledPackages") - | tag!("ls") - ) => { |_| Command::ListInstalledPackages } - - | alt_complete!(tag!("Shutdown") - | tag!("shutdown") - ) => { |_| Command::Shutdown } - - | alt_complete!(tag!("UpdateInstalledPackages") - | tag!("updateInstalledPackages") - | tag!("up") - ) => { |_| Command::UpdateInstalledPackages } + alt_complete!(tag!("AcceptUpdate") | tag!("acc")) + => { |_| Command::AcceptUpdates(Vec::new()) } + | alt_complete!(tag!("Authenticate") | tag!("auth")) + => { |_| Command::Authenticate(None) } + | alt_complete!(tag!("GetPendingUpdates") | tag!("pen")) + => { |_| Command::GetPendingUpdates } + | alt_complete!(tag!("ListInstalledPackages") | tag!("ls")) + => { |_| Command::ListInstalledPackages } + | alt_complete!(tag!("Shutdown") | tag!("shutdown")) + => { |_| Command::Shutdown } + | alt_complete!(tag!("UpdateInstalledPackages") | tag!("up")) + => { |_| Command::UpdateInstalledPackages } ) ~ args: arguments ~ alt!(eof | tag!("\r") | tag!("\n") | tag!(";")), @@ -83,56 +67,39 @@ named!(arguments <&[u8], Vec<&str> >, chain!( fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> { match cmd { - Command::AcceptUpdate(_) => { - match args.len() { - 0 => Err(Error::Command("usage: acc <id>".to_owned())), - 1 => Ok(Command::AcceptUpdate(args[0].to_owned())), - _ => Err(Error::Command(format!("unexpected acc args: {:?}", args))), - } - } - - Command::Authenticate(_) => { - match args.len() { - 0 => Ok(Command::Authenticate(None)), - 1 => Err(Error::Command("usage: auth <user> <pass>".to_owned())), - 2 => { - let (user, pass) = (args[0].to_owned(), args[1].to_owned()); - Ok(Command::Authenticate(Some(ClientCredentials { - id: ClientId(user), - secret: ClientSecret(pass) - }))) - } - _ => Err(Error::Command(format!("unexpected auth args: {:?}", args))), - } - } - - Command::GetPendingUpdates => { - match args.len() { - 0 => Ok(Command::GetPendingUpdates), - _ => Err(Error::Command(format!("unexpected pen args: {:?}", args))), - } - } - - Command::ListInstalledPackages => { - match args.len() { - 0 => Ok(Command::ListInstalledPackages), - _ => Err(Error::Command(format!("unexpected ls args: {:?}", args))), - } - } - - Command::Shutdown => { - match args.len() { - 0 => Ok(Command::Shutdown), - _ => Err(Error::Command(format!("unexpected shutdown args: {:?}", args))), - } - } - - Command::UpdateInstalledPackages => { - match args.len() { - 0 => Ok(Command::UpdateInstalledPackages), - _ => Err(Error::Command(format!("unexpected up args: {:?}", args))), - } - } + Command::AcceptUpdates(_) => match args.len() { + 0 => Err(Error::Command("usage: acc [<id>]".to_string())), + _ => Ok(Command::AcceptUpdates(args.iter().map(|arg| String::from(*arg)).collect())), + }, + + Command::Authenticate(_) => match args.len() { + 0 => Ok(Command::Authenticate(None)), + 1 => Err(Error::Command("usage: auth <id> <secret>".to_string())), + 2 => Ok(Command::Authenticate(Some(ClientCredentials { + id: ClientId(args[0].to_string()), + secret: ClientSecret(args[1].to_string())}))), + _ => Err(Error::Command(format!("unexpected auth args: {:?}", args))), + }, + + Command::GetPendingUpdates => match args.len() { + 0 => Ok(Command::GetPendingUpdates), + _ => Err(Error::Command(format!("unexpected pen args: {:?}", args))), + }, + + Command::ListInstalledPackages => match args.len() { + 0 => Ok(Command::ListInstalledPackages), + _ => Err(Error::Command(format!("unexpected ls args: {:?}", args))), + }, + + Command::Shutdown => match args.len() { + 0 => Ok(Command::Shutdown), + _ => Err(Error::Command(format!("unexpected shutdown args: {:?}", args))), + }, + + Command::UpdateInstalledPackages => match args.len() { + 0 => Ok(Command::UpdateInstalledPackages), + _ => Err(Error::Command(format!("unexpected up args: {:?}", args))), + }, } } @@ -143,12 +110,13 @@ mod tests { use datatype::{Command, ClientCredentials, ClientId, ClientSecret}; use nom::IResult; + #[test] fn parse_command_test() { assert_eq!(command(&b"auth foo bar"[..]), IResult::Done(&b""[..], (Command::Authenticate(None), vec!["foo", "bar"]))); assert_eq!(command(&b"acc 1"[..]), - IResult::Done(&b""[..], (Command::AcceptUpdate("".to_owned()), vec!["1"]))); + IResult::Done(&b""[..], (Command::AcceptUpdates(Vec::new()), vec!["1"]))); assert_eq!(command(&b"ls;\n"[..]), IResult::Done(&b"\n"[..], (Command::ListInstalledPackages, Vec::new()))); } @@ -165,23 +133,20 @@ mod tests { #[test] fn accept_update_test() { - assert_eq!("acc 1".parse::<Command>().unwrap(), Command::AcceptUpdate("1".to_owned())); - assert_eq!("acceptUpdate 2".parse::<Command>().unwrap(), Command::AcceptUpdate("2".to_owned())); - assert_eq!("AcceptUpdate 3".parse::<Command>().unwrap(), Command::AcceptUpdate("3".to_owned())); - assert_eq!("acc some".parse::<Command>().unwrap(), Command::AcceptUpdate("some".to_owned())); + assert_eq!("acc 1".parse::<Command>().unwrap(), Command::AcceptUpdates(vec!["1".to_string()])); + assert_eq!("AcceptUpdate this".parse::<Command>().unwrap(), Command::AcceptUpdates(vec!["this".to_string()])); + assert_eq!("acc some more".parse::<Command>().unwrap(), Command::AcceptUpdates(vec!["some".to_string(), "more".to_string()])); assert!("acc".parse::<Command>().is_err()); - assert!("acc more than one".parse::<Command>().is_err()); } #[test] fn authenticate_test() { assert_eq!("auth".parse::<Command>().unwrap(), Command::Authenticate(None)); - assert_eq!("authenticate".parse::<Command>().unwrap(), Command::Authenticate(None)); assert_eq!("Authenticate".parse::<Command>().unwrap(), Command::Authenticate(None)); assert_eq!("auth user pass".parse::<Command>().unwrap(), Command::Authenticate(Some(ClientCredentials { - id: ClientId("user".to_owned()), - secret: ClientSecret("pass".to_owned()), + id: ClientId("user".to_string()), + secret: ClientSecret("pass".to_string()), }))); assert!("auth one".parse::<Command>().is_err()); assert!("auth one two three".parse::<Command>().is_err()); @@ -190,7 +155,6 @@ mod tests { #[test] fn get_pending_updates_test() { assert_eq!("pen".parse::<Command>().unwrap(), Command::GetPendingUpdates); - assert_eq!("getPendingUpdates".parse::<Command>().unwrap(), Command::GetPendingUpdates); assert_eq!("GetPendingUpdates".parse::<Command>().unwrap(), Command::GetPendingUpdates); assert!("pen some".parse::<Command>().is_err()); } @@ -198,7 +162,6 @@ mod tests { #[test] fn list_installed_test() { assert_eq!("ls".parse::<Command>().unwrap(), Command::ListInstalledPackages); - assert_eq!("listInstalledPackages".parse::<Command>().unwrap(), Command::ListInstalledPackages); assert_eq!("ListInstalledPackages".parse::<Command>().unwrap(), Command::ListInstalledPackages); assert!("ls some".parse::<Command>().is_err()); } @@ -214,7 +177,6 @@ mod tests { #[test] fn update_installed_test() { assert_eq!("up".parse::<Command>().unwrap(), Command::UpdateInstalledPackages); - assert_eq!("updateInstalledPackages".parse::<Command>().unwrap(), Command::UpdateInstalledPackages); assert_eq!("UpdateInstalledPackages".parse::<Command>().unwrap(), Command::UpdateInstalledPackages); assert!("up down".parse::<Command>().is_err()); } diff --git a/src/datatype/error.rs b/src/datatype/error.rs index 388ee9f..4ad3307 100644 --- a/src/datatype/error.rs +++ b/src/datatype/error.rs @@ -13,6 +13,7 @@ use datatype::Event; use rustc_serialize::json::{EncoderError as JsonEncoderError, DecoderError as JsonDecoderError}; use ws::Error as WebsocketError; use super::super::http_client::auth_client::AuthHandler; +use super::super::interpreter::Wrapped; #[derive(Debug)] @@ -31,6 +32,7 @@ pub enum Error { ParseError(String), RecvError(RecvError), SendErrorEvent(SendError<Event>), + SendErrorWrapped(SendError<Wrapped>), TomlParserErrors(Vec<TomlParserError>), TomlDecodeError(TomlDecodeError), UrlParseError(UrlParseError), @@ -43,6 +45,12 @@ impl From<SendError<Event>> for Error { } } +impl From<SendError<Wrapped>> for Error { + fn from(e: SendError<Wrapped>) -> Error { + Error::SendErrorWrapped(e) + } +} + impl From<RecvError> for Error { fn from(e: RecvError) -> Error { Error::RecvError(e) @@ -111,6 +119,7 @@ impl Display for Error { Error::ParseError(ref s) => s.clone(), Error::RecvError(ref s) => format!("Recv error: {}", s.clone()), Error::SendErrorEvent(ref s) => format!("Send error for Event: {}", s.clone()), + Error::SendErrorWrapped(ref s) => format!("Send error for Wrapped: {}", s.clone()), Error::TomlDecodeError(ref e) => format!("Toml decode error: {}", e.clone()), Error::TomlParserErrors(ref e) => format!("Toml parser errors: {:?}", e.clone()), Error::UrlParseError(ref s) => format!("Url parse error: {}", s.clone()), diff --git a/src/datatype/event.rs b/src/datatype/event.rs index ac36697..3b7e7a8 100644 --- a/src/datatype/event.rs +++ b/src/datatype/event.rs @@ -6,8 +6,8 @@ use datatype::{UpdateRequestId, UpdateState, Package}; #[derive(RustcEncodable, RustcDecodable, Debug, Clone, PartialEq, Eq)] pub enum Event { Ok, + Authenticated, NotAuthenticated, - NewUpdateAvailable(UpdateRequestId), /* TODO: Add: DownloadComplete(UpdateRequestId), GetInstalledSoftware, diff --git a/src/http_client/http_client.rs b/src/http_client/http_client.rs index 938f5c0..a2039bd 100644 --- a/src/http_client/http_client.rs +++ b/src/http_client/http_client.rs @@ -11,6 +11,8 @@ pub trait HttpClient { } fn chan_request(&self, req: HttpRequest, resp_tx: Sender<HttpResponse>); + + fn is_testing(&self) -> bool { false } } #[derive(Debug)] diff --git a/src/http_client/test_client.rs b/src/http_client/test_client.rs index 7974144..a2a3017 100644 --- a/src/http_client/test_client.rs +++ b/src/http_client/test_client.rs @@ -26,4 +26,6 @@ impl HttpClient for TestHttpClient { None => resp_tx.send(Err(Error::ClientError(req.url.to_string()))) }.map_err(|err| error!("couldn't send test chan_request response: {}", err)); } + + fn is_testing(&self) -> bool { true } } diff --git a/src/interaction_library/broadcast.rs b/src/interaction_library/broadcast.rs index 3c712bf..b35e3a9 100644 --- a/src/interaction_library/broadcast.rs +++ b/src/interaction_library/broadcast.rs @@ -23,7 +23,7 @@ impl<A: Clone> Broadcast<A> { } } }, - Err(e) => error!("Error receiving: {}", e) + Err(e) => trace!("Error receiving: {}", e) } } } diff --git a/src/interaction_library/gateway.rs b/src/interaction_library/gateway.rs index 3ecdaf6..ce58886 100644 --- a/src/interaction_library/gateway.rs +++ b/src/interaction_library/gateway.rs @@ -22,20 +22,16 @@ pub trait Gateway<C, E>: Sized + Send + Sync + 'static thread::spawn(move || { loop { - gateway.next() - .map(|i| { - tx.send(i) - .map_err(|err| error!("Error sending command: {:?}", err)) - }); + gateway.next().map(|i| { + tx.send(i).map_err(|err| error!("Error sending command: {:?}", err)) + }); } }); thread::spawn(move || { loop { - match rx.recv() { - Ok(e) => global.pulse(e), - Err(err) => error!("Error receiving event: {:?}", err), - } + let _ = rx.recv().map(|e| global.pulse(e)) + .map_err(|err| trace!("Error receiving event: {:?}", err)); } }); } diff --git a/src/interaction_library/http.rs b/src/interaction_library/http.rs index 6575710..f506fa0 100644 --- a/src/interaction_library/http.rs +++ b/src/interaction_library/http.rs @@ -221,8 +221,8 @@ mod tests { loop { let w = wrx.recv().unwrap(); match w.cmd { - Command::AcceptUpdate(id) => { - let ev = Event::Error(id); + Command::AcceptUpdates(ids) => { + let ev = Event::Error(ids.first().unwrap().to_owned()); match w.etx { Some(etx) => etx.lock().unwrap().send(ev).unwrap(), None => panic!("expected transmitter"), @@ -238,7 +238,7 @@ mod tests { for id in 0..10 { scope.spawn(move || { let client = AuthClient::new(Auth::None); - let cmd = Command::AcceptUpdate(format!("{}", id)); + let cmd = Command::AcceptUpdates(vec!(format!("{}", id))); let req_body = json::encode(&cmd).unwrap(); let req = HttpRequest { diff --git a/src/interpreter.rs b/src/interpreter.rs index 9a76912..00e5adc 100644 --- a/src/interpreter.rs +++ b/src/interpreter.rs @@ -1,12 +1,11 @@ use std::borrow::Cow; use std::process::exit; -use std::sync::{Arc, Mutex}; use std::sync::mpsc::{Sender, Receiver, channel}; use datatype::{AccessToken, Auth, ClientId, ClientSecret, Command, Config, - Error, Event, UpdateState}; + Error, Event, UpdateState, UpdateRequestId}; use datatype::Command::*; -use http_client::AuthClient; +use http_client::{AuthClient, HttpClient}; use interaction_library::gateway::Interpret; use oauth2::authenticate; use ota_plus::OTA; @@ -36,10 +35,6 @@ impl Interpreter<Event, Command> for EventInterpreter { ctx.send(Command::Authenticate(None)) } - Event::NewUpdateAvailable(ref id) => { - ctx.send(Command::AcceptUpdate(id.clone())) - } - /* TODO: Handle PackageManger events Event::DownloadComplete => { env.config.ota.package_manager.install_package(p); @@ -58,89 +53,64 @@ impl Interpreter<Event, Command> for EventInterpreter { } -pub type Wrapped = Interpret<Command, Event>; +pub struct CommandInterpreter; -pub struct WrappedInterpreter<'t> { - pub config: Config, - pub access_token: Option<Cow<'t, AccessToken>>, - pub wtx: Sender<Wrapped>, +impl Interpreter<Command, Wrapped> for CommandInterpreter { + fn interpret(&mut self, cmd: Command, wtx: &Sender<Wrapped>) { + info!("Command interpreter: {:?}", cmd); + let _ = wtx.send(Wrapped { cmd: cmd, etx: None }) + .map_err(|err| panic!("couldn't forward command: {}", err)); + } } -impl<'t> Interpreter<Wrapped, Event> for WrappedInterpreter<'t> { - fn interpret(&mut self, w: Wrapped, global_tx: &Sender<Event>) { - fn send_global(ev: Event, global_tx: &Sender<Event>) { - let _ = global_tx.send(ev).map_err(|err| panic!("couldn't send global response: {}", err)); - } - - fn send_local(ev: Event, local_tx: Option<Arc<Mutex<Sender<Event>>>>) { - if let Some(local) = local_tx { - let _ = local.lock().unwrap().send(ev) - .map_err(|err| panic!("couldn't send local response: {}", err)); - } - } - - info!("Interpreting wrapped command: {:?}", w.cmd); - let (multi_tx, multi_rx): (Sender<Event>, Receiver<Event>) = channel(); - match match self.access_token.to_owned() { - Some(token) => self.authenticated(w.cmd.clone(), token.into_owned(), multi_tx), - None => self.unauthenticated(w.cmd.clone(), multi_tx) - }{ - Ok(_) => { - let mut last_ev = None; - for ev in multi_rx { - send_global(ev.clone(), &global_tx); - last_ev = Some(ev); - } - match last_ev { - Some(ev) => send_local(ev, w.etx), - None => panic!("no local event to send back") - }; - } - Err(Error::AuthorizationError(_)) => { - debug!("retry authorization and request"); - let a = Wrapped { cmd: Command::Authenticate(None), etx: None }; - let _ = self.wtx.send(a).map_err(|err| panic!("couldn't retry authentication: {}", err)); - let _ = self.wtx.send(w).map_err(|err| panic!("couldn't retry request: {}", err)); - } +pub type Wrapped = Interpret<Command, Event>; - Err(err) => { - let ev = Event::Error(format!("{}", err)); - send_global(ev.clone(), &global_tx); - send_local(ev, w.etx); - } +impl Wrapped { + fn publish(&self, ev: Event) { + if let Some(ref etx) = self.etx { + let _ = etx.lock().unwrap().send(ev).map_err(|err| panic!("couldn't publish event: {}", err)); } } } +pub struct WrappedInterpreter<'t> { + pub config: Config, + pub token: Option<Cow<'t, AccessToken>>, + pub client: Box<HttpClient>, + pub loopback: Sender<Wrapped>, +} + impl<'t> WrappedInterpreter<'t> { - fn authenticated(&self, cmd: Command, token: AccessToken, etx: Sender<Event>) -> Result<(), Error> { - let client = AuthClient::new(Auth::Token(token)); - let mut ota = OTA::new(&self.config, &client); + fn authenticated(&self, cmd: Command, etx: Sender<Event>) -> Result<(), Error> { + let mut ota = OTA::new(&self.config, self.client.as_ref()); // always send at least one Event response match cmd { - Authenticate(_) => try!(etx.send(Event::Ok)), - - AcceptUpdate(ref id) => { - try!(etx.send(Event::UpdateStateChanged(id.clone(), UpdateState::Downloading))); - let report = try!(ota.install_package_update(&id, &etx)); - try!(ota.send_install_report(&report)); - info!("Update finished. Report sent: {:?}", report); - try!(ota.update_installed_packages()) + AcceptUpdates(ids) => { + for id in ids { + info!("Accepting id {}", id); + try!(etx.send(Event::UpdateStateChanged(id.clone(), UpdateState::Downloading))); + let report = try!(ota.install_package_update(&id, &etx)); + try!(ota.send_install_report(&report)); + info!("Install Report for {}: {:?}", id, report); + try!(ota.update_installed_packages()) + } } + Authenticate(_) => try!(etx.send(Event::Ok)), + GetPendingUpdates => { let mut updates = try!(ota.get_package_updates()); - if updates.len() == 0 { - return Ok(try!(etx.send(Event::Ok))); - } - updates.sort_by_key(|update| update.installPos); - info!("New package updates available: {:?}", updates); - for update in updates.iter() { - try!(etx.send(Event::NewUpdateAvailable(update.requestId.clone()))) + if updates.len() > 0 { + updates.sort_by_key(|u| u.installPos); + info!("New package updates available: {:?}", updates); + let ids: Vec<UpdateRequestId> = updates.iter().map(|u| u.requestId.clone()).collect(); + let w = Wrapped { cmd: Command::AcceptUpdates(ids), etx: None }; + try!(self.loopback.send(w)) } + try!(etx.send(Event::Ok)); } ListInstalledPackages => { @@ -148,13 +118,13 @@ impl<'t> WrappedInterpreter<'t> { try!(etx.send(Event::FoundInstalledPackages(pkgs))) } + Shutdown => exit(0), + UpdateInstalledPackages => { try!(ota.update_installed_packages()); try!(etx.send(Event::Ok)); info!("Posted installed packages to the server.") } - - Shutdown => exit(0), } Ok(()) @@ -163,15 +133,12 @@ impl<'t> WrappedInterpreter<'t> { fn unauthenticated(&mut self, cmd: Command, etx: Sender<Event>) -> Result<(), Error> { match cmd { Authenticate(_) => { - let client = AuthClient::new(Auth::Credentials( - ClientId(self.config.auth.client_id.clone()), - ClientSecret(self.config.auth.secret.clone()))); - let token = try!(authenticate(&self.config.auth, &client)); - self.access_token = Some(token.into()); - try!(etx.send(Event::Ok)); + let token = try!(authenticate(&self.config.auth, self.client.as_ref())); + self.token = Some(token.into()); + try!(etx.send(Event::Authenticated)); } - AcceptUpdate(_) | + AcceptUpdates(_) | GetPendingUpdates | ListInstalledPackages | UpdateInstalledPackages => try!(etx.send(Event::NotAuthenticated)), @@ -182,3 +149,136 @@ impl<'t> WrappedInterpreter<'t> { Ok(()) } } + +impl<'t> Interpreter<Wrapped, Event> for WrappedInterpreter<'t> { + fn interpret(&mut self, w: Wrapped, global_tx: &Sender<Event>) { + info!("Wrapped interpreter: {:?}", w.cmd); + let broadcast = |ev: Event| { + let _ = global_tx.send(ev).map_err(|err| panic!("couldn't broadcast event: {}", err)); + }; + + let (etx, erx): (Sender<Event>, Receiver<Event>) = channel(); + let outcome = match self.token.to_owned() { + Some(token) => { + if !self.client.is_testing() { + self.client = Box::new(AuthClient::new(Auth::Token(token.into_owned()))); + } + self.authenticated(w.cmd.clone(), etx) + } + + None => { + if !self.client.is_testing() { + self.client = Box::new(AuthClient::new(Auth::Credentials( + ClientId(self.config.auth.client_id.clone()), + ClientSecret(self.config.auth.secret.clone())))); + } + self.unauthenticated(w.cmd.clone(), etx) + } + }; + + match outcome { + Ok(_) => { + let mut last_ev = None; + for ev in erx { + broadcast(ev.clone()); + last_ev = Some(ev); + } + match last_ev { + Some(ev) => w.publish(ev), + None => panic!("no local event to send back") + }; + } + + Err(Error::AuthorizationError(_)) => { + debug!("retry authorization and request"); + let a = Wrapped { cmd: Command::Authenticate(None), etx: None }; + let _ = self.loopback.send(a).map_err(|err| panic!("couldn't retry authentication: {}", err)); + let _ = self.loopback.send(w).map_err(|err| panic!("couldn't retry request: {}", err)); + } + + Err(err) => { + let ev = Event::Error(format!("{}", err)); + broadcast(ev.clone()); + w.publish(ev); + } + } + } +} + + +#[cfg(test)] +mod tests { + use std::thread; + use std::sync::mpsc::{channel, Sender, Receiver}; + + use super::*; + use datatype::{AccessToken, Command, Config, Event, UpdateState}; + use http_client::test_client::TestHttpClient; + use package_manager::PackageManager; + use package_manager::tpm::assert_rx; + + + fn new_interpreter(replies: Vec<String>, pkg_mgr: PackageManager) -> (Sender<Command>, Receiver<Event>) { + let (ctx, crx): (Sender<Command>, Receiver<Command>) = channel(); + let (etx, erx): (Sender<Event>, Receiver<Event>) = channel(); + let (wtx, _): (Sender<Wrapped>, Receiver<Wrapped>) = channel(); + + thread::spawn(move || { + let mut wi = WrappedInterpreter { + config: Config::default(), + token: Some(AccessToken::default().into()), + client: Box::new(TestHttpClient::from(replies)), + loopback: wtx, + }; + wi.config.ota.package_manager = pkg_mgr; + loop { + match crx.recv().expect("couldn't receive cmd") { + Command::Shutdown => break, + cmd @ _ => wi.interpret(Wrapped { cmd: cmd, etx: None }, &etx) + } + } + }); + + (ctx, erx) + } + + #[test] + fn already_authenticated() { + let (ctx, erx) = new_interpreter(Vec::new(), PackageManager::new_file(true)); + ctx.send(Command::Authenticate(None)).unwrap(); + for ev in erx.recv() { + assert_eq!(ev, Event::Ok); + } + ctx.send(Command::Shutdown).unwrap(); + } + + #[test] + fn accept_updates() { + let replies = vec!["[]".to_string(); 10]; + let (ctx, erx) = new_interpreter(replies, PackageManager::new_file(true)); + + ctx.send(Command::AcceptUpdates(vec!["1".to_string(), "2".to_string()])).unwrap(); + assert_rx(erx, &[ + Event::UpdateStateChanged("1".to_string(), UpdateState::Downloading), + Event::UpdateStateChanged("1".to_string(), UpdateState::Installing), + Event::UpdateStateChanged("1".to_string(), UpdateState::Installed), + Event::UpdateStateChanged("2".to_string(), UpdateState::Downloading), + Event::UpdateStateChanged("2".to_string(), UpdateState::Installing), + Event::UpdateStateChanged("2".to_string(), UpdateState::Installed), + ]); + ctx.send(Command::Shutdown).unwrap(); + } + + #[test] + fn failed_updates() { + let replies = vec!["[]".to_string(); 10]; + let pkg_mgr = PackageManager::new_file(false); + let (ctx, erx) = new_interpreter(replies, pkg_mgr); + + ctx.send(Command::AcceptUpdates(vec!["1".to_string()])).unwrap(); + assert_rx(erx, &[ + Event::Error("IO error: No such file or directory (os error 2)".to_owned()), + ]); + ctx.send(Command::Shutdown).unwrap(); + } +} diff --git a/src/main.rs b/src/main.rs index 039fc6a..bba5132 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,11 +4,11 @@ extern crate crossbeam; extern crate env_logger; extern crate getopts; extern crate hyper; +#[macro_use] extern crate libotaplus; #[macro_use] extern crate log; extern crate rustc_serialize; extern crate time; extern crate ws; -#[macro_use] extern crate libotaplus; use chan::Receiver as ChanReceiver; use chan_signal::Signal; @@ -20,11 +20,12 @@ use std::sync::mpsc::{Sender, Receiver, channel}; use std::thread; use std::time::Duration; -use libotaplus::datatype::{config, Command, Config, Event, Url}; +use libotaplus::datatype::{config, Auth, Command, Config, Event, Url}; +use libotaplus::http_client::AuthClient; use libotaplus::interaction_library::{Console, Gateway, Http, Websocket}; use libotaplus::interaction_library::broadcast::Broadcast; -use libotaplus::interaction_library::gateway::Interpret; -use libotaplus::interpreter::{EventInterpreter, Interpreter, Wrapped, WrappedInterpreter}; +use libotaplus::interpreter::{EventInterpreter, CommandInterpreter, Interpreter, + Wrapped, WrappedInterpreter}; use libotaplus::package_manager::PackageManager; @@ -41,16 +42,8 @@ fn spawn_signal_handler(signals: ChanReceiver<Signal>, ctx: Sender<Command>) { fn spawn_update_poller(ctx: Sender<Command>, interval: u64) { loop { - let _ = ctx.send(Command::GetPendingUpdates); thread::sleep(Duration::from_secs(interval)); - } -} - -fn spawn_command_forwarder(crx: Receiver<Command>, wtx: Sender<Wrapped>) { - loop { - let _ = crx.recv() - .map(|cmd| wtx.send(Interpret { cmd: cmd, etx: None }).unwrap()) - .map_err(|err| panic!("couldn't receive command to forward: {:?}", err)); + let _ = ctx.send(Command::GetPendingUpdates); } } @@ -80,21 +73,6 @@ fn main() { let poll_interval = config.ota.polling_interval; scope.spawn(move || spawn_update_poller(poll_ctx, poll_interval)); - let cmd_wtx = wtx.clone(); - scope.spawn(move || spawn_command_forwarder(crx, cmd_wtx)); - - let event_sub = broadcast.subscribe(); - let event_ctx = ctx.clone(); - let mut event_int = EventInterpreter; - scope.spawn(move || event_int.run(event_sub, event_ctx)); - - let mut wrapped_int = WrappedInterpreter { - config: config.clone(), - access_token: None, - wtx: wtx.clone(), - }; - scope.spawn(move || wrapped_int.run(wrx, etx)); - let ws_wtx = wtx.clone(); let ws_sub = broadcast.subscribe(); scope.spawn(move || Websocket::run(ws_wtx, ws_sub)); @@ -112,6 +90,20 @@ fn main() { scope.spawn(move || Console::run(cons_wtx, cons_sub)); } + let event_sub = broadcast.subscribe(); + let event_ctx = ctx.clone(); + scope.spawn(move || EventInterpreter.run(event_sub, event_ctx)); + + let cmd_wtx = wtx.clone(); + scope.spawn(move || CommandInterpreter.run(crx, cmd_wtx)); + + scope.spawn(move || WrappedInterpreter { + config: config, + token: None, + client: Box::new(AuthClient::new(Auth::None)), + loopback: wtx, + }.run(wrx, etx)); + scope.spawn(move || broadcast.start()); }); } diff --git a/src/ota_plus.rs b/src/ota_plus.rs index 6d38da1..c139248 100644 --- a/src/ota_plus.rs +++ b/src/ota_plus.rs @@ -142,14 +142,14 @@ impl<'c, 'h> OTA<'c, 'h> { #[cfg(test)] mod tests { - use std::fmt::Debug; - use std::sync::mpsc::{channel, Receiver}; + use std::sync::mpsc::channel; use rustc_serialize::json; use super::*; use datatype::{Config, Event, Package, PendingUpdateRequest, UpdateResultCode, UpdateState}; use http_client::TestHttpClient; use package_manager::PackageManager; + use package_manager::tpm::assert_rx; #[test] @@ -185,20 +185,6 @@ mod tests { assert_eq!(expect, format!("{}", ota.download_package_update(&"0".to_string()).unwrap_err())); } - fn assert_receiver_eq<X: PartialEq + Debug>(rx: Receiver<X>, xs: &[X]) { - let mut xs = xs.iter(); - while let Ok(x) = rx.try_recv() { - if let Some(y) = xs.next() { - assert_eq!(x, *y) - } else { - panic!("assert_receiver_eq: never nexted `{:?}`", x) - } - } - if let Some(x) = xs.next() { - panic!("assert_receiver_eq: never received `{:?}`", x) - } - } - #[test] fn test_install_package_update_0() { let mut ota = OTA { @@ -211,19 +197,16 @@ mod tests { UpdateResultCode::GENERAL_ERROR); let expect = r#"ClientError("http://127.0.0.1:8080/api/v1/vehicle_updates/V1234567890123456/0/download")"#; - assert_receiver_eq(rx, &[ + assert_rx(rx, &[ Event::UpdateErrored("0".to_string(), String::from(expect)) ]); } #[test] fn test_install_package_update_1() { - let mut config = Config::default(); + let mut config = Config::default(); config.ota.packages_dir = "/tmp/".to_string(); - config.ota.package_manager = PackageManager::File { - filename: "test_install_package_update_1".to_string(), - succeeds: false - }; + config.ota.package_manager = PackageManager::new_file(false); let mut ota = OTA { config: &config, @@ -234,7 +217,7 @@ mod tests { assert_eq!(report.unwrap().operation_results.pop().unwrap().result_code, UpdateResultCode::INSTALL_FAILED); - assert_receiver_eq(rx, &[ + assert_rx(rx, &[ Event::UpdateStateChanged("0".to_string(), UpdateState::Installing), // XXX: Not very helpful message? Event::UpdateErrored("0".to_string(), r#"INSTALL_FAILED: "failed""#.to_string()) @@ -245,10 +228,7 @@ mod tests { fn test_install_package_update_2() { let mut config = Config::default(); config.ota.packages_dir = "/tmp/".to_string(); - config.ota.package_manager = PackageManager::File { - filename: "test_install_package_update_2".to_string(), - succeeds: true - }; + config.ota.package_manager = PackageManager::new_file(true); let replies = vec![ "[]".to_string(), @@ -263,7 +243,7 @@ mod tests { assert_eq!(report.unwrap().operation_results.pop().unwrap().result_code, UpdateResultCode::OK); - assert_receiver_eq(rx, &[ + assert_rx(rx, &[ Event::UpdateStateChanged("0".to_string(), UpdateState::Installing), Event::UpdateStateChanged("0".to_string(), UpdateState::Installed) ]); diff --git a/src/package_manager/mod.rs b/src/package_manager/mod.rs index b3db086..2ba31be 100644 --- a/src/package_manager/mod.rs +++ b/src/package_manager/mod.rs @@ -1,4 +1,5 @@ pub use self::package_manager::PackageManager; +pub use self::tpm::assert_rx; pub mod dpkg; pub mod package_manager; diff --git a/src/package_manager/package_manager.rs b/src/package_manager/package_manager.rs index 104f18b..a31d303 100644 --- a/src/package_manager/package_manager.rs +++ b/src/package_manager/package_manager.rs @@ -1,8 +1,14 @@ +extern crate tempfile; + use rustc_serialize::{Decoder, Decodable}; +use std::env::temp_dir; use datatype::{Error, Package, UpdateResultCode}; use package_manager::{dpkg, rpm, tpm}; +use tempfile::NamedTempFile; + +pub type InstallOutcome = (UpdateResultCode, String); #[derive(Debug, PartialEq, Eq, Clone)] pub enum PackageManager { @@ -11,9 +17,20 @@ pub enum PackageManager { File { filename: String, succeeds: bool } } -pub type InstallOutcome = (UpdateResultCode, String); - impl PackageManager { + pub fn new_file(succeeds: bool) -> Self { + PackageManager::File { + filename: NamedTempFile::new_in(temp_dir()).expect("couldn't create temporary file") + .path().file_name().expect("couldn't get file name") + .to_str().expect("couldn't parse file name").to_string(), + succeeds: succeeds + } + } + + pub fn from_file(filename: String, succeeds: bool) -> Self { + PackageManager::File { filename: filename, succeeds: succeeds } + } + pub fn installed_packages(&self) -> Result<Vec<Package>, Error> { match *self { PackageManager::Dpkg => dpkg::installed_packages(), diff --git a/src/package_manager/tpm.rs b/src/package_manager/tpm.rs index d9a67e0..aa17800 100644 --- a/src/package_manager/tpm.rs +++ b/src/package_manager/tpm.rs @@ -1,20 +1,21 @@ +use std::fmt::Debug; use std::fs::File; use std::fs::OpenOptions; use std::io::BufReader; use std::io::BufWriter; use std::io::prelude::*; +use std::sync::mpsc::Receiver; use datatype::{Error, Package, UpdateResultCode}; +use package_manager::package_manager::InstallOutcome; pub fn installed_packages(path: &str) -> Result<Vec<Package>, Error> { - let f = try!(File::open(path)); let reader = BufReader::new(f); let mut pkgs = Vec::new(); for line in reader.lines() { - let line = try!(line); let parts = line.split(' '); @@ -28,39 +29,39 @@ pub fn installed_packages(path: &str) -> Result<Vec<Package>, Error> { } } } - } - return Ok(pkgs) - + Ok(pkgs) } -pub fn install_package(path: &str, pkg: &str, succeeds: bool) -> Result<(UpdateResultCode, String), (UpdateResultCode, String)> { - - fn install(path: &str, pkg: &str) -> Result<(), Error> { - let f = try!(OpenOptions::new() - .create(true) - .write(true) - .append(true) - .open(path)); - +pub fn install_package(path: &str, pkg: &str, succeeds: bool) -> Result<InstallOutcome, InstallOutcome> { + let install = || -> Result<(), Error> { + let f = OpenOptions::new().create(true).write(true).append(true).open(path) + .expect("couldn't open file for writing"); let mut writer = BufWriter::new(f); - try!(writer.write(pkg.as_bytes())); try!(writer.write(b"\n")); - - return Ok(()) - } + Ok(()) + }; if succeeds { - match install(path, pkg) { - Ok(_) => Ok((UpdateResultCode::OK, "".to_string())), + match install() { + Ok(_) => Ok((UpdateResultCode::OK, "".to_string())), Err(e) => Err((UpdateResultCode::INSTALL_FAILED, format!("{:?}", e))) } } else { Err((UpdateResultCode::INSTALL_FAILED, "failed".to_string())) } +} +pub fn assert_rx<X: PartialEq + Debug>(rx: Receiver<X>, xs: &[X]) { + let n = xs.len(); + let mut xs = xs.iter(); + for _ in 0..n { + let val = rx.recv().expect("assert_rx expected another val"); + let x = xs.next().expect(&format!("assert_rx: no match for val: {:?}", val)); + assert_eq!(val, *x); + } } |