summaryrefslogtreecommitdiff
path: root/src/remote
diff options
context:
space:
mode:
Diffstat (limited to 'src/remote')
-rw-r--r--src/remote/dw.rs581
-rw-r--r--src/remote/jsonrpc.rs150
-rw-r--r--src/remote/mod.rs5
-rw-r--r--src/remote/parm.rs189
-rw-r--r--src/remote/rvi/edge.rs162
-rw-r--r--src/remote/rvi/message.rs61
-rw-r--r--src/remote/rvi/mod.rs18
-rw-r--r--src/remote/rvi/send.rs61
-rw-r--r--src/remote/svc.rs272
9 files changed, 0 insertions, 1499 deletions
diff --git a/src/remote/dw.rs b/src/remote/dw.rs
deleted file mode 100644
index 0fd8b8e..0000000
--- a/src/remote/dw.rs
+++ /dev/null
@@ -1,581 +0,0 @@
-//! Handles caching and storage on disk for in-progress transfers and the assembly and verification
-//! of finished transfers
-
-use std::fs;
-use std::fs::{OpenOptions, DirEntry, File};
-use std::io::prelude::*;
-use std::path::PathBuf;
-use std::vec::Vec;
-use std::str::FromStr;
-
-use time;
-
-#[cfg(test)] use rand;
-#[cfg(test)] use rand::Rng;
-#[cfg(test)] use test_library::PathPrefix;
-
-use crypto::sha1::Sha1;
-use crypto::digest::Digest;
-
-use rustc_serialize::base64::FromBase64;
-
-use event::UpdateId;
-
-/// Type for storing the metadata of a in-progress transfer, which is defined as one package.
-/// Will clear out the chunks on disk when freed.
-pub struct Transfer {
- pub update_id: UpdateId,
- /// SHA1 checksum of the fully assembled package.
- pub checksum: String,
- /// `Vector` of transferred chunks.
- pub transferred_chunks: Vec<u64>,
- /// Path to the directory, where chunks will be cached and finished packages will be stored.
- pub prefix_dir: String,
- /// Timestamp, when the last chunk was received. Given as a unix epoch timestamp.
- pub last_chunk_received: i64
-}
-
-impl Transfer {
- /// Return a new `Transfer`
- ///
- /// # Arguments
- /// * `prefix`: Path where transferred chunks and assembled package will be stored.
- /// * `package`: [`PackageId`](../message/struct.PackageId.html) of this transfer.
- /// * `checksum`: SHA1 checksum of the fully assembled package.
- pub fn new(prefix: String, id: UpdateId, checksum: String)
- -> Transfer {
- Transfer {
- update_id: id,
- checksum: checksum,
- transferred_chunks: Vec::new(),
- prefix_dir: prefix,
- last_chunk_received: time::get_time().sec
- }
- }
-
- /// Create a transfer with empty values. To be used in tests.
- ///
- /// # Arguments
- /// * `prefix`: Path where transferred chunks and assembled package will be stored. This should
- /// be a temporary directory for tests.
- #[cfg(test)]
- pub fn new_test(prefix: &PathPrefix) -> Transfer {
- Transfer {
- update_id: UpdateId::new(),
- checksum: "".to_string(),
- transferred_chunks: Vec::new(),
- prefix_dir: prefix.to_string(),
- last_chunk_received: time::get_time().sec
- }
- }
-
- /// Randomize a existing transfer, by creating a random
- /// [`PackageId`](../message/struct.PackageId.html). Returns the created `PackageId`, so it can
- /// be used in assertions.
- ///
- /// # Arguments
- /// * `i`: Size of the name and version strings.
- #[cfg(test)]
- pub fn randomize(&mut self, i: usize) -> UpdateId {
- let update_id = rand::thread_rng()
- .gen_ascii_chars().take(i).collect::<String>();
-
- trace!("Testing with:");
- trace!(" update_id: {}", update_id);
- self.update_id = update_id.clone();
- update_id
- }
-
- /// Write a transferred chunk to disk. Returns false and logs an error if something goes wrong.
- ///
- /// # Arguments
- /// * `msg`: Base64 encoded data of this chunk.
- /// * `index`: Index of this chunk
- pub fn write_chunk(&mut self,
- msg: &str,
- index: u64) -> bool {
- let success = msg.from_base64().map_err(|e| {
- error!("Could not decode chunk {} for update_id {}", index, self.update_id);
- error!("{}", e)
- }).and_then(|msg| self.get_chunk_path(index).map_err(|e| {
- error!("Could not get path for chunk {}", index);
- error!("{}", e)
- }).map(|path| {
- trace!("Saving chunk to {}", path.display());
- if write_new_file(&path, &msg) {
- self.transferred_chunks.push(index);
- self.transferred_chunks.sort();
- self.transferred_chunks.dedup();
- true
- } else {
- error!("Couldn't write chunk {} for update_id {}", index, self.update_id);
- false
- }
- })).unwrap_or(false);
-
- self.last_chunk_received = time::get_time().sec;
- success
- }
-
- /// Assemble the transferred chunks to a package and verify it with the provided checksum.
- /// Returns `false` and prints a error message if either the package can't be assembled or the
- /// checksum doesn't match.
- pub fn assemble_package(&self) -> Result<PathBuf, String> {
- trace!("Finalizing package {}", self.update_id);
- self.assemble_chunks().
- and_then(|_| {
- if self.checksum() {
- self.get_package_path()
- } else {
- Err(format!("Cannot assemble_package for update_id: {}", self.update_id))
- }
- })
- }
-
- /// Collect all chunks and concatenate them into one file. Returns a `String` with a error
- /// message, should something go wrong.
- fn assemble_chunks(&self) -> Result<(), String> {
- let package_path = try!(self.get_package_path());
-
- trace!("Saving update_id {} to {}", self.update_id, package_path.display());
-
- let mut file = try!(OpenOptions::new()
- .write(true).append(true)
- .create(true).truncate(true)
- .open(package_path)
- .map_err(|x| format!("Couldn't open file: {}", x)));
-
- let path: PathBuf = try!(self.get_chunk_dir());
-
- // Make sure all indices are valid and sort them
- let mut indices = Vec::new();
- for entry in try!(read_dir(&path)) {
- let entry = try!(entry.map_err(|x| format!("No entries: {}", x)));
- indices.push(try!(parse_index(entry)));
- }
- indices.sort();
-
- // Append indices to the final file
- for index in indices {
- try!(self.copy_chunk(&path, index, &mut file));
- }
- Ok(())
- }
-
- /// Read a chunk file file and append it to a package file. Returns a `String` with a error
- /// message should something go wrong.
- ///
- /// # Arguments
- /// * `path`: Pointer to a [`PathBuf`]
- /// (https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) where the chunks are
- /// cached.
- /// * `index`: Index of the chunk to append.
- /// * `file`: Pointer to a `File` where the chunk should be appended. Should be created with
- /// `OpenOptions` and the append only option. See the documentation for [`OpenOptions`]
- /// (https://doc.rust-lang.org/stable/std/fs/struct.OpenOptions.html), [`File`]
- /// (https://doc.rust-lang.org/stable/std/fs/struct.File.html), and the implementation of
- /// [`assemble_chunks`](#method.assemble_chunks) for details.
- fn copy_chunk(&self, path: &PathBuf, index: u64, file: &mut File)
- -> Result<(), String> {
- let name = index.to_string();
- let mut chunk_path = path.clone();
- chunk_path.push(&name);
- let mut chunk =
- try!(OpenOptions::new().open(chunk_path)
- .map_err(|x| format!("Couldn't open file: {}", x)));
-
- let mut buf = Vec::new();
- try!(chunk.read_to_end(&mut buf)
- .map_err(|x| format!("Couldn't read file {}: {}", name, x)));
- try!(file.write(&mut buf)
- .map_err(|x| format!("Couldn't write chunk {} to file {}: {}",
- name, self.update_id, x)));
-
- trace!("Wrote chunk {} to update_id {}", name, self.update_id);
- Ok(())
- }
-
- /// Verify the checksum of this transfer. Assumes the package was already assembled. Prints a
- /// error message showing the mismatched checksums and returns false on errors.
- fn checksum(&self) -> bool {
- let path = try_or!(self.get_package_path(), return false);
- let mut file = try_or!(OpenOptions::new().open(path), return false);
- let mut data = Vec::new();
-
- // TODO: avoid reading in the whole file at once
- try_msg_or!(file.read_to_end(&mut data),
- "Couldn't read file to check",
- return false);
-
- let mut hasher = Sha1::new();
- hasher.input(&data);
- let hash = hasher.result_str();
-
- if hash == self.checksum {
- true
- } else {
- error!("Checksums didn't match for update_id {}", self.update_id);
- error!(" Expected: {}", self.checksum);
- error!(" Got: {}", hash);
- false
- }
- }
-
- /// Get the full path for the specified chunk index. Returns a
- /// [`PathBuf`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) on success or a
- /// `String` on errors detailing what went wrong.
- ///
- /// # Arguments
- /// * `index`: The index for which the path should be constructed
- fn get_chunk_path(&self, index: u64) -> Result<PathBuf, String> {
- let mut path = try!(self.get_chunk_dir());
- let filename = index.to_string();
-
- trace!("Using filename {}", filename);
- path.push(filename);
- Ok(path)
- }
-
- /// Get the full path for the package of this `Transfer`. Returns a
- /// [`PathBuf`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) on success or a
- /// `String` on errors detailing what went wrong.
- fn get_package_path(&self) -> Result<PathBuf, String> {
- let mut path = try!(self.get_package_dir());
- path.push(format!("{}.spkg", self.update_id));
- Ok(path)
- }
-
- /// Get the directory, where this `Transfer` caches chunks. Returns a
- /// [`PathBuf`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) on success or a
- /// `String` on errors detailing what went wrong.
- fn get_chunk_dir(&self) -> Result<PathBuf, String> {
- let mut path = PathBuf::from(&self.prefix_dir);
- path.push("downloads");
- path.push(format!("{}", self.update_id));
-
- fs::create_dir_all(&path).map_err(|e| {
- let path_str = path.to_str().unwrap_or("unknown");
- format!("Couldn't create chunk dir at '{}': {}", path_str, e)
- }).map(|_| path)
- }
-
- /// Get the directory, where this `Transfer` stores the assembled package. Returns a
- /// [`PathBuf`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) on success or a
- /// `String` on errors detailing what went wrong.
- fn get_package_dir(&self) -> Result<PathBuf, String> {
- let mut path = PathBuf::from(&self.prefix_dir);
- path.push("packages");
-
- fs::create_dir_all(&path).map_err(|e| {
- let path_str = path.to_str().unwrap_or("unknown");
- format!("Couldn't create packges dir at '{}': {}", path_str, e)
- }).map(|_| path)
- }
-}
-
-impl Drop for Transfer {
- /// When a `Transfer` is freed it will also clear out the associated chunk cache on disk.
- fn drop(&mut self) {
- let dir = try_or!(self.get_chunk_dir(), return);
- trace!("Dropping transfer for package {}", self.update_id);
-
- for entry in try_or!(read_dir(&dir), return) {
- let entry = try_or!(entry, continue);
- let _ = entry.file_name().into_string().map_err(|_|
- error!("Found a malformed entry!")
- ).map(|name| {
- trace!("Dropping chunk file {}", name);
- try_or!(fs::remove_file(entry.path()), return);
- });
- }
-
- try_or!(fs::remove_dir(dir), return);
- }
-}
-
-/// Write the provided `data` to the file at `path`. Will create the file if it doesn't exist and
-/// overwrite existing files. Returns `false` on errors, after logging a error message.
-///
-/// # Arguments
-/// * `path`: Pointer to a [`PathBuf`]
-/// (https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html) where the data will be
-/// written to. Needs to point to a (possibly nonexistent) file.
-/// * `data`: The data to be written to disk.
-fn write_new_file(path: &PathBuf, data: &Vec<u8>) -> bool {
- let mut file = try_or!(OpenOptions::new()
- .write(true).create(true)
- .truncate(true).open(path),
- return false);
-
- try_or!(file.write_all(data), return false);
- try_or!(file.flush(), return false);
- true
-}
-
-/// Read the contents of a directory. Returns a
-/// [`ReadDir`](https://doc.rust-lang.org/stable/std/fs/struct.ReadDir.html) iterator on success or
-/// a `String` with a detailed error message on failure.
-fn read_dir(path: &PathBuf) -> Result<fs::ReadDir, String> {
- fs::read_dir(path).map_err(|e| {
- let path_str = path.to_str().unwrap_or("unknown");
- format!("Couldn't read dir at '{}': {}", path_str, e)
- })
-}
-
-/// Parse a [`DirEntry`](https://doc.rust-lang.org/stable/std/fs/struct.DirEntry.html) to a `u64`.
-/// Returns the parsed number on success or a `String` with a detailed error message on failure.
-///
-/// # Arguments
-/// * `entry`: `DirEntry` to be parsed.
-fn parse_index(entry: DirEntry) -> Result<u64, String> {
- let name = entry.file_name().into_string()
- .unwrap_or("unknown".to_string());
- u64::from_str(&name)
- .map_err(|_| "Couldn't parse chunk index from filename".to_string())
-}
-
-use std::collections::HashMap;
-
-/// Type alias to hide the internal `HashMap`, that is used to store
-/// [`Transfer`](../persistence/struct.Transfer.html)s.
-pub struct Transfers {
- items: HashMap<UpdateId, Transfer>,
- storage_dir: String
-}
-
-impl Transfers {
- pub fn new(dir: String) -> Transfers {
- Transfers {
- items: HashMap::new(),
- storage_dir: dir
- }
- }
-
- pub fn get(&self, pkg: &UpdateId) -> Option<&Transfer> {
- self.items.get(pkg)
- }
-
- pub fn get_mut(&mut self, pkg: &UpdateId) -> Option<&mut Transfer> {
- self.items.get_mut(pkg)
- }
-
- pub fn push(&mut self, pkg: UpdateId, cksum: String) {
- self.items.insert(
- pkg.clone(),
- Transfer::new(self.storage_dir.to_string(), pkg, cksum));
- }
-
- #[cfg(test)]
- pub fn push_test(&mut self, tr: Transfer) {
- self.items.insert(tr.update_id.clone(), tr);
- }
-
- #[cfg(test)]
- pub fn is_empty(&self) -> bool {
- self.items.is_empty()
- }
-
- pub fn remove(&mut self, pkg: &UpdateId) {
- self.items.remove(pkg);
- }
-
- pub fn clear(&mut self) {
- self.items.clear();
- }
-
- pub fn prune(&mut self, now: i64, timeout: i64) {
- self.items.iter()
- .filter(|&(_, v)| now - v.last_chunk_received > timeout)
- .map(|(k, _)| k.clone())
- .collect::<Vec<UpdateId>>()
- .iter().map(|k| {
- self.items.remove(k);
- info!("Transfer for update_id {} timed out after {} ms", k, timeout)})
- .collect::<Vec<()>>();
- }
-}
-
-
-#[cfg(test)]
-mod test {
- use super::*;
- use test_library::*;
-
- use std::path::PathBuf;
- use std::fs;
- use std::fs::OpenOptions;
- use std::io::prelude::*;
-
- use rand;
- use rand::Rng;
- use rustc_serialize::base64;
- use rustc_serialize::base64::ToBase64;
-
- fn create_tmp_directories(prefix: &PathPrefix) {
- for i in 1..20 {
- let mut transfer = Transfer::new_test(prefix);
- let update_id = transfer.randomize(i);
- let chunk_dir: PathBuf = transfer.get_chunk_dir().unwrap();
- let path = format!("{}/downloads/{}", prefix, update_id);
- assert_eq!(chunk_dir.to_str().unwrap(), path);
-
- let path = PathBuf::from(path);
- // This also makes sure it's a directory
- let dir = fs::read_dir(&path).unwrap();
-
- for _ in dir {
- panic!("Found non-empty directory!");
- }
- }
- }
-
- #[test]
- fn it_creates_a_tmp_directory() {
- test_init!();
- let prefix = PathPrefix::new();
- create_tmp_directories(&prefix);
- }
-
- #[test]
- fn it_cleans_up_the_tmp_directories() {
- test_init!();
- let prefix = PathPrefix::new();
- create_tmp_directories(&prefix);
- let path = PathBuf::from(format!("{}/downloads/", prefix));
- let dir = fs::read_dir(&path).unwrap();
-
- for _ in dir {
- panic!("Found non-empty directory!");
- }
- }
-
- #[test]
- fn it_creates_a_persistent_directory_per_package() {
- test_init!();
- let prefix = PathPrefix::new();
- for i in 1..20 {
- let mut transfer = Transfer::new_test(&prefix);
- let update_id = transfer.randomize(i);
-
- let chunk_dir: PathBuf = transfer.get_package_path().unwrap();
- let path = format!("{}/packages/{}.spkg", prefix, update_id);
- assert_eq!(chunk_dir.to_str().unwrap(), path);
- }
- }
-
- macro_rules! assert_chunk_written {
- ($transfer:ident,
- $prefix:ident,
- $update_id:ident,
- $index:ident,
- $data:ident) => {{
- trace!("Testing with: {}", $data);
-
- let b64_data = $data.as_bytes().to_base64(
- base64::Config {
- char_set: base64::CharacterSet::UrlSafe,
- newline: base64::Newline::LF,
- pad: true,
- line_length: None
- });
-
- trace!("Encoded as: {}", b64_data);
-
- $transfer.write_chunk(&b64_data, $index as u64);
-
- let path = format!("{}/downloads/{}/{}", $prefix, $update_id, $index);
-
- trace!("Expecting file at: {}", path);
-
- let mut from_disk = Vec::new();
- OpenOptions::new()
- .open(PathBuf::from(path))
- .unwrap()
- .read_to_end(&mut from_disk)
- .unwrap();
-
- assert_eq!($data.into_bytes(), from_disk);
- }}
- }
-
- #[test]
- fn it_writes_decoded_data_to_disk() {
- test_init!();
- let prefix = PathPrefix::new();
- for i in 1..20 {
- let mut transfer = Transfer::new_test(&prefix);
- let update_id = transfer.randomize(i);
- for i in 1..20 {
- let data = rand::thread_rng()
- .gen_ascii_chars().take(i).collect::<String>();
- assert_chunk_written!(transfer, prefix, update_id, i, data);
- }
- }
- }
-
- #[test]
- fn it_correctly_assembles_stored_chunks() {
- test_init!();
- let prefix = PathPrefix::new();
- for i in 1..20 {
- let mut transfer = Transfer::new_test(&prefix);
- let update_id = transfer.randomize(i);
- let mut full_data = String::new();
- for i in 1..20 {
- let data = rand::thread_rng()
- .gen_ascii_chars().take(i).collect::<String>();
- full_data.push_str(&data);
-
- assert_chunk_written!(transfer, prefix, update_id, i, data);
- }
-
- transfer.assemble_chunks().unwrap();
-
- let path = format!("{}/packages/{}.spkg", prefix, update_id);
-
- trace!("Expecting assembled file at: {}", path);
-
- let mut from_disk = Vec::new();
- OpenOptions::new()
- .open(PathBuf::from(path))
- .unwrap()
- .read_to_end(&mut from_disk)
- .unwrap();
-
- assert_eq!(full_data.into_bytes(), from_disk);
- }
- }
-
- fn checksum_matching(data: String, checksum: String) -> bool {
- let prefix = PathPrefix::new();
- let mut transfer = Transfer::new_test(&prefix);
- let update_id = transfer.randomize(20);
- let index = 0;
- assert_chunk_written!(transfer, prefix, update_id, index, data);
- transfer.assemble_chunks().unwrap();
-
- transfer.checksum = checksum;
- transfer.checksum()
- }
-
- #[test]
- fn it_returns_true_for_correct_checksums() {
- test_init!();
- assert!(checksum_matching("test\n".to_string(),
- "4e1243bd22c66e76c2ba9eddc1f91394e57f9f83".to_string()));
- }
-
- #[test]
- fn it_returns_false_for_incorrect_checksums() {
- test_init!();
- assert!(!checksum_matching("test\n".to_string(),
- "fa7c4d75bae3a641d1f9ab5df028175bfb8a69ca".to_string()));
- }
-
- #[test]
- fn it_returns_false_for_invalid_checksums() {
- test_init!();
- assert!(!checksum_matching("test\n".to_string(),
- "invalid".to_string()));
- }
-}
diff --git a/src/remote/jsonrpc.rs b/src/remote/jsonrpc.rs
deleted file mode 100644
index 5676a37..0000000
--- a/src/remote/jsonrpc.rs
+++ /dev/null
@@ -1,150 +0,0 @@
-//! RVI specific implementation of the jsonrpc protocol
-use time;
-
-/// Type to encode a generic jsonrpc call.
-#[derive(RustcDecodable,RustcEncodable,Debug)]
-pub struct Request<T> {
- /// The version of jsonrpc to use, has to be set to `"2.0"`.
- pub jsonrpc: String,
- /// The identifier of the request. Only unsigned numbers are accepted
- // TODO: id can be any type
- pub id: u64,
- /// The method to call on the receiving side.
- pub method: String,
- /// Arguments for the method.
- pub params: T
-}
-
-impl<T> Request<T> {
- /// Returns a new `Request`.
- ///
- /// # Arguments
- /// * `s`: The name of the method to call.
- /// * `p`: The arguments of said method.
- pub fn new(s: &str, p: T) -> Request<T> {
- Request::<T> {
- jsonrpc: "2.0".to_string(),
- id: time::precise_time_ns(),
- method: s.to_string(),
- params: p
- }
- }
-}
-
-/// Response to a jsonrpc call, indicating a successful method call.
-#[derive(RustcDecodable,RustcEncodable)]
-pub struct OkResponse<T> {
- /// The version of jsonrpc to use, has to be set to `"2.0"`.
- pub jsonrpc: String,
- /// The identifier of the jsonrpc call this response belongs to. Only unsigned numbers are
- /// accepted
- // TODO: id can be any type
- pub id: u64,
- /// The result of the method call, if any.
- pub result: Option<T>
-}
-
-impl<T> OkResponse<T> {
- /// Returns a new `OkResponse`
- ///
- /// # Arguments
- /// * `id`: The identifier of the jsonrpc call the returned response belongs to.
- /// * `result`: The result of the method call, if any.
- pub fn new(id: u64, result: Option<T>) -> OkResponse<T> {
- OkResponse {
- jsonrpc: "2.0".to_string(),
- id: id,
- result: result
- }
- }
-}
-
-/// Response to a jsonrpc call, indicating failure.
-#[derive(RustcDecodable,RustcEncodable)]
-pub struct ErrResponse {
- /// The version of jsonrpc to use, has to be set to `"2.0"`.
- pub jsonrpc: String,
- /// The identifier of the jsonrpc call this response belongs to. Only unsigned numbers are
- /// accepted
- // TODO: id can be any type
- pub id: u64,
- /// The error code and message.
- pub error: ErrorCode
-}
-
-impl ErrResponse {
- /// Returns a new `ErrResponse`
- ///
- /// # Arguments
- /// * `id`: The identifier of the jsonrpc call the returned response belongs to.
- /// * `error`: The error code and message. See [`ErrorCode`](./struct.ErrorCode.html).
- pub fn new(id: u64, error: ErrorCode) -> ErrResponse {
- ErrResponse {
- jsonrpc: "2.0".to_string(),
- id: id,
- error: error
- }
- }
-
- /// Returns a new `ErrResponse`, indicating a ["Invalid
- /// Request"](http://www.jsonrpc.org/specification#error_object) error.
- pub fn invalid_request(id: u64) -> ErrResponse {
- ErrResponse::new(id,
- ErrorCode {
- code: -32600,
- message: "Invalid Request".to_string()
- })
- }
-
- /// Returns a new `ErrResponse`, indicating a ["Method not
- /// found"](http://www.jsonrpc.org/specification#error_object) error.
- pub fn method_not_found(id: u64) -> ErrResponse {
- ErrResponse::new(id,
- ErrorCode {
- code: -32601,
- message: "Method not found".to_string()
- })
- }
-
- /// Returns a new `ErrResponse`, indicating a ["Parse
- /// error"](http://www.jsonrpc.org/specification#error_object) error.
- pub fn parse_error() -> ErrResponse {
- ErrResponse::new(0,
- ErrorCode {
- code: -32700,
- message: "Parse error".to_string()
- })
- }
-
- /// Returns a new `ErrResponse`, indicating a ["Invalid
- /// params"](http://www.jsonrpc.org/specification#error_object) error.
- pub fn invalid_params(id: u64) -> ErrResponse {
- ErrResponse::new(
- id,
- ErrorCode {
- code: -32602,
- message: "Invalid params".to_string()
- })
- }
-
- /// Returns a new `ErrResponse`, indicating a unspecified error.
- pub fn unspecified(id: u64) -> ErrResponse {
- ErrResponse::new(
- id,
- ErrorCode {
- code: -32100,
- message: "Couldn't handle request".to_string()
- })
- }
-}
-
-/// Type to encode a jsonrpc error.
-#[derive(RustcDecodable,RustcEncodable)]
-pub struct ErrorCode {
- /// The error code as [specified by
- /// jsonrpc](http://www.jsonrpc.org/specification#error_object).
- pub code: i32,
- /// The error message as [specified by
- /// jsonrpc](http://www.jsonrpc.org/specification#error_object).
- pub message: String
-}
diff --git a/src/remote/mod.rs b/src/remote/mod.rs
deleted file mode 100644
index 42a2c7b..0000000
--- a/src/remote/mod.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-mod dw;
-mod jsonrpc;
-mod parm;
-pub mod rvi;
-pub mod svc;
diff --git a/src/remote/parm.rs b/src/remote/parm.rs
deleted file mode 100644
index 5c2db8b..0000000
--- a/src/remote/parm.rs
+++ /dev/null
@@ -1,189 +0,0 @@
-use event::UpdateId;
-use event::inbound::{InboundEvent, UpdateAvailable, GetInstalledSoftware, DownloadComplete};
-
-use super::dw::Transfers;
-use super::svc::{BackendServices, RemoteServices};
-
-use std::result;
-use std::sync::Mutex;
-
-#[derive(Debug)]
-pub enum Error {
- UnknownPackage,
- IoFailure,
- SendFailure
-}
-pub type Result = result::Result<Option<InboundEvent>, Error>;
-
-/// Trait that every message handler needs to implement.
-pub trait ParamHandler {
- /// Handle the message.
- ///
- /// Return a [`Event`](../message/enum.Event.html) to be passed to the
- /// [`main_loop`](../main_loop/index.html) if apropriate.
- fn handle(&self,
- services: &Mutex<RemoteServices>,
- transfers: &Mutex<Transfers>)
- -> Result;
-}
-
-
-/// Type for "Notify" messages.
-#[derive(RustcDecodable, Clone)]
-pub struct NotifyParams {
- /// A `Vector` of packages, that are available for download.
- pub update_available: UpdateAvailable,
- /// The service URLs, that the SOTA server supports.
- pub services: BackendServices,
-}
-
-impl ParamHandler for NotifyParams {
- fn handle(&self,
- services: &Mutex<RemoteServices>,
- _: &Mutex<Transfers>) -> Result {
- let mut services = services.lock().unwrap();
- services.set(self.services.clone());
-
- Ok(Some(InboundEvent::UpdateAvailable(self.update_available.clone())))
- }
-}
-
-
-/// Type for "Start Transfer" messages.
-#[derive(RustcDecodable)]
-pub struct StartParams {
- pub update_id: UpdateId,
- /// The amount of chunks this `Transfer` will have.
- pub chunkscount: u64,
- /// The SHA1 checksum of the assembled package.
- pub checksum: String
-}
-
-impl ParamHandler for StartParams {
- fn handle(&self,
- services: &Mutex<RemoteServices>,
- transfers: &Mutex<Transfers>) -> Result {
- let services = services.lock().unwrap();
- let mut transfers = transfers.lock().unwrap();
-
- info!("Starting transfer for update_id {}", self.update_id);
-
- transfers.push(self.update_id.clone(), self.checksum.clone());
- services.send_chunk_received(
- ChunkReceived {
- update_id: self.update_id.clone(),
- chunks: Vec::new(),
- vin: services.vin.clone() })
- .map_err(|e| {
- error!("Error on sending start ACK: {}", e);
- Error::SendFailure })
- .map(|_| None)
- }
-}
-
-/// Encodes the "Chunk Received" message, indicating that a chunk was successfully transferred.
-#[derive(RustcEncodable)]
-pub struct ChunkReceived {
- /// The transfer to which the transferred chunk belongs.
- pub update_id: UpdateId,
- /// A list of the successfully transferred chunks.
- pub chunks: Vec<u64>,
- /// The VIN of this device.
- pub vin: String
-}
-
-
-/// Type for messages transferring single chunks.
-#[derive(RustcDecodable)]
-pub struct ChunkParams {
- /// The package transfer this chunk belongs to.
- pub update_id: UpdateId,
- /// The data of the transferred chunk.
- pub bytes: String,
- /// The index of this chunk.
- pub index: u64
-}
-
-impl ParamHandler for ChunkParams {
- fn handle(&self,
- services: &Mutex<RemoteServices>,
- transfers: &Mutex<Transfers>) -> Result {
- let services = services.lock().unwrap();
- let mut transfers = transfers.lock().unwrap();
- transfers.get_mut(&self.update_id).map(|t| {
- if t.write_chunk(&self.bytes, self.index) {
- info!("Wrote chunk {} for package {}", self.index, self.update_id);
- services.send_chunk_received(
- ChunkReceived {
- update_id: self.update_id.clone(),
- chunks: t.transferred_chunks.clone(),
- vin: services.vin.clone() })
- .map_err(|e| {
- error!("Error on sending ChunkReceived: {}", e);
- Error::SendFailure })
- .map(|_| None)
- } else {
- Err(Error::IoFailure)
- }
- }).unwrap_or_else(|| {
- error!("Couldn't find transfer for update_id {}", self.update_id);
- Err(Error::UnknownPackage)
- })
- }
-}
-
-
-/// Type for "Finish Transfer" messages.
-#[derive(RustcDecodable)]
-pub struct FinishParams {
- /// The package transfer to finalize.
- pub update_id: UpdateId,
- pub signature: String
-}
-
-impl ParamHandler for FinishParams {
- fn handle(&self,
- _: &Mutex<RemoteServices>,
- transfers: &Mutex<Transfers>) -> Result {
- let mut transfers = transfers.lock().unwrap();
- transfers.get(&self.update_id).ok_or(Error::UnknownPackage)
- .and_then(|t| {
- t.assemble_package().map_err(|_| Error::IoFailure) })
- .and_then(|p| {
- p.into_os_string().into_string().map_err(|_| Error::IoFailure) })
- .map(|p| {
- transfers.remove(&self.update_id);
- info!("Finished transfer of {}", self.update_id);
- Some(InboundEvent::DownloadComplete(DownloadComplete {
- update_id: self.update_id.clone(),
- update_image: p,
- signature: self.signature.clone() })) })
- }
-}
-
-
-/// Type for "Abort Transfer" messages.
-#[derive(RustcDecodable)]
-pub struct AbortParams;
-
-impl ParamHandler for AbortParams {
- fn handle(&self,
- _: &Mutex<RemoteServices>,
- transfers: &Mutex<Transfers>) -> Result {
- let mut transfers = transfers.lock().unwrap();
- transfers.clear();
- Ok(None)
- }
-}
-
-
-/// Type for "Get All Packages" messages.
-pub type ReportParams = GetInstalledSoftware;
-
-impl ParamHandler for ReportParams {
- fn handle(&self,
- _: &Mutex<RemoteServices>,
- _: &Mutex<Transfers>) -> Result {
- Ok(Some(InboundEvent::GetInstalledSoftware(self.clone())))
- }
-}
diff --git a/src/remote/rvi/edge.rs b/src/remote/rvi/edge.rs
deleted file mode 100644
index 6eec01f..0000000
--- a/src/remote/rvi/edge.rs
+++ /dev/null
@@ -1,162 +0,0 @@
-//! Implements the RVI facing webservice.
-
-use std::io::{Read, Write};
-use std::thread;
-use hyper::Server;
-use hyper::server::{Handler, Request, Response};
-use rustc_serialize::json;
-use rustc_serialize::json::Json;
-
-use remote::jsonrpc;
-use remote::jsonrpc::{OkResponse, ErrResponse};
-
-use remote::rvi::send;
-use remote::rvi::message::{RegisterServiceRequest, RegisterServiceResponse};
-
-pub trait ServiceHandler: Sync + Send {
- fn handle_service(&self, id: u64, service: &str, message: &str)
- -> Result<OkResponse<i32>, ErrResponse>;
- fn register_services<F: Fn(&str) -> String>(&self, reg: F);
-}
-
-/// Encodes the service edge of the webservice.
-pub struct ServiceEdge<H: ServiceHandler + 'static> {
- /// The full URL where RVI can be reached.
- rvi_url: String,
- /// The `host:port` to bind and listen for incoming RVI messages.
- edge_url: String,
- hdlr: H
-}
-
-impl<H: ServiceHandler + 'static> ServiceEdge<H> {
- /// Create a new service edge.
- ///
- /// # Arguments
- /// * `r`: The full URL where RVI can be reached.
- /// * `e`: The `host:port` combination where the edge should bind.
- /// * `s`: A sender to communicate back the service URLs.
- pub fn new(r: String, e: String, h: H) -> ServiceEdge<H> {
- ServiceEdge {
- rvi_url: r,
- edge_url: e,
- hdlr: h
- }
- }
-
- /// Register a service. Returns the full service URL as provided by RVI. Panics if the
- /// registration in RVI failed. This can be handled by starting the RVI edge in a separate
- /// thread.
- ///
- /// # Arguments
- /// * `s`: The service to register. Will get prepended with the device identifier by RVI.
- pub fn register_service(&self, s: &str) -> String {
- let json_rpc = jsonrpc::Request::new(
- "register_service",
- RegisterServiceRequest {
- network_address: self.edge_url.to_string(),
- service: s.to_string()
- });
-
- let resp = send(&self.rvi_url, &json_rpc)
- .map_err(|e| error!("Couldn't send registration to RVI\n{}", e))
- .and_then(|r| json::decode::<jsonrpc::OkResponse<RegisterServiceResponse>>(&r)
- .map_err(|e| error!("Couldn't parse response when registering in RVI\n{}", e)))
- .unwrap();
-
- resp.result
- .expect("Didn't get full service name when registering")
- .service
- }
-
- /// Starts the service edge.
- ///
- /// It binds on the provided `host:port` combination, registers all services and then waits for
- /// incoming RVI messages. On incoming messages it forks another thread and passes the message
- /// to the provided `Handler`. For details about how to implement a `Handler` see the
- /// [`hyper`](../../hyper/index.html) documentation and the [reference
- /// implementation](../handler/index.html).
- ///
- /// Panics if it can't reach or register in RVI.
- ///
- /// # Arguments
- /// * `h`: The `Handler` all messages are passed to.
- /// * `s`: A `Vector` of service strings to register in RVI.
- pub fn start(self) {
- let url = self.edge_url.clone();
- self.hdlr.register_services(|s| self.register_service(s));
- thread::spawn(move || {
- Server::http(&*url).and_then(|srv| {
- info!("Ready to accept connections.");
- srv.handle(self) })
- .map_err(|e| error!("Couldn't start server\n{}", e))
- .unwrap()
- });
- }
-
- /// Try to parse the type of a message and forward it to the appropriate message handler.
- /// Returns the result of the message handling or a `jsonrpc` result indicating a parser error.
- ///
- /// Needs to be extended to support new services.
- ///
- /// # Arguments
- /// * `message`: The message that will be parsed.
- fn handle_message(&self, message: &str)
- -> Result<OkResponse<i32>, ErrResponse> {
-
- let data = try!(
- Json::from_str(message)
- .map_err(|_| ErrResponse::parse_error()));
- let obj = try!(
- data.as_object().ok_or(ErrResponse::parse_error()));
- let rpc_id = try!(
- obj.get("id").and_then(|x| x.as_u64())
- .ok_or(ErrResponse::parse_error()));
-
- let method = try!(
- obj.get("method").and_then(|x| x.as_string())
- .ok_or(ErrResponse::invalid_request(rpc_id)));
-
- if method == "services_available" {
- Ok(OkResponse::new(rpc_id, None))
- }
- else if method != "message" {
- Err(ErrResponse::method_not_found(rpc_id))
- } else {
- let service = try!(obj.get("params")
- .and_then(|x| x.as_object())
- .and_then(|x| x.get("service_name"))
- .and_then(|x| x.as_string())
- .ok_or(ErrResponse::invalid_request(rpc_id)));
-
- self.hdlr.handle_service(rpc_id, service, message)
- }
- }
-}
-
-impl<H: ServiceHandler + 'static> Handler for ServiceEdge<H> {
- fn handle(&self, mut req: Request, resp: Response) {
- let mut rbody = String::new();
- try_or!(req.read_to_string(&mut rbody), return);
- debug!(">>> Received Message: {}", rbody);
- let mut resp = try_or!(resp.start(), return);
-
- macro_rules! send_response {
- ($rtype:ty, $resp:ident) => {
- match json::encode::<$rtype>(&$resp) {
- Ok(decoded_msg) => {
- try_or!(resp.write_all(decoded_msg.as_bytes()), return);
- debug!("<<< Sent Response: {}", decoded_msg);
- },
- Err(p) => { error!("{}", p); }
- }
- };
- }
-
- match self.handle_message(&rbody) {
- Ok(msg) => send_response!(OkResponse<i32>, msg),
- Err(msg) => send_response!(ErrResponse, msg)
- }
-
- try_or!(resp.end(), return);
- }
-}
diff --git a/src/remote/rvi/message.rs b/src/remote/rvi/message.rs
deleted file mode 100644
index 8adf596..0000000
--- a/src/remote/rvi/message.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-//! Wrappers for messages exchanged with RVI.
-
-use std::vec::Vec;
-use time;
-
-/// A generic incoming message.
-#[derive(RustcDecodable, RustcEncodable)]
-pub struct Message<T> {
- /// The service that got called.
- pub service_name: String,
- /// The paramaters to the service call.
- pub parameters: Vec<T>
-}
-
-/// A generic outgoing message.
-#[derive(RustcDecodable, RustcEncodable)]
-pub struct RVIMessage<T> {
- /// The service name to call.
- pub service_name: String,
- /// A timestamp when this message should expire. In UTC UNIX epoch.
- pub timeout: i64,
- /// The parameters to the service call.
- pub parameters: Vec<T>
-}
-
-impl<T> RVIMessage<T> {
- /// Create a new outgoing RVI message.
- ///
- /// # Arguments
- /// * `service`: The service name to call.
- /// * `parameters`: The parameters to the service call.
- /// * `tdelta`: Amount of seconds before the message will expire.
- pub fn new(service: &str,
- parameters: Vec<T>,
- tdelta: i64) -> RVIMessage<T> {
- let timeout = time::Duration::seconds(tdelta);
- RVIMessage {
- timeout: (time::get_time() + timeout).sec,
- service_name: service.to_string(),
- parameters: parameters
- }
- }
-}
-
-/// Encodes a registration request.
-#[derive(RustcEncodable)]
-pub struct RegisterServiceRequest {
- /// The network address where RVI can be reached.
- pub network_address: String,
- /// The service (short name) to register.
- pub service: String
-}
-
-/// Encodes a registration response.
-#[derive(RustcDecodable)]
-pub struct RegisterServiceResponse {
- /// Status number indicating success or failure. See the RVI documentation for details.
- pub status: i32,
- /// The full service URL, that RVI assigned.
- pub service: String
-}
diff --git a/src/remote/rvi/mod.rs b/src/remote/rvi/mod.rs
deleted file mode 100644
index a6791bc..0000000
--- a/src/remote/rvi/mod.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-//! RVI bindings for Rust.
-//!
-//! RVI - Remote Vehicle Interaction - is the next generation of connected vehicle services. Based
-//! on the discussions inside and outside the Automotive Grade Linux expert group.
-//!
-//! This module implements Rust bindings to simplify the interaction with it.
-//!
-//! It is intended to be split out into a separate crate at some point in the future.
-
-mod edge;
-mod send;
-mod message;
-
-// Export public interface
-pub use super::rvi::edge::{ServiceEdge, ServiceHandler};
-pub use super::rvi::send::send;
-pub use super::rvi::send::send_message;
-pub use super::rvi::message::Message;
diff --git a/src/remote/rvi/send.rs b/src/remote/rvi/send.rs
deleted file mode 100644
index 8308203..0000000
--- a/src/remote/rvi/send.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-//! Helper functions for sending messages to RVI.
-
-use std::io::Read;
-use hyper::Client;
-use rustc_serialize::{json, Encodable};
-
-use remote::jsonrpc;
-use remote::rvi::message::RVIMessage;
-
-/// Send a object to RVI. Either returns the full response from RVI or a error message.
-///
-/// The object will get encoded to json. Apart from that no sanity checks are made. You usually
-/// don't need this function.
-///
-/// # Arguments
-/// * `url`: The full URL where RVI can be reached.
-/// * `b`: The object to encode and send to RVI.
-pub fn send<E: Encodable>(url: &str, b: &E) -> Result<String, String> {
- let client = Client::new();
-
- let mut resp = try!(json::encode(b)
- .map_err(|e| format!("{}", e))
- .and_then(|j| {
- debug!("<<< Sent Message: {}", j);
- client.post(url).body(&j).send()
- .map_err(|e| format!("{}", e))
- }));
-
- let mut rbody = String::new();
- try!(resp.read_to_string(&mut rbody)
- .map_err(|e| format!("{}", e)));
- debug!(">>> Received Response: {}", rbody);
- Ok(rbody)
-}
-
-/// Prepare a message and send it to RVI. Returns the full response from RVI on success or a error
-/// message on failure.
-///
-/// This wraps the provided object into a proper RVI message and encodes it to json. You usually
-/// should call this function.
-///
-/// **NOTE:** This currently implements a workaround for RVI, that will get fixed in the upcoming
-/// RVI version `0.5.0`, which will break this current implementation. For the new protocol you
-/// don't have to wrap the `params` in a one element `Vector` any more.
-///
-/// # Arguments
-/// * `url`: The full URL where RVI can be reached.
-/// * `b`: The object to wrap into a RVI Message, encode and send to RVI.
-/// * `addr`: The full RVI address (service URL) where this message should be sent to.
-#[cfg(not(test))]
-pub fn send_message<E: Encodable>(url: &str, b: E, addr: &str) -> Result<String, String> {
- let mut params = Vec::new();
- params.push(b);
- let message = RVIMessage::<E>::new(addr, params, 90);
- let json_rpc = jsonrpc::Request::new("message", message);
- send(url, &json_rpc)
-}
-#[cfg(test)]
-pub fn send_message<E: Encodable>(url: &str, _: E, addr: &str) -> Result<String, String> {
- Ok(format!("Faked sending to RVI: {}, {}", url, addr))
-}
diff --git a/src/remote/svc.rs b/src/remote/svc.rs
deleted file mode 100644
index 873cb80..0000000
--- a/src/remote/svc.rs
+++ /dev/null
@@ -1,272 +0,0 @@
-//! The main service handler
-//!
-//! Parses incoming messages and delegates them to the appropriate individual message handlers,
-//! passing on the results to the [`main_loop`](../main_loop/index.html)
-
-use std::ops::Deref;
-use std::sync::{Arc, Mutex};
-use std::sync::mpsc::Sender;
-use std::thread;
-use std::thread::sleep_ms;
-
-use rustc_serialize::{json, Decodable};
-use time;
-
-use event::{Event, UpdateId};
-use event::inbound::InboundEvent;
-use event::outbound::{UpdateReport, InstalledSoftware};
-
-use super::parm::{NotifyParams, StartParams, ChunkParams, ChunkReceived, FinishParams};
-use super::parm::{ReportParams, AbortParams, ParamHandler};
-use super::dw::Transfers;
-
-use super::jsonrpc;
-use super::jsonrpc::{OkResponse, ErrResponse};
-use super::rvi;
-
-use configuration::ClientConfiguration;
-
-/// Encodes the list of service URLs the client registered.
-///
-/// Needs to be extended to introduce new services.
-#[derive(RustcEncodable, Clone)]
-pub struct LocalServices {
- /// "Start Download" URL.
- pub start: String,
- /// "Chunk" URL.
- pub chunk: String,
- /// "Abort Download" URL.
- pub abort: String,
- /// "Finish Download" URL.
- pub finish: String,
- /// "Get All Packages" URL.
- pub getpackages: String,
-}
-
-impl LocalServices {
- /// Returns the VIN of this device.
- ///
- /// # Arguments
- /// * `vin_match`: The index, where to look for the VIN in the service URL.
- pub fn get_vin(&self, vin_match: i32) -> String {
- self.start.split("/").nth(vin_match as usize).unwrap().to_string()
- }
-}
-
-/// Encodes the service URLs, that the server provides.
-#[derive(RustcDecodable, Clone)]
-pub struct BackendServices {
- /// URL for the "Start Download" call.
- pub start: String,
- /// URL for the "Chunk Received" call.
- pub ack: String,
- /// URL for the "Installation Report" call.
- pub report: String,
- /// URL for the "Get All Packages" call.
- pub packages: String
-}
-
-#[derive(RustcEncodable, Clone)]
-struct StartDownload {
- vin: String,
- update_id: UpdateId,
- services: LocalServices,
-}
-
-#[derive(RustcEncodable, Clone)]
-struct UpdateResult {
- vin: String,
- update_report: UpdateReport
-}
-
-#[derive(RustcEncodable, Clone)]
-struct InstalledSoftwareResult {
- vin: String,
- installed_software: InstalledSoftware
-}
-
-pub struct RemoteServices {
- pub vin: String,
- url: String,
- local_svcs: Option<LocalServices>,
- svcs: Option<BackendServices>
-}
-
-impl RemoteServices {
- pub fn new(url: String) -> RemoteServices {
- RemoteServices {
- vin: String::new(),
- url: url,
- local_svcs: None,
- svcs: None
- }
- }
-
- pub fn set_remote(&mut self, vin: String, svcs: LocalServices) {
- self.vin = vin;
- self.local_svcs = Some(svcs);
- }
-
- pub fn set(&mut self, svcs: BackendServices) {
- self.svcs = Some(svcs);
- }
-
- pub fn send_chunk_received(&self, m: ChunkReceived) -> Result<String, String> {
- self.svcs.iter().next().ok_or(format!("RemoteServices not set"))
- .and_then(|ref svcs| rvi::send_message(&self.url, m, &svcs.ack))
- }
-
- fn make_start_download(&self, id: UpdateId) -> StartDownload {
- StartDownload {
- vin: self.vin.clone(),
- services: self.local_svcs.iter().next().cloned().unwrap(),
- update_id: id
- }
- }
-
- pub fn send_start_download(&self, id: UpdateId) -> Result<String, String> {
- self.svcs.iter().next().ok_or(format!("RemoteServices not set"))
- .and_then(|ref svcs| rvi::send_message(
- &self.url,
- self.make_start_download(id),
- &svcs.start))
- }
-
- pub fn send_update_report(&self, m: UpdateReport) -> Result<String, String> {
- self.svcs.iter().next().ok_or(format!("RemoteServices not set"))
- .and_then(|ref svcs| rvi::send_message(
- &self.url,
- UpdateResult {
- vin: self.vin.clone(),
- update_report: m },
- &svcs.report))
- }
-
- pub fn send_installed_software(&self, m: InstalledSoftware) -> Result<String, String> {
- self.svcs.iter().next().ok_or(format!("RemoteServices not set"))
- .and_then(|ref svcs| rvi::send_message(
- &self.url,
- InstalledSoftwareResult {
- vin: self.vin.clone(),
- installed_software: m },
- &svcs.packages))
- }
-}
-
-
-/// Type that encodes a single service handler.
-///
-/// Holds the necessary state, like in-progress transfers, that are needed for handling incoming
-/// messages and sending replies to RVI. Needs to be thread safe as
-/// [`hyper`](../../../hyper/index.html) handles requests asynchronously.
-pub struct ServiceHandler {
- /// A `Sender` that connects the handlers with the `main_loop`.
- sender: Mutex<Sender<Event>>,
- /// The currently in-progress `Transfer`s.
- transfers: Arc<Mutex<Transfers>>,
- /// The service URLs that the SOTA server advertised.
- remote_services: Arc<Mutex<RemoteServices>>,
- /// The full `Configuration` of sota_client.
- conf: ClientConfiguration
-}
-
-impl ServiceHandler {
- /// Create a new `ServiceHandler`.
- ///
- /// # Arguments
- /// * `transfers`: A `Transfers` object to store the in-progress `Transfer`s.
- /// * `sender`: A `Sender` to call back into the `main_loop`.
- /// * `url`: The full URL, where RVI can be reached.
- /// * `c`: The full `Configuration` of sota_client.
- pub fn new(sender: Sender<Event>,
- r: Arc<Mutex<RemoteServices>>,
- c: ClientConfiguration) -> ServiceHandler {
- let transfers = Arc::new(Mutex::new(Transfers::new(c.storage_dir.clone())));
- let tc = transfers.clone();
- c.timeout
- .map(|t| {
- let _ = thread::spawn(move || ServiceHandler::start_timer(tc.deref(), t));
- info!("Transfers timeout after {}", t)})
- .unwrap_or(info!("No timeout configured, transfers will never time out."));
-
- ServiceHandler {
- sender: Mutex::new(sender),
- transfers: transfers,
- remote_services: r,
- conf: c
- }
- }
-
- /// Starts a infinite loop to expire timed out transfers. Checks once a second for timed out
- /// transfers.
- ///
- /// # Arguments
- /// * `transfers`: Pointer to a `Transfers` object, that stores the transfers to be checked for
- /// expired timeouts.
- /// * `timeout`: The timeout in seconds.
- pub fn start_timer(transfers: &Mutex<Transfers>,
- timeout: i64) {
- loop {
- sleep_ms(1000);
- let mut transfers = transfers.lock().unwrap();
- transfers.prune(time::get_time().sec, timeout);
- }
- }
-
- /// Helper function to send a `Event` to the `main_loop`.
- ///
- /// # Arguments
- /// * `e`: `Event` to send.
- fn push_notify(&self, e: InboundEvent) {
- try_or!(self.sender.lock().unwrap().send(Event::Inbound(e)), return);
- }
-
- /// Create a message handler `D`, and let it process the `message`. If it returns a
- /// Event, forward it to the `main_loop`. Returns a `jsonrpc` response indicating
- /// success or failure.
- ///
- /// # Arguments
- /// * `message`: The message, that should be handled.
- fn handle_message_params<D>(&self, id: u64, message: &str)
- -> Result<OkResponse<i32>, ErrResponse>
- where D: Decodable + ParamHandler {
- json::decode::<jsonrpc::Request<rvi::Message<D>>>(&message)
- .map_err(|_| ErrResponse::invalid_params(id))
- .and_then(|p| {
- let handler = &p.params.parameters[0];
- handler.handle(&self.remote_services, &self.transfers)
- .map_err(|_| ErrResponse::unspecified(p.id))
- .map(|r| {
- r.map(|m| self.push_notify(m));
- OkResponse::new(p.id, None) })
- })
- }
-}
-
-impl rvi::ServiceHandler for ServiceHandler {
- fn handle_service(&self, id: u64, service: &str, message: &str)
- -> Result<OkResponse<i32>, ErrResponse> {
- match service {
- "/sota/notify" => self.handle_message_params::<NotifyParams>(id, message),
- "/sota/start" => self.handle_message_params::<StartParams>(id, message),
- "/sota/chunk" => self.handle_message_params::<ChunkParams>(id, message),
- "/sota/finish" => self.handle_message_params::<FinishParams>(id, message),
- "/sota/abort" => self.handle_message_params::<AbortParams>(id, message),
- "/sota/getpackages" => self.handle_message_params::<ReportParams>(id, message),
- _ => Err(ErrResponse::invalid_request(id))
- }
- }
-
- fn register_services<F: Fn(&str) -> String>(&self, reg: F) {
- reg("/sota/notify");
- let mut remote_svcs = self.remote_services.lock().unwrap();
- let svcs = LocalServices {
- start: reg("/sota/start"),
- chunk: reg("/sota/chunk"),
- abort: reg("/sota/abort"),
- finish: reg("/sota/finish"),
- getpackages: reg("/sota/getpackages")
- };
- remote_svcs.set_remote(svcs.get_vin(self.conf.vin_match), svcs);
- }
-}