diff options
-rw-r--r-- | nova/context.py | 6 | ||||
-rw-r--r-- | nova/rpc.py | 86 | ||||
-rw-r--r-- | nova/tests/unit/compute/test_rpcapi.py | 2 | ||||
-rw-r--r-- | nova/tests/unit/test_context.py | 14 | ||||
-rw-r--r-- | nova/tests/unit/test_rpc.py | 154 |
5 files changed, 59 insertions, 203 deletions
diff --git a/nova/context.py b/nova/context.py index bc5a21a92e..4dfd025722 100644 --- a/nova/context.py +++ b/nova/context.py @@ -370,8 +370,12 @@ def set_target_cell(context, cell_mapping): """ # avoid circular import from nova import db + from nova import rpc db_connection_string = cell_mapping.database_connection context.db_connection = db.create_context_manager(db_connection_string) + if not cell_mapping.transport_url.startswith('none'): + context.mq_connection = rpc.create_transport( + cell_mapping.transport_url) @contextmanager @@ -386,8 +390,10 @@ def target_cell(context, cell_mapping): :param cell_mapping: A objects.CellMapping object """ original_db_connection = context.db_connection + original_mq_connection = context.mq_connection set_target_cell(context, cell_mapping) try: yield context finally: context.db_connection = original_db_connection + context.mq_connection = original_mq_connection diff --git a/nova/rpc.py b/nova/rpc.py index dacb6ccb92..f374a7f4e3 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -34,13 +34,11 @@ from oslo_messaging.rpc import dispatcher from oslo_serialization import jsonutils from oslo_service import periodic_task from oslo_utils import importutils -from oslo_utils import timeutils import nova.conf import nova.context import nova.exception from nova.i18n import _ -from nova import objects profiler = importutils.try_import("osprofiler.profiler") @@ -395,27 +393,14 @@ class LegacyValidatingNotifier(object): getattr(self.notifier, priority)(ctxt, event_type, payload) -class ClientWrapper(object): - def __init__(self, client): - self._client = client - self.last_access_time = timeutils.utcnow() - - @property - def client(self): - self.last_access_time = timeutils.utcnow() - return self._client - - class ClientRouter(periodic_task.PeriodicTasks): - """Creates and caches RPC clients that route to cells or the default. - - The default client connects to the API cell message queue. The rest of the - clients connect to compute cell message queues. + """Creates RPC clients that honor the context's RPC transport + or provides a default. """ + def __init__(self, default_client): super(ClientRouter, self).__init__(CONF) - self.clients = {} - self.clients['default'] = ClientWrapper(default_client) + self.default_client = default_client self.target = default_client.target self.version_cap = default_client.version_cap # NOTE(melwitt): Cells v1 does its own serialization and won't @@ -424,55 +409,24 @@ class ClientRouter(periodic_task.PeriodicTasks): # Prevent this empty context from overwriting the thread local copy self.run_periodic_tasks(nova.context.RequestContext(overwrite=False)) - def _client(self, context, cell_mapping=None): - if cell_mapping: - client_id = cell_mapping.uuid + def _client(self, context, transport=None): + if transport: + return messaging.RPCClient(transport, self.target, + version_cap=self.version_cap, + serializer=self.serializer) else: - client_id = 'default' - - try: - client = self.clients[client_id].client - except KeyError: - transport = create_transport(cell_mapping.transport_url) - client = messaging.RPCClient(transport, self.target, - version_cap=self.version_cap, - serializer=self.serializer) - self.clients[client_id] = ClientWrapper(client) - - return client - - @periodic_task.periodic_task - def _remove_stale_clients(self, context): - timeout = 60 - - def stale(client_id, last_access_time): - if timeutils.is_older_than(last_access_time, timeout): - LOG.debug('Removing stale RPC client: %s as it was last ' - 'accessed at %s', client_id, last_access_time) - return True - return False - - # Never expire the default client - items_copy = list(self.clients.items()) - for client_id, client_wrapper in items_copy: - if (client_id != 'default' and - stale(client_id, client_wrapper.last_access_time)): - del self.clients[client_id] + return self.default_client def by_instance(self, context, instance): - try: - cell_mapping = objects.InstanceMapping.get_by_instance_uuid( - context, instance.uuid).cell_mapping - except nova.exception.InstanceMappingNotFound: - # Not a cells v2 deployment - cell_mapping = None - return self._client(context, cell_mapping=cell_mapping) + """Deprecated.""" + if context.mq_connection: + return self._client(context, transport=context.mq_connection) + else: + return self.default_client def by_host(self, context, host): - try: - cell_mapping = objects.HostMapping.get_by_host( - context, host).cell_mapping - except nova.exception.HostMappingNotFound: - # Not a cells v2 deployment - cell_mapping = None - return self._client(context, cell_mapping=cell_mapping) + """Deprecated.""" + if context.mq_connection: + return self._client(context, transport=context.mq_connection) + else: + return self.default_client diff --git a/nova/tests/unit/compute/test_rpcapi.py b/nova/tests/unit/compute/test_rpcapi.py index 4d0eb75721..46f7558d5a 100644 --- a/nova/tests/unit/compute/test_rpcapi.py +++ b/nova/tests/unit/compute/test_rpcapi.py @@ -115,7 +115,7 @@ class ComputeRpcAPITestCase(test.NoDBTestCase): # This test wants to run the real prepare function, so must use # a real client object - default_client = rpcapi.router.clients['default'].client + default_client = rpcapi.router.default_client orig_prepare = default_client.prepare base_version = rpcapi.router.target.version diff --git a/nova/tests/unit/test_context.py b/nova/tests/unit/test_context.py index 33d72f7d7c..f1f2544194 100644 --- a/nova/tests/unit/test_context.py +++ b/nova/tests/unit/test_context.py @@ -290,18 +290,24 @@ class ContextTestCase(test.NoDBTestCase): mock_authorize.assert_called_once_with(ctxt, mock.sentinel.rule, mock.sentinel.target) + @mock.patch('nova.rpc.create_transport') @mock.patch('nova.db.create_context_manager') - def test_target_cell(self, mock_create_ctxt_mgr): - mock_create_ctxt_mgr.return_value = mock.sentinel.cm + def test_target_cell(self, mock_create_ctxt_mgr, mock_rpc): + mock_create_ctxt_mgr.return_value = mock.sentinel.cdb + mock_rpc.return_value = mock.sentinel.cmq ctxt = context.RequestContext('111', '222', roles=['admin', 'weasel']) # Verify the existing db_connection, if any, is restored ctxt.db_connection = mock.sentinel.db_conn - mapping = objects.CellMapping(database_connection='fake://') + ctxt.mq_connection = mock.sentinel.mq_conn + mapping = objects.CellMapping(database_connection='fake://', + transport_url='fake://') with context.target_cell(ctxt, mapping): - self.assertEqual(ctxt.db_connection, mock.sentinel.cm) + self.assertEqual(ctxt.db_connection, mock.sentinel.cdb) + self.assertEqual(ctxt.mq_connection, mock.sentinel.cmq) self.assertEqual(mock.sentinel.db_conn, ctxt.db_connection) + self.assertEqual(mock.sentinel.mq_conn, ctxt.mq_connection) def test_get_context(self): ctxt = context.get_context() diff --git a/nova/tests/unit/test_rpc.py b/nova/tests/unit/test_rpc.py index a265d45cbd..0c49762c9e 100644 --- a/nova/tests/unit/test_rpc.py +++ b/nova/tests/unit/test_rpc.py @@ -12,18 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. import copy -import datetime import fixtures import mock import oslo_messaging as messaging from oslo_messaging.rpc import dispatcher from oslo_serialization import jsonutils -from oslo_utils import fixture as utils_fixture import testtools from nova import context -from nova import exception from nova import objects from nova import rpc from nova import test @@ -445,179 +442,72 @@ class TestProfilerRequestContextSerializer(test.NoDBTestCase): class TestClientRouter(test.NoDBTestCase): - @mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid') - @mock.patch('nova.rpc.create_transport') @mock.patch('oslo_messaging.RPCClient') - def test_by_instance(self, mock_rpcclient, mock_create, mock_get): + def test_by_instance(self, mock_rpcclient): default_client = mock.Mock() cell_client = mock.Mock() mock_rpcclient.return_value = cell_client ctxt = mock.Mock() - cm = objects.CellMapping(uuid=uuids.cell_mapping, - transport_url='fake:///') - mock_get.return_value = objects.InstanceMapping(cell_mapping=cm) + ctxt.mq_connection = mock.sentinel.transport instance = objects.Instance(uuid=uuids.instance) router = rpc.ClientRouter(default_client) client = router.by_instance(ctxt, instance) - mock_get.assert_called_once_with(ctxt, instance.uuid) # verify a client was created by ClientRouter mock_rpcclient.assert_called_once_with( - mock_create.return_value, default_client.target, + mock.sentinel.transport, default_client.target, version_cap=default_client.version_cap, serializer=default_client.serializer) # verify cell client was returned self.assertEqual(cell_client, client) - # reset and check that cached client is returned the second time - mock_rpcclient.reset_mock() - mock_create.reset_mock() - mock_get.reset_mock() + @mock.patch('oslo_messaging.RPCClient') + def test_by_instance_untargeted(self, mock_rpcclient): + default_client = mock.Mock() + cell_client = mock.Mock() + mock_rpcclient.return_value = cell_client + ctxt = mock.Mock() + ctxt.mq_connection = None + instance = objects.Instance(uuid=uuids.instance) + router = rpc.ClientRouter(default_client) client = router.by_instance(ctxt, instance) - mock_get.assert_called_once_with(ctxt, instance.uuid) - mock_rpcclient.assert_not_called() - mock_create.assert_not_called() - self.assertEqual(cell_client, client) - @mock.patch('nova.objects.HostMapping.get_by_host') - @mock.patch('nova.rpc.create_transport') + self.assertEqual(router.default_client, client) + self.assertFalse(mock_rpcclient.called) + @mock.patch('oslo_messaging.RPCClient') - def test_by_host(self, mock_rpcclient, mock_create, mock_get): + def test_by_host(self, mock_rpcclient): default_client = mock.Mock() cell_client = mock.Mock() mock_rpcclient.return_value = cell_client ctxt = mock.Mock() - cm = objects.CellMapping(uuid=uuids.cell_mapping, - transport_url='fake:///') - mock_get.return_value = objects.HostMapping(cell_mapping=cm) + ctxt.mq_connection = mock.sentinel.transport host = 'fake-host' router = rpc.ClientRouter(default_client) client = router.by_host(ctxt, host) - mock_get.assert_called_once_with(ctxt, host) # verify a client was created by ClientRouter mock_rpcclient.assert_called_once_with( - mock_create.return_value, default_client.target, + mock.sentinel.transport, default_client.target, version_cap=default_client.version_cap, serializer=default_client.serializer) # verify cell client was returned self.assertEqual(cell_client, client) - # reset and check that cached client is returned the second time - mock_rpcclient.reset_mock() - mock_create.reset_mock() - mock_get.reset_mock() - - client = router.by_host(ctxt, host) - mock_get.assert_called_once_with(ctxt, host) - mock_rpcclient.assert_not_called() - mock_create.assert_not_called() - self.assertEqual(cell_client, client) - - @mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid', - side_effect=exception.InstanceMappingNotFound(uuid=uuids.instance)) - @mock.patch('nova.rpc.create_transport') @mock.patch('oslo_messaging.RPCClient') - def test_by_instance_not_found(self, mock_rpcclient, mock_create, - mock_get): - default_client = mock.Mock() - cell_client = mock.Mock() - mock_rpcclient.return_value = cell_client - ctxt = mock.Mock() - instance = objects.Instance(uuid=uuids.instance) - - router = rpc.ClientRouter(default_client) - client = router.by_instance(ctxt, instance) - - mock_get.assert_called_once_with(ctxt, instance.uuid) - mock_rpcclient.assert_not_called() - mock_create.assert_not_called() - # verify default client was returned - self.assertEqual(default_client, client) - - @mock.patch('nova.objects.HostMapping.get_by_host', - side_effect=exception.HostMappingNotFound(name='fake-host')) - @mock.patch('nova.rpc.create_transport') - @mock.patch('oslo_messaging.RPCClient') - def test_by_host_not_found(self, mock_rpcclient, mock_create, mock_get): + def test_by_host_untargeted(self, mock_rpcclient): default_client = mock.Mock() cell_client = mock.Mock() mock_rpcclient.return_value = cell_client ctxt = mock.Mock() + ctxt.mq_connection = None host = 'fake-host' router = rpc.ClientRouter(default_client) client = router.by_host(ctxt, host) - mock_get.assert_called_once_with(ctxt, host) - mock_rpcclient.assert_not_called() - mock_create.assert_not_called() - # verify default client was returned - self.assertEqual(default_client, client) - - @mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid') - @mock.patch('nova.rpc.create_transport') - @mock.patch('oslo_messaging.RPCClient') - def test_remove_stale_clients(self, mock_rpcclient, mock_create, mock_get): - t0 = datetime.datetime(2016, 8, 9, 0, 0, 0) - time_fixture = self.useFixture(utils_fixture.TimeFixture(t0)) - - default_client = mock.Mock() - ctxt = mock.Mock() - - cm1 = objects.CellMapping(uuid=uuids.cell_mapping1, - transport_url='fake:///') - cm2 = objects.CellMapping(uuid=uuids.cell_mapping2, - transport_url='fake:///') - cm3 = objects.CellMapping(uuid=uuids.cell_mapping3, - transport_url='fake:///') - mock_get.side_effect = [objects.InstanceMapping(cell_mapping=cm1), - objects.InstanceMapping(cell_mapping=cm2), - objects.InstanceMapping(cell_mapping=cm3), - objects.InstanceMapping(cell_mapping=cm3)] - instance1 = objects.Instance(uuid=uuids.instance1) - instance2 = objects.Instance(uuid=uuids.instance2) - instance3 = objects.Instance(uuid=uuids.instance3) - - router = rpc.ClientRouter(default_client) - cell1_client = router.by_instance(ctxt, instance1) - cell2_client = router.by_instance(ctxt, instance2) - - # default client, cell1 client, cell2 client - self.assertEqual(3, len(router.clients)) - expected = {'default': default_client, - uuids.cell_mapping1: cell1_client, - uuids.cell_mapping2: cell2_client} - for client_id, client in expected.items(): - self.assertEqual(client, router.clients[client_id].client) - - # expire cell1 client and cell2 client - time_fixture.advance_time_seconds(80) - - # add cell3 client - cell3_client = router.by_instance(ctxt, instance3) - - router._remove_stale_clients(ctxt) - - # default client, cell3 client - expected = {'default': default_client, - uuids.cell_mapping3: cell3_client} - self.assertEqual(2, len(router.clients)) - for client_id, client in expected.items(): - self.assertEqual(client, router.clients[client_id].client) - - # expire cell3 client - time_fixture.advance_time_seconds(80) - - # access cell3 client to refresh it - cell3_client = router.by_instance(ctxt, instance3) - - router._remove_stale_clients(ctxt) - - # default client and cell3 client should be there - self.assertEqual(2, len(router.clients)) - for client_id, client in expected.items(): - self.assertEqual(client, router.clients[client_id].client) + self.assertEqual(router.default_client, client) + self.assertFalse(mock_rpcclient.called) |