diff options
author | Wiliam Souza <wiliamsouza83@gmail.com> | 2018-06-05 11:33:21 -0300 |
---|---|---|
committer | Omer Katz <omer.drow@gmail.com> | 2018-06-05 17:33:21 +0300 |
commit | d5a4d5ea0eab04ddddefac7d1e7a4902fc469286 (patch) | |
tree | 580faf95f8f7bed257b2dac1d29c26c3d9d1b219 /tests | |
parent | a102731c88f496b557dedd4024fb9b82801d134a (diff) | |
download | oauthlib-d5a4d5ea0eab04ddddefac7d1e7a4902fc469286.tar.gz |
OpenID Connect split (#525)
* Add command to clean up builds to makefile
* Fix docs strings for endpoints pre_configured
* Chnage grant_types.openid_connect to include a deprecation warning be a backward compatible
* Fix doc string for rfc6749.request_validator
* Remove unused import
* Change import to be explicity
* Move JWTTokenTestCase to openid.connect.core.test_token
* Move JWTToken to oauthlib.openid.connect.core.tokens
* Move to openid connect test
* Move openid connect exceptions to its own file
* Remove openid connect from oauth2 server
* Remove JWTToken from oauth tokens
* Remove grant_types.openid_connect file
* Add oauthlib/openid estructure and tests
Diffstat (limited to 'tests')
15 files changed, 865 insertions, 656 deletions
diff --git a/tests/oauth2/rfc6749/grant_types/test_openid_connect.py b/tests/oauth2/rfc6749/grant_types/test_openid_connect.py deleted file mode 100644 index 573d491..0000000 --- a/tests/oauth2/rfc6749/grant_types/test_openid_connect.py +++ /dev/null @@ -1,403 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, unicode_literals - -import json - -import mock - -from oauthlib.common import Request -from oauthlib.oauth2.rfc6749.grant_types import (AuthTokenGrantDispatcher, - AuthorizationCodeGrant, - ImplicitGrant, - ImplicitTokenGrantDispatcher, - OIDCNoPrompt, - OpenIDConnectAuthCode, - OpenIDConnectHybrid, - OpenIDConnectImplicit) -from oauthlib.oauth2.rfc6749.tokens import BearerToken - -from ....unittest import TestCase -from .test_authorization_code import AuthorizationCodeGrantTest -from .test_implicit import ImplicitGrantTest - - -class OpenIDAuthCodeInterferenceTest(AuthorizationCodeGrantTest): - """Test that OpenID don't interfere with normal OAuth 2 flows.""" - - def setUp(self): - super(OpenIDAuthCodeInterferenceTest, self).setUp() - self.auth = OpenIDConnectAuthCode(request_validator=self.mock_validator) - - -class OpenIDImplicitInterferenceTest(ImplicitGrantTest): - """Test that OpenID don't interfere with normal OAuth 2 flows.""" - - def setUp(self): - super(OpenIDImplicitInterferenceTest, self).setUp() - self.auth = OpenIDConnectImplicit(request_validator=self.mock_validator) - - -class OpenIDHybridInterferenceTest(AuthorizationCodeGrantTest): - """Test that OpenID don't interfere with normal OAuth 2 flows.""" - - def setUp(self): - super(OpenIDHybridInterferenceTest, self).setUp() - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - - -def get_id_token_mock(token, token_handler, request): - return "MOCKED_TOKEN" - - -class OpenIDAuthCodeTest(TestCase): - - def setUp(self): - self.request = Request('http://a.b/path') - self.request.scopes = ('hello', 'openid') - self.request.expires_in = 1800 - self.request.client_id = 'abcdef' - self.request.code = '1234' - self.request.response_type = 'code' - self.request.grant_type = 'authorization_code' - self.request.redirect_uri = 'https://a.b/cb' - self.request.state = 'abc' - - self.mock_validator = mock.MagicMock() - self.mock_validator.authenticate_client.side_effect = self.set_client - self.mock_validator.get_id_token.side_effect = get_id_token_mock - self.auth = OpenIDConnectAuthCode(request_validator=self.mock_validator) - - self.url_query = 'https://a.b/cb?code=abc&state=abc' - self.url_fragment = 'https://a.b/cb#code=abc&state=abc' - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - @mock.patch('oauthlib.common.generate_token') - def test_authorization(self, generate_token): - - scope, info = self.auth.validate_authorization_request(self.request) - - generate_token.return_value = 'abc' - bearer = BearerToken(self.mock_validator) - self.request.response_mode = 'query' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_query) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.response_mode = 'fragment' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - @mock.patch('oauthlib.common.generate_token') - def test_no_prompt_authorization(self, generate_token): - generate_token.return_value = 'abc' - scope, info = self.auth.validate_authorization_request(self.request) - self.request.prompt = 'none' - self.assertRaises(OIDCNoPrompt, - self.auth.validate_authorization_request, - self.request) - - # prompt == none requires id token hint - bearer = BearerToken(self.mock_validator) - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=invalid_request', h['Location']) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.response_mode = 'query' - self.request.id_token_hint = 'me@email.com' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_query) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - # Test alernative response modes - self.request.response_mode = 'fragment' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - - # Ensure silent authentication and authorization is done - self.mock_validator.validate_silent_login.return_value = False - self.mock_validator.validate_silent_authorization.return_value = True - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - self.mock_validator.validate_silent_login.return_value = True - self.mock_validator.validate_silent_authorization.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=consent_required', h['Location']) - - # ID token hint must match logged in user - self.mock_validator.validate_silent_authorization.return_value = True - self.mock_validator.validate_user_match.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - def set_scopes(self, client_id, code, client, request): - request.scopes = self.request.scopes - request.state = self.request.state - request.user = 'bob' - return True - - def test_create_token_response(self): - self.request.response_type = None - self.mock_validator.validate_code.side_effect = self.set_scopes - - bearer = BearerToken(self.mock_validator) - - h, token, s = self.auth.create_token_response(self.request, bearer) - token = json.loads(token) - self.assertEqual(self.mock_validator.save_token.call_count, 1) - self.assertIn('access_token', token) - self.assertIn('refresh_token', token) - self.assertIn('expires_in', token) - self.assertIn('scope', token) - self.assertIn('id_token', token) - self.assertIn('openid', token['scope']) - - self.mock_validator.reset_mock() - - self.request.scopes = ('hello', 'world') - h, token, s = self.auth.create_token_response(self.request, bearer) - token = json.loads(token) - self.assertEqual(self.mock_validator.save_token.call_count, 1) - self.assertIn('access_token', token) - self.assertIn('refresh_token', token) - self.assertIn('expires_in', token) - self.assertIn('scope', token) - self.assertNotIn('id_token', token) - self.assertNotIn('openid', token['scope']) - - -class OpenIDImplicitTest(TestCase): - - def setUp(self): - self.request = Request('http://a.b/path') - self.request.scopes = ('hello', 'openid') - self.request.expires_in = 1800 - self.request.client_id = 'abcdef' - self.request.response_type = 'id_token token' - self.request.redirect_uri = 'https://a.b/cb' - self.request.nonce = 'zxc' - self.request.state = 'abc' - - self.mock_validator = mock.MagicMock() - self.mock_validator.get_id_token.side_effect = get_id_token_mock - self.auth = OpenIDConnectImplicit(request_validator=self.mock_validator) - - token = 'MOCKED_TOKEN' - self.url_query = 'https://a.b/cb?state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - self.url_fragment = 'https://a.b/cb#state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - - @mock.patch('oauthlib.common.generate_token') - def test_authorization(self, generate_token): - scope, info = self.auth.validate_authorization_request(self.request) - - generate_token.return_value = 'abc' - bearer = BearerToken(self.mock_validator) - - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.response_type = 'id_token' - token = 'MOCKED_TOKEN' - url = 'https://a.b/cb#state=abc&id_token=%s' % token - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], url, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.nonce = None - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=invalid_request', h['Location']) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - @mock.patch('oauthlib.common.generate_token') - def test_no_prompt_authorization(self, generate_token): - generate_token.return_value = 'abc' - scope, info = self.auth.validate_authorization_request(self.request) - self.request.prompt = 'none' - self.assertRaises(OIDCNoPrompt, - self.auth.validate_authorization_request, - self.request) - - # prompt == none requires id token hint - bearer = BearerToken(self.mock_validator) - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=invalid_request', h['Location']) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.id_token_hint = 'me@email.com' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - # Test alernative response modes - self.request.response_mode = 'query' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_query) - - # Ensure silent authentication and authorization is done - self.mock_validator.validate_silent_login.return_value = False - self.mock_validator.validate_silent_authorization.return_value = True - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - self.mock_validator.validate_silent_login.return_value = True - self.mock_validator.validate_silent_authorization.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=consent_required', h['Location']) - - # ID token hint must match logged in user - self.mock_validator.validate_silent_authorization.return_value = True - self.mock_validator.validate_user_match.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - -class OpenIDHybridCodeTokenTest(OpenIDAuthCodeTest): - - def setUp(self): - super(OpenIDHybridCodeTokenTest, self).setUp() - self.request.response_type = 'code token' - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' - self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' - - -class OpenIDHybridCodeIdTokenTest(OpenIDAuthCodeTest): - - def setUp(self): - super(OpenIDHybridCodeIdTokenTest, self).setUp() - self.request.response_type = 'code id_token' - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - token = 'MOCKED_TOKEN' - self.url_query = 'https://a.b/cb?code=abc&state=abc&id_token=%s' % token - self.url_fragment = 'https://a.b/cb#code=abc&state=abc&id_token=%s' % token - - -class OpenIDHybridCodeIdTokenTokenTest(OpenIDAuthCodeTest): - - def setUp(self): - super(OpenIDHybridCodeIdTokenTokenTest, self).setUp() - self.request.response_type = 'code id_token token' - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - token = 'MOCKED_TOKEN' - self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - - -class ImplicitTokenGrantDispatcherTest(TestCase): - def setUp(self): - self.request = Request('http://a.b/path') - request_validator = mock.MagicMock() - implicit_grant = ImplicitGrant(request_validator) - openid_connect_implicit = OpenIDConnectImplicit(request_validator) - - self.dispatcher = ImplicitTokenGrantDispatcher( - default_implicit_grant=implicit_grant, - oidc_implicit_grant=openid_connect_implicit - ) - - def test_create_authorization_response_openid(self): - self.request.scopes = ('hello', 'openid') - self.request.response_type = 'id_token' - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, OpenIDConnectImplicit)) - - def test_validate_authorization_request_openid(self): - self.request.scopes = ('hello', 'openid') - self.request.response_type = 'id_token' - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, OpenIDConnectImplicit)) - - def test_create_authorization_response_oauth(self): - self.request.scopes = ('hello', 'world') - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, ImplicitGrant)) - - def test_validate_authorization_request_oauth(self): - self.request.scopes = ('hello', 'world') - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, ImplicitGrant)) - - -class DispatcherTest(TestCase): - def setUp(self): - self.request = Request('http://a.b/path') - self.request.decoded_body = ( - ("client_id", "me"), - ("code", "code"), - ("redirect_url", "https://a.b/cb"), - ) - - self.request_validator = mock.MagicMock() - self.auth_grant = AuthorizationCodeGrant(self.request_validator) - self.openid_connect_auth = OpenIDConnectAuthCode(self.request_validator) - - -class AuthTokenGrantDispatcherOpenIdTest(DispatcherTest): - - def setUp(self): - super(AuthTokenGrantDispatcherOpenIdTest, self).setUp() - self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') - self.dispatcher = AuthTokenGrantDispatcher( - self.request_validator, - default_token_grant=self.auth_grant, - oidc_token_grant=self.openid_connect_auth - ) - - def test_create_token_response_openid(self): - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, OpenIDConnectAuthCode)) - self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) - - -class AuthTokenGrantDispatcherOpenIdWithoutCodeTest(DispatcherTest): - - def setUp(self): - super(AuthTokenGrantDispatcherOpenIdWithoutCodeTest, self).setUp() - self.request.decoded_body = ( - ("client_id", "me"), - ("code", ""), - ("redirect_url", "https://a.b/cb"), - ) - self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') - self.dispatcher = AuthTokenGrantDispatcher( - self.request_validator, - default_token_grant=self.auth_grant, - oidc_token_grant=self.openid_connect_auth - ) - - def test_create_token_response_openid_without_code(self): - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, AuthorizationCodeGrant)) - self.assertFalse(self.dispatcher.request_validator.get_authorization_code_scopes.called) - - -class AuthTokenGrantDispatcherOAuthTest(DispatcherTest): - - def setUp(self): - super(AuthTokenGrantDispatcherOAuthTest, self).setUp() - self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'world') - self.dispatcher = AuthTokenGrantDispatcher( - self.request_validator, - default_token_grant=self.auth_grant, - oidc_token_grant=self.openid_connect_auth - ) - - def test_create_token_response_oauth(self): - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, AuthorizationCodeGrant)) - self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) diff --git a/tests/oauth2/rfc6749/test_server.py b/tests/oauth2/rfc6749/test_server.py index da303ce..bc7a2b7 100644 --- a/tests/oauth2/rfc6749/test_server.py +++ b/tests/oauth2/rfc6749/test_server.py @@ -3,21 +3,17 @@ from __future__ import absolute_import, unicode_literals import json -import jwt import mock from oauthlib import common from oauthlib.oauth2.rfc6749 import errors, tokens from oauthlib.oauth2.rfc6749.endpoints import Server -from oauthlib.oauth2.rfc6749.endpoints.authorization import \ - AuthorizationEndpoint +from oauthlib.oauth2.rfc6749.endpoints.authorization import AuthorizationEndpoint from oauthlib.oauth2.rfc6749.endpoints.resource import ResourceEndpoint from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint from oauthlib.oauth2.rfc6749.grant_types import (AuthorizationCodeGrant, ClientCredentialsGrant, ImplicitGrant, - OpenIDConnectAuthCode, - OpenIDConnectImplicit, ResourceOwnerPasswordCredentialsGrant) from ...unittest import TestCase @@ -29,40 +25,34 @@ class AuthorizationEndpointTest(TestCase): self.mock_validator = mock.MagicMock() self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) auth_code = AuthorizationCodeGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) auth_code.save_authorization_code = mock.MagicMock() implicit = ImplicitGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) implicit.save_token = mock.MagicMock() - openid_connect_auth = OpenIDConnectAuthCode(self.mock_validator) - openid_connect_implicit = OpenIDConnectImplicit(self.mock_validator) - response_types = { - 'code': auth_code, - 'token': implicit, - - 'id_token': openid_connect_implicit, - 'id_token token': openid_connect_implicit, - 'code token': openid_connect_auth, - 'code id_token': openid_connect_auth, - 'code token id_token': openid_connect_auth, - 'none': auth_code + 'code': auth_code, + 'token': implicit, + 'none': auth_code } self.expires_in = 1800 - token = tokens.BearerToken(self.mock_validator, - expires_in=self.expires_in) + token = tokens.BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) self.endpoint = AuthorizationEndpoint( - default_response_type='code', - default_token_type=token, - response_types=response_types) + default_response_type='code', + default_token_type=token, + response_types=response_types + ) @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') def test_authorization_grant(self): uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz') @@ -71,7 +61,7 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True) @@ -79,7 +69,7 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True) self.assertEqual(body, None) @@ -99,9 +89,9 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?client_id=me&scope=all+of+them' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' self.mock_validator.validate_request = mock.MagicMock( - side_effect=errors.InvalidRequestError()) + side_effect=errors.InvalidRequestError()) headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.') @@ -109,9 +99,9 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' self.mock_validator.validate_request = mock.MagicMock( - side_effect=errors.UnsupportedResponseTypeError()) + side_effect=errors.UnsupportedResponseTypeError()) headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type') @@ -129,27 +119,32 @@ class TokenEndpointTest(TestCase): self.mock_validator.authenticate_client.side_effect = set_user self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) auth_code = AuthorizationCodeGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) password = ResourceOwnerPasswordCredentialsGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) client = ClientCredentialsGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) supported_types = { - 'authorization_code': auth_code, - 'password': password, - 'client_credentials': client, + 'authorization_code': auth_code, + 'password': password, + 'client_credentials': client, } self.expires_in = 1800 - token = tokens.BearerToken(self.mock_validator, - expires_in=self.expires_in) - self.endpoint = TokenEndpoint('authorization_code', - default_token_type=token, grant_types=supported_types) + token = tokens.BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = TokenEndpoint( + 'authorization_code', + default_token_type=token, + grant_types=supported_types + ) @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') def test_authorization_grant(self): body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) token = { 'token_type': 'Bearer', 'expires_in': self.expires_in, @@ -176,7 +171,7 @@ class TokenEndpointTest(TestCase): def test_password_grant(self): body = 'grant_type=password&username=a&password=hello&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) token = { 'token_type': 'Bearer', 'expires_in': self.expires_in, @@ -190,7 +185,7 @@ class TokenEndpointTest(TestCase): def test_client_grant(self): body = 'grant_type=client_credentials&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) token = { 'token_type': 'Bearer', 'expires_in': self.expires_in, @@ -281,7 +276,7 @@ twIDAQAB def test_authorization_grant(self): body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -295,7 +290,7 @@ twIDAQAB body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&state=xyz' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -310,7 +305,7 @@ twIDAQAB def test_password_grant(self): body = 'grant_type=password&username=a&password=hello&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -325,7 +320,7 @@ twIDAQAB def test_scopes_and_user_id_stored_in_access_token(self): body = 'grant_type=password&username=a&password=hello&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) access_token = json.loads(body)['access_token'] @@ -338,7 +333,7 @@ twIDAQAB def test_client_grant(self): body = 'grant_type=client_credentials&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -366,8 +361,10 @@ class ResourceEndpointTest(TestCase): self.mock_validator = mock.MagicMock() self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) token = tokens.BearerToken(request_validator=self.mock_validator) - self.endpoint = ResourceEndpoint(default_token='Bearer', - token_types={'Bearer': token}) + self.endpoint = ResourceEndpoint( + default_token='Bearer', + token_types={'Bearer': token} + ) def test_defaults(self): uri = 'http://a.b/path?some=query' diff --git a/tests/oauth2/rfc6749/test_tokens.py b/tests/oauth2/rfc6749/test_tokens.py index ecac03e..061754f 100644 --- a/tests/oauth2/rfc6749/test_tokens.py +++ b/tests/oauth2/rfc6749/test_tokens.py @@ -1,9 +1,11 @@ from __future__ import absolute_import, unicode_literals -import mock - -from oauthlib.common import Request -from oauthlib.oauth2.rfc6749.tokens import * +from oauthlib.oauth2.rfc6749.tokens import ( + prepare_mac_header, + prepare_bearer_headers, + prepare_bearer_body, + prepare_bearer_uri, +) from ...unittest import TestCase @@ -96,196 +98,3 @@ class TokenTest(TestCase): self.assertEqual(prepare_bearer_headers(self.token), self.bearer_headers) self.assertEqual(prepare_bearer_body(self.token), self.bearer_body) self.assertEqual(prepare_bearer_uri(self.token, uri=self.uri), self.bearer_uri) - - def test_fake_bearer_is_not_validated(self): - request_validator = mock.MagicMock() - request_validator.validate_bearer_token = self._mocked_validate_bearer_token - - for fake_header in self.fake_bearer_headers: - request = Request('/', headers=fake_header) - result = BearerToken(request_validator=request_validator).validate_request(request) - - self.assertFalse(result) - - def test_header_with_multispaces_is_validated(self): - request_validator = mock.MagicMock() - request_validator.validate_bearer_token = self._mocked_validate_bearer_token - - request = Request('/', headers=self.valid_header_with_multiple_spaces) - result = BearerToken(request_validator=request_validator).validate_request(request) - - self.assertTrue(result) - - def test_estimate_type_with_fake_header_returns_type_0(self): - request_validator = mock.MagicMock() - request_validator.validate_bearer_token = self._mocked_validate_bearer_token - - for fake_header in self.fake_bearer_headers: - request = Request('/', headers=fake_header) - result = BearerToken(request_validator=request_validator).estimate_type(request) - - if fake_header['Authorization'].count(' ') == 2 and \ - fake_header['Authorization'].split()[0] == 'Bearer': - # If we're dealing with the header containing 2 spaces, it will be recognized - # as a Bearer valid header, the token itself will be invalid by the way. - self.assertEqual(result, 9) - else: - self.assertEqual(result, 0) - - -class JWTTokenTestCase(TestCase): - fake_bearer_headers = [ - {'Authorization': 'Beaver vF9dft4qmT'}, - {'Authorization': 'BeavervF9dft4qmT'}, - {'Authorization': 'Beaver vF9dft4qmT'}, - {'Authorization': 'BearerF9dft4qmT'}, - {'Authorization': 'Bearer vF9df t4qmT'}, - ] - - valid_header_with_multiple_spaces = {'Authorization': 'Bearer vF9dft4qmT'} - - def _mocked_validate_bearer_token(self, token, scopes, request): - if not token: - return False - return True - - def test_create_token_callable_expires_in(self): - """ - Test retrieval of the expires in value by calling the callable expires_in property - """ - - expires_in_mock = mock.MagicMock() - request_mock = mock.MagicMock() - - token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) - token.create_token(request=request_mock) - - expires_in_mock.assert_called_once_with(request_mock) - - def test_create_token_non_callable_expires_in(self): - """ - When a non callable expires in is set this should just be set to the request - """ - - expires_in_mock = mock.NonCallableMagicMock() - request_mock = mock.MagicMock() - - token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) - token.create_token(request=request_mock) - - self.assertFalse(expires_in_mock.called) - self.assertEqual(request_mock.expires_in, expires_in_mock) - - def test_create_token_calls_get_id_token(self): - """ - When create_token is called the call should be forwarded to the get_id_token on the token validator - """ - request_mock = mock.MagicMock() - - with mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', - autospec=True) as RequestValidatorMock: - - request_validator = RequestValidatorMock() - - token = JWTToken(expires_in=mock.MagicMock(), request_validator=request_validator) - token.create_token(request=request_mock) - - request_validator.get_jwt_bearer_token.assert_called_once_with(None, None, request_mock) - - def test_validate_request_token_from_headers(self): - """ - Bearer token get retrieved from headers. - """ - - with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ - mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', - autospec=True) as RequestValidatorMock: - request_validator_mock = RequestValidatorMock() - - token = JWTToken(request_validator=request_validator_mock) - - request = RequestMock('/uri') - # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch - # with autospec=True - request.scopes = mock.MagicMock() - request.headers = { - 'Authorization': 'Bearer some-token-from-header' - } - - token.validate_request(request=request) - - request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-header', - request.scopes, - request) - - def test_validate_token_from_request(self): - """ - Token get retrieved from request object. - """ - - with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ - mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', - autospec=True) as RequestValidatorMock: - request_validator_mock = RequestValidatorMock() - - token = JWTToken(request_validator=request_validator_mock) - - request = RequestMock('/uri') - # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch - # with autospec=True - request.scopes = mock.MagicMock() - request.access_token = 'some-token-from-request-object' - request.headers = {} - - token.validate_request(request=request) - - request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-request-object', - request.scopes, - request) - - def test_fake_bearer_is_not_validated(self): - request_validator = mock.MagicMock() - request_validator.validate_jwt_bearer_token = self._mocked_validate_bearer_token - - for fake_header in self.fake_bearer_headers: - request = Request('/', headers=fake_header) - result = JWTToken(request_validator=request_validator).validate_request(request) - - self.assertFalse(result) - - def test_header_with_multiple_spaces_is_validated(self): - request_validator = mock.MagicMock() - request_validator.validate_jwt_bearer_token = self._mocked_validate_bearer_token - request = Request('/', headers=self.valid_header_with_multiple_spaces) - result = JWTToken(request_validator=request_validator).validate_request(request) - - self.assertTrue(result) - - def test_estimate_type(self): - """ - Estimate type results for a jwt token - """ - - def test_token(token, expected_result): - with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock: - jwt_token = JWTToken() - - request = RequestMock('/uri') - # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch - # with autospec=True - request.headers = { - 'Authorization': 'Bearer {}'.format(token) - } - - result = jwt_token.estimate_type(request=request) - - self.assertEqual(result, expected_result) - - test_items = ( - ('eyfoo.foo.foo', 10), - ('eyfoo.foo.foo.foo.foo', 10), - ('eyfoobar', 0) - ) - - for token, expected_result in test_items: - test_token(token, expected_result) diff --git a/tests/openid/__init__.py b/tests/openid/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/openid/__init__.py diff --git a/tests/openid/connect/__init__.py b/tests/openid/connect/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/openid/connect/__init__.py diff --git a/tests/openid/connect/core/__init__.py b/tests/openid/connect/core/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/openid/connect/core/__init__.py diff --git a/tests/oauth2/rfc6749/endpoints/test_claims_handling.py b/tests/openid/connect/core/endpoints/test_claims_handling.py index ff72673..37a7cdd 100644 --- a/tests/oauth2/rfc6749/endpoints/test_claims_handling.py +++ b/tests/openid/connect/core/endpoints/test_claims_handling.py @@ -10,10 +10,12 @@ from __future__ import absolute_import, unicode_literals import mock -from oauthlib.oauth2 import InvalidRequestError, RequestValidator, Server +from oauthlib.oauth2 import RequestValidator + +from oauthlib.oauth2.rfc6749.endpoints.pre_configured import Server from ....unittest import TestCase -from .test_utils import get_fragment_credentials, get_query_credentials +from .test_utils import get_query_credentials class TestClaimsHandling(TestCase): @@ -81,7 +83,7 @@ class TestClaimsHandling(TestCase): } } - claims_urlquoted='%7B%22id_token%22%3A%20%7B%22claim_2%22%3A%20%7B%22essential%22%3A%20true%7D%2C%20%22claim_1%22%3A%20null%7D%2C%20%22userinfo%22%3A%20%7B%22claim_4%22%3A%20null%2C%20%22claim_3%22%3A%20%7B%22essential%22%3A%20true%7D%7D%7D' + claims_urlquoted = '%7B%22id_token%22%3A%20%7B%22claim_2%22%3A%20%7B%22essential%22%3A%20true%7D%2C%20%22claim_1%22%3A%20null%7D%2C%20%22userinfo%22%3A%20%7B%22claim_4%22%3A%20null%2C%20%22claim_3%22%3A%20%7B%22essential%22%3A%20true%7D%7D%7D' uri = 'http://example.com/path?client_id=abc&scope=openid+test_scope&response_type=code&claims=%s' h, b, s = self.server.create_authorization_response(uri % claims_urlquoted, scopes='openid test_scope') @@ -90,8 +92,10 @@ class TestClaimsHandling(TestCase): code = get_query_credentials(h['Location'])['code'][0] token_uri = 'http://example.com/path' - _, body, _ = self.server.create_token_response(token_uri, - body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code) + _, body, _ = self.server.create_token_response( + token_uri, + body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code + ) self.assertDictEqual(self.claims_saved_with_bearer_token, claims) diff --git a/tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py b/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py index 89431b6..89431b6 100644 --- a/tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py +++ b/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py diff --git a/tests/openid/connect/core/grant_types/test_authorization_code.py b/tests/openid/connect/core/grant_types/test_authorization_code.py new file mode 100644 index 0000000..1bad120 --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_authorization_code.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import json + +import mock + +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant +from oauthlib.openid.connect.core.grant_types.exceptions import OIDCNoPrompt + +from ....unittest import TestCase +from ....oauth2.rfc6749.grant_types.test_authorization_code import AuthorizationCodeGrantTest + + +def get_id_token_mock(token, token_handler, request): + return "MOCKED_TOKEN" + + +class OpenIDAuthCodeInterferenceTest(AuthorizationCodeGrantTest): + """Test that OpenID don't interfere with normal OAuth 2 flows.""" + + def setUp(self): + super(OpenIDAuthCodeInterferenceTest, self).setUp() + self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator) + + +class OpenIDAuthCodeTest(TestCase): + + def setUp(self): + self.request = Request('http://a.b/path') + self.request.scopes = ('hello', 'openid') + self.request.expires_in = 1800 + self.request.client_id = 'abcdef' + self.request.code = '1234' + self.request.response_type = 'code' + self.request.grant_type = 'authorization_code' + self.request.redirect_uri = 'https://a.b/cb' + self.request.state = 'abc' + + self.mock_validator = mock.MagicMock() + self.mock_validator.authenticate_client.side_effect = self.set_client + self.mock_validator.get_id_token.side_effect = get_id_token_mock + self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator) + + self.url_query = 'https://a.b/cb?code=abc&state=abc' + self.url_fragment = 'https://a.b/cb#code=abc&state=abc' + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + @mock.patch('oauthlib.common.generate_token') + def test_authorization(self, generate_token): + + scope, info = self.auth.validate_authorization_request(self.request) + + generate_token.return_value = 'abc' + bearer = BearerToken(self.mock_validator) + self.request.response_mode = 'query' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_query) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.response_mode = 'fragment' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + @mock.patch('oauthlib.common.generate_token') + def test_no_prompt_authorization(self, generate_token): + generate_token.return_value = 'abc' + scope, info = self.auth.validate_authorization_request(self.request) + self.request.prompt = 'none' + self.assertRaises(OIDCNoPrompt, + self.auth.validate_authorization_request, + self.request) + + # prompt == none requires id token hint + bearer = BearerToken(self.mock_validator) + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=invalid_request', h['Location']) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.response_mode = 'query' + self.request.id_token_hint = 'me@email.com' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_query) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + # Test alernative response modes + self.request.response_mode = 'fragment' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + + # Ensure silent authentication and authorization is done + self.mock_validator.validate_silent_login.return_value = False + self.mock_validator.validate_silent_authorization.return_value = True + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + self.mock_validator.validate_silent_login.return_value = True + self.mock_validator.validate_silent_authorization.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=consent_required', h['Location']) + + # ID token hint must match logged in user + self.mock_validator.validate_silent_authorization.return_value = True + self.mock_validator.validate_user_match.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + def set_scopes(self, client_id, code, client, request): + request.scopes = self.request.scopes + request.state = self.request.state + request.user = 'bob' + return True + + def test_create_token_response(self): + self.request.response_type = None + self.mock_validator.validate_code.side_effect = self.set_scopes + + bearer = BearerToken(self.mock_validator) + + h, token, s = self.auth.create_token_response(self.request, bearer) + token = json.loads(token) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('refresh_token', token) + self.assertIn('expires_in', token) + self.assertIn('scope', token) + self.assertIn('id_token', token) + self.assertIn('openid', token['scope']) + + self.mock_validator.reset_mock() + + self.request.scopes = ('hello', 'world') + h, token, s = self.auth.create_token_response(self.request, bearer) + token = json.loads(token) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('refresh_token', token) + self.assertIn('expires_in', token) + self.assertIn('scope', token) + self.assertNotIn('id_token', token) + self.assertNotIn('openid', token['scope']) diff --git a/tests/openid/connect/core/grant_types/test_dispatchers.py b/tests/openid/connect/core/grant_types/test_dispatchers.py new file mode 100644 index 0000000..f90ec46 --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_dispatchers.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +import mock + +from oauthlib.common import Request + +from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant +from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant +from oauthlib.openid.connect.core.grant_types.dispatchers import ( + ImplicitTokenGrantDispatcher, + AuthorizationTokenGrantDispatcher +) + +from oauthlib.oauth2.rfc6749.grant_types import ( + AuthorizationCodeGrant as OAuth2AuthorizationCodeGrant, + ImplicitGrant as OAuth2ImplicitGrant, +) + + +from ....unittest import TestCase + + +class ImplicitTokenGrantDispatcherTest(TestCase): + def setUp(self): + self.request = Request('http://a.b/path') + request_validator = mock.MagicMock() + implicit_grant = OAuth2ImplicitGrant(request_validator) + openid_connect_implicit = ImplicitGrant(request_validator) + + self.dispatcher = ImplicitTokenGrantDispatcher( + default_implicit_grant=implicit_grant, + oidc_implicit_grant=openid_connect_implicit + ) + + def test_create_authorization_response_openid(self): + self.request.scopes = ('hello', 'openid') + self.request.response_type = 'id_token' + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + def test_validate_authorization_request_openid(self): + self.request.scopes = ('hello', 'openid') + self.request.response_type = 'id_token' + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + def test_create_authorization_response_oauth(self): + self.request.scopes = ('hello', 'world') + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + def test_validate_authorization_request_oauth(self): + self.request.scopes = ('hello', 'world') + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + +class DispatcherTest(TestCase): + def setUp(self): + self.request = Request('http://a.b/path') + self.request.decoded_body = ( + ("client_id", "me"), + ("code", "code"), + ("redirect_url", "https://a.b/cb"), + ) + + self.request_validator = mock.MagicMock() + self.auth_grant = OAuth2AuthorizationCodeGrant(self.request_validator) + self.openid_connect_auth = OAuth2AuthorizationCodeGrant(self.request_validator) + + +class AuthTokenGrantDispatcherOpenIdTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOpenIdTest, self).setUp() + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') + self.dispatcher = AuthorizationTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_openid(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, AuthorizationCodeGrant)) + self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) + + +class AuthTokenGrantDispatcherOpenIdWithoutCodeTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOpenIdWithoutCodeTest, self).setUp() + self.request.decoded_body = ( + ("client_id", "me"), + ("code", ""), + ("redirect_url", "https://a.b/cb"), + ) + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') + self.dispatcher = AuthorizationTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_openid_without_code(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, OAuth2AuthorizationCodeGrant)) + self.assertFalse(self.dispatcher.request_validator.get_authorization_code_scopes.called) + + +class AuthTokenGrantDispatcherOAuthTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOAuthTest, self).setUp() + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'world') + self.dispatcher = AuthorizationTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_oauth(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, OAuth2AuthorizationCodeGrant)) + self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) diff --git a/tests/openid/connect/core/grant_types/test_hybrid.py b/tests/openid/connect/core/grant_types/test_hybrid.py new file mode 100644 index 0000000..531ae7f --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_hybrid.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant + +from ....oauth2.rfc6749.grant_types.test_authorization_code import AuthorizationCodeGrantTest + + +class OpenIDHybridInterferenceTest(AuthorizationCodeGrantTest): + """Test that OpenID don't interfere with normal OAuth 2 flows.""" + + def setUp(self): + super(OpenIDHybridInterferenceTest, self).setUp() + self.auth = HybridGrant(request_validator=self.mock_validator) diff --git a/tests/openid/connect/core/grant_types/test_implicit.py b/tests/openid/connect/core/grant_types/test_implicit.py new file mode 100644 index 0000000..56247d9 --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_implicit.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import mock + +from oauthlib.common import Request + +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant +from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant +from oauthlib.openid.connect.core.grant_types.exceptions import OIDCNoPrompt + +from ....unittest import TestCase +from .test_authorization_code import get_id_token_mock, OpenIDAuthCodeTest + +from ....oauth2.rfc6749.grant_types.test_implicit import ImplicitGrantTest + + +class OpenIDImplicitInterferenceTest(ImplicitGrantTest): + """Test that OpenID don't interfere with normal OAuth 2 flows.""" + + def setUp(self): + super(OpenIDImplicitInterferenceTest, self).setUp() + self.auth = ImplicitGrant(request_validator=self.mock_validator) + + +class OpenIDImplicitTest(TestCase): + + def setUp(self): + self.request = Request('http://a.b/path') + self.request.scopes = ('hello', 'openid') + self.request.expires_in = 1800 + self.request.client_id = 'abcdef' + self.request.response_type = 'id_token token' + self.request.redirect_uri = 'https://a.b/cb' + self.request.nonce = 'zxc' + self.request.state = 'abc' + + self.mock_validator = mock.MagicMock() + self.mock_validator.get_id_token.side_effect = get_id_token_mock + self.auth = ImplicitGrant(request_validator=self.mock_validator) + + token = 'MOCKED_TOKEN' + self.url_query = 'https://a.b/cb?state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token + self.url_fragment = 'https://a.b/cb#state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token + + @mock.patch('oauthlib.common.generate_token') + def test_authorization(self, generate_token): + scope, info = self.auth.validate_authorization_request(self.request) + + generate_token.return_value = 'abc' + bearer = BearerToken(self.mock_validator) + + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.response_type = 'id_token' + token = 'MOCKED_TOKEN' + url = 'https://a.b/cb#state=abc&id_token=%s' % token + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], url, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.nonce = None + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=invalid_request', h['Location']) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + @mock.patch('oauthlib.common.generate_token') + def test_no_prompt_authorization(self, generate_token): + generate_token.return_value = 'abc' + scope, info = self.auth.validate_authorization_request(self.request) + self.request.prompt = 'none' + self.assertRaises(OIDCNoPrompt, + self.auth.validate_authorization_request, + self.request) + + # prompt == none requires id token hint + bearer = BearerToken(self.mock_validator) + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=invalid_request', h['Location']) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.id_token_hint = 'me@email.com' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + # Test alernative response modes + self.request.response_mode = 'query' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_query) + + # Ensure silent authentication and authorization is done + self.mock_validator.validate_silent_login.return_value = False + self.mock_validator.validate_silent_authorization.return_value = True + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + self.mock_validator.validate_silent_login.return_value = True + self.mock_validator.validate_silent_authorization.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=consent_required', h['Location']) + + # ID token hint must match logged in user + self.mock_validator.validate_silent_authorization.return_value = True + self.mock_validator.validate_user_match.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + +class OpenIDHybridCodeTokenTest(OpenIDAuthCodeTest): + + def setUp(self): + super(OpenIDHybridCodeTokenTest, self).setUp() + self.request.response_type = 'code token' + self.auth = HybridGrant(request_validator=self.mock_validator) + self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' + self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' + + +class OpenIDHybridCodeIdTokenTest(OpenIDAuthCodeTest): + + def setUp(self): + super(OpenIDHybridCodeIdTokenTest, self).setUp() + self.request.response_type = 'code id_token' + self.auth = HybridGrant(request_validator=self.mock_validator) + token = 'MOCKED_TOKEN' + self.url_query = 'https://a.b/cb?code=abc&state=abc&id_token=%s' % token + self.url_fragment = 'https://a.b/cb#code=abc&state=abc&id_token=%s' % token + + +class OpenIDHybridCodeIdTokenTokenTest(OpenIDAuthCodeTest): + + def setUp(self): + super(OpenIDHybridCodeIdTokenTokenTest, self).setUp() + self.request.response_type = 'code id_token token' + self.auth = HybridGrant(request_validator=self.mock_validator) + token = 'MOCKED_TOKEN' + self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token + self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token diff --git a/tests/openid/connect/core/test_request_validator.py b/tests/openid/connect/core/test_request_validator.py new file mode 100644 index 0000000..14a7c23 --- /dev/null +++ b/tests/openid/connect/core/test_request_validator.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +from oauthlib.openid.connect.core.request_validator import RequestValidator + +from ....unittest import TestCase + + +class RequestValidatorTest(TestCase): + + def test_method_contracts(self): + v = RequestValidator() + self.assertRaises( + NotImplementedError, + v.get_authorization_code_scopes, + 'client_id', 'code', 'redirect_uri', 'request' + ) + self.assertRaises( + NotImplementedError, + v.get_jwt_bearer_token, + 'token', 'token_handler', 'request' + ) + self.assertRaises( + NotImplementedError, + v.get_id_token, + 'token', 'token_handler', 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_jwt_bearer_token, + 'token', 'scopes', 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_id_token, + 'token', 'scopes', 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_silent_authorization, + 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_silent_login, + 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_user_match, + 'id_token_hint', 'scopes', 'claims', 'request' + ) diff --git a/tests/openid/connect/core/test_server.py b/tests/openid/connect/core/test_server.py new file mode 100644 index 0000000..83290db --- /dev/null +++ b/tests/openid/connect/core/test_server.py @@ -0,0 +1,178 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import json + +import mock + +from oauthlib.oauth2.rfc6749 import errors +from oauthlib.oauth2.rfc6749.endpoints.authorization import AuthorizationEndpoint +from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant +from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant +from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant + +from ....unittest import TestCase + + +class AuthorizationEndpointTest(TestCase): + + def setUp(self): + self.mock_validator = mock.MagicMock() + self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) + auth_code = AuthorizationCodeGrant(request_validator=self.mock_validator) + auth_code.save_authorization_code = mock.MagicMock() + implicit = ImplicitGrant( + request_validator=self.mock_validator) + implicit.save_token = mock.MagicMock() + hybrid = HybridGrant(self.mock_validator) + + response_types = { + 'code': auth_code, + 'token': implicit, + 'id_token': implicit, + 'id_token token': implicit, + 'code token': hybrid, + 'code id_token': hybrid, + 'code token id_token': hybrid, + 'none': auth_code + } + self.expires_in = 1800 + token = BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = AuthorizationEndpoint( + default_response_type='code', + default_token_type=token, + response_types=response_types + ) + + # TODO: Add hybrid grant test + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_authorization_grant(self): + uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz') + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_implicit_grant(self): + uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True) + + def test_none_grant(self): + uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True) + self.assertEqual(body, None) + self.assertEqual(status_code, 302) + + # and without the state parameter + uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me', parse_fragment=True) + self.assertEqual(body, None) + self.assertEqual(status_code, 302) + + def test_missing_type(self): + uri = 'http://i.b/l?client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + self.mock_validator.validate_request = mock.MagicMock( + side_effect=errors.InvalidRequestError()) + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.') + + def test_invalid_type(self): + uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + self.mock_validator.validate_request = mock.MagicMock( + side_effect=errors.UnsupportedResponseTypeError()) + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type') + + +class TokenEndpointTest(TestCase): + + def setUp(self): + def set_user(request): + request.user = mock.MagicMock() + request.client = mock.MagicMock() + request.client.client_id = 'mocked_client_id' + return True + + self.mock_validator = mock.MagicMock() + self.mock_validator.authenticate_client.side_effect = set_user + self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) + auth_code = AuthorizationCodeGrant( + request_validator=self.mock_validator) + supported_types = { + 'authorization_code': auth_code, + } + self.expires_in = 1800 + token = BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = TokenEndpoint( + 'authorization_code', + default_token_type=token, + grant_types=supported_types + ) + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_authorization_grant(self): + body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': 'abc', + 'refresh_token': 'abc', + 'scope': 'all of them', + 'state': 'xyz' + } + self.assertEqual(json.loads(body), token) + + body = 'grant_type=authorization_code&code=abc&state=xyz' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': 'abc', + 'refresh_token': 'abc', + 'state': 'xyz' + } + self.assertEqual(json.loads(body), token) + + def test_missing_type(self): + _, body, _ = self.endpoint.create_token_response('', body='') + token = {'error': 'unsupported_grant_type'} + self.assertEqual(json.loads(body), token) + + def test_invalid_type(self): + body = 'grant_type=invalid' + _, body, _ = self.endpoint.create_token_response('', body=body) + token = {'error': 'unsupported_grant_type'} + self.assertEqual(json.loads(body), token) diff --git a/tests/openid/connect/core/test_tokens.py b/tests/openid/connect/core/test_tokens.py new file mode 100644 index 0000000..12c75f1 --- /dev/null +++ b/tests/openid/connect/core/test_tokens.py @@ -0,0 +1,133 @@ +from __future__ import absolute_import, unicode_literals + +import mock + +from oauthlib.openid.connect.core.tokens import JWTToken + +from ....unittest import TestCase + + +class JWTTokenTestCase(TestCase): + + def test_create_token_callable_expires_in(self): + """ + Test retrieval of the expires in value by calling the callable expires_in property + """ + + expires_in_mock = mock.MagicMock() + request_mock = mock.MagicMock() + + token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) + token.create_token(request=request_mock) + + expires_in_mock.assert_called_once_with(request_mock) + + def test_create_token_non_callable_expires_in(self): + """ + When a non callable expires in is set this should just be set to the request + """ + + expires_in_mock = mock.NonCallableMagicMock() + request_mock = mock.MagicMock() + + token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) + token.create_token(request=request_mock) + + self.assertFalse(expires_in_mock.called) + self.assertEqual(request_mock.expires_in, expires_in_mock) + + def test_create_token_calls_get_id_token(self): + """ + When create_token is called the call should be forwarded to the get_id_token on the token validator + """ + request_mock = mock.MagicMock() + + with mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + + request_validator = RequestValidatorMock() + + token = JWTToken(expires_in=mock.MagicMock(), request_validator=request_validator) + token.create_token(request=request_mock) + + request_validator.get_jwt_bearer_token.assert_called_once_with(None, None, request_mock) + + def test_validate_request_token_from_headers(self): + """ + Bearer token get retrieved from headers. + """ + + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ + mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + request_validator_mock = RequestValidatorMock() + + token = JWTToken(request_validator=request_validator_mock) + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.scopes = mock.MagicMock() + request.headers = { + 'Authorization': 'Bearer some-token-from-header' + } + + token.validate_request(request=request) + + request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-header', + request.scopes, + request) + + def test_validate_token_from_request(self): + """ + Token get retrieved from request object. + """ + + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ + mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + request_validator_mock = RequestValidatorMock() + + token = JWTToken(request_validator=request_validator_mock) + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.scopes = mock.MagicMock() + request.access_token = 'some-token-from-request-object' + request.headers = {} + + token.validate_request(request=request) + + request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-request-object', + request.scopes, + request) + + def test_estimate_type(self): + """ + Estimate type results for a jwt token + """ + + def test_token(token, expected_result): + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock: + jwt_token = JWTToken() + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.headers = { + 'Authorization': 'Bearer {}'.format(token) + } + + result = jwt_token.estimate_type(request=request) + + self.assertEqual(result, expected_result) + + test_items = ( + ('eyfoo.foo.foo', 10), + ('eyfoo.foo.foo.foo.foo', 10), + ('eyfoobar', 0) + ) + + for token, expected_result in test_items: + test_token(token, expected_result) |