diff options
Diffstat (limited to 'keystonemiddleware/tests/test_auth_token_middleware.py')
-rw-r--r-- | keystonemiddleware/tests/test_auth_token_middleware.py | 411 |
1 files changed, 383 insertions, 28 deletions
diff --git a/keystonemiddleware/tests/test_auth_token_middleware.py b/keystonemiddleware/tests/test_auth_token_middleware.py index 5462281..88f7944 100644 --- a/keystonemiddleware/tests/test_auth_token_middleware.py +++ b/keystonemiddleware/tests/test_auth_token_middleware.py @@ -17,7 +17,6 @@ import datetime import json import os import shutil -import six import stat import tempfile import time @@ -32,6 +31,7 @@ from keystoneclient import exceptions from keystoneclient import fixture from keystoneclient import session import mock +import six import testresources import testtools from testtools import matchers @@ -58,6 +58,28 @@ EXPECTED_V2_DEFAULT_ENV_RESPONSE = { 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat) } +EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE = { + 'HTTP_X_SERVICE_PROJECT_ID': 'service_project_id1', + 'HTTP_X_SERVICE_PROJECT_NAME': 'service_project_name1', + 'HTTP_X_SERVICE_USER_ID': 'service_user_id1', + 'HTTP_X_SERVICE_USER_NAME': 'service_user_name1', + 'HTTP_X_SERVICE_ROLES': 'service_role1,service_role2', +} + +EXPECTED_V3_DEFAULT_ENV_ADDITIONS = { + 'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1', + 'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1', + 'HTTP_X_USER_DOMAIN_ID': 'domain_id1', + 'HTTP_X_USER_DOMAIN_NAME': 'domain_name1', +} + +EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS = { + 'HTTP_X_SERVICE_PROJECT_DOMAIN_ID': 'service_domain_id1', + 'HTTP_X_SERVICE_PROJECT_DOMAIN_NAME': 'service_domain_name1', + 'HTTP_X_SERVICE_USER_DOMAIN_ID': 'service_domain_id1', + 'HTTP_X_SERVICE_USER_DOMAIN_NAME': 'service_domain_name1' +} + BASE_HOST = 'https://keystone.example.com:1234' BASE_URI = '%s/testadmin' % BASE_HOST @@ -167,41 +189,86 @@ class FakeApp(object): """This represents a WSGI app protected by the auth_token middleware.""" SUCCESS = b'SUCCESS' + FORBIDDEN = b'FORBIDDEN' + expected_env = {} - def __init__(self, expected_env=None): + def __init__(self, expected_env=None, need_service_token=False): self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) if expected_env: self.expected_env.update(expected_env) + self.need_service_token = need_service_token + def __call__(self, env, start_response): for k, v in self.expected_env.items(): assert env[k] == v, '%s != %s' % (env[k], v) resp = webob.Response() - resp.body = FakeApp.SUCCESS + + if env['HTTP_X_IDENTITY_STATUS'] == 'Invalid': + # Simulate delayed auth forbidding access + resp.status = 403 + resp.body = FakeApp.FORBIDDEN + elif (self.need_service_token is True and + env.get('HTTP_X_SERVICE_TOKEN') is None): + # Simulate requiring composite auth + # Arbitrary value to allow checking this code path + resp.status = 418 + resp.body = FakeApp.FORBIDDEN + else: + resp.body = FakeApp.SUCCESS + return resp(env, start_response) class v3FakeApp(FakeApp): """This represents a v3 WSGI app protected by the auth_token middleware.""" - def __init__(self, expected_env=None): + def __init__(self, expected_env=None, need_service_token=False): # with v3 additions, these are for the DEFAULT TOKEN - v3_default_env_additions = { - 'HTTP_X_PROJECT_ID': 'tenant_id1', - 'HTTP_X_PROJECT_NAME': 'tenant_name1', - 'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1', - 'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1', - 'HTTP_X_USER_DOMAIN_ID': 'domain_id1', - 'HTTP_X_USER_DOMAIN_NAME': 'domain_name1' - } - + v3_default_env_additions = dict(EXPECTED_V3_DEFAULT_ENV_ADDITIONS) if expected_env: v3_default_env_additions.update(expected_env) + super(v3FakeApp, self).__init__(expected_env=v3_default_env_additions, + need_service_token=need_service_token) + + +class CompositeBase(object): + """Base composite auth object with common service token environment.""" + + def __init__(self, expected_env=None): + comp_expected_env = dict(EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE) + + if expected_env: + comp_expected_env.update(expected_env) + + super(CompositeBase, self).__init__( + expected_env=comp_expected_env, need_service_token=True) + + +class CompositeFakeApp(CompositeBase, FakeApp): + """A fake v2 WSGI app protected by composite auth_token middleware.""" - super(v3FakeApp, self).__init__(v3_default_env_additions) + def __init__(self, expected_env): + super(CompositeFakeApp, self).__init__(expected_env=expected_env) + + +class v3CompositeFakeApp(CompositeBase, v3FakeApp): + """A fake v3 WSGI app protected by composite auth_token middleware.""" + + def __init__(self, expected_env=None): + + # with v3 additions, these are for the DEFAULT SERVICE TOKEN + v3_default_service_env_additions = dict( + EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS) + + if expected_env: + v3_default_service_env_additions.update(expected_env) + + super(v3CompositeFakeApp, self).__init__( + v3_default_service_env_additions) def new_app(status, body, headers={}): @@ -281,6 +348,17 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase): self.middleware._token_revocation_list = jsonutils.dumps( {"revoked": [], "extra": "success"}) + def update_expected_env(self, expected_env={}): + self.middleware._app.expected_env.update(expected_env) + + def purge_token_expected_env(self): + for key in six.iterkeys(self.token_expected_env): + del self.middleware._app.expected_env[key] + + def purge_service_token_expected_env(self): + for key in six.iterkeys(self.service_token_expected_env): + del self.middleware._app.expected_env[key] + def start_fake_response(self, status, headers, exc_info=None): self.response_status = int(status.split(' ', 1)[0]) self.response_headers = dict(headers) @@ -778,7 +856,7 @@ class CommonAuthTokenMiddlewareTest(object): def test_verify_signed_token_raises_exception_for_revoked_token(self): self.middleware._token_revocation_list = ( self.get_revocation_list_json()) - self.assertRaises(auth_token.InvalidUserToken, + self.assertRaises(auth_token.InvalidToken, self.middleware._verify_signed_token, self.token_dict['revoked_token'], [self.token_dict['revoked_token_hash']]) @@ -788,7 +866,7 @@ class CommonAuthTokenMiddlewareTest(object): self.set_middleware() self.middleware._token_revocation_list = ( self.get_revocation_list_json(mode='sha256')) - self.assertRaises(auth_token.InvalidUserToken, + self.assertRaises(auth_token.InvalidToken, self.middleware._verify_signed_token, self.token_dict['revoked_token'], [self.token_dict['revoked_token_hash_sha256'], @@ -797,7 +875,7 @@ class CommonAuthTokenMiddlewareTest(object): def test_verify_signed_token_raises_exception_for_revoked_pkiz_token(self): self.middleware._token_revocation_list = ( self.examples.REVOKED_TOKEN_PKIZ_LIST_JSON) - self.assertRaises(auth_token.InvalidUserToken, + self.assertRaises(auth_token.InvalidToken, self.middleware._verify_pkiz_token, self.token_dict['revoked_token_pkiz'], [self.token_dict['revoked_token_pkiz_hash']]) @@ -960,7 +1038,7 @@ class CommonAuthTokenMiddlewareTest(object): self.middleware._LOG = FakeLog() self.middleware._delay_auth_decision = False - self.assertRaises(auth_token.InvalidUserToken, + self.assertRaises(auth_token.InvalidToken, self.middleware._get_user_token_from_header, {}) self.assertIsNotNone(self.middleware._LOG.msg) self.assertIsNotNone(self.middleware._LOG.debugmsg) @@ -1012,7 +1090,7 @@ class CommonAuthTokenMiddlewareTest(object): token = 'invalid-token' req.headers['X-Auth-Token'] = token self.middleware(req.environ, self.start_fake_response) - self.assertRaises(auth_token.InvalidUserToken, + self.assertRaises(auth_token.InvalidToken, self._get_cached_token, token) def _test_memcache_set_invalid_signed(self, hash_algorithms=None, @@ -1024,7 +1102,7 @@ class CommonAuthTokenMiddlewareTest(object): self.conf['hash_algorithms'] = hash_algorithms self.set_middleware() self.middleware(req.environ, self.start_fake_response) - self.assertRaises(auth_token.InvalidUserToken, + self.assertRaises(auth_token.InvalidToken, self._get_cached_token, token, mode=exp_mode) def test_memcache_set_invalid_signed(self): @@ -1876,13 +1954,13 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest): def test_no_data(self): data = {} - self.assertRaises(auth_token.InvalidUserToken, + self.assertRaises(auth_token.InvalidToken, auth_token._confirm_token_not_expired, data) def test_bad_data(self): data = {'my_happy_token_dict': 'woo'} - self.assertRaises(auth_token.InvalidUserToken, + self.assertRaises(auth_token.InvalidToken, auth_token._confirm_token_not_expired, data) @@ -1894,7 +1972,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest): def test_v2_token_expired(self): data = self.create_v2_token_fixture(expires=self.one_hour_ago) - self.assertRaises(auth_token.InvalidUserToken, + self.assertRaises(auth_token.InvalidToken, auth_token._confirm_token_not_expired, data) @@ -1917,7 +1995,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest): data = self.create_v2_token_fixture( expires='2000-01-01T00:05:10.000123+05:00') data['access']['token']['expires'] = '2000-01-01T00:05:10.000123+05:00' - self.assertRaises(auth_token.InvalidUserToken, + self.assertRaises(auth_token.InvalidToken, auth_token._confirm_token_not_expired, data) @@ -1929,7 +2007,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest): def test_v3_token_expired(self): data = self.create_v3_token_fixture(expires=self.one_hour_ago) - self.assertRaises(auth_token.InvalidUserToken, + self.assertRaises(auth_token.InvalidToken, auth_token._confirm_token_not_expired, data) @@ -1952,7 +2030,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest): mock_utcnow.return_value = current_time data = self.create_v3_token_fixture( expires='2000-01-01T00:05:10.000123+05:00') - self.assertRaises(auth_token.InvalidUserToken, + self.assertRaises(auth_token.InvalidToken, auth_token._confirm_token_not_expired, data) @@ -1993,7 +2071,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest): expires = some_time_earlier self.middleware._token_cache.store(token, data, expires) self.assertThat(lambda: self.middleware._token_cache._cache_get(token), - matchers.raises(auth_token.InvalidUserToken)) + matchers.raises(auth_token.InvalidToken)) def test_cached_token_with_timezone_offset_not_expired(self): token = 'mytoken' @@ -2016,7 +2094,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest): expires = timeutils.strtime(some_time_earlier) + '-02:00' self.middleware._token_cache.store(token, data, expires) self.assertThat(lambda: self.middleware._token_cache._cache_get(token), - matchers.raises(auth_token.InvalidUserToken)) + matchers.raises(auth_token.InvalidToken)) class CatalogConversionTests(BaseAuthTokenMiddlewareTest): @@ -2102,5 +2180,282 @@ class DelayedAuthTests(BaseAuthTokenMiddlewareTest): self.response_headers['WWW-Authenticate']) +class CommonCompositeAuthTests(object): + """Test Composite authentication. + + Test the behaviour of adding a service-token. + """ + + def test_composite_auth_ok(self): + req = webob.Request.blank('/') + token = self.token_dict['uuid_token_default'] + service_token = self.token_dict['uuid_service_token_default'] + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(200, self.response_status) + self.assertEqual([FakeApp.SUCCESS], body) + + def test_composite_auth_invalid_service_token(self): + req = webob.Request.blank('/') + token = self.token_dict['uuid_token_default'] + service_token = 'invalid-service-token' + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(401, self.response_status) + self.assertEqual(['Authentication required'], body) + + def test_composite_auth_no_service_token(self): + self.purge_service_token_expected_env() + req = webob.Request.blank('/') + token = self.token_dict['uuid_token_default'] + req.headers['X-Auth-Token'] = token + + # Ensure injection of service headers is not possible + for key, value in six.iteritems(self.service_token_expected_env): + header_key = key[len('HTTP_'):].replace('_', '-') + req.headers[header_key] = value + # Check arbitrary headers not removed + req.headers['X-Foo'] = 'Bar' + body = self.middleware(req.environ, self.start_fake_response) + for key in six.iterkeys(self.service_token_expected_env): + self.assertFalse(req.headers.get(key)) + self.assertEqual('Bar', req.headers.get('X-Foo')) + self.assertEqual(418, self.response_status) + self.assertEqual([FakeApp.FORBIDDEN], body) + + def test_composite_auth_invalid_user_token(self): + req = webob.Request.blank('/') + token = 'invalid-token' + service_token = self.token_dict['uuid_service_token_default'] + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(401, self.response_status) + self.assertEqual(['Authentication required'], body) + + def test_composite_auth_no_user_token(self): + req = webob.Request.blank('/') + service_token = self.token_dict['uuid_service_token_default'] + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(401, self.response_status) + self.assertEqual(['Authentication required'], body) + + def test_composite_auth_delay_ok(self): + self.middleware._delay_auth_decision = True + req = webob.Request.blank('/') + token = self.token_dict['uuid_token_default'] + service_token = self.token_dict['uuid_service_token_default'] + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(200, self.response_status) + self.assertEqual([FakeApp.SUCCESS], body) + + def test_composite_auth_delay_invalid_service_token(self): + self.middleware._delay_auth_decision = True + req = webob.Request.blank('/') + token = self.token_dict['uuid_token_default'] + service_token = 'invalid-service-token' + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(401, self.response_status) + self.assertEqual(['Authentication required'], body) + + def test_composite_auth_delay_no_service_token(self): + self.middleware._delay_auth_decision = True + self.purge_service_token_expected_env() + + req = webob.Request.blank('/') + token = self.token_dict['uuid_token_default'] + req.headers['X-Auth-Token'] = token + + # Ensure injection of service headers is not possible + for key, value in six.iteritems(self.service_token_expected_env): + header_key = key[len('HTTP_'):].replace('_', '-') + req.headers[header_key] = value + # Check arbitrary headers not removed + req.headers['X-Foo'] = 'Bar' + body = self.middleware(req.environ, self.start_fake_response) + for key in six.iterkeys(self.service_token_expected_env): + self.assertFalse(req.headers.get(key)) + self.assertEqual('Bar', req.headers.get('X-Foo')) + self.assertEqual(418, self.response_status) + self.assertEqual([FakeApp.FORBIDDEN], body) + + def test_composite_auth_delay_invalid_user_token(self): + self.middleware._delay_auth_decision = True + self.purge_token_expected_env() + expected_env = { + 'HTTP_X_IDENTITY_STATUS': 'Invalid', + } + self.update_expected_env(expected_env) + + req = webob.Request.blank('/') + token = 'invalid-token' + service_token = self.token_dict['uuid_service_token_default'] + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(403, self.response_status) + self.assertEqual([FakeApp.FORBIDDEN], body) + + def test_composite_auth_delay_no_user_token(self): + self.middleware._delay_auth_decision = True + self.purge_token_expected_env() + expected_env = { + 'HTTP_X_IDENTITY_STATUS': 'Invalid', + } + self.update_expected_env(expected_env) + + req = webob.Request.blank('/') + service_token = self.token_dict['uuid_service_token_default'] + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(403, self.response_status) + self.assertEqual([FakeApp.FORBIDDEN], body) + + +class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest, + CommonCompositeAuthTests, + testresources.ResourcedTestCase): + """Test auth_token middleware with v2 token based composite auth. + + Execute the Composite auth class tests, but with the + auth_token middleware configured to expect v2 tokens back from + a keystone server. + """ + + resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] + + def setUp(self): + super(v2CompositeAuthTests, self).setUp( + expected_env=EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE, + fake_app=CompositeFakeApp) + + uuid_token_default = self.examples.UUID_TOKEN_DEFAULT + uuid_service_token_default = self.examples.UUID_SERVICE_TOKEN_DEFAULT + self.token_dict = { + 'uuid_token_default': uuid_token_default, + 'uuid_service_token_default': uuid_service_token_default, + } + + httpretty.reset() + httpretty.enable() + self.addCleanup(httpretty.disable) + + httpretty.register_uri(httpretty.GET, + "%s/" % BASE_URI, + body=VERSION_LIST_v2, + status=300) + + httpretty.register_uri(httpretty.POST, + "%s/v2.0/tokens" % BASE_URI, + body=FAKE_ADMIN_TOKEN) + + httpretty.register_uri(httpretty.GET, + "%s/v2.0/tokens/revoked" % BASE_URI, + body=self.examples.SIGNED_REVOCATION_LIST, + status=200) + + for token in (self.examples.UUID_TOKEN_DEFAULT, + self.examples.UUID_SERVICE_TOKEN_DEFAULT,): + httpretty.register_uri(httpretty.GET, + "%s/v2.0/tokens/%s" % (BASE_URI, token), + body= + self.examples.JSON_TOKEN_RESPONSES[token]) + + for invalid_uri in ("%s/v2.0/tokens/invalid-token" % BASE_URI, + "%s/v2.0/tokens/invalid-service-token" % BASE_URI): + httpretty.register_uri(httpretty.GET, + invalid_uri, + body="", status=404) + + self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) + self.service_token_expected_env = dict( + EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE) + self.set_middleware() + + +class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest, + CommonCompositeAuthTests, + testresources.ResourcedTestCase): + """Test auth_token middleware with v3 token based composite auth. + + Execute the Composite auth class tests, but with the + auth_token middleware configured to expect v3 tokens back from + a keystone server. + """ + + resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] + + def setUp(self): + super(v3CompositeAuthTests, self).setUp( + auth_version='v3.0', + fake_app=v3CompositeFakeApp) + + uuid_token_default = self.examples.v3_UUID_TOKEN_DEFAULT + uuid_serv_token_default = self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT + self.token_dict = { + 'uuid_token_default': uuid_token_default, + 'uuid_service_token_default': uuid_serv_token_default, + } + + httpretty.reset() + httpretty.enable() + self.addCleanup(httpretty.disable) + + httpretty.register_uri(httpretty.GET, + "%s" % BASE_URI, + body=VERSION_LIST_v3, + status=300) + + # TODO(jamielennox): auth_token middleware uses a v2 admin token + # regardless of the auth_version that is set. + httpretty.register_uri(httpretty.POST, + "%s/v2.0/tokens" % BASE_URI, + body=FAKE_ADMIN_TOKEN) + + # TODO(jamielennox): there is no v3 revocation url yet, it uses v2 + httpretty.register_uri(httpretty.GET, + "%s/v2.0/tokens/revoked" % BASE_URI, + body=self.examples.SIGNED_REVOCATION_LIST, + status=200) + + httpretty.register_uri(httpretty.GET, + "%s/v3/auth/tokens" % BASE_URI, + body=self.token_response) + + self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) + self.token_expected_env.update(EXPECTED_V3_DEFAULT_ENV_ADDITIONS) + self.service_token_expected_env = dict( + EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE) + self.service_token_expected_env.update( + EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS) + self.set_middleware() + + def token_response(self, request, uri, headers): + auth_id = request.headers.get('X-Auth-Token') + token_id = request.headers.get('X-Subject-Token') + self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID) + headers.pop('status') + + status = 200 + response = "" + + if token_id == ERROR_TOKEN: + raise auth_token.NetworkError("Network connection error.") + + try: + response = self.examples.JSON_TOKEN_RESPONSES[token_id] + except KeyError: + status = 404 + + return status, headers, response + + def load_tests(loader, tests, pattern): return testresources.OptimisingTestSuite(tests) |