summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-01-21 08:36:54 +0000
committerGerrit Code Review <review@openstack.org>2014-01-21 08:36:54 +0000
commit43cd1af5ae0257fadabc84886af969b084320a9f (patch)
treeaf962c96cbd1243f453f8f33e2a24edee4f8fa58
parent8e0118fc0f8f4059974467b4a30c89f38109a80f (diff)
parent829a2349312c2c294df22c55c76fb0b0afda200f (diff)
downloadkeystone-43cd1af5ae0257fadabc84886af969b084320a9f.tar.gz
Merge "LDAP Assignment does not support grant v3 API"
-rw-r--r--keystone/assignment/backends/ldap.py176
-rw-r--r--keystone/tests/test_backend_ldap.py180
2 files changed, 323 insertions, 33 deletions
diff --git a/keystone/assignment/backends/ldap.py b/keystone/assignment/backends/ldap.py
index 5985cb45f..ea75a2d77 100644
--- a/keystone/assignment/backends/ldap.py
+++ b/keystone/assignment/backends/ldap.py
@@ -91,13 +91,35 @@ class Assignment(assignment.Driver):
(self.project._id_to_dn(tenant_id))
if self.user._dn_to_id(a.user_dn) == user_id]
+ def _get_roles_for_group_and_project(group_id, project_id):
+ self.identity_api.get_group(group_id)
+ self.get_project(project_id)
+ group_dn = self.group._id_to_dn(group_id)
+ # NOTE(marcos-fermin-lobo): In Active Directory, for functions
+ # such as "self.role.get_role_assignments", it returns
+ # the key "CN" or "OU" in uppercase.
+ # The group_dn var has "CN" and "OU" in lowercase.
+ # For this reason, it is necessary to use the "upper()"
+ # function so both are consistent.
+ return [self.role._dn_to_id(a.role_dn)
+ for a in self.role.get_role_assignments
+ (self.project._id_to_dn(project_id))
+ if a.user_dn.upper() == group_dn.upper()]
+
if domain_id is not None:
msg = 'Domain metadata not supported by LDAP'
raise exception.NotImplemented(message=msg)
- if tenant_id is None or user_id is None:
+ if group_id is None and user_id is None:
return {}
- metadata_ref = _get_roles_for_just_user_and_project(user_id, tenant_id)
+ if tenant_id is None:
+ return {}
+ if user_id is None:
+ metadata_ref = _get_roles_for_group_and_project(group_id,
+ tenant_id)
+ else:
+ metadata_ref = _get_roles_for_just_user_and_project(user_id,
+ tenant_id)
if not metadata_ref:
return {}
return {'roles': [self._role_to_dict(r, False) for r in metadata_ref]}
@@ -145,10 +167,21 @@ class Assignment(assignment.Driver):
role_dn = self._subrole_id_to_dn(role_id, tenant_id)
self.role.add_user(role_id, role_dn, user_dn, user_id, tenant_id)
tenant_dn = self.project._id_to_dn(tenant_id)
- return UserRoleAssociation(
- role_dn=role_dn,
- user_dn=user_dn,
- tenant_dn=tenant_dn)
+ return UserRoleAssociation(role_dn=role_dn,
+ user_dn=user_dn,
+ tenant_dn=tenant_dn)
+
+ def _add_role_to_group_and_project(self, group_id, tenant_id, role_id):
+ self.identity_api.get_group(group_id)
+ self.get_project(tenant_id)
+ self.get_role(role_id)
+ group_dn = self.group._id_to_dn(group_id)
+ role_dn = self._subrole_id_to_dn(role_id, tenant_id)
+ self.role.add_user(role_id, role_dn, group_dn, group_id, tenant_id)
+ tenant_dn = self.project._id_to_dn(tenant_id)
+ return GroupRoleAssociation(group_dn=group_dn,
+ role_dn=role_dn,
+ tenant_dn=tenant_dn)
def _create_metadata(self, user_id, tenant_id, metadata):
return {}
@@ -190,6 +223,14 @@ class Assignment(assignment.Driver):
self.project._id_to_dn(tenant_id),
user_id, role_id)
+ def _remove_role_from_group_and_project(self, group_id, tenant_id,
+ role_id):
+ role_dn = self._subrole_id_to_dn(role_id, tenant_id)
+ return self.role.delete_user(role_dn,
+ self.group._id_to_dn(group_id),
+ self.project._id_to_dn(tenant_id),
+ group_id, role_id)
+
def update_role(self, role_id, role):
self.get_role(role_id)
return self.role.update(role_id, role)
@@ -255,28 +296,112 @@ class Assignment(assignment.Driver):
def create_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None,
inherited_to_projects=False):
- raise exception.NotImplemented()
+ self.get_role(role_id)
+
+ if domain_id:
+ self.get_domain(domain_id)
+ if project_id:
+ self.get_project(project_id)
+
+ if project_id and inherited_to_projects:
+ msg = _('Inherited roles can only be assigned to domains')
+ raise exception.Conflict(type='role grant', details=msg)
+
+ try:
+ metadata_ref = self._get_metadata(user_id, project_id,
+ domain_id, group_id)
+ except exception.MetadataNotFound:
+ metadata_ref = {}
+
+ if user_id is None:
+ metadata_ref['roles'] = self._add_role_to_group_and_project(
+ group_id, project_id, role_id)
+ else:
+ metadata_ref['roles'] = self.add_role_to_user_and_project(
+ user_id, project_id, role_id)
def get_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None,
inherited_to_projects=False):
- raise exception.NotImplemented()
+ role_ref = self.get_role(role_id)
+
+ if domain_id:
+ self.get_domain(domain_id)
+ if project_id:
+ self.get_project(project_id)
+
+ try:
+ metadata_ref = self._get_metadata(user_id, project_id,
+ domain_id, group_id)
+ except exception.MetadataNotFound:
+ metadata_ref = {}
+ role_ids = set(self._roles_from_role_dicts(
+ metadata_ref.get('roles', []), inherited_to_projects))
+ if role_id not in role_ids:
+ raise exception.RoleNotFound(role_id=role_id)
+ return role_ref
def delete_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None,
inherited_to_projects=False):
- raise exception.NotImplemented()
+ if user_id:
+ self.identity_api.get_user(user_id)
+ if group_id:
+ self.identity_api.get_group(group_id)
+
+ self.get_role(role_id)
+
+ if domain_id:
+ self.get_domain(domain_id)
+ if project_id:
+ self.get_project(project_id)
+
+ try:
+ metadata_ref = self._get_metadata(user_id, project_id,
+ domain_id, group_id)
+ except exception.MetadataNotFound:
+ metadata_ref = {}
+
+ try:
+ if user_id is None:
+ metadata_ref['roles'] = (
+ self._remove_role_from_group_and_project(
+ group_id, project_id, role_id))
+ else:
+ metadata_ref['roles'] = self.remove_role_from_user_and_project(
+ user_id, project_id, role_id)
+ except KeyError:
+ raise exception.RoleNotFound(role_id=role_id)
def list_grants(self, user_id=None, group_id=None,
domain_id=None, project_id=None,
inherited_to_projects=False):
- raise exception.NotImplemented()
+ if domain_id:
+ self.get_domain(domain_id)
+ if project_id:
+ self.get_project(project_id)
+
+ try:
+ metadata_ref = self._get_metadata(user_id, project_id,
+ domain_id, group_id)
+ except exception.MetadataNotFound:
+ metadata_ref = {}
+
+ return [self.get_role(x) for x in
+ self._roles_from_role_dicts(metadata_ref.get('roles', []),
+ inherited_to_projects)]
def get_domain_by_name(self, domain_name):
raise exception.NotImplemented()
def list_role_assignments(self):
- raise exception.NotImplemented()
+ role_assignments = []
+ for a in self.role.list_role_assignments(self.project.tree_dn):
+ assignment = {'role_id': self.role._dn_to_id(a.role_dn),
+ 'user_id': self.user._dn_to_id(a.user_dn),
+ 'project_id': self.project._dn_to_id(a.project_dn)}
+ role_assignments.append(assignment)
+ return role_assignments
# TODO(termie): turn this into a data object and move logic to driver
@@ -551,3 +676,32 @@ class RoleApi(common_ldap.BaseLdap):
finally:
conn.unbind_s()
super(RoleApi, self).delete(role_id)
+
+ def list_role_assignments(self, project_tree_dn):
+ """Returns a list of all the role assignments linked to project_tree_dn
+ attribute.
+ """
+ conn = self.get_connection()
+ query = '(objectClass=%s)' % (self.object_class)
+ try:
+ roles = conn.search_s(project_tree_dn,
+ ldap.SCOPE_SUBTREE,
+ query)
+ except ldap.NO_SUCH_OBJECT:
+ return []
+ finally:
+ conn.unbind_s()
+
+ res = []
+ for role_dn, role in roles:
+ tenant = ldap.dn.str2dn(role_dn)
+ tenant.pop(0)
+ # It obtains the tenant DN to construct the UserRoleAssociation
+ # object.
+ tenant_dn = ldap.dn.dn2str(tenant)
+ for user_dn in role[self.member_attribute]:
+ res.append(UserRoleAssociation(
+ user_dn=user_dn,
+ role_dn=role_dn,
+ tenant_dn=tenant_dn))
+ return res
diff --git a/keystone/tests/test_backend_ldap.py b/keystone/tests/test_backend_ldap.py
index fc3a5ebe2..f94877087 100644
--- a/keystone/tests/test_backend_ldap.py
+++ b/keystone/tests/test_backend_ldap.py
@@ -124,20 +124,67 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.identity_api.get_user,
self.user_foo['id'])
- def test_get_role_grant_by_user_and_project(self):
- self.skipTest('Blocked by bug 1101287')
-
- def test_get_role_grants_for_user_and_project_404(self):
- self.skipTest('Blocked by bug 1101287')
-
- def test_add_role_grant_to_user_and_project_404(self):
- self.skipTest('Blocked by bug 1101287')
-
def test_remove_role_grant_from_user_and_project(self):
- self.skipTest('Blocked by bug 1101287')
+ self.assignment_api.create_grant(user_id=self.user_foo['id'],
+ project_id=self.tenant_baz['id'],
+ role_id='member')
+ roles_ref = self.assignment_api.list_grants(
+ user_id=self.user_foo['id'],
+ project_id=self.tenant_baz['id'])
+ self.assertDictEqual(roles_ref[0], self.role_member)
+
+ self.assignment_api.delete_grant(user_id=self.user_foo['id'],
+ project_id=self.tenant_baz['id'],
+ role_id='member')
+ roles_ref = self.assignment_api.list_grants(
+ user_id=self.user_foo['id'],
+ project_id=self.tenant_baz['id'])
+ self.assertEqual(len(roles_ref), 0)
+ self.assertRaises(exception.NotFound,
+ self.assignment_api.delete_grant,
+ user_id=self.user_foo['id'],
+ project_id=self.tenant_baz['id'],
+ role_id='member')
def test_get_and_remove_role_grant_by_group_and_project(self):
- self.skipTest('Blocked by bug 1101287')
+ new_domain = self._get_domain_fixture()
+ new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'],
+ 'name': uuid.uuid4().hex}
+ self.identity_api.create_group(new_group['id'], new_group)
+ new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
+ 'enabled': True,
+ 'domain_id': new_domain['id']}
+ self.identity_api.create_user(new_user['id'], new_user)
+ self.identity_api.add_user_to_group(new_user['id'],
+ new_group['id'])
+
+ roles_ref = self.assignment_api.list_grants(
+ group_id=new_group['id'],
+ project_id=self.tenant_bar['id'])
+ self.assertEqual(roles_ref, [])
+ self.assertEqual(len(roles_ref), 0)
+
+ self.assignment_api.create_grant(group_id=new_group['id'],
+ project_id=self.tenant_bar['id'],
+ role_id='member')
+ roles_ref = self.assignment_api.list_grants(
+ group_id=new_group['id'],
+ project_id=self.tenant_bar['id'])
+ self.assertNotEmpty(roles_ref)
+ self.assertDictEqual(roles_ref[0], self.role_member)
+
+ self.assignment_api.delete_grant(group_id=new_group['id'],
+ project_id=self.tenant_bar['id'],
+ role_id='member')
+ roles_ref = self.assignment_api.list_grants(
+ group_id=new_group['id'],
+ project_id=self.tenant_bar['id'])
+ self.assertEqual(len(roles_ref), 0)
+ self.assertRaises(exception.NotFound,
+ self.assignment_api.delete_grant,
+ group_id=new_group['id'],
+ project_id=self.tenant_bar['id'],
+ role_id='member')
def test_delete_user_grant_no_user(self):
self.skipTest('Blocked by bug 1101287')
@@ -179,10 +226,61 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.skipTest('N/A: LDAP does not support multiple domains')
def test_list_projects_for_user(self):
- self.skipTest('Blocked by bug 1101287')
+ domain = self._get_domain_fixture()
+ user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex, 'domain_id': domain['id'],
+ 'enabled': True}
+ self.identity_api.create_user(user1['id'], user1)
+ user_projects = self.assignment_api.list_projects_for_user(user1['id'])
+ self.assertEqual(len(user_projects), 0)
+ self.assignment_api.create_grant(user_id=user1['id'],
+ project_id=self.tenant_bar['id'],
+ role_id=self.role_member['id'])
+ self.assignment_api.create_grant(user_id=user1['id'],
+ project_id=self.tenant_baz['id'],
+ role_id=self.role_member['id'])
+ user_projects = self.assignment_api.list_projects_for_user(user1['id'])
+ self.assertEqual(len(user_projects), 2)
def test_list_projects_for_user_with_grants(self):
- self.skipTest('Blocked by bug 1221805')
+ domain = self._get_domain_fixture()
+ new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
+ 'password': uuid.uuid4().hex, 'enabled': True,
+ 'domain_id': domain['id']}
+ self.identity_api.create_user(new_user['id'], new_user)
+
+ group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id']}
+ self.identity_api.create_group(group1['id'], group1)
+ group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id']}
+ self.identity_api.create_group(group2['id'], group2)
+
+ project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id']}
+ self.assignment_api.create_project(project1['id'], project1)
+ project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id']}
+ self.assignment_api.create_project(project2['id'], project2)
+
+ self.identity_api.add_user_to_group(new_user['id'],
+ group1['id'])
+ self.identity_api.add_user_to_group(new_user['id'],
+ group2['id'])
+
+ self.assignment_api.create_grant(user_id=new_user['id'],
+ project_id=self.tenant_bar['id'],
+ role_id=self.role_member['id'])
+ self.assignment_api.create_grant(user_id=new_user['id'],
+ project_id=project1['id'],
+ role_id=self.role_admin['id'])
+ self.assignment_api.create_grant(group_id=group2['id'],
+ project_id=project2['id'],
+ role_id=self.role_admin['id'])
+
+ user_projects = self.assignment_api.list_projects_for_user(
+ new_user['id'])
+ self.assertEqual(len(user_projects), 2)
def test_create_duplicate_user_name_in_different_domains(self):
self.skipTest('Blocked by bug 1101276')
@@ -217,7 +315,33 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.skipTest('N/A: LDAP does not support multiple domains')
def test_list_role_assignments_unfiltered(self):
- self.skipTest('Blocked by bug 1221805')
+ new_domain = self._get_domain_fixture()
+ new_user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex, 'enabled': True,
+ 'domain_id': new_domain['id']}
+ self.identity_api.create_user(new_user['id'],
+ new_user)
+ new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'],
+ 'name': uuid.uuid4().hex}
+ self.identity_api.create_group(new_group['id'], new_group)
+ new_project = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': new_domain['id']}
+ self.assignment_api.create_project(new_project['id'], new_project)
+
+ # First check how many role grant already exist
+ existing_assignments = len(self.assignment_api.list_role_assignments())
+
+ self.assignment_api.create_grant(user_id=new_user['id'],
+ project_id=new_project['id'],
+ role_id='other')
+ self.assignment_api.create_grant(group_id=new_group['id'],
+ project_id=new_project['id'],
+ role_id='admin')
+
+ # Read back the list of assignments - check it is gone up by 2
+ after_assignments = len(self.assignment_api.list_role_assignments())
+ self.assertEqual(after_assignments, existing_assignments + 2)
def test_list_role_assignments_bad_role(self):
self.skipTest('Blocked by bug 1221805')
@@ -920,16 +1044,10 @@ class LDAPIdentity(tests.TestCase, BaseLDAPIdentity):
'N/A: LDAP does not support multiple domains')
def test_create_grant_no_user(self):
- # The LDAP assignment backend doesn't implement create_grant.
- self.assertRaises(
- exception.NotImplemented,
- super(BaseLDAPIdentity, self).test_create_grant_no_user)
+ self.skipTest('Blocked by bug 1101287')
def test_create_grant_no_group(self):
- # The LDAP assignment backend doesn't implement create_grant.
- self.assertRaises(
- exception.NotImplemented,
- super(BaseLDAPIdentity, self).test_create_grant_no_group)
+ self.skipTest('Blocked by bug 1101287')
class LDAPIdentityEnabledEmulation(LDAPIdentity):
@@ -1044,6 +1162,15 @@ class LdapIdentitySqlAssignment(sql.Base, tests.TestCase, BaseLDAPIdentity):
self.skipTest(
'N/A: Not part of SQL backend')
+ def test_add_role_grant_to_user_and_project_404(self):
+ self.skipTest('Blocked by bug 1101287')
+
+ def test_get_role_grants_for_user_and_project_404(self):
+ self.skipTest('Blocked by bug 1101287')
+
+ def test_list_projects_for_user_with_grants(self):
+ self.skipTest('Blocked by bug 1221805')
+
class MultiLDAPandSQLIdentity(sql.Base, tests.TestCase, BaseLDAPIdentity):
"""Class to test common SQL plus individual LDAP backends.
@@ -1274,3 +1401,12 @@ class MultiLDAPandSQLIdentity(sql.Base, tests.TestCase, BaseLDAPIdentity):
self.assertFalse(conf.identity.domain_specific_drivers_enabled)
# ..and make sure a domain-specifc options is also set
self.assertEqual(conf.ldap.url, 'fake://memory1')
+
+ def test_add_role_grant_to_user_and_project_404(self):
+ self.skipTest('Blocked by bug 1101287')
+
+ def test_get_role_grants_for_user_and_project_404(self):
+ self.skipTest('Blocked by bug 1101287')
+
+ def test_list_projects_for_user_with_grants(self):
+ self.skipTest('Blocked by bug 1221805')