summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShaun Taheri <github@taheris.co.uk>2016-09-01 17:45:49 +0200
committerGitHub <noreply@github.com>2016-09-01 17:45:49 +0200
commit7a2abdf4f751a5634d9ce997603c160d48d76593 (patch)
treeaae51d9de6ff19b5c904fd5e1713c45dde471e1a
parentc44dc6cdc080e12be31935667d8a0eecbce65361 (diff)
parentff1183ab6ea6eeb67de5e72abb7d0c5d62c90d95 (diff)
downloadrvi_sota_client-7a2abdf4f751a5634d9ce997603c160d48d76593.tar.gz
Merge pull request #113 from advancedtelematic/feat/PRO-1271/socket-event-listener
Add Unix domain socket event listener
-rw-r--r--run/sota.toml.env3
-rw-r--r--run/sota.toml.template3
-rw-r--r--src/datatype/command.rs24
-rw-r--r--src/datatype/config.rs23
-rw-r--r--src/gateway/socket.rs78
-rw-r--r--src/main.rs11
-rw-r--r--tests/sota.toml3
-rw-r--r--tests/sota_client_tests.rs6
8 files changed, 96 insertions, 55 deletions
diff --git a/run/sota.toml.env b/run/sota.toml.env
index 0823d3d..595f905 100644
--- a/run/sota.toml.env
+++ b/run/sota.toml.env
@@ -25,7 +25,8 @@ GATEWAY_WEBSOCKET=true
NETWORK_HTTP_SERVER=http://127.0.0.1:8888
NETWORK_RVI_EDGE_SERVER=http://127.0.0.1:9080
-NETWORK_SOCKET_PATH=/tmp/sota.socket
+NETWORK_SOCKET_COMMANDS_PATH=/tmp/sota-commands.socket
+NETWORK_SOCKET_EVENTS_PATH=/tmp/sota-events.socket
NETWORK_WEBSOCKET_SERVER=ws://127.0.0.1:3012
RVI_CLIENT=http://127.0.0.1:8901
diff --git a/run/sota.toml.template b/run/sota.toml.template
index daaabf4..1a68ac6 100644
--- a/run/sota.toml.template
+++ b/run/sota.toml.template
@@ -35,7 +35,8 @@ websocket = ${GATEWAY_WEBSOCKET}
[network]
http_server = "${NETWORK_HTTP_SERVER}"
rvi_edge_server = "${NETWORK_RVI_EDGE_SERVER}"
-socket_path = "${NETWORK_SOCKET_PATH}"
+socket_commands_path = "${NETWORK_SOCKET_COMMANDS_PATH}"
+socket_events_path = "${NETWORK_SOCKET_EVENTS_PATH}"
websocket_server = "${NETWORK_WEBSOCKET_SERVER}"
[rvi]
diff --git a/src/datatype/command.rs b/src/datatype/command.rs
index aa1458e..d449bb6 100644
--- a/src/datatype/command.rs
+++ b/src/datatype/command.rs
@@ -151,12 +151,10 @@ fn parse_arguments(cmd: Command, args: Vec<&str>) -> Result<Command, Error> {
},
Command::SendUpdateReport(_) => match args.len() {
- 0 | 1 | 2 => Err(Error::Command("usage: sendup <update-id> <result-code> <result-text>".to_string())),
- 3 => {
+ 0 | 1 => Err(Error::Command("usage: sendup <update-id> <result-code>".to_string())),
+ 2 => {
if let Ok(code) = args[1].parse::<UpdateResultCode>() {
- let id = args[0].to_string();
- let text = args[2].to_string();
- Ok(Command::SendUpdateReport(UpdateReport::single(id, code, text)))
+ Ok(Command::SendUpdateReport(UpdateReport::single(args[0].to_string(), code, "".to_string())))
} else {
Err(Error::Command("couldn't parse 2nd argument as an UpdateResultCode".to_string()))
}
@@ -274,17 +272,13 @@ mod tests {
#[test]
fn send_update_report_test() {
- assert_eq!("SendUpdateReport myid OK done".parse::<Command>().unwrap(),
- Command::SendUpdateReport(UpdateReport::single(
- "myid".to_string(), UpdateResultCode::OK, "done".to_string()
- )));
- assert_eq!("sendup myid 19 generr".parse::<Command>().unwrap(),
- Command::SendUpdateReport(UpdateReport::single(
- "myid".to_string(), UpdateResultCode::GENERAL_ERROR, "generr".to_string()
- )));
- assert!("sendup myid 20 nosuch".parse::<Command>().is_err());
+ assert_eq!("SendUpdateReport myid OK".parse::<Command>().unwrap(), Command::SendUpdateReport(
+ UpdateReport::single("myid".to_string(), UpdateResultCode::OK, "".to_string())));
+ assert_eq!("sendup myid 19".parse::<Command>().unwrap(), Command::SendUpdateReport(
+ UpdateReport::single("myid".to_string(), UpdateResultCode::GENERAL_ERROR, "".to_string())));
+ assert!("sendup myid 20".parse::<Command>().is_err());
assert!("SendInstalledPackages".parse::<Command>().is_err());
- assert!("sendup 1 2 3 4".parse::<Command>().is_err());
+ assert!("sendup 1 2 3".parse::<Command>().is_err());
}
#[test]
diff --git a/src/datatype/config.rs b/src/datatype/config.rs
index f95eecf..5db0f12 100644
--- a/src/datatype/config.rs
+++ b/src/datatype/config.rs
@@ -252,19 +252,21 @@ impl Default for GatewayConfig {
/// A parsed representation of the [network] configuration section.
#[derive(RustcDecodable, PartialEq, Eq, Debug, Clone)]
pub struct NetworkConfig {
- pub http_server: String,
- pub rvi_edge_server: String,
- pub socket_path: String,
- pub websocket_server: String
+ pub http_server: String,
+ pub rvi_edge_server: String,
+ pub socket_commands_path: String,
+ pub socket_events_path: String,
+ pub websocket_server: String
}
impl Default for NetworkConfig {
fn default() -> NetworkConfig {
NetworkConfig {
- http_server: "http://127.0.0.1:8888".to_string(),
- rvi_edge_server: "http://127.0.0.1:9080".to_string(),
- socket_path: "/tmp/sota.socket".to_string(),
- websocket_server: "ws://127.0.0.1:3012".to_string()
+ http_server: "http://127.0.0.1:8888".to_string(),
+ rvi_edge_server: "http://127.0.0.1:9080".to_string(),
+ socket_commands_path: "/tmp/sota-commands.socket".to_string(),
+ socket_events_path: "/tmp/sota-events.socket".to_string(),
+ websocket_server: "ws://127.0.0.1:3012".to_string()
}
}
}
@@ -330,7 +332,6 @@ mod tests {
packages_dir = "/tmp/"
package_manager = "deb"
certificates_path = "/tmp/sota_certificates"
- socket_path = "/tmp/sota.socket"
"#;
const GATEWAY_CONFIG: &'static str =
@@ -349,7 +350,8 @@ mod tests {
[network]
http_server = "http://127.0.0.1:8888"
rvi_edge_server = "http://127.0.0.1:9080"
- socket_path = "/tmp/sota.socket"
+ socket_commands_path = "/tmp/sota-commands.socket"
+ socket_events_path = "/tmp/sota-events.socket"
websocket_server = "ws://127.0.0.1:3012"
"#;
@@ -357,7 +359,6 @@ mod tests {
r#"
[rvi]
client = "http://127.0.0.1:8901"
- edge = "http://127.0.0.1:9080"
storage_dir = "/var/sota"
timeout = 20
"#;
diff --git a/src/gateway/socket.rs b/src/gateway/socket.rs
index 9b6175f..f252e7c 100644
--- a/src/gateway/socket.rs
+++ b/src/gateway/socket.rs
@@ -13,26 +13,26 @@ use unix_socket::{UnixListener, UnixStream};
/// The `Socket` gateway is used for communication via Unix Domain Sockets.
pub struct Socket {
- pub path: String
+ pub commands_path: String,
+ pub events_path: String,
}
impl Gateway for Socket {
fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> {
- let itx = Arc::new(Mutex::new(itx));
- let _ = fs::remove_file(&self.path);
-
- let server = match UnixListener::bind(self.path.clone()) {
- Ok(server) => server,
- Err(err) => return Err(format!("couldn't start socket gateway: {}", err))
+ let _ = fs::remove_file(&self.commands_path);
+ let commands = match UnixListener::bind(&self.commands_path) {
+ Ok(sock) => sock,
+ Err(err) => return Err(format!("couldn't open commands socket: {}", err))
};
+ let itx = Arc::new(Mutex::new(itx));
thread::spawn(move || {
- for input in server.incoming() {
- if let Err(err) = input {
- error!("couldn't read socket input: {}", err);
+ for conn in commands.incoming() {
+ if let Err(err) = conn {
+ error!("couldn't get commands socket connection: {}", err);
continue
}
- let mut stream = input.unwrap();
+ let mut stream = conn.unwrap();
let itx = itx.clone();
thread::spawn(move || {
@@ -41,14 +41,30 @@ impl Gateway for Socket {
.unwrap_or_else(|err| format!("{}", err).into_bytes());
stream.write_all(&resp)
- .unwrap_or_else(|err| error!("couldn't write to socket: {}", err));
+ .unwrap_or_else(|err| error!("couldn't write to commands socket: {}", err));
stream.shutdown(Shutdown::Write)
- .unwrap_or_else(|err| error!("couldn't close socket for writing: {}", err))
+ .unwrap_or_else(|err| error!("couldn't close commands socket: {}", err));
});
}
});
- Ok(info!("Unix Domain Socket gateway listening at {}.", self.path))
+ Ok(info!("Socket listening for commands at {} and sending events to {}.",
+ self.commands_path, self.events_path))
+ }
+
+ fn pulse(&self, event: Event) {
+ match event {
+ Event::DownloadComplete(dl) => {
+ let _ = UnixStream::connect(&self.events_path).map(|mut stream| {
+ stream.write_all(&json::encode(&dl).expect("couldn't encode Event").into_bytes())
+ .unwrap_or_else(|err| error!("couldn't write to events socket: {}", err));
+ stream.shutdown(Shutdown::Write)
+ .unwrap_or_else(|err| error!("couldn't close events socket: {}", err));
+ }).map_err(|err| error!("couldn't open events socket: {}", err));
+ }
+
+ _ => ()
+ }
}
}
@@ -74,24 +90,44 @@ mod tests {
use chan;
use crossbeam;
use rustc_serialize::json;
- use std::thread;
+ use std::{fs, thread};
use std::io::{Read, Write};
use std::net::Shutdown;
use std::time::Duration;
- use datatype::{Command, Event};
+ use datatype::{Command, DownloadComplete, Event};
use gateway::{Gateway, Interpret};
use super::*;
- use unix_socket::UnixStream;
+ use unix_socket::{UnixListener, UnixStream};
#[test]
- fn unix_domain_socket_connections() {
+ fn socket_commands_and_events() {
let (etx, erx) = chan::sync::<Event>(0);
let (itx, irx) = chan::sync::<Interpret>(0);
- thread::spawn(move || Socket { path: "/tmp/sota.socket".to_string() }.start(itx, erx));
- thread::sleep(Duration::from_millis(100)); // wait until socket is created
+ thread::spawn(move || Socket {
+ commands_path: "/tmp/sota-commands.socket".to_string(),
+ events_path: "/tmp/sota-events.socket".to_string(),
+ }.start(itx, erx));
+ thread::sleep(Duration::from_millis(100)); // wait until socket gateway is created
+
+ let path = "/tmp/sota-events.socket";
+ let _ = fs::remove_file(&path);
+ let server = UnixListener::bind(&path).expect("couldn't create events socket for testing");
+
+ let send = DownloadComplete {
+ update_id: "1".to_string(),
+ update_image: "/foo/bar".to_string(),
+ signature: "abc".to_string()
+ };
+ etx.send(Event::DownloadComplete(send.clone()));
+
+ let (mut stream, _) = server.accept().expect("couldn't read from events socket");
+ let mut text = String::new();
+ stream.read_to_string(&mut text).unwrap();
+ let receive: DownloadComplete = json::decode(&text).expect("couldn't decode DownloadComplete message");
+ assert_eq!(send, receive);
thread::spawn(move || {
let _ = etx; // move into this scope
@@ -110,7 +146,7 @@ mod tests {
crossbeam::scope(|scope| {
for id in 0..10 {
scope.spawn(move || {
- let mut stream = UnixStream::connect("/tmp/sota.socket").expect("couldn't connect to socket");
+ let mut stream = UnixStream::connect("/tmp/sota-commands.socket").expect("couldn't connect to socket");
let _ = stream.write_all(&format!("dl {}", id).into_bytes()).expect("couldn't write to stream");
stream.shutdown(Shutdown::Write).expect("couldn't shut down writing");
diff --git a/src/main.rs b/src/main.rs
index 715fc08..61fa02a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -127,7 +127,10 @@ fn main() {
if config.gateway.socket {
let socket_itx = itx.clone();
let socket_sub = broadcast.subscribe();
- let mut socket = Socket { path: config.network.socket_path.clone() };
+ let mut socket = Socket {
+ commands_path: config.network.socket_commands_path.clone(),
+ events_path: config.network.socket_events_path.clone()
+ };
scope.spawn(move || socket.start(socket_itx, socket_sub));
}
@@ -213,7 +216,8 @@ fn build_config() -> Config {
opts.optopt("", "network-http-server", "change the http server gateway address", "ADDR");
opts.optopt("", "network-rvi-edge-server", "change the rvi edge server gateway address", "ADDR");
- opts.optopt("", "network-socket-path", "change the domain socket path", "PATH");
+ opts.optopt("", "network-socket-commands-path", "change the socket path for reading commands", "PATH");
+ opts.optopt("", "network-socket-events-path", "change the socket path for sending events", "PATH");
opts.optopt("", "network-websocket-server", "change the websocket gateway address", "ADDR");
opts.optopt("", "rvi-client", "change the rvi client URL", "URL");
@@ -286,7 +290,8 @@ fn build_config() -> Config {
matches.opt_str("network-http-server").map(|server| config.network.http_server = server);
matches.opt_str("network-rvi-edge-server").map(|server| config.network.rvi_edge_server = server);
- matches.opt_str("network-socket-path").map(|path| config.network.socket_path = path);
+ matches.opt_str("network-socket-commands-path").map(|path| config.network.socket_commands_path = path);
+ matches.opt_str("network-socket-events-path").map(|path| config.network.socket_events_path = path);
matches.opt_str("network-websocket-server").map(|server| config.network.websocket_server = server);
config.rvi.as_mut().map(|rvi_cfg| {
diff --git a/tests/sota.toml b/tests/sota.toml
index 7ad6162..0801509 100644
--- a/tests/sota.toml
+++ b/tests/sota.toml
@@ -35,7 +35,8 @@ websocket = true
[network]
http_server = "http://127.0.0.1:8888"
rvi_edge_server = "http://127.0.0.1:9080"
-socket_path = "/tmp/sota.socket"
+socket_commands_path = "/tmp/sota-commands.socket"
+socket_events_path = "/tmp/sota-events.socket"
websocket_server = "ws://127.0.0.1:3012"
[rvi]
diff --git a/tests/sota_client_tests.rs b/tests/sota_client_tests.rs
index 2655732..fcdbd0d 100644
--- a/tests/sota_client_tests.rs
+++ b/tests/sota_client_tests.rs
@@ -93,8 +93,10 @@ Options:
change the http server gateway address
--network-rvi-edge-server ADDR
change the rvi edge server gateway address
- --network-socket-path PATH
- change the domain socket path
+ --network-socket-commands-path PATH
+ change the socket path for reading commands
+ --network-socket-events-path PATH
+ change the socket path for sending events
--network-websocket-server ADDR
change the websocket gateway address
--rvi-client URL