summaryrefslogtreecommitdiff
path: root/lib/rs/src/server/multiplexed.rs
diff options
context:
space:
mode:
authorAllen George <allen.george@gmail.com>2017-01-30 07:15:00 -0500
committerJames E. King, III <jking@apache.org>2017-04-27 08:46:02 -0400
commit0e22c362b967bd3765ee3da349faa789904a0707 (patch)
treecf7271e15659c1181abb6ed8c57b599d79d026f3 /lib/rs/src/server/multiplexed.rs
parent9db23b7be330f47037b4e3e5e374eda5e38b0dfd (diff)
downloadthrift-0e22c362b967bd3765ee3da349faa789904a0707.tar.gz
THRIFT-4176: Implement threaded server for Rust
Client: rs * Create a TIoChannel construct * Separate TTransport into TReadTransport and TWriteTransport * Restructure types to avoid shared ownership * Remove user-visible boxing and ref-counting * Replace TSimpleServer with a thread-pool based TServer This closes #1255
Diffstat (limited to 'lib/rs/src/server/multiplexed.rs')
-rw-r--r--lib/rs/src/server/multiplexed.rs70
1 files changed, 44 insertions, 26 deletions
diff --git a/lib/rs/src/server/multiplexed.rs b/lib/rs/src/server/multiplexed.rs
index d2314a12a..b1243a86f 100644
--- a/lib/rs/src/server/multiplexed.rs
+++ b/lib/rs/src/server/multiplexed.rs
@@ -17,9 +17,10 @@
use std::collections::HashMap;
use std::convert::Into;
+use std::sync::{Arc, Mutex};
-use ::{new_application_error, ApplicationErrorKind};
-use ::protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
+use {ApplicationErrorKind, new_application_error};
+use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
use super::TProcessor;
@@ -33,8 +34,9 @@ use super::TProcessor;
///
/// A `TMultiplexedProcessor` can only handle messages sent by a
/// `TMultiplexedOutputProtocol`.
+// FIXME: implement Debug
pub struct TMultiplexedProcessor {
- processors: HashMap<String, Box<TProcessor>>,
+ processors: Mutex<HashMap<String, Arc<Box<TProcessor>>>>,
}
impl TMultiplexedProcessor {
@@ -46,46 +48,62 @@ impl TMultiplexedProcessor {
/// Return `false` if a mapping previously existed (the previous mapping is
/// *not* overwritten).
#[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
- pub fn register_processor<S: Into<String>>(&mut self,
- service_name: S,
- processor: Box<TProcessor>)
- -> bool {
+ pub fn register_processor<S: Into<String>>(
+ &mut self,
+ service_name: S,
+ processor: Box<TProcessor>,
+ ) -> bool {
+ let mut processors = self.processors.lock().unwrap();
+
let name = service_name.into();
- if self.processors.contains_key(&name) {
+ if processors.contains_key(&name) {
false
} else {
- self.processors.insert(name, processor);
+ processors.insert(name, Arc::new(processor));
true
}
}
}
impl TProcessor for TMultiplexedProcessor {
- fn process(&mut self,
- i_prot: &mut TInputProtocol,
- o_prot: &mut TOutputProtocol)
- -> ::Result<()> {
+ fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> {
let msg_ident = i_prot.read_message_begin()?;
- let sep_index = msg_ident.name
+ let sep_index = msg_ident
+ .name
.find(':')
- .ok_or_else(|| {
- new_application_error(ApplicationErrorKind::Unknown,
- "no service separator found in incoming message")
- })?;
+ .ok_or_else(
+ || {
+ new_application_error(
+ ApplicationErrorKind::Unknown,
+ "no service separator found in incoming message",
+ )
+ },
+ )?;
let (svc_name, svc_call) = msg_ident.name.split_at(sep_index);
- match self.processors.get_mut(svc_name) {
- Some(ref mut processor) => {
- let new_msg_ident = TMessageIdentifier::new(svc_call,
- msg_ident.message_type,
- msg_ident.sequence_number);
+ let processor: Option<Arc<Box<TProcessor>>> = {
+ let processors = self.processors.lock().unwrap();
+ processors.get(svc_name).cloned()
+ };
+
+ match processor {
+ Some(arc) => {
+ let new_msg_ident = TMessageIdentifier::new(
+ svc_call,
+ msg_ident.message_type,
+ msg_ident.sequence_number,
+ );
let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident);
- processor.process(&mut proxy_i_prot, o_prot)
+ (*arc).process(&mut proxy_i_prot, o_prot)
}
None => {
- Err(new_application_error(ApplicationErrorKind::Unknown,
- format!("no processor found for service {}", svc_name)))
+ Err(
+ new_application_error(
+ ApplicationErrorKind::Unknown,
+ format!("no processor found for service {}", svc_name),
+ ),
+ )
}
}
}