diff options
Diffstat (limited to 'amqp/connection.py')
-rw-r--r-- | amqp/connection.py | 69 |
1 files changed, 50 insertions, 19 deletions
diff --git a/amqp/connection.py b/amqp/connection.py index 8808a58..1d4980f 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -34,7 +34,7 @@ from .exceptions import ( ConnectionForced, ConnectionError, error_for_code, RecoverableConnectionError, RecoverableChannelError, ) -from .five import items, range, values +from .five import items, range, values, monotonic from .method_framing import MethodReader, MethodWriter from .serialization import AMQPWriter from .transport import create_transport @@ -80,9 +80,26 @@ class Connection(AbstractChannel): """ Channel = Channel + #: Final heartbeat interval value (in float seconds) after negotiation + heartbeat = None + + #: Original heartbeat interval value proposed by client. + client_heartbeat = None + + #: Original heartbeat interval proposed by server. + server_heartbeat = None + + #: Time of last heartbeat sent (in monotonic time, if available). + last_heartbeat_sent = 0 + + #: Time of last heartbeat received (in monotonic time, if available). + last_heartbeat_received = 0 + + #: Number of bytes sent to socket at the last heartbeat check. prev_sent = None + + #: Number of bytes received from socket at the last heartbeat check. prev_recv = None - missed_heartbeats = 0 def __init__(self, host='localhost', userid='guest', password='guest', login_method='AMQPLAIN', login_response=None, @@ -125,7 +142,7 @@ class Connection(AbstractChannel): # Properties set in the Tune method self.channel_max = channel_max self.frame_max = frame_max - self.heartbeat = heartbeat + self.client_heartbeat = heartbeat self.confirm_publish = confirm_publish @@ -841,10 +858,18 @@ class Connection(AbstractChannel): want a heartbeat. """ + client_heartbeat = self.client_heartbeat or 0 self.channel_max = args.read_short() or self.channel_max self.frame_max = args.read_long() or self.frame_max self.method_writer.frame_max = self.frame_max - heartbeat = args.read_short() # noqa + self.server_heartbeat = args.read_short() or 0 + + # negotiate the heartbeat interval to the smaller of the + # specified values + if self.server_heartbeat == 0 or client_heartbeat == 0: + self.heartbeat = max(self.server_heartbeat, client_heartbeat) + else: + self.heartbeat = min(self.server_heartbeat, client_heartbeat) self._x_tune_ok(self.channel_max, self.frame_max, self.heartbeat) @@ -852,28 +877,34 @@ class Connection(AbstractChannel): self.transport.write_frame(8, 0, bytes()) def heartbeat_tick(self, rate=2): - """Verify that hartbeats are sent and received. - - :keyword rate: Rate is how often the tick is called - compared to the actual heartbeat value. E.g. if - the heartbeat is set to 3 seconds, and the tick - is called every 3 / 2 seconds, then the rate is 2. + """Send heartbeat packets, if necessary, and fail if none have been + received recently. This should be called frequently, on the order of + once per second. + :keyword rate: Ignored """ + if not self.heartbeat: + return + + # treat actual data exchange in either direction as a heartbeat sent_now = self.method_writer.bytes_sent recv_now = self.method_reader.bytes_recv + if self.prev_sent is None or self.prev_sent != sent_now: + self.last_heartbeat_sent = monotonic() + if self.prev_recv is None or self.prev_recv != recv_now: + self.last_heartbeat_received = monotonic() + self.prev_sent, self.prev_recv = sent_now, recv_now - if self.prev_sent is not None and self.prev_sent == sent_now: + # send a heartbeat if it's time to do so + if monotonic() > self.last_heartbeat_sent + self.heartbeat: self.send_heartbeat() + self.last_heartbeat_sent = monotonic() - if self.prev_recv is not None and self.prev_recv == recv_now: - self.missed_heartbeats += 1 - else: - self.missed_heartbeats = 0 - - self.prev_sent, self.prev_recv = sent_now, recv_now - - if self.missed_heartbeats >= rate: + # if we've missed two intervals' heartbeats, fail; this gives the + # server enough time to send heartbeats a little late + if (self.last_heartbeat_received and + self.last_heartbeat_received + 2 * + self.heartbeat < monotonic()): raise ConnectionForced('Too many heartbeats missed') def _x_tune_ok(self, channel_max, frame_max, heartbeat): |