summaryrefslogtreecommitdiff
path: root/distbuild/jm.py
blob: 5592d5bddc84edf0a805238b6f29ccdb9c75086b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# distbuild/jm.py -- state machine for JSON communication between nodes
#
# Copyright (C) 2012, 2014  Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.


import json
import logging
import yaml

from distbuild.sm import StateMachine 
from distbuild.stringbuffer import StringBuffer
from distbuild.sockbuf import (SocketBuffer, SocketBufferNewData,
                               SocketBufferEof)


class JsonNewMessage(object):

    def __init__(self, msg):
        self.msg = msg

        
class JsonEof(object):

    pass
    
    
class _Close2(object):

    pass


class JsonMachine(StateMachine):

    '''A state machine for sending/receiving JSON messages across TCP.'''

    max_buffer = 16 * 1024

    def __init__(self, conn):
        StateMachine.__init__(self, 'rw')
        self.conn = conn
        self.debug_json = False

    def __repr__(self):
        return '<JsonMachine at 0x%x: socket %s, max_buffer %s>' % \
            (id(self), self.conn, self.max_buffer)

    def setup(self):
        sockbuf = self.sockbuf = SocketBuffer(self.conn, self.max_buffer)
        self.mainloop.add_state_machine(sockbuf)

        self._eof = False
        self.receive_buf = StringBuffer()

        spec = [
            # state, source, event_class, new_state, callback
            ('rw', sockbuf, SocketBufferNewData, 'rw', self._parse),
            ('rw', sockbuf, SocketBufferEof, 'w', self._send_eof),
            ('rw', self, _Close2, None, self._really_close),
            
            ('w', self, _Close2, None, self._really_close),
        ]
        self.add_transitions(spec)
        
    def send(self, msg):
        '''Send a message to the other side.'''
        if self.debug_json:
            logging.debug('JsonMachine: Sending message %s' % repr(msg))
        s = json.dumps(yaml.safe_dump(msg))
        if self.debug_json:
            logging.debug('JsonMachine: As %s' % repr(s))
        self.sockbuf.write('%s\n' % s)
    
    def close(self):
        '''Tell state machine it should shut down.
        
        The state machine will vanish once it has flushed any pending
        writes.
        
        '''
        
        self.mainloop.queue_event(self, _Close2())
        
    def _parse(self, event_source, event):
        data = event.data
        self.receive_buf.add(data)
        if self.debug_json:
            logging.debug('JsonMachine: Received: %s' % repr(data))
        while True:
            line = self.receive_buf.readline()
            if line is None:
                break
            line = line.rstrip()
            if self.debug_json:
                logging.debug('JsonMachine: line: %s' % repr(line))
            msg = yaml.load(json.loads(line))
            self.mainloop.queue_event(self, JsonNewMessage(msg))

    def _send_eof(self, event_source, event):
        self.mainloop.queue_event(self, JsonEof())

    def _really_close(self, event_source, event):
        self.sockbuf.close()
        self._send_eof(event_source, event)