diff options
author | Wiliam Souza <wiliamsouza83@gmail.com> | 2018-06-05 11:33:21 -0300 |
---|---|---|
committer | Omer Katz <omer.drow@gmail.com> | 2018-06-05 17:33:21 +0300 |
commit | d5a4d5ea0eab04ddddefac7d1e7a4902fc469286 (patch) | |
tree | 580faf95f8f7bed257b2dac1d29c26c3d9d1b219 /tests/openid | |
parent | a102731c88f496b557dedd4024fb9b82801d134a (diff) | |
download | oauthlib-d5a4d5ea0eab04ddddefac7d1e7a4902fc469286.tar.gz |
OpenID Connect split (#525)
* Add command to clean up builds to makefile
* Fix docs strings for endpoints pre_configured
* Chnage grant_types.openid_connect to include a deprecation warning be a backward compatible
* Fix doc string for rfc6749.request_validator
* Remove unused import
* Change import to be explicity
* Move JWTTokenTestCase to openid.connect.core.test_token
* Move JWTToken to oauthlib.openid.connect.core.tokens
* Move to openid connect test
* Move openid connect exceptions to its own file
* Remove openid connect from oauth2 server
* Remove JWTToken from oauth tokens
* Remove grant_types.openid_connect file
* Add oauthlib/openid estructure and tests
Diffstat (limited to 'tests/openid')
-rw-r--r-- | tests/openid/__init__.py | 0 | ||||
-rw-r--r-- | tests/openid/connect/__init__.py | 0 | ||||
-rw-r--r-- | tests/openid/connect/core/__init__.py | 0 | ||||
-rw-r--r-- | tests/openid/connect/core/endpoints/test_claims_handling.py | 109 | ||||
-rw-r--r-- | tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py | 85 | ||||
-rw-r--r-- | tests/openid/connect/core/grant_types/test_authorization_code.py | 153 | ||||
-rw-r--r-- | tests/openid/connect/core/grant_types/test_dispatchers.py | 125 | ||||
-rw-r--r-- | tests/openid/connect/core/grant_types/test_hybrid.py | 13 | ||||
-rw-r--r-- | tests/openid/connect/core/grant_types/test_implicit.py | 148 | ||||
-rw-r--r-- | tests/openid/connect/core/test_request_validator.py | 52 | ||||
-rw-r--r-- | tests/openid/connect/core/test_server.py | 178 | ||||
-rw-r--r-- | tests/openid/connect/core/test_tokens.py | 133 |
12 files changed, 996 insertions, 0 deletions
diff --git a/tests/openid/__init__.py b/tests/openid/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/openid/__init__.py diff --git a/tests/openid/connect/__init__.py b/tests/openid/connect/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/openid/connect/__init__.py diff --git a/tests/openid/connect/core/__init__.py b/tests/openid/connect/core/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/openid/connect/core/__init__.py diff --git a/tests/openid/connect/core/endpoints/test_claims_handling.py b/tests/openid/connect/core/endpoints/test_claims_handling.py new file mode 100644 index 0000000..37a7cdd --- /dev/null +++ b/tests/openid/connect/core/endpoints/test_claims_handling.py @@ -0,0 +1,109 @@ +"""Ensure OpenID Connect Authorization Request 'claims' are preserved across authorization. + +The claims parameter is an optional query param for the Authorization Request endpoint + but if it is provided and is valid it needs to be deserialized (from urlencoded JSON) + and persisted with the authorization code itself, then in the subsequent Access Token + request the claims should be transferred (via the oauthlib request) to be persisted + with the Access Token when it is created. +""" +from __future__ import absolute_import, unicode_literals + +import mock + +from oauthlib.oauth2 import RequestValidator + +from oauthlib.oauth2.rfc6749.endpoints.pre_configured import Server + +from ....unittest import TestCase +from .test_utils import get_query_credentials + + +class TestClaimsHandling(TestCase): + + DEFAULT_REDIRECT_URI = 'http://i.b./path' + + def set_scopes(self, scopes): + def set_request_scopes(client_id, code, client, request): + request.scopes = scopes + return True + return set_request_scopes + + def set_user(self, request): + request.user = 'foo' + request.client_id = 'bar' + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def save_claims_with_code(self, client_id, code, request, *args, **kwargs): + # a real validator would save the claims with the code during save_authorization_code() + self.claims_from_auth_code_request = request.claims + self.scopes = request.scopes.split() + + def retrieve_claims_saved_with_code(self, client_id, code, client, request, *args, **kwargs): + request.claims = self.claims_from_auth_code_request + request.scopes = self.scopes + + return True + + def save_claims_with_bearer_token(self, token, request, *args, **kwargs): + # a real validator would save the claims with the access token during save_bearer_token() + self.claims_saved_with_bearer_token = request.claims + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = TestClaimsHandling.DEFAULT_REDIRECT_URI + self.validator.authenticate_client.side_effect = self.set_client + + self.validator.save_authorization_code.side_effect = self.save_claims_with_code + self.validator.validate_code.side_effect = self.retrieve_claims_saved_with_code + self.validator.save_token.side_effect = self.save_claims_with_bearer_token + + self.server = Server(self.validator) + + def test_claims_stored_on_code_creation(self): + + claims = { + "id_token": { + "claim_1": None, + "claim_2": { + "essential": True + } + }, + "userinfo": { + "claim_3": { + "essential": True + }, + "claim_4": None + } + } + + claims_urlquoted = '%7B%22id_token%22%3A%20%7B%22claim_2%22%3A%20%7B%22essential%22%3A%20true%7D%2C%20%22claim_1%22%3A%20null%7D%2C%20%22userinfo%22%3A%20%7B%22claim_4%22%3A%20null%2C%20%22claim_3%22%3A%20%7B%22essential%22%3A%20true%7D%7D%7D' + uri = 'http://example.com/path?client_id=abc&scope=openid+test_scope&response_type=code&claims=%s' + + h, b, s = self.server.create_authorization_response(uri % claims_urlquoted, scopes='openid test_scope') + + self.assertDictEqual(self.claims_from_auth_code_request, claims) + + code = get_query_credentials(h['Location'])['code'][0] + token_uri = 'http://example.com/path' + _, body, _ = self.server.create_token_response( + token_uri, + body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code + ) + + self.assertDictEqual(self.claims_saved_with_bearer_token, claims) + + def test_invalid_claims(self): + uri = 'http://example.com/path?client_id=abc&scope=openid+test_scope&response_type=code&claims=this-is-not-json' + + h, b, s = self.server.create_authorization_response(uri, scopes='openid test_scope') + error = get_query_credentials(h['Location'])['error'][0] + error_desc = get_query_credentials(h['Location'])['error_description'][0] + self.assertEqual(error, 'invalid_request') + self.assertEqual(error_desc, "Malformed claims parameter") diff --git a/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py b/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py new file mode 100644 index 0000000..89431b6 --- /dev/null +++ b/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py @@ -0,0 +1,85 @@ +from __future__ import absolute_import, unicode_literals + +import mock + +from oauthlib.oauth2 import InvalidRequestError +from oauthlib.oauth2.rfc6749.endpoints.authorization import \ + AuthorizationEndpoint +from oauthlib.oauth2.rfc6749.grant_types import OpenIDConnectAuthCode +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from ....unittest import TestCase + +try: + from urllib.parse import urlencode +except ImportError: + from urllib import urlencode + + + + +class OpenIDConnectEndpointTest(TestCase): + + def setUp(self): + self.mock_validator = mock.MagicMock() + self.mock_validator.authenticate_client.side_effect = self.set_client + grant = OpenIDConnectAuthCode(request_validator=self.mock_validator) + bearer = BearerToken(self.mock_validator) + self.endpoint = AuthorizationEndpoint(grant, bearer, + response_types={'code': grant}) + params = { + 'prompt': 'consent', + 'display': 'touch', + 'nonce': 'abcd', + 'state': 'abc', + 'redirect_uri': 'https://a.b/cb', + 'response_type': 'code', + 'client_id': 'abcdef', + 'scope': 'hello openid' + } + self.url = 'http://a.b/path?' + urlencode(params) + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + @mock.patch('oauthlib.common.generate_token') + def test_authorization_endpoint_handles_prompt(self, generate_token): + generate_token.return_value = "MOCK_CODE" + # In the GET view: + scopes, creds = self.endpoint.validate_authorization_request(self.url) + # In the POST view: + creds['scopes'] = scopes + h, b, s = self.endpoint.create_authorization_response(self.url, + credentials=creds) + expected = 'https://a.b/cb?state=abc&code=MOCK_CODE' + self.assertURLEqual(h['Location'], expected) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + def test_prompt_none_exclusiveness(self): + """ + Test that prompt=none can't be used with another prompt value. + """ + params = { + 'prompt': 'none consent', + 'state': 'abc', + 'redirect_uri': 'https://a.b/cb', + 'response_type': 'code', + 'client_id': 'abcdef', + 'scope': 'hello openid' + } + url = 'http://a.b/path?' + urlencode(params) + with self.assertRaises(InvalidRequestError): + self.endpoint.validate_authorization_request(url) + + def test_oidc_params_preservation(self): + """ + Test that the nonce parameter is passed through. + """ + scopes, creds = self.endpoint.validate_authorization_request(self.url) + + self.assertEqual(creds['prompt'], {'consent'}) + self.assertEqual(creds['nonce'], 'abcd') + self.assertEqual(creds['display'], 'touch') diff --git a/tests/openid/connect/core/grant_types/test_authorization_code.py b/tests/openid/connect/core/grant_types/test_authorization_code.py new file mode 100644 index 0000000..1bad120 --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_authorization_code.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import json + +import mock + +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant +from oauthlib.openid.connect.core.grant_types.exceptions import OIDCNoPrompt + +from ....unittest import TestCase +from ....oauth2.rfc6749.grant_types.test_authorization_code import AuthorizationCodeGrantTest + + +def get_id_token_mock(token, token_handler, request): + return "MOCKED_TOKEN" + + +class OpenIDAuthCodeInterferenceTest(AuthorizationCodeGrantTest): + """Test that OpenID don't interfere with normal OAuth 2 flows.""" + + def setUp(self): + super(OpenIDAuthCodeInterferenceTest, self).setUp() + self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator) + + +class OpenIDAuthCodeTest(TestCase): + + def setUp(self): + self.request = Request('http://a.b/path') + self.request.scopes = ('hello', 'openid') + self.request.expires_in = 1800 + self.request.client_id = 'abcdef' + self.request.code = '1234' + self.request.response_type = 'code' + self.request.grant_type = 'authorization_code' + self.request.redirect_uri = 'https://a.b/cb' + self.request.state = 'abc' + + self.mock_validator = mock.MagicMock() + self.mock_validator.authenticate_client.side_effect = self.set_client + self.mock_validator.get_id_token.side_effect = get_id_token_mock + self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator) + + self.url_query = 'https://a.b/cb?code=abc&state=abc' + self.url_fragment = 'https://a.b/cb#code=abc&state=abc' + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + @mock.patch('oauthlib.common.generate_token') + def test_authorization(self, generate_token): + + scope, info = self.auth.validate_authorization_request(self.request) + + generate_token.return_value = 'abc' + bearer = BearerToken(self.mock_validator) + self.request.response_mode = 'query' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_query) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.response_mode = 'fragment' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + @mock.patch('oauthlib.common.generate_token') + def test_no_prompt_authorization(self, generate_token): + generate_token.return_value = 'abc' + scope, info = self.auth.validate_authorization_request(self.request) + self.request.prompt = 'none' + self.assertRaises(OIDCNoPrompt, + self.auth.validate_authorization_request, + self.request) + + # prompt == none requires id token hint + bearer = BearerToken(self.mock_validator) + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=invalid_request', h['Location']) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.response_mode = 'query' + self.request.id_token_hint = 'me@email.com' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_query) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + # Test alernative response modes + self.request.response_mode = 'fragment' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + + # Ensure silent authentication and authorization is done + self.mock_validator.validate_silent_login.return_value = False + self.mock_validator.validate_silent_authorization.return_value = True + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + self.mock_validator.validate_silent_login.return_value = True + self.mock_validator.validate_silent_authorization.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=consent_required', h['Location']) + + # ID token hint must match logged in user + self.mock_validator.validate_silent_authorization.return_value = True + self.mock_validator.validate_user_match.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + def set_scopes(self, client_id, code, client, request): + request.scopes = self.request.scopes + request.state = self.request.state + request.user = 'bob' + return True + + def test_create_token_response(self): + self.request.response_type = None + self.mock_validator.validate_code.side_effect = self.set_scopes + + bearer = BearerToken(self.mock_validator) + + h, token, s = self.auth.create_token_response(self.request, bearer) + token = json.loads(token) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('refresh_token', token) + self.assertIn('expires_in', token) + self.assertIn('scope', token) + self.assertIn('id_token', token) + self.assertIn('openid', token['scope']) + + self.mock_validator.reset_mock() + + self.request.scopes = ('hello', 'world') + h, token, s = self.auth.create_token_response(self.request, bearer) + token = json.loads(token) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('refresh_token', token) + self.assertIn('expires_in', token) + self.assertIn('scope', token) + self.assertNotIn('id_token', token) + self.assertNotIn('openid', token['scope']) diff --git a/tests/openid/connect/core/grant_types/test_dispatchers.py b/tests/openid/connect/core/grant_types/test_dispatchers.py new file mode 100644 index 0000000..f90ec46 --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_dispatchers.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +import mock + +from oauthlib.common import Request + +from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant +from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant +from oauthlib.openid.connect.core.grant_types.dispatchers import ( + ImplicitTokenGrantDispatcher, + AuthorizationTokenGrantDispatcher +) + +from oauthlib.oauth2.rfc6749.grant_types import ( + AuthorizationCodeGrant as OAuth2AuthorizationCodeGrant, + ImplicitGrant as OAuth2ImplicitGrant, +) + + +from ....unittest import TestCase + + +class ImplicitTokenGrantDispatcherTest(TestCase): + def setUp(self): + self.request = Request('http://a.b/path') + request_validator = mock.MagicMock() + implicit_grant = OAuth2ImplicitGrant(request_validator) + openid_connect_implicit = ImplicitGrant(request_validator) + + self.dispatcher = ImplicitTokenGrantDispatcher( + default_implicit_grant=implicit_grant, + oidc_implicit_grant=openid_connect_implicit + ) + + def test_create_authorization_response_openid(self): + self.request.scopes = ('hello', 'openid') + self.request.response_type = 'id_token' + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + def test_validate_authorization_request_openid(self): + self.request.scopes = ('hello', 'openid') + self.request.response_type = 'id_token' + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + def test_create_authorization_response_oauth(self): + self.request.scopes = ('hello', 'world') + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + def test_validate_authorization_request_oauth(self): + self.request.scopes = ('hello', 'world') + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + +class DispatcherTest(TestCase): + def setUp(self): + self.request = Request('http://a.b/path') + self.request.decoded_body = ( + ("client_id", "me"), + ("code", "code"), + ("redirect_url", "https://a.b/cb"), + ) + + self.request_validator = mock.MagicMock() + self.auth_grant = OAuth2AuthorizationCodeGrant(self.request_validator) + self.openid_connect_auth = OAuth2AuthorizationCodeGrant(self.request_validator) + + +class AuthTokenGrantDispatcherOpenIdTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOpenIdTest, self).setUp() + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') + self.dispatcher = AuthorizationTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_openid(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, AuthorizationCodeGrant)) + self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) + + +class AuthTokenGrantDispatcherOpenIdWithoutCodeTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOpenIdWithoutCodeTest, self).setUp() + self.request.decoded_body = ( + ("client_id", "me"), + ("code", ""), + ("redirect_url", "https://a.b/cb"), + ) + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') + self.dispatcher = AuthorizationTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_openid_without_code(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, OAuth2AuthorizationCodeGrant)) + self.assertFalse(self.dispatcher.request_validator.get_authorization_code_scopes.called) + + +class AuthTokenGrantDispatcherOAuthTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOAuthTest, self).setUp() + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'world') + self.dispatcher = AuthorizationTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_oauth(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, OAuth2AuthorizationCodeGrant)) + self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) diff --git a/tests/openid/connect/core/grant_types/test_hybrid.py b/tests/openid/connect/core/grant_types/test_hybrid.py new file mode 100644 index 0000000..531ae7f --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_hybrid.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant + +from ....oauth2.rfc6749.grant_types.test_authorization_code import AuthorizationCodeGrantTest + + +class OpenIDHybridInterferenceTest(AuthorizationCodeGrantTest): + """Test that OpenID don't interfere with normal OAuth 2 flows.""" + + def setUp(self): + super(OpenIDHybridInterferenceTest, self).setUp() + self.auth = HybridGrant(request_validator=self.mock_validator) diff --git a/tests/openid/connect/core/grant_types/test_implicit.py b/tests/openid/connect/core/grant_types/test_implicit.py new file mode 100644 index 0000000..56247d9 --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_implicit.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import mock + +from oauthlib.common import Request + +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant +from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant +from oauthlib.openid.connect.core.grant_types.exceptions import OIDCNoPrompt + +from ....unittest import TestCase +from .test_authorization_code import get_id_token_mock, OpenIDAuthCodeTest + +from ....oauth2.rfc6749.grant_types.test_implicit import ImplicitGrantTest + + +class OpenIDImplicitInterferenceTest(ImplicitGrantTest): + """Test that OpenID don't interfere with normal OAuth 2 flows.""" + + def setUp(self): + super(OpenIDImplicitInterferenceTest, self).setUp() + self.auth = ImplicitGrant(request_validator=self.mock_validator) + + +class OpenIDImplicitTest(TestCase): + + def setUp(self): + self.request = Request('http://a.b/path') + self.request.scopes = ('hello', 'openid') + self.request.expires_in = 1800 + self.request.client_id = 'abcdef' + self.request.response_type = 'id_token token' + self.request.redirect_uri = 'https://a.b/cb' + self.request.nonce = 'zxc' + self.request.state = 'abc' + + self.mock_validator = mock.MagicMock() + self.mock_validator.get_id_token.side_effect = get_id_token_mock + self.auth = ImplicitGrant(request_validator=self.mock_validator) + + token = 'MOCKED_TOKEN' + self.url_query = 'https://a.b/cb?state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token + self.url_fragment = 'https://a.b/cb#state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token + + @mock.patch('oauthlib.common.generate_token') + def test_authorization(self, generate_token): + scope, info = self.auth.validate_authorization_request(self.request) + + generate_token.return_value = 'abc' + bearer = BearerToken(self.mock_validator) + + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.response_type = 'id_token' + token = 'MOCKED_TOKEN' + url = 'https://a.b/cb#state=abc&id_token=%s' % token + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], url, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.nonce = None + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=invalid_request', h['Location']) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + @mock.patch('oauthlib.common.generate_token') + def test_no_prompt_authorization(self, generate_token): + generate_token.return_value = 'abc' + scope, info = self.auth.validate_authorization_request(self.request) + self.request.prompt = 'none' + self.assertRaises(OIDCNoPrompt, + self.auth.validate_authorization_request, + self.request) + + # prompt == none requires id token hint + bearer = BearerToken(self.mock_validator) + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=invalid_request', h['Location']) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.id_token_hint = 'me@email.com' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + # Test alernative response modes + self.request.response_mode = 'query' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_query) + + # Ensure silent authentication and authorization is done + self.mock_validator.validate_silent_login.return_value = False + self.mock_validator.validate_silent_authorization.return_value = True + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + self.mock_validator.validate_silent_login.return_value = True + self.mock_validator.validate_silent_authorization.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=consent_required', h['Location']) + + # ID token hint must match logged in user + self.mock_validator.validate_silent_authorization.return_value = True + self.mock_validator.validate_user_match.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + +class OpenIDHybridCodeTokenTest(OpenIDAuthCodeTest): + + def setUp(self): + super(OpenIDHybridCodeTokenTest, self).setUp() + self.request.response_type = 'code token' + self.auth = HybridGrant(request_validator=self.mock_validator) + self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' + self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' + + +class OpenIDHybridCodeIdTokenTest(OpenIDAuthCodeTest): + + def setUp(self): + super(OpenIDHybridCodeIdTokenTest, self).setUp() + self.request.response_type = 'code id_token' + self.auth = HybridGrant(request_validator=self.mock_validator) + token = 'MOCKED_TOKEN' + self.url_query = 'https://a.b/cb?code=abc&state=abc&id_token=%s' % token + self.url_fragment = 'https://a.b/cb#code=abc&state=abc&id_token=%s' % token + + +class OpenIDHybridCodeIdTokenTokenTest(OpenIDAuthCodeTest): + + def setUp(self): + super(OpenIDHybridCodeIdTokenTokenTest, self).setUp() + self.request.response_type = 'code id_token token' + self.auth = HybridGrant(request_validator=self.mock_validator) + token = 'MOCKED_TOKEN' + self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token + self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token diff --git a/tests/openid/connect/core/test_request_validator.py b/tests/openid/connect/core/test_request_validator.py new file mode 100644 index 0000000..14a7c23 --- /dev/null +++ b/tests/openid/connect/core/test_request_validator.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +from oauthlib.openid.connect.core.request_validator import RequestValidator + +from ....unittest import TestCase + + +class RequestValidatorTest(TestCase): + + def test_method_contracts(self): + v = RequestValidator() + self.assertRaises( + NotImplementedError, + v.get_authorization_code_scopes, + 'client_id', 'code', 'redirect_uri', 'request' + ) + self.assertRaises( + NotImplementedError, + v.get_jwt_bearer_token, + 'token', 'token_handler', 'request' + ) + self.assertRaises( + NotImplementedError, + v.get_id_token, + 'token', 'token_handler', 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_jwt_bearer_token, + 'token', 'scopes', 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_id_token, + 'token', 'scopes', 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_silent_authorization, + 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_silent_login, + 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_user_match, + 'id_token_hint', 'scopes', 'claims', 'request' + ) diff --git a/tests/openid/connect/core/test_server.py b/tests/openid/connect/core/test_server.py new file mode 100644 index 0000000..83290db --- /dev/null +++ b/tests/openid/connect/core/test_server.py @@ -0,0 +1,178 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import json + +import mock + +from oauthlib.oauth2.rfc6749 import errors +from oauthlib.oauth2.rfc6749.endpoints.authorization import AuthorizationEndpoint +from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant +from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant +from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant + +from ....unittest import TestCase + + +class AuthorizationEndpointTest(TestCase): + + def setUp(self): + self.mock_validator = mock.MagicMock() + self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) + auth_code = AuthorizationCodeGrant(request_validator=self.mock_validator) + auth_code.save_authorization_code = mock.MagicMock() + implicit = ImplicitGrant( + request_validator=self.mock_validator) + implicit.save_token = mock.MagicMock() + hybrid = HybridGrant(self.mock_validator) + + response_types = { + 'code': auth_code, + 'token': implicit, + 'id_token': implicit, + 'id_token token': implicit, + 'code token': hybrid, + 'code id_token': hybrid, + 'code token id_token': hybrid, + 'none': auth_code + } + self.expires_in = 1800 + token = BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = AuthorizationEndpoint( + default_response_type='code', + default_token_type=token, + response_types=response_types + ) + + # TODO: Add hybrid grant test + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_authorization_grant(self): + uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz') + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_implicit_grant(self): + uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True) + + def test_none_grant(self): + uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True) + self.assertEqual(body, None) + self.assertEqual(status_code, 302) + + # and without the state parameter + uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me', parse_fragment=True) + self.assertEqual(body, None) + self.assertEqual(status_code, 302) + + def test_missing_type(self): + uri = 'http://i.b/l?client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + self.mock_validator.validate_request = mock.MagicMock( + side_effect=errors.InvalidRequestError()) + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.') + + def test_invalid_type(self): + uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + self.mock_validator.validate_request = mock.MagicMock( + side_effect=errors.UnsupportedResponseTypeError()) + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type') + + +class TokenEndpointTest(TestCase): + + def setUp(self): + def set_user(request): + request.user = mock.MagicMock() + request.client = mock.MagicMock() + request.client.client_id = 'mocked_client_id' + return True + + self.mock_validator = mock.MagicMock() + self.mock_validator.authenticate_client.side_effect = set_user + self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) + auth_code = AuthorizationCodeGrant( + request_validator=self.mock_validator) + supported_types = { + 'authorization_code': auth_code, + } + self.expires_in = 1800 + token = BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = TokenEndpoint( + 'authorization_code', + default_token_type=token, + grant_types=supported_types + ) + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_authorization_grant(self): + body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': 'abc', + 'refresh_token': 'abc', + 'scope': 'all of them', + 'state': 'xyz' + } + self.assertEqual(json.loads(body), token) + + body = 'grant_type=authorization_code&code=abc&state=xyz' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': 'abc', + 'refresh_token': 'abc', + 'state': 'xyz' + } + self.assertEqual(json.loads(body), token) + + def test_missing_type(self): + _, body, _ = self.endpoint.create_token_response('', body='') + token = {'error': 'unsupported_grant_type'} + self.assertEqual(json.loads(body), token) + + def test_invalid_type(self): + body = 'grant_type=invalid' + _, body, _ = self.endpoint.create_token_response('', body=body) + token = {'error': 'unsupported_grant_type'} + self.assertEqual(json.loads(body), token) diff --git a/tests/openid/connect/core/test_tokens.py b/tests/openid/connect/core/test_tokens.py new file mode 100644 index 0000000..12c75f1 --- /dev/null +++ b/tests/openid/connect/core/test_tokens.py @@ -0,0 +1,133 @@ +from __future__ import absolute_import, unicode_literals + +import mock + +from oauthlib.openid.connect.core.tokens import JWTToken + +from ....unittest import TestCase + + +class JWTTokenTestCase(TestCase): + + def test_create_token_callable_expires_in(self): + """ + Test retrieval of the expires in value by calling the callable expires_in property + """ + + expires_in_mock = mock.MagicMock() + request_mock = mock.MagicMock() + + token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) + token.create_token(request=request_mock) + + expires_in_mock.assert_called_once_with(request_mock) + + def test_create_token_non_callable_expires_in(self): + """ + When a non callable expires in is set this should just be set to the request + """ + + expires_in_mock = mock.NonCallableMagicMock() + request_mock = mock.MagicMock() + + token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) + token.create_token(request=request_mock) + + self.assertFalse(expires_in_mock.called) + self.assertEqual(request_mock.expires_in, expires_in_mock) + + def test_create_token_calls_get_id_token(self): + """ + When create_token is called the call should be forwarded to the get_id_token on the token validator + """ + request_mock = mock.MagicMock() + + with mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + + request_validator = RequestValidatorMock() + + token = JWTToken(expires_in=mock.MagicMock(), request_validator=request_validator) + token.create_token(request=request_mock) + + request_validator.get_jwt_bearer_token.assert_called_once_with(None, None, request_mock) + + def test_validate_request_token_from_headers(self): + """ + Bearer token get retrieved from headers. + """ + + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ + mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + request_validator_mock = RequestValidatorMock() + + token = JWTToken(request_validator=request_validator_mock) + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.scopes = mock.MagicMock() + request.headers = { + 'Authorization': 'Bearer some-token-from-header' + } + + token.validate_request(request=request) + + request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-header', + request.scopes, + request) + + def test_validate_token_from_request(self): + """ + Token get retrieved from request object. + """ + + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ + mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + request_validator_mock = RequestValidatorMock() + + token = JWTToken(request_validator=request_validator_mock) + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.scopes = mock.MagicMock() + request.access_token = 'some-token-from-request-object' + request.headers = {} + + token.validate_request(request=request) + + request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-request-object', + request.scopes, + request) + + def test_estimate_type(self): + """ + Estimate type results for a jwt token + """ + + def test_token(token, expected_result): + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock: + jwt_token = JWTToken() + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.headers = { + 'Authorization': 'Bearer {}'.format(token) + } + + result = jwt_token.estimate_type(request=request) + + self.assertEqual(result, expected_result) + + test_items = ( + ('eyfoo.foo.foo', 10), + ('eyfoo.foo.foo.foo.foo', 10), + ('eyfoobar', 0) + ) + + for token, expected_result in test_items: + test_token(token, expected_result) |