diff options
author | Colleen Murphy <colleen.murphy@suse.de> | 2019-03-18 14:20:15 +0100 |
---|---|---|
committer | Colleen Murphy <colleen.murphy@suse.de> | 2019-03-27 17:15:00 +0100 |
commit | be452fee80fabe252b2dae3be76c1d46fdd857e4 (patch) | |
tree | 9b8d3b15b968fc229313e8d5e15c2306c5bdfa56 /keystone | |
parent | 947e0a2e394fe47047ca1568e3a6f8b19b3c1f78 (diff) | |
download | keystone-be452fee80fabe252b2dae3be76c1d46fdd857e4.tar.gz |
Add domain scope support for group policies
This commit adds support for the domain scope type for the group API
policies. It defines appropriate policies for the reader, member, and
admin role and adds tests for each case.
Change-Id: Iaff3c0e45423ef427ef1458250c402c44be4b1d6
Closes-bug: #1808859
Partial-Bug: #968696
Diffstat (limited to 'keystone')
-rw-r--r-- | keystone/api/groups.py | 36 | ||||
-rw-r--r-- | keystone/api/users.py | 21 | ||||
-rw-r--r-- | keystone/common/policies/group.py | 76 | ||||
-rw-r--r-- | keystone/tests/unit/protection/v3/test_groups.py | 642 |
4 files changed, 735 insertions, 40 deletions
diff --git a/keystone/api/groups.py b/keystone/api/groups.py index ecb7bf2ba..483de9b4c 100644 --- a/keystone/api/groups.py +++ b/keystone/api/groups.py @@ -21,12 +21,14 @@ from keystone.common import json_home from keystone.common import provider_api from keystone.common import rbac_enforcer from keystone.common import validation +import keystone.conf from keystone import exception from keystone.identity import schema from keystone import notifications from keystone.server import flask as ks_flask +CONF = keystone.conf.CONF ENFORCER = rbac_enforcer.RBACEnforcer PROVIDERS = provider_api.ProviderAPIs @@ -73,11 +75,21 @@ class GroupsResource(ks_flask.ResourceBase): GET/HEAD /groups """ filters = ['domain_id', 'name'] - ENFORCER.enforce_call(action='identity:list_groups', filters=filters) + target = None + if self.oslo_context.domain_id: + target = {'group': {'domain_id': self.oslo_context.domain_id}} + ENFORCER.enforce_call(action='identity:list_groups', filters=filters, + target_attr=target) hints = self.build_driver_hints(filters) domain = self._get_domain_id_for_list_request() refs = PROVIDERS.identity_api.list_groups(domain_scope=domain, hints=hints) + if self.oslo_context.domain_id: + filtered_refs = [] + for ref in refs: + if ref['domain_id'] == target['group']['domain_id']: + filtered_refs.append(ref) + refs = filtered_refs return self.wrap_collection(refs, hints=hints) def post(self): @@ -85,8 +97,11 @@ class GroupsResource(ks_flask.ResourceBase): POST /groups """ - ENFORCER.enforce_call(action='identity:create_group') group = self.request_body_json.get('group', {}) + target = {'group': group} + ENFORCER.enforce_call( + action='identity:create_group', target_attr=target + ) validation.lazy_validate(schema.group_create, group) group = self._normalize_dict(group) group = self._normalize_domain_id(group) @@ -99,7 +114,10 @@ class GroupsResource(ks_flask.ResourceBase): PATCH /groups/{group_id} """ - ENFORCER.enforce_call(action='identity:update_group') + ENFORCER.enforce_call( + action='identity:update_group', + build_target=_build_group_target_enforcement + ) group = self.request_body_json.get('group', {}) validation.lazy_validate(schema.group_update, group) self._require_matching_id(group) @@ -118,16 +136,16 @@ class GroupsResource(ks_flask.ResourceBase): return None, http_client.NO_CONTENT -class GroupUsersResource(flask_restful.Resource): +class GroupUsersResource(ks_flask.ResourceBase): def get(self, group_id): """Get list of users in group. GET/HEAD /groups/{group_id}/users """ filters = ['domain_id', 'enabled', 'name', 'password_expires_at'] - target = {} + target = None try: - target['group'] = PROVIDERS.identity_api.get_group(group_id) + target = {'group': PROVIDERS.identity_api.get_group(group_id)} except exception.GroupNotFound: # NOTE(morgan): If we have an issue populating the group # data, leage target empty. This is the safest route and does not @@ -138,6 +156,12 @@ class GroupUsersResource(flask_restful.Resource): hints = ks_flask.ResourceBase.build_driver_hints(filters) refs = PROVIDERS.identity_api.list_users_in_group( group_id, hints=hints) + if (self.oslo_context.domain_id): + filtered_refs = [] + for ref in refs: + if ref['domain_id'] == self.oslo_context.domain_id: + filtered_refs.append(ref) + refs = filtered_refs return ks_flask.ResourceBase.wrap_collection( refs, hints=hints, collection_name='users') diff --git a/keystone/api/users.py b/keystone/api/users.py index cf2bf7a40..97626da9b 100644 --- a/keystone/api/users.py +++ b/keystone/api/users.py @@ -98,6 +98,10 @@ def _build_user_target_enforcement(): target['user'] = PROVIDERS.identity_api.get_user( flask.request.view_args.get('user_id') ) + if flask.request.view_args.get('group_id'): + target['group'] = PROVIDERS.identity_api.get_group( + flask.request.view_args.get('group_id') + ) except ks_exception.NotFound: # nosec # Defer existence in the event the user doesn't exist, we'll # check this later anyway. @@ -285,10 +289,15 @@ class UserGroupsResource(ks_flask.ResourceBase): @staticmethod def _built_target_attr_enforcement(): - ref = {} + ref = None if flask.request.view_args: - ref['user'] = PROVIDERS.identity_api.get_user( - flask.request.view_args.get('user_id')) + try: + ref = {'user': PROVIDERS.identity_api.get_user( + flask.request.view_args.get('user_id'))} + except ks_exception.NotFound: # nosec + # Defer existence in the event the user doesn't exist, we'll + # check this later anyway. + pass return ref def get(self, user_id): @@ -303,6 +312,12 @@ class UserGroupsResource(ks_flask.ResourceBase): filters=filters) refs = PROVIDERS.identity_api.list_groups_for_user(user_id=user_id, hints=hints) + if (self.oslo_context.domain_id): + filtered_refs = [] + for ref in refs: + if ref['domain_id'] == self.oslo_context.domain_id: + filtered_refs.append(ref) + refs = filtered_refs return self.wrap_collection(refs, hints=hints) diff --git a/keystone/common/policies/group.py b/keystone/common/policies/group.py index 4e19d41f4..63324d41e 100644 --- a/keystone/common/policies/group.py +++ b/keystone/common/policies/group.py @@ -15,8 +15,34 @@ from oslo_policy import policy from keystone.common.policies import base -SYSTEM_READER_OR_OWNER = ( - '(role:reader and system_scope:all) or user_id:%(user_id)s' +SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_USER_OR_OWNER = ( + '(role:reader and system_scope:all) or ' + '(role:reader and domain_id:%(target.user.domain_id)s) or ' + 'user_id:%(user_id)s' +) + +SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_GROUP_USER = ( + '(role:reader and system_scope:all) or ' + '(role:reader and ' + 'domain_id:%(target.group.domain_id)s and ' + 'domain_id:%(target.user.domain_id)s)' +) + +SYSTEM_ADMIN_OR_DOMAIN_ADMIN_FOR_TARGET_GROUP_USER = ( + '(role:admin and system_scope:all) or ' + '(role:admin and ' + 'domain_id:%(target.group.domain_id)s and ' + 'domain_id:%(target.user.domain_id)s)' +) + +SYSTEM_READER_OR_DOMAIN_READER = ( + '(role:reader and system_scope:all) or ' + '(role:reader and domain_id:%(target.group.domain_id)s)' +) + +SYSTEM_ADMIN_OR_DOMAIN_ADMIN = ( + '(role:admin and system_scope:all) or ' + '(role:admin and domain_id:%(target.group.domain_id)s)' ) DEPRECATED_REASON = """ @@ -70,14 +96,8 @@ deprecated_add_user_to_group = policy.DeprecatedRule( group_policies = [ policy.DocumentedRuleDefault( name=base.IDENTITY % 'get_group', - check_str=base.SYSTEM_READER, - # FIXME(lbragstad): Groups have traditionally been a resource managed - # by system or cloud administrators. If, or when, keystone supports the - # ability for groups to be created or managed by project - # administrators, scope_types should also include 'project'. Until - # then, let's make sure these APIs are only accessible to system - # administrators. - scope_types=['system'], + check_str=SYSTEM_READER_OR_DOMAIN_READER, + scope_types=['system', 'domain'], description='Show group details.', operations=[{'path': '/v3/groups/{group_id}', 'method': 'GET'}, @@ -88,8 +108,8 @@ group_policies = [ deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'list_groups', - check_str=base.SYSTEM_READER, - scope_types=['system'], + check_str=SYSTEM_READER_OR_DOMAIN_READER, + scope_types=['system', 'domain'], description='List groups.', operations=[{'path': '/v3/groups', 'method': 'GET'}, @@ -100,8 +120,8 @@ group_policies = [ deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'list_groups_for_user', - check_str=SYSTEM_READER_OR_OWNER, - scope_types=['system', 'project'], + check_str=SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_USER_OR_OWNER, + scope_types=['system', 'domain', 'project'], description='List groups to which a user belongs.', operations=[{'path': '/v3/users/{user_id}/groups', 'method': 'GET'}, @@ -112,8 +132,8 @@ group_policies = [ deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'create_group', - check_str=base.SYSTEM_ADMIN, - scope_types=['system'], + check_str=SYSTEM_ADMIN_OR_DOMAIN_ADMIN, + scope_types=['system', 'domain'], description='Create group.', operations=[{'path': '/v3/groups', 'method': 'POST'}], @@ -122,8 +142,8 @@ group_policies = [ deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'update_group', - check_str=base.SYSTEM_ADMIN, - scope_types=['system'], + check_str=SYSTEM_ADMIN_OR_DOMAIN_ADMIN, + scope_types=['system', 'domain'], description='Update group.', operations=[{'path': '/v3/groups/{group_id}', 'method': 'PATCH'}], @@ -132,8 +152,8 @@ group_policies = [ deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'delete_group', - check_str=base.SYSTEM_ADMIN, - scope_types=['system'], + check_str=SYSTEM_ADMIN_OR_DOMAIN_ADMIN, + scope_types=['system', 'domain'], description='Delete group.', operations=[{'path': '/v3/groups/{group_id}', 'method': 'DELETE'}], @@ -142,8 +162,8 @@ group_policies = [ deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'list_users_in_group', - check_str=base.SYSTEM_READER, - scope_types=['system'], + check_str=SYSTEM_READER_OR_DOMAIN_READER, + scope_types=['system', 'domain'], description='List members of a specific group.', operations=[{'path': '/v3/groups/{group_id}/users', 'method': 'GET'}, @@ -154,8 +174,8 @@ group_policies = [ deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'remove_user_from_group', - check_str=base.SYSTEM_ADMIN, - scope_types=['system'], + check_str=SYSTEM_ADMIN_OR_DOMAIN_ADMIN_FOR_TARGET_GROUP_USER, + scope_types=['system', 'domain'], description='Remove user from group.', operations=[{'path': '/v3/groups/{group_id}/users/{user_id}', 'method': 'DELETE'}], @@ -164,8 +184,8 @@ group_policies = [ deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'check_user_in_group', - check_str=base.SYSTEM_READER, - scope_types=['system'], + check_str=SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_GROUP_USER, + scope_types=['system', 'domain'], description='Check whether a user is a member of a group.', operations=[{'path': '/v3/groups/{group_id}/users/{user_id}', 'method': 'HEAD'}, @@ -176,8 +196,8 @@ group_policies = [ deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'add_user_to_group', - check_str=base.SYSTEM_ADMIN, - scope_types=['system'], + check_str=SYSTEM_ADMIN_OR_DOMAIN_ADMIN_FOR_TARGET_GROUP_USER, + scope_types=['system', 'domain'], description='Add user to group.', operations=[{'path': '/v3/groups/{group_id}/users/{user_id}', 'method': 'PUT'}], diff --git a/keystone/tests/unit/protection/v3/test_groups.py b/keystone/tests/unit/protection/v3/test_groups.py index 0201d6060..faa3a9b4f 100644 --- a/keystone/tests/unit/protection/v3/test_groups.py +++ b/keystone/tests/unit/protection/v3/test_groups.py @@ -12,14 +12,17 @@ import uuid +from oslo_serialization import jsonutils from six.moves import http_client +from keystone.common.policies import group as gp from keystone.common import provider_api import keystone.conf from keystone.tests.common import auth as common_auth from keystone.tests import unit from keystone.tests.unit import base_classes from keystone.tests.unit import ksfixtures +from keystone.tests.unit.ksfixtures import temporaryfile CONF = keystone.conf.CONF PROVIDERS = provider_api.ProviderAPIs @@ -121,7 +124,7 @@ class _SystemUserGroupTests(object): ) -class _SystemMemberAndReaderGroupTests(object): +class _SystemAndDomainMemberAndReaderGroupTests(object): """Common default functionality for system readers and system members.""" def test_user_cannot_create_group(self): @@ -215,7 +218,7 @@ class _SystemMemberAndReaderGroupTests(object): class SystemReaderTests(base_classes.TestCaseWithBootstrap, common_auth.AuthTestMixin, _SystemUserGroupTests, - _SystemMemberAndReaderGroupTests): + _SystemAndDomainMemberAndReaderGroupTests): def setUp(self): super(SystemReaderTests, self).setUp() @@ -249,7 +252,7 @@ class SystemReaderTests(base_classes.TestCaseWithBootstrap, class SystemMemberTests(base_classes.TestCaseWithBootstrap, common_auth.AuthTestMixin, _SystemUserGroupTests, - _SystemMemberAndReaderGroupTests): + _SystemAndDomainMemberAndReaderGroupTests): def setUp(self): super(SystemMemberTests, self).setUp() @@ -385,6 +388,639 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap, ) +class _DomainUserGroupTests(object): + + def test_user_can_list_groups_in_domain(self): + # second domain + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + # one group in new domain + group1 = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + # one group in user's domain + group2 = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + + # user should only see one group + with self.test_client() as c: + r = c.get('/v3/groups', headers=self.headers) + self.assertEqual(1, len(r.json['groups'])) + self.assertNotIn(group1['id'], [g['id'] for g in r.json['groups']]) + self.assertEqual(group2['id'], r.json['groups'][0]['id']) + + def test_user_cannot_list_groups_in_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + with self.test_client() as c: + r = c.get('/v3/groups?domain_id=%s' % domain['id'], + headers=self.headers) + self.assertEqual(0, len(r.json['groups'])) + + def test_user_can_get_group_in_domain(self): + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + with self.test_client() as c: + r = c.get('/v3/groups/%s' % group['id'], + headers=self.headers) + self.assertEqual(group['id'], r.json['group']['id']) + + def test_user_cannot_get_group_in_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + with self.test_client() as c: + c.get('/v3/groups/%s' % group['id'], + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_cannot_get_non_existent_group_forbidden(self): + with self.test_client() as c: + c.get( + '/v3/groups/%s' % uuid.uuid4().hex, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_can_list_groups_in_domain_for_user_in_domain(self): + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + with self.test_client() as c: + r = c.get('/v3/users/%s/groups' % user['id'], + headers=self.headers) + self.assertEqual(1, len(r.json['groups'])) + self.assertEqual(group['id'], r.json['groups'][0]['id']) + + def test_user_cannot_list_groups_in_own_domain_user_in_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + with self.test_client() as c: + c.get('/v3/users/%s/groups' % user['id'], + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_cannot_list_groups_for_non_existent_user_forbidden(self): + with self.test_client() as c: + c.get('/v3/users/%s/groups' % uuid.uuid4().hex, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_cannot_list_groups_in_other_domain_user_in_own_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + # one group in other domain + group1 = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + # one group in own domain + group2 = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + PROVIDERS.identity_api.add_user_to_group(user['id'], group1['id']) + PROVIDERS.identity_api.add_user_to_group(user['id'], group2['id']) + with self.test_client() as c: + r = c.get('/v3/users/%s/groups' % user['id'], + headers=self.headers) + # only one group should be visible + self.assertEqual(1, len(r.json['groups'])) + self.assertEqual(group2['id'], r.json['groups'][0]['id']) + + def test_user_can_list_users_in_own_domain_for_group_in_own_domain(self): + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + with self.test_client() as c: + r = c.get('/v3/groups/%s/users' % group['id'], + headers=self.headers) + self.assertEqual(1, len(r.json['users'])) + self.assertEqual(user['id'], r.json['users'][0]['id']) + + def test_user_cannot_list_users_in_other_domain_group_in_own_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + # one user in other domain + user1 = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + # one user in own domain + user2 = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + PROVIDERS.identity_api.add_user_to_group(user1['id'], group['id']) + PROVIDERS.identity_api.add_user_to_group(user2['id'], group['id']) + with self.test_client() as c: + r = c.get('/v3/groups/%s/users' % group['id'], + headers=self.headers) + # only one user should be visible + self.assertEqual(1, len(r.json['users'])) + self.assertEqual(user2['id'], r.json['users'][0]['id']) + + def test_user_cannot_list_users_in_own_domain_group_in_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + with self.test_client() as c: + c.get('/v3/groups/%s/users' % group['id'], + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_cannot_list_users_in_non_existent_group_forbidden(self): + with self.test_client() as c: + c.get('/v3/groups/%s/users' % uuid.uuid4().hex, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_can_check_user_in_own_domain_group_in_own_domain(self): + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + with self.test_client() as c: + c.head('/v3/groups/%(group)s/users/%(user)s' % { + 'group': group['id'], 'user': user['id']}, + headers=self.headers, + expected_status_code=http_client.NO_CONTENT) + c.get('/v3/groups/%(group)s/users/%(user)s' % { + 'group': group['id'], 'user': user['id']}, + headers=self.headers, + expected_status_code=http_client.NO_CONTENT) + + def test_user_cannot_check_user_in_other_domain_group_in_own_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + with self.test_client() as c: + c.head('/v3/groups/%(group)s/users/%(user)s' % { + 'group': group['id'], 'user': user['id']}, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + c.get('/v3/groups/%(group)s/users/%(user)s' % { + 'group': group['id'], 'user': user['id']}, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + +class DomainReaderTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _DomainUserGroupTests, + _SystemAndDomainMemberAndReaderGroupTests): + + def setUp(self): + super(DomainReaderTests, self).setUp() + self.loadapp() + self.policy_file = self.useFixture(temporaryfile.SecureTempFile()) + self.policy_file_name = self.policy_file.file_name + self.useFixture( + ksfixtures.Policy( + self.config_fixture, policy_file=self.policy_file_name + ) + ) + self._override_policy() + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + self.domain_id = domain['id'] + domain_admin = unit.new_user_ref(domain_id=self.domain_id) + self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id'] + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=self.user_id, + domain_id=self.domain_id + ) + + auth = self.build_authentication_request( + user_id=self.user_id, + password=domain_admin['password'], + domain_id=self.domain_id + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} + + def _override_policy(self): + # TODO(cmurphy): Remove this once the deprecated policies in + # keystone.common.policies.group have been removed. This is only + # here to make sure we test the new policies instead of the deprecated + # ones. Oslo.policy will OR deprecated policies with new policies to + # maintain compatibility and give operators a chance to update + # permissions or update policies without breaking users. This will + # cause these specific tests to fail since we're trying to correct this + # broken behavior with better scope checking. + with open(self.policy_file_name, 'w') as f: + overridden_policies = { + 'identity:get_group': gp.SYSTEM_READER_OR_DOMAIN_READER, + 'identity:list_groups': gp.SYSTEM_READER_OR_DOMAIN_READER, + 'identity:list_groups_for_user': + gp.SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_USER_OR_OWNER, + 'identity:list_users_in_group': + gp.SYSTEM_READER_OR_DOMAIN_READER, + 'identity:check_user_in_group': + gp.SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_GROUP_USER + } + f.write(jsonutils.dumps(overridden_policies)) + + +class DomainMemberTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _DomainUserGroupTests, + _SystemAndDomainMemberAndReaderGroupTests): + + def setUp(self): + super(DomainMemberTests, self).setUp() + self.loadapp() + self.policy_file = self.useFixture(temporaryfile.SecureTempFile()) + self.policy_file_name = self.policy_file.file_name + self.useFixture( + ksfixtures.Policy( + self.config_fixture, policy_file=self.policy_file_name + ) + ) + self._override_policy() + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + self.domain_id = domain['id'] + domain_admin = unit.new_user_ref(domain_id=self.domain_id) + self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id'] + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.member_role_id, user_id=self.user_id, + domain_id=self.domain_id + ) + + auth = self.build_authentication_request( + user_id=self.user_id, + password=domain_admin['password'], + domain_id=self.domain_id + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} + + def _override_policy(self): + # TODO(cmurphy): Remove this once the deprecated policies in + # keystone.common.policies.group have been removed. This is only + # here to make sure we test the new policies instead of the deprecated + # ones. Oslo.policy will OR deprecated policies with new policies to + # maintain compatibility and give operators a chance to update + # permissions or update policies without breaking users. This will + # cause these specific tests to fail since we're trying to correct this + # broken behavior with better scope checking. + with open(self.policy_file_name, 'w') as f: + overridden_policies = { + 'identity:get_group': gp.SYSTEM_READER_OR_DOMAIN_READER, + 'identity:list_groups': gp.SYSTEM_READER_OR_DOMAIN_READER, + 'identity:list_groups_for_user': + gp.SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_USER_OR_OWNER, + 'identity:list_users_in_group': + gp.SYSTEM_READER_OR_DOMAIN_READER, + 'identity:check_user_in_group': + gp.SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_GROUP_USER + } + f.write(jsonutils.dumps(overridden_policies)) + + +class DomainAdminTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _DomainUserGroupTests): + + def setUp(self): + super(DomainAdminTests, self).setUp() + self.loadapp() + self.policy_file = self.useFixture(temporaryfile.SecureTempFile()) + self.policy_file_name = self.policy_file.file_name + self.useFixture( + ksfixtures.Policy( + self.config_fixture, policy_file=self.policy_file_name + ) + ) + self._override_policy() + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + self.domain_id = domain['id'] + domain_admin = unit.new_user_ref(domain_id=self.domain_id) + self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id'] + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.admin_role_id, user_id=self.user_id, + domain_id=self.domain_id + ) + + auth = self.build_authentication_request( + user_id=self.user_id, + password=domain_admin['password'], + domain_id=self.domain_id + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} + + def _override_policy(self): + # TODO(cmurphy): Remove this once the deprecated policies in + # keystone.common.policies.group have been removed. This is only + # here to make sure we test the new policies instead of the deprecated + # ones. Oslo.policy will OR deprecated policies with new policies to + # maintain compatibility and give operators a chance to update + # permissions or update policies without breaking users. This will + # cause these specific tests to fail since we're trying to correct this + # broken behavior with better scope checking. + with open(self.policy_file_name, 'w') as f: + overridden_policies = { + 'identity:get_group': gp.SYSTEM_READER_OR_DOMAIN_READER, + 'identity:list_groups': gp.SYSTEM_READER_OR_DOMAIN_READER, + 'identity:list_groups_for_user': + gp.SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_USER_OR_OWNER, + 'identity:create_group': gp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN, + 'identity:update_group': gp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN, + 'identity:delete_group': gp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN, + 'identity:list_users_in_group': + gp.SYSTEM_READER_OR_DOMAIN_READER, + 'identity:remove_user_from_group': + gp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN_FOR_TARGET_GROUP_USER, + 'identity:check_user_in_group': + gp.SYSTEM_READER_OR_DOMAIN_READER_FOR_TARGET_GROUP_USER, + 'identity:add_user_to_group': + gp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN_FOR_TARGET_GROUP_USER + } + f.write(jsonutils.dumps(overridden_policies)) + + def test_user_can_create_group_for_own_domain(self): + create = { + 'group': { + 'name': uuid.uuid4().hex, + 'domain_id': self.domain_id + } + } + + with self.test_client() as c: + c.post('/v3/groups', json=create, headers=self.headers) + + def test_user_cannot_create_group_for_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + + create = { + 'group': { + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'] + } + } + + with self.test_client() as c: + c.post('/v3/groups', json=create, headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_can_update_group_in_own_domain(self): + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + + update = {'group': {'description': uuid.uuid4().hex}} + with self.test_client() as c: + c.patch( + '/v3/groups/%s' % group['id'], json=update, + headers=self.headers) + + def test_user_cannot_update_group_in_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + + update = {'group': {'description': uuid.uuid4().hex}} + with self.test_client() as c: + c.patch( + '/v3/groups/%s' % group['id'], json=update, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_can_delete_group_in_own_domain(self): + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + with self.test_client() as c: + c.delete( + '/v3/groups/%s' % group['id'], + headers=self.headers + ) + + def test_user_cannot_delete_group_in_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + with self.test_client() as c: + c.delete( + '/v3/groups/%s' % group['id'], + headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_can_remove_user_in_own_domain_from_group_in_own_domain(self): + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + with self.test_client() as c: + c.delete('/v3/groups/%(group)s/users/%(user)s' % { + 'group': group['id'], 'user': user['id']}, + headers=self.headers) + + def test_user_cannot_remove_user_other_domain_from_group_own_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + with self.test_client() as c: + c.delete('/v3/groups/%(group)s/users/%(user)s' % { + 'group': group['id'], 'user': user['id']}, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_cannot_remove_user_own_domain_from_group_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + with self.test_client() as c: + c.delete('/v3/groups/%(group)s/users/%(user)s' % { + 'group': group['id'], 'user': user['id']}, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_cannot_remove_non_existent_user_from_group_forbidden(self): + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + with self.test_client() as c: + c.delete('/v3/groups/%(group)s/users/%(user)s' % { + 'group': group['id'], 'user': uuid.uuid4().hex}, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_cannot_remove_user_from_non_existent_group_forbidden(self): + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + with self.test_client() as c: + c.delete('/v3/groups/%(group)s/users/%(user)s' % { + 'group': uuid.uuid4().hex, 'user': user['id']}, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_can_add_user_in_own_domain_to_group_in_own_domain(self): + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + with self.test_client() as c: + c.put('/v3/groups/%(group)s/users/%(user)s' % { + 'group': group['id'], 'user': user['id']}, + headers=self.headers) + + def test_user_cannot_add_user_other_domain_to_group_own_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + with self.test_client() as c: + c.put('/v3/groups/%(group)s/users/%(user)s' % { + 'group': group['id'], 'user': user['id']}, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_cannot_add_user_own_domain_to_group_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + with self.test_client() as c: + c.put('/v3/groups/%(group)s/users/%(user)s' % { + 'group': group['id'], 'user': user['id']}, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_cannot_add_non_existent_user_to_group_forbidden(self): + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + with self.test_client() as c: + c.put('/v3/groups/%(group)s/users/%(user)s' % { + 'group': group['id'], 'user': uuid.uuid4().hex}, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_cannot_add_user_from_non_existent_group_forbidden(self): + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + with self.test_client() as c: + c.put('/v3/groups/%(group)s/users/%(user)s' % { + 'group': uuid.uuid4().hex, 'user': user['id']}, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + class ProjectUserTests(base_classes.TestCaseWithBootstrap, common_auth.AuthTestMixin): |