diff options
author | Brant Knudson <bknudson@us.ibm.com> | 2014-09-21 20:28:42 -0500 |
---|---|---|
committer | Brant Knudson <bknudson@us.ibm.com> | 2015-02-10 14:06:56 -0600 |
commit | c320b6c46575c078240ebc827497e1363472b510 (patch) | |
tree | c58efde23b197c9bd953203279dd47a353afcc74 | |
parent | f302ed5fdbbf0925f59cf2ae13db680038d41ddd (diff) | |
download | keystonemiddleware-c320b6c46575c078240ebc827497e1363472b510.tar.gz |
Refactor auth_token revocation list members to new class
This moves the revocation list members from AuthProtocol to their
own class in auth_token.
bp refactor-extract-class
Change-Id: I756b92dee310fab7678fa800a833d6b13fd7e50a
-rw-r--r-- | keystonemiddleware/auth_token.py | 177 | ||||
-rw-r--r-- | keystonemiddleware/tests/test_auth_token_middleware.py | 83 | ||||
-rw-r--r-- | keystonemiddleware/tests/unit/test_auth_token.py | 43 |
3 files changed, 178 insertions, 125 deletions
diff --git a/keystonemiddleware/auth_token.py b/keystonemiddleware/auth_token.py index 4095ebc..67f0858 100644 --- a/keystonemiddleware/auth_token.py +++ b/keystonemiddleware/auth_token.py @@ -696,7 +696,6 @@ class AuthProtocol(object): _SIGNING_CERT_FILE_NAME = 'signing_cert.pem' _SIGNING_CA_FILE_NAME = 'cacert.pem' - _REVOKED_FILE_NAME = 'revoked.pem' def __init__(self, app, conf): self._LOG = logging.getLogger(conf.get('log_name', __name__)) @@ -731,10 +730,14 @@ class AuthProtocol(object): directory_name=self._conf_get('signing_dir'), log=self._LOG) self._token_cache = self._token_cache_factory() - self._token_revocation_list_prop = None - self._token_revocation_list_fetched_time_prop = None - self._token_revocation_list_cache_timeout = datetime.timedelta( + + revocation_cache_timeout = datetime.timedelta( seconds=self._conf_get('revocation_cache_time')) + self._revocations = _Revocations(revocation_cache_timeout, + self._signing_directory, + self._identity_server, + self._cms_verify, + self._LOG) self._check_revocations_for_cached = self._conf_get( 'check_revocations_for_cached') @@ -936,13 +939,7 @@ class AuthProtocol(object): # A token stored in Memcached might have been revoked # regardless of initial mechanism used to validate it, # and needs to be checked. - for tid in token_ids: - is_revoked = self._is_token_id_in_revoked_list(tid) - if is_revoked: - self._LOG.debug( - 'Token is marked as having been revoked') - raise InvalidToken( - _('Token authorization failed')) + self._revocations.check(token_ids) self._confirm_token_bind(data, env) else: verified = None @@ -1150,24 +1147,6 @@ class AuthProtocol(object): {'bind_type': bind_type, 'identifier': identifier}) self._invalid_user_token() - def _is_signed_token_revoked(self, token_ids): - """Indicate whether the token appears in the revocation list.""" - for token_id in token_ids: - if self._is_token_id_in_revoked_list(token_id): - self._LOG.debug('Token is marked as having been revoked') - return True - return False - - def _is_token_id_in_revoked_list(self, token_id): - """Indicate whether the token_id appears in the revocation list.""" - revocation_list = self._token_revocation_list - revoked_tokens = revocation_list.get('revoked', None) - if not revoked_tokens: - return False - - revoked_ids = (x['id'] for x in revoked_tokens) - return token_id in revoked_ids - def _cms_verify(self, data, inform=cms.PKI_ASN1_FORM): """Verifies the signature of the provided data's IAW CMS syntax. @@ -1205,16 +1184,13 @@ class AuthProtocol(object): def _verify_signed_token(self, signed_text, token_ids): """Check that the token is unrevoked and has a valid signature.""" - if self._is_signed_token_revoked(token_ids): - raise InvalidToken(_('Token has been revoked')) - + self._revocations.check(token_ids) formatted = cms.token_to_cms(signed_text) verified = self._cms_verify(formatted) return verified def _verify_pkiz_token(self, signed_text, token_ids): - if self._is_signed_token_revoked(token_ids): - raise InvalidToken(_('Token has been revoked')) + self._revocations.check(token_ids) try: uncompressed = cms.pkiz_uncompress(signed_text) verified = self._cms_verify(uncompressed, inform=cms.PKIZ_CMS_FORM) @@ -1223,56 +1199,6 @@ class AuthProtocol(object): except TypeError: raise InvalidToken(signed_text) - @property - def _token_revocation_list_fetched_time(self): - if not self._token_revocation_list_fetched_time_prop: - # If the fetched list has been written to disk, use its - # modification time. - revoked_file_name = self._signing_directory.calc_path( - self._REVOKED_FILE_NAME) - if os.path.exists(revoked_file_name): - mtime = os.path.getmtime(revoked_file_name) - fetched_time = datetime.datetime.utcfromtimestamp(mtime) - # Otherwise the list will need to be fetched. - else: - fetched_time = datetime.datetime.min - self._token_revocation_list_fetched_time_prop = fetched_time - return self._token_revocation_list_fetched_time_prop - - @_token_revocation_list_fetched_time.setter - def _token_revocation_list_fetched_time(self, value): - self._token_revocation_list_fetched_time_prop = value - - @property - def _token_revocation_list(self): - timeout = (self._token_revocation_list_fetched_time + - self._token_revocation_list_cache_timeout) - list_is_current = timeutils.utcnow() < timeout - - if list_is_current: - # Load the list from disk if required - if not self._token_revocation_list_prop: - self._token_revocation_list_prop = jsonutils.loads( - self._signing_directory.read_file(self._REVOKED_FILE_NAME)) - else: - self._token_revocation_list = self._fetch_revocation_list() - return self._token_revocation_list_prop - - @_token_revocation_list.setter - def _token_revocation_list(self, value): - """Save a revocation list to memory and to disk. - - :param value: A json-encoded revocation list - - """ - self._token_revocation_list_prop = jsonutils.loads(value) - self._token_revocation_list_fetched_time = timeutils.utcnow() - self._signing_directory.write_file(self._REVOKED_FILE_NAME, value) - - def _fetch_revocation_list(self): - revocation_list_data = self._identity_server.fetch_revocation_list() - return self._cms_verify(revocation_list_data) - def _fetch_signing_cert(self): self._signing_directory.write_file( self._SIGNING_CERT_FILE_NAME, @@ -1643,6 +1569,89 @@ class _V3RequestStrategy(_RequestStrategy): _REQUEST_STRATEGIES = [_V3RequestStrategy, _V2RequestStrategy] +class _Revocations(object): + _FILE_NAME = 'revoked.pem' + + def __init__(self, timeout, signing_directory, identity_server, + cms_verify, log=_LOG): + self._cache_timeout = timeout + self._signing_directory = signing_directory + self._identity_server = identity_server + self._cms_verify = cms_verify + self._log = log + + self._fetched_time_prop = None + self._list_prop = None + + @property + def _fetched_time(self): + if not self._fetched_time_prop: + # If the fetched list has been written to disk, use its + # modification time. + file_path = self._signing_directory.calc_path(self._FILE_NAME) + if os.path.exists(file_path): + mtime = os.path.getmtime(file_path) + fetched_time = datetime.datetime.utcfromtimestamp(mtime) + # Otherwise the list will need to be fetched. + else: + fetched_time = datetime.datetime.min + self._fetched_time_prop = fetched_time + return self._fetched_time_prop + + @_fetched_time.setter + def _fetched_time(self, value): + self._fetched_time_prop = value + + def _fetch(self): + revocation_list_data = self._identity_server.fetch_revocation_list() + return self._cms_verify(revocation_list_data) + + @property + def _list(self): + timeout = self._fetched_time + self._cache_timeout + list_is_current = timeutils.utcnow() < timeout + + if list_is_current: + # Load the list from disk if required + if not self._list_prop: + self._list_prop = jsonutils.loads( + self._signing_directory.read_file(self._FILE_NAME)) + else: + self._list = self._fetch() + return self._list_prop + + @_list.setter + def _list(self, value): + """Save a revocation list to memory and to disk. + + :param value: A json-encoded revocation list + + """ + self._list_prop = jsonutils.loads(value) + self._fetched_time = timeutils.utcnow() + self._signing_directory.write_file(self._FILE_NAME, value) + + def _is_revoked(self, token_id): + """Indicate whether the token_id appears in the revocation list.""" + revoked_tokens = self._list.get('revoked', None) + if not revoked_tokens: + return False + + revoked_ids = (x['id'] for x in revoked_tokens) + return token_id in revoked_ids + + def _any_revoked(self, token_ids): + for token_id in token_ids: + if self._is_revoked(token_id): + return True + return False + + def check(self, token_ids): + if self._any_revoked(token_ids): + self._log.debug('Token is marked as having been revoked') + raise InvalidToken(_('Token has been revoked')) + + class _SigningDirectory(object): def __init__(self, directory_name=None, log=None): self._log = log or _LOG diff --git a/keystonemiddleware/tests/test_auth_token_middleware.py b/keystonemiddleware/tests/test_auth_token_middleware.py index 09e6a79..7c80178 100644 --- a/keystonemiddleware/tests/test_auth_token_middleware.py +++ b/keystonemiddleware/tests/test_auth_token_middleware.py @@ -337,7 +337,7 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase): self.middleware = auth_token.AuthProtocol( self.fake_app(self.expected_env), self.conf) - self.middleware._token_revocation_list = jsonutils.dumps( + self.middleware._revocations._list = jsonutils.dumps( {"revoked": [], "extra": "success"}) def update_expected_env(self, expected_env={}): @@ -600,7 +600,7 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, 'admin_user': uuid.uuid4().hex } middleware = auth_token.AuthProtocol(self.fake_app, conf) - self.assertEqual(middleware._token_revocation_list_cache_timeout, + self.assertEqual(middleware._revocations._cache_timeout, datetime.timedelta(seconds=24)) def test_conf_values_type_convert(self): @@ -613,7 +613,7 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, middleware = auth_token.AuthProtocol(self.fake_app, conf) self.assertEqual(datetime.timedelta(seconds=24), - middleware._token_revocation_list_cache_timeout) + middleware._revocations._cache_timeout) self.assertEqual(False, middleware._include_service_catalog) self.assertEqual('0', middleware._conf['nonexsit_option']) @@ -706,7 +706,7 @@ class CommonAuthTokenMiddlewareTest(object): self.assertEqual(200, self.response_status) # Put it in revocation list. - self.middleware._token_revocation_list = self.get_revocation_list_json( + self.middleware._revocations._list = self.get_revocation_list_json( token_ids=[revoked_form or token]) self.middleware(req.environ, self.start_fake_response) self.assertEqual(401, self.response_status) @@ -729,7 +729,7 @@ class CommonAuthTokenMiddlewareTest(object): self.assertLastPath(None) def test_revoked_token_receives_401(self): - self.middleware._token_revocation_list = ( + self.middleware._revocations._list = ( self.get_revocation_list_json()) req = webob.Request.blank('/') req.headers['X-Auth-Token'] = self.token_dict['revoked_token'] @@ -739,7 +739,7 @@ class CommonAuthTokenMiddlewareTest(object): def test_revoked_token_receives_401_sha256(self): self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) self.set_middleware() - self.middleware._token_revocation_list = ( + self.middleware._revocations._list = ( self.get_revocation_list_json(mode='sha256')) req = webob.Request.blank('/') req.headers['X-Auth-Token'] = self.token_dict['revoked_token'] @@ -764,7 +764,7 @@ class CommonAuthTokenMiddlewareTest(object): # considered revoked so returns 401. self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) self.set_middleware() - self.middleware._token_revocation_list = ( + self.middleware._revocations._list = ( self.get_revocation_list_json()) req = webob.Request.blank('/') req.headers['X-Auth-Token'] = self.token_dict['revoked_token'] @@ -786,7 +786,7 @@ class CommonAuthTokenMiddlewareTest(object): # Put the token in the revocation list. token_hashed = cms.cms_hash_token(token) - self.middleware._token_revocation_list = self.get_revocation_list_json( + self.middleware._revocations._list = self.get_revocation_list_json( token_ids=[token_hashed]) # First, request is using the hashed token, is valid so goes in @@ -819,30 +819,30 @@ class CommonAuthTokenMiddlewareTest(object): def test_is_signed_token_revoked_returns_false(self): # explicitly setting an empty revocation list here to document intent - self.middleware._token_revocation_list = jsonutils.dumps( + self.middleware._revocations._list = jsonutils.dumps( {"revoked": [], "extra": "success"}) - result = self.middleware._is_signed_token_revoked( + result = self.middleware._revocations._any_revoked( [self.token_dict['revoked_token_hash']]) self.assertFalse(result) def test_is_signed_token_revoked_returns_true(self): - self.middleware._token_revocation_list = ( + self.middleware._revocations._list = ( self.get_revocation_list_json()) - result = self.middleware._is_signed_token_revoked( + result = self.middleware._revocations._any_revoked( [self.token_dict['revoked_token_hash']]) self.assertTrue(result) def test_is_signed_token_revoked_returns_true_sha256(self): self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) self.set_middleware() - self.middleware._token_revocation_list = ( + self.middleware._revocations._list = ( self.get_revocation_list_json(mode='sha256')) - result = self.middleware._is_signed_token_revoked( + result = self.middleware._revocations._any_revoked( [self.token_dict['revoked_token_hash_sha256']]) self.assertTrue(result) def test_verify_signed_token_raises_exception_for_revoked_token(self): - self.middleware._token_revocation_list = ( + self.middleware._revocations._list = ( self.get_revocation_list_json()) self.assertRaises(auth_token.InvalidToken, self.middleware._verify_signed_token, @@ -852,7 +852,7 @@ class CommonAuthTokenMiddlewareTest(object): def test_verify_signed_token_raises_exception_for_revoked_token_s256(self): self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) self.set_middleware() - self.middleware._token_revocation_list = ( + self.middleware._revocations._list = ( self.get_revocation_list_json(mode='sha256')) self.assertRaises(auth_token.InvalidToken, self.middleware._verify_signed_token, @@ -861,7 +861,7 @@ class CommonAuthTokenMiddlewareTest(object): self.token_dict['revoked_token_hash']]) def test_verify_signed_token_raises_exception_for_revoked_pkiz_token(self): - self.middleware._token_revocation_list = ( + self.middleware._revocations._list = ( self.examples.REVOKED_TOKEN_PKIZ_LIST_JSON) self.assertRaises(auth_token.InvalidToken, self.middleware._verify_pkiz_token, @@ -872,7 +872,7 @@ class CommonAuthTokenMiddlewareTest(object): json.loads(text) def test_verify_signed_token_succeeds_for_unrevoked_token(self): - self.middleware._token_revocation_list = ( + self.middleware._revocations._list = ( self.get_revocation_list_json()) text = self.middleware._verify_signed_token( self.token_dict['signed_token_scoped'], @@ -880,7 +880,7 @@ class CommonAuthTokenMiddlewareTest(object): self.assertIsValidJSON(text) def test_verify_signed_compressed_token_succeeds_for_unrevoked_token(self): - self.middleware._token_revocation_list = ( + self.middleware._revocations._list = ( self.get_revocation_list_json()) text = self.middleware._verify_pkiz_token( self.token_dict['signed_token_scoped_pkiz'], @@ -890,7 +890,7 @@ class CommonAuthTokenMiddlewareTest(object): def test_verify_signed_token_succeeds_for_unrevoked_token_sha256(self): self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) self.set_middleware() - self.middleware._token_revocation_list = ( + self.middleware._revocations._list = ( self.get_revocation_list_json(mode='sha256')) text = self.middleware._verify_signed_token( self.token_dict['signed_token_scoped'], @@ -899,73 +899,74 @@ class CommonAuthTokenMiddlewareTest(object): self.assertIsValidJSON(text) def test_get_token_revocation_list_fetched_time_returns_min(self): - self.middleware._token_revocation_list_fetched_time = None + self.middleware._revocations._fetched_time = None # Get rid of the revoked file revoked_path = self.middleware._signing_directory.calc_path( - auth_token.AuthProtocol._REVOKED_FILE_NAME) + auth_token._Revocations._FILE_NAME) os.remove(revoked_path) - self.assertEqual(self.middleware._token_revocation_list_fetched_time, + self.assertEqual(self.middleware._revocations._fetched_time, datetime.datetime.min) + # FIXME(blk-u): move the unit tests into unit/test_auth_token.py def test_get_token_revocation_list_fetched_time_returns_mtime(self): - self.middleware._token_revocation_list_fetched_time = None + self.middleware._revocations._fetched_time = None revoked_path = self.middleware._signing_directory.calc_path( - auth_token.AuthProtocol._REVOKED_FILE_NAME) + auth_token._Revocations._FILE_NAME) mtime = os.path.getmtime(revoked_path) fetched_time = datetime.datetime.utcfromtimestamp(mtime) self.assertEqual(fetched_time, - self.middleware._token_revocation_list_fetched_time) + self.middleware._revocations._fetched_time) @testtools.skipUnless(TimezoneFixture.supported(), 'TimezoneFixture not supported') def test_get_token_revocation_list_fetched_time_returns_utc(self): with TimezoneFixture('UTC-1'): - self.middleware._token_revocation_list = jsonutils.dumps( + self.middleware._revocations._list = jsonutils.dumps( self.examples.REVOCATION_LIST) - self.middleware._token_revocation_list_fetched_time = None - fetched_time = self.middleware._token_revocation_list_fetched_time + self.middleware._revocations._fetched_time = None + fetched_time = self.middleware._revocations._fetched_time self.assertTrue(timeutils.is_soon(fetched_time, 1)) def test_get_token_revocation_list_fetched_time_returns_value(self): - expected = self.middleware._token_revocation_list_fetched_time - self.assertEqual(self.middleware._token_revocation_list_fetched_time, + expected = self.middleware._revocations._fetched_time + self.assertEqual(self.middleware._revocations._fetched_time, expected) def test_get_revocation_list_returns_fetched_list(self): # auth_token uses v2 to fetch this, so don't allow the v3 # tests to override the fake http connection - self.middleware._token_revocation_list_fetched_time = None + self.middleware._revocations._fetched_time = None # Get rid of the revoked file revoked_path = self.middleware._signing_directory.calc_path( - auth_token.AuthProtocol._REVOKED_FILE_NAME) + auth_token._Revocations._FILE_NAME) os.remove(revoked_path) - self.assertEqual(self.middleware._token_revocation_list, + self.assertEqual(self.middleware._revocations._list, self.examples.REVOCATION_LIST) def test_get_revocation_list_returns_current_list_from_memory(self): - self.assertEqual(self.middleware._token_revocation_list, - self.middleware._token_revocation_list_prop) + self.assertEqual(self.middleware._revocations._list, + self.middleware._revocations._list_prop) def test_get_revocation_list_returns_current_list_from_disk(self): - in_memory_list = self.middleware._token_revocation_list - self.middleware._token_revocation_list_prop = None - self.assertEqual(self.middleware._token_revocation_list, + in_memory_list = self.middleware._revocations._list + self.middleware._revocations._list_prop = None + self.assertEqual(self.middleware._revocations._list, in_memory_list) def test_invalid_revocation_list_raises_error(self): self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, json={}) self.assertRaises(auth_token.RevocationListError, - self.middleware._fetch_revocation_list) + self.middleware._revocations._fetch) def test_fetch_revocation_list(self): # auth_token uses v2 to fetch this, so don't allow the v3 # tests to override the fake http connection - fetched = jsonutils.loads(self.middleware._fetch_revocation_list()) + fetched = jsonutils.loads(self.middleware._revocations._fetch()) self.assertEqual(fetched, self.examples.REVOCATION_LIST) def test_request_invalid_uuid_token(self): diff --git a/keystonemiddleware/tests/unit/test_auth_token.py b/keystonemiddleware/tests/unit/test_auth_token.py index 29f2dff..1041f43 100644 --- a/keystonemiddleware/tests/unit/test_auth_token.py +++ b/keystonemiddleware/tests/unit/test_auth_token.py @@ -12,16 +12,59 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime +import json import os import shutil import stat import uuid +import mock import testtools from keystonemiddleware import auth_token +class RevocationsTests(testtools.TestCase): + + def _check_with_list(self, revoked_list, token_ids): + directory_name = '/tmp/%s' % uuid.uuid4().hex + signing_directory = auth_token._SigningDirectory(directory_name) + self.addCleanup(shutil.rmtree, directory_name) + + identity_server = mock.Mock() + + verify_result_obj = { + 'revoked': list({'id': r} for r in revoked_list) + } + cms_verify = mock.Mock(return_value=json.dumps(verify_result_obj)) + + revocations = auth_token._Revocations( + timeout=datetime.timedelta(1), signing_directory=signing_directory, + identity_server=identity_server, cms_verify=cms_verify) + + revocations.check(token_ids) + + def test_check_empty_list(self): + # When the identity server returns an empty list, a token isn't + # revoked. + + revoked_tokens = [] + token_ids = [uuid.uuid4().hex] + # No assert because this would raise + self._check_with_list(revoked_tokens, token_ids) + + def test_check_revoked(self): + # When the identity server returns a list with a token in it, that + # token is revoked. + + token_id = uuid.uuid4().hex + revoked_tokens = [token_id] + token_ids = [token_id] + self.assertRaises(auth_token.InvalidToken, + self._check_with_list, revoked_tokens, token_ids) + + class SigningDirectoryTests(testtools.TestCase): def test_directory_created_when_doesnt_exist(self): |