diff options
Diffstat (limited to 'src/interpreter.rs')
-rw-r--r-- | src/interpreter.rs | 202 |
1 files changed, 113 insertions, 89 deletions
diff --git a/src/interpreter.rs b/src/interpreter.rs index b286ba5..d2d3f99 100644 --- a/src/interpreter.rs +++ b/src/interpreter.rs @@ -1,10 +1,13 @@ use chan; -use chan::{Sender, Receiver}; -use std; +use chan::{Sender, Receiver, WaitGroup}; +use std::{process, thread}; use std::borrow::Cow; +use std::time::Duration; +use time; -use datatype::{AccessToken, Auth, ClientId, ClientSecret, Command, Config, - Error, Event, Package, UpdateRequestId}; +use datatype::{AccessToken, Auth, ClientCredentials, Command, Config, Error, Event, + Package, UpdateReport, UpdateRequestStatus as Status, UpdateResultCode, + system_info}; use gateway::Interpret; use http::{AuthClient, Client}; use oauth2::authenticate; @@ -18,9 +21,20 @@ use sota::Sota; pub trait Interpreter<I, O> { fn interpret(&mut self, input: I, otx: &Sender<O>); - fn run(&mut self, irx: Receiver<I>, otx: Sender<O>) { + fn run(&mut self, irx: Receiver<I>, otx: Sender<O>, wg: WaitGroup) { + let cooldown = Duration::from_millis(100); + loop { - self.interpret(irx.recv().expect("interpreter sender closed"), &otx); + let input = irx.recv().expect("interpreter sender closed"); + let started = time::precise_time_ns(); + + wg.add(1); + trace!("interpreter starting: {}", started); + self.interpret(input, &otx); + + thread::sleep(cooldown); // let any further work commence + trace!("interpreter stopping: {}", started); + wg.done(); } } } @@ -29,35 +43,68 @@ pub trait Interpreter<I, O> { /// The `EventInterpreter` listens for `Event`s and optionally responds with /// `Command`s that may be sent to the `CommandInterpreter`. pub struct EventInterpreter { - pub package_manager: PackageManager + pub pacman: PackageManager, + pub sysinfo: Option<String>, } impl Interpreter<Event, Command> for EventInterpreter { fn interpret(&mut self, event: Event, ctx: &Sender<Command>) { - info!("Event received: {}", event); + info!("EventInterpreter received: {}", event); + match event { + Event::Authenticated => { + if self.pacman != PackageManager::Off { + self.pacman.installed_packages().map(|packages| { + ctx.send(Command::SendInstalledPackages(packages)); + }).unwrap_or_else(|err| error!("couldn't send a list of packages: {}", err)); + } + + self.sysinfo.as_ref().map(|_| ctx.send(Command::SendSystemInfo)); + } + Event::NotAuthenticated => { info!("Trying to authenticate again..."); ctx.send(Command::Authenticate(None)); } - Event::NewUpdatesReceived(ids) => { - ctx.send(Command::StartDownload(ids)); + Event::UpdatesReceived(requests) => { + for request in requests { + let id = request.requestId.clone(); + match request.status { + Status::Pending => ctx.send(Command::StartDownload(id)), + + Status::InFlight if self.pacman != PackageManager::Off => { + if self.pacman.is_installed(&request.packageId) { + let report = UpdateReport::single(id, UpdateResultCode::OK, "".to_string()); + ctx.send(Command::SendUpdateReport(report)); + } else { + ctx.send(Command::StartDownload(id)); + } + } + + _ => () + } + } } Event::DownloadComplete(dl) => { - if self.package_manager != PackageManager::Off { - ctx.send(Command::StartInstall(dl)); + if self.pacman != PackageManager::Off { + ctx.send(Command::StartInstall(dl.update_id.clone())); } } - Event::InstallComplete(report) => { + Event::DownloadFailed(id, reason) => { + let report = UpdateReport::single(id, UpdateResultCode::GENERAL_ERROR, reason); + ctx.send(Command::SendUpdateReport(report)); + } + + Event::InstallComplete(report) | Event::InstallFailed(report) => { ctx.send(Command::SendUpdateReport(report)); } Event::UpdateReportSent => { - if self.package_manager != PackageManager::Off { - self.package_manager.installed_packages().map(|packages| { + if self.pacman != PackageManager::Off { + self.pacman.installed_packages().map(|packages| { ctx.send(Command::SendInstalledPackages(packages)); }).unwrap_or_else(|err| error!("couldn't send a list of packages: {}", err)); } @@ -75,7 +122,7 @@ pub struct CommandInterpreter; impl Interpreter<Command, Interpret> for CommandInterpreter { fn interpret(&mut self, cmd: Command, itx: &Sender<Interpret>) { - info!("Command received: {}", cmd); + info!("CommandInterpreter received: {}", cmd); itx.send(Interpret { command: cmd, response_tx: None }); } } @@ -88,13 +135,12 @@ pub struct GlobalInterpreter<'t> { pub config: Config, pub token: Option<Cow<'t, AccessToken>>, pub http_client: Box<Client>, - pub rvi: Option<Services>, - pub loopback_tx: Sender<Interpret>, + pub rvi: Option<Services> } impl<'t> Interpreter<Interpret, Event> for GlobalInterpreter<'t> { fn interpret(&mut self, interpret: Interpret, etx: &Sender<Event>) { - info!("Interpreter started: {}", interpret.command); + info!("GlobalInterpreter received: {}", interpret.command); let (multi_tx, multi_rx) = chan::async::<Event>(); let outcome = match (self.token.as_ref(), self.config.auth.is_none()) { @@ -109,21 +155,20 @@ impl<'t> Interpreter<Interpret, Event> for GlobalInterpreter<'t> { etx.send(ev.clone()); response_ev = Some(ev); } - info!("Interpreter finished."); } - Err(Error::Authorization(_)) => { + Err(Error::HttpAuth(resp)) => { + error!("HTTP authorization failed: {}", resp); + self.token = None; let ev = Event::NotAuthenticated; etx.send(ev.clone()); response_ev = Some(ev); - error!("Interpreter authentication failed"); } Err(err) => { let ev = Event::Error(format!("{}", err)); etx.send(ev.clone()); response_ev = Some(ev); - error!("Interpreter failed: {}", err); } } @@ -138,16 +183,15 @@ impl<'t> GlobalInterpreter<'t> { // always send at least one Event response match cmd { - Command::Authenticate(_) => etx.send(Event::Authenticated), + Command::Authenticate(_) => etx.send(Event::AlreadyAuthenticated), - Command::GetNewUpdates => { - let mut updates = try!(sota.get_pending_updates()); + Command::GetUpdateRequests => { + let mut updates = try!(sota.get_update_requests()); if updates.is_empty() { - etx.send(Event::NoNewUpdates); + etx.send(Event::NoUpdateRequests); } else { updates.sort_by_key(|u| u.installPos); - let ids = updates.iter().map(|u| u.requestId.clone()).collect::<Vec<UpdateRequestId>>(); - etx.send(Event::NewUpdatesReceived(ids)) + etx.send(Event::UpdatesReceived(updates)); } } @@ -159,18 +203,13 @@ impl<'t> GlobalInterpreter<'t> { etx.send(Event::FoundInstalledPackages(packages)); } - Command::RefreshSystemInfo(post) => { - let info = try!(self.config.device.system_info.report()); - etx.send(Event::FoundSystemInfo(info.clone())); - if post { - let _ = sota.send_system_info(&info) - .map_err(|err| etx.send(Event::Error(format!("{}", err)))); - } + Command::ListSystemInfo => { + let cmd = self.config.device.system_info.as_ref().expect("system_info command not set"); + etx.send(Event::FoundSystemInfo(try!(system_info(&cmd)))); } Command::SendInstalledPackages(packages) => { - let _ = sota.send_installed_packages(&packages) - .map_err(|err| error!("couldn't send installed packages: {}", err)); + try!(sota.send_installed_packages(&packages)); etx.send(Event::InstalledPackagesSent); } @@ -178,39 +217,43 @@ impl<'t> GlobalInterpreter<'t> { if let Some(ref rvi) = self.rvi { let _ = rvi.remote.lock().unwrap().send_installed_software(sw); } + etx.send(Event::InstalledSoftwareSent); + } + + Command::SendSystemInfo => { + let cmd = self.config.device.system_info.as_ref().expect("system_info command not set"); + try!(sota.send_system_info(&try!(system_info(&cmd)))); + etx.send(Event::SystemInfoSent); } Command::SendUpdateReport(report) => { if let Some(ref rvi) = self.rvi { let _ = rvi.remote.lock().unwrap().send_update_report(report); } else { - let _ = sota.send_update_report(&report) - .map_err(|err| error!("couldn't send update report: {}", err)); + try!(sota.send_update_report(&report)); } etx.send(Event::UpdateReportSent); } - Command::StartDownload(ref ids) => { - for id in ids { - etx.send(Event::DownloadingUpdate(id.clone())); - - if let Some(ref rvi) = self.rvi { - let _ = rvi.remote.lock().unwrap().send_download_started(id.clone()); - } else { - let _ = sota.download_update(id.clone()) - .map(|dl| etx.send(Event::DownloadComplete(dl))) - .map_err(|err| etx.send(Event::DownloadFailed(id.clone(), format!("{}", err)))); - } + Command::StartDownload(id) => { + etx.send(Event::DownloadingUpdate(id.clone())); + if let Some(ref rvi) = self.rvi { + let _ = rvi.remote.lock().unwrap().send_download_started(id); + } else { + let _ = sota.download_update(id.clone()) + .map(|dl| etx.send(Event::DownloadComplete(dl))) + .map_err(|err| etx.send(Event::DownloadFailed(id, format!("{}", err)))); } } - Command::StartInstall(dl) => { - let _ = sota.install_update(dl) + Command::StartInstall(id) => { + etx.send(Event::InstallingUpdate(id.clone())); + let _ = sota.install_update(id) .map(|report| etx.send(Event::InstallComplete(report))) .map_err(|report| etx.send(Event::InstallFailed(report))); } - Command::Shutdown => std::process::exit(0), + Command::Shutdown => process::exit(0), } Ok(()) @@ -220,8 +263,10 @@ impl<'t> GlobalInterpreter<'t> { match cmd { Command::Authenticate(_) => { let config = self.config.auth.clone().expect("trying to authenticate without auth config"); - self.set_client(Auth::Credentials(ClientId(config.client_id), - ClientSecret(config.client_secret))); + self.set_client(Auth::Credentials(ClientCredentials { + client_id: config.client_id, + client_secret: config.client_secret, + })); let server = config.server.join("/token").expect("couldn't build authentication url"); let token = try!(authenticate(server, self.http_client.as_ref())); self.set_client(Auth::Token(token.clone())); @@ -229,16 +274,9 @@ impl<'t> GlobalInterpreter<'t> { etx.send(Event::Authenticated); } - Command::GetNewUpdates | - Command::ListInstalledPackages | - Command::RefreshSystemInfo(_) | - Command::SendInstalledPackages(_) | - Command::SendInstalledSoftware(_) | - Command::SendUpdateReport(_) | - Command::StartDownload(_) | - Command::StartInstall(_) => etx.send(Event::NotAuthenticated), + Command::Shutdown => process::exit(0), - Command::Shutdown => std::process::exit(0), + _ => etx.send(Event::NotAuthenticated) } Ok(()) @@ -270,15 +308,13 @@ mod tests { fn new_interpreter(replies: Vec<String>, pkg_mgr: PackageManager) -> (Sender<Command>, Receiver<Event>) { let (etx, erx) = chan::sync::<Event>(0); let (ctx, crx) = chan::sync::<Command>(0); - let (itx, _) = chan::sync::<Interpret>(0); thread::spawn(move || { let mut gi = GlobalInterpreter { config: Config::default(), token: Some(AccessToken::default().into()), http_client: Box::new(TestClient::from(replies)), - rvi: None, - loopback_tx: itx, + rvi: None }; gi.config.device.package_manager = pkg_mgr; @@ -300,7 +336,7 @@ mod tests { let (ctx, erx) = new_interpreter(replies, pkg_mgr); ctx.send(Command::Authenticate(None)); - assert_rx(erx, &[Event::Authenticated]); + assert_rx(erx, &[Event::AlreadyAuthenticated]); } #[test] @@ -309,35 +345,26 @@ mod tests { let pkg_mgr = PackageManager::new_tpm(true); let (ctx, erx) = new_interpreter(replies, pkg_mgr); - ctx.send(Command::StartDownload(vec!["1".to_string(), "2".to_string()])); + ctx.send(Command::StartDownload("1".to_string())); assert_rx(erx, &[ Event::DownloadingUpdate("1".to_string()), Event::DownloadComplete(DownloadComplete { update_id: "1".to_string(), update_image: "/tmp/1".to_string(), signature: "".to_string() - }), - Event::DownloadingUpdate("2".to_string()), - Event::DownloadComplete(DownloadComplete { - update_id: "2".to_string(), - update_image: "/tmp/2".to_string(), - signature: "".to_string() - }), + }) ]); } #[test] - fn install_update() { + fn install_update_success() { let replies = vec!["[]".to_string(); 10]; let pkg_mgr = PackageManager::new_tpm(true); let (ctx, erx) = new_interpreter(replies, pkg_mgr); - ctx.send(Command::StartInstall(DownloadComplete { - update_id: "1".to_string(), - update_image: "/tmp/1".to_string(), - signature: "".to_string() - })); + ctx.send(Command::StartInstall("1".to_string())); assert_rx(erx, &[ + Event::InstallingUpdate("1".to_string()), Event::InstallComplete( UpdateReport::single("1".to_string(), UpdateResultCode::OK, "".to_string()) ) @@ -345,17 +372,14 @@ mod tests { } #[test] - fn failed_installation() { + fn install_update_failed() { let replies = vec!["[]".to_string(); 10]; let pkg_mgr = PackageManager::new_tpm(false); let (ctx, erx) = new_interpreter(replies, pkg_mgr); - ctx.send(Command::StartInstall(DownloadComplete { - update_id: "1".to_string(), - update_image: "/tmp/1".to_string(), - signature: "".to_string() - })); + ctx.send(Command::StartInstall("1".to_string())); assert_rx(erx, &[ + Event::InstallingUpdate("1".to_string()), Event::InstallFailed( UpdateReport::single("1".to_string(), UpdateResultCode::INSTALL_FAILED, "failed".to_string()) ) |