diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-03-25 00:28:25 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-03-25 00:28:25 +0000 |
commit | 7cddd594abf0c76de32f532984e34eb28a5d8b2a (patch) | |
tree | 72c966f1c15f5c2c2b61087bf0f8a3c472f4fcb6 | |
parent | c21f8d99aa489e25cc3df379dec7a74f016c20c7 (diff) | |
parent | ac8bdb634c36c834a27cfa226f591fad86acbfdd (diff) | |
download | oslo-messaging-7cddd594abf0c76de32f532984e34eb28a5d8b2a.tar.gz |
Merge "cleanup connection pool return" into stable/kilo
-rw-r--r-- | oslo_messaging/_drivers/amqp.py | 12 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 9 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_rabbit.py | 13 |
3 files changed, 31 insertions, 3 deletions
diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index 0d3dc51..0b8cafa 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -134,8 +134,16 @@ class ConnectionContext(rpc_common.Connection): if self.pooled: # Reset the connection so it's ready for the next caller # to grab from the pool - self.connection.reset() - self.connection_pool.put(self.connection) + try: + self.connection.reset() + except Exception: + LOG.exception("Fail to reset the connection, drop it") + try: + self.connection.close() + except Exception: + pass + else: + self.connection_pool.put(self.connection) else: try: self.connection.close() diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 15273a6..b24e67a 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -884,8 +884,15 @@ class Connection(object): def reset(self): """Reset a connection so it can be used again.""" + recoverable_errors = (self.connection.recoverable_channel_errors + + self.connection.recoverable_connection_errors) + with self._connection_lock: - self._set_current_channel(self.connection.channel()) + try: + self._set_current_channel(self.connection.channel()) + except recoverable_errors: + self._set_current_channel(None) + self.ensure_connection() self.consumers = [] self.consumer_num = itertools.count(1) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 8830125..3f3145f 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -183,6 +183,19 @@ class TestRabbitIterconsume(test_utils.BaseTestCase): self.assertEqual(0, int(deadline - time.time())) + def test_connection_reset_always_succeed(self): + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + channel = mock.Mock() + conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN + ).connection + conn.connection.recoverable_channel_errors = (IOError,) + with mock.patch.object(conn.connection, 'channel', + side_effect=[IOError, IOError, channel]): + conn.reset() + self.assertEqual(channel, conn.channel) + class TestRabbitTransportURL(test_utils.BaseTestCase): |