summaryrefslogtreecommitdiff
path: root/example/stream.rb
blob: a72f5b980721d31992d4b5685e340d197f1bcbc9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
require 'msgpack'

# Reads a byte stream from a socket-like object and prints every complete
# MessagePack message found in it.
#
# Bytes are accumulated in an internal buffer, so a message may arrive split
# across any number of reads, and a single read may carry several messages.
class Server
	# Upper bound, in bytes, on data buffered while waiting for a message to
	# complete.
	MAX_BUFFER_SIZE = 10 * 1024 * 1024

	# sock:: any object responding to +sysread(maxlen)+ (IO, socket, pipe end).
	def initialize(sock)
		@sock = sock
		@pk = MessagePack::Unpacker.new
		# String.new (rather than a '' literal) gives a mutable binary buffer
		# even when string literals are frozen.
		@buffer = String.new
		# Offset into @buffer up to which the unpacker has already consumed.
		@nread = 0
	end

	# Reads from the socket until +sysread+ raises (EOF or any other error),
	# handing each chunk to the stream decoder. Returns nil once the
	# connection is considered closed.
	def run
		loop do
			begin
				data = @sock.sysread(1024)
			rescue => e
				puts "connection closed (#{e})"
				return
			end
			receive_data(data)
		end
	end

	private

	# Appends +data+ to the buffer and dispatches every message that is now
	# complete.
	#
	# Errors are reported on stdout and never propagate to the caller. After
	# any error the buffered bytes are discarded and the unpacker is reset:
	# without this, a single malformed or oversized message would leave stale
	# bytes in the buffer, every later call would fail on them again, and the
	# buffer would keep growing despite the size limit.
	def receive_data(data)
		@buffer << data

		loop do
			# The return value of execute is used as the new read offset.
			@nread = @pk.execute(@buffer, @nread)
			break unless @pk.finished?

			msg = @pk.data

			# Drop the consumed bytes and rearm the unpacker before calling
			# the handler, so the stream state is consistent even if the
			# handler raises.
			@pk.reset
			@buffer.slice!(0, @nread)
			@nread = 0

			process_message(msg)

			break if @buffer.empty?
		end

		# Whatever is left is an incomplete message; refuse to buffer it
		# without bound.
		raise "message is too large" if @buffer.length > MAX_BUFFER_SIZE
	rescue => e
		puts "error while processing client packet: #{e}"
		discard_pending
	end

	# Throws away all buffered bytes and any partial decoder state.
	# NOTE(review): bytes arriving after this point may still belong to the
	# discarded message; closing the connection may be the safer policy.
	def discard_pending
		@pk.reset
		@buffer = String.new
		@nread = 0
	end

	# Handles one fully decoded message; this example just prints it.
	def process_message(msg)
		puts "message reached: #{msg.inspect}"
	end
end


# Demo: connect a client and a Server through an in-process pipe.
reader, writer = IO.pipe

# Server side: decode messages from the read end on a separate thread.
server_thread = Thread.new(Server.new(reader)) do |server|
	server.run
end

# Client side: serialize each request and write it to the pipe, then close
# the write end.
requests = [
	["put", "apple", "red"],
	["put", "lemon", "yellow"],
	["get", "apple"]
]
requests.each do |request|
	writer.write request.to_msgpack
end
writer.close

# Wait for the server thread to finish.
server_thread.join