From ff1183ab6ea6eeb67de5e72abb7d0c5d62c90d95 Mon Sep 17 00:00:00 2001 From: Shaun Taheri Date: Thu, 1 Sep 2016 11:23:38 +0200 Subject: Add Unix domain socket event listener --- run/sota.toml.env | 3 +- run/sota.toml.template | 3 +- src/datatype/command.rs | 24 ++++++-------- src/datatype/config.rs | 23 +++++++------- src/gateway/socket.rs | 78 +++++++++++++++++++++++++++++++++------------- src/main.rs | 11 +++++-- tests/sota.toml | 3 +- tests/sota_client_tests.rs | 6 ++-- 8 files changed, 96 insertions(+), 55 deletions(-) diff --git a/run/sota.toml.env b/run/sota.toml.env index 0823d3d..595f905 100644 --- a/run/sota.toml.env +++ b/run/sota.toml.env @@ -25,7 +25,8 @@ GATEWAY_WEBSOCKET=true NETWORK_HTTP_SERVER=http://127.0.0.1:8888 NETWORK_RVI_EDGE_SERVER=http://127.0.0.1:9080 -NETWORK_SOCKET_PATH=/tmp/sota.socket +NETWORK_SOCKET_COMMANDS_PATH=/tmp/sota-commands.socket +NETWORK_SOCKET_EVENTS_PATH=/tmp/sota-events.socket NETWORK_WEBSOCKET_SERVER=ws://127.0.0.1:3012 RVI_CLIENT=http://127.0.0.1:8901 diff --git a/run/sota.toml.template b/run/sota.toml.template index daaabf4..1a68ac6 100644 --- a/run/sota.toml.template +++ b/run/sota.toml.template @@ -35,7 +35,8 @@ websocket = ${GATEWAY_WEBSOCKET} [network] http_server = "${NETWORK_HTTP_SERVER}" rvi_edge_server = "${NETWORK_RVI_EDGE_SERVER}" -socket_path = "${NETWORK_SOCKET_PATH}" +socket_commands_path = "${NETWORK_SOCKET_COMMANDS_PATH}" +socket_events_path = "${NETWORK_SOCKET_EVENTS_PATH}" websocket_server = "${NETWORK_WEBSOCKET_SERVER}" [rvi] diff --git a/src/datatype/command.rs b/src/datatype/command.rs index aa1458e..d449bb6 100644 --- a/src/datatype/command.rs +++ b/src/datatype/command.rs @@ -151,12 +151,10 @@ fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result { }, Command::SendUpdateReport(_) => match args.len() { - 0 | 1 | 2 => Err(Error::Command("usage: sendup ".to_string())), - 3 => { + 0 | 1 => Err(Error::Command("usage: sendup ".to_string())), + 2 => { if let Ok(code) = args[1].parse::() { - let id = args[0].to_string(); - let text = args[2].to_string(); - Ok(Command::SendUpdateReport(UpdateReport::single(id, code, text))) + Ok(Command::SendUpdateReport(UpdateReport::single(args[0].to_string(), code, "".to_string()))) } else { Err(Error::Command("couldn't parse 2nd argument as an UpdateResultCode".to_string())) } @@ -274,17 +272,13 @@ mod tests { #[test] fn send_update_report_test() { - assert_eq!("SendUpdateReport myid OK done".parse::().unwrap(), - Command::SendUpdateReport(UpdateReport::single( - "myid".to_string(), UpdateResultCode::OK, "done".to_string() - ))); - assert_eq!("sendup myid 19 generr".parse::().unwrap(), - Command::SendUpdateReport(UpdateReport::single( - "myid".to_string(), UpdateResultCode::GENERAL_ERROR, "generr".to_string() - ))); - assert!("sendup myid 20 nosuch".parse::().is_err()); + assert_eq!("SendUpdateReport myid OK".parse::().unwrap(), Command::SendUpdateReport( + UpdateReport::single("myid".to_string(), UpdateResultCode::OK, "".to_string()))); + assert_eq!("sendup myid 19".parse::().unwrap(), Command::SendUpdateReport( + UpdateReport::single("myid".to_string(), UpdateResultCode::GENERAL_ERROR, "".to_string()))); + assert!("sendup myid 20".parse::().is_err()); assert!("SendInstalledPackages".parse::().is_err()); - assert!("sendup 1 2 3 4".parse::().is_err()); + assert!("sendup 1 2 3".parse::().is_err()); } #[test] diff --git a/src/datatype/config.rs b/src/datatype/config.rs index f95eecf..5db0f12 100644 --- a/src/datatype/config.rs +++ b/src/datatype/config.rs @@ -252,19 +252,21 @@ impl Default for GatewayConfig { /// A parsed representation of the [network] configuration section. #[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)] pub struct NetworkConfig { - pub http_server: String, - pub rvi_edge_server: String, - pub socket_path: String, - pub websocket_server: String + pub http_server: String, + pub rvi_edge_server: String, + pub socket_commands_path: String, + pub socket_events_path: String, + pub websocket_server: String } impl Default for NetworkConfig { fn default() -> NetworkConfig { NetworkConfig { - http_server: "http://127.0.0.1:8888".to_string(), - rvi_edge_server: "http://127.0.0.1:9080".to_string(), - socket_path: "/tmp/sota.socket".to_string(), - websocket_server: "ws://127.0.0.1:3012".to_string() + http_server: "http://127.0.0.1:8888".to_string(), + rvi_edge_server: "http://127.0.0.1:9080".to_string(), + socket_commands_path: "/tmp/sota-commands.socket".to_string(), + socket_events_path: "/tmp/sota-events.socket".to_string(), + websocket_server: "ws://127.0.0.1:3012".to_string() } } } @@ -330,7 +332,6 @@ mod tests { packages_dir = "/tmp/" package_manager = "deb" certificates_path = "/tmp/sota_certificates" - socket_path = "/tmp/sota.socket" "#; const GATEWAY_CONFIG: &'static str = @@ -349,7 +350,8 @@ mod tests { [network] http_server = "http://127.0.0.1:8888" rvi_edge_server = "http://127.0.0.1:9080" - socket_path = "/tmp/sota.socket" + socket_commands_path = "/tmp/sota-commands.socket" + socket_events_path = "/tmp/sota-events.socket" websocket_server = "ws://127.0.0.1:3012" "#; @@ -357,7 +359,6 @@ mod tests { r#" [rvi] client = "http://127.0.0.1:8901" - edge = "http://127.0.0.1:9080" storage_dir = "/var/sota" timeout = 20 "#; diff --git a/src/gateway/socket.rs b/src/gateway/socket.rs index 9b6175f..f252e7c 100644 --- a/src/gateway/socket.rs +++ b/src/gateway/socket.rs @@ -13,26 +13,26 @@ use unix_socket::{UnixListener, UnixStream}; /// The `Socket` gateway is used for communication via Unix Domain Sockets. pub struct Socket { - pub path: String + pub commands_path: String, + pub events_path: String, } impl Gateway for Socket { fn initialize(&mut self, itx: Sender) -> Result<(), String> { - let itx = Arc::new(Mutex::new(itx)); - let _ = fs::remove_file(&self.path); - - let server = match UnixListener::bind(self.path.clone()) { - Ok(server) => server, - Err(err) => return Err(format!("couldn't start socket gateway: {}", err)) + let _ = fs::remove_file(&self.commands_path); + let commands = match UnixListener::bind(&self.commands_path) { + Ok(sock) => sock, + Err(err) => return Err(format!("couldn't open commands socket: {}", err)) }; + let itx = Arc::new(Mutex::new(itx)); thread::spawn(move || { - for input in server.incoming() { - if let Err(err) = input { - error!("couldn't read socket input: {}", err); + for conn in commands.incoming() { + if let Err(err) = conn { + error!("couldn't get commands socket connection: {}", err); continue } - let mut stream = input.unwrap(); + let mut stream = conn.unwrap(); let itx = itx.clone(); thread::spawn(move || { @@ -41,14 +41,30 @@ impl Gateway for Socket { .unwrap_or_else(|err| format!("{}", err).into_bytes()); stream.write_all(&resp) - .unwrap_or_else(|err| error!("couldn't write to socket: {}", err)); + .unwrap_or_else(|err| error!("couldn't write to commands socket: {}", err)); stream.shutdown(Shutdown::Write) - .unwrap_or_else(|err| error!("couldn't close socket for writing: {}", err)) + .unwrap_or_else(|err| error!("couldn't close commands socket: {}", err)); }); } }); - Ok(info!("Unix Domain Socket gateway listening at {}.", self.path)) + Ok(info!("Socket listening for commands at {} and sending events to {}.", + self.commands_path, self.events_path)) + } + + fn pulse(&self, event: Event) { + match event { + Event::DownloadComplete(dl) => { + let _ = UnixStream::connect(&self.events_path).map(|mut stream| { + stream.write_all(&json::encode(&dl).expect("couldn't encode Event").into_bytes()) + .unwrap_or_else(|err| error!("couldn't write to events socket: {}", err)); + stream.shutdown(Shutdown::Write) + .unwrap_or_else(|err| error!("couldn't close events socket: {}", err)); + }).map_err(|err| error!("couldn't open events socket: {}", err)); + } + + _ => () + } } } @@ -74,24 +90,44 @@ mod tests { use chan; use crossbeam; use rustc_serialize::json; - use std::thread; + use std::{fs, thread}; use std::io::{Read, Write}; use std::net::Shutdown; use std::time::Duration; - use datatype::{Command, Event}; + use datatype::{Command, DownloadComplete, Event}; use gateway::{Gateway, Interpret}; use super::*; - use unix_socket::UnixStream; + use unix_socket::{UnixListener, UnixStream}; #[test] - fn unix_domain_socket_connections() { + fn socket_commands_and_events() { let (etx, erx) = chan::sync::(0); let (itx, irx) = chan::sync::(0); - thread::spawn(move || Socket { path: "/tmp/sota.socket".to_string() }.start(itx, erx)); - thread::sleep(Duration::from_millis(100)); // wait until socket is created + thread::spawn(move || Socket { + commands_path: "/tmp/sota-commands.socket".to_string(), + events_path: "/tmp/sota-events.socket".to_string(), + }.start(itx, erx)); + thread::sleep(Duration::from_millis(100)); // wait until socket gateway is created + + let path = "/tmp/sota-events.socket"; + let _ = fs::remove_file(&path); + let server = UnixListener::bind(&path).expect("couldn't create events socket for testing"); + + let send = DownloadComplete { + update_id: "1".to_string(), + update_image: "/foo/bar".to_string(), + signature: "abc".to_string() + }; + etx.send(Event::DownloadComplete(send.clone())); + + let (mut stream, _) = server.accept().expect("couldn't read from events socket"); + let mut text = String::new(); + stream.read_to_string(&mut text).unwrap(); + let receive: DownloadComplete = json::decode(&text).expect("couldn't decode DownloadComplete message"); + assert_eq!(send, receive); thread::spawn(move || { let _ = etx; // move into this scope @@ -110,7 +146,7 @@ mod tests { crossbeam::scope(|scope| { for id in 0..10 { scope.spawn(move || { - let mut stream = UnixStream::connect("/tmp/sota.socket").expect("couldn't connect to socket"); + let mut stream = UnixStream::connect("/tmp/sota-commands.socket").expect("couldn't connect to socket"); let _ = stream.write_all(&format!("dl {}", id).into_bytes()).expect("couldn't write to stream"); stream.shutdown(Shutdown::Write).expect("couldn't shut down writing"); diff --git a/src/main.rs b/src/main.rs index 715fc08..61fa02a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -127,7 +127,10 @@ fn main() { if config.gateway.socket { let socket_itx = itx.clone(); let socket_sub = broadcast.subscribe(); - let mut socket = Socket { path: config.network.socket_path.clone() }; + let mut socket = Socket { + commands_path: config.network.socket_commands_path.clone(), + events_path: config.network.socket_events_path.clone() + }; scope.spawn(move || socket.start(socket_itx, socket_sub)); } @@ -213,7 +216,8 @@ fn build_config() -> Config { opts.optopt("", "network-http-server", "change the http server gateway address", "ADDR"); opts.optopt("", "network-rvi-edge-server", "change the rvi edge server gateway address", "ADDR"); - opts.optopt("", "network-socket-path", "change the domain socket path", "PATH"); + opts.optopt("", "network-socket-commands-path", "change the socket path for reading commands", "PATH"); + opts.optopt("", "network-socket-events-path", "change the socket path for sending events", "PATH"); opts.optopt("", "network-websocket-server", "change the websocket gateway address", "ADDR"); opts.optopt("", "rvi-client", "change the rvi client URL", "URL"); @@ -286,7 +290,8 @@ fn build_config() -> Config { matches.opt_str("network-http-server").map(|server| config.network.http_server = server); matches.opt_str("network-rvi-edge-server").map(|server| config.network.rvi_edge_server = server); - matches.opt_str("network-socket-path").map(|path| config.network.socket_path = path); + matches.opt_str("network-socket-commands-path").map(|path| config.network.socket_commands_path = path); + matches.opt_str("network-socket-events-path").map(|path| config.network.socket_events_path = path); matches.opt_str("network-websocket-server").map(|server| config.network.websocket_server = server); config.rvi.as_mut().map(|rvi_cfg| { diff --git a/tests/sota.toml b/tests/sota.toml index 7ad6162..0801509 100644 --- a/tests/sota.toml +++ b/tests/sota.toml @@ -35,7 +35,8 @@ websocket = true [network] http_server = "http://127.0.0.1:8888" rvi_edge_server = "http://127.0.0.1:9080" -socket_path = "/tmp/sota.socket" +socket_commands_path = "/tmp/sota-commands.socket" +socket_events_path = "/tmp/sota-events.socket" websocket_server = "ws://127.0.0.1:3012" [rvi] diff --git a/tests/sota_client_tests.rs b/tests/sota_client_tests.rs index 2655732..fcdbd0d 100644 --- a/tests/sota_client_tests.rs +++ b/tests/sota_client_tests.rs @@ -93,8 +93,10 @@ Options: change the http server gateway address --network-rvi-edge-server ADDR change the rvi edge server gateway address - --network-socket-path PATH - change the domain socket path + --network-socket-commands-path PATH + change the socket path for reading commands + --network-socket-events-path PATH + change the socket path for sending events --network-websocket-server ADDR change the websocket gateway address --rvi-client URL -- cgit v1.2.1