diff options
author | Dustin J. Mitchell <dustin@mozilla.com> | 2014-01-10 11:22:11 -0500 |
---|---|---|
committer | Dustin J. Mitchell <dustin@mozilla.com> | 2014-01-10 11:40:09 -0500 |
commit | 5ec0fa6bc12e47269941f49bbd19a81ba4109b83 (patch) | |
tree | 3224621ffe9f2d812b46a270f8cea7f4fbe0a690 | |
parent | b10a8dfb70dc9d834c47bf89495f789fde9f77e9 (diff) | |
download | py-amqp-5ec0fa6bc12e47269941f49bbd19a81ba4109b83.tar.gz |
refactor heartbeats to follow rabbitmq-java-client's model
-rw-r--r-- | amqp/connection.py | 60 |
1 files changed, 32 insertions, 28 deletions
diff --git a/amqp/connection.py b/amqp/connection.py index a28f36c..9ff14b2 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -82,7 +82,9 @@ class Connection(AbstractChannel): prev_sent = None prev_recv = None - next_heartbeat = None + heartbeat_interval = None + last_heartbeat_sent = 0 + last_heartbeat_received = 0 def __init__(self, host='localhost', userid='guest', password='guest', login_method='AMQPLAIN', login_response=None, @@ -125,7 +127,7 @@ class Connection(AbstractChannel): # Properties set in the Tune method self.channel_max = channel_max self.frame_max = frame_max - self.heartbeat = heartbeat + self.client_heartbeat = heartbeat self.confirm_publish = confirm_publish @@ -843,46 +845,48 @@ class Connection(AbstractChannel): self.channel_max = args.read_short() or self.channel_max self.frame_max = args.read_long() or self.frame_max self.method_writer.frame_max = self.frame_max - heartbeat = args.read_short() - if heartbeat > 0 and self.heartbeat > 0: - self.heartbeat = min(self.heartbeat, heartbeat) + self.server_heartbeat = args.read_short() + + # negotiate the heartbeat interval to the smaller of the specified values + if self.server_heartbeat == 0 or self.client_heartbeat == 0: + self.heartbeat_interval = max(self.server_heartbeat, self.client_heartbeat) else: - self.heartbeat = heartbeat or self.heartbeat + self.heartbeat_interval = min(self.server_heartbeat, self.client_heartbeat) - self._x_tune_ok(self.channel_max, self.frame_max, self.heartbeat) + self._x_tune_ok(self.channel_max, self.frame_max, self.client_heartbeat) def send_heartbeat(self): self.transport.write_frame(8, 0, bytes()) def heartbeat_tick(self, rate=2): - """Send heartbeat packets, if necessary. This should be called - frequently, on the order of once per second. - - Heartbeats are underspecified in amqp-0.8, but comments in the 0.9 spec - suggest that the server may expect heartbeat frames at the rate - specified in the tuning negotiation, and fail the connection if such - frames are not received. - - On the client side, this method sends heartbeats, but does not measure - incoming traffic. This provides network traffic for transport-layer - protocols to use to discover connection failures. - - As the safest course, then, this method sends heartbeat packets at the - minimum of the server's and client's heartbeat rates. + """Send heartbeat packets, if necessary, and fail if none have been + received recently. This should be called frequently, on the order of + once per second. :keyword rate: Ignored """ - if not self.heartbeat: + if not self.heartbeat_interval: return - if self.next_heartbeat is not None and monotonic() > self.next_heartbeat: - self.send_heartbeat() - self.prev_sent = None - + # treat actual data exchange in either direction as a heartbeat sent_now = self.method_writer.bytes_sent + recv_now = self.method_reader.bytes_recv if self.prev_sent is None or self.prev_sent != sent_now: - self.next_heartbeat = monotonic() + self.heartbeat - self.prev_sent = sent_now + self.last_heartbeat_sent = monotonic() + if self.prev_recv is None or self.prev_recv != recv_now: + self.last_heartbeat_received = monotonic() + self.prev_sent, self.prev_recv = sent_now, recv_now + + # send a heartbeat if it's time to do so + if monotonic() > self.last_heartbeat_sent + self.heartbeat_interval: + self.send_heartbeat() + self.last_heartbeat_sent = monotonic() + + # if we've missed two intervals' heartbeats, fail; this gives the + # server enough time to send heartbeats a little late + if (self.last_heartbeat_received and + self.last_heartbeat_received + 2 * self.heartbeat_interval < monotonic()): + raise ConnectionForced('Too many heartbeats missed') def _x_tune_ok(self, channel_max, frame_max, heartbeat): """Negotiate connection tuning parameters |