summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSlawek Kaplonski <skaplons@redhat.com>2023-05-09 12:54:28 +0200
committerSlawek Kaplonski <skaplons@redhat.com>2023-05-09 12:54:28 +0200
commit4edff4fe8dff102f13e3da0a000c03538755337d (patch)
tree475f0f6c7fb70df4066ad0a471055278c6599082
parentbe0dc09d52efd5e7236a33be552edb6644371cd0 (diff)
downloadneutron-4edff4fe8dff102f13e3da0a000c03538755337d.tar.gz
[S-RBAC] Fix new policies for FIP PFs APIs
During transition to the new secure RBAC API policies, we made mistake with policies for FIP PFs by defining them to be available for ADMIN_OR_PROJECT_MEMBER/READER or FIP owner. First, rule PROJECT_MEMBER/READER is not appropriate in this case as FIP PFs don't have tenant_id attribute at all and belongs to the owner of FIP always. Second issue was that any FIP owner, even with just READER role could possibly e.g. create port forwarding. To fix that, this patch changes those API policies to new rules: ADMIN_OR_PARENT_OWNER_READER ADMIN_OR_PARENT_OWNER_MEMBER Closes-Bug: #2018989 Change-Id: Ibff4c4f5b6d020fd598831a8a6e8ec0e2f559005
-rw-r--r--neutron/conf/policies/floatingip_port_forwarding.py16
-rw-r--r--neutron/tests/unit/conf/policies/test_floatingip_port_forwarding.py347
2 files changed, 209 insertions, 154 deletions
diff --git a/neutron/conf/policies/floatingip_port_forwarding.py b/neutron/conf/policies/floatingip_port_forwarding.py
index 2ca8c5ce91..685cb45174 100644
--- a/neutron/conf/policies/floatingip_port_forwarding.py
+++ b/neutron/conf/policies/floatingip_port_forwarding.py
@@ -30,9 +30,7 @@ RESOURCE_PATH = ('/floatingips/{floatingip_id}'
rules = [
policy.DocumentedRuleDefault(
name='create_floatingip_port_forwarding',
- check_str=neutron_policy.policy_or(
- base.ADMIN_OR_PROJECT_MEMBER,
- base.RULE_PARENT_OWNER),
+ check_str=base.ADMIN_OR_PARENT_OWNER_MEMBER,
scope_types=['project'],
description='Create a floating IP port forwarding',
operations=[
@@ -49,9 +47,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_floatingip_port_forwarding',
- check_str=neutron_policy.policy_or(
- base.ADMIN_OR_PROJECT_READER,
- base.RULE_PARENT_OWNER),
+ check_str=base.ADMIN_OR_PARENT_OWNER_READER,
scope_types=['project'],
description='Get a floating IP port forwarding',
operations=[
@@ -72,9 +68,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_floatingip_port_forwarding',
- check_str=neutron_policy.policy_or(
- base.ADMIN_OR_PROJECT_MEMBER,
- base.RULE_PARENT_OWNER),
+ check_str=base.ADMIN_OR_PARENT_OWNER_MEMBER,
scope_types=['project'],
description='Update a floating IP port forwarding',
operations=[
@@ -91,9 +85,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_floatingip_port_forwarding',
- check_str=neutron_policy.policy_or(
- base.ADMIN_OR_PROJECT_MEMBER,
- base.RULE_PARENT_OWNER),
+ check_str=base.ADMIN_OR_PARENT_OWNER_MEMBER,
scope_types=['project'],
description='Delete a floating IP port forwarding',
operations=[
diff --git a/neutron/tests/unit/conf/policies/test_floatingip_port_forwarding.py b/neutron/tests/unit/conf/policies/test_floatingip_port_forwarding.py
index d72d61870e..ab1446e98d 100644
--- a/neutron/tests/unit/conf/policies/test_floatingip_port_forwarding.py
+++ b/neutron/tests/unit/conf/policies/test_floatingip_port_forwarding.py
@@ -28,16 +28,19 @@ class FloatingipPortForwardingAPITestCase(base.PolicyBaseTestCase):
super(FloatingipPortForwardingAPITestCase, self).setUp()
self.fip = {
'id': uuidutils.generate_uuid(),
+ 'tenant_id': self.project_id,
'project_id': self.project_id}
+ self.alt_fip = {
+ 'id': uuidutils.generate_uuid(),
+ 'tenant_id': self.alt_project_id,
+ 'project_id': self.alt_project_id}
self.target = {
- 'project_id': self.project_id,
'floatingip_id': self.fip['id'],
'ext_parent_floatingip_id': self.fip['id']}
self.alt_target = {
- 'project_id': self.alt_project_id,
- 'floatingip_id': self.fip['id'],
- 'ext_parent_floatingip_id': self.fip['id']}
+ 'floatingip_id': self.alt_fip['id'],
+ 'ext_parent_floatingip_id': self.alt_fip['id']}
self.plugin_mock = mock.Mock()
self.plugin_mock.get_floatingip.return_value = self.fip
@@ -53,52 +56,68 @@ class SystemAdminTests(FloatingipPortForwardingAPITestCase):
self.context = self.system_admin_ctx
def test_create_fip_pf(self):
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'create_floatingip_port_forwarding',
- self.target)
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'create_floatingip_port_forwarding',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'create_floatingip_port_forwarding',
+ self.target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'create_floatingip_port_forwarding',
+ self.alt_target)
def test_get_fip_pf(self):
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_floatingip_port_forwarding',
- self.target)
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'get_floatingip_port_forwarding',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_floatingip_port_forwarding',
+ self.target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'get_floatingip_port_forwarding',
+ self.alt_target)
def test_update_fip_pf(self):
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'update_floatingip_port_forwarding',
- self.target)
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'update_floatingip_port_forwarding',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'update_floatingip_port_forwarding',
+ self.target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'update_floatingip_port_forwarding',
+ self.alt_target)
def test_delete_fip_pf(self):
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'delete_floatingip_port_forwarding',
- self.target)
- self.assertRaises(
- base_policy.InvalidScope,
- policy.enforce,
- self.context, 'delete_floatingip_port_forwarding',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'delete_floatingip_port_forwarding',
+ self.target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertRaises(
+ base_policy.InvalidScope,
+ policy.enforce,
+ self.context, 'delete_floatingip_port_forwarding',
+ self.alt_target)
class SystemMemberTests(SystemAdminTests):
@@ -122,44 +141,60 @@ class AdminTests(FloatingipPortForwardingAPITestCase):
self.context = self.project_admin_ctx
def test_create_fip_pf(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'create_floatingip_port_forwarding',
- self.target))
- self.assertTrue(
- policy.enforce(self.context,
- 'create_floatingip_port_forwarding',
- self.alt_target))
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'create_floatingip_port_forwarding',
+ self.target))
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'create_floatingip_port_forwarding',
+ self.alt_target))
def test_get_fip_pf(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'get_floatingip_port_forwarding',
- self.target))
- self.assertTrue(
- policy.enforce(self.context,
- 'get_floatingip_port_forwarding',
- self.alt_target))
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_floatingip_port_forwarding',
+ self.target))
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_floatingip_port_forwarding',
+ self.alt_target))
def test_update_fip_pf(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'update_floatingip_port_forwarding',
- self.target))
- self.assertTrue(
- policy.enforce(self.context,
- 'update_floatingip_port_forwarding',
- self.alt_target))
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'update_floatingip_port_forwarding',
+ self.target))
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'update_floatingip_port_forwarding',
+ self.alt_target))
def test_delete_fip_pf(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'delete_floatingip_port_forwarding',
- self.target))
- self.assertTrue(
- policy.enforce(self.context,
- 'delete_floatingip_port_forwarding',
- self.alt_target))
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'delete_floatingip_port_forwarding',
+ self.target))
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'delete_floatingip_port_forwarding',
+ self.alt_target))
class ProjectMemberTests(AdminTests):
@@ -169,48 +204,64 @@ class ProjectMemberTests(AdminTests):
self.context = self.project_member_ctx
def test_create_fip_pf(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'create_floatingip_port_forwarding',
- self.target))
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'create_floatingip_port_forwarding',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'create_floatingip_port_forwarding',
+ self.target))
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'create_floatingip_port_forwarding',
+ self.alt_target)
def test_get_fip_pf(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'get_floatingip_port_forwarding',
- self.target))
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'get_floatingip_port_forwarding',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'get_floatingip_port_forwarding',
+ self.target))
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'get_floatingip_port_forwarding',
+ self.alt_target)
def test_update_fip_pf(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'update_floatingip_port_forwarding',
- self.target))
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'update_floatingip_port_forwarding',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'update_floatingip_port_forwarding',
+ self.target))
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'update_floatingip_port_forwarding',
+ self.alt_target)
def test_delete_fip_pf(self):
- self.assertTrue(
- policy.enforce(self.context,
- 'delete_floatingip_port_forwarding',
- self.target))
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'delete_floatingip_port_forwarding',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertTrue(
+ policy.enforce(self.context,
+ 'delete_floatingip_port_forwarding',
+ self.target))
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'delete_floatingip_port_forwarding',
+ self.alt_target)
class ProjectReaderTests(ProjectMemberTests):
@@ -220,37 +271,49 @@ class ProjectReaderTests(ProjectMemberTests):
self.context = self.project_reader_ctx
def test_create_fip_pf(self):
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'create_floatingip_port_forwarding',
- self.target)
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'create_floatingip_port_forwarding',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'create_floatingip_port_forwarding',
+ self.target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'create_floatingip_port_forwarding',
+ self.alt_target)
def test_update_fip_pf(self):
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'update_floatingip_port_forwarding',
- self.target)
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'update_floatingip_port_forwarding',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'update_floatingip_port_forwarding',
+ self.target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'update_floatingip_port_forwarding',
+ self.alt_target)
def test_delete_fip_pf(self):
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'delete_floatingip_port_forwarding',
- self.target)
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce,
- self.context, 'delete_floatingip_port_forwarding',
- self.alt_target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.fip):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'delete_floatingip_port_forwarding',
+ self.target)
+ with mock.patch.object(self.plugin_mock, 'get_floatingip',
+ return_value=self.alt_fip):
+ self.assertRaises(
+ base_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context, 'delete_floatingip_port_forwarding',
+ self.alt_target)