summaryrefslogtreecommitdiff
path: root/tutorial
diff options
context:
space:
mode:
authorAllen George <allen.george@gmail.com>2017-01-30 07:15:00 -0500
committerJames E. King, III <jking@apache.org>2017-04-27 08:46:02 -0400
commit0e22c362b967bd3765ee3da349faa789904a0707 (patch)
treecf7271e15659c1181abb6ed8c57b599d79d026f3 /tutorial
parent9db23b7be330f47037b4e3e5e374eda5e38b0dfd (diff)
downloadthrift-0e22c362b967bd3765ee3da349faa789904a0707.tar.gz
THRIFT-4176: Implement threaded server for Rust
Client: rs * Create a TIoChannel construct * Separate TTransport into TReadTransport and TWriteTransport * Restructure types to avoid shared ownership * Remove user-visible boxing and ref-counting * Replace TSimpleServer with a thread-pool based TServer This closes #1255
Diffstat (limited to 'tutorial')
-rw-r--r--tutorial/rs/README.md43
-rw-r--r--tutorial/rs/src/bin/tutorial_client.rs51
-rw-r--r--tutorial/rs/src/bin/tutorial_server.rs70
3 files changed, 77 insertions, 87 deletions
diff --git a/tutorial/rs/README.md b/tutorial/rs/README.md
index 4d0d7c8af..384e9f8bb 100644
--- a/tutorial/rs/README.md
+++ b/tutorial/rs/README.md
@@ -35,13 +35,12 @@ extern crate thrift;
extern crate try_from;
// generated Rust module
-mod tutorial;
+use tutorial;
-use std::cell::RefCell;
-use std::rc::Rc;
-use thrift::protocol::{TInputProtocol, TOutputProtocol};
use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol};
-use thrift::transport::{TFramedTransport, TTcpTransport, TTransport};
+use thrift::protocol::{TInputProtocol, TOutputProtocol};
+use thrift::transport::{TFramedReadTransport, TFramedWriteTransport};
+use thrift::transport::{TIoChannel, TTcpChannel};
use tutorial::{CalculatorSyncClient, TCalculatorSyncClient};
use tutorial::{Operation, Work};
@@ -61,28 +60,16 @@ fn run() -> thrift::Result<()> {
//
println!("connect to server on 127.0.0.1:9090");
- let mut t = TTcpTransport::new();
- let t = match t.open("127.0.0.1:9090") {
- Ok(()) => t,
- Err(e) => {
- return Err(
- format!("failed to connect with {:?}", e).into()
- );
- }
- };
-
- let t = Rc::new(RefCell::new(
- Box::new(t) as Box<TTransport>
- ));
- let t = Rc::new(RefCell::new(
- Box::new(TFramedTransport::new(t)) as Box<TTransport>
- ));
+ let mut c = TTcpTransport::new();
+ c.open("127.0.0.1:9090")?;
- let i_prot: Box<TInputProtocol> = Box::new(
- TCompactInputProtocol::new(t.clone())
+ let (i_chan, o_chan) = c.split()?;
+
+ let i_prot = TCompactInputProtocol::new(
+ TFramedReadTransport::new(i_chan)
);
- let o_prot: Box<TOutputProtocol> = Box::new(
- TCompactOutputProtocol::new(t.clone())
+ let o_prot = TCompactOutputProtocol::new(
+ TFramedWriteTransport::new(o_chan)
);
let client = CalculatorSyncClient::new(i_prot, o_prot);
@@ -177,10 +164,10 @@ A typedef is translated to a `pub type` declaration.
```thrift
typedef i64 UserId
-typedef map<string, Bonk> MapType
+typedef map<string, UserId> MapType
```
```rust
-pub type UserId = 164;
+pub type UserId = i64;
pub type MapType = BTreeMap<String, Bonk>;
```
@@ -327,4 +314,4 @@ pub struct Foo {
## Known Issues
* Struct constants are not supported
-* Map, list and set constants require a const holder struct \ No newline at end of file
+* Map, list and set constants require a const holder struct
diff --git a/tutorial/rs/src/bin/tutorial_client.rs b/tutorial/rs/src/bin/tutorial_client.rs
index 2b0d4f908..24ab4be06 100644
--- a/tutorial/rs/src/bin/tutorial_client.rs
+++ b/tutorial/rs/src/bin/tutorial_client.rs
@@ -21,15 +21,12 @@ extern crate clap;
extern crate thrift;
extern crate thrift_tutorial;
-use std::cell::RefCell;
-use std::rc::Rc;
-
-use thrift::protocol::{TInputProtocol, TOutputProtocol};
use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol};
-use thrift::transport::{TFramedTransport, TTcpTransport, TTransport};
+use thrift::transport::{ReadHalf, TFramedReadTransport, TFramedWriteTransport, TIoChannel,
+ TTcpChannel, WriteHalf};
use thrift_tutorial::shared::TSharedServiceSyncClient;
-use thrift_tutorial::tutorial::{CalculatorSyncClient, TCalculatorSyncClient, Operation, Work};
+use thrift_tutorial::tutorial::{CalculatorSyncClient, Operation, TCalculatorSyncClient, Work};
fn main() {
match run() {
@@ -73,7 +70,8 @@ fn run() -> thrift::Result<()> {
let logid = 32;
// let's do...a multiply!
- let res = client.calculate(logid, Work::new(7, 8, Operation::MULTIPLY, None))?;
+ let res = client
+ .calculate(logid, Work::new(7, 8, Operation::MULTIPLY, None))?;
println!("multiplied 7 and 8 and got {}", res);
// let's get the log for it
@@ -102,34 +100,31 @@ fn run() -> thrift::Result<()> {
Ok(())
}
-fn new_client(host: &str, port: u16) -> thrift::Result<CalculatorSyncClient> {
- let mut t = TTcpTransport::new();
+type ClientInputProtocol = TCompactInputProtocol<TFramedReadTransport<ReadHalf<TTcpChannel>>>;
+type ClientOutputProtocol = TCompactOutputProtocol<TFramedWriteTransport<WriteHalf<TTcpChannel>>>;
+
+fn new_client
+ (
+ host: &str,
+ port: u16,
+) -> thrift::Result<CalculatorSyncClient<ClientInputProtocol, ClientOutputProtocol>> {
+ let mut c = TTcpChannel::new();
// open the underlying TCP stream
println!("connecting to tutorial server on {}:{}", host, port);
- let t = match t.open(&format!("{}:{}", host, port)) {
- Ok(()) => t,
- Err(e) => {
- return Err(format!("failed to open tcp stream to {}:{} error:{:?}",
- host,
- port,
- e)
- .into());
- }
- };
-
- // refcounted because it's shared by both input and output transports
- let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
+ c.open(&format!("{}:{}", host, port))?;
- // wrap a raw socket (slow) with a buffered transport of some kind
- let t = Box::new(TFramedTransport::new(t)) as Box<TTransport>;
+ // clone the TCP channel into two halves, one which
+ // we'll use for reading, the other for writing
+ let (i_chan, o_chan) = c.split()?;
- // refcounted again because it's shared by both input and output protocols
- let t = Rc::new(RefCell::new(t));
+ // wrap the raw sockets (slow) with a buffered transport of some kind
+ let i_tran = TFramedReadTransport::new(i_chan);
+ let o_tran = TFramedWriteTransport::new(o_chan);
// now create the protocol implementations
- let i_prot = Box::new(TCompactInputProtocol::new(t.clone())) as Box<TInputProtocol>;
- let o_prot = Box::new(TCompactOutputProtocol::new(t.clone())) as Box<TOutputProtocol>;
+ let i_prot = TCompactInputProtocol::new(i_tran);
+ let o_prot = TCompactOutputProtocol::new(o_tran);
// we're done!
Ok(CalculatorSyncClient::new(i_prot, o_prot))
diff --git a/tutorial/rs/src/bin/tutorial_server.rs b/tutorial/rs/src/bin/tutorial_server.rs
index 9cc186649..8db8eed26 100644
--- a/tutorial/rs/src/bin/tutorial_server.rs
+++ b/tutorial/rs/src/bin/tutorial_server.rs
@@ -24,12 +24,12 @@ extern crate thrift_tutorial;
use std::collections::HashMap;
use std::convert::{From, Into};
use std::default::Default;
+use std::sync::Mutex;
-use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
use thrift::protocol::{TCompactInputProtocolFactory, TCompactOutputProtocolFactory};
-use thrift::server::TSimpleServer;
+use thrift::server::TServer;
-use thrift::transport::{TFramedTransportFactory, TTransportFactory};
+use thrift::transport::{TFramedReadTransportFactory, TFramedWriteTransportFactory};
use thrift_tutorial::shared::{SharedServiceSyncHandler, SharedStruct};
use thrift_tutorial::tutorial::{CalculatorSyncHandler, CalculatorSyncProcessor};
use thrift_tutorial::tutorial::{InvalidOperation, Operation, Work};
@@ -58,33 +58,36 @@ fn run() -> thrift::Result<()> {
println!("binding to {}", listen_address);
- let i_tran_fact: Box<TTransportFactory> = Box::new(TFramedTransportFactory::new());
- let i_prot_fact: Box<TInputProtocolFactory> = Box::new(TCompactInputProtocolFactory::new());
+ let i_tran_fact = TFramedReadTransportFactory::new();
+ let i_prot_fact = TCompactInputProtocolFactory::new();
- let o_tran_fact: Box<TTransportFactory> = Box::new(TFramedTransportFactory::new());
- let o_prot_fact: Box<TOutputProtocolFactory> = Box::new(TCompactOutputProtocolFactory::new());
+ let o_tran_fact = TFramedWriteTransportFactory::new();
+ let o_prot_fact = TCompactOutputProtocolFactory::new();
// demux incoming messages
let processor = CalculatorSyncProcessor::new(CalculatorServer { ..Default::default() });
// create the server and start listening
- let mut server = TSimpleServer::new(i_tran_fact,
- i_prot_fact,
- o_tran_fact,
- o_prot_fact,
- processor);
+ let mut server = TServer::new(
+ i_tran_fact,
+ i_prot_fact,
+ o_tran_fact,
+ o_prot_fact,
+ processor,
+ 10,
+ );
server.listen(&listen_address)
}
/// Handles incoming Calculator service calls.
struct CalculatorServer {
- log: HashMap<i32, SharedStruct>,
+ log: Mutex<HashMap<i32, SharedStruct>>,
}
impl Default for CalculatorServer {
fn default() -> CalculatorServer {
- CalculatorServer { log: HashMap::new() }
+ CalculatorServer { log: Mutex::new(HashMap::new()) }
}
}
@@ -94,9 +97,9 @@ impl Default for CalculatorServer {
// SharedService handler
impl SharedServiceSyncHandler for CalculatorServer {
- fn handle_get_struct(&mut self, key: i32) -> thrift::Result<SharedStruct> {
- self.log
- .get(&key)
+ fn handle_get_struct(&self, key: i32) -> thrift::Result<SharedStruct> {
+ let log = self.log.lock().unwrap();
+ log.get(&key)
.cloned()
.ok_or_else(|| format!("could not find log for key {}", key).into())
}
@@ -104,25 +107,27 @@ impl SharedServiceSyncHandler for CalculatorServer {
// Calculator handler
impl CalculatorSyncHandler for CalculatorServer {
- fn handle_ping(&mut self) -> thrift::Result<()> {
+ fn handle_ping(&self) -> thrift::Result<()> {
println!("pong!");
Ok(())
}
- fn handle_add(&mut self, num1: i32, num2: i32) -> thrift::Result<i32> {
+ fn handle_add(&self, num1: i32, num2: i32) -> thrift::Result<i32> {
println!("handling add: n1:{} n2:{}", num1, num2);
Ok(num1 + num2)
}
- fn handle_calculate(&mut self, logid: i32, w: Work) -> thrift::Result<i32> {
+ fn handle_calculate(&self, logid: i32, w: Work) -> thrift::Result<i32> {
println!("handling calculate: l:{}, w:{:?}", logid, w);
let res = if let Some(ref op) = w.op {
if w.num1.is_none() || w.num2.is_none() {
- Err(InvalidOperation {
- what_op: Some(*op as i32),
- why: Some("no operands specified".to_owned()),
- })
+ Err(
+ InvalidOperation {
+ what_op: Some(*op as i32),
+ why: Some("no operands specified".to_owned()),
+ },
+ )
} else {
// so that I don't have to call unwrap() multiple times below
let num1 = w.num1.as_ref().expect("operands checked");
@@ -134,10 +139,12 @@ impl CalculatorSyncHandler for CalculatorServer {
Operation::MULTIPLY => Ok(num1 * num2),
Operation::DIVIDE => {
if *num2 == 0 {
- Err(InvalidOperation {
- what_op: Some(*op as i32),
- why: Some("divide by 0".to_owned()),
- })
+ Err(
+ InvalidOperation {
+ what_op: Some(*op as i32),
+ why: Some("divide by 0".to_owned()),
+ },
+ )
} else {
Ok(num1 / num2)
}
@@ -145,12 +152,13 @@ impl CalculatorSyncHandler for CalculatorServer {
}
}
} else {
- Err(InvalidOperation::new(None, "no operation specified".to_owned()))
+ Err(InvalidOperation::new(None, "no operation specified".to_owned()),)
};
// if the operation was successful log it
if let Ok(ref v) = res {
- self.log.insert(logid, SharedStruct::new(logid, format!("{}", v)));
+ let mut log = self.log.lock().unwrap();
+ log.insert(logid, SharedStruct::new(logid, format!("{}", v)));
}
// the try! macro automatically maps errors
@@ -161,7 +169,7 @@ impl CalculatorSyncHandler for CalculatorServer {
res.map_err(From::from)
}
- fn handle_zip(&mut self) -> thrift::Result<()> {
+ fn handle_zip(&self) -> thrift::Result<()> {
println!("handling zip");
Ok(())
}