summaryrefslogtreecommitdiff
path: root/src/gateway/websocket.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/gateway/websocket.rs')
-rw-r--r--src/gateway/websocket.rs26
1 files changed, 12 insertions, 14 deletions
diff --git a/src/gateway/websocket.rs b/src/gateway/websocket.rs
index eb5e040..8c597b9 100644
--- a/src/gateway/websocket.rs
+++ b/src/gateway/websocket.rs
@@ -1,12 +1,11 @@
use chan;
use chan::Sender;
use rustc_serialize::json;
-use std::thread;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
-use std::time::Duration;
+use std::thread;
use ws;
-use ws::{listen, CloseCode, Handler, Handshake, Message, Sender as WsSender};
+use ws::{CloseCode, Handler, Handshake, Message, Sender as WsSender};
use ws::util::Token;
use datatype::{Command, Error, Event};
@@ -24,10 +23,9 @@ impl Gateway for Websocket {
fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> {
let clients = self.clients.clone();
let addr = self.server.clone();
- info!("Opening websocket listener at {}", addr);
thread::spawn(move || {
- listen(&addr as &str, |out| {
+ ws::listen(&addr as &str, |out| {
WebsocketHandler {
out: out,
itx: itx.clone(),
@@ -36,8 +34,7 @@ impl Gateway for Websocket {
}).expect("couldn't start websocket listener");
});
- thread::sleep(Duration::from_secs(1)); // FIXME: ugly hack for blocking listen call
- Ok(info!("Websocket gateway started."))
+ Ok(info!("Websocket gateway started at {}.", self.server))
}
fn pulse(&self, event: Event) {
@@ -69,7 +66,7 @@ impl Handler for WebsocketHandler {
Err(err)
}
- Err(_) => unreachable!()
+ Err(err) => panic!("unexpected websocket on_message error: {}", err)
})
}
@@ -117,7 +114,7 @@ mod tests {
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use ws;
- use ws::{connect, CloseCode};
+ use ws::CloseCode;
use datatype::{Command, Event};
use gateway::{Gateway, Interpret};
@@ -125,6 +122,7 @@ mod tests {
#[test]
+ #[ignore] // FIXME: wait for https://github.com/housleyjk/ws-rs/issues/64
fn websocket_connections() {
let (etx, erx) = chan::sync::<Event>(0);
let (itx, irx) = chan::sync::<Interpret>(0);
@@ -140,9 +138,9 @@ mod tests {
loop {
let interpret = irx.recv().expect("gtx is closed");
match interpret.command {
- Command::StartDownload(ids) => {
+ Command::StartDownload(id) => {
let tx = interpret.response_tx.unwrap();
- tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned()));
+ tx.lock().unwrap().send(Event::FoundSystemInfo(id));
}
_ => panic!("expected AcceptUpdates"),
}
@@ -152,9 +150,9 @@ mod tests {
crossbeam::scope(|scope| {
for id in 0..10 {
scope.spawn(move || {
- connect("ws://localhost:3012", |out| {
- out.send(format!(r#"{{ "variant": "StartDownload", "fields": [["{}"]] }}"#, id))
- .expect("couldn't write to websocket");
+ ws::connect("ws://localhost:3012", |out| {
+ let msg = format!(r#"{{ "variant": "StartDownload", "fields": [["{}"]] }}"#, id);
+ out.send(msg).expect("couldn't write to websocket");
move |msg: ws::Message| {
let ev: Event = json::decode(&format!("{}", msg)).unwrap();