1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
|
require File.dirname(__FILE__) + '/spec_helper'
require 'thrift/server/nonblockingserver'
require File.dirname(__FILE__) + '/gen-rb/NonblockingService'
class ThriftNonblockingServerSpec < Spec::ExampleGroup
include Thrift
include SpecNamespace
class Handler
def initialize
@queue = Queue.new
end
attr_accessor :server
def greeting(english)
if english
SpecNamespace::Hello.new
else
SpecNamespace::Hello.new(:greeting => "Aloha!")
end
end
def block
@queue.pop
end
def unblock(n)
n.times { @queue.push true }
end
def sleep(time)
Kernel.sleep time
end
def shutdown
@server.shutdown(0, false)
end
end
class SpecTransport < Transport
def initialize(transport, queue)
@transport = transport
@queue = queue
@flushed = false
end
def open?
@transport.open?
end
def open
@transport.open
end
def close
@transport.close
end
def read(sz)
@transport.read(sz)
end
def write(buf,sz=nil)
@transport.write(buf, sz)
end
def flush
@queue.push :flushed unless @flushed or @queue.nil?
@flushed = true
@transport.flush
end
end
class SpecServerSocket < ServerSocket
def initialize(host, port, queue)
super(host, port)
@queue = queue
end
def listen
super
@queue.push :listen
end
end
describe Thrift::NonblockingServer do
before(:each) do
@port = 43251
handler = Handler.new
processor = NonblockingService::Processor.new(handler)
queue = Queue.new
@transport = SpecServerSocket.new('localhost', @port, queue)
transportFactory = FramedTransportFactory.new
logger = Logger.new(STDERR)
logger.level = Logger::WARN
@server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5, logger)
handler.server = @server
@server_thread = Thread.new(Thread.current) do |master_thread|
begin
@server.serve
rescue => e
p e
puts e.backtrace * "\n"
master_thread.raise e
end
end
queue.pop
@clients = []
@catch_exceptions = false
end
after(:each) do
@clients.each { |client, trans| trans.close }
# @server.shutdown(1)
@server_thread.kill
@transport.close
end
def setup_client(queue = nil)
transport = SpecTransport.new(FramedTransport.new(Socket.new('localhost', @port)), queue)
protocol = BinaryProtocol.new(transport)
client = NonblockingService::Client.new(protocol)
transport.open
@clients << [client, transport]
client
end
def setup_client_thread(result)
queue = Queue.new
Thread.new do
begin
client = setup_client
while (cmd = queue.pop)
msg, *args = cmd
case msg
when :block
result << client.block
when :unblock
client.unblock(args.first)
when :hello
result << client.greeting(true) # ignore result
when :sleep
client.sleep(args[0] || 0.5)
result << :slept
when :shutdown
client.shutdown
when :exit
result << :done
break
end
end
@clients.each { |c,t| t.close and break if c == client } #close the transport
rescue => e
raise e unless @catch_exceptions
end
end
queue
end
it "should handle basic message passing" do
client = setup_client
client.greeting(true).should == Hello.new
client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
end
it "should handle concurrent clients" do
queue = Queue.new
trans_queue = Queue.new
4.times do
Thread.new(Thread.current) do |main_thread|
begin
queue.push setup_client(trans_queue).block
rescue => e
main_thread.raise e
end
end
end
4.times { trans_queue.pop }
setup_client.unblock(4)
4.times { queue.pop.should be_true }
end
it "should handle messages from more than 5 long-lived connections" do
queues = []
result = Queue.new
7.times do |i|
queues << setup_client_thread(result)
Thread.pass if i == 4 # give the server time to accept connections
end
client = setup_client
# block 4 connections
4.times { |i| queues[i] << :block }
queues[4] << :hello
queues[5] << :hello
queues[6] << :hello
3.times { result.pop.should == Hello.new }
client.greeting(true).should == Hello.new
queues[5] << [:unblock, 4]
4.times { result.pop.should be_true }
queues[2] << :hello
result.pop.should == Hello.new
client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
7.times { queues.shift << :exit }
client.greeting(true).should == Hello.new
end
it "should shut down when asked" do
# connect first to ensure it's running
client = setup_client
client.greeting(false) # force a message pass
@server.shutdown
@server_thread.join(2).should be_an_instance_of(Thread)
end
it "should continue processing active messages when shutting down" do
result = Queue.new
client = setup_client_thread(result)
client << :sleep
sleep 0.1 # give the server time to start processing the client's message
@server.shutdown
@server_thread.join(2).should be_an_instance_of(Thread)
result.pop.should == :slept
end
it "should kill active messages when they don't expire while shutting down" do
result = Queue.new
client = setup_client_thread(result)
client << [:sleep, 10]
sleep 0.1 # start processing the client's message
@server.shutdown(1)
@catch_exceptions = true
@server_thread.join(3).should_not be_nil
result.should be_empty
end
it "should allow shutting down in response to a message" do
client = setup_client
client.greeting(true).should == Hello.new
client.shutdown
@server_thread.join(2).should_not be_nil
end
end
end
|