diff options
Diffstat (limited to 'nova/tests/network/test_rpcapi.py')
-rw-r--r-- | nova/tests/network/test_rpcapi.py | 353 |
1 files changed, 0 insertions, 353 deletions
diff --git a/nova/tests/network/test_rpcapi.py b/nova/tests/network/test_rpcapi.py deleted file mode 100644 index f33128cae3..0000000000 --- a/nova/tests/network/test_rpcapi.py +++ /dev/null @@ -1,353 +0,0 @@ -# Copyright 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Unit Tests for nova.network.rpcapi -""" - -import collections - -import mox -from oslo.config import cfg - -from nova import context -from nova.network import rpcapi as network_rpcapi -from nova import test -from nova.tests import fake_instance - -CONF = cfg.CONF - - -class NetworkRpcAPITestCase(test.NoDBTestCase): - def setUp(self): - super(NetworkRpcAPITestCase, self).setUp() - self.flags(multi_host=True) - - # Used to specify the default value expected if no real value is passed - DefaultArg = collections.namedtuple('DefaultArg', ['value']) - - def _test_network_api(self, method, rpc_method, **kwargs): - ctxt = context.RequestContext('fake_user', 'fake_project') - - rpcapi = network_rpcapi.NetworkAPI() - self.assertIsNotNone(rpcapi.client) - self.assertEqual(rpcapi.client.target.topic, CONF.network_topic) - - expected_retval = 'foo' if rpc_method == 'call' else None - expected_version = kwargs.pop('version', None) - expected_fanout = kwargs.pop('fanout', None) - expected_kwargs = kwargs.copy() - - for k, v in expected_kwargs.items(): - if isinstance(v, self.DefaultArg): - expected_kwargs[k] = v.value - kwargs.pop(k) - - prepare_kwargs = {} - if expected_version: - prepare_kwargs['version'] = expected_version - if expected_fanout: - prepare_kwargs['fanout'] = True - - if 'source_compute' in expected_kwargs: - # Fix up for migrate_instance_* calls. - expected_kwargs['source'] = expected_kwargs.pop('source_compute') - expected_kwargs['dest'] = expected_kwargs.pop('dest_compute') - - targeted_methods = [ - 'lease_fixed_ip', 'release_fixed_ip', 'rpc_setup_network_on_host', - '_rpc_allocate_fixed_ip', 'deallocate_fixed_ip', 'update_dns', - '_associate_floating_ip', '_disassociate_floating_ip', - 'lease_fixed_ip', 'release_fixed_ip', 'migrate_instance_start', - 'migrate_instance_finish', - 'allocate_for_instance', 'deallocate_for_instance', - ] - targeted_by_instance = ['deallocate_for_instance'] - if method in targeted_methods and ('host' in expected_kwargs or - 'instance' in expected_kwargs): - if method in targeted_by_instance: - host = expected_kwargs['instance']['host'] - else: - host = expected_kwargs['host'] - if method not in ['allocate_for_instance', - 'deallocate_fixed_ip']: - expected_kwargs.pop('host') - if CONF.multi_host: - prepare_kwargs['server'] = host - - self.mox.StubOutWithMock(rpcapi, 'client') - - version_check = [ - 'deallocate_for_instance', 'deallocate_fixed_ip', - 'allocate_for_instance', - ] - if method in version_check: - rpcapi.client.can_send_version(mox.IgnoreArg()).AndReturn(True) - - if prepare_kwargs: - rpcapi.client.prepare(**prepare_kwargs).AndReturn(rpcapi.client) - - rpc_method = getattr(rpcapi.client, rpc_method) - rpc_method(ctxt, method, **expected_kwargs).AndReturn('foo') - - self.mox.ReplayAll() - - retval = getattr(rpcapi, method)(ctxt, **kwargs) - self.assertEqual(retval, expected_retval) - - def test_create_networks(self): - self._test_network_api('create_networks', rpc_method='call', - arg1='arg', arg2='arg') - - def test_delete_network(self): - self._test_network_api('delete_network', rpc_method='call', - uuid='fake_uuid', fixed_range='range') - - def test_disassociate_network(self): - self._test_network_api('disassociate_network', rpc_method='call', - network_uuid='fake_uuid') - - def test_associate_host_and_project(self): - self._test_network_api('associate', rpc_method='call', - network_uuid='fake_uuid', - associations={'host': "testHost", - 'project': 'testProject'}, - version="1.5") - - def test_get_fixed_ip(self): - self._test_network_api('get_fixed_ip', rpc_method='call', id='id') - - def test_get_fixed_ip_by_address(self): - self._test_network_api('get_fixed_ip_by_address', rpc_method='call', - address='a.b.c.d') - - def test_get_floating_ip(self): - self._test_network_api('get_floating_ip', rpc_method='call', id='id') - - def test_get_floating_ip_pools(self): - self._test_network_api('get_floating_ip_pools', rpc_method='call', - version="1.7") - - def test_get_floating_ip_by_address(self): - self._test_network_api('get_floating_ip_by_address', rpc_method='call', - address='a.b.c.d') - - def test_get_floating_ips_by_project(self): - self._test_network_api('get_floating_ips_by_project', - rpc_method='call') - - def test_get_floating_ips_by_fixed_address(self): - self._test_network_api('get_floating_ips_by_fixed_address', - rpc_method='call', fixed_address='w.x.y.z') - - def test_get_instance_id_by_floating_address(self): - self._test_network_api('get_instance_id_by_floating_address', - rpc_method='call', address='w.x.y.z') - - def test_allocate_floating_ip(self): - self._test_network_api('allocate_floating_ip', rpc_method='call', - project_id='fake_id', pool='fake_pool', auto_assigned=False) - - def test_deallocate_floating_ip(self): - self._test_network_api('deallocate_floating_ip', rpc_method='call', - address='addr', affect_auto_assigned=True) - - def test_allocate_floating_ip_no_multi(self): - self.flags(multi_host=False) - self._test_network_api('allocate_floating_ip', rpc_method='call', - project_id='fake_id', pool='fake_pool', auto_assigned=False) - - def test_deallocate_floating_ip_no_multi(self): - self.flags(multi_host=False) - self._test_network_api('deallocate_floating_ip', rpc_method='call', - address='addr', affect_auto_assigned=True) - - def test_associate_floating_ip(self): - self._test_network_api('associate_floating_ip', rpc_method='call', - floating_address='blah', fixed_address='foo', - affect_auto_assigned=True) - - def test_disassociate_floating_ip(self): - self._test_network_api('disassociate_floating_ip', rpc_method='call', - address='addr', affect_auto_assigned=True) - - def test_allocate_for_instance(self): - self._test_network_api('allocate_for_instance', rpc_method='call', - instance_id='fake_id', project_id='fake_id', host='fake_host', - rxtx_factor='fake_factor', vpn=False, requested_networks={}, - macs=[], version='1.13') - - def test_deallocate_for_instance(self): - instance = fake_instance.fake_instance_obj(context.get_admin_context()) - self._test_network_api('deallocate_for_instance', rpc_method='call', - requested_networks=self.DefaultArg(None), instance=instance, - version='1.11') - - def test_deallocate_for_instance_with_expected_networks(self): - instance = fake_instance.fake_instance_obj(context.get_admin_context()) - self._test_network_api('deallocate_for_instance', rpc_method='call', - instance=instance, requested_networks={}, version='1.11') - - def test_add_fixed_ip_to_instance(self): - self._test_network_api('add_fixed_ip_to_instance', rpc_method='call', - instance_id='fake_id', rxtx_factor='fake_factor', - host='fake_host', network_id='fake_id', version='1.9') - - def test_remove_fixed_ip_from_instance(self): - self._test_network_api('remove_fixed_ip_from_instance', - rpc_method='call', instance_id='fake_id', - rxtx_factor='fake_factor', host='fake_host', - address='fake_address', version='1.9') - - def test_add_network_to_project(self): - self._test_network_api('add_network_to_project', rpc_method='call', - project_id='fake_id', network_uuid='fake_uuid') - - def test_get_instance_nw_info(self): - self._test_network_api('get_instance_nw_info', rpc_method='call', - instance_id='fake_id', rxtx_factor='fake_factor', - host='fake_host', project_id='fake_id', version='1.9') - - def test_validate_networks(self): - self._test_network_api('validate_networks', rpc_method='call', - networks={}) - - def test_get_instance_uuids_by_ip_filter(self): - self._test_network_api('get_instance_uuids_by_ip_filter', - rpc_method='call', filters={}) - - def test_get_dns_domains(self): - self._test_network_api('get_dns_domains', rpc_method='call') - - def test_add_dns_entry(self): - self._test_network_api('add_dns_entry', rpc_method='call', - address='addr', name='name', dns_type='foo', domain='domain') - - def test_modify_dns_entry(self): - self._test_network_api('modify_dns_entry', rpc_method='call', - address='addr', name='name', domain='domain') - - def test_delete_dns_entry(self): - self._test_network_api('delete_dns_entry', rpc_method='call', - name='name', domain='domain') - - def test_delete_dns_domain(self): - self._test_network_api('delete_dns_domain', rpc_method='call', - domain='fake_domain') - - def test_get_dns_entries_by_address(self): - self._test_network_api('get_dns_entries_by_address', rpc_method='call', - address='fake_address', domain='fake_domain') - - def test_get_dns_entries_by_name(self): - self._test_network_api('get_dns_entries_by_name', rpc_method='call', - name='fake_name', domain='fake_domain') - - def test_create_private_dns_domain(self): - self._test_network_api('create_private_dns_domain', rpc_method='call', - domain='fake_domain', av_zone='fake_zone') - - def test_create_public_dns_domain(self): - self._test_network_api('create_public_dns_domain', rpc_method='call', - domain='fake_domain', project='fake_project') - - def test_setup_networks_on_host(self): - self._test_network_api('setup_networks_on_host', rpc_method='call', - instance_id='fake_id', host='fake_host', teardown=False) - - def test_lease_fixed_ip(self): - self._test_network_api('lease_fixed_ip', rpc_method='cast', - host='fake_host', address='fake_addr') - - def test_release_fixed_ip(self): - self._test_network_api('release_fixed_ip', rpc_method='cast', - host='fake_host', address='fake_addr') - - def test_set_network_host(self): - self._test_network_api('set_network_host', rpc_method='call', - network_ref={}) - - def test_rpc_setup_network_on_host(self): - self._test_network_api('rpc_setup_network_on_host', rpc_method='call', - network_id='fake_id', teardown=False, host='fake_host') - - def test_rpc_allocate_fixed_ip(self): - self._test_network_api('_rpc_allocate_fixed_ip', rpc_method='call', - instance_id='fake_id', network_id='fake_id', address='addr', - vpn=True, host='fake_host') - - def test_deallocate_fixed_ip(self): - instance = fake_instance.fake_db_instance() - self._test_network_api('deallocate_fixed_ip', rpc_method='call', - address='fake_addr', host='fake_host', instance=instance, - version='1.12') - - def test_update_dns(self): - self._test_network_api('update_dns', rpc_method='cast', fanout=True, - network_ids='fake_id', version='1.3') - - def test__associate_floating_ip(self): - self._test_network_api('_associate_floating_ip', rpc_method='call', - floating_address='fake_addr', fixed_address='fixed_address', - interface='fake_interface', host='fake_host', - instance_uuid='fake_uuid', version='1.6') - - def test__disassociate_floating_ip(self): - self._test_network_api('_disassociate_floating_ip', rpc_method='call', - address='fake_addr', interface='fake_interface', - host='fake_host', instance_uuid='fake_uuid', version='1.6') - - def test_migrate_instance_start(self): - self._test_network_api('migrate_instance_start', rpc_method='call', - instance_uuid='fake_instance_uuid', - rxtx_factor='fake_factor', - project_id='fake_project', - source_compute='fake_src_compute', - dest_compute='fake_dest_compute', - floating_addresses='fake_floating_addresses', - host=self.DefaultArg(None), - version='1.2') - - def test_migrate_instance_start_multi_host(self): - self._test_network_api('migrate_instance_start', rpc_method='call', - instance_uuid='fake_instance_uuid', - rxtx_factor='fake_factor', - project_id='fake_project', - source_compute='fake_src_compute', - dest_compute='fake_dest_compute', - floating_addresses='fake_floating_addresses', - host='fake_host', - version='1.2') - - def test_migrate_instance_finish(self): - self._test_network_api('migrate_instance_finish', rpc_method='call', - instance_uuid='fake_instance_uuid', - rxtx_factor='fake_factor', - project_id='fake_project', - source_compute='fake_src_compute', - dest_compute='fake_dest_compute', - floating_addresses='fake_floating_addresses', - host=self.DefaultArg(None), - version='1.2') - - def test_migrate_instance_finish_multi_host(self): - self._test_network_api('migrate_instance_finish', rpc_method='call', - instance_uuid='fake_instance_uuid', - rxtx_factor='fake_factor', - project_id='fake_project', - source_compute='fake_src_compute', - dest_compute='fake_dest_compute', - floating_addresses='fake_floating_addresses', - host='fake_host', - version='1.2') |