summaryrefslogtreecommitdiff
path: root/amqp/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'amqp/connection.py')
-rw-r--r--amqp/connection.py69
1 files changed, 50 insertions, 19 deletions
diff --git a/amqp/connection.py b/amqp/connection.py
index 8808a58..1d4980f 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -34,7 +34,7 @@ from .exceptions import (
ConnectionForced, ConnectionError, error_for_code,
RecoverableConnectionError, RecoverableChannelError,
)
-from .five import items, range, values
+from .five import items, range, values, monotonic
from .method_framing import MethodReader, MethodWriter
from .serialization import AMQPWriter
from .transport import create_transport
@@ -80,9 +80,26 @@ class Connection(AbstractChannel):
"""
Channel = Channel
+ #: Final heartbeat interval value (in float seconds) after negotiation
+ heartbeat = None
+
+ #: Original heartbeat interval value proposed by client.
+ client_heartbeat = None
+
+ #: Original heartbeat interval proposed by server.
+ server_heartbeat = None
+
+ #: Time of last heartbeat sent (in monotonic time, if available).
+ last_heartbeat_sent = 0
+
+ #: Time of last heartbeat received (in monotonic time, if available).
+ last_heartbeat_received = 0
+
+ #: Number of bytes sent to socket at the last heartbeat check.
prev_sent = None
+
+ #: Number of bytes received from socket at the last heartbeat check.
prev_recv = None
- missed_heartbeats = 0
def __init__(self, host='localhost', userid='guest', password='guest',
login_method='AMQPLAIN', login_response=None,
@@ -125,7 +142,7 @@ class Connection(AbstractChannel):
# Properties set in the Tune method
self.channel_max = channel_max
self.frame_max = frame_max
- self.heartbeat = heartbeat
+ self.client_heartbeat = heartbeat
self.confirm_publish = confirm_publish
@@ -841,10 +858,18 @@ class Connection(AbstractChannel):
want a heartbeat.
"""
+ client_heartbeat = self.client_heartbeat or 0
self.channel_max = args.read_short() or self.channel_max
self.frame_max = args.read_long() or self.frame_max
self.method_writer.frame_max = self.frame_max
- heartbeat = args.read_short() # noqa
+ self.server_heartbeat = args.read_short() or 0
+
+ # negotiate the heartbeat interval to the smaller of the
+ # specified values
+ if self.server_heartbeat == 0 or client_heartbeat == 0:
+ self.heartbeat = max(self.server_heartbeat, client_heartbeat)
+ else:
+ self.heartbeat = min(self.server_heartbeat, client_heartbeat)
self._x_tune_ok(self.channel_max, self.frame_max, self.heartbeat)
@@ -852,28 +877,34 @@ class Connection(AbstractChannel):
self.transport.write_frame(8, 0, bytes())
def heartbeat_tick(self, rate=2):
- """Verify that hartbeats are sent and received.
-
- :keyword rate: Rate is how often the tick is called
- compared to the actual heartbeat value. E.g. if
- the heartbeat is set to 3 seconds, and the tick
- is called every 3 / 2 seconds, then the rate is 2.
+ """Send heartbeat packets, if necessary, and fail if none have been
+ received recently. This should be called frequently, on the order of
+ once per second.
+ :keyword rate: Ignored
"""
+ if not self.heartbeat:
+ return
+
+ # treat actual data exchange in either direction as a heartbeat
sent_now = self.method_writer.bytes_sent
recv_now = self.method_reader.bytes_recv
+ if self.prev_sent is None or self.prev_sent != sent_now:
+ self.last_heartbeat_sent = monotonic()
+ if self.prev_recv is None or self.prev_recv != recv_now:
+ self.last_heartbeat_received = monotonic()
+ self.prev_sent, self.prev_recv = sent_now, recv_now
- if self.prev_sent is not None and self.prev_sent == sent_now:
+ # send a heartbeat if it's time to do so
+ if monotonic() > self.last_heartbeat_sent + self.heartbeat:
self.send_heartbeat()
+ self.last_heartbeat_sent = monotonic()
- if self.prev_recv is not None and self.prev_recv == recv_now:
- self.missed_heartbeats += 1
- else:
- self.missed_heartbeats = 0
-
- self.prev_sent, self.prev_recv = sent_now, recv_now
-
- if self.missed_heartbeats >= rate:
+ # if we've missed two intervals' heartbeats, fail; this gives the
+ # server enough time to send heartbeats a little late
+ if (self.last_heartbeat_received and
+ self.last_heartbeat_received + 2 *
+ self.heartbeat < monotonic()):
raise ConnectionForced('Too many heartbeats missed')
def _x_tune_ok(self, channel_max, frame_max, heartbeat):