diff options
author | Dustin J. Mitchell <dustin@mozilla.com> | 2014-01-08 14:57:45 -0500 |
---|---|---|
committer | Dustin J. Mitchell <dustin@mozilla.com> | 2014-01-08 14:58:02 -0500 |
commit | 7e75070f6012edda1e17dac9d8c725169da5fc06 (patch) | |
tree | 28e581c6ca118f7aaa37b18c849830d45e433ba1 | |
parent | 89ef0d2d0d0f6577d4c5d56880f134638024f93e (diff) | |
download | py-amqp-7e75070f6012edda1e17dac9d8c725169da5fc06.tar.gz |
Revamp heartbeat handling
Be conservative in heartbeat behavior: send at the minimum interval
negotiated by the server and the client, and do not artificially abort a
connection if we do not receive any heartbeats.
-rw-r--r-- | amqp/connection.py | 48 |
1 files changed, 29 insertions, 19 deletions
diff --git a/amqp/connection.py b/amqp/connection.py index 9474ab2..aefa5c3 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -18,6 +18,7 @@ from __future__ import absolute_import import logging import socket +import time from array import array try: @@ -82,7 +83,7 @@ class Connection(AbstractChannel): prev_sent = None prev_recv = None - missed_heartbeats = 0 + next_heartbeat = None def __init__(self, host='localhost', userid='guest', password='guest', login_method='AMQPLAIN', login_response=None, @@ -843,7 +844,11 @@ class Connection(AbstractChannel): self.channel_max = args.read_short() or self.channel_max self.frame_max = args.read_long() or self.frame_max self.method_writer.frame_max = self.frame_max - heartbeat = args.read_short() # noqa + heartbeat = args.read_short() + if heartbeat > 0 and self.heartbeat > 0: + self.heartbeat = min(self.heartbeat, heartbeat) + else: + self.heartbeat = heartbeat or self.heartbeat self._x_tune_ok(self.channel_max, self.frame_max, self.heartbeat) @@ -851,29 +856,34 @@ class Connection(AbstractChannel): self.transport.write_frame(8, 0, bytes()) def heartbeat_tick(self, rate=2): - """Verify that hartbeats are sent and received. + """Send heartbeat packets, if necessary. This should be called + frequently, on the order of once per second. - :keyword rate: Rate is how often the tick is called - compared to the actual heartbeat value. E.g. if - the heartbeat is set to 3 seconds, and the tick - is called every 3 / 2 seconds, then the rate is 2. + Heartbeats are underspecified in amqp-0.8, but comments in the 0.9 spec + suggest that the server may expect heartbeat frames at the rate + specified in the tuning negotiation, and fail the connection if such + frames are not received. - """ - sent_now = self.method_writer.bytes_sent - recv_now = self.method_reader.bytes_recv + On the client side, this method sends heartbeats, but does not measure + incoming traffic. This provides network traffic for transport-layer + protocols to use to discover connection failures. - if self.prev_sent is not None and self.prev_sent == sent_now: - self.send_heartbeat() + As the safest course, then, this method sends heartbeat packets at the + minimum of the server's and client's heartbeat rates. - if self.prev_recv is not None and self.prev_recv == recv_now: - self.missed_heartbeats += 1 - else: - self.missed_heartbeats = 0 + :keyword rate: Ignored + """ + if not self.heartbeat: + return - self.prev_sent, self.prev_recv = sent_now, recv_now + if self.next_heartbeat is not None and time.time() > self.next_heartbeat: + self.send_heartbeat() + self.prev_sent = None - if self.missed_heartbeats >= rate: - raise ConnectionForced('Too many heartbeats missed') + sent_now = self.method_writer.bytes_sent + if self.prev_sent is None or self.prev_sent != sent_now: + self.next_heartbeat = time.time() + self.heartbeat + self.prev_sent = sent_now def _x_tune_ok(self, channel_max, frame_max, heartbeat): """Negotiate connection tuning parameters |