summaryrefslogtreecommitdiff
path: root/src/rvi/services.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rvi/services.rs')
-rw-r--r--src/rvi/services.rs185
1 files changed, 185 insertions, 0 deletions
diff --git a/src/rvi/services.rs b/src/rvi/services.rs
new file mode 100644
index 0000000..fd5a640
--- /dev/null
+++ b/src/rvi/services.rs
@@ -0,0 +1,185 @@
+use chan;
+use chan::Sender;
+use rustc_serialize::{json, Decodable, Encodable};
+use std::thread;
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use time;
+
+use datatype::{ChunkReceived, DownloadStarted, Event, InstalledSoftware,
+ RpcRequest, RpcOk, RpcErr, RviConfig, UpdateReport, UpdateRequestId,
+ Url};
+use super::parameters::{Abort, Chunk, Finish, Notify, Parameter, Report, Start};
+use super::transfers::Transfers;
+
+
+/// Hold references to RVI service endpoints, currently active `Transfers`, and
+/// where to broadcast outcome `Event`s to.
+#[derive(Clone)]
+pub struct Services {
+ pub remote: Arc<Mutex<RemoteServices>>,
+ pub sender: Arc<Mutex<Sender<Event>>>,
+ pub transfers: Arc<Mutex<Transfers>>,
+}
+
+impl Services {
+ /// Set up a new RVI service handler, pruning any inactive `Transfer`s each second.
+ pub fn new(rvi_cfg: RviConfig, device_id: String, sender: Sender<Event>) -> Self {
+ let transfers = Arc::new(Mutex::new(Transfers::new(rvi_cfg.storage_dir)));
+ rvi_cfg.timeout.map_or_else(|| info!("Transfers will never time out."), |timeout| {
+ info!("Transfers timeout after {} seconds.", timeout);
+ let transfers = transfers.clone();
+ thread::spawn(move || {
+ let tick = chan::tick(Duration::from_secs(1));
+ loop {
+ let _ = tick.recv();
+ let mut transfers = transfers.lock().unwrap();
+ transfers.prune(time::get_time().sec, timeout);
+ }
+ });
+ });
+
+ Services {
+ remote: Arc::new(Mutex::new(RemoteServices::new(device_id, rvi_cfg.client))),
+ sender: Arc::new(Mutex::new(sender)),
+ transfers: transfers,
+ }
+ }
+
+ /// Register each RVI endpoint with the provided registration function which
+ /// should return a `String` representation of the URL used to contact that
+ /// service.
+ pub fn register_services<F: Fn(&str) -> String>(&mut self, register: F) {
+ let _ = register("/sota/notify");
+ let mut remote = self.remote.lock().unwrap();
+ remote.local = Some(LocalServices {
+ start: register("/sota/start"),
+ chunk: register("/sota/chunk"),
+ abort: register("/sota/abort"),
+ finish: register("/sota/finish"),
+ getpackages: register("/sota/getpackages")
+ });
+ }
+
+ /// Handle an incoming message for a specific service endpoint.
+ pub fn handle_service(&self, service: &str, id: u64, msg: &str) -> Result<RpcOk<i32>, RpcErr> {
+ match service {
+ "/sota/notify" => self.handle_message::<Notify>(id, msg),
+ "/sota/start" => self.handle_message::<Start>(id, msg),
+ "/sota/chunk" => self.handle_message::<Chunk>(id, msg),
+ "/sota/finish" => self.handle_message::<Finish>(id, msg),
+ "/sota/getpackages" => self.handle_message::<Report>(id, msg),
+ "/sota/abort" => self.handle_message::<Abort>(id, msg),
+ _ => Err(RpcErr::invalid_request(id, format!("unknown service: {}", service)))
+ }
+ }
+
+ /// Parse the message as an `RpcRequest<RviMessage<Parameter>>` then delegate
+ /// to the specific `Parameter.handle()` function, forwarding any returned
+ /// `Event` to the `Services` sender.
+ fn handle_message<P>(&self, id: u64, msg: &str) -> Result<RpcOk<i32>, RpcErr>
+ where P: Parameter + Encodable + Decodable
+ {
+ let request = try!(json::decode::<RpcRequest<RviMessage<P>>>(&msg).map_err(|err| {
+ error!("couldn't decode message: {}", err);
+ RpcErr::invalid_params(id, format!("couldn't decode message: {}", err))
+ }));
+ let event = try!(request.params.parameters[0].handle(&self.remote, &self.transfers).map_err(|err| {
+ error!("couldn't handle parameters: {}", err);
+ RpcErr::unspecified(request.id, format!("couldn't handle parameters: {}", err))
+ }));
+ event.map(|ev| self.sender.lock().unwrap().send(ev));
+ Ok(RpcOk::new(request.id, None))
+ }
+}
+
+
+pub struct RemoteServices {
+ pub device_id: String,
+ pub rvi_client: Url,
+ pub local: Option<LocalServices>,
+ pub backend: Option<BackendServices>
+}
+
+impl RemoteServices {
+ pub fn new(device_id: String, rvi_client: Url) -> RemoteServices {
+ RemoteServices { device_id: device_id, rvi_client: rvi_client, local: None, backend: None }
+ }
+
+ fn send_message<E: Encodable>(&self, body: E, addr: &str) -> Result<String, String> {
+ RpcRequest::new("message", RviMessage::new(addr, vec![body], 60)).send(self.rvi_client.clone())
+ }
+
+ pub fn send_download_started(&self, update_id: UpdateRequestId) -> Result<String, String> {
+ let backend = try!(self.backend.as_ref().ok_or("BackendServices not set"));
+ let local = try!(self.local.as_ref().ok_or("LocalServices not set"));
+ let start = DownloadStarted { device: self.device_id.clone(), update_id: update_id, services: local.clone() };
+ self.send_message(start, &backend.start)
+ }
+
+ pub fn send_chunk_received(&self, chunk: ChunkReceived) -> Result<String, String> {
+ let backend = try!(self.backend.as_ref().ok_or("BackendServices not set"));
+ self.send_message(chunk, &backend.ack)
+ }
+
+ pub fn send_update_report(&self, report: UpdateReport) -> Result<String, String> {
+ let backend = try!(self.backend.as_ref().ok_or("BackendServices not set"));
+ let result = UpdateReportResult { device: self.device_id.clone(), update_report: report };
+ self.send_message(result, &backend.report)
+ }
+
+ pub fn send_installed_software(&self, installed: InstalledSoftware) -> Result<String, String> {
+ let backend = try!(self.backend.as_ref().ok_or("BackendServices not set"));
+ let result = InstalledSoftwareResult { device_id: self.device_id.clone(), installed: installed };
+ self.send_message(result, &backend.packages)
+ }
+}
+
+
+#[derive(Clone, RustcDecodable, RustcEncodable)]
+pub struct LocalServices {
+ pub start: String,
+ pub abort: String,
+ pub chunk: String,
+ pub finish: String,
+ pub getpackages: String,
+}
+
+#[derive(Clone, RustcDecodable, RustcEncodable)]
+pub struct BackendServices {
+ pub start: String,
+ pub ack: String,
+ pub report: String,
+ pub packages: String
+}
+
+
+#[derive(RustcDecodable, RustcEncodable)]
+struct UpdateReportResult {
+ pub device: String,
+ pub update_report: UpdateReport
+}
+
+#[derive(RustcDecodable, RustcEncodable)]
+struct InstalledSoftwareResult {
+ device_id: String,
+ installed: InstalledSoftware
+}
+
+
+#[derive(RustcDecodable, RustcEncodable)]
+pub struct RviMessage<E: Encodable> {
+ pub service_name: String,
+ pub parameters: Vec<E>,
+ pub timeout: Option<i64>
+}
+
+impl<E: Encodable> RviMessage<E> {
+ pub fn new(service: &str, parameters: Vec<E>, expire_in: i64) -> RviMessage<E> {
+ RviMessage {
+ service_name: service.to_string(),
+ parameters: parameters,
+ timeout: Some((time::get_time() + time::Duration::seconds(expire_in)).sec)
+ }
+ }
+}