summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMehdi Abaakouk <mehdi.abaakouk@enovance.com>2015-03-18 08:31:32 +0100
committerMehdi Abaakouk <mehdi.abaakouk@enovance.com>2015-03-24 17:34:55 +0100
commitac8bdb634c36c834a27cfa226f591fad86acbfdd (patch)
treef835fd4993e22745bd1efa3705cbe9f3757ad4ac
parentee18dc5eed7b9dfa4739fed760bd4c4a0478ef51 (diff)
downloadoslo-messaging-ac8bdb634c36c834a27cfa226f591fad86acbfdd.tar.gz
cleanup connection pool return
This change ensures that connections that fail to return to the pool are cleanly closed and exception raised are not returned to the caller. For rabbit, we also try to reconnection in case of connection failure, before dropping the connection. Closes-bug: #1433458 Change-Id: Ic714db7b8be9df8b6935a903732c60aaea0bc404 (cherry picked from commit 0dff20b8b979d902d855aa1c0ae0f4b9398a3116)
-rw-r--r--oslo_messaging/_drivers/amqp.py12
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py9
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py13
3 files changed, 31 insertions, 3 deletions
diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py
index 0d3dc51..0b8cafa 100644
--- a/oslo_messaging/_drivers/amqp.py
+++ b/oslo_messaging/_drivers/amqp.py
@@ -134,8 +134,16 @@ class ConnectionContext(rpc_common.Connection):
if self.pooled:
# Reset the connection so it's ready for the next caller
# to grab from the pool
- self.connection.reset()
- self.connection_pool.put(self.connection)
+ try:
+ self.connection.reset()
+ except Exception:
+ LOG.exception("Fail to reset the connection, drop it")
+ try:
+ self.connection.close()
+ except Exception:
+ pass
+ else:
+ self.connection_pool.put(self.connection)
else:
try:
self.connection.close()
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 15273a6..b24e67a 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -884,8 +884,15 @@ class Connection(object):
def reset(self):
"""Reset a connection so it can be used again."""
+ recoverable_errors = (self.connection.recoverable_channel_errors +
+ self.connection.recoverable_connection_errors)
+
with self._connection_lock:
- self._set_current_channel(self.connection.channel())
+ try:
+ self._set_current_channel(self.connection.channel())
+ except recoverable_errors:
+ self._set_current_channel(None)
+ self.ensure_connection()
self.consumers = []
self.consumer_num = itertools.count(1)
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 8830125..3f3145f 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -183,6 +183,19 @@ class TestRabbitIterconsume(test_utils.BaseTestCase):
self.assertEqual(0, int(deadline - time.time()))
+ def test_connection_reset_always_succeed(self):
+ transport = oslo_messaging.get_transport(self.conf,
+ 'kombu+memory:////')
+ self.addCleanup(transport.cleanup)
+ channel = mock.Mock()
+ conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN
+ ).connection
+ conn.connection.recoverable_channel_errors = (IOError,)
+ with mock.patch.object(conn.connection, 'channel',
+ side_effect=[IOError, IOError, channel]):
+ conn.reset()
+ self.assertEqual(channel, conn.channel)
+
class TestRabbitTransportURL(test_utils.BaseTestCase):