summaryrefslogtreecommitdiff
path: root/src/gateway
diff options
context:
space:
mode:
authorShaun Taheri <github@taheris.co.uk>2016-10-19 16:57:54 +0200
committerGitHub <noreply@github.com>2016-10-19 16:57:54 +0200
commitf01087b25987fd6e7b356c7a62c9f49c720be825 (patch)
tree1c9497eb1c4cb09b8f0125d9fbb1852340fa5213 /src/gateway
parentc6d3bb27c6e794f124259b8f7e749df1687f28b0 (diff)
parent41db1050631cfe0aaca8922ec4a58a0f2109ac5d (diff)
downloadrvi_sota_client-f01087b25987fd6e7b356c7a62c9f49c720be825.tar.gz
Merge pull request #132 from advancedtelematic/bugfix/PRO-1601/fix-sota-toml
Fix config generation
Diffstat (limited to 'src/gateway')
-rw-r--r--src/gateway/dbus.rs23
-rw-r--r--src/gateway/websocket.rs38
2 files changed, 27 insertions, 34 deletions
diff --git a/src/gateway/dbus.rs b/src/gateway/dbus.rs
index 07a3b9c..321e262 100644
--- a/src/gateway/dbus.rs
+++ b/src/gateway/dbus.rs
@@ -24,7 +24,7 @@ impl Gateway for DBus {
thread::spawn(move || {
let conn = Connection::get_private(BusType::Session).expect("couldn't get dbus session");
- conn.register_name(&dbus_cfg.name, NameFlag::ReplaceExisting as u32).unwrap();
+ conn.register_name(&dbus_cfg.name, NameFlag::ReplaceExisting as u32).expect("couldn't register name");
let mut obj_path = ObjectPath::new(&conn, &dbus_cfg.path, true);
obj_path.insert_interface(&dbus_cfg.interface, default_interface(itx));
@@ -33,10 +33,11 @@ impl Gateway for DBus {
loop {
for item in conn.iter(1000) {
if let ConnectionItem::MethodCall(mut msg) = item {
- info!("DBus method call: {:?}", msg);
- obj_path.handle_message(&mut msg).map(|result| {
- let _ = result.map_err(|_| error!("dbus method call failed: {:?}", msg));
- });
+ match obj_path.handle_message(&mut msg) {
+ Some(Ok(())) => info!("DBus message sent: {:?}", msg),
+ Some(Err(())) => error!("DBus message send failed: {:?}", msg),
+ None => debug!("unhandled dbus message: {:?}", msg)
+ }
}
}
}
@@ -48,7 +49,7 @@ impl Gateway for DBus {
fn pulse(&self, event: Event) {
match event {
Event::UpdateAvailable(avail) => {
- let msg = self.new_message("updateAvailable", &[
+ let msg = self.new_swm_message("updateAvailable", &[
MessageItem::from(avail.update_id),
MessageItem::from(avail.signature),
MessageItem::from(avail.description),
@@ -59,7 +60,7 @@ impl Gateway for DBus {
}
Event::DownloadComplete(comp) => {
- let msg = self.new_message("downloadComplete", &[
+ let msg = self.new_swm_message("downloadComplete", &[
MessageItem::from(comp.update_image),
MessageItem::from(comp.signature)
]);
@@ -68,7 +69,7 @@ impl Gateway for DBus {
}
Event::InstalledSoftwareNeeded => {
- let msg = self.new_message("getInstalledPackages", &[
+ let msg = self.new_swm_message("getInstalledPackages", &[
MessageItem::from(true), // include packages?
MessageItem::from(false) // include firmware?
]);
@@ -103,7 +104,7 @@ impl Gateway for DBus {
}
impl DBus {
- fn new_message(&self, method: &str, args: &[MessageItem]) -> Message {
+ fn new_swm_message(&self, method: &str, args: &[MessageItem]) -> Message {
let mgr = self.dbus_cfg.software_manager.clone();
let path = self.dbus_cfg.software_manager_path.clone();
let result = Message::new_method_call(&mgr, &path, &mgr, method);
@@ -139,7 +140,7 @@ fn send(itx: &Sender<Interpret>, cmd: Command) {
fn handle_initiate_download(itx: &Sender<Interpret>, msg: &mut Message) -> MethodResult {
let sender = try!(msg.sender().map(|s| s.to_string()).ok_or(dbus::missing_arg()));
- debug!("handle_initiate_download: sender={:?}, msg={:?}", sender, msg);
+ debug!("dbus handle_initiate_download: sender={:?}, msg={:?}", sender, msg);
let mut args = msg.get_items().into_iter();
let arg_id = try!(args.next().ok_or(dbus::missing_arg()));
@@ -151,7 +152,7 @@ fn handle_initiate_download(itx: &Sender<Interpret>, msg: &mut Message) -> Metho
fn handle_update_report(itx: &Sender<Interpret>, msg: &mut Message) -> MethodResult {
let sender = try!(msg.sender().map(|s| s.to_string()).ok_or(dbus::missing_arg()));
- debug!("handle_update_report: sender ={:?}, msg ={:?}", sender, msg);
+ debug!("dbus handle_update_report: sender={:?}, msg={:?}", sender, msg);
let mut args = msg.get_items().into_iter();
let id_arg = try!(args.next().ok_or(dbus::missing_arg()));
diff --git a/src/gateway/websocket.rs b/src/gateway/websocket.rs
index 72e0889..f63a763 100644
--- a/src/gateway/websocket.rs
+++ b/src/gateway/websocket.rs
@@ -1,12 +1,10 @@
use chan;
use chan::Sender;
use rustc_serialize::json;
-use std::thread;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
-use std::time::Duration;
use ws;
-use ws::{listen, CloseCode, Handler, Handshake, Message, Sender as WsSender};
+use ws::{CloseCode, Handler, Handshake, Message, Sender as WsSender};
use ws::util::Token;
use datatype::{Command, Error, Event};
@@ -22,22 +20,15 @@ pub struct Websocket {
impl Gateway for Websocket {
fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> {
- let clients = self.clients.clone();
- let addr = self.server.clone();
- info!("Opening websocket listener at {}", addr);
-
- thread::spawn(move || {
- listen(&addr as &str, |out| {
- WebsocketHandler {
- out: out,
- itx: itx.clone(),
- clients: clients.clone()
- }
- }).expect("couldn't start websocket listener");
- });
+ ws::listen(&self.server.clone() as &str, |out| {
+ WebsocketHandler {
+ out: out,
+ itx: itx.clone(),
+ clients: self.clients.clone()
+ }
+ }).expect("couldn't start websocket listener");
- thread::sleep(Duration::from_secs(1)); // FIXME: ugly hack for blocking listen call
- Ok(info!("Websocket gateway started."))
+ Ok(info!("Websocket gateway started at {}.", self.server))
}
fn pulse(&self, event: Event) {
@@ -69,7 +60,7 @@ impl Handler for WebsocketHandler {
Err(err)
}
- Err(_) => unreachable!()
+ Err(err) => panic!("unexpected websocket on_message error: {}", err)
})
}
@@ -117,7 +108,7 @@ mod tests {
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use ws;
- use ws::{connect, CloseCode};
+ use ws::CloseCode;
use datatype::{Command, Event};
use gateway::{Gateway, Interpret};
@@ -125,6 +116,7 @@ mod tests {
#[test]
+ #[ignore] // FIXME: wait for https://github.com/housleyjk/ws-rs/issues/64
fn websocket_connections() {
let (etx, erx) = chan::sync::<Event>(0);
let (itx, irx) = chan::sync::<Interpret>(0);
@@ -152,9 +144,9 @@ mod tests {
crossbeam::scope(|scope| {
for id in 0..10 {
scope.spawn(move || {
- connect("ws://localhost:3012", |out| {
- out.send(format!(r#"{{ "variant": "StartDownload", "fields": [["{}"]] }}"#, id))
- .expect("couldn't write to websocket");
+ ws::connect("ws://localhost:3012", |out| {
+ let msg = format!(r#"{{ "variant": "StartDownload", "fields": [["{}"]] }}"#, id);
+ out.send(msg).expect("couldn't write to websocket");
move |msg: ws::Message| {
let ev: Event = json::decode(&format!("{}", msg)).unwrap();