summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSlawek Kaplonski <skaplons@redhat.com>2023-05-09 12:28:03 +0200
committerSlawek Kaplonski <skaplons@redhat.com>2023-05-09 12:30:50 +0200
commitbe0dc09d52efd5e7236a33be552edb6644371cd0 (patch)
tree1c557c919e44edcaf64acf6a2408825379a740d0
parent9319ba00a986e693eeb4707055aa53111601e7c8 (diff)
downloadneutron-be0dc09d52efd5e7236a33be552edb6644371cd0.tar.gz
[S-RBAC] Fix new policies for get QoS rules APIs
During transition to the new secure RBAC API policies, we made mistake with policies for QoS rules by defining them to be available for ADMIN_OR_PROJECT_READER. This can't be like that as QoS rules don't have tenant_id attribute and belongs always to the owner of the QoS policy. To fix that, this patch introduces new rules: ADMIN_OR_PARENT_OWNER_READER ADMIN_OR_PARENT_OWNER_MEMBER and uses those in the QoS rules APIs. Closes-Bug: #2018727 Change-Id: I522aeab5094b3f4854303d5e18f3abf6130fb33c
-rw-r--r--neutron/conf/policies/base.py14
-rw-r--r--neutron/conf/policies/qos.py16
-rw-r--r--neutron/tests/unit/conf/policies/test_qos.py593
3 files changed, 356 insertions, 267 deletions
diff --git a/neutron/conf/policies/base.py b/neutron/conf/policies/base.py
index 14e419e7d1..395e4fd7f5 100644
--- a/neutron/conf/policies/base.py
+++ b/neutron/conf/policies/base.py
@@ -43,6 +43,20 @@ RULE_NET_OWNER = 'rule:network_owner'
RULE_PARENT_OWNER = 'rule:ext_parent_owner'
RULE_SG_OWNER = 'rule:sg_owner'
+# In some cases we need to check owner of the parent resource, it's like that
+# for example for QoS rules (check owner of QoS policy rule belongs to) or
+# Floating IP port forwarding (check owner of FIP which PF is using). It's like
+# that becasue those resources (QOS rules, FIP PFs) don't have project_id
+# attribute at all and they belongs to the same project as parent resource (QoS
+# policy, FIP).
+PARENT_OWNER_MEMBER = 'role:member and ' + RULE_PARENT_OWNER
+PARENT_OWNER_READER = 'role:reader and ' + RULE_PARENT_OWNER
+ADMIN_OR_PARENT_OWNER_MEMBER = (
+ '(' + ADMIN + ') or (' + PARENT_OWNER_MEMBER + ')')
+ADMIN_OR_PARENT_OWNER_READER = (
+ '(' + ADMIN + ') or (' + PARENT_OWNER_READER + ')')
+
+
rules = [
policy.RuleDefault(
'context_is_admin',
diff --git a/neutron/conf/policies/qos.py b/neutron/conf/policies/qos.py
index c9381bab23..2fc9d0975c 100644
--- a/neutron/conf/policies/qos.py
+++ b/neutron/conf/policies/qos.py
@@ -126,7 +126,7 @@ rules = [
policy.DocumentedRuleDefault(
name='get_policy_bandwidth_limit_rule',
- check_str=base.ADMIN_OR_PROJECT_READER,
+ check_str=base.ADMIN_OR_PARENT_OWNER_READER,
scope_types=['project'],
description='Get a QoS bandwidth limit rule',
operations=[
@@ -202,7 +202,7 @@ rules = [
policy.DocumentedRuleDefault(
name='get_policy_packet_rate_limit_rule',
- check_str=base.ADMIN_OR_PROJECT_READER,
+ check_str=base.ADMIN_OR_PARENT_OWNER_READER,
scope_types=['project'],
description='Get a QoS packet rate limit rule',
operations=[
@@ -258,7 +258,7 @@ rules = [
policy.DocumentedRuleDefault(
name='get_policy_dscp_marking_rule',
- check_str=base.ADMIN_OR_PROJECT_READER,
+ check_str=base.ADMIN_OR_PARENT_OWNER_READER,
scope_types=['project'],
description='Get a QoS DSCP marking rule',
operations=[
@@ -334,7 +334,7 @@ rules = [
policy.DocumentedRuleDefault(
name='get_policy_minimum_bandwidth_rule',
- check_str=base.ADMIN_OR_PROJECT_READER,
+ check_str=base.ADMIN_OR_PARENT_OWNER_READER,
scope_types=['project'],
description='Get a QoS minimum bandwidth rule',
operations=[
@@ -409,7 +409,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_policy_minimum_packet_rate_rule',
- check_str=base.ADMIN_OR_PROJECT_READER,
+ check_str=base.ADMIN_OR_PARENT_OWNER_READER,
scope_types=['project'],
description='Get a QoS minimum packet rate rule',
operations=[
@@ -464,7 +464,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_alias_bandwidth_limit_rule',
- check_str=base.ADMIN_OR_PROJECT_READER,
+ check_str=base.ADMIN_OR_PARENT_OWNER_READER,
scope_types=['project'],
description='Get a QoS bandwidth limit rule through alias',
operations=[
@@ -515,7 +515,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_alias_dscp_marking_rule',
- check_str=base.ADMIN_OR_PROJECT_READER,
+ check_str=base.ADMIN_OR_PARENT_OWNER_READER,
scope_types=['project'],
description='Get a QoS DSCP marking rule through alias',
operations=[
@@ -566,7 +566,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_alias_minimum_bandwidth_rule',
- check_str=base.ADMIN_OR_PROJECT_READER,
+ check_str=base.ADMIN_OR_PARENT_OWNER_READER,
scope_types=['project'],
description='Get a QoS minimum bandwidth rule through alias',
operations=[
diff --git a/neutron/tests/unit/conf/policies/test_qos.py b/neutron/tests/unit/conf/policies/test_qos.py
index 8b468484da..2b4d7aea03 100644
--- a/neutron/tests/unit/conf/policies/test_qos.py
+++ b/neutron/tests/unit/conf/policies/test_qos.py
@@ -230,18 +230,20 @@ class QosRulesAPITestCase(base.PolicyBaseTestCase):
super(QosRulesAPITestCase, self).setUp()
self.qos_policy = {
'id': uuidutils.generate_uuid(),
+ 'tenant_id': self.project_id,
'project_id': self.project_id}
+ self.alt_qos_policy = {
+ 'id': uuidutils.generate_uuid(),
+ 'tenant_id': self.alt_project_id,
+ 'project_id': self.alt_project_id}
self.target = {
- 'project_id': self.project_id,
'policy_id': self.qos_policy['id'],
'ext_parent_policy_id': self.qos_policy['id']}
self.alt_target = {
- 'project_id': self.alt_project_id,
- 'policy_id': self.qos_policy['id'],
- 'ext_parent_policy_id': self.qos_policy['id']}
+ 'policy_id': self.alt_qos_policy['id'],
+ 'ext_parent_policy_id': self.alt_qos_policy['id']}
self.plugin_mock = mock.Mock()
- self.plugin_mock.get_qos_policy.return_value = self.qos_policy
mock.patch(
'neutron_lib.plugins.directory.get_plugin',
return_value=self.plugin_mock).start()
@@ -254,28 +256,33 @@ class SystemAdminQosBandwidthLimitRuleTests(QosRulesAPITestCase):
self.context = self.system_admin_ctx
def test_get_policy_bandwidth_limit_rule(self):
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_policy_bandwidth_limit_rule',
- self.target)
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_policy_bandwidth_limit_rule',
- self.alt_target)
-
- # And the same for aliases
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_alias_bandwidth_limit_rule',
- self.target)
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_alias_bandwidth_limit_rule',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_policy_bandwidth_limit_rule',
+ self.target)
+ # And the same for aliases
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_alias_bandwidth_limit_rule',
+ self.target)
+
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_policy_bandwidth_limit_rule',
+ self.alt_target)
+ # And the same for aliases
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_alias_bandwidth_limit_rule',
+ self.alt_target)
def test_create_policy_bandwidth_limit_rule(self):
self.assertRaises(
@@ -361,24 +368,29 @@ class AdminQosBandwidthLimitRuleTests(QosRulesAPITestCase):
self.context = self.project_admin_ctx
def test_get_policy_bandwidth_limit_rule(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'get_policy_bandwidth_limit_rule',
- self.target))
- self.assertTrue(
- policy.enforce(self.context,
- 'get_policy_bandwidth_limit_rule',
- self.alt_target))
-
- # And the same for aliases
- self.assertTrue(
- policy.enforce(self.context,
- 'get_alias_bandwidth_limit_rule',
- self.target))
- self.assertTrue(
- policy.enforce(self.context,
- 'get_alias_bandwidth_limit_rule',
- self.alt_target))
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_policy_bandwidth_limit_rule',
+ self.target))
+ # And the same for aliases
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_alias_bandwidth_limit_rule',
+ self.target))
+
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_policy_bandwidth_limit_rule',
+ self.alt_target))
+ # And the same for aliases
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_alias_bandwidth_limit_rule',
+ self.alt_target))
def test_create_policy_bandwidth_limit_rule(self):
self.assertTrue(
@@ -439,26 +451,32 @@ class ProjectMemberQosBandwidthLimitRuleTests(
self.context = self.project_member_ctx
def test_get_policy_bandwidth_limit_rule(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'get_policy_bandwidth_limit_rule',
- self.target))
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'get_policy_bandwidth_limit_rule',
- self.alt_target)
-
- # And the same for aliases
- self.assertTrue(
- policy.enforce(self.context,
- 'get_alias_bandwidth_limit_rule',
- self.target))
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'get_alias_bandwidth_limit_rule',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_policy_bandwidth_limit_rule',
+ self.target))
+ # And the same for aliases
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_alias_bandwidth_limit_rule',
+ self.target))
+
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'get_policy_bandwidth_limit_rule',
+ self.alt_target)
+
+ # And the same for aliases
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'get_alias_bandwidth_limit_rule',
+ self.alt_target)
def test_create_policy_bandwidth_limit_rule(self):
self.assertRaises(
@@ -591,14 +609,19 @@ class AdminQosPacketRateLimitRuleTests(QosRulesAPITestCase):
self.context = self.project_admin_ctx
def test_get_policy_packet_rate_limit_rule(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'get_policy_packet_rate_limit_rule',
- self.target))
- self.assertTrue(
- policy.enforce(self.context,
- 'get_policy_packet_rate_limit_rule',
- self.alt_target))
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_policy_packet_rate_limit_rule',
+ self.target))
+
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_policy_packet_rate_limit_rule',
+ self.alt_target))
def test_create_policy_packet_rate_limit_rule(self):
self.assertTrue(
@@ -639,15 +662,20 @@ class ProjectMemberQosPacketRateLimitRuleTests(
self.context = self.project_member_ctx
def test_get_policy_packet_rate_limit_rule(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'get_policy_packet_rate_limit_rule',
- self.target))
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'get_policy_packet_rate_limit_rule',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_policy_packet_rate_limit_rule',
+ self.target))
+
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'get_policy_packet_rate_limit_rule',
+ self.alt_target)
def test_create_policy_packet_rate_limit_rule(self):
self.assertRaises(
@@ -701,28 +729,35 @@ class SystemAdminQosDSCPMarkingRuleTests(QosRulesAPITestCase):
self.context = self.system_admin_ctx
def test_get_policy_dscp_marking_rule(self):
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_policy_dscp_marking_rule',
- self.target)
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_policy_dscp_marking_rule',
- self.alt_target)
-
- # And the same for aliases
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_alias_dscp_marking_rule',
- self.target)
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_alias_dscp_marking_rule',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_policy_dscp_marking_rule',
+ self.target)
+
+ # And the same for aliases
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_alias_dscp_marking_rule',
+ self.target)
+
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_policy_dscp_marking_rule',
+ self.alt_target)
+
+ # And the same for aliases
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_alias_dscp_marking_rule',
+ self.alt_target)
def test_create_policy_dscp_marking_rule(self):
self.assertRaises(
@@ -806,24 +841,29 @@ class AdminQosDSCPMarkingRuleTests(QosRulesAPITestCase):
self.context = self.project_admin_ctx
def test_get_policy_dscp_marking_rule(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'get_policy_dscp_marking_rule',
- self.target))
- self.assertTrue(
- policy.enforce(self.context,
- 'get_policy_dscp_marking_rule',
- self.alt_target))
-
- # And the same for aliases
- self.assertTrue(
- policy.enforce(self.context,
- 'get_alias_dscp_marking_rule',
- self.target))
- self.assertTrue(
- policy.enforce(self.context,
- 'get_alias_dscp_marking_rule',
- self.alt_target))
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_policy_dscp_marking_rule',
+ self.target))
+ # And the same for aliases
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_alias_dscp_marking_rule',
+ self.target))
+
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_policy_dscp_marking_rule',
+ self.alt_target))
+ # And the same for aliases
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_alias_dscp_marking_rule',
+ self.alt_target))
def test_create_policy_dscp_marking_rule(self):
self.assertTrue(
@@ -884,26 +924,31 @@ class ProjectMemberQosDSCPMarkingRuleTests(
self.context = self.project_member_ctx
def test_get_policy_dscp_marking_rule(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'get_policy_dscp_marking_rule',
- self.target))
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'get_policy_dscp_marking_rule',
- self.alt_target)
-
- # And the same for aliases
- self.assertTrue(
- policy.enforce(self.context,
- 'get_alias_dscp_marking_rule',
- self.target))
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'get_alias_dscp_marking_rule',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_policy_dscp_marking_rule',
+ self.target))
+ # And the same for aliases
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_alias_dscp_marking_rule',
+ self.target))
+
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'get_policy_dscp_marking_rule',
+ self.alt_target)
+ # And the same for aliases
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'get_alias_dscp_marking_rule',
+ self.alt_target)
def test_create_policy_dscp_marking_rule(self):
self.assertRaises(
@@ -981,28 +1026,33 @@ class SystemAdminQosMinimumBandwidthRuleTests(QosRulesAPITestCase):
self.context = self.system_admin_ctx
def test_get_policy_minimum_bandwidth_rule(self):
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_policy_minimum_bandwidth_rule',
- self.target)
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_policy_minimum_bandwidth_rule',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_policy_minimum_bandwidth_rule',
+ self.target)
+ # And the same for aliases
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_alias_minimum_bandwidth_rule',
+ self.target)
- # And the same for aliases
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_alias_minimum_bandwidth_rule',
- self.target)
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_alias_minimum_bandwidth_rule',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_policy_minimum_bandwidth_rule',
+ self.alt_target)
+ # And the same for aliases
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_alias_minimum_bandwidth_rule',
+ self.alt_target)
def test_create_policy_minimum_bandwidth_rule(self):
self.assertRaises(
@@ -1088,24 +1138,29 @@ class AdminQosMinimumBandwidthRuleTests(QosRulesAPITestCase):
self.context = self.project_admin_ctx
def test_get_policy_minimum_bandwidth_rule(self):
- self.assertTrue(
- policy.enforce(
- self.context, 'get_policy_minimum_bandwidth_rule',
- self.target))
- self.assertTrue(
- policy.enforce(
- self.context, 'get_policy_minimum_bandwidth_rule',
- self.alt_target))
-
- # And the same for aliases
- self.assertTrue(
- policy.enforce(
- self.context, 'get_alias_minimum_bandwidth_rule',
- self.target))
- self.assertTrue(
- policy.enforce(
- self.context, 'get_alias_minimum_bandwidth_rule',
- self.alt_target))
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertTrue(
+ policy.enforce(
+ self.context, 'get_policy_minimum_bandwidth_rule',
+ self.target))
+ # And the same for aliases
+ self.assertTrue(
+ policy.enforce(
+ self.context, 'get_alias_minimum_bandwidth_rule',
+ self.target))
+
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertTrue(
+ policy.enforce(
+ self.context, 'get_policy_minimum_bandwidth_rule',
+ self.alt_target))
+ # And the same for aliases
+ self.assertTrue(
+ policy.enforce(
+ self.context, 'get_alias_minimum_bandwidth_rule',
+ self.alt_target))
def test_create_policy_minimum_bandwidth_rule(self):
self.assertTrue(
@@ -1166,26 +1221,31 @@ class ProjectMemberQosMinimumBandwidthRuleTests(
self.context = self.project_member_ctx
def test_get_policy_minimum_bandwidth_rule(self):
- self.assertTrue(
- policy.enforce(
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertTrue(
+ policy.enforce(
+ self.context, 'get_policy_minimum_bandwidth_rule',
+ self.target))
+ # And the same for aliases
+ self.assertTrue(
+ policy.enforce(
+ self.context, 'get_alias_minimum_bandwidth_rule',
+ self.target))
+
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
self.context, 'get_policy_minimum_bandwidth_rule',
- self.target))
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'get_policy_minimum_bandwidth_rule',
- self.alt_target)
-
- # And the same for aliases
- self.assertTrue(
- policy.enforce(
+ self.alt_target)
+ # And the same for aliases
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
self.context, 'get_alias_minimum_bandwidth_rule',
- self.target))
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'get_alias_minimum_bandwidth_rule',
- self.alt_target)
+ self.alt_target)
def test_create_policy_minimum_bandwidth_rule(self):
self.assertRaises(
@@ -1263,28 +1323,33 @@ class SystemAdminQosMinimumPacketRateRuleTests(QosRulesAPITestCase):
self.context = self.system_admin_ctx
def test_get_policy_minimum_packet_rate_rule(self):
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_policy_minimum_packet_rate_rule',
- self.target)
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_policy_minimum_packet_rate_rule',
- self.alt_target)
-
- # And the same for aliases
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_alias_minimum_packet_rate_rule',
- self.target)
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_alias_minimum_packet_rate_rule',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_policy_minimum_packet_rate_rule',
+ self.target)
+ # And the same for aliases
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_alias_minimum_packet_rate_rule',
+ self.target)
+
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_policy_minimum_packet_rate_rule',
+ self.alt_target)
+ # And the same for aliases
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_alias_minimum_packet_rate_rule',
+ self.alt_target)
def test_create_policy_minimum_packet_rate_rule(self):
self.assertRaises(
@@ -1370,24 +1435,29 @@ class AdminQosMinimumPacketRateRuleTests(QosRulesAPITestCase):
self.context = self.project_admin_ctx
def test_get_policy_minimum_packet_rate_rule(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'get_policy_minimum_packet_rate_rule',
- self.target))
- self.assertTrue(
- policy.enforce(self.context,
- 'get_policy_minimum_packet_rate_rule',
- self.alt_target))
-
- # And the same for aliases
- self.assertTrue(
- policy.enforce(self.context,
- 'get_alias_minimum_packet_rate_rule',
- self.target))
- self.assertTrue(
- policy.enforce(self.context,
- 'get_alias_minimum_packet_rate_rule',
- self.alt_target))
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_policy_minimum_packet_rate_rule',
+ self.target))
+ # And the same for aliases
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_alias_minimum_packet_rate_rule',
+ self.target))
+
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_policy_minimum_packet_rate_rule',
+ self.alt_target))
+ # And the same for aliases
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_alias_minimum_packet_rate_rule',
+ self.alt_target))
def test_create_policy_minimum_packet_rate_rule(self):
self.assertTrue(
@@ -1438,26 +1508,31 @@ class ProjectMemberQosMinimumPacketRateRuleTests(
self.context = self.project_member_ctx
def test_get_policy_minimum_packet_rate_rule(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'get_policy_minimum_packet_rate_rule',
- self.target))
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'get_policy_minimum_packet_rate_rule',
- self.alt_target)
-
- # And the same for aliases
- self.assertTrue(
- policy.enforce(self.context,
- 'get_alias_minimum_packet_rate_rule',
- self.target))
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'get_alias_minimum_packet_rate_rule',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.qos_policy):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_policy_minimum_packet_rate_rule',
+ self.target))
+ # And the same for aliases
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_alias_minimum_packet_rate_rule',
+ self.target))
+
+ with mock.patch.object(self.plugin_mock, "get_policy",
+ return_value=self.alt_qos_policy):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'get_policy_minimum_packet_rate_rule',
+ self.alt_target)
+ # And the same for aliases
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'get_alias_minimum_packet_rate_rule',
+ self.alt_target)
def test_create_policy_minimum_packet_rate_rule(self):
self.assertRaises(