summaryrefslogtreecommitdiff
path: root/distbuild/connection_machine.py
diff options
context:
space:
mode:
Diffstat (limited to 'distbuild/connection_machine.py')
-rw-r--r--distbuild/connection_machine.py145
1 files changed, 145 insertions, 0 deletions
diff --git a/distbuild/connection_machine.py b/distbuild/connection_machine.py
new file mode 100644
index 00000000..3d4e8d04
--- /dev/null
+++ b/distbuild/connection_machine.py
@@ -0,0 +1,145 @@
+# distbuild/connection_machine.py -- state machine for connecting to server
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import errno
+import logging
+import socket
+
+import distbuild
+
+
+class Reconnect(object):
+
+ pass
+
+
+class StopConnecting(object):
+
+ pass
+
+
+class ConnectError(object):
+
+ pass
+
+
+class ProxyEventSource(object):
+
+ '''Proxy event sources that may come and go.'''
+
+ def __init__(self):
+ self.event_source = None
+
+ def get_select_params(self):
+ if self.event_source:
+ return self.event_source.get_select_params()
+ else:
+ return [], [], [], None
+
+ def get_events(self, r, w, x):
+ if self.event_source:
+ return self.event_source.get_events(r, w, x)
+ else:
+ return []
+
+ def is_finished(self):
+ return False
+
+
+class ConnectionMachine(distbuild.StateMachine):
+
+ def __init__(self, addr, port, machine, extra_args):
+ distbuild.StateMachine.__init__(self, 'connecting')
+ self._addr = addr
+ self._port = port
+ self._machine = machine
+ self._extra_args = extra_args
+ self._socket = None
+ self.reconnect_interval = 1
+
+ def setup(self):
+ self._sock_proxy = ProxyEventSource()
+ self.mainloop.add_event_source(self._sock_proxy)
+ self._start_connect()
+
+ self._timer = distbuild.TimerEventSource(self.reconnect_interval)
+ self.mainloop.add_event_source(self._timer)
+
+ spec = [
+ ('connecting', self._sock_proxy, distbuild.SocketWriteable,
+ 'connected', self._connect),
+ ('connecting', self, StopConnecting, None, self._stop),
+ ('connected', self, Reconnect, 'connecting', self._reconnect),
+ ('connected', self, ConnectError, 'timeout', self._start_timer),
+ ('connected', self, StopConnecting, None, self._stop),
+ ('timeout', self._timer, distbuild.Timer, 'connecting',
+ self._reconnect),
+ ('timeout', self, StopConnecting, None, self._stop),
+ ]
+ self.add_transitions(spec)
+
+ def _start_connect(self):
+ logging.debug(
+ 'ConnectionMachine: connecting to %s:%s' %
+ (self._addr, self._port))
+ self._socket = socket.socket()
+ distbuild.set_nonblocking(self._socket)
+ try:
+ self._socket.connect((self._addr, self._port))
+ except socket.error, e:
+ if e.errno != errno.EINPROGRESS:
+ raise socket.error(
+ "%s (attempting connection to distbuild controller "
+ "at %s:%s)" % (e, self._addr, self._port))
+
+ src = distbuild.SocketEventSource(self._socket)
+ self._sock_proxy.event_source = src
+
+ def _connect(self, event_source, event):
+ try:
+ self._socket.connect((self._addr, self._port))
+ except socket.error, e:
+ logging.error(
+ 'Failed to connect to %s:%s: %s' %
+ (self._addr, self._port, str(e)))
+ self.mainloop.queue_event(self, ConnectError())
+ return
+ self._sock_proxy.event_source = None
+ logging.info('Connected to %s:%s' % (self._addr, self._port))
+ m = self._machine(self, self._socket, *self._extra_args)
+ self.mainloop.add_state_machine(m)
+ self._socket = None
+
+ def _reconnect(self, event_source, event):
+ logging.info('Reconnecting to %s:%s' % (self._addr, self._port))
+ if self._socket is not None:
+ self._socket.close()
+ self._timer.stop()
+ self._start_connect()
+
+ def _stop(self, event_source, event):
+ logging.info(
+ 'Stopping connection attempts to %s:%s' % (self._addr, self._port))
+ self.mainloop.remove_event_source(self._timer)
+ if self._socket is not None:
+ self._socket.close()
+ self._socket = None
+
+ def _start_timer(self, event_source, event):
+ self._timer.start()
+