summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDustin J. Mitchell <dustin@mozilla.com>2014-01-08 14:57:45 -0500
committerDustin J. Mitchell <dustin@mozilla.com>2014-01-08 14:58:02 -0500
commit7e75070f6012edda1e17dac9d8c725169da5fc06 (patch)
tree28e581c6ca118f7aaa37b18c849830d45e433ba1
parent89ef0d2d0d0f6577d4c5d56880f134638024f93e (diff)
downloadpy-amqp-7e75070f6012edda1e17dac9d8c725169da5fc06.tar.gz
Revamp heartbeat handling
Be conservative in heartbeat behavior: send at the minimum interval negotiated by the server and the client, and do not artificially abort a connection if we do not receive any heartbeats.
-rw-r--r--amqp/connection.py48
1 files changed, 29 insertions, 19 deletions
diff --git a/amqp/connection.py b/amqp/connection.py
index 9474ab2..aefa5c3 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -18,6 +18,7 @@ from __future__ import absolute_import
import logging
import socket
+import time
from array import array
try:
@@ -82,7 +83,7 @@ class Connection(AbstractChannel):
prev_sent = None
prev_recv = None
- missed_heartbeats = 0
+ next_heartbeat = None
def __init__(self, host='localhost', userid='guest', password='guest',
login_method='AMQPLAIN', login_response=None,
@@ -843,7 +844,11 @@ class Connection(AbstractChannel):
self.channel_max = args.read_short() or self.channel_max
self.frame_max = args.read_long() or self.frame_max
self.method_writer.frame_max = self.frame_max
- heartbeat = args.read_short() # noqa
+ heartbeat = args.read_short()
+ if heartbeat > 0 and self.heartbeat > 0:
+ self.heartbeat = min(self.heartbeat, heartbeat)
+ else:
+ self.heartbeat = heartbeat or self.heartbeat
self._x_tune_ok(self.channel_max, self.frame_max, self.heartbeat)
@@ -851,29 +856,34 @@ class Connection(AbstractChannel):
self.transport.write_frame(8, 0, bytes())
def heartbeat_tick(self, rate=2):
- """Verify that hartbeats are sent and received.
+ """Send heartbeat packets, if necessary. This should be called
+ frequently, on the order of once per second.
- :keyword rate: Rate is how often the tick is called
- compared to the actual heartbeat value. E.g. if
- the heartbeat is set to 3 seconds, and the tick
- is called every 3 / 2 seconds, then the rate is 2.
+ Heartbeats are underspecified in amqp-0.8, but comments in the 0.9 spec
+ suggest that the server may expect heartbeat frames at the rate
+ specified in the tuning negotiation, and fail the connection if such
+ frames are not received.
- """
- sent_now = self.method_writer.bytes_sent
- recv_now = self.method_reader.bytes_recv
+ On the client side, this method sends heartbeats, but does not measure
+ incoming traffic. This provides network traffic for transport-layer
+ protocols to use to discover connection failures.
- if self.prev_sent is not None and self.prev_sent == sent_now:
- self.send_heartbeat()
+ As the safest course, then, this method sends heartbeat packets at the
+ minimum of the server's and client's heartbeat rates.
- if self.prev_recv is not None and self.prev_recv == recv_now:
- self.missed_heartbeats += 1
- else:
- self.missed_heartbeats = 0
+ :keyword rate: Ignored
+ """
+ if not self.heartbeat:
+ return
- self.prev_sent, self.prev_recv = sent_now, recv_now
+ if self.next_heartbeat is not None and time.time() > self.next_heartbeat:
+ self.send_heartbeat()
+ self.prev_sent = None
- if self.missed_heartbeats >= rate:
- raise ConnectionForced('Too many heartbeats missed')
+ sent_now = self.method_writer.bytes_sent
+ if self.prev_sent is None or self.prev_sent != sent_now:
+ self.next_heartbeat = time.time() + self.heartbeat
+ self.prev_sent = sent_now
def _x_tune_ok(self, channel_max, frame_max, heartbeat):
"""Negotiate connection tuning parameters