| author | Alan Conway <aconway@apache.org> | 2014-01-09 22:30:26 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-01-09 22:30:26 +0000 |
| commit | 1bbac3489bb654963ba8629a5f3194e7ea56b23c (patch) | |
| tree | 689c91aa4181a73cf85a0b029b99e5f41dc4616c | |
| parent | 105190c196b37637c6f1eb8d7081457c4bd67716 (diff) | |
| download | qpid-python-1bbac3489bb654963ba8629a5f3194e7ea56b23c.tar.gz | |
QPID-5428: Heartbeats not in use when attempting to connect with python client.
Heartbeats were ignored while opening a connection, so a connect attempt could hang indefinitely.
Need to cover 3 cases (test included):
- Connect successful but then the broker stalls.
- Connect to a stalled broker that never responds.
- Fail-over to a stalled broker that never responds.
All cases are handled by the following fixes to driver.py:
- Check for heartbeats even before engine._connected since we may time out
before receiving open-ok if the peer is stalled and never sends data.
- Set _last_in and _last_out so that we time heartbeats from the start of the
connection if no data is ever sent or received.
- Call self.update_status in Driver.timeout to detect a connection closed due to
heartbeat timeout (rather than by a readable or writeable event).
Make update_status a no-op if the engine or transport is not yet set up.
- Don't consider a reconnect complete in connect(); wait until we get the open-ok.
See the comment on Driver._check_retry_ok()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556971 13f79535-47bb-0310-9956-ffa450edef68
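For context, a minimal usage sketch (not part of this commit) of the client options the fix and the new test exercise; the broker addresses and queue name are placeholders. With a heartbeat of 1 second, the driver fails the connection after two missed heartbeats instead of hanging:

```python
# Hypothetical example, not part of the commit: connect with a 1-second
# heartbeat so a stalled broker is detected instead of hanging, and fail
# over to a backup URL if one is configured.
from qpid.messaging import Connection
from qpid.messaging.exceptions import ConnectionError

conn = Connection("primary-host:5672",              # placeholder broker address
                  heartbeat=1,                      # seconds between heartbeats
                  reconnect=True,
                  reconnect_limit=1,
                  reconnect_urls=["backup-host:5672"])
try:
  conn.open()                                       # with this fix, raises rather than hanging on a stalled broker
  snd = conn.session().sender("test-queue")         # placeholder queue name
  snd.send("ping")
  conn.close()
except ConnectionError, e:
  print "broker unreachable or stalled: %s" % e
```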
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 27 |
| -rw-r--r-- | qpid/python/qpid/messaging/driver.py | 55 |
2 files changed, 64 insertions, 18 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 7a292f177a..b8644ab0fa 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -233,6 +233,33 @@ class ReplicationTests(HaBrokerTest):
             c.close()
         finally: l.restore()
+
+    def test_heartbeat_python(self):
+        """Verify that a python client with a heartbeat specified disconnects
+        from a stalled broker and does not hang indefinitely."""
+
+        broker = Broker(self)
+        broker_addr = broker.host_port()
+
+        # Case 1: Connect before stalling the broker, use the connection after stalling.
+        c = Connection(broker_addr, heartbeat=1)
+        c.open()
+        os.kill(broker.pid, signal.SIGSTOP) # Stall the broker
+        self.assertRaises(ConnectionError, c.session().sender, "foo")
+
+        # Case 2: Connect to a stalled broker
+        c = Connection(broker_addr, heartbeat=1)
+        self.assertRaises(ConnectionError, c.open)
+
+        # Case 3: Re-connect to a stalled broker.
+        broker2 = Broker(self)
+        c = Connection(broker2.host_port(), heartbeat=1, reconnect_limit=1,
+                       reconnect=True, reconnect_urls=[broker_addr],
+                       reconnect_log=False) # Hide expected warnings
+        c.open()
+        broker2.kill() # Cause re-connection to broker
+        self.assertRaises(ConnectionError, c.session().sender, "foo")
+
     def test_failover_cpp(self):
         """Verify that failover works in the C++ client."""
         cluster = HaCluster(self, 2)
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index 06bbe610b8..43c42273ea 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -362,11 +362,11 @@ class Driver:
         [(u.host, default(u.port, 5672)) for u in urls]
     if self._host >= len(hosts):
       self._host = 0
-    result = hosts[self._host]
+    self._last_host = hosts[self._host]
     if self._host == 0:
       self._attempts += 1
     self._host = self._host + 1
-    return result
+    return self._last_host

   def _num_hosts(self):
     return len(self.connection.reconnect_urls) + 1
@@ -401,6 +401,24 @@ class Driver:
   def timing(self):
     return self._timeout

+  def _check_retry_ok(self):
+    """We consider a reconnect to have suceeded only when we have received
+    open-ok from the peer.
+
+    If we declared success as soon as the transport connected, then we could get
+    into an infinite heartbeat loop if the remote process is hung and never
+    sends us any data. We would fail the connection after 2 missed heartbeats,
+    reconnect the transport, declare the reconnect ok, then fail again after 2
+    missed heartbeats and so on.
+    """
+    if self._retrying and self.engine._connected: # Means we have received open-ok.
+      if self._reconnect_log:
+        log.warn("reconnect succeeded: %s:%s", *self._last_host)
+      self._next_retry = None
+      self._attempts = 0
+      self._delay = self.connection.reconnect_interval_min
+      self._retrying = False
+
   @synchronized
   def readable(self):
     try:
@@ -410,6 +428,7 @@ class Driver:
       elif data:
         rawlog.debug("READ[%s]: %r", self.log_id, data)
         self.engine.write(data)
+        self._check_retry_ok()
       else:
         self.close_engine()
     except socket.error, e:
@@ -451,13 +470,14 @@ class Driver:
     self.schedule()

   def update_status(self):
+    if not self.engine: return False
     status = self.engine.status()
     return getattr(self, "st_%s" % status.lower())()

   def st_closed(self):
     # XXX: this log statement seems to sometimes hit when the socket is not connected
     # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
-    self._transport.close()
+    if self._transport: self._transport.close()
     self._transport = None
     self.engine = None
     return True
@@ -483,6 +503,7 @@ class Driver:
   @synchronized
   def timeout(self):
     self.dispatch()
+    self.update_status()
     self._notify()
     self.schedule()

@@ -531,12 +552,6 @@ class Driver:
         self._transport = trans(self.connection, host, port)
       else:
         raise ConnectError("no such transport: %s" % self.connection.transport)
-      if self._retrying and self._reconnect_log:
-        log.warn("reconnect succeeded: %s:%s", host, port)
-      self._next_retry = None
-      self._attempts = 0
-      self._delay = self.connection.reconnect_interval_min
-      self._retrying = False
       self.schedule()
     except socket.error, e:
       self.close_engine(ConnectError(text=str(e)))
@@ -589,8 +604,10 @@ class Engine:
     self._status = CLOSED
     self._buf = ""
     self._hdr = ""
-    self._last_in = None
-    self._last_out = None
+    # Set _last_in and _last_out here so heartbeats will be timed from the
+    # beginning of connection if no data is sent/received.
+    self._last_in = time.time()
+    self._last_out = time.time()
     self._op_enc = OpEncoder()
     self._seg_enc = SegmentEncoder()
     self._frame_enc = FrameEncoder()
@@ -813,13 +830,15 @@ class Engine:
         self.attach(ssn)
         self.process(ssn)

-    if self.connection.heartbeat and self._status != CLOSED:
-      now = time.time()
-      if self._last_in is not None and \
-            now - self._last_in > 2*self.connection.heartbeat:
-        raise HeartbeatTimeout(text="heartbeat timeout")
-      if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
-        self.write_op(ConnectionHeartbeat())
+    # We need to check heartbeat even if not self._connected since we may have
+    # heartbeat timeout before receiving an open-ok
+    if self.connection.heartbeat and self._status != CLOSED and not self._closing:
+      now = time.time()
+      if now - self._last_in > 2*self.connection.heartbeat:
+        raise HeartbeatTimeout(text="heartbeat timeout")
+      # Only send heartbeats if we are connected.
+      if self._connected and now - self._last_out >= self.connection.heartbeat/2.0:
+        self.write_op(ConnectionHeartbeat())

   def open(self):
     self._reset()
