diff options
author | Allen George <allen.george@gmail.com> | 2017-01-30 07:15:00 -0500 |
---|---|---|
committer | James E. King, III <jking@apache.org> | 2017-04-27 08:46:02 -0400 |
commit | 0e22c362b967bd3765ee3da349faa789904a0707 (patch) | |
tree | cf7271e15659c1181abb6ed8c57b599d79d026f3 /tutorial | |
parent | 9db23b7be330f47037b4e3e5e374eda5e38b0dfd (diff) | |
download | thrift-0e22c362b967bd3765ee3da349faa789904a0707.tar.gz |
THRIFT-4176: Implement threaded server for Rust
Client: rs
* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer
This closes #1255
Diffstat (limited to 'tutorial')
-rw-r--r-- | tutorial/rs/README.md | 43 | ||||
-rw-r--r-- | tutorial/rs/src/bin/tutorial_client.rs | 51 | ||||
-rw-r--r-- | tutorial/rs/src/bin/tutorial_server.rs | 70 |
3 files changed, 77 insertions, 87 deletions
diff --git a/tutorial/rs/README.md b/tutorial/rs/README.md index 4d0d7c8af..384e9f8bb 100644 --- a/tutorial/rs/README.md +++ b/tutorial/rs/README.md @@ -35,13 +35,12 @@ extern crate thrift; extern crate try_from; // generated Rust module -mod tutorial; +use tutorial; -use std::cell::RefCell; -use std::rc::Rc; -use thrift::protocol::{TInputProtocol, TOutputProtocol}; use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol}; -use thrift::transport::{TFramedTransport, TTcpTransport, TTransport}; +use thrift::protocol::{TInputProtocol, TOutputProtocol}; +use thrift::transport::{TFramedReadTransport, TFramedWriteTransport}; +use thrift::transport::{TIoChannel, TTcpChannel}; use tutorial::{CalculatorSyncClient, TCalculatorSyncClient}; use tutorial::{Operation, Work}; @@ -61,28 +60,16 @@ fn run() -> thrift::Result<()> { // println!("connect to server on 127.0.0.1:9090"); - let mut t = TTcpTransport::new(); - let t = match t.open("127.0.0.1:9090") { - Ok(()) => t, - Err(e) => { - return Err( - format!("failed to connect with {:?}", e).into() - ); - } - }; - - let t = Rc::new(RefCell::new( - Box::new(t) as Box<TTransport> - )); - let t = Rc::new(RefCell::new( - Box::new(TFramedTransport::new(t)) as Box<TTransport> - )); + let mut c = TTcpTransport::new(); + c.open("127.0.0.1:9090")?; - let i_prot: Box<TInputProtocol> = Box::new( - TCompactInputProtocol::new(t.clone()) + let (i_chan, o_chan) = c.split()?; + + let i_prot = TCompactInputProtocol::new( + TFramedReadTransport::new(i_chan) ); - let o_prot: Box<TOutputProtocol> = Box::new( - TCompactOutputProtocol::new(t.clone()) + let o_prot = TCompactOutputProtocol::new( + TFramedWriteTransport::new(o_chan) ); let client = CalculatorSyncClient::new(i_prot, o_prot); @@ -177,10 +164,10 @@ A typedef is translated to a `pub type` declaration. ```thrift typedef i64 UserId -typedef map<string, Bonk> MapType +typedef map<string, UserId> MapType ``` ```rust -pub type UserId = 164; +pub type UserId = i64; pub type MapType = BTreeMap<String, Bonk>; ``` @@ -327,4 +314,4 @@ pub struct Foo { ## Known Issues * Struct constants are not supported -* Map, list and set constants require a const holder struct
\ No newline at end of file +* Map, list and set constants require a const holder struct diff --git a/tutorial/rs/src/bin/tutorial_client.rs b/tutorial/rs/src/bin/tutorial_client.rs index 2b0d4f908..24ab4be06 100644 --- a/tutorial/rs/src/bin/tutorial_client.rs +++ b/tutorial/rs/src/bin/tutorial_client.rs @@ -21,15 +21,12 @@ extern crate clap; extern crate thrift; extern crate thrift_tutorial; -use std::cell::RefCell; -use std::rc::Rc; - -use thrift::protocol::{TInputProtocol, TOutputProtocol}; use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol}; -use thrift::transport::{TFramedTransport, TTcpTransport, TTransport}; +use thrift::transport::{ReadHalf, TFramedReadTransport, TFramedWriteTransport, TIoChannel, + TTcpChannel, WriteHalf}; use thrift_tutorial::shared::TSharedServiceSyncClient; -use thrift_tutorial::tutorial::{CalculatorSyncClient, TCalculatorSyncClient, Operation, Work}; +use thrift_tutorial::tutorial::{CalculatorSyncClient, Operation, TCalculatorSyncClient, Work}; fn main() { match run() { @@ -73,7 +70,8 @@ fn run() -> thrift::Result<()> { let logid = 32; // let's do...a multiply! - let res = client.calculate(logid, Work::new(7, 8, Operation::MULTIPLY, None))?; + let res = client + .calculate(logid, Work::new(7, 8, Operation::MULTIPLY, None))?; println!("multiplied 7 and 8 and got {}", res); // let's get the log for it @@ -102,34 +100,31 @@ fn run() -> thrift::Result<()> { Ok(()) } -fn new_client(host: &str, port: u16) -> thrift::Result<CalculatorSyncClient> { - let mut t = TTcpTransport::new(); +type ClientInputProtocol = TCompactInputProtocol<TFramedReadTransport<ReadHalf<TTcpChannel>>>; +type ClientOutputProtocol = TCompactOutputProtocol<TFramedWriteTransport<WriteHalf<TTcpChannel>>>; + +fn new_client + ( + host: &str, + port: u16, +) -> thrift::Result<CalculatorSyncClient<ClientInputProtocol, ClientOutputProtocol>> { + let mut c = TTcpChannel::new(); // open the underlying TCP stream println!("connecting to tutorial server on {}:{}", host, port); - let t = match t.open(&format!("{}:{}", host, port)) { - Ok(()) => t, - Err(e) => { - return Err(format!("failed to open tcp stream to {}:{} error:{:?}", - host, - port, - e) - .into()); - } - }; - - // refcounted because it's shared by both input and output transports - let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>)); + c.open(&format!("{}:{}", host, port))?; - // wrap a raw socket (slow) with a buffered transport of some kind - let t = Box::new(TFramedTransport::new(t)) as Box<TTransport>; + // clone the TCP channel into two halves, one which + // we'll use for reading, the other for writing + let (i_chan, o_chan) = c.split()?; - // refcounted again because it's shared by both input and output protocols - let t = Rc::new(RefCell::new(t)); + // wrap the raw sockets (slow) with a buffered transport of some kind + let i_tran = TFramedReadTransport::new(i_chan); + let o_tran = TFramedWriteTransport::new(o_chan); // now create the protocol implementations - let i_prot = Box::new(TCompactInputProtocol::new(t.clone())) as Box<TInputProtocol>; - let o_prot = Box::new(TCompactOutputProtocol::new(t.clone())) as Box<TOutputProtocol>; + let i_prot = TCompactInputProtocol::new(i_tran); + let o_prot = TCompactOutputProtocol::new(o_tran); // we're done! Ok(CalculatorSyncClient::new(i_prot, o_prot)) diff --git a/tutorial/rs/src/bin/tutorial_server.rs b/tutorial/rs/src/bin/tutorial_server.rs index 9cc186649..8db8eed26 100644 --- a/tutorial/rs/src/bin/tutorial_server.rs +++ b/tutorial/rs/src/bin/tutorial_server.rs @@ -24,12 +24,12 @@ extern crate thrift_tutorial; use std::collections::HashMap; use std::convert::{From, Into}; use std::default::Default; +use std::sync::Mutex; -use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory}; use thrift::protocol::{TCompactInputProtocolFactory, TCompactOutputProtocolFactory}; -use thrift::server::TSimpleServer; +use thrift::server::TServer; -use thrift::transport::{TFramedTransportFactory, TTransportFactory}; +use thrift::transport::{TFramedReadTransportFactory, TFramedWriteTransportFactory}; use thrift_tutorial::shared::{SharedServiceSyncHandler, SharedStruct}; use thrift_tutorial::tutorial::{CalculatorSyncHandler, CalculatorSyncProcessor}; use thrift_tutorial::tutorial::{InvalidOperation, Operation, Work}; @@ -58,33 +58,36 @@ fn run() -> thrift::Result<()> { println!("binding to {}", listen_address); - let i_tran_fact: Box<TTransportFactory> = Box::new(TFramedTransportFactory::new()); - let i_prot_fact: Box<TInputProtocolFactory> = Box::new(TCompactInputProtocolFactory::new()); + let i_tran_fact = TFramedReadTransportFactory::new(); + let i_prot_fact = TCompactInputProtocolFactory::new(); - let o_tran_fact: Box<TTransportFactory> = Box::new(TFramedTransportFactory::new()); - let o_prot_fact: Box<TOutputProtocolFactory> = Box::new(TCompactOutputProtocolFactory::new()); + let o_tran_fact = TFramedWriteTransportFactory::new(); + let o_prot_fact = TCompactOutputProtocolFactory::new(); // demux incoming messages let processor = CalculatorSyncProcessor::new(CalculatorServer { ..Default::default() }); // create the server and start listening - let mut server = TSimpleServer::new(i_tran_fact, - i_prot_fact, - o_tran_fact, - o_prot_fact, - processor); + let mut server = TServer::new( + i_tran_fact, + i_prot_fact, + o_tran_fact, + o_prot_fact, + processor, + 10, + ); server.listen(&listen_address) } /// Handles incoming Calculator service calls. struct CalculatorServer { - log: HashMap<i32, SharedStruct>, + log: Mutex<HashMap<i32, SharedStruct>>, } impl Default for CalculatorServer { fn default() -> CalculatorServer { - CalculatorServer { log: HashMap::new() } + CalculatorServer { log: Mutex::new(HashMap::new()) } } } @@ -94,9 +97,9 @@ impl Default for CalculatorServer { // SharedService handler impl SharedServiceSyncHandler for CalculatorServer { - fn handle_get_struct(&mut self, key: i32) -> thrift::Result<SharedStruct> { - self.log - .get(&key) + fn handle_get_struct(&self, key: i32) -> thrift::Result<SharedStruct> { + let log = self.log.lock().unwrap(); + log.get(&key) .cloned() .ok_or_else(|| format!("could not find log for key {}", key).into()) } @@ -104,25 +107,27 @@ impl SharedServiceSyncHandler for CalculatorServer { // Calculator handler impl CalculatorSyncHandler for CalculatorServer { - fn handle_ping(&mut self) -> thrift::Result<()> { + fn handle_ping(&self) -> thrift::Result<()> { println!("pong!"); Ok(()) } - fn handle_add(&mut self, num1: i32, num2: i32) -> thrift::Result<i32> { + fn handle_add(&self, num1: i32, num2: i32) -> thrift::Result<i32> { println!("handling add: n1:{} n2:{}", num1, num2); Ok(num1 + num2) } - fn handle_calculate(&mut self, logid: i32, w: Work) -> thrift::Result<i32> { + fn handle_calculate(&self, logid: i32, w: Work) -> thrift::Result<i32> { println!("handling calculate: l:{}, w:{:?}", logid, w); let res = if let Some(ref op) = w.op { if w.num1.is_none() || w.num2.is_none() { - Err(InvalidOperation { - what_op: Some(*op as i32), - why: Some("no operands specified".to_owned()), - }) + Err( + InvalidOperation { + what_op: Some(*op as i32), + why: Some("no operands specified".to_owned()), + }, + ) } else { // so that I don't have to call unwrap() multiple times below let num1 = w.num1.as_ref().expect("operands checked"); @@ -134,10 +139,12 @@ impl CalculatorSyncHandler for CalculatorServer { Operation::MULTIPLY => Ok(num1 * num2), Operation::DIVIDE => { if *num2 == 0 { - Err(InvalidOperation { - what_op: Some(*op as i32), - why: Some("divide by 0".to_owned()), - }) + Err( + InvalidOperation { + what_op: Some(*op as i32), + why: Some("divide by 0".to_owned()), + }, + ) } else { Ok(num1 / num2) } @@ -145,12 +152,13 @@ impl CalculatorSyncHandler for CalculatorServer { } } } else { - Err(InvalidOperation::new(None, "no operation specified".to_owned())) + Err(InvalidOperation::new(None, "no operation specified".to_owned()),) }; // if the operation was successful log it if let Ok(ref v) = res { - self.log.insert(logid, SharedStruct::new(logid, format!("{}", v))); + let mut log = self.log.lock().unwrap(); + log.insert(logid, SharedStruct::new(logid, format!("{}", v))); } // the try! macro automatically maps errors @@ -161,7 +169,7 @@ impl CalculatorSyncHandler for CalculatorServer { res.map_err(From::from) } - fn handle_zip(&mut self) -> thrift::Result<()> { + fn handle_zip(&self) -> thrift::Result<()> { println!("handling zip"); Ok(()) } |