diff options
Diffstat (limited to 'keystone/tests/unit/test_v3_auth.py')
-rw-r--r-- | keystone/tests/unit/test_v3_auth.py | 198 |
1 files changed, 198 insertions, 0 deletions
diff --git a/keystone/tests/unit/test_v3_auth.py b/keystone/tests/unit/test_v3_auth.py index e710634d0..eb7ea0e29 100644 --- a/keystone/tests/unit/test_v3_auth.py +++ b/keystone/tests/unit/test_v3_auth.py @@ -19,8 +19,10 @@ import itertools import operator import re from unittest import mock +from urllib import parse import uuid +from cryptography.hazmat.primitives.serialization import Encoding import freezegun import http.client from oslo_serialization import jsonutils as json @@ -2645,6 +2647,187 @@ class TokenAPITests(object): r = self._validate_token(token, allow_expired=True) self.assertValidProjectScopedTokenResponse(r) + def _create_project_user(self): + new_domain_ref = unit.new_domain_ref() + PROVIDERS.resource_api.create_domain( + new_domain_ref['id'], new_domain_ref + ) + new_project_ref = unit.new_project_ref(domain_id=self.domain_id) + PROVIDERS.resource_api.create_project( + new_project_ref['id'], new_project_ref + ) + new_user = unit.create_user(PROVIDERS.identity_api, + domain_id=new_domain_ref['id'], + project_id=new_project_ref['id']) + PROVIDERS.assignment_api.create_grant( + self.role['id'], + user_id=new_user['id'], + project_id=new_project_ref['id']) + return new_user, new_domain_ref, new_project_ref + + def _create_certificates(self, + root_dn=None, + server_dn=None, + client_dn=None): + root_subj = unit.create_dn( + country_name='jp', + state_or_province_name='kanagawa', + locality_name='kawasaki', + organization_name='fujitsu', + organizational_unit_name='test', + common_name='root' + ) + if root_dn: + root_subj = unit.update_dn(root_subj, root_dn) + + root_cert, root_key = unit.create_certificate(root_subj) + keystone_subj = unit.create_dn( + country_name='jp', + state_or_province_name='kanagawa', + locality_name='kawasaki', + organization_name='fujitsu', + organizational_unit_name='test', + common_name='keystone.local' + ) + if server_dn: + keystone_subj = unit.update_dn(keystone_subj, server_dn) + + ks_cert, ks_key = unit.create_certificate( + keystone_subj, ca=root_cert, ca_key=root_key) + client_subj = unit.create_dn( + country_name='jp', + state_or_province_name='kanagawa', + locality_name='kawasaki', + organization_name='fujitsu', + organizational_unit_name='test', + common_name='client' + ) + if client_dn: + client_subj = unit.update_dn(client_subj, client_dn) + + client_cert, client_key = unit.create_certificate( + client_subj, ca=root_cert, ca_key=root_key) + return root_cert, root_key, ks_cert, ks_key, client_cert, client_key + + def _get_cert_content(self, cert): + return cert.public_bytes(Encoding.PEM).decode('ascii') + + def _get_oauth2_access_token(self, client_id, client_cert_content, + expected_status=http.client.OK): + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + } + data = { + 'grant_type': 'client_credentials', + 'client_id': client_id + } + extra_environ = { + 'SSL_CLIENT_CERT': client_cert_content + } + data = parse.urlencode(data).encode() + resp = self.post( + '/OS-OAUTH2/token', + headers=headers, + noauth=True, + convert=False, + body=data, + environ=extra_environ, + expected_status=expected_status) + return resp + + def _create_mapping(self): + mapping = { + 'id': 'oauth2_mapping', + 'rules': [ + { + 'local': [ + { + 'user': { + 'name': '{0}', + 'id': '{1}', + 'email': '{2}', + 'domain': { + 'name': '{3}', + 'id': '{4}' + } + } + } + ], + 'remote': [ + { + 'type': 'SSL_CLIENT_SUBJECT_DN_CN' + }, + { + 'type': 'SSL_CLIENT_SUBJECT_DN_UID' + }, + { + 'type': 'SSL_CLIENT_SUBJECT_DN_EMAILADDRESS' + }, + { + 'type': 'SSL_CLIENT_SUBJECT_DN_O' + }, + { + 'type': 'SSL_CLIENT_SUBJECT_DN_DC' + }, + { + 'type': 'SSL_CLIENT_ISSUER_DN_CN', + 'any_one_of': [ + 'root' + ] + } + ] + } + ] + } + PROVIDERS.federation_api.create_mapping(mapping['id'], mapping) + + def test_verify_oauth2_token_project_scope_ok(self): + cache_on_issue = CONF.token.cache_on_issue + caching = CONF.token.caching + self._create_mapping() + user, user_domain, _ = self._create_project_user() + + *_, client_cert, _ = self._create_certificates( + root_dn=unit.create_dn( + common_name='root' + ), + client_dn=unit.create_dn( + common_name=user['name'], + user_id=user['id'], + email_address=user['email'], + organization_name=user_domain['name'], + domain_component=user_domain['id'] + ) + ) + + cert_content = self._get_cert_content(client_cert) + CONF.token.cache_on_issue = False + CONF.token.caching = False + resp = self._get_oauth2_access_token(user['id'], cert_content) + + json_resp = json.loads(resp.body) + self.assertIn('access_token', json_resp) + self.assertEqual('Bearer', json_resp['token_type']) + self.assertEqual(3600, json_resp['expires_in']) + + verify_resp = self.get( + '/auth/tokens', + headers={ + 'X-Subject-Token': json_resp['access_token'], + 'X-Auth-Token': json_resp['access_token'] + }, + expected_status=http.client.OK) + self.assertIn('token', verify_resp.result) + self.assertIn('oauth2_credential', verify_resp.result['token']) + self.assertIn('roles', verify_resp.result['token']) + self.assertIn('project', verify_resp.result['token']) + self.assertIn('catalog', verify_resp.result['token']) + check_oauth2 = verify_resp.result['token']['oauth2_credential'] + self.assertEqual(utils.get_certificate_thumbprint(cert_content), + check_oauth2['x5t#S256']) + CONF.token.cache_on_issue = cache_on_issue + CONF.token.caching = caching + class TokenDataTests(object): """Test the data in specific token types.""" @@ -5577,6 +5760,21 @@ class ApplicationCredentialAuth(test_v3.RestfulTestCase): self.v3_create_token(auth_data, expected_status=http.client.UNAUTHORIZED) + def test_application_credential_expiration_limits_token_expiration(self): + expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=1) + app_cred = self._make_app_cred(expires=expires_at) + app_cred_ref = self.app_cred_api.create_application_credential( + app_cred) + auth_data = self.build_authentication_request( + app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret']) + resp = self.v3_create_token(auth_data, + expected_status=http.client.CREATED) + token = resp.headers.get('X-Subject-Token') + future = datetime.datetime.utcnow() + datetime.timedelta(minutes=2) + with freezegun.freeze_time(future): + self._validate_token(token, + expected_status=http.client.UNAUTHORIZED) + def test_application_credential_fails_when_user_deleted(self): app_cred = self._make_app_cred() app_cred_ref = self.app_cred_api.create_application_credential( |