diff options
author | Allen George <allen.george@gmail.com> | 2017-01-30 07:15:00 -0500 |
---|---|---|
committer | James E. King, III <jking@apache.org> | 2017-04-27 08:46:02 -0400 |
commit | 0e22c362b967bd3765ee3da349faa789904a0707 (patch) | |
tree | cf7271e15659c1181abb6ed8c57b599d79d026f3 /lib/rs/src/server/multiplexed.rs | |
parent | 9db23b7be330f47037b4e3e5e374eda5e38b0dfd (diff) | |
download | thrift-0e22c362b967bd3765ee3da349faa789904a0707.tar.gz |
THRIFT-4176: Implement threaded server for Rust
Client: rs
* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer
This closes #1255
Diffstat (limited to 'lib/rs/src/server/multiplexed.rs')
-rw-r--r-- | lib/rs/src/server/multiplexed.rs | 70 |
1 files changed, 44 insertions, 26 deletions
diff --git a/lib/rs/src/server/multiplexed.rs b/lib/rs/src/server/multiplexed.rs index d2314a12a..b1243a86f 100644 --- a/lib/rs/src/server/multiplexed.rs +++ b/lib/rs/src/server/multiplexed.rs @@ -17,9 +17,10 @@ use std::collections::HashMap; use std::convert::Into; +use std::sync::{Arc, Mutex}; -use ::{new_application_error, ApplicationErrorKind}; -use ::protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol}; +use {ApplicationErrorKind, new_application_error}; +use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol}; use super::TProcessor; @@ -33,8 +34,9 @@ use super::TProcessor; /// /// A `TMultiplexedProcessor` can only handle messages sent by a /// `TMultiplexedOutputProtocol`. +// FIXME: implement Debug pub struct TMultiplexedProcessor { - processors: HashMap<String, Box<TProcessor>>, + processors: Mutex<HashMap<String, Arc<Box<TProcessor>>>>, } impl TMultiplexedProcessor { @@ -46,46 +48,62 @@ impl TMultiplexedProcessor { /// Return `false` if a mapping previously existed (the previous mapping is /// *not* overwritten). #[cfg_attr(feature = "cargo-clippy", allow(map_entry))] - pub fn register_processor<S: Into<String>>(&mut self, - service_name: S, - processor: Box<TProcessor>) - -> bool { + pub fn register_processor<S: Into<String>>( + &mut self, + service_name: S, + processor: Box<TProcessor>, + ) -> bool { + let mut processors = self.processors.lock().unwrap(); + let name = service_name.into(); - if self.processors.contains_key(&name) { + if processors.contains_key(&name) { false } else { - self.processors.insert(name, processor); + processors.insert(name, Arc::new(processor)); true } } } impl TProcessor for TMultiplexedProcessor { - fn process(&mut self, - i_prot: &mut TInputProtocol, - o_prot: &mut TOutputProtocol) - -> ::Result<()> { + fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> { let msg_ident = i_prot.read_message_begin()?; - let sep_index = msg_ident.name + let sep_index = msg_ident + .name .find(':') - .ok_or_else(|| { - new_application_error(ApplicationErrorKind::Unknown, - "no service separator found in incoming message") - })?; + .ok_or_else( + || { + new_application_error( + ApplicationErrorKind::Unknown, + "no service separator found in incoming message", + ) + }, + )?; let (svc_name, svc_call) = msg_ident.name.split_at(sep_index); - match self.processors.get_mut(svc_name) { - Some(ref mut processor) => { - let new_msg_ident = TMessageIdentifier::new(svc_call, - msg_ident.message_type, - msg_ident.sequence_number); + let processor: Option<Arc<Box<TProcessor>>> = { + let processors = self.processors.lock().unwrap(); + processors.get(svc_name).cloned() + }; + + match processor { + Some(arc) => { + let new_msg_ident = TMessageIdentifier::new( + svc_call, + msg_ident.message_type, + msg_ident.sequence_number, + ); let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident); - processor.process(&mut proxy_i_prot, o_prot) + (*arc).process(&mut proxy_i_prot, o_prot) } None => { - Err(new_application_error(ApplicationErrorKind::Unknown, - format!("no processor found for service {}", svc_name))) + Err( + new_application_error( + ApplicationErrorKind::Unknown, + format!("no processor found for service {}", svc_name), + ), + ) } } } |