diff options
author | Arthur Taylor <codders@octomonkey.org.uk> | 2016-09-05 15:43:23 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-09-05 15:43:23 +0200 |
commit | 0167dce98692f707b74395977c478c2ca44fa0c7 (patch) | |
tree | 53db4ad3d930e586be4ec946b0bbbfdda5350732 /src/gateway/http.rs | |
parent | d37818fa5ac01e2bf05c9b6c71362b41691a01f1 (diff) | |
parent | db7575f02de4064a7afaa10c3ae33349fadbf605 (diff) | |
download | rvi_sota_client-0167dce98692f707b74395977c478c2ca44fa0c7.tar.gz |
Merge pull request #8 from advancedtelematic/stable
Merge latest advancedtelematic/stable
Diffstat (limited to 'src/gateway/http.rs')
-rw-r--r-- | src/gateway/http.rs | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/src/gateway/http.rs b/src/gateway/http.rs new file mode 100644 index 0000000..990a1fc --- /dev/null +++ b/src/gateway/http.rs @@ -0,0 +1,135 @@ +use chan; +use chan::{Sender, Receiver}; +use hyper::StatusCode; +use hyper::net::{HttpStream, Transport}; +use hyper::server::{Server as HyperServer, Request as HyperRequest}; +use rustc_serialize::json; +use std::thread; +use std::sync::{Arc, Mutex}; + +use datatype::{Command, Event}; +use gateway::{Gateway, Interpret}; +use http::{Server, ServerHandler}; + + +/// The `Http` gateway parses `Command`s from the body of incoming requests. +pub struct Http { + pub server: String, +} + +impl Gateway for Http { + fn initialize(&mut self, itx: Sender<Interpret>) -> Result<(), String> { + let itx = Arc::new(Mutex::new(itx)); + let server = match HyperServer::http(&self.server.parse().expect("couldn't parse http address")) { + Ok(server) => server, + Err(err) => return Err(format!("couldn't start http gateway: {}", err)) + }; + thread::spawn(move || { + let (_, server) = server.handle(move |_| HttpHandler::new(itx.clone())).unwrap(); + server.run(); + }); + + Ok(info!("HTTP gateway listening at http://{}", self.server)) + } +} + + +struct HttpHandler { + itx: Arc<Mutex<Sender<Interpret>>>, + response_rx: Option<Receiver<Event>> +} + +impl HttpHandler { + fn new(itx: Arc<Mutex<Sender<Interpret>>>) -> ServerHandler<HttpStream> { + ServerHandler::new(Box::new(HttpHandler { itx: itx, response_rx: None })) + } +} + +impl<T: Transport> Server<T> for HttpHandler { + fn headers(&mut self, _: HyperRequest<T>) {} + + fn request(&mut self, body: Vec<u8>) { + String::from_utf8(body).map(|body| { + json::decode::<Command>(&body).map(|cmd| { + info!("Incoming HTTP request command: {}", cmd); + let (etx, erx) = chan::async::<Event>(); + self.response_rx = Some(erx); + self.itx.lock().unwrap().send(Interpret { + command: cmd, + response_tx: Some(Arc::new(Mutex::new(etx))), + }); + }).unwrap_or_else(|err| error!("http request parse json: {}", err)) + }).unwrap_or_else(|err| error!("http request parse string: {}", err)) + } + + fn response(&mut self) -> (StatusCode, Option<Vec<u8>>) { + self.response_rx.as_ref().map_or((StatusCode::BadRequest, None), |rx| { + rx.recv().map_or_else(|| { + error!("on_response receiver error"); + (StatusCode::InternalServerError, None) + }, |event| { + json::encode(&event).map(|body| { + (StatusCode::Ok, Some(body.into_bytes())) + }).unwrap_or_else(|err| { + error!("on_response encoding json: {:?}", err); + (StatusCode::InternalServerError, None) + }) + }) + }) + } +} + + +#[cfg(test)] +mod tests { + use chan; + use crossbeam; + use rustc_serialize::json; + use std::path::Path; + use std::thread; + + use super::*; + use gateway::{Gateway, Interpret}; + use datatype::{Command, Event}; + use http::{AuthClient, Client, set_ca_certificates}; + + + #[test] + fn http_connections() { + set_ca_certificates(&Path::new("run/sota_certificates")); + + let (etx, erx) = chan::sync::<Event>(0); + let (itx, irx) = chan::sync::<Interpret>(0); + + thread::spawn(move || Http { server: "127.0.0.1:8888".to_string() }.start(itx, erx)); + thread::spawn(move || { + let _ = etx; // move into this scope + loop { + let interpret = irx.recv().expect("itx is closed"); + match interpret.command { + Command::StartDownload(ids) => { + let tx = interpret.response_tx.unwrap(); + tx.lock().unwrap().send(Event::FoundSystemInfo(ids.first().unwrap().to_owned())); + } + _ => panic!("expected AcceptUpdates"), + } + } + }); + + crossbeam::scope(|scope| { + for id in 0..10 { + scope.spawn(move || { + let cmd = Command::StartDownload(vec!(format!("{}", id))); + let client = AuthClient::default(); + let url = "http://127.0.0.1:8888".parse().unwrap(); + let body = json::encode(&cmd).unwrap(); + let resp_rx = client.post(url, Some(body.into_bytes())); + let resp = resp_rx.recv().unwrap().unwrap(); + let text = String::from_utf8(resp).unwrap(); + assert_eq!(json::decode::<Event>(&text).unwrap(), + Event::FoundSystemInfo(format!("{}", id))); + }); + } + }); + } +} |