diff options
author | Brian D. Elliott <bdelliott@gmail.com> | 2015-02-03 20:26:33 +0000 |
---|---|---|
committer | Brian D. Elliott <bdelliott@gmail.com> | 2015-02-03 20:26:33 +0000 |
commit | aac3b4b7e2e0bd20e8044f716068637329e48feb (patch) | |
tree | 37880848dac501d19266cce59d19a744734d9d6c | |
parent | 9b27095a001042a59c105828a5a3ea34f44375cf (diff) | |
download | nova-aac3b4b7e2e0bd20e8044f716068637329e48feb.tar.gz |
Fix cells rpc connection leak
Only create a Transport object once for each inter-cell hop. This
prevents the cells rpc driver from creating a new connection pool
(and connection) for each inter-cell message sent. This affects the
RabbitMQ and qpid transports.
This is a regression introduced by oslo.messaging commit
f3370da11a867bae287d7f549a671811e8b399ef which got rid stateful
tracking of connection pool references within oslo.messaging.
It is now the responsibility of the caller to manage these
references.
See related bug:
https://bugs.launchpad.net/oslo.messaging/+bug/1397925
Change-Id: Id1e75f456d4c0ef5b87bf3efe810e9fcfa4cce1d
Closes-Bug: #1417745
-rw-r--r-- | nova/cells/rpc_driver.py | 20 | ||||
-rw-r--r-- | nova/tests/unit/cells/test_cells_rpc_driver.py | 21 |
2 files changed, 38 insertions, 3 deletions
diff --git a/nova/cells/rpc_driver.py b/nova/cells/rpc_driver.py index fbf6f5a524..568d677fc5 100644 --- a/nova/cells/rpc_driver.py +++ b/nova/cells/rpc_driver.py @@ -114,12 +114,11 @@ class InterCellRPCAPI(object): self.version_cap = ( self.VERSION_ALIASES.get(CONF.upgrade_levels.intercell, CONF.upgrade_levels.intercell)) + self.transports = {} def _get_client(self, next_hop, topic): """Turn the DB information for a cell into a messaging.RPCClient.""" - transport_url = next_hop.db_info['transport_url'] - transport = messaging.get_transport(cfg.CONF, transport_url, - rpc.TRANSPORT_ALIASES) + transport = self._get_transport(next_hop) target = messaging.Target(topic=topic, version='1.0') serializer = rpc.RequestContextSerializer(None) return messaging.RPCClient(transport, @@ -127,6 +126,21 @@ class InterCellRPCAPI(object): version_cap=self.version_cap, serializer=serializer) + def _get_transport(self, next_hop): + """NOTE(belliott) Each Transport object contains connection pool + state. Maintain references to them to avoid continual reconnects + to the message broker. + """ + transport_url = next_hop.db_info['transport_url'] + if transport_url not in self.transports: + transport = messaging.get_transport(cfg.CONF, transport_url, + rpc.TRANSPORT_ALIASES) + self.transports[transport_url] = transport + else: + transport = self.transports[transport_url] + + return transport + def send_message_to_cell(self, cell_state, message): """Send a message to another cell by JSON-ifying the message and making an RPC cast to 'process_message'. If the message says to diff --git a/nova/tests/unit/cells/test_cells_rpc_driver.py b/nova/tests/unit/cells/test_cells_rpc_driver.py index 8c8e1921d5..bf5a656c9d 100644 --- a/nova/tests/unit/cells/test_cells_rpc_driver.py +++ b/nova/tests/unit/cells/test_cells_rpc_driver.py @@ -17,6 +17,7 @@ Tests For Cells RPC Communication Driver """ +import mock from mox3 import mox from oslo.config import cfg from oslo import messaging as oslo_messaging @@ -79,6 +80,26 @@ class CellsRPCDriverTestCase(test.NoDBTestCase): self.driver.stop_servers() self.assertEqual(fake_servers, call_info['stopped']) + def test_create_transport_once(self): + # should only construct each Transport once + rpcapi = self.driver.intercell_rpcapi + + transport_url = 'amqp://fakeurl' + next_hop = fakes.FakeCellState('cellname') + next_hop.db_info['transport_url'] = transport_url + + # first call to _get_transport creates a oslo.messaging.Transport obj + with mock.patch.object(oslo_messaging, 'get_transport') as get_trans: + transport = rpcapi._get_transport(next_hop) + get_trans.assert_called_once_with(rpc_driver.CONF, transport_url, + rpc.TRANSPORT_ALIASES) + self.assertIn(transport_url, rpcapi.transports) + self.assertEqual(transport, rpcapi.transports[transport_url]) + + # subsequent calls should return the pre-created Transport obj + transport2 = rpcapi._get_transport(next_hop) + self.assertEqual(transport, transport2) + def test_send_message_to_cell_cast(self): msg_runner = fakes.get_message_runner('api-cell') cell_state = fakes.get_cell_state('api-cell', 'child-cell2') |