summaryrefslogtreecommitdiff
path: root/distbuild/sockbuf.py
blob: c2cc8e2afa520249f0d1322a12f2002085ef7582 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# mainloop/sockbuf.py -- a buffering, non-blocking socket I/O state machine
#
# Copyright 2012 Codethink Limited
# All rights reserved.


import logging


'''A buffering, non-blocking I/O state machine for sockets.

The state machine is given an open socket. It reads from the socket,
and writes to it, when it can do so without blocking. A maximum size
for the read buffer can be set: the state machine will stop reading
if the buffer becomes full. This avoids the problem of an excessively
large buffer.

The state machine generates events to indicate that the buffer contains
data or that the end of the file for reading has been reached. An event
is also generated if there is an error while doing I/O with the socket.

* SocketError: an error has occurred
* SocketBufferNewData: socket buffer has received new data; the data
  is available as the ``data`` attribute
* SocketBufferEof: socket buffer has reached EOF for reading, but
  still writes anything in the write buffer (or anything that gets added
  to the write buffer)
* SocketBufferClosed: socket is now closed

The state machine starts shutting down when ``close`` method is called,
but continues to operate in write-only mode until the write buffer has
been emptied.

'''


from socketsrc import (SocketError, SocketReadable, SocketWriteable,
                       SocketEventSource)
from sm import StateMachine
from stringbuffer import StringBuffer


class SocketBufferNewData(object):

    '''Socket buffer has received new data.'''

    def __init__(self, data):
        self.data = data


class SocketBufferEof(object):

    '''Socket buffer has reached end of file when reading.
    
    Note that the socket buffer may still be available for writing.
    However, no more new data will be read.
    
    '''


class SocketBufferClosed(object):

    '''Socket buffer has closed its socket.'''

        
class _Close(object): pass
class _WriteBufferIsEmpty(object): pass
class _WriteBufferNotEmpty(object): pass

        
        
class SocketBuffer(StateMachine):

    def __init__(self, sock, max_buffer):
        StateMachine.__init__(self, 'reading')

        self._sock = sock
        self._max_buffer = max_buffer

    def __repr__(self):
        return '<SocketBuffer at 0x%x: socket %s max_buffer %i>' % (
                id(self), self._sock, self._max_buffer)

    def setup(self):        
        src = self._src = SocketEventSource(self._sock)
        src.stop_writing() # We'll start writing when we need to.
        self.mainloop.add_event_source(src)

        self._wbuf = StringBuffer()

        spec = [
            # state, source, event_class, new_state, callback
            ('reading', src, SocketReadable, 'reading', self._fill),
            ('reading', self, _WriteBufferNotEmpty, 'rw', 
                self._start_writing),
            ('reading', self, SocketBufferEof, 'idle', None),
            ('reading', self, _Close, None, self._really_close),
            
            ('rw', src, SocketReadable, 'rw', self._fill),
            ('rw', src, SocketWriteable, 'rw', self._flush),
            ('rw', self, _WriteBufferIsEmpty, 'reading', self._stop_writing),
            ('rw', self, SocketBufferEof, 'w', None),
            ('rw', self, _Close, 'wc', None),
            
            ('idle', self, _WriteBufferNotEmpty, 'w', self._start_writing),
            ('idle', self, _Close, None, self._really_close),
            
            ('w', src, SocketWriteable, 'w', self._flush),
            ('w', self, _WriteBufferIsEmpty, 'idle', self._stop_writing),

            ('wc', src, SocketWriteable, 'wc', self._flush),
            ('wc', self, _WriteBufferIsEmpty, None, self._really_close),
        ]
        self.add_transitions(spec)

    def write(self, data):
        '''Put data into write queue.'''
        
        was_empty = len(self._wbuf) == 0
        self._wbuf.add(data)
        if was_empty and len(self._wbuf) > 0:
            self._start_writing(None, None)
            self.mainloop.queue_event(self, _WriteBufferNotEmpty())

    def close(self):
        '''Tell state machine to terminate.'''
        self.mainloop.queue_event(self, _Close())

    def _report_error(self, event_source, event):
        logging.error(str(event))

    def _fill(self, event_source, event):
        try:
            data = event.sock.read(self._max_buffer)
        except (IOError, OSError), e:
            logging.debug(
                '%s: _fill(): Exception %s from sock.read()', self, e)
            return [SocketError(event.sock, e)]

        if data:
            self.mainloop.queue_event(self, SocketBufferNewData(data))
        else:
            event_source.stop_reading()
            self.mainloop.queue_event(self, SocketBufferEof())

    def _really_close(self, event_source, event):
        self._src.close()
        self.mainloop.queue_event(self, SocketBufferClosed())

    def _flush(self, event_source, event):
        max_write = 1024**2
        data = self._wbuf.read(max_write)
        try:
            n = event.sock.write(data)
        except (IOError, OSError), e:
            logging.debug(
                '%s: _flush(): Exception %s from sock.write()', self, e)
            return [SocketError(event.sock, e)]
        self._wbuf.remove(n)
        if len(self._wbuf) == 0:
            self.mainloop.queue_event(self, _WriteBufferIsEmpty())

    def _start_writing(self, event_source, event):
        self._src.start_writing()

    def _stop_writing(self, event_source, event):
        self._src.stop_writing()