summaryrefslogtreecommitdiff
path: root/neutron/tests/unit/db
diff options
context:
space:
mode:
Diffstat (limited to 'neutron/tests/unit/db')
-rw-r--r--neutron/tests/unit/db/metering/test_metering_db.py42
-rw-r--r--neutron/tests/unit/db/test_agentschedulers_db.py63
-rw-r--r--neutron/tests/unit/db/test_db_base_plugin_v2.py567
-rw-r--r--neutron/tests/unit/db/test_dvr_mac_db.py6
-rw-r--r--neutron/tests/unit/db/test_ipam_backend_mixin.py6
-rw-r--r--neutron/tests/unit/db/test_ipam_pluggable_backend.py3
-rw-r--r--neutron/tests/unit/db/test_l3_db.py3
-rw-r--r--neutron/tests/unit/db/test_ovn_revision_numbers_db.py2
8 files changed, 374 insertions, 318 deletions
diff --git a/neutron/tests/unit/db/metering/test_metering_db.py b/neutron/tests/unit/db/metering/test_metering_db.py
index 35d7f733c1..1c2af72a4e 100644
--- a/neutron/tests/unit/db/metering/test_metering_db.py
+++ b/neutron/tests/unit/db/metering/test_metering_db.py
@@ -16,7 +16,6 @@ import contextlib
from neutron_lib.api.definitions import metering as metering_apidef
from neutron_lib import constants as n_consts
-from neutron_lib import context
from neutron_lib.db import constants as db_const
from neutron_lib.plugins import constants
from oslo_utils import uuidutils
@@ -42,18 +41,12 @@ _fake_uuid = uuidutils.generate_uuid
class MeteringPluginDbTestCaseMixin(object):
def _create_metering_label(self, fmt, name, description, **kwargs):
data = {'metering_label': {'name': name,
- 'tenant_id': kwargs.get('tenant_id',
- 'test-tenant'),
'shared': kwargs.get('shared', False),
'description': description}}
- req = self.new_create_request('metering-labels', data,
- fmt)
-
- if kwargs.get('set_context') and 'tenant_id' in kwargs:
- # create a specific auth context for this request
- req.environ['neutron.context'] = (
- context.Context('', kwargs['tenant_id'],
- is_admin=kwargs.get('is_admin', True)))
+ req = self.new_create_request(
+ 'metering-labels', data, fmt,
+ tenant_id=kwargs.get('tenant_id', self._tenant_id),
+ as_admin=kwargs.get('is_admin', True))
return req.get_response(self.ext_api)
@@ -71,7 +64,6 @@ class MeteringPluginDbTestCaseMixin(object):
data = {
'metering_label_rule': {
'metering_label_id': metering_label_id,
- 'tenant_id': kwargs.get('tenant_id', 'test-tenant'),
'direction': direction,
'excluded': excluded,
}
@@ -87,13 +79,10 @@ class MeteringPluginDbTestCaseMixin(object):
data['metering_label_rule']['destination_ip_prefix'] =\
destination_ip_prefix
- req = self.new_create_request('metering-label-rules',
- data, fmt)
-
- if kwargs.get('set_context') and 'tenant_id' in kwargs:
- # create a specific auth context for this request
- req.environ['neutron.context'] = (
- context.Context('', kwargs['tenant_id']))
+ req = self.new_create_request(
+ 'metering-label-rules', data, fmt,
+ tenant_id=kwargs.get('tenant_id', self._tenant_id),
+ as_admin=kwargs.get('is_admin', True))
return req.get_response(self.ext_api)
@@ -203,7 +192,8 @@ class TestMetering(MeteringPluginDbTestCase):
with self.metering_label(name, description) as metering_label:
metering_label_id = metering_label['metering_label']['id']
- self._delete('metering-labels', metering_label_id, 204)
+ self._delete('metering-labels', metering_label_id, 204,
+ as_admin=True)
def test_list_metering_label(self):
name = 'my label'
@@ -258,7 +248,7 @@ class TestMetering(MeteringPluginDbTestCase):
remote_ip_prefix=remote_ip_prefix) as label_rule:
rule_id = label_rule['metering_label_rule']['id']
self._update('metering-label-rules', rule_id, data,
- webob.exc.HTTPNotImplemented.code)
+ webob.exc.HTTPNotImplemented.code, as_admin=True)
def test_delete_metering_label_rule(self):
name = 'my label'
@@ -275,7 +265,8 @@ class TestMetering(MeteringPluginDbTestCase):
metering_label_id, direction, excluded,
remote_ip_prefix=remote_ip_prefix) as label_rule:
rule_id = label_rule['metering_label_rule']['id']
- self._delete('metering-label-rules', rule_id, 204)
+ self._delete('metering-label-rules', rule_id, 204,
+ as_admin=True)
def test_list_metering_label_rule(self):
name = 'my label'
@@ -297,7 +288,7 @@ class TestMetering(MeteringPluginDbTestCase):
metering_label_rule = (v1, v2)
self._test_list_resources('metering-label-rule',
- metering_label_rule)
+ metering_label_rule, as_admin=True)
def test_create_metering_label_rules(self):
name = 'my label'
@@ -319,7 +310,7 @@ class TestMetering(MeteringPluginDbTestCase):
metering_label_rule = (v1, v2)
self._test_list_resources('metering-label-rule',
- metering_label_rule)
+ metering_label_rule, as_admin=True)
def test_create_overlap_metering_label_rules(self):
name = 'my label'
@@ -365,4 +356,5 @@ class TestMetering(MeteringPluginDbTestCase):
metering_label_rule = (v1, v2)
self._test_list_resources('metering-label-rule',
- metering_label_rule)
+ metering_label_rule,
+ as_admin=True)
diff --git a/neutron/tests/unit/db/test_agentschedulers_db.py b/neutron/tests/unit/db/test_agentschedulers_db.py
index d503aee1ec..df4edfaa9b 100644
--- a/neutron/tests/unit/db/test_agentschedulers_db.py
+++ b/neutron/tests/unit/db/test_agentschedulers_db.py
@@ -45,6 +45,7 @@ from neutron.db.models import agent as agent_model
from neutron.extensions import l3agentscheduler
from neutron.objects import agent as ag_obj
from neutron.objects import l3agent as rb_obj
+from neutron import policy
from neutron.tests.common import helpers
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
@@ -78,18 +79,21 @@ class AgentSchedulerTestMixIn(object):
def _path_req(self, path, method='GET', data=None,
query_string=None,
- admin_context=True):
+ admin_context=True,
+ req_tenant_id=None):
content_type = 'application/%s' % self.fmt
body = None
if data is not None: # empty dict is valid
body = wsgi.Serializer().serialize(data, content_type)
+ roles = ['member', 'reader']
+ req_tenant_id = req_tenant_id or self._tenant_id
if admin_context:
- return testlib_api.create_request(
- path, body, content_type, method, query_string=query_string)
- else:
- return testlib_api.create_request(
- path, body, content_type, method, query_string=query_string,
- context=context.Context('', 'tenant_id'))
+ roles.append('admin')
+ req = testlib_api.create_request(
+ path, body, content_type, method, query_string=query_string)
+ req.environ['neutron.context'] = context.Context(
+ '', req_tenant_id, roles=roles, is_admin=admin_context)
+ return req
def _path_create_request(self, path, data, admin_context=True):
return self._path_req(path, method='POST', data=data,
@@ -218,7 +222,7 @@ class AgentSchedulerTestMixIn(object):
new_agent = {}
new_agent['agent'] = {}
new_agent['agent']['admin_state_up'] = admin_state_up
- self._update('agents', agent_id, new_agent)
+ self._update('agents', agent_id, new_agent, as_admin=True)
def _get_agent_id(self, agent_type, host):
agents = self._list_agents()
@@ -269,6 +273,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin,
self.dhcp_notify_p = mock.patch(
'neutron.extensions.dhcpagentscheduler.notify')
self.patched_dhcp_notify = self.dhcp_notify_p.start()
+ policy.init()
class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
@@ -911,10 +916,12 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
self.assertNotEqual(agent['host'], new_agent_host)
def test_router_auto_schedule_with_invalid_router(self):
- with self.router() as router:
+ project_id = uuidutils.generate_uuid()
+ with self.router(project_id=project_id) as router:
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
- self._delete('routers', router['router']['id'])
+ self._delete('routers', router['router']['id'],
+ tenant_id=project_id)
# deleted router
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA,
@@ -1106,19 +1113,22 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
self.assertEqual(0, len(router_ids))
def test_router_without_l3_agents(self):
+ project_id = uuidutils.generate_uuid()
with self.subnet() as s:
self._set_net_external(s['subnet']['network_id'])
- data = {'router': {'tenant_id': uuidutils.generate_uuid()}}
+ data = {'router': {'tenant_id': project_id}}
data['router']['name'] = 'router1'
data['router']['external_gateway_info'] = {
'network_id': s['subnet']['network_id']}
- router_req = self.new_create_request('routers', data, self.fmt)
+ router_req = self.new_create_request(
+ 'routers', data, self.fmt, tenant_id=project_id)
res = router_req.get_response(self.ext_api)
router = self.deserialize(self.fmt, res)
l3agents = (
self.l3plugin.get_l3_agents_hosting_routers(
self.adminContext, [router['router']['id']]))
- self._delete('routers', router['router']['id'])
+ self._delete(
+ 'routers', router['router']['id'], tenant_id=project_id)
self.assertEqual(0, len(l3agents))
def test_dvr_router_scheduling_to_only_dvr_snat_agent(self):
@@ -1217,26 +1227,30 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
self.assertEqual(agent['id'], new_agent['id'])
def test_router_sync_data(self):
- with self.subnet() as s1,\
- self.subnet(cidr='10.0.2.0/24') as s2,\
- self.subnet(cidr='10.0.3.0/24') as s3:
+ project_id = uuidutils.generate_uuid()
+ with self.subnet(project_id=project_id) as s1,\
+ self.subnet(project_id=project_id, cidr='10.0.2.0/24') as s2,\
+ self.subnet(project_id=project_id, cidr='10.0.3.0/24') as s3:
self._register_agent_states()
self._set_net_external(s1['subnet']['network_id'])
- data = {'router': {'tenant_id': uuidutils.generate_uuid()}}
+ data = {'router': {'tenant_id': project_id}}
data['router']['name'] = 'router1'
data['router']['external_gateway_info'] = {
'network_id': s1['subnet']['network_id']}
- router_req = self.new_create_request('routers', data, self.fmt)
+ router_req = self.new_create_request(
+ 'routers', data, self.fmt, tenant_id=project_id)
res = router_req.get_response(self.ext_api)
router = self.deserialize(self.fmt, res)
self._router_interface_action('add',
router['router']['id'],
s2['subnet']['id'],
- None)
+ None,
+ tenant_id=project_id)
self._router_interface_action('add',
router['router']['id'],
s3['subnet']['id'],
- None)
+ None,
+ tenant_id=project_id)
l3agents = self._list_l3_agents_hosting_router(
router['router']['id'])
self.assertEqual(1, len(l3agents['agents']))
@@ -1267,7 +1281,8 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
self._router_interface_action('remove',
router['router']['id'],
s2['subnet']['id'],
- None)
+ None,
+ tenant_id=project_id)
l3agents = self._list_l3_agents_hosting_router(
router['router']['id'])
self.assertEqual(1,
@@ -1275,8 +1290,10 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
self._router_interface_action('remove',
router['router']['id'],
s3['subnet']['id'],
- None)
- self._delete('routers', router['router']['id'])
+ None,
+ tenant_id=project_id)
+ self._delete('routers', router['router']['id'],
+ tenant_id=project_id)
def _test_router_add_to_l3_agent(self, admin_state_up=True):
with self.router() as router1:
diff --git a/neutron/tests/unit/db/test_db_base_plugin_v2.py b/neutron/tests/unit/db/test_db_base_plugin_v2.py
index a25ccc4f0d..b66492c3ea 100644
--- a/neutron/tests/unit/db/test_db_base_plugin_v2.py
+++ b/neutron/tests/unit/db/test_db_base_plugin_v2.py
@@ -246,60 +246,117 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
query_string=params, context=context,
headers=headers)
+ def _admin_req(self, method, resource, data=None, fmt=None, id=None,
+ params=None, action=None, subresource=None, sub_id=None,
+ ctx=None, headers=None, tenant_id=None):
+ tenant_id = tenant_id or self._tenant_id
+ req = self._req(method, resource, data, fmt, id, params, action,
+ subresource, sub_id, ctx, headers)
+ req.environ['neutron.context'] = context.Context(
+ '', tenant_id, is_admin=True,
+ roles=['admin', 'member', 'reader'])
+ return req
+
+ def _member_req(self, method, resource, data=None, fmt=None, id=None,
+ params=None, action=None, subresource=None, sub_id=None,
+ ctx=None, headers=None, tenant_id=None):
+ tenant_id = tenant_id or self._tenant_id
+ req = self._req(method, resource, data, fmt, id, params, action,
+ subresource, sub_id, ctx, headers)
+ req.environ['neutron.context'] = context.Context(
+ '', tenant_id, roles=['member', 'reader'])
+ return req
+
+ def _reader_req(self, method, resource, data=None, fmt=None, id=None,
+ params=None, action=None, subresource=None, sub_id=None,
+ ctx=None, headers=None, tenant_id=None):
+ tenant_id = tenant_id or self._tenant_id
+ req = self._req(method, resource, data, fmt, id, params, action,
+ subresource, sub_id, ctx, headers)
+ req.environ['neutron.context'] = context.Context(
+ '', tenant_id, roles=['reader'])
+ return req
+
def new_create_request(self, resource, data, fmt=None, id=None,
- subresource=None, context=None):
- return self._req('POST', resource, data, fmt, id=id,
- subresource=subresource, context=context)
+ subresource=None, context=None, tenant_id=None,
+ as_admin=False):
+ tenant_id = tenant_id or self._tenant_id
+ if as_admin:
+ return self._admin_req(
+ 'POST', resource, data, fmt, id=id,
+ subresource=subresource, ctx=context, tenant_id=tenant_id)
+ return self._member_req('POST', resource, data, fmt, id=id,
+ subresource=subresource, ctx=context,
+ tenant_id=tenant_id)
def new_list_request(self, resource, fmt=None, params=None,
- subresource=None, parent_id=None):
- return self._req(
+ subresource=None, parent_id=None, tenant_id=None,
+ as_admin=False):
+ tenant_id = tenant_id or self._tenant_id
+ if as_admin:
+ return self._admin_req(
+ 'GET', resource, None, fmt, params=params, id=parent_id,
+ subresource=subresource, tenant_id=tenant_id
+ )
+ return self._reader_req(
'GET', resource, None, fmt, params=params, id=parent_id,
- subresource=subresource
+ subresource=subresource, tenant_id=tenant_id
)
def new_show_request(self, resource, id, fmt=None,
- subresource=None, fields=None, sub_id=None):
+ subresource=None, fields=None, sub_id=None,
+ tenant_id=None, as_admin=False):
+ tenant_id = tenant_id or self._tenant_id
if fields:
params = "&".join(["fields=%s" % x for x in fields])
else:
params = None
- return self._req('GET', resource, None, fmt, id=id,
- params=params, subresource=subresource, sub_id=sub_id)
+ if as_admin:
+ return self._admin_req('GET', resource, None, fmt, id=id,
+ params=params, subresource=subresource,
+ sub_id=sub_id, tenant_id=tenant_id)
+ return self._reader_req('GET', resource, None, fmt, id=id,
+ params=params, subresource=subresource,
+ sub_id=sub_id, tenant_id=tenant_id)
def new_delete_request(self, resource, id, fmt=None, subresource=None,
- sub_id=None, data=None, headers=None):
- return self._req(
- 'DELETE',
- resource,
- data,
- fmt,
- id=id,
- subresource=subresource,
- sub_id=sub_id,
- headers=headers
- )
+ sub_id=None, data=None, headers=None,
+ tenant_id=None, as_admin=False):
+ tenant_id = tenant_id or self._tenant_id
+ if as_admin:
+ return self._admin_req('DELETE', resource, data, fmt, id=id,
+ subresource=subresource, sub_id=sub_id,
+ headers=headers, tenant_id=tenant_id)
+ return self._member_req('DELETE', resource, data, fmt, id=id,
+ subresource=subresource, sub_id=sub_id,
+ headers=headers, tenant_id=tenant_id)
def new_update_request(self, resource, data, id, fmt=None,
subresource=None, context=None, sub_id=None,
- headers=None):
- return self._req(
+ headers=None, as_admin=False, tenant_id=None):
+ tenant_id = tenant_id or self._tenant_id
+ if as_admin:
+ return self._admin_req(
+ 'PUT', resource, data, fmt, id=id, subresource=subresource,
+ sub_id=sub_id, ctx=context, headers=headers,
+ tenant_id=tenant_id
+ )
+ return self._member_req(
'PUT', resource, data, fmt, id=id, subresource=subresource,
- sub_id=sub_id, context=context, headers=headers
+ sub_id=sub_id, ctx=context, headers=headers, tenant_id=tenant_id
)
def new_action_request(self, resource, data, id, action, fmt=None,
- subresource=None, sub_id=None):
- return self._req(
- 'PUT',
- resource,
- data,
- fmt,
- id=id,
- action=action,
- subresource=subresource,
- sub_id=sub_id
- )
+ subresource=None, sub_id=None, tenant_id=None,
+ as_admin=False):
+ tenant_id = tenant_id or self._tenant_id
+ if as_admin:
+ return self._admin_req('PUT', resource, data, fmt, id=id,
+ action=action, subresource=subresource,
+ sub_id=sub_id, tenant_id=tenant_id)
+ return self._member_req('PUT', resource, data, fmt, id=id,
+ action=action, subresource=subresource,
+ sub_id=sub_id, tenant_id=tenant_id)
def deserialize(self, content_type, response):
ctype = 'application/%s' % content_type
@@ -328,23 +385,19 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
return random.choice(ip_list)
return ip_list[0]
- def _create_bulk_from_list(self, fmt, resource, objects, **kwargs):
+ def _create_bulk_from_list(self, fmt, resource, objects, tenant_id=None,
+ as_admin=False, **kwargs):
"""Creates a bulk request from a list of objects."""
collection = "%ss" % resource
req_data = {collection: objects}
- req = self.new_create_request(collection, req_data, fmt)
- if ('set_context' in kwargs and
- kwargs['set_context'] is True and
- 'tenant_id' in kwargs):
- # create a specific auth context for this request
- req.environ['neutron.context'] = context.Context(
- '', kwargs['tenant_id'])
- elif 'context' in kwargs:
- req.environ['neutron.context'] = kwargs['context']
+ req = self.new_create_request(collection, req_data, fmt,
+ tenant_id=tenant_id, as_admin=as_admin)
return req.get_response(self.api)
- def _create_bulk(self, fmt, number, resource, data, name='test', **kwargs):
+ def _create_bulk(self, fmt, number, resource, data, name='test',
+ tenant_id=None, as_admin=False, **kwargs):
"""Creates a bulk request for any kind of resource."""
+ tenant_id = tenant_id or self._tenant_id
objects = []
collection = "%ss" % resource
for i in range(number):
@@ -354,19 +407,13 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
obj[resource].update(kwargs['override'][i])
objects.append(obj)
req_data = {collection: objects}
- req = self.new_create_request(collection, req_data, fmt)
- if ('set_context' in kwargs and
- kwargs['set_context'] is True and
- 'tenant_id' in kwargs):
- # create a specific auth context for this request
- req.environ['neutron.context'] = context.Context(
- '', kwargs['tenant_id'])
- elif 'context' in kwargs:
- req.environ['neutron.context'] = kwargs['context']
+ req = self.new_create_request(collection, req_data, fmt,
+ tenant_id=tenant_id,
+ as_admin=as_admin)
return req.get_response(self.api)
def _create_network(self, fmt, name, admin_state_up,
- arg_list=None, set_context=False, tenant_id=None,
+ arg_list=None, tenant_id=None, as_admin=False,
**kwargs):
tenant_id = tenant_id or self._tenant_id
data = {'network': {'name': name,
@@ -378,11 +425,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
# Arg must be present
if arg in kwargs:
data['network'][arg] = kwargs[arg]
- network_req = self.new_create_request('networks', data, fmt)
- if set_context and tenant_id:
- # create a specific auth context for this request
- network_req.environ['neutron.context'] = context.Context(
- '', tenant_id)
+ network_req = self.new_create_request('networks', data, fmt,
+ tenant_id=tenant_id,
+ as_admin=as_admin)
return network_req.get_response(self.api)
@@ -392,11 +437,12 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
'tenant_id': self._tenant_id}}
return self._create_bulk(fmt, number, 'network', base_data, **kwargs)
- def _create_subnet(self, fmt, net_id, cidr,
- expected_res_status=None, **kwargs):
+ def _create_subnet(self, fmt, net_id, cidr, expected_res_status=None,
+ tenant_id=None, as_admin=False, **kwargs):
+ tenant_id = tenant_id or self._tenant_id
data = {'subnet': {'network_id': net_id,
'ip_version': constants.IP_VERSION_4,
- 'tenant_id': self._tenant_id}}
+ 'tenant_id': tenant_id}}
if cidr:
data['subnet']['cidr'] = cidr
for arg in ('ip_version', 'tenant_id', 'subnetpool_id', 'prefixlen',
@@ -412,11 +458,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
kwargs['gateway_ip'] is not constants.ATTR_NOT_SPECIFIED):
data['subnet']['gateway_ip'] = kwargs['gateway_ip']
- subnet_req = self.new_create_request('subnets', data, fmt)
- if (kwargs.get('set_context') and 'tenant_id' in kwargs):
- # create a specific auth context for this request
- subnet_req.environ['neutron.context'] = context.Context(
- '', kwargs['tenant_id'])
+ subnet_req = self.new_create_request('subnets', data, fmt,
+ tenant_id=tenant_id,
+ as_admin=as_admin)
subnet_res = subnet_req.get_response(self.api)
if expected_res_status:
@@ -443,24 +487,25 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs)
def _create_subnetpool(self, fmt, prefixes,
- expected_res_status=None, admin=False, **kwargs):
+ expected_res_status=None, admin=False,
+ tenant_id=None, **kwargs):
+ tenant_id = tenant_id or self._tenant_id
subnetpool = {'subnetpool': {'prefixes': prefixes}}
for k, v in kwargs.items():
subnetpool['subnetpool'][k] = str(v)
api = self._api_for_resource('subnetpools')
subnetpools_req = self.new_create_request('subnetpools',
- subnetpool, fmt)
- if not admin:
- neutron_context = context.Context('', kwargs['tenant_id'])
- subnetpools_req.environ['neutron.context'] = neutron_context
+ subnetpool, fmt,
+ tenant_id=tenant_id,
+ as_admin=admin)
subnetpool_res = subnetpools_req.get_response(api)
if expected_res_status:
self.assertEqual(expected_res_status, subnetpool_res.status_int)
return subnetpool_res
def _create_port(self, fmt, net_id, expected_res_status=None,
- arg_list=None, set_context=False, is_admin=False,
+ arg_list=None, is_admin=False,
tenant_id=None, **kwargs):
tenant_id = tenant_id or self._tenant_id
data = {'port': {'network_id': net_id,
@@ -481,11 +526,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
'device_id' not in kwargs):
device_id = utils.get_dhcp_agent_device_id(net_id, kwargs['host'])
data['port']['device_id'] = device_id
- port_req = self.new_create_request('ports', data, fmt)
- if set_context and tenant_id:
- # create a specific auth context for this request
- port_req.environ['neutron.context'] = context.Context(
- '', tenant_id, is_admin=is_admin)
+ port_req = self.new_create_request('ports', data, fmt,
+ tenant_id=tenant_id,
+ as_admin=is_admin)
port_res = port_req.get_response(self.api)
if expected_res_status:
@@ -499,28 +542,26 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
query_params.append("network_id=%s" % net_id)
if kwargs.get('device_owner'):
query_params.append("device_owner=%s" % kwargs.get('device_owner'))
- port_req = self.new_list_request('ports', fmt, '&'.join(query_params))
- if ('set_context' in kwargs and
- kwargs['set_context'] is True and
- 'tenant_id' in kwargs):
- # create a specific auth context for this request
- port_req.environ['neutron.context'] = context.Context(
- '', kwargs['tenant_id'])
-
+ port_req = self.new_list_request('ports', fmt, '&'.join(query_params),
+ tenant_id=kwargs.get('tenant_id'))
port_res = port_req.get_response(self.api)
if expected_res_status:
self.assertEqual(expected_res_status, port_res.status_int)
return port_res
def _create_port_bulk(self, fmt, number, net_id, name,
- admin_state_up, **kwargs):
+ admin_state_up, tenant_id=None, as_admin=False,
+ **kwargs):
base_data = {'port': {'network_id': net_id,
- 'admin_state_up': admin_state_up,
- 'tenant_id': self._tenant_id}}
- return self._create_bulk(fmt, number, 'port', base_data, **kwargs)
-
- def _make_network(self, fmt, name, admin_state_up, **kwargs):
- res = self._create_network(fmt, name, admin_state_up, **kwargs)
+ 'admin_state_up': admin_state_up}}
+ return self._create_bulk(fmt, number, 'port', base_data,
+ tenant_id=tenant_id, as_admin=as_admin,
+ **kwargs)
+
+ def _make_network(self, fmt, name, admin_state_up, as_admin=False,
+ **kwargs):
+ res = self._create_network(fmt, name, admin_state_up,
+ as_admin=as_admin, **kwargs)
# TODO(salvatore-orlando): do exception handling in this test module
# in a uniform way (we do it differently for ports, subnets, and nets
# Things can go wrong - raise HTTP exc with res code only
@@ -533,7 +574,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
allocation_pools=None, ip_version=constants.IP_VERSION_4,
enable_dhcp=True, dns_nameservers=None, host_routes=None,
shared=None, ipv6_ra_mode=None, ipv6_address_mode=None,
- tenant_id=None, set_context=False, segment_id=None):
+ tenant_id=None, segment_id=None, as_admin=False):
res = self._create_subnet(fmt,
net_id=network['network']['id'],
cidr=cidr,
@@ -550,7 +591,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
shared=shared,
ipv6_ra_mode=ipv6_ra_mode,
ipv6_address_mode=ipv6_address_mode,
- set_context=set_context)
+ as_admin=as_admin)
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
if res.status_int >= webob.exc.HTTPClientError.code:
@@ -572,11 +613,13 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
ipv6_ra_mode=ra_addr_mode,
ipv6_address_mode=ra_addr_mode))
- def _make_subnetpool(self, fmt, prefixes, admin=False, **kwargs):
+ def _make_subnetpool(self, fmt, prefixes, admin=False, tenant_id=None,
+ **kwargs):
res = self._create_subnetpool(fmt,
prefixes,
None,
admin,
+ tenant_id=tenant_id,
**kwargs)
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
@@ -584,8 +627,10 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
- def _make_port(self, fmt, net_id, expected_res_status=None, **kwargs):
- res = self._create_port(fmt, net_id, expected_res_status, **kwargs)
+ def _make_port(self, fmt, net_id, expected_res_status=None,
+ as_admin=False, **kwargs):
+ res = self._create_port(fmt, net_id, expected_res_status,
+ is_admin=as_admin, **kwargs)
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
if res.status_int >= webob.exc.HTTPClientError.code:
@@ -596,7 +641,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
max_burst_kbps=None, dscp_mark=None, min_kbps=None,
direction=constants.EGRESS_DIRECTION,
expected_res_status=None, project_id=None,
- set_context=False, is_admin=False):
+ is_admin=False):
# Accepted rule types: "bandwidth_limit", "dscp_marking" and
# "minimum_bandwidth"
self.assertIn(rule_type, [qos_const.RULE_TYPE_BANDWIDTH_LIMIT,
@@ -615,11 +660,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
data[type_req][qos_const.MIN_KBPS] = min_kbps
data[type_req][qos_const.DIRECTION] = direction
route = 'qos/policies/%s/%s' % (qos_policy_id, type_req + 's')
- qos_rule_req = self.new_create_request(route, data, fmt)
- if set_context and project_id:
- # create a specific auth context for this request
- qos_rule_req.environ['neutron.context'] = context.Context(
- '', project_id, is_admin=is_admin)
+ qos_rule_req = self.new_create_request(route, data, fmt,
+ tenant_id=project_id,
+ as_admin=is_admin)
qos_rule_res = qos_rule_req.get_response(self.api)
if expected_res_status:
@@ -628,16 +671,14 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _create_qos_policy(self, fmt, qos_policy_name=None,
expected_res_status=None, project_id=None,
- set_context=False, is_admin=False):
+ is_admin=False):
project_id = project_id or self._tenant_id
name = qos_policy_name or uuidutils.generate_uuid()
data = {'policy': {'name': name,
'project_id': project_id}}
- qos_req = self.new_create_request('policies', data, fmt)
- if set_context and project_id:
- # create a specific auth context for this request
- qos_req.environ['neutron.context'] = context.Context(
- '', project_id, is_admin=is_admin)
+ qos_req = self.new_create_request('policies', data, fmt,
+ tenant_id=project_id,
+ as_admin=is_admin)
qos_policy_res = qos_req.get_response(self.api)
if expected_res_status:
@@ -653,54 +694,49 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _delete(self, collection, id,
expected_code=webob.exc.HTTPNoContent.code,
- neutron_context=None, headers=None, subresource=None,
- sub_id=None):
+ headers=None, subresource=None, sub_id=None,
+ tenant_id=None, as_admin=False):
req = self.new_delete_request(collection, id, headers=headers,
- subresource=subresource, sub_id=sub_id)
- if neutron_context:
- # create a specific auth context for this request
- req.environ['neutron.context'] = neutron_context
+ subresource=subresource, sub_id=sub_id,
+ tenant_id=tenant_id, as_admin=as_admin)
+
res = req.get_response(self._api_for_resource(collection))
self.assertEqual(expected_code, res.status_int)
- def _show_response(self, resource, id, neutron_context=None):
- req = self.new_show_request(resource, id)
- if neutron_context:
- # create a specific auth context for this request
- req.environ['neutron.context'] = neutron_context
- elif hasattr(self, 'tenant_id'):
- req.environ['neutron.context'] = context.Context('',
- self.tenant_id)
+ def _show_response(self, resource, id, tenant_id=None, as_admin=False):
+ req = self.new_show_request(resource, id,
+ tenant_id=tenant_id,
+ as_admin=as_admin)
return req.get_response(self._api_for_resource(resource))
def _show(self, resource, id,
expected_code=webob.exc.HTTPOk.code,
- neutron_context=None):
- res = self._show_response(resource, id,
- neutron_context=neutron_context)
+ tenant_id=None, as_admin=False):
+ res = self._show_response(resource, id, tenant_id=tenant_id,
+ as_admin=as_admin)
self.assertEqual(expected_code, res.status_int)
return self.deserialize(self.fmt, res)
def _update(self, resource, id, new_data,
- expected_code=webob.exc.HTTPOk.code,
- neutron_context=None, headers=None):
- req = self.new_update_request(resource, new_data, id, headers=headers)
- if neutron_context:
- # create a specific auth context for this request
- req.environ['neutron.context'] = neutron_context
+ expected_code=webob.exc.HTTPOk.code, headers=None,
+ request_tenant_id=None, as_admin=False):
+ req = self.new_update_request(
+ resource, new_data, id, headers=headers,
+ tenant_id=request_tenant_id, as_admin=as_admin)
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(expected_code, res.status_int)
return self.deserialize(self.fmt, res)
- def _list(self, resource, fmt=None, neutron_context=None,
+ def _list(self, resource, fmt=None,
query_params=None, expected_code=webob.exc.HTTPOk.code,
- parent_id=None, subresource=None):
+ parent_id=None, subresource=None,
+ tenant_id=None, as_admin=False):
fmt = fmt or self.fmt
req = self.new_list_request(resource, fmt, query_params,
subresource=subresource,
- parent_id=parent_id)
- if neutron_context:
- req.environ['neutron.context'] = neutron_context
+ parent_id=parent_id,
+ tenant_id=tenant_id,
+ as_admin=as_admin)
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(expected_code, res.status_int)
return self.deserialize(fmt, res)
@@ -730,13 +766,14 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
self.assertEqual(items[0]['name'], 'test_0')
self.assertEqual(items[1]['name'], 'test_1')
- def _test_list_resources(self, resource, items, neutron_context=None,
- query_params=None,
- expected_code=webob.exc.HTTPOk.code):
+ def _test_list_resources(self, resource, items, query_params=None,
+ expected_code=webob.exc.HTTPOk.code,
+ tenant_id=None, as_admin=False):
res = self._list('%ss' % resource,
- neutron_context=neutron_context,
query_params=query_params,
- expected_code=expected_code)
+ expected_code=expected_code,
+ tenant_id=tenant_id,
+ as_admin=as_admin)
if expected_code == webob.exc.HTTPOk.code:
resource = resource.replace('-', '_')
self.assertCountEqual([i['id'] for i in res['%ss' % resource]],
@@ -771,7 +808,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
tenant_id=None,
project_id=None,
service_types=None,
- set_context=False):
+ as_admin=False):
if project_id:
tenant_id = project_id
cidr = netaddr.IPNetwork(cidr) if cidr else None
@@ -780,7 +817,6 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
gateway_ip = netaddr.IPAddress(gateway_ip)
with optional_ctx(network, self.network,
- set_context=set_context,
tenant_id=tenant_id) as network_to_use:
subnet = self._make_subnet(fmt or self.fmt,
network_to_use,
@@ -797,7 +833,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
ipv6_ra_mode=ipv6_ra_mode,
ipv6_address_mode=ipv6_address_mode,
tenant_id=tenant_id,
- set_context=set_context)
+ as_admin=as_admin)
yield subnet
@contextlib.contextmanager
@@ -811,22 +847,22 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
yield subnetpool
@contextlib.contextmanager
- def port(self, subnet=None, fmt=None, set_context=False, project_id=None,
+ def port(self, subnet=None, fmt=None, project_id=None, is_admin=False,
**kwargs):
tenant_id = project_id if project_id else kwargs.pop(
'tenant_id', None)
with optional_ctx(
subnet, self.subnet,
- set_context=set_context, tenant_id=tenant_id) as subnet_to_use:
+ tenant_id=tenant_id) as subnet_to_use:
net_id = subnet_to_use['subnet']['network_id']
port = self._make_port(
- fmt or self.fmt, net_id,
- set_context=set_context, tenant_id=tenant_id,
- **kwargs)
+ fmt or self.fmt, net_id, tenant_id=tenant_id,
+ as_admin=is_admin, **kwargs)
yield port
def _test_list_with_sort(self, resource,
- items, sorts, resources=None, query_params=''):
+ items, sorts, resources=None, query_params='',
+ tenant_id=None, as_admin=False):
query_str = query_params
for key, direction in sorts:
query_str = query_str + "&sort_key=%s&sort_dir=%s" % (key,
@@ -834,7 +870,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
if not resources:
resources = '%ss' % resource
req = self.new_list_request(resources,
- params=query_str)
+ params=query_str,
+ tenant_id=tenant_id,
+ as_admin=as_admin)
api = self._api_for_resource(resources)
res = self.deserialize(self.fmt, req.get_response(api))
resource = resource.replace('-', '_')
@@ -846,13 +884,17 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
limit, expected_page_num,
resources=None,
query_params='',
- verify_key='id'):
+ verify_key='id',
+ tenant_id=None,
+ as_admin=False):
if not resources:
resources = '%ss' % resource
query_str = query_params + '&' if query_params else ''
query_str = query_str + ("limit=%s&sort_key=%s&"
"sort_dir=%s") % (limit, sort[0], sort[1])
- req = self.new_list_request(resources, params=query_str)
+ req = self.new_list_request(resources, params=query_str,
+ tenant_id=tenant_id, as_admin=as_admin)
+ neutron_ctx = req.environ['neutron.context']
items_res = []
page_num = 0
api = self._api_for_resource(resources)
@@ -871,6 +913,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
content_type = 'application/%s' % self.fmt
req = testlib_api.create_request(link['href'],
'', content_type)
+ req.environ['neutron.context'] = neutron_ctx
self.assertEqual(len(res[resources]),
limit)
self.assertEqual(expected_page_num, page_num)
@@ -880,7 +923,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _test_list_with_pagination_reverse(self, resource, items, sort,
limit, expected_page_num,
resources=None,
- query_params=''):
+ query_params='',
+ tenant_id=None,
+ as_admin=False):
if not resources:
resources = '%ss' % resource
resource = resource.replace('-', '_')
@@ -891,7 +936,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
"sort_key=%s&sort_dir=%s&"
"marker=%s") % (limit, sort[0], sort[1],
marker)
- req = self.new_list_request(resources, params=query_str)
+ req = self.new_list_request(resources, params=query_str,
+ tenant_id=tenant_id, as_admin=as_admin)
+ neutron_ctx = req.environ['neutron.context']
item_res = [items[-1][resource]]
page_num = 0
resources = resources.replace('-', '_')
@@ -909,6 +956,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
content_type = 'application/%s' % self.fmt
req = testlib_api.create_request(link['href'],
'', content_type)
+ req.environ['neutron.context'] = neutron_ctx
self.assertEqual(len(res[resources]),
limit)
self.assertEqual(expected_page_num, page_num)
@@ -1001,10 +1049,9 @@ class TestV2HTTPResponse(NeutronDbPluginV2TestCase):
self._create_network(self.fmt,
'some_net',
True,
- tenant_id=tenant_id,
- set_context=True)
- req = self.new_list_request('networks', params="fields=name")
- req.environ['neutron.context'] = context.Context('', tenant_id)
+ tenant_id=tenant_id)
+ req = self.new_list_request(
+ 'networks', params="fields=name", tenant_id=tenant_id)
res = req.get_response(self.api)
self._check_list_with_fields(res, 'name')
@@ -1020,10 +1067,9 @@ class TestV2HTTPResponse(NeutronDbPluginV2TestCase):
self._create_network(self.fmt,
'some_net',
True,
- tenant_id=tenant_id,
- set_context=True)
- req = self.new_list_request('networks', params="fields=tenant_id")
- req.environ['neutron.context'] = context.Context('', tenant_id)
+ tenant_id=tenant_id)
+ req = self.new_list_request(
+ 'networks', params="fields=tenant_id", tenant_id=tenant_id)
res = req.get_response(self.api)
self._check_list_with_fields(res, 'tenant_id')
@@ -1086,7 +1132,7 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
def test_create_port_json(self):
keys = [('admin_state_up', True), ('status', self.port_create_status)]
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
with self.subnet(network=network) as subnet:
with self.port(name='myname', subnet=subnet) as port:
for k, v in keys:
@@ -1108,7 +1154,7 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
device_id='fake_device',
device_owner='fake_owner',
fixed_ips=[],
- set_context=False)
+ is_admin=True)
def test_create_port_bad_tenant(self):
with self.network() as network:
@@ -1118,17 +1164,15 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
tenant_id='bad_tenant_id',
device_id='fake_device',
device_owner='fake_owner',
- fixed_ips=[],
- set_context=True)
+ fixed_ips=[])
def test_create_port_public_network(self):
keys = [('admin_state_up', True), ('status', self.port_create_status)]
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
port_res = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
- tenant_id='another_tenant',
- set_context=True)
+ tenant_id='another_tenant')
port = self.deserialize(self.fmt, port_res)
for k, v in keys:
self.assertEqual(port['port'][k], v)
@@ -1147,11 +1191,10 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
webob.exc.HTTPClientError.code,
tenant_id='tenant_id',
fixed_ips=[],
- set_context=False,
**kwargs)
def test_create_port_public_network_with_ip(self):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
ip_net = netaddr.IPNetwork('10.0.0.0/24')
with self.subnet(network=network, cidr=str(ip_net)):
keys = [('admin_state_up', True),
@@ -1159,8 +1202,7 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
port_res = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
- tenant_id='another_tenant',
- set_context=True)
+ tenant_id='another_tenant')
port = self.deserialize(self.fmt, port_res)
for k, v in keys:
self.assertEqual(port['port'][k], v)
@@ -1170,7 +1212,7 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
self._delete('ports', port['port']['id'])
def test_create_port_anticipating_allocation(self):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
with self.subnet(network=network, cidr='10.0.0.0/24') as subnet:
fixed_ips = [{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id'],
@@ -1181,14 +1223,13 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
def test_create_port_public_network_with_invalid_ip_no_subnet_id(self,
expected_error='InvalidIpForNetwork'):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
with self.subnet(network=network, cidr='10.0.0.0/24'):
ips = [{'ip_address': '1.1.1.1'}]
res = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPBadRequest.code,
- fixed_ips=ips,
- set_context=True)
+ fixed_ips=ips)
data = self.deserialize(self.fmt, res)
msg = str(lib_exc.InvalidIpForNetwork(ip_address='1.1.1.1'))
self.assertEqual(expected_error, data['NeutronError']['type'])
@@ -1196,15 +1237,14 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
def test_create_port_public_network_with_invalid_ip_and_subnet_id(self,
expected_error='InvalidIpForSubnet'):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
with self.subnet(network=network, cidr='10.0.0.0/24') as subnet:
ips = [{'subnet_id': subnet['subnet']['id'],
'ip_address': '1.1.1.1'}]
res = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPBadRequest.code,
- fixed_ips=ips,
- set_context=True)
+ fixed_ips=ips)
data = self.deserialize(self.fmt, res)
msg = str(lib_exc.InvalidIpForSubnet(ip_address='1.1.1.1'))
self.assertEqual(expected_error, data['NeutronError']['type'])
@@ -1342,29 +1382,29 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
self._test_list_ports_filtered_by_fixed_ip(limit=500)
def test_list_ports_public_network(self):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
with self.subnet(network) as subnet:
with self.port(subnet, tenant_id='tenant_1') as port1,\
self.port(subnet, tenant_id='tenant_2') as port2:
# Admin request - must return both ports
- self._test_list_resources('port', [port1, port2])
+ self._test_list_resources(
+ 'port', [port1, port2], as_admin=True)
# Tenant_1 request - must return single port
- n_context = context.Context('', 'tenant_1')
self._test_list_resources('port', [port1],
- neutron_context=n_context)
+ tenant_id='tenant_1')
# Tenant_2 request - must return single port
- n_context = context.Context('', 'tenant_2')
self._test_list_resources('port', [port2],
- neutron_context=n_context)
+ tenant_id='tenant_2')
def test_list_ports_for_network_owner(self):
with self.network(tenant_id='tenant_1') as network:
- with self.subnet(network) as subnet:
- with self.port(subnet, tenant_id='tenant_1') as port1,\
- self.port(subnet, tenant_id='tenant_2') as port2:
+ with self.subnet(network, tenant_id='tenant_1') as subnet:
+ with self.port(subnet, project_id='tenant_1') as port1,\
+ self.port(subnet, project_id='tenant_2',
+ is_admin=True) as port2:
# network owner request, should return all ports
port_res = self._list_ports(
- 'json', set_context=True, tenant_id='tenant_1')
+ 'json', tenant_id='tenant_1')
port_list = self.deserialize('json', port_res)['ports']
port_ids = [p['id'] for p in port_list]
self.assertEqual(2, len(port_list))
@@ -1373,7 +1413,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
# another tenant request, only return ports belong to it
port_res = self._list_ports(
- 'json', set_context=True, tenant_id='tenant_2')
+ 'json', tenant_id='tenant_2')
port_list = self.deserialize('json', port_res)['ports']
port_ids = [p['id'] for p in port_list]
self.assertEqual(1, len(port_list))
@@ -1467,12 +1507,11 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
expected_code=webob.exc.HTTPNotFound.code)
def test_delete_port_public_network(self):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
port_res = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
- tenant_id='another_tenant',
- set_context=True)
+ tenant_id='another_tenant')
port = self.deserialize(self.fmt, port_res)
self._delete('ports', port['port']['id'])
@@ -1482,15 +1521,15 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_delete_port_by_network_owner(self):
with self.network(tenant_id='tenant_1') as network:
with self.subnet(network) as subnet:
- with self.port(subnet, tenant_id='tenant_2') as port:
+ with self.port(subnet, tenant_id='tenant_2',
+ is_admin=True) as port:
self._delete(
- 'ports', port['port']['id'],
- neutron_context=context.Context('', 'tenant_1'))
+ 'ports', port['port']['id'], tenant_id='tenant_1')
self._show('ports', port['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
def test_update_port_with_stale_subnet(self):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
port = self._make_port(self.fmt, network['network']['id'])
subnet = self._make_subnet(self.fmt, network,
'10.0.0.1', '10.0.0.0/24')
@@ -1528,7 +1567,8 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
data = {'port': {'mac_address': new_mac}}
if updated_fixed_ips:
data['port']['fixed_ips'] = updated_fixed_ips
- req = self.new_update_request('ports', data, port['id'])
+ req = self.new_update_request(
+ 'ports', data, port['id'], as_admin=True)
return req.get_response(self.api), new_mac
def _verify_ips_after_mac_change(self, orig_port, new_port):
@@ -1553,6 +1593,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
host_arg = host_arg or {}
arg_list = arg_list or []
with self.port(device_owner=device_owner, subnet=subnet,
+ is_admin=True,
arg_list=arg_list, **host_arg) as port:
self.assertIn('mac_address', port['port'])
res, new_mac = self.update_port_mac(
@@ -1634,7 +1675,8 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
new_mac = port2['port']['mac_address']
data = {'port': {'mac_address': new_mac}}
req = self.new_update_request('ports', data,
- port['port']['id'])
+ port['port']['id'],
+ as_admin=True)
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPConflict.code,
res.status_int)
@@ -1647,16 +1689,14 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_update_port_not_admin(self):
res = self._create_network(self.fmt, 'net1', True,
- tenant_id='not_admin',
- set_context=True)
+ tenant_id='not_admin')
net1 = self.deserialize(self.fmt, res)
res = self._create_port(self.fmt, net1['network']['id'],
- tenant_id='not_admin', set_context=True)
+ tenant_id='not_admin')
port = self.deserialize(self.fmt, res)
data = {'port': {'admin_state_up': False}}
- neutron_context = context.Context('', 'not_admin')
port = self._update('ports', port['port']['id'], data,
- neutron_context=neutron_context)
+ request_tenant_id='not_admin')
self.assertFalse(port['port']['admin_state_up'])
def test_update_device_id_unchanged(self):
@@ -2746,7 +2786,7 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
name = 'public_net'
keys = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', self.net_create_status), ('shared', True)]
- with self.network(name=name, shared=True) as net:
+ with self.network(name=name, shared=True, as_admin=True) as net:
for k, v in keys:
self.assertEqual(net['network'][k], v)
@@ -2756,8 +2796,7 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
webob.exc.HTTPClientError) as ctx_manager:
with self.network(name=name,
shared=True,
- tenant_id="another_tenant",
- set_context=True):
+ tenant_id="another_tenant"):
pass
self.assertEqual(webob.exc.HTTPForbidden.code,
ctx_manager.exception.code)
@@ -2773,12 +2812,12 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
res['network']['name'])
def test_update_shared_network_noadmin_returns_403(self):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
data = {'network': {'name': 'a_brand_new_name'}}
req = self.new_update_request('networks',
data,
- network['network']['id'])
- req.environ['neutron.context'] = context.Context('', 'somebody')
+ network['network']['id'],
+ tenant_id='other-tenant')
res = req.get_response(self.api)
self.assertEqual(403, res.status_int)
@@ -2787,7 +2826,8 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
data = {'network': {'shared': True}}
req = self.new_update_request('networks',
data,
- network['network']['id'])
+ network['network']['id'],
+ as_admin=True)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertTrue(res['network']['shared'])
@@ -2808,7 +2848,8 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
data = {'network': {'shared': True}}
req = self.new_update_request('networks',
data,
- network['network']['id'])
+ network['network']['id'],
+ as_admin=True)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertTrue(res['network']['shared'])
# must query db to see whether subnet's shared attribute
@@ -2819,39 +2860,38 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
self.assertTrue(subnet_db['shared'])
def test_update_network_set_not_shared_single_tenant(self):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
res1 = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
- tenant_id=network['network']['tenant_id'],
- set_context=True)
+ tenant_id=network['network']['tenant_id'])
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
- network['network']['id'])
+ network['network']['id'],
+ as_admin=True)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertFalse(res['network']['shared'])
port1 = self.deserialize(self.fmt, res1)
self._delete('ports', port1['port']['id'])
- def test_update_network_set_not_shared_other_tenant_returns_409(self):
- with self.network(shared=True) as network:
+ def test_update_network_set_not_shared_other_tenant_returns_403(self):
+ with self.network(shared=True, as_admin=True) as network:
res1 = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
- tenant_id='somebody_else',
- set_context=True)
+ tenant_id='somebody_else')
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
- self.assertEqual(webob.exc.HTTPConflict.code,
+ self.assertEqual(webob.exc.HTTPForbidden.code,
req.get_response(self.api).status_int)
port1 = self.deserialize(self.fmt, res1)
self._delete('ports', port1['port']['id'])
def test_update_network_set_not_shared_other_tenant_access_via_rbac(self):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
ctx = context.get_admin_context()
with db_api.CONTEXT_WRITER.using(ctx):
network_obj.NetworkRBAC(
@@ -2867,33 +2907,32 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
res1 = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
- tenant_id='somebody_else',
- set_context=True)
+ tenant_id='somebody_else')
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
- network['network']['id'])
+ network['network']['id'],
+ as_admin=True)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertFalse(res['network']['shared'])
port1 = self.deserialize(self.fmt, res1)
self._delete('ports', port1['port']['id'])
def test_update_network_set_not_shared_multi_tenants_returns_409(self):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
res1 = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
- tenant_id='somebody_else',
- set_context=True)
+ tenant_id='somebody_else')
res2 = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
- tenant_id=network['network']['tenant_id'],
- set_context=True)
+ tenant_id=network['network']['tenant_id'])
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
- network['network']['id'])
+ network['network']['id'],
+ as_admin=True)
self.assertEqual(webob.exc.HTTPConflict.code,
req.get_response(self.api).status_int)
port1 = self.deserialize(self.fmt, res1)
@@ -2902,22 +2941,21 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
self._delete('ports', port2['port']['id'])
def test_update_network_set_not_shared_multi_tenants2_returns_409(self):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
res1 = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
- tenant_id='somebody_else',
- set_context=True)
+ tenant_id='somebody_else')
self._create_subnet(self.fmt,
network['network']['id'],
'10.0.0.0/24',
webob.exc.HTTPCreated.code,
- tenant_id=network['network']['tenant_id'],
- set_context=True)
+ tenant_id=network['network']['tenant_id'])
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
- network['network']['id'])
+ network['network']['id'],
+ as_admin=True)
self.assertEqual(webob.exc.HTTPConflict.code,
req.get_response(self.api).status_int)
@@ -2967,7 +3005,8 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
{'network': {'name': 'n2',
'tenant_id': 't1'}}]
- res = self._create_bulk_from_list(self.fmt, 'network', networks)
+ res = self._create_bulk_from_list(self.fmt, 'network', networks,
+ as_admin=True)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_networks_bulk_tenants_and_quotas_fail(self):
@@ -2987,7 +3026,8 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
{'network': {'name': 'n2',
'tenant_id': 't1'}}]
- res = self._create_bulk_from_list(self.fmt, 'network', networks)
+ res = self._create_bulk_from_list(self.fmt, 'network', networks,
+ as_admin=True)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_networks_bulk_emulated(self):
@@ -3136,9 +3176,9 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
'neutron.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
- with self.network(name='net1', shared=True) as net1,\
+ with self.network(name='net1', shared=True, as_admin=True) as net1,\
self.network(name='net2', shared=False) as net2,\
- self.network(name='net3', shared=True) as net3:
+ self.network(name='net3', shared=True, as_admin=True) as net3:
self._test_list_with_pagination('network',
(net1, net2, net3),
('name', 'asc'), 2, 2,
@@ -3215,14 +3255,13 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
tenant_id='tenant1') as net1,\
self.network(shared=True,
name='net2',
+ as_admin=True,
tenant_id='another_tenant') as net2,\
self.network(shared=False,
name='net3',
tenant_id='another_tenant'):
- ctx = context.Context(user_id='non_admin',
- tenant_id='tenant1',
- is_admin=False)
- self._test_list_resources('network', (net1, net2), ctx)
+ self._test_list_resources('network', (net1, net2),
+ tenant_id='tenant1')
def test_show_network(self):
with self.network(name='net1') as net:
@@ -3760,8 +3799,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
ip_version=constants.IP_VERSION_4,
tenant_id='bad_tenant_id',
gateway_ip='10.0.2.1',
- device_owner='fake_owner',
- set_context=True)
+ device_owner='fake_owner')
def test_create_subnet_as_admin(self):
with self.network() as network:
@@ -3773,7 +3811,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
tenant_id='bad_tenant_id',
gateway_ip='10.0.2.1',
device_owner='fake_owner',
- set_context=False)
+ as_admin=True)
def test_create_subnet_nonzero_cidr(self):
# Pass None as gateway_ip to prevent ip auto allocation for gw
@@ -4464,7 +4502,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
'tenant_id': 'tenant_id',
'device_id': 'fake_device',
'device_owner': constants.DEVICE_OWNER_ROUTER_GW}
- res = self._create_port(self.fmt, net_id=net_id, **kwargs)
+ res = self._create_port(self.fmt, net_id=net_id,
+ is_admin=True, **kwargs)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_subnet_ipv6_first_ip_owned_by_non_router(self):
@@ -4480,7 +4519,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
'tenant_id': 'tenant_id',
'device_id': 'fake_device',
'device_owner': 'fake_owner'}
- res = self._create_port(self.fmt, net_id=net_id, **kwargs)
+ res = self._create_port(self.fmt, net_id=net_id,
+ is_admin=True, **kwargs)
self.assertEqual(webob.exc.HTTPClientError.code,
res.status_int)
@@ -4804,7 +4844,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
ra_addr_mode=constants.DHCPV6_STATELESS)
def test_update_subnet_shared_returns_400(self):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
with self.subnet(network=network) as subnet:
data = {'subnet': {'shared': True}}
req = self.new_update_request('subnets', data,
@@ -5294,7 +5334,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
with self.subnet(network=network,
gateway_ip='10.0.0.1',
cidr='10.0.0.0/24',
- tenant_id=project_id),\
+ tenant_id=project_id,
+ as_admin=True),\
self.subnet(network=network,
gateway_ip='10.0.1.1',
cidr='10.0.1.0/24'),\
@@ -5351,7 +5392,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
self._test_list_resources('subnet', subnets)
def test_list_subnets_shared(self):
- with self.network(shared=True) as network:
+ with self.network(shared=True, as_admin=True) as network:
with self.subnet(network=network, cidr='10.0.0.0/24') as subnet:
with self.subnet(cidr='10.0.1.0/24') as priv_subnet:
# normal user should see only 1 subnet
@@ -6117,8 +6158,7 @@ class TestSubnetPoolsV2(NeutronDbPluginV2TestCase):
min_prefixlen='24',
shared=True)
admin_res = self._list('subnetpools')
- mortal_res = self._list('subnetpools',
- neutron_context=context.Context('', 'not-the-owner'))
+ mortal_res = self._list('subnetpools', tenant_id='not-the-owner')
self.assertEqual(1, len(admin_res['subnetpools']))
self.assertEqual(1, len(mortal_res['subnetpools']))
@@ -6130,8 +6170,7 @@ class TestSubnetPoolsV2(NeutronDbPluginV2TestCase):
min_prefixlen='24',
shared=False)
admin_res = self._list('subnetpools')
- mortal_res = self._list('subnetpools',
- neutron_context=context.Context('', 'not-the-owner'))
+ mortal_res = self._list('subnetpools', tenant_id='not-the-owner')
self.assertEqual(1, len(admin_res['subnetpools']))
self.assertEqual(0, len(mortal_res['subnetpools']))
@@ -7197,10 +7236,10 @@ class DbOperationBoundMixin(object):
def get_api_kwargs(self):
context_ = self._get_context()
- return {'set_context': True, 'tenant_id': context_.project_id}
+ return {'tenant_id': context_.project_id}
def _list_and_record_queries(self, resource, query_params=None):
- kwargs = {'neutron_context': self._get_context()}
+ kwargs = {}
if query_params:
kwargs['query_params'] = query_params
# list once before tracking to flush out any quota recalculations.
diff --git a/neutron/tests/unit/db/test_dvr_mac_db.py b/neutron/tests/unit/db/test_dvr_mac_db.py
index 80d650a7d8..6f87672712 100644
--- a/neutron/tests/unit/db/test_dvr_mac_db.py
+++ b/neutron/tests/unit/db/test_dvr_mac_db.py
@@ -188,22 +188,28 @@ class DvrDbMixinTestCase(test_plugin.Ml2PluginV2TestCase):
arg_list = (portbindings.HOST_ID,)
with self.subnet() as subnet,\
self.port(subnet=subnet,
+ is_admin=True,
device_owner=constants.DEVICE_OWNER_COMPUTE_PREFIX,
arg_list=arg_list, **host_arg) as compute_port,\
self.port(subnet=subnet,
device_owner=constants.DEVICE_OWNER_DHCP,
+ is_admin=True,
arg_list=arg_list, **host_arg) as dhcp_port,\
self.port(subnet=subnet,
device_owner=constants.DEVICE_OWNER_LOADBALANCER,
+ is_admin=True,
arg_list=arg_list, **host_arg) as lb_port,\
self.port(device_owner=constants.DEVICE_OWNER_COMPUTE_PREFIX,
+ is_admin=True,
arg_list=arg_list, **host_arg),\
self.port(subnet=subnet,
device_owner=constants.DEVICE_OWNER_COMPUTE_PREFIX,
+ is_admin=True,
arg_list=arg_list,
**{portbindings.HOST_ID: 'other'}),\
self.port(subnet=subnet,
device_owner=constants.DEVICE_OWNER_NETWORK_PREFIX,
+ is_admin=True,
arg_list=arg_list, **host_arg):
expected_ids = [port['port']['id'] for port in
[compute_port, dhcp_port, lb_port]]
diff --git a/neutron/tests/unit/db/test_ipam_backend_mixin.py b/neutron/tests/unit/db/test_ipam_backend_mixin.py
index e81a908ec2..fa2872a317 100644
--- a/neutron/tests/unit/db/test_ipam_backend_mixin.py
+++ b/neutron/tests/unit/db/test_ipam_backend_mixin.py
@@ -373,7 +373,8 @@ class TestPortUpdateIpam(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
net_id=network['network']['id'],
tenant_id=network['network']['tenant_id'],
arg_list=(portbindings.HOST_ID,),
- **{portbindings.HOST_ID: 'fakehost'})
+ **{portbindings.HOST_ID: 'fakehost'},
+ is_admin=True)
port = self.deserialize(self.fmt, response)
# Create the subnet and try to update the port to get an IP
@@ -381,7 +382,8 @@ class TestPortUpdateIpam(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
data = {'port': {
'fixed_ips': [{'subnet_id': subnet['subnet']['id']}]}}
port_id = port['port']['id']
- port_req = self.new_update_request('ports', data, port_id)
+ port_req = self.new_update_request('ports', data, port_id,
+ as_admin=True)
response = port_req.get_response(self.api)
res = self.deserialize(self.fmt, response)
diff --git a/neutron/tests/unit/db/test_ipam_pluggable_backend.py b/neutron/tests/unit/db/test_ipam_pluggable_backend.py
index b77ea16458..958c0abbc6 100644
--- a/neutron/tests/unit/db/test_ipam_pluggable_backend.py
+++ b/neutron/tests/unit/db/test_ipam_pluggable_backend.py
@@ -71,7 +71,6 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
plugin = 'neutron.tests.unit.db.test_ipam_backend_mixin.TestPlugin'
super(TestDbBasePluginIpam, self).setUp(plugin=plugin)
cfg.CONF.set_override("ipam_driver", 'internal')
- self.tenant_id = uuidutils.generate_uuid()
self.subnet_id = uuidutils.generate_uuid()
self.admin_context = ncontext.get_admin_context()
@@ -89,7 +88,7 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
'device_owner': constants.DEVICE_OWNER_COMPUTE_PREFIX + 'None'
},
'subnet_request': ipam_req.SpecificSubnetRequest(
- self.tenant_id,
+ self._tenant_id,
self.subnet_id,
'10.0.0.0/24',
'10.0.0.1',
diff --git a/neutron/tests/unit/db/test_l3_db.py b/neutron/tests/unit/db/test_l3_db.py
index 9b65b60f09..43d4ec4a49 100644
--- a/neutron/tests/unit/db/test_l3_db.py
+++ b/neutron/tests/unit/db/test_l3_db.py
@@ -928,7 +928,8 @@ class L3TestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
with db_api.CONTEXT_WRITER.using(self.ctx):
res = self._create_network(
self.fmt, name, True,
- arg_list=(extnet_apidef.EXTERNAL,), **kwargs)
+ arg_list=(extnet_apidef.EXTERNAL,),
+ as_admin=True, **kwargs)
if res.status_int >= webob.exc.HTTPClientError.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(self.fmt, res)
diff --git a/neutron/tests/unit/db/test_ovn_revision_numbers_db.py b/neutron/tests/unit/db/test_ovn_revision_numbers_db.py
index 62dfc9caa1..f375d3602d 100644
--- a/neutron/tests/unit/db/test_ovn_revision_numbers_db.py
+++ b/neutron/tests/unit/db/test_ovn_revision_numbers_db.py
@@ -237,7 +237,7 @@ class TestRevisionNumberMaintenance(test_securitygroup.SecurityGroupsTestCase,
'10.0.0.0/24')['subnet']
self._set_net_external(self.net['id'])
info = {'network_id': self.net['id']}
- router = self._make_router(self.fmt, None,
+ router = self._make_router(self.fmt, self._tenant_id,
external_gateway_info=info)['router']
fip = self._make_floatingip(self.fmt, self.net['id'])['floatingip']
port = self._make_port(self.fmt, self.net['id'])['port']