summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgordon chung <gord@live.ca>2015-05-26 14:36:23 -0400
committergord chung <gord@live.ca>2015-06-09 02:57:30 -0400
commit41d0d875a10b48eea89f46ae3c426a023dfae27c (patch)
tree3c87cc2ee58c6104816aaf1e263e2d9e6132b4c8
parentd416889d0ca73de62ffbf5ae58089795ce55ab35 (diff)
downloadoslo-messaging-41d0d875a10b48eea89f46ae3c426a023dfae27c.tar.gz
consumer connections not closed properly
heartbeat_thread is not set for listeners. when closing connection, it blindly checks heartbeat_thread and will throw an error causing connection to remain open. this patch explicitly sets heartbeat_thread to None. Change-Id: Ief3bf02f952882ecadf742cdd0bac8edd7812473 Closes-Bug: #1458917
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py1
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py13
2 files changed, 7 insertions, 7 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 437f2de..bba9008 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -685,6 +685,7 @@ class Connection(object):
# NOTE(sileht): if purpose is PURPOSE_LISTEN
# the consume code does the heartbeat stuff
# we don't need a thread
+ self._heartbeat_thread = None
if purpose == rpc_amqp.PURPOSE_SEND:
self._heartbeat_start()
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 3d81c70..eab73ab 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -200,13 +200,12 @@ class TestRabbitIterconsume(test_utils.BaseTestCase):
'kombu+memory:////')
self.addCleanup(transport.cleanup)
channel = mock.Mock()
- conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN
- ).connection
- conn.connection.recoverable_channel_errors = (IOError,)
- with mock.patch.object(conn.connection, 'channel',
- side_effect=[IOError, IOError, channel]):
- conn.reset()
- self.assertEqual(channel, conn.channel)
+ with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
+ conn.connection.connection.recoverable_channel_errors = (IOError,)
+ with mock.patch.object(conn.connection.connection, 'channel',
+ side_effect=[IOError, IOError, channel]):
+ conn.connection.reset()
+ self.assertEqual(channel, conn.connection.channel)
class TestRabbitTransportURL(test_utils.BaseTestCase):