summaryrefslogtreecommitdiff
path: root/src/interpreter.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/interpreter.rs')
-rw-r--r--src/interpreter.rs202
1 files changed, 113 insertions, 89 deletions
diff --git a/src/interpreter.rs b/src/interpreter.rs
index b286ba5..d2d3f99 100644
--- a/src/interpreter.rs
+++ b/src/interpreter.rs
@@ -1,10 +1,13 @@
use chan;
-use chan::{Sender, Receiver};
-use std;
+use chan::{Sender, Receiver, WaitGroup};
+use std::{process, thread};
use std::borrow::Cow;
+use std::time::Duration;
+use time;
-use datatype::{AccessToken, Auth, ClientId, ClientSecret, Command, Config,
- Error, Event, Package, UpdateRequestId};
+use datatype::{AccessToken, Auth, ClientCredentials, Command, Config, Error, Event,
+ Package, UpdateReport, UpdateRequestStatus as Status, UpdateResultCode,
+ system_info};
use gateway::Interpret;
use http::{AuthClient, Client};
use oauth2::authenticate;
@@ -18,9 +21,20 @@ use sota::Sota;
pub trait Interpreter<I, O> {
fn interpret(&mut self, input: I, otx: &Sender<O>);
- fn run(&mut self, irx: Receiver<I>, otx: Sender<O>) {
+ fn run(&mut self, irx: Receiver<I>, otx: Sender<O>, wg: WaitGroup) {
+ let cooldown = Duration::from_millis(100);
+
loop {
- self.interpret(irx.recv().expect("interpreter sender closed"), &otx);
+ let input = irx.recv().expect("interpreter sender closed");
+ let started = time::precise_time_ns();
+
+ wg.add(1);
+ trace!("interpreter starting: {}", started);
+ self.interpret(input, &otx);
+
+ thread::sleep(cooldown); // let any further work commence
+ trace!("interpreter stopping: {}", started);
+ wg.done();
}
}
}
@@ -29,35 +43,68 @@ pub trait Interpreter<I, O> {
/// The `EventInterpreter` listens for `Event`s and optionally responds with
/// `Command`s that may be sent to the `CommandInterpreter`.
pub struct EventInterpreter {
- pub package_manager: PackageManager
+ pub pacman: PackageManager,
+ pub sysinfo: Option<String>,
}
impl Interpreter<Event, Command> for EventInterpreter {
fn interpret(&mut self, event: Event, ctx: &Sender<Command>) {
- info!("Event received: {}", event);
+ info!("EventInterpreter received: {}", event);
+
match event {
+ Event::Authenticated => {
+ if self.pacman != PackageManager::Off {
+ self.pacman.installed_packages().map(|packages| {
+ ctx.send(Command::SendInstalledPackages(packages));
+ }).unwrap_or_else(|err| error!("couldn't send a list of packages: {}", err));
+ }
+
+ self.sysinfo.as_ref().map(|_| ctx.send(Command::SendSystemInfo));
+ }
+
Event::NotAuthenticated => {
info!("Trying to authenticate again...");
ctx.send(Command::Authenticate(None));
}
- Event::NewUpdatesReceived(ids) => {
- ctx.send(Command::StartDownload(ids));
+ Event::UpdatesReceived(requests) => {
+ for request in requests {
+ let id = request.requestId.clone();
+ match request.status {
+ Status::Pending => ctx.send(Command::StartDownload(id)),
+
+ Status::InFlight if self.pacman != PackageManager::Off => {
+ if self.pacman.is_installed(&request.packageId) {
+ let report = UpdateReport::single(id, UpdateResultCode::OK, "".to_string());
+ ctx.send(Command::SendUpdateReport(report));
+ } else {
+ ctx.send(Command::StartDownload(id));
+ }
+ }
+
+ _ => ()
+ }
+ }
}
Event::DownloadComplete(dl) => {
- if self.package_manager != PackageManager::Off {
- ctx.send(Command::StartInstall(dl));
+ if self.pacman != PackageManager::Off {
+ ctx.send(Command::StartInstall(dl.update_id.clone()));
}
}
- Event::InstallComplete(report) => {
+ Event::DownloadFailed(id, reason) => {
+ let report = UpdateReport::single(id, UpdateResultCode::GENERAL_ERROR, reason);
+ ctx.send(Command::SendUpdateReport(report));
+ }
+
+ Event::InstallComplete(report) | Event::InstallFailed(report) => {
ctx.send(Command::SendUpdateReport(report));
}
Event::UpdateReportSent => {
- if self.package_manager != PackageManager::Off {
- self.package_manager.installed_packages().map(|packages| {
+ if self.pacman != PackageManager::Off {
+ self.pacman.installed_packages().map(|packages| {
ctx.send(Command::SendInstalledPackages(packages));
}).unwrap_or_else(|err| error!("couldn't send a list of packages: {}", err));
}
@@ -75,7 +122,7 @@ pub struct CommandInterpreter;
impl Interpreter<Command, Interpret> for CommandInterpreter {
fn interpret(&mut self, cmd: Command, itx: &Sender<Interpret>) {
- info!("Command received: {}", cmd);
+ info!("CommandInterpreter received: {}", cmd);
itx.send(Interpret { command: cmd, response_tx: None });
}
}
@@ -88,13 +135,12 @@ pub struct GlobalInterpreter<'t> {
pub config: Config,
pub token: Option<Cow<'t, AccessToken>>,
pub http_client: Box<Client>,
- pub rvi: Option<Services>,
- pub loopback_tx: Sender<Interpret>,
+ pub rvi: Option<Services>
}
impl<'t> Interpreter<Interpret, Event> for GlobalInterpreter<'t> {
fn interpret(&mut self, interpret: Interpret, etx: &Sender<Event>) {
- info!("Interpreter started: {}", interpret.command);
+ info!("GlobalInterpreter received: {}", interpret.command);
let (multi_tx, multi_rx) = chan::async::<Event>();
let outcome = match (self.token.as_ref(), self.config.auth.is_none()) {
@@ -109,21 +155,20 @@ impl<'t> Interpreter<Interpret, Event> for GlobalInterpreter<'t> {
etx.send(ev.clone());
response_ev = Some(ev);
}
- info!("Interpreter finished.");
}
- Err(Error::Authorization(_)) => {
+ Err(Error::HttpAuth(resp)) => {
+ error!("HTTP authorization failed: {}", resp);
+ self.token = None;
let ev = Event::NotAuthenticated;
etx.send(ev.clone());
response_ev = Some(ev);
- error!("Interpreter authentication failed");
}
Err(err) => {
let ev = Event::Error(format!("{}", err));
etx.send(ev.clone());
response_ev = Some(ev);
- error!("Interpreter failed: {}", err);
}
}
@@ -138,16 +183,15 @@ impl<'t> GlobalInterpreter<'t> {
// always send at least one Event response
match cmd {
- Command::Authenticate(_) => etx.send(Event::Authenticated),
+ Command::Authenticate(_) => etx.send(Event::AlreadyAuthenticated),
- Command::GetNewUpdates => {
- let mut updates = try!(sota.get_pending_updates());
+ Command::GetUpdateRequests => {
+ let mut updates = try!(sota.get_update_requests());
if updates.is_empty() {
- etx.send(Event::NoNewUpdates);
+ etx.send(Event::NoUpdateRequests);
} else {
updates.sort_by_key(|u| u.installPos);
- let ids = updates.iter().map(|u| u.requestId.clone()).collect::<Vec<UpdateRequestId>>();
- etx.send(Event::NewUpdatesReceived(ids))
+ etx.send(Event::UpdatesReceived(updates));
}
}
@@ -159,18 +203,13 @@ impl<'t> GlobalInterpreter<'t> {
etx.send(Event::FoundInstalledPackages(packages));
}
- Command::RefreshSystemInfo(post) => {
- let info = try!(self.config.device.system_info.report());
- etx.send(Event::FoundSystemInfo(info.clone()));
- if post {
- let _ = sota.send_system_info(&info)
- .map_err(|err| etx.send(Event::Error(format!("{}", err))));
- }
+ Command::ListSystemInfo => {
+ let cmd = self.config.device.system_info.as_ref().expect("system_info command not set");
+ etx.send(Event::FoundSystemInfo(try!(system_info(&cmd))));
}
Command::SendInstalledPackages(packages) => {
- let _ = sota.send_installed_packages(&packages)
- .map_err(|err| error!("couldn't send installed packages: {}", err));
+ try!(sota.send_installed_packages(&packages));
etx.send(Event::InstalledPackagesSent);
}
@@ -178,39 +217,43 @@ impl<'t> GlobalInterpreter<'t> {
if let Some(ref rvi) = self.rvi {
let _ = rvi.remote.lock().unwrap().send_installed_software(sw);
}
+ etx.send(Event::InstalledSoftwareSent);
+ }
+
+ Command::SendSystemInfo => {
+ let cmd = self.config.device.system_info.as_ref().expect("system_info command not set");
+ try!(sota.send_system_info(&try!(system_info(&cmd))));
+ etx.send(Event::SystemInfoSent);
}
Command::SendUpdateReport(report) => {
if let Some(ref rvi) = self.rvi {
let _ = rvi.remote.lock().unwrap().send_update_report(report);
} else {
- let _ = sota.send_update_report(&report)
- .map_err(|err| error!("couldn't send update report: {}", err));
+ try!(sota.send_update_report(&report));
}
etx.send(Event::UpdateReportSent);
}
- Command::StartDownload(ref ids) => {
- for id in ids {
- etx.send(Event::DownloadingUpdate(id.clone()));
-
- if let Some(ref rvi) = self.rvi {
- let _ = rvi.remote.lock().unwrap().send_download_started(id.clone());
- } else {
- let _ = sota.download_update(id.clone())
- .map(|dl| etx.send(Event::DownloadComplete(dl)))
- .map_err(|err| etx.send(Event::DownloadFailed(id.clone(), format!("{}", err))));
- }
+ Command::StartDownload(id) => {
+ etx.send(Event::DownloadingUpdate(id.clone()));
+ if let Some(ref rvi) = self.rvi {
+ let _ = rvi.remote.lock().unwrap().send_download_started(id);
+ } else {
+ let _ = sota.download_update(id.clone())
+ .map(|dl| etx.send(Event::DownloadComplete(dl)))
+ .map_err(|err| etx.send(Event::DownloadFailed(id, format!("{}", err))));
}
}
- Command::StartInstall(dl) => {
- let _ = sota.install_update(dl)
+ Command::StartInstall(id) => {
+ etx.send(Event::InstallingUpdate(id.clone()));
+ let _ = sota.install_update(id)
.map(|report| etx.send(Event::InstallComplete(report)))
.map_err(|report| etx.send(Event::InstallFailed(report)));
}
- Command::Shutdown => std::process::exit(0),
+ Command::Shutdown => process::exit(0),
}
Ok(())
@@ -220,8 +263,10 @@ impl<'t> GlobalInterpreter<'t> {
match cmd {
Command::Authenticate(_) => {
let config = self.config.auth.clone().expect("trying to authenticate without auth config");
- self.set_client(Auth::Credentials(ClientId(config.client_id),
- ClientSecret(config.client_secret)));
+ self.set_client(Auth::Credentials(ClientCredentials {
+ client_id: config.client_id,
+ client_secret: config.client_secret,
+ }));
let server = config.server.join("/token").expect("couldn't build authentication url");
let token = try!(authenticate(server, self.http_client.as_ref()));
self.set_client(Auth::Token(token.clone()));
@@ -229,16 +274,9 @@ impl<'t> GlobalInterpreter<'t> {
etx.send(Event::Authenticated);
}
- Command::GetNewUpdates |
- Command::ListInstalledPackages |
- Command::RefreshSystemInfo(_) |
- Command::SendInstalledPackages(_) |
- Command::SendInstalledSoftware(_) |
- Command::SendUpdateReport(_) |
- Command::StartDownload(_) |
- Command::StartInstall(_) => etx.send(Event::NotAuthenticated),
+ Command::Shutdown => process::exit(0),
- Command::Shutdown => std::process::exit(0),
+ _ => etx.send(Event::NotAuthenticated)
}
Ok(())
@@ -270,15 +308,13 @@ mod tests {
fn new_interpreter(replies: Vec<String>, pkg_mgr: PackageManager) -> (Sender<Command>, Receiver<Event>) {
let (etx, erx) = chan::sync::<Event>(0);
let (ctx, crx) = chan::sync::<Command>(0);
- let (itx, _) = chan::sync::<Interpret>(0);
thread::spawn(move || {
let mut gi = GlobalInterpreter {
config: Config::default(),
token: Some(AccessToken::default().into()),
http_client: Box::new(TestClient::from(replies)),
- rvi: None,
- loopback_tx: itx,
+ rvi: None
};
gi.config.device.package_manager = pkg_mgr;
@@ -300,7 +336,7 @@ mod tests {
let (ctx, erx) = new_interpreter(replies, pkg_mgr);
ctx.send(Command::Authenticate(None));
- assert_rx(erx, &[Event::Authenticated]);
+ assert_rx(erx, &[Event::AlreadyAuthenticated]);
}
#[test]
@@ -309,35 +345,26 @@ mod tests {
let pkg_mgr = PackageManager::new_tpm(true);
let (ctx, erx) = new_interpreter(replies, pkg_mgr);
- ctx.send(Command::StartDownload(vec!["1".to_string(), "2".to_string()]));
+ ctx.send(Command::StartDownload("1".to_string()));
assert_rx(erx, &[
Event::DownloadingUpdate("1".to_string()),
Event::DownloadComplete(DownloadComplete {
update_id: "1".to_string(),
update_image: "/tmp/1".to_string(),
signature: "".to_string()
- }),
- Event::DownloadingUpdate("2".to_string()),
- Event::DownloadComplete(DownloadComplete {
- update_id: "2".to_string(),
- update_image: "/tmp/2".to_string(),
- signature: "".to_string()
- }),
+ })
]);
}
#[test]
- fn install_update() {
+ fn install_update_success() {
let replies = vec!["[]".to_string(); 10];
let pkg_mgr = PackageManager::new_tpm(true);
let (ctx, erx) = new_interpreter(replies, pkg_mgr);
- ctx.send(Command::StartInstall(DownloadComplete {
- update_id: "1".to_string(),
- update_image: "/tmp/1".to_string(),
- signature: "".to_string()
- }));
+ ctx.send(Command::StartInstall("1".to_string()));
assert_rx(erx, &[
+ Event::InstallingUpdate("1".to_string()),
Event::InstallComplete(
UpdateReport::single("1".to_string(), UpdateResultCode::OK, "".to_string())
)
@@ -345,17 +372,14 @@ mod tests {
}
#[test]
- fn failed_installation() {
+ fn install_update_failed() {
let replies = vec!["[]".to_string(); 10];
let pkg_mgr = PackageManager::new_tpm(false);
let (ctx, erx) = new_interpreter(replies, pkg_mgr);
- ctx.send(Command::StartInstall(DownloadComplete {
- update_id: "1".to_string(),
- update_image: "/tmp/1".to_string(),
- signature: "".to_string()
- }));
+ ctx.send(Command::StartInstall("1".to_string()));
assert_rx(erx, &[
+ Event::InstallingUpdate("1".to_string()),
Event::InstallFailed(
UpdateReport::single("1".to_string(), UpdateResultCode::INSTALL_FAILED, "failed".to_string())
)