summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--keystonemiddleware/auth_token.py294
-rw-r--r--keystonemiddleware/tests/client_fixtures.py39
-rw-r--r--keystonemiddleware/tests/test_auth_token_middleware.py411
3 files changed, 613 insertions, 131 deletions
diff --git a/keystonemiddleware/auth_token.py b/keystonemiddleware/auth_token.py
index 328172e..7a07858 100644
--- a/keystonemiddleware/auth_token.py
+++ b/keystonemiddleware/auth_token.py
@@ -41,6 +41,9 @@ Coming in from initial call from client or customer
HTTP_X_AUTH_TOKEN
The client token being passed in.
+HTTP_X_SERVICE_TOKEN
+ A service token being passed in.
+
HTTP_X_STORAGE_TOKEN
The client token being passed in (legacy Rackspace use) to support
swift/cloud files
@@ -55,55 +58,61 @@ WWW-Authenticate
What we add to the request for use by the OpenStack service
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+When using composite authentication (a user and service token are
+present) additional service headers relating to the service user
+will be added. They take the same form as the standard headers but add
+'_SERVICE_'. These headers will not exist in the environment if no
+service token is present.
+
HTTP_X_IDENTITY_STATUS
'Confirmed' or 'Invalid'
The underlying service will only see a value of 'Invalid' if the Middleware
is configured to run in 'delay_auth_decision' mode
-HTTP_X_DOMAIN_ID
+HTTP_X_DOMAIN_ID, HTTP_X_SERVICE_DOMAIN_ID
Identity service managed unique identifier, string. Only present if
this is a domain-scoped v3 token.
-HTTP_X_DOMAIN_NAME
+HTTP_X_DOMAIN_NAME, HTTP_X_SERVICE_DOMAIN_NAME
Unique domain name, string. Only present if this is a domain-scoped
v3 token.
-HTTP_X_PROJECT_ID
+HTTP_X_PROJECT_ID, HTTP_X_SERVICE_PROJECT_ID
Identity service managed unique identifier, string. Only present if
this is a project-scoped v3 token, or a tenant-scoped v2 token.
-HTTP_X_PROJECT_NAME
+HTTP_X_PROJECT_NAME, HTTP_X_SERVICE_PROJECT_NAME
Project name, unique within owning domain, string. Only present if
this is a project-scoped v3 token, or a tenant-scoped v2 token.
-HTTP_X_PROJECT_DOMAIN_ID
+HTTP_X_PROJECT_DOMAIN_ID, HTTP_X_SERVICE_PROJECT_DOMAIN_ID
Identity service managed unique identifier of owning domain of
project, string. Only present if this is a project-scoped v3 token. If
this variable is set, this indicates that the PROJECT_NAME can only
be assumed to be unique within this domain.
-HTTP_X_PROJECT_DOMAIN_NAME
+HTTP_X_PROJECT_DOMAIN_NAME, HTTP_X_SERVICE_PROJECT_DOMAIN_NAME
Name of owning domain of project, string. Only present if this is a
project-scoped v3 token. If this variable is set, this indicates that
the PROJECT_NAME can only be assumed to be unique within this domain.
-HTTP_X_USER_ID
+HTTP_X_USER_ID, HTTP_X_SERVICE_USER_ID
Identity-service managed unique identifier, string
-HTTP_X_USER_NAME
+HTTP_X_USER_NAME, HTTP_X_SERVICE_USER_NAME
User identifier, unique within owning domain, string
-HTTP_X_USER_DOMAIN_ID
+HTTP_X_USER_DOMAIN_ID, HTTP_X_SERVICE_USER_DOMAIN_ID
Identity service managed unique identifier of owning domain of
user, string. If this variable is set, this indicates that the USER_NAME
can only be assumed to be unique within this domain.
-HTTP_X_USER_DOMAIN_NAME
+HTTP_X_USER_DOMAIN_NAME, HTTP_X_SERVICE_USER_DOMAIN_NAME
Name of owning domain of user, string. If this variable is set, this
indicates that the USER_NAME can only be assumed to be unique within
this domain.
-HTTP_X_ROLES
+HTTP_X_ROLES, HTTP_X_SERVICE_ROLES
Comma delimited list of case-sensitive role names
HTTP_X_SERVICE_CATALOG
@@ -111,6 +120,11 @@ HTTP_X_SERVICE_CATALOG
For compatibility reasons this catalog will always be in the V2 catalog
format even if it is a v3 token.
+ Note: This is an exception in that it contains 'SERVICE' but relates to a
+ user token, not a service token. The existing user's
+ catalog can be very large; it was decided not to present a catalog
+ relating to the service token to avoid using more HTTP header space.
+
HTTP_X_TENANT_ID
*Deprecated* in favor of HTTP_X_PROJECT_ID
Identity service managed unique identifier, string. For v3 tokens, this
@@ -345,6 +359,26 @@ CONF.register_opts(_OPTS, group='keystone_authtoken')
_LIST_OF_VERSIONS_TO_ATTEMPT = ['v3.0', 'v2.0']
+_HEADER_TEMPLATE = {
+ 'X%s-Domain-Id': 'domain_id',
+ 'X%s-Domain-Name': 'domain_name',
+ 'X%s-Project-Id': 'project_id',
+ 'X%s-Project-Name': 'project_name',
+ 'X%s-Project-Domain-Id': 'project_domain_id',
+ 'X%s-Project-Domain-Name': 'project_domain_name',
+ 'X%s-User-Id': 'user_id',
+ 'X%s-User-Name': 'username',
+ 'X%s-User-Domain-Id': 'user_domain_id',
+ 'X%s-User-Domain-Name': 'user_domain_name',
+}
+
+_DEPRECATED_HEADER_TEMPLATE = {
+ 'X-User': 'username',
+ 'X-Tenant-Id': 'project_id',
+ 'X-Tenant-Name': 'project_name',
+ 'X-Tenant': 'project_name',
+}
+
class _BIND_MODE:
DISABLED = 'disabled'
@@ -374,13 +408,13 @@ def _token_is_v3(token_info):
def _get_token_expiration(data):
if not data:
- raise InvalidUserToken('Token authorization failed')
+ raise InvalidToken('Token authorization failed')
if _token_is_v2(data):
timestamp = data['access']['token']['expires']
elif _token_is_v3(data):
timestamp = data['token']['expires_at']
else:
- raise InvalidUserToken('Token authorization failed')
+ raise InvalidToken('Token authorization failed')
expires = timeutils.parse_isotime(timestamp)
expires = timeutils.normalize_time(expires)
return expires
@@ -390,7 +424,7 @@ def _confirm_token_not_expired(data):
expires = _get_token_expiration(data)
utcnow = timeutils.utcnow()
if utcnow >= expires:
- raise InvalidUserToken('Token authorization failed')
+ raise InvalidToken('Token authorization failed')
return timeutils.isotime(at=expires, subsecond=True)
@@ -456,7 +490,7 @@ def _conf_values_type_convert(conf):
return opts
-class InvalidUserToken(Exception):
+class InvalidToken(Exception):
pass
@@ -603,6 +637,7 @@ class AuthProtocol(object):
self._check_revocations_for_cached = self._conf_get(
'check_revocations_for_cached')
+ self._init_auth_headers()
def _conf_get(self, name):
# try config from paste-deploy first
@@ -630,29 +665,51 @@ class AuthProtocol(object):
we can't authenticate.
"""
- self._LOG.debug('Authenticating user token')
+ def _fmt_msg(env):
+ msg = ('user: user_id %s, project_id %s, roles %s '
+ 'service: user_id %s, project_id %s, roles %s' % (
+ env.get('X_USER_ID'), env.get('X_PROJECT_ID'),
+ env.get('X_ROLES'), env.get('X_SERVICE_USER_ID'),
+ env.get('X_SERVICE_PROJECT_ID'),
+ env.get('X_SERVICE_ROLES')))
+ return msg
self._token_cache.initialize(env)
+ self._remove_auth_headers(env)
try:
- self._remove_auth_headers(env)
- user_token = self._get_user_token_from_header(env)
- token_info = self._validate_user_token(user_token, env)
- auth_ref = access.AccessInfo.factory(body=token_info)
- env['keystone.token_info'] = token_info
- env['keystone.token_auth'] = _UserAuthPlugin(user_token, auth_ref)
- user_headers = self._build_user_headers(auth_ref, token_info)
- self._add_headers(env, user_headers)
- return self._call_app(env, start_response)
-
- except InvalidUserToken:
- if self._delay_auth_decision:
- self._LOG.info(
- 'Invalid user token - deferring reject downstream')
- self._add_headers(env, {'X-Identity-Status': 'Invalid'})
- return self._call_app(env, start_response)
- else:
- self._LOG.info('Invalid user token - rejecting request')
+
+ try:
+ self._LOG.debug('Authenticating user token')
+ user_token = self._get_user_token_from_header(env)
+ token_info = self._validate_token(user_token, env)
+ auth_ref = access.AccessInfo.factory(body=token_info)
+ env['keystone.token_info'] = token_info
+ env['keystone.token_auth'] = _UserAuthPlugin(
+ user_token, auth_ref)
+ user_headers = self._build_user_headers(auth_ref, token_info)
+ self._add_headers(env, user_headers)
+ except InvalidToken:
+ if self._delay_auth_decision:
+ self._LOG.info(
+ 'Invalid user token - deferring reject downstream')
+ self._add_headers(env, {'X-Identity-Status': 'Invalid'})
+ else:
+ self._LOG.info('Invalid user token - rejecting request')
+ return self._reject_request(env, start_response)
+
+ try:
+ self._LOG.debug('Authenticating service token')
+ serv_token = self._get_service_token_from_header(env)
+ if serv_token is not None:
+ serv_token_info = self._validate_token(
+ serv_token, env)
+ serv_headers = self._build_service_headers(serv_token_info)
+ self._add_headers(env, serv_headers)
+ except InvalidToken:
+ # Delayed auth not currently supported for service tokens.
+ # (Can be implemented if a use case is found.)
+ self._LOG.info('Invalid service token - rejecting request')
return self._reject_request(env, start_response)
except ServiceError as e:
@@ -661,43 +718,49 @@ class AuthProtocol(object):
start_response('503 Service Unavailable', resp.headers)
return resp.body
+ self._LOG.debug("Received request from %s" % _fmt_msg(env))
+
+ return self._call_app(env, start_response)
+
+ def _init_auth_headers(self):
+ """Initialize auth header list.
+
+ Both user and service token headers are generated.
+ """
+ auth_headers = ['X-Service-Catalog',
+ 'X-Identity-Status',
+ 'X-Roles',
+ 'X-Service-Roles']
+ for key in six.iterkeys(_HEADER_TEMPLATE):
+ auth_headers.append(key % '')
+ # Service headers
+ auth_headers.append(key % '-Service')
+
+ # Deprecated headers
+ auth_headers.append('X-Role')
+ for key in six.iterkeys(_DEPRECATED_HEADER_TEMPLATE):
+ auth_headers.append(key)
+
+ self._auth_headers = auth_headers
+
def _remove_auth_headers(self, env):
"""Remove headers so a user can't fake authentication.
+ Both user and service token headers are removed.
+
:param env: wsgi request environment
"""
- auth_headers = (
- 'X-Identity-Status',
- 'X-Domain-Id',
- 'X-Domain-Name',
- 'X-Project-Id',
- 'X-Project-Name',
- 'X-Project-Domain-Id',
- 'X-Project-Domain-Name',
- 'X-User-Id',
- 'X-User-Name',
- 'X-User-Domain-Id',
- 'X-User-Domain-Name',
- 'X-Roles',
- 'X-Service-Catalog',
- # Deprecated
- 'X-User',
- 'X-Tenant-Id',
- 'X-Tenant-Name',
- 'X-Tenant',
- 'X-Role',
- )
self._LOG.debug('Removing headers from request environment: %s',
- ','.join(auth_headers))
- self._remove_headers(env, auth_headers)
+ ','.join(self._auth_headers))
+ self._remove_headers(env, self._auth_headers)
def _get_user_token_from_header(self, env):
"""Get token id from request.
:param env: wsgi request environment
:return token id
- :raises InvalidUserToken if no token is provided in request
+ :raises InvalidToken if no token is provided in request
"""
token = self._get_header(env, 'X-Auth-Token',
@@ -709,7 +772,16 @@ class AuthProtocol(object):
self._LOG.warn('Unable to find authentication token'
' in headers')
self._LOG.debug('Headers: %s', env)
- raise InvalidUserToken('Unable to find token in headers')
+ raise InvalidToken('Unable to find token in headers')
+
+ def _get_service_token_from_header(self, env):
+ """Get service token id from request.
+
+ :param env: wsgi request environment
+ :return service token id or None if not present
+
+ """
+ return self._get_header(env, 'X-Service-Token')
@property
def _reject_auth_headers(self):
@@ -729,20 +801,20 @@ class AuthProtocol(object):
start_response('401 Unauthorized', resp.headers)
return resp.body
- def _validate_user_token(self, user_token, env, retry=True):
+ def _validate_token(self, token, env, retry=True):
"""Authenticate user token
- :param user_token: user's token id
+ :param token: token id
:param retry: Ignored, as it is not longer relevant
:return uncrypted body of the token if the token is valid
- :raise InvalidUserToken if token is rejected
+ :raise InvalidToken if token is rejected
:no longer raises ServiceError since it no longer makes RPC
"""
token_id = None
try:
- token_ids, cached = self._token_cache.get(user_token)
+ token_ids, cached = self._token_cache.get(token)
token_id = token_ids[0]
if cached:
# Token was retrieved from the cache. In this case, there's no
@@ -761,22 +833,22 @@ class AuthProtocol(object):
if is_revoked:
self._LOG.debug(
'Token is marked as having been revoked')
- raise InvalidUserToken(
+ raise InvalidToken(
'Token authorization failed')
self._confirm_token_bind(data, env)
else:
# Token wasn't cached. In this case, the token needs to be
# checked that it's not expired, and also put in the cache.
- if cms.is_pkiz(user_token):
- verified = self._verify_pkiz_token(user_token, token_ids)
+ if cms.is_pkiz(token):
+ verified = self._verify_pkiz_token(token, token_ids)
data = jsonutils.loads(verified)
expires = _confirm_token_not_expired(data)
- elif cms.is_asn1_token(user_token):
- verified = self._verify_signed_token(user_token, token_ids)
+ elif cms.is_asn1_token(token):
+ verified = self._verify_signed_token(token, token_ids)
data = jsonutils.loads(verified)
expires = _confirm_token_not_expired(data)
else:
- data = self._identity_server.verify_token(user_token,
+ data = self._identity_server.verify_token(token,
retry)
# No need to confirm token expiration here since
# verify_token fails for expired tokens.
@@ -787,13 +859,13 @@ class AuthProtocol(object):
except NetworkError:
self._LOG.debug('Token validation failure.', exc_info=True)
self._LOG.warn('Authorization failed for token')
- raise InvalidUserToken('Token authorization failed')
+ raise InvalidToken('Token authorization failed')
except Exception:
self._LOG.debug('Token validation failure.', exc_info=True)
if token_id:
self._token_cache.store_invalid(token_id)
self._LOG.warn('Authorization failed for token')
- raise InvalidUserToken('Token authorization failed')
+ raise InvalidToken('Token authorization failed')
def _build_user_headers(self, auth_ref, token_info):
"""Convert token object into headers.
@@ -801,39 +873,28 @@ class AuthProtocol(object):
Build headers that represent authenticated user - see main
doc info at start of file for details of headers to be defined.
- :param token_info: token object returned by keystone on authentication
- :raise InvalidUserToken when unable to parse token object
+ :param token_info: token object returned by identity
+ server on authentication
+ :raise InvalidToken: when unable to parse token object
"""
roles = ','.join(auth_ref.role_names)
if _token_is_v2(token_info) and not auth_ref.project_id:
- raise InvalidUserToken('Unable to determine tenancy.')
+ raise InvalidToken('Unable to determine tenancy.')
rval = {
'X-Identity-Status': 'Confirmed',
- 'X-Domain-Id': auth_ref.domain_id,
- 'X-Domain-Name': auth_ref.domain_name,
- 'X-Project-Id': auth_ref.project_id,
- 'X-Project-Name': auth_ref.project_name,
- 'X-Project-Domain-Id': auth_ref.project_domain_id,
- 'X-Project-Domain-Name': auth_ref.project_domain_name,
- 'X-User-Id': auth_ref.user_id,
- 'X-User-Name': auth_ref.username,
- 'X-User-Domain-Id': auth_ref.user_domain_id,
- 'X-User-Domain-Name': auth_ref.user_domain_name,
'X-Roles': roles,
- # Deprecated
- 'X-User': auth_ref.username,
- 'X-Tenant-Id': auth_ref.project_id,
- 'X-Tenant-Name': auth_ref.project_name,
- 'X-Tenant': auth_ref.project_name,
- 'X-Role': roles,
}
- self._LOG.debug('Received request from user: %s with project_id : %s'
- ' and roles: %s ',
- auth_ref.user_id, auth_ref.project_id, roles)
+ for header_tmplt, attr in six.iteritems(_HEADER_TEMPLATE):
+ rval[header_tmplt % ''] = getattr(auth_ref, attr)
+
+ # Deprecated headers
+ rval['X-Role'] = roles
+ for header_tmplt, attr in six.iteritems(_DEPRECATED_HEADER_TEMPLATE):
+ rval[header_tmplt] = getattr(auth_ref, attr)
if self._include_service_catalog and auth_ref.has_service_catalog():
catalog = auth_ref.service_catalog.get_data()
@@ -843,6 +904,33 @@ class AuthProtocol(object):
return rval
+ def _build_service_headers(self, token_info):
+ """Convert token object into service headers.
+
+ Build headers that represent authenticated user - see main
+ doc info at start of file for details of headers to be defined.
+
+ :param token_info: token object returned by identity
+ server on authentication
+ :raise InvalidToken: when unable to parse token object
+
+ """
+ auth_ref = access.AccessInfo.factory(body=token_info)
+
+ if _token_is_v2(token_info) and not auth_ref.project_id:
+ raise InvalidToken('Unable to determine service tenancy.')
+
+ roles = ','.join(auth_ref.role_names)
+ rval = {
+ 'X-Service-Roles': roles,
+ }
+
+ header_type = '-Service'
+ for header_tmplt, attr in six.iteritems(_HEADER_TEMPLATE):
+ rval[header_tmplt % header_type] = getattr(auth_ref, attr)
+
+ return rval
+
def _header_to_env_var(self, key):
"""Convert header to wsgi env variable.
@@ -877,7 +965,7 @@ class AuthProtocol(object):
if msg is False:
msg = 'Token authorization failed'
- raise InvalidUserToken(msg)
+ raise InvalidToken(msg)
def _confirm_token_bind(self, data, env):
bind_mode = self._conf_get('enforce_token_bind')
@@ -995,7 +1083,7 @@ class AuthProtocol(object):
def _verify_signed_token(self, signed_text, token_ids):
"""Check that the token is unrevoked and has a valid signature."""
if self._is_signed_token_revoked(token_ids):
- raise InvalidUserToken('Token has been revoked')
+ raise InvalidToken('Token has been revoked')
formatted = cms.token_to_cms(signed_text)
verified = self._cms_verify(formatted)
@@ -1003,14 +1091,14 @@ class AuthProtocol(object):
def _verify_pkiz_token(self, signed_text, token_ids):
if self._is_signed_token_revoked(token_ids):
- raise InvalidUserToken('Token has been revoked')
+ raise InvalidToken('Token has been revoked')
try:
uncompressed = cms.pkiz_uncompress(signed_text)
verified = self._cms_verify(uncompressed, inform=cms.PKIZ_CMS_FORM)
return verified
# TypeError If the signed_text is not zlib compressed
except TypeError:
- raise InvalidUserToken(signed_text)
+ raise InvalidToken(signed_text)
def _verify_signing_dir(self):
if os.path.exists(self._signing_dirname):
@@ -1234,7 +1322,7 @@ class _IdentityServer(object):
user authentication when an indeterminate
response is received. Optional.
:return: token object received from keystone on success
- :raise InvalidUserToken: if token is rejected
+ :raise InvalidToken: if token is rejected
:raise ServiceError: if unable to authenticate token
"""
@@ -1278,7 +1366,7 @@ class _IdentityServer(object):
if response.status_code == 200:
return data
- raise InvalidUserToken()
+ raise InvalidToken()
def fetch_revocation_list(self):
try:
@@ -1490,7 +1578,7 @@ class _TokenCache(object):
The second element is the token data from the cache if the token was
cached, otherwise ``None``.
- :raises InvalidUserToken: if the token is invalid
+ :raises InvalidToken: if the token is invalid
"""
@@ -1541,7 +1629,7 @@ class _TokenCache(object):
def _cache_get(self, token_id):
"""Return token information from cache.
- If token is invalid raise InvalidUserToken
+ If token is invalid raise InvalidToken
return token only if fresh (not expired).
"""
@@ -1590,7 +1678,7 @@ class _TokenCache(object):
cached = jsonutils.loads(serialized)
if cached == self._INVALID_INDICATOR:
self._LOG.debug('Cached Token is marked unauthorized')
- raise InvalidUserToken('Token authorization failed')
+ raise InvalidToken('Token authorization failed')
data, expires = cached
@@ -1609,7 +1697,7 @@ class _TokenCache(object):
return data
else:
self._LOG.debug('Cached Token seems expired')
- raise InvalidUserToken('Token authorization failed')
+ raise InvalidToken('Token authorization failed')
def _cache_store(self, token_id, data):
"""Store value into memcache.
diff --git a/keystonemiddleware/tests/client_fixtures.py b/keystonemiddleware/tests/client_fixtures.py
index 84e5edc..cb29ad5 100644
--- a/keystonemiddleware/tests/client_fixtures.py
+++ b/keystonemiddleware/tests/client_fixtures.py
@@ -127,6 +127,9 @@ class Examples(fixtures.Fixture):
self.v3_UUID_TOKEN_BIND = '2f61f73e1c854cbb9534c487f9bd63c2'
self.v3_UUID_TOKEN_UNKNOWN_BIND = '7ed9781b62cd4880b8d8c6788ab1d1e2'
+ self.UUID_SERVICE_TOKEN_DEFAULT = 'fe4c0710ec2f492748596c1b53ab124'
+ self.v3_UUID_SERVICE_TOKEN_DEFAULT = 'g431071bbc2f492748596c1b53cb229'
+
revoked_token = self.REVOKED_TOKEN
if isinstance(revoked_token, six.text_type):
revoked_token = revoked_token.encode('utf-8')
@@ -235,6 +238,15 @@ class Examples(fixtures.Fixture):
ROLE_NAME1 = 'role1'
ROLE_NAME2 = 'role2'
+ SERVICE_PROJECT_ID = 'service_project_id1'
+ SERVICE_PROJECT_NAME = 'service_project_name1'
+ SERVICE_USER_ID = 'service_user_id1'
+ SERVICE_USER_NAME = 'service_user_name1'
+ SERVICE_DOMAIN_ID = 'service_domain_id1'
+ SERVICE_DOMAIN_NAME = 'service_domain_name1'
+ SERVICE_ROLE_NAME1 = 'service_role1'
+ SERVICE_ROLE_NAME2 = 'service_role2'
+
self.SERVICE_TYPE = 'identity'
self.UNVERSIONED_SERVICE_URL = 'http://keystone.server:5000/'
self.SERVICE_URL = self.UNVERSIONED_SERVICE_URL + 'v2.0'
@@ -320,6 +332,17 @@ class Examples(fixtures.Fixture):
token['access']['token']['bind'] = {'FOO': 'BAR'}
self.TOKEN_RESPONSES[self.UUID_TOKEN_UNKNOWN_BIND] = token
+ token = fixture.V2Token(token_id=self.UUID_SERVICE_TOKEN_DEFAULT,
+ tenant_id=SERVICE_PROJECT_ID,
+ tenant_name=SERVICE_PROJECT_NAME,
+ user_id=SERVICE_USER_ID,
+ user_name=SERVICE_USER_NAME)
+ token.add_role(name=SERVICE_ROLE_NAME1)
+ token.add_role(name=SERVICE_ROLE_NAME2)
+ svc = token.add_service(self.SERVICE_TYPE)
+ svc.add_endpoint(public=self.SERVICE_URL)
+ self.TOKEN_RESPONSES[self.UUID_SERVICE_TOKEN_DEFAULT] = token
+
# Generated V3 Tokens
token = fixture.V3Token(user_id=USER_ID,
@@ -398,6 +421,22 @@ class Examples(fixtures.Fixture):
token['token']['bind'] = {'FOO': 'BAR'}
self.TOKEN_RESPONSES[self.v3_UUID_TOKEN_UNKNOWN_BIND] = token
+ token = fixture.V3Token(user_id=SERVICE_USER_ID,
+ user_name=SERVICE_USER_NAME,
+ user_domain_id=SERVICE_DOMAIN_ID,
+ user_domain_name=SERVICE_DOMAIN_NAME,
+ project_id=SERVICE_PROJECT_ID,
+ project_name=SERVICE_PROJECT_NAME,
+ project_domain_id=SERVICE_DOMAIN_ID,
+ project_domain_name=SERVICE_DOMAIN_NAME)
+ token.add_role(id=SERVICE_ROLE_NAME1,
+ name=SERVICE_ROLE_NAME1)
+ token.add_role(id=SERVICE_ROLE_NAME2,
+ name=SERVICE_ROLE_NAME2)
+ svc = token.add_service(self.SERVICE_TYPE)
+ svc.add_endpoint('public', self.SERVICE_URL)
+ self.TOKEN_RESPONSES[self.v3_UUID_SERVICE_TOKEN_DEFAULT] = token
+
# PKIZ tokens generally link to above tokens
self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_PKIZ_KEY] = (
diff --git a/keystonemiddleware/tests/test_auth_token_middleware.py b/keystonemiddleware/tests/test_auth_token_middleware.py
index 5462281..88f7944 100644
--- a/keystonemiddleware/tests/test_auth_token_middleware.py
+++ b/keystonemiddleware/tests/test_auth_token_middleware.py
@@ -17,7 +17,6 @@ import datetime
import json
import os
import shutil
-import six
import stat
import tempfile
import time
@@ -32,6 +31,7 @@ from keystoneclient import exceptions
from keystoneclient import fixture
from keystoneclient import session
import mock
+import six
import testresources
import testtools
from testtools import matchers
@@ -58,6 +58,28 @@ EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
}
+EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE = {
+ 'HTTP_X_SERVICE_PROJECT_ID': 'service_project_id1',
+ 'HTTP_X_SERVICE_PROJECT_NAME': 'service_project_name1',
+ 'HTTP_X_SERVICE_USER_ID': 'service_user_id1',
+ 'HTTP_X_SERVICE_USER_NAME': 'service_user_name1',
+ 'HTTP_X_SERVICE_ROLES': 'service_role1,service_role2',
+}
+
+EXPECTED_V3_DEFAULT_ENV_ADDITIONS = {
+ 'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1',
+ 'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1',
+ 'HTTP_X_USER_DOMAIN_ID': 'domain_id1',
+ 'HTTP_X_USER_DOMAIN_NAME': 'domain_name1',
+}
+
+EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS = {
+ 'HTTP_X_SERVICE_PROJECT_DOMAIN_ID': 'service_domain_id1',
+ 'HTTP_X_SERVICE_PROJECT_DOMAIN_NAME': 'service_domain_name1',
+ 'HTTP_X_SERVICE_USER_DOMAIN_ID': 'service_domain_id1',
+ 'HTTP_X_SERVICE_USER_DOMAIN_NAME': 'service_domain_name1'
+}
+
BASE_HOST = 'https://keystone.example.com:1234'
BASE_URI = '%s/testadmin' % BASE_HOST
@@ -167,41 +189,86 @@ class FakeApp(object):
"""This represents a WSGI app protected by the auth_token middleware."""
SUCCESS = b'SUCCESS'
+ FORBIDDEN = b'FORBIDDEN'
+ expected_env = {}
- def __init__(self, expected_env=None):
+ def __init__(self, expected_env=None, need_service_token=False):
self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
if expected_env:
self.expected_env.update(expected_env)
+ self.need_service_token = need_service_token
+
def __call__(self, env, start_response):
for k, v in self.expected_env.items():
assert env[k] == v, '%s != %s' % (env[k], v)
resp = webob.Response()
- resp.body = FakeApp.SUCCESS
+
+ if env['HTTP_X_IDENTITY_STATUS'] == 'Invalid':
+ # Simulate delayed auth forbidding access
+ resp.status = 403
+ resp.body = FakeApp.FORBIDDEN
+ elif (self.need_service_token is True and
+ env.get('HTTP_X_SERVICE_TOKEN') is None):
+ # Simulate requiring composite auth
+ # Arbitrary value to allow checking this code path
+ resp.status = 418
+ resp.body = FakeApp.FORBIDDEN
+ else:
+ resp.body = FakeApp.SUCCESS
+
return resp(env, start_response)
class v3FakeApp(FakeApp):
"""This represents a v3 WSGI app protected by the auth_token middleware."""
- def __init__(self, expected_env=None):
+ def __init__(self, expected_env=None, need_service_token=False):
# with v3 additions, these are for the DEFAULT TOKEN
- v3_default_env_additions = {
- 'HTTP_X_PROJECT_ID': 'tenant_id1',
- 'HTTP_X_PROJECT_NAME': 'tenant_name1',
- 'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1',
- 'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1',
- 'HTTP_X_USER_DOMAIN_ID': 'domain_id1',
- 'HTTP_X_USER_DOMAIN_NAME': 'domain_name1'
- }
-
+ v3_default_env_additions = dict(EXPECTED_V3_DEFAULT_ENV_ADDITIONS)
if expected_env:
v3_default_env_additions.update(expected_env)
+ super(v3FakeApp, self).__init__(expected_env=v3_default_env_additions,
+ need_service_token=need_service_token)
+
+
+class CompositeBase(object):
+ """Base composite auth object with common service token environment."""
+
+ def __init__(self, expected_env=None):
+ comp_expected_env = dict(EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE)
+
+ if expected_env:
+ comp_expected_env.update(expected_env)
+
+ super(CompositeBase, self).__init__(
+ expected_env=comp_expected_env, need_service_token=True)
+
+
+class CompositeFakeApp(CompositeBase, FakeApp):
+ """A fake v2 WSGI app protected by composite auth_token middleware."""
- super(v3FakeApp, self).__init__(v3_default_env_additions)
+ def __init__(self, expected_env):
+ super(CompositeFakeApp, self).__init__(expected_env=expected_env)
+
+
+class v3CompositeFakeApp(CompositeBase, v3FakeApp):
+ """A fake v3 WSGI app protected by composite auth_token middleware."""
+
+ def __init__(self, expected_env=None):
+
+ # with v3 additions, these are for the DEFAULT SERVICE TOKEN
+ v3_default_service_env_additions = dict(
+ EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS)
+
+ if expected_env:
+ v3_default_service_env_additions.update(expected_env)
+
+ super(v3CompositeFakeApp, self).__init__(
+ v3_default_service_env_additions)
def new_app(status, body, headers={}):
@@ -281,6 +348,17 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
self.middleware._token_revocation_list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
+ def update_expected_env(self, expected_env={}):
+ self.middleware._app.expected_env.update(expected_env)
+
+ def purge_token_expected_env(self):
+ for key in six.iterkeys(self.token_expected_env):
+ del self.middleware._app.expected_env[key]
+
+ def purge_service_token_expected_env(self):
+ for key in six.iterkeys(self.service_token_expected_env):
+ del self.middleware._app.expected_env[key]
+
def start_fake_response(self, status, headers, exc_info=None):
self.response_status = int(status.split(' ', 1)[0])
self.response_headers = dict(headers)
@@ -778,7 +856,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_verify_signed_token_raises_exception_for_revoked_token(self):
self.middleware._token_revocation_list = (
self.get_revocation_list_json())
- self.assertRaises(auth_token.InvalidUserToken,
+ self.assertRaises(auth_token.InvalidToken,
self.middleware._verify_signed_token,
self.token_dict['revoked_token'],
[self.token_dict['revoked_token_hash']])
@@ -788,7 +866,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.set_middleware()
self.middleware._token_revocation_list = (
self.get_revocation_list_json(mode='sha256'))
- self.assertRaises(auth_token.InvalidUserToken,
+ self.assertRaises(auth_token.InvalidToken,
self.middleware._verify_signed_token,
self.token_dict['revoked_token'],
[self.token_dict['revoked_token_hash_sha256'],
@@ -797,7 +875,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_verify_signed_token_raises_exception_for_revoked_pkiz_token(self):
self.middleware._token_revocation_list = (
self.examples.REVOKED_TOKEN_PKIZ_LIST_JSON)
- self.assertRaises(auth_token.InvalidUserToken,
+ self.assertRaises(auth_token.InvalidToken,
self.middleware._verify_pkiz_token,
self.token_dict['revoked_token_pkiz'],
[self.token_dict['revoked_token_pkiz_hash']])
@@ -960,7 +1038,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.middleware._LOG = FakeLog()
self.middleware._delay_auth_decision = False
- self.assertRaises(auth_token.InvalidUserToken,
+ self.assertRaises(auth_token.InvalidToken,
self.middleware._get_user_token_from_header, {})
self.assertIsNotNone(self.middleware._LOG.msg)
self.assertIsNotNone(self.middleware._LOG.debugmsg)
@@ -1012,7 +1090,7 @@ class CommonAuthTokenMiddlewareTest(object):
token = 'invalid-token'
req.headers['X-Auth-Token'] = token
self.middleware(req.environ, self.start_fake_response)
- self.assertRaises(auth_token.InvalidUserToken,
+ self.assertRaises(auth_token.InvalidToken,
self._get_cached_token, token)
def _test_memcache_set_invalid_signed(self, hash_algorithms=None,
@@ -1024,7 +1102,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.conf['hash_algorithms'] = hash_algorithms
self.set_middleware()
self.middleware(req.environ, self.start_fake_response)
- self.assertRaises(auth_token.InvalidUserToken,
+ self.assertRaises(auth_token.InvalidToken,
self._get_cached_token, token, mode=exp_mode)
def test_memcache_set_invalid_signed(self):
@@ -1876,13 +1954,13 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
def test_no_data(self):
data = {}
- self.assertRaises(auth_token.InvalidUserToken,
+ self.assertRaises(auth_token.InvalidToken,
auth_token._confirm_token_not_expired,
data)
def test_bad_data(self):
data = {'my_happy_token_dict': 'woo'}
- self.assertRaises(auth_token.InvalidUserToken,
+ self.assertRaises(auth_token.InvalidToken,
auth_token._confirm_token_not_expired,
data)
@@ -1894,7 +1972,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
def test_v2_token_expired(self):
data = self.create_v2_token_fixture(expires=self.one_hour_ago)
- self.assertRaises(auth_token.InvalidUserToken,
+ self.assertRaises(auth_token.InvalidToken,
auth_token._confirm_token_not_expired,
data)
@@ -1917,7 +1995,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
data = self.create_v2_token_fixture(
expires='2000-01-01T00:05:10.000123+05:00')
data['access']['token']['expires'] = '2000-01-01T00:05:10.000123+05:00'
- self.assertRaises(auth_token.InvalidUserToken,
+ self.assertRaises(auth_token.InvalidToken,
auth_token._confirm_token_not_expired,
data)
@@ -1929,7 +2007,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
def test_v3_token_expired(self):
data = self.create_v3_token_fixture(expires=self.one_hour_ago)
- self.assertRaises(auth_token.InvalidUserToken,
+ self.assertRaises(auth_token.InvalidToken,
auth_token._confirm_token_not_expired,
data)
@@ -1952,7 +2030,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
mock_utcnow.return_value = current_time
data = self.create_v3_token_fixture(
expires='2000-01-01T00:05:10.000123+05:00')
- self.assertRaises(auth_token.InvalidUserToken,
+ self.assertRaises(auth_token.InvalidToken,
auth_token._confirm_token_not_expired,
data)
@@ -1993,7 +2071,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
expires = some_time_earlier
self.middleware._token_cache.store(token, data, expires)
self.assertThat(lambda: self.middleware._token_cache._cache_get(token),
- matchers.raises(auth_token.InvalidUserToken))
+ matchers.raises(auth_token.InvalidToken))
def test_cached_token_with_timezone_offset_not_expired(self):
token = 'mytoken'
@@ -2016,7 +2094,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
expires = timeutils.strtime(some_time_earlier) + '-02:00'
self.middleware._token_cache.store(token, data, expires)
self.assertThat(lambda: self.middleware._token_cache._cache_get(token),
- matchers.raises(auth_token.InvalidUserToken))
+ matchers.raises(auth_token.InvalidToken))
class CatalogConversionTests(BaseAuthTokenMiddlewareTest):
@@ -2102,5 +2180,282 @@ class DelayedAuthTests(BaseAuthTokenMiddlewareTest):
self.response_headers['WWW-Authenticate'])
+class CommonCompositeAuthTests(object):
+ """Test Composite authentication.
+
+ Test the behaviour of adding a service-token.
+ """
+
+ def test_composite_auth_ok(self):
+ req = webob.Request.blank('/')
+ token = self.token_dict['uuid_token_default']
+ service_token = self.token_dict['uuid_service_token_default']
+ req.headers['X-Auth-Token'] = token
+ req.headers['X-Service-Token'] = service_token
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(200, self.response_status)
+ self.assertEqual([FakeApp.SUCCESS], body)
+
+ def test_composite_auth_invalid_service_token(self):
+ req = webob.Request.blank('/')
+ token = self.token_dict['uuid_token_default']
+ service_token = 'invalid-service-token'
+ req.headers['X-Auth-Token'] = token
+ req.headers['X-Service-Token'] = service_token
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(401, self.response_status)
+ self.assertEqual(['Authentication required'], body)
+
+ def test_composite_auth_no_service_token(self):
+ self.purge_service_token_expected_env()
+ req = webob.Request.blank('/')
+ token = self.token_dict['uuid_token_default']
+ req.headers['X-Auth-Token'] = token
+
+ # Ensure injection of service headers is not possible
+ for key, value in six.iteritems(self.service_token_expected_env):
+ header_key = key[len('HTTP_'):].replace('_', '-')
+ req.headers[header_key] = value
+ # Check arbitrary headers not removed
+ req.headers['X-Foo'] = 'Bar'
+ body = self.middleware(req.environ, self.start_fake_response)
+ for key in six.iterkeys(self.service_token_expected_env):
+ self.assertFalse(req.headers.get(key))
+ self.assertEqual('Bar', req.headers.get('X-Foo'))
+ self.assertEqual(418, self.response_status)
+ self.assertEqual([FakeApp.FORBIDDEN], body)
+
+ def test_composite_auth_invalid_user_token(self):
+ req = webob.Request.blank('/')
+ token = 'invalid-token'
+ service_token = self.token_dict['uuid_service_token_default']
+ req.headers['X-Auth-Token'] = token
+ req.headers['X-Service-Token'] = service_token
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(401, self.response_status)
+ self.assertEqual(['Authentication required'], body)
+
+ def test_composite_auth_no_user_token(self):
+ req = webob.Request.blank('/')
+ service_token = self.token_dict['uuid_service_token_default']
+ req.headers['X-Service-Token'] = service_token
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(401, self.response_status)
+ self.assertEqual(['Authentication required'], body)
+
+ def test_composite_auth_delay_ok(self):
+ self.middleware._delay_auth_decision = True
+ req = webob.Request.blank('/')
+ token = self.token_dict['uuid_token_default']
+ service_token = self.token_dict['uuid_service_token_default']
+ req.headers['X-Auth-Token'] = token
+ req.headers['X-Service-Token'] = service_token
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(200, self.response_status)
+ self.assertEqual([FakeApp.SUCCESS], body)
+
+ def test_composite_auth_delay_invalid_service_token(self):
+ self.middleware._delay_auth_decision = True
+ req = webob.Request.blank('/')
+ token = self.token_dict['uuid_token_default']
+ service_token = 'invalid-service-token'
+ req.headers['X-Auth-Token'] = token
+ req.headers['X-Service-Token'] = service_token
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(401, self.response_status)
+ self.assertEqual(['Authentication required'], body)
+
+ def test_composite_auth_delay_no_service_token(self):
+ self.middleware._delay_auth_decision = True
+ self.purge_service_token_expected_env()
+
+ req = webob.Request.blank('/')
+ token = self.token_dict['uuid_token_default']
+ req.headers['X-Auth-Token'] = token
+
+ # Ensure injection of service headers is not possible
+ for key, value in six.iteritems(self.service_token_expected_env):
+ header_key = key[len('HTTP_'):].replace('_', '-')
+ req.headers[header_key] = value
+ # Check arbitrary headers not removed
+ req.headers['X-Foo'] = 'Bar'
+ body = self.middleware(req.environ, self.start_fake_response)
+ for key in six.iterkeys(self.service_token_expected_env):
+ self.assertFalse(req.headers.get(key))
+ self.assertEqual('Bar', req.headers.get('X-Foo'))
+ self.assertEqual(418, self.response_status)
+ self.assertEqual([FakeApp.FORBIDDEN], body)
+
+ def test_composite_auth_delay_invalid_user_token(self):
+ self.middleware._delay_auth_decision = True
+ self.purge_token_expected_env()
+ expected_env = {
+ 'HTTP_X_IDENTITY_STATUS': 'Invalid',
+ }
+ self.update_expected_env(expected_env)
+
+ req = webob.Request.blank('/')
+ token = 'invalid-token'
+ service_token = self.token_dict['uuid_service_token_default']
+ req.headers['X-Auth-Token'] = token
+ req.headers['X-Service-Token'] = service_token
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(403, self.response_status)
+ self.assertEqual([FakeApp.FORBIDDEN], body)
+
+ def test_composite_auth_delay_no_user_token(self):
+ self.middleware._delay_auth_decision = True
+ self.purge_token_expected_env()
+ expected_env = {
+ 'HTTP_X_IDENTITY_STATUS': 'Invalid',
+ }
+ self.update_expected_env(expected_env)
+
+ req = webob.Request.blank('/')
+ service_token = self.token_dict['uuid_service_token_default']
+ req.headers['X-Service-Token'] = service_token
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(403, self.response_status)
+ self.assertEqual([FakeApp.FORBIDDEN], body)
+
+
+class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest,
+ CommonCompositeAuthTests,
+ testresources.ResourcedTestCase):
+ """Test auth_token middleware with v2 token based composite auth.
+
+ Execute the Composite auth class tests, but with the
+ auth_token middleware configured to expect v2 tokens back from
+ a keystone server.
+ """
+
+ resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
+
+ def setUp(self):
+ super(v2CompositeAuthTests, self).setUp(
+ expected_env=EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE,
+ fake_app=CompositeFakeApp)
+
+ uuid_token_default = self.examples.UUID_TOKEN_DEFAULT
+ uuid_service_token_default = self.examples.UUID_SERVICE_TOKEN_DEFAULT
+ self.token_dict = {
+ 'uuid_token_default': uuid_token_default,
+ 'uuid_service_token_default': uuid_service_token_default,
+ }
+
+ httpretty.reset()
+ httpretty.enable()
+ self.addCleanup(httpretty.disable)
+
+ httpretty.register_uri(httpretty.GET,
+ "%s/" % BASE_URI,
+ body=VERSION_LIST_v2,
+ status=300)
+
+ httpretty.register_uri(httpretty.POST,
+ "%s/v2.0/tokens" % BASE_URI,
+ body=FAKE_ADMIN_TOKEN)
+
+ httpretty.register_uri(httpretty.GET,
+ "%s/v2.0/tokens/revoked" % BASE_URI,
+ body=self.examples.SIGNED_REVOCATION_LIST,
+ status=200)
+
+ for token in (self.examples.UUID_TOKEN_DEFAULT,
+ self.examples.UUID_SERVICE_TOKEN_DEFAULT,):
+ httpretty.register_uri(httpretty.GET,
+ "%s/v2.0/tokens/%s" % (BASE_URI, token),
+ body=
+ self.examples.JSON_TOKEN_RESPONSES[token])
+
+ for invalid_uri in ("%s/v2.0/tokens/invalid-token" % BASE_URI,
+ "%s/v2.0/tokens/invalid-service-token" % BASE_URI):
+ httpretty.register_uri(httpretty.GET,
+ invalid_uri,
+ body="", status=404)
+
+ self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
+ self.service_token_expected_env = dict(
+ EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE)
+ self.set_middleware()
+
+
+class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest,
+ CommonCompositeAuthTests,
+ testresources.ResourcedTestCase):
+ """Test auth_token middleware with v3 token based composite auth.
+
+ Execute the Composite auth class tests, but with the
+ auth_token middleware configured to expect v3 tokens back from
+ a keystone server.
+ """
+
+ resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
+
+ def setUp(self):
+ super(v3CompositeAuthTests, self).setUp(
+ auth_version='v3.0',
+ fake_app=v3CompositeFakeApp)
+
+ uuid_token_default = self.examples.v3_UUID_TOKEN_DEFAULT
+ uuid_serv_token_default = self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT
+ self.token_dict = {
+ 'uuid_token_default': uuid_token_default,
+ 'uuid_service_token_default': uuid_serv_token_default,
+ }
+
+ httpretty.reset()
+ httpretty.enable()
+ self.addCleanup(httpretty.disable)
+
+ httpretty.register_uri(httpretty.GET,
+ "%s" % BASE_URI,
+ body=VERSION_LIST_v3,
+ status=300)
+
+ # TODO(jamielennox): auth_token middleware uses a v2 admin token
+ # regardless of the auth_version that is set.
+ httpretty.register_uri(httpretty.POST,
+ "%s/v2.0/tokens" % BASE_URI,
+ body=FAKE_ADMIN_TOKEN)
+
+ # TODO(jamielennox): there is no v3 revocation url yet, it uses v2
+ httpretty.register_uri(httpretty.GET,
+ "%s/v2.0/tokens/revoked" % BASE_URI,
+ body=self.examples.SIGNED_REVOCATION_LIST,
+ status=200)
+
+ httpretty.register_uri(httpretty.GET,
+ "%s/v3/auth/tokens" % BASE_URI,
+ body=self.token_response)
+
+ self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
+ self.token_expected_env.update(EXPECTED_V3_DEFAULT_ENV_ADDITIONS)
+ self.service_token_expected_env = dict(
+ EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE)
+ self.service_token_expected_env.update(
+ EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS)
+ self.set_middleware()
+
+ def token_response(self, request, uri, headers):
+ auth_id = request.headers.get('X-Auth-Token')
+ token_id = request.headers.get('X-Subject-Token')
+ self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID)
+ headers.pop('status')
+
+ status = 200
+ response = ""
+
+ if token_id == ERROR_TOKEN:
+ raise auth_token.NetworkError("Network connection error.")
+
+ try:
+ response = self.examples.JSON_TOKEN_RESPONSES[token_id]
+ except KeyError:
+ status = 404
+
+ return status, headers, response
+
+
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)