summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-03-25 00:28:25 +0000
committerGerrit Code Review <review@openstack.org>2015-03-25 00:28:25 +0000
commit7cddd594abf0c76de32f532984e34eb28a5d8b2a (patch)
tree72c966f1c15f5c2c2b61087bf0f8a3c472f4fcb6
parentc21f8d99aa489e25cc3df379dec7a74f016c20c7 (diff)
parentac8bdb634c36c834a27cfa226f591fad86acbfdd (diff)
downloadoslo-messaging-7cddd594abf0c76de32f532984e34eb28a5d8b2a.tar.gz
Merge "cleanup connection pool return" into stable/kilo
-rw-r--r--oslo_messaging/_drivers/amqp.py12
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py9
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py13
3 files changed, 31 insertions, 3 deletions
diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py
index 0d3dc51..0b8cafa 100644
--- a/oslo_messaging/_drivers/amqp.py
+++ b/oslo_messaging/_drivers/amqp.py
@@ -134,8 +134,16 @@ class ConnectionContext(rpc_common.Connection):
if self.pooled:
# Reset the connection so it's ready for the next caller
# to grab from the pool
- self.connection.reset()
- self.connection_pool.put(self.connection)
+ try:
+ self.connection.reset()
+ except Exception:
+ LOG.exception("Fail to reset the connection, drop it")
+ try:
+ self.connection.close()
+ except Exception:
+ pass
+ else:
+ self.connection_pool.put(self.connection)
else:
try:
self.connection.close()
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 15273a6..b24e67a 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -884,8 +884,15 @@ class Connection(object):
def reset(self):
"""Reset a connection so it can be used again."""
+ recoverable_errors = (self.connection.recoverable_channel_errors +
+ self.connection.recoverable_connection_errors)
+
with self._connection_lock:
- self._set_current_channel(self.connection.channel())
+ try:
+ self._set_current_channel(self.connection.channel())
+ except recoverable_errors:
+ self._set_current_channel(None)
+ self.ensure_connection()
self.consumers = []
self.consumer_num = itertools.count(1)
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 8830125..3f3145f 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -183,6 +183,19 @@ class TestRabbitIterconsume(test_utils.BaseTestCase):
self.assertEqual(0, int(deadline - time.time()))
+ def test_connection_reset_always_succeed(self):
+ transport = oslo_messaging.get_transport(self.conf,
+ 'kombu+memory:////')
+ self.addCleanup(transport.cleanup)
+ channel = mock.Mock()
+ conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN
+ ).connection
+ conn.connection.recoverable_channel_errors = (IOError,)
+ with mock.patch.object(conn.connection, 'channel',
+ side_effect=[IOError, IOError, channel]):
+ conn.reset()
+ self.assertEqual(channel, conn.channel)
+
class TestRabbitTransportURL(test_utils.BaseTestCase):