summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-07-03 00:26:35 +0000
committerGerrit Code Review <review@openstack.org>2014-07-03 00:26:35 +0000
commitb9927f4e13ef1b6ed955c19151af980c2d558532 (patch)
tree8217cbe5fe640c02fe00d56fa3d0803a601841de
parent5556599b5750d1839aee39c5e39e22ef93d9ca3a (diff)
parent4b6dc3f1bd7166eb32627e4b3595a2dd2b8d2b2c (diff)
downloadkeystonemiddleware-b9927f4e13ef1b6ed955c19151af980c2d558532.tar.gz
Merge "Privatize Everything"
-rw-r--r--keystonemiddleware/auth_token.py506
-rw-r--r--keystonemiddleware/tests/test_auth_token_middleware.py185
2 files changed, 351 insertions, 340 deletions
diff --git a/keystonemiddleware/auth_token.py b/keystonemiddleware/auth_token.py
index 6f3a4b0..2146f07 100644
--- a/keystonemiddleware/auth_token.py
+++ b/keystonemiddleware/auth_token.py
@@ -195,7 +195,7 @@ from keystonemiddleware.openstack.common import timeutils
# deprecation works) then other projects will not be able to override the
# options via CONF.
-opts = [
+_OPTS = [
cfg.StrOpt('auth_admin_prefix',
default='',
help='Prefix to prepend at the beginning of the path. '
@@ -333,13 +333,12 @@ opts = [
]
CONF = cfg.CONF
-CONF.register_opts(opts, group='keystone_authtoken')
+CONF.register_opts(_OPTS, group='keystone_authtoken')
-LIST_OF_VERSIONS_TO_ATTEMPT = ['v2.0', 'v3.0']
-CACHE_KEY_TEMPLATE = 'tokens/%s'
+_LIST_OF_VERSIONS_TO_ATTEMPT = ['v2.0', 'v3.0']
-class BIND_MODE:
+class _BIND_MODE:
DISABLED = 'disabled'
PERMISSIVE = 'permissive'
STRICT = 'strict'
@@ -347,7 +346,7 @@ class BIND_MODE:
KERBEROS = 'kerberos'
-def will_expire_soon(expiry):
+def _will_expire_soon(expiry):
"""Determines if expiration is about to occur.
:param expiry: a datetime of the expected expiration
@@ -365,7 +364,7 @@ def _token_is_v3(token_info):
return ('token' in token_info)
-def confirm_token_not_expired(data):
+def _confirm_token_not_expired(data):
if not data:
raise InvalidUserToken('Token authorization failed')
if _token_is_v2(data):
@@ -418,7 +417,7 @@ def _v3_to_v2_catalog(catalog):
return v2_services
-def safe_quote(s):
+def _safe_quote(s):
"""URL-encode strings that are not already URL-encoded."""
return urllib.parse.quote(s) if s == urllib.parse.unquote(s) else s
@@ -439,7 +438,7 @@ class NetworkError(Exception):
pass
-class MiniResp(object):
+class _MiniResp(object):
def __init__(self, error_message, env, headers=[]):
# The HEAD method is unique: it must never return a body, even if
# it reports an error (RFC-2616 clause 9.4). We relieve callers
@@ -456,27 +455,28 @@ class AuthProtocol(object):
"""Auth Middleware that handles authenticating client calls."""
def __init__(self, app, conf):
- self.LOG = logging.getLogger(conf.get('log_name', __name__))
- self.LOG.info('Starting keystone auth_token middleware')
- self.conf = conf
- self.app = app
+ self._LOG = logging.getLogger(conf.get('log_name', __name__))
+ self._LOG.info('Starting keystone auth_token middleware')
+ self._conf = conf
+ self._app = app
# delay_auth_decision means we still allow unauthenticated requests
# through and we let the downstream service make the final decision
- self.delay_auth_decision = (self._conf_get('delay_auth_decision') in
- (True, 'true', 't', '1', 'on', 'yes', 'y'))
+ self._delay_auth_decision = (self._conf_get('delay_auth_decision') in
+ (True, 'true', 't', '1', 'on', 'yes', 'y')
+ )
# where to find the auth service (we use this to validate tokens)
- self.identity_uri = self._conf_get('identity_uri')
- self.auth_uri = self._conf_get('auth_uri')
+ self._identity_uri = self._conf_get('identity_uri')
+ self._auth_uri = self._conf_get('auth_uri')
# NOTE(jamielennox): it does appear here that our defaults arguments
# are backwards. We need to do it this way so that we can handle the
# same deprecation strategy for CONF and the conf variable.
- if not self.identity_uri:
- self.LOG.warning('Configuring admin URI using auth fragments. '
- 'This is deprecated, use \'identity_uri\''
- ' instead.')
+ if not self._identity_uri:
+ self._LOG.warning('Configuring admin URI using auth fragments. '
+ 'This is deprecated, use \'identity_uri\''
+ ' instead.')
auth_host = self._conf_get('auth_host')
auth_port = int(self._conf_get('auth_port'))
@@ -489,16 +489,16 @@ class AuthProtocol(object):
# http://www.ietf.org/rfc/rfc2732.txt
auth_host = '[%s]' % auth_host
- self.identity_uri = '%s://%s:%s' % (auth_protocol, auth_host,
- auth_port)
+ self._identity_uri = '%s://%s:%s' % (auth_protocol, auth_host,
+ auth_port)
if auth_admin_prefix:
- self.identity_uri = '%s/%s' % (self.identity_uri,
- auth_admin_prefix.strip('/'))
+ self._identity_uri = '%s/%s' % (self._identity_uri,
+ auth_admin_prefix.strip('/'))
else:
- self.identity_uri = self.identity_uri.rstrip('/')
+ self._identity_uri = self._identity_uri.rstrip('/')
- if self.auth_uri is None:
- self.LOG.warning(
+ if self._auth_uri is None:
+ self._LOG.warning(
'Configuring auth_uri to point to the public identity '
'endpoint is required; clients may not be able to '
'authenticate against an admin endpoint')
@@ -507,49 +507,50 @@ class AuthProtocol(object):
# documented in bug 1207517.
# NOTE(jamielennox): we urljoin '/' to get just the base URI as
# this is the original behaviour.
- self.auth_uri = urllib.parse.urljoin(self.identity_uri, '/')
- self.auth_uri = self.auth_uri.rstrip('/')
+ self._auth_uri = urllib.parse.urljoin(self._identity_uri, '/')
+ self._auth_uri = self._auth_uri.rstrip('/')
# SSL
- self.cert_file = self._conf_get('certfile')
- self.key_file = self._conf_get('keyfile')
- self.ssl_ca_file = self._conf_get('cafile')
- self.ssl_insecure = self._conf_get('insecure')
+ self._cert_file = self._conf_get('certfile')
+ self._key_file = self._conf_get('keyfile')
+ self._ssl_ca_file = self._conf_get('cafile')
+ self._ssl_insecure = self._conf_get('insecure')
# signing
- self.signing_dirname = self._conf_get('signing_dir')
- if self.signing_dirname is None:
- self.signing_dirname = tempfile.mkdtemp(prefix='keystone-signing-')
- self.LOG.info('Using %s as cache directory for signing certificate',
- self.signing_dirname)
- self.verify_signing_dir()
-
- val = '%s/signing_cert.pem' % self.signing_dirname
- self.signing_cert_file_name = val
- val = '%s/cacert.pem' % self.signing_dirname
- self.signing_ca_file_name = val
- val = '%s/revoked.pem' % self.signing_dirname
- self.revoked_file_name = val
+ self._signing_dirname = self._conf_get('signing_dir')
+ if self._signing_dirname is None:
+ self._signing_dirname = tempfile.mkdtemp(
+ prefix='keystone-signing-')
+ self._LOG.info('Using %s as cache directory for signing certificate',
+ self._signing_dirname)
+ self._verify_signing_dir()
+
+ val = '%s/signing_cert.pem' % self._signing_dirname
+ self._signing_cert_file_name = val
+ val = '%s/cacert.pem' % self._signing_dirname
+ self._signing_ca_file_name = val
+ val = '%s/revoked.pem' % self._signing_dirname
+ self._revoked_file_name = val
# Credentials used to verify this component with the Auth service since
# validating tokens is a privileged call
- self.admin_token = self._conf_get('admin_token')
- if self.admin_token:
- self.LOG.warning(
+ self._admin_token = self._conf_get('admin_token')
+ if self._admin_token:
+ self._LOG.warning(
"The admin_token option in the auth_token middleware is "
"deprecated and should not be used. The admin_user and "
"admin_password options should be used instead. The "
"admin_token option may be removed in a future release.")
- self.admin_token_expiry = None
- self.admin_user = self._conf_get('admin_user')
- self.admin_password = self._conf_get('admin_password')
- self.admin_tenant_name = self._conf_get('admin_tenant_name')
+ self._admin_token_expiry = None
+ self._admin_user = self._conf_get('admin_user')
+ self._admin_password = self._conf_get('admin_password')
+ self._admin_tenant_name = self._conf_get('admin_tenant_name')
memcache_security_strategy = (
self._conf_get('memcache_security_strategy'))
- self._token_cache = TokenCache(
- self.LOG,
+ self._token_cache = _TokenCache(
+ self._LOG,
cache_time=int(self._conf_get('token_cache_time')),
hash_algorithms=self._conf_get('hash_algorithms'),
env_cache_name=self._conf_get('cache'),
@@ -557,27 +558,27 @@ class AuthProtocol(object):
memcache_security_strategy=memcache_security_strategy,
memcache_secret_key=self._conf_get('memcache_secret_key'))
- self._token_revocation_list = None
- self._token_revocation_list_fetched_time = None
- self.token_revocation_list_cache_timeout = datetime.timedelta(
+ self._token_revocation_list_prop = None
+ self._token_revocation_list_fetched_time_prop = None
+ self._token_revocation_list_cache_timeout = datetime.timedelta(
seconds=self._conf_get('revocation_cache_time'))
http_connect_timeout_cfg = self._conf_get('http_connect_timeout')
- self.http_connect_timeout = (http_connect_timeout_cfg and
- int(http_connect_timeout_cfg))
- self.auth_version = None
- self.http_request_max_retries = (
+ self._http_connect_timeout = (http_connect_timeout_cfg and
+ int(http_connect_timeout_cfg))
+ self._auth_version = None
+ self._http_request_max_retries = (
self._conf_get('http_request_max_retries'))
- self.include_service_catalog = self._conf_get(
+ self._include_service_catalog = self._conf_get(
'include_service_catalog')
- self.check_revocations_for_cached = self._conf_get(
+ self._check_revocations_for_cached = self._conf_get(
'check_revocations_for_cached')
def _conf_get(self, name):
# try config from paste-deploy first
- if name in self.conf:
- return self.conf[name]
+ if name in self._conf:
+ return self._conf[name]
else:
return CONF.keystone_authtoken[name]
@@ -592,24 +593,24 @@ class AuthProtocol(object):
# server.
if self._conf_get('auth_version'):
version_to_use = self._conf_get('auth_version')
- self.LOG.info('Auth Token proceeding with requested %s apis',
- version_to_use)
+ self._LOG.info('Auth Token proceeding with requested %s apis',
+ version_to_use)
else:
version_to_use = None
versions_supported_by_server = self._get_supported_versions()
if versions_supported_by_server:
- for version in LIST_OF_VERSIONS_TO_ATTEMPT:
+ for version in _LIST_OF_VERSIONS_TO_ATTEMPT:
if version in versions_supported_by_server:
version_to_use = version
break
if version_to_use:
- self.LOG.info('Auth Token confirmed use of %s apis',
- version_to_use)
+ self._LOG.info('Auth Token confirmed use of %s apis',
+ version_to_use)
else:
- self.LOG.error(
+ self._LOG.error(
'Attempted versions [%s] not in list supported by '
'server [%s]',
- ', '.join(LIST_OF_VERSIONS_TO_ATTEMPT),
+ ', '.join(_LIST_OF_VERSIONS_TO_ATTEMPT),
', '.join(versions_supported_by_server))
raise ServiceError('No compatible apis supported by server')
return version_to_use
@@ -618,24 +619,25 @@ class AuthProtocol(object):
versions = []
response, data = self._json_request('GET', '/')
if response.status_code == 501:
- self.LOG.warning('Old keystone installation found...assuming v2.0')
+ msg = 'Old keystone installation found...assuming v2.0'
+ self._LOG.warning(msg)
versions.append('v2.0')
elif response.status_code != 300:
- self.LOG.error('Unable to get version info from keystone: %s',
- response.status_code)
+ self._LOG.error('Unable to get version info from keystone: %s',
+ response.status_code)
raise ServiceError('Unable to get version info from keystone')
else:
try:
for version in data['versions']['values']:
versions.append(version['id'])
except KeyError:
- self.LOG.error(
+ self._LOG.error(
'Invalid version response format from server')
raise ServiceError('Unable to parse version response '
'from keystone')
- self.LOG.debug('Server reports support for api versions: %s',
- ', '.join(versions))
+ self._LOG.debug('Server reports support for api versions: %s',
+ ', '.join(versions))
return versions
def __call__(self, env, start_response):
@@ -645,7 +647,7 @@ class AuthProtocol(object):
we can't authenticate.
"""
- self.LOG.debug('Authenticating user token')
+ self._LOG.debug('Authenticating user token')
self._token_cache.initialize(env)
@@ -656,21 +658,21 @@ class AuthProtocol(object):
env['keystone.token_info'] = token_info
user_headers = self._build_user_headers(token_info)
self._add_headers(env, user_headers)
- return self.app(env, start_response)
+ return self._app(env, start_response)
except InvalidUserToken:
- if self.delay_auth_decision:
- self.LOG.info(
+ if self._delay_auth_decision:
+ self._LOG.info(
'Invalid user token - deferring reject downstream')
self._add_headers(env, {'X-Identity-Status': 'Invalid'})
- return self.app(env, start_response)
+ return self._app(env, start_response)
else:
- self.LOG.info('Invalid user token - rejecting request')
+ self._LOG.info('Invalid user token - rejecting request')
return self._reject_request(env, start_response)
except ServiceError as e:
- self.LOG.critical('Unable to obtain admin token: %s', e)
- resp = MiniResp('Service unavailable', env)
+ self._LOG.critical('Unable to obtain admin token: %s', e)
+ resp = _MiniResp('Service unavailable', env)
start_response('503 Service Unavailable', resp.headers)
return resp.body
@@ -701,8 +703,8 @@ class AuthProtocol(object):
'X-Tenant',
'X-Role',
)
- self.LOG.debug('Removing headers from request environment: %s',
- ','.join(auth_headers))
+ self._LOG.debug('Removing headers from request environment: %s',
+ ','.join(auth_headers))
self._remove_headers(env, auth_headers)
def _get_user_token_from_header(self, env):
@@ -718,10 +720,10 @@ class AuthProtocol(object):
if token:
return token
else:
- if not self.delay_auth_decision:
- self.LOG.warn('Unable to find authentication token'
- ' in headers')
- self.LOG.debug('Headers: %s', env)
+ if not self._delay_auth_decision:
+ self._LOG.warn('Unable to find authentication token'
+ ' in headers')
+ self._LOG.debug('Headers: %s', env)
raise InvalidUserToken('Unable to find token in headers')
def _reject_request(self, env, start_response):
@@ -732,12 +734,13 @@ class AuthProtocol(object):
:returns HTTPUnauthorized http response
"""
- headers = [('WWW-Authenticate', 'Keystone uri=\'%s\'' % self.auth_uri)]
- resp = MiniResp('Authentication required', env, headers)
+ header_val = 'Keystone uri=\'%s\'' % self._auth_uri
+ headers = [('WWW-Authenticate', header_val)]
+ resp = _MiniResp('Authentication required', env, headers)
start_response('401 Unauthorized', resp.headers)
return resp.body
- def get_admin_token(self):
+ def _get_admin_token(self):
"""Return admin token, possibly fetching a new one.
if self.admin_token_expiry is set from fetching an admin token, check
@@ -748,15 +751,15 @@ class AuthProtocol(object):
:raise ServiceError when unable to retrieve token from keystone
"""
- if self.admin_token_expiry:
- if will_expire_soon(self.admin_token_expiry):
- self.admin_token = None
+ if self._admin_token_expiry:
+ if _will_expire_soon(self._admin_token_expiry):
+ self._admin_token = None
- if not self.admin_token:
- (self.admin_token,
- self.admin_token_expiry) = self._request_admin_token()
+ if not self._admin_token:
+ (self._admin_token,
+ self._admin_token_expiry) = self._request_admin_token()
- return self.admin_token
+ return self._admin_token
def _http_request(self, method, path, **kwargs):
"""HTTP request helper used to make unspecified content type requests.
@@ -767,20 +770,20 @@ class AuthProtocol(object):
:raise ServerError when unable to communicate with keystone
"""
- url = '%s/%s' % (self.identity_uri, path.lstrip('/'))
+ url = '%s/%s' % (self._identity_uri, path.lstrip('/'))
- kwargs.setdefault('timeout', self.http_connect_timeout)
- if self.cert_file and self.key_file:
- kwargs['cert'] = (self.cert_file, self.key_file)
- elif self.cert_file or self.key_file:
- self.LOG.warn('Cannot use only a cert or key file. '
- 'Please provide both. Ignoring.')
+ kwargs.setdefault('timeout', self._http_connect_timeout)
+ if self._cert_file and self._key_file:
+ kwargs['cert'] = (self._cert_file, self._key_file)
+ elif self._cert_file or self._key_file:
+ self._LOG.warn('Cannot use only a cert or key file. '
+ 'Please provide both. Ignoring.')
- kwargs['verify'] = self.ssl_ca_file or True
- if self.ssl_insecure:
+ kwargs['verify'] = self._ssl_ca_file or True
+ if self._ssl_insecure:
kwargs['verify'] = False
- RETRIES = self.http_request_max_retries
+ RETRIES = self._http_request_max_retries
retry = 0
while True:
try:
@@ -788,10 +791,10 @@ class AuthProtocol(object):
break
except Exception as e:
if retry >= RETRIES:
- self.LOG.error('HTTP connection exception: %s', e)
+ self._LOG.error('HTTP connection exception: %s', e)
raise NetworkError('Unable to communicate with keystone')
# NOTE(vish): sleep 0.5, 1, 2
- self.LOG.warn('Retrying on HTTP connection exception: %s', e)
+ self._LOG.warn('Retrying on HTTP connection exception: %s', e)
time.sleep(2.0 ** retry / 2)
retry += 1
@@ -827,7 +830,7 @@ class AuthProtocol(object):
try:
data = jsonutils.loads(response.text)
except ValueError:
- self.LOG.debug('Keystone did not return json-encoded body')
+ self._LOG.debug('Keystone did not return json-encoded body')
data = {}
return response, data
@@ -846,10 +849,10 @@ class AuthProtocol(object):
params = {
'auth': {
'passwordCredentials': {
- 'username': self.admin_user,
- 'password': self.admin_password,
+ 'username': self._admin_user,
+ 'password': self._admin_password,
},
- 'tenantName': self.admin_tenant_name,
+ 'tenantName': self._admin_tenant_name,
}
}
@@ -865,12 +868,12 @@ class AuthProtocol(object):
datetime_expiry = timeutils.parse_isotime(expiry)
return (token, timeutils.normalize_time(datetime_expiry))
except (AssertionError, KeyError):
- self.LOG.warn(
+ self._LOG.warn(
'Unexpected response from keystone service: %s', data)
raise ServiceError('invalid json response')
except (ValueError):
data['access']['token']['id'] = '<SANITIZED>'
- self.LOG.warn(
+ self._LOG.warn(
'Unable to parse expiration time from token: %s', data)
raise ServiceError('invalid json response')
@@ -892,38 +895,38 @@ class AuthProtocol(object):
if cached:
data = cached
- if self.check_revocations_for_cached:
+ if self._check_revocations_for_cached:
# A token stored in Memcached might have been revoked
# regardless of initial mechanism used to validate it,
# and needs to be checked.
for tid in token_ids:
is_revoked = self._is_token_id_in_revoked_list(tid)
if is_revoked:
- self.LOG.debug(
+ self._LOG.debug(
'Token is marked as having been revoked')
raise InvalidUserToken(
'Token authorization failed')
elif cms.is_pkiz(user_token):
- verified = self.verify_pkiz_token(user_token, token_ids)
+ verified = self._verify_pkiz_token(user_token, token_ids)
data = jsonutils.loads(verified)
elif cms.is_asn1_token(user_token):
- verified = self.verify_signed_token(user_token, token_ids)
+ verified = self._verify_signed_token(user_token, token_ids)
data = jsonutils.loads(verified)
else:
- data = self.verify_uuid_token(user_token, retry)
- expires = confirm_token_not_expired(data)
+ data = self._verify_uuid_token(user_token, retry)
+ expires = _confirm_token_not_expired(data)
self._confirm_token_bind(data, env)
self._token_cache.store(token_id, data, expires)
return data
except NetworkError:
- self.LOG.debug('Token validation failure.', exc_info=True)
- self.LOG.warn('Authorization failed for token')
+ self._LOG.debug('Token validation failure.', exc_info=True)
+ self._LOG.warn('Authorization failed for token')
raise InvalidUserToken('Token authorization failed')
except Exception:
- self.LOG.debug('Token validation failure.', exc_info=True)
+ self._LOG.debug('Token validation failure.', exc_info=True)
if token_id:
self._token_cache.store_invalid(token_id)
- self.LOG.warn('Authorization failed for token')
+ self._LOG.warn('Authorization failed for token')
raise InvalidUserToken('Token authorization failed')
def _build_user_headers(self, token_info):
@@ -963,11 +966,11 @@ class AuthProtocol(object):
'X-Role': roles,
}
- self.LOG.debug('Received request from user: %s with project_id : %s'
- ' and roles: %s ',
- auth_ref.user_id, auth_ref.project_id, roles)
+ self._LOG.debug('Received request from user: %s with project_id : %s'
+ ' and roles: %s ',
+ auth_ref.user_id, auth_ref.project_id, roles)
- if self.include_service_catalog and auth_ref.has_service_catalog():
+ if self._include_service_catalog and auth_ref.has_service_catalog():
catalog = auth_ref.service_catalog.get_data()
if _token_is_v3(token_info):
catalog = _v3_to_v2_catalog(catalog)
@@ -1014,7 +1017,7 @@ class AuthProtocol(object):
def _confirm_token_bind(self, data, env):
bind_mode = self._conf_get('enforce_token_bind')
- if bind_mode == BIND_MODE.DISABLED:
+ if bind_mode == _BIND_MODE.DISABLED:
return
try:
@@ -1028,54 +1031,54 @@ class AuthProtocol(object):
bind = {}
# permissive and strict modes don't require there to be a bind
- permissive = bind_mode in (BIND_MODE.PERMISSIVE, BIND_MODE.STRICT)
+ permissive = bind_mode in (_BIND_MODE.PERMISSIVE, _BIND_MODE.STRICT)
if not bind:
if permissive:
# no bind provided and none required
return
else:
- self.LOG.info('No bind information present in token.')
+ self._LOG.info('No bind information present in token.')
self._invalid_user_token()
# get the named mode if bind_mode is not one of the predefined
- if permissive or bind_mode == BIND_MODE.REQUIRED:
+ if permissive or bind_mode == _BIND_MODE.REQUIRED:
name = None
else:
name = bind_mode
if name and name not in bind:
- self.LOG.info('Named bind mode %s not in bind information', name)
+ self._LOG.info('Named bind mode %s not in bind information', name)
self._invalid_user_token()
for bind_type, identifier in six.iteritems(bind):
- if bind_type == BIND_MODE.KERBEROS:
+ if bind_type == _BIND_MODE.KERBEROS:
if not env.get('AUTH_TYPE', '').lower() == 'negotiate':
- self.LOG.info('Kerberos credentials required and '
- 'not present.')
+ self._LOG.info('Kerberos credentials required and '
+ 'not present.')
self._invalid_user_token()
if not env.get('REMOTE_USER') == identifier:
- self.LOG.info('Kerberos credentials do not match '
- 'those in bind.')
+ self._LOG.info('Kerberos credentials do not match '
+ 'those in bind.')
self._invalid_user_token()
- self.LOG.debug('Kerberos bind authentication successful.')
+ self._LOG.debug('Kerberos bind authentication successful.')
- elif bind_mode == BIND_MODE.PERMISSIVE:
- self.LOG.debug('Ignoring Unknown bind for permissive mode: '
- '%(bind_type)s: %(identifier)s.',
- {'bind_type': bind_type,
- 'identifier': identifier})
+ elif bind_mode == _BIND_MODE.PERMISSIVE:
+ self._LOG.debug('Ignoring Unknown bind for permissive mode: '
+ '%(bind_type)s: %(identifier)s.',
+ {'bind_type': bind_type,
+ 'identifier': identifier})
else:
- self.LOG.info('Couldn`t verify unknown bind: %(bind_type)s: '
- '%(identifier)s.',
- {'bind_type': bind_type,
- 'identifier': identifier})
+ self._LOG.info('Couldn`t verify unknown bind: %(bind_type)s: '
+ '%(identifier)s.',
+ {'bind_type': bind_type,
+ 'identifier': identifier})
self._invalid_user_token()
- def verify_uuid_token(self, user_token, retry=True):
+ def _verify_uuid_token(self, user_token, retry=True):
"""Authenticate user token with keystone.
:param user_token: user's token id
@@ -1088,14 +1091,14 @@ class AuthProtocol(object):
"""
# Determine the highest api version we can use.
- if not self.auth_version:
- self.auth_version = self._choose_api_version()
+ if not self._auth_version:
+ self._auth_version = self._choose_api_version()
- if self.auth_version == 'v3.0':
- headers = {'X-Auth-Token': self.get_admin_token(),
- 'X-Subject-Token': safe_quote(user_token)}
+ if self._auth_version == 'v3.0':
+ headers = {'X-Auth-Token': self._get_admin_token(),
+ 'X-Subject-Token': _safe_quote(user_token)}
path = '/v3/auth/tokens'
- if not self.include_service_catalog:
+ if not self._include_service_catalog:
# NOTE(gyee): only v3 API support this option
path = path + '?nocatalog'
response, data = self._json_request(
@@ -1103,43 +1106,42 @@ class AuthProtocol(object):
path,
additional_headers=headers)
else:
- headers = {'X-Auth-Token': self.get_admin_token()}
+ headers = {'X-Auth-Token': self._get_admin_token()}
response, data = self._json_request(
'GET',
- '/v2.0/tokens/%s' % safe_quote(user_token),
+ '/v2.0/tokens/%s' % _safe_quote(user_token),
additional_headers=headers)
if response.status_code == 200:
return data
if response.status_code == 404:
- self.LOG.warn('Authorization failed for token')
+ self._LOG.warn('Authorization failed for token')
raise InvalidUserToken('Token authorization failed')
if response.status_code == 401:
- self.LOG.info(
- 'Keystone rejected admin token, resetting')
+ self._LOG.info('Keystone rejected admin token, resetting')
self.admin_token = None
else:
- self.LOG.error('Bad response code while validating token: %s',
- response.status_code)
+ self._LOG.error('Bad response code while validating token: %s',
+ response.status_code)
if retry:
- self.LOG.info('Retrying validation')
- return self.verify_uuid_token(user_token, False)
+ self._LOG.info('Retrying validation')
+ return self._verify_uuid_token(user_token, False)
else:
- self.LOG.warn('Invalid user token. Keystone response: %s', data)
+ self._LOG.warn('Invalid user token. Keystone response: %s', data)
raise InvalidUserToken()
- def is_signed_token_revoked(self, token_ids):
+ def _is_signed_token_revoked(self, token_ids):
"""Indicate whether the token appears in the revocation list."""
for token_id in token_ids:
if self._is_token_id_in_revoked_list(token_id):
- self.LOG.debug('Token is marked as having been revoked')
+ self._LOG.debug('Token is marked as having been revoked')
return True
return False
def _is_token_id_in_revoked_list(self, token_id):
"""Indicate whether the token_id appears in the revocation list."""
- revocation_list = self.token_revocation_list
+ revocation_list = self._token_revocation_list
revoked_tokens = revocation_list.get('revoked', None)
if not revoked_tokens:
return False
@@ -1147,7 +1149,7 @@ class AuthProtocol(object):
revoked_ids = (x['id'] for x in revoked_tokens)
return token_id in revoked_ids
- def cms_verify(self, data, inform=cms.PKI_ASN1_FORM):
+ def _cms_verify(self, data, inform=cms.PKI_ASN1_FORM):
"""Verifies the signature of the provided data's IAW CMS syntax.
If either of the certificate files might be missing, fetch them and
@@ -1155,19 +1157,19 @@ class AuthProtocol(object):
"""
def verify():
try:
- return cms.cms_verify(data, self.signing_cert_file_name,
- self.signing_ca_file_name,
+ return cms.cms_verify(data, self._signing_cert_file_name,
+ self._signing_ca_file_name,
inform=inform).decode('utf-8')
except cms.subprocess.CalledProcessError as err:
- self.LOG.warning('Verify error: %s', err)
+ self._LOG.warning('Verify error: %s', err)
raise
try:
return verify()
except exceptions.CertificateConfigError:
# the certs might be missing; unconditionally fetch to avoid racing
- self.fetch_signing_cert()
- self.fetch_ca_cert()
+ self._fetch_signing_cert()
+ self._fetch_ca_cert()
try:
# retry with certs in place
@@ -1175,79 +1177,79 @@ class AuthProtocol(object):
except exceptions.CertificateConfigError as err:
# if this is still occurring, something else is wrong and we
# need err.output to identify the problem
- self.LOG.error('CMS Verify output: %s', err.output)
+ self._LOG.error('CMS Verify output: %s', err.output)
raise
- def verify_signed_token(self, signed_text, token_ids):
+ def _verify_signed_token(self, signed_text, token_ids):
"""Check that the token is unrevoked and has a valid signature."""
- if self.is_signed_token_revoked(token_ids):
+ if self._is_signed_token_revoked(token_ids):
raise InvalidUserToken('Token has been revoked')
formatted = cms.token_to_cms(signed_text)
- verified = self.cms_verify(formatted)
+ verified = self._cms_verify(formatted)
return verified
- def verify_pkiz_token(self, signed_text, token_ids):
- if self.is_signed_token_revoked(token_ids):
+ def _verify_pkiz_token(self, signed_text, token_ids):
+ if self._is_signed_token_revoked(token_ids):
raise InvalidUserToken('Token has been revoked')
try:
uncompressed = cms.pkiz_uncompress(signed_text)
- verified = self.cms_verify(uncompressed, inform=cms.PKIZ_CMS_FORM)
+ verified = self._cms_verify(uncompressed, inform=cms.PKIZ_CMS_FORM)
return verified
# TypeError If the signed_text is not zlib compressed
except TypeError:
raise InvalidUserToken(signed_text)
- def verify_signing_dir(self):
- if os.path.exists(self.signing_dirname):
- if not os.access(self.signing_dirname, os.W_OK):
+ def _verify_signing_dir(self):
+ if os.path.exists(self._signing_dirname):
+ if not os.access(self._signing_dirname, os.W_OK):
raise ConfigurationError(
- 'unable to access signing_dir %s' % self.signing_dirname)
+ 'unable to access signing_dir %s' % self._signing_dirname)
uid = os.getuid()
- if os.stat(self.signing_dirname).st_uid != uid:
- self.LOG.warning(
- 'signing_dir is not owned by %s', uid)
- current_mode = stat.S_IMODE(os.stat(self.signing_dirname).st_mode)
+ if os.stat(self._signing_dirname).st_uid != uid:
+ self._LOG.warning('signing_dir is not owned by %s', uid)
+ current_mode = stat.S_IMODE(os.stat(self._signing_dirname).st_mode)
if current_mode != stat.S_IRWXU:
- self.LOG.warning(
+ self._LOG.warning(
'signing_dir mode is %s instead of %s',
oct(current_mode), oct(stat.S_IRWXU))
else:
- os.makedirs(self.signing_dirname, stat.S_IRWXU)
+ os.makedirs(self._signing_dirname, stat.S_IRWXU)
@property
- def token_revocation_list_fetched_time(self):
- if not self._token_revocation_list_fetched_time:
+ def _token_revocation_list_fetched_time(self):
+ if not self._token_revocation_list_fetched_time_prop:
# If the fetched list has been written to disk, use its
# modification time.
- if os.path.exists(self.revoked_file_name):
- mtime = os.path.getmtime(self.revoked_file_name)
+ if os.path.exists(self._revoked_file_name):
+ mtime = os.path.getmtime(self._revoked_file_name)
fetched_time = datetime.datetime.utcfromtimestamp(mtime)
# Otherwise the list will need to be fetched.
else:
fetched_time = datetime.datetime.min
- self._token_revocation_list_fetched_time = fetched_time
- return self._token_revocation_list_fetched_time
+ self._token_revocation_list_fetched_time_prop = fetched_time
+ return self._token_revocation_list_fetched_time_prop
- @token_revocation_list_fetched_time.setter
- def token_revocation_list_fetched_time(self, value):
- self._token_revocation_list_fetched_time = value
+ @_token_revocation_list_fetched_time.setter
+ def _token_revocation_list_fetched_time(self, value):
+ self._token_revocation_list_fetched_time_prop = value
@property
- def token_revocation_list(self):
- timeout = (self.token_revocation_list_fetched_time +
- self.token_revocation_list_cache_timeout)
+ def _token_revocation_list(self):
+ timeout = (self._token_revocation_list_fetched_time +
+ self._token_revocation_list_cache_timeout)
list_is_current = timeutils.utcnow() < timeout
if list_is_current:
# Load the list from disk if required
- if not self._token_revocation_list:
+ if not self._token_revocation_list_prop:
open_kwargs = {'encoding': 'utf-8'} if six.PY3 else {}
- with open(self.revoked_file_name, 'r', **open_kwargs) as f:
- self._token_revocation_list = jsonutils.loads(f.read())
+ with open(self._revoked_file_name, 'r', **open_kwargs) as f:
+ self._token_revocation_list_prop = jsonutils.loads(
+ f.read())
else:
- self.token_revocation_list = self.fetch_revocation_list()
- return self._token_revocation_list
+ self._token_revocation_list = self._fetch_revocation_list()
+ return self._token_revocation_list_prop
def _atomic_write_to_signing_dir(self, file_name, value):
# In Python2, encoding is slow so the following check avoids it if it
@@ -1256,7 +1258,7 @@ class AuthProtocol(object):
value = value.encode('utf-8')
def _atomic_write(destination, data):
- with tempfile.NamedTemporaryFile(dir=self.signing_dirname,
+ with tempfile.NamedTemporaryFile(dir=self._signing_dirname,
delete=False) as f:
f.write(data)
os.rename(f.name, destination)
@@ -1264,41 +1266,41 @@ class AuthProtocol(object):
try:
_atomic_write(file_name, value)
except (OSError, IOError):
- self.verify_signing_dir()
+ self._verify_signing_dir()
_atomic_write(file_name, value)
- @token_revocation_list.setter
- def token_revocation_list(self, value):
+ @_token_revocation_list.setter
+ def _token_revocation_list(self, value):
"""Save a revocation list to memory and to disk.
:param value: A json-encoded revocation list
"""
- self._token_revocation_list = jsonutils.loads(value)
- self.token_revocation_list_fetched_time = timeutils.utcnow()
- self._atomic_write_to_signing_dir(self.revoked_file_name, value)
+ self._token_revocation_list_prop = jsonutils.loads(value)
+ self._token_revocation_list_fetched_time = timeutils.utcnow()
+ self._atomic_write_to_signing_dir(self._revoked_file_name, value)
- def fetch_revocation_list(self, retry=True):
- headers = {'X-Auth-Token': self.get_admin_token()}
+ def _fetch_revocation_list(self, retry=True):
+ headers = {'X-Auth-Token': self._get_admin_token()}
response, data = self._json_request('GET', '/v2.0/tokens/revoked',
additional_headers=headers)
if response.status_code == 401:
if retry:
- self.LOG.info(
+ self._LOG.info(
'Keystone rejected admin token, resetting admin token')
- self.admin_token = None
- return self.fetch_revocation_list(retry=False)
+ self._admin_token = None
+ return self._fetch_revocation_list(retry=False)
if response.status_code != 200:
raise ServiceError('Unable to fetch token revocation list.')
if 'signed' not in data:
raise ServiceError('Revocation list improperly formatted.')
- return self.cms_verify(data['signed'])
+ return self._cms_verify(data['signed'])
def _fetch_cert_file(self, cert_file_name, cert_type):
- if not self.auth_version:
- self.auth_version = self._choose_api_version()
+ if not self._auth_version:
+ self._auth_version = self._choose_api_version()
- if self.auth_version == 'v3.0':
+ if self._auth_version == 'v3.0':
if cert_type == 'signing':
cert_type = 'certificates'
path = '/v3/OS-SIMPLE-CERT/' + cert_type
@@ -1309,14 +1311,14 @@ class AuthProtocol(object):
raise exceptions.CertificateConfigError(response.text)
self._atomic_write_to_signing_dir(cert_file_name, response.text)
- def fetch_signing_cert(self):
- self._fetch_cert_file(self.signing_cert_file_name, 'signing')
+ def _fetch_signing_cert(self):
+ self._fetch_cert_file(self._signing_cert_file_name, 'signing')
- def fetch_ca_cert(self):
- self._fetch_cert_file(self.signing_ca_file_name, 'ca')
+ def _fetch_ca_cert(self):
+ self._fetch_cert_file(self._signing_ca_file_name, 'ca')
-class CachePool(list):
+class _CachePool(list):
"""A lazy pool of cache references."""
def __init__(self, cache, memcached_servers):
@@ -1343,7 +1345,7 @@ class CachePool(list):
self.append(c)
-class TokenCache(object):
+class _TokenCache(object):
"""Encapsulates the auth_token token cache functionality.
auth_token caches tokens that it's seen so that when a token is re-used the
@@ -1359,12 +1361,13 @@ class TokenCache(object):
"""
+ _CACHE_KEY_TEMPLATE = 'tokens/%s'
_INVALID_INDICATOR = 'invalid'
def __init__(self, log, cache_time=None, hash_algorithms=None,
env_cache_name=None, memcached_servers=None,
memcache_security_strategy=None, memcache_secret_key=None):
- self.LOG = log
+ self._LOG = log
self._cache_time = cache_time
self._hash_algorithms = hash_algorithms
self._env_cache_name = env_cache_name
@@ -1386,8 +1389,8 @@ class TokenCache(object):
if self._initialized:
return
- self._cache_pool = CachePool(env.get(self._env_cache_name),
- self._memcached_servers)
+ self._cache_pool = _CachePool(env.get(self._env_cache_name),
+ self._memcached_servers)
self._initialized = True
def get(self, user_token):
@@ -1429,12 +1432,12 @@ class TokenCache(object):
quick check of token freshness on retrieval.
"""
- self.LOG.debug('Storing token in cache')
+ self._LOG.debug('Storing token in cache')
self._cache_store(token_id, (data, expires))
def store_invalid(self, token_id):
"""Store invalid token in cache."""
- self.LOG.debug('Marking token as unauthorized in cache')
+ self._LOG.debug('Marking token as unauthorized in cache')
self._cache_store(token_id, self._INVALID_INDICATOR)
def _assert_valid_memcache_protection_config(self):
@@ -1459,7 +1462,7 @@ class TokenCache(object):
return
if self._memcache_security_strategy is None:
- key = CACHE_KEY_TEMPLATE % token_id
+ key = self._CACHE_KEY_TEMPLATE % token_id
with self._cache_pool.reserve() as cache:
serialized = cache.get(key)
else:
@@ -1473,7 +1476,7 @@ class TokenCache(object):
token_id,
secret_key,
security_strategy)
- cache_key = CACHE_KEY_TEMPLATE % (
+ cache_key = self._CACHE_KEY_TEMPLATE % (
memcache_crypt.get_cache_key(keys))
with self._cache_pool.reserve() as cache:
raw_cached = cache.get(cache_key)
@@ -1483,7 +1486,7 @@ class TokenCache(object):
raw_cached)
except Exception:
msg = 'Failed to decrypt/verify cache data'
- self.LOG.exception(msg)
+ self._LOG.exception(msg)
# this should have the same effect as data not
# found in cache
serialized = None
@@ -1498,7 +1501,7 @@ class TokenCache(object):
serialized = serialized.decode('utf-8')
cached = jsonutils.loads(serialized)
if cached == self._INVALID_INDICATOR:
- self.LOG.debug('Cached Token is marked unauthorized')
+ self._LOG.debug('Cached Token is marked unauthorized')
raise InvalidUserToken('Token authorization failed')
data, expires = cached
@@ -1514,10 +1517,10 @@ class TokenCache(object):
expires = timeutils.normalize_time(expires)
utcnow = timeutils.utcnow()
if utcnow < expires:
- self.LOG.debug('Returning cached token')
+ self._LOG.debug('Returning cached token')
return data
else:
- self.LOG.debug('Cached Token seems expired')
+ self._LOG.debug('Cached Token seems expired')
raise InvalidUserToken('Token authorization failed')
def _cache_store(self, token_id, data):
@@ -1530,7 +1533,7 @@ class TokenCache(object):
if isinstance(serialized_data, six.text_type):
serialized_data = serialized_data.encode('utf-8')
if self._memcache_security_strategy is None:
- cache_key = CACHE_KEY_TEMPLATE % token_id
+ cache_key = self._CACHE_KEY_TEMPLATE % token_id
data_to_store = serialized_data
else:
secret_key = self._memcache_secret_key
@@ -1541,7 +1544,8 @@ class TokenCache(object):
security_strategy = security_strategy.encode('utf-8')
keys = memcache_crypt.derive_keys(
token_id, secret_key, security_strategy)
- cache_key = CACHE_KEY_TEMPLATE % memcache_crypt.get_cache_key(keys)
+ cache_key = memcache_crypt.get_cache_key(keys)
+ cache_key = self._CACHE_KEY_TEMPLATE % cache_key
data_to_store = memcache_crypt.protect_data(keys, serialized_data)
with self._cache_pool.reserve() as cache:
diff --git a/keystonemiddleware/tests/test_auth_token_middleware.py b/keystonemiddleware/tests/test_auth_token_middleware.py
index b464b4c..49a683b 100644
--- a/keystonemiddleware/tests/test_auth_token_middleware.py
+++ b/keystonemiddleware/tests/test_auth_token_middleware.py
@@ -250,15 +250,15 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
self.fake_app(self.expected_env), self.conf)
self.middleware._iso8601 = iso8601
- with tempfile.NamedTemporaryFile(dir=self.middleware.signing_dirname,
+ with tempfile.NamedTemporaryFile(dir=self.middleware._signing_dirname,
delete=False) as f:
pass
- self.middleware.revoked_file_name = f.name
+ self.middleware._revoked_file_name = f.name
self.addCleanup(cleanup_revoked_file,
- self.middleware.revoked_file_name)
+ self.middleware._revoked_file_name)
- self.middleware.token_revocation_list = jsonutils.dumps(
+ self.middleware._token_revocation_list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
def start_fake_response(self, status, headers):
@@ -295,8 +295,8 @@ class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
"%s/v2.0/tokens/revoked" % BASE_URI,
responses=responses)
- fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
- self.assertEqual(fetched_list, self.examples.REVOCATION_LIST)
+ fetched = jsonutils.loads(self.middleware._fetch_revocation_list())
+ self.assertEqual(fetched, self.examples.REVOCATION_LIST)
# Check that 4 requests have been made
self.assertEqual(len(httpretty.httpretty.latest_requests), 4)
@@ -436,10 +436,10 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_will_expire_soon(self):
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
seconds=10)
- self.assertTrue(auth_token.will_expire_soon(tenseconds))
+ self.assertTrue(auth_token._will_expire_soon(tenseconds))
fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
seconds=40)
- self.assertFalse(auth_token.will_expire_soon(fortyseconds))
+ self.assertFalse(auth_token._will_expire_soon(fortyseconds))
def test_token_is_v2_accepts_v2(self):
token = self.examples.UUID_TOKEN_DEFAULT
@@ -557,7 +557,7 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'auth_uri': 'https://keystone.example.com:1234',
}
middleware = auth_token.AuthProtocol(self.fake_app, conf)
- self.assertEqual(middleware.token_revocation_list_cache_timeout,
+ self.assertEqual(middleware._token_revocation_list_cache_timeout,
datetime.timedelta(seconds=24))
@@ -581,7 +581,7 @@ class CommonAuthTokenMiddlewareTest(object):
}
self.set_middleware(conf=conf)
expected_auth_uri = 'http://[2001:2013:1:f101::1]:1234'
- self.assertEqual(expected_auth_uri, self.middleware.auth_uri)
+ self.assertEqual(expected_auth_uri, self.middleware._auth_uri)
def assert_valid_request_200(self, token, with_catalog=True):
req = webob.Request.blank('/')
@@ -614,7 +614,7 @@ class CommonAuthTokenMiddlewareTest(object):
def _test_cache_revoked(self, token, revoked_form=None):
# When the token is cached and revoked, 401 is returned.
- self.middleware.check_revocations_for_cached = True
+ self.middleware._check_revocations_for_cached = True
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = token
@@ -624,7 +624,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertEqual(200, self.response_status)
# Put it in revocation list.
- self.middleware.token_revocation_list = self.get_revocation_list_json(
+ self.middleware._token_revocation_list = self.get_revocation_list_json(
token_ids=[revoked_form or token])
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(401, self.response_status)
@@ -647,7 +647,8 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertLastPath(None)
def test_revoked_token_receives_401(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
+ self.middleware._token_revocation_list = (
+ self.get_revocation_list_json())
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
self.middleware(req.environ, self.start_fake_response)
@@ -656,7 +657,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_revoked_token_receives_401_sha256(self):
self.conf['hash_algorithms'] = ['sha256', 'md5']
self.set_middleware()
- self.middleware.token_revocation_list = (
+ self.middleware._token_revocation_list = (
self.get_revocation_list_json(mode='sha256'))
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
@@ -675,7 +676,8 @@ class CommonAuthTokenMiddlewareTest(object):
# considered revoked so returns 401.
self.conf['hash_algorithms'] = ['sha256', 'md5']
self.set_middleware()
- self.middleware.token_revocation_list = self.get_revocation_list_json()
+ self.middleware._token_revocation_list = (
+ self.get_revocation_list_json())
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
self.middleware(req.environ, self.start_fake_response)
@@ -696,7 +698,7 @@ class CommonAuthTokenMiddlewareTest(object):
# Put the token in the revocation list.
token_hashed = cms.cms_hash_token(token)
- self.middleware.token_revocation_list = self.get_revocation_list_json(
+ self.middleware._token_revocation_list = self.get_revocation_list_json(
token_ids=[token_hashed])
# First, request is using the hashed token, is valid so goes in
@@ -723,50 +725,52 @@ class CommonAuthTokenMiddlewareTest(object):
def test_is_signed_token_revoked_returns_false(self):
#explicitly setting an empty revocation list here to document intent
- self.middleware.token_revocation_list = jsonutils.dumps(
+ self.middleware._token_revocation_list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
- result = self.middleware.is_signed_token_revoked(
+ result = self.middleware._is_signed_token_revoked(
[self.token_dict['revoked_token_hash']])
self.assertFalse(result)
def test_is_signed_token_revoked_returns_true(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
- result = self.middleware.is_signed_token_revoked(
+ self.middleware._token_revocation_list = (
+ self.get_revocation_list_json())
+ result = self.middleware._is_signed_token_revoked(
[self.token_dict['revoked_token_hash']])
self.assertTrue(result)
def test_is_signed_token_revoked_returns_true_sha256(self):
self.conf['hash_algorithms'] = ['sha256', 'md5']
self.set_middleware()
- self.middleware.token_revocation_list = (
+ self.middleware._token_revocation_list = (
self.get_revocation_list_json(mode='sha256'))
- result = self.middleware.is_signed_token_revoked(
+ result = self.middleware._is_signed_token_revoked(
[self.token_dict['revoked_token_hash_sha256']])
self.assertTrue(result)
def test_verify_signed_token_raises_exception_for_revoked_token(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
+ self.middleware._token_revocation_list = (
+ self.get_revocation_list_json())
self.assertRaises(auth_token.InvalidUserToken,
- self.middleware.verify_signed_token,
+ self.middleware._verify_signed_token,
self.token_dict['revoked_token'],
[self.token_dict['revoked_token_hash']])
def test_verify_signed_token_raises_exception_for_revoked_token_s256(self):
self.conf['hash_algorithms'] = ['sha256', 'md5']
self.set_middleware()
- self.middleware.token_revocation_list = (
+ self.middleware._token_revocation_list = (
self.get_revocation_list_json(mode='sha256'))
self.assertRaises(auth_token.InvalidUserToken,
- self.middleware.verify_signed_token,
+ self.middleware._verify_signed_token,
self.token_dict['revoked_token'],
[self.token_dict['revoked_token_hash_sha256'],
self.token_dict['revoked_token_hash']])
def test_verify_signed_token_raises_exception_for_revoked_pkiz_token(self):
- self.middleware.token_revocation_list = (
+ self.middleware._token_revocation_list = (
self.examples.REVOKED_TOKEN_PKIZ_LIST_JSON)
self.assertRaises(auth_token.InvalidUserToken,
- self.middleware.verify_pkiz_token,
+ self.middleware._verify_pkiz_token,
self.token_dict['revoked_token_pkiz'],
[self.token_dict['revoked_token_pkiz_hash']])
@@ -774,15 +778,17 @@ class CommonAuthTokenMiddlewareTest(object):
json.loads(text)
def test_verify_signed_token_succeeds_for_unrevoked_token(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
- text = self.middleware.verify_signed_token(
+ self.middleware._token_revocation_list = (
+ self.get_revocation_list_json())
+ text = self.middleware._verify_signed_token(
self.token_dict['signed_token_scoped'],
[self.token_dict['signed_token_scoped_hash']])
self.assertIsValidJSON(text)
def test_verify_signed_compressed_token_succeeds_for_unrevoked_token(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
- text = self.middleware.verify_pkiz_token(
+ self.middleware._token_revocation_list = (
+ self.get_revocation_list_json())
+ text = self.middleware._verify_pkiz_token(
self.token_dict['signed_token_scoped_pkiz'],
[self.token_dict['signed_token_scoped_hash']])
self.assertIsValidJSON(text)
@@ -790,9 +796,9 @@ class CommonAuthTokenMiddlewareTest(object):
def test_verify_signed_token_succeeds_for_unrevoked_token_sha256(self):
self.conf['hash_algorithms'] = ['sha256', 'md5']
self.set_middleware()
- self.middleware.token_revocation_list = (
+ self.middleware._token_revocation_list = (
self.get_revocation_list_json(mode='sha256'))
- text = self.middleware.verify_signed_token(
+ text = self.middleware._verify_signed_token(
self.token_dict['signed_token_scoped'],
[self.token_dict['signed_token_scoped_hash_sha256'],
self.token_dict['signed_token_scoped_hash']])
@@ -801,64 +807,65 @@ class CommonAuthTokenMiddlewareTest(object):
def test_verify_signing_dir_create_while_missing(self):
tmp_name = uuid.uuid4().hex
test_parent_signing_dir = "/tmp/%s" % tmp_name
- self.middleware.signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
- self.middleware.signing_cert_file_name = (
- "%s/test.pem" % self.middleware.signing_dirname)
- self.middleware.verify_signing_dir()
+ self.middleware._signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
+ self.middleware._signing_cert_file_name = (
+ "%s/test.pem" % self.middleware._signing_dirname)
+ self.middleware._verify_signing_dir()
# NOTE(wu_wenxiang): Verify if the signing dir was created as expected.
- self.assertTrue(os.path.isdir(self.middleware.signing_dirname))
- self.assertTrue(os.access(self.middleware.signing_dirname, os.W_OK))
- self.assertEqual(os.stat(self.middleware.signing_dirname).st_uid,
+ self.assertTrue(os.path.isdir(self.middleware._signing_dirname))
+ self.assertTrue(os.access(self.middleware._signing_dirname, os.W_OK))
+ self.assertEqual(os.stat(self.middleware._signing_dirname).st_uid,
os.getuid())
self.assertEqual(
- stat.S_IMODE(os.stat(self.middleware.signing_dirname).st_mode),
+ stat.S_IMODE(os.stat(self.middleware._signing_dirname).st_mode),
stat.S_IRWXU)
shutil.rmtree(test_parent_signing_dir)
def test_get_token_revocation_list_fetched_time_returns_min(self):
- self.middleware.token_revocation_list_fetched_time = None
- self.middleware.revoked_file_name = ''
- self.assertEqual(self.middleware.token_revocation_list_fetched_time,
+ self.middleware._token_revocation_list_fetched_time = None
+ self.middleware._revoked_file_name = ''
+ self.assertEqual(self.middleware._token_revocation_list_fetched_time,
datetime.datetime.min)
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
- self.middleware.token_revocation_list_fetched_time = None
- mtime = os.path.getmtime(self.middleware.revoked_file_name)
+ self.middleware._token_revocation_list_fetched_time = None
+ mtime = os.path.getmtime(self.middleware._revoked_file_name)
fetched_time = datetime.datetime.utcfromtimestamp(mtime)
self.assertEqual(fetched_time,
- self.middleware.token_revocation_list_fetched_time)
+ self.middleware._token_revocation_list_fetched_time)
@testtools.skipUnless(TimezoneFixture.supported(),
'TimezoneFixture not supported')
def test_get_token_revocation_list_fetched_time_returns_utc(self):
with TimezoneFixture('UTC-1'):
- self.middleware.token_revocation_list = jsonutils.dumps(
+ self.middleware._token_revocation_list = jsonutils.dumps(
self.examples.REVOCATION_LIST)
- self.middleware.token_revocation_list_fetched_time = None
- fetched_time = self.middleware.token_revocation_list_fetched_time
+ self.middleware._token_revocation_list_fetched_time = None
+ fetched_time = self.middleware._token_revocation_list_fetched_time
self.assertTrue(timeutils.is_soon(fetched_time, 1))
def test_get_token_revocation_list_fetched_time_returns_value(self):
expected = self.middleware._token_revocation_list_fetched_time
- self.assertEqual(self.middleware.token_revocation_list_fetched_time,
+ self.assertEqual(self.middleware._token_revocation_list_fetched_time,
expected)
def test_get_revocation_list_returns_fetched_list(self):
# auth_token uses v2 to fetch this, so don't allow the v3
# tests to override the fake http connection
- self.middleware.token_revocation_list_fetched_time = None
- os.remove(self.middleware.revoked_file_name)
- self.assertEqual(self.middleware.token_revocation_list,
+ self.middleware._token_revocation_list_fetched_time = None
+ os.remove(self.middleware._revoked_file_name)
+ self.assertEqual(self.middleware._token_revocation_list,
self.examples.REVOCATION_LIST)
def test_get_revocation_list_returns_current_list_from_memory(self):
- self.assertEqual(self.middleware.token_revocation_list,
- self.middleware._token_revocation_list)
+ self.assertEqual(self.middleware._token_revocation_list,
+ self.middleware._token_revocation_list_prop)
def test_get_revocation_list_returns_current_list_from_disk(self):
- in_memory_list = self.middleware.token_revocation_list
- self.middleware._token_revocation_list = None
- self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
+ in_memory_list = self.middleware._token_revocation_list
+ self.middleware._token_revocation_list_prop = None
+ self.assertEqual(self.middleware._token_revocation_list,
+ in_memory_list)
def test_invalid_revocation_list_raises_service_error(self):
httpretty.register_uri(httpretty.GET,
@@ -867,13 +874,13 @@ class CommonAuthTokenMiddlewareTest(object):
status=200)
self.assertRaises(auth_token.ServiceError,
- self.middleware.fetch_revocation_list)
+ self.middleware._fetch_revocation_list)
def test_fetch_revocation_list(self):
# auth_token uses v2 to fetch this, so don't allow the v3
# tests to override the fake http connection
- fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
- self.assertEqual(fetched_list, self.examples.REVOCATION_LIST)
+ fetched = jsonutils.loads(self.middleware._fetch_revocation_list())
+ self.assertEqual(fetched, self.examples.REVOCATION_LIST)
def test_request_invalid_uuid_token(self):
# remember because we are testing the middleware we stub the connection
@@ -923,12 +930,12 @@ class CommonAuthTokenMiddlewareTest(object):
def debug(self, msg=None, *args, **kwargs):
self.debugmsg = msg
- self.middleware.LOG = FakeLog()
- self.middleware.delay_auth_decision = False
+ self.middleware._LOG = FakeLog()
+ self.middleware._delay_auth_decision = False
self.assertRaises(auth_token.InvalidUserToken,
self.middleware._get_user_token_from_header, {})
- self.assertIsNotNone(self.middleware.LOG.msg)
- self.assertIsNotNone(self.middleware.LOG.debugmsg)
+ self.assertIsNotNone(self.middleware._LOG.msg)
+ self.assertIsNotNone(self.middleware._LOG.debugmsg)
def test_request_no_token_http(self):
req = webob.Request.blank('/', environ={'REQUEST_METHOD': 'HEAD'})
@@ -1042,7 +1049,7 @@ class CommonAuthTokenMiddlewareTest(object):
"""
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = ERROR_TOKEN
- self.middleware.http_request_max_retries = 0
+ self.middleware._http_request_max_retries = 0
self.middleware(req.environ, self.start_fake_response)
self.assertIsNone(self._get_cached_token(ERROR_TOKEN))
self.assert_valid_last_url(ERROR_TOKEN)
@@ -1277,7 +1284,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
"%s%s" % (BASE_URI, self.signing_path),
status=404)
self.assertRaises(exceptions.CertificateConfigError,
- self.middleware.verify_signed_token,
+ self.middleware._verify_signed_token,
self.examples.SIGNED_TOKEN_SCOPED,
[self.examples.SIGNED_TOKEN_SCOPED_HASH])
@@ -1286,9 +1293,9 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
httpretty.register_uri(httpretty.GET,
"%s%s" % (BASE_URI, self.signing_path),
body=data)
- self.middleware.fetch_signing_cert()
+ self.middleware._fetch_signing_cert()
- with open(self.middleware.signing_cert_file_name, 'r') as f:
+ with open(self.middleware._signing_cert_file_name, 'r') as f:
self.assertEqual(f.read(), data)
self.assertEqual("/testadmin%s" % self.signing_path,
@@ -1299,9 +1306,9 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
httpretty.register_uri(httpretty.GET,
"%s%s" % (BASE_URI, self.ca_path),
body=data)
- self.middleware.fetch_ca_cert()
+ self.middleware._fetch_ca_cert()
- with open(self.middleware.signing_ca_file_name, 'r') as f:
+ with open(self.middleware._signing_ca_file_name, 'r') as f:
self.assertEqual(f.read(), data)
self.assertEqual("/testadmin%s" % self.ca_path,
@@ -1323,12 +1330,12 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.set_middleware(conf=self.conf)
- self.middleware.fetch_ca_cert()
+ self.middleware._fetch_ca_cert()
self.assertEqual('/newadmin%s' % self.ca_path,
httpretty.last_request().path)
- self.middleware.fetch_signing_cert()
+ self.middleware._fetch_signing_cert()
self.assertEqual('/newadmin%s' % self.signing_path,
httpretty.last_request().path)
@@ -1349,12 +1356,12 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.set_middleware(conf=self.conf)
- self.middleware.fetch_ca_cert()
+ self.middleware._fetch_ca_cert()
self.assertEqual(self.ca_path,
httpretty.last_request().path)
- self.middleware.fetch_signing_cert()
+ self.middleware._fetch_signing_cert()
self.assertEqual(self.signing_path,
httpretty.last_request().path)
@@ -1715,10 +1722,10 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
class TokenEncodingTest(testtools.TestCase):
def test_unquoted_token(self):
- self.assertEqual('foo%20bar', auth_token.safe_quote('foo bar'))
+ self.assertEqual('foo%20bar', auth_token._safe_quote('foo bar'))
def test_quoted_token(self):
- self.assertEqual('foo%20bar', auth_token.safe_quote('foo%20bar'))
+ self.assertEqual('foo%20bar', auth_token._safe_quote('foo%20bar'))
class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
@@ -1790,25 +1797,25 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
def test_no_data(self):
data = {}
self.assertRaises(auth_token.InvalidUserToken,
- auth_token.confirm_token_not_expired,
+ auth_token._confirm_token_not_expired,
data)
def test_bad_data(self):
data = {'my_happy_token_dict': 'woo'}
self.assertRaises(auth_token.InvalidUserToken,
- auth_token.confirm_token_not_expired,
+ auth_token._confirm_token_not_expired,
data)
def test_v2_token_not_expired(self):
data = self.create_v2_token_fixture()
expected_expires = data['access']['token']['expires']
- actual_expires = auth_token.confirm_token_not_expired(data)
+ actual_expires = auth_token._confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
def test_v2_token_expired(self):
data = self.create_v2_token_fixture(expires=self.one_hour_ago)
self.assertRaises(auth_token.InvalidUserToken,
- auth_token.confirm_token_not_expired,
+ auth_token._confirm_token_not_expired,
data)
@mock.patch('keystonemiddleware.openstack.common.timeutils.utcnow')
@@ -1819,7 +1826,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
data = self.create_v2_token_fixture(
expires='2000-01-01T00:05:10.000123-05:00')
expected_expires = '2000-01-01T05:05:10.000123Z'
- actual_expires = auth_token.confirm_token_not_expired(data)
+ actual_expires = auth_token._confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
@mock.patch('keystonemiddleware.openstack.common.timeutils.utcnow')
@@ -1831,19 +1838,19 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
expires='2000-01-01T00:05:10.000123+05:00')
data['access']['token']['expires'] = '2000-01-01T00:05:10.000123+05:00'
self.assertRaises(auth_token.InvalidUserToken,
- auth_token.confirm_token_not_expired,
+ auth_token._confirm_token_not_expired,
data)
def test_v3_token_not_expired(self):
data = self.create_v3_token_fixture()
expected_expires = data['token']['expires_at']
- actual_expires = auth_token.confirm_token_not_expired(data)
+ actual_expires = auth_token._confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
def test_v3_token_expired(self):
data = self.create_v3_token_fixture(expires=self.one_hour_ago)
self.assertRaises(auth_token.InvalidUserToken,
- auth_token.confirm_token_not_expired,
+ auth_token._confirm_token_not_expired,
data)
@mock.patch('keystonemiddleware.openstack.common.timeutils.utcnow')
@@ -1855,7 +1862,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
expires='2000-01-01T00:05:10.000123-05:00')
expected_expires = '2000-01-01T05:05:10.000123Z'
- actual_expires = auth_token.confirm_token_not_expired(data)
+ actual_expires = auth_token._confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
@mock.patch('keystonemiddleware.openstack.common.timeutils.utcnow')
@@ -1866,7 +1873,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
data = self.create_v3_token_fixture(
expires='2000-01-01T00:05:10.000123+05:00')
self.assertRaises(auth_token.InvalidUserToken,
- auth_token.confirm_token_not_expired,
+ auth_token._confirm_token_not_expired,
data)
def test_cached_token_not_expired(self):