summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrant Knudson <bknudson@us.ibm.com>2014-09-21 20:28:42 -0500
committerBrant Knudson <bknudson@us.ibm.com>2015-02-10 14:06:56 -0600
commitc320b6c46575c078240ebc827497e1363472b510 (patch)
treec58efde23b197c9bd953203279dd47a353afcc74
parentf302ed5fdbbf0925f59cf2ae13db680038d41ddd (diff)
downloadkeystonemiddleware-c320b6c46575c078240ebc827497e1363472b510.tar.gz
Refactor auth_token revocation list members to new class
This moves the revocation list members from AuthProtocol to their own class in auth_token. bp refactor-extract-class Change-Id: I756b92dee310fab7678fa800a833d6b13fd7e50a
-rw-r--r--keystonemiddleware/auth_token.py177
-rw-r--r--keystonemiddleware/tests/test_auth_token_middleware.py83
-rw-r--r--keystonemiddleware/tests/unit/test_auth_token.py43
3 files changed, 178 insertions, 125 deletions
diff --git a/keystonemiddleware/auth_token.py b/keystonemiddleware/auth_token.py
index 4095ebc..67f0858 100644
--- a/keystonemiddleware/auth_token.py
+++ b/keystonemiddleware/auth_token.py
@@ -696,7 +696,6 @@ class AuthProtocol(object):
_SIGNING_CERT_FILE_NAME = 'signing_cert.pem'
_SIGNING_CA_FILE_NAME = 'cacert.pem'
- _REVOKED_FILE_NAME = 'revoked.pem'
def __init__(self, app, conf):
self._LOG = logging.getLogger(conf.get('log_name', __name__))
@@ -731,10 +730,14 @@ class AuthProtocol(object):
directory_name=self._conf_get('signing_dir'), log=self._LOG)
self._token_cache = self._token_cache_factory()
- self._token_revocation_list_prop = None
- self._token_revocation_list_fetched_time_prop = None
- self._token_revocation_list_cache_timeout = datetime.timedelta(
+
+ revocation_cache_timeout = datetime.timedelta(
seconds=self._conf_get('revocation_cache_time'))
+ self._revocations = _Revocations(revocation_cache_timeout,
+ self._signing_directory,
+ self._identity_server,
+ self._cms_verify,
+ self._LOG)
self._check_revocations_for_cached = self._conf_get(
'check_revocations_for_cached')
@@ -936,13 +939,7 @@ class AuthProtocol(object):
# A token stored in Memcached might have been revoked
# regardless of initial mechanism used to validate it,
# and needs to be checked.
- for tid in token_ids:
- is_revoked = self._is_token_id_in_revoked_list(tid)
- if is_revoked:
- self._LOG.debug(
- 'Token is marked as having been revoked')
- raise InvalidToken(
- _('Token authorization failed'))
+ self._revocations.check(token_ids)
self._confirm_token_bind(data, env)
else:
verified = None
@@ -1150,24 +1147,6 @@ class AuthProtocol(object):
{'bind_type': bind_type, 'identifier': identifier})
self._invalid_user_token()
- def _is_signed_token_revoked(self, token_ids):
- """Indicate whether the token appears in the revocation list."""
- for token_id in token_ids:
- if self._is_token_id_in_revoked_list(token_id):
- self._LOG.debug('Token is marked as having been revoked')
- return True
- return False
-
- def _is_token_id_in_revoked_list(self, token_id):
- """Indicate whether the token_id appears in the revocation list."""
- revocation_list = self._token_revocation_list
- revoked_tokens = revocation_list.get('revoked', None)
- if not revoked_tokens:
- return False
-
- revoked_ids = (x['id'] for x in revoked_tokens)
- return token_id in revoked_ids
-
def _cms_verify(self, data, inform=cms.PKI_ASN1_FORM):
"""Verifies the signature of the provided data's IAW CMS syntax.
@@ -1205,16 +1184,13 @@ class AuthProtocol(object):
def _verify_signed_token(self, signed_text, token_ids):
"""Check that the token is unrevoked and has a valid signature."""
- if self._is_signed_token_revoked(token_ids):
- raise InvalidToken(_('Token has been revoked'))
-
+ self._revocations.check(token_ids)
formatted = cms.token_to_cms(signed_text)
verified = self._cms_verify(formatted)
return verified
def _verify_pkiz_token(self, signed_text, token_ids):
- if self._is_signed_token_revoked(token_ids):
- raise InvalidToken(_('Token has been revoked'))
+ self._revocations.check(token_ids)
try:
uncompressed = cms.pkiz_uncompress(signed_text)
verified = self._cms_verify(uncompressed, inform=cms.PKIZ_CMS_FORM)
@@ -1223,56 +1199,6 @@ class AuthProtocol(object):
except TypeError:
raise InvalidToken(signed_text)
- @property
- def _token_revocation_list_fetched_time(self):
- if not self._token_revocation_list_fetched_time_prop:
- # If the fetched list has been written to disk, use its
- # modification time.
- revoked_file_name = self._signing_directory.calc_path(
- self._REVOKED_FILE_NAME)
- if os.path.exists(revoked_file_name):
- mtime = os.path.getmtime(revoked_file_name)
- fetched_time = datetime.datetime.utcfromtimestamp(mtime)
- # Otherwise the list will need to be fetched.
- else:
- fetched_time = datetime.datetime.min
- self._token_revocation_list_fetched_time_prop = fetched_time
- return self._token_revocation_list_fetched_time_prop
-
- @_token_revocation_list_fetched_time.setter
- def _token_revocation_list_fetched_time(self, value):
- self._token_revocation_list_fetched_time_prop = value
-
- @property
- def _token_revocation_list(self):
- timeout = (self._token_revocation_list_fetched_time +
- self._token_revocation_list_cache_timeout)
- list_is_current = timeutils.utcnow() < timeout
-
- if list_is_current:
- # Load the list from disk if required
- if not self._token_revocation_list_prop:
- self._token_revocation_list_prop = jsonutils.loads(
- self._signing_directory.read_file(self._REVOKED_FILE_NAME))
- else:
- self._token_revocation_list = self._fetch_revocation_list()
- return self._token_revocation_list_prop
-
- @_token_revocation_list.setter
- def _token_revocation_list(self, value):
- """Save a revocation list to memory and to disk.
-
- :param value: A json-encoded revocation list
-
- """
- self._token_revocation_list_prop = jsonutils.loads(value)
- self._token_revocation_list_fetched_time = timeutils.utcnow()
- self._signing_directory.write_file(self._REVOKED_FILE_NAME, value)
-
- def _fetch_revocation_list(self):
- revocation_list_data = self._identity_server.fetch_revocation_list()
- return self._cms_verify(revocation_list_data)
-
def _fetch_signing_cert(self):
self._signing_directory.write_file(
self._SIGNING_CERT_FILE_NAME,
@@ -1643,6 +1569,89 @@ class _V3RequestStrategy(_RequestStrategy):
_REQUEST_STRATEGIES = [_V3RequestStrategy, _V2RequestStrategy]
+class _Revocations(object):
+ _FILE_NAME = 'revoked.pem'
+
+ def __init__(self, timeout, signing_directory, identity_server,
+ cms_verify, log=_LOG):
+ self._cache_timeout = timeout
+ self._signing_directory = signing_directory
+ self._identity_server = identity_server
+ self._cms_verify = cms_verify
+ self._log = log
+
+ self._fetched_time_prop = None
+ self._list_prop = None
+
+ @property
+ def _fetched_time(self):
+ if not self._fetched_time_prop:
+ # If the fetched list has been written to disk, use its
+ # modification time.
+ file_path = self._signing_directory.calc_path(self._FILE_NAME)
+ if os.path.exists(file_path):
+ mtime = os.path.getmtime(file_path)
+ fetched_time = datetime.datetime.utcfromtimestamp(mtime)
+ # Otherwise the list will need to be fetched.
+ else:
+ fetched_time = datetime.datetime.min
+ self._fetched_time_prop = fetched_time
+ return self._fetched_time_prop
+
+ @_fetched_time.setter
+ def _fetched_time(self, value):
+ self._fetched_time_prop = value
+
+ def _fetch(self):
+ revocation_list_data = self._identity_server.fetch_revocation_list()
+ return self._cms_verify(revocation_list_data)
+
+ @property
+ def _list(self):
+ timeout = self._fetched_time + self._cache_timeout
+ list_is_current = timeutils.utcnow() < timeout
+
+ if list_is_current:
+ # Load the list from disk if required
+ if not self._list_prop:
+ self._list_prop = jsonutils.loads(
+ self._signing_directory.read_file(self._FILE_NAME))
+ else:
+ self._list = self._fetch()
+ return self._list_prop
+
+ @_list.setter
+ def _list(self, value):
+ """Save a revocation list to memory and to disk.
+
+ :param value: A json-encoded revocation list
+
+ """
+ self._list_prop = jsonutils.loads(value)
+ self._fetched_time = timeutils.utcnow()
+ self._signing_directory.write_file(self._FILE_NAME, value)
+
+ def _is_revoked(self, token_id):
+ """Indicate whether the token_id appears in the revocation list."""
+ revoked_tokens = self._list.get('revoked', None)
+ if not revoked_tokens:
+ return False
+
+ revoked_ids = (x['id'] for x in revoked_tokens)
+ return token_id in revoked_ids
+
+ def _any_revoked(self, token_ids):
+ for token_id in token_ids:
+ if self._is_revoked(token_id):
+ return True
+ return False
+
+ def check(self, token_ids):
+ if self._any_revoked(token_ids):
+ self._log.debug('Token is marked as having been revoked')
+ raise InvalidToken(_('Token has been revoked'))
+
+
class _SigningDirectory(object):
def __init__(self, directory_name=None, log=None):
self._log = log or _LOG
diff --git a/keystonemiddleware/tests/test_auth_token_middleware.py b/keystonemiddleware/tests/test_auth_token_middleware.py
index 09e6a79..7c80178 100644
--- a/keystonemiddleware/tests/test_auth_token_middleware.py
+++ b/keystonemiddleware/tests/test_auth_token_middleware.py
@@ -337,7 +337,7 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
self.middleware = auth_token.AuthProtocol(
self.fake_app(self.expected_env), self.conf)
- self.middleware._token_revocation_list = jsonutils.dumps(
+ self.middleware._revocations._list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
def update_expected_env(self, expected_env={}):
@@ -600,7 +600,7 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'admin_user': uuid.uuid4().hex
}
middleware = auth_token.AuthProtocol(self.fake_app, conf)
- self.assertEqual(middleware._token_revocation_list_cache_timeout,
+ self.assertEqual(middleware._revocations._cache_timeout,
datetime.timedelta(seconds=24))
def test_conf_values_type_convert(self):
@@ -613,7 +613,7 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
middleware = auth_token.AuthProtocol(self.fake_app, conf)
self.assertEqual(datetime.timedelta(seconds=24),
- middleware._token_revocation_list_cache_timeout)
+ middleware._revocations._cache_timeout)
self.assertEqual(False, middleware._include_service_catalog)
self.assertEqual('0', middleware._conf['nonexsit_option'])
@@ -706,7 +706,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertEqual(200, self.response_status)
# Put it in revocation list.
- self.middleware._token_revocation_list = self.get_revocation_list_json(
+ self.middleware._revocations._list = self.get_revocation_list_json(
token_ids=[revoked_form or token])
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(401, self.response_status)
@@ -729,7 +729,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertLastPath(None)
def test_revoked_token_receives_401(self):
- self.middleware._token_revocation_list = (
+ self.middleware._revocations._list = (
self.get_revocation_list_json())
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
@@ -739,7 +739,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_revoked_token_receives_401_sha256(self):
self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
self.set_middleware()
- self.middleware._token_revocation_list = (
+ self.middleware._revocations._list = (
self.get_revocation_list_json(mode='sha256'))
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
@@ -764,7 +764,7 @@ class CommonAuthTokenMiddlewareTest(object):
# considered revoked so returns 401.
self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
self.set_middleware()
- self.middleware._token_revocation_list = (
+ self.middleware._revocations._list = (
self.get_revocation_list_json())
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
@@ -786,7 +786,7 @@ class CommonAuthTokenMiddlewareTest(object):
# Put the token in the revocation list.
token_hashed = cms.cms_hash_token(token)
- self.middleware._token_revocation_list = self.get_revocation_list_json(
+ self.middleware._revocations._list = self.get_revocation_list_json(
token_ids=[token_hashed])
# First, request is using the hashed token, is valid so goes in
@@ -819,30 +819,30 @@ class CommonAuthTokenMiddlewareTest(object):
def test_is_signed_token_revoked_returns_false(self):
# explicitly setting an empty revocation list here to document intent
- self.middleware._token_revocation_list = jsonutils.dumps(
+ self.middleware._revocations._list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
- result = self.middleware._is_signed_token_revoked(
+ result = self.middleware._revocations._any_revoked(
[self.token_dict['revoked_token_hash']])
self.assertFalse(result)
def test_is_signed_token_revoked_returns_true(self):
- self.middleware._token_revocation_list = (
+ self.middleware._revocations._list = (
self.get_revocation_list_json())
- result = self.middleware._is_signed_token_revoked(
+ result = self.middleware._revocations._any_revoked(
[self.token_dict['revoked_token_hash']])
self.assertTrue(result)
def test_is_signed_token_revoked_returns_true_sha256(self):
self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
self.set_middleware()
- self.middleware._token_revocation_list = (
+ self.middleware._revocations._list = (
self.get_revocation_list_json(mode='sha256'))
- result = self.middleware._is_signed_token_revoked(
+ result = self.middleware._revocations._any_revoked(
[self.token_dict['revoked_token_hash_sha256']])
self.assertTrue(result)
def test_verify_signed_token_raises_exception_for_revoked_token(self):
- self.middleware._token_revocation_list = (
+ self.middleware._revocations._list = (
self.get_revocation_list_json())
self.assertRaises(auth_token.InvalidToken,
self.middleware._verify_signed_token,
@@ -852,7 +852,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_verify_signed_token_raises_exception_for_revoked_token_s256(self):
self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
self.set_middleware()
- self.middleware._token_revocation_list = (
+ self.middleware._revocations._list = (
self.get_revocation_list_json(mode='sha256'))
self.assertRaises(auth_token.InvalidToken,
self.middleware._verify_signed_token,
@@ -861,7 +861,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.token_dict['revoked_token_hash']])
def test_verify_signed_token_raises_exception_for_revoked_pkiz_token(self):
- self.middleware._token_revocation_list = (
+ self.middleware._revocations._list = (
self.examples.REVOKED_TOKEN_PKIZ_LIST_JSON)
self.assertRaises(auth_token.InvalidToken,
self.middleware._verify_pkiz_token,
@@ -872,7 +872,7 @@ class CommonAuthTokenMiddlewareTest(object):
json.loads(text)
def test_verify_signed_token_succeeds_for_unrevoked_token(self):
- self.middleware._token_revocation_list = (
+ self.middleware._revocations._list = (
self.get_revocation_list_json())
text = self.middleware._verify_signed_token(
self.token_dict['signed_token_scoped'],
@@ -880,7 +880,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertIsValidJSON(text)
def test_verify_signed_compressed_token_succeeds_for_unrevoked_token(self):
- self.middleware._token_revocation_list = (
+ self.middleware._revocations._list = (
self.get_revocation_list_json())
text = self.middleware._verify_pkiz_token(
self.token_dict['signed_token_scoped_pkiz'],
@@ -890,7 +890,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_verify_signed_token_succeeds_for_unrevoked_token_sha256(self):
self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
self.set_middleware()
- self.middleware._token_revocation_list = (
+ self.middleware._revocations._list = (
self.get_revocation_list_json(mode='sha256'))
text = self.middleware._verify_signed_token(
self.token_dict['signed_token_scoped'],
@@ -899,73 +899,74 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertIsValidJSON(text)
def test_get_token_revocation_list_fetched_time_returns_min(self):
- self.middleware._token_revocation_list_fetched_time = None
+ self.middleware._revocations._fetched_time = None
# Get rid of the revoked file
revoked_path = self.middleware._signing_directory.calc_path(
- auth_token.AuthProtocol._REVOKED_FILE_NAME)
+ auth_token._Revocations._FILE_NAME)
os.remove(revoked_path)
- self.assertEqual(self.middleware._token_revocation_list_fetched_time,
+ self.assertEqual(self.middleware._revocations._fetched_time,
datetime.datetime.min)
+ # FIXME(blk-u): move the unit tests into unit/test_auth_token.py
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
- self.middleware._token_revocation_list_fetched_time = None
+ self.middleware._revocations._fetched_time = None
revoked_path = self.middleware._signing_directory.calc_path(
- auth_token.AuthProtocol._REVOKED_FILE_NAME)
+ auth_token._Revocations._FILE_NAME)
mtime = os.path.getmtime(revoked_path)
fetched_time = datetime.datetime.utcfromtimestamp(mtime)
self.assertEqual(fetched_time,
- self.middleware._token_revocation_list_fetched_time)
+ self.middleware._revocations._fetched_time)
@testtools.skipUnless(TimezoneFixture.supported(),
'TimezoneFixture not supported')
def test_get_token_revocation_list_fetched_time_returns_utc(self):
with TimezoneFixture('UTC-1'):
- self.middleware._token_revocation_list = jsonutils.dumps(
+ self.middleware._revocations._list = jsonutils.dumps(
self.examples.REVOCATION_LIST)
- self.middleware._token_revocation_list_fetched_time = None
- fetched_time = self.middleware._token_revocation_list_fetched_time
+ self.middleware._revocations._fetched_time = None
+ fetched_time = self.middleware._revocations._fetched_time
self.assertTrue(timeutils.is_soon(fetched_time, 1))
def test_get_token_revocation_list_fetched_time_returns_value(self):
- expected = self.middleware._token_revocation_list_fetched_time
- self.assertEqual(self.middleware._token_revocation_list_fetched_time,
+ expected = self.middleware._revocations._fetched_time
+ self.assertEqual(self.middleware._revocations._fetched_time,
expected)
def test_get_revocation_list_returns_fetched_list(self):
# auth_token uses v2 to fetch this, so don't allow the v3
# tests to override the fake http connection
- self.middleware._token_revocation_list_fetched_time = None
+ self.middleware._revocations._fetched_time = None
# Get rid of the revoked file
revoked_path = self.middleware._signing_directory.calc_path(
- auth_token.AuthProtocol._REVOKED_FILE_NAME)
+ auth_token._Revocations._FILE_NAME)
os.remove(revoked_path)
- self.assertEqual(self.middleware._token_revocation_list,
+ self.assertEqual(self.middleware._revocations._list,
self.examples.REVOCATION_LIST)
def test_get_revocation_list_returns_current_list_from_memory(self):
- self.assertEqual(self.middleware._token_revocation_list,
- self.middleware._token_revocation_list_prop)
+ self.assertEqual(self.middleware._revocations._list,
+ self.middleware._revocations._list_prop)
def test_get_revocation_list_returns_current_list_from_disk(self):
- in_memory_list = self.middleware._token_revocation_list
- self.middleware._token_revocation_list_prop = None
- self.assertEqual(self.middleware._token_revocation_list,
+ in_memory_list = self.middleware._revocations._list
+ self.middleware._revocations._list_prop = None
+ self.assertEqual(self.middleware._revocations._list,
in_memory_list)
def test_invalid_revocation_list_raises_error(self):
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, json={})
self.assertRaises(auth_token.RevocationListError,
- self.middleware._fetch_revocation_list)
+ self.middleware._revocations._fetch)
def test_fetch_revocation_list(self):
# auth_token uses v2 to fetch this, so don't allow the v3
# tests to override the fake http connection
- fetched = jsonutils.loads(self.middleware._fetch_revocation_list())
+ fetched = jsonutils.loads(self.middleware._revocations._fetch())
self.assertEqual(fetched, self.examples.REVOCATION_LIST)
def test_request_invalid_uuid_token(self):
diff --git a/keystonemiddleware/tests/unit/test_auth_token.py b/keystonemiddleware/tests/unit/test_auth_token.py
index 29f2dff..1041f43 100644
--- a/keystonemiddleware/tests/unit/test_auth_token.py
+++ b/keystonemiddleware/tests/unit/test_auth_token.py
@@ -12,16 +12,59 @@
# License for the specific language governing permissions and limitations
# under the License.
+import datetime
+import json
import os
import shutil
import stat
import uuid
+import mock
import testtools
from keystonemiddleware import auth_token
+class RevocationsTests(testtools.TestCase):
+
+ def _check_with_list(self, revoked_list, token_ids):
+ directory_name = '/tmp/%s' % uuid.uuid4().hex
+ signing_directory = auth_token._SigningDirectory(directory_name)
+ self.addCleanup(shutil.rmtree, directory_name)
+
+ identity_server = mock.Mock()
+
+ verify_result_obj = {
+ 'revoked': list({'id': r} for r in revoked_list)
+ }
+ cms_verify = mock.Mock(return_value=json.dumps(verify_result_obj))
+
+ revocations = auth_token._Revocations(
+ timeout=datetime.timedelta(1), signing_directory=signing_directory,
+ identity_server=identity_server, cms_verify=cms_verify)
+
+ revocations.check(token_ids)
+
+ def test_check_empty_list(self):
+ # When the identity server returns an empty list, a token isn't
+ # revoked.
+
+ revoked_tokens = []
+ token_ids = [uuid.uuid4().hex]
+ # No assert because this would raise
+ self._check_with_list(revoked_tokens, token_ids)
+
+ def test_check_revoked(self):
+ # When the identity server returns a list with a token in it, that
+ # token is revoked.
+
+ token_id = uuid.uuid4().hex
+ revoked_tokens = [token_id]
+ token_ids = [token_id]
+ self.assertRaises(auth_token.InvalidToken,
+ self._check_with_list, revoked_tokens, token_ids)
+
+
class SigningDirectoryTests(testtools.TestCase):
def test_directory_created_when_doesnt_exist(self):