summaryrefslogtreecommitdiff
path: root/lib/rb/spec/nonblockingserver_spec.rb
blob: 0eed1dd93f7d8c63709cfcb8310e153d3af95b23 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
require File.dirname(__FILE__) + '/spec_helper'
require 'thrift/server/nonblockingserver'
require File.dirname(__FILE__) + '/gen-rb/NonblockingService'

class ThriftNonblockingServerSpec < Spec::ExampleGroup
  include Thrift
  include SpecNamespace

  # Service handler backing the NonblockingService processor in this spec.
  # Blocking calls are coordinated through an internal Queue: #block waits for
  # a token and #unblock supplies tokens.
  class Handler
    # Server this handler is attached to; #shutdown forwards to it.
    attr_accessor :server

    def initialize
      @queue = Queue.new
    end

    # Returns a default Hello when +english+ is truthy, otherwise a Hello
    # whose greeting is "Aloha!".
    def greeting(english)
      return SpecNamespace::Hello.new if english

      SpecNamespace::Hello.new(:greeting => "Aloha!")
    end

    # Waits until a token is available on the internal queue and returns it.
    def block
      @queue.pop
    end

    # Pushes +n+ tokens (+true+) onto the internal queue, one per pending
    # #block call to release.
    def unblock(n)
      n.times do
        @queue << true
      end
    end

    # Sleeps for +time+ seconds. Kernel.sleep is called explicitly because
    # this method shadows the bare +sleep+ inside the class.
    def sleep(time)
      Kernel.sleep(time)
    end

    # Forwards to the attached server's shutdown with arguments (0, false).
    def shutdown
      @server.shutdown(0, false)
    end
  end

  # Transport decorator used by the spec clients. Every operation is forwarded
  # to the wrapped transport; in addition, the first #flush pushes :flushed
  # onto the supplied queue (when one was given).
  class SpecTransport < Transport
    # transport:: the transport to delegate to
    # queue::     queue notified on the first flush, or nil for no notification
    def initialize(transport, queue)
      @transport = transport
      @queue = queue
      @flushed = false
    end

    def open?
      @transport.open?
    end

    def open
      @transport.open
    end

    def close
      @transport.close
    end

    def read(sz)
      @transport.read(sz)
    end

    def write(buf, sz = nil)
      @transport.write(buf, sz)
    end

    # Flushes the wrapped transport. Only the first call on an instance with a
    # non-nil queue announces itself by pushing :flushed.
    def flush
      announce = !(@flushed || @queue.nil?)
      @queue.push(:flushed) if announce
      @flushed = true
      @transport.flush
    end
  end

  # ServerSocket that reports on a queue once #listen has run, so the spec can
  # wait for the server to be listening before connecting clients.
  class SpecServerSocket < ServerSocket
    # host, port:: passed straight to ServerSocket
    # queue::      receives :listen after each successful #listen
    def initialize(host, port, queue)
      super(host, port)
      @queue = queue
    end

    # Runs the inherited listen, then pushes :listen onto the queue.
    def listen
      super()
      @queue << :listen
    end
  end

  describe Thrift::NonblockingServer do
    # Before each example: build a NonblockingServer around a fresh Handler,
    # run it on a background thread, and wait until its socket is listening.
    before(:each) do
      @port = 43251
      service_handler = Handler.new
      listening = Queue.new
      @transport = SpecServerSocket.new('localhost', @port, listening)

      warn_logger = Logger.new(STDERR)
      warn_logger.level = Logger::WARN

      # NOTE(review): the nil and 5 arguments are presumably the protocol
      # factory and the worker-thread count -- confirm against
      # NonblockingServer#initialize.
      @server = NonblockingServer.new(
        NonblockingService::Processor.new(service_handler),
        @transport,
        FramedTransportFactory.new,
        nil,
        5,
        warn_logger
      )
      service_handler.server = @server

      # Errors raised by the server loop are printed and then re-raised in the
      # thread that is running the example.
      @server_thread = Thread.new(Thread.current) do |spec_thread|
        begin
          @server.serve
        rescue => error
          p error
          puts error.backtrace.join("\n")
          spec_thread.raise error
        end
      end

      # SpecServerSocket#listen pushes :listen; wait for it before continuing.
      listening.pop

      @clients = []
      @catch_exceptions = false
    end

    # After each example: close every client transport registered by
    # setup_client, then stop the server thread and close the listening
    # socket.
    after(:each) do
      begin
        @clients.each { |_client, trans| trans.close }
      ensure
        # Run the server cleanup even when closing a client transport raises:
        # every example binds the same fixed @port, so a server thread or
        # listening socket left behind would break the examples that follow.
        # @server.shutdown(1)
        @server_thread.kill
        @transport.close
      end
    end

    # Opens a client connection to the spec server on @port and returns the
    # NonblockingService client. The [client, transport] pair is recorded in
    # @clients so the transport can be closed later.
    #
    # queue:: optional queue handed to SpecTransport, which pushes :flushed
    #         onto it on the first flush
    def setup_client(queue = nil)
      socket = Socket.new('localhost', @port)
      transport = SpecTransport.new(FramedTransport.new(socket), queue)
      client = NonblockingService::Client.new(BinaryProtocol.new(transport))
      transport.open
      @clients.push([client, transport])
      client
    end

    # Starts a thread that owns one client connection and executes commands
    # popped from the returned queue. A command is either a bare symbol or an
    # array of [symbol, *arguments]:
    #
    #   :block         pushes the result of client.block onto +result+
    #   [:unblock, n]  calls client.unblock(n)
    #   :hello         pushes client.greeting(true) onto +result+
    #   [:sleep, t]    calls client.sleep(t), t defaulting to 0.5, then
    #                  pushes :slept
    #   :shutdown      calls client.shutdown
    #   :exit          pushes :done and leaves the command loop
    #
    # A nil or false command also ends the loop. When the loop ends, the
    # client's transport is closed. Errors raised inside the thread are
    # re-raised unless @catch_exceptions is set.
    #
    # Returns the command queue.
    def setup_client_thread(result)
      commands = Queue.new
      Thread.new do
        begin
          client = setup_client
          while (command = commands.pop)
            name, *params = command
            case name
            when :block    then result << client.block
            when :unblock  then client.unblock(params[0])
            when :hello    then result << client.greeting(true)
            when :sleep
              client.sleep(params.first || 0.5)
              result << :slept
            when :shutdown then client.shutdown
            when :exit
              result << :done
              break
            end
          end
          # Close the transport that belongs to this thread's client.
          @clients.each do |candidate, trans|
            next unless candidate == client
            break if trans.close
          end
        rescue => error
          raise error unless @catch_exceptions
        end
      end
      commands
    end

    # One client, two round trips: the default greeting and the "Aloha!" one.
    it "should handle basic message passing" do
      client = setup_client
      english_reply = client.greeting(true)
      english_reply.should == Hello.new
      other_reply = client.greeting(false)
      other_reply.should == Hello.new(:greeting => 'Aloha!')
    end

    # Four threads each open their own client and issue a blocking call; a
    # fifth client then releases all of them with a single unblock(4).
    it "should handle concurrent clients" do
      queue = Queue.new
      trans_queue = Queue.new
      4.times do
        Thread.new(Thread.current) do |main_thread|
          begin
            # the value returned by the blocking call is collected in queue
            queue.push setup_client(trans_queue).block
          rescue => e
            # surface failures from the client thread in the example's thread
            main_thread.raise e
          end
        end
      end
      # SpecTransport pushes :flushed on each client's first flush, so this
      # waits until all four clients have flushed before unblocking them
      4.times { trans_queue.pop }
      setup_client.unblock(4)
      4.times { queue.pop.should be_true }
    end

    # Keeps seven threaded clients plus one direct client connected at once
    # and checks that calls are still answered while four of the connections
    # are held in a blocking call.
    # NOTE(review): the "5" in the title presumably matches the 5 passed to
    # NonblockingServer.new in before(:each) -- confirm.
    it "should handle messages from more than 5 long-lived connections" do
      queues = []
      result = Queue.new
      7.times do |i|
        # each entry is the command queue of one client thread
        queues << setup_client_thread(result)
        Thread.pass if i == 4 # give the server time to accept connections
      end
      client = setup_client
      # block 4 connections
      4.times { |i| queues[i] << :block }
      # the three remaining threaded clients must still get answers
      queues[4] << :hello
      queues[5] << :hello
      queues[6] << :hello
      3.times { result.pop.should == Hello.new }
      client.greeting(true).should == Hello.new
      # release the four blocked calls; each pushes its return value to result
      queues[5] << [:unblock, 4]
      4.times { result.pop.should be_true }
      # a previously blocked connection can be used again
      queues[2] << :hello
      result.pop.should == Hello.new
      client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
      # tell every client thread to leave its command loop
      7.times { queues.shift << :exit }
      client.greeting(true).should == Hello.new
    end

    # Thread#join(2) returns the thread itself only if the server loop ended
    # within two seconds of the shutdown request.
    it "should shut down when asked" do
      # a completed round trip shows the server is up before it is stopped
      client = setup_client
      client.greeting(false)
      @server.shutdown
      joined = @server_thread.join(2)
      joined.should be_an_instance_of(Thread)
    end

    # A call that is already running when shutdown is requested must still
    # complete: the client thread pushes :slept only after client.sleep
    # returns.
    it "should continue processing active messages when shutting down" do
      result = Queue.new
      # note: despite its name, client is the command queue of a client thread
      client = setup_client_thread(result)
      client << :sleep # no argument given, so the client sleeps 0.5
      sleep 0.1 # give the server time to start processing the client's message
      @server.shutdown
      # join returns the thread only if it finished within 2 seconds
      @server_thread.join(2).should be_an_instance_of(Thread)
      result.pop.should == :slept
    end

    # A 10-second call is in flight when shutdown(1) is requested; the server
    # thread must still end within 3 seconds and the call must never report
    # :slept.
    # NOTE(review): the argument to shutdown is presumably a timeout in
    # seconds -- confirm against NonblockingServer#shutdown.
    it "should kill active messages when they don't expire while shutting down" do
      result = Queue.new
      # note: despite its name, client is the command queue of a client thread
      client = setup_client_thread(result)
      client << [:sleep, 10]
      sleep 0.1 # start processing the client's message
      @server.shutdown(1)
      # make the client thread swallow the error from its interrupted call
      @catch_exceptions = true
      # join returns nil on timeout, so non-nil means the server thread ended
      @server_thread.join(3).should_not be_nil
      result.should be_empty
    end

    # The shutdown request arrives as a service call (Handler#shutdown) rather
    # than from the spec itself; the server thread must end within 2 seconds.
    it "should allow shutting down in response to a message" do
      client = setup_client
      reply = client.greeting(true)
      reply.should == Hello.new
      client.shutdown
      finished = @server_thread.join(2)
      finished.should_not be_nil
    end
  end
end