1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# mainloop/jm.py -- state machine for JSON communication between nodes
#
# Copyright (C) 2012, 2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import json
import logging
import yaml
from distbuild.sm import StateMachine
from distbuild.stringbuffer import StringBuffer
from distbuild.sockbuf import (SocketBuffer, SocketBufferNewData,
SocketBufferEof)
class JsonNewMessage(object):

    '''Event announcing that one complete message has been decoded.

    JsonMachine queues an instance of this class for every line it
    manages to decode. The decoded payload is stored, unmodified, in
    the ``msg`` attribute.

    '''

    def __init__(self, msg):
        # Keep a reference to the payload; no copy or validation is done.
        self.msg = msg
class JsonEof(object):

    '''Event queued by JsonMachine when the connection is finished.

    It is emitted both when the underlying socket buffer reports
    end-of-file and when the machine is closed. It carries no data.

    '''
class _Close2(object):

    '''Private event a JsonMachine queues to itself to request shutdown.

    It is created by JsonMachine.close() and carries no data.

    '''
class JsonMachine(StateMachine):

    '''A state machine for sending/receiving JSON messages across TCP.

    Wire format: every message is serialised with ``yaml.safe_dump``,
    the resulting text is encoded as a single JSON string with
    ``json.dumps``, and that string is written followed by a newline.
    Incoming data is split into lines and decoded the opposite way.

    Events queued on the main loop with this machine as the source:

    * ``JsonNewMessage`` for every line that decodes to a value other
      than ``None``;
    * ``JsonEof`` when the socket buffer reports end-of-file, and again
      whenever the machine is closed.

    Set the ``debug_json`` attribute to a true value to log every sent
    and received payload at debug level.

    '''

    # Handed to SocketBuffer as its second constructor argument
    # (presumably the buffer size limit in bytes -- confirm in sockbuf).
    max_buffer = 16 * 1024

    def __init__(self, conn):
        '''Create a machine for ``conn``, starting in the 'rw' state.

        ``conn`` is stored as-is and later passed to SocketBuffer by
        setup().

        '''
        StateMachine.__init__(self, 'rw')
        self.conn = conn
        self.debug_json = False

    def __repr__(self):
        return '<JsonMachine at 0x%x: socket %s, max_buffer %s>' % \
            (id(self), self.conn, self.max_buffer)

    def setup(self):
        '''Create the socket buffer and register the transitions.

        Relies on ``self.mainloop`` having been set before this is
        called (it is not assigned anywhere in this class).

        '''
        sockbuf = self.sockbuf = SocketBuffer(self.conn, self.max_buffer)
        self.mainloop.add_state_machine(sockbuf)

        # NOTE(review): _eof is not read anywhere in this class; kept in
        # case code outside this file inspects it.
        self._eof = False
        self.receive_buf = StringBuffer()

        spec = [
            # state, source, event_class, new_state, callback
            ('rw', sockbuf, SocketBufferNewData, 'rw', self._parse),
            ('rw', sockbuf, SocketBufferEof, 'w', self._send_eof),
            # A new state of None presumably ends the machine; see
            # StateMachine for the exact semantics.
            ('rw', self, _Close2, None, self._really_close),
            ('w', self, _Close2, None, self._really_close),
        ]
        self.add_transitions(spec)

    def send(self, msg):
        '''Send a message to the other side.

        ``msg`` must be something ``yaml.safe_dump`` can serialise. The
        YAML text is wrapped in a JSON string, so any newlines in it
        are escaped and the message occupies exactly one line.

        '''
        if self.debug_json:
            logging.debug('JsonMachine: Sending message %s', repr(msg))
        s = json.dumps(yaml.safe_dump(msg))
        if self.debug_json:
            logging.debug('JsonMachine: As %s', repr(s))
        self.sockbuf.write('%s\n' % s)

    def close(self):
        '''Tell state machine it should shut down.

        The state machine will vanish once it has flushed any pending
        writes.

        '''
        self.mainloop.queue_event(self, _Close2())

    def _parse(self, event_source, event):
        '''Decode every complete line available after new data arrives.

        ``event.data`` is appended to the receive buffer, then lines are
        taken from the buffer until readline() returns None. Each line
        that decodes to a value other than ``None`` is queued as a
        JsonNewMessage. Lines that fail to decode are logged as errors
        and skipped; they do not stop the remaining lines from being
        processed.

        '''
        data = event.data
        self.receive_buf.add(data)
        if self.debug_json:
            logging.debug('JsonMachine: Received: %s', repr(data))

        while True:
            line = self.receive_buf.readline()
            if line is None:
                # No further complete line is buffered.
                break
            line = line.rstrip()
            if self.debug_json:
                logging.debug('JsonMachine: line: %s', repr(line))

            msg = None
            try:
                msg = yaml.safe_load(json.loads(line))
            except Exception:
                # Deliberately broad: the peer's input is untrusted and
                # a malformed line must never take the machine down.
                logging.error('Invalid input: %s', line)

            # Compare against None rather than testing truthiness, so
            # that valid but falsy payloads ({} / [] / 0 / False) are
            # delivered instead of being silently discarded. None still
            # means "nothing to deliver": either decoding failed or the
            # payload itself was null.
            if msg is not None:
                self.mainloop.queue_event(self, JsonNewMessage(msg))

    def _send_eof(self, event_source, event):
        '''Queue a JsonEof event; both arguments are ignored.'''
        self.mainloop.queue_event(self, JsonEof())

    def _really_close(self, event_source, event):
        '''Close the socket buffer, then announce EOF via JsonEof.'''
        self.sockbuf.close()
        self._send_eof(event_source, event)
|