diff options
Diffstat (limited to 'keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py')
-rw-r--r-- | keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py | 326 |
1 files changed, 0 insertions, 326 deletions
diff --git a/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py b/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py index 9051d71..c62b3cc 100644 --- a/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py +++ b/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py @@ -35,7 +35,6 @@ from oslo_utils import timeutils import pbr.version import six import testresources -import testtools from testtools import matchers import webob import webob.dec @@ -44,7 +43,6 @@ from keystonemiddleware import auth_token from keystonemiddleware.auth_token import _base from keystonemiddleware.auth_token import _cache from keystonemiddleware.auth_token import _exceptions as ksm_exceptions -from keystonemiddleware.auth_token import _revocations from keystonemiddleware.tests.unit.auth_token import base from keystonemiddleware.tests.unit import client_fixtures @@ -101,13 +99,6 @@ ERROR_TOKEN = '7ae290c2a06244c4b41692eb4e9225f2' TIMEOUT_TOKEN = '4ed1c5e53beee59458adcf8261a8cae2' -def cleanup_revoked_file(filename): - try: - os.remove(filename) - except OSError: - pass - - def strtime(at=None): at = at or timeutils.utcnow() return at.strftime(timeutils.PERFECT_TIME_FORMAT) @@ -337,9 +328,6 @@ class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase): self.middleware = auth_token.AuthProtocol( self.fake_app(self.expected_env), self.conf) - self.middleware._revocations._list = jsonutils.dumps( - {"revoked": [], "extra": "success"}) - def update_expected_env(self, expected_env={}): self.middleware._app.expected_env.update(expected_env) @@ -476,27 +464,14 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.assertThat(hashed_short_string_key, matchers.HasLength(len(hashed_long_string_key))) - def test_config_revocation_cache_timeout(self): - conf = { - 'revocation_cache_time': '24', - 'www_authenticate_uri': 'https://keystone.example.com:1234', - 'admin_user': uuid.uuid4().hex - } - middleware = auth_token.AuthProtocol(self.fake_app, conf) - self.assertEqual(middleware._revocations._cache_timeout, - datetime.timedelta(seconds=24)) - def test_conf_values_type_convert(self): conf = { - 'revocation_cache_time': '24', 'identity_uri': 'https://keystone.example.com:1234', 'include_service_catalog': '0', 'nonexsit_option': '0', } middleware = auth_token.AuthProtocol(self.fake_app, conf) - self.assertEqual(datetime.timedelta(seconds=24), - middleware._revocations._cache_timeout) self.assertFalse(middleware._include_service_catalog) self.assertEqual('0', middleware._conf.get('nonexsit_option')) @@ -643,42 +618,6 @@ class CommonAuthTokenMiddlewareTest(object): self.assert_valid_request_200(self.token_dict['uuid_token_default']) self.assert_valid_last_url(self.token_dict['uuid_token_default']) - def _test_cache_revoked(self, token, revoked_form=None): - # When the token is cached and revoked, 401 is returned. - self.middleware._check_revocations_for_cached = True - - # Token should be cached as ok after this. - self.call_middleware(headers={'X-Auth-Token': token}) - - # Put it in revocation list. - self.middleware._revocations._list = self.get_revocation_list_json( - token_ids=[revoked_form or token]) - - self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=401) - - def test_cached_revoked_error(self): - # When the token is cached and revocation list retrieval fails, - # 503 is returned - token = self.token_dict['uuid_token_default'] - self.middleware._check_revocations_for_cached = True - - # Token should be cached as ok after this. - resp = self.call_middleware(headers={'X-Auth-Token': token}) - self.assertEqual(200, resp.status_int) - - # Cause the revocation list to be fetched again next time so we can - # test the case where that retrieval fails - self.middleware._revocations._fetched_time = datetime.datetime.min - with mock.patch.object(self.middleware._revocations, '_fetch', - side_effect=ksm_exceptions.RevocationListError): - self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=503) - - def test_cached_revoked_uuid(self): - # When the UUID token is cached and revoked, 401 is returned. - self._test_cache_revoked(self.token_dict['uuid_token_default']) - def test_valid_signed_request(self): for _ in range(2): # Do it twice because first result was cached. self.assert_valid_request_200( @@ -692,176 +631,13 @@ class CommonAuthTokenMiddlewareTest(object): # ensure that signed requests do not generate HTTP traffic self.assertLastPath(None) - def test_revoked_token_receives_401(self): - self.middleware._revocations._list = ( - self.get_revocation_list_json()) - - token = self.token_dict['revoked_token'] - self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=401) - - def test_revoked_token_receives_401_sha256(self): - self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) - self.set_middleware() - self.middleware._revocations._list = ( - self.get_revocation_list_json(mode='sha256')) - - token = self.token_dict['revoked_token'] - self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=401) - - def test_cached_revoked_pki(self): - # When the PKI token is cached and revoked, 401 is returned. - token = self.token_dict['signed_token_scoped'] - revoked_form = cms.cms_hash_token(token) - self._test_cache_revoked(token, revoked_form) - - def test_cached_revoked_pkiz(self): - # When the PKIZ token is cached and revoked, 401 is returned. - token = self.token_dict['signed_token_scoped_pkiz'] - revoked_form = cms.cms_hash_token(token) - self._test_cache_revoked(token, revoked_form) - - def test_revoked_token_receives_401_md5_secondary(self): - # When hash_algorithms has 'md5' as the secondary hash and the - # revocation list contains the md5 hash for a token, that token is - # considered revoked so returns 401. - self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) - self.set_middleware() - self.middleware._revocations._list = ( - self.get_revocation_list_json()) - - token = self.token_dict['revoked_token'] - self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=401) - - def _test_revoked_hashed_token(self, token_name): - # If hash_algorithms is set as ['sha256', 'md5'], - # and check_revocations_for_cached is True, - # and a token is in the cache because it was successfully validated - # using the md5 hash, then - # if the token is in the revocation list by md5 hash, it'll be - # rejected and auth_token returns 401. - self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) - self.conf['check_revocations_for_cached'] = 'true' - self.set_middleware() - - token = self.token_dict[token_name] - - # Put the token in the revocation list. - token_hashed = cms.cms_hash_token(token) - self.middleware._revocations._list = self.get_revocation_list_json( - token_ids=[token_hashed]) - - # First, request is using the hashed token, is valid so goes in - # cache using the given hash. - self.call_middleware(headers={'X-Auth-Token': token_hashed}) - - # This time use the PKI(Z) token - # Should find the token in the cache and revocation list. - self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=401) - - def test_revoked_hashed_pki_token(self): - self._test_revoked_hashed_token('signed_token_scoped') - - def test_revoked_hashed_pkiz_token(self): - self._test_revoked_hashed_token('signed_token_scoped_pkiz') - - def test_revoked_pki_token_by_audit_id(self): - # When the audit ID is in the revocation list, the token is invalid. - self.set_middleware() - token = self.token_dict['signed_token_scoped'] - - # Put the token audit ID in the revocation list, - # the entry will have a false token ID so the token ID doesn't match. - fake_token_id = uuid.uuid4().hex - # The audit_id value is in examples/pki/cms/auth_*_token_scoped.json. - audit_id = 'SLIXlXQUQZWUi9VJrqdXqA' - revocation_list_data = { - 'revoked': [ - { - 'id': fake_token_id, - 'audit_id': audit_id - }, - ] - } - self.middleware._revocations._list = jsonutils.dumps( - revocation_list_data) - - self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=401) - - def get_revocation_list_json(self, token_ids=None, mode=None): - if token_ids is None: - key = 'revoked_token_hash' + (('_' + mode) if mode else '') - token_ids = [self.token_dict[key]] - revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()} - for x in token_ids]} - return jsonutils.dumps(revocation_list) - - def test_is_signed_token_revoked_returns_false(self): - # explicitly setting an empty revocation list here to document intent - self.middleware._revocations._list = jsonutils.dumps( - {"revoked": [], "extra": "success"}) - result = self.middleware._revocations._any_revoked( - [self.token_dict['revoked_token_hash']]) - self.assertFalse(result) - - def test_is_signed_token_revoked_returns_true(self): - self.middleware._revocations._list = ( - self.get_revocation_list_json()) - result = self.middleware._revocations._any_revoked( - [self.token_dict['revoked_token_hash']]) - self.assertTrue(result) - - def test_is_signed_token_revoked_returns_true_sha256(self): - self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) - self.set_middleware() - self.middleware._revocations._list = ( - self.get_revocation_list_json(mode='sha256')) - result = self.middleware._revocations._any_revoked( - [self.token_dict['revoked_token_hash_sha256']]) - self.assertTrue(result) - - def test_validate_offline_raises_exception_for_revoked_token(self): - self.middleware._revocations._list = ( - self.get_revocation_list_json()) - self.assertRaises(ksm_exceptions.InvalidToken, - self.middleware._validate_offline, - self.token_dict['revoked_token'], - [self.token_dict['revoked_token_hash']]) - - def test_validate_offline_raises_exception_for_revoked_token_s256(self): - self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) - self.set_middleware() - self.middleware._revocations._list = ( - self.get_revocation_list_json(mode='sha256')) - self.assertRaises(ksm_exceptions.InvalidToken, - self.middleware._validate_offline, - self.token_dict['revoked_token'], - [self.token_dict['revoked_token_hash_sha256'], - self.token_dict['revoked_token_hash']]) - - def test_validate_offline_raises_exception_for_revoked_pkiz_token(self): - self.middleware._revocations._list = ( - self.examples.REVOKED_TOKEN_PKIZ_LIST_JSON) - self.assertRaises(ksm_exceptions.InvalidToken, - self.middleware._validate_offline, - self.token_dict['revoked_token_pkiz'], - [self.token_dict['revoked_token_pkiz_hash']]) - def test_validate_offline_succeeds_for_unrevoked_token(self): - self.middleware._revocations._list = ( - self.get_revocation_list_json()) token = self.middleware._validate_offline( self.token_dict['signed_token_scoped'], [self.token_dict['signed_token_scoped_hash']]) self.assertIsInstance(token, dict) def test_verify_signed_compressed_token_succeeds_for_unrevoked_token(self): - self.middleware._revocations._list = ( - self.get_revocation_list_json()) token = self.middleware._validate_offline( self.token_dict['signed_token_scoped_pkiz'], [self.token_dict['signed_token_scoped_hash']]) @@ -870,84 +646,12 @@ class CommonAuthTokenMiddlewareTest(object): def test_validate_offline_token_succeeds_for_unrevoked_token_sha256(self): self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) self.set_middleware() - self.middleware._revocations._list = ( - self.get_revocation_list_json(mode='sha256')) token = self.middleware._validate_offline( self.token_dict['signed_token_scoped'], [self.token_dict['signed_token_scoped_hash_sha256'], self.token_dict['signed_token_scoped_hash']]) self.assertIsInstance(token, dict) - def test_get_token_revocation_list_fetched_time_returns_min(self): - self.middleware._revocations._fetched_time = None - - # Get rid of the revoked file - revoked_path = self.middleware._signing_directory.calc_path( - _revocations.Revocations._FILE_NAME) - os.remove(revoked_path) - - self.assertEqual(self.middleware._revocations._fetched_time, - datetime.datetime.min) - - # FIXME(blk-u): move the unit tests into unit/test_auth_token.py - def test_get_token_revocation_list_fetched_time_returns_mtime(self): - self.middleware._revocations._fetched_time = None - revoked_path = self.middleware._signing_directory.calc_path( - _revocations.Revocations._FILE_NAME) - mtime = os.path.getmtime(revoked_path) - fetched_time = datetime.datetime.utcfromtimestamp(mtime) - self.assertEqual(fetched_time, - self.middleware._revocations._fetched_time) - - @testtools.skipUnless(TimezoneFixture.supported(), - 'TimezoneFixture not supported') - def test_get_token_revocation_list_fetched_time_returns_utc(self): - with TimezoneFixture('UTC-1'): - self.middleware._revocations._list = jsonutils.dumps( - self.examples.REVOCATION_LIST) - self.middleware._revocations._fetched_time = None - fetched_time = self.middleware._revocations._fetched_time - self.assertTrue(timeutils.is_soon(fetched_time, 1)) - - def test_get_token_revocation_list_fetched_time_returns_value(self): - expected = self.middleware._revocations._fetched_time - self.assertEqual(self.middleware._revocations._fetched_time, - expected) - - def test_get_revocation_list_returns_fetched_list(self): - # auth_token uses v2 to fetch this, so don't allow the v3 - # tests to override the fake http connection - self.middleware._revocations._fetched_time = None - - # Get rid of the revoked file - revoked_path = self.middleware._signing_directory.calc_path( - _revocations.Revocations._FILE_NAME) - os.remove(revoked_path) - - self.assertEqual(self.middleware._revocations._list, - self.examples.REVOCATION_LIST) - - def test_get_revocation_list_returns_current_list_from_memory(self): - self.assertEqual(self.middleware._revocations._list, - self.middleware._revocations._list_prop) - - def test_get_revocation_list_returns_current_list_from_disk(self): - in_memory_list = self.middleware._revocations._list - self.middleware._revocations._list_prop = None - self.assertEqual(self.middleware._revocations._list, - in_memory_list) - - def test_invalid_revocation_list_raises_error(self): - self.requests_mock.get(self.revocation_url, json={}) - self.assertRaises(ksm_exceptions.RevocationListError, - self.middleware._revocations._fetch) - - def test_fetch_revocation_list(self): - # auth_token uses v2 to fetch this, so don't allow the v3 - # tests to override the fake http connection - fetched = jsonutils.loads(self.middleware._revocations._fetch()) - self.assertEqual(fetched, self.examples.REVOCATION_LIST) - def test_request_invalid_uuid_token(self): # remember because we are testing the middleware we stub the connection # to the keystone server, but this is not what gets returned @@ -1604,13 +1308,6 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.examples.SIGNED_TOKEN_SCOPED_HASH_SHA256, 'signed_token_scoped_expired': self.examples.SIGNED_TOKEN_SCOPED_EXPIRED, - 'revoked_token': self.examples.REVOKED_TOKEN, - 'revoked_token_pkiz': self.examples.REVOKED_TOKEN_PKIZ, - 'revoked_token_pkiz_hash': - self.examples.REVOKED_TOKEN_PKIZ_HASH, - 'revoked_token_hash': self.examples.REVOKED_TOKEN_HASH, - 'revoked_token_hash_sha256': - self.examples.REVOKED_TOKEN_HASH_SHA256, 'uuid_service_token_default': self.examples.UUID_SERVICE_TOKEN_DEFAULT, } @@ -1622,10 +1319,6 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.requests_mock.post('%s/v2.0/tokens' % BASE_URI, text=FAKE_ADMIN_TOKEN) - self.revocation_url = '%s/v2.0/tokens/revoked' % BASE_URI - self.requests_mock.get(self.revocation_url, - text=self.examples.SIGNED_REVOCATION_LIST) - for token in (self.examples.UUID_TOKEN_DEFAULT, self.examples.UUID_TOKEN_UNSCOPED, self.examples.UUID_TOKEN_BIND, @@ -1812,13 +1505,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.examples.SIGNED_v3_TOKEN_SCOPED_HASH_SHA256, 'signed_token_scoped_expired': self.examples.SIGNED_TOKEN_SCOPED_EXPIRED, - 'revoked_token': self.examples.REVOKED_v3_TOKEN, - 'revoked_token_pkiz': self.examples.REVOKED_v3_TOKEN_PKIZ, - 'revoked_token_hash': self.examples.REVOKED_v3_TOKEN_HASH, - 'revoked_token_hash_sha256': - self.examples.REVOKED_v3_TOKEN_HASH_SHA256, - 'revoked_token_pkiz_hash': - self.examples.REVOKED_v3_PKIZ_TOKEN_HASH, 'uuid_service_token_default': self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT, } @@ -1832,10 +1518,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.requests_mock.post('%s/v2.0/tokens' % BASE_URI, text=FAKE_ADMIN_TOKEN) - self.revocation_url = '%s/v3/auth/tokens/OS-PKI/revoked' % BASE_URI - self.requests_mock.get(self.revocation_url, - text=self.examples.SIGNED_REVOCATION_LIST) - self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI, text=self.token_response, headers={'X-Subject-Token': uuid.uuid4().hex}) @@ -1944,7 +1626,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.token_dict['signed_token_scoped_pkiz']) def test_fallback_to_online_validation_with_revocation_list_error(self): - self.requests_mock.get(self.revocation_url, status_code=404) self.assert_valid_request_200(self.token_dict['signed_token_scoped']) self.assert_valid_request_200( self.token_dict['signed_token_scoped_pkiz']) @@ -2398,10 +2079,6 @@ class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest, self.requests_mock.post('%s/v2.0/tokens' % BASE_URI, text=FAKE_ADMIN_TOKEN) - self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI, - text=self.examples.SIGNED_REVOCATION_LIST, - status_code=200) - for token in (self.examples.UUID_TOKEN_DEFAULT, self.examples.UUID_SERVICE_TOKEN_DEFAULT, self.examples.UUID_TOKEN_BIND, @@ -2455,9 +2132,6 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest, self.requests_mock.post('%s/v2.0/tokens' % BASE_URI, text=FAKE_ADMIN_TOKEN) - self.requests_mock.get('%s/v3/auth/tokens/OS-PKI/revoked' % BASE_URI, - text=self.examples.SIGNED_REVOCATION_LIST) - self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI, text=self.token_response, headers={'X-Subject-Token': uuid.uuid4().hex}) |