summaryrefslogtreecommitdiff
path: root/nova/tests/unit/db
diff options
context:
space:
mode:
authorStephen Finucane <sfinucan@redhat.com>2020-01-23 16:35:00 +0000
committerStephen Finucane <sfinucan@redhat.com>2020-02-18 13:19:43 +0000
commit998475f5bdb1fa1473716d93a60999c66f88dc68 (patch)
tree10092c4214fdf989093362e78e5ecbd93230e8f6 /nova/tests/unit/db
parent25b7cf23017d83805bf5f2b2626d24323c1f7023 (diff)
downloadnova-998475f5bdb1fa1473716d93a60999c66f88dc68.tar.gz
nova-net: Remove unused nova-network objects
With the removal of the related quotas, the 'FixedIP', 'FloatingIP', 'SecurityGroupRule', 'Network' and 'NetworkList' objects can all be deleted. As noted previously, the 'SecurityGroup' object must be retained until we bump the 'Instance' object version to v3 and drop the 'security_group' field. In addition, we need to make a small change to the object versioning test to ensure we're only testing objects in the nova namespace. If we don't do this, we end up pulling in things like os_vif objects. This was previously noted in change Ica9f217d0318fc7c2db4bcdea12d00aad749c30c. Change-Id: I6aba959eff1e50af4ac040148c7177f235a09a1f Signed-off-by: Stephen Finucane <sfinucan@redhat.com>
Diffstat (limited to 'nova/tests/unit/db')
-rw-r--r--nova/tests/unit/db/test_db_api.py1908
1 files changed, 0 insertions, 1908 deletions
diff --git a/nova/tests/unit/db/test_db_api.py b/nova/tests/unit/db/test_db_api.py
index 3950f742e2..31ca028ab5 100644
--- a/nova/tests/unit/db/test_db_api.py
+++ b/nova/tests/unit/db/test_db_api.py
@@ -62,7 +62,6 @@ from nova.db.sqlalchemy import models
from nova.db.sqlalchemy import types as col_types
from nova.db.sqlalchemy import utils as db_utils
from nova import exception
-from nova import objects
from nova.objects import fields
from nova import test
from nova.tests import fixtures as nova_fixtures
@@ -1590,384 +1589,6 @@ class InstanceSystemMetadataTestCase(test.TestCase):
{'key': 'value'}, True)
-class SecurityGroupRuleTestCase(test.TestCase, ModelsObjectComparatorMixin):
- def setUp(self):
- super(SecurityGroupRuleTestCase, self).setUp()
- self.ctxt = context.get_admin_context()
-
- def _get_base_values(self):
- return {
- 'name': 'fake_sec_group',
- 'description': 'fake_sec_group_descr',
- 'user_id': 'fake',
- 'project_id': 'fake',
- 'instances': []
- }
-
- def _get_base_rule_values(self):
- return {
- 'protocol': "tcp",
- 'from_port': 80,
- 'to_port': 8080,
- 'cidr': None,
- 'deleted': 0,
- 'deleted_at': None,
- 'grantee_group': None,
- 'updated_at': None
- }
-
- def _create_security_group(self, values):
- v = self._get_base_values()
- v.update(values)
- return db.security_group_create(self.ctxt, v)
-
- def _create_security_group_rule(self, values):
- v = self._get_base_rule_values()
- v.update(values)
- return db.security_group_rule_create(self.ctxt, v)
-
- def test_security_group_rule_create(self):
- security_group_rule = self._create_security_group_rule({})
- self.assertIsNotNone(security_group_rule['id'])
- for key, value in self._get_base_rule_values().items():
- self.assertEqual(value, security_group_rule[key])
-
- def _test_security_group_rule_get_by_security_group(self, columns=None):
- instance = db.instance_create(self.ctxt,
- {'system_metadata': {'foo': 'bar'}})
- security_group = self._create_security_group({
- 'instances': [instance]})
- security_group_rule = self._create_security_group_rule(
- {'parent_group': security_group, 'grantee_group': security_group})
- security_group_rule1 = self._create_security_group_rule(
- {'parent_group': security_group, 'grantee_group': security_group})
- found_rules = db.security_group_rule_get_by_security_group(
- self.ctxt, security_group['id'], columns_to_join=columns)
- self.assertEqual(len(found_rules), 2)
- rules_ids = [security_group_rule['id'], security_group_rule1['id']]
- for rule in found_rules:
- if columns is None:
- self.assertIn('grantee_group', dict(rule))
- self.assertIn('instances',
- dict(rule.grantee_group))
- self.assertIn(
- 'system_metadata',
- dict(rule.grantee_group.instances[0]))
- self.assertIn(rule['id'], rules_ids)
- else:
- self.assertNotIn('grantee_group', dict(rule))
-
- def test_security_group_rule_get_by_security_group(self):
- self._test_security_group_rule_get_by_security_group()
-
- def test_security_group_rule_get_by_security_group_no_joins(self):
- self._test_security_group_rule_get_by_security_group(columns=[])
-
- def test_security_group_rule_get_by_instance(self):
- instance = db.instance_create(self.ctxt, {})
- security_group = self._create_security_group({
- 'instances': [instance]})
- security_group_rule = self._create_security_group_rule(
- {'parent_group': security_group, 'grantee_group': security_group})
- security_group_rule1 = self._create_security_group_rule(
- {'parent_group': security_group, 'grantee_group': security_group})
- security_group_rule_ids = [security_group_rule['id'],
- security_group_rule1['id']]
- found_rules = db.security_group_rule_get_by_instance(self.ctxt,
- instance['uuid'])
- self.assertEqual(len(found_rules), 2)
- for rule in found_rules:
- self.assertIn('grantee_group', rule)
- self.assertIn(rule['id'], security_group_rule_ids)
-
- def test_security_group_rule_destroy(self):
- self._create_security_group({'name': 'fake1'})
- self._create_security_group({'name': 'fake2'})
- security_group_rule1 = self._create_security_group_rule({})
- security_group_rule2 = self._create_security_group_rule({})
- db.security_group_rule_destroy(self.ctxt, security_group_rule1['id'])
- self.assertRaises(exception.SecurityGroupNotFound,
- db.security_group_rule_get,
- self.ctxt, security_group_rule1['id'])
- self._assertEqualObjects(db.security_group_rule_get(self.ctxt,
- security_group_rule2['id']),
- security_group_rule2, ['grantee_group'])
-
- def test_security_group_rule_destroy_not_found_exception(self):
- self.assertRaises(exception.SecurityGroupNotFound,
- db.security_group_rule_destroy, self.ctxt, 100500)
-
- def test_security_group_rule_get(self):
- security_group_rule1 = (
- self._create_security_group_rule({}))
- self._create_security_group_rule({})
- real_security_group_rule = db.security_group_rule_get(self.ctxt,
- security_group_rule1['id'])
- self._assertEqualObjects(security_group_rule1,
- real_security_group_rule, ['grantee_group'])
-
- def test_security_group_rule_get_not_found_exception(self):
- self.assertRaises(exception.SecurityGroupNotFound,
- db.security_group_rule_get, self.ctxt, 100500)
-
- def test_security_group_rule_count_by_group(self):
- sg1 = self._create_security_group({'name': 'fake1'})
- sg2 = self._create_security_group({'name': 'fake2'})
- rules_by_group = {sg1: [], sg2: []}
- for group in rules_by_group:
- rules = rules_by_group[group]
- for i in range(0, 10):
- rules.append(
- self._create_security_group_rule({'parent_group_id':
- group['id']}))
- db.security_group_rule_destroy(self.ctxt,
- rules_by_group[sg1][0]['id'])
- counted_groups = [db.security_group_rule_count_by_group(self.ctxt,
- group['id'])
- for group in [sg1, sg2]]
- expected = [9, 10]
- self.assertEqual(counted_groups, expected)
-
-
-class SecurityGroupTestCase(test.TestCase, ModelsObjectComparatorMixin):
- def setUp(self):
- super(SecurityGroupTestCase, self).setUp()
- self.ctxt = context.get_admin_context()
-
- def _get_base_values(self):
- return {
- 'name': 'fake_sec_group',
- 'description': 'fake_sec_group_descr',
- 'user_id': 'fake',
- 'project_id': 'fake',
- 'instances': []
- }
-
- def _create_security_group(self, values):
- v = self._get_base_values()
- v.update(values)
- return db.security_group_create(self.ctxt, v)
-
- def test_security_group_create(self):
- security_group = self._create_security_group({})
- self.assertIsNotNone(security_group['id'])
- for key, value in self._get_base_values().items():
- self.assertEqual(value, security_group[key])
-
- def test_security_group_destroy(self):
- security_group1 = self._create_security_group({})
- security_group2 = \
- self._create_security_group({'name': 'fake_sec_group2'})
-
- db.security_group_destroy(self.ctxt, security_group1['id'])
- self.assertRaises(exception.SecurityGroupNotFound,
- db.security_group_get,
- self.ctxt, security_group1['id'])
- self._assertEqualObjects(db.security_group_get(
- self.ctxt, security_group2['id'],
- columns_to_join=['instances',
- 'rules']), security_group2)
-
- def test_security_group_destroy_with_instance(self):
- security_group1 = self._create_security_group({})
-
- instance = db.instance_create(self.ctxt, {})
- db.instance_add_security_group(self.ctxt, instance.uuid,
- security_group1.id)
-
- self.assertEqual(
- 1,
- len(db.security_group_get_by_instance(self.ctxt, instance.uuid)))
-
- db.security_group_destroy(self.ctxt, security_group1['id'])
-
- self.assertEqual(
- 0,
- len(db.security_group_get_by_instance(self.ctxt, instance.uuid)))
-
- def test_security_group_get(self):
- security_group1 = self._create_security_group({})
- self._create_security_group({'name': 'fake_sec_group2'})
- real_security_group = db.security_group_get(self.ctxt,
- security_group1['id'],
- columns_to_join=['instances',
- 'rules'])
- self._assertEqualObjects(security_group1,
- real_security_group)
-
- def test_security_group_get_with_instance_columns(self):
- instance = db.instance_create(self.ctxt,
- {'system_metadata': {'foo': 'bar'}})
- secgroup = self._create_security_group({'instances': [instance]})
- secgroup = db.security_group_get(
- self.ctxt, secgroup['id'],
- columns_to_join=['instances.system_metadata'])
- inst = secgroup.instances[0]
- self.assertIn('system_metadata', dict(inst).keys())
-
- def test_security_group_get_no_instances(self):
- instance = db.instance_create(self.ctxt, {})
- sid = self._create_security_group({'instances': [instance]})['id']
-
- security_group = db.security_group_get(self.ctxt, sid,
- columns_to_join=['instances'])
- self.assertIn('instances', security_group.__dict__)
-
- security_group = db.security_group_get(self.ctxt, sid)
- self.assertNotIn('instances', security_group.__dict__)
-
- def test_security_group_get_not_found_exception(self):
- self.assertRaises(exception.SecurityGroupNotFound,
- db.security_group_get, self.ctxt, 100500)
-
- def test_security_group_get_by_name(self):
- security_group1 = self._create_security_group({'name': 'fake1'})
- security_group2 = self._create_security_group({'name': 'fake2'})
-
- real_security_group1 = db.security_group_get_by_name(
- self.ctxt,
- security_group1['project_id'],
- security_group1['name'],
- columns_to_join=None)
- real_security_group2 = db.security_group_get_by_name(
- self.ctxt,
- security_group2['project_id'],
- security_group2['name'],
- columns_to_join=None)
- self._assertEqualObjects(security_group1, real_security_group1)
- self._assertEqualObjects(security_group2, real_security_group2)
-
- def test_security_group_get_by_project(self):
- security_group1 = self._create_security_group(
- {'name': 'fake1', 'project_id': 'fake_proj1'})
- security_group2 = self._create_security_group(
- {'name': 'fake2', 'project_id': 'fake_proj2'})
-
- real1 = db.security_group_get_by_project(
- self.ctxt,
- security_group1['project_id'])
- real2 = db.security_group_get_by_project(
- self.ctxt,
- security_group2['project_id'])
-
- expected1, expected2 = [security_group1], [security_group2]
- self._assertEqualListsOfObjects(expected1, real1,
- ignored_keys=['instances'])
- self._assertEqualListsOfObjects(expected2, real2,
- ignored_keys=['instances'])
-
- def test_security_group_get_by_instance(self):
- instance = db.instance_create(self.ctxt, dict(host='foo'))
- values = [
- {'name': 'fake1', 'instances': [instance]},
- {'name': 'fake2', 'instances': [instance]},
- {'name': 'fake3', 'instances': []},
- ]
- security_groups = [self._create_security_group(vals)
- for vals in values]
-
- real = db.security_group_get_by_instance(self.ctxt,
- instance['uuid'])
- expected = security_groups[:2]
- self._assertEqualListsOfObjects(expected, real,
- ignored_keys=['instances'])
-
- def test_security_group_get_all(self):
- values = [
- {'name': 'fake1', 'project_id': 'fake_proj1'},
- {'name': 'fake2', 'project_id': 'fake_proj2'},
- ]
- security_groups = [self._create_security_group(vals)
- for vals in values]
-
- real = db.security_group_get_all(self.ctxt)
-
- self._assertEqualListsOfObjects(security_groups, real,
- ignored_keys=['instances'])
-
- def test_security_group_in_use(self):
- instance = db.instance_create(self.ctxt, dict(host='foo'))
- values = [
- {'instances': [instance],
- 'name': 'fake_in_use'},
- {'instances': []},
- ]
-
- security_groups = [self._create_security_group(vals)
- for vals in values]
-
- real = []
- for security_group in security_groups:
- in_use = db.security_group_in_use(self.ctxt,
- security_group['id'])
- real.append(in_use)
- expected = [True, False]
-
- self.assertEqual(expected, real)
-
- def test_security_group_ensure_default(self):
- self.ctxt.project_id = 'fake'
- self.ctxt.user_id = 'fake'
- self.assertEqual(0, len(db.security_group_get_by_project(
- self.ctxt,
- self.ctxt.project_id)))
-
- db.security_group_ensure_default(self.ctxt)
-
- security_groups = db.security_group_get_by_project(
- self.ctxt,
- self.ctxt.project_id)
-
- self.assertEqual(1, len(security_groups))
- self.assertEqual("default", security_groups[0]["name"])
-
- @mock.patch.object(sqlalchemy_api, '_security_group_get_by_names')
- def test_security_group_ensure_default_called_concurrently(self, sg_mock):
- # make sure NotFound is always raised here to trick Nova to insert the
- # duplicate security group entry
- sg_mock.side_effect = exception.NotFound
-
- # create the first db entry
- self.ctxt.project_id = 1
- db.security_group_ensure_default(self.ctxt)
- security_groups = db.security_group_get_by_project(
- self.ctxt,
- self.ctxt.project_id)
- self.assertEqual(1, len(security_groups))
-
- # create the second one and ensure the exception is handled properly
- default_group = db.security_group_ensure_default(self.ctxt)
- self.assertEqual('default', default_group.name)
-
- def test_security_group_update(self):
- security_group = self._create_security_group({})
- new_values = {
- 'name': 'sec_group1',
- 'description': 'sec_group_descr1',
- 'user_id': 'fake_user1',
- 'project_id': 'fake_proj1',
- }
-
- updated_group = db.security_group_update(self.ctxt,
- security_group['id'],
- new_values,
- columns_to_join=['rules.grantee_group'])
- for key, value in new_values.items():
- self.assertEqual(updated_group[key], value)
- self.assertEqual(updated_group['rules'], [])
-
- def test_security_group_update_to_duplicate(self):
- self._create_security_group(
- {'name': 'fake1', 'project_id': 'fake_proj1'})
- security_group2 = self._create_security_group(
- {'name': 'fake1', 'project_id': 'fake_proj2'})
-
- self.assertRaises(exception.SecurityGroupExists,
- db.security_group_update,
- self.ctxt, security_group2['id'],
- {'project_id': 'fake_proj1'})
-
-
@mock.patch('time.sleep', new=lambda x: None)
class InstanceTestCase(test.TestCase, ModelsObjectComparatorMixin):
@@ -2984,38 +2605,6 @@ class InstanceTestCase(test.TestCase, ModelsObjectComparatorMixin):
# Ensure that metadata is updated during instance_update
self._test_instance_update_updates_metadata('metadata')
- def test_instance_floating_address_get_all(self):
- ctxt = context.get_admin_context()
-
- instance1 = db.instance_create(ctxt, {'host': 'h1', 'hostname': 'n1'})
- instance2 = db.instance_create(ctxt, {'host': 'h2', 'hostname': 'n2'})
-
- fixed_addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3']
- float_addresses = ['2.1.1.1', '2.1.1.2', '2.1.1.3']
- instance_uuids = [instance1['uuid'], instance1['uuid'],
- instance2['uuid']]
-
- for fixed_addr, float_addr, instance_uuid in zip(fixed_addresses,
- float_addresses,
- instance_uuids):
- db.fixed_ip_create(ctxt, {'address': fixed_addr,
- 'instance_uuid': instance_uuid})
- fixed_id = db.fixed_ip_get_by_address(ctxt, fixed_addr)['id']
- db.floating_ip_create(ctxt,
- {'address': float_addr,
- 'fixed_ip_id': fixed_id})
-
- real_float_addresses = \
- db.instance_floating_address_get_all(ctxt, instance_uuids[0])
- self.assertEqual(set(float_addresses[:2]), set(real_float_addresses))
- real_float_addresses = \
- db.instance_floating_address_get_all(ctxt, instance_uuids[2])
- self.assertEqual(set([float_addresses[2]]), set(real_float_addresses))
-
- self.assertRaises(exception.InvalidUUID,
- db.instance_floating_address_get_all,
- ctxt, 'invalid_uuid')
-
def test_instance_stringified_ips(self):
instance = self.create_instance_with_args()
instance = db.instance_update(
@@ -4318,1196 +3907,6 @@ class InstanceFaultTestCase(test.TestCase, ModelsObjectComparatorMixin):
self.assertFalse(mock_filter.called)
-@mock.patch('time.sleep', new=lambda x: None)
-class FixedIPTestCase(test.TestCase, ModelsObjectComparatorMixin):
- def setUp(self):
- super(FixedIPTestCase, self).setUp()
- self.ctxt = context.get_admin_context()
-
- def _timeout_test(self, ctxt, timeout, multi_host):
- instance = db.instance_create(ctxt, dict(host='foo'))
- net = db.network_create_safe(ctxt, dict(multi_host=multi_host,
- host='bar'))
- old = timeout - datetime.timedelta(seconds=5)
- new = timeout + datetime.timedelta(seconds=5)
- # should deallocate
- db.fixed_ip_create(ctxt, dict(allocated=False,
- instance_uuid=instance['uuid'],
- network_id=net['id'],
- updated_at=old))
- # still allocated
- db.fixed_ip_create(ctxt, dict(allocated=True,
- instance_uuid=instance['uuid'],
- network_id=net['id'],
- updated_at=old))
- # wrong network
- db.fixed_ip_create(ctxt, dict(allocated=False,
- instance_uuid=instance['uuid'],
- network_id=None,
- updated_at=old))
- # too new
- db.fixed_ip_create(ctxt, dict(allocated=False,
- instance_uuid=instance['uuid'],
- network_id=None,
- updated_at=new))
-
- def test_fixed_ip_disassociate_all_by_timeout_single_host(self):
- now = timeutils.utcnow()
- self._timeout_test(self.ctxt, now, False)
- result = db.fixed_ip_disassociate_all_by_timeout(self.ctxt, 'foo', now)
- self.assertEqual(result, 0)
- result = db.fixed_ip_disassociate_all_by_timeout(self.ctxt, 'bar', now)
- self.assertEqual(result, 1)
-
- def test_fixed_ip_disassociate_all_by_timeout_multi_host(self):
- now = timeutils.utcnow()
- self._timeout_test(self.ctxt, now, True)
- result = db.fixed_ip_disassociate_all_by_timeout(self.ctxt, 'foo', now)
- self.assertEqual(result, 1)
- result = db.fixed_ip_disassociate_all_by_timeout(self.ctxt, 'bar', now)
- self.assertEqual(result, 0)
-
- def test_fixed_ip_get_by_floating_address(self):
- fixed_ip = db.fixed_ip_create(self.ctxt, {'address': '192.168.0.2'})
- values = {'address': '8.7.6.5',
- 'fixed_ip_id': fixed_ip['id']}
- floating = db.floating_ip_create(self.ctxt, values)['address']
- fixed_ip_ref = db.fixed_ip_get_by_floating_address(self.ctxt, floating)
- self._assertEqualObjects(fixed_ip, fixed_ip_ref)
-
- def test_fixed_ip_get_by_host(self):
- host_ips = {
- 'host1': ['1.1.1.1', '1.1.1.2', '1.1.1.3'],
- 'host2': ['1.1.1.4', '1.1.1.5'],
- 'host3': ['1.1.1.6']
- }
-
- for host, ips in host_ips.items():
- for ip in ips:
- instance_uuid = self._create_instance(host=host)
- db.fixed_ip_create(self.ctxt, {'address': ip})
- db.fixed_ip_associate(self.ctxt, ip, instance_uuid)
-
- for host, ips in host_ips.items():
- ips_on_host = [x['address']
- for x in db.fixed_ip_get_by_host(self.ctxt, host)]
- self._assertEqualListsOfPrimitivesAsSets(ips_on_host, ips)
-
- def test_fixed_ip_get_by_network_host_not_found_exception(self):
- self.assertRaises(
- exception.FixedIpNotFoundForNetworkHost,
- db.fixed_ip_get_by_network_host,
- self.ctxt, 1, 'ignore')
-
- def test_fixed_ip_get_by_network_host_fixed_ip_found(self):
- db.fixed_ip_create(self.ctxt, dict(network_id=1, host='host'))
-
- fip = db.fixed_ip_get_by_network_host(self.ctxt, 1, 'host')
-
- self.assertEqual(1, fip['network_id'])
- self.assertEqual('host', fip['host'])
-
- def _create_instance(self, **kwargs):
- instance = db.instance_create(self.ctxt, kwargs)
- return instance['uuid']
-
- def test_fixed_ip_get_by_instance_fixed_ip_found(self):
- instance_uuid = self._create_instance()
-
- FIXED_IP_ADDRESS = '192.168.1.5'
- db.fixed_ip_create(self.ctxt, dict(
- instance_uuid=instance_uuid, address=FIXED_IP_ADDRESS))
-
- ips_list = db.fixed_ip_get_by_instance(self.ctxt, instance_uuid)
- self._assertEqualListsOfPrimitivesAsSets([FIXED_IP_ADDRESS],
- [ips_list[0].address])
-
- def test_fixed_ip_get_by_instance_multiple_fixed_ips_found(self):
- instance_uuid = self._create_instance()
-
- FIXED_IP_ADDRESS_1 = '192.168.1.5'
- db.fixed_ip_create(self.ctxt, dict(
- instance_uuid=instance_uuid, address=FIXED_IP_ADDRESS_1))
- FIXED_IP_ADDRESS_2 = '192.168.1.6'
- db.fixed_ip_create(self.ctxt, dict(
- instance_uuid=instance_uuid, address=FIXED_IP_ADDRESS_2))
-
- ips_list = db.fixed_ip_get_by_instance(self.ctxt, instance_uuid)
- self._assertEqualListsOfPrimitivesAsSets(
- [FIXED_IP_ADDRESS_1, FIXED_IP_ADDRESS_2],
- [ips_list[0].address, ips_list[1].address])
-
- def test_fixed_ip_get_by_instance_inappropriate_ignored(self):
- instance_uuid = self._create_instance()
-
- FIXED_IP_ADDRESS_1 = '192.168.1.5'
- db.fixed_ip_create(self.ctxt, dict(
- instance_uuid=instance_uuid, address=FIXED_IP_ADDRESS_1))
- FIXED_IP_ADDRESS_2 = '192.168.1.6'
- db.fixed_ip_create(self.ctxt, dict(
- instance_uuid=instance_uuid, address=FIXED_IP_ADDRESS_2))
-
- another_instance = db.instance_create(self.ctxt, {})
- db.fixed_ip_create(self.ctxt, dict(
- instance_uuid=another_instance['uuid'], address="192.168.1.7"))
-
- ips_list = db.fixed_ip_get_by_instance(self.ctxt, instance_uuid)
- self._assertEqualListsOfPrimitivesAsSets(
- [FIXED_IP_ADDRESS_1, FIXED_IP_ADDRESS_2],
- [ips_list[0].address, ips_list[1].address])
-
- def test_fixed_ip_get_by_instance_not_found_exception(self):
- instance_uuid = self._create_instance()
-
- self.assertRaises(exception.FixedIpNotFoundForInstance,
- db.fixed_ip_get_by_instance,
- self.ctxt, instance_uuid)
-
- def test_fixed_ips_by_virtual_interface_fixed_ip_found(self):
- instance_uuid = self._create_instance()
-
- vif = db.virtual_interface_create(
- self.ctxt, dict(instance_uuid=instance_uuid))
-
- FIXED_IP_ADDRESS = '192.168.1.5'
- db.fixed_ip_create(self.ctxt, dict(
- virtual_interface_id=vif.id, address=FIXED_IP_ADDRESS))
-
- ips_list = db.fixed_ips_by_virtual_interface(self.ctxt, vif.id)
- self._assertEqualListsOfPrimitivesAsSets([FIXED_IP_ADDRESS],
- [ips_list[0].address])
-
- def test_fixed_ips_by_virtual_interface_multiple_fixed_ips_found(self):
- instance_uuid = self._create_instance()
-
- vif = db.virtual_interface_create(
- self.ctxt, dict(instance_uuid=instance_uuid))
-
- FIXED_IP_ADDRESS_1 = '192.168.1.5'
- db.fixed_ip_create(self.ctxt, dict(
- virtual_interface_id=vif.id, address=FIXED_IP_ADDRESS_1))
- FIXED_IP_ADDRESS_2 = '192.168.1.6'
- db.fixed_ip_create(self.ctxt, dict(
- virtual_interface_id=vif.id, address=FIXED_IP_ADDRESS_2))
-
- ips_list = db.fixed_ips_by_virtual_interface(self.ctxt, vif.id)
- self._assertEqualListsOfPrimitivesAsSets(
- [FIXED_IP_ADDRESS_1, FIXED_IP_ADDRESS_2],
- [ips_list[0].address, ips_list[1].address])
-
- def test_fixed_ips_by_virtual_interface_inappropriate_ignored(self):
- instance_uuid = self._create_instance()
-
- vif = db.virtual_interface_create(
- self.ctxt, dict(instance_uuid=instance_uuid))
-
- FIXED_IP_ADDRESS_1 = '192.168.1.5'
- db.fixed_ip_create(self.ctxt, dict(
- virtual_interface_id=vif.id, address=FIXED_IP_ADDRESS_1))
- FIXED_IP_ADDRESS_2 = '192.168.1.6'
- db.fixed_ip_create(self.ctxt, dict(
- virtual_interface_id=vif.id, address=FIXED_IP_ADDRESS_2))
-
- another_vif = db.virtual_interface_create(
- self.ctxt, dict(instance_uuid=instance_uuid))
- db.fixed_ip_create(self.ctxt, dict(
- virtual_interface_id=another_vif.id, address="192.168.1.7"))
-
- ips_list = db.fixed_ips_by_virtual_interface(self.ctxt, vif.id)
- self._assertEqualListsOfPrimitivesAsSets(
- [FIXED_IP_ADDRESS_1, FIXED_IP_ADDRESS_2],
- [ips_list[0].address, ips_list[1].address])
-
- def test_fixed_ips_by_virtual_interface_no_ip_found(self):
- instance_uuid = self._create_instance()
-
- vif = db.virtual_interface_create(
- self.ctxt, dict(instance_uuid=instance_uuid))
-
- ips_list = db.fixed_ips_by_virtual_interface(self.ctxt, vif.id)
- self.assertEqual(0, len(ips_list))
-
- def create_fixed_ip(self, **params):
- default_params = {'address': '192.168.0.1'}
- default_params.update(params)
- return db.fixed_ip_create(self.ctxt, default_params)['address']
-
- def test_fixed_ip_associate_fails_if_ip_not_in_network(self):
- instance_uuid = self._create_instance()
- self.assertRaises(exception.FixedIpNotFoundForNetwork,
- db.fixed_ip_associate,
- self.ctxt, None, instance_uuid)
-
- def test_fixed_ip_associate_fails_if_ip_in_use(self):
- instance_uuid = self._create_instance()
-
- address = self.create_fixed_ip(instance_uuid=instance_uuid)
- self.assertRaises(exception.FixedIpAlreadyInUse,
- db.fixed_ip_associate,
- self.ctxt, address, instance_uuid)
-
- def test_fixed_ip_associate_succeeds(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip(network_id=network['id'])
- db.fixed_ip_associate(self.ctxt, address, instance_uuid,
- network_id=network['id'])
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(fixed_ip['instance_uuid'], instance_uuid)
-
- def test_fixed_ip_associate_succeeds_and_sets_network(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip()
- db.fixed_ip_associate(self.ctxt, address, instance_uuid,
- network_id=network['id'])
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(fixed_ip['instance_uuid'], instance_uuid)
- self.assertEqual(fixed_ip['network_id'], network['id'])
-
- def test_fixed_ip_associate_succeeds_retry_on_deadlock(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip()
-
- def fake_first():
- if mock_first.call_count == 1:
- raise db_exc.DBDeadlock()
- else:
- return objects.Instance(id=1, address=address, reserved=False,
- instance_uuid=None, network_id=None)
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- db.fixed_ip_associate(self.ctxt, address, instance_uuid,
- network_id=network['id'])
- self.assertEqual(2, mock_first.call_count)
-
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(fixed_ip['instance_uuid'], instance_uuid)
- self.assertEqual(fixed_ip['network_id'], network['id'])
-
- def test_fixed_ip_associate_succeeds_retry_on_no_rows_updated(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip()
-
- def fake_first():
- if mock_first.call_count == 1:
- return objects.Instance(id=2, address=address, reserved=False,
- instance_uuid=None, network_id=None)
- else:
- return objects.Instance(id=1, address=address, reserved=False,
- instance_uuid=None, network_id=None)
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- db.fixed_ip_associate(self.ctxt, address, instance_uuid,
- network_id=network['id'])
- self.assertEqual(2, mock_first.call_count)
-
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(fixed_ip['instance_uuid'], instance_uuid)
- self.assertEqual(fixed_ip['network_id'], network['id'])
-
- def test_fixed_ip_associate_succeeds_retry_limit_exceeded(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip()
-
- def fake_first():
- return objects.Instance(id=2, address=address, reserved=False,
- instance_uuid=None, network_id=None)
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- self.assertRaises(exception.FixedIpAssociateFailed,
- db.fixed_ip_associate, self.ctxt, address,
- instance_uuid, network_id=network['id'])
- # 5 reties + initial attempt
- self.assertEqual(6, mock_first.call_count)
-
- def test_fixed_ip_associate_ip_not_in_network_with_no_retries(self):
- instance_uuid = self._create_instance()
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- return_value=None) as mock_first:
- self.assertRaises(exception.FixedIpNotFoundForNetwork,
- db.fixed_ip_associate,
- self.ctxt, None, instance_uuid)
- self.assertEqual(1, mock_first.call_count)
-
- def test_fixed_ip_associate_no_network_id_with_no_retries(self):
- # Tests that trying to associate an instance to a fixed IP on a network
- # but without specifying the network ID during associate will fail.
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
- address = self.create_fixed_ip(network_id=network['id'])
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- return_value=None) as mock_first:
- self.assertRaises(exception.FixedIpNotFoundForNetwork,
- db.fixed_ip_associate,
- self.ctxt, address, instance_uuid)
- self.assertEqual(1, mock_first.call_count)
-
- def test_fixed_ip_associate_with_vif(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
- vif = db.virtual_interface_create(self.ctxt, {})
- address = self.create_fixed_ip()
-
- fixed_ip = db.fixed_ip_associate(self.ctxt, address, instance_uuid,
- network_id=network['id'],
- virtual_interface_id=vif['id'])
-
- self.assertTrue(fixed_ip['allocated'])
- self.assertEqual(vif['id'], fixed_ip['virtual_interface_id'])
-
- def test_fixed_ip_associate_not_allocated_without_vif(self):
- instance_uuid = self._create_instance()
- address = self.create_fixed_ip()
-
- fixed_ip = db.fixed_ip_associate(self.ctxt, address, instance_uuid)
-
- self.assertFalse(fixed_ip['allocated'])
- self.assertIsNone(fixed_ip['virtual_interface_id'])
-
- def test_fixed_ip_associate_pool_invalid_uuid(self):
- instance_uuid = '123'
- self.assertRaises(exception.InvalidUUID, db.fixed_ip_associate_pool,
- self.ctxt, None, instance_uuid)
-
- def test_fixed_ip_associate_pool_no_more_fixed_ips(self):
- instance_uuid = self._create_instance()
- self.assertRaises(exception.NoMoreFixedIps, db.fixed_ip_associate_pool,
- self.ctxt, None, instance_uuid)
-
- def test_fixed_ip_associate_pool_ignores_leased_addresses(self):
- instance_uuid = self._create_instance()
- params = {'address': '192.168.1.5',
- 'leased': True}
- db.fixed_ip_create(self.ctxt, params)
- self.assertRaises(exception.NoMoreFixedIps, db.fixed_ip_associate_pool,
- self.ctxt, None, instance_uuid)
-
- def test_fixed_ip_associate_pool_succeeds(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip(network_id=network['id'])
- db.fixed_ip_associate_pool(self.ctxt, network['id'], instance_uuid)
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(fixed_ip['instance_uuid'], instance_uuid)
-
- def test_fixed_ip_associate_pool_order(self):
- """Test that fixed_ip always uses oldest fixed_ip.
-
- We should always be using the fixed ip with the oldest
- updated_at.
- """
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
- self.addCleanup(timeutils.clear_time_override)
- start = timeutils.utcnow()
- for i in range(1, 4):
- now = start - datetime.timedelta(hours=i)
- timeutils.set_time_override(now)
- address = self.create_fixed_ip(
- updated_at=now,
- address='10.1.0.%d' % i,
- network_id=network['id'])
- db.fixed_ip_associate_pool(self.ctxt, network['id'], instance_uuid)
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(fixed_ip['instance_uuid'], instance_uuid)
-
- def test_fixed_ip_associate_pool_succeeds_fip_ref_network_id_is_none(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- self.create_fixed_ip(network_id=None)
- fixed_ip = db.fixed_ip_associate_pool(self.ctxt,
- network['id'], instance_uuid)
- self.assertEqual(instance_uuid, fixed_ip['instance_uuid'])
- self.assertEqual(network['id'], fixed_ip['network_id'])
-
- def test_fixed_ip_associate_pool_succeeds_retry(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip(network_id=network['id'])
-
- def fake_first():
- if mock_first.call_count == 1:
- return {'network_id': network['id'], 'address': 'invalid',
- 'instance_uuid': None, 'host': None, 'id': 1}
- else:
- return {'network_id': network['id'], 'address': address,
- 'instance_uuid': None, 'host': None, 'id': 1}
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- db.fixed_ip_associate_pool(self.ctxt, network['id'], instance_uuid)
- self.assertEqual(2, mock_first.call_count)
-
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(instance_uuid, fixed_ip['instance_uuid'])
-
- def test_fixed_ip_associate_pool_retry_limit_exceeded(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- self.create_fixed_ip(network_id=network['id'])
-
- def fake_first():
- return {'network_id': network['id'], 'address': 'invalid',
- 'instance_uuid': None, 'host': None, 'id': 1}
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- self.assertRaises(exception.FixedIpAssociateFailed,
- db.fixed_ip_associate_pool, self.ctxt,
- network['id'], instance_uuid)
- # 5 retries + initial attempt
- self.assertEqual(6, mock_first.call_count)
-
- def test_fixed_ip_create_same_address(self):
- address = '192.168.1.5'
- params = {'address': address}
- db.fixed_ip_create(self.ctxt, params)
- self.assertRaises(exception.FixedIpExists, db.fixed_ip_create,
- self.ctxt, params)
-
- def test_fixed_ip_create_success(self):
- instance_uuid = self._create_instance()
- network_id = db.network_create_safe(self.ctxt, {})['id']
- param = {
- 'reserved': False,
- 'deleted': 0,
- 'leased': False,
- 'host': '127.0.0.1',
- 'address': '192.168.1.5',
- 'allocated': False,
- 'instance_uuid': instance_uuid,
- 'network_id': network_id,
- 'virtual_interface_id': None
- }
-
- ignored_keys = ['created_at', 'id', 'deleted_at', 'updated_at']
- fixed_ip_data = db.fixed_ip_create(self.ctxt, param)
- self._assertEqualObjects(param, fixed_ip_data, ignored_keys)
-
- def test_fixed_ip_bulk_create_same_address(self):
- address_1 = '192.168.1.5'
- address_2 = '192.168.1.6'
- instance_uuid = self._create_instance()
- network_id_1 = db.network_create_safe(self.ctxt, {})['id']
- network_id_2 = db.network_create_safe(self.ctxt, {})['id']
- params = [
- {'reserved': False, 'deleted': 0, 'leased': False,
- 'host': '127.0.0.1', 'address': address_2, 'allocated': False,
- 'instance_uuid': instance_uuid, 'network_id': network_id_1,
- 'virtual_interface_id': None},
- {'reserved': False, 'deleted': 0, 'leased': False,
- 'host': '127.0.0.1', 'address': address_1, 'allocated': False,
- 'instance_uuid': instance_uuid, 'network_id': network_id_1,
- 'virtual_interface_id': None},
- {'reserved': False, 'deleted': 0, 'leased': False,
- 'host': 'localhost', 'address': address_2, 'allocated': True,
- 'instance_uuid': instance_uuid, 'network_id': network_id_2,
- 'virtual_interface_id': None},
- ]
-
- self.assertRaises(exception.FixedIpExists, db.fixed_ip_bulk_create,
- self.ctxt, params)
- # In this case the transaction will be rolled back and none of the ips
- # will make it to the database.
- self.assertRaises(exception.FixedIpNotFoundForAddress,
- db.fixed_ip_get_by_address, self.ctxt, address_1)
- self.assertRaises(exception.FixedIpNotFoundForAddress,
- db.fixed_ip_get_by_address, self.ctxt, address_2)
-
- def test_fixed_ip_bulk_create_success(self):
- address_1 = '192.168.1.5'
- address_2 = '192.168.1.6'
-
- instance_uuid = self._create_instance()
- network_id_1 = db.network_create_safe(self.ctxt, {})['id']
- network_id_2 = db.network_create_safe(self.ctxt, {})['id']
- params = [
- {'reserved': False, 'deleted': 0, 'leased': False,
- 'host': '127.0.0.1', 'address': address_1, 'allocated': False,
- 'instance_uuid': instance_uuid, 'network_id': network_id_1,
- 'virtual_interface_id': None},
- {'reserved': False, 'deleted': 0, 'leased': False,
- 'host': 'localhost', 'address': address_2, 'allocated': True,
- 'instance_uuid': instance_uuid, 'network_id': network_id_2,
- 'virtual_interface_id': None}
- ]
-
- db.fixed_ip_bulk_create(self.ctxt, params)
- ignored_keys = ['created_at', 'id', 'deleted_at', 'updated_at',
- 'virtual_interface', 'network', 'floating_ips']
- fixed_ip_data = db.fixed_ip_get_by_instance(self.ctxt, instance_uuid)
-
- # we have no `id` in incoming data so we can not use
- # _assertEqualListsOfObjects to compare incoming data and received
- # objects
- fixed_ip_data = sorted(fixed_ip_data, key=lambda i: i['network_id'])
- params = sorted(params, key=lambda i: i['network_id'])
- for param, ip in zip(params, fixed_ip_data):
- self._assertEqualObjects(param, ip, ignored_keys)
-
- def test_fixed_ip_disassociate(self):
- address = '192.168.1.5'
- instance_uuid = self._create_instance()
- network_id = db.network_create_safe(self.ctxt, {})['id']
- values = {'address': '192.168.1.5', 'instance_uuid': instance_uuid}
- vif = db.virtual_interface_create(self.ctxt, values)
- param = {
- 'reserved': False,
- 'deleted': 0,
- 'leased': False,
- 'host': '127.0.0.1',
- 'address': address,
- 'allocated': False,
- 'instance_uuid': instance_uuid,
- 'network_id': network_id,
- 'virtual_interface_id': vif['id']
- }
- db.fixed_ip_create(self.ctxt, param)
-
- db.fixed_ip_disassociate(self.ctxt, address)
- fixed_ip_data = db.fixed_ip_get_by_address(self.ctxt, address)
- ignored_keys = ['created_at', 'id', 'deleted_at',
- 'updated_at', 'instance_uuid',
- 'virtual_interface_id']
- self._assertEqualObjects(param, fixed_ip_data, ignored_keys)
- self.assertIsNone(fixed_ip_data['instance_uuid'])
- self.assertIsNone(fixed_ip_data['virtual_interface_id'])
-
- def test_fixed_ip_get_not_found_exception(self):
- self.assertRaises(exception.FixedIpNotFound,
- db.fixed_ip_get, self.ctxt, 0)
-
- def test_fixed_ip_get_success2(self):
- address = '192.168.1.5'
- instance_uuid = self._create_instance()
- network_id = db.network_create_safe(self.ctxt, {})['id']
- param = {
- 'reserved': False,
- 'deleted': 0,
- 'leased': False,
- 'host': '127.0.0.1',
- 'address': address,
- 'allocated': False,
- 'instance_uuid': instance_uuid,
- 'network_id': network_id,
- 'virtual_interface_id': None
- }
- fixed_ip_id = db.fixed_ip_create(self.ctxt, param)
-
- self.ctxt.is_admin = False
- self.assertRaises(exception.Forbidden, db.fixed_ip_get,
- self.ctxt, fixed_ip_id)
-
- def test_fixed_ip_get_success(self):
- address = '192.168.1.5'
- instance_uuid = self._create_instance()
- network_id = db.network_create_safe(self.ctxt, {})['id']
- param = {
- 'reserved': False,
- 'deleted': 0,
- 'leased': False,
- 'host': '127.0.0.1',
- 'address': address,
- 'allocated': False,
- 'instance_uuid': instance_uuid,
- 'network_id': network_id,
- 'virtual_interface_id': None
- }
- db.fixed_ip_create(self.ctxt, param)
-
- fixed_ip_id = db.fixed_ip_get_by_address(self.ctxt, address)['id']
- fixed_ip_data = db.fixed_ip_get(self.ctxt, fixed_ip_id)
- ignored_keys = ['created_at', 'id', 'deleted_at', 'updated_at']
- self._assertEqualObjects(param, fixed_ip_data, ignored_keys)
-
- def test_fixed_ip_get_by_address(self):
- instance_uuid = self._create_instance()
- db.fixed_ip_create(self.ctxt, {'address': '1.2.3.4',
- 'instance_uuid': instance_uuid,
- })
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, '1.2.3.4',
- columns_to_join=['instance'])
- self.assertIn('instance', fixed_ip.__dict__)
- self.assertEqual(instance_uuid, fixed_ip.instance.uuid)
-
- def test_fixed_ip_update_not_found_for_address(self):
- self.assertRaises(exception.FixedIpNotFoundForAddress,
- db.fixed_ip_update, self.ctxt,
- '192.168.1.5', {})
-
- def test_fixed_ip_update(self):
- instance_uuid_1 = self._create_instance()
- instance_uuid_2 = self._create_instance()
- network_id_1 = db.network_create_safe(self.ctxt, {})['id']
- network_id_2 = db.network_create_safe(self.ctxt, {})['id']
- param_1 = {
- 'reserved': True, 'deleted': 0, 'leased': True,
- 'host': '192.168.133.1', 'address': '10.0.0.2',
- 'allocated': True, 'instance_uuid': instance_uuid_1,
- 'network_id': network_id_1, 'virtual_interface_id': '123',
- }
-
- param_2 = {
- 'reserved': False, 'deleted': 0, 'leased': False,
- 'host': '127.0.0.1', 'address': '10.0.0.3', 'allocated': False,
- 'instance_uuid': instance_uuid_2, 'network_id': network_id_2,
- 'virtual_interface_id': None
- }
-
- ignored_keys = ['created_at', 'id', 'deleted_at', 'updated_at']
- fixed_ip_addr = db.fixed_ip_create(self.ctxt, param_1)['address']
- db.fixed_ip_update(self.ctxt, fixed_ip_addr, param_2)
- fixed_ip_after_update = db.fixed_ip_get_by_address(self.ctxt,
- param_2['address'])
- self._assertEqualObjects(param_2, fixed_ip_after_update, ignored_keys)
-
-
-@mock.patch('time.sleep', new=lambda x: None)
-class FloatingIpTestCase(test.TestCase, ModelsObjectComparatorMixin):
-
- def setUp(self):
- super(FloatingIpTestCase, self).setUp()
- self.ctxt = context.get_admin_context()
-
- def _get_base_values(self):
- return {
- 'address': '1.1.1.1',
- 'fixed_ip_id': None,
- 'project_id': 'fake_project',
- 'host': 'fake_host',
- 'auto_assigned': False,
- 'pool': 'fake_pool',
- 'interface': 'fake_interface',
- }
-
- def _create_floating_ip(self, values):
- if not values:
- values = {}
- vals = self._get_base_values()
- vals.update(values)
- return db.floating_ip_create(self.ctxt, vals)
-
- def test_floating_ip_get(self):
- values = [{'address': '0.0.0.0'}, {'address': '1.1.1.1'}]
- floating_ips = [self._create_floating_ip(val) for val in values]
-
- for floating_ip in floating_ips:
- real_floating_ip = db.floating_ip_get(self.ctxt, floating_ip['id'])
- self._assertEqualObjects(floating_ip, real_floating_ip,
- ignored_keys=['fixed_ip'])
-
- def test_floating_ip_get_not_found(self):
- self.assertRaises(exception.FloatingIpNotFound,
- db.floating_ip_get, self.ctxt, 100500)
-
- @mock.patch.object(query.Query, 'first', side_effect=db_exc.DBError())
- def test_floating_ip_get_with_long_id_not_found(self, mock_query):
- self.assertRaises(exception.InvalidID,
- db.floating_ip_get, self.ctxt, 123456789101112)
- mock_query.assert_called_once_with()
-
- def test_floating_ip_get_pools(self):
- values = [
- {'address': '0.0.0.0', 'pool': 'abc'},
- {'address': '1.1.1.1', 'pool': 'abc'},
- {'address': '2.2.2.2', 'pool': 'def'},
- {'address': '3.3.3.3', 'pool': 'ghi'},
- ]
- for val in values:
- self._create_floating_ip(val)
- expected_pools = [{'name': x}
- for x in set(map(lambda x: x['pool'], values))]
- real_pools = db.floating_ip_get_pools(self.ctxt)
- self._assertEqualListsOfPrimitivesAsSets(real_pools, expected_pools)
-
- def test_floating_ip_allocate_address(self):
- pools = {
- 'pool1': ['0.0.0.0', '1.1.1.1'],
- 'pool2': ['2.2.2.2'],
- 'pool3': ['3.3.3.3', '4.4.4.4', '5.5.5.5']
- }
- for pool, addresses in pools.items():
- for address in addresses:
- vals = {'pool': pool, 'address': address, 'project_id': None}
- self._create_floating_ip(vals)
-
- project_id = self._get_base_values()['project_id']
- for pool, addresses in pools.items():
- alloc_addrs = []
- for i in addresses:
- float_addr = db.floating_ip_allocate_address(self.ctxt,
- project_id, pool)
- alloc_addrs.append(float_addr)
- self._assertEqualListsOfPrimitivesAsSets(alloc_addrs, addresses)
-
- def test_floating_ip_allocate_auto_assigned(self):
- addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3', '1.1.1.4']
-
- float_ips = []
- for i in range(0, 2):
- float_ips.append(self._create_floating_ip(
- {"address": addresses[i]}))
- for i in range(2, 4):
- float_ips.append(self._create_floating_ip({"address": addresses[i],
- "auto_assigned": True}))
-
- for i in range(0, 2):
- float_ip = db.floating_ip_get(self.ctxt, float_ips[i].id)
- self.assertFalse(float_ip.auto_assigned)
- for i in range(2, 4):
- float_ip = db.floating_ip_get(self.ctxt, float_ips[i].id)
- self.assertTrue(float_ip.auto_assigned)
-
- def test_floating_ip_allocate_address_no_more_floating_ips(self):
- self.assertRaises(exception.NoMoreFloatingIps,
- db.floating_ip_allocate_address,
- self.ctxt, 'any_project_id', 'no_such_pool')
-
- def test_floating_ip_allocate_not_authorized(self):
- ctxt = context.RequestContext(user_id='a', project_id='abc',
- is_admin=False)
- self.assertRaises(exception.Forbidden,
- db.floating_ip_allocate_address,
- ctxt, 'other_project_id', 'any_pool')
-
- def test_floating_ip_allocate_address_succeeds_retry(self):
- pool = 'pool0'
- address = '0.0.0.0'
- vals = {'pool': pool, 'address': address, 'project_id': None}
- floating_ip = self._create_floating_ip(vals)
-
- project_id = self._get_base_values()['project_id']
-
- def fake_first():
- if mock_first.call_count == 1:
- return {'pool': pool, 'project_id': None, 'fixed_ip_id': None,
- 'address': address, 'id': 'invalid_id'}
- else:
- return {'pool': pool, 'project_id': None, 'fixed_ip_id': None,
- 'address': address, 'id': 1}
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- float_addr = db.floating_ip_allocate_address(self.ctxt,
- project_id, pool)
- self.assertEqual(address, float_addr)
- self.assertEqual(2, mock_first.call_count)
-
- float_ip = db.floating_ip_get(self.ctxt, floating_ip.id)
- self.assertEqual(project_id, float_ip['project_id'])
-
- def test_floating_ip_allocate_address_retry_limit_exceeded(self):
- pool = 'pool0'
- address = '0.0.0.0'
- vals = {'pool': pool, 'address': address, 'project_id': None}
- self._create_floating_ip(vals)
-
- project_id = self._get_base_values()['project_id']
-
- def fake_first():
- return {'pool': pool, 'project_id': None, 'fixed_ip_id': None,
- 'address': address, 'id': 'invalid_id'}
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- self.assertRaises(exception.FloatingIpAllocateFailed,
- db.floating_ip_allocate_address, self.ctxt,
- project_id, pool)
- # 5 retries + initial attempt
- self.assertEqual(6, mock_first.call_count)
-
- def test_floating_ip_allocate_address_no_more_ips_with_no_retries(self):
- with mock.patch('sqlalchemy.orm.query.Query.first',
- return_value=None) as mock_first:
- self.assertRaises(exception.NoMoreFloatingIps,
- db.floating_ip_allocate_address,
- self.ctxt, 'any_project_id', 'no_such_pool')
- self.assertEqual(1, mock_first.call_count)
-
- def _get_existing_ips(self):
- return [ip['address'] for ip in db.floating_ip_get_all(self.ctxt)]
-
- def test_floating_ip_bulk_create(self):
- expected_ips = ['1.1.1.1', '1.1.1.2', '1.1.1.3', '1.1.1.4']
- result = db.floating_ip_bulk_create(self.ctxt,
- [{'address': x} for x in expected_ips],
- want_result=False)
- self.assertIsNone(result)
- self._assertEqualListsOfPrimitivesAsSets(self._get_existing_ips(),
- expected_ips)
-
- def test_floating_ip_bulk_create_duplicate(self):
- ips = ['1.1.1.1', '1.1.1.2', '1.1.1.3', '1.1.1.4']
- prepare_ips = lambda x: {'address': x}
-
- result = db.floating_ip_bulk_create(self.ctxt,
- list(map(prepare_ips, ips)))
- self.assertEqual(ips, [ip.address for ip in result])
- self.assertRaises(exception.FloatingIpExists,
- db.floating_ip_bulk_create,
- self.ctxt,
- list(map(prepare_ips, ['1.1.1.5', '1.1.1.4'])),
- want_result=False)
- self.assertRaises(exception.FloatingIpNotFoundForAddress,
- db.floating_ip_get_by_address,
- self.ctxt, '1.1.1.5')
-
- def test_floating_ip_bulk_destroy(self):
- ips_for_delete = []
- ips_for_non_delete = []
-
- def create_ips(i, j):
- return [{'address': '1.1.%s.%s' % (i, k)} for k in range(1, j + 1)]
-
- # NOTE(boris-42): Create more than 256 ip to check that
- # _ip_range_splitter works properly.
- for i in range(1, 3):
- ips_for_delete.extend(create_ips(i, 255))
- ips_for_non_delete.extend(create_ips(3, 255))
-
- result = db.floating_ip_bulk_create(self.ctxt,
- ips_for_delete + ips_for_non_delete,
- want_result=False)
- self.assertIsNone(result)
-
- non_bulk_ips_for_delete = create_ips(4, 3)
- non_bulk_ips_for_non_delete = create_ips(5, 3)
- non_bulk_ips = non_bulk_ips_for_delete + non_bulk_ips_for_non_delete
- for dct in non_bulk_ips:
- self._create_floating_ip(dct)
- ips_for_delete.extend(non_bulk_ips_for_delete)
- ips_for_non_delete.extend(non_bulk_ips_for_non_delete)
-
- db.floating_ip_bulk_destroy(self.ctxt, ips_for_delete)
-
- expected_addresses = [x['address'] for x in ips_for_non_delete]
- self._assertEqualListsOfPrimitivesAsSets(self._get_existing_ips(),
- expected_addresses)
-
- def test_floating_ip_create(self):
- floating_ip = self._create_floating_ip({})
- ignored_keys = ['id', 'deleted', 'deleted_at', 'updated_at',
- 'created_at']
-
- self.assertIsNotNone(floating_ip['id'])
- self._assertEqualObjects(floating_ip, self._get_base_values(),
- ignored_keys)
-
- def test_floating_ip_create_duplicate(self):
- self._create_floating_ip({})
- self.assertRaises(exception.FloatingIpExists,
- self._create_floating_ip, {})
-
- def _create_fixed_ip(self, params):
- default_params = {'address': '192.168.0.1'}
- default_params.update(params)
- return db.fixed_ip_create(self.ctxt, default_params)['address']
-
- def test_floating_ip_fixed_ip_associate(self):
- float_addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3']
- fixed_addresses = ['2.2.2.1', '2.2.2.2', '2.2.2.3']
-
- project_id = self.ctxt.project_id
- float_ips = [self._create_floating_ip({'address': address,
- 'project_id': project_id})
- for address in float_addresses]
- fixed_addrs = [self._create_fixed_ip({'address': address})
- for address in fixed_addresses]
-
- for float_ip, fixed_addr in zip(float_ips, fixed_addrs):
- fixed_ip = db.floating_ip_fixed_ip_associate(self.ctxt,
- float_ip.address,
- fixed_addr, 'host')
- self.assertEqual(fixed_ip.address, fixed_addr)
-
- updated_float_ip = db.floating_ip_get(self.ctxt, float_ip.id)
- self.assertEqual(fixed_ip.id, updated_float_ip.fixed_ip_id)
- self.assertEqual('host', updated_float_ip.host)
-
- fixed_ip = db.floating_ip_fixed_ip_associate(self.ctxt,
- float_addresses[0],
- fixed_addresses[0],
- 'host')
- self.assertEqual(fixed_ip.address, fixed_addresses[0])
-
- def test_floating_ip_fixed_ip_associate_float_ip_not_found(self):
- self.assertRaises(exception.FixedIpNotFoundForAddress,
- db.floating_ip_fixed_ip_associate,
- self.ctxt, '10.10.10.10', 'some', 'some')
-
- def test_floating_ip_associate_failed(self):
- fixed_ip = self._create_fixed_ip({'address': '7.7.7.7'})
- self.assertRaises(exception.FloatingIpAssociateFailed,
- db.floating_ip_fixed_ip_associate,
- self.ctxt, '10.10.10.10', fixed_ip, 'some')
-
- def test_floating_ip_deallocate(self):
- values = {'address': '1.1.1.1', 'project_id': 'fake', 'host': 'fake'}
- float_ip = self._create_floating_ip(values)
- rows_updated = db.floating_ip_deallocate(self.ctxt, float_ip.address)
- self.assertEqual(1, rows_updated)
-
- updated_float_ip = db.floating_ip_get(self.ctxt, float_ip.id)
- self.assertIsNone(updated_float_ip.project_id)
- self.assertIsNone(updated_float_ip.host)
- self.assertFalse(updated_float_ip.auto_assigned)
-
- def test_floating_ip_deallocate_address_not_found(self):
- self.assertEqual(0, db.floating_ip_deallocate(self.ctxt, '2.2.2.2'))
-
- def test_floating_ip_deallocate_address_associated_ip(self):
- float_address = '1.1.1.1'
- fixed_address = '2.2.2.1'
-
- project_id = self.ctxt.project_id
- float_ip = self._create_floating_ip({'address': float_address,
- 'project_id': project_id})
- fixed_addr = self._create_fixed_ip({'address': fixed_address})
- db.floating_ip_fixed_ip_associate(self.ctxt, float_ip.address,
- fixed_addr, 'host')
- self.assertEqual(0, db.floating_ip_deallocate(self.ctxt,
- float_address))
-
- def test_floating_ip_destroy(self):
- addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3']
- float_ips = [self._create_floating_ip({'address': addr})
- for addr in addresses]
-
- expected_len = len(addresses)
- for float_ip in float_ips:
- db.floating_ip_destroy(self.ctxt, float_ip.address)
- self.assertRaises(exception.FloatingIpNotFound,
- db.floating_ip_get, self.ctxt, float_ip.id)
- expected_len -= 1
- if expected_len > 0:
- self.assertEqual(expected_len,
- len(db.floating_ip_get_all(self.ctxt)))
- else:
- self.assertRaises(exception.NoFloatingIpsDefined,
- db.floating_ip_get_all, self.ctxt)
-
- def test_floating_ip_disassociate(self):
- float_addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3']
- fixed_addresses = ['2.2.2.1', '2.2.2.2', '2.2.2.3']
-
- project_id = self.ctxt.project_id
- float_ips = [self._create_floating_ip({'address': address,
- 'project_id': project_id})
- for address in float_addresses]
- fixed_addrs = [self._create_fixed_ip({'address': address})
- for address in fixed_addresses]
-
- for float_ip, fixed_addr in zip(float_ips, fixed_addrs):
- db.floating_ip_fixed_ip_associate(self.ctxt,
- float_ip.address,
- fixed_addr, 'host')
-
- for float_ip, fixed_addr in zip(float_ips, fixed_addrs):
- fixed = db.floating_ip_disassociate(self.ctxt, float_ip.address)
- self.assertEqual(fixed.address, fixed_addr)
- updated_float_ip = db.floating_ip_get(self.ctxt, float_ip.id)
- self.assertIsNone(updated_float_ip.fixed_ip_id)
- self.assertIsNone(updated_float_ip.host)
-
- def test_floating_ip_disassociate_not_found(self):
- self.assertRaises(exception.FloatingIpNotFoundForAddress,
- db.floating_ip_disassociate, self.ctxt,
- '11.11.11.11')
-
- def test_floating_ip_get_all(self):
- addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3']
- float_ips = [self._create_floating_ip({'address': addr})
- for addr in addresses]
- self._assertEqualListsOfObjects(float_ips,
- db.floating_ip_get_all(self.ctxt),
- ignored_keys="fixed_ip")
-
- def test_floating_ip_get_all_associated(self):
- instance = db.instance_create(self.ctxt, {'uuid': 'fake'})
- project_id = self.ctxt.project_id
- float_ip = self._create_floating_ip({'address': '1.1.1.1',
- 'project_id': project_id})
- fixed_ip = self._create_fixed_ip({'address': '2.2.2.2',
- 'instance_uuid': instance.uuid})
- db.floating_ip_fixed_ip_associate(self.ctxt,
- float_ip.address,
- fixed_ip,
- 'host')
- float_ips = db.floating_ip_get_all(self.ctxt)
- self.assertEqual(1, len(float_ips))
- self.assertEqual(float_ip.address, float_ips[0].address)
- self.assertEqual(fixed_ip, float_ips[0].fixed_ip.address)
- self.assertEqual(instance.uuid, float_ips[0].fixed_ip.instance_uuid)
-
- def test_floating_ip_get_all_not_found(self):
- self.assertRaises(exception.NoFloatingIpsDefined,
- db.floating_ip_get_all, self.ctxt)
-
- def test_floating_ip_get_all_by_host(self):
- hosts = {
- 'host1': ['1.1.1.1', '1.1.1.2'],
- 'host2': ['2.1.1.1', '2.1.1.2'],
- 'host3': ['3.1.1.1', '3.1.1.2', '3.1.1.3']
- }
-
- hosts_with_float_ips = {}
- for host, addresses in hosts.items():
- hosts_with_float_ips[host] = []
- for address in addresses:
- float_ip = self._create_floating_ip({'host': host,
- 'address': address})
- hosts_with_float_ips[host].append(float_ip)
-
- for host, float_ips in hosts_with_float_ips.items():
- real_float_ips = db.floating_ip_get_all_by_host(self.ctxt, host)
- self._assertEqualListsOfObjects(float_ips, real_float_ips,
- ignored_keys="fixed_ip")
-
- def test_floating_ip_get_all_by_host_not_found(self):
- self.assertRaises(exception.FloatingIpNotFoundForHost,
- db.floating_ip_get_all_by_host,
- self.ctxt, 'non_exists_host')
-
- def test_floating_ip_get_all_by_project(self):
- projects = {
- 'pr1': ['1.1.1.1', '1.1.1.2'],
- 'pr2': ['2.1.1.1', '2.1.1.2'],
- 'pr3': ['3.1.1.1', '3.1.1.2', '3.1.1.3']
- }
-
- projects_with_float_ips = {}
- for project_id, addresses in projects.items():
- projects_with_float_ips[project_id] = []
- for address in addresses:
- float_ip = self._create_floating_ip({'project_id': project_id,
- 'address': address})
- projects_with_float_ips[project_id].append(float_ip)
-
- for project_id, float_ips in projects_with_float_ips.items():
- real_float_ips = db.floating_ip_get_all_by_project(self.ctxt,
- project_id)
- self._assertEqualListsOfObjects(float_ips, real_float_ips,
- ignored_keys='fixed_ip')
-
- def test_floating_ip_get_all_by_project_not_authorized(self):
- ctxt = context.RequestContext(user_id='a', project_id='abc',
- is_admin=False)
- self.assertRaises(exception.Forbidden,
- db.floating_ip_get_all_by_project,
- ctxt, 'other_project')
-
- def test_floating_ip_get_by_address(self):
- addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3']
- float_ips = [self._create_floating_ip({'address': addr})
- for addr in addresses]
-
- for float_ip in float_ips:
- real_float_ip = db.floating_ip_get_by_address(self.ctxt,
- float_ip.address)
- self._assertEqualObjects(float_ip, real_float_ip,
- ignored_keys='fixed_ip')
-
- def test_floating_ip_get_by_address_not_found(self):
- self.assertRaises(exception.FloatingIpNotFoundForAddress,
- db.floating_ip_get_by_address,
- self.ctxt, '20.20.20.20')
-
- @mock.patch.object(query.Query, 'first', side_effect=db_exc.DBError())
- def test_floating_ip_get_by_invalid_address(self, mock_query):
- self.assertRaises(exception.InvalidIpAddressError,
- db.floating_ip_get_by_address,
- self.ctxt, 'non_exists_host')
- mock_query.assert_called_once_with()
-
- def test_floating_ip_get_by_fixed_address(self):
- fixed_float = [
- ('1.1.1.1', '2.2.2.1'),
- ('1.1.1.2', '2.2.2.2'),
- ('1.1.1.3', '2.2.2.3')
- ]
-
- for fixed_addr, float_addr in fixed_float:
- project_id = self.ctxt.project_id
- self._create_floating_ip({'address': float_addr,
- 'project_id': project_id})
- self._create_fixed_ip({'address': fixed_addr})
- db.floating_ip_fixed_ip_associate(self.ctxt, float_addr,
- fixed_addr, 'some_host')
-
- for fixed_addr, float_addr in fixed_float:
- float_ip = db.floating_ip_get_by_fixed_address(self.ctxt,
- fixed_addr)
- self.assertEqual(float_addr, float_ip[0]['address'])
-
- def test_floating_ip_get_by_fixed_ip_id(self):
- fixed_float = [
- ('1.1.1.1', '2.2.2.1'),
- ('1.1.1.2', '2.2.2.2'),
- ('1.1.1.3', '2.2.2.3')
- ]
-
- for fixed_addr, float_addr in fixed_float:
- project_id = self.ctxt.project_id
- self._create_floating_ip({'address': float_addr,
- 'project_id': project_id})
- self._create_fixed_ip({'address': fixed_addr})
- db.floating_ip_fixed_ip_associate(self.ctxt, float_addr,
- fixed_addr, 'some_host')
-
- for fixed_addr, float_addr in fixed_float:
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, fixed_addr)
- float_ip = db.floating_ip_get_by_fixed_ip_id(self.ctxt,
- fixed_ip['id'])
- self.assertEqual(float_addr, float_ip[0]['address'])
-
- def test_floating_ip_update(self):
- float_ip = self._create_floating_ip({})
-
- values = {
- 'project_id': 'some_pr',
- 'host': 'some_host',
- 'auto_assigned': True,
- 'interface': 'some_interface',
- 'pool': 'some_pool'
- }
- floating_ref = db.floating_ip_update(self.ctxt, float_ip['address'],
- values)
- self.assertIsNotNone(floating_ref)
- updated_float_ip = db.floating_ip_get(self.ctxt, float_ip['id'])
- self._assertEqualObjects(updated_float_ip, values,
- ignored_keys=['id', 'address', 'updated_at',
- 'deleted_at', 'created_at',
- 'deleted', 'fixed_ip_id',
- 'fixed_ip'])
-
- def test_floating_ip_update_to_duplicate(self):
- float_ip1 = self._create_floating_ip({'address': '1.1.1.1'})
- float_ip2 = self._create_floating_ip({'address': '1.1.1.2'})
-
- self.assertRaises(exception.FloatingIpExists,
- db.floating_ip_update,
- self.ctxt, float_ip2['address'],
- {'address': float_ip1['address']})
-
-
class InstanceDestroyConstraints(test.TestCase):
def test_destroy_with_equal_any_constraint_met_single_value(self):
@@ -6444,313 +4843,6 @@ class VirtualInterfaceTestCase(test.TestCase, ModelsObjectComparatorMixin):
self._assertEqualObjects(updated, updated_vif, ignored_keys)
-@mock.patch('time.sleep', new=lambda x: None)
-class NetworkTestCase(test.TestCase, ModelsObjectComparatorMixin):
-
- """Tests for db.api.network_* methods."""
-
- def setUp(self):
- super(NetworkTestCase, self).setUp()
- self.ctxt = context.get_admin_context()
-
- def _get_associated_fixed_ip(self, host, cidr, ip):
- network = db.network_create_safe(self.ctxt,
- {'project_id': 'project1', 'cidr': cidr})
- self.assertFalse(db.network_in_use_on_host(self.ctxt, network.id,
- host))
- instance = db.instance_create(self.ctxt,
- {'project_id': 'project1', 'host': host})
- virtual_interface = db.virtual_interface_create(self.ctxt,
- {'instance_uuid': instance.uuid, 'network_id': network.id,
- 'address': ip})
- db.fixed_ip_create(self.ctxt, {'address': ip,
- 'network_id': network.id, 'allocated': True,
- 'virtual_interface_id': virtual_interface.id})
- db.fixed_ip_associate(self.ctxt, ip, instance.uuid,
- network.id, virtual_interface_id=virtual_interface['id'])
- return network, instance
-
- def test_network_get_associated_default_route(self):
- network, instance = self._get_associated_fixed_ip('host.net',
- '192.0.2.0/30', '192.0.2.1')
- network2 = db.network_create_safe(self.ctxt,
- {'project_id': 'project1', 'cidr': '192.0.3.0/30'})
- ip = '192.0.3.1'
- virtual_interface = db.virtual_interface_create(self.ctxt,
- {'instance_uuid': instance.uuid, 'network_id': network2.id,
- 'address': ip})
- db.fixed_ip_create(self.ctxt, {'address': ip,
- 'network_id': network2.id, 'allocated': True,
- 'virtual_interface_id': virtual_interface.id})
- db.fixed_ip_associate(self.ctxt, ip, instance.uuid,
- network2.id)
- data = db.network_get_associated_fixed_ips(self.ctxt, network.id)
- self.assertEqual(1, len(data))
- self.assertTrue(data[0]['default_route'])
- data = db.network_get_associated_fixed_ips(self.ctxt, network2.id)
- self.assertEqual(1, len(data))
- self.assertFalse(data[0]['default_route'])
-
- def test_network_get_associated_fixed_ips(self):
- network, instance = self._get_associated_fixed_ip('host.net',
- '192.0.2.0/30', '192.0.2.1')
- data = db.network_get_associated_fixed_ips(self.ctxt, network.id)
- self.assertEqual(1, len(data))
- self.assertEqual('192.0.2.1', data[0]['address'])
- self.assertEqual('192.0.2.1', data[0]['vif_address'])
- self.assertEqual(instance.uuid, data[0]['instance_uuid'])
- self.assertTrue(data[0][fields.PciDeviceStatus.ALLOCATED])
-
- def test_network_create_safe(self):
- values = {'host': 'localhost', 'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
- self.assertEqual(36, len(network['uuid']))
- db_network = db.network_get(self.ctxt, network['id'])
- self._assertEqualObjects(network, db_network)
-
- def test_network_create_with_duplicate_vlan(self):
- values1 = {'host': 'localhost', 'project_id': 'project1', 'vlan': 1}
- values2 = {'host': 'something', 'project_id': 'project1', 'vlan': 1}
- db.network_create_safe(self.ctxt, values1)
- self.assertRaises(exception.DuplicateVlan,
- db.network_create_safe, self.ctxt, values2)
-
- def test_network_delete_safe(self):
- values = {'host': 'localhost', 'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
- db.network_get(self.ctxt, network['id'])
- values = {'network_id': network['id'], 'address': '192.168.1.5'}
- address1 = db.fixed_ip_create(self.ctxt, values)['address']
- values = {'network_id': network['id'],
- 'address': '192.168.1.6',
- 'allocated': True}
- address2 = db.fixed_ip_create(self.ctxt, values)['address']
- self.assertRaises(exception.NetworkInUse,
- db.network_delete_safe, self.ctxt, network['id'])
- db.fixed_ip_update(self.ctxt, address2, {'allocated': False})
- network = db.network_delete_safe(self.ctxt, network['id'])
- self.assertRaises(exception.FixedIpNotFoundForAddress,
- db.fixed_ip_get_by_address, self.ctxt, address1)
- ctxt = self.ctxt.elevated(read_deleted='yes')
- fixed_ip = db.fixed_ip_get_by_address(ctxt, address1)
- self.assertTrue(fixed_ip['deleted'])
-
- def test_network_in_use_on_host(self):
- values = {'host': 'foo', 'hostname': 'myname'}
- instance = db.instance_create(self.ctxt, values)
- values = {'address': '192.168.1.5', 'instance_uuid': instance['uuid']}
- vif = db.virtual_interface_create(self.ctxt, values)
- values = {'address': '192.168.1.6',
- 'network_id': 1,
- 'allocated': True,
- 'instance_uuid': instance['uuid'],
- 'virtual_interface_id': vif['id']}
- db.fixed_ip_create(self.ctxt, values)
- self.assertTrue(db.network_in_use_on_host(self.ctxt, 1, 'foo'))
- self.assertFalse(db.network_in_use_on_host(self.ctxt, 1, 'bar'))
-
- def test_network_update_nonexistent(self):
- self.assertRaises(exception.NetworkNotFound,
- db.network_update, self.ctxt, 123456, {})
-
- def test_network_update_with_duplicate_vlan(self):
- values1 = {'host': 'localhost', 'project_id': 'project1', 'vlan': 1}
- values2 = {'host': 'something', 'project_id': 'project1', 'vlan': 2}
- network_ref = db.network_create_safe(self.ctxt, values1)
- db.network_create_safe(self.ctxt, values2)
- self.assertRaises(exception.DuplicateVlan,
- db.network_update, self.ctxt,
- network_ref["id"], values2)
-
- def test_network_update(self):
- network = db.network_create_safe(self.ctxt, {'project_id': 'project1',
- 'vlan': 1, 'host': 'test.com'})
- db.network_update(self.ctxt, network.id, {'vlan': 2})
- network_new = db.network_get(self.ctxt, network.id)
- self.assertEqual(2, network_new.vlan)
-
- def test_network_set_host_nonexistent_network(self):
- self.assertRaises(exception.NetworkNotFound, db.network_set_host,
- self.ctxt, 123456, 'nonexistent')
-
- def test_network_set_host_already_set_correct(self):
- values = {'host': 'example.com', 'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
- self.assertIsNone(db.network_set_host(self.ctxt, network.id,
- 'example.com'))
-
- def test_network_set_host_already_set_incorrect(self):
- values = {'host': 'example.com', 'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
- self.assertIsNone(db.network_set_host(self.ctxt, network.id,
- 'new.example.com'))
-
- def test_network_set_host_with_initially_no_host(self):
- values = {'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
- db.network_set_host(self.ctxt, network.id, 'example.com')
- self.assertEqual('example.com',
- db.network_get(self.ctxt, network.id).host)
-
- def test_network_set_host_succeeds_retry_on_deadlock(self):
- values = {'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
-
- def fake_update(params):
- if mock_update.call_count == 1:
- raise db_exc.DBDeadlock()
- else:
- return 1
-
- with mock.patch('sqlalchemy.orm.query.Query.update',
- side_effect=fake_update) as mock_update:
- db.network_set_host(self.ctxt, network.id, 'example.com')
- self.assertEqual(2, mock_update.call_count)
-
- def test_network_set_host_succeeds_retry_on_no_rows_updated(self):
- values = {'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
-
- def fake_update(params):
- if mock_update.call_count == 1:
- return 0
- else:
- return 1
-
- with mock.patch('sqlalchemy.orm.query.Query.update',
- side_effect=fake_update) as mock_update:
- db.network_set_host(self.ctxt, network.id, 'example.com')
- self.assertEqual(2, mock_update.call_count)
-
- def test_network_set_host_failed_with_retry_on_no_rows_updated(self):
- values = {'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
-
- with mock.patch('sqlalchemy.orm.query.Query.update',
- return_value=0) as mock_update:
- self.assertRaises(exception.NetworkSetHostFailed,
- db.network_set_host, self.ctxt, network.id,
- 'example.com')
- # 5 retries + initial attempt
- self.assertEqual(6, mock_update.call_count)
-
- def test_network_get_all_by_host(self):
- self.assertEqual([],
- db.network_get_all_by_host(self.ctxt, 'example.com'))
- host = 'h1.example.com'
- # network with host set
- net1 = db.network_create_safe(self.ctxt, {'host': host})
- self._assertEqualListsOfObjects([net1],
- db.network_get_all_by_host(self.ctxt, host))
- # network with fixed ip with host set
- net2 = db.network_create_safe(self.ctxt, {})
- db.fixed_ip_create(self.ctxt, {'host': host, 'network_id': net2.id})
- db.network_get_all_by_host(self.ctxt, host)
- self._assertEqualListsOfObjects([net1, net2],
- db.network_get_all_by_host(self.ctxt, host))
- # network with instance with host set
- net3 = db.network_create_safe(self.ctxt, {})
- instance = db.instance_create(self.ctxt, {'host': host})
- db.fixed_ip_create(self.ctxt, {'network_id': net3.id,
- 'instance_uuid': instance.uuid})
- self._assertEqualListsOfObjects([net1, net2, net3],
- db.network_get_all_by_host(self.ctxt, host))
-
- def test_network_get_by_cidr(self):
- cidr = '192.0.2.0/30'
- cidr_v6 = '2001:db8:1::/64'
- network = db.network_create_safe(self.ctxt,
- {'project_id': 'project1', 'cidr': cidr, 'cidr_v6': cidr_v6})
- self._assertEqualObjects(network,
- db.network_get_by_cidr(self.ctxt, cidr))
- self._assertEqualObjects(network,
- db.network_get_by_cidr(self.ctxt, cidr_v6))
-
- def test_network_get_by_cidr_nonexistent(self):
- self.assertRaises(exception.NetworkNotFoundForCidr,
- db.network_get_by_cidr, self.ctxt, '192.0.2.0/30')
-
- def test_network_get_by_uuid(self):
- network = db.network_create_safe(self.ctxt,
- {'project_id': 'project_1'})
- self._assertEqualObjects(network,
- db.network_get_by_uuid(self.ctxt, network.uuid))
-
- def test_network_get_by_uuid_nonexistent(self):
- self.assertRaises(exception.NetworkNotFoundForUUID,
- db.network_get_by_uuid, self.ctxt, 'non-existent-uuid')
-
- def test_network_get_all_by_uuids_no_networks(self):
- self.assertRaises(exception.NoNetworksFound,
- db.network_get_all_by_uuids, self.ctxt, ['non-existent-uuid'])
-
- def test_network_get_all_by_uuids(self):
- net1 = db.network_create_safe(self.ctxt, {})
- net2 = db.network_create_safe(self.ctxt, {})
- self._assertEqualListsOfObjects([net1, net2],
- db.network_get_all_by_uuids(self.ctxt, [net1.uuid, net2.uuid]))
-
- def test_network_get_all_no_networks(self):
- self.assertRaises(exception.NoNetworksFound,
- db.network_get_all, self.ctxt)
-
- def test_network_get_all(self):
- network = db.network_create_safe(self.ctxt, {})
- network_db = db.network_get_all(self.ctxt)
- self.assertEqual(1, len(network_db))
- self._assertEqualObjects(network, network_db[0])
-
- def test_network_get_all_admin_user(self):
- network1 = db.network_create_safe(self.ctxt, {})
- network2 = db.network_create_safe(self.ctxt,
- {'project_id': 'project1'})
- self._assertEqualListsOfObjects([network1, network2],
- db.network_get_all(self.ctxt,
- project_only=True))
-
- def test_network_get_all_normal_user(self):
- normal_ctxt = context.RequestContext('fake', 'fake')
- db.network_create_safe(self.ctxt, {})
- db.network_create_safe(self.ctxt, {'project_id': 'project1'})
- network1 = db.network_create_safe(self.ctxt,
- {'project_id': 'fake'})
- network_db = db.network_get_all(normal_ctxt, project_only=True)
- self.assertEqual(1, len(network_db))
- self._assertEqualObjects(network1, network_db[0])
-
- def test_network_get(self):
- network = db.network_create_safe(self.ctxt, {})
- self._assertEqualObjects(db.network_get(self.ctxt, network.id),
- network)
- db.network_delete_safe(self.ctxt, network.id)
- self.assertRaises(exception.NetworkNotFound,
- db.network_get, self.ctxt, network.id)
-
- def test_network_associate(self):
- network = db.network_create_safe(self.ctxt, {})
- self.assertIsNone(network.project_id)
- db.network_associate(self.ctxt, "project1", network.id)
- self.assertEqual("project1", db.network_get(self.ctxt,
- network.id).project_id)
-
- def test_network_diassociate(self):
- network = db.network_create_safe(self.ctxt,
- {'project_id': 'project1', 'host': 'test.net'})
- # disassociate project
- db.network_disassociate(self.ctxt, network.id, False, True)
- self.assertIsNone(db.network_get(self.ctxt, network.id).project_id)
- # disassociate host
- db.network_disassociate(self.ctxt, network.id, True, False)
- self.assertIsNone(db.network_get(self.ctxt, network.id).host)
-
- def test_network_count_reserved_ips(self):
- net = db.network_create_safe(self.ctxt, {})
- self.assertEqual(0, db.network_count_reserved_ips(self.ctxt, net.id))
- db.fixed_ip_create(self.ctxt, {'network_id': net.id,
- 'reserved': True})
- self.assertEqual(1, db.network_count_reserved_ips(self.ctxt, net.id))
-
-
class KeyPairTestCase(test.TestCase, ModelsObjectComparatorMixin):
def setUp(self):
super(KeyPairTestCase, self).setUp()