diff options
Diffstat (limited to 'nova/tests/virt/xenapi/test_xenapi.py')
-rw-r--r-- | nova/tests/virt/xenapi/test_xenapi.py | 4104 |
1 files changed, 0 insertions, 4104 deletions
diff --git a/nova/tests/virt/xenapi/test_xenapi.py b/nova/tests/virt/xenapi/test_xenapi.py deleted file mode 100644 index 98e0658f55..0000000000 --- a/nova/tests/virt/xenapi/test_xenapi.py +++ /dev/null @@ -1,4104 +0,0 @@ -# Copyright (c) 2010 Citrix Systems, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Test suite for XenAPI.""" - -import ast -import base64 -import contextlib -import copy -import functools -import os -import re - -import mock -import mox -from oslo.concurrency import lockutils -from oslo.config import cfg -from oslo.serialization import jsonutils -from oslo.utils import importutils - -from nova.compute import api as compute_api -from nova.compute import arch -from nova.compute import flavors -from nova.compute import hvtype -from nova.compute import power_state -from nova.compute import task_states -from nova.compute import utils as compute_utils -from nova.compute import vm_states -from nova.conductor import api as conductor_api -from nova import context -from nova import crypto -from nova import db -from nova import exception -from nova import objects -from nova.objects import instance as instance_obj -from nova.openstack.common.fixture import config as config_fixture -from nova.openstack.common import log as logging -from nova import test -from nova.tests.db import fakes as db_fakes -from nova.tests import fake_instance -from nova.tests import fake_network -from nova.tests import fake_processutils -import nova.tests.image.fake as fake_image -from nova.tests import matchers -from nova.tests.objects import test_aggregate -from nova.tests.virt.xenapi import stubs -from nova.virt import fake -from nova.virt.xenapi import agent -from nova.virt.xenapi.client import session as xenapi_session -from nova.virt.xenapi import driver as xenapi_conn -from nova.virt.xenapi import fake as xenapi_fake -from nova.virt.xenapi import host -from nova.virt.xenapi.image import glance -from nova.virt.xenapi import pool -from nova.virt.xenapi import pool_states -from nova.virt.xenapi import vm_utils -from nova.virt.xenapi import vmops -from nova.virt.xenapi import volume_utils - -LOG = logging.getLogger(__name__) - -CONF = cfg.CONF -CONF.import_opt('compute_manager', 'nova.service') -CONF.import_opt('network_manager', 'nova.service') -CONF.import_opt('compute_driver', 'nova.virt.driver') -CONF.import_opt('host', 'nova.netconf') -CONF.import_opt('default_availability_zone', 'nova.availability_zones') -CONF.import_opt('login_timeout', 'nova.virt.xenapi.client.session', - group="xenserver") - -IMAGE_MACHINE = '1' -IMAGE_KERNEL = '2' -IMAGE_RAMDISK = '3' -IMAGE_RAW = '4' -IMAGE_VHD = '5' -IMAGE_ISO = '6' -IMAGE_IPXE_ISO = '7' -IMAGE_FROM_VOLUME = '8' - -IMAGE_FIXTURES = { - IMAGE_MACHINE: { - 'image_meta': {'name': 'fakemachine', 'size': 0, - 'disk_format': 'ami', - 'container_format': 'ami'}, - }, - IMAGE_KERNEL: { - 'image_meta': {'name': 'fakekernel', 'size': 0, - 'disk_format': 'aki', - 'container_format': 'aki'}, - }, - IMAGE_RAMDISK: { - 'image_meta': {'name': 'fakeramdisk', 'size': 0, - 'disk_format': 'ari', - 'container_format': 'ari'}, - }, - IMAGE_RAW: { - 'image_meta': {'name': 'fakeraw', 'size': 0, - 'disk_format': 'raw', - 'container_format': 'bare'}, - }, - IMAGE_VHD: { - 'image_meta': {'name': 'fakevhd', 'size': 0, - 'disk_format': 'vhd', - 'container_format': 'ovf'}, - }, - IMAGE_ISO: { - 'image_meta': {'name': 'fakeiso', 'size': 0, - 'disk_format': 'iso', - 'container_format': 'bare'}, - }, - IMAGE_IPXE_ISO: { - 'image_meta': {'name': 'fake_ipxe_iso', 'size': 0, - 'disk_format': 'iso', - 'container_format': 'bare', - 'properties': {'ipxe_boot': 'true'}}, - }, - IMAGE_FROM_VOLUME: { - 'image_meta': {'name': 'fake_ipxe_iso', - 'properties': {'foo': 'bar'}}, - }, -} - - -def get_session(): - return xenapi_session.XenAPISession('test_url', 'root', 'test_pass') - - -def set_image_fixtures(): - image_service = fake_image.FakeImageService() - image_service.images.clear() - for image_id, image_meta in IMAGE_FIXTURES.items(): - image_meta = image_meta['image_meta'] - image_meta['id'] = image_id - image_service.create(None, image_meta) - - -def get_fake_device_info(): - # FIXME: 'sr_uuid', 'introduce_sr_keys', sr_type and vdi_uuid - # can be removed from the dict when LP bug #1087308 is fixed - fake_vdi_ref = xenapi_fake.create_vdi('fake-vdi', None) - fake_vdi_uuid = xenapi_fake.get_record('VDI', fake_vdi_ref)['uuid'] - fake = {'block_device_mapping': - [{'connection_info': {'driver_volume_type': 'iscsi', - 'data': {'sr_uuid': 'falseSR', - 'introduce_sr_keys': ['sr_type'], - 'sr_type': 'iscsi', - 'vdi_uuid': fake_vdi_uuid, - 'target_discovered': False, - 'target_iqn': 'foo_iqn:foo_volid', - 'target_portal': 'localhost:3260', - 'volume_id': 'foo_volid', - 'target_lun': 1, - 'auth_password': 'my-p@55w0rd', - 'auth_username': 'johndoe', - 'auth_method': u'CHAP'}, }, - 'mount_device': 'vda', - 'delete_on_termination': False}, ], - 'root_device_name': '/dev/sda', - 'ephemerals': [], - 'swap': None, } - return fake - - -def stub_vm_utils_with_vdi_attached_here(function): - """vm_utils.with_vdi_attached_here needs to be stubbed out because it - calls down to the filesystem to attach a vdi. This provides a - decorator to handle that. - """ - @functools.wraps(function) - def decorated_function(self, *args, **kwargs): - @contextlib.contextmanager - def fake_vdi_attached_here(*args, **kwargs): - fake_dev = 'fakedev' - yield fake_dev - - def fake_image_download(*args, **kwargs): - pass - - orig_vdi_attached_here = vm_utils.vdi_attached_here - orig_image_download = fake_image._FakeImageService.download - try: - vm_utils.vdi_attached_here = fake_vdi_attached_here - fake_image._FakeImageService.download = fake_image_download - return function(self, *args, **kwargs) - finally: - fake_image._FakeImageService.download = orig_image_download - vm_utils.vdi_attached_here = orig_vdi_attached_here - - return decorated_function - - -def get_create_system_metadata(context, instance_type_id): - flavor = db.flavor_get(context, instance_type_id) - return flavors.save_flavor_info({}, flavor) - - -def create_instance_with_system_metadata(context, instance_values): - instance_values['system_metadata'] = get_create_system_metadata( - context, instance_values['instance_type_id']) - instance_values['pci_devices'] = [] - return db.instance_create(context, instance_values) - - -class XenAPIVolumeTestCase(stubs.XenAPITestBaseNoDB): - """Unit tests for Volume operations.""" - def setUp(self): - super(XenAPIVolumeTestCase, self).setUp() - self.fixture = self.useFixture(config_fixture.Config(lockutils.CONF)) - self.fixture.config(disable_process_locking=True, - group='oslo_concurrency') - self.flags(firewall_driver='nova.virt.xenapi.firewall.' - 'Dom0IptablesFirewallDriver') - self.flags(connection_url='test_url', - connection_password='test_pass', - group='xenserver') - - self.instance = fake_instance.fake_db_instance(name='foo') - - @classmethod - def _make_connection_info(cls): - target_iqn = 'iqn.2010-10.org.openstack:volume-00000001' - return {'driver_volume_type': 'iscsi', - 'data': {'volume_id': 1, - 'target_iqn': target_iqn, - 'target_portal': '127.0.0.1:3260,fake', - 'target_lun': None, - 'auth_method': 'CHAP', - 'auth_username': 'username', - 'auth_password': 'password'}} - - def test_attach_volume(self): - # This shows how to test Ops classes' methods. - stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests) - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - vm = xenapi_fake.create_vm(self.instance['name'], 'Running') - conn_info = self._make_connection_info() - self.assertIsNone( - conn.attach_volume(None, conn_info, self.instance, '/dev/sdc')) - - # check that the VM has a VBD attached to it - # Get XenAPI record for VBD - vbds = xenapi_fake.get_all('VBD') - vbd = xenapi_fake.get_record('VBD', vbds[0]) - vm_ref = vbd['VM'] - self.assertEqual(vm_ref, vm) - - def test_attach_volume_raise_exception(self): - # This shows how to test when exceptions are raised. - stubs.stubout_session(self.stubs, - stubs.FakeSessionForVolumeFailedTests) - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - xenapi_fake.create_vm(self.instance['name'], 'Running') - self.assertRaises(exception.VolumeDriverNotFound, - conn.attach_volume, - None, {'driver_volume_type': 'nonexist'}, - self.instance, '/dev/sdc') - - -# FIXME(sirp): convert this to use XenAPITestBaseNoDB -class XenAPIVMTestCase(stubs.XenAPITestBase): - """Unit tests for VM operations.""" - def setUp(self): - super(XenAPIVMTestCase, self).setUp() - self.useFixture(test.SampleNetworks()) - self.network = importutils.import_object(CONF.network_manager) - self.fixture = self.useFixture(config_fixture.Config(lockutils.CONF)) - self.fixture.config(disable_process_locking=True, - group='oslo_concurrency') - self.flags(instance_name_template='%d', - firewall_driver='nova.virt.xenapi.firewall.' - 'Dom0IptablesFirewallDriver') - self.flags(connection_url='test_url', - connection_password='test_pass', - group='xenserver') - db_fakes.stub_out_db_instance_api(self.stubs) - xenapi_fake.create_network('fake', 'fake_br1') - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - stubs.stubout_get_this_vm_uuid(self.stubs) - stubs.stub_out_vm_methods(self.stubs) - fake_processutils.stub_out_processutils_execute(self.stubs) - self.user_id = 'fake' - self.project_id = 'fake' - self.context = context.RequestContext(self.user_id, self.project_id) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - self.conn._session.is_local_connection = False - - fake_image.stub_out_image_service(self.stubs) - set_image_fixtures() - stubs.stubout_image_service_download(self.stubs) - stubs.stubout_stream_disk(self.stubs) - - def fake_inject_instance_metadata(self, instance, vm): - pass - self.stubs.Set(vmops.VMOps, '_inject_instance_metadata', - fake_inject_instance_metadata) - - def fake_safe_copy_vdi(session, sr_ref, instance, vdi_to_copy_ref): - name_label = "fakenamelabel" - disk_type = "fakedisktype" - virtual_size = 777 - return vm_utils.create_vdi( - session, sr_ref, instance, name_label, disk_type, - virtual_size) - self.stubs.Set(vm_utils, '_safe_copy_vdi', fake_safe_copy_vdi) - - def tearDown(self): - fake_image.FakeImageService_reset() - super(XenAPIVMTestCase, self).tearDown() - - def test_init_host(self): - session = get_session() - vm = vm_utils._get_this_vm_ref(session) - # Local root disk - vdi0 = xenapi_fake.create_vdi('compute', None) - vbd0 = xenapi_fake.create_vbd(vm, vdi0) - # Instance VDI - vdi1 = xenapi_fake.create_vdi('instance-aaaa', None, - other_config={'nova_instance_uuid': 'aaaa'}) - xenapi_fake.create_vbd(vm, vdi1) - # Only looks like instance VDI - vdi2 = xenapi_fake.create_vdi('instance-bbbb', None) - vbd2 = xenapi_fake.create_vbd(vm, vdi2) - - self.conn.init_host(None) - self.assertEqual(set(xenapi_fake.get_all('VBD')), set([vbd0, vbd2])) - - def test_instance_exists(self): - self.mox.StubOutWithMock(vm_utils, 'lookup') - vm_utils.lookup(mox.IgnoreArg(), 'foo').AndReturn(True) - self.mox.ReplayAll() - - self.stubs.Set(objects.Instance, 'name', 'foo') - instance = objects.Instance(uuid='fake-uuid') - self.assertTrue(self.conn.instance_exists(instance)) - - def test_instance_not_exists(self): - self.mox.StubOutWithMock(vm_utils, 'lookup') - vm_utils.lookup(mox.IgnoreArg(), 'bar').AndReturn(None) - self.mox.ReplayAll() - - self.stubs.Set(objects.Instance, 'name', 'bar') - instance = objects.Instance(uuid='fake-uuid') - self.assertFalse(self.conn.instance_exists(instance)) - - def test_list_instances_0(self): - instances = self.conn.list_instances() - self.assertEqual(instances, []) - - def test_list_instance_uuids_0(self): - instance_uuids = self.conn.list_instance_uuids() - self.assertEqual(instance_uuids, []) - - def test_list_instance_uuids(self): - uuids = [] - for x in xrange(1, 4): - instance = self._create_instance(x) - uuids.append(instance['uuid']) - instance_uuids = self.conn.list_instance_uuids() - self.assertEqual(len(uuids), len(instance_uuids)) - self.assertEqual(set(uuids), set(instance_uuids)) - - def test_get_rrd_server(self): - self.flags(connection_url='myscheme://myaddress/', - group='xenserver') - server_info = vm_utils._get_rrd_server() - self.assertEqual(server_info[0], 'myscheme') - self.assertEqual(server_info[1], 'myaddress') - - expected_raw_diagnostics = { - 'vbd_xvdb_write': '0.0', - 'memory_target': '4294967296.0000', - 'memory_internal_free': '1415564.0000', - 'memory': '4294967296.0000', - 'vbd_xvda_write': '0.0', - 'cpu0': '0.0042', - 'vif_0_tx': '287.4134', - 'vbd_xvda_read': '0.0', - 'vif_0_rx': '1816.0144', - 'vif_2_rx': '0.0', - 'vif_2_tx': '0.0', - 'vbd_xvdb_read': '0.0', - 'last_update': '1328795567', - } - - def test_get_diagnostics(self): - def fake_get_rrd(host, vm_uuid): - path = os.path.dirname(os.path.realpath(__file__)) - with open(os.path.join(path, 'vm_rrd.xml')) as f: - return re.sub(r'\s', '', f.read()) - self.stubs.Set(vm_utils, '_get_rrd', fake_get_rrd) - - expected = self.expected_raw_diagnostics - instance = self._create_instance() - actual = self.conn.get_diagnostics(instance) - self.assertThat(actual, matchers.DictMatches(expected)) - - def test_get_instance_diagnostics(self): - def fake_get_rrd(host, vm_uuid): - path = os.path.dirname(os.path.realpath(__file__)) - with open(os.path.join(path, 'vm_rrd.xml')) as f: - return re.sub(r'\s', '', f.read()) - self.stubs.Set(vm_utils, '_get_rrd', fake_get_rrd) - - expected = { - 'config_drive': False, - 'state': 'running', - 'driver': 'xenapi', - 'version': '1.0', - 'uptime': 0, - 'hypervisor_os': None, - 'cpu_details': [{'time': 0}, {'time': 0}, - {'time': 0}, {'time': 0}], - 'nic_details': [{'mac_address': '00:00:00:00:00:00', - 'rx_drop': 0, - 'rx_errors': 0, - 'rx_octets': 0, - 'rx_packets': 0, - 'tx_drop': 0, - 'tx_errors': 0, - 'tx_octets': 0, - 'tx_packets': 0}], - 'disk_details': [{'errors_count': 0, - 'id': '', - 'read_bytes': 0, - 'read_requests': 0, - 'write_bytes': 0, - 'write_requests': 0}], - 'memory_details': {'maximum': 8192, 'used': 0}} - - instance = self._create_instance() - actual = self.conn.get_instance_diagnostics(instance) - self.assertEqual(expected, actual.serialize()) - - def test_get_vnc_console(self): - instance = self._create_instance(obj=True) - session = get_session() - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - vm_ref = vm_utils.lookup(session, instance['name']) - - console = conn.get_vnc_console(self.context, instance) - - # Note(sulo): We don't care about session id in test - # they will always differ so strip that out - actual_path = console.internal_access_path.split('&')[0] - expected_path = "/console?ref=%s" % str(vm_ref) - - self.assertEqual(expected_path, actual_path) - - def test_get_vnc_console_for_rescue(self): - instance = self._create_instance(obj=True) - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - rescue_vm = xenapi_fake.create_vm(instance['name'] + '-rescue', - 'Running') - # Set instance state to rescued - instance['vm_state'] = 'rescued' - - console = conn.get_vnc_console(self.context, instance) - - # Note(sulo): We don't care about session id in test - # they will always differ so strip that out - actual_path = console.internal_access_path.split('&')[0] - expected_path = "/console?ref=%s" % str(rescue_vm) - - self.assertEqual(expected_path, actual_path) - - def test_get_vnc_console_instance_not_ready(self): - instance = self._create_instance(obj=True, spawn=False) - instance.vm_state = 'building' - - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - self.assertRaises(exception.InstanceNotFound, - conn.get_vnc_console, self.context, instance) - - def test_get_vnc_console_rescue_not_ready(self): - instance = self._create_instance(obj=True, spawn=False) - instance.vm_state = 'rescued' - - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - self.assertRaises(exception.InstanceNotReady, - conn.get_vnc_console, self.context, instance) - - def test_instance_snapshot_fails_with_no_primary_vdi(self): - - def create_bad_vbd(session, vm_ref, vdi_ref, userdevice, - vbd_type='disk', read_only=False, bootable=False, - osvol=False): - vbd_rec = {'VM': vm_ref, - 'VDI': vdi_ref, - 'userdevice': 'fake', - 'currently_attached': False} - vbd_ref = xenapi_fake._create_object('VBD', vbd_rec) - xenapi_fake.after_VBD_create(vbd_ref, vbd_rec) - return vbd_ref - - self.stubs.Set(vm_utils, 'create_vbd', create_bad_vbd) - stubs.stubout_instance_snapshot(self.stubs) - # Stubbing out firewall driver as previous stub sets alters - # xml rpc result parsing - stubs.stubout_firewall_driver(self.stubs, self.conn) - instance = self._create_instance() - - image_id = "my_snapshot_id" - self.assertRaises(exception.NovaException, self.conn.snapshot, - self.context, instance, image_id, - lambda *args, **kwargs: None) - - def test_instance_snapshot(self): - expected_calls = [ - {'args': (), - 'kwargs': - {'task_state': task_states.IMAGE_PENDING_UPLOAD}}, - {'args': (), - 'kwargs': - {'task_state': task_states.IMAGE_UPLOADING, - 'expected_state': task_states.IMAGE_PENDING_UPLOAD}}] - func_call_matcher = matchers.FunctionCallMatcher(expected_calls) - image_id = "my_snapshot_id" - - stubs.stubout_instance_snapshot(self.stubs) - stubs.stubout_is_snapshot(self.stubs) - # Stubbing out firewall driver as previous stub sets alters - # xml rpc result parsing - stubs.stubout_firewall_driver(self.stubs, self.conn) - - instance = self._create_instance() - - self.fake_upload_called = False - - def fake_image_upload(_self, ctx, session, inst, img_id, vdi_uuids): - self.fake_upload_called = True - self.assertEqual(ctx, self.context) - self.assertEqual(inst, instance) - self.assertIsInstance(vdi_uuids, list) - self.assertEqual(img_id, image_id) - - self.stubs.Set(glance.GlanceStore, 'upload_image', - fake_image_upload) - - self.conn.snapshot(self.context, instance, image_id, - func_call_matcher.call) - - # Ensure VM was torn down - vm_labels = [] - for vm_ref in xenapi_fake.get_all('VM'): - vm_rec = xenapi_fake.get_record('VM', vm_ref) - if not vm_rec["is_control_domain"]: - vm_labels.append(vm_rec["name_label"]) - - self.assertEqual(vm_labels, [instance['name']]) - - # Ensure VBDs were torn down - vbd_labels = [] - for vbd_ref in xenapi_fake.get_all('VBD'): - vbd_rec = xenapi_fake.get_record('VBD', vbd_ref) - vbd_labels.append(vbd_rec["vm_name_label"]) - - self.assertEqual(vbd_labels, [instance['name']]) - - # Ensure task states changed in correct order - self.assertIsNone(func_call_matcher.match()) - - # Ensure VDIs were torn down - for vdi_ref in xenapi_fake.get_all('VDI'): - vdi_rec = xenapi_fake.get_record('VDI', vdi_ref) - name_label = vdi_rec["name_label"] - self.assertFalse(name_label.endswith('snapshot')) - - self.assertTrue(self.fake_upload_called) - - def create_vm_record(self, conn, os_type, name): - instances = conn.list_instances() - self.assertEqual(instances, [name]) - - # Get Nova record for VM - vm_info = conn.get_info({'name': name}) - # Get XenAPI record for VM - vms = [rec for ref, rec - in xenapi_fake.get_all_records('VM').iteritems() - if not rec['is_control_domain']] - vm = vms[0] - self.vm_info = vm_info - self.vm = vm - - def check_vm_record(self, conn, instance_type_id, check_injection): - flavor = db.flavor_get(conn, instance_type_id) - mem_kib = long(flavor['memory_mb']) << 10 - mem_bytes = str(mem_kib << 10) - vcpus = flavor['vcpus'] - vcpu_weight = flavor['vcpu_weight'] - - self.assertEqual(self.vm_info['max_mem'], mem_kib) - self.assertEqual(self.vm_info['mem'], mem_kib) - self.assertEqual(self.vm['memory_static_max'], mem_bytes) - self.assertEqual(self.vm['memory_dynamic_max'], mem_bytes) - self.assertEqual(self.vm['memory_dynamic_min'], mem_bytes) - self.assertEqual(self.vm['VCPUs_max'], str(vcpus)) - self.assertEqual(self.vm['VCPUs_at_startup'], str(vcpus)) - if vcpu_weight is None: - self.assertEqual(self.vm['VCPUs_params'], {}) - else: - self.assertEqual(self.vm['VCPUs_params'], - {'weight': str(vcpu_weight), 'cap': '0'}) - - # Check that the VM is running according to Nova - self.assertEqual(self.vm_info['state'], power_state.RUNNING) - - # Check that the VM is running according to XenAPI. - self.assertEqual(self.vm['power_state'], 'Running') - - if check_injection: - xenstore_data = self.vm['xenstore_data'] - self.assertNotIn('vm-data/hostname', xenstore_data) - key = 'vm-data/networking/DEADBEEF0001' - xenstore_value = xenstore_data[key] - tcpip_data = ast.literal_eval(xenstore_value) - self.assertEqual(tcpip_data, - {'broadcast': '192.168.1.255', - 'dns': ['192.168.1.4', '192.168.1.3'], - 'gateway': '192.168.1.1', - 'gateway_v6': '2001:db8:0:1::1', - 'ip6s': [{'enabled': '1', - 'ip': '2001:db8:0:1:dcad:beff:feef:1', - 'netmask': 64, - 'gateway': '2001:db8:0:1::1'}], - 'ips': [{'enabled': '1', - 'ip': '192.168.1.100', - 'netmask': '255.255.255.0', - 'gateway': '192.168.1.1'}, - {'enabled': '1', - 'ip': '192.168.1.101', - 'netmask': '255.255.255.0', - 'gateway': '192.168.1.1'}], - 'label': 'test1', - 'mac': 'DE:AD:BE:EF:00:01'}) - - def check_vm_params_for_windows(self): - self.assertEqual(self.vm['platform']['nx'], 'true') - self.assertEqual(self.vm['HVM_boot_params'], {'order': 'dc'}) - self.assertEqual(self.vm['HVM_boot_policy'], 'BIOS order') - - # check that these are not set - self.assertEqual(self.vm['PV_args'], '') - self.assertEqual(self.vm['PV_bootloader'], '') - self.assertEqual(self.vm['PV_kernel'], '') - self.assertEqual(self.vm['PV_ramdisk'], '') - - def check_vm_params_for_linux(self): - self.assertEqual(self.vm['platform']['nx'], 'false') - self.assertEqual(self.vm['PV_args'], '') - self.assertEqual(self.vm['PV_bootloader'], 'pygrub') - - # check that these are not set - self.assertEqual(self.vm['PV_kernel'], '') - self.assertEqual(self.vm['PV_ramdisk'], '') - self.assertEqual(self.vm['HVM_boot_params'], {}) - self.assertEqual(self.vm['HVM_boot_policy'], '') - - def check_vm_params_for_linux_with_external_kernel(self): - self.assertEqual(self.vm['platform']['nx'], 'false') - self.assertEqual(self.vm['PV_args'], 'root=/dev/xvda1') - self.assertNotEqual(self.vm['PV_kernel'], '') - self.assertNotEqual(self.vm['PV_ramdisk'], '') - - # check that these are not set - self.assertEqual(self.vm['HVM_boot_params'], {}) - self.assertEqual(self.vm['HVM_boot_policy'], '') - - def _list_vdis(self): - session = get_session() - return session.call_xenapi('VDI.get_all') - - def _list_vms(self): - session = get_session() - return session.call_xenapi('VM.get_all') - - def _check_vdis(self, start_list, end_list): - for vdi_ref in end_list: - if vdi_ref not in start_list: - vdi_rec = xenapi_fake.get_record('VDI', vdi_ref) - # If the cache is turned on then the base disk will be - # there even after the cleanup - if 'other_config' in vdi_rec: - if 'image-id' not in vdi_rec['other_config']: - self.fail('Found unexpected VDI:%s' % vdi_ref) - else: - self.fail('Found unexpected VDI:%s' % vdi_ref) - - def _test_spawn(self, image_ref, kernel_id, ramdisk_id, - instance_type_id="3", os_type="linux", - hostname="test", architecture="x86-64", instance_id=1, - injected_files=None, check_injection=False, - create_record=True, empty_dns=False, - block_device_info=None, - key_data=None): - if injected_files is None: - injected_files = [] - - # Fake out inject_instance_metadata - def fake_inject_instance_metadata(self, instance, vm): - pass - self.stubs.Set(vmops.VMOps, '_inject_instance_metadata', - fake_inject_instance_metadata) - - if create_record: - instance = objects.Instance(context=self.context) - instance.project_id = self.project_id - instance.user_id = self.user_id - instance.image_ref = image_ref - instance.kernel_id = kernel_id - instance.ramdisk_id = ramdisk_id - instance.root_gb = 20 - instance.ephemeral_gb = 0 - instance.instance_type_id = instance_type_id - instance.os_type = os_type - instance.hostname = hostname - instance.key_data = key_data - instance.architecture = architecture - instance.system_metadata = get_create_system_metadata( - self.context, instance_type_id) - instance.create() - else: - instance = objects.Instance.get_by_id(self.context, instance_id) - - network_info = fake_network.fake_get_instance_nw_info(self.stubs) - if empty_dns: - # NOTE(tr3buchet): this is a terrible way to do this... - network_info[0]['network']['subnets'][0]['dns'] = [] - - image_meta = {} - if image_ref: - image_meta = IMAGE_FIXTURES[image_ref]["image_meta"] - self.conn.spawn(self.context, instance, image_meta, injected_files, - 'herp', network_info, block_device_info) - self.create_vm_record(self.conn, os_type, instance['name']) - self.check_vm_record(self.conn, instance_type_id, check_injection) - self.assertEqual(instance['os_type'], os_type) - self.assertEqual(instance['architecture'], architecture) - - def test_spawn_ipxe_iso_success(self): - self.mox.StubOutWithMock(vm_utils, 'get_sr_path') - vm_utils.get_sr_path(mox.IgnoreArg()).AndReturn('/sr/path') - - self.flags(ipxe_network_name='test1', - ipxe_boot_menu_url='http://boot.example.com', - ipxe_mkisofs_cmd='/root/mkisofs', - group='xenserver') - self.mox.StubOutWithMock(self.conn._session, 'call_plugin_serialized') - self.conn._session.call_plugin_serialized( - 'ipxe', 'inject', '/sr/path', mox.IgnoreArg(), - 'http://boot.example.com', '192.168.1.100', '255.255.255.0', - '192.168.1.1', '192.168.1.3', '/root/mkisofs') - - self.mox.ReplayAll() - self._test_spawn(IMAGE_IPXE_ISO, None, None) - - def test_spawn_ipxe_iso_no_network_name(self): - self.flags(ipxe_network_name=None, - ipxe_boot_menu_url='http://boot.example.com', - group='xenserver') - - # call_plugin_serialized shouldn't be called - self.mox.StubOutWithMock(self.conn._session, 'call_plugin_serialized') - - self.mox.ReplayAll() - self._test_spawn(IMAGE_IPXE_ISO, None, None) - - def test_spawn_ipxe_iso_no_boot_menu_url(self): - self.flags(ipxe_network_name='test1', - ipxe_boot_menu_url=None, - group='xenserver') - - # call_plugin_serialized shouldn't be called - self.mox.StubOutWithMock(self.conn._session, 'call_plugin_serialized') - - self.mox.ReplayAll() - self._test_spawn(IMAGE_IPXE_ISO, None, None) - - def test_spawn_ipxe_iso_unknown_network_name(self): - self.flags(ipxe_network_name='test2', - ipxe_boot_menu_url='http://boot.example.com', - group='xenserver') - - # call_plugin_serialized shouldn't be called - self.mox.StubOutWithMock(self.conn._session, 'call_plugin_serialized') - - self.mox.ReplayAll() - self._test_spawn(IMAGE_IPXE_ISO, None, None) - - def test_spawn_empty_dns(self): - # Test spawning with an empty dns list. - self._test_spawn(IMAGE_VHD, None, None, - os_type="linux", architecture="x86-64", - empty_dns=True) - self.check_vm_params_for_linux() - - def test_spawn_not_enough_memory(self): - self.assertRaises(exception.InsufficientFreeMemory, - self._test_spawn, - '1', 2, 3, "4") # m1.xlarge - - def test_spawn_fail_cleanup_1(self): - """Simulates an error while downloading an image. - - Verifies that the VM and VDIs created are properly cleaned up. - """ - vdi_recs_start = self._list_vdis() - start_vms = self._list_vms() - stubs.stubout_fetch_disk_image(self.stubs, raise_failure=True) - self.assertRaises(xenapi_fake.Failure, - self._test_spawn, '1', 2, 3) - # No additional VDI should be found. - vdi_recs_end = self._list_vdis() - end_vms = self._list_vms() - self._check_vdis(vdi_recs_start, vdi_recs_end) - # No additional VMs should be found. - self.assertEqual(start_vms, end_vms) - - def test_spawn_fail_cleanup_2(self): - """Simulates an error while creating VM record. - - Verifies that the VM and VDIs created are properly cleaned up. - """ - vdi_recs_start = self._list_vdis() - start_vms = self._list_vms() - stubs.stubout_create_vm(self.stubs) - self.assertRaises(xenapi_fake.Failure, - self._test_spawn, '1', 2, 3) - # No additional VDI should be found. - vdi_recs_end = self._list_vdis() - end_vms = self._list_vms() - self._check_vdis(vdi_recs_start, vdi_recs_end) - # No additional VMs should be found. - self.assertEqual(start_vms, end_vms) - - def test_spawn_fail_cleanup_3(self): - """Simulates an error while attaching disks. - - Verifies that the VM and VDIs created are properly cleaned up. - """ - stubs.stubout_attach_disks(self.stubs) - vdi_recs_start = self._list_vdis() - start_vms = self._list_vms() - self.assertRaises(xenapi_fake.Failure, - self._test_spawn, '1', 2, 3) - # No additional VDI should be found. - vdi_recs_end = self._list_vdis() - end_vms = self._list_vms() - self._check_vdis(vdi_recs_start, vdi_recs_end) - # No additional VMs should be found. - self.assertEqual(start_vms, end_vms) - - def test_spawn_raw_glance(self): - self._test_spawn(IMAGE_RAW, None, None, os_type=None) - self.check_vm_params_for_windows() - - def test_spawn_vhd_glance_linux(self): - self._test_spawn(IMAGE_VHD, None, None, - os_type="linux", architecture="x86-64") - self.check_vm_params_for_linux() - - def test_spawn_vhd_glance_windows(self): - self._test_spawn(IMAGE_VHD, None, None, - os_type="windows", architecture="i386", - instance_type_id=5) - self.check_vm_params_for_windows() - - def test_spawn_iso_glance(self): - self._test_spawn(IMAGE_ISO, None, None, - os_type="windows", architecture="i386") - self.check_vm_params_for_windows() - - def test_spawn_glance(self): - - def fake_fetch_disk_image(context, session, instance, name_label, - image_id, image_type): - sr_ref = vm_utils.safe_find_sr(session) - image_type_str = vm_utils.ImageType.to_string(image_type) - vdi_ref = vm_utils.create_vdi(session, sr_ref, instance, - name_label, image_type_str, "20") - vdi_role = vm_utils.ImageType.get_role(image_type) - vdi_uuid = session.call_xenapi("VDI.get_uuid", vdi_ref) - return {vdi_role: dict(uuid=vdi_uuid, file=None)} - self.stubs.Set(vm_utils, '_fetch_disk_image', - fake_fetch_disk_image) - - self._test_spawn(IMAGE_MACHINE, - IMAGE_KERNEL, - IMAGE_RAMDISK) - self.check_vm_params_for_linux_with_external_kernel() - - def test_spawn_boot_from_volume_no_image_meta(self): - dev_info = get_fake_device_info() - self._test_spawn(None, None, None, - block_device_info=dev_info) - - def test_spawn_boot_from_volume_no_glance_image_meta(self): - dev_info = get_fake_device_info() - self._test_spawn(IMAGE_FROM_VOLUME, None, None, - block_device_info=dev_info) - - def test_spawn_boot_from_volume_with_image_meta(self): - dev_info = get_fake_device_info() - self._test_spawn(IMAGE_VHD, None, None, - block_device_info=dev_info) - - def test_spawn_netinject_file(self): - self.flags(flat_injected=True) - db_fakes.stub_out_db_instance_api(self.stubs, injected=True) - - self._tee_executed = False - - def _tee_handler(cmd, **kwargs): - actual = kwargs.get('process_input', None) - expected = """\ -# Injected by Nova on instance boot -# -# This file describes the network interfaces available on your system -# and how to activate them. For more information, see interfaces(5). - -# The loopback network interface -auto lo -iface lo inet loopback - -auto eth0 -iface eth0 inet static - address 192.168.1.100 - netmask 255.255.255.0 - broadcast 192.168.1.255 - gateway 192.168.1.1 - dns-nameservers 192.168.1.3 192.168.1.4 -iface eth0 inet6 static - address 2001:db8:0:1:dcad:beff:feef:1 - netmask 64 - gateway 2001:db8:0:1::1 -""" - self.assertEqual(expected, actual) - self._tee_executed = True - return '', '' - - def _readlink_handler(cmd_parts, **kwargs): - return os.path.realpath(cmd_parts[2]), '' - - fake_processutils.fake_execute_set_repliers([ - # Capture the tee .../etc/network/interfaces command - (r'tee.*interfaces', _tee_handler), - (r'readlink -nm.*', _readlink_handler), - ]) - self._test_spawn(IMAGE_MACHINE, - IMAGE_KERNEL, - IMAGE_RAMDISK, - check_injection=True) - self.assertTrue(self._tee_executed) - - def test_spawn_netinject_xenstore(self): - db_fakes.stub_out_db_instance_api(self.stubs, injected=True) - - self._tee_executed = False - - def _mount_handler(cmd, *ignore_args, **ignore_kwargs): - # When mounting, create real files under the mountpoint to simulate - # files in the mounted filesystem - - # mount point will be the last item of the command list - self._tmpdir = cmd[len(cmd) - 1] - LOG.debug('Creating files in %s to simulate guest agent', - self._tmpdir) - os.makedirs(os.path.join(self._tmpdir, 'usr', 'sbin')) - # Touch the file using open - open(os.path.join(self._tmpdir, 'usr', 'sbin', - 'xe-update-networking'), 'w').close() - return '', '' - - def _umount_handler(cmd, *ignore_args, **ignore_kwargs): - # Umount would normally make files in the mounted filesystem - # disappear, so do that here - LOG.debug('Removing simulated guest agent files in %s', - self._tmpdir) - os.remove(os.path.join(self._tmpdir, 'usr', 'sbin', - 'xe-update-networking')) - os.rmdir(os.path.join(self._tmpdir, 'usr', 'sbin')) - os.rmdir(os.path.join(self._tmpdir, 'usr')) - return '', '' - - def _tee_handler(cmd, *ignore_args, **ignore_kwargs): - self._tee_executed = True - return '', '' - - fake_processutils.fake_execute_set_repliers([ - (r'mount', _mount_handler), - (r'umount', _umount_handler), - (r'tee.*interfaces', _tee_handler)]) - self._test_spawn('1', 2, 3, check_injection=True) - - # tee must not run in this case, where an injection-capable - # guest agent is detected - self.assertFalse(self._tee_executed) - - def test_spawn_injects_auto_disk_config_to_xenstore(self): - instance = self._create_instance(spawn=False) - self.mox.StubOutWithMock(self.conn._vmops, '_inject_auto_disk_config') - self.conn._vmops._inject_auto_disk_config(instance, mox.IgnoreArg()) - self.mox.ReplayAll() - self.conn.spawn(self.context, instance, - IMAGE_FIXTURES['1']["image_meta"], [], 'herp', '') - - def test_spawn_vlanmanager(self): - self.flags(network_manager='nova.network.manager.VlanManager', - vlan_interface='fake0') - - def dummy(*args, **kwargs): - pass - - self.stubs.Set(vmops.VMOps, '_create_vifs', dummy) - # Reset network table - xenapi_fake.reset_table('network') - # Instance id = 2 will use vlan network (see db/fakes.py) - ctxt = self.context.elevated() - self.network.conductor_api = conductor_api.LocalAPI() - self._create_instance(2, False) - networks = self.network.db.network_get_all(ctxt) - with mock.patch('nova.objects.network.Network._from_db_object'): - for network in networks: - self.network.set_network_host(ctxt, network) - - self.network.allocate_for_instance(ctxt, - instance_id=2, - instance_uuid='00000000-0000-0000-0000-000000000002', - host=CONF.host, - vpn=None, - rxtx_factor=3, - project_id=self.project_id, - macs=None) - self._test_spawn(IMAGE_MACHINE, - IMAGE_KERNEL, - IMAGE_RAMDISK, - instance_id=2, - create_record=False) - # TODO(salvatore-orlando): a complete test here would require - # a check for making sure the bridge for the VM's VIF is - # consistent with bridge specified in nova db - - def test_spawn_with_network_qos(self): - self._create_instance() - for vif_ref in xenapi_fake.get_all('VIF'): - vif_rec = xenapi_fake.get_record('VIF', vif_ref) - self.assertEqual(vif_rec['qos_algorithm_type'], 'ratelimit') - self.assertEqual(vif_rec['qos_algorithm_params']['kbps'], - str(3 * 10 * 1024)) - - def test_spawn_ssh_key_injection(self): - # Test spawning with key_data on an instance. Should use - # agent file injection. - self.flags(use_agent_default=True, - group='xenserver') - actual_injected_files = [] - - def fake_inject_file(self, method, args): - path = base64.b64decode(args['b64_path']) - contents = base64.b64decode(args['b64_contents']) - actual_injected_files.append((path, contents)) - return jsonutils.dumps({'returncode': '0', 'message': 'success'}) - - self.stubs.Set(stubs.FakeSessionForVMTests, - '_plugin_agent_inject_file', fake_inject_file) - - def fake_encrypt_text(sshkey, new_pass): - self.assertEqual("ssh-rsa fake_keydata", sshkey) - return "fake" - - self.stubs.Set(crypto, 'ssh_encrypt_text', fake_encrypt_text) - - expected_data = ('\n# The following ssh key was injected by ' - 'Nova\nssh-rsa fake_keydata\n') - - injected_files = [('/root/.ssh/authorized_keys', expected_data)] - self._test_spawn(IMAGE_VHD, None, None, - os_type="linux", architecture="x86-64", - key_data='ssh-rsa fake_keydata') - self.assertEqual(actual_injected_files, injected_files) - - def test_spawn_ssh_key_injection_non_rsa(self): - # Test spawning with key_data on an instance. Should use - # agent file injection. - self.flags(use_agent_default=True, - group='xenserver') - actual_injected_files = [] - - def fake_inject_file(self, method, args): - path = base64.b64decode(args['b64_path']) - contents = base64.b64decode(args['b64_contents']) - actual_injected_files.append((path, contents)) - return jsonutils.dumps({'returncode': '0', 'message': 'success'}) - - self.stubs.Set(stubs.FakeSessionForVMTests, - '_plugin_agent_inject_file', fake_inject_file) - - def fake_encrypt_text(sshkey, new_pass): - raise NotImplementedError("Should not be called") - - self.stubs.Set(crypto, 'ssh_encrypt_text', fake_encrypt_text) - - expected_data = ('\n# The following ssh key was injected by ' - 'Nova\nssh-dsa fake_keydata\n') - - injected_files = [('/root/.ssh/authorized_keys', expected_data)] - self._test_spawn(IMAGE_VHD, None, None, - os_type="linux", architecture="x86-64", - key_data='ssh-dsa fake_keydata') - self.assertEqual(actual_injected_files, injected_files) - - def test_spawn_injected_files(self): - # Test spawning with injected_files. - self.flags(use_agent_default=True, - group='xenserver') - actual_injected_files = [] - - def fake_inject_file(self, method, args): - path = base64.b64decode(args['b64_path']) - contents = base64.b64decode(args['b64_contents']) - actual_injected_files.append((path, contents)) - return jsonutils.dumps({'returncode': '0', 'message': 'success'}) - self.stubs.Set(stubs.FakeSessionForVMTests, - '_plugin_agent_inject_file', fake_inject_file) - - injected_files = [('/tmp/foo', 'foobar')] - self._test_spawn(IMAGE_VHD, None, None, - os_type="linux", architecture="x86-64", - injected_files=injected_files) - self.check_vm_params_for_linux() - self.assertEqual(actual_injected_files, injected_files) - - @mock.patch('nova.db.agent_build_get_by_triple') - def test_spawn_agent_upgrade(self, mock_get): - self.flags(use_agent_default=True, - group='xenserver') - - mock_get.return_value = {"version": "1.1.0", "architecture": "x86-64", - "hypervisor": "xen", "os": "windows", - "url": "url", "md5hash": "asdf", - 'created_at': None, 'updated_at': None, - 'deleted_at': None, 'deleted': False, - 'id': 1} - - self._test_spawn(IMAGE_VHD, None, None, - os_type="linux", architecture="x86-64") - - @mock.patch('nova.db.agent_build_get_by_triple') - def test_spawn_agent_upgrade_fails_silently(self, mock_get): - mock_get.return_value = {"version": "1.1.0", "architecture": "x86-64", - "hypervisor": "xen", "os": "windows", - "url": "url", "md5hash": "asdf", - 'created_at': None, 'updated_at': None, - 'deleted_at': None, 'deleted': False, - 'id': 1} - - self._test_spawn_fails_silently_with(exception.AgentError, - method="_plugin_agent_agentupdate", failure="fake_error") - - def test_spawn_with_resetnetwork_alternative_returncode(self): - self.flags(use_agent_default=True, - group='xenserver') - - def fake_resetnetwork(self, method, args): - fake_resetnetwork.called = True - # NOTE(johngarbutt): as returned by FreeBSD and Gentoo - return jsonutils.dumps({'returncode': '500', - 'message': 'success'}) - self.stubs.Set(stubs.FakeSessionForVMTests, - '_plugin_agent_resetnetwork', fake_resetnetwork) - fake_resetnetwork.called = False - - self._test_spawn(IMAGE_VHD, None, None, - os_type="linux", architecture="x86-64") - self.assertTrue(fake_resetnetwork.called) - - def _test_spawn_fails_silently_with(self, expected_exception_cls, - method="_plugin_agent_version", - failure=None, value=None): - self.flags(use_agent_default=True, - agent_version_timeout=0, - group='xenserver') - - def fake_agent_call(self, method, args): - if failure: - raise xenapi_fake.Failure([failure]) - else: - return value - - self.stubs.Set(stubs.FakeSessionForVMTests, - method, fake_agent_call) - - called = {} - - def fake_add_instance_fault(*args, **kwargs): - called["fake_add_instance_fault"] = args[2] - - self.stubs.Set(compute_utils, 'add_instance_fault_from_exc', - fake_add_instance_fault) - - self._test_spawn(IMAGE_VHD, None, None, - os_type="linux", architecture="x86-64") - actual_exception = called["fake_add_instance_fault"] - self.assertIsInstance(actual_exception, expected_exception_cls) - - def test_spawn_fails_silently_with_agent_timeout(self): - self._test_spawn_fails_silently_with(exception.AgentTimeout, - failure="TIMEOUT:fake") - - def test_spawn_fails_silently_with_agent_not_implemented(self): - self._test_spawn_fails_silently_with(exception.AgentNotImplemented, - failure="NOT IMPLEMENTED:fake") - - def test_spawn_fails_silently_with_agent_error(self): - self._test_spawn_fails_silently_with(exception.AgentError, - failure="fake_error") - - def test_spawn_fails_silently_with_agent_bad_return(self): - error = jsonutils.dumps({'returncode': -1, 'message': 'fake'}) - self._test_spawn_fails_silently_with(exception.AgentError, - value=error) - - def test_rescue(self): - instance = self._create_instance(spawn=False) - xenapi_fake.create_vm(instance['name'], 'Running') - - session = get_session() - vm_ref = vm_utils.lookup(session, instance['name']) - - swap_vdi_ref = xenapi_fake.create_vdi('swap', None) - root_vdi_ref = xenapi_fake.create_vdi('root', None) - eph1_vdi_ref = xenapi_fake.create_vdi('eph', None) - eph2_vdi_ref = xenapi_fake.create_vdi('eph', None) - vol_vdi_ref = xenapi_fake.create_vdi('volume', None) - - xenapi_fake.create_vbd(vm_ref, swap_vdi_ref, userdevice=2) - xenapi_fake.create_vbd(vm_ref, root_vdi_ref, userdevice=0) - xenapi_fake.create_vbd(vm_ref, eph1_vdi_ref, userdevice=4) - xenapi_fake.create_vbd(vm_ref, eph2_vdi_ref, userdevice=5) - xenapi_fake.create_vbd(vm_ref, vol_vdi_ref, userdevice=6, - other_config={'osvol': True}) - - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - image_meta = {'id': IMAGE_VHD, - 'disk_format': 'vhd'} - conn.rescue(self.context, instance, [], image_meta, '') - - vm = xenapi_fake.get_record('VM', vm_ref) - rescue_name = "%s-rescue" % vm["name_label"] - rescue_ref = vm_utils.lookup(session, rescue_name) - rescue_vm = xenapi_fake.get_record('VM', rescue_ref) - - vdi_refs = {} - for vbd_ref in rescue_vm['VBDs']: - vbd = xenapi_fake.get_record('VBD', vbd_ref) - vdi_refs[vbd['VDI']] = vbd['userdevice'] - - self.assertEqual('1', vdi_refs[root_vdi_ref]) - self.assertEqual('2', vdi_refs[swap_vdi_ref]) - self.assertEqual('4', vdi_refs[eph1_vdi_ref]) - self.assertEqual('5', vdi_refs[eph2_vdi_ref]) - self.assertNotIn(vol_vdi_ref, vdi_refs) - - def test_rescue_preserve_disk_on_failure(self): - # test that the original disk is preserved if rescue setup fails - # bug #1227898 - instance = self._create_instance() - session = get_session() - image_meta = {'id': IMAGE_VHD, - 'disk_format': 'vhd'} - - vm_ref = vm_utils.lookup(session, instance['name']) - vdi_ref, vdi_rec = vm_utils.get_vdi_for_vm_safely(session, vm_ref) - - # raise an error in the spawn setup process and trigger the - # undo manager logic: - def fake_start(*args, **kwargs): - raise test.TestingException('Start Error') - - self.stubs.Set(self.conn._vmops, '_start', fake_start) - - self.assertRaises(test.TestingException, self.conn.rescue, - self.context, instance, [], image_meta, '') - - # confirm original disk still exists: - vdi_ref2, vdi_rec2 = vm_utils.get_vdi_for_vm_safely(session, vm_ref) - self.assertEqual(vdi_ref, vdi_ref2) - self.assertEqual(vdi_rec['uuid'], vdi_rec2['uuid']) - - def test_unrescue(self): - instance = self._create_instance() - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - # Unrescue expects the original instance to be powered off - conn.power_off(instance) - xenapi_fake.create_vm(instance['name'] + '-rescue', 'Running') - conn.unrescue(instance, None) - - def test_unrescue_not_in_rescue(self): - instance = self._create_instance() - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - # Ensure that it will not unrescue a non-rescued instance. - self.assertRaises(exception.InstanceNotInRescueMode, conn.unrescue, - instance, None) - - def test_finish_revert_migration(self): - instance = self._create_instance() - - class VMOpsMock(): - - def __init__(self): - self.finish_revert_migration_called = False - - def finish_revert_migration(self, context, instance, block_info, - power_on): - self.finish_revert_migration_called = True - - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - conn._vmops = VMOpsMock() - conn.finish_revert_migration(self.context, instance, None) - self.assertTrue(conn._vmops.finish_revert_migration_called) - - def test_reboot_hard(self): - instance = self._create_instance() - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - conn.reboot(self.context, instance, None, "HARD") - - def test_poll_rebooting_instances(self): - self.mox.StubOutWithMock(compute_api.API, 'reboot') - compute_api.API.reboot(mox.IgnoreArg(), mox.IgnoreArg(), - mox.IgnoreArg()) - self.mox.ReplayAll() - instance = self._create_instance() - instances = [instance] - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - conn.poll_rebooting_instances(60, instances) - - def test_reboot_soft(self): - instance = self._create_instance() - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - conn.reboot(self.context, instance, None, "SOFT") - - def test_reboot_halted(self): - session = get_session() - instance = self._create_instance(spawn=False) - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - xenapi_fake.create_vm(instance['name'], 'Halted') - conn.reboot(self.context, instance, None, "SOFT") - vm_ref = vm_utils.lookup(session, instance['name']) - vm = xenapi_fake.get_record('VM', vm_ref) - self.assertEqual(vm['power_state'], 'Running') - - def test_reboot_unknown_state(self): - instance = self._create_instance(spawn=False) - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - xenapi_fake.create_vm(instance['name'], 'Unknown') - self.assertRaises(xenapi_fake.Failure, conn.reboot, self.context, - instance, None, "SOFT") - - def test_reboot_rescued(self): - instance = self._create_instance() - instance['vm_state'] = vm_states.RESCUED - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - real_result = vm_utils.lookup(conn._session, instance['name']) - - self.mox.StubOutWithMock(vm_utils, 'lookup') - vm_utils.lookup(conn._session, instance['name'], - True).AndReturn(real_result) - self.mox.ReplayAll() - - conn.reboot(self.context, instance, None, "SOFT") - - def test_get_console_output_succeeds(self): - - def fake_get_console_output(instance): - self.assertEqual("instance", instance) - return "console_log" - self.stubs.Set(self.conn._vmops, 'get_console_output', - fake_get_console_output) - - self.assertEqual(self.conn.get_console_output('context', "instance"), - "console_log") - - def _test_maintenance_mode(self, find_host, find_aggregate): - real_call_xenapi = self.conn._session.call_xenapi - instance = self._create_instance(spawn=True) - api_calls = {} - - # Record all the xenapi calls, and return a fake list of hosts - # for the host.get_all call - def fake_call_xenapi(method, *args): - api_calls[method] = args - if method == 'host.get_all': - return ['foo', 'bar', 'baz'] - return real_call_xenapi(method, *args) - self.stubs.Set(self.conn._session, 'call_xenapi', fake_call_xenapi) - - def fake_aggregate_get(context, host, key): - if find_aggregate: - return [test_aggregate.fake_aggregate] - else: - return [] - self.stubs.Set(db, 'aggregate_get_by_host', - fake_aggregate_get) - - def fake_host_find(context, session, src, dst): - if find_host: - return 'bar' - else: - raise exception.NoValidHost("I saw this one coming...") - self.stubs.Set(host, '_host_find', fake_host_find) - - result = self.conn.host_maintenance_mode('bar', 'on_maintenance') - self.assertEqual(result, 'on_maintenance') - - # We expect the VM.pool_migrate call to have been called to - # migrate our instance to the 'bar' host - vm_ref = vm_utils.lookup(self.conn._session, instance['name']) - host_ref = "foo" - expected = (vm_ref, host_ref, {"live": "true"}) - self.assertEqual(api_calls.get('VM.pool_migrate'), expected) - - instance = db.instance_get_by_uuid(self.context, instance['uuid']) - self.assertEqual(instance['vm_state'], vm_states.ACTIVE) - self.assertEqual(instance['task_state'], task_states.MIGRATING) - - def test_maintenance_mode(self): - self._test_maintenance_mode(True, True) - - def test_maintenance_mode_no_host(self): - self.assertRaises(exception.NoValidHost, - self._test_maintenance_mode, False, True) - - def test_maintenance_mode_no_aggregate(self): - self.assertRaises(exception.NotFound, - self._test_maintenance_mode, True, False) - - def test_uuid_find(self): - self.mox.StubOutWithMock(db, 'instance_get_all_by_host') - fake_inst = fake_instance.fake_db_instance(id=123) - fake_inst2 = fake_instance.fake_db_instance(id=456) - db.instance_get_all_by_host(self.context, fake_inst['host'], - columns_to_join=None, - use_slave=False - ).AndReturn([fake_inst, fake_inst2]) - self.mox.ReplayAll() - expected_name = CONF.instance_name_template % fake_inst['id'] - inst_uuid = host._uuid_find(self.context, fake_inst['host'], - expected_name) - self.assertEqual(inst_uuid, fake_inst['uuid']) - - def test_session_virtapi(self): - was = {'called': False} - - def fake_aggregate_get_by_host(self, *args, **kwargs): - was['called'] = True - raise test.TestingException() - self.stubs.Set(db, "aggregate_get_by_host", - fake_aggregate_get_by_host) - - self.stubs.Set(self.conn._session, "is_slave", True) - - self.assertRaises(test.TestingException, - self.conn._session._get_host_uuid) - self.assertTrue(was['called']) - - def test_per_instance_usage_running(self): - instance = self._create_instance(spawn=True) - flavor = flavors.get_flavor(3) - - expected = {instance['uuid']: {'memory_mb': flavor['memory_mb'], - 'uuid': instance['uuid']}} - actual = self.conn.get_per_instance_usage() - self.assertEqual(expected, actual) - - # Paused instances still consume resources: - self.conn.pause(instance) - actual = self.conn.get_per_instance_usage() - self.assertEqual(expected, actual) - - def test_per_instance_usage_suspended(self): - # Suspended instances do not consume memory: - instance = self._create_instance(spawn=True) - self.conn.suspend(instance) - actual = self.conn.get_per_instance_usage() - self.assertEqual({}, actual) - - def test_per_instance_usage_halted(self): - instance = self._create_instance(spawn=True) - self.conn.power_off(instance) - actual = self.conn.get_per_instance_usage() - self.assertEqual({}, actual) - - def _create_instance(self, instance_id=1, spawn=True, obj=False, **attrs): - """Creates and spawns a test instance.""" - instance_values = { - 'id': instance_id, - 'uuid': '00000000-0000-0000-0000-00000000000%d' % instance_id, - 'display_name': 'host-%d' % instance_id, - 'project_id': self.project_id, - 'user_id': self.user_id, - 'image_ref': 1, - 'kernel_id': 2, - 'ramdisk_id': 3, - 'root_gb': 80, - 'ephemeral_gb': 0, - 'instance_type_id': '3', # m1.large - 'os_type': 'linux', - 'vm_mode': 'hvm', - 'architecture': 'x86-64'} - instance_values.update(attrs) - - instance = create_instance_with_system_metadata(self.context, - instance_values) - network_info = fake_network.fake_get_instance_nw_info(self.stubs) - image_meta = {'id': IMAGE_VHD, - 'disk_format': 'vhd'} - if spawn: - self.conn.spawn(self.context, instance, image_meta, [], 'herp', - network_info) - if obj: - instance = objects.Instance._from_db_object( - self.context, objects.Instance(), instance, - expected_attrs=instance_obj.INSTANCE_DEFAULT_FIELDS) - return instance - - def test_destroy_clean_up_kernel_and_ramdisk(self): - def fake_lookup_kernel_ramdisk(session, vm_ref): - return "kernel", "ramdisk" - - self.stubs.Set(vm_utils, "lookup_kernel_ramdisk", - fake_lookup_kernel_ramdisk) - - def fake_destroy_kernel_ramdisk(session, instance, kernel, ramdisk): - fake_destroy_kernel_ramdisk.called = True - self.assertEqual("kernel", kernel) - self.assertEqual("ramdisk", ramdisk) - - fake_destroy_kernel_ramdisk.called = False - - self.stubs.Set(vm_utils, "destroy_kernel_ramdisk", - fake_destroy_kernel_ramdisk) - - instance = self._create_instance(spawn=True) - network_info = fake_network.fake_get_instance_nw_info(self.stubs) - self.conn.destroy(self.context, instance, network_info) - - vm_ref = vm_utils.lookup(self.conn._session, instance['name']) - self.assertIsNone(vm_ref) - self.assertTrue(fake_destroy_kernel_ramdisk.called) - - -class XenAPIDiffieHellmanTestCase(test.NoDBTestCase): - """Unit tests for Diffie-Hellman code.""" - def setUp(self): - super(XenAPIDiffieHellmanTestCase, self).setUp() - self.alice = agent.SimpleDH() - self.bob = agent.SimpleDH() - - def test_shared(self): - alice_pub = self.alice.get_public() - bob_pub = self.bob.get_public() - alice_shared = self.alice.compute_shared(bob_pub) - bob_shared = self.bob.compute_shared(alice_pub) - self.assertEqual(alice_shared, bob_shared) - - def _test_encryption(self, message): - enc = self.alice.encrypt(message) - self.assertFalse(enc.endswith('\n')) - dec = self.bob.decrypt(enc) - self.assertEqual(dec, message) - - def test_encrypt_simple_message(self): - self._test_encryption('This is a simple message.') - - def test_encrypt_message_with_newlines_at_end(self): - self._test_encryption('This message has a newline at the end.\n') - - def test_encrypt_many_newlines_at_end(self): - self._test_encryption('Message with lotsa newlines.\n\n\n') - - def test_encrypt_newlines_inside_message(self): - self._test_encryption('Message\nwith\ninterior\nnewlines.') - - def test_encrypt_with_leading_newlines(self): - self._test_encryption('\n\nMessage with leading newlines.') - - def test_encrypt_really_long_message(self): - self._test_encryption(''.join(['abcd' for i in xrange(1024)])) - - -# FIXME(sirp): convert this to use XenAPITestBaseNoDB -class XenAPIMigrateInstance(stubs.XenAPITestBase): - """Unit test for verifying migration-related actions.""" - - REQUIRES_LOCKING = True - - def setUp(self): - super(XenAPIMigrateInstance, self).setUp() - self.flags(connection_url='test_url', - connection_password='test_pass', - group='xenserver') - self.flags(firewall_driver='nova.virt.xenapi.firewall.' - 'Dom0IptablesFirewallDriver') - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - db_fakes.stub_out_db_instance_api(self.stubs) - xenapi_fake.create_network('fake', 'fake_br1') - self.user_id = 'fake' - self.project_id = 'fake' - self.context = context.RequestContext(self.user_id, self.project_id) - self.instance_values = {'id': 1, - 'project_id': self.project_id, - 'user_id': self.user_id, - 'image_ref': 1, - 'kernel_id': None, - 'ramdisk_id': None, - 'root_gb': 80, - 'ephemeral_gb': 0, - 'instance_type_id': '3', # m1.large - 'os_type': 'linux', - 'architecture': 'x86-64'} - - migration_values = { - 'source_compute': 'nova-compute', - 'dest_compute': 'nova-compute', - 'dest_host': '10.127.5.114', - 'status': 'post-migrating', - 'instance_uuid': '15f23e6a-cc6e-4d22-b651-d9bdaac316f7', - 'old_instance_type_id': 5, - 'new_instance_type_id': 1 - } - self.migration = db.migration_create( - context.get_admin_context(), migration_values) - - fake_processutils.stub_out_processutils_execute(self.stubs) - stubs.stub_out_migration_methods(self.stubs) - stubs.stubout_get_this_vm_uuid(self.stubs) - - def fake_inject_instance_metadata(self, instance, vm): - pass - self.stubs.Set(vmops.VMOps, '_inject_instance_metadata', - fake_inject_instance_metadata) - - def test_migrate_disk_and_power_off(self): - instance = db.instance_create(self.context, self.instance_values) - xenapi_fake.create_vm(instance['name'], 'Running') - flavor = {"root_gb": 80, 'ephemeral_gb': 0} - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - conn.migrate_disk_and_power_off(self.context, instance, - '127.0.0.1', flavor, None) - - def test_migrate_disk_and_power_off_passes_exceptions(self): - instance = db.instance_create(self.context, self.instance_values) - xenapi_fake.create_vm(instance['name'], 'Running') - flavor = {"root_gb": 80, 'ephemeral_gb': 0} - - def fake_raise(*args, **kwargs): - raise exception.MigrationError(reason='test failure') - self.stubs.Set(vmops.VMOps, "_migrate_disk_resizing_up", fake_raise) - - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - self.assertRaises(exception.MigrationError, - conn.migrate_disk_and_power_off, - self.context, instance, - '127.0.0.1', flavor, None) - - def test_migrate_disk_and_power_off_throws_on_zero_gb_resize_down(self): - instance = db.instance_create(self.context, self.instance_values) - flavor = {"root_gb": 0, 'ephemeral_gb': 0} - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - self.assertRaises(exception.ResizeError, - conn.migrate_disk_and_power_off, - self.context, instance, - 'fake_dest', flavor, None) - - def test_migrate_disk_and_power_off_with_zero_gb_old_and_new_works(self): - flavor = {"root_gb": 0, 'ephemeral_gb': 0} - values = copy.copy(self.instance_values) - values["root_gb"] = 0 - values["ephemeral_gb"] = 0 - instance = db.instance_create(self.context, values) - xenapi_fake.create_vm(instance['name'], 'Running') - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - conn.migrate_disk_and_power_off(self.context, instance, - '127.0.0.1', flavor, None) - - def _test_revert_migrate(self, power_on): - instance = create_instance_with_system_metadata(self.context, - self.instance_values) - self.called = False - self.fake_vm_start_called = False - self.fake_finish_revert_migration_called = False - context = 'fake_context' - - def fake_vm_start(*args, **kwargs): - self.fake_vm_start_called = True - - def fake_vdi_resize(*args, **kwargs): - self.called = True - - def fake_finish_revert_migration(*args, **kwargs): - self.fake_finish_revert_migration_called = True - - self.stubs.Set(stubs.FakeSessionForVMTests, - "VDI_resize_online", fake_vdi_resize) - self.stubs.Set(vmops.VMOps, '_start', fake_vm_start) - self.stubs.Set(vmops.VMOps, 'finish_revert_migration', - fake_finish_revert_migration) - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests, - product_version=(4, 0, 0), - product_brand='XenServer') - - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - network_info = fake_network.fake_get_instance_nw_info(self.stubs) - image_meta = {'id': instance['image_ref'], 'disk_format': 'vhd'} - base = xenapi_fake.create_vdi('hurr', 'fake') - base_uuid = xenapi_fake.get_record('VDI', base)['uuid'] - cow = xenapi_fake.create_vdi('durr', 'fake') - cow_uuid = xenapi_fake.get_record('VDI', cow)['uuid'] - conn.finish_migration(self.context, self.migration, instance, - dict(base_copy=base_uuid, cow=cow_uuid), - network_info, image_meta, resize_instance=True, - block_device_info=None, power_on=power_on) - self.assertEqual(self.called, True) - self.assertEqual(self.fake_vm_start_called, power_on) - - conn.finish_revert_migration(context, instance, network_info) - self.assertEqual(self.fake_finish_revert_migration_called, True) - - def test_revert_migrate_power_on(self): - self._test_revert_migrate(True) - - def test_revert_migrate_power_off(self): - self._test_revert_migrate(False) - - def _test_finish_migrate(self, power_on): - instance = create_instance_with_system_metadata(self.context, - self.instance_values) - self.called = False - self.fake_vm_start_called = False - - def fake_vm_start(*args, **kwargs): - self.fake_vm_start_called = True - - def fake_vdi_resize(*args, **kwargs): - self.called = True - - self.stubs.Set(vmops.VMOps, '_start', fake_vm_start) - self.stubs.Set(stubs.FakeSessionForVMTests, - "VDI_resize_online", fake_vdi_resize) - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests, - product_version=(4, 0, 0), - product_brand='XenServer') - - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - network_info = fake_network.fake_get_instance_nw_info(self.stubs) - image_meta = {'id': instance['image_ref'], 'disk_format': 'vhd'} - conn.finish_migration(self.context, self.migration, instance, - dict(base_copy='hurr', cow='durr'), - network_info, image_meta, resize_instance=True, - block_device_info=None, power_on=power_on) - self.assertEqual(self.called, True) - self.assertEqual(self.fake_vm_start_called, power_on) - - def test_finish_migrate_power_on(self): - self._test_finish_migrate(True) - - def test_finish_migrate_power_off(self): - self._test_finish_migrate(False) - - def test_finish_migrate_no_local_storage(self): - values = copy.copy(self.instance_values) - values["root_gb"] = 0 - values["ephemeral_gb"] = 0 - instance = create_instance_with_system_metadata(self.context, values) - - def fake_vdi_resize(*args, **kwargs): - raise Exception("This shouldn't be called") - - self.stubs.Set(stubs.FakeSessionForVMTests, - "VDI_resize_online", fake_vdi_resize) - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - network_info = fake_network.fake_get_instance_nw_info(self.stubs) - image_meta = {'id': instance['image_ref'], 'disk_format': 'vhd'} - conn.finish_migration(self.context, self.migration, instance, - dict(base_copy='hurr', cow='durr'), - network_info, image_meta, resize_instance=True) - - def test_finish_migrate_no_resize_vdi(self): - instance = create_instance_with_system_metadata(self.context, - self.instance_values) - - def fake_vdi_resize(*args, **kwargs): - raise Exception("This shouldn't be called") - - self.stubs.Set(stubs.FakeSessionForVMTests, - "VDI_resize_online", fake_vdi_resize) - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - network_info = fake_network.fake_get_instance_nw_info(self.stubs) - # Resize instance would be determined by the compute call - image_meta = {'id': instance['image_ref'], 'disk_format': 'vhd'} - conn.finish_migration(self.context, self.migration, instance, - dict(base_copy='hurr', cow='durr'), - network_info, image_meta, resize_instance=False) - - @stub_vm_utils_with_vdi_attached_here - def test_migrate_too_many_partitions_no_resize_down(self): - instance_values = self.instance_values - instance = db.instance_create(self.context, instance_values) - xenapi_fake.create_vm(instance['name'], 'Running') - flavor = db.flavor_get_by_name(self.context, 'm1.small') - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - def fake_get_partitions(partition): - return [(1, 2, 3, 4, "", ""), (1, 2, 3, 4, "", "")] - - self.stubs.Set(vm_utils, '_get_partitions', fake_get_partitions) - - self.assertRaises(exception.InstanceFaultRollback, - conn.migrate_disk_and_power_off, - self.context, instance, - '127.0.0.1', flavor, None) - - @stub_vm_utils_with_vdi_attached_here - def test_migrate_bad_fs_type_no_resize_down(self): - instance_values = self.instance_values - instance = db.instance_create(self.context, instance_values) - xenapi_fake.create_vm(instance['name'], 'Running') - flavor = db.flavor_get_by_name(self.context, 'm1.small') - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - def fake_get_partitions(partition): - return [(1, 2, 3, "ext2", "", "boot")] - - self.stubs.Set(vm_utils, '_get_partitions', fake_get_partitions) - - self.assertRaises(exception.InstanceFaultRollback, - conn.migrate_disk_and_power_off, - self.context, instance, - '127.0.0.1', flavor, None) - - def test_migrate_rollback_when_resize_down_fs_fails(self): - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - vmops = conn._vmops - - self.mox.StubOutWithMock(vmops, '_resize_ensure_vm_is_shutdown') - self.mox.StubOutWithMock(vmops, '_apply_orig_vm_name_label') - self.mox.StubOutWithMock(vm_utils, 'resize_disk') - self.mox.StubOutWithMock(vm_utils, 'migrate_vhd') - self.mox.StubOutWithMock(vm_utils, 'destroy_vdi') - self.mox.StubOutWithMock(vm_utils, 'get_vdi_for_vm_safely') - self.mox.StubOutWithMock(vmops, '_restore_orig_vm_and_cleanup_orphan') - - instance = objects.Instance(context=self.context, - auto_disk_config=True, uuid='uuid') - instance.obj_reset_changes() - vm_ref = "vm_ref" - dest = "dest" - flavor = "type" - sr_path = "sr_path" - - vmops._resize_ensure_vm_is_shutdown(instance, vm_ref) - vmops._apply_orig_vm_name_label(instance, vm_ref) - old_vdi_ref = "old_ref" - vm_utils.get_vdi_for_vm_safely(vmops._session, vm_ref).AndReturn( - (old_vdi_ref, None)) - new_vdi_ref = "new_ref" - new_vdi_uuid = "new_uuid" - vm_utils.resize_disk(vmops._session, instance, old_vdi_ref, - flavor).AndReturn((new_vdi_ref, new_vdi_uuid)) - vm_utils.migrate_vhd(vmops._session, instance, new_vdi_uuid, dest, - sr_path, 0).AndRaise( - exception.ResizeError(reason="asdf")) - - vm_utils.destroy_vdi(vmops._session, new_vdi_ref) - vmops._restore_orig_vm_and_cleanup_orphan(instance) - - self.mox.ReplayAll() - - with mock.patch.object(instance, 'save') as mock_save: - self.assertRaises(exception.InstanceFaultRollback, - vmops._migrate_disk_resizing_down, self.context, - instance, dest, flavor, vm_ref, sr_path) - self.assertEqual(3, mock_save.call_count) - self.assertEqual(60.0, instance.progress) - - def test_resize_ensure_vm_is_shutdown_cleanly(self): - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - vmops = conn._vmops - fake_instance = {'uuid': 'uuid'} - - self.mox.StubOutWithMock(vm_utils, 'is_vm_shutdown') - self.mox.StubOutWithMock(vm_utils, 'clean_shutdown_vm') - self.mox.StubOutWithMock(vm_utils, 'hard_shutdown_vm') - - vm_utils.is_vm_shutdown(vmops._session, "ref").AndReturn(False) - vm_utils.clean_shutdown_vm(vmops._session, fake_instance, - "ref").AndReturn(True) - - self.mox.ReplayAll() - - vmops._resize_ensure_vm_is_shutdown(fake_instance, "ref") - - def test_resize_ensure_vm_is_shutdown_forced(self): - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - vmops = conn._vmops - fake_instance = {'uuid': 'uuid'} - - self.mox.StubOutWithMock(vm_utils, 'is_vm_shutdown') - self.mox.StubOutWithMock(vm_utils, 'clean_shutdown_vm') - self.mox.StubOutWithMock(vm_utils, 'hard_shutdown_vm') - - vm_utils.is_vm_shutdown(vmops._session, "ref").AndReturn(False) - vm_utils.clean_shutdown_vm(vmops._session, fake_instance, - "ref").AndReturn(False) - vm_utils.hard_shutdown_vm(vmops._session, fake_instance, - "ref").AndReturn(True) - - self.mox.ReplayAll() - - vmops._resize_ensure_vm_is_shutdown(fake_instance, "ref") - - def test_resize_ensure_vm_is_shutdown_fails(self): - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - vmops = conn._vmops - fake_instance = {'uuid': 'uuid'} - - self.mox.StubOutWithMock(vm_utils, 'is_vm_shutdown') - self.mox.StubOutWithMock(vm_utils, 'clean_shutdown_vm') - self.mox.StubOutWithMock(vm_utils, 'hard_shutdown_vm') - - vm_utils.is_vm_shutdown(vmops._session, "ref").AndReturn(False) - vm_utils.clean_shutdown_vm(vmops._session, fake_instance, - "ref").AndReturn(False) - vm_utils.hard_shutdown_vm(vmops._session, fake_instance, - "ref").AndReturn(False) - - self.mox.ReplayAll() - - self.assertRaises(exception.ResizeError, - vmops._resize_ensure_vm_is_shutdown, fake_instance, "ref") - - def test_resize_ensure_vm_is_shutdown_already_shutdown(self): - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - vmops = conn._vmops - fake_instance = {'uuid': 'uuid'} - - self.mox.StubOutWithMock(vm_utils, 'is_vm_shutdown') - self.mox.StubOutWithMock(vm_utils, 'clean_shutdown_vm') - self.mox.StubOutWithMock(vm_utils, 'hard_shutdown_vm') - - vm_utils.is_vm_shutdown(vmops._session, "ref").AndReturn(True) - - self.mox.ReplayAll() - - vmops._resize_ensure_vm_is_shutdown(fake_instance, "ref") - - -class XenAPIImageTypeTestCase(test.NoDBTestCase): - """Test ImageType class.""" - - def test_to_string(self): - # Can convert from type id to type string. - self.assertEqual( - vm_utils.ImageType.to_string(vm_utils.ImageType.KERNEL), - vm_utils.ImageType.KERNEL_STR) - - def _assert_role(self, expected_role, image_type_id): - self.assertEqual( - expected_role, - vm_utils.ImageType.get_role(image_type_id)) - - def test_get_image_role_kernel(self): - self._assert_role('kernel', vm_utils.ImageType.KERNEL) - - def test_get_image_role_ramdisk(self): - self._assert_role('ramdisk', vm_utils.ImageType.RAMDISK) - - def test_get_image_role_disk(self): - self._assert_role('root', vm_utils.ImageType.DISK) - - def test_get_image_role_disk_raw(self): - self._assert_role('root', vm_utils.ImageType.DISK_RAW) - - def test_get_image_role_disk_vhd(self): - self._assert_role('root', vm_utils.ImageType.DISK_VHD) - - -class XenAPIDetermineDiskImageTestCase(test.NoDBTestCase): - """Unit tests for code that detects the ImageType.""" - def assert_disk_type(self, image_meta, expected_disk_type): - actual = vm_utils.determine_disk_image_type(image_meta) - self.assertEqual(expected_disk_type, actual) - - def test_machine(self): - image_meta = {'id': 'a', 'disk_format': 'ami'} - self.assert_disk_type(image_meta, vm_utils.ImageType.DISK) - - def test_raw(self): - image_meta = {'id': 'a', 'disk_format': 'raw'} - self.assert_disk_type(image_meta, vm_utils.ImageType.DISK_RAW) - - def test_vhd(self): - image_meta = {'id': 'a', 'disk_format': 'vhd'} - self.assert_disk_type(image_meta, vm_utils.ImageType.DISK_VHD) - - def test_none(self): - image_meta = None - self.assert_disk_type(image_meta, None) - - -# FIXME(sirp): convert this to use XenAPITestBaseNoDB -class XenAPIHostTestCase(stubs.XenAPITestBase): - """Tests HostState, which holds metrics from XenServer that get - reported back to the Schedulers. - """ - - def setUp(self): - super(XenAPIHostTestCase, self).setUp() - self.flags(connection_url='test_url', - connection_password='test_pass', - group='xenserver') - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.context = context.get_admin_context() - self.flags(use_local=True, group='conductor') - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - self.instance = fake_instance.fake_db_instance(name='foo') - - def test_host_state(self): - stats = self.conn.host_state.get_host_stats(False) - # Values from fake.create_local_srs (ext SR) - self.assertEqual(stats['disk_total'], 40000) - self.assertEqual(stats['disk_used'], 20000) - # Values from fake._plugin_xenhost_host_data - self.assertEqual(stats['host_memory_total'], 10) - self.assertEqual(stats['host_memory_overhead'], 20) - self.assertEqual(stats['host_memory_free'], 30) - self.assertEqual(stats['host_memory_free_computed'], 40) - self.assertEqual(stats['hypervisor_hostname'], 'fake-xenhost') - self.assertThat({'cpu_count': 50}, - matchers.DictMatches(stats['host_cpu_info'])) - # No VMs running - self.assertEqual(stats['vcpus_used'], 0) - - def test_host_state_vcpus_used(self): - stats = self.conn.host_state.get_host_stats(True) - self.assertEqual(stats['vcpus_used'], 0) - xenapi_fake.create_vm(self.instance['name'], 'Running') - stats = self.conn.host_state.get_host_stats(True) - self.assertEqual(stats['vcpus_used'], 4) - - def test_pci_passthrough_devices_whitelist(self): - # NOTE(guillaume-thouvenin): This pci whitelist will be used to - # match with _plugin_xenhost_get_pci_device_details method in fake.py. - white_list = '{"vendor_id":"10de", "product_id":"11bf"}' - self.flags(pci_passthrough_whitelist=[white_list]) - stats = self.conn.host_state.get_host_stats(False) - self.assertEqual(len(stats['pci_passthrough_devices']), 1) - - def test_pci_passthrough_devices_no_whitelist(self): - stats = self.conn.host_state.get_host_stats(False) - self.assertEqual(len(stats['pci_passthrough_devices']), 0) - - def test_host_state_missing_sr(self): - # Must trigger construction of 'host_state' property - # before introducing the stub which raises the error - hs = self.conn.host_state - - def fake_safe_find_sr(session): - raise exception.StorageRepositoryNotFound('not there') - - self.stubs.Set(vm_utils, 'safe_find_sr', fake_safe_find_sr) - self.assertRaises(exception.StorageRepositoryNotFound, - hs.get_host_stats, - refresh=True) - - def _test_host_action(self, method, action, expected=None): - result = method('host', action) - if not expected: - expected = action - self.assertEqual(result, expected) - - def test_host_reboot(self): - self._test_host_action(self.conn.host_power_action, 'reboot') - - def test_host_shutdown(self): - self._test_host_action(self.conn.host_power_action, 'shutdown') - - def test_host_startup(self): - self.assertRaises(NotImplementedError, - self.conn.host_power_action, 'host', 'startup') - - def test_host_maintenance_on(self): - self._test_host_action(self.conn.host_maintenance_mode, - True, 'on_maintenance') - - def test_host_maintenance_off(self): - self._test_host_action(self.conn.host_maintenance_mode, - False, 'off_maintenance') - - def test_set_enable_host_enable(self): - _create_service_entries(self.context, values={'nova': ['fake-mini']}) - self._test_host_action(self.conn.set_host_enabled, True, 'enabled') - service = db.service_get_by_args(self.context, 'fake-mini', - 'nova-compute') - self.assertEqual(service.disabled, False) - - def test_set_enable_host_disable(self): - _create_service_entries(self.context, values={'nova': ['fake-mini']}) - self._test_host_action(self.conn.set_host_enabled, False, 'disabled') - service = db.service_get_by_args(self.context, 'fake-mini', - 'nova-compute') - self.assertEqual(service.disabled, True) - - def test_get_host_uptime(self): - result = self.conn.get_host_uptime('host') - self.assertEqual(result, 'fake uptime') - - def test_supported_instances_is_included_in_host_state(self): - stats = self.conn.host_state.get_host_stats(False) - self.assertIn('supported_instances', stats) - - def test_supported_instances_is_calculated_by_to_supported_instances(self): - - def to_supported_instances(somedata): - self.assertIsNone(somedata) - return "SOMERETURNVALUE" - self.stubs.Set(host, 'to_supported_instances', to_supported_instances) - - stats = self.conn.host_state.get_host_stats(False) - self.assertEqual("SOMERETURNVALUE", stats['supported_instances']) - - def test_update_stats_caches_hostname(self): - self.mox.StubOutWithMock(host, 'call_xenhost') - self.mox.StubOutWithMock(vm_utils, 'scan_default_sr') - self.mox.StubOutWithMock(vm_utils, 'list_vms') - self.mox.StubOutWithMock(self.conn._session, 'call_xenapi') - data = {'disk_total': 0, - 'disk_used': 0, - 'disk_available': 0, - 'supported_instances': 0, - 'host_capabilities': [], - 'host_hostname': 'foo', - 'vcpus_used': 0, - } - sr_rec = { - 'physical_size': 0, - 'physical_utilisation': 0, - 'virtual_allocation': 0, - } - - for i in range(3): - host.call_xenhost(mox.IgnoreArg(), 'host_data', {}).AndReturn(data) - vm_utils.scan_default_sr(self.conn._session).AndReturn("ref") - vm_utils.list_vms(self.conn._session).AndReturn([]) - self.conn._session.call_xenapi('SR.get_record', "ref").AndReturn( - sr_rec) - if i == 2: - # On the third call (the second below) change the hostname - data = dict(data, host_hostname='bar') - - self.mox.ReplayAll() - stats = self.conn.host_state.get_host_stats(refresh=True) - self.assertEqual('foo', stats['hypervisor_hostname']) - stats = self.conn.host_state.get_host_stats(refresh=True) - self.assertEqual('foo', stats['hypervisor_hostname']) - - -class ToSupportedInstancesTestCase(test.NoDBTestCase): - def test_default_return_value(self): - self.assertEqual([], - host.to_supported_instances(None)) - - def test_return_value(self): - self.assertEqual([(arch.X86_64, hvtype.XEN, 'xen')], - host.to_supported_instances([u'xen-3.0-x86_64'])) - - def test_invalid_values_do_not_break(self): - self.assertEqual([(arch.X86_64, hvtype.XEN, 'xen')], - host.to_supported_instances([u'xen-3.0-x86_64', 'spam'])) - - def test_multiple_values(self): - self.assertEqual( - [ - (arch.X86_64, hvtype.XEN, 'xen'), - (arch.I686, hvtype.XEN, 'hvm') - ], - host.to_supported_instances([u'xen-3.0-x86_64', 'hvm-3.0-x86_32']) - ) - - -# FIXME(sirp): convert this to use XenAPITestBaseNoDB -class XenAPIAutoDiskConfigTestCase(stubs.XenAPITestBase): - def setUp(self): - super(XenAPIAutoDiskConfigTestCase, self).setUp() - self.flags(connection_url='test_url', - connection_password='test_pass', - group='xenserver') - self.flags(firewall_driver='nova.virt.xenapi.firewall.' - 'Dom0IptablesFirewallDriver') - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - self.user_id = 'fake' - self.project_id = 'fake' - - self.instance_values = {'id': 1, - 'project_id': self.project_id, - 'user_id': self.user_id, - 'image_ref': 1, - 'kernel_id': 2, - 'ramdisk_id': 3, - 'root_gb': 80, - 'ephemeral_gb': 0, - 'instance_type_id': '3', # m1.large - 'os_type': 'linux', - 'architecture': 'x86-64'} - - self.context = context.RequestContext(self.user_id, self.project_id) - - def fake_create_vbd(session, vm_ref, vdi_ref, userdevice, - vbd_type='disk', read_only=False, bootable=True, - osvol=False): - pass - - self.stubs.Set(vm_utils, 'create_vbd', fake_create_vbd) - - def assertIsPartitionCalled(self, called): - marker = {"partition_called": False} - - def fake_resize_part_and_fs(dev, start, old_sectors, new_sectors, - flags): - marker["partition_called"] = True - self.stubs.Set(vm_utils, "_resize_part_and_fs", - fake_resize_part_and_fs) - - context.RequestContext(self.user_id, self.project_id) - session = get_session() - - disk_image_type = vm_utils.ImageType.DISK_VHD - instance = create_instance_with_system_metadata(self.context, - self.instance_values) - vm_ref = xenapi_fake.create_vm(instance['name'], 'Halted') - vdi_ref = xenapi_fake.create_vdi(instance['name'], 'fake') - - vdi_uuid = session.call_xenapi('VDI.get_record', vdi_ref)['uuid'] - vdis = {'root': {'uuid': vdi_uuid, 'ref': vdi_ref}} - - self.conn._vmops._attach_disks(instance, vm_ref, instance['name'], - vdis, disk_image_type, "fake_nw_inf") - - self.assertEqual(marker["partition_called"], called) - - def test_instance_not_auto_disk_config(self): - """Should not partition unless instance is marked as - auto_disk_config. - """ - self.instance_values['auto_disk_config'] = False - self.assertIsPartitionCalled(False) - - @stub_vm_utils_with_vdi_attached_here - def test_instance_auto_disk_config_fails_safe_two_partitions(self): - # Should not partition unless fail safes pass. - self.instance_values['auto_disk_config'] = True - - def fake_get_partitions(dev): - return [(1, 0, 100, 'ext4', "", ""), (2, 100, 200, 'ext4' "", "")] - self.stubs.Set(vm_utils, "_get_partitions", - fake_get_partitions) - - self.assertIsPartitionCalled(False) - - @stub_vm_utils_with_vdi_attached_here - def test_instance_auto_disk_config_fails_safe_badly_numbered(self): - # Should not partition unless fail safes pass. - self.instance_values['auto_disk_config'] = True - - def fake_get_partitions(dev): - return [(2, 100, 200, 'ext4', "", "")] - self.stubs.Set(vm_utils, "_get_partitions", - fake_get_partitions) - - self.assertIsPartitionCalled(False) - - @stub_vm_utils_with_vdi_attached_here - def test_instance_auto_disk_config_fails_safe_bad_fstype(self): - # Should not partition unless fail safes pass. - self.instance_values['auto_disk_config'] = True - - def fake_get_partitions(dev): - return [(1, 100, 200, 'asdf', "", "")] - self.stubs.Set(vm_utils, "_get_partitions", - fake_get_partitions) - - self.assertIsPartitionCalled(False) - - @stub_vm_utils_with_vdi_attached_here - def test_instance_auto_disk_config_passes_fail_safes(self): - """Should partition if instance is marked as auto_disk_config=True and - virt-layer specific fail-safe checks pass. - """ - self.instance_values['auto_disk_config'] = True - - def fake_get_partitions(dev): - return [(1, 0, 100, 'ext4', "", "boot")] - self.stubs.Set(vm_utils, "_get_partitions", - fake_get_partitions) - - self.assertIsPartitionCalled(True) - - -# FIXME(sirp): convert this to use XenAPITestBaseNoDB -class XenAPIGenerateLocal(stubs.XenAPITestBase): - """Test generating of local disks, like swap and ephemeral.""" - def setUp(self): - super(XenAPIGenerateLocal, self).setUp() - self.flags(connection_url='test_url', - connection_password='test_pass', - group='xenserver') - self.flags(firewall_driver='nova.virt.xenapi.firewall.' - 'Dom0IptablesFirewallDriver') - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - db_fakes.stub_out_db_instance_api(self.stubs) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - self.user_id = 'fake' - self.project_id = 'fake' - - self.instance_values = {'id': 1, - 'project_id': self.project_id, - 'user_id': self.user_id, - 'image_ref': 1, - 'kernel_id': 2, - 'ramdisk_id': 3, - 'root_gb': 80, - 'ephemeral_gb': 0, - 'instance_type_id': '3', # m1.large - 'os_type': 'linux', - 'architecture': 'x86-64'} - - self.context = context.RequestContext(self.user_id, self.project_id) - - def fake_create_vbd(session, vm_ref, vdi_ref, userdevice, - vbd_type='disk', read_only=False, bootable=True, - osvol=False, empty=False, unpluggable=True): - return session.call_xenapi('VBD.create', {'VM': vm_ref, - 'VDI': vdi_ref}) - - self.stubs.Set(vm_utils, 'create_vbd', fake_create_vbd) - - def assertCalled(self, instance, - disk_image_type=vm_utils.ImageType.DISK_VHD): - context.RequestContext(self.user_id, self.project_id) - session = get_session() - - vm_ref = xenapi_fake.create_vm(instance['name'], 'Halted') - vdi_ref = xenapi_fake.create_vdi(instance['name'], 'fake') - - vdi_uuid = session.call_xenapi('VDI.get_record', vdi_ref)['uuid'] - - vdi_key = 'root' - if disk_image_type == vm_utils.ImageType.DISK_ISO: - vdi_key = 'iso' - vdis = {vdi_key: {'uuid': vdi_uuid, 'ref': vdi_ref}} - - self.called = False - self.conn._vmops._attach_disks(instance, vm_ref, instance['name'], - vdis, disk_image_type, "fake_nw_inf") - self.assertTrue(self.called) - - def test_generate_swap(self): - # Test swap disk generation. - instance_values = dict(self.instance_values, instance_type_id=5) - instance = create_instance_with_system_metadata(self.context, - instance_values) - - def fake_generate_swap(*args, **kwargs): - self.called = True - self.stubs.Set(vm_utils, 'generate_swap', fake_generate_swap) - - self.assertCalled(instance) - - def test_generate_ephemeral(self): - # Test ephemeral disk generation. - instance_values = dict(self.instance_values, instance_type_id=4) - instance = create_instance_with_system_metadata(self.context, - instance_values) - - def fake_generate_ephemeral(*args): - self.called = True - self.stubs.Set(vm_utils, 'generate_ephemeral', fake_generate_ephemeral) - - self.assertCalled(instance) - - def test_generate_iso_blank_root_disk(self): - instance_values = dict(self.instance_values, instance_type_id=4) - instance_values.pop('kernel_id') - instance_values.pop('ramdisk_id') - instance = create_instance_with_system_metadata(self.context, - instance_values) - - def fake_generate_ephemeral(*args): - pass - self.stubs.Set(vm_utils, 'generate_ephemeral', fake_generate_ephemeral) - - def fake_generate_iso(*args): - self.called = True - self.stubs.Set(vm_utils, 'generate_iso_blank_root_disk', - fake_generate_iso) - - self.assertCalled(instance, vm_utils.ImageType.DISK_ISO) - - -class XenAPIBWCountersTestCase(stubs.XenAPITestBaseNoDB): - FAKE_VMS = {'test1:ref': dict(name_label='test1', - other_config=dict(nova_uuid='hash'), - domid='12', - _vifmap={'0': "a:b:c:d...", - '1': "e:f:12:q..."}), - 'test2:ref': dict(name_label='test2', - other_config=dict(nova_uuid='hash'), - domid='42', - _vifmap={'0': "a:3:c:d...", - '1': "e:f:42:q..."}), - } - - def setUp(self): - super(XenAPIBWCountersTestCase, self).setUp() - self.stubs.Set(vm_utils, 'list_vms', - XenAPIBWCountersTestCase._fake_list_vms) - self.flags(connection_url='test_url', - connection_password='test_pass', - group='xenserver') - self.flags(firewall_driver='nova.virt.xenapi.firewall.' - 'Dom0IptablesFirewallDriver') - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - def _fake_get_vif_device_map(vm_rec): - return vm_rec['_vifmap'] - - self.stubs.Set(self.conn._vmops, "_get_vif_device_map", - _fake_get_vif_device_map) - - @classmethod - def _fake_list_vms(cls, session): - return cls.FAKE_VMS.iteritems() - - @staticmethod - def _fake_fetch_bandwidth_mt(session): - return {} - - @staticmethod - def _fake_fetch_bandwidth(session): - return {'42': - {'0': {'bw_in': 21024, 'bw_out': 22048}, - '1': {'bw_in': 231337, 'bw_out': 221212121}}, - '12': - {'0': {'bw_in': 1024, 'bw_out': 2048}, - '1': {'bw_in': 31337, 'bw_out': 21212121}}, - } - - def test_get_all_bw_counters(self): - instances = [dict(name='test1', uuid='1-2-3'), - dict(name='test2', uuid='4-5-6')] - - self.stubs.Set(vm_utils, 'fetch_bandwidth', - self._fake_fetch_bandwidth) - result = self.conn.get_all_bw_counters(instances) - self.assertEqual(len(result), 4) - self.assertIn(dict(uuid='1-2-3', - mac_address="a:b:c:d...", - bw_in=1024, - bw_out=2048), result) - self.assertIn(dict(uuid='1-2-3', - mac_address="e:f:12:q...", - bw_in=31337, - bw_out=21212121), result) - - self.assertIn(dict(uuid='4-5-6', - mac_address="a:3:c:d...", - bw_in=21024, - bw_out=22048), result) - self.assertIn(dict(uuid='4-5-6', - mac_address="e:f:42:q...", - bw_in=231337, - bw_out=221212121), result) - - def test_get_all_bw_counters_in_failure_case(self): - """Test that get_all_bw_conters returns an empty list when - no data returned from Xenserver. c.f. bug #910045. - """ - instances = [dict(name='instance-0001', uuid='1-2-3-4-5')] - - self.stubs.Set(vm_utils, 'fetch_bandwidth', - self._fake_fetch_bandwidth_mt) - result = self.conn.get_all_bw_counters(instances) - self.assertEqual(result, []) - - -# TODO(salvatore-orlando): this class and -# nova.tests.virt.test_libvirt.IPTablesFirewallDriverTestCase share a lot of -# code. Consider abstracting common code in a base class for firewall driver -# testing. -# FIXME(sirp): convert this to use XenAPITestBaseNoDB -class XenAPIDom0IptablesFirewallTestCase(stubs.XenAPITestBase): - - REQUIRES_LOCKING = True - - _in_rules = [ - '# Generated by iptables-save v1.4.10 on Sat Feb 19 00:03:19 2011', - '*nat', - ':PREROUTING ACCEPT [1170:189210]', - ':INPUT ACCEPT [844:71028]', - ':OUTPUT ACCEPT [5149:405186]', - ':POSTROUTING ACCEPT [5063:386098]', - '# Completed on Mon Dec 6 11:54:13 2010', - '# Generated by iptables-save v1.4.4 on Mon Dec 6 11:54:13 2010', - '*mangle', - ':INPUT ACCEPT [969615:281627771]', - ':FORWARD ACCEPT [0:0]', - ':OUTPUT ACCEPT [915599:63811649]', - ':nova-block-ipv4 - [0:0]', - '[0:0] -A INPUT -i virbr0 -p tcp -m tcp --dport 67 -j ACCEPT ', - '[0:0] -A FORWARD -d 192.168.122.0/24 -o virbr0 -m state --state RELATED' - ',ESTABLISHED -j ACCEPT ', - '[0:0] -A FORWARD -s 192.168.122.0/24 -i virbr0 -j ACCEPT ', - '[0:0] -A FORWARD -i virbr0 -o virbr0 -j ACCEPT ', - '[0:0] -A FORWARD -o virbr0 -j REJECT ' - '--reject-with icmp-port-unreachable ', - '[0:0] -A FORWARD -i virbr0 -j REJECT ' - '--reject-with icmp-port-unreachable ', - 'COMMIT', - '# Completed on Mon Dec 6 11:54:13 2010', - '# Generated by iptables-save v1.4.4 on Mon Dec 6 11:54:13 2010', - '*filter', - ':INPUT ACCEPT [969615:281627771]', - ':FORWARD ACCEPT [0:0]', - ':OUTPUT ACCEPT [915599:63811649]', - ':nova-block-ipv4 - [0:0]', - '[0:0] -A INPUT -i virbr0 -p tcp -m tcp --dport 67 -j ACCEPT ', - '[0:0] -A FORWARD -d 192.168.122.0/24 -o virbr0 -m state --state RELATED' - ',ESTABLISHED -j ACCEPT ', - '[0:0] -A FORWARD -s 192.168.122.0/24 -i virbr0 -j ACCEPT ', - '[0:0] -A FORWARD -i virbr0 -o virbr0 -j ACCEPT ', - '[0:0] -A FORWARD -o virbr0 -j REJECT ' - '--reject-with icmp-port-unreachable ', - '[0:0] -A FORWARD -i virbr0 -j REJECT ' - '--reject-with icmp-port-unreachable ', - 'COMMIT', - '# Completed on Mon Dec 6 11:54:13 2010', - ] - - _in6_filter_rules = [ - '# Generated by ip6tables-save v1.4.4 on Tue Jan 18 23:47:56 2011', - '*filter', - ':INPUT ACCEPT [349155:75810423]', - ':FORWARD ACCEPT [0:0]', - ':OUTPUT ACCEPT [349256:75777230]', - 'COMMIT', - '# Completed on Tue Jan 18 23:47:56 2011', - ] - - def setUp(self): - super(XenAPIDom0IptablesFirewallTestCase, self).setUp() - self.flags(connection_url='test_url', - connection_password='test_pass', - group='xenserver') - self.flags(instance_name_template='%d', - firewall_driver='nova.virt.xenapi.firewall.' - 'Dom0IptablesFirewallDriver') - self.user_id = 'mappin' - self.project_id = 'fake' - stubs.stubout_session(self.stubs, stubs.FakeSessionForFirewallTests, - test_case=self) - self.context = context.RequestContext(self.user_id, self.project_id) - self.network = importutils.import_object(CONF.network_manager) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - self.fw = self.conn._vmops.firewall_driver - - def _create_instance_ref(self): - return db.instance_create(self.context, - {'user_id': self.user_id, - 'project_id': self.project_id, - 'instance_type_id': 1}) - - def _create_test_security_group(self): - admin_ctxt = context.get_admin_context() - secgroup = db.security_group_create(admin_ctxt, - {'user_id': self.user_id, - 'project_id': self.project_id, - 'name': 'testgroup', - 'description': 'test group'}) - db.security_group_rule_create(admin_ctxt, - {'parent_group_id': secgroup['id'], - 'protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'cidr': '192.168.11.0/24'}) - - db.security_group_rule_create(admin_ctxt, - {'parent_group_id': secgroup['id'], - 'protocol': 'icmp', - 'from_port': 8, - 'to_port': -1, - 'cidr': '192.168.11.0/24'}) - - db.security_group_rule_create(admin_ctxt, - {'parent_group_id': secgroup['id'], - 'protocol': 'tcp', - 'from_port': 80, - 'to_port': 81, - 'cidr': '192.168.10.0/24'}) - return secgroup - - def _validate_security_group(self): - in_rules = filter(lambda l: not l.startswith('#'), - self._in_rules) - for rule in in_rules: - if 'nova' not in rule: - self.assertTrue(rule in self._out_rules, - 'Rule went missing: %s' % rule) - - instance_chain = None - for rule in self._out_rules: - # This is pretty crude, but it'll do for now - # last two octets change - if re.search('-d 192.168.[0-9]{1,3}.[0-9]{1,3} -j', rule): - instance_chain = rule.split(' ')[-1] - break - self.assertTrue(instance_chain, "The instance chain wasn't added") - security_group_chain = None - for rule in self._out_rules: - # This is pretty crude, but it'll do for now - if '-A %s -j' % instance_chain in rule: - security_group_chain = rule.split(' ')[-1] - break - self.assertTrue(security_group_chain, - "The security group chain wasn't added") - - regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p icmp' - ' -s 192.168.11.0/24') - self.assertTrue(len(filter(regex.match, self._out_rules)) > 0, - "ICMP acceptance rule wasn't added") - - regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p icmp -m icmp' - ' --icmp-type 8 -s 192.168.11.0/24') - self.assertTrue(len(filter(regex.match, self._out_rules)) > 0, - "ICMP Echo Request acceptance rule wasn't added") - - regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p tcp --dport 80:81' - ' -s 192.168.10.0/24') - self.assertTrue(len(filter(regex.match, self._out_rules)) > 0, - "TCP port 80/81 acceptance rule wasn't added") - - def test_static_filters(self): - instance_ref = self._create_instance_ref() - src_instance_ref = self._create_instance_ref() - admin_ctxt = context.get_admin_context() - secgroup = self._create_test_security_group() - - src_secgroup = db.security_group_create(admin_ctxt, - {'user_id': self.user_id, - 'project_id': self.project_id, - 'name': 'testsourcegroup', - 'description': 'src group'}) - db.security_group_rule_create(admin_ctxt, - {'parent_group_id': secgroup['id'], - 'protocol': 'tcp', - 'from_port': 80, - 'to_port': 81, - 'group_id': src_secgroup['id']}) - - db.instance_add_security_group(admin_ctxt, instance_ref['uuid'], - secgroup['id']) - db.instance_add_security_group(admin_ctxt, src_instance_ref['uuid'], - src_secgroup['id']) - instance_ref = db.instance_get(admin_ctxt, instance_ref['id']) - src_instance_ref = db.instance_get(admin_ctxt, src_instance_ref['id']) - - network_model = fake_network.fake_get_instance_nw_info(self.stubs, 1) - - from nova.compute import utils as compute_utils # noqa - self.stubs.Set(compute_utils, 'get_nw_info_for_instance', - lambda instance: network_model) - - self.fw.prepare_instance_filter(instance_ref, network_model) - self.fw.apply_instance_filter(instance_ref, network_model) - - self._validate_security_group() - # Extra test for TCP acceptance rules - for ip in network_model.fixed_ips(): - if ip['version'] != 4: - continue - regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p tcp' - ' --dport 80:81 -s %s' % ip['address']) - self.assertTrue(len(filter(regex.match, self._out_rules)) > 0, - "TCP port 80/81 acceptance rule wasn't added") - - db.instance_destroy(admin_ctxt, instance_ref['uuid']) - - def test_filters_for_instance_with_ip_v6(self): - self.flags(use_ipv6=True) - network_info = fake_network.fake_get_instance_nw_info(self.stubs, 1) - rulesv4, rulesv6 = self.fw._filters_for_instance("fake", network_info) - self.assertEqual(len(rulesv4), 2) - self.assertEqual(len(rulesv6), 1) - - def test_filters_for_instance_without_ip_v6(self): - self.flags(use_ipv6=False) - network_info = fake_network.fake_get_instance_nw_info(self.stubs, 1) - rulesv4, rulesv6 = self.fw._filters_for_instance("fake", network_info) - self.assertEqual(len(rulesv4), 2) - self.assertEqual(len(rulesv6), 0) - - def test_multinic_iptables(self): - ipv4_rules_per_addr = 1 - ipv4_addr_per_network = 2 - ipv6_rules_per_addr = 1 - ipv6_addr_per_network = 1 - networks_count = 5 - instance_ref = self._create_instance_ref() - _get_instance_nw_info = fake_network.fake_get_instance_nw_info - network_info = _get_instance_nw_info(self.stubs, - networks_count, - ipv4_addr_per_network) - network_info[0]['network']['subnets'][0]['meta']['dhcp_server'] = \ - '1.1.1.1' - ipv4_len = len(self.fw.iptables.ipv4['filter'].rules) - ipv6_len = len(self.fw.iptables.ipv6['filter'].rules) - inst_ipv4, inst_ipv6 = self.fw.instance_rules(instance_ref, - network_info) - self.fw.prepare_instance_filter(instance_ref, network_info) - ipv4 = self.fw.iptables.ipv4['filter'].rules - ipv6 = self.fw.iptables.ipv6['filter'].rules - ipv4_network_rules = len(ipv4) - len(inst_ipv4) - ipv4_len - ipv6_network_rules = len(ipv6) - len(inst_ipv6) - ipv6_len - # Extra rules are for the DHCP request - rules = (ipv4_rules_per_addr * ipv4_addr_per_network * - networks_count) + 2 - self.assertEqual(ipv4_network_rules, rules) - self.assertEqual(ipv6_network_rules, - ipv6_rules_per_addr * ipv6_addr_per_network * networks_count) - - def test_do_refresh_security_group_rules(self): - admin_ctxt = context.get_admin_context() - instance_ref = self._create_instance_ref() - network_info = fake_network.fake_get_instance_nw_info(self.stubs, 1, 1) - secgroup = self._create_test_security_group() - db.instance_add_security_group(admin_ctxt, instance_ref['uuid'], - secgroup['id']) - self.fw.prepare_instance_filter(instance_ref, network_info) - self.fw.instance_info[instance_ref['id']] = (instance_ref, - network_info) - self._validate_security_group() - # add a rule to the security group - db.security_group_rule_create(admin_ctxt, - {'parent_group_id': secgroup['id'], - 'protocol': 'udp', - 'from_port': 200, - 'to_port': 299, - 'cidr': '192.168.99.0/24'}) - # validate the extra rule - self.fw.refresh_security_group_rules(secgroup) - regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p udp --dport 200:299' - ' -s 192.168.99.0/24') - self.assertTrue(len(filter(regex.match, self._out_rules)) > 0, - "Rules were not updated properly." - "The rule for UDP acceptance is missing") - - def test_provider_firewall_rules(self): - # setup basic instance data - instance_ref = self._create_instance_ref() - # FRAGILE: as in libvirt tests - # peeks at how the firewall names chains - chain_name = 'inst-%s' % instance_ref['id'] - - network_info = fake_network.fake_get_instance_nw_info(self.stubs, 1, 1) - self.fw.prepare_instance_filter(instance_ref, network_info) - self.assertIn('provider', self.fw.iptables.ipv4['filter'].chains) - rules = [rule for rule in self.fw.iptables.ipv4['filter'].rules - if rule.chain == 'provider'] - self.assertEqual(0, len(rules)) - - admin_ctxt = context.get_admin_context() - # add a rule and send the update message, check for 1 rule - db.provider_fw_rule_create(admin_ctxt, - {'protocol': 'tcp', - 'cidr': '10.99.99.99/32', - 'from_port': 1, - 'to_port': 65535}) - self.fw.refresh_provider_fw_rules() - rules = [rule for rule in self.fw.iptables.ipv4['filter'].rules - if rule.chain == 'provider'] - self.assertEqual(1, len(rules)) - - # Add another, refresh, and make sure number of rules goes to two - provider_fw1 = db.provider_fw_rule_create(admin_ctxt, - {'protocol': 'udp', - 'cidr': '10.99.99.99/32', - 'from_port': 1, - 'to_port': 65535}) - self.fw.refresh_provider_fw_rules() - rules = [rule for rule in self.fw.iptables.ipv4['filter'].rules - if rule.chain == 'provider'] - self.assertEqual(2, len(rules)) - - # create the instance filter and make sure it has a jump rule - self.fw.prepare_instance_filter(instance_ref, network_info) - self.fw.apply_instance_filter(instance_ref, network_info) - inst_rules = [rule for rule in self.fw.iptables.ipv4['filter'].rules - if rule.chain == chain_name] - jump_rules = [rule for rule in inst_rules if '-j' in rule.rule] - provjump_rules = [] - # IptablesTable doesn't make rules unique internally - for rule in jump_rules: - if 'provider' in rule.rule and rule not in provjump_rules: - provjump_rules.append(rule) - self.assertEqual(1, len(provjump_rules)) - - # remove a rule from the db, cast to compute to refresh rule - db.provider_fw_rule_destroy(admin_ctxt, provider_fw1['id']) - self.fw.refresh_provider_fw_rules() - rules = [rule for rule in self.fw.iptables.ipv4['filter'].rules - if rule.chain == 'provider'] - self.assertEqual(1, len(rules)) - - -class XenAPISRSelectionTestCase(stubs.XenAPITestBaseNoDB): - """Unit tests for testing we find the right SR.""" - def test_safe_find_sr_raise_exception(self): - # Ensure StorageRepositoryNotFound is raise when wrong filter. - self.flags(sr_matching_filter='yadayadayada', group='xenserver') - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - session = get_session() - self.assertRaises(exception.StorageRepositoryNotFound, - vm_utils.safe_find_sr, session) - - def test_safe_find_sr_local_storage(self): - # Ensure the default local-storage is found. - self.flags(sr_matching_filter='other-config:i18n-key=local-storage', - group='xenserver') - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - session = get_session() - # This test is only guaranteed if there is one host in the pool - self.assertEqual(len(xenapi_fake.get_all('host')), 1) - host_ref = xenapi_fake.get_all('host')[0] - pbd_refs = xenapi_fake.get_all('PBD') - for pbd_ref in pbd_refs: - pbd_rec = xenapi_fake.get_record('PBD', pbd_ref) - if pbd_rec['host'] != host_ref: - continue - sr_rec = xenapi_fake.get_record('SR', pbd_rec['SR']) - if sr_rec['other_config']['i18n-key'] == 'local-storage': - local_sr = pbd_rec['SR'] - expected = vm_utils.safe_find_sr(session) - self.assertEqual(local_sr, expected) - - def test_safe_find_sr_by_other_criteria(self): - # Ensure the SR is found when using a different filter. - self.flags(sr_matching_filter='other-config:my_fake_sr=true', - group='xenserver') - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - session = get_session() - host_ref = xenapi_fake.get_all('host')[0] - local_sr = xenapi_fake.create_sr(name_label='Fake Storage', - type='lvm', - other_config={'my_fake_sr': 'true'}, - host_ref=host_ref) - expected = vm_utils.safe_find_sr(session) - self.assertEqual(local_sr, expected) - - def test_safe_find_sr_default(self): - # Ensure the default SR is found regardless of other-config. - self.flags(sr_matching_filter='default-sr:true', - group='xenserver') - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - session = get_session() - pool_ref = session.call_xenapi('pool.get_all')[0] - expected = vm_utils.safe_find_sr(session) - self.assertEqual(session.call_xenapi('pool.get_default_SR', pool_ref), - expected) - - -def _create_service_entries(context, values={'avail_zone1': ['fake_host1', - 'fake_host2'], - 'avail_zone2': ['fake_host3'], }): - for avail_zone, hosts in values.iteritems(): - for service_host in hosts: - db.service_create(context, - {'host': service_host, - 'binary': 'nova-compute', - 'topic': 'compute', - 'report_count': 0}) - return values - - -# FIXME(sirp): convert this to use XenAPITestBaseNoDB -class XenAPIAggregateTestCase(stubs.XenAPITestBase): - """Unit tests for aggregate operations.""" - def setUp(self): - super(XenAPIAggregateTestCase, self).setUp() - self.flags(connection_url='http://test_url', - connection_username='test_user', - connection_password='test_pass', - group='xenserver') - self.flags(instance_name_template='%d', - firewall_driver='nova.virt.xenapi.firewall.' - 'Dom0IptablesFirewallDriver', - host='host', - compute_driver='xenapi.XenAPIDriver', - default_availability_zone='avail_zone1') - self.flags(use_local=True, group='conductor') - host_ref = xenapi_fake.get_all('host')[0] - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.context = context.get_admin_context() - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - self.compute = importutils.import_object(CONF.compute_manager) - self.api = compute_api.AggregateAPI() - values = {'name': 'test_aggr', - 'metadata': {'availability_zone': 'test_zone', - pool_states.POOL_FLAG: 'XenAPI'}} - self.aggr = db.aggregate_create(self.context, values) - self.fake_metadata = {pool_states.POOL_FLAG: 'XenAPI', - 'master_compute': 'host', - 'availability_zone': 'fake_zone', - pool_states.KEY: pool_states.ACTIVE, - 'host': xenapi_fake.get_record('host', - host_ref)['uuid']} - - def test_pool_add_to_aggregate_called_by_driver(self): - - calls = [] - - def pool_add_to_aggregate(context, aggregate, host, slave_info=None): - self.assertEqual("CONTEXT", context) - self.assertEqual("AGGREGATE", aggregate) - self.assertEqual("HOST", host) - self.assertEqual("SLAVEINFO", slave_info) - calls.append(pool_add_to_aggregate) - self.stubs.Set(self.conn._pool, - "add_to_aggregate", - pool_add_to_aggregate) - - self.conn.add_to_aggregate("CONTEXT", "AGGREGATE", "HOST", - slave_info="SLAVEINFO") - - self.assertIn(pool_add_to_aggregate, calls) - - def test_pool_remove_from_aggregate_called_by_driver(self): - - calls = [] - - def pool_remove_from_aggregate(context, aggregate, host, - slave_info=None): - self.assertEqual("CONTEXT", context) - self.assertEqual("AGGREGATE", aggregate) - self.assertEqual("HOST", host) - self.assertEqual("SLAVEINFO", slave_info) - calls.append(pool_remove_from_aggregate) - self.stubs.Set(self.conn._pool, - "remove_from_aggregate", - pool_remove_from_aggregate) - - self.conn.remove_from_aggregate("CONTEXT", "AGGREGATE", "HOST", - slave_info="SLAVEINFO") - - self.assertIn(pool_remove_from_aggregate, calls) - - def test_add_to_aggregate_for_first_host_sets_metadata(self): - def fake_init_pool(id, name): - fake_init_pool.called = True - self.stubs.Set(self.conn._pool, "_init_pool", fake_init_pool) - - aggregate = self._aggregate_setup() - self.conn._pool.add_to_aggregate(self.context, aggregate, "host") - result = db.aggregate_get(self.context, aggregate['id']) - self.assertTrue(fake_init_pool.called) - self.assertThat(self.fake_metadata, - matchers.DictMatches(result['metadetails'])) - - def test_join_slave(self): - # Ensure join_slave gets called when the request gets to master. - def fake_join_slave(id, compute_uuid, host, url, user, password): - fake_join_slave.called = True - self.stubs.Set(self.conn._pool, "_join_slave", fake_join_slave) - - aggregate = self._aggregate_setup(hosts=['host', 'host2'], - metadata=self.fake_metadata) - self.conn._pool.add_to_aggregate(self.context, aggregate, "host2", - dict(compute_uuid='fake_uuid', - url='fake_url', - user='fake_user', - passwd='fake_pass', - xenhost_uuid='fake_uuid')) - self.assertTrue(fake_join_slave.called) - - def test_add_to_aggregate_first_host(self): - def fake_pool_set_name_label(self, session, pool_ref, name): - fake_pool_set_name_label.called = True - self.stubs.Set(xenapi_fake.SessionBase, "pool_set_name_label", - fake_pool_set_name_label) - self.conn._session.call_xenapi("pool.create", {"name": "asdf"}) - - metadata = {'availability_zone': 'fake_zone', - pool_states.POOL_FLAG: "XenAPI", - pool_states.KEY: pool_states.CREATED} - - aggregate = objects.Aggregate() - aggregate.name = 'fake_aggregate' - aggregate.metadata = dict(metadata) - aggregate.create(self.context) - aggregate.add_host('host') - self.assertEqual(["host"], aggregate.hosts) - self.assertEqual(metadata, aggregate.metadata) - - self.conn._pool.add_to_aggregate(self.context, aggregate, "host") - self.assertTrue(fake_pool_set_name_label.called) - - def test_remove_from_aggregate_called(self): - def fake_remove_from_aggregate(context, aggregate, host): - fake_remove_from_aggregate.called = True - self.stubs.Set(self.conn._pool, - "remove_from_aggregate", - fake_remove_from_aggregate) - - self.conn.remove_from_aggregate(None, None, None) - self.assertTrue(fake_remove_from_aggregate.called) - - def test_remove_from_empty_aggregate(self): - result = self._aggregate_setup() - self.assertRaises(exception.InvalidAggregateAction, - self.conn._pool.remove_from_aggregate, - self.context, result, "test_host") - - def test_remove_slave(self): - # Ensure eject slave gets called. - def fake_eject_slave(id, compute_uuid, host_uuid): - fake_eject_slave.called = True - self.stubs.Set(self.conn._pool, "_eject_slave", fake_eject_slave) - - self.fake_metadata['host2'] = 'fake_host2_uuid' - aggregate = self._aggregate_setup(hosts=['host', 'host2'], - metadata=self.fake_metadata, aggr_state=pool_states.ACTIVE) - self.conn._pool.remove_from_aggregate(self.context, aggregate, "host2") - self.assertTrue(fake_eject_slave.called) - - def test_remove_master_solo(self): - # Ensure metadata are cleared after removal. - def fake_clear_pool(id): - fake_clear_pool.called = True - self.stubs.Set(self.conn._pool, "_clear_pool", fake_clear_pool) - - aggregate = self._aggregate_setup(metadata=self.fake_metadata) - self.conn._pool.remove_from_aggregate(self.context, aggregate, "host") - result = db.aggregate_get(self.context, aggregate['id']) - self.assertTrue(fake_clear_pool.called) - self.assertThat({'availability_zone': 'fake_zone', - pool_states.POOL_FLAG: 'XenAPI', - pool_states.KEY: pool_states.ACTIVE}, - matchers.DictMatches(result['metadetails'])) - - def test_remote_master_non_empty_pool(self): - # Ensure AggregateError is raised if removing the master. - aggregate = self._aggregate_setup(hosts=['host', 'host2'], - metadata=self.fake_metadata) - - self.assertRaises(exception.InvalidAggregateAction, - self.conn._pool.remove_from_aggregate, - self.context, aggregate, "host") - - def _aggregate_setup(self, aggr_name='fake_aggregate', - aggr_zone='fake_zone', - aggr_state=pool_states.CREATED, - hosts=['host'], metadata=None): - aggregate = objects.Aggregate() - aggregate.name = aggr_name - aggregate.metadata = {'availability_zone': aggr_zone, - pool_states.POOL_FLAG: 'XenAPI', - pool_states.KEY: aggr_state, - } - if metadata: - aggregate.metadata.update(metadata) - aggregate.create(self.context) - for aggregate_host in hosts: - aggregate.add_host(aggregate_host) - return aggregate - - def test_add_host_to_aggregate_invalid_changing_status(self): - """Ensure InvalidAggregateAction is raised when adding host while - aggregate is not ready. - """ - aggregate = self._aggregate_setup(aggr_state=pool_states.CHANGING) - ex = self.assertRaises(exception.InvalidAggregateAction, - self.conn.add_to_aggregate, self.context, - aggregate, 'host') - self.assertIn('setup in progress', str(ex)) - - def test_add_host_to_aggregate_invalid_dismissed_status(self): - """Ensure InvalidAggregateAction is raised when aggregate is - deleted. - """ - aggregate = self._aggregate_setup(aggr_state=pool_states.DISMISSED) - ex = self.assertRaises(exception.InvalidAggregateAction, - self.conn.add_to_aggregate, self.context, - aggregate, 'fake_host') - self.assertIn('aggregate deleted', str(ex)) - - def test_add_host_to_aggregate_invalid_error_status(self): - """Ensure InvalidAggregateAction is raised when aggregate is - in error. - """ - aggregate = self._aggregate_setup(aggr_state=pool_states.ERROR) - ex = self.assertRaises(exception.InvalidAggregateAction, - self.conn.add_to_aggregate, self.context, - aggregate, 'fake_host') - self.assertIn('aggregate in error', str(ex)) - - def test_remove_host_from_aggregate_error(self): - # Ensure we can remove a host from an aggregate even if in error. - values = _create_service_entries(self.context) - fake_zone = values.keys()[0] - aggr = self.api.create_aggregate(self.context, - 'fake_aggregate', fake_zone) - # let's mock the fact that the aggregate is ready! - metadata = {pool_states.POOL_FLAG: "XenAPI", - pool_states.KEY: pool_states.ACTIVE} - db.aggregate_metadata_add(self.context, aggr['id'], metadata) - for aggregate_host in values[fake_zone]: - aggr = self.api.add_host_to_aggregate(self.context, - aggr['id'], aggregate_host) - # let's mock the fact that the aggregate is in error! - expected = self.api.remove_host_from_aggregate(self.context, - aggr['id'], - values[fake_zone][0]) - self.assertEqual(len(aggr['hosts']) - 1, len(expected['hosts'])) - self.assertEqual(expected['metadata'][pool_states.KEY], - pool_states.ACTIVE) - - def test_remove_host_from_aggregate_invalid_dismissed_status(self): - """Ensure InvalidAggregateAction is raised when aggregate is - deleted. - """ - aggregate = self._aggregate_setup(aggr_state=pool_states.DISMISSED) - self.assertRaises(exception.InvalidAggregateAction, - self.conn.remove_from_aggregate, self.context, - aggregate, 'fake_host') - - def test_remove_host_from_aggregate_invalid_changing_status(self): - """Ensure InvalidAggregateAction is raised when aggregate is - changing. - """ - aggregate = self._aggregate_setup(aggr_state=pool_states.CHANGING) - self.assertRaises(exception.InvalidAggregateAction, - self.conn.remove_from_aggregate, self.context, - aggregate, 'fake_host') - - def test_add_aggregate_host_raise_err(self): - # Ensure the undo operation works correctly on add. - def fake_driver_add_to_aggregate(context, aggregate, host, **_ignore): - raise exception.AggregateError( - aggregate_id='', action='', reason='') - self.stubs.Set(self.compute.driver, "add_to_aggregate", - fake_driver_add_to_aggregate) - metadata = {pool_states.POOL_FLAG: "XenAPI", - pool_states.KEY: pool_states.ACTIVE} - db.aggregate_metadata_add(self.context, self.aggr['id'], metadata) - db.aggregate_host_add(self.context, self.aggr['id'], 'fake_host') - - self.assertRaises(exception.AggregateError, - self.compute.add_aggregate_host, - self.context, host="fake_host", - aggregate=jsonutils.to_primitive(self.aggr), - slave_info=None) - excepted = db.aggregate_get(self.context, self.aggr['id']) - self.assertEqual(excepted['metadetails'][pool_states.KEY], - pool_states.ERROR) - self.assertEqual(excepted['hosts'], []) - - -class MockComputeAPI(object): - def __init__(self): - self._mock_calls = [] - - def add_aggregate_host(self, ctxt, aggregate, - host_param, host, slave_info): - self._mock_calls.append(( - self.add_aggregate_host, ctxt, aggregate, - host_param, host, slave_info)) - - def remove_aggregate_host(self, ctxt, aggregate_id, host_param, - host, slave_info): - self._mock_calls.append(( - self.remove_aggregate_host, ctxt, aggregate_id, - host_param, host, slave_info)) - - -class StubDependencies(object): - """Stub dependencies for ResourcePool.""" - - def __init__(self): - self.compute_rpcapi = MockComputeAPI() - - def _is_hv_pool(self, *_ignore): - return True - - def _get_metadata(self, *_ignore): - return { - pool_states.KEY: {}, - 'master_compute': 'master' - } - - def _create_slave_info(self, *ignore): - return "SLAVE_INFO" - - -class ResourcePoolWithStubs(StubDependencies, pool.ResourcePool): - """A ResourcePool, use stub dependencies.""" - - -class HypervisorPoolTestCase(test.NoDBTestCase): - - fake_aggregate = { - 'id': 98, - 'hosts': [], - 'metadata': { - 'master_compute': 'master', - pool_states.POOL_FLAG: {}, - pool_states.KEY: {} - } - } - - def test_slave_asks_master_to_add_slave_to_pool(self): - slave = ResourcePoolWithStubs() - - slave.add_to_aggregate("CONTEXT", self.fake_aggregate, "slave") - - self.assertIn( - (slave.compute_rpcapi.add_aggregate_host, - "CONTEXT", jsonutils.to_primitive(self.fake_aggregate), - "slave", "master", "SLAVE_INFO"), - slave.compute_rpcapi._mock_calls) - - def test_slave_asks_master_to_remove_slave_from_pool(self): - slave = ResourcePoolWithStubs() - - slave.remove_from_aggregate("CONTEXT", self.fake_aggregate, "slave") - - self.assertIn( - (slave.compute_rpcapi.remove_aggregate_host, - "CONTEXT", 98, "slave", "master", "SLAVE_INFO"), - slave.compute_rpcapi._mock_calls) - - -class SwapXapiHostTestCase(test.NoDBTestCase): - - def test_swapping(self): - self.assertEqual( - "http://otherserver:8765/somepath", - pool.swap_xapi_host( - "http://someserver:8765/somepath", 'otherserver')) - - def test_no_port(self): - self.assertEqual( - "http://otherserver/somepath", - pool.swap_xapi_host( - "http://someserver/somepath", 'otherserver')) - - def test_no_path(self): - self.assertEqual( - "http://otherserver", - pool.swap_xapi_host( - "http://someserver", 'otherserver')) - - -class XenAPILiveMigrateTestCase(stubs.XenAPITestBaseNoDB): - """Unit tests for live_migration.""" - def setUp(self): - super(XenAPILiveMigrateTestCase, self).setUp() - self.flags(connection_url='test_url', - connection_password='test_pass', - group='xenserver') - self.flags(firewall_driver='nova.virt.xenapi.firewall.' - 'Dom0IptablesFirewallDriver', - host='host') - db_fakes.stub_out_db_instance_api(self.stubs) - self.context = context.get_admin_context() - - def test_live_migration_calls_vmops(self): - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - def fake_live_migrate(context, instance_ref, dest, post_method, - recover_method, block_migration, migrate_data): - fake_live_migrate.called = True - - self.stubs.Set(self.conn._vmops, "live_migrate", fake_live_migrate) - - self.conn.live_migration(None, None, None, None, None) - self.assertTrue(fake_live_migrate.called) - - def test_pre_live_migration(self): - # ensure method is present - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - self.conn.pre_live_migration(None, None, None, None, None) - - def test_post_live_migration_at_destination(self): - # ensure method is present - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - fake_instance = {"name": "name"} - fake_network_info = "network_info" - - def fake_fw(instance, network_info): - self.assertEqual(instance, fake_instance) - self.assertEqual(network_info, fake_network_info) - fake_fw.call_count += 1 - - def fake_create_kernel_and_ramdisk(context, session, instance, - name_label): - return "fake-kernel-file", "fake-ramdisk-file" - - fake_fw.call_count = 0 - _vmops = self.conn._vmops - self.stubs.Set(_vmops.firewall_driver, - 'setup_basic_filtering', fake_fw) - self.stubs.Set(_vmops.firewall_driver, - 'prepare_instance_filter', fake_fw) - self.stubs.Set(_vmops.firewall_driver, - 'apply_instance_filter', fake_fw) - self.stubs.Set(vm_utils, "create_kernel_and_ramdisk", - fake_create_kernel_and_ramdisk) - - def fake_get_vm_opaque_ref(instance): - fake_get_vm_opaque_ref.called = True - self.stubs.Set(_vmops, "_get_vm_opaque_ref", fake_get_vm_opaque_ref) - fake_get_vm_opaque_ref.called = False - - def fake_strip_base_mirror_from_vdis(session, vm_ref): - fake_strip_base_mirror_from_vdis.called = True - self.stubs.Set(vm_utils, "strip_base_mirror_from_vdis", - fake_strip_base_mirror_from_vdis) - fake_strip_base_mirror_from_vdis.called = False - - self.conn.post_live_migration_at_destination(None, fake_instance, - fake_network_info, None) - self.assertEqual(fake_fw.call_count, 3) - self.assertTrue(fake_get_vm_opaque_ref.called) - self.assertTrue(fake_strip_base_mirror_from_vdis.called) - - def test_check_can_live_migrate_destination_with_block_migration(self): - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - self.stubs.Set(vm_utils, "safe_find_sr", lambda _x: "asdf") - - expected = {'block_migration': True, - 'migrate_data': { - 'migrate_send_data': "fake_migrate_data", - 'destination_sr_ref': 'asdf' - } - } - result = self.conn.check_can_live_migrate_destination(self.context, - {'host': 'host'}, - {}, {}, - True, False) - self.assertEqual(expected, result) - - def test_check_live_migrate_destination_verifies_ip(self): - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - for pif_ref in xenapi_fake.get_all('PIF'): - pif_rec = xenapi_fake.get_record('PIF', pif_ref) - pif_rec['IP'] = '' - pif_rec['IPv6'] = '' - - self.stubs.Set(vm_utils, "safe_find_sr", lambda _x: "asdf") - - self.assertRaises(exception.MigrationError, - self.conn.check_can_live_migrate_destination, - self.context, {'host': 'host'}, - {}, {}, - True, False) - - def test_check_can_live_migrate_destination_block_migration_fails(self): - stubs.stubout_session(self.stubs, - stubs.FakeSessionForFailedMigrateTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - self.assertRaises(exception.MigrationError, - self.conn.check_can_live_migrate_destination, - self.context, {'host': 'host'}, - {}, {}, - True, False) - - def _add_default_live_migrate_stubs(self, conn): - def fake_generate_vdi_map(destination_sr_ref, _vm_ref): - pass - - def fake_get_iscsi_srs(destination_sr_ref, _vm_ref): - return [] - - def fake_get_vm_opaque_ref(instance): - return "fake_vm" - - def fake_lookup_kernel_ramdisk(session, vm): - return ("fake_PV_kernel", "fake_PV_ramdisk") - - self.stubs.Set(conn._vmops, "_generate_vdi_map", - fake_generate_vdi_map) - self.stubs.Set(conn._vmops, "_get_iscsi_srs", - fake_get_iscsi_srs) - self.stubs.Set(conn._vmops, "_get_vm_opaque_ref", - fake_get_vm_opaque_ref) - self.stubs.Set(vm_utils, "lookup_kernel_ramdisk", - fake_lookup_kernel_ramdisk) - - def test_check_can_live_migrate_source_with_block_migrate(self): - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - self._add_default_live_migrate_stubs(self.conn) - - dest_check_data = {'block_migration': True, - 'migrate_data': { - 'destination_sr_ref': None, - 'migrate_send_data': None - }} - result = self.conn.check_can_live_migrate_source(self.context, - {'host': 'host'}, - dest_check_data) - self.assertEqual(dest_check_data, result) - - def test_check_can_live_migrate_source_with_block_migrate_iscsi(self): - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - self._add_default_live_migrate_stubs(self.conn) - - def fake_get_iscsi_srs(destination_sr_ref, _vm_ref): - return ['sr_ref'] - self.stubs.Set(self.conn._vmops, "_get_iscsi_srs", - fake_get_iscsi_srs) - - def fake_make_plugin_call(plugin, method, **args): - return "true" - self.stubs.Set(self.conn._vmops, "_make_plugin_call", - fake_make_plugin_call) - - dest_check_data = {'block_migration': True, - 'migrate_data': { - 'destination_sr_ref': None, - 'migrate_send_data': None - }} - result = self.conn.check_can_live_migrate_source(self.context, - {'host': 'host'}, - dest_check_data) - self.assertEqual(dest_check_data, result) - - def test_check_can_live_migrate_source_with_block_iscsi_fails(self): - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - self._add_default_live_migrate_stubs(self.conn) - - def fake_get_iscsi_srs(destination_sr_ref, _vm_ref): - return ['sr_ref'] - self.stubs.Set(self.conn._vmops, "_get_iscsi_srs", - fake_get_iscsi_srs) - - def fake_make_plugin_call(plugin, method, **args): - return {'returncode': 'error', 'message': 'Plugin not found'} - self.stubs.Set(self.conn._vmops, "_make_plugin_call", - fake_make_plugin_call) - - self.assertRaises(exception.MigrationError, - self.conn.check_can_live_migrate_source, - self.context, {'host': 'host'}, - {}) - - def test_check_can_live_migrate_source_with_block_migrate_fails(self): - stubs.stubout_session(self.stubs, - stubs.FakeSessionForFailedMigrateTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - self._add_default_live_migrate_stubs(self.conn) - - dest_check_data = {'block_migration': True, - 'migrate_data': { - 'destination_sr_ref': None, - 'migrate_send_data': None - }} - self.assertRaises(exception.MigrationError, - self.conn.check_can_live_migrate_source, - self.context, - {'host': 'host'}, - dest_check_data) - - def test_check_can_live_migrate_works(self): - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - def fake_aggregate_get_by_host(context, host, key=None): - self.assertEqual(CONF.host, host) - return [dict(test_aggregate.fake_aggregate, - metadetails={"host": "test_host_uuid"})] - - self.stubs.Set(db, "aggregate_get_by_host", - fake_aggregate_get_by_host) - self.conn.check_can_live_migrate_destination(self.context, - {'host': 'host'}, False, False) - - def test_check_can_live_migrate_fails(self): - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - def fake_aggregate_get_by_host(context, host, key=None): - self.assertEqual(CONF.host, host) - return [dict(test_aggregate.fake_aggregate, - metadetails={"dest_other": "test_host_uuid"})] - - self.stubs.Set(db, "aggregate_get_by_host", - fake_aggregate_get_by_host) - self.assertRaises(exception.MigrationError, - self.conn.check_can_live_migrate_destination, - self.context, {'host': 'host'}, None, None) - - def test_live_migration(self): - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - def fake_get_vm_opaque_ref(instance): - return "fake_vm" - self.stubs.Set(self.conn._vmops, "_get_vm_opaque_ref", - fake_get_vm_opaque_ref) - - def fake_get_host_opaque_ref(context, destination_hostname): - return "fake_host" - self.stubs.Set(self.conn._vmops, "_get_host_opaque_ref", - fake_get_host_opaque_ref) - - def post_method(context, instance, destination_hostname, - block_migration, migrate_data): - post_method.called = True - - self.conn.live_migration(self.conn, None, None, post_method, None) - - self.assertTrue(post_method.called, "post_method.called") - - def test_live_migration_on_failure(self): - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - def fake_get_vm_opaque_ref(instance): - return "fake_vm" - self.stubs.Set(self.conn._vmops, "_get_vm_opaque_ref", - fake_get_vm_opaque_ref) - - def fake_get_host_opaque_ref(context, destination_hostname): - return "fake_host" - self.stubs.Set(self.conn._vmops, "_get_host_opaque_ref", - fake_get_host_opaque_ref) - - def fake_call_xenapi(*args): - raise NotImplementedError() - self.stubs.Set(self.conn._vmops._session, "call_xenapi", - fake_call_xenapi) - - def recover_method(context, instance, destination_hostname, - block_migration): - recover_method.called = True - - self.assertRaises(NotImplementedError, self.conn.live_migration, - self.conn, None, None, None, recover_method) - self.assertTrue(recover_method.called, "recover_method.called") - - def test_live_migration_calls_post_migration(self): - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - self._add_default_live_migrate_stubs(self.conn) - - def post_method(context, instance, destination_hostname, - block_migration, migrate_data): - post_method.called = True - - # pass block_migration = True and migrate data - migrate_data = {"destination_sr_ref": "foo", - "migrate_send_data": "bar"} - self.conn.live_migration(self.conn, None, None, post_method, None, - True, migrate_data) - self.assertTrue(post_method.called, "post_method.called") - - def test_live_migration_block_cleans_srs(self): - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - self._add_default_live_migrate_stubs(self.conn) - - def fake_get_iscsi_srs(context, instance): - return ['sr_ref'] - self.stubs.Set(self.conn._vmops, "_get_iscsi_srs", - fake_get_iscsi_srs) - - def fake_forget_sr(context, instance): - fake_forget_sr.called = True - self.stubs.Set(volume_utils, "forget_sr", - fake_forget_sr) - - def post_method(context, instance, destination_hostname, - block_migration, migrate_data): - post_method.called = True - - migrate_data = {"destination_sr_ref": "foo", - "migrate_send_data": "bar"} - self.conn.live_migration(self.conn, None, None, post_method, None, - True, migrate_data) - - self.assertTrue(post_method.called, "post_method.called") - self.assertTrue(fake_forget_sr.called, "forget_sr.called") - - def test_live_migration_with_block_migration_raises_invalid_param(self): - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - self._add_default_live_migrate_stubs(self.conn) - - def recover_method(context, instance, destination_hostname, - block_migration): - recover_method.called = True - # pass block_migration = True and no migrate data - self.assertRaises(exception.InvalidParameterValue, - self.conn.live_migration, self.conn, - None, None, None, recover_method, True, None) - self.assertTrue(recover_method.called, "recover_method.called") - - def test_live_migration_with_block_migration_fails_migrate_send(self): - stubs.stubout_session(self.stubs, - stubs.FakeSessionForFailedMigrateTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - self._add_default_live_migrate_stubs(self.conn) - - def recover_method(context, instance, destination_hostname, - block_migration): - recover_method.called = True - # pass block_migration = True and migrate data - migrate_data = dict(destination_sr_ref='foo', migrate_send_data='bar') - self.assertRaises(exception.MigrationError, - self.conn.live_migration, self.conn, - None, None, None, recover_method, True, migrate_data) - self.assertTrue(recover_method.called, "recover_method.called") - - def test_live_migrate_block_migration_xapi_call_parameters(self): - - fake_vdi_map = object() - - class Session(xenapi_fake.SessionBase): - def VM_migrate_send(self_, session, vmref, migrate_data, islive, - vdi_map, vif_map, options): - self.assertEqual('SOMEDATA', migrate_data) - self.assertEqual(fake_vdi_map, vdi_map) - - stubs.stubout_session(self.stubs, Session) - - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - self._add_default_live_migrate_stubs(conn) - - def fake_generate_vdi_map(destination_sr_ref, _vm_ref): - return fake_vdi_map - - self.stubs.Set(conn._vmops, "_generate_vdi_map", - fake_generate_vdi_map) - - def dummy_callback(*args, **kwargs): - pass - - conn.live_migration( - self.context, instance=dict(name='ignore'), dest=None, - post_method=dummy_callback, recover_method=dummy_callback, - block_migration="SOMEDATA", - migrate_data=dict(migrate_send_data='SOMEDATA', - destination_sr_ref="TARGET_SR_OPAQUE_REF")) - - def test_live_migrate_pool_migration_xapi_call_parameters(self): - - class Session(xenapi_fake.SessionBase): - def VM_pool_migrate(self_, session, vm_ref, host_ref, options): - self.assertEqual("fake_ref", host_ref) - self.assertEqual({"live": "true"}, options) - raise IOError() - - stubs.stubout_session(self.stubs, Session) - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - self._add_default_live_migrate_stubs(conn) - - def fake_get_host_opaque_ref(context, destination): - return "fake_ref" - - self.stubs.Set(conn._vmops, "_get_host_opaque_ref", - fake_get_host_opaque_ref) - - def dummy_callback(*args, **kwargs): - pass - - self.assertRaises(IOError, conn.live_migration, - self.context, instance=dict(name='ignore'), dest=None, - post_method=dummy_callback, recover_method=dummy_callback, - block_migration=False, migrate_data={}) - - def test_generate_vdi_map(self): - stubs.stubout_session(self.stubs, xenapi_fake.SessionBase) - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - vm_ref = "fake_vm_ref" - - def fake_find_sr(_session): - self.assertEqual(conn._session, _session) - return "source_sr_ref" - self.stubs.Set(vm_utils, "safe_find_sr", fake_find_sr) - - def fake_get_instance_vdis_for_sr(_session, _vm_ref, _sr_ref): - self.assertEqual(conn._session, _session) - self.assertEqual(vm_ref, _vm_ref) - self.assertEqual("source_sr_ref", _sr_ref) - return ["vdi0", "vdi1"] - - self.stubs.Set(vm_utils, "get_instance_vdis_for_sr", - fake_get_instance_vdis_for_sr) - - result = conn._vmops._generate_vdi_map("dest_sr_ref", vm_ref) - - self.assertEqual({"vdi0": "dest_sr_ref", - "vdi1": "dest_sr_ref"}, result) - - def test_rollback_live_migration_at_destination(self): - stubs.stubout_session(self.stubs, xenapi_fake.SessionBase) - conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - with mock.patch.object(conn, "destroy") as mock_destroy: - conn.rollback_live_migration_at_destination("context", - "instance", [], None) - self.assertFalse(mock_destroy.called) - - -class XenAPIInjectMetadataTestCase(stubs.XenAPITestBaseNoDB): - def setUp(self): - super(XenAPIInjectMetadataTestCase, self).setUp() - self.flags(connection_url='test_url', - connection_password='test_pass', - group='xenserver') - self.flags(firewall_driver='nova.virt.xenapi.firewall.' - 'Dom0IptablesFirewallDriver') - stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) - self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False) - - self.xenstore = dict(persist={}, ephem={}) - - self.called_fake_get_vm_opaque_ref = False - - def fake_get_vm_opaque_ref(inst, instance): - self.called_fake_get_vm_opaque_ref = True - if instance["uuid"] == "not_found": - raise exception.NotFound - self.assertEqual(instance, {'uuid': 'fake'}) - return 'vm_ref' - - def fake_add_to_param_xenstore(inst, vm_ref, key, val): - self.assertEqual(vm_ref, 'vm_ref') - self.xenstore['persist'][key] = val - - def fake_remove_from_param_xenstore(inst, vm_ref, key): - self.assertEqual(vm_ref, 'vm_ref') - if key in self.xenstore['persist']: - del self.xenstore['persist'][key] - - def fake_write_to_xenstore(inst, instance, path, value, vm_ref=None): - self.assertEqual(instance, {'uuid': 'fake'}) - self.assertEqual(vm_ref, 'vm_ref') - self.xenstore['ephem'][path] = jsonutils.dumps(value) - - def fake_delete_from_xenstore(inst, instance, path, vm_ref=None): - self.assertEqual(instance, {'uuid': 'fake'}) - self.assertEqual(vm_ref, 'vm_ref') - if path in self.xenstore['ephem']: - del self.xenstore['ephem'][path] - - self.stubs.Set(vmops.VMOps, '_get_vm_opaque_ref', - fake_get_vm_opaque_ref) - self.stubs.Set(vmops.VMOps, '_add_to_param_xenstore', - fake_add_to_param_xenstore) - self.stubs.Set(vmops.VMOps, '_remove_from_param_xenstore', - fake_remove_from_param_xenstore) - self.stubs.Set(vmops.VMOps, '_write_to_xenstore', - fake_write_to_xenstore) - self.stubs.Set(vmops.VMOps, '_delete_from_xenstore', - fake_delete_from_xenstore) - - def test_inject_instance_metadata(self): - - # Add some system_metadata to ensure it doesn't get added - # to xenstore - instance = dict(metadata=[{'key': 'a', 'value': 1}, - {'key': 'b', 'value': 2}, - {'key': 'c', 'value': 3}, - # Check xenstore key sanitizing - {'key': 'hi.there', 'value': 4}, - {'key': 'hi!t.e/e', 'value': 5}], - # Check xenstore key sanitizing - system_metadata=[{'key': 'sys_a', 'value': 1}, - {'key': 'sys_b', 'value': 2}, - {'key': 'sys_c', 'value': 3}], - uuid='fake') - self.conn._vmops._inject_instance_metadata(instance, 'vm_ref') - - self.assertEqual(self.xenstore, { - 'persist': { - 'vm-data/user-metadata/a': '1', - 'vm-data/user-metadata/b': '2', - 'vm-data/user-metadata/c': '3', - 'vm-data/user-metadata/hi_there': '4', - 'vm-data/user-metadata/hi_t_e_e': '5', - }, - 'ephem': {}, - }) - - def test_change_instance_metadata_add(self): - # Test XenStore key sanitizing here, too. - diff = {'test.key': ['+', 4]} - instance = {'uuid': 'fake'} - self.xenstore = { - 'persist': { - 'vm-data/user-metadata/a': '1', - 'vm-data/user-metadata/b': '2', - 'vm-data/user-metadata/c': '3', - }, - 'ephem': { - 'vm-data/user-metadata/a': '1', - 'vm-data/user-metadata/b': '2', - 'vm-data/user-metadata/c': '3', - }, - } - - self.conn._vmops.change_instance_metadata(instance, diff) - - self.assertEqual(self.xenstore, { - 'persist': { - 'vm-data/user-metadata/a': '1', - 'vm-data/user-metadata/b': '2', - 'vm-data/user-metadata/c': '3', - 'vm-data/user-metadata/test_key': '4', - }, - 'ephem': { - 'vm-data/user-metadata/a': '1', - 'vm-data/user-metadata/b': '2', - 'vm-data/user-metadata/c': '3', - 'vm-data/user-metadata/test_key': '4', - }, - }) - - def test_change_instance_metadata_update(self): - diff = dict(b=['+', 4]) - instance = {'uuid': 'fake'} - self.xenstore = { - 'persist': { - 'vm-data/user-metadata/a': '1', - 'vm-data/user-metadata/b': '2', - 'vm-data/user-metadata/c': '3', - }, - 'ephem': { - 'vm-data/user-metadata/a': '1', - 'vm-data/user-metadata/b': '2', - 'vm-data/user-metadata/c': '3', - }, - } - - self.conn._vmops.change_instance_metadata(instance, diff) - - self.assertEqual(self.xenstore, { - 'persist': { - 'vm-data/user-metadata/a': '1', - 'vm-data/user-metadata/b': '4', - 'vm-data/user-metadata/c': '3', - }, - 'ephem': { - 'vm-data/user-metadata/a': '1', - 'vm-data/user-metadata/b': '4', - 'vm-data/user-metadata/c': '3', - }, - }) - - def test_change_instance_metadata_delete(self): - diff = dict(b=['-']) - instance = {'uuid': 'fake'} - self.xenstore = { - 'persist': { - 'vm-data/user-metadata/a': '1', - 'vm-data/user-metadata/b': '2', - 'vm-data/user-metadata/c': '3', - }, - 'ephem': { - 'vm-data/user-metadata/a': '1', - 'vm-data/user-metadata/b': '2', - 'vm-data/user-metadata/c': '3', - }, - } - - self.conn._vmops.change_instance_metadata(instance, diff) - - self.assertEqual(self.xenstore, { - 'persist': { - 'vm-data/user-metadata/a': '1', - 'vm-data/user-metadata/c': '3', - }, - 'ephem': { - 'vm-data/user-metadata/a': '1', - 'vm-data/user-metadata/c': '3', - }, - }) - - def test_change_instance_metadata_not_found(self): - instance = {'uuid': 'not_found'} - self.conn._vmops.change_instance_metadata(instance, "fake_diff") - self.assertTrue(self.called_fake_get_vm_opaque_ref) - - -class XenAPISessionTestCase(test.NoDBTestCase): - def _get_mock_xapisession(self, software_version): - class MockXapiSession(xenapi_session.XenAPISession): - def __init__(_ignore): - "Skip the superclass's dirty init" - - def _get_software_version(_ignore): - return software_version - - return MockXapiSession() - - def test_local_session(self): - session = self._get_mock_xapisession({}) - session.is_local_connection = True - session.XenAPI = self.mox.CreateMockAnything() - session.XenAPI.xapi_local().AndReturn("local_connection") - - self.mox.ReplayAll() - self.assertEqual("local_connection", - session._create_session("unix://local")) - - def test_remote_session(self): - session = self._get_mock_xapisession({}) - session.is_local_connection = False - session.XenAPI = self.mox.CreateMockAnything() - session.XenAPI.Session("url").AndReturn("remote_connection") - - self.mox.ReplayAll() - self.assertEqual("remote_connection", session._create_session("url")) - - def test_get_product_version_product_brand_does_not_fail(self): - session = self._get_mock_xapisession({ - 'build_number': '0', - 'date': '2012-08-03', - 'hostname': 'komainu', - 'linux': '3.2.0-27-generic', - 'network_backend': 'bridge', - 'platform_name': 'XCP_Kronos', - 'platform_version': '1.6.0', - 'xapi': '1.3', - 'xen': '4.1.2', - 'xencenter_max': '1.10', - 'xencenter_min': '1.10' - }) - - self.assertEqual( - ((1, 6, 0), None), - session._get_product_version_and_brand() - ) - - def test_get_product_version_product_brand_xs_6(self): - session = self._get_mock_xapisession({ - 'product_brand': 'XenServer', - 'product_version': '6.0.50', - 'platform_version': '0.0.1' - }) - - self.assertEqual( - ((6, 0, 50), 'XenServer'), - session._get_product_version_and_brand() - ) - - def test_verify_plugin_version_same(self): - session = self._get_mock_xapisession({}) - - session.PLUGIN_REQUIRED_VERSION = '2.4' - - self.mox.StubOutWithMock(session, 'call_plugin_serialized') - session.call_plugin_serialized('nova_plugin_version', 'get_version', - ).AndReturn("2.4") - - self.mox.ReplayAll() - session._verify_plugin_version() - - def test_verify_plugin_version_compatible(self): - session = self._get_mock_xapisession({}) - session.XenAPI = xenapi_fake.FakeXenAPI() - - session.PLUGIN_REQUIRED_VERSION = '2.4' - - self.mox.StubOutWithMock(session, 'call_plugin_serialized') - session.call_plugin_serialized('nova_plugin_version', 'get_version', - ).AndReturn("2.5") - - self.mox.ReplayAll() - session._verify_plugin_version() - - def test_verify_plugin_version_bad_maj(self): - session = self._get_mock_xapisession({}) - session.XenAPI = xenapi_fake.FakeXenAPI() - - session.PLUGIN_REQUIRED_VERSION = '2.4' - - self.mox.StubOutWithMock(session, 'call_plugin_serialized') - session.call_plugin_serialized('nova_plugin_version', 'get_version', - ).AndReturn("3.0") - - self.mox.ReplayAll() - self.assertRaises(xenapi_fake.Failure, session._verify_plugin_version) - - def test_verify_plugin_version_bad_min(self): - session = self._get_mock_xapisession({}) - session.XenAPI = xenapi_fake.FakeXenAPI() - - session.PLUGIN_REQUIRED_VERSION = '2.4' - - self.mox.StubOutWithMock(session, 'call_plugin_serialized') - session.call_plugin_serialized('nova_plugin_version', 'get_version', - ).AndReturn("2.3") - - self.mox.ReplayAll() - self.assertRaises(xenapi_fake.Failure, session._verify_plugin_version) - - def test_verify_current_version_matches(self): - session = self._get_mock_xapisession({}) - - # Import the plugin to extract its version - path = os.path.dirname(__file__) - rel_path_elem = "../../../../plugins/xenserver/xenapi/etc/xapi.d/" \ - "plugins/nova_plugin_version" - for elem in rel_path_elem.split('/'): - path = os.path.join(path, elem) - path = os.path.realpath(path) - - plugin_version = None - with open(path) as plugin_file: - for line in plugin_file: - if "PLUGIN_VERSION = " in line: - plugin_version = line.strip()[17:].strip('"') - - self.assertEqual(session.PLUGIN_REQUIRED_VERSION, - plugin_version) - - -class XenAPIFakeTestCase(test.NoDBTestCase): - def test_query_matches(self): - record = {'a': '1', 'b': '2', 'c_d': '3'} - - tests = {'field "a"="1"': True, - 'field "b"="2"': True, - 'field "b"="4"': False, - 'not field "b"="4"': True, - 'field "a"="1" and field "b"="4"': False, - 'field "a"="1" or field "b"="4"': True, - 'field "c__d"="3"': True, - 'field \'b\'=\'2\'': True, - } - - for query in tests.keys(): - expected = tests[query] - fail_msg = "for test '%s'" % query - self.assertEqual(xenapi_fake._query_matches(record, query), - expected, fail_msg) - - def test_query_bad_format(self): - record = {'a': '1', 'b': '2', 'c': '3'} - - tests = ['"a"="1" or "b"="4"', - 'a=1', - ] - - for query in tests: - fail_msg = "for test '%s'" % query - self.assertFalse(xenapi_fake._query_matches(record, query), - fail_msg) |