summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDustin J. Mitchell <dustin@mozilla.com>2014-01-10 11:22:11 -0500
committerDustin J. Mitchell <dustin@mozilla.com>2014-01-10 11:40:09 -0500
commit5ec0fa6bc12e47269941f49bbd19a81ba4109b83 (patch)
tree3224621ffe9f2d812b46a270f8cea7f4fbe0a690
parentb10a8dfb70dc9d834c47bf89495f789fde9f77e9 (diff)
downloadpy-amqp-5ec0fa6bc12e47269941f49bbd19a81ba4109b83.tar.gz
refactor heartbeats to follow rabbitmq-java-client's model
-rw-r--r--amqp/connection.py60
1 files changed, 32 insertions, 28 deletions
diff --git a/amqp/connection.py b/amqp/connection.py
index a28f36c..9ff14b2 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -82,7 +82,9 @@ class Connection(AbstractChannel):
prev_sent = None
prev_recv = None
- next_heartbeat = None
+ heartbeat_interval = None
+ last_heartbeat_sent = 0
+ last_heartbeat_received = 0
def __init__(self, host='localhost', userid='guest', password='guest',
login_method='AMQPLAIN', login_response=None,
@@ -125,7 +127,7 @@ class Connection(AbstractChannel):
# Properties set in the Tune method
self.channel_max = channel_max
self.frame_max = frame_max
- self.heartbeat = heartbeat
+ self.client_heartbeat = heartbeat
self.confirm_publish = confirm_publish
@@ -843,46 +845,48 @@ class Connection(AbstractChannel):
self.channel_max = args.read_short() or self.channel_max
self.frame_max = args.read_long() or self.frame_max
self.method_writer.frame_max = self.frame_max
- heartbeat = args.read_short()
- if heartbeat > 0 and self.heartbeat > 0:
- self.heartbeat = min(self.heartbeat, heartbeat)
+ self.server_heartbeat = args.read_short()
+
+ # negotiate the heartbeat interval to the smaller of the specified values
+ if self.server_heartbeat == 0 or self.client_heartbeat == 0:
+ self.heartbeat_interval = max(self.server_heartbeat, self.client_heartbeat)
else:
- self.heartbeat = heartbeat or self.heartbeat
+ self.heartbeat_interval = min(self.server_heartbeat, self.client_heartbeat)
- self._x_tune_ok(self.channel_max, self.frame_max, self.heartbeat)
+ self._x_tune_ok(self.channel_max, self.frame_max, self.client_heartbeat)
def send_heartbeat(self):
self.transport.write_frame(8, 0, bytes())
def heartbeat_tick(self, rate=2):
- """Send heartbeat packets, if necessary. This should be called
- frequently, on the order of once per second.
-
- Heartbeats are underspecified in amqp-0.8, but comments in the 0.9 spec
- suggest that the server may expect heartbeat frames at the rate
- specified in the tuning negotiation, and fail the connection if such
- frames are not received.
-
- On the client side, this method sends heartbeats, but does not measure
- incoming traffic. This provides network traffic for transport-layer
- protocols to use to discover connection failures.
-
- As the safest course, then, this method sends heartbeat packets at the
- minimum of the server's and client's heartbeat rates.
+ """Send heartbeat packets, if necessary, and fail if none have been
+ received recently. This should be called frequently, on the order of
+ once per second.
:keyword rate: Ignored
"""
- if not self.heartbeat:
+ if not self.heartbeat_interval:
return
- if self.next_heartbeat is not None and monotonic() > self.next_heartbeat:
- self.send_heartbeat()
- self.prev_sent = None
-
+ # treat actual data exchange in either direction as a heartbeat
sent_now = self.method_writer.bytes_sent
+ recv_now = self.method_reader.bytes_recv
if self.prev_sent is None or self.prev_sent != sent_now:
- self.next_heartbeat = monotonic() + self.heartbeat
- self.prev_sent = sent_now
+ self.last_heartbeat_sent = monotonic()
+ if self.prev_recv is None or self.prev_recv != recv_now:
+ self.last_heartbeat_received = monotonic()
+ self.prev_sent, self.prev_recv = sent_now, recv_now
+
+ # send a heartbeat if it's time to do so
+ if monotonic() > self.last_heartbeat_sent + self.heartbeat_interval:
+ self.send_heartbeat()
+ self.last_heartbeat_sent = monotonic()
+
+ # if we've missed two intervals' heartbeats, fail; this gives the
+ # server enough time to send heartbeats a little late
+ if (self.last_heartbeat_received and
+ self.last_heartbeat_received + 2 * self.heartbeat_interval < monotonic()):
+ raise ConnectionForced('Too many heartbeats missed')
def _x_tune_ok(self, channel_max, frame_max, heartbeat):
"""Negotiate connection tuning parameters