summaryrefslogtreecommitdiff
path: root/distbuild/jm.py
diff options
context:
space:
mode:
Diffstat (limited to 'distbuild/jm.py')
-rw-r--r--distbuild/jm.py98
1 files changed, 98 insertions, 0 deletions
diff --git a/distbuild/jm.py b/distbuild/jm.py
new file mode 100644
index 00000000..ae222c00
--- /dev/null
+++ b/distbuild/jm.py
@@ -0,0 +1,98 @@
+# mainloop/jm.py -- state machine for JSON communication between nodes
+#
+# Copyright 2012 Codethink Limited.
+# All rights reserved.
+
+
+import fcntl
+import json
+import logging
+import os
+import socket
+import sys
+
+from sm import StateMachine
+from stringbuffer import StringBuffer
+from sockbuf import (SocketBuffer, SocketBufferNewData,
+ SocketBufferEof, SocketError)
+
+
+class JsonNewMessage(object):
+
+ def __init__(self, msg):
+ self.msg = msg
+
+
+class JsonEof(object):
+
+ pass
+
+
+class _Close2(object):
+
+ pass
+
+
+class JsonMachine(StateMachine):
+
+ '''A state machine for sending/receiving JSON messages across TCP.'''
+
+ max_buffer = 16 * 1024
+
+ def __init__(self, conn):
+ StateMachine.__init__(self, 'rw')
+ self.conn = conn
+ self.debug_json = False
+
+ def setup(self):
+ sockbuf = self.sockbuf = SocketBuffer(self.conn, self.max_buffer)
+ self.mainloop.add_state_machine(sockbuf)
+
+ self._eof = False
+ self.receive_buf = StringBuffer()
+
+ spec = [
+ ('rw', sockbuf, SocketBufferNewData, 'rw', self._parse),
+ ('rw', sockbuf, SocketBufferEof, 'w', self._send_eof),
+ ('rw', self, _Close2, None, self._really_close),
+
+ ('w', self, _Close2, None, self._really_close),
+ ]
+ self.add_transitions(spec)
+
+ def send(self, msg):
+ '''Send a message to the other side.'''
+ self.sockbuf.write('%s\n' % json.dumps(msg))
+
+ def close(self):
+ '''Tell state machine it should shut down.
+
+ The state machine will vanish once it has flushed any pending
+ writes.
+
+ '''
+
+ self.mainloop.queue_event(self, _Close2())
+
+ def _parse(self, event_source, event):
+ data = event.data
+ self.receive_buf.add(data)
+ if self.debug_json:
+ logging.debug('JsonMachine: Received: %s' % repr(data))
+ while True:
+ line = self.receive_buf.readline()
+ if line is None:
+ break
+ line = line.rstrip()
+ if self.debug_json:
+ logging.debug('JsonMachine: line: %s' % repr(line))
+ msg = json.loads(line)
+ self.mainloop.queue_event(self, JsonNewMessage(msg))
+
+ def _send_eof(self, event_source, event):
+ self.mainloop.queue_event(self, JsonEof())
+
+ def _really_close(self, event_source, event):
+ self.sockbuf.close()
+ self._send_eof(event_source, event)
+