summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrian D. Elliott <bdelliott@gmail.com>2015-02-03 20:26:33 +0000
committerBrian D. Elliott <bdelliott@gmail.com>2015-02-03 20:26:33 +0000
commitaac3b4b7e2e0bd20e8044f716068637329e48feb (patch)
tree37880848dac501d19266cce59d19a744734d9d6c
parent9b27095a001042a59c105828a5a3ea34f44375cf (diff)
downloadnova-aac3b4b7e2e0bd20e8044f716068637329e48feb.tar.gz
Fix cells rpc connection leak
Only create a Transport object once for each inter-cell hop. This prevents the cells rpc driver from creating a new connection pool (and connection) for each inter-cell message sent. This affects the RabbitMQ and qpid transports. This is a regression introduced by oslo.messaging commit f3370da11a867bae287d7f549a671811e8b399ef which got rid stateful tracking of connection pool references within oslo.messaging. It is now the responsibility of the caller to manage these references. See related bug: https://bugs.launchpad.net/oslo.messaging/+bug/1397925 Change-Id: Id1e75f456d4c0ef5b87bf3efe810e9fcfa4cce1d Closes-Bug: #1417745
-rw-r--r--nova/cells/rpc_driver.py20
-rw-r--r--nova/tests/unit/cells/test_cells_rpc_driver.py21
2 files changed, 38 insertions, 3 deletions
diff --git a/nova/cells/rpc_driver.py b/nova/cells/rpc_driver.py
index fbf6f5a524..568d677fc5 100644
--- a/nova/cells/rpc_driver.py
+++ b/nova/cells/rpc_driver.py
@@ -114,12 +114,11 @@ class InterCellRPCAPI(object):
self.version_cap = (
self.VERSION_ALIASES.get(CONF.upgrade_levels.intercell,
CONF.upgrade_levels.intercell))
+ self.transports = {}
def _get_client(self, next_hop, topic):
"""Turn the DB information for a cell into a messaging.RPCClient."""
- transport_url = next_hop.db_info['transport_url']
- transport = messaging.get_transport(cfg.CONF, transport_url,
- rpc.TRANSPORT_ALIASES)
+ transport = self._get_transport(next_hop)
target = messaging.Target(topic=topic, version='1.0')
serializer = rpc.RequestContextSerializer(None)
return messaging.RPCClient(transport,
@@ -127,6 +126,21 @@ class InterCellRPCAPI(object):
version_cap=self.version_cap,
serializer=serializer)
+ def _get_transport(self, next_hop):
+ """NOTE(belliott) Each Transport object contains connection pool
+ state. Maintain references to them to avoid continual reconnects
+ to the message broker.
+ """
+ transport_url = next_hop.db_info['transport_url']
+ if transport_url not in self.transports:
+ transport = messaging.get_transport(cfg.CONF, transport_url,
+ rpc.TRANSPORT_ALIASES)
+ self.transports[transport_url] = transport
+ else:
+ transport = self.transports[transport_url]
+
+ return transport
+
def send_message_to_cell(self, cell_state, message):
"""Send a message to another cell by JSON-ifying the message and
making an RPC cast to 'process_message'. If the message says to
diff --git a/nova/tests/unit/cells/test_cells_rpc_driver.py b/nova/tests/unit/cells/test_cells_rpc_driver.py
index 8c8e1921d5..bf5a656c9d 100644
--- a/nova/tests/unit/cells/test_cells_rpc_driver.py
+++ b/nova/tests/unit/cells/test_cells_rpc_driver.py
@@ -17,6 +17,7 @@
Tests For Cells RPC Communication Driver
"""
+import mock
from mox3 import mox
from oslo.config import cfg
from oslo import messaging as oslo_messaging
@@ -79,6 +80,26 @@ class CellsRPCDriverTestCase(test.NoDBTestCase):
self.driver.stop_servers()
self.assertEqual(fake_servers, call_info['stopped'])
+ def test_create_transport_once(self):
+ # should only construct each Transport once
+ rpcapi = self.driver.intercell_rpcapi
+
+ transport_url = 'amqp://fakeurl'
+ next_hop = fakes.FakeCellState('cellname')
+ next_hop.db_info['transport_url'] = transport_url
+
+ # first call to _get_transport creates a oslo.messaging.Transport obj
+ with mock.patch.object(oslo_messaging, 'get_transport') as get_trans:
+ transport = rpcapi._get_transport(next_hop)
+ get_trans.assert_called_once_with(rpc_driver.CONF, transport_url,
+ rpc.TRANSPORT_ALIASES)
+ self.assertIn(transport_url, rpcapi.transports)
+ self.assertEqual(transport, rpcapi.transports[transport_url])
+
+ # subsequent calls should return the pre-created Transport obj
+ transport2 = rpcapi._get_transport(next_hop)
+ self.assertEqual(transport, transport2)
+
def test_send_message_to_cell_cast(self):
msg_runner = fakes.get_message_runner('api-cell')
cell_state = fakes.get_cell_state('api-cell', 'child-cell2')