summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--nova/api/openstack/compute/multinic.py2
-rw-r--r--nova/db/api.py319
-rw-r--r--nova/db/sqlalchemy/api.py1040
-rw-r--r--nova/db/sqlalchemy/models.py9
-rw-r--r--nova/exception.py94
-rw-r--r--nova/network/neutron.py2
-rw-r--r--nova/objects/__init__.py4
-rw-r--r--nova/objects/fixed_ip.py315
-rw-r--r--nova/objects/floating_ip.py258
-rw-r--r--nova/objects/network.py229
-rw-r--r--nova/objects/security_group_rule.py114
-rw-r--r--nova/tests/unit/api/openstack/compute/test_multinic.py2
-rw-r--r--nova/tests/unit/db/test_db_api.py1908
-rw-r--r--nova/tests/unit/objects/test_fixed_ip.py382
-rw-r--r--nova/tests/unit/objects/test_floating_ip.py289
-rw-r--r--nova/tests/unit/objects/test_network.py239
-rw-r--r--nova/tests/unit/objects/test_objects.py11
-rw-r--r--nova/tests/unit/objects/test_security_group_rule.py126
18 files changed, 15 insertions, 5328 deletions
diff --git a/nova/api/openstack/compute/multinic.py b/nova/api/openstack/compute/multinic.py
index 89e9b01e02..8a477f1d1d 100644
--- a/nova/api/openstack/compute/multinic.py
+++ b/nova/api/openstack/compute/multinic.py
@@ -65,5 +65,5 @@ class MultinicController(wsgi.Controller):
try:
self.compute_api.remove_fixed_ip(context, instance, address)
- except exception.FixedIpNotFoundForSpecificInstance as e:
+ except exception.FixedIpNotFoundForInstance as e:
raise exc.HTTPBadRequest(explanation=e.format_message())
diff --git a/nova/db/api.py b/nova/db/api.py
index f8fe410c58..5b88d126b1 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -391,114 +391,6 @@ def certificate_get_all_by_user_and_project(context, user_id, project_id):
project_id)
-###################
-
-def floating_ip_get(context, id):
- return IMPL.floating_ip_get(context, id)
-
-
-def floating_ip_get_pools(context):
- """Returns a list of floating IP pools."""
- return IMPL.floating_ip_get_pools(context)
-
-
-def floating_ip_allocate_address(context, project_id, pool,
- auto_assigned=False):
- """Allocate free floating IP from specified pool and return the address.
-
- Raises if one is not available.
-
- """
- return IMPL.floating_ip_allocate_address(context, project_id, pool,
- auto_assigned)
-
-
-def floating_ip_bulk_create(context, ips, want_result=True):
- """Create a lot of floating IPs from the values dictionary.
- :param want_result: If set to True, return floating IPs inserted
- """
- return IMPL.floating_ip_bulk_create(context, ips, want_result=want_result)
-
-
-def floating_ip_bulk_destroy(context, ips):
- """Destroy a lot of floating IPs from the values dictionary."""
- return IMPL.floating_ip_bulk_destroy(context, ips)
-
-
-def floating_ip_create(context, values):
- """Create a floating IP from the values dictionary."""
- return IMPL.floating_ip_create(context, values)
-
-
-def floating_ip_deallocate(context, address):
- """Deallocate a floating IP by address."""
- return IMPL.floating_ip_deallocate(context, address)
-
-
-def floating_ip_destroy(context, address):
- """Destroy the floating_ip or raise if it does not exist."""
- return IMPL.floating_ip_destroy(context, address)
-
-
-def floating_ip_disassociate(context, address):
- """Disassociate a floating IP from a fixed IP by address.
-
- :returns: the fixed IP record joined to network record or None
- if the IP was not associated to an IP.
-
- """
- return IMPL.floating_ip_disassociate(context, address)
-
-
-def floating_ip_fixed_ip_associate(context, floating_address,
- fixed_address, host):
- """Associate a floating IP to a fixed_ip by address.
-
- :returns: the fixed IP record joined to network record or None
- if the IP was already associated to the fixed IP.
- """
-
- return IMPL.floating_ip_fixed_ip_associate(context,
- floating_address,
- fixed_address,
- host)
-
-
-def floating_ip_get_all(context):
- """Get all floating IPs."""
- return IMPL.floating_ip_get_all(context)
-
-
-def floating_ip_get_all_by_host(context, host):
- """Get all floating IPs by host."""
- return IMPL.floating_ip_get_all_by_host(context, host)
-
-
-def floating_ip_get_all_by_project(context, project_id):
- """Get all floating IPs by project."""
- return IMPL.floating_ip_get_all_by_project(context, project_id)
-
-
-def floating_ip_get_by_address(context, address):
- """Get a floating IP by address or raise if it doesn't exist."""
- return IMPL.floating_ip_get_by_address(context, address)
-
-
-def floating_ip_get_by_fixed_address(context, fixed_address):
- """Get a floating IPs by fixed address."""
- return IMPL.floating_ip_get_by_fixed_address(context, fixed_address)
-
-
-def floating_ip_get_by_fixed_ip_id(context, fixed_ip_id):
- """Get a floating IPs by fixed address."""
- return IMPL.floating_ip_get_by_fixed_ip_id(context, fixed_ip_id)
-
-
-def floating_ip_update(context, address, values):
- """Update a floating IP by address or raise if it doesn't exist."""
- return IMPL.floating_ip_update(context, address, values)
-
-
####################
@@ -581,101 +473,6 @@ def migration_get_by_sort_filters(context, sort_keys, sort_dirs, values):
####################
-def fixed_ip_associate(context, address, instance_uuid, network_id=None,
- reserved=False, virtual_interface_id=None):
- """Associate fixed IP to instance.
-
- Raises if fixed IP is not available.
-
- """
- return IMPL.fixed_ip_associate(context, address, instance_uuid, network_id,
- reserved, virtual_interface_id)
-
-
-def fixed_ip_associate_pool(context, network_id, instance_uuid=None,
- host=None, virtual_interface_id=None):
- """Find free IP in network and associate it to instance or host.
-
- Raises if one is not available.
-
- """
- return IMPL.fixed_ip_associate_pool(context, network_id,
- instance_uuid, host,
- virtual_interface_id)
-
-
-def fixed_ip_create(context, values):
- """Create a fixed IP from the values dictionary."""
- return IMPL.fixed_ip_create(context, values)
-
-
-def fixed_ip_bulk_create(context, ips):
- """Create a lot of fixed IPs from the values dictionary."""
- return IMPL.fixed_ip_bulk_create(context, ips)
-
-
-def fixed_ip_disassociate(context, address):
- """Disassociate a fixed IP from an instance by address."""
- return IMPL.fixed_ip_disassociate(context, address)
-
-
-def fixed_ip_disassociate_all_by_timeout(context, host, time):
- """Disassociate old fixed IPs from host."""
- return IMPL.fixed_ip_disassociate_all_by_timeout(context, host, time)
-
-
-def fixed_ip_get(context, id, get_network=False):
- """Get fixed IP by id or raise if it does not exist.
-
- If get_network is true, also return the associated network.
- """
- return IMPL.fixed_ip_get(context, id, get_network)
-
-
-def fixed_ip_get_all(context):
- """Get all defined fixed IPs."""
- return IMPL.fixed_ip_get_all(context)
-
-
-def fixed_ip_get_by_address(context, address, columns_to_join=None):
- """Get a fixed IP by address or raise if it does not exist."""
- return IMPL.fixed_ip_get_by_address(context, address,
- columns_to_join=columns_to_join)
-
-
-def fixed_ip_get_by_floating_address(context, floating_address):
- """Get a fixed IP by a floating address."""
- return IMPL.fixed_ip_get_by_floating_address(context, floating_address)
-
-
-def fixed_ip_get_by_instance(context, instance_uuid):
- """Get fixed IPs by instance or raise if none exist."""
- return IMPL.fixed_ip_get_by_instance(context, instance_uuid)
-
-
-def fixed_ip_get_by_host(context, host):
- """Get fixed IPs by compute host."""
- return IMPL.fixed_ip_get_by_host(context, host)
-
-
-def fixed_ip_get_by_network_host(context, network_uuid, host):
- """Get fixed IP for a host in a network."""
- return IMPL.fixed_ip_get_by_network_host(context, network_uuid, host)
-
-
-def fixed_ips_by_virtual_interface(context, vif_id):
- """Get fixed IPs by virtual interface or raise if none exist."""
- return IMPL.fixed_ips_by_virtual_interface(context, vif_id)
-
-
-def fixed_ip_update(context, address, values):
- """Create a fixed IP from the values dictionary."""
- return IMPL.fixed_ip_update(context, address, values)
-
-
-####################
-
-
def virtual_interface_create(context, values):
"""Create a virtual interface record in the database."""
return IMPL.virtual_interface_create(context, values)
@@ -845,11 +642,6 @@ def instance_get_all_by_host_and_not_type(context, host, type_id=None):
return IMPL.instance_get_all_by_host_and_not_type(context, host, type_id)
-def instance_floating_address_get_all(context, instance_uuid):
- """Get all floating IP addresses of an instance."""
- return IMPL.instance_floating_address_get_all(context, instance_uuid)
-
-
# NOTE(hanlind): This method can be removed as conductor RPC API moves to v2.0.
def instance_get_all_hung_in_rebooting(context, reboot_window):
"""Get all instances stuck in a rebooting state."""
@@ -978,104 +770,6 @@ def key_pair_count_by_user(context, user_id):
return IMPL.key_pair_count_by_user(context, user_id)
-####################
-
-
-def network_associate(context, project_id, network_id=None, force=False):
- """Associate a free network to a project."""
- return IMPL.network_associate(context, project_id, network_id, force)
-
-
-def network_count_reserved_ips(context, network_id):
- """Return the number of reserved IPs in the network."""
- return IMPL.network_count_reserved_ips(context, network_id)
-
-
-def network_create_safe(context, values):
- """Create a network from the values dict.
-
- The network is only returned if the create succeeds. If the create violates
- constraints because the network already exists, no exception is raised.
-
- """
- return IMPL.network_create_safe(context, values)
-
-
-def network_delete_safe(context, network_id):
- """Delete network with key network_id.
-
- This method assumes that the network is not associated with any project
-
- """
- return IMPL.network_delete_safe(context, network_id)
-
-
-def network_disassociate(context, network_id, disassociate_host=True,
- disassociate_project=True):
- """Disassociate the network from project or host
-
- Raises if it does not exist.
- """
- return IMPL.network_disassociate(context, network_id, disassociate_host,
- disassociate_project)
-
-
-def network_get(context, network_id, project_only="allow_none"):
- """Get a network or raise if it does not exist."""
- return IMPL.network_get(context, network_id, project_only=project_only)
-
-
-def network_get_all(context, project_only="allow_none"):
- """Return all defined networks."""
- return IMPL.network_get_all(context, project_only)
-
-
-def network_get_all_by_uuids(context, network_uuids,
- project_only="allow_none"):
- """Return networks by ids."""
- return IMPL.network_get_all_by_uuids(context, network_uuids,
- project_only=project_only)
-
-
-def network_in_use_on_host(context, network_id, host=None):
- """Indicates if a network is currently in use on host."""
- return IMPL.network_in_use_on_host(context, network_id, host)
-
-
-def network_get_associated_fixed_ips(context, network_id, host=None):
- """Get all network's IPs that have been associated."""
- return IMPL.network_get_associated_fixed_ips(context, network_id, host)
-
-
-def network_get_by_uuid(context, uuid):
- """Get a network by uuid or raise if it does not exist."""
- return IMPL.network_get_by_uuid(context, uuid)
-
-
-def network_get_by_cidr(context, cidr):
- """Get a network by cidr or raise if it does not exist."""
- return IMPL.network_get_by_cidr(context, cidr)
-
-
-def network_get_all_by_host(context, host):
- """All networks for which the given host is the network host."""
- return IMPL.network_get_all_by_host(context, host)
-
-
-def network_set_host(context, network_id, host_id):
- """Safely set the host for network."""
- return IMPL.network_set_host(context, network_id, host_id)
-
-
-def network_update(context, network_id, values):
- """Set the given properties on a network and update it.
-
- Raises NotFound if network does not exist.
-
- """
- return IMPL.network_update(context, network_id, values)
-
-
###############
@@ -1355,19 +1049,6 @@ def security_group_rule_count_by_group(context, security_group_id):
###################
-def project_get_networks(context, project_id, associate=True):
- """Return the network associated with the project.
-
- If associate is true, it will attempt to associate a new
- network if one is not found, otherwise it returns None.
-
- """
- return IMPL.project_get_networks(context, project_id, associate)
-
-
-##################
-
-
def pci_device_get_by_addr(context, node_id, dev_addr):
"""Get PCI device by address."""
return IMPL.pci_device_get_by_addr(context, node_id, dev_addr)
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index b3e974ab72..b496fec791 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -45,7 +45,6 @@ from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy.orm import aliased
-from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import noload
from sqlalchemy.orm import undefer
@@ -888,617 +887,6 @@ def certificate_get_all_by_user_and_project(context, user_id, project_id):
@require_context
-@pick_context_manager_reader
-def floating_ip_get(context, id):
- try:
- result = model_query(context, models.FloatingIp, project_only=True).\
- filter_by(id=id).\
- options(_joinedload_all('fixed_ip.instance')).\
- first()
-
- if not result:
- raise exception.FloatingIpNotFound(id=id)
- except db_exc.DBError:
- LOG.warning("Invalid floating IP ID %s in request", id)
- raise exception.InvalidID(id=id)
- return result
-
-
-@require_context
-@pick_context_manager_reader
-def floating_ip_get_pools(context):
- pools = []
- for result in model_query(context, models.FloatingIp,
- (models.FloatingIp.pool,)).distinct():
- pools.append({'name': result[0]})
- return pools
-
-
-@require_context
-@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
-@pick_context_manager_writer
-def floating_ip_allocate_address(context, project_id, pool,
- auto_assigned=False):
- nova.context.authorize_project_context(context, project_id)
- floating_ip_ref = model_query(context, models.FloatingIp,
- read_deleted="no").\
- filter_by(fixed_ip_id=None).\
- filter_by(project_id=None).\
- filter_by(pool=pool).\
- first()
-
- if not floating_ip_ref:
- raise exception.NoMoreFloatingIps()
-
- params = {'project_id': project_id, 'auto_assigned': auto_assigned}
-
- rows_update = model_query(context, models.FloatingIp, read_deleted="no").\
- filter_by(id=floating_ip_ref['id']).\
- filter_by(fixed_ip_id=None).\
- filter_by(project_id=None).\
- filter_by(pool=pool).\
- update(params, synchronize_session='evaluate')
-
- if not rows_update:
- LOG.debug('The row was updated in a concurrent transaction, '
- 'we will fetch another one')
- raise db_exc.RetryRequest(exception.FloatingIpAllocateFailed())
-
- return floating_ip_ref['address']
-
-
-@require_context
-@pick_context_manager_writer
-def floating_ip_bulk_create(context, ips, want_result=True):
- try:
- tab = models.FloatingIp().__table__
- context.session.execute(tab.insert(), ips)
- except db_exc.DBDuplicateEntry as e:
- raise exception.FloatingIpExists(address=e.value)
-
- if want_result:
- return model_query(context, models.FloatingIp).filter(
- models.FloatingIp.address.in_(
- [ip['address'] for ip in ips])).all()
-
-
-def _ip_range_splitter(ips, block_size=256):
- """Yields blocks of IPs no more than block_size elements long."""
- out = []
- count = 0
- for ip in ips:
- out.append(ip['address'])
- count += 1
-
- if count > block_size - 1:
- yield out
- out = []
- count = 0
-
- if out:
- yield out
-
-
-@require_context
-@pick_context_manager_writer
-def floating_ip_bulk_destroy(context, ips):
- project_id_to_quota_count = collections.defaultdict(int)
- for ip_block in _ip_range_splitter(ips):
- # Find any floating IPs that were not auto_assigned and
- # thus need quota released.
- query = model_query(context, models.FloatingIp).\
- filter(models.FloatingIp.address.in_(ip_block)).\
- filter_by(auto_assigned=False)
- for row in query.all():
- # The count is negative since we release quota by
- # reserving negative quota.
- project_id_to_quota_count[row['project_id']] -= 1
- # Delete the floating IPs.
- model_query(context, models.FloatingIp).\
- filter(models.FloatingIp.address.in_(ip_block)).\
- soft_delete(synchronize_session='fetch')
-
-
-@require_context
-@pick_context_manager_writer
-def floating_ip_create(context, values):
- floating_ip_ref = models.FloatingIp()
- floating_ip_ref.update(values)
- try:
- floating_ip_ref.save(context.session)
- except db_exc.DBDuplicateEntry:
- raise exception.FloatingIpExists(address=values['address'])
- return floating_ip_ref
-
-
-@require_context
-@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
-@pick_context_manager_writer
-def floating_ip_fixed_ip_associate(context, floating_address,
- fixed_address, host):
- fixed_ip_ref = model_query(context, models.FixedIp).\
- filter_by(address=fixed_address).\
- options(joinedload('network')).\
- first()
- if not fixed_ip_ref:
- raise exception.FixedIpNotFoundForAddress(address=fixed_address)
- rows = model_query(context, models.FloatingIp).\
- filter_by(address=floating_address).\
- filter(models.FloatingIp.project_id ==
- context.project_id).\
- filter(or_(models.FloatingIp.fixed_ip_id ==
- fixed_ip_ref['id'],
- models.FloatingIp.fixed_ip_id.is_(None))).\
- update({'fixed_ip_id': fixed_ip_ref['id'], 'host': host})
-
- if not rows:
- raise exception.FloatingIpAssociateFailed(address=floating_address)
-
- return fixed_ip_ref
-
-
-@require_context
-@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
-@pick_context_manager_writer
-def floating_ip_deallocate(context, address):
- return model_query(context, models.FloatingIp).\
- filter_by(address=address).\
- filter(and_(models.FloatingIp.project_id != null()),
- models.FloatingIp.fixed_ip_id == null()).\
- update({'project_id': None,
- 'host': None,
- 'auto_assigned': False},
- synchronize_session=False)
-
-
-@require_context
-@pick_context_manager_writer
-def floating_ip_destroy(context, address):
- model_query(context, models.FloatingIp).\
- filter_by(address=address).\
- delete()
-
-
-@require_context
-@pick_context_manager_writer
-def floating_ip_disassociate(context, address):
- floating_ip_ref = model_query(context,
- models.FloatingIp).\
- filter_by(address=address).\
- first()
- if not floating_ip_ref:
- raise exception.FloatingIpNotFoundForAddress(address=address)
-
- fixed_ip_ref = model_query(context, models.FixedIp).\
- filter_by(id=floating_ip_ref['fixed_ip_id']).\
- options(joinedload('network')).\
- first()
- floating_ip_ref.fixed_ip_id = None
- floating_ip_ref.host = None
-
- return fixed_ip_ref
-
-
-def _floating_ip_get_all(context):
- return model_query(context, models.FloatingIp, read_deleted="no")
-
-
-@pick_context_manager_reader
-def floating_ip_get_all(context):
- floating_ip_refs = _floating_ip_get_all(context).\
- options(joinedload('fixed_ip')).\
- all()
- if not floating_ip_refs:
- raise exception.NoFloatingIpsDefined()
- return floating_ip_refs
-
-
-@pick_context_manager_reader
-def floating_ip_get_all_by_host(context, host):
- floating_ip_refs = _floating_ip_get_all(context).\
- filter_by(host=host).\
- options(joinedload('fixed_ip')).\
- all()
- if not floating_ip_refs:
- raise exception.FloatingIpNotFoundForHost(host=host)
- return floating_ip_refs
-
-
-@require_context
-@pick_context_manager_reader
-def floating_ip_get_all_by_project(context, project_id):
- nova.context.authorize_project_context(context, project_id)
- # TODO(tr3buchet): why do we not want auto_assigned floating IPs here?
- return _floating_ip_get_all(context).\
- filter_by(project_id=project_id).\
- filter_by(auto_assigned=False).\
- options(_joinedload_all('fixed_ip.instance')).\
- all()
-
-
-@require_context
-@pick_context_manager_reader
-def floating_ip_get_by_address(context, address):
- return _floating_ip_get_by_address(context, address)
-
-
-def _floating_ip_get_by_address(context, address):
-
- # if address string is empty explicitly set it to None
- if not address:
- address = None
- try:
- result = model_query(context, models.FloatingIp).\
- filter_by(address=address).\
- options(_joinedload_all('fixed_ip.instance')).\
- first()
-
- if not result:
- raise exception.FloatingIpNotFoundForAddress(address=address)
- except db_exc.DBError:
- msg = _("Invalid floating IP %s in request") % address
- LOG.warning(msg)
- raise exception.InvalidIpAddressError(msg)
-
- # If the floating IP has a project ID set, check to make sure
- # the non-admin user has access.
- if result.project_id and nova.context.is_user_context(context):
- nova.context.authorize_project_context(context, result.project_id)
-
- return result
-
-
-@require_context
-@pick_context_manager_reader
-def floating_ip_get_by_fixed_address(context, fixed_address):
- return model_query(context, models.FloatingIp).\
- outerjoin(models.FixedIp,
- models.FixedIp.id ==
- models.FloatingIp.fixed_ip_id).\
- filter(models.FixedIp.address == fixed_address).\
- all()
-
-
-@require_context
-@pick_context_manager_reader
-def floating_ip_get_by_fixed_ip_id(context, fixed_ip_id):
- return model_query(context, models.FloatingIp).\
- filter_by(fixed_ip_id=fixed_ip_id).\
- all()
-
-
-@require_context
-@pick_context_manager_writer
-def floating_ip_update(context, address, values):
- float_ip_ref = _floating_ip_get_by_address(context, address)
- float_ip_ref.update(values)
- try:
- float_ip_ref.save(context.session)
- except db_exc.DBDuplicateEntry:
- raise exception.FloatingIpExists(address=values['address'])
- return float_ip_ref
-
-
-###################
-
-
-@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
-@pick_context_manager_writer
-def fixed_ip_associate(context, address, instance_uuid, network_id=None,
- reserved=False, virtual_interface_id=None):
- """Keyword arguments:
- reserved -- should be a boolean value(True or False), exact value will be
- used to filter on the fixed IP address
- """
- if not uuidutils.is_uuid_like(instance_uuid):
- raise exception.InvalidUUID(uuid=instance_uuid)
-
- network_or_none = or_(models.FixedIp.network_id == network_id,
- models.FixedIp.network_id == null())
- fixed_ip_ref = model_query(context, models.FixedIp, read_deleted="no").\
- filter(network_or_none).\
- filter_by(reserved=reserved).\
- filter_by(address=address).\
- first()
-
- if fixed_ip_ref is None:
- raise exception.FixedIpNotFoundForNetwork(address=address,
- network_uuid=network_id)
- if fixed_ip_ref.instance_uuid:
- raise exception.FixedIpAlreadyInUse(address=address,
- instance_uuid=instance_uuid)
-
- params = {'instance_uuid': instance_uuid,
- 'allocated': virtual_interface_id is not None}
- if not fixed_ip_ref.network_id:
- params['network_id'] = network_id
- if virtual_interface_id:
- params['virtual_interface_id'] = virtual_interface_id
-
- rows_updated = model_query(context, models.FixedIp, read_deleted="no").\
- filter_by(id=fixed_ip_ref.id).\
- filter(network_or_none).\
- filter_by(reserved=reserved).\
- filter_by(address=address).\
- update(params, synchronize_session='evaluate')
-
- if not rows_updated:
- LOG.debug('The row was updated in a concurrent transaction, '
- 'we will fetch another row')
- raise db_exc.RetryRequest(
- exception.FixedIpAssociateFailed(net=network_id))
-
- return fixed_ip_ref
-
-
-@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
-@pick_context_manager_writer
-def fixed_ip_associate_pool(context, network_id, instance_uuid=None,
- host=None, virtual_interface_id=None):
- """allocate a fixed ip out of a fixed ip network pool.
-
- This allocates an unallocated fixed ip out of a specified
- network. We sort by updated_at to hand out the oldest address in
- the list.
-
- """
- if instance_uuid and not uuidutils.is_uuid_like(instance_uuid):
- raise exception.InvalidUUID(uuid=instance_uuid)
-
- network_or_none = or_(models.FixedIp.network_id == network_id,
- models.FixedIp.network_id == null())
- fixed_ip_ref = model_query(context, models.FixedIp, read_deleted="no").\
- filter(network_or_none).\
- filter_by(reserved=False).\
- filter_by(instance_uuid=None).\
- filter_by(host=None).\
- filter_by(leased=False).\
- order_by(asc(models.FixedIp.updated_at)).\
- first()
-
- if not fixed_ip_ref:
- raise exception.NoMoreFixedIps(net=network_id)
-
- params = {'allocated': virtual_interface_id is not None}
- if fixed_ip_ref['network_id'] is None:
- params['network_id'] = network_id
- if instance_uuid:
- params['instance_uuid'] = instance_uuid
- if host:
- params['host'] = host
- if virtual_interface_id:
- params['virtual_interface_id'] = virtual_interface_id
-
- rows_updated = model_query(context, models.FixedIp, read_deleted="no").\
- filter_by(id=fixed_ip_ref['id']).\
- filter_by(network_id=fixed_ip_ref['network_id']).\
- filter_by(reserved=False).\
- filter_by(instance_uuid=None).\
- filter_by(host=None).\
- filter_by(leased=False).\
- filter_by(address=fixed_ip_ref['address']).\
- update(params, synchronize_session='evaluate')
-
- if not rows_updated:
- LOG.debug('The row was updated in a concurrent transaction, '
- 'we will fetch another row')
- raise db_exc.RetryRequest(
- exception.FixedIpAssociateFailed(net=network_id))
-
- return fixed_ip_ref
-
-
-@require_context
-@pick_context_manager_writer
-def fixed_ip_create(context, values):
- fixed_ip_ref = models.FixedIp()
- fixed_ip_ref.update(values)
- try:
- fixed_ip_ref.save(context.session)
- except db_exc.DBDuplicateEntry:
- raise exception.FixedIpExists(address=values['address'])
- return fixed_ip_ref
-
-
-@require_context
-@pick_context_manager_writer
-def fixed_ip_bulk_create(context, ips):
- try:
- tab = models.FixedIp.__table__
- context.session.execute(tab.insert(), ips)
- except db_exc.DBDuplicateEntry as e:
- raise exception.FixedIpExists(address=e.value)
-
-
-@require_context
-@pick_context_manager_writer
-def fixed_ip_disassociate(context, address):
- _fixed_ip_get_by_address(context, address).update(
- {'instance_uuid': None,
- 'virtual_interface_id': None})
-
-
-@pick_context_manager_writer
-def fixed_ip_disassociate_all_by_timeout(context, host, time):
- # NOTE(vish): only update fixed ips that "belong" to this
- # host; i.e. the network host or the instance
- # host matches. Two queries necessary because
- # join with update doesn't work.
- host_filter = or_(and_(models.Instance.host == host,
- models.Network.multi_host == true()),
- models.Network.host == host)
- result = model_query(context, models.FixedIp, (models.FixedIp.id,),
- read_deleted="no").\
- filter(models.FixedIp.allocated == false()).\
- filter(models.FixedIp.updated_at < time).\
- join((models.Network,
- models.Network.id == models.FixedIp.network_id)).\
- join((models.Instance,
- models.Instance.uuid == models.FixedIp.instance_uuid)).\
- filter(host_filter).\
- all()
- fixed_ip_ids = [fip[0] for fip in result]
- if not fixed_ip_ids:
- return 0
- result = model_query(context, models.FixedIp).\
- filter(models.FixedIp.id.in_(fixed_ip_ids)).\
- update({'instance_uuid': None,
- 'leased': False,
- 'updated_at': timeutils.utcnow()},
- synchronize_session='fetch')
- return result
-
-
-@require_context
-@pick_context_manager_reader
-def fixed_ip_get(context, id, get_network=False):
- query = model_query(context, models.FixedIp).filter_by(id=id)
- if get_network:
- query = query.options(joinedload('network'))
- result = query.first()
- if not result:
- raise exception.FixedIpNotFound(id=id)
-
- # FIXME(sirp): shouldn't we just use project_only here to restrict the
- # results?
- if (nova.context.is_user_context(context) and
- result['instance_uuid'] is not None):
- instance = instance_get_by_uuid(context.elevated(read_deleted='yes'),
- result['instance_uuid'])
- nova.context.authorize_project_context(context, instance.project_id)
-
- return result
-
-
-@pick_context_manager_reader
-def fixed_ip_get_all(context):
- result = model_query(context, models.FixedIp, read_deleted="yes").all()
- if not result:
- raise exception.NoFixedIpsDefined()
-
- return result
-
-
-@require_context
-@pick_context_manager_reader
-def fixed_ip_get_by_address(context, address, columns_to_join=None):
- return _fixed_ip_get_by_address(context, address,
- columns_to_join=columns_to_join)
-
-
-def _fixed_ip_get_by_address(context, address, columns_to_join=None):
- if columns_to_join is None:
- columns_to_join = []
-
- try:
- result = model_query(context, models.FixedIp)
- for column in columns_to_join:
- result = result.options(_joinedload_all(column))
- result = result.filter_by(address=address).first()
- if not result:
- raise exception.FixedIpNotFoundForAddress(address=address)
- except db_exc.DBError:
- msg = _("Invalid fixed IP Address %s in request") % address
- LOG.warning(msg)
- raise exception.FixedIpInvalid(msg)
-
- # NOTE(sirp): shouldn't we just use project_only here to restrict the
- # results?
- if (nova.context.is_user_context(context) and
- result['instance_uuid'] is not None):
- instance = _instance_get_by_uuid(
- context.elevated(read_deleted='yes'),
- result['instance_uuid'])
- nova.context.authorize_project_context(context,
- instance.project_id)
- return result
-
-
-@require_context
-@pick_context_manager_reader
-def fixed_ip_get_by_floating_address(context, floating_address):
- return model_query(context, models.FixedIp).\
- join(models.FloatingIp,
- models.FloatingIp.fixed_ip_id ==
- models.FixedIp.id).\
- filter(models.FloatingIp.address == floating_address).\
- first()
- # NOTE(tr3buchet) please don't invent an exception here, None is fine
-
-
-@require_context
-@pick_context_manager_reader
-def fixed_ip_get_by_instance(context, instance_uuid):
- if not uuidutils.is_uuid_like(instance_uuid):
- raise exception.InvalidUUID(uuid=instance_uuid)
-
- vif_and = and_(models.VirtualInterface.id ==
- models.FixedIp.virtual_interface_id,
- models.VirtualInterface.deleted == 0)
- result = model_query(context, models.FixedIp, read_deleted="no").\
- filter_by(instance_uuid=instance_uuid).\
- outerjoin(models.VirtualInterface, vif_and).\
- options(contains_eager("virtual_interface")).\
- options(joinedload('network')).\
- options(joinedload('floating_ips')).\
- order_by(asc(models.VirtualInterface.created_at),
- asc(models.VirtualInterface.id)).\
- all()
-
- if not result:
- raise exception.FixedIpNotFoundForInstance(instance_uuid=instance_uuid)
-
- return result
-
-
-@pick_context_manager_reader
-def fixed_ip_get_by_host(context, host):
- instance_uuids = _instance_get_all_uuids_by_hosts(
- context, [host]).get(host, [])
- if not instance_uuids:
- return []
-
- return model_query(context, models.FixedIp).\
- filter(models.FixedIp.instance_uuid.in_(instance_uuids)).\
- all()
-
-
-@require_context
-@pick_context_manager_reader
-def fixed_ip_get_by_network_host(context, network_id, host):
- result = model_query(context, models.FixedIp, read_deleted="no").\
- filter_by(network_id=network_id).\
- filter_by(host=host).\
- first()
-
- if not result:
- raise exception.FixedIpNotFoundForNetworkHost(network_id=network_id,
- host=host)
- return result
-
-
-@require_context
-@pick_context_manager_reader
-def fixed_ips_by_virtual_interface(context, vif_id):
- result = model_query(context, models.FixedIp, read_deleted="no").\
- filter_by(virtual_interface_id=vif_id).\
- options(joinedload('network')).\
- options(joinedload('floating_ips')).\
- all()
-
- return result
-
-
-@require_context
-@pick_context_manager_writer
-def fixed_ip_update(context, address, values):
- _fixed_ip_get_by_address(context, address).update(values)
-
-
-###################
-
-
-@require_context
@pick_context_manager_writer
def virtual_interface_create(context, values):
"""Create a new virtual interface record in the database.
@@ -2660,21 +2048,6 @@ def instance_get_all_by_host_and_not_type(context, host, type_id=None):
filter(models.Instance.instance_type_id != type_id).all())
-@require_context
-@pick_context_manager_reader
-def instance_floating_address_get_all(context, instance_uuid):
- if not uuidutils.is_uuid_like(instance_uuid):
- raise exception.InvalidUUID(uuid=instance_uuid)
-
- floating_ips = model_query(context,
- models.FloatingIp,
- (models.FloatingIp.address,)).\
- join(models.FloatingIp.fixed_ip).\
- filter_by(instance_uuid=instance_uuid)
-
- return [floating_ip.address for floating_ip in floating_ips]
-
-
# NOTE(hanlind): This method can be removed as conductor RPC API moves to v2.0.
@pick_context_manager_reader
def instance_get_all_hung_in_rebooting(context, reboot_window):
@@ -3083,319 +2456,6 @@ def key_pair_count_by_user(context, user_id):
###################
-@pick_context_manager_writer
-def network_associate(context, project_id, network_id=None, force=False):
- """Associate a project with a network.
-
- called by project_get_networks under certain conditions
- and network manager add_network_to_project()
-
- only associate if the project doesn't already have a network
- or if force is True
-
- force solves race condition where a fresh project has multiple instance
- builds simultaneously picked up by multiple network hosts which attempt
- to associate the project with multiple networks
- force should only be used as a direct consequence of user request
- all automated requests should not use force
- """
- def network_query(project_filter, id=None):
- filter_kwargs = {'project_id': project_filter}
- if id is not None:
- filter_kwargs['id'] = id
- return model_query(context, models.Network, read_deleted="no").\
- filter_by(**filter_kwargs).\
- with_for_update().\
- first()
-
- if not force:
- # find out if project has a network
- network_ref = network_query(project_id)
-
- if force or not network_ref:
- # in force mode or project doesn't have a network so associate
- # with a new network
-
- # get new network
- network_ref = network_query(None, network_id)
- if not network_ref:
- raise exception.NoMoreNetworks()
-
- # associate with network
- # NOTE(vish): if with_lockmode isn't supported, as in sqlite,
- # then this has concurrency issues
- network_ref['project_id'] = project_id
- context.session.add(network_ref)
- return network_ref
-
-
-def _network_ips_query(context, network_id):
- return model_query(context, models.FixedIp, read_deleted="no").\
- filter_by(network_id=network_id)
-
-
-@pick_context_manager_reader
-def network_count_reserved_ips(context, network_id):
- return _network_ips_query(context, network_id).\
- filter_by(reserved=True).\
- count()
-
-
-@pick_context_manager_writer
-def network_create_safe(context, values):
- network_ref = models.Network()
- network_ref['uuid'] = uuidutils.generate_uuid()
- network_ref.update(values)
-
- try:
- network_ref.save(context.session)
- return network_ref
- except db_exc.DBDuplicateEntry:
- raise exception.DuplicateVlan(vlan=values['vlan'])
-
-
-@pick_context_manager_writer
-def network_delete_safe(context, network_id):
- result = model_query(context, models.FixedIp, read_deleted="no").\
- filter_by(network_id=network_id).\
- filter_by(allocated=True).\
- count()
- if result != 0:
- raise exception.NetworkInUse(network_id=network_id)
- network_ref = _network_get(context, network_id=network_id)
-
- model_query(context, models.FixedIp, read_deleted="no").\
- filter_by(network_id=network_id).\
- soft_delete()
-
- context.session.delete(network_ref)
-
-
-@pick_context_manager_writer
-def network_disassociate(context, network_id, disassociate_host,
- disassociate_project):
- net_update = {}
- if disassociate_project:
- net_update['project_id'] = None
- if disassociate_host:
- net_update['host'] = None
- network_update(context, network_id, net_update)
-
-
-def _network_get(context, network_id, project_only='allow_none'):
- result = model_query(context, models.Network, project_only=project_only).\
- filter_by(id=network_id).\
- first()
-
- if not result:
- raise exception.NetworkNotFound(network_id=network_id)
-
- return result
-
-
-@require_context
-@pick_context_manager_reader
-def network_get(context, network_id, project_only='allow_none'):
- return _network_get(context, network_id, project_only=project_only)
-
-
-@require_context
-@pick_context_manager_reader
-def network_get_all(context, project_only):
- result = model_query(context, models.Network, read_deleted="no",
- project_only=project_only).all()
-
- if not result:
- raise exception.NoNetworksFound()
-
- return result
-
-
-@require_context
-@pick_context_manager_reader
-def network_get_all_by_uuids(context, network_uuids, project_only):
- result = model_query(context, models.Network, read_deleted="no",
- project_only=project_only).\
- filter(models.Network.uuid.in_(network_uuids)).\
- all()
-
- if not result:
- raise exception.NoNetworksFound()
-
- # check if the result contains all the networks
- # we are looking for
- for network_uuid in network_uuids:
- for network in result:
- if network['uuid'] == network_uuid:
- break
- else:
- if project_only:
- raise exception.NetworkNotFoundForProject(
- network_uuid=network_uuid, project_id=context.project_id)
- raise exception.NetworkNotFound(network_id=network_uuid)
-
- return result
-
-
-def _get_associated_fixed_ips_query(context, network_id, host=None):
- # NOTE(vish): The ugly joins here are to solve a performance issue and
- # should be removed once we can add and remove leases
- # without regenerating the whole list
- vif_and = and_(models.VirtualInterface.id ==
- models.FixedIp.virtual_interface_id,
- models.VirtualInterface.deleted == 0)
- inst_and = and_(models.Instance.uuid == models.FixedIp.instance_uuid,
- models.Instance.deleted == 0)
- # NOTE(vish): This subquery left joins the minimum interface id for each
- # instance. If the join succeeds (i.e. the 11th column is not
- # null), then the fixed ip is on the first interface.
- subq = context.session.query(
- func.min(models.VirtualInterface.id).label("id"),
- models.VirtualInterface.instance_uuid).\
- group_by(models.VirtualInterface.instance_uuid).subquery()
- subq_and = and_(subq.c.id == models.FixedIp.virtual_interface_id,
- subq.c.instance_uuid == models.VirtualInterface.instance_uuid)
- query = context.session.query(
- models.FixedIp.address,
- models.FixedIp.instance_uuid,
- models.FixedIp.network_id,
- models.FixedIp.virtual_interface_id,
- models.VirtualInterface.address,
- models.Instance.hostname,
- models.Instance.updated_at,
- models.Instance.created_at,
- models.FixedIp.allocated,
- models.FixedIp.leased,
- subq.c.id).\
- filter(models.FixedIp.deleted == 0).\
- filter(models.FixedIp.network_id == network_id).\
- join((models.VirtualInterface, vif_and)).\
- join((models.Instance, inst_and)).\
- outerjoin((subq, subq_and)).\
- filter(models.FixedIp.instance_uuid != null()).\
- filter(models.FixedIp.virtual_interface_id != null())
- if host:
- query = query.filter(models.Instance.host == host)
- return query
-
-
-@pick_context_manager_reader
-def network_get_associated_fixed_ips(context, network_id, host=None):
- # FIXME(sirp): since this returns fixed_ips, this would be better named
- # fixed_ip_get_all_by_network.
- query = _get_associated_fixed_ips_query(context, network_id, host)
- result = query.all()
- data = []
- for datum in result:
- cleaned = {}
- cleaned['address'] = datum[0]
- cleaned['instance_uuid'] = datum[1]
- cleaned['network_id'] = datum[2]
- cleaned['vif_id'] = datum[3]
- cleaned['vif_address'] = datum[4]
- cleaned['instance_hostname'] = datum[5]
- cleaned['instance_updated'] = datum[6]
- cleaned['instance_created'] = datum[7]
- cleaned['allocated'] = datum[8]
- cleaned['leased'] = datum[9]
- # NOTE(vish): default_route is True if this fixed ip is on the first
- # interface its instance.
- cleaned['default_route'] = datum[10] is not None
- data.append(cleaned)
- return data
-
-
-@pick_context_manager_reader
-def network_in_use_on_host(context, network_id, host):
- query = _get_associated_fixed_ips_query(context, network_id, host)
- return query.count() > 0
-
-
-def _network_get_query(context):
- return model_query(context, models.Network, read_deleted="no")
-
-
-@pick_context_manager_reader
-def network_get_by_uuid(context, uuid):
- result = _network_get_query(context).filter_by(uuid=uuid).first()
-
- if not result:
- raise exception.NetworkNotFoundForUUID(uuid=uuid)
-
- return result
-
-
-@pick_context_manager_reader
-def network_get_by_cidr(context, cidr):
- result = _network_get_query(context).\
- filter(or_(models.Network.cidr == cidr,
- models.Network.cidr_v6 == cidr)).\
- first()
-
- if not result:
- raise exception.NetworkNotFoundForCidr(cidr=cidr)
-
- return result
-
-
-@pick_context_manager_reader
-def network_get_all_by_host(context, host):
- fixed_host_filter = or_(models.FixedIp.host == host,
- and_(models.FixedIp.instance_uuid != null(),
- models.Instance.host == host))
- fixed_ip_query = model_query(context, models.FixedIp,
- (models.FixedIp.network_id,)).\
- outerjoin((models.Instance,
- models.Instance.uuid ==
- models.FixedIp.instance_uuid)).\
- filter(fixed_host_filter)
- # NOTE(vish): return networks that have host set
- # or that have a fixed ip with host set
- # or that have an instance with host set
- host_filter = or_(models.Network.host == host,
- models.Network.id.in_(fixed_ip_query.subquery()))
- return _network_get_query(context).filter(host_filter).all()
-
-
-@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
-@pick_context_manager_writer
-def network_set_host(context, network_id, host_id):
- network_ref = _network_get_query(context).\
- filter_by(id=network_id).\
- first()
-
- if not network_ref:
- raise exception.NetworkNotFound(network_id=network_id)
-
- if network_ref.host:
- return None
-
- rows_updated = _network_get_query(context).\
- filter_by(id=network_id).\
- filter_by(host=None).\
- update({'host': host_id})
-
- if not rows_updated:
- LOG.debug('The row was updated in a concurrent transaction, '
- 'we will fetch another row')
- raise db_exc.RetryRequest(
- exception.NetworkSetHostFailed(network_id=network_id))
-
-
-@require_context
-@pick_context_manager_writer
-def network_update(context, network_id, values):
- network_ref = _network_get(context, network_id)
- network_ref.update(values)
- try:
- network_ref.save(context.session)
- except db_exc.DBDuplicateEntry:
- raise exception.DuplicateVlan(vlan=values['vlan'])
- return network_ref
-
-
-###################
-
@require_context
@pick_context_manager_reader
@@ -4081,106 +3141,6 @@ def security_group_destroy(context, security_group_id):
###################
-def _security_group_rule_create(context, values):
- security_group_rule_ref = models.SecurityGroupIngressRule()
- security_group_rule_ref.update(values)
- security_group_rule_ref.save(context.session)
- return security_group_rule_ref
-
-
-def _security_group_rule_get_query(context):
- return model_query(context, models.SecurityGroupIngressRule)
-
-
-@require_context
-@pick_context_manager_reader
-def security_group_rule_get(context, security_group_rule_id):
- result = (_security_group_rule_get_query(context).
- filter_by(id=security_group_rule_id).
- first())
-
- if not result:
- raise exception.SecurityGroupNotFoundForRule(
- rule_id=security_group_rule_id)
-
- return result
-
-
-@require_context
-@pick_context_manager_reader
-def security_group_rule_get_by_security_group(context, security_group_id,
- columns_to_join=None):
- if columns_to_join is None:
- columns_to_join = ['grantee_group.instances.system_metadata',
- 'grantee_group.instances.info_cache']
- query = (_security_group_rule_get_query(context).
- filter_by(parent_group_id=security_group_id))
- for column in columns_to_join:
- query = query.options(_joinedload_all(column))
- return query.all()
-
-
-@require_context
-@pick_context_manager_reader
-def security_group_rule_get_by_instance(context, instance_uuid):
- return (_security_group_rule_get_query(context).
- join('parent_group', 'instances').
- filter_by(uuid=instance_uuid).
- options(joinedload('grantee_group')).
- all())
-
-
-@require_context
-@pick_context_manager_writer
-def security_group_rule_create(context, values):
- return _security_group_rule_create(context, values)
-
-
-@require_context
-@pick_context_manager_writer
-def security_group_rule_destroy(context, security_group_rule_id):
- count = (_security_group_rule_get_query(context).
- filter_by(id=security_group_rule_id).
- soft_delete())
- if count == 0:
- raise exception.SecurityGroupNotFoundForRule(
- rule_id=security_group_rule_id)
-
-
-@require_context
-@pick_context_manager_reader
-def security_group_rule_count_by_group(context, security_group_id):
- return (model_query(context, models.SecurityGroupIngressRule,
- read_deleted="no").
- filter_by(parent_group_id=security_group_id).
- count())
-
-
-###################
-
-
-@require_context
-@pick_context_manager_writer
-def project_get_networks(context, project_id, associate=True):
- # NOTE(tr3buchet): as before this function will associate
- # a project with a network if it doesn't have one and
- # associate is true
- result = model_query(context, models.Network, read_deleted="no").\
- filter_by(project_id=project_id).\
- all()
-
- if not result:
- if not associate:
- return []
-
- return [network_associate(context, project_id)]
-
- return result
-
-
-###################
-
-
@pick_context_manager_writer
def migration_create(context, values):
migration = models.Migration()
diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py
index 8ae9415145..09ae863f2e 100644
--- a/nova/db/sqlalchemy/models.py
+++ b/nova/db/sqlalchemy/models.py
@@ -689,6 +689,8 @@ class SecurityGroup(BASE, NovaBase, models.SoftDeleteMixin):
backref='security_groups')
+# TODO(stephenfin): Remove this in the V release or later, once we're sure we
+# won't want it back (it's for nova-network, so we won't)
class SecurityGroupIngressRule(BASE, NovaBase, models.SoftDeleteMixin):
"""Represents a rule in a security group."""
__tablename__ = 'security_group_rules'
@@ -812,6 +814,8 @@ class Migration(BASE, NovaBase, models.SoftDeleteMixin):
'0)')
+# TODO(stephenfin): Remove this in the V release or later, once we're sure we
+# won't want it back (it's for nova-network, so we won't)
class Network(BASE, NovaBase, models.SoftDeleteMixin):
"""Represents a network."""
__tablename__ = 'networks'
@@ -882,7 +886,8 @@ class VirtualInterface(BASE, NovaBase, models.SoftDeleteMixin):
tag = Column(String(255))
-# TODO(vish): can these both come from the same baseclass?
+# TODO(stephenfin): Remove this in the V release or later, once we're sure we
+# won't want it back (it's for nova-network, so we won't)
class FixedIp(BASE, NovaBase, models.SoftDeleteMixin):
"""Represents a fixed IP for an instance."""
__tablename__ = 'fixed_ips'
@@ -941,6 +946,8 @@ class FixedIp(BASE, NovaBase, models.SoftDeleteMixin):
'VirtualInterface.deleted == 0)')
+# TODO(stephenfin): Remove this in the V release or later, once we're sure we
+# won't want it back (it's for nova-network, so we won't)
class FloatingIp(BASE, NovaBase, models.SoftDeleteMixin):
"""Represents a floating IP that dynamically forwards to a fixed IP."""
__tablename__ = 'floating_ips'
diff --git a/nova/exception.py b/nova/exception.py
index 55b8319ade..b8a7500c6f 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -710,23 +710,10 @@ class InstanceMappingNotFound(NotFound):
msg_fmt = _("Instance %(uuid)s has no mapping to a cell.")
-class NetworkInUse(NovaException):
- msg_fmt = _("Network %(network_id)s is still in use.")
-
-
-class NetworkSetHostFailed(NovaException):
- msg_fmt = _("Network set host failed for network %(network_id)s.")
-
-
class InvalidCidr(Invalid):
msg_fmt = _("%(cidr)s is not a valid IP network.")
-class DuplicateVlan(NovaException):
- msg_fmt = _("Detected existing vlan with id %(vlan)d")
- code = 409
-
-
class NetworkNotFound(NotFound):
msg_fmt = _("Network %(network_id)s could not be found.")
@@ -739,31 +726,10 @@ class NetworkNotFoundForBridge(NetworkNotFound):
msg_fmt = _("Network could not be found for bridge %(bridge)s")
-class NetworkNotFoundForUUID(NetworkNotFound):
- msg_fmt = _("Network could not be found for uuid %(uuid)s")
-
-
-class NetworkNotFoundForCidr(NetworkNotFound):
- msg_fmt = _("Network could not be found with cidr %(cidr)s.")
-
-
class NetworkNotFoundForInstance(NetworkNotFound):
msg_fmt = _("Network could not be found for instance %(instance_id)s.")
-class NoNetworksFound(NotFound):
- msg_fmt = _("No networks defined.")
-
-
-class NoMoreNetworks(NovaException):
- msg_fmt = _("No more available networks.")
-
-
-class NetworkNotFoundForProject(NetworkNotFound):
- msg_fmt = _("Either network uuid %(network_uuid)s is not present or "
- "is not assigned to the project %(project_id)s.")
-
-
class NetworkAmbiguous(Invalid):
msg_fmt = _("More than one possible network found. Specify "
"network ID(s) to select which one(s) to connect to.")
@@ -847,38 +813,12 @@ class AttachSRIOVPortNotSupported(Invalid):
'specified during server creation.')
-class FixedIpExists(NovaException):
- msg_fmt = _("Fixed IP %(address)s already exists.")
-
-
-class FixedIpNotFound(NotFound):
- msg_fmt = _("No fixed IP associated with id %(id)s.")
-
-
-class FixedIpNotFoundForAddress(FixedIpNotFound):
+class FixedIpNotFoundForAddress(NotFound):
msg_fmt = _("Fixed IP not found for address %(address)s.")
-class FixedIpNotFoundForInstance(FixedIpNotFound):
- msg_fmt = _("Instance %(instance_uuid)s has zero fixed IPs.")
-
-
-class FixedIpNotFoundForNetworkHost(FixedIpNotFound):
- msg_fmt = _("Network host %(host)s has zero fixed IPs "
- "in network %(network_id)s.")
-
-
-class FixedIpNotFoundForSpecificInstance(FixedIpNotFound):
- msg_fmt = _("Instance %(instance_uuid)s doesn't have fixed IP '%(ip)s'.")
-
-
-class FixedIpNotFoundForNetwork(FixedIpNotFound):
- msg_fmt = _("Fixed IP address (%(address)s) does not exist in "
- "network (%(network_uuid)s).")
-
-
-class FixedIpAssociateFailed(NovaException):
- msg_fmt = _("Fixed IP associate failed for network: %(net)s.")
+class FixedIpNotFoundForInstance(NotFound):
+ msg_fmt = _("Instance %(instance_uuid)s does not have fixed IP '%(ip)s'.")
class FixedIpAlreadyInUse(NovaException):
@@ -891,10 +831,6 @@ class FixedIpAssociatedWithMultipleInstances(NovaException):
"'%(address)s'.")
-class FixedIpInvalid(Invalid):
- msg_fmt = _("Fixed IP address %(address)s is invalid.")
-
-
class FixedIpInvalidOnHost(Invalid):
msg_fmt = _("The fixed IP associated with port %(port_id)s is not "
"compatible with the host.")
@@ -904,14 +840,6 @@ class NoMoreFixedIps(NovaException):
msg_fmt = _("No fixed IP addresses available for network: %(net)s")
-class NoFixedIpsDefined(NotFound):
- msg_fmt = _("Zero fixed IPs could be found.")
-
-
-class FloatingIpExists(NovaException):
- msg_fmt = _("Floating IP %(address)s already exists.")
-
-
class FloatingIpNotFound(NotFound):
msg_fmt = _("Floating IP not found for ID %(id)s.")
@@ -920,10 +848,6 @@ class FloatingIpNotFoundForAddress(FloatingIpNotFound):
msg_fmt = _("Floating IP not found for address %(address)s.")
-class FloatingIpNotFoundForHost(FloatingIpNotFound):
- msg_fmt = _("Floating IP not found for host %(host)s.")
-
-
class FloatingIpMultipleFoundForAddress(NovaException):
msg_fmt = _("Multiple floating IPs are found for address %(address)s.")
@@ -942,18 +866,10 @@ class FloatingIpAssociated(NovaException):
msg_fmt = _("Floating IP %(address)s is associated.")
-class NoFloatingIpsDefined(NotFound):
- msg_fmt = _("Zero floating IPs exist.")
-
-
class NoFloatingIpInterface(NotFound):
msg_fmt = _("Interface %(interface)s not found.")
-class FloatingIpAllocateFailed(NovaException):
- msg_fmt = _("Floating IP allocate failed.")
-
-
class FloatingIpAssociateFailed(NovaException):
msg_fmt = _("Floating IP %(address)s association has failed.")
@@ -1047,10 +963,6 @@ class SecurityGroupNotFoundForProject(SecurityGroupNotFound):
"for project %(project_id)s.")
-class SecurityGroupNotFoundForRule(SecurityGroupNotFound):
- msg_fmt = _("Security group with rule %(rule_id)s not found.")
-
-
class SecurityGroupExists(Invalid):
msg_fmt = _("Security group %(security_group_name)s already exists "
"for project %(project_id)s.")
diff --git a/nova/network/neutron.py b/nova/network/neutron.py
index 70291e39e1..621dfca2d0 100644
--- a/nova/network/neutron.py
+++ b/nova/network/neutron.py
@@ -1992,7 +1992,7 @@ class API(base.Base):
instance=instance)
return self._get_instance_nw_info(context, instance)
- raise exception.FixedIpNotFoundForSpecificInstance(
+ raise exception.FixedIpNotFoundForInstance(
instance_uuid=instance.uuid, ip=address)
def _get_physnet_tunneled_info(self, context, neutron, net_id):
diff --git a/nova/objects/__init__.py b/nova/objects/__init__.py
index 8ce1514f6f..09cb5f701f 100644
--- a/nova/objects/__init__.py
+++ b/nova/objects/__init__.py
@@ -35,9 +35,7 @@ def register_all():
__import__('nova.objects.console_auth_token')
__import__('nova.objects.ec2')
__import__('nova.objects.external_event')
- __import__('nova.objects.fixed_ip')
__import__('nova.objects.flavor')
- __import__('nova.objects.floating_ip')
__import__('nova.objects.host_mapping')
__import__('nova.objects.hv_spec')
__import__('nova.objects.image_meta')
@@ -55,7 +53,6 @@ def register_all():
__import__('nova.objects.migration')
__import__('nova.objects.migration_context')
__import__('nova.objects.monitor_metric')
- __import__('nova.objects.network')
__import__('nova.objects.network_metadata')
__import__('nova.objects.network_request')
__import__('nova.objects.numa')
@@ -66,7 +63,6 @@ def register_all():
__import__('nova.objects.quotas')
__import__('nova.objects.resource')
__import__('nova.objects.security_group')
- __import__('nova.objects.security_group_rule')
__import__('nova.objects.selection')
__import__('nova.objects.service')
__import__('nova.objects.task_log')
diff --git a/nova/objects/fixed_ip.py b/nova/objects/fixed_ip.py
deleted file mode 100644
index 4f0e05a3e5..0000000000
--- a/nova/objects/fixed_ip.py
+++ /dev/null
@@ -1,315 +0,0 @@
-# Copyright 2014 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_utils import timeutils
-from oslo_utils import versionutils
-
-from nova.db import api as db
-from nova.db.sqlalchemy import api as db_api
-from nova.db.sqlalchemy import models
-from nova import exception
-from nova import objects
-from nova.objects import base as obj_base
-from nova.objects import fields
-from nova import utils
-
-
-FIXED_IP_OPTIONAL_ATTRS = ['instance', 'network', 'virtual_interface',
- 'floating_ips']
-
-
-# TODO(berrange): Remove NovaObjectDictCompat
-@obj_base.NovaObjectRegistry.register
-class FixedIP(obj_base.NovaPersistentObject, obj_base.NovaObject,
- obj_base.NovaObjectDictCompat):
- # Version 1.0: Initial version
- # Version 1.1: Added virtual_interface field
- # Version 1.2: Instance version 1.14
- # Version 1.3: Instance 1.15
- # Version 1.4: Added default_route field
- # Version 1.5: Added floating_ips field
- # Version 1.6: Instance 1.16
- # Version 1.7: Instance 1.17
- # Version 1.8: Instance 1.18
- # Version 1.9: Instance 1.19
- # Version 1.10: Instance 1.20
- # Version 1.11: Instance 1.21
- # Version 1.12: Instance 1.22, FloatingIPList 1.9
- # Version 1.13: Instance 1.23, FloatingIPList 1.10
- # Version 1.14: Added vif_id kwarg to associate(_pool), FloatingIPList 1.11
- VERSION = '1.14'
-
- fields = {
- 'id': fields.IntegerField(),
- 'address': fields.IPV4AndV6AddressField(),
- 'network_id': fields.IntegerField(nullable=True),
- 'virtual_interface_id': fields.IntegerField(nullable=True),
- 'instance_uuid': fields.UUIDField(nullable=True),
- 'allocated': fields.BooleanField(),
- 'leased': fields.BooleanField(),
- 'reserved': fields.BooleanField(),
- 'host': fields.StringField(nullable=True),
- 'default_route': fields.BooleanField(),
- 'instance': fields.ObjectField('Instance', nullable=True),
- 'network': fields.ObjectField('Network', nullable=True),
- 'virtual_interface': fields.ObjectField('VirtualInterface',
- nullable=True),
- # NOTE(danms): This should not ever be made lazy-loadable
- # because it would create a bit of a loop between FixedIP
- # and FloatingIP
- 'floating_ips': fields.ObjectField('FloatingIPList'),
- }
-
- def obj_make_compatible(self, primitive, target_version):
- super(FixedIP, self).obj_make_compatible(primitive, target_version)
- target_version = versionutils.convert_version_to_tuple(target_version)
- if target_version < (1, 4) and 'default_route' in primitive:
- del primitive['default_route']
-
- @staticmethod
- def _from_db_object(context, fixedip, db_fixedip, expected_attrs=None):
- if expected_attrs is None:
- expected_attrs = []
- for field in fixedip.fields:
- if field == 'default_route':
- # NOTE(danms): This field is only set when doing a
- # FixedIPList.get_by_network() because it's a relatively
- # special-case thing, so skip it here
- continue
- if field not in FIXED_IP_OPTIONAL_ATTRS:
- fixedip[field] = db_fixedip[field]
- # NOTE(danms): Instance could be deleted, and thus None
- if 'instance' in expected_attrs:
- fixedip.instance = objects.Instance._from_db_object(
- context,
- objects.Instance(context),
- db_fixedip['instance']) if db_fixedip['instance'] else None
- if 'network' in expected_attrs:
- fixedip.network = objects.Network._from_db_object(
- context,
- objects.Network(context),
- db_fixedip['network']) if db_fixedip['network'] else None
- if 'virtual_interface' in expected_attrs:
- db_vif = db_fixedip['virtual_interface']
- vif = objects.VirtualInterface._from_db_object(
- context,
- objects.VirtualInterface(context),
- db_fixedip['virtual_interface']) if db_vif else None
- fixedip.virtual_interface = vif
- if 'floating_ips' in expected_attrs:
- fixedip.floating_ips = obj_base.obj_make_list(
- context, objects.FloatingIPList(context),
- objects.FloatingIP, db_fixedip['floating_ips'])
- fixedip._context = context
- fixedip.obj_reset_changes()
- return fixedip
-
- @obj_base.remotable_classmethod
- def get_by_id(cls, context, id, expected_attrs=None):
- if expected_attrs is None:
- expected_attrs = []
- get_network = 'network' in expected_attrs
- db_fixedip = db.fixed_ip_get(context, id, get_network=get_network)
- return cls._from_db_object(context, cls(context), db_fixedip,
- expected_attrs)
-
- @obj_base.remotable_classmethod
- def get_by_address(cls, context, address, expected_attrs=None):
- if expected_attrs is None:
- expected_attrs = []
- db_fixedip = db.fixed_ip_get_by_address(context, str(address),
- columns_to_join=expected_attrs)
- return cls._from_db_object(context, cls(context), db_fixedip,
- expected_attrs)
-
- @obj_base.remotable_classmethod
- def get_by_floating_address(cls, context, address):
- db_fixedip = db.fixed_ip_get_by_floating_address(context, str(address))
- if db_fixedip is not None:
- return cls._from_db_object(context, cls(context), db_fixedip)
-
- @obj_base.remotable_classmethod
- def get_by_network_and_host(cls, context, network_id, host):
- db_fixedip = db.fixed_ip_get_by_network_host(context, network_id, host)
- return cls._from_db_object(context, cls(context), db_fixedip)
-
- @obj_base.remotable_classmethod
- def associate(cls, context, address, instance_uuid, network_id=None,
- reserved=False, vif_id=None):
- db_fixedip = db.fixed_ip_associate(context, address, instance_uuid,
- network_id=network_id,
- reserved=reserved,
- virtual_interface_id=vif_id)
- return cls._from_db_object(context, cls(context), db_fixedip)
-
- @obj_base.remotable_classmethod
- def associate_pool(cls, context, network_id, instance_uuid=None,
- host=None, vif_id=None):
- db_fixedip = db.fixed_ip_associate_pool(context, network_id,
- instance_uuid=instance_uuid,
- host=host,
- virtual_interface_id=vif_id)
- return cls._from_db_object(context, cls(context), db_fixedip)
-
- @obj_base.remotable_classmethod
- def disassociate_by_address(cls, context, address):
- db.fixed_ip_disassociate(context, address)
-
- @obj_base.remotable_classmethod
- def _disassociate_all_by_timeout(cls, context, host, time_str):
- time = timeutils.parse_isotime(time_str)
- return db.fixed_ip_disassociate_all_by_timeout(context, host, time)
-
- @classmethod
- def disassociate_all_by_timeout(cls, context, host, time):
- return cls._disassociate_all_by_timeout(context, host,
- utils.isotime(time))
-
- @obj_base.remotable
- def create(self):
- updates = self.obj_get_changes()
- if 'id' in updates:
- raise exception.ObjectActionError(action='create',
- reason='already created')
- if 'address' in updates:
- updates['address'] = str(updates['address'])
- db_fixedip = db.fixed_ip_create(self._context, updates)
- self._from_db_object(self._context, self, db_fixedip)
-
- @obj_base.remotable
- def save(self):
- updates = self.obj_get_changes()
- if 'address' in updates:
- raise exception.ObjectActionError(action='save',
- reason='address is not mutable')
- db.fixed_ip_update(self._context, str(self.address), updates)
- self.obj_reset_changes()
-
- @obj_base.remotable
- def disassociate(self):
- db.fixed_ip_disassociate(self._context, str(self.address))
- self.instance_uuid = None
- self.instance = None
- self.obj_reset_changes(['instance_uuid', 'instance'])
-
-
-@obj_base.NovaObjectRegistry.register
-class FixedIPList(obj_base.ObjectListBase, obj_base.NovaObject):
- # Version 1.0: Initial version
- # Version 1.1: Added get_by_network()
- # Version 1.2: FixedIP <= version 1.2
- # Version 1.3: FixedIP <= version 1.3
- # Version 1.4: FixedIP <= version 1.4
- # Version 1.5: FixedIP <= version 1.5, added expected attrs to gets
- # Version 1.6: FixedIP <= version 1.6
- # Version 1.7: FixedIP <= version 1.7
- # Version 1.8: FixedIP <= version 1.8
- # Version 1.9: FixedIP <= version 1.9
- # Version 1.10: FixedIP <= version 1.10
- # Version 1.11: FixedIP <= version 1.11
- # Version 1.12: FixedIP <= version 1.12
- # Version 1.13: FixedIP <= version 1.13
- # Version 1.14: FixedIP <= version 1.14
- # Version 1.15: Added get_count_by_project() for quotas
- VERSION = '1.15'
-
- fields = {
- 'objects': fields.ListOfObjectsField('FixedIP'),
- }
-
- @staticmethod
- @db_api.pick_context_manager_reader
- def _get_count_by_project_from_db(context, project_id):
- return context.session.query(models.FixedIp.id).\
- filter_by(deleted=0).\
- join(models.Instance,
- models.Instance.uuid == models.FixedIp.instance_uuid).\
- filter(models.Instance.project_id == project_id).\
- count()
-
- @obj_base.remotable_classmethod
- def get_all(cls, context):
- db_fixedips = db.fixed_ip_get_all(context)
- return obj_base.obj_make_list(context, cls(context),
- objects.FixedIP, db_fixedips)
-
- @obj_base.remotable_classmethod
- def get_by_instance_uuid(cls, context, instance_uuid):
- expected_attrs = ['network', 'virtual_interface', 'floating_ips']
- db_fixedips = db.fixed_ip_get_by_instance(context, instance_uuid)
- return obj_base.obj_make_list(context, cls(context),
- objects.FixedIP, db_fixedips,
- expected_attrs=expected_attrs)
-
- @obj_base.remotable_classmethod
- def get_by_host(cls, context, host):
- db_fixedips = db.fixed_ip_get_by_host(context, host)
- return obj_base.obj_make_list(context, cls(context),
- objects.FixedIP, db_fixedips)
-
- @obj_base.remotable_classmethod
- def get_by_virtual_interface_id(cls, context, vif_id):
- expected_attrs = ['network', 'floating_ips']
- db_fixedips = db.fixed_ips_by_virtual_interface(context, vif_id)
- return obj_base.obj_make_list(context, cls(context),
- objects.FixedIP, db_fixedips,
- expected_attrs=expected_attrs)
-
- @obj_base.remotable_classmethod
- def get_by_network(cls, context, network, host=None):
- ipinfo = db.network_get_associated_fixed_ips(context,
- network['id'],
- host=host)
- if not ipinfo:
- return []
-
- fips = cls(context=context, objects=[])
-
- for info in ipinfo:
- inst = objects.Instance(context=context,
- uuid=info['instance_uuid'],
- hostname=info['instance_hostname'],
- created_at=info['instance_created'],
- updated_at=info['instance_updated'])
- vif = objects.VirtualInterface(context=context,
- id=info['vif_id'],
- address=info['vif_address'])
- fip = objects.FixedIP(context=context,
- address=info['address'],
- instance_uuid=info['instance_uuid'],
- network_id=info['network_id'],
- virtual_interface_id=info['vif_id'],
- allocated=info['allocated'],
- leased=info['leased'],
- default_route=info['default_route'],
- instance=inst,
- virtual_interface=vif)
- fips.objects.append(fip)
- fips.obj_reset_changes()
- return fips
-
- @obj_base.remotable_classmethod
- def bulk_create(self, context, fixed_ips):
- ips = []
- for fixedip in fixed_ips:
- ip = obj_base.obj_to_primitive(fixedip)
- if 'id' in ip:
- raise exception.ObjectActionError(action='create',
- reason='already created')
- ips.append(ip)
- db.fixed_ip_bulk_create(context, ips)
-
- @obj_base.remotable_classmethod
- def get_count_by_project(cls, context, project_id):
- return cls._get_count_by_project_from_db(context, project_id)
diff --git a/nova/objects/floating_ip.py b/nova/objects/floating_ip.py
deleted file mode 100644
index 8e799bdbab..0000000000
--- a/nova/objects/floating_ip.py
+++ /dev/null
@@ -1,258 +0,0 @@
-# Copyright 2014 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from nova.db import api as db
-from nova.db.sqlalchemy import api as db_api
-from nova.db.sqlalchemy import models
-from nova import exception
-from nova import objects
-from nova.objects import base as obj_base
-from nova.objects import fields
-
-FLOATING_IP_OPTIONAL_ATTRS = ['fixed_ip']
-
-
-# TODO(berrange): Remove NovaObjectDictCompat
-@obj_base.NovaObjectRegistry.register
-class FloatingIP(obj_base.NovaPersistentObject, obj_base.NovaObject,
- obj_base.NovaObjectDictCompat):
- # Version 1.0: Initial version
- # Version 1.1: Added _get_addresses_by_instance_uuid()
- # Version 1.2: FixedIP <= version 1.2
- # Version 1.3: FixedIP <= version 1.3
- # Version 1.4: FixedIP <= version 1.4
- # Version 1.5: FixedIP <= version 1.5
- # Version 1.6: FixedIP <= version 1.6
- # Version 1.7: FixedIP <= version 1.11
- # Version 1.8: FixedIP <= version 1.12
- # Version 1.9: FixedIP <= version 1.13
- # Version 1.10: FixedIP <= version 1.14
- VERSION = '1.10'
- fields = {
- 'id': fields.IntegerField(),
- 'address': fields.IPAddressField(),
- 'fixed_ip_id': fields.IntegerField(nullable=True),
- 'project_id': fields.UUIDField(nullable=True),
- 'host': fields.StringField(nullable=True),
- 'auto_assigned': fields.BooleanField(),
- 'pool': fields.StringField(nullable=True),
- 'interface': fields.StringField(nullable=True),
- 'fixed_ip': fields.ObjectField('FixedIP', nullable=True),
- }
-
- @staticmethod
- def _from_db_object(context, floatingip, db_floatingip,
- expected_attrs=None):
- if expected_attrs is None:
- expected_attrs = []
- for field in floatingip.fields:
- if field not in FLOATING_IP_OPTIONAL_ATTRS:
- floatingip[field] = db_floatingip[field]
- if ('fixed_ip' in expected_attrs and
- db_floatingip['fixed_ip'] is not None):
- floatingip.fixed_ip = objects.FixedIP._from_db_object(
- context, objects.FixedIP(context), db_floatingip['fixed_ip'])
- floatingip._context = context
- floatingip.obj_reset_changes()
- return floatingip
-
- def obj_load_attr(self, attrname):
- if attrname not in FLOATING_IP_OPTIONAL_ATTRS:
- raise exception.ObjectActionError(
- action='obj_load_attr',
- reason='attribute %s is not lazy-loadable' % attrname)
- if not self._context:
- raise exception.OrphanedObjectError(method='obj_load_attr',
- objtype=self.obj_name())
- if self.fixed_ip_id is not None:
- self.fixed_ip = objects.FixedIP.get_by_id(
- self._context, self.fixed_ip_id, expected_attrs=['network'])
- else:
- self.fixed_ip = None
-
- @obj_base.remotable_classmethod
- def get_by_id(cls, context, id):
- db_floatingip = db.floating_ip_get(context, id)
- # XXX joins fixed.instance
- return cls._from_db_object(context, cls(context), db_floatingip,
- expected_attrs=['fixed_ip'])
-
- @obj_base.remotable_classmethod
- def get_by_address(cls, context, address):
- db_floatingip = db.floating_ip_get_by_address(context, str(address))
- return cls._from_db_object(context, cls(context), db_floatingip)
-
- @obj_base.remotable_classmethod
- def get_pool_names(cls, context):
- return [x['name'] for x in db.floating_ip_get_pools(context)]
-
- @obj_base.remotable_classmethod
- def allocate_address(cls, context, project_id, pool, auto_assigned=False):
- return db.floating_ip_allocate_address(context, project_id, pool,
- auto_assigned=auto_assigned)
-
- @obj_base.remotable_classmethod
- def associate(cls, context, floating_address, fixed_address, host):
- db_fixed = db.floating_ip_fixed_ip_associate(context,
- str(floating_address),
- str(fixed_address),
- host)
- if db_fixed is None:
- return None
-
- floating = FloatingIP(
- context=context, address=floating_address, host=host,
- fixed_ip_id=db_fixed['id'],
- fixed_ip=objects.FixedIP._from_db_object(
- context, objects.FixedIP(context), db_fixed,
- expected_attrs=['network']))
- return floating
-
- @obj_base.remotable_classmethod
- def deallocate(cls, context, address):
- return db.floating_ip_deallocate(context, str(address))
-
- @obj_base.remotable_classmethod
- def destroy(cls, context, address):
- db.floating_ip_destroy(context, str(address))
-
- @obj_base.remotable_classmethod
- def disassociate(cls, context, address):
- db_fixed = db.floating_ip_disassociate(context, str(address))
-
- return cls(context=context, address=address,
- fixed_ip_id=db_fixed['id'],
- fixed_ip=objects.FixedIP._from_db_object(
- context, objects.FixedIP(context), db_fixed,
- expected_attrs=['network']))
-
- @obj_base.remotable_classmethod
- def _get_addresses_by_instance_uuid(cls, context, instance_uuid):
- return db.instance_floating_address_get_all(context, instance_uuid)
-
- @classmethod
- def get_addresses_by_instance(cls, context, instance):
- return cls._get_addresses_by_instance_uuid(context, instance['uuid'])
-
- @obj_base.remotable
- def save(self):
- updates = self.obj_get_changes()
- if 'address' in updates:
- raise exception.ObjectActionError(action='save',
- reason='address is not mutable')
- if 'fixed_ip_id' in updates:
- reason = 'fixed_ip_id is not mutable'
- raise exception.ObjectActionError(action='save', reason=reason)
-
- # NOTE(danms): Make sure we don't pass the calculated fixed_ip
- # relationship to the DB update method
- updates.pop('fixed_ip', None)
-
- db_floatingip = db.floating_ip_update(self._context, str(self.address),
- updates)
- self._from_db_object(self._context, self, db_floatingip)
-
-
-@obj_base.NovaObjectRegistry.register
-class FloatingIPList(obj_base.ObjectListBase, obj_base.NovaObject):
- # Version 1.3: FloatingIP 1.2
- # Version 1.4: FloatingIP 1.3
- # Version 1.5: FloatingIP 1.4
- # Version 1.6: FloatingIP 1.5
- # Version 1.7: FloatingIP 1.6
- # Version 1.8: FloatingIP 1.7
- # Version 1.9: FloatingIP 1.8
- # Version 1.10: FloatingIP 1.9
- # Version 1.11: FloatingIP 1.10
- # Version 1.12: Added get_count_by_project() for quotas
- fields = {
- 'objects': fields.ListOfObjectsField('FloatingIP'),
- }
- VERSION = '1.12'
-
- @staticmethod
- @db_api.pick_context_manager_reader
- def _get_count_by_project_from_db(context, project_id):
- return context.session.query(models.FloatingIp.id).\
- filter_by(deleted=0).\
- filter_by(project_id=project_id).\
- filter_by(auto_assigned=False).\
- count()
-
- @obj_base.remotable_classmethod
- def get_all(cls, context):
- db_floatingips = db.floating_ip_get_all(context)
- return obj_base.obj_make_list(context, cls(context),
- objects.FloatingIP, db_floatingips)
-
- @obj_base.remotable_classmethod
- def get_by_host(cls, context, host):
- db_floatingips = db.floating_ip_get_all_by_host(context, host)
- return obj_base.obj_make_list(context, cls(context),
- objects.FloatingIP, db_floatingips)
-
- @obj_base.remotable_classmethod
- def get_by_project(cls, context, project_id):
- db_floatingips = db.floating_ip_get_all_by_project(context, project_id)
- return obj_base.obj_make_list(context, cls(context),
- objects.FloatingIP, db_floatingips)
-
- @obj_base.remotable_classmethod
- def get_by_fixed_address(cls, context, fixed_address):
- db_floatingips = db.floating_ip_get_by_fixed_address(
- context, str(fixed_address))
- return obj_base.obj_make_list(context, cls(context),
- objects.FloatingIP, db_floatingips)
-
- @obj_base.remotable_classmethod
- def get_by_fixed_ip_id(cls, context, fixed_ip_id):
- db_floatingips = db.floating_ip_get_by_fixed_ip_id(context,
- fixed_ip_id)
- return obj_base.obj_make_list(context, cls(), FloatingIP,
- db_floatingips)
-
- @staticmethod
- def make_ip_info(address, pool, interface):
- return {'address': str(address),
- 'pool': pool,
- 'interface': interface}
-
- @obj_base.remotable_classmethod
- def create(cls, context, ip_info, want_result=False):
- db_floatingips = db.floating_ip_bulk_create(context, ip_info,
- want_result=want_result)
- if want_result:
- return obj_base.obj_make_list(context, cls(), FloatingIP,
- db_floatingips)
-
- @obj_base.remotable_classmethod
- def destroy(cls, context, ips):
- db.floating_ip_bulk_destroy(context, ips)
-
- @obj_base.remotable_classmethod
- def get_count_by_project(cls, context, project_id):
- return cls._get_count_by_project_from_db(context, project_id)
-
-
-# We don't want to register this object because it will not be passed
-# around on RPC, it just makes our lives a lot easier in the API when
-# dealing with floating IP operations
-@obj_base.NovaObjectRegistry.register_if(False)
-class NeutronFloatingIP(FloatingIP):
- # Version 1.0: Initial Version
- VERSION = '1.0'
- fields = {
- 'id': fields.UUIDField(),
- 'fixed_ip_id': fields.UUIDField(nullable=True)
- }
diff --git a/nova/objects/network.py b/nova/objects/network.py
deleted file mode 100644
index eae7824ee9..0000000000
--- a/nova/objects/network.py
+++ /dev/null
@@ -1,229 +0,0 @@
-# Copyright 2014 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import netaddr
-from oslo_utils import versionutils
-
-import nova.conf
-from nova.db import api as db
-from nova import exception
-from nova.i18n import _
-from nova import objects
-from nova.objects import base as obj_base
-from nova.objects import fields
-
-CONF = nova.conf.CONF
-
-
-# TODO(berrange): Remove NovaObjectDictCompat
-@obj_base.NovaObjectRegistry.register
-class Network(obj_base.NovaPersistentObject, obj_base.NovaObject,
- obj_base.NovaObjectDictCompat):
- # Version 1.0: Initial version
- # Version 1.1: Added in_use_on_host()
- # Version 1.2: Added mtu, dhcp_server, enable_dhcp, share_address
- VERSION = '1.2'
-
- fields = {
- 'id': fields.IntegerField(),
- 'label': fields.StringField(),
- 'injected': fields.BooleanField(),
- 'cidr': fields.IPV4NetworkField(nullable=True),
- 'cidr_v6': fields.IPV6NetworkField(nullable=True),
- 'multi_host': fields.BooleanField(),
- 'netmask': fields.IPV4AddressField(nullable=True),
- 'gateway': fields.IPV4AddressField(nullable=True),
- 'broadcast': fields.IPV4AddressField(nullable=True),
- 'netmask_v6': fields.IPV6AddressField(nullable=True),
- 'gateway_v6': fields.IPV6AddressField(nullable=True),
- 'bridge': fields.StringField(nullable=True),
- 'bridge_interface': fields.StringField(nullable=True),
- 'dns1': fields.IPAddressField(nullable=True),
- 'dns2': fields.IPAddressField(nullable=True),
- 'vlan': fields.IntegerField(nullable=True),
- 'vpn_public_address': fields.IPAddressField(nullable=True),
- 'vpn_public_port': fields.IntegerField(nullable=True),
- 'vpn_private_address': fields.IPAddressField(nullable=True),
- 'dhcp_start': fields.IPV4AddressField(nullable=True),
- 'rxtx_base': fields.IntegerField(nullable=True),
- 'project_id': fields.UUIDField(nullable=True),
- 'priority': fields.IntegerField(nullable=True),
- 'host': fields.StringField(nullable=True),
- 'uuid': fields.UUIDField(),
- 'mtu': fields.IntegerField(nullable=True),
- 'dhcp_server': fields.IPAddressField(nullable=True),
- 'enable_dhcp': fields.BooleanField(),
- 'share_address': fields.BooleanField(),
- }
-
- @staticmethod
- def _convert_legacy_ipv6_netmask(netmask):
- """Handle netmask_v6 possibilities from the database.
-
- Historically, this was stored as just an integral CIDR prefix,
- but in the future it should be stored as an actual netmask.
- Be tolerant of either here.
- """
- try:
- prefix = int(netmask)
- return netaddr.IPNetwork('1::/%i' % prefix).netmask
- except ValueError:
- pass
-
- try:
- return netaddr.IPNetwork(netmask).netmask
- except netaddr.AddrFormatError:
- raise ValueError(_('IPv6 netmask "%s" must be a netmask '
- 'or integral prefix') % netmask)
-
- def obj_make_compatible(self, primitive, target_version):
- target_version = versionutils.convert_version_to_tuple(target_version)
- if target_version < (1, 2):
- if 'mtu' in primitive:
- del primitive['mtu']
- if 'enable_dhcp' in primitive:
- del primitive['enable_dhcp']
- if 'dhcp_server' in primitive:
- del primitive['dhcp_server']
- if 'share_address' in primitive:
- del primitive['share_address']
-
- @staticmethod
- def _from_db_object(context, network, db_network):
- for field in network.fields:
- db_value = db_network[field]
- if field == 'netmask_v6' and db_value is not None:
- db_value = network._convert_legacy_ipv6_netmask(db_value)
- elif field == 'dhcp_server' and db_value is None:
- db_value = db_network['gateway']
-
- network[field] = db_value
- network._context = context
- network.obj_reset_changes()
- return network
-
- @obj_base.remotable_classmethod
- def get_by_id(cls, context, network_id, project_only='allow_none'):
- db_network = db.network_get(context, network_id,
- project_only=project_only)
- return cls._from_db_object(context, cls(), db_network)
-
- @obj_base.remotable_classmethod
- def get_by_uuid(cls, context, network_uuid):
- db_network = db.network_get_by_uuid(context, network_uuid)
- return cls._from_db_object(context, cls(), db_network)
-
- @obj_base.remotable_classmethod
- def get_by_cidr(cls, context, cidr):
- db_network = db.network_get_by_cidr(context, cidr)
- return cls._from_db_object(context, cls(), db_network)
-
- # TODO(stephenfin): This is no longer used and can be removed
- @obj_base.remotable_classmethod
- def associate(cls, context, project_id, network_id=None, force=False):
- db.network_associate(context, project_id, network_id=network_id,
- force=force)
-
- # TODO(stephenfin): This is no longer used and can be removed
- @obj_base.remotable_classmethod
- def disassociate(cls, context, network_id, host=False, project=False):
- db.network_disassociate(context, network_id, host, project)
-
- @obj_base.remotable_classmethod
- def in_use_on_host(cls, context, network_id, host):
- return db.network_in_use_on_host(context, network_id, host)
-
- def _get_primitive_changes(self):
- changes = {}
- for key, value in self.obj_get_changes().items():
- if isinstance(value, netaddr.IPAddress):
- changes[key] = str(value)
- else:
- changes[key] = value
- return changes
-
- @obj_base.remotable
- def create(self):
- updates = self._get_primitive_changes()
- if 'id' in updates:
- raise exception.ObjectActionError(action='create',
- reason='already created')
- db_network = db.network_create_safe(self._context, updates)
- self._from_db_object(self._context, self, db_network)
-
- @obj_base.remotable
- def destroy(self):
- db.network_delete_safe(self._context, self.id)
- self.deleted = True
- self.obj_reset_changes(['deleted'])
-
- @obj_base.remotable
- def save(self):
- context = self._context
- updates = self._get_primitive_changes()
- if 'netmask_v6' in updates:
- # NOTE(danms): For some reason, historical code stores the
- # IPv6 netmask as just the CIDR mask length, so convert that
- # back here before saving for now.
- updates['netmask_v6'] = netaddr.IPNetwork(
- updates['netmask_v6']).netmask
- set_host = 'host' in updates
- if set_host:
- db.network_set_host(context, self.id, updates.pop('host'))
- if updates:
- db_network = db.network_update(context, self.id, updates)
- elif set_host:
- db_network = db.network_get(context, self.id)
- else:
- db_network = None
- if db_network is not None:
- self._from_db_object(context, self, db_network)
-
-
-@obj_base.NovaObjectRegistry.register
-class NetworkList(obj_base.ObjectListBase, obj_base.NovaObject):
- # Version 1.0: Initial version
- # Version 1.1: Added get_by_project()
- # Version 1.2: Network <= version 1.2
- VERSION = '1.2'
-
- fields = {
- 'objects': fields.ListOfObjectsField('Network'),
- }
-
- @obj_base.remotable_classmethod
- def get_all(cls, context, project_only='allow_none'):
- db_networks = db.network_get_all(context, project_only)
- return obj_base.obj_make_list(context, cls(context), objects.Network,
- db_networks)
-
- @obj_base.remotable_classmethod
- def get_by_uuids(cls, context, network_uuids, project_only='allow_none'):
- db_networks = db.network_get_all_by_uuids(context, network_uuids,
- project_only)
- return obj_base.obj_make_list(context, cls(context), objects.Network,
- db_networks)
-
- @obj_base.remotable_classmethod
- def get_by_host(cls, context, host):
- db_networks = db.network_get_all_by_host(context, host)
- return obj_base.obj_make_list(context, cls(context), objects.Network,
- db_networks)
-
- @obj_base.remotable_classmethod
- def get_by_project(cls, context, project_id, associate=True):
- db_networks = db.project_get_networks(context, project_id,
- associate=associate)
- return obj_base.obj_make_list(context, cls(context), objects.Network,
- db_networks)
diff --git a/nova/objects/security_group_rule.py b/nova/objects/security_group_rule.py
deleted file mode 100644
index d0cf10bc72..0000000000
--- a/nova/objects/security_group_rule.py
+++ /dev/null
@@ -1,114 +0,0 @@
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# TODO(stephenfin): This is all nova-network related and can be deleted as soon
-# as we remove the 'security_group' field from the 'Instance' object
-
-from nova.db import api as db
-from nova import exception
-from nova import objects
-from nova.objects import base
-from nova.objects import fields
-
-OPTIONAL_ATTRS = ['parent_group', 'grantee_group']
-
-
-@base.NovaObjectRegistry.register
-class SecurityGroupRule(base.NovaPersistentObject, base.NovaObject):
- # Version 1.0: Initial version
- # Version 1.1: Added create() and set id as read_only
- VERSION = '1.1'
-
- fields = {
- 'id': fields.IntegerField(read_only=True),
- 'protocol': fields.StringField(nullable=True),
- 'from_port': fields.IntegerField(nullable=True),
- 'to_port': fields.IntegerField(nullable=True),
- 'cidr': fields.IPNetworkField(nullable=True),
- 'parent_group': fields.ObjectField('SecurityGroup', nullable=True),
- 'grantee_group': fields.ObjectField('SecurityGroup', nullable=True),
- }
-
- @staticmethod
- def _from_db_subgroup(context, db_group):
- if db_group is None:
- return None
- return objects.SecurityGroup._from_db_object(
- context, objects.SecurityGroup(context), db_group)
-
- @staticmethod
- def _from_db_object(context, rule, db_rule, expected_attrs=None):
- if expected_attrs is None:
- expected_attrs = []
- for field in rule.fields:
- if field in expected_attrs:
- setattr(rule, field,
- rule._from_db_subgroup(context, db_rule[field]))
- elif field not in OPTIONAL_ATTRS:
- setattr(rule, field, db_rule[field])
- rule._context = context
- rule.obj_reset_changes()
- return rule
-
- @base.remotable
- def create(self):
- if self.obj_attr_is_set('id'):
- raise exception.ObjectActionError(action='create',
- reason='already created')
- updates = self.obj_get_changes()
- parent_group = updates.pop('parent_group', None)
- if parent_group:
- updates['parent_group_id'] = parent_group.id
- grantee_group = updates.pop('grantee_group', None)
- if grantee_group:
- updates['group_id'] = grantee_group.id
- db_rule = db.security_group_rule_create(self._context, updates)
- self._from_db_object(self._context, self, db_rule)
-
- @base.remotable_classmethod
- def get_by_id(cls, context, rule_id):
- db_rule = db.security_group_rule_get(context, rule_id)
- return cls._from_db_object(context, cls(), db_rule)
-
-
-@base.NovaObjectRegistry.register
-class SecurityGroupRuleList(base.ObjectListBase, base.NovaObject):
- fields = {
- 'objects': fields.ListOfObjectsField('SecurityGroupRule'),
- }
- VERSION = '1.2'
-
- @base.remotable_classmethod
- def get_by_security_group_id(cls, context, secgroup_id):
- db_rules = db.security_group_rule_get_by_security_group(
- context, secgroup_id, columns_to_join=['grantee_group'])
- return base.obj_make_list(context, cls(context),
- objects.SecurityGroupRule, db_rules,
- expected_attrs=['grantee_group'])
-
- @classmethod
- def get_by_security_group(cls, context, security_group):
- return cls.get_by_security_group_id(context, security_group.id)
-
- @base.remotable_classmethod
- def get_by_instance_uuid(cls, context, instance_uuid):
- db_rules = db.security_group_rule_get_by_instance(context,
- instance_uuid)
- return base.obj_make_list(context, cls(context),
- objects.SecurityGroupRule, db_rules,
- expected_attrs=['grantee_group'])
-
- @classmethod
- def get_by_instance(cls, context, instance):
- return cls.get_by_instance_uuid(context, instance.uuid)
diff --git a/nova/tests/unit/api/openstack/compute/test_multinic.py b/nova/tests/unit/api/openstack/compute/test_multinic.py
index bc98d877c7..de310c9a7a 100644
--- a/nova/tests/unit/api/openstack/compute/test_multinic.py
+++ b/nova/tests/unit/api/openstack/compute/test_multinic.py
@@ -134,7 +134,7 @@ class FixedIpTestV21(test.NoDBTestCase):
UUID, body=body)
@mock.patch.object(compute.api.API, 'remove_fixed_ip',
- side_effect=exception.FixedIpNotFoundForSpecificInstance(
+ side_effect=exception.FixedIpNotFoundForInstance(
instance_uuid=UUID, ip='10.10.10.1'))
def test_remove_fixed_ip_not_found(self, _remove_fixed_ip):
diff --git a/nova/tests/unit/db/test_db_api.py b/nova/tests/unit/db/test_db_api.py
index 3950f742e2..31ca028ab5 100644
--- a/nova/tests/unit/db/test_db_api.py
+++ b/nova/tests/unit/db/test_db_api.py
@@ -62,7 +62,6 @@ from nova.db.sqlalchemy import models
from nova.db.sqlalchemy import types as col_types
from nova.db.sqlalchemy import utils as db_utils
from nova import exception
-from nova import objects
from nova.objects import fields
from nova import test
from nova.tests import fixtures as nova_fixtures
@@ -1590,384 +1589,6 @@ class InstanceSystemMetadataTestCase(test.TestCase):
{'key': 'value'}, True)
-class SecurityGroupRuleTestCase(test.TestCase, ModelsObjectComparatorMixin):
- def setUp(self):
- super(SecurityGroupRuleTestCase, self).setUp()
- self.ctxt = context.get_admin_context()
-
- def _get_base_values(self):
- return {
- 'name': 'fake_sec_group',
- 'description': 'fake_sec_group_descr',
- 'user_id': 'fake',
- 'project_id': 'fake',
- 'instances': []
- }
-
- def _get_base_rule_values(self):
- return {
- 'protocol': "tcp",
- 'from_port': 80,
- 'to_port': 8080,
- 'cidr': None,
- 'deleted': 0,
- 'deleted_at': None,
- 'grantee_group': None,
- 'updated_at': None
- }
-
- def _create_security_group(self, values):
- v = self._get_base_values()
- v.update(values)
- return db.security_group_create(self.ctxt, v)
-
- def _create_security_group_rule(self, values):
- v = self._get_base_rule_values()
- v.update(values)
- return db.security_group_rule_create(self.ctxt, v)
-
- def test_security_group_rule_create(self):
- security_group_rule = self._create_security_group_rule({})
- self.assertIsNotNone(security_group_rule['id'])
- for key, value in self._get_base_rule_values().items():
- self.assertEqual(value, security_group_rule[key])
-
- def _test_security_group_rule_get_by_security_group(self, columns=None):
- instance = db.instance_create(self.ctxt,
- {'system_metadata': {'foo': 'bar'}})
- security_group = self._create_security_group({
- 'instances': [instance]})
- security_group_rule = self._create_security_group_rule(
- {'parent_group': security_group, 'grantee_group': security_group})
- security_group_rule1 = self._create_security_group_rule(
- {'parent_group': security_group, 'grantee_group': security_group})
- found_rules = db.security_group_rule_get_by_security_group(
- self.ctxt, security_group['id'], columns_to_join=columns)
- self.assertEqual(len(found_rules), 2)
- rules_ids = [security_group_rule['id'], security_group_rule1['id']]
- for rule in found_rules:
- if columns is None:
- self.assertIn('grantee_group', dict(rule))
- self.assertIn('instances',
- dict(rule.grantee_group))
- self.assertIn(
- 'system_metadata',
- dict(rule.grantee_group.instances[0]))
- self.assertIn(rule['id'], rules_ids)
- else:
- self.assertNotIn('grantee_group', dict(rule))
-
- def test_security_group_rule_get_by_security_group(self):
- self._test_security_group_rule_get_by_security_group()
-
- def test_security_group_rule_get_by_security_group_no_joins(self):
- self._test_security_group_rule_get_by_security_group(columns=[])
-
- def test_security_group_rule_get_by_instance(self):
- instance = db.instance_create(self.ctxt, {})
- security_group = self._create_security_group({
- 'instances': [instance]})
- security_group_rule = self._create_security_group_rule(
- {'parent_group': security_group, 'grantee_group': security_group})
- security_group_rule1 = self._create_security_group_rule(
- {'parent_group': security_group, 'grantee_group': security_group})
- security_group_rule_ids = [security_group_rule['id'],
- security_group_rule1['id']]
- found_rules = db.security_group_rule_get_by_instance(self.ctxt,
- instance['uuid'])
- self.assertEqual(len(found_rules), 2)
- for rule in found_rules:
- self.assertIn('grantee_group', rule)
- self.assertIn(rule['id'], security_group_rule_ids)
-
- def test_security_group_rule_destroy(self):
- self._create_security_group({'name': 'fake1'})
- self._create_security_group({'name': 'fake2'})
- security_group_rule1 = self._create_security_group_rule({})
- security_group_rule2 = self._create_security_group_rule({})
- db.security_group_rule_destroy(self.ctxt, security_group_rule1['id'])
- self.assertRaises(exception.SecurityGroupNotFound,
- db.security_group_rule_get,
- self.ctxt, security_group_rule1['id'])
- self._assertEqualObjects(db.security_group_rule_get(self.ctxt,
- security_group_rule2['id']),
- security_group_rule2, ['grantee_group'])
-
- def test_security_group_rule_destroy_not_found_exception(self):
- self.assertRaises(exception.SecurityGroupNotFound,
- db.security_group_rule_destroy, self.ctxt, 100500)
-
- def test_security_group_rule_get(self):
- security_group_rule1 = (
- self._create_security_group_rule({}))
- self._create_security_group_rule({})
- real_security_group_rule = db.security_group_rule_get(self.ctxt,
- security_group_rule1['id'])
- self._assertEqualObjects(security_group_rule1,
- real_security_group_rule, ['grantee_group'])
-
- def test_security_group_rule_get_not_found_exception(self):
- self.assertRaises(exception.SecurityGroupNotFound,
- db.security_group_rule_get, self.ctxt, 100500)
-
- def test_security_group_rule_count_by_group(self):
- sg1 = self._create_security_group({'name': 'fake1'})
- sg2 = self._create_security_group({'name': 'fake2'})
- rules_by_group = {sg1: [], sg2: []}
- for group in rules_by_group:
- rules = rules_by_group[group]
- for i in range(0, 10):
- rules.append(
- self._create_security_group_rule({'parent_group_id':
- group['id']}))
- db.security_group_rule_destroy(self.ctxt,
- rules_by_group[sg1][0]['id'])
- counted_groups = [db.security_group_rule_count_by_group(self.ctxt,
- group['id'])
- for group in [sg1, sg2]]
- expected = [9, 10]
- self.assertEqual(counted_groups, expected)
-
-
-class SecurityGroupTestCase(test.TestCase, ModelsObjectComparatorMixin):
- def setUp(self):
- super(SecurityGroupTestCase, self).setUp()
- self.ctxt = context.get_admin_context()
-
- def _get_base_values(self):
- return {
- 'name': 'fake_sec_group',
- 'description': 'fake_sec_group_descr',
- 'user_id': 'fake',
- 'project_id': 'fake',
- 'instances': []
- }
-
- def _create_security_group(self, values):
- v = self._get_base_values()
- v.update(values)
- return db.security_group_create(self.ctxt, v)
-
- def test_security_group_create(self):
- security_group = self._create_security_group({})
- self.assertIsNotNone(security_group['id'])
- for key, value in self._get_base_values().items():
- self.assertEqual(value, security_group[key])
-
- def test_security_group_destroy(self):
- security_group1 = self._create_security_group({})
- security_group2 = \
- self._create_security_group({'name': 'fake_sec_group2'})
-
- db.security_group_destroy(self.ctxt, security_group1['id'])
- self.assertRaises(exception.SecurityGroupNotFound,
- db.security_group_get,
- self.ctxt, security_group1['id'])
- self._assertEqualObjects(db.security_group_get(
- self.ctxt, security_group2['id'],
- columns_to_join=['instances',
- 'rules']), security_group2)
-
- def test_security_group_destroy_with_instance(self):
- security_group1 = self._create_security_group({})
-
- instance = db.instance_create(self.ctxt, {})
- db.instance_add_security_group(self.ctxt, instance.uuid,
- security_group1.id)
-
- self.assertEqual(
- 1,
- len(db.security_group_get_by_instance(self.ctxt, instance.uuid)))
-
- db.security_group_destroy(self.ctxt, security_group1['id'])
-
- self.assertEqual(
- 0,
- len(db.security_group_get_by_instance(self.ctxt, instance.uuid)))
-
- def test_security_group_get(self):
- security_group1 = self._create_security_group({})
- self._create_security_group({'name': 'fake_sec_group2'})
- real_security_group = db.security_group_get(self.ctxt,
- security_group1['id'],
- columns_to_join=['instances',
- 'rules'])
- self._assertEqualObjects(security_group1,
- real_security_group)
-
- def test_security_group_get_with_instance_columns(self):
- instance = db.instance_create(self.ctxt,
- {'system_metadata': {'foo': 'bar'}})
- secgroup = self._create_security_group({'instances': [instance]})
- secgroup = db.security_group_get(
- self.ctxt, secgroup['id'],
- columns_to_join=['instances.system_metadata'])
- inst = secgroup.instances[0]
- self.assertIn('system_metadata', dict(inst).keys())
-
- def test_security_group_get_no_instances(self):
- instance = db.instance_create(self.ctxt, {})
- sid = self._create_security_group({'instances': [instance]})['id']
-
- security_group = db.security_group_get(self.ctxt, sid,
- columns_to_join=['instances'])
- self.assertIn('instances', security_group.__dict__)
-
- security_group = db.security_group_get(self.ctxt, sid)
- self.assertNotIn('instances', security_group.__dict__)
-
- def test_security_group_get_not_found_exception(self):
- self.assertRaises(exception.SecurityGroupNotFound,
- db.security_group_get, self.ctxt, 100500)
-
- def test_security_group_get_by_name(self):
- security_group1 = self._create_security_group({'name': 'fake1'})
- security_group2 = self._create_security_group({'name': 'fake2'})
-
- real_security_group1 = db.security_group_get_by_name(
- self.ctxt,
- security_group1['project_id'],
- security_group1['name'],
- columns_to_join=None)
- real_security_group2 = db.security_group_get_by_name(
- self.ctxt,
- security_group2['project_id'],
- security_group2['name'],
- columns_to_join=None)
- self._assertEqualObjects(security_group1, real_security_group1)
- self._assertEqualObjects(security_group2, real_security_group2)
-
- def test_security_group_get_by_project(self):
- security_group1 = self._create_security_group(
- {'name': 'fake1', 'project_id': 'fake_proj1'})
- security_group2 = self._create_security_group(
- {'name': 'fake2', 'project_id': 'fake_proj2'})
-
- real1 = db.security_group_get_by_project(
- self.ctxt,
- security_group1['project_id'])
- real2 = db.security_group_get_by_project(
- self.ctxt,
- security_group2['project_id'])
-
- expected1, expected2 = [security_group1], [security_group2]
- self._assertEqualListsOfObjects(expected1, real1,
- ignored_keys=['instances'])
- self._assertEqualListsOfObjects(expected2, real2,
- ignored_keys=['instances'])
-
- def test_security_group_get_by_instance(self):
- instance = db.instance_create(self.ctxt, dict(host='foo'))
- values = [
- {'name': 'fake1', 'instances': [instance]},
- {'name': 'fake2', 'instances': [instance]},
- {'name': 'fake3', 'instances': []},
- ]
- security_groups = [self._create_security_group(vals)
- for vals in values]
-
- real = db.security_group_get_by_instance(self.ctxt,
- instance['uuid'])
- expected = security_groups[:2]
- self._assertEqualListsOfObjects(expected, real,
- ignored_keys=['instances'])
-
- def test_security_group_get_all(self):
- values = [
- {'name': 'fake1', 'project_id': 'fake_proj1'},
- {'name': 'fake2', 'project_id': 'fake_proj2'},
- ]
- security_groups = [self._create_security_group(vals)
- for vals in values]
-
- real = db.security_group_get_all(self.ctxt)
-
- self._assertEqualListsOfObjects(security_groups, real,
- ignored_keys=['instances'])
-
- def test_security_group_in_use(self):
- instance = db.instance_create(self.ctxt, dict(host='foo'))
- values = [
- {'instances': [instance],
- 'name': 'fake_in_use'},
- {'instances': []},
- ]
-
- security_groups = [self._create_security_group(vals)
- for vals in values]
-
- real = []
- for security_group in security_groups:
- in_use = db.security_group_in_use(self.ctxt,
- security_group['id'])
- real.append(in_use)
- expected = [True, False]
-
- self.assertEqual(expected, real)
-
- def test_security_group_ensure_default(self):
- self.ctxt.project_id = 'fake'
- self.ctxt.user_id = 'fake'
- self.assertEqual(0, len(db.security_group_get_by_project(
- self.ctxt,
- self.ctxt.project_id)))
-
- db.security_group_ensure_default(self.ctxt)
-
- security_groups = db.security_group_get_by_project(
- self.ctxt,
- self.ctxt.project_id)
-
- self.assertEqual(1, len(security_groups))
- self.assertEqual("default", security_groups[0]["name"])
-
- @mock.patch.object(sqlalchemy_api, '_security_group_get_by_names')
- def test_security_group_ensure_default_called_concurrently(self, sg_mock):
- # make sure NotFound is always raised here to trick Nova to insert the
- # duplicate security group entry
- sg_mock.side_effect = exception.NotFound
-
- # create the first db entry
- self.ctxt.project_id = 1
- db.security_group_ensure_default(self.ctxt)
- security_groups = db.security_group_get_by_project(
- self.ctxt,
- self.ctxt.project_id)
- self.assertEqual(1, len(security_groups))
-
- # create the second one and ensure the exception is handled properly
- default_group = db.security_group_ensure_default(self.ctxt)
- self.assertEqual('default', default_group.name)
-
- def test_security_group_update(self):
- security_group = self._create_security_group({})
- new_values = {
- 'name': 'sec_group1',
- 'description': 'sec_group_descr1',
- 'user_id': 'fake_user1',
- 'project_id': 'fake_proj1',
- }
-
- updated_group = db.security_group_update(self.ctxt,
- security_group['id'],
- new_values,
- columns_to_join=['rules.grantee_group'])
- for key, value in new_values.items():
- self.assertEqual(updated_group[key], value)
- self.assertEqual(updated_group['rules'], [])
-
- def test_security_group_update_to_duplicate(self):
- self._create_security_group(
- {'name': 'fake1', 'project_id': 'fake_proj1'})
- security_group2 = self._create_security_group(
- {'name': 'fake1', 'project_id': 'fake_proj2'})
-
- self.assertRaises(exception.SecurityGroupExists,
- db.security_group_update,
- self.ctxt, security_group2['id'],
- {'project_id': 'fake_proj1'})
-
-
@mock.patch('time.sleep', new=lambda x: None)
class InstanceTestCase(test.TestCase, ModelsObjectComparatorMixin):
@@ -2984,38 +2605,6 @@ class InstanceTestCase(test.TestCase, ModelsObjectComparatorMixin):
# Ensure that metadata is updated during instance_update
self._test_instance_update_updates_metadata('metadata')
- def test_instance_floating_address_get_all(self):
- ctxt = context.get_admin_context()
-
- instance1 = db.instance_create(ctxt, {'host': 'h1', 'hostname': 'n1'})
- instance2 = db.instance_create(ctxt, {'host': 'h2', 'hostname': 'n2'})
-
- fixed_addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3']
- float_addresses = ['2.1.1.1', '2.1.1.2', '2.1.1.3']
- instance_uuids = [instance1['uuid'], instance1['uuid'],
- instance2['uuid']]
-
- for fixed_addr, float_addr, instance_uuid in zip(fixed_addresses,
- float_addresses,
- instance_uuids):
- db.fixed_ip_create(ctxt, {'address': fixed_addr,
- 'instance_uuid': instance_uuid})
- fixed_id = db.fixed_ip_get_by_address(ctxt, fixed_addr)['id']
- db.floating_ip_create(ctxt,
- {'address': float_addr,
- 'fixed_ip_id': fixed_id})
-
- real_float_addresses = \
- db.instance_floating_address_get_all(ctxt, instance_uuids[0])
- self.assertEqual(set(float_addresses[:2]), set(real_float_addresses))
- real_float_addresses = \
- db.instance_floating_address_get_all(ctxt, instance_uuids[2])
- self.assertEqual(set([float_addresses[2]]), set(real_float_addresses))
-
- self.assertRaises(exception.InvalidUUID,
- db.instance_floating_address_get_all,
- ctxt, 'invalid_uuid')
-
def test_instance_stringified_ips(self):
instance = self.create_instance_with_args()
instance = db.instance_update(
@@ -4318,1196 +3907,6 @@ class InstanceFaultTestCase(test.TestCase, ModelsObjectComparatorMixin):
self.assertFalse(mock_filter.called)
-@mock.patch('time.sleep', new=lambda x: None)
-class FixedIPTestCase(test.TestCase, ModelsObjectComparatorMixin):
- def setUp(self):
- super(FixedIPTestCase, self).setUp()
- self.ctxt = context.get_admin_context()
-
- def _timeout_test(self, ctxt, timeout, multi_host):
- instance = db.instance_create(ctxt, dict(host='foo'))
- net = db.network_create_safe(ctxt, dict(multi_host=multi_host,
- host='bar'))
- old = timeout - datetime.timedelta(seconds=5)
- new = timeout + datetime.timedelta(seconds=5)
- # should deallocate
- db.fixed_ip_create(ctxt, dict(allocated=False,
- instance_uuid=instance['uuid'],
- network_id=net['id'],
- updated_at=old))
- # still allocated
- db.fixed_ip_create(ctxt, dict(allocated=True,
- instance_uuid=instance['uuid'],
- network_id=net['id'],
- updated_at=old))
- # wrong network
- db.fixed_ip_create(ctxt, dict(allocated=False,
- instance_uuid=instance['uuid'],
- network_id=None,
- updated_at=old))
- # too new
- db.fixed_ip_create(ctxt, dict(allocated=False,
- instance_uuid=instance['uuid'],
- network_id=None,
- updated_at=new))
-
- def test_fixed_ip_disassociate_all_by_timeout_single_host(self):
- now = timeutils.utcnow()
- self._timeout_test(self.ctxt, now, False)
- result = db.fixed_ip_disassociate_all_by_timeout(self.ctxt, 'foo', now)
- self.assertEqual(result, 0)
- result = db.fixed_ip_disassociate_all_by_timeout(self.ctxt, 'bar', now)
- self.assertEqual(result, 1)
-
- def test_fixed_ip_disassociate_all_by_timeout_multi_host(self):
- now = timeutils.utcnow()
- self._timeout_test(self.ctxt, now, True)
- result = db.fixed_ip_disassociate_all_by_timeout(self.ctxt, 'foo', now)
- self.assertEqual(result, 1)
- result = db.fixed_ip_disassociate_all_by_timeout(self.ctxt, 'bar', now)
- self.assertEqual(result, 0)
-
- def test_fixed_ip_get_by_floating_address(self):
- fixed_ip = db.fixed_ip_create(self.ctxt, {'address': '192.168.0.2'})
- values = {'address': '8.7.6.5',
- 'fixed_ip_id': fixed_ip['id']}
- floating = db.floating_ip_create(self.ctxt, values)['address']
- fixed_ip_ref = db.fixed_ip_get_by_floating_address(self.ctxt, floating)
- self._assertEqualObjects(fixed_ip, fixed_ip_ref)
-
- def test_fixed_ip_get_by_host(self):
- host_ips = {
- 'host1': ['1.1.1.1', '1.1.1.2', '1.1.1.3'],
- 'host2': ['1.1.1.4', '1.1.1.5'],
- 'host3': ['1.1.1.6']
- }
-
- for host, ips in host_ips.items():
- for ip in ips:
- instance_uuid = self._create_instance(host=host)
- db.fixed_ip_create(self.ctxt, {'address': ip})
- db.fixed_ip_associate(self.ctxt, ip, instance_uuid)
-
- for host, ips in host_ips.items():
- ips_on_host = [x['address']
- for x in db.fixed_ip_get_by_host(self.ctxt, host)]
- self._assertEqualListsOfPrimitivesAsSets(ips_on_host, ips)
-
- def test_fixed_ip_get_by_network_host_not_found_exception(self):
- self.assertRaises(
- exception.FixedIpNotFoundForNetworkHost,
- db.fixed_ip_get_by_network_host,
- self.ctxt, 1, 'ignore')
-
- def test_fixed_ip_get_by_network_host_fixed_ip_found(self):
- db.fixed_ip_create(self.ctxt, dict(network_id=1, host='host'))
-
- fip = db.fixed_ip_get_by_network_host(self.ctxt, 1, 'host')
-
- self.assertEqual(1, fip['network_id'])
- self.assertEqual('host', fip['host'])
-
- def _create_instance(self, **kwargs):
- instance = db.instance_create(self.ctxt, kwargs)
- return instance['uuid']
-
- def test_fixed_ip_get_by_instance_fixed_ip_found(self):
- instance_uuid = self._create_instance()
-
- FIXED_IP_ADDRESS = '192.168.1.5'
- db.fixed_ip_create(self.ctxt, dict(
- instance_uuid=instance_uuid, address=FIXED_IP_ADDRESS))
-
- ips_list = db.fixed_ip_get_by_instance(self.ctxt, instance_uuid)
- self._assertEqualListsOfPrimitivesAsSets([FIXED_IP_ADDRESS],
- [ips_list[0].address])
-
- def test_fixed_ip_get_by_instance_multiple_fixed_ips_found(self):
- instance_uuid = self._create_instance()
-
- FIXED_IP_ADDRESS_1 = '192.168.1.5'
- db.fixed_ip_create(self.ctxt, dict(
- instance_uuid=instance_uuid, address=FIXED_IP_ADDRESS_1))
- FIXED_IP_ADDRESS_2 = '192.168.1.6'
- db.fixed_ip_create(self.ctxt, dict(
- instance_uuid=instance_uuid, address=FIXED_IP_ADDRESS_2))
-
- ips_list = db.fixed_ip_get_by_instance(self.ctxt, instance_uuid)
- self._assertEqualListsOfPrimitivesAsSets(
- [FIXED_IP_ADDRESS_1, FIXED_IP_ADDRESS_2],
- [ips_list[0].address, ips_list[1].address])
-
- def test_fixed_ip_get_by_instance_inappropriate_ignored(self):
- instance_uuid = self._create_instance()
-
- FIXED_IP_ADDRESS_1 = '192.168.1.5'
- db.fixed_ip_create(self.ctxt, dict(
- instance_uuid=instance_uuid, address=FIXED_IP_ADDRESS_1))
- FIXED_IP_ADDRESS_2 = '192.168.1.6'
- db.fixed_ip_create(self.ctxt, dict(
- instance_uuid=instance_uuid, address=FIXED_IP_ADDRESS_2))
-
- another_instance = db.instance_create(self.ctxt, {})
- db.fixed_ip_create(self.ctxt, dict(
- instance_uuid=another_instance['uuid'], address="192.168.1.7"))
-
- ips_list = db.fixed_ip_get_by_instance(self.ctxt, instance_uuid)
- self._assertEqualListsOfPrimitivesAsSets(
- [FIXED_IP_ADDRESS_1, FIXED_IP_ADDRESS_2],
- [ips_list[0].address, ips_list[1].address])
-
- def test_fixed_ip_get_by_instance_not_found_exception(self):
- instance_uuid = self._create_instance()
-
- self.assertRaises(exception.FixedIpNotFoundForInstance,
- db.fixed_ip_get_by_instance,
- self.ctxt, instance_uuid)
-
- def test_fixed_ips_by_virtual_interface_fixed_ip_found(self):
- instance_uuid = self._create_instance()
-
- vif = db.virtual_interface_create(
- self.ctxt, dict(instance_uuid=instance_uuid))
-
- FIXED_IP_ADDRESS = '192.168.1.5'
- db.fixed_ip_create(self.ctxt, dict(
- virtual_interface_id=vif.id, address=FIXED_IP_ADDRESS))
-
- ips_list = db.fixed_ips_by_virtual_interface(self.ctxt, vif.id)
- self._assertEqualListsOfPrimitivesAsSets([FIXED_IP_ADDRESS],
- [ips_list[0].address])
-
- def test_fixed_ips_by_virtual_interface_multiple_fixed_ips_found(self):
- instance_uuid = self._create_instance()
-
- vif = db.virtual_interface_create(
- self.ctxt, dict(instance_uuid=instance_uuid))
-
- FIXED_IP_ADDRESS_1 = '192.168.1.5'
- db.fixed_ip_create(self.ctxt, dict(
- virtual_interface_id=vif.id, address=FIXED_IP_ADDRESS_1))
- FIXED_IP_ADDRESS_2 = '192.168.1.6'
- db.fixed_ip_create(self.ctxt, dict(
- virtual_interface_id=vif.id, address=FIXED_IP_ADDRESS_2))
-
- ips_list = db.fixed_ips_by_virtual_interface(self.ctxt, vif.id)
- self._assertEqualListsOfPrimitivesAsSets(
- [FIXED_IP_ADDRESS_1, FIXED_IP_ADDRESS_2],
- [ips_list[0].address, ips_list[1].address])
-
- def test_fixed_ips_by_virtual_interface_inappropriate_ignored(self):
- instance_uuid = self._create_instance()
-
- vif = db.virtual_interface_create(
- self.ctxt, dict(instance_uuid=instance_uuid))
-
- FIXED_IP_ADDRESS_1 = '192.168.1.5'
- db.fixed_ip_create(self.ctxt, dict(
- virtual_interface_id=vif.id, address=FIXED_IP_ADDRESS_1))
- FIXED_IP_ADDRESS_2 = '192.168.1.6'
- db.fixed_ip_create(self.ctxt, dict(
- virtual_interface_id=vif.id, address=FIXED_IP_ADDRESS_2))
-
- another_vif = db.virtual_interface_create(
- self.ctxt, dict(instance_uuid=instance_uuid))
- db.fixed_ip_create(self.ctxt, dict(
- virtual_interface_id=another_vif.id, address="192.168.1.7"))
-
- ips_list = db.fixed_ips_by_virtual_interface(self.ctxt, vif.id)
- self._assertEqualListsOfPrimitivesAsSets(
- [FIXED_IP_ADDRESS_1, FIXED_IP_ADDRESS_2],
- [ips_list[0].address, ips_list[1].address])
-
- def test_fixed_ips_by_virtual_interface_no_ip_found(self):
- instance_uuid = self._create_instance()
-
- vif = db.virtual_interface_create(
- self.ctxt, dict(instance_uuid=instance_uuid))
-
- ips_list = db.fixed_ips_by_virtual_interface(self.ctxt, vif.id)
- self.assertEqual(0, len(ips_list))
-
- def create_fixed_ip(self, **params):
- default_params = {'address': '192.168.0.1'}
- default_params.update(params)
- return db.fixed_ip_create(self.ctxt, default_params)['address']
-
- def test_fixed_ip_associate_fails_if_ip_not_in_network(self):
- instance_uuid = self._create_instance()
- self.assertRaises(exception.FixedIpNotFoundForNetwork,
- db.fixed_ip_associate,
- self.ctxt, None, instance_uuid)
-
- def test_fixed_ip_associate_fails_if_ip_in_use(self):
- instance_uuid = self._create_instance()
-
- address = self.create_fixed_ip(instance_uuid=instance_uuid)
- self.assertRaises(exception.FixedIpAlreadyInUse,
- db.fixed_ip_associate,
- self.ctxt, address, instance_uuid)
-
- def test_fixed_ip_associate_succeeds(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip(network_id=network['id'])
- db.fixed_ip_associate(self.ctxt, address, instance_uuid,
- network_id=network['id'])
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(fixed_ip['instance_uuid'], instance_uuid)
-
- def test_fixed_ip_associate_succeeds_and_sets_network(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip()
- db.fixed_ip_associate(self.ctxt, address, instance_uuid,
- network_id=network['id'])
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(fixed_ip['instance_uuid'], instance_uuid)
- self.assertEqual(fixed_ip['network_id'], network['id'])
-
- def test_fixed_ip_associate_succeeds_retry_on_deadlock(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip()
-
- def fake_first():
- if mock_first.call_count == 1:
- raise db_exc.DBDeadlock()
- else:
- return objects.Instance(id=1, address=address, reserved=False,
- instance_uuid=None, network_id=None)
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- db.fixed_ip_associate(self.ctxt, address, instance_uuid,
- network_id=network['id'])
- self.assertEqual(2, mock_first.call_count)
-
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(fixed_ip['instance_uuid'], instance_uuid)
- self.assertEqual(fixed_ip['network_id'], network['id'])
-
- def test_fixed_ip_associate_succeeds_retry_on_no_rows_updated(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip()
-
- def fake_first():
- if mock_first.call_count == 1:
- return objects.Instance(id=2, address=address, reserved=False,
- instance_uuid=None, network_id=None)
- else:
- return objects.Instance(id=1, address=address, reserved=False,
- instance_uuid=None, network_id=None)
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- db.fixed_ip_associate(self.ctxt, address, instance_uuid,
- network_id=network['id'])
- self.assertEqual(2, mock_first.call_count)
-
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(fixed_ip['instance_uuid'], instance_uuid)
- self.assertEqual(fixed_ip['network_id'], network['id'])
-
- def test_fixed_ip_associate_succeeds_retry_limit_exceeded(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip()
-
- def fake_first():
- return objects.Instance(id=2, address=address, reserved=False,
- instance_uuid=None, network_id=None)
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- self.assertRaises(exception.FixedIpAssociateFailed,
- db.fixed_ip_associate, self.ctxt, address,
- instance_uuid, network_id=network['id'])
- # 5 reties + initial attempt
- self.assertEqual(6, mock_first.call_count)
-
- def test_fixed_ip_associate_ip_not_in_network_with_no_retries(self):
- instance_uuid = self._create_instance()
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- return_value=None) as mock_first:
- self.assertRaises(exception.FixedIpNotFoundForNetwork,
- db.fixed_ip_associate,
- self.ctxt, None, instance_uuid)
- self.assertEqual(1, mock_first.call_count)
-
- def test_fixed_ip_associate_no_network_id_with_no_retries(self):
- # Tests that trying to associate an instance to a fixed IP on a network
- # but without specifying the network ID during associate will fail.
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
- address = self.create_fixed_ip(network_id=network['id'])
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- return_value=None) as mock_first:
- self.assertRaises(exception.FixedIpNotFoundForNetwork,
- db.fixed_ip_associate,
- self.ctxt, address, instance_uuid)
- self.assertEqual(1, mock_first.call_count)
-
- def test_fixed_ip_associate_with_vif(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
- vif = db.virtual_interface_create(self.ctxt, {})
- address = self.create_fixed_ip()
-
- fixed_ip = db.fixed_ip_associate(self.ctxt, address, instance_uuid,
- network_id=network['id'],
- virtual_interface_id=vif['id'])
-
- self.assertTrue(fixed_ip['allocated'])
- self.assertEqual(vif['id'], fixed_ip['virtual_interface_id'])
-
- def test_fixed_ip_associate_not_allocated_without_vif(self):
- instance_uuid = self._create_instance()
- address = self.create_fixed_ip()
-
- fixed_ip = db.fixed_ip_associate(self.ctxt, address, instance_uuid)
-
- self.assertFalse(fixed_ip['allocated'])
- self.assertIsNone(fixed_ip['virtual_interface_id'])
-
- def test_fixed_ip_associate_pool_invalid_uuid(self):
- instance_uuid = '123'
- self.assertRaises(exception.InvalidUUID, db.fixed_ip_associate_pool,
- self.ctxt, None, instance_uuid)
-
- def test_fixed_ip_associate_pool_no_more_fixed_ips(self):
- instance_uuid = self._create_instance()
- self.assertRaises(exception.NoMoreFixedIps, db.fixed_ip_associate_pool,
- self.ctxt, None, instance_uuid)
-
- def test_fixed_ip_associate_pool_ignores_leased_addresses(self):
- instance_uuid = self._create_instance()
- params = {'address': '192.168.1.5',
- 'leased': True}
- db.fixed_ip_create(self.ctxt, params)
- self.assertRaises(exception.NoMoreFixedIps, db.fixed_ip_associate_pool,
- self.ctxt, None, instance_uuid)
-
- def test_fixed_ip_associate_pool_succeeds(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip(network_id=network['id'])
- db.fixed_ip_associate_pool(self.ctxt, network['id'], instance_uuid)
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(fixed_ip['instance_uuid'], instance_uuid)
-
- def test_fixed_ip_associate_pool_order(self):
- """Test that fixed_ip always uses oldest fixed_ip.
-
- We should always be using the fixed ip with the oldest
- updated_at.
- """
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
- self.addCleanup(timeutils.clear_time_override)
- start = timeutils.utcnow()
- for i in range(1, 4):
- now = start - datetime.timedelta(hours=i)
- timeutils.set_time_override(now)
- address = self.create_fixed_ip(
- updated_at=now,
- address='10.1.0.%d' % i,
- network_id=network['id'])
- db.fixed_ip_associate_pool(self.ctxt, network['id'], instance_uuid)
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(fixed_ip['instance_uuid'], instance_uuid)
-
- def test_fixed_ip_associate_pool_succeeds_fip_ref_network_id_is_none(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- self.create_fixed_ip(network_id=None)
- fixed_ip = db.fixed_ip_associate_pool(self.ctxt,
- network['id'], instance_uuid)
- self.assertEqual(instance_uuid, fixed_ip['instance_uuid'])
- self.assertEqual(network['id'], fixed_ip['network_id'])
-
- def test_fixed_ip_associate_pool_succeeds_retry(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- address = self.create_fixed_ip(network_id=network['id'])
-
- def fake_first():
- if mock_first.call_count == 1:
- return {'network_id': network['id'], 'address': 'invalid',
- 'instance_uuid': None, 'host': None, 'id': 1}
- else:
- return {'network_id': network['id'], 'address': address,
- 'instance_uuid': None, 'host': None, 'id': 1}
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- db.fixed_ip_associate_pool(self.ctxt, network['id'], instance_uuid)
- self.assertEqual(2, mock_first.call_count)
-
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
- self.assertEqual(instance_uuid, fixed_ip['instance_uuid'])
-
- def test_fixed_ip_associate_pool_retry_limit_exceeded(self):
- instance_uuid = self._create_instance()
- network = db.network_create_safe(self.ctxt, {})
-
- self.create_fixed_ip(network_id=network['id'])
-
- def fake_first():
- return {'network_id': network['id'], 'address': 'invalid',
- 'instance_uuid': None, 'host': None, 'id': 1}
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- self.assertRaises(exception.FixedIpAssociateFailed,
- db.fixed_ip_associate_pool, self.ctxt,
- network['id'], instance_uuid)
- # 5 retries + initial attempt
- self.assertEqual(6, mock_first.call_count)
-
- def test_fixed_ip_create_same_address(self):
- address = '192.168.1.5'
- params = {'address': address}
- db.fixed_ip_create(self.ctxt, params)
- self.assertRaises(exception.FixedIpExists, db.fixed_ip_create,
- self.ctxt, params)
-
- def test_fixed_ip_create_success(self):
- instance_uuid = self._create_instance()
- network_id = db.network_create_safe(self.ctxt, {})['id']
- param = {
- 'reserved': False,
- 'deleted': 0,
- 'leased': False,
- 'host': '127.0.0.1',
- 'address': '192.168.1.5',
- 'allocated': False,
- 'instance_uuid': instance_uuid,
- 'network_id': network_id,
- 'virtual_interface_id': None
- }
-
- ignored_keys = ['created_at', 'id', 'deleted_at', 'updated_at']
- fixed_ip_data = db.fixed_ip_create(self.ctxt, param)
- self._assertEqualObjects(param, fixed_ip_data, ignored_keys)
-
- def test_fixed_ip_bulk_create_same_address(self):
- address_1 = '192.168.1.5'
- address_2 = '192.168.1.6'
- instance_uuid = self._create_instance()
- network_id_1 = db.network_create_safe(self.ctxt, {})['id']
- network_id_2 = db.network_create_safe(self.ctxt, {})['id']
- params = [
- {'reserved': False, 'deleted': 0, 'leased': False,
- 'host': '127.0.0.1', 'address': address_2, 'allocated': False,
- 'instance_uuid': instance_uuid, 'network_id': network_id_1,
- 'virtual_interface_id': None},
- {'reserved': False, 'deleted': 0, 'leased': False,
- 'host': '127.0.0.1', 'address': address_1, 'allocated': False,
- 'instance_uuid': instance_uuid, 'network_id': network_id_1,
- 'virtual_interface_id': None},
- {'reserved': False, 'deleted': 0, 'leased': False,
- 'host': 'localhost', 'address': address_2, 'allocated': True,
- 'instance_uuid': instance_uuid, 'network_id': network_id_2,
- 'virtual_interface_id': None},
- ]
-
- self.assertRaises(exception.FixedIpExists, db.fixed_ip_bulk_create,
- self.ctxt, params)
- # In this case the transaction will be rolled back and none of the ips
- # will make it to the database.
- self.assertRaises(exception.FixedIpNotFoundForAddress,
- db.fixed_ip_get_by_address, self.ctxt, address_1)
- self.assertRaises(exception.FixedIpNotFoundForAddress,
- db.fixed_ip_get_by_address, self.ctxt, address_2)
-
- def test_fixed_ip_bulk_create_success(self):
- address_1 = '192.168.1.5'
- address_2 = '192.168.1.6'
-
- instance_uuid = self._create_instance()
- network_id_1 = db.network_create_safe(self.ctxt, {})['id']
- network_id_2 = db.network_create_safe(self.ctxt, {})['id']
- params = [
- {'reserved': False, 'deleted': 0, 'leased': False,
- 'host': '127.0.0.1', 'address': address_1, 'allocated': False,
- 'instance_uuid': instance_uuid, 'network_id': network_id_1,
- 'virtual_interface_id': None},
- {'reserved': False, 'deleted': 0, 'leased': False,
- 'host': 'localhost', 'address': address_2, 'allocated': True,
- 'instance_uuid': instance_uuid, 'network_id': network_id_2,
- 'virtual_interface_id': None}
- ]
-
- db.fixed_ip_bulk_create(self.ctxt, params)
- ignored_keys = ['created_at', 'id', 'deleted_at', 'updated_at',
- 'virtual_interface', 'network', 'floating_ips']
- fixed_ip_data = db.fixed_ip_get_by_instance(self.ctxt, instance_uuid)
-
- # we have no `id` in incoming data so we can not use
- # _assertEqualListsOfObjects to compare incoming data and received
- # objects
- fixed_ip_data = sorted(fixed_ip_data, key=lambda i: i['network_id'])
- params = sorted(params, key=lambda i: i['network_id'])
- for param, ip in zip(params, fixed_ip_data):
- self._assertEqualObjects(param, ip, ignored_keys)
-
- def test_fixed_ip_disassociate(self):
- address = '192.168.1.5'
- instance_uuid = self._create_instance()
- network_id = db.network_create_safe(self.ctxt, {})['id']
- values = {'address': '192.168.1.5', 'instance_uuid': instance_uuid}
- vif = db.virtual_interface_create(self.ctxt, values)
- param = {
- 'reserved': False,
- 'deleted': 0,
- 'leased': False,
- 'host': '127.0.0.1',
- 'address': address,
- 'allocated': False,
- 'instance_uuid': instance_uuid,
- 'network_id': network_id,
- 'virtual_interface_id': vif['id']
- }
- db.fixed_ip_create(self.ctxt, param)
-
- db.fixed_ip_disassociate(self.ctxt, address)
- fixed_ip_data = db.fixed_ip_get_by_address(self.ctxt, address)
- ignored_keys = ['created_at', 'id', 'deleted_at',
- 'updated_at', 'instance_uuid',
- 'virtual_interface_id']
- self._assertEqualObjects(param, fixed_ip_data, ignored_keys)
- self.assertIsNone(fixed_ip_data['instance_uuid'])
- self.assertIsNone(fixed_ip_data['virtual_interface_id'])
-
- def test_fixed_ip_get_not_found_exception(self):
- self.assertRaises(exception.FixedIpNotFound,
- db.fixed_ip_get, self.ctxt, 0)
-
- def test_fixed_ip_get_success2(self):
- address = '192.168.1.5'
- instance_uuid = self._create_instance()
- network_id = db.network_create_safe(self.ctxt, {})['id']
- param = {
- 'reserved': False,
- 'deleted': 0,
- 'leased': False,
- 'host': '127.0.0.1',
- 'address': address,
- 'allocated': False,
- 'instance_uuid': instance_uuid,
- 'network_id': network_id,
- 'virtual_interface_id': None
- }
- fixed_ip_id = db.fixed_ip_create(self.ctxt, param)
-
- self.ctxt.is_admin = False
- self.assertRaises(exception.Forbidden, db.fixed_ip_get,
- self.ctxt, fixed_ip_id)
-
- def test_fixed_ip_get_success(self):
- address = '192.168.1.5'
- instance_uuid = self._create_instance()
- network_id = db.network_create_safe(self.ctxt, {})['id']
- param = {
- 'reserved': False,
- 'deleted': 0,
- 'leased': False,
- 'host': '127.0.0.1',
- 'address': address,
- 'allocated': False,
- 'instance_uuid': instance_uuid,
- 'network_id': network_id,
- 'virtual_interface_id': None
- }
- db.fixed_ip_create(self.ctxt, param)
-
- fixed_ip_id = db.fixed_ip_get_by_address(self.ctxt, address)['id']
- fixed_ip_data = db.fixed_ip_get(self.ctxt, fixed_ip_id)
- ignored_keys = ['created_at', 'id', 'deleted_at', 'updated_at']
- self._assertEqualObjects(param, fixed_ip_data, ignored_keys)
-
- def test_fixed_ip_get_by_address(self):
- instance_uuid = self._create_instance()
- db.fixed_ip_create(self.ctxt, {'address': '1.2.3.4',
- 'instance_uuid': instance_uuid,
- })
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, '1.2.3.4',
- columns_to_join=['instance'])
- self.assertIn('instance', fixed_ip.__dict__)
- self.assertEqual(instance_uuid, fixed_ip.instance.uuid)
-
- def test_fixed_ip_update_not_found_for_address(self):
- self.assertRaises(exception.FixedIpNotFoundForAddress,
- db.fixed_ip_update, self.ctxt,
- '192.168.1.5', {})
-
- def test_fixed_ip_update(self):
- instance_uuid_1 = self._create_instance()
- instance_uuid_2 = self._create_instance()
- network_id_1 = db.network_create_safe(self.ctxt, {})['id']
- network_id_2 = db.network_create_safe(self.ctxt, {})['id']
- param_1 = {
- 'reserved': True, 'deleted': 0, 'leased': True,
- 'host': '192.168.133.1', 'address': '10.0.0.2',
- 'allocated': True, 'instance_uuid': instance_uuid_1,
- 'network_id': network_id_1, 'virtual_interface_id': '123',
- }
-
- param_2 = {
- 'reserved': False, 'deleted': 0, 'leased': False,
- 'host': '127.0.0.1', 'address': '10.0.0.3', 'allocated': False,
- 'instance_uuid': instance_uuid_2, 'network_id': network_id_2,
- 'virtual_interface_id': None
- }
-
- ignored_keys = ['created_at', 'id', 'deleted_at', 'updated_at']
- fixed_ip_addr = db.fixed_ip_create(self.ctxt, param_1)['address']
- db.fixed_ip_update(self.ctxt, fixed_ip_addr, param_2)
- fixed_ip_after_update = db.fixed_ip_get_by_address(self.ctxt,
- param_2['address'])
- self._assertEqualObjects(param_2, fixed_ip_after_update, ignored_keys)
-
-
-@mock.patch('time.sleep', new=lambda x: None)
-class FloatingIpTestCase(test.TestCase, ModelsObjectComparatorMixin):
-
- def setUp(self):
- super(FloatingIpTestCase, self).setUp()
- self.ctxt = context.get_admin_context()
-
- def _get_base_values(self):
- return {
- 'address': '1.1.1.1',
- 'fixed_ip_id': None,
- 'project_id': 'fake_project',
- 'host': 'fake_host',
- 'auto_assigned': False,
- 'pool': 'fake_pool',
- 'interface': 'fake_interface',
- }
-
- def _create_floating_ip(self, values):
- if not values:
- values = {}
- vals = self._get_base_values()
- vals.update(values)
- return db.floating_ip_create(self.ctxt, vals)
-
- def test_floating_ip_get(self):
- values = [{'address': '0.0.0.0'}, {'address': '1.1.1.1'}]
- floating_ips = [self._create_floating_ip(val) for val in values]
-
- for floating_ip in floating_ips:
- real_floating_ip = db.floating_ip_get(self.ctxt, floating_ip['id'])
- self._assertEqualObjects(floating_ip, real_floating_ip,
- ignored_keys=['fixed_ip'])
-
- def test_floating_ip_get_not_found(self):
- self.assertRaises(exception.FloatingIpNotFound,
- db.floating_ip_get, self.ctxt, 100500)
-
- @mock.patch.object(query.Query, 'first', side_effect=db_exc.DBError())
- def test_floating_ip_get_with_long_id_not_found(self, mock_query):
- self.assertRaises(exception.InvalidID,
- db.floating_ip_get, self.ctxt, 123456789101112)
- mock_query.assert_called_once_with()
-
- def test_floating_ip_get_pools(self):
- values = [
- {'address': '0.0.0.0', 'pool': 'abc'},
- {'address': '1.1.1.1', 'pool': 'abc'},
- {'address': '2.2.2.2', 'pool': 'def'},
- {'address': '3.3.3.3', 'pool': 'ghi'},
- ]
- for val in values:
- self._create_floating_ip(val)
- expected_pools = [{'name': x}
- for x in set(map(lambda x: x['pool'], values))]
- real_pools = db.floating_ip_get_pools(self.ctxt)
- self._assertEqualListsOfPrimitivesAsSets(real_pools, expected_pools)
-
- def test_floating_ip_allocate_address(self):
- pools = {
- 'pool1': ['0.0.0.0', '1.1.1.1'],
- 'pool2': ['2.2.2.2'],
- 'pool3': ['3.3.3.3', '4.4.4.4', '5.5.5.5']
- }
- for pool, addresses in pools.items():
- for address in addresses:
- vals = {'pool': pool, 'address': address, 'project_id': None}
- self._create_floating_ip(vals)
-
- project_id = self._get_base_values()['project_id']
- for pool, addresses in pools.items():
- alloc_addrs = []
- for i in addresses:
- float_addr = db.floating_ip_allocate_address(self.ctxt,
- project_id, pool)
- alloc_addrs.append(float_addr)
- self._assertEqualListsOfPrimitivesAsSets(alloc_addrs, addresses)
-
- def test_floating_ip_allocate_auto_assigned(self):
- addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3', '1.1.1.4']
-
- float_ips = []
- for i in range(0, 2):
- float_ips.append(self._create_floating_ip(
- {"address": addresses[i]}))
- for i in range(2, 4):
- float_ips.append(self._create_floating_ip({"address": addresses[i],
- "auto_assigned": True}))
-
- for i in range(0, 2):
- float_ip = db.floating_ip_get(self.ctxt, float_ips[i].id)
- self.assertFalse(float_ip.auto_assigned)
- for i in range(2, 4):
- float_ip = db.floating_ip_get(self.ctxt, float_ips[i].id)
- self.assertTrue(float_ip.auto_assigned)
-
- def test_floating_ip_allocate_address_no_more_floating_ips(self):
- self.assertRaises(exception.NoMoreFloatingIps,
- db.floating_ip_allocate_address,
- self.ctxt, 'any_project_id', 'no_such_pool')
-
- def test_floating_ip_allocate_not_authorized(self):
- ctxt = context.RequestContext(user_id='a', project_id='abc',
- is_admin=False)
- self.assertRaises(exception.Forbidden,
- db.floating_ip_allocate_address,
- ctxt, 'other_project_id', 'any_pool')
-
- def test_floating_ip_allocate_address_succeeds_retry(self):
- pool = 'pool0'
- address = '0.0.0.0'
- vals = {'pool': pool, 'address': address, 'project_id': None}
- floating_ip = self._create_floating_ip(vals)
-
- project_id = self._get_base_values()['project_id']
-
- def fake_first():
- if mock_first.call_count == 1:
- return {'pool': pool, 'project_id': None, 'fixed_ip_id': None,
- 'address': address, 'id': 'invalid_id'}
- else:
- return {'pool': pool, 'project_id': None, 'fixed_ip_id': None,
- 'address': address, 'id': 1}
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- float_addr = db.floating_ip_allocate_address(self.ctxt,
- project_id, pool)
- self.assertEqual(address, float_addr)
- self.assertEqual(2, mock_first.call_count)
-
- float_ip = db.floating_ip_get(self.ctxt, floating_ip.id)
- self.assertEqual(project_id, float_ip['project_id'])
-
- def test_floating_ip_allocate_address_retry_limit_exceeded(self):
- pool = 'pool0'
- address = '0.0.0.0'
- vals = {'pool': pool, 'address': address, 'project_id': None}
- self._create_floating_ip(vals)
-
- project_id = self._get_base_values()['project_id']
-
- def fake_first():
- return {'pool': pool, 'project_id': None, 'fixed_ip_id': None,
- 'address': address, 'id': 'invalid_id'}
-
- with mock.patch('sqlalchemy.orm.query.Query.first',
- side_effect=fake_first) as mock_first:
- self.assertRaises(exception.FloatingIpAllocateFailed,
- db.floating_ip_allocate_address, self.ctxt,
- project_id, pool)
- # 5 retries + initial attempt
- self.assertEqual(6, mock_first.call_count)
-
- def test_floating_ip_allocate_address_no_more_ips_with_no_retries(self):
- with mock.patch('sqlalchemy.orm.query.Query.first',
- return_value=None) as mock_first:
- self.assertRaises(exception.NoMoreFloatingIps,
- db.floating_ip_allocate_address,
- self.ctxt, 'any_project_id', 'no_such_pool')
- self.assertEqual(1, mock_first.call_count)
-
- def _get_existing_ips(self):
- return [ip['address'] for ip in db.floating_ip_get_all(self.ctxt)]
-
- def test_floating_ip_bulk_create(self):
- expected_ips = ['1.1.1.1', '1.1.1.2', '1.1.1.3', '1.1.1.4']
- result = db.floating_ip_bulk_create(self.ctxt,
- [{'address': x} for x in expected_ips],
- want_result=False)
- self.assertIsNone(result)
- self._assertEqualListsOfPrimitivesAsSets(self._get_existing_ips(),
- expected_ips)
-
- def test_floating_ip_bulk_create_duplicate(self):
- ips = ['1.1.1.1', '1.1.1.2', '1.1.1.3', '1.1.1.4']
- prepare_ips = lambda x: {'address': x}
-
- result = db.floating_ip_bulk_create(self.ctxt,
- list(map(prepare_ips, ips)))
- self.assertEqual(ips, [ip.address for ip in result])
- self.assertRaises(exception.FloatingIpExists,
- db.floating_ip_bulk_create,
- self.ctxt,
- list(map(prepare_ips, ['1.1.1.5', '1.1.1.4'])),
- want_result=False)
- self.assertRaises(exception.FloatingIpNotFoundForAddress,
- db.floating_ip_get_by_address,
- self.ctxt, '1.1.1.5')
-
- def test_floating_ip_bulk_destroy(self):
- ips_for_delete = []
- ips_for_non_delete = []
-
- def create_ips(i, j):
- return [{'address': '1.1.%s.%s' % (i, k)} for k in range(1, j + 1)]
-
- # NOTE(boris-42): Create more than 256 ip to check that
- # _ip_range_splitter works properly.
- for i in range(1, 3):
- ips_for_delete.extend(create_ips(i, 255))
- ips_for_non_delete.extend(create_ips(3, 255))
-
- result = db.floating_ip_bulk_create(self.ctxt,
- ips_for_delete + ips_for_non_delete,
- want_result=False)
- self.assertIsNone(result)
-
- non_bulk_ips_for_delete = create_ips(4, 3)
- non_bulk_ips_for_non_delete = create_ips(5, 3)
- non_bulk_ips = non_bulk_ips_for_delete + non_bulk_ips_for_non_delete
- for dct in non_bulk_ips:
- self._create_floating_ip(dct)
- ips_for_delete.extend(non_bulk_ips_for_delete)
- ips_for_non_delete.extend(non_bulk_ips_for_non_delete)
-
- db.floating_ip_bulk_destroy(self.ctxt, ips_for_delete)
-
- expected_addresses = [x['address'] for x in ips_for_non_delete]
- self._assertEqualListsOfPrimitivesAsSets(self._get_existing_ips(),
- expected_addresses)
-
- def test_floating_ip_create(self):
- floating_ip = self._create_floating_ip({})
- ignored_keys = ['id', 'deleted', 'deleted_at', 'updated_at',
- 'created_at']
-
- self.assertIsNotNone(floating_ip['id'])
- self._assertEqualObjects(floating_ip, self._get_base_values(),
- ignored_keys)
-
- def test_floating_ip_create_duplicate(self):
- self._create_floating_ip({})
- self.assertRaises(exception.FloatingIpExists,
- self._create_floating_ip, {})
-
- def _create_fixed_ip(self, params):
- default_params = {'address': '192.168.0.1'}
- default_params.update(params)
- return db.fixed_ip_create(self.ctxt, default_params)['address']
-
- def test_floating_ip_fixed_ip_associate(self):
- float_addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3']
- fixed_addresses = ['2.2.2.1', '2.2.2.2', '2.2.2.3']
-
- project_id = self.ctxt.project_id
- float_ips = [self._create_floating_ip({'address': address,
- 'project_id': project_id})
- for address in float_addresses]
- fixed_addrs = [self._create_fixed_ip({'address': address})
- for address in fixed_addresses]
-
- for float_ip, fixed_addr in zip(float_ips, fixed_addrs):
- fixed_ip = db.floating_ip_fixed_ip_associate(self.ctxt,
- float_ip.address,
- fixed_addr, 'host')
- self.assertEqual(fixed_ip.address, fixed_addr)
-
- updated_float_ip = db.floating_ip_get(self.ctxt, float_ip.id)
- self.assertEqual(fixed_ip.id, updated_float_ip.fixed_ip_id)
- self.assertEqual('host', updated_float_ip.host)
-
- fixed_ip = db.floating_ip_fixed_ip_associate(self.ctxt,
- float_addresses[0],
- fixed_addresses[0],
- 'host')
- self.assertEqual(fixed_ip.address, fixed_addresses[0])
-
- def test_floating_ip_fixed_ip_associate_float_ip_not_found(self):
- self.assertRaises(exception.FixedIpNotFoundForAddress,
- db.floating_ip_fixed_ip_associate,
- self.ctxt, '10.10.10.10', 'some', 'some')
-
- def test_floating_ip_associate_failed(self):
- fixed_ip = self._create_fixed_ip({'address': '7.7.7.7'})
- self.assertRaises(exception.FloatingIpAssociateFailed,
- db.floating_ip_fixed_ip_associate,
- self.ctxt, '10.10.10.10', fixed_ip, 'some')
-
- def test_floating_ip_deallocate(self):
- values = {'address': '1.1.1.1', 'project_id': 'fake', 'host': 'fake'}
- float_ip = self._create_floating_ip(values)
- rows_updated = db.floating_ip_deallocate(self.ctxt, float_ip.address)
- self.assertEqual(1, rows_updated)
-
- updated_float_ip = db.floating_ip_get(self.ctxt, float_ip.id)
- self.assertIsNone(updated_float_ip.project_id)
- self.assertIsNone(updated_float_ip.host)
- self.assertFalse(updated_float_ip.auto_assigned)
-
- def test_floating_ip_deallocate_address_not_found(self):
- self.assertEqual(0, db.floating_ip_deallocate(self.ctxt, '2.2.2.2'))
-
- def test_floating_ip_deallocate_address_associated_ip(self):
- float_address = '1.1.1.1'
- fixed_address = '2.2.2.1'
-
- project_id = self.ctxt.project_id
- float_ip = self._create_floating_ip({'address': float_address,
- 'project_id': project_id})
- fixed_addr = self._create_fixed_ip({'address': fixed_address})
- db.floating_ip_fixed_ip_associate(self.ctxt, float_ip.address,
- fixed_addr, 'host')
- self.assertEqual(0, db.floating_ip_deallocate(self.ctxt,
- float_address))
-
- def test_floating_ip_destroy(self):
- addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3']
- float_ips = [self._create_floating_ip({'address': addr})
- for addr in addresses]
-
- expected_len = len(addresses)
- for float_ip in float_ips:
- db.floating_ip_destroy(self.ctxt, float_ip.address)
- self.assertRaises(exception.FloatingIpNotFound,
- db.floating_ip_get, self.ctxt, float_ip.id)
- expected_len -= 1
- if expected_len > 0:
- self.assertEqual(expected_len,
- len(db.floating_ip_get_all(self.ctxt)))
- else:
- self.assertRaises(exception.NoFloatingIpsDefined,
- db.floating_ip_get_all, self.ctxt)
-
- def test_floating_ip_disassociate(self):
- float_addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3']
- fixed_addresses = ['2.2.2.1', '2.2.2.2', '2.2.2.3']
-
- project_id = self.ctxt.project_id
- float_ips = [self._create_floating_ip({'address': address,
- 'project_id': project_id})
- for address in float_addresses]
- fixed_addrs = [self._create_fixed_ip({'address': address})
- for address in fixed_addresses]
-
- for float_ip, fixed_addr in zip(float_ips, fixed_addrs):
- db.floating_ip_fixed_ip_associate(self.ctxt,
- float_ip.address,
- fixed_addr, 'host')
-
- for float_ip, fixed_addr in zip(float_ips, fixed_addrs):
- fixed = db.floating_ip_disassociate(self.ctxt, float_ip.address)
- self.assertEqual(fixed.address, fixed_addr)
- updated_float_ip = db.floating_ip_get(self.ctxt, float_ip.id)
- self.assertIsNone(updated_float_ip.fixed_ip_id)
- self.assertIsNone(updated_float_ip.host)
-
- def test_floating_ip_disassociate_not_found(self):
- self.assertRaises(exception.FloatingIpNotFoundForAddress,
- db.floating_ip_disassociate, self.ctxt,
- '11.11.11.11')
-
- def test_floating_ip_get_all(self):
- addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3']
- float_ips = [self._create_floating_ip({'address': addr})
- for addr in addresses]
- self._assertEqualListsOfObjects(float_ips,
- db.floating_ip_get_all(self.ctxt),
- ignored_keys="fixed_ip")
-
- def test_floating_ip_get_all_associated(self):
- instance = db.instance_create(self.ctxt, {'uuid': 'fake'})
- project_id = self.ctxt.project_id
- float_ip = self._create_floating_ip({'address': '1.1.1.1',
- 'project_id': project_id})
- fixed_ip = self._create_fixed_ip({'address': '2.2.2.2',
- 'instance_uuid': instance.uuid})
- db.floating_ip_fixed_ip_associate(self.ctxt,
- float_ip.address,
- fixed_ip,
- 'host')
- float_ips = db.floating_ip_get_all(self.ctxt)
- self.assertEqual(1, len(float_ips))
- self.assertEqual(float_ip.address, float_ips[0].address)
- self.assertEqual(fixed_ip, float_ips[0].fixed_ip.address)
- self.assertEqual(instance.uuid, float_ips[0].fixed_ip.instance_uuid)
-
- def test_floating_ip_get_all_not_found(self):
- self.assertRaises(exception.NoFloatingIpsDefined,
- db.floating_ip_get_all, self.ctxt)
-
- def test_floating_ip_get_all_by_host(self):
- hosts = {
- 'host1': ['1.1.1.1', '1.1.1.2'],
- 'host2': ['2.1.1.1', '2.1.1.2'],
- 'host3': ['3.1.1.1', '3.1.1.2', '3.1.1.3']
- }
-
- hosts_with_float_ips = {}
- for host, addresses in hosts.items():
- hosts_with_float_ips[host] = []
- for address in addresses:
- float_ip = self._create_floating_ip({'host': host,
- 'address': address})
- hosts_with_float_ips[host].append(float_ip)
-
- for host, float_ips in hosts_with_float_ips.items():
- real_float_ips = db.floating_ip_get_all_by_host(self.ctxt, host)
- self._assertEqualListsOfObjects(float_ips, real_float_ips,
- ignored_keys="fixed_ip")
-
- def test_floating_ip_get_all_by_host_not_found(self):
- self.assertRaises(exception.FloatingIpNotFoundForHost,
- db.floating_ip_get_all_by_host,
- self.ctxt, 'non_exists_host')
-
- def test_floating_ip_get_all_by_project(self):
- projects = {
- 'pr1': ['1.1.1.1', '1.1.1.2'],
- 'pr2': ['2.1.1.1', '2.1.1.2'],
- 'pr3': ['3.1.1.1', '3.1.1.2', '3.1.1.3']
- }
-
- projects_with_float_ips = {}
- for project_id, addresses in projects.items():
- projects_with_float_ips[project_id] = []
- for address in addresses:
- float_ip = self._create_floating_ip({'project_id': project_id,
- 'address': address})
- projects_with_float_ips[project_id].append(float_ip)
-
- for project_id, float_ips in projects_with_float_ips.items():
- real_float_ips = db.floating_ip_get_all_by_project(self.ctxt,
- project_id)
- self._assertEqualListsOfObjects(float_ips, real_float_ips,
- ignored_keys='fixed_ip')
-
- def test_floating_ip_get_all_by_project_not_authorized(self):
- ctxt = context.RequestContext(user_id='a', project_id='abc',
- is_admin=False)
- self.assertRaises(exception.Forbidden,
- db.floating_ip_get_all_by_project,
- ctxt, 'other_project')
-
- def test_floating_ip_get_by_address(self):
- addresses = ['1.1.1.1', '1.1.1.2', '1.1.1.3']
- float_ips = [self._create_floating_ip({'address': addr})
- for addr in addresses]
-
- for float_ip in float_ips:
- real_float_ip = db.floating_ip_get_by_address(self.ctxt,
- float_ip.address)
- self._assertEqualObjects(float_ip, real_float_ip,
- ignored_keys='fixed_ip')
-
- def test_floating_ip_get_by_address_not_found(self):
- self.assertRaises(exception.FloatingIpNotFoundForAddress,
- db.floating_ip_get_by_address,
- self.ctxt, '20.20.20.20')
-
- @mock.patch.object(query.Query, 'first', side_effect=db_exc.DBError())
- def test_floating_ip_get_by_invalid_address(self, mock_query):
- self.assertRaises(exception.InvalidIpAddressError,
- db.floating_ip_get_by_address,
- self.ctxt, 'non_exists_host')
- mock_query.assert_called_once_with()
-
- def test_floating_ip_get_by_fixed_address(self):
- fixed_float = [
- ('1.1.1.1', '2.2.2.1'),
- ('1.1.1.2', '2.2.2.2'),
- ('1.1.1.3', '2.2.2.3')
- ]
-
- for fixed_addr, float_addr in fixed_float:
- project_id = self.ctxt.project_id
- self._create_floating_ip({'address': float_addr,
- 'project_id': project_id})
- self._create_fixed_ip({'address': fixed_addr})
- db.floating_ip_fixed_ip_associate(self.ctxt, float_addr,
- fixed_addr, 'some_host')
-
- for fixed_addr, float_addr in fixed_float:
- float_ip = db.floating_ip_get_by_fixed_address(self.ctxt,
- fixed_addr)
- self.assertEqual(float_addr, float_ip[0]['address'])
-
- def test_floating_ip_get_by_fixed_ip_id(self):
- fixed_float = [
- ('1.1.1.1', '2.2.2.1'),
- ('1.1.1.2', '2.2.2.2'),
- ('1.1.1.3', '2.2.2.3')
- ]
-
- for fixed_addr, float_addr in fixed_float:
- project_id = self.ctxt.project_id
- self._create_floating_ip({'address': float_addr,
- 'project_id': project_id})
- self._create_fixed_ip({'address': fixed_addr})
- db.floating_ip_fixed_ip_associate(self.ctxt, float_addr,
- fixed_addr, 'some_host')
-
- for fixed_addr, float_addr in fixed_float:
- fixed_ip = db.fixed_ip_get_by_address(self.ctxt, fixed_addr)
- float_ip = db.floating_ip_get_by_fixed_ip_id(self.ctxt,
- fixed_ip['id'])
- self.assertEqual(float_addr, float_ip[0]['address'])
-
- def test_floating_ip_update(self):
- float_ip = self._create_floating_ip({})
-
- values = {
- 'project_id': 'some_pr',
- 'host': 'some_host',
- 'auto_assigned': True,
- 'interface': 'some_interface',
- 'pool': 'some_pool'
- }
- floating_ref = db.floating_ip_update(self.ctxt, float_ip['address'],
- values)
- self.assertIsNotNone(floating_ref)
- updated_float_ip = db.floating_ip_get(self.ctxt, float_ip['id'])
- self._assertEqualObjects(updated_float_ip, values,
- ignored_keys=['id', 'address', 'updated_at',
- 'deleted_at', 'created_at',
- 'deleted', 'fixed_ip_id',
- 'fixed_ip'])
-
- def test_floating_ip_update_to_duplicate(self):
- float_ip1 = self._create_floating_ip({'address': '1.1.1.1'})
- float_ip2 = self._create_floating_ip({'address': '1.1.1.2'})
-
- self.assertRaises(exception.FloatingIpExists,
- db.floating_ip_update,
- self.ctxt, float_ip2['address'],
- {'address': float_ip1['address']})
-
-
class InstanceDestroyConstraints(test.TestCase):
def test_destroy_with_equal_any_constraint_met_single_value(self):
@@ -6444,313 +4843,6 @@ class VirtualInterfaceTestCase(test.TestCase, ModelsObjectComparatorMixin):
self._assertEqualObjects(updated, updated_vif, ignored_keys)
-@mock.patch('time.sleep', new=lambda x: None)
-class NetworkTestCase(test.TestCase, ModelsObjectComparatorMixin):
-
- """Tests for db.api.network_* methods."""
-
- def setUp(self):
- super(NetworkTestCase, self).setUp()
- self.ctxt = context.get_admin_context()
-
- def _get_associated_fixed_ip(self, host, cidr, ip):
- network = db.network_create_safe(self.ctxt,
- {'project_id': 'project1', 'cidr': cidr})
- self.assertFalse(db.network_in_use_on_host(self.ctxt, network.id,
- host))
- instance = db.instance_create(self.ctxt,
- {'project_id': 'project1', 'host': host})
- virtual_interface = db.virtual_interface_create(self.ctxt,
- {'instance_uuid': instance.uuid, 'network_id': network.id,
- 'address': ip})
- db.fixed_ip_create(self.ctxt, {'address': ip,
- 'network_id': network.id, 'allocated': True,
- 'virtual_interface_id': virtual_interface.id})
- db.fixed_ip_associate(self.ctxt, ip, instance.uuid,
- network.id, virtual_interface_id=virtual_interface['id'])
- return network, instance
-
- def test_network_get_associated_default_route(self):
- network, instance = self._get_associated_fixed_ip('host.net',
- '192.0.2.0/30', '192.0.2.1')
- network2 = db.network_create_safe(self.ctxt,
- {'project_id': 'project1', 'cidr': '192.0.3.0/30'})
- ip = '192.0.3.1'
- virtual_interface = db.virtual_interface_create(self.ctxt,
- {'instance_uuid': instance.uuid, 'network_id': network2.id,
- 'address': ip})
- db.fixed_ip_create(self.ctxt, {'address': ip,
- 'network_id': network2.id, 'allocated': True,
- 'virtual_interface_id': virtual_interface.id})
- db.fixed_ip_associate(self.ctxt, ip, instance.uuid,
- network2.id)
- data = db.network_get_associated_fixed_ips(self.ctxt, network.id)
- self.assertEqual(1, len(data))
- self.assertTrue(data[0]['default_route'])
- data = db.network_get_associated_fixed_ips(self.ctxt, network2.id)
- self.assertEqual(1, len(data))
- self.assertFalse(data[0]['default_route'])
-
- def test_network_get_associated_fixed_ips(self):
- network, instance = self._get_associated_fixed_ip('host.net',
- '192.0.2.0/30', '192.0.2.1')
- data = db.network_get_associated_fixed_ips(self.ctxt, network.id)
- self.assertEqual(1, len(data))
- self.assertEqual('192.0.2.1', data[0]['address'])
- self.assertEqual('192.0.2.1', data[0]['vif_address'])
- self.assertEqual(instance.uuid, data[0]['instance_uuid'])
- self.assertTrue(data[0][fields.PciDeviceStatus.ALLOCATED])
-
- def test_network_create_safe(self):
- values = {'host': 'localhost', 'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
- self.assertEqual(36, len(network['uuid']))
- db_network = db.network_get(self.ctxt, network['id'])
- self._assertEqualObjects(network, db_network)
-
- def test_network_create_with_duplicate_vlan(self):
- values1 = {'host': 'localhost', 'project_id': 'project1', 'vlan': 1}
- values2 = {'host': 'something', 'project_id': 'project1', 'vlan': 1}
- db.network_create_safe(self.ctxt, values1)
- self.assertRaises(exception.DuplicateVlan,
- db.network_create_safe, self.ctxt, values2)
-
- def test_network_delete_safe(self):
- values = {'host': 'localhost', 'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
- db.network_get(self.ctxt, network['id'])
- values = {'network_id': network['id'], 'address': '192.168.1.5'}
- address1 = db.fixed_ip_create(self.ctxt, values)['address']
- values = {'network_id': network['id'],
- 'address': '192.168.1.6',
- 'allocated': True}
- address2 = db.fixed_ip_create(self.ctxt, values)['address']
- self.assertRaises(exception.NetworkInUse,
- db.network_delete_safe, self.ctxt, network['id'])
- db.fixed_ip_update(self.ctxt, address2, {'allocated': False})
- network = db.network_delete_safe(self.ctxt, network['id'])
- self.assertRaises(exception.FixedIpNotFoundForAddress,
- db.fixed_ip_get_by_address, self.ctxt, address1)
- ctxt = self.ctxt.elevated(read_deleted='yes')
- fixed_ip = db.fixed_ip_get_by_address(ctxt, address1)
- self.assertTrue(fixed_ip['deleted'])
-
- def test_network_in_use_on_host(self):
- values = {'host': 'foo', 'hostname': 'myname'}
- instance = db.instance_create(self.ctxt, values)
- values = {'address': '192.168.1.5', 'instance_uuid': instance['uuid']}
- vif = db.virtual_interface_create(self.ctxt, values)
- values = {'address': '192.168.1.6',
- 'network_id': 1,
- 'allocated': True,
- 'instance_uuid': instance['uuid'],
- 'virtual_interface_id': vif['id']}
- db.fixed_ip_create(self.ctxt, values)
- self.assertTrue(db.network_in_use_on_host(self.ctxt, 1, 'foo'))
- self.assertFalse(db.network_in_use_on_host(self.ctxt, 1, 'bar'))
-
- def test_network_update_nonexistent(self):
- self.assertRaises(exception.NetworkNotFound,
- db.network_update, self.ctxt, 123456, {})
-
- def test_network_update_with_duplicate_vlan(self):
- values1 = {'host': 'localhost', 'project_id': 'project1', 'vlan': 1}
- values2 = {'host': 'something', 'project_id': 'project1', 'vlan': 2}
- network_ref = db.network_create_safe(self.ctxt, values1)
- db.network_create_safe(self.ctxt, values2)
- self.assertRaises(exception.DuplicateVlan,
- db.network_update, self.ctxt,
- network_ref["id"], values2)
-
- def test_network_update(self):
- network = db.network_create_safe(self.ctxt, {'project_id': 'project1',
- 'vlan': 1, 'host': 'test.com'})
- db.network_update(self.ctxt, network.id, {'vlan': 2})
- network_new = db.network_get(self.ctxt, network.id)
- self.assertEqual(2, network_new.vlan)
-
- def test_network_set_host_nonexistent_network(self):
- self.assertRaises(exception.NetworkNotFound, db.network_set_host,
- self.ctxt, 123456, 'nonexistent')
-
- def test_network_set_host_already_set_correct(self):
- values = {'host': 'example.com', 'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
- self.assertIsNone(db.network_set_host(self.ctxt, network.id,
- 'example.com'))
-
- def test_network_set_host_already_set_incorrect(self):
- values = {'host': 'example.com', 'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
- self.assertIsNone(db.network_set_host(self.ctxt, network.id,
- 'new.example.com'))
-
- def test_network_set_host_with_initially_no_host(self):
- values = {'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
- db.network_set_host(self.ctxt, network.id, 'example.com')
- self.assertEqual('example.com',
- db.network_get(self.ctxt, network.id).host)
-
- def test_network_set_host_succeeds_retry_on_deadlock(self):
- values = {'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
-
- def fake_update(params):
- if mock_update.call_count == 1:
- raise db_exc.DBDeadlock()
- else:
- return 1
-
- with mock.patch('sqlalchemy.orm.query.Query.update',
- side_effect=fake_update) as mock_update:
- db.network_set_host(self.ctxt, network.id, 'example.com')
- self.assertEqual(2, mock_update.call_count)
-
- def test_network_set_host_succeeds_retry_on_no_rows_updated(self):
- values = {'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
-
- def fake_update(params):
- if mock_update.call_count == 1:
- return 0
- else:
- return 1
-
- with mock.patch('sqlalchemy.orm.query.Query.update',
- side_effect=fake_update) as mock_update:
- db.network_set_host(self.ctxt, network.id, 'example.com')
- self.assertEqual(2, mock_update.call_count)
-
- def test_network_set_host_failed_with_retry_on_no_rows_updated(self):
- values = {'project_id': 'project1'}
- network = db.network_create_safe(self.ctxt, values)
-
- with mock.patch('sqlalchemy.orm.query.Query.update',
- return_value=0) as mock_update:
- self.assertRaises(exception.NetworkSetHostFailed,
- db.network_set_host, self.ctxt, network.id,
- 'example.com')
- # 5 retries + initial attempt
- self.assertEqual(6, mock_update.call_count)
-
- def test_network_get_all_by_host(self):
- self.assertEqual([],
- db.network_get_all_by_host(self.ctxt, 'example.com'))
- host = 'h1.example.com'
- # network with host set
- net1 = db.network_create_safe(self.ctxt, {'host': host})
- self._assertEqualListsOfObjects([net1],
- db.network_get_all_by_host(self.ctxt, host))
- # network with fixed ip with host set
- net2 = db.network_create_safe(self.ctxt, {})
- db.fixed_ip_create(self.ctxt, {'host': host, 'network_id': net2.id})
- db.network_get_all_by_host(self.ctxt, host)
- self._assertEqualListsOfObjects([net1, net2],
- db.network_get_all_by_host(self.ctxt, host))
- # network with instance with host set
- net3 = db.network_create_safe(self.ctxt, {})
- instance = db.instance_create(self.ctxt, {'host': host})
- db.fixed_ip_create(self.ctxt, {'network_id': net3.id,
- 'instance_uuid': instance.uuid})
- self._assertEqualListsOfObjects([net1, net2, net3],
- db.network_get_all_by_host(self.ctxt, host))
-
- def test_network_get_by_cidr(self):
- cidr = '192.0.2.0/30'
- cidr_v6 = '2001:db8:1::/64'
- network = db.network_create_safe(self.ctxt,
- {'project_id': 'project1', 'cidr': cidr, 'cidr_v6': cidr_v6})
- self._assertEqualObjects(network,
- db.network_get_by_cidr(self.ctxt, cidr))
- self._assertEqualObjects(network,
- db.network_get_by_cidr(self.ctxt, cidr_v6))
-
- def test_network_get_by_cidr_nonexistent(self):
- self.assertRaises(exception.NetworkNotFoundForCidr,
- db.network_get_by_cidr, self.ctxt, '192.0.2.0/30')
-
- def test_network_get_by_uuid(self):
- network = db.network_create_safe(self.ctxt,
- {'project_id': 'project_1'})
- self._assertEqualObjects(network,
- db.network_get_by_uuid(self.ctxt, network.uuid))
-
- def test_network_get_by_uuid_nonexistent(self):
- self.assertRaises(exception.NetworkNotFoundForUUID,
- db.network_get_by_uuid, self.ctxt, 'non-existent-uuid')
-
- def test_network_get_all_by_uuids_no_networks(self):
- self.assertRaises(exception.NoNetworksFound,
- db.network_get_all_by_uuids, self.ctxt, ['non-existent-uuid'])
-
- def test_network_get_all_by_uuids(self):
- net1 = db.network_create_safe(self.ctxt, {})
- net2 = db.network_create_safe(self.ctxt, {})
- self._assertEqualListsOfObjects([net1, net2],
- db.network_get_all_by_uuids(self.ctxt, [net1.uuid, net2.uuid]))
-
- def test_network_get_all_no_networks(self):
- self.assertRaises(exception.NoNetworksFound,
- db.network_get_all, self.ctxt)
-
- def test_network_get_all(self):
- network = db.network_create_safe(self.ctxt, {})
- network_db = db.network_get_all(self.ctxt)
- self.assertEqual(1, len(network_db))
- self._assertEqualObjects(network, network_db[0])
-
- def test_network_get_all_admin_user(self):
- network1 = db.network_create_safe(self.ctxt, {})
- network2 = db.network_create_safe(self.ctxt,
- {'project_id': 'project1'})
- self._assertEqualListsOfObjects([network1, network2],
- db.network_get_all(self.ctxt,
- project_only=True))
-
- def test_network_get_all_normal_user(self):
- normal_ctxt = context.RequestContext('fake', 'fake')
- db.network_create_safe(self.ctxt, {})
- db.network_create_safe(self.ctxt, {'project_id': 'project1'})
- network1 = db.network_create_safe(self.ctxt,
- {'project_id': 'fake'})
- network_db = db.network_get_all(normal_ctxt, project_only=True)
- self.assertEqual(1, len(network_db))
- self._assertEqualObjects(network1, network_db[0])
-
- def test_network_get(self):
- network = db.network_create_safe(self.ctxt, {})
- self._assertEqualObjects(db.network_get(self.ctxt, network.id),
- network)
- db.network_delete_safe(self.ctxt, network.id)
- self.assertRaises(exception.NetworkNotFound,
- db.network_get, self.ctxt, network.id)
-
- def test_network_associate(self):
- network = db.network_create_safe(self.ctxt, {})
- self.assertIsNone(network.project_id)
- db.network_associate(self.ctxt, "project1", network.id)
- self.assertEqual("project1", db.network_get(self.ctxt,
- network.id).project_id)
-
- def test_network_diassociate(self):
- network = db.network_create_safe(self.ctxt,
- {'project_id': 'project1', 'host': 'test.net'})
- # disassociate project
- db.network_disassociate(self.ctxt, network.id, False, True)
- self.assertIsNone(db.network_get(self.ctxt, network.id).project_id)
- # disassociate host
- db.network_disassociate(self.ctxt, network.id, True, False)
- self.assertIsNone(db.network_get(self.ctxt, network.id).host)
-
- def test_network_count_reserved_ips(self):
- net = db.network_create_safe(self.ctxt, {})
- self.assertEqual(0, db.network_count_reserved_ips(self.ctxt, net.id))
- db.fixed_ip_create(self.ctxt, {'network_id': net.id,
- 'reserved': True})
- self.assertEqual(1, db.network_count_reserved_ips(self.ctxt, net.id))
-
-
class KeyPairTestCase(test.TestCase, ModelsObjectComparatorMixin):
def setUp(self):
super(KeyPairTestCase, self).setUp()
diff --git a/nova/tests/unit/objects/test_fixed_ip.py b/nova/tests/unit/objects/test_fixed_ip.py
deleted file mode 100644
index aa0dacf071..0000000000
--- a/nova/tests/unit/objects/test_fixed_ip.py
+++ /dev/null
@@ -1,382 +0,0 @@
-# Copyright 2014 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import datetime
-
-import iso8601
-import mock
-import netaddr
-from oslo_utils.fixture import uuidsentinel as uuids
-from oslo_utils import timeutils
-from oslo_versionedobjects import base as ovo_base
-
-from nova import exception
-from nova.objects import fixed_ip
-from nova.objects import instance as instance_obj
-from nova.tests.unit import fake_instance
-from nova.tests.unit.objects import test_network
-from nova.tests.unit.objects import test_objects
-from nova import utils
-
-fake_fixed_ip = {
- 'created_at': None,
- 'updated_at': None,
- 'deleted_at': None,
- 'deleted': False,
- 'id': 123,
- 'address': '192.168.1.100',
- 'network_id': None,
- 'virtual_interface_id': None,
- 'instance_uuid': None,
- 'allocated': False,
- 'leased': False,
- 'reserved': False,
- 'host': None,
- 'network': None,
- 'virtual_interface': None,
- 'floating_ips': [],
- }
-
-
-class _TestFixedIPObject(object):
- def _compare(self, obj, db_obj):
- for field in obj.fields:
- if field in ('default_route', 'floating_ips'):
- continue
- if field in fixed_ip.FIXED_IP_OPTIONAL_ATTRS:
- if obj.obj_attr_is_set(field) and db_obj[field] is not None:
- obj_val = obj[field].uuid
- db_val = db_obj[field]['uuid']
- else:
- continue
- else:
- obj_val = obj[field]
- db_val = db_obj[field]
- if isinstance(obj_val, netaddr.IPAddress):
- obj_val = str(obj_val)
- self.assertEqual(db_val, obj_val)
-
- @mock.patch('nova.db.api.fixed_ip_get')
- def test_get_by_id(self, get):
- get.return_value = fake_fixed_ip
- fixedip = fixed_ip.FixedIP.get_by_id(self.context, 123)
- get.assert_called_once_with(self.context, 123, get_network=False)
- self._compare(fixedip, fake_fixed_ip)
-
- @mock.patch('nova.db.api.fixed_ip_get')
- @mock.patch('nova.db.api.network_get')
- def test_get_by_id_with_extras(self, network_get, fixed_get):
- db_fixed = dict(fake_fixed_ip,
- network=test_network.fake_network)
- fixed_get.return_value = db_fixed
- fixedip = fixed_ip.FixedIP.get_by_id(self.context, 123,
- expected_attrs=['network'])
- fixed_get.assert_called_once_with(self.context, 123, get_network=True)
- self._compare(fixedip, db_fixed)
- self.assertEqual(db_fixed['network']['uuid'], fixedip.network.uuid)
- self.assertFalse(network_get.called)
-
- @mock.patch('nova.db.api.fixed_ip_get_by_address')
- def test_get_by_address(self, get):
- get.return_value = fake_fixed_ip
- fixedip = fixed_ip.FixedIP.get_by_address(self.context, '1.2.3.4')
- get.assert_called_once_with(self.context, '1.2.3.4',
- columns_to_join=[])
- self._compare(fixedip, fake_fixed_ip)
-
- @mock.patch('nova.db.api.fixed_ip_get_by_address')
- @mock.patch('nova.db.api.network_get')
- @mock.patch('nova.db.api.instance_get')
- def test_get_by_address_with_extras(self, instance_get, network_get,
- fixed_get):
- db_fixed = dict(fake_fixed_ip, network=test_network.fake_network,
- instance=fake_instance.fake_db_instance())
- fixed_get.return_value = db_fixed
- fixedip = fixed_ip.FixedIP.get_by_address(self.context, '1.2.3.4',
- expected_attrs=['network',
- 'instance'])
- fixed_get.assert_called_once_with(self.context, '1.2.3.4',
- columns_to_join=['network',
- 'instance'])
- self._compare(fixedip, db_fixed)
- self.assertEqual(db_fixed['network']['uuid'], fixedip.network.uuid)
- self.assertEqual(db_fixed['instance']['uuid'], fixedip.instance.uuid)
- self.assertFalse(network_get.called)
- self.assertFalse(instance_get.called)
-
- @mock.patch('nova.db.api.fixed_ip_get_by_address')
- @mock.patch('nova.db.api.network_get')
- @mock.patch('nova.db.api.instance_get')
- def test_get_by_address_with_extras_deleted_instance(self, instance_get,
- network_get,
- fixed_get):
- db_fixed = dict(fake_fixed_ip, network=test_network.fake_network,
- instance=None)
- fixed_get.return_value = db_fixed
- fixedip = fixed_ip.FixedIP.get_by_address(self.context, '1.2.3.4',
- expected_attrs=['network',
- 'instance'])
- fixed_get.assert_called_once_with(self.context, '1.2.3.4',
- columns_to_join=['network',
- 'instance'])
- self._compare(fixedip, db_fixed)
- self.assertEqual(db_fixed['network']['uuid'], fixedip.network.uuid)
- self.assertIsNone(fixedip.instance)
- self.assertFalse(network_get.called)
- self.assertFalse(instance_get.called)
-
- @mock.patch('nova.db.api.fixed_ip_get_by_floating_address')
- def test_get_by_floating_address(self, get):
- get.return_value = fake_fixed_ip
- fixedip = fixed_ip.FixedIP.get_by_floating_address(self.context,
- '1.2.3.4')
- get.assert_called_once_with(self.context, '1.2.3.4')
- self._compare(fixedip, fake_fixed_ip)
-
- @mock.patch('nova.db.api.fixed_ip_get_by_floating_address')
- def test_get_by_floating_address_none(self, get):
- get.return_value = None
- fixedip = fixed_ip.FixedIP.get_by_floating_address(self.context,
- '1.2.3.4')
- get.assert_called_once_with(self.context, '1.2.3.4')
- self.assertIsNone(fixedip)
-
- @mock.patch('nova.db.api.fixed_ip_get_by_network_host')
- def test_get_by_network_and_host(self, get):
- get.return_value = fake_fixed_ip
- fixedip = fixed_ip.FixedIP.get_by_network_and_host(self.context,
- 123, 'host')
- get.assert_called_once_with(self.context, 123, 'host')
- self._compare(fixedip, fake_fixed_ip)
-
- @mock.patch('nova.db.api.fixed_ip_associate')
- def test_associate(self, associate):
- associate.return_value = fake_fixed_ip
- fixedip = fixed_ip.FixedIP.associate(self.context, '1.2.3.4',
- uuids.instance)
- associate.assert_called_with(self.context, '1.2.3.4', uuids.instance,
- network_id=None, reserved=False,
- virtual_interface_id=None)
- self._compare(fixedip, fake_fixed_ip)
-
- @mock.patch('nova.db.api.fixed_ip_associate')
- def test_associate_with_vif(self, associate):
- associate.return_value = fake_fixed_ip
- fixedip = fixed_ip.FixedIP.associate(self.context, '1.2.3.4',
- uuids.instance,
- vif_id=0)
- associate.assert_called_with(self.context, '1.2.3.4',
- uuids.instance,
- network_id=None, reserved=False,
- virtual_interface_id=0)
- self._compare(fixedip, fake_fixed_ip)
-
- @mock.patch('nova.db.api.fixed_ip_associate_pool')
- def test_associate_pool(self, associate):
- associate.return_value = fake_fixed_ip
- fixedip = fixed_ip.FixedIP.associate_pool(self.context, 123,
- uuids.instance, 'host')
- associate.assert_called_with(self.context, 123,
- instance_uuid=uuids.instance,
- host='host', virtual_interface_id=None)
- self._compare(fixedip, fake_fixed_ip)
-
- @mock.patch('nova.db.api.fixed_ip_associate_pool')
- def test_associate_pool_with_vif(self, associate):
- associate.return_value = fake_fixed_ip
- fixedip = fixed_ip.FixedIP.associate_pool(self.context, 123,
- uuids.instance, 'host',
- vif_id=0)
- associate.assert_called_with(self.context, 123,
- instance_uuid=uuids.instance,
- host='host', virtual_interface_id=0)
- self._compare(fixedip, fake_fixed_ip)
-
- @mock.patch('nova.db.api.fixed_ip_disassociate')
- def test_disassociate_by_address(self, disassociate):
- fixed_ip.FixedIP.disassociate_by_address(self.context, '1.2.3.4')
- disassociate.assert_called_with(self.context, '1.2.3.4')
-
- @mock.patch('nova.db.api.fixed_ip_disassociate_all_by_timeout')
- def test_disassociate_all_by_timeout(self, disassociate):
- now = timeutils.utcnow()
- now_tz = timeutils.parse_isotime(
- utils.isotime(now)).replace(
- tzinfo=iso8601.UTC)
- disassociate.return_value = 123
- result = fixed_ip.FixedIP.disassociate_all_by_timeout(self.context,
- 'host', now)
- self.assertEqual(123, result)
- # NOTE(danms): be pedantic about timezone stuff
- args, kwargs = disassociate.call_args_list[0]
- self.assertEqual(now_tz, args[2])
- self.assertEqual((self.context, 'host'), args[:2])
- self.assertEqual({}, kwargs)
-
- @mock.patch('nova.db.api.fixed_ip_create')
- def test_create(self, create):
- create.return_value = fake_fixed_ip
- fixedip = fixed_ip.FixedIP(context=self.context, address='1.2.3.4')
- fixedip.create()
- create.assert_called_once_with(
- self.context, {'address': '1.2.3.4'})
- self._compare(fixedip, fake_fixed_ip)
-
- @mock.patch('nova.db.api.fixed_ip_update')
- def test_save(self, update):
- update.return_value = fake_fixed_ip
- fixedip = fixed_ip.FixedIP(context=self.context, address='1.2.3.4',
- instance_uuid=uuids.instance)
- self.assertRaises(exception.ObjectActionError, fixedip.save)
- fixedip.obj_reset_changes(['address'])
- fixedip.save()
- update.assert_called_once_with(self.context, '1.2.3.4',
- {'instance_uuid': uuids.instance})
-
- @mock.patch('nova.db.api.fixed_ip_disassociate')
- def test_disassociate(self, disassociate):
- fixedip = fixed_ip.FixedIP(context=self.context, address='1.2.3.4',
- instance_uuid=uuids.instance)
- fixedip.obj_reset_changes()
- fixedip.disassociate()
- disassociate.assert_called_once_with(self.context, '1.2.3.4')
- self.assertIsNone(fixedip.instance_uuid)
-
- @mock.patch('nova.db.api.fixed_ip_get_all')
- def test_get_all(self, get_all):
- get_all.return_value = [fake_fixed_ip]
- fixedips = fixed_ip.FixedIPList.get_all(self.context)
- self.assertEqual(1, len(fixedips))
- get_all.assert_called_once_with(self.context)
- self._compare(fixedips[0], fake_fixed_ip)
-
- @mock.patch('nova.db.api.fixed_ip_get_by_instance')
- def test_get_by_instance(self, get):
- get.return_value = [fake_fixed_ip]
- fixedips = fixed_ip.FixedIPList.get_by_instance_uuid(self.context,
- uuids.instance)
- self.assertEqual(1, len(fixedips))
- get.assert_called_once_with(self.context, uuids.instance)
- self._compare(fixedips[0], fake_fixed_ip)
-
- @mock.patch('nova.db.api.fixed_ip_get_by_host')
- def test_get_by_host(self, get):
- get.return_value = [fake_fixed_ip]
- fixedips = fixed_ip.FixedIPList.get_by_host(self.context, 'host')
- self.assertEqual(1, len(fixedips))
- get.assert_called_once_with(self.context, 'host')
- self._compare(fixedips[0], fake_fixed_ip)
-
- @mock.patch('nova.db.api.fixed_ips_by_virtual_interface')
- def test_get_by_virtual_interface_id(self, get):
- get.return_value = [fake_fixed_ip]
- fixedips = fixed_ip.FixedIPList.get_by_virtual_interface_id(
- self.context, 123)
- self.assertEqual(1, len(fixedips))
- get.assert_called_once_with(self.context, 123)
- self._compare(fixedips[0], fake_fixed_ip)
-
- def test_floating_ips_do_not_lazy_load(self):
- fixedip = fixed_ip.FixedIP()
- self.assertRaises(NotImplementedError, lambda: fixedip.floating_ips)
-
- @mock.patch('nova.db.api.fixed_ip_bulk_create')
- def test_bulk_create(self, bulk):
- fixed_ips = [fixed_ip.FixedIP(address='192.168.1.1'),
- fixed_ip.FixedIP(address='192.168.1.2')]
- fixed_ip.FixedIPList.bulk_create(self.context, fixed_ips)
- bulk.assert_called_once_with(self.context,
- [{'address': '192.168.1.1'},
- {'address': '192.168.1.2'}])
-
- @mock.patch('nova.db.api.network_get_associated_fixed_ips')
- def test_get_by_network(self, get):
- info = {'address': '1.2.3.4',
- 'instance_uuid': uuids.instance,
- 'network_id': 0,
- 'vif_id': 1,
- 'vif_address': 'de:ad:be:ee:f0:00',
- 'instance_hostname': 'fake-host',
- 'instance_updated': datetime.datetime(1955, 11, 5),
- 'instance_created': datetime.datetime(1955, 11, 5),
- 'allocated': True,
- 'leased': True,
- 'default_route': True,
- }
- get.return_value = [info]
- fixed_ips = fixed_ip.FixedIPList.get_by_network(
- self.context, {'id': 0}, host='fake-host')
- get.assert_called_once_with(self.context, 0, host='fake-host')
- self.assertEqual(1, len(fixed_ips))
- fip = fixed_ips[0]
- self.assertEqual('1.2.3.4', str(fip.address))
- self.assertEqual(uuids.instance, fip.instance_uuid)
- self.assertEqual(0, fip.network_id)
- self.assertEqual(1, fip.virtual_interface_id)
- self.assertTrue(fip.allocated)
- self.assertTrue(fip.leased)
- self.assertEqual(uuids.instance, fip.instance.uuid)
- self.assertEqual('fake-host', fip.instance.hostname)
- self.assertIsInstance(fip.instance.created_at, datetime.datetime)
- self.assertIsInstance(fip.instance.updated_at, datetime.datetime)
- self.assertEqual(1, fip.virtual_interface.id)
- self.assertEqual(info['vif_address'], fip.virtual_interface.address)
-
- @mock.patch('nova.db.api.network_get_associated_fixed_ips')
- def test_backport_default_route(self, mock_get):
- info = {'address': '1.2.3.4',
- 'instance_uuid': uuids.instance,
- 'network_id': 0,
- 'vif_id': 1,
- 'vif_address': 'de:ad:be:ee:f0:00',
- 'instance_hostname': 'fake-host',
- 'instance_updated': datetime.datetime(1955, 11, 5),
- 'instance_created': datetime.datetime(1955, 11, 5),
- 'allocated': True,
- 'leased': True,
- 'default_route': True,
- }
- mock_get.return_value = [info]
- fixed_ips = fixed_ip.FixedIPList.get_by_network(
- self.context, {'id': 0}, host='fake-host')
- primitive = fixed_ips[0].obj_to_primitive()
- self.assertIn('default_route', primitive['nova_object.data'])
- versions = ovo_base.obj_tree_get_versions('FixedIP')
- fixed_ips[0].obj_make_compatible_from_manifest(
- primitive['nova_object.data'],
- target_version='1.1',
- version_manifest=versions)
- self.assertNotIn('default_route', primitive['nova_object.data'])
-
- def test_get_count_by_project(self):
- instance = instance_obj.Instance(context=self.context,
- uuid=uuids.instance,
- project_id=self.context.project_id)
- instance.create()
- ip = fixed_ip.FixedIP(context=self.context,
- address='192.168.1.1',
- instance_uuid=instance.uuid)
- ip.create()
- self.assertEqual(1, fixed_ip.FixedIPList.get_count_by_project(
- self.context, self.context.project_id))
-
-
-class TestFixedIPObject(test_objects._LocalTest,
- _TestFixedIPObject):
- pass
-
-
-class TestRemoteFixedIPObject(test_objects._RemoteTest,
- _TestFixedIPObject):
- pass
diff --git a/nova/tests/unit/objects/test_floating_ip.py b/nova/tests/unit/objects/test_floating_ip.py
deleted file mode 100644
index 94291807a5..0000000000
--- a/nova/tests/unit/objects/test_floating_ip.py
+++ /dev/null
@@ -1,289 +0,0 @@
-# Copyright 2014 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import mock
-import netaddr
-from oslo_versionedobjects import base as ovo_base
-
-from nova import exception
-from nova import objects
-from nova.objects import floating_ip
-from nova import test
-from nova.tests.unit.api.openstack import fakes
-from nova.tests.unit.objects import test_fixed_ip
-from nova.tests.unit.objects import test_network
-from nova.tests.unit.objects import test_objects
-
-fake_floating_ip = {
- 'created_at': None,
- 'updated_at': None,
- 'deleted_at': None,
- 'deleted': False,
- 'id': 123,
- 'address': '172.17.0.1',
- 'fixed_ip_id': None,
- 'project_id': None,
- 'host': None,
- 'auto_assigned': False,
- 'pool': None,
- 'interface': None,
- 'fixed_ip': None,
-}
-
-
-class _TestFloatingIPObject(object):
- def _compare(self, obj, db_obj):
- for field in obj.fields:
- if field in floating_ip.FLOATING_IP_OPTIONAL_ATTRS:
- if obj.obj_attr_is_set(field):
- obj_val = obj[field].id
- db_val = db_obj[field]['id']
- else:
- continue
- else:
- obj_val = obj[field]
- db_val = db_obj[field]
- if isinstance(obj_val, netaddr.IPAddress):
- obj_val = str(obj_val)
- self.assertEqual(db_val, obj_val)
-
- @mock.patch('nova.db.api.floating_ip_get')
- def test_get_by_id(self, get):
- db_floatingip = dict(fake_floating_ip,
- fixed_ip=test_fixed_ip.fake_fixed_ip)
- get.return_value = db_floatingip
- floatingip = floating_ip.FloatingIP.get_by_id(self.context, 123)
- get.assert_called_once_with(self.context, 123)
- self._compare(floatingip, db_floatingip)
-
- @mock.patch('nova.db.api.floating_ip_get_by_address')
- def test_get_by_address(self, get):
- get.return_value = fake_floating_ip
- floatingip = floating_ip.FloatingIP.get_by_address(self.context,
- '1.2.3.4')
- get.assert_called_once_with(self.context, '1.2.3.4')
- self._compare(floatingip, fake_floating_ip)
-
- @mock.patch('nova.db.api.floating_ip_get_pools')
- def test_get_pool_names(self, get):
- get.return_value = [{'name': 'a'}, {'name': 'b'}]
- self.assertEqual(['a', 'b'],
- floating_ip.FloatingIP.get_pool_names(self.context))
-
- @mock.patch('nova.db.api.floating_ip_allocate_address')
- def test_allocate_address(self, allocate):
- allocate.return_value = '1.2.3.4'
- self.assertEqual('1.2.3.4',
- floating_ip.FloatingIP.allocate_address(self.context,
- 'project',
- 'pool'))
- allocate.assert_called_with(self.context, 'project', 'pool',
- auto_assigned=False)
-
- @mock.patch('nova.db.api.floating_ip_fixed_ip_associate')
- def test_associate(self, associate):
- db_fixed = dict(test_fixed_ip.fake_fixed_ip,
- network=test_network.fake_network)
- associate.return_value = db_fixed
- floatingip = floating_ip.FloatingIP.associate(self.context,
- '172.17.0.1',
- '192.168.1.1',
- 'host')
- associate.assert_called_with(self.context, '172.17.0.1',
- '192.168.1.1', 'host')
- self.assertEqual(db_fixed['id'], floatingip.fixed_ip.id)
- self.assertEqual('172.17.0.1', str(floatingip.address))
- self.assertEqual('host', floatingip.host)
-
- @mock.patch('nova.db.api.floating_ip_deallocate')
- def test_deallocate(self, deallocate):
- floating_ip.FloatingIP.deallocate(self.context, '1.2.3.4')
- deallocate.assert_called_with(self.context, '1.2.3.4')
-
- @mock.patch('nova.db.api.floating_ip_destroy')
- def test_destroy(self, destroy):
- floating_ip.FloatingIP.destroy(self.context, '1.2.3.4')
- destroy.assert_called_with(self.context, '1.2.3.4')
-
- @mock.patch('nova.db.api.floating_ip_disassociate')
- def test_disassociate(self, disassociate):
- db_fixed = dict(test_fixed_ip.fake_fixed_ip,
- network=test_network.fake_network)
- disassociate.return_value = db_fixed
- floatingip = floating_ip.FloatingIP.disassociate(self.context,
- '1.2.3.4')
- disassociate.assert_called_with(self.context, '1.2.3.4')
- self.assertEqual(db_fixed['id'], floatingip.fixed_ip.id)
- self.assertEqual('1.2.3.4', str(floatingip.address))
-
- @mock.patch('nova.db.api.floating_ip_update')
- def test_save(self, update):
- update.return_value = fake_floating_ip
- floatingip = floating_ip.FloatingIP(context=self.context,
- id=123, address='1.2.3.4',
- host='foo')
- floatingip.obj_reset_changes(['address', 'id'])
- floatingip.save()
- self.assertEqual(set(), floatingip.obj_what_changed())
- update.assert_called_with(self.context, '1.2.3.4',
- {'host': 'foo'})
-
- def test_save_errors(self):
- floatingip = floating_ip.FloatingIP(context=self.context,
- id=123, host='foo')
- floatingip.obj_reset_changes()
- floating_ip.address = '1.2.3.4'
- self.assertRaises(exception.ObjectActionError, floatingip.save)
-
- floatingip.obj_reset_changes()
- floatingip.fixed_ip_id = 1
- self.assertRaises(exception.ObjectActionError, floatingip.save)
-
- @mock.patch('nova.db.api.floating_ip_update')
- def test_save_no_fixedip(self, update):
- update.return_value = fake_floating_ip
- floatingip = floating_ip.FloatingIP(context=self.context,
- id=123)
- floatingip.fixed_ip = objects.FixedIP(context=self.context,
- id=456)
- self.assertNotIn('fixed_ip', update.calls[1])
-
- @mock.patch('nova.db.api.floating_ip_get_all')
- def test_get_all(self, get):
- get.return_value = [fake_floating_ip]
- floatingips = floating_ip.FloatingIPList.get_all(self.context)
- self.assertEqual(1, len(floatingips))
- self._compare(floatingips[0], fake_floating_ip)
- get.assert_called_with(self.context)
-
- @mock.patch('nova.db.api.floating_ip_get_all_by_host')
- def test_get_by_host(self, get):
- get.return_value = [fake_floating_ip]
- floatingips = floating_ip.FloatingIPList.get_by_host(self.context,
- 'host')
- self.assertEqual(1, len(floatingips))
- self._compare(floatingips[0], fake_floating_ip)
- get.assert_called_with(self.context, 'host')
-
- @mock.patch('nova.db.api.floating_ip_get_all_by_project')
- def test_get_by_project(self, get):
- get.return_value = [fake_floating_ip]
- floatingips = floating_ip.FloatingIPList.get_by_project(self.context,
- 'project')
- self.assertEqual(1, len(floatingips))
- self._compare(floatingips[0], fake_floating_ip)
- get.assert_called_with(self.context, 'project')
-
- @mock.patch('nova.db.api.floating_ip_get_by_fixed_address')
- def test_get_by_fixed_address(self, get):
- get.return_value = [fake_floating_ip]
- floatingips = floating_ip.FloatingIPList.get_by_fixed_address(
- self.context, '1.2.3.4')
- self.assertEqual(1, len(floatingips))
- self._compare(floatingips[0], fake_floating_ip)
- get.assert_called_with(self.context, '1.2.3.4')
-
- @mock.patch('nova.db.api.floating_ip_get_by_fixed_ip_id')
- def test_get_by_fixed_ip_id(self, get):
- get.return_value = [fake_floating_ip]
- floatingips = floating_ip.FloatingIPList.get_by_fixed_ip_id(
- self.context, 123)
- self.assertEqual(1, len(floatingips))
- self._compare(floatingips[0], fake_floating_ip)
- get.assert_called_with(self.context, 123)
-
- @mock.patch('nova.db.api.instance_floating_address_get_all')
- def test_get_addresses_by_instance(self, get_all):
- expected = ['1.2.3.4', '4.5.6.7']
- get_all.return_value = list(expected)
- ips = floating_ip.FloatingIP.get_addresses_by_instance(
- self.context, {'uuid': '1234'})
- self.assertEqual(expected, ips)
- get_all.assert_called_once_with(self.context, '1234')
-
- def test_make_ip_info(self):
- result = objects.FloatingIPList.make_ip_info('1.2.3.4', 'pool', 'eth0')
- self.assertEqual({'address': '1.2.3.4', 'pool': 'pool',
- 'interface': 'eth0'},
- result)
-
- @mock.patch('nova.db.api.floating_ip_bulk_create')
- def test_bulk_create(self, create_mock):
- def fake_create(ctxt, ip_info, want_result=False):
- return [{'id': 1, 'address': ip['address'], 'fixed_ip_id': 1,
- 'project_id': fakes.FAKE_PROJECT_ID, 'host': 'host',
- 'auto_assigned': False, 'pool': ip['pool'],
- 'interface': ip['interface'], 'fixed_ip': None,
- 'created_at': None, 'updated_at': None,
- 'deleted_at': None, 'deleted': False}
- for ip in ip_info]
-
- create_mock.side_effect = fake_create
- ips = [objects.FloatingIPList.make_ip_info('1.1.1.1', 'pool', 'eth0'),
- objects.FloatingIPList.make_ip_info('1.1.1.2', 'loop', 'eth1')]
- result = objects.FloatingIPList.create(None, ips)
- self.assertIsNone(result)
- result = objects.FloatingIPList.create(None, ips, want_result=True)
- self.assertEqual('1.1.1.1', str(result[0].address))
- self.assertEqual('1.1.1.2', str(result[1].address))
-
- @mock.patch('nova.db.api.floating_ip_bulk_destroy')
- def test_bulk_destroy(self, destroy_mock):
- ips = [{'address': '1.2.3.4'}, {'address': '4.5.6.7'}]
- objects.FloatingIPList.destroy(None, ips)
- destroy_mock.assert_called_once_with(None, ips)
-
- def test_backport_fixedip_1_1(self):
- floating = objects.FloatingIP()
- fixed = objects.FixedIP()
- floating.fixed_ip = fixed
- versions = ovo_base.obj_tree_get_versions('FloatingIP')
- versions['FixedIP'] = '1.1'
- primitive = floating.obj_to_primitive(target_version='1.1',
- version_manifest=versions)
- self.assertEqual('1.1',
- primitive['nova_object.data']['fixed_ip']['nova_object.version'])
-
- def test_get_count_by_project(self):
- ips = [objects.FloatingIPList.make_ip_info('1.1.1.1', 'pool', 'eth0')]
- objects.FloatingIPList.create(self.context, ips)
- floating_ip.FloatingIP.allocate_address(self.context,
- self.context.project_id,
- 'pool')
- self.assertEqual(1, floating_ip.FloatingIPList.get_count_by_project(
- self.context, self.context.project_id))
-
-
-class TestFloatingIPObject(test_objects._LocalTest,
- _TestFloatingIPObject):
- pass
-
-
-class TestRemoteFloatingIPObject(test_objects._RemoteTest,
- _TestFloatingIPObject):
- pass
-
-
-class TestNeutronFloatingIPObject(test.NoDBTestCase):
- def test_create_with_uuid_id(self):
- uuid = 'fc9b4956-fd97-11e5-86aa-5e5517507c66'
- fip = objects.floating_ip.NeutronFloatingIP(id=uuid)
-
- self.assertEqual(uuid, fip.id)
-
- def test_create_with_uuid_fixed_id(self):
- uuid = 'fc9b4c3a-fd97-11e5-86aa-5e5517507c66'
- fip = objects.floating_ip.NeutronFloatingIP(fixed_ip_id=uuid)
-
- self.assertEqual(uuid, fip.fixed_ip_id)
diff --git a/nova/tests/unit/objects/test_network.py b/nova/tests/unit/objects/test_network.py
deleted file mode 100644
index 047ed0c38a..0000000000
--- a/nova/tests/unit/objects/test_network.py
+++ /dev/null
@@ -1,239 +0,0 @@
-# Copyright 2014 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import mock
-import netaddr
-from oslo_utils.fixture import uuidsentinel as uuids
-
-from nova.objects import network as network_obj
-from nova.tests.unit.objects import test_objects
-
-
-fake_network = {
- 'deleted': False,
- 'created_at': None,
- 'updated_at': None,
- 'deleted_at': None,
- 'id': 1,
- 'label': 'Fake Network',
- 'injected': False,
- 'cidr': '192.168.1.0/24',
- 'cidr_v6': '1234::/64',
- 'multi_host': False,
- 'netmask': '255.255.255.0',
- 'gateway': '192.168.1.1',
- 'broadcast': '192.168.1.255',
- 'netmask_v6': 64,
- 'gateway_v6': '1234::1',
- 'bridge': 'br100',
- 'bridge_interface': 'eth0',
- 'dns1': '8.8.8.8',
- 'dns2': '8.8.4.4',
- 'vlan': None,
- 'vpn_public_address': None,
- 'vpn_public_port': None,
- 'vpn_private_address': None,
- 'dhcp_start': '192.168.1.10',
- 'rxtx_base': None,
- 'project_id': None,
- 'priority': None,
- 'host': None,
- 'uuid': uuids.network_instance,
- 'mtu': None,
- 'dhcp_server': '192.168.1.1',
- 'enable_dhcp': True,
- 'share_address': False,
-}
-
-
-class _TestNetworkObject(object):
- def _compare(self, obj, db_obj):
- for field in obj.fields:
- db_val = db_obj[field]
- obj_val = obj[field]
- if isinstance(obj_val, netaddr.IPAddress):
- obj_val = str(obj_val)
- if isinstance(obj_val, netaddr.IPNetwork):
- obj_val = str(obj_val)
- if field == 'netmask_v6':
- db_val = str(netaddr.IPNetwork('1::/%i' % db_val).netmask)
- self.assertEqual(db_val, obj_val)
-
- @mock.patch('nova.db.api.network_get')
- def test_get_by_id(self, get):
- get.return_value = fake_network
- network = network_obj.Network.get_by_id(self.context, 'foo')
- self._compare(network, fake_network)
- get.assert_called_once_with(self.context, 'foo',
- project_only='allow_none')
-
- @mock.patch('nova.db.api.network_get_by_uuid')
- def test_get_by_uuid(self, get):
- get.return_value = fake_network
- network = network_obj.Network.get_by_uuid(self.context, 'foo')
- self._compare(network, fake_network)
- get.assert_called_once_with(self.context, 'foo')
-
- @mock.patch('nova.db.api.network_get_by_cidr')
- def test_get_by_cidr(self, get):
- get.return_value = fake_network
- network = network_obj.Network.get_by_cidr(self.context,
- '192.168.1.0/24')
- self._compare(network, fake_network)
- get.assert_called_once_with(self.context, '192.168.1.0/24')
-
- @mock.patch('nova.db.api.network_update')
- @mock.patch('nova.db.api.network_set_host')
- def test_save(self, set_host, update):
- result = dict(fake_network, injected=True)
- network = network_obj.Network._from_db_object(self.context,
- network_obj.Network(),
- fake_network)
- network.obj_reset_changes()
- network.save()
- network.label = 'bar'
- update.return_value = result
- network.save()
- update.assert_called_once_with(self.context, network.id,
- {'label': 'bar'})
- self.assertFalse(set_host.called)
- self._compare(network, result)
-
- @mock.patch('nova.db.api.network_update')
- @mock.patch('nova.db.api.network_set_host')
- @mock.patch('nova.db.api.network_get')
- def test_save_with_host(self, get, set_host, update):
- result = dict(fake_network, injected=True)
- network = network_obj.Network._from_db_object(self.context,
- network_obj.Network(),
- fake_network)
- network.obj_reset_changes()
- network.host = 'foo'
- get.return_value = result
- network.save()
- set_host.assert_called_once_with(self.context, network.id, 'foo')
- self.assertFalse(update.called)
- self._compare(network, result)
-
- @mock.patch('nova.db.api.network_update')
- @mock.patch('nova.db.api.network_set_host')
- def test_save_with_host_and_other(self, set_host, update):
- result = dict(fake_network, injected=True)
- network = network_obj.Network._from_db_object(self.context,
- network_obj.Network(),
- fake_network)
- network.obj_reset_changes()
- network.host = 'foo'
- network.label = 'bar'
- update.return_value = result
- network.save()
- set_host.assert_called_once_with(self.context, network.id, 'foo')
- update.assert_called_once_with(self.context, network.id,
- {'label': 'bar'})
- self._compare(network, result)
-
- @mock.patch('nova.db.api.network_associate')
- def test_associate(self, associate):
- network_obj.Network.associate(self.context, 'project',
- network_id=123)
- associate.assert_called_once_with(self.context, 'project',
- network_id=123, force=False)
-
- @mock.patch('nova.db.api.network_disassociate')
- def test_disassociate(self, disassociate):
- network_obj.Network.disassociate(self.context, 123,
- host=True, project=True)
- disassociate.assert_called_once_with(self.context, 123, True, True)
-
- @mock.patch('nova.db.api.network_create_safe')
- def test_create(self, create):
- create.return_value = fake_network
- network = network_obj.Network(context=self.context, label='foo')
- network.create()
- create.assert_called_once_with(self.context, {'label': 'foo'})
- self._compare(network, fake_network)
-
- @mock.patch('nova.db.api.network_delete_safe')
- def test_destroy(self, delete):
- network = network_obj.Network(context=self.context, id=123)
- network.destroy()
- delete.assert_called_once_with(self.context, 123)
- self.assertTrue(network.deleted)
- self.assertNotIn('deleted', network.obj_what_changed())
-
- @mock.patch('nova.db.api.network_get_all')
- def test_get_all(self, get_all):
- get_all.return_value = [fake_network]
- networks = network_obj.NetworkList.get_all(self.context)
- self.assertEqual(1, len(networks))
- get_all.assert_called_once_with(self.context, 'allow_none')
- self._compare(networks[0], fake_network)
-
- @mock.patch('nova.db.api.network_get_all_by_uuids')
- def test_get_all_by_uuids(self, get_all):
- get_all.return_value = [fake_network]
- networks = network_obj.NetworkList.get_by_uuids(self.context,
- ['foo'])
- self.assertEqual(1, len(networks))
- get_all.assert_called_once_with(self.context, ['foo'], 'allow_none')
- self._compare(networks[0], fake_network)
-
- @mock.patch('nova.db.api.network_get_all_by_host')
- def test_get_all_by_host(self, get_all):
- get_all.return_value = [fake_network]
- networks = network_obj.NetworkList.get_by_host(self.context, 'host')
- self.assertEqual(1, len(networks))
- get_all.assert_called_once_with(self.context, 'host')
- self._compare(networks[0], fake_network)
-
- @mock.patch('nova.db.api.network_in_use_on_host')
- def test_in_use_on_host(self, in_use):
- in_use.return_value = True
- self.assertTrue(network_obj.Network.in_use_on_host(self.context,
- 123, 'foo'))
- in_use.assert_called_once_with(self.context, 123, 'foo')
-
- @mock.patch('nova.db.api.project_get_networks')
- def test_get_all_by_project(self, get_nets):
- get_nets.return_value = [fake_network]
- networks = network_obj.NetworkList.get_by_project(self.context, 123)
- self.assertEqual(1, len(networks))
- get_nets.assert_called_once_with(self.context, 123, associate=True)
- self._compare(networks[0], fake_network)
-
- def test_compat_version_1_1(self):
- network = network_obj.Network._from_db_object(self.context,
- network_obj.Network(),
- fake_network)
- primitive = network.obj_to_primitive(target_version='1.1')
- self.assertNotIn('mtu', primitive['nova_object.data'])
- self.assertNotIn('enable_dhcp', primitive['nova_object.data'])
- self.assertNotIn('dhcp_server', primitive['nova_object.data'])
- self.assertNotIn('share_address', primitive['nova_object.data'])
-
- primitive = network.obj_to_primitive(target_version='1.2')
- self.assertIn('mtu', primitive['nova_object.data'])
- self.assertIn('enable_dhcp', primitive['nova_object.data'])
- self.assertIn('dhcp_server', primitive['nova_object.data'])
- self.assertIn('share_address', primitive['nova_object.data'])
-
-
-class TestNetworkObject(test_objects._LocalTest,
- _TestNetworkObject):
- pass
-
-
-class TestRemoteNetworkObject(test_objects._RemoteTest,
- _TestNetworkObject):
- pass
diff --git a/nova/tests/unit/objects/test_objects.py b/nova/tests/unit/objects/test_objects.py
index d0191e0ffd..70f5670080 100644
--- a/nova/tests/unit/objects/test_objects.py
+++ b/nova/tests/unit/objects/test_objects.py
@@ -1056,12 +1056,8 @@ object_data = {
'DiskMetadata': '1.0-e7a0f1ccccf10d26a76b28e7492f3788',
'EC2Ids': '1.0-474ee1094c7ec16f8ce657595d8c49d9',
'EC2InstanceMapping': '1.0-a4556eb5c5e94c045fe84f49cf71644f',
- 'FixedIP': '1.14-53e1c10b539f1a82fe83b1af4720efae',
- 'FixedIPList': '1.15-07b6261cef836cb09d2d8673f68ece15',
'Flavor': '1.2-4ce99b41327bb230262e5a8f45ff0ce3',
'FlavorList': '1.1-52b5928600e7ca973aa4fc1e46f3934c',
- 'FloatingIP': '1.10-52a67d52d85eb8b3f324a5b7935a335b',
- 'FloatingIPList': '1.12-e4debd21fddb12cf40d36f737225fa9d',
'HVSpec': '1.2-de06bcec472a2f04966b855a49c46b41',
'HostMapping': '1.0-1a3390a696792a552ab7bd31a77ba9ac',
'HostMappingList': '1.1-18ac2bfb8c1eb5545bed856da58a79bc',
@@ -1103,9 +1099,7 @@ object_data = {
'NUMAPagesTopology': '1.1-edab9fa2dc43c117a38d600be54b4542',
'NUMATopology': '1.2-c63fad38be73b6afd04715c9c1b29220',
'NUMATopologyLimits': '1.1-4235c5da7a76c7e36075f0cd2f5cf922',
- 'Network': '1.2-a977ab383aa462a479b2fae8211a5dde',
'NetworkInterfaceMetadata': '1.2-6f3d480b40fe339067b1c0dd4d656716',
- 'NetworkList': '1.2-69eca910d8fa035dfecd8ba10877ee59',
'NetworkMetadata': '1.0-2cb8d21b34f87b0261d3e1d1ae5cf218',
'NetworkRequest': '1.2-af1ff2d986999fbb79377712794d82aa',
'NetworkRequestList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e',
@@ -1130,8 +1124,6 @@ object_data = {
'SchedulerRetries': '1.1-3c9c8b16143ebbb6ad7030e999d14cc0',
'SecurityGroup': '1.2-86d67d8d3ab0c971e1dc86e02f9524a8',
'SecurityGroupList': '1.1-c655ed13298e630f4d398152f7d08d71',
- 'SecurityGroupRule': '1.1-ae1da17b79970012e8536f88cb3c6b29',
- 'SecurityGroupRuleList': '1.2-0005c47fcd0fb78dd6d7fd32a1409f5b',
'Selection': '1.1-548e3c2f04da2a61ceaf9c4e1589f264',
'Service': '1.22-8a740459ab9bf258a19c8fcb875c2d9a',
'ServiceList': '1.19-5325bce13eebcbf22edc9678285270cc',
@@ -1221,8 +1213,7 @@ class TestObjectVersions(test.NoDBTestCase):
init_args = {}
init_kwargs = {}
- checker = fixture.ObjectVersionChecker(
- base.NovaObjectRegistry.obj_classes())
+ checker = fixture.ObjectVersionChecker(get_nova_objects())
checker.test_compatibility_routines(use_manifest=True,
init_args=init_args,
init_kwargs=init_kwargs)
diff --git a/nova/tests/unit/objects/test_security_group_rule.py b/nova/tests/unit/objects/test_security_group_rule.py
deleted file mode 100644
index 9bbe573e43..0000000000
--- a/nova/tests/unit/objects/test_security_group_rule.py
+++ /dev/null
@@ -1,126 +0,0 @@
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import mock
-from oslo_utils.fixture import uuidsentinel as uuids
-from oslo_versionedobjects import exception as ovo_exc
-
-from nova.db import api as db
-from nova import objects
-from nova.tests.unit.objects import test_objects
-from nova.tests.unit.objects import test_security_group
-
-
-fake_rule = {
- 'created_at': None,
- 'updated_at': None,
- 'deleted_at': None,
- 'deleted': False,
- 'id': 1,
- 'protocol': 'tcp',
- 'from_port': 22,
- 'to_port': 22,
- 'cidr': '0.0.0.0/0',
- }
-
-
-class _TestSecurityGroupRuleObject(object):
- def test_get_by_id(self):
- with mock.patch.object(db, 'security_group_rule_get') as sgrg:
- sgrg.return_value = fake_rule
- rule = objects.SecurityGroupRule.get_by_id(
- self.context, 1)
- for field in fake_rule:
- if field == 'cidr':
- self.assertEqual(fake_rule[field], str(getattr(rule,
- field)))
- else:
- self.assertEqual(fake_rule[field], getattr(rule, field))
- sgrg.assert_called_with(self.context, 1)
-
- def test_get_by_security_group(self):
- secgroup = objects.SecurityGroup()
- secgroup.id = 123
- rule = dict(fake_rule)
- rule['grantee_group'] = dict(test_security_group.fake_secgroup, id=123)
- stupid_method = 'security_group_rule_get_by_security_group'
- with mock.patch.object(db, stupid_method) as sgrgbsg:
- sgrgbsg.return_value = [rule]
- rules = (objects.SecurityGroupRuleList.
- get_by_security_group(self.context, secgroup))
- self.assertEqual(1, len(rules))
- self.assertEqual(123, rules[0].grantee_group.id)
-
- @mock.patch.object(db, 'security_group_rule_create',
- return_value=fake_rule)
- def test_create(self, db_mock):
- rule = objects.SecurityGroupRule(context=self.context)
- rule.protocol = 'tcp'
- secgroup = objects.SecurityGroup()
- secgroup.id = 123
- parentgroup = objects.SecurityGroup()
- parentgroup.id = 223
- rule.grantee_group = secgroup
- rule.parent_group = parentgroup
- rule.create()
- updates = db_mock.call_args[0][1]
- self.assertEqual(fake_rule['id'], rule.id)
- self.assertEqual(updates['group_id'], rule.grantee_group.id)
- self.assertEqual(updates['parent_group_id'], rule.parent_group.id)
-
- @mock.patch.object(db, 'security_group_rule_create',
- return_value=fake_rule)
- def test_set_id_failure(self, db_mock):
- rule = objects.SecurityGroupRule(context=self.context)
- rule.create()
- self.assertRaises(ovo_exc.ReadOnlyFieldError, setattr,
- rule, 'id', 124)
-
-
-class TestSecurityGroupRuleObject(test_objects._LocalTest,
- _TestSecurityGroupRuleObject):
- pass
-
-
-class TestSecurityGroupRuleObjectRemote(test_objects._RemoteTest,
- _TestSecurityGroupRuleObject):
- pass
-
-
-fake_rules = [
- dict(fake_rule, id=1, grantee_group=test_security_group.fake_secgroup),
- dict(fake_rule, id=2, grantee_group=test_security_group.fake_secgroup),
-]
-
-
-class _TestSecurityGroupRuleListObject(object):
- @mock.patch('nova.db.api.security_group_rule_get_by_instance')
- def test_get_by_instance(self, mock_get):
- mock_get.return_value = fake_rules
- instance = objects.Instance(uuid=uuids.instance)
- rules = objects.SecurityGroupRuleList.get_by_instance(self.context,
- instance)
- mock_get.assert_called_once_with(self.context, instance.uuid)
- self.assertEqual(2, len(rules))
- self.assertEqual([1, 2], [x.id for x in rules])
-
-
-class TestSecurityGroupRuleListObject(test_objects._LocalTest,
- _TestSecurityGroupRuleListObject):
- pass
-
-
-class TestSecurityGroupRuleListObjectRemote(test_objects._RemoteTest,
- _TestSecurityGroupRuleListObject):
- pass