From e575cca3e5d18b1e7051c64f435f2cdea71a29ab Mon Sep 17 00:00:00 2001 From: Wiliam Souza Date: Sun, 1 Oct 2017 03:07:11 -0300 Subject: OpenID connect improvements (#484) * Change create_token_response to only save access_token when it's present in request.response_type * Remove unused import, fix indentation and improve comment * Fix AuthorizationEndpoint response_type for OpenID Connect hybrid flow * Add new ImplicitTokenGrantDispatcher Changes AuthorizationEndpoint response_type `'token'`, `'id_token'` and `'id_token token'` to work with OpenID Connect and OAuth2 implicit flow in a transparent way * Add new AuthTokenGrantDispatcher Change AuthorizationEndpoint grant_types `'authorization_code'` to work with OpenID Connect and OAuth2 authorization flow in a transparent way * Change tests to include required client_id and redirect_uri * Remove AuthorizationEndpoint grant_types `'openid'` Now OpenID Connect and OAuth2 authorization flow can use `authorization_code` in a transparent way * Add sone blank lines and fix indentation * Change AuthorizationEndpoint grant type id_token and id_token token to use openid_connect_implicit direct * Change default empty value to None and fix a typo * Add assert called to AuthTokenGrantDispatcher tests * Add request to get_authorization_code_scopes --- .../rfc6749/endpoints/test_claims_handling.py | 2 +- .../rfc6749/endpoints/test_scope_handling.py | 2 +- .../rfc6749/grant_types/test_openid_connect.py | 114 ++++++++++++++++++++- tests/oauth2/rfc6749/test_server.py | 8 +- 4 files changed, 119 insertions(+), 7 deletions(-) (limited to 'tests') diff --git a/tests/oauth2/rfc6749/endpoints/test_claims_handling.py b/tests/oauth2/rfc6749/endpoints/test_claims_handling.py index 9795c80..ff72673 100644 --- a/tests/oauth2/rfc6749/endpoints/test_claims_handling.py +++ b/tests/oauth2/rfc6749/endpoints/test_claims_handling.py @@ -91,7 +91,7 @@ class TestClaimsHandling(TestCase): code = get_query_credentials(h['Location'])['code'][0] token_uri = 'http://example.com/path' _, body, _ = self.server.create_token_response(token_uri, - body='grant_type=authorization_code&code=%s' % code) + body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code) self.assertDictEqual(self.claims_saved_with_bearer_token, claims) diff --git a/tests/oauth2/rfc6749/endpoints/test_scope_handling.py b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py index 87781b3..8490c03 100644 --- a/tests/oauth2/rfc6749/endpoints/test_scope_handling.py +++ b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py @@ -87,7 +87,7 @@ class TestScopeHandling(TestCase): self.assertIn('Location', h) code = get_query_credentials(h['Location'])['code'][0] _, body, _ = getattr(self, backend_server_type).create_token_response(token_uri, - body='grant_type=authorization_code&code=%s' % code) + body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code) self.assertEqual(json.loads(body)['scope'], decoded_scope) # implicit grant diff --git a/tests/oauth2/rfc6749/grant_types/test_openid_connect.py b/tests/oauth2/rfc6749/grant_types/test_openid_connect.py index f10d36c..573d491 100644 --- a/tests/oauth2/rfc6749/grant_types/test_openid_connect.py +++ b/tests/oauth2/rfc6749/grant_types/test_openid_connect.py @@ -6,7 +6,11 @@ import json import mock from oauthlib.common import Request -from oauthlib.oauth2.rfc6749.grant_types import (OIDCNoPrompt, +from oauthlib.oauth2.rfc6749.grant_types import (AuthTokenGrantDispatcher, + AuthorizationCodeGrant, + ImplicitGrant, + ImplicitTokenGrantDispatcher, + OIDCNoPrompt, OpenIDConnectAuthCode, OpenIDConnectHybrid, OpenIDConnectImplicit) @@ -24,6 +28,7 @@ class OpenIDAuthCodeInterferenceTest(AuthorizationCodeGrantTest): super(OpenIDAuthCodeInterferenceTest, self).setUp() self.auth = OpenIDConnectAuthCode(request_validator=self.mock_validator) + class OpenIDImplicitInterferenceTest(ImplicitGrantTest): """Test that OpenID don't interfere with normal OAuth 2 flows.""" @@ -270,6 +275,7 @@ class OpenIDHybridCodeTokenTest(OpenIDAuthCodeTest): self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' + class OpenIDHybridCodeIdTokenTest(OpenIDAuthCodeTest): def setUp(self): @@ -280,6 +286,7 @@ class OpenIDHybridCodeIdTokenTest(OpenIDAuthCodeTest): self.url_query = 'https://a.b/cb?code=abc&state=abc&id_token=%s' % token self.url_fragment = 'https://a.b/cb#code=abc&state=abc&id_token=%s' % token + class OpenIDHybridCodeIdTokenTokenTest(OpenIDAuthCodeTest): def setUp(self): @@ -289,3 +296,108 @@ class OpenIDHybridCodeIdTokenTokenTest(OpenIDAuthCodeTest): token = 'MOCKED_TOKEN' self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token + + +class ImplicitTokenGrantDispatcherTest(TestCase): + def setUp(self): + self.request = Request('http://a.b/path') + request_validator = mock.MagicMock() + implicit_grant = ImplicitGrant(request_validator) + openid_connect_implicit = OpenIDConnectImplicit(request_validator) + + self.dispatcher = ImplicitTokenGrantDispatcher( + default_implicit_grant=implicit_grant, + oidc_implicit_grant=openid_connect_implicit + ) + + def test_create_authorization_response_openid(self): + self.request.scopes = ('hello', 'openid') + self.request.response_type = 'id_token' + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, OpenIDConnectImplicit)) + + def test_validate_authorization_request_openid(self): + self.request.scopes = ('hello', 'openid') + self.request.response_type = 'id_token' + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, OpenIDConnectImplicit)) + + def test_create_authorization_response_oauth(self): + self.request.scopes = ('hello', 'world') + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + def test_validate_authorization_request_oauth(self): + self.request.scopes = ('hello', 'world') + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + +class DispatcherTest(TestCase): + def setUp(self): + self.request = Request('http://a.b/path') + self.request.decoded_body = ( + ("client_id", "me"), + ("code", "code"), + ("redirect_url", "https://a.b/cb"), + ) + + self.request_validator = mock.MagicMock() + self.auth_grant = AuthorizationCodeGrant(self.request_validator) + self.openid_connect_auth = OpenIDConnectAuthCode(self.request_validator) + + +class AuthTokenGrantDispatcherOpenIdTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOpenIdTest, self).setUp() + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') + self.dispatcher = AuthTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_openid(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, OpenIDConnectAuthCode)) + self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) + + +class AuthTokenGrantDispatcherOpenIdWithoutCodeTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOpenIdWithoutCodeTest, self).setUp() + self.request.decoded_body = ( + ("client_id", "me"), + ("code", ""), + ("redirect_url", "https://a.b/cb"), + ) + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') + self.dispatcher = AuthTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_openid_without_code(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, AuthorizationCodeGrant)) + self.assertFalse(self.dispatcher.request_validator.get_authorization_code_scopes.called) + + +class AuthTokenGrantDispatcherOAuthTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOAuthTest, self).setUp() + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'world') + self.dispatcher = AuthTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_oauth(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, AuthorizationCodeGrant)) + self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) diff --git a/tests/oauth2/rfc6749/test_server.py b/tests/oauth2/rfc6749/test_server.py index 305b795..da303ce 100644 --- a/tests/oauth2/rfc6749/test_server.py +++ b/tests/oauth2/rfc6749/test_server.py @@ -279,7 +279,7 @@ twIDAQAB @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') def test_authorization_grant(self): - body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' + body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' headers, body, status_code = self.endpoint.create_token_response( '', body=body) body = json.loads(body) @@ -293,7 +293,7 @@ twIDAQAB } self.assertEqual(body, token) - body = 'grant_type=authorization_code&code=abc&state=xyz' + body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&state=xyz' headers, body, status_code = self.endpoint.create_token_response( '', body=body) body = json.loads(body) @@ -349,12 +349,12 @@ twIDAQAB self.assertEqual(body, token) def test_missing_type(self): - _, body, _ = self.endpoint.create_token_response('', body='') + _, body, _ = self.endpoint.create_token_response('', body='client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&code=abc') token = {'error': 'unsupported_grant_type'} self.assertEqual(json.loads(body), token) def test_invalid_type(self): - body = 'grant_type=invalid' + body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=invalid&code=abc' _, body, _ = self.endpoint.create_token_response('', body=body) token = {'error': 'unsupported_grant_type'} self.assertEqual(json.loads(body), token) -- cgit v1.2.1 From cfb82feb03fcd60b3b66ac09bf1b478cd5f11b7d Mon Sep 17 00:00:00 2001 From: Viktor Haag Date: Tue, 14 Nov 2017 07:44:44 -0800 Subject: Add support for HMAC-SHA256 (builds on PR#388) (#498) * Add support for HMAC-SHA256 * Add explicit declaration of HMAC-SHA1 and point HMAC at it To avoid confusion, HMAC constant name should explicitly state which SHA variant is used, but for backwards compatibility, SIGNATURE_HMAC is still needed * add support for HMAC-SHA256 including tests and comments * constructor tests verify client built with correct signer method --- tests/oauth1/rfc5849/test_client.py | 40 +++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) (limited to 'tests') diff --git a/tests/oauth1/rfc5849/test_client.py b/tests/oauth1/rfc5849/test_client.py index dcb4c3d..777efc2 100644 --- a/tests/oauth1/rfc5849/test_client.py +++ b/tests/oauth1/rfc5849/test_client.py @@ -2,7 +2,8 @@ from __future__ import absolute_import, unicode_literals from oauthlib.common import Request -from oauthlib.oauth1 import (SIGNATURE_PLAINTEXT, SIGNATURE_RSA, +from oauthlib.oauth1 import (SIGNATURE_PLAINTEXT, SIGNATURE_HMAC_SHA1, + SIGNATURE_HMAC_SHA256, SIGNATURE_RSA, SIGNATURE_TYPE_BODY, SIGNATURE_TYPE_QUERY) from oauthlib.oauth1.rfc5849 import Client, bytes_type @@ -62,13 +63,48 @@ class ClientConstructorTests(TestCase): self.assertIsInstance(k, bytes_type) self.assertIsInstance(v, bytes_type) + def test_hmac_sha1(self): + client = Client('client_key') + # instance is using the correct signer method + self.assertEqual(Client.SIGNATURE_METHODS[SIGNATURE_HMAC_SHA1], + client.SIGNATURE_METHODS[client.signature_method]) + + def test_hmac_sha256(self): + client = Client('client_key', signature_method=SIGNATURE_HMAC_SHA256) + # instance is using the correct signer method + self.assertEqual(Client.SIGNATURE_METHODS[SIGNATURE_HMAC_SHA256], + client.SIGNATURE_METHODS[client.signature_method]) + def test_rsa(self): client = Client('client_key', signature_method=SIGNATURE_RSA) - self.assertIsNone(client.rsa_key) # don't need an RSA key to instantiate + # instance is using the correct signer method + self.assertEqual(Client.SIGNATURE_METHODS[SIGNATURE_RSA], + client.SIGNATURE_METHODS[client.signature_method]) + # don't need an RSA key to instantiate + self.assertIsNone(client.rsa_key) class SignatureMethodTest(TestCase): + def test_hmac_sha1_method(self): + client = Client('client_key', timestamp='1234567890', nonce='abc') + u, h, b = client.sign('http://example.com') + correct = ('OAuth oauth_nonce="abc", oauth_timestamp="1234567890", ' + 'oauth_version="1.0", oauth_signature_method="HMAC-SHA1", ' + 'oauth_consumer_key="client_key", ' + 'oauth_signature="hH5BWYVqo7QI4EmPBUUe9owRUUQ%3D"') + self.assertEqual(h['Authorization'], correct) + + def test_hmac_sha256_method(self): + client = Client('client_key', signature_method=SIGNATURE_HMAC_SHA256, + timestamp='1234567890', nonce='abc') + u, h, b = client.sign('http://example.com') + correct = ('OAuth oauth_nonce="abc", oauth_timestamp="1234567890", ' + 'oauth_version="1.0", oauth_signature_method="HMAC-SHA256", ' + 'oauth_consumer_key="client_key", ' + 'oauth_signature="JzgJWBxX664OiMW3WE4MEjtYwOjI%2FpaUWHqtdHe68Es%3D"') + self.assertEqual(h['Authorization'], correct) + def test_rsa_method(self): private_key = ( "-----BEGIN RSA PRIVATE KEY-----\nMIICXgIBAAKBgQDk1/bxy" -- cgit v1.2.1 From 9b95e4e8f094d78abe577203ad1ef53aecfdb270 Mon Sep 17 00:00:00 2001 From: Jonathan Huot Date: Wed, 8 Nov 2017 09:55:03 +0100 Subject: Added initial introspect support --- .../rfc6749/endpoints/test_introspect_endpoint.py | 132 +++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py (limited to 'tests') diff --git a/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py b/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py new file mode 100644 index 0000000..7ec8190 --- /dev/null +++ b/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +from json import loads + +from mock import MagicMock + +from oauthlib.common import urlencode +from oauthlib.oauth2 import RequestValidator, IntrospectEndpoint + +from ....unittest import TestCase + + +class IntrospectEndpointTest(TestCase): + + def setUp(self): + self.validator = MagicMock(wraps=RequestValidator()) + self.validator.client_authentication_required.return_value = True + self.validator.authenticate_client.return_value = True + self.validator.validate_bearer_token.return_value = True + self.validator.introspect_token.return_value = {} + self.endpoint = IntrospectEndpoint(self.validator) + + self.uri = 'should_not_matter' + self.headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + } + self.resp_h = { + 'Cache-Control': 'no-store', + 'Content-Type': 'application/json', + 'Pragma': 'no-cache' + } + self.resp_b = { + "active": True + } + + def test_introspect_token(self): + for token_type in ('access_token', 'refresh_token', 'invalid'): + body = urlencode([('token', 'foo'), + ('token_type_hint', token_type)]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), self.resp_b) + self.assertEqual(s, 200) + + def test_introspect_token_nohint(self): + # don't specify token_type_hint + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), self.resp_b) + self.assertEqual(s, 200) + + def test_introspect_token_false(self): + self.validator.introspect_token.return_value = None + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), {"active": False}) + self.assertEqual(s, 200) + + def test_introspect_token_claims(self): + self.validator.introspect_token.return_value = {"foo": "bar"} + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), {"active": True, "foo": "bar"}) + self.assertEqual(s, 200) + + def test_introspect_token_claims_spoof_active(self): + self.validator.introspect_token.return_value = {"foo": "bar", "active": False} + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), {"active": True, "foo": "bar"}) + self.assertEqual(s, 200) + + def test_introspect_token_client_authentication_failed(self): + self.validator.authenticate_client.return_value = False + body = urlencode([('token', 'foo'), + ('token_type_hint', 'access_token')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, {}) + self.assertEqual(loads(b)['error'], 'invalid_client') + self.assertEqual(s, 401) + + def test_introspect_token_public_client_authentication(self): + self.validator.client_authentication_required.return_value = False + self.validator.authenticate_client_id.return_value = True + for token_type in ('access_token', 'refresh_token', 'invalid'): + body = urlencode([('token', 'foo'), + ('token_type_hint', token_type)]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), self.resp_b) + self.assertEqual(s, 200) + + def test_introspect_token_public_client_authentication_failed(self): + self.validator.client_authentication_required.return_value = False + self.validator.authenticate_client_id.return_value = False + body = urlencode([('token', 'foo'), + ('token_type_hint', 'access_token')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, {}) + self.assertEqual(loads(b)['error'], 'invalid_client') + self.assertEqual(s, 401) + + + def test_introspect_unsupported_token(self): + endpoint = IntrospectEndpoint(self.validator, + supported_token_types=['access_token']) + body = urlencode([('token', 'foo'), + ('token_type_hint', 'refresh_token')]) + h, b, s = endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, {}) + self.assertEqual(loads(b)['error'], 'unsupported_token_type') + self.assertEqual(s, 400) + + h, b, s = endpoint.create_introspect_response(self.uri, + headers=self.headers, body='') + self.assertEqual(h, {}) + self.assertEqual(loads(b)['error'], 'invalid_request') + self.assertEqual(s, 400) -- cgit v1.2.1 From d7fc1336d81b39f3d2193eb3155ff66da6caadd9 Mon Sep 17 00:00:00 2001 From: Antoine Bertin Date: Mon, 29 Jan 2018 10:17:54 +0100 Subject: Fix cliend_id in web request body (#505) Previously, cliend_id was always included in the request body in the Authorization Code flow and the client_id parameter was ignored in contradiction with the docs. Fixes #495 --- tests/oauth2/rfc6749/clients/test_web_application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'tests') diff --git a/tests/oauth2/rfc6749/clients/test_web_application.py b/tests/oauth2/rfc6749/clients/test_web_application.py index 85b247d..0a80c9a 100644 --- a/tests/oauth2/rfc6749/clients/test_web_application.py +++ b/tests/oauth2/rfc6749/clients/test_web_application.py @@ -38,7 +38,7 @@ class WebApplicationClientTest(TestCase): code = "zzzzaaaa" body = "not=empty" - body_code = "not=empty&grant_type=authorization_code&code=%s&client_id=%s" % (code, client_id) + body_code = "not=empty&grant_type=authorization_code&code=%s" % code body_redirect = body_code + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback" body_kwargs = body_code + "&some=providers&require=extra+arguments" -- cgit v1.2.1 From 2fe1cdb88e076f624824496c4aba6a8665e991d9 Mon Sep 17 00:00:00 2001 From: Wiliam Souza Date: Tue, 30 Jan 2018 17:30:26 -0200 Subject: Openid connect jwt (#488) * Add JWT token with it the server knows how to validate this new type of token in resource requests * Change find_token_type sorted function to reverse result and choose the valued estimated token handler * Add validate_id_token method to RequestValidator * Added unittest for JWTToken model * Updated version of Mock * Add get_jwt_bearer_token and validate_jwt_bearer_token oauthlib.oauth2.RequestValidator and change oauthlib.oauth2.tokens JWTToken to use it * Change to improve token type estimate test * Add a note in RequestValidator.validate_jwt_bearer_token about error 5xx rather 4xx --- tests/oauth2/rfc6749/test_tokens.py | 128 ++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) (limited to 'tests') diff --git a/tests/oauth2/rfc6749/test_tokens.py b/tests/oauth2/rfc6749/test_tokens.py index e2e558d..570afb0 100644 --- a/tests/oauth2/rfc6749/test_tokens.py +++ b/tests/oauth2/rfc6749/test_tokens.py @@ -1,5 +1,7 @@ from __future__ import absolute_import, unicode_literals +import mock + from oauthlib.oauth2.rfc6749.tokens import * from ...unittest import TestCase @@ -80,3 +82,129 @@ class TokenTest(TestCase): self.assertEqual(prepare_bearer_headers(self.token), self.bearer_headers) self.assertEqual(prepare_bearer_body(self.token), self.bearer_body) self.assertEqual(prepare_bearer_uri(self.token, uri=self.uri), self.bearer_uri) + + +class JWTTokenTestCase(TestCase): + + def test_create_token_callable_expires_in(self): + """ + Test retrieval of the expires in value by calling the callable expires_in property + """ + + expires_in_mock = mock.MagicMock() + request_mock = mock.MagicMock() + + token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) + token.create_token(request=request_mock) + + expires_in_mock.assert_called_once_with(request_mock) + + def test_create_token_non_callable_expires_in(self): + """ + When a non callable expires in is set this should just be set to the request + """ + + expires_in_mock = mock.NonCallableMagicMock() + request_mock = mock.MagicMock() + + token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) + token.create_token(request=request_mock) + + self.assertFalse(expires_in_mock.called) + self.assertEqual(request_mock.expires_in, expires_in_mock) + + def test_create_token_calls_get_id_token(self): + """ + When create_token is called the call should be forwarded to the get_id_token on the token validator + """ + request_mock = mock.MagicMock() + + with mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + + request_validator = RequestValidatorMock() + + token = JWTToken(expires_in=mock.MagicMock(), request_validator=request_validator) + token.create_token(request=request_mock) + + request_validator.get_jwt_bearer_token.assert_called_once_with(None, None, request_mock) + + def test_validate_request_token_from_headers(self): + """ + Bearer token get retrieved from headers. + """ + + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ + mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + request_validator_mock = RequestValidatorMock() + + token = JWTToken(request_validator=request_validator_mock) + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.scopes = mock.MagicMock() + request.headers = { + 'Authorization': 'Bearer some-token-from-header' + } + + token.validate_request(request=request) + + request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-header', + request.scopes, + request) + + def test_validate_token_from_request(self): + """ + Token get retrieved from request object. + """ + + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ + mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + request_validator_mock = RequestValidatorMock() + + token = JWTToken(request_validator=request_validator_mock) + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.scopes = mock.MagicMock() + request.access_token = 'some-token-from-request-object' + request.headers = {} + + token.validate_request(request=request) + + request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-request-object', + request.scopes, + request) + + def test_estimate_type(self): + """ + Estimate type results for a jwt token + """ + + def test_token(token, expected_result): + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock: + jwt_token = JWTToken() + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.headers = { + 'Authorization': 'Bearer {}'.format(token) + } + + result = jwt_token.estimate_type(request=request) + + self.assertEqual(result, expected_result) + + test_items = ( + ('eyfoo.foo.foo', 10), + ('eyfoo.foo.foo.foo.foo', 10), + ('eyfoobar', 0) + ) + + for token, expected_result in test_items: + test_token(token, expected_result) -- cgit v1.2.1 From 657065d76d59a100ffcacd0954fb2091552dfaa2 Mon Sep 17 00:00:00 2001 From: Pieter Ennes Date: Tue, 8 May 2018 21:14:35 +0100 Subject: Avoid populating spurious token credentials (#542) --- .../oauth2/rfc6749/clients/test_mobile_application.py | 12 ++++++++++++ tests/oauth2/rfc6749/clients/test_web_application.py | 19 +++++++++++++++++++ 2 files changed, 31 insertions(+) (limited to 'tests') diff --git a/tests/oauth2/rfc6749/clients/test_mobile_application.py b/tests/oauth2/rfc6749/clients/test_mobile_application.py index 309220b..51e4dab 100644 --- a/tests/oauth2/rfc6749/clients/test_mobile_application.py +++ b/tests/oauth2/rfc6749/clients/test_mobile_application.py @@ -69,6 +69,18 @@ class MobileApplicationClientTest(TestCase): uri = client.prepare_request_uri(self.uri, **self.kwargs) self.assertURLEqual(uri, self.uri_kwargs) + def test_populate_attributes(self): + + client = MobileApplicationClient(self.client_id) + + response_uri = (self.response_uri + "&code=EVIL-CODE") + + client.parse_request_uri_response(response_uri, scope=self.scope) + + # We must not accidentally pick up any further security + # credentials at this point. + self.assertIsNone(client.code) + def test_parse_token_response(self): client = MobileApplicationClient(self.client_id) diff --git a/tests/oauth2/rfc6749/clients/test_web_application.py b/tests/oauth2/rfc6749/clients/test_web_application.py index 0a80c9a..4ecc3b3 100644 --- a/tests/oauth2/rfc6749/clients/test_web_application.py +++ b/tests/oauth2/rfc6749/clients/test_web_application.py @@ -117,6 +117,25 @@ class WebApplicationClientTest(TestCase): self.response_uri, state="invalid") + def test_populate_attributes(self): + + client = WebApplicationClient(self.client_id) + + response_uri = (self.response_uri + + "&access_token=EVIL-TOKEN" + "&refresh_token=EVIL-TOKEN" + "&mac_key=EVIL-KEY") + + client.parse_request_uri_response(response_uri, self.state) + + self.assertEqual(client.code, self.code) + + # We must not accidentally pick up any further security + # credentials at this point. + self.assertIsNone(client.access_token) + self.assertIsNone(client.refresh_token) + self.assertIsNone(client.mac_key) + def test_parse_token_response(self): client = WebApplicationClient(self.client_id) -- cgit v1.2.1 From a306b12b98b5d2aaf469b89b956db0df050823e7 Mon Sep 17 00:00:00 2001 From: Florian Strzelecki Date: Mon, 21 May 2018 07:15:22 +0200 Subject: Add test coverage (#544) * Add testcase for prepare_token_request() * Add testcase for InsecureTransportError in add_token() * Fix typo in testcase of add_token() for MAC token type * Add testcase for TokenExpiredError in add_token() * Add testcase for prepare_request_body without private key * Add testcase for optional kwargs in prepare_request_body() --- tests/oauth2/rfc6749/clients/test_base.py | 72 +++++++++++++++++++++- .../rfc6749/clients/test_service_application.py | 70 ++++++++++++++++++++- 2 files changed, 137 insertions(+), 5 deletions(-) (limited to 'tests') diff --git a/tests/oauth2/rfc6749/clients/test_base.py b/tests/oauth2/rfc6749/clients/test_base.py index c788bc1..d48a944 100644 --- a/tests/oauth2/rfc6749/clients/test_base.py +++ b/tests/oauth2/rfc6749/clients/test_base.py @@ -4,7 +4,7 @@ from __future__ import absolute_import, unicode_literals import datetime from oauthlib import common -from oauthlib.oauth2 import Client, InsecureTransportError +from oauthlib.oauth2 import Client, InsecureTransportError, TokenExpiredError from oauthlib.oauth2.rfc6749 import utils from oauthlib.oauth2.rfc6749.clients import AUTH_HEADER, BODY, URI_QUERY @@ -51,10 +51,26 @@ class ClientTest(TestCase): self.assertFormBodyEqual(body, self.body) self.assertEqual(headers, self.bearer_header) + # Non-HTTPS + insecure_uri = 'http://example.com/path?query=world' + client = Client(self.client_id, access_token=self.access_token, token_type="Bearer") + self.assertRaises(InsecureTransportError, client.add_token, insecure_uri, + body=self.body, + headers=self.headers) + # Missing access token client = Client(self.client_id) self.assertRaises(ValueError, client.add_token, self.uri) + # Expired token + expired = 523549800 + expired_token = { + 'expires_at': expired, + } + client = Client(self.client_id, token=expired_token, access_token=self.access_token, token_type="Bearer") + self.assertRaises(TokenExpiredError, client.add_token, self.uri, + body=self.body, headers=self.headers) + # The default token placement, bearer in auth header client = Client(self.client_id, access_token=self.access_token) uri, headers, body = client.add_token(self.uri, body=self.body, @@ -150,8 +166,26 @@ class ClientTest(TestCase): self.assertEqual(uri, self.uri) self.assertEqual(body, self.body) self.assertEqual(headers, self.mac_00_header) + # Non-HTTPS + insecure_uri = 'http://example.com/path?query=world' + self.assertRaises(InsecureTransportError, client.add_token, insecure_uri, + body=self.body, + headers=self.headers, + issue_time=datetime.datetime.now()) + # Expired Token + expired = 523549800 + expired_token = { + 'expires_at': expired, + } + client = Client(self.client_id, token=expired_token, token_type="MAC", + access_token=self.access_token, mac_key=self.mac_key, + mac_algorithm="hmac-sha-1") + self.assertRaises(TokenExpiredError, client.add_token, self.uri, + body=self.body, + headers=self.headers, + issue_time=datetime.datetime.now()) - # Add the Authorization header (draft 00) + # Add the Authorization header (draft 01) client = Client(self.client_id, token_type="MAC", access_token=self.access_token, mac_key=self.mac_key, mac_algorithm="hmac-sha-1") @@ -160,7 +194,24 @@ class ClientTest(TestCase): self.assertEqual(uri, self.uri) self.assertEqual(body, self.body) self.assertEqual(headers, self.mac_01_header) - + # Non-HTTPS + insecure_uri = 'http://example.com/path?query=world' + self.assertRaises(InsecureTransportError, client.add_token, insecure_uri, + body=self.body, + headers=self.headers, + draft=1) + # Expired Token + expired = 523549800 + expired_token = { + 'expires_at': expired, + } + client = Client(self.client_id, token=expired_token, token_type="MAC", + access_token=self.access_token, mac_key=self.mac_key, + mac_algorithm="hmac-sha-1") + self.assertRaises(TokenExpiredError, client.add_token, self.uri, + body=self.body, + headers=self.headers, + draft=1) def test_revocation_request(self): client = Client(self.client_id) @@ -208,6 +259,21 @@ class ClientTest(TestCase): # NotImplementedError self.assertRaises(NotImplementedError, client.prepare_authorization_request, auth_url) + def test_prepare_token_request(self): + redirect_url = 'https://example.com/callback/' + scopes = 'read' + token_url = 'https://example.com/token/' + state = 'fake_state' + + client = Client(self.client_id, scope=scopes, state=state) + + # Non-HTTPS + self.assertRaises(InsecureTransportError, + client.prepare_token_request, 'http://example.com/token/') + + # NotImplementedError + self.assertRaises(NotImplementedError, client.prepare_token_request, token_url) + def test_prepare_refresh_token_request(self): client = Client(self.client_id) diff --git a/tests/oauth2/rfc6749/clients/test_service_application.py b/tests/oauth2/rfc6749/clients/test_service_application.py index 2dc633a..dc337cf 100644 --- a/tests/oauth2/rfc6749/clients/test_service_application.py +++ b/tests/oauth2/rfc6749/clients/test_service_application.py @@ -89,8 +89,8 @@ mfvGGg3xNjTMO7IdrwIDAQAB audience=self.audience, body=self.body) r = Request('https://a.b', body=body) - self.assertEqual(r.isnot, 'empty') - self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) + self.assertEqual(r.isnot, 'empty') + self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256']) @@ -98,6 +98,72 @@ mfvGGg3xNjTMO7IdrwIDAQAB # audience verification is handled during decode now self.assertEqual(claim['sub'], self.subject) self.assertEqual(claim['iat'], int(t.return_value)) + self.assertNotIn('nbf', claim) + self.assertNotIn('jti', claim) + + # Missing issuer parameter + self.assertRaises(ValueError, client.prepare_request_body, + issuer=None, subject=self.subject, audience=self.audience, body=self.body) + + # Missing subject parameter + self.assertRaises(ValueError, client.prepare_request_body, + issuer=self.issuer, subject=None, audience=self.audience, body=self.body) + + # Missing audience parameter + self.assertRaises(ValueError, client.prepare_request_body, + issuer=self.issuer, subject=self.subject, audience=None, body=self.body) + + # Optional kwargs + not_before = time() - 3600 + jwt_id = '8zd15df4s35f43sd' + body = client.prepare_request_body(issuer=self.issuer, + subject=self.subject, + audience=self.audience, + body=self.body, + not_before=not_before, + jwt_id=jwt_id) + + r = Request('https://a.b', body=body) + self.assertEqual(r.isnot, 'empty') + self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) + + claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256']) + + self.assertEqual(claim['iss'], self.issuer) + # audience verification is handled during decode now + self.assertEqual(claim['sub'], self.subject) + self.assertEqual(claim['iat'], int(t.return_value)) + self.assertEqual(claim['nbf'], not_before) + self.assertEqual(claim['jti'], jwt_id) + + @patch('time.time') + def test_request_body_no_initial_private_key(self, t): + t.return_value = time() + self.token['expires_at'] = self.token['expires_in'] + t.return_value + + client = ServiceApplicationClient( + self.client_id, private_key=None) + + # Basic with private key provided + body = client.prepare_request_body(issuer=self.issuer, + subject=self.subject, + audience=self.audience, + body=self.body, + private_key=self.private_key) + r = Request('https://a.b', body=body) + self.assertEqual(r.isnot, 'empty') + self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) + + claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256']) + + self.assertEqual(claim['iss'], self.issuer) + # audience verification is handled during decode now + self.assertEqual(claim['sub'], self.subject) + self.assertEqual(claim['iat'], int(t.return_value)) + + # No private key provided + self.assertRaises(ValueError, client.prepare_request_body, + issuer=self.issuer, subject=self.subject, audience=self.audience, body=self.body) @patch('time.time') def test_parse_token_response(self, t): -- cgit v1.2.1 From 27702f40753f88fc5bbf15128dac15758d4bc29a Mon Sep 17 00:00:00 2001 From: Mattia Procopio Date: Sat, 26 May 2018 21:33:41 +0200 Subject: Check that the Bearer header is properly formatted (#491) --- tests/oauth2/rfc6749/test_tokens.py | 81 +++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) (limited to 'tests') diff --git a/tests/oauth2/rfc6749/test_tokens.py b/tests/oauth2/rfc6749/test_tokens.py index 570afb0..ecac03e 100644 --- a/tests/oauth2/rfc6749/test_tokens.py +++ b/tests/oauth2/rfc6749/test_tokens.py @@ -2,6 +2,7 @@ from __future__ import absolute_import, unicode_literals import mock +from oauthlib.common import Request from oauthlib.oauth2.rfc6749.tokens import * from ...unittest import TestCase @@ -61,9 +62,22 @@ class TokenTest(TestCase): bearer_headers = { 'Authorization': 'Bearer vF9dft4qmT' } + fake_bearer_headers = [ + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BeavervF9dft4qmT'}, + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BearerF9dft4qmT'}, + {'Authorization': 'Bearer vF9d ft4qmT'}, + ] + valid_header_with_multiple_spaces = {'Authorization': 'Bearer vF9dft4qmT'} bearer_body = 'access_token=vF9dft4qmT' bearer_uri = 'http://server.example.com/resource?access_token=vF9dft4qmT' + def _mocked_validate_bearer_token(self, token, scopes, request): + if not token: + return False + return True + def test_prepare_mac_header(self): """Verify mac signatures correctness @@ -83,8 +97,57 @@ class TokenTest(TestCase): self.assertEqual(prepare_bearer_body(self.token), self.bearer_body) self.assertEqual(prepare_bearer_uri(self.token, uri=self.uri), self.bearer_uri) + def test_fake_bearer_is_not_validated(self): + request_validator = mock.MagicMock() + request_validator.validate_bearer_token = self._mocked_validate_bearer_token + + for fake_header in self.fake_bearer_headers: + request = Request('/', headers=fake_header) + result = BearerToken(request_validator=request_validator).validate_request(request) + + self.assertFalse(result) + + def test_header_with_multispaces_is_validated(self): + request_validator = mock.MagicMock() + request_validator.validate_bearer_token = self._mocked_validate_bearer_token + + request = Request('/', headers=self.valid_header_with_multiple_spaces) + result = BearerToken(request_validator=request_validator).validate_request(request) + + self.assertTrue(result) + + def test_estimate_type_with_fake_header_returns_type_0(self): + request_validator = mock.MagicMock() + request_validator.validate_bearer_token = self._mocked_validate_bearer_token + + for fake_header in self.fake_bearer_headers: + request = Request('/', headers=fake_header) + result = BearerToken(request_validator=request_validator).estimate_type(request) + + if fake_header['Authorization'].count(' ') == 2 and \ + fake_header['Authorization'].split()[0] == 'Bearer': + # If we're dealing with the header containing 2 spaces, it will be recognized + # as a Bearer valid header, the token itself will be invalid by the way. + self.assertEqual(result, 9) + else: + self.assertEqual(result, 0) + class JWTTokenTestCase(TestCase): + fake_bearer_headers = [ + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BeavervF9dft4qmT'}, + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BearerF9dft4qmT'}, + {'Authorization': 'Bearer vF9df t4qmT'}, + ] + + valid_header_with_multiple_spaces = {'Authorization': 'Bearer vF9dft4qmT'} + + def _mocked_validate_bearer_token(self, token, scopes, request): + if not token: + return False + return True def test_create_token_callable_expires_in(self): """ @@ -180,6 +243,24 @@ class JWTTokenTestCase(TestCase): request.scopes, request) + def test_fake_bearer_is_not_validated(self): + request_validator = mock.MagicMock() + request_validator.validate_jwt_bearer_token = self._mocked_validate_bearer_token + + for fake_header in self.fake_bearer_headers: + request = Request('/', headers=fake_header) + result = JWTToken(request_validator=request_validator).validate_request(request) + + self.assertFalse(result) + + def test_header_with_multiple_spaces_is_validated(self): + request_validator = mock.MagicMock() + request_validator.validate_jwt_bearer_token = self._mocked_validate_bearer_token + request = Request('/', headers=self.valid_header_with_multiple_spaces) + result = JWTToken(request_validator=request_validator).validate_request(request) + + self.assertTrue(result) + def test_estimate_type(self): """ Estimate type results for a jwt token -- cgit v1.2.1 From d5a4d5ea0eab04ddddefac7d1e7a4902fc469286 Mon Sep 17 00:00:00 2001 From: Wiliam Souza Date: Tue, 5 Jun 2018 11:33:21 -0300 Subject: OpenID Connect split (#525) * Add command to clean up builds to makefile * Fix docs strings for endpoints pre_configured * Chnage grant_types.openid_connect to include a deprecation warning be a backward compatible * Fix doc string for rfc6749.request_validator * Remove unused import * Change import to be explicity * Move JWTTokenTestCase to openid.connect.core.test_token * Move JWTToken to oauthlib.openid.connect.core.tokens * Move to openid connect test * Move openid connect exceptions to its own file * Remove openid connect from oauth2 server * Remove JWTToken from oauth tokens * Remove grant_types.openid_connect file * Add oauthlib/openid estructure and tests --- .../rfc6749/endpoints/test_claims_handling.py | 105 ------ .../test_openid_connect_params_handling.py | 85 ----- .../rfc6749/grant_types/test_openid_connect.py | 403 --------------------- tests/oauth2/rfc6749/test_server.py | 99 +++-- tests/oauth2/rfc6749/test_tokens.py | 203 +---------- tests/openid/__init__.py | 0 tests/openid/connect/__init__.py | 0 tests/openid/connect/core/__init__.py | 0 .../connect/core/endpoints/test_claims_handling.py | 109 ++++++ .../test_openid_connect_params_handling.py | 85 +++++ .../core/grant_types/test_authorization_code.py | 153 ++++++++ .../connect/core/grant_types/test_dispatchers.py | 125 +++++++ .../openid/connect/core/grant_types/test_hybrid.py | 13 + .../connect/core/grant_types/test_implicit.py | 148 ++++++++ .../openid/connect/core/test_request_validator.py | 52 +++ tests/openid/connect/core/test_server.py | 178 +++++++++ tests/openid/connect/core/test_tokens.py | 133 +++++++ 17 files changed, 1050 insertions(+), 841 deletions(-) delete mode 100644 tests/oauth2/rfc6749/endpoints/test_claims_handling.py delete mode 100644 tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py delete mode 100644 tests/oauth2/rfc6749/grant_types/test_openid_connect.py create mode 100644 tests/openid/__init__.py create mode 100644 tests/openid/connect/__init__.py create mode 100644 tests/openid/connect/core/__init__.py create mode 100644 tests/openid/connect/core/endpoints/test_claims_handling.py create mode 100644 tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py create mode 100644 tests/openid/connect/core/grant_types/test_authorization_code.py create mode 100644 tests/openid/connect/core/grant_types/test_dispatchers.py create mode 100644 tests/openid/connect/core/grant_types/test_hybrid.py create mode 100644 tests/openid/connect/core/grant_types/test_implicit.py create mode 100644 tests/openid/connect/core/test_request_validator.py create mode 100644 tests/openid/connect/core/test_server.py create mode 100644 tests/openid/connect/core/test_tokens.py (limited to 'tests') diff --git a/tests/oauth2/rfc6749/endpoints/test_claims_handling.py b/tests/oauth2/rfc6749/endpoints/test_claims_handling.py deleted file mode 100644 index ff72673..0000000 --- a/tests/oauth2/rfc6749/endpoints/test_claims_handling.py +++ /dev/null @@ -1,105 +0,0 @@ -"""Ensure OpenID Connect Authorization Request 'claims' are preserved across authorization. - -The claims parameter is an optional query param for the Authorization Request endpoint - but if it is provided and is valid it needs to be deserialized (from urlencoded JSON) - and persisted with the authorization code itself, then in the subsequent Access Token - request the claims should be transferred (via the oauthlib request) to be persisted - with the Access Token when it is created. -""" -from __future__ import absolute_import, unicode_literals - -import mock - -from oauthlib.oauth2 import InvalidRequestError, RequestValidator, Server - -from ....unittest import TestCase -from .test_utils import get_fragment_credentials, get_query_credentials - - -class TestClaimsHandling(TestCase): - - DEFAULT_REDIRECT_URI = 'http://i.b./path' - - def set_scopes(self, scopes): - def set_request_scopes(client_id, code, client, request): - request.scopes = scopes - return True - return set_request_scopes - - def set_user(self, request): - request.user = 'foo' - request.client_id = 'bar' - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def save_claims_with_code(self, client_id, code, request, *args, **kwargs): - # a real validator would save the claims with the code during save_authorization_code() - self.claims_from_auth_code_request = request.claims - self.scopes = request.scopes.split() - - def retrieve_claims_saved_with_code(self, client_id, code, client, request, *args, **kwargs): - request.claims = self.claims_from_auth_code_request - request.scopes = self.scopes - - return True - - def save_claims_with_bearer_token(self, token, request, *args, **kwargs): - # a real validator would save the claims with the access token during save_bearer_token() - self.claims_saved_with_bearer_token = request.claims - - def setUp(self): - self.validator = mock.MagicMock(spec=RequestValidator) - self.validator.get_default_redirect_uri.return_value = TestClaimsHandling.DEFAULT_REDIRECT_URI - self.validator.authenticate_client.side_effect = self.set_client - - self.validator.save_authorization_code.side_effect = self.save_claims_with_code - self.validator.validate_code.side_effect = self.retrieve_claims_saved_with_code - self.validator.save_token.side_effect = self.save_claims_with_bearer_token - - self.server = Server(self.validator) - - def test_claims_stored_on_code_creation(self): - - claims = { - "id_token": { - "claim_1": None, - "claim_2": { - "essential": True - } - }, - "userinfo": { - "claim_3": { - "essential": True - }, - "claim_4": None - } - } - - claims_urlquoted='%7B%22id_token%22%3A%20%7B%22claim_2%22%3A%20%7B%22essential%22%3A%20true%7D%2C%20%22claim_1%22%3A%20null%7D%2C%20%22userinfo%22%3A%20%7B%22claim_4%22%3A%20null%2C%20%22claim_3%22%3A%20%7B%22essential%22%3A%20true%7D%7D%7D' - uri = 'http://example.com/path?client_id=abc&scope=openid+test_scope&response_type=code&claims=%s' - - h, b, s = self.server.create_authorization_response(uri % claims_urlquoted, scopes='openid test_scope') - - self.assertDictEqual(self.claims_from_auth_code_request, claims) - - code = get_query_credentials(h['Location'])['code'][0] - token_uri = 'http://example.com/path' - _, body, _ = self.server.create_token_response(token_uri, - body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code) - - self.assertDictEqual(self.claims_saved_with_bearer_token, claims) - - def test_invalid_claims(self): - uri = 'http://example.com/path?client_id=abc&scope=openid+test_scope&response_type=code&claims=this-is-not-json' - - h, b, s = self.server.create_authorization_response(uri, scopes='openid test_scope') - error = get_query_credentials(h['Location'])['error'][0] - error_desc = get_query_credentials(h['Location'])['error_description'][0] - self.assertEqual(error, 'invalid_request') - self.assertEqual(error_desc, "Malformed claims parameter") diff --git a/tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py b/tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py deleted file mode 100644 index 89431b6..0000000 --- a/tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py +++ /dev/null @@ -1,85 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -import mock - -from oauthlib.oauth2 import InvalidRequestError -from oauthlib.oauth2.rfc6749.endpoints.authorization import \ - AuthorizationEndpoint -from oauthlib.oauth2.rfc6749.grant_types import OpenIDConnectAuthCode -from oauthlib.oauth2.rfc6749.tokens import BearerToken - -from ....unittest import TestCase - -try: - from urllib.parse import urlencode -except ImportError: - from urllib import urlencode - - - - -class OpenIDConnectEndpointTest(TestCase): - - def setUp(self): - self.mock_validator = mock.MagicMock() - self.mock_validator.authenticate_client.side_effect = self.set_client - grant = OpenIDConnectAuthCode(request_validator=self.mock_validator) - bearer = BearerToken(self.mock_validator) - self.endpoint = AuthorizationEndpoint(grant, bearer, - response_types={'code': grant}) - params = { - 'prompt': 'consent', - 'display': 'touch', - 'nonce': 'abcd', - 'state': 'abc', - 'redirect_uri': 'https://a.b/cb', - 'response_type': 'code', - 'client_id': 'abcdef', - 'scope': 'hello openid' - } - self.url = 'http://a.b/path?' + urlencode(params) - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - @mock.patch('oauthlib.common.generate_token') - def test_authorization_endpoint_handles_prompt(self, generate_token): - generate_token.return_value = "MOCK_CODE" - # In the GET view: - scopes, creds = self.endpoint.validate_authorization_request(self.url) - # In the POST view: - creds['scopes'] = scopes - h, b, s = self.endpoint.create_authorization_response(self.url, - credentials=creds) - expected = 'https://a.b/cb?state=abc&code=MOCK_CODE' - self.assertURLEqual(h['Location'], expected) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - def test_prompt_none_exclusiveness(self): - """ - Test that prompt=none can't be used with another prompt value. - """ - params = { - 'prompt': 'none consent', - 'state': 'abc', - 'redirect_uri': 'https://a.b/cb', - 'response_type': 'code', - 'client_id': 'abcdef', - 'scope': 'hello openid' - } - url = 'http://a.b/path?' + urlencode(params) - with self.assertRaises(InvalidRequestError): - self.endpoint.validate_authorization_request(url) - - def test_oidc_params_preservation(self): - """ - Test that the nonce parameter is passed through. - """ - scopes, creds = self.endpoint.validate_authorization_request(self.url) - - self.assertEqual(creds['prompt'], {'consent'}) - self.assertEqual(creds['nonce'], 'abcd') - self.assertEqual(creds['display'], 'touch') diff --git a/tests/oauth2/rfc6749/grant_types/test_openid_connect.py b/tests/oauth2/rfc6749/grant_types/test_openid_connect.py deleted file mode 100644 index 573d491..0000000 --- a/tests/oauth2/rfc6749/grant_types/test_openid_connect.py +++ /dev/null @@ -1,403 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, unicode_literals - -import json - -import mock - -from oauthlib.common import Request -from oauthlib.oauth2.rfc6749.grant_types import (AuthTokenGrantDispatcher, - AuthorizationCodeGrant, - ImplicitGrant, - ImplicitTokenGrantDispatcher, - OIDCNoPrompt, - OpenIDConnectAuthCode, - OpenIDConnectHybrid, - OpenIDConnectImplicit) -from oauthlib.oauth2.rfc6749.tokens import BearerToken - -from ....unittest import TestCase -from .test_authorization_code import AuthorizationCodeGrantTest -from .test_implicit import ImplicitGrantTest - - -class OpenIDAuthCodeInterferenceTest(AuthorizationCodeGrantTest): - """Test that OpenID don't interfere with normal OAuth 2 flows.""" - - def setUp(self): - super(OpenIDAuthCodeInterferenceTest, self).setUp() - self.auth = OpenIDConnectAuthCode(request_validator=self.mock_validator) - - -class OpenIDImplicitInterferenceTest(ImplicitGrantTest): - """Test that OpenID don't interfere with normal OAuth 2 flows.""" - - def setUp(self): - super(OpenIDImplicitInterferenceTest, self).setUp() - self.auth = OpenIDConnectImplicit(request_validator=self.mock_validator) - - -class OpenIDHybridInterferenceTest(AuthorizationCodeGrantTest): - """Test that OpenID don't interfere with normal OAuth 2 flows.""" - - def setUp(self): - super(OpenIDHybridInterferenceTest, self).setUp() - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - - -def get_id_token_mock(token, token_handler, request): - return "MOCKED_TOKEN" - - -class OpenIDAuthCodeTest(TestCase): - - def setUp(self): - self.request = Request('http://a.b/path') - self.request.scopes = ('hello', 'openid') - self.request.expires_in = 1800 - self.request.client_id = 'abcdef' - self.request.code = '1234' - self.request.response_type = 'code' - self.request.grant_type = 'authorization_code' - self.request.redirect_uri = 'https://a.b/cb' - self.request.state = 'abc' - - self.mock_validator = mock.MagicMock() - self.mock_validator.authenticate_client.side_effect = self.set_client - self.mock_validator.get_id_token.side_effect = get_id_token_mock - self.auth = OpenIDConnectAuthCode(request_validator=self.mock_validator) - - self.url_query = 'https://a.b/cb?code=abc&state=abc' - self.url_fragment = 'https://a.b/cb#code=abc&state=abc' - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - @mock.patch('oauthlib.common.generate_token') - def test_authorization(self, generate_token): - - scope, info = self.auth.validate_authorization_request(self.request) - - generate_token.return_value = 'abc' - bearer = BearerToken(self.mock_validator) - self.request.response_mode = 'query' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_query) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.response_mode = 'fragment' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - @mock.patch('oauthlib.common.generate_token') - def test_no_prompt_authorization(self, generate_token): - generate_token.return_value = 'abc' - scope, info = self.auth.validate_authorization_request(self.request) - self.request.prompt = 'none' - self.assertRaises(OIDCNoPrompt, - self.auth.validate_authorization_request, - self.request) - - # prompt == none requires id token hint - bearer = BearerToken(self.mock_validator) - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=invalid_request', h['Location']) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.response_mode = 'query' - self.request.id_token_hint = 'me@email.com' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_query) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - # Test alernative response modes - self.request.response_mode = 'fragment' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - - # Ensure silent authentication and authorization is done - self.mock_validator.validate_silent_login.return_value = False - self.mock_validator.validate_silent_authorization.return_value = True - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - self.mock_validator.validate_silent_login.return_value = True - self.mock_validator.validate_silent_authorization.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=consent_required', h['Location']) - - # ID token hint must match logged in user - self.mock_validator.validate_silent_authorization.return_value = True - self.mock_validator.validate_user_match.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - def set_scopes(self, client_id, code, client, request): - request.scopes = self.request.scopes - request.state = self.request.state - request.user = 'bob' - return True - - def test_create_token_response(self): - self.request.response_type = None - self.mock_validator.validate_code.side_effect = self.set_scopes - - bearer = BearerToken(self.mock_validator) - - h, token, s = self.auth.create_token_response(self.request, bearer) - token = json.loads(token) - self.assertEqual(self.mock_validator.save_token.call_count, 1) - self.assertIn('access_token', token) - self.assertIn('refresh_token', token) - self.assertIn('expires_in', token) - self.assertIn('scope', token) - self.assertIn('id_token', token) - self.assertIn('openid', token['scope']) - - self.mock_validator.reset_mock() - - self.request.scopes = ('hello', 'world') - h, token, s = self.auth.create_token_response(self.request, bearer) - token = json.loads(token) - self.assertEqual(self.mock_validator.save_token.call_count, 1) - self.assertIn('access_token', token) - self.assertIn('refresh_token', token) - self.assertIn('expires_in', token) - self.assertIn('scope', token) - self.assertNotIn('id_token', token) - self.assertNotIn('openid', token['scope']) - - -class OpenIDImplicitTest(TestCase): - - def setUp(self): - self.request = Request('http://a.b/path') - self.request.scopes = ('hello', 'openid') - self.request.expires_in = 1800 - self.request.client_id = 'abcdef' - self.request.response_type = 'id_token token' - self.request.redirect_uri = 'https://a.b/cb' - self.request.nonce = 'zxc' - self.request.state = 'abc' - - self.mock_validator = mock.MagicMock() - self.mock_validator.get_id_token.side_effect = get_id_token_mock - self.auth = OpenIDConnectImplicit(request_validator=self.mock_validator) - - token = 'MOCKED_TOKEN' - self.url_query = 'https://a.b/cb?state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - self.url_fragment = 'https://a.b/cb#state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - - @mock.patch('oauthlib.common.generate_token') - def test_authorization(self, generate_token): - scope, info = self.auth.validate_authorization_request(self.request) - - generate_token.return_value = 'abc' - bearer = BearerToken(self.mock_validator) - - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.response_type = 'id_token' - token = 'MOCKED_TOKEN' - url = 'https://a.b/cb#state=abc&id_token=%s' % token - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], url, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.nonce = None - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=invalid_request', h['Location']) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - @mock.patch('oauthlib.common.generate_token') - def test_no_prompt_authorization(self, generate_token): - generate_token.return_value = 'abc' - scope, info = self.auth.validate_authorization_request(self.request) - self.request.prompt = 'none' - self.assertRaises(OIDCNoPrompt, - self.auth.validate_authorization_request, - self.request) - - # prompt == none requires id token hint - bearer = BearerToken(self.mock_validator) - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=invalid_request', h['Location']) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.id_token_hint = 'me@email.com' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - # Test alernative response modes - self.request.response_mode = 'query' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_query) - - # Ensure silent authentication and authorization is done - self.mock_validator.validate_silent_login.return_value = False - self.mock_validator.validate_silent_authorization.return_value = True - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - self.mock_validator.validate_silent_login.return_value = True - self.mock_validator.validate_silent_authorization.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=consent_required', h['Location']) - - # ID token hint must match logged in user - self.mock_validator.validate_silent_authorization.return_value = True - self.mock_validator.validate_user_match.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - -class OpenIDHybridCodeTokenTest(OpenIDAuthCodeTest): - - def setUp(self): - super(OpenIDHybridCodeTokenTest, self).setUp() - self.request.response_type = 'code token' - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' - self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' - - -class OpenIDHybridCodeIdTokenTest(OpenIDAuthCodeTest): - - def setUp(self): - super(OpenIDHybridCodeIdTokenTest, self).setUp() - self.request.response_type = 'code id_token' - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - token = 'MOCKED_TOKEN' - self.url_query = 'https://a.b/cb?code=abc&state=abc&id_token=%s' % token - self.url_fragment = 'https://a.b/cb#code=abc&state=abc&id_token=%s' % token - - -class OpenIDHybridCodeIdTokenTokenTest(OpenIDAuthCodeTest): - - def setUp(self): - super(OpenIDHybridCodeIdTokenTokenTest, self).setUp() - self.request.response_type = 'code id_token token' - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - token = 'MOCKED_TOKEN' - self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - - -class ImplicitTokenGrantDispatcherTest(TestCase): - def setUp(self): - self.request = Request('http://a.b/path') - request_validator = mock.MagicMock() - implicit_grant = ImplicitGrant(request_validator) - openid_connect_implicit = OpenIDConnectImplicit(request_validator) - - self.dispatcher = ImplicitTokenGrantDispatcher( - default_implicit_grant=implicit_grant, - oidc_implicit_grant=openid_connect_implicit - ) - - def test_create_authorization_response_openid(self): - self.request.scopes = ('hello', 'openid') - self.request.response_type = 'id_token' - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, OpenIDConnectImplicit)) - - def test_validate_authorization_request_openid(self): - self.request.scopes = ('hello', 'openid') - self.request.response_type = 'id_token' - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, OpenIDConnectImplicit)) - - def test_create_authorization_response_oauth(self): - self.request.scopes = ('hello', 'world') - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, ImplicitGrant)) - - def test_validate_authorization_request_oauth(self): - self.request.scopes = ('hello', 'world') - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, ImplicitGrant)) - - -class DispatcherTest(TestCase): - def setUp(self): - self.request = Request('http://a.b/path') - self.request.decoded_body = ( - ("client_id", "me"), - ("code", "code"), - ("redirect_url", "https://a.b/cb"), - ) - - self.request_validator = mock.MagicMock() - self.auth_grant = AuthorizationCodeGrant(self.request_validator) - self.openid_connect_auth = OpenIDConnectAuthCode(self.request_validator) - - -class AuthTokenGrantDispatcherOpenIdTest(DispatcherTest): - - def setUp(self): - super(AuthTokenGrantDispatcherOpenIdTest, self).setUp() - self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') - self.dispatcher = AuthTokenGrantDispatcher( - self.request_validator, - default_token_grant=self.auth_grant, - oidc_token_grant=self.openid_connect_auth - ) - - def test_create_token_response_openid(self): - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, OpenIDConnectAuthCode)) - self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) - - -class AuthTokenGrantDispatcherOpenIdWithoutCodeTest(DispatcherTest): - - def setUp(self): - super(AuthTokenGrantDispatcherOpenIdWithoutCodeTest, self).setUp() - self.request.decoded_body = ( - ("client_id", "me"), - ("code", ""), - ("redirect_url", "https://a.b/cb"), - ) - self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') - self.dispatcher = AuthTokenGrantDispatcher( - self.request_validator, - default_token_grant=self.auth_grant, - oidc_token_grant=self.openid_connect_auth - ) - - def test_create_token_response_openid_without_code(self): - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, AuthorizationCodeGrant)) - self.assertFalse(self.dispatcher.request_validator.get_authorization_code_scopes.called) - - -class AuthTokenGrantDispatcherOAuthTest(DispatcherTest): - - def setUp(self): - super(AuthTokenGrantDispatcherOAuthTest, self).setUp() - self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'world') - self.dispatcher = AuthTokenGrantDispatcher( - self.request_validator, - default_token_grant=self.auth_grant, - oidc_token_grant=self.openid_connect_auth - ) - - def test_create_token_response_oauth(self): - handler = self.dispatcher._handler_for_request(self.request) - self.assertTrue(isinstance(handler, AuthorizationCodeGrant)) - self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) diff --git a/tests/oauth2/rfc6749/test_server.py b/tests/oauth2/rfc6749/test_server.py index da303ce..bc7a2b7 100644 --- a/tests/oauth2/rfc6749/test_server.py +++ b/tests/oauth2/rfc6749/test_server.py @@ -3,21 +3,17 @@ from __future__ import absolute_import, unicode_literals import json -import jwt import mock from oauthlib import common from oauthlib.oauth2.rfc6749 import errors, tokens from oauthlib.oauth2.rfc6749.endpoints import Server -from oauthlib.oauth2.rfc6749.endpoints.authorization import \ - AuthorizationEndpoint +from oauthlib.oauth2.rfc6749.endpoints.authorization import AuthorizationEndpoint from oauthlib.oauth2.rfc6749.endpoints.resource import ResourceEndpoint from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint from oauthlib.oauth2.rfc6749.grant_types import (AuthorizationCodeGrant, ClientCredentialsGrant, ImplicitGrant, - OpenIDConnectAuthCode, - OpenIDConnectImplicit, ResourceOwnerPasswordCredentialsGrant) from ...unittest import TestCase @@ -29,40 +25,34 @@ class AuthorizationEndpointTest(TestCase): self.mock_validator = mock.MagicMock() self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) auth_code = AuthorizationCodeGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) auth_code.save_authorization_code = mock.MagicMock() implicit = ImplicitGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) implicit.save_token = mock.MagicMock() - openid_connect_auth = OpenIDConnectAuthCode(self.mock_validator) - openid_connect_implicit = OpenIDConnectImplicit(self.mock_validator) - response_types = { - 'code': auth_code, - 'token': implicit, - - 'id_token': openid_connect_implicit, - 'id_token token': openid_connect_implicit, - 'code token': openid_connect_auth, - 'code id_token': openid_connect_auth, - 'code token id_token': openid_connect_auth, - 'none': auth_code + 'code': auth_code, + 'token': implicit, + 'none': auth_code } self.expires_in = 1800 - token = tokens.BearerToken(self.mock_validator, - expires_in=self.expires_in) + token = tokens.BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) self.endpoint = AuthorizationEndpoint( - default_response_type='code', - default_token_type=token, - response_types=response_types) + default_response_type='code', + default_token_type=token, + response_types=response_types + ) @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') def test_authorization_grant(self): uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz') @@ -71,7 +61,7 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True) @@ -79,7 +69,7 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True) self.assertEqual(body, None) @@ -99,9 +89,9 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?client_id=me&scope=all+of+them' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' self.mock_validator.validate_request = mock.MagicMock( - side_effect=errors.InvalidRequestError()) + side_effect=errors.InvalidRequestError()) headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.') @@ -109,9 +99,9 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' self.mock_validator.validate_request = mock.MagicMock( - side_effect=errors.UnsupportedResponseTypeError()) + side_effect=errors.UnsupportedResponseTypeError()) headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type') @@ -129,27 +119,32 @@ class TokenEndpointTest(TestCase): self.mock_validator.authenticate_client.side_effect = set_user self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) auth_code = AuthorizationCodeGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) password = ResourceOwnerPasswordCredentialsGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) client = ClientCredentialsGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) supported_types = { - 'authorization_code': auth_code, - 'password': password, - 'client_credentials': client, + 'authorization_code': auth_code, + 'password': password, + 'client_credentials': client, } self.expires_in = 1800 - token = tokens.BearerToken(self.mock_validator, - expires_in=self.expires_in) - self.endpoint = TokenEndpoint('authorization_code', - default_token_type=token, grant_types=supported_types) + token = tokens.BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = TokenEndpoint( + 'authorization_code', + default_token_type=token, + grant_types=supported_types + ) @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') def test_authorization_grant(self): body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) token = { 'token_type': 'Bearer', 'expires_in': self.expires_in, @@ -176,7 +171,7 @@ class TokenEndpointTest(TestCase): def test_password_grant(self): body = 'grant_type=password&username=a&password=hello&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) token = { 'token_type': 'Bearer', 'expires_in': self.expires_in, @@ -190,7 +185,7 @@ class TokenEndpointTest(TestCase): def test_client_grant(self): body = 'grant_type=client_credentials&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) token = { 'token_type': 'Bearer', 'expires_in': self.expires_in, @@ -281,7 +276,7 @@ twIDAQAB def test_authorization_grant(self): body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -295,7 +290,7 @@ twIDAQAB body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&state=xyz' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -310,7 +305,7 @@ twIDAQAB def test_password_grant(self): body = 'grant_type=password&username=a&password=hello&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -325,7 +320,7 @@ twIDAQAB def test_scopes_and_user_id_stored_in_access_token(self): body = 'grant_type=password&username=a&password=hello&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) access_token = json.loads(body)['access_token'] @@ -338,7 +333,7 @@ twIDAQAB def test_client_grant(self): body = 'grant_type=client_credentials&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -366,8 +361,10 @@ class ResourceEndpointTest(TestCase): self.mock_validator = mock.MagicMock() self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) token = tokens.BearerToken(request_validator=self.mock_validator) - self.endpoint = ResourceEndpoint(default_token='Bearer', - token_types={'Bearer': token}) + self.endpoint = ResourceEndpoint( + default_token='Bearer', + token_types={'Bearer': token} + ) def test_defaults(self): uri = 'http://a.b/path?some=query' diff --git a/tests/oauth2/rfc6749/test_tokens.py b/tests/oauth2/rfc6749/test_tokens.py index ecac03e..061754f 100644 --- a/tests/oauth2/rfc6749/test_tokens.py +++ b/tests/oauth2/rfc6749/test_tokens.py @@ -1,9 +1,11 @@ from __future__ import absolute_import, unicode_literals -import mock - -from oauthlib.common import Request -from oauthlib.oauth2.rfc6749.tokens import * +from oauthlib.oauth2.rfc6749.tokens import ( + prepare_mac_header, + prepare_bearer_headers, + prepare_bearer_body, + prepare_bearer_uri, +) from ...unittest import TestCase @@ -96,196 +98,3 @@ class TokenTest(TestCase): self.assertEqual(prepare_bearer_headers(self.token), self.bearer_headers) self.assertEqual(prepare_bearer_body(self.token), self.bearer_body) self.assertEqual(prepare_bearer_uri(self.token, uri=self.uri), self.bearer_uri) - - def test_fake_bearer_is_not_validated(self): - request_validator = mock.MagicMock() - request_validator.validate_bearer_token = self._mocked_validate_bearer_token - - for fake_header in self.fake_bearer_headers: - request = Request('/', headers=fake_header) - result = BearerToken(request_validator=request_validator).validate_request(request) - - self.assertFalse(result) - - def test_header_with_multispaces_is_validated(self): - request_validator = mock.MagicMock() - request_validator.validate_bearer_token = self._mocked_validate_bearer_token - - request = Request('/', headers=self.valid_header_with_multiple_spaces) - result = BearerToken(request_validator=request_validator).validate_request(request) - - self.assertTrue(result) - - def test_estimate_type_with_fake_header_returns_type_0(self): - request_validator = mock.MagicMock() - request_validator.validate_bearer_token = self._mocked_validate_bearer_token - - for fake_header in self.fake_bearer_headers: - request = Request('/', headers=fake_header) - result = BearerToken(request_validator=request_validator).estimate_type(request) - - if fake_header['Authorization'].count(' ') == 2 and \ - fake_header['Authorization'].split()[0] == 'Bearer': - # If we're dealing with the header containing 2 spaces, it will be recognized - # as a Bearer valid header, the token itself will be invalid by the way. - self.assertEqual(result, 9) - else: - self.assertEqual(result, 0) - - -class JWTTokenTestCase(TestCase): - fake_bearer_headers = [ - {'Authorization': 'Beaver vF9dft4qmT'}, - {'Authorization': 'BeavervF9dft4qmT'}, - {'Authorization': 'Beaver vF9dft4qmT'}, - {'Authorization': 'BearerF9dft4qmT'}, - {'Authorization': 'Bearer vF9df t4qmT'}, - ] - - valid_header_with_multiple_spaces = {'Authorization': 'Bearer vF9dft4qmT'} - - def _mocked_validate_bearer_token(self, token, scopes, request): - if not token: - return False - return True - - def test_create_token_callable_expires_in(self): - """ - Test retrieval of the expires in value by calling the callable expires_in property - """ - - expires_in_mock = mock.MagicMock() - request_mock = mock.MagicMock() - - token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) - token.create_token(request=request_mock) - - expires_in_mock.assert_called_once_with(request_mock) - - def test_create_token_non_callable_expires_in(self): - """ - When a non callable expires in is set this should just be set to the request - """ - - expires_in_mock = mock.NonCallableMagicMock() - request_mock = mock.MagicMock() - - token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) - token.create_token(request=request_mock) - - self.assertFalse(expires_in_mock.called) - self.assertEqual(request_mock.expires_in, expires_in_mock) - - def test_create_token_calls_get_id_token(self): - """ - When create_token is called the call should be forwarded to the get_id_token on the token validator - """ - request_mock = mock.MagicMock() - - with mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', - autospec=True) as RequestValidatorMock: - - request_validator = RequestValidatorMock() - - token = JWTToken(expires_in=mock.MagicMock(), request_validator=request_validator) - token.create_token(request=request_mock) - - request_validator.get_jwt_bearer_token.assert_called_once_with(None, None, request_mock) - - def test_validate_request_token_from_headers(self): - """ - Bearer token get retrieved from headers. - """ - - with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ - mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', - autospec=True) as RequestValidatorMock: - request_validator_mock = RequestValidatorMock() - - token = JWTToken(request_validator=request_validator_mock) - - request = RequestMock('/uri') - # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch - # with autospec=True - request.scopes = mock.MagicMock() - request.headers = { - 'Authorization': 'Bearer some-token-from-header' - } - - token.validate_request(request=request) - - request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-header', - request.scopes, - request) - - def test_validate_token_from_request(self): - """ - Token get retrieved from request object. - """ - - with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ - mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', - autospec=True) as RequestValidatorMock: - request_validator_mock = RequestValidatorMock() - - token = JWTToken(request_validator=request_validator_mock) - - request = RequestMock('/uri') - # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch - # with autospec=True - request.scopes = mock.MagicMock() - request.access_token = 'some-token-from-request-object' - request.headers = {} - - token.validate_request(request=request) - - request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-request-object', - request.scopes, - request) - - def test_fake_bearer_is_not_validated(self): - request_validator = mock.MagicMock() - request_validator.validate_jwt_bearer_token = self._mocked_validate_bearer_token - - for fake_header in self.fake_bearer_headers: - request = Request('/', headers=fake_header) - result = JWTToken(request_validator=request_validator).validate_request(request) - - self.assertFalse(result) - - def test_header_with_multiple_spaces_is_validated(self): - request_validator = mock.MagicMock() - request_validator.validate_jwt_bearer_token = self._mocked_validate_bearer_token - request = Request('/', headers=self.valid_header_with_multiple_spaces) - result = JWTToken(request_validator=request_validator).validate_request(request) - - self.assertTrue(result) - - def test_estimate_type(self): - """ - Estimate type results for a jwt token - """ - - def test_token(token, expected_result): - with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock: - jwt_token = JWTToken() - - request = RequestMock('/uri') - # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch - # with autospec=True - request.headers = { - 'Authorization': 'Bearer {}'.format(token) - } - - result = jwt_token.estimate_type(request=request) - - self.assertEqual(result, expected_result) - - test_items = ( - ('eyfoo.foo.foo', 10), - ('eyfoo.foo.foo.foo.foo', 10), - ('eyfoobar', 0) - ) - - for token, expected_result in test_items: - test_token(token, expected_result) diff --git a/tests/openid/__init__.py b/tests/openid/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/openid/connect/__init__.py b/tests/openid/connect/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/openid/connect/core/__init__.py b/tests/openid/connect/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/openid/connect/core/endpoints/test_claims_handling.py b/tests/openid/connect/core/endpoints/test_claims_handling.py new file mode 100644 index 0000000..37a7cdd --- /dev/null +++ b/tests/openid/connect/core/endpoints/test_claims_handling.py @@ -0,0 +1,109 @@ +"""Ensure OpenID Connect Authorization Request 'claims' are preserved across authorization. + +The claims parameter is an optional query param for the Authorization Request endpoint + but if it is provided and is valid it needs to be deserialized (from urlencoded JSON) + and persisted with the authorization code itself, then in the subsequent Access Token + request the claims should be transferred (via the oauthlib request) to be persisted + with the Access Token when it is created. +""" +from __future__ import absolute_import, unicode_literals + +import mock + +from oauthlib.oauth2 import RequestValidator + +from oauthlib.oauth2.rfc6749.endpoints.pre_configured import Server + +from ....unittest import TestCase +from .test_utils import get_query_credentials + + +class TestClaimsHandling(TestCase): + + DEFAULT_REDIRECT_URI = 'http://i.b./path' + + def set_scopes(self, scopes): + def set_request_scopes(client_id, code, client, request): + request.scopes = scopes + return True + return set_request_scopes + + def set_user(self, request): + request.user = 'foo' + request.client_id = 'bar' + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def save_claims_with_code(self, client_id, code, request, *args, **kwargs): + # a real validator would save the claims with the code during save_authorization_code() + self.claims_from_auth_code_request = request.claims + self.scopes = request.scopes.split() + + def retrieve_claims_saved_with_code(self, client_id, code, client, request, *args, **kwargs): + request.claims = self.claims_from_auth_code_request + request.scopes = self.scopes + + return True + + def save_claims_with_bearer_token(self, token, request, *args, **kwargs): + # a real validator would save the claims with the access token during save_bearer_token() + self.claims_saved_with_bearer_token = request.claims + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = TestClaimsHandling.DEFAULT_REDIRECT_URI + self.validator.authenticate_client.side_effect = self.set_client + + self.validator.save_authorization_code.side_effect = self.save_claims_with_code + self.validator.validate_code.side_effect = self.retrieve_claims_saved_with_code + self.validator.save_token.side_effect = self.save_claims_with_bearer_token + + self.server = Server(self.validator) + + def test_claims_stored_on_code_creation(self): + + claims = { + "id_token": { + "claim_1": None, + "claim_2": { + "essential": True + } + }, + "userinfo": { + "claim_3": { + "essential": True + }, + "claim_4": None + } + } + + claims_urlquoted = '%7B%22id_token%22%3A%20%7B%22claim_2%22%3A%20%7B%22essential%22%3A%20true%7D%2C%20%22claim_1%22%3A%20null%7D%2C%20%22userinfo%22%3A%20%7B%22claim_4%22%3A%20null%2C%20%22claim_3%22%3A%20%7B%22essential%22%3A%20true%7D%7D%7D' + uri = 'http://example.com/path?client_id=abc&scope=openid+test_scope&response_type=code&claims=%s' + + h, b, s = self.server.create_authorization_response(uri % claims_urlquoted, scopes='openid test_scope') + + self.assertDictEqual(self.claims_from_auth_code_request, claims) + + code = get_query_credentials(h['Location'])['code'][0] + token_uri = 'http://example.com/path' + _, body, _ = self.server.create_token_response( + token_uri, + body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code + ) + + self.assertDictEqual(self.claims_saved_with_bearer_token, claims) + + def test_invalid_claims(self): + uri = 'http://example.com/path?client_id=abc&scope=openid+test_scope&response_type=code&claims=this-is-not-json' + + h, b, s = self.server.create_authorization_response(uri, scopes='openid test_scope') + error = get_query_credentials(h['Location'])['error'][0] + error_desc = get_query_credentials(h['Location'])['error_description'][0] + self.assertEqual(error, 'invalid_request') + self.assertEqual(error_desc, "Malformed claims parameter") diff --git a/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py b/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py new file mode 100644 index 0000000..89431b6 --- /dev/null +++ b/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py @@ -0,0 +1,85 @@ +from __future__ import absolute_import, unicode_literals + +import mock + +from oauthlib.oauth2 import InvalidRequestError +from oauthlib.oauth2.rfc6749.endpoints.authorization import \ + AuthorizationEndpoint +from oauthlib.oauth2.rfc6749.grant_types import OpenIDConnectAuthCode +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from ....unittest import TestCase + +try: + from urllib.parse import urlencode +except ImportError: + from urllib import urlencode + + + + +class OpenIDConnectEndpointTest(TestCase): + + def setUp(self): + self.mock_validator = mock.MagicMock() + self.mock_validator.authenticate_client.side_effect = self.set_client + grant = OpenIDConnectAuthCode(request_validator=self.mock_validator) + bearer = BearerToken(self.mock_validator) + self.endpoint = AuthorizationEndpoint(grant, bearer, + response_types={'code': grant}) + params = { + 'prompt': 'consent', + 'display': 'touch', + 'nonce': 'abcd', + 'state': 'abc', + 'redirect_uri': 'https://a.b/cb', + 'response_type': 'code', + 'client_id': 'abcdef', + 'scope': 'hello openid' + } + self.url = 'http://a.b/path?' + urlencode(params) + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + @mock.patch('oauthlib.common.generate_token') + def test_authorization_endpoint_handles_prompt(self, generate_token): + generate_token.return_value = "MOCK_CODE" + # In the GET view: + scopes, creds = self.endpoint.validate_authorization_request(self.url) + # In the POST view: + creds['scopes'] = scopes + h, b, s = self.endpoint.create_authorization_response(self.url, + credentials=creds) + expected = 'https://a.b/cb?state=abc&code=MOCK_CODE' + self.assertURLEqual(h['Location'], expected) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + def test_prompt_none_exclusiveness(self): + """ + Test that prompt=none can't be used with another prompt value. + """ + params = { + 'prompt': 'none consent', + 'state': 'abc', + 'redirect_uri': 'https://a.b/cb', + 'response_type': 'code', + 'client_id': 'abcdef', + 'scope': 'hello openid' + } + url = 'http://a.b/path?' + urlencode(params) + with self.assertRaises(InvalidRequestError): + self.endpoint.validate_authorization_request(url) + + def test_oidc_params_preservation(self): + """ + Test that the nonce parameter is passed through. + """ + scopes, creds = self.endpoint.validate_authorization_request(self.url) + + self.assertEqual(creds['prompt'], {'consent'}) + self.assertEqual(creds['nonce'], 'abcd') + self.assertEqual(creds['display'], 'touch') diff --git a/tests/openid/connect/core/grant_types/test_authorization_code.py b/tests/openid/connect/core/grant_types/test_authorization_code.py new file mode 100644 index 0000000..1bad120 --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_authorization_code.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import json + +import mock + +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant +from oauthlib.openid.connect.core.grant_types.exceptions import OIDCNoPrompt + +from ....unittest import TestCase +from ....oauth2.rfc6749.grant_types.test_authorization_code import AuthorizationCodeGrantTest + + +def get_id_token_mock(token, token_handler, request): + return "MOCKED_TOKEN" + + +class OpenIDAuthCodeInterferenceTest(AuthorizationCodeGrantTest): + """Test that OpenID don't interfere with normal OAuth 2 flows.""" + + def setUp(self): + super(OpenIDAuthCodeInterferenceTest, self).setUp() + self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator) + + +class OpenIDAuthCodeTest(TestCase): + + def setUp(self): + self.request = Request('http://a.b/path') + self.request.scopes = ('hello', 'openid') + self.request.expires_in = 1800 + self.request.client_id = 'abcdef' + self.request.code = '1234' + self.request.response_type = 'code' + self.request.grant_type = 'authorization_code' + self.request.redirect_uri = 'https://a.b/cb' + self.request.state = 'abc' + + self.mock_validator = mock.MagicMock() + self.mock_validator.authenticate_client.side_effect = self.set_client + self.mock_validator.get_id_token.side_effect = get_id_token_mock + self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator) + + self.url_query = 'https://a.b/cb?code=abc&state=abc' + self.url_fragment = 'https://a.b/cb#code=abc&state=abc' + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + @mock.patch('oauthlib.common.generate_token') + def test_authorization(self, generate_token): + + scope, info = self.auth.validate_authorization_request(self.request) + + generate_token.return_value = 'abc' + bearer = BearerToken(self.mock_validator) + self.request.response_mode = 'query' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_query) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.response_mode = 'fragment' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + @mock.patch('oauthlib.common.generate_token') + def test_no_prompt_authorization(self, generate_token): + generate_token.return_value = 'abc' + scope, info = self.auth.validate_authorization_request(self.request) + self.request.prompt = 'none' + self.assertRaises(OIDCNoPrompt, + self.auth.validate_authorization_request, + self.request) + + # prompt == none requires id token hint + bearer = BearerToken(self.mock_validator) + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=invalid_request', h['Location']) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.response_mode = 'query' + self.request.id_token_hint = 'me@email.com' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_query) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + # Test alernative response modes + self.request.response_mode = 'fragment' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + + # Ensure silent authentication and authorization is done + self.mock_validator.validate_silent_login.return_value = False + self.mock_validator.validate_silent_authorization.return_value = True + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + self.mock_validator.validate_silent_login.return_value = True + self.mock_validator.validate_silent_authorization.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=consent_required', h['Location']) + + # ID token hint must match logged in user + self.mock_validator.validate_silent_authorization.return_value = True + self.mock_validator.validate_user_match.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + def set_scopes(self, client_id, code, client, request): + request.scopes = self.request.scopes + request.state = self.request.state + request.user = 'bob' + return True + + def test_create_token_response(self): + self.request.response_type = None + self.mock_validator.validate_code.side_effect = self.set_scopes + + bearer = BearerToken(self.mock_validator) + + h, token, s = self.auth.create_token_response(self.request, bearer) + token = json.loads(token) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('refresh_token', token) + self.assertIn('expires_in', token) + self.assertIn('scope', token) + self.assertIn('id_token', token) + self.assertIn('openid', token['scope']) + + self.mock_validator.reset_mock() + + self.request.scopes = ('hello', 'world') + h, token, s = self.auth.create_token_response(self.request, bearer) + token = json.loads(token) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('refresh_token', token) + self.assertIn('expires_in', token) + self.assertIn('scope', token) + self.assertNotIn('id_token', token) + self.assertNotIn('openid', token['scope']) diff --git a/tests/openid/connect/core/grant_types/test_dispatchers.py b/tests/openid/connect/core/grant_types/test_dispatchers.py new file mode 100644 index 0000000..f90ec46 --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_dispatchers.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +import mock + +from oauthlib.common import Request + +from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant +from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant +from oauthlib.openid.connect.core.grant_types.dispatchers import ( + ImplicitTokenGrantDispatcher, + AuthorizationTokenGrantDispatcher +) + +from oauthlib.oauth2.rfc6749.grant_types import ( + AuthorizationCodeGrant as OAuth2AuthorizationCodeGrant, + ImplicitGrant as OAuth2ImplicitGrant, +) + + +from ....unittest import TestCase + + +class ImplicitTokenGrantDispatcherTest(TestCase): + def setUp(self): + self.request = Request('http://a.b/path') + request_validator = mock.MagicMock() + implicit_grant = OAuth2ImplicitGrant(request_validator) + openid_connect_implicit = ImplicitGrant(request_validator) + + self.dispatcher = ImplicitTokenGrantDispatcher( + default_implicit_grant=implicit_grant, + oidc_implicit_grant=openid_connect_implicit + ) + + def test_create_authorization_response_openid(self): + self.request.scopes = ('hello', 'openid') + self.request.response_type = 'id_token' + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + def test_validate_authorization_request_openid(self): + self.request.scopes = ('hello', 'openid') + self.request.response_type = 'id_token' + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + def test_create_authorization_response_oauth(self): + self.request.scopes = ('hello', 'world') + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + def test_validate_authorization_request_oauth(self): + self.request.scopes = ('hello', 'world') + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + +class DispatcherTest(TestCase): + def setUp(self): + self.request = Request('http://a.b/path') + self.request.decoded_body = ( + ("client_id", "me"), + ("code", "code"), + ("redirect_url", "https://a.b/cb"), + ) + + self.request_validator = mock.MagicMock() + self.auth_grant = OAuth2AuthorizationCodeGrant(self.request_validator) + self.openid_connect_auth = OAuth2AuthorizationCodeGrant(self.request_validator) + + +class AuthTokenGrantDispatcherOpenIdTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOpenIdTest, self).setUp() + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') + self.dispatcher = AuthorizationTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_openid(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, AuthorizationCodeGrant)) + self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) + + +class AuthTokenGrantDispatcherOpenIdWithoutCodeTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOpenIdWithoutCodeTest, self).setUp() + self.request.decoded_body = ( + ("client_id", "me"), + ("code", ""), + ("redirect_url", "https://a.b/cb"), + ) + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') + self.dispatcher = AuthorizationTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_openid_without_code(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, OAuth2AuthorizationCodeGrant)) + self.assertFalse(self.dispatcher.request_validator.get_authorization_code_scopes.called) + + +class AuthTokenGrantDispatcherOAuthTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOAuthTest, self).setUp() + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'world') + self.dispatcher = AuthorizationTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_oauth(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, OAuth2AuthorizationCodeGrant)) + self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) diff --git a/tests/openid/connect/core/grant_types/test_hybrid.py b/tests/openid/connect/core/grant_types/test_hybrid.py new file mode 100644 index 0000000..531ae7f --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_hybrid.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant + +from ....oauth2.rfc6749.grant_types.test_authorization_code import AuthorizationCodeGrantTest + + +class OpenIDHybridInterferenceTest(AuthorizationCodeGrantTest): + """Test that OpenID don't interfere with normal OAuth 2 flows.""" + + def setUp(self): + super(OpenIDHybridInterferenceTest, self).setUp() + self.auth = HybridGrant(request_validator=self.mock_validator) diff --git a/tests/openid/connect/core/grant_types/test_implicit.py b/tests/openid/connect/core/grant_types/test_implicit.py new file mode 100644 index 0000000..56247d9 --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_implicit.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import mock + +from oauthlib.common import Request + +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant +from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant +from oauthlib.openid.connect.core.grant_types.exceptions import OIDCNoPrompt + +from ....unittest import TestCase +from .test_authorization_code import get_id_token_mock, OpenIDAuthCodeTest + +from ....oauth2.rfc6749.grant_types.test_implicit import ImplicitGrantTest + + +class OpenIDImplicitInterferenceTest(ImplicitGrantTest): + """Test that OpenID don't interfere with normal OAuth 2 flows.""" + + def setUp(self): + super(OpenIDImplicitInterferenceTest, self).setUp() + self.auth = ImplicitGrant(request_validator=self.mock_validator) + + +class OpenIDImplicitTest(TestCase): + + def setUp(self): + self.request = Request('http://a.b/path') + self.request.scopes = ('hello', 'openid') + self.request.expires_in = 1800 + self.request.client_id = 'abcdef' + self.request.response_type = 'id_token token' + self.request.redirect_uri = 'https://a.b/cb' + self.request.nonce = 'zxc' + self.request.state = 'abc' + + self.mock_validator = mock.MagicMock() + self.mock_validator.get_id_token.side_effect = get_id_token_mock + self.auth = ImplicitGrant(request_validator=self.mock_validator) + + token = 'MOCKED_TOKEN' + self.url_query = 'https://a.b/cb?state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token + self.url_fragment = 'https://a.b/cb#state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token + + @mock.patch('oauthlib.common.generate_token') + def test_authorization(self, generate_token): + scope, info = self.auth.validate_authorization_request(self.request) + + generate_token.return_value = 'abc' + bearer = BearerToken(self.mock_validator) + + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.response_type = 'id_token' + token = 'MOCKED_TOKEN' + url = 'https://a.b/cb#state=abc&id_token=%s' % token + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], url, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.nonce = None + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=invalid_request', h['Location']) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + @mock.patch('oauthlib.common.generate_token') + def test_no_prompt_authorization(self, generate_token): + generate_token.return_value = 'abc' + scope, info = self.auth.validate_authorization_request(self.request) + self.request.prompt = 'none' + self.assertRaises(OIDCNoPrompt, + self.auth.validate_authorization_request, + self.request) + + # prompt == none requires id token hint + bearer = BearerToken(self.mock_validator) + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=invalid_request', h['Location']) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.id_token_hint = 'me@email.com' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + # Test alernative response modes + self.request.response_mode = 'query' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_query) + + # Ensure silent authentication and authorization is done + self.mock_validator.validate_silent_login.return_value = False + self.mock_validator.validate_silent_authorization.return_value = True + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + self.mock_validator.validate_silent_login.return_value = True + self.mock_validator.validate_silent_authorization.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=consent_required', h['Location']) + + # ID token hint must match logged in user + self.mock_validator.validate_silent_authorization.return_value = True + self.mock_validator.validate_user_match.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + +class OpenIDHybridCodeTokenTest(OpenIDAuthCodeTest): + + def setUp(self): + super(OpenIDHybridCodeTokenTest, self).setUp() + self.request.response_type = 'code token' + self.auth = HybridGrant(request_validator=self.mock_validator) + self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' + self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' + + +class OpenIDHybridCodeIdTokenTest(OpenIDAuthCodeTest): + + def setUp(self): + super(OpenIDHybridCodeIdTokenTest, self).setUp() + self.request.response_type = 'code id_token' + self.auth = HybridGrant(request_validator=self.mock_validator) + token = 'MOCKED_TOKEN' + self.url_query = 'https://a.b/cb?code=abc&state=abc&id_token=%s' % token + self.url_fragment = 'https://a.b/cb#code=abc&state=abc&id_token=%s' % token + + +class OpenIDHybridCodeIdTokenTokenTest(OpenIDAuthCodeTest): + + def setUp(self): + super(OpenIDHybridCodeIdTokenTokenTest, self).setUp() + self.request.response_type = 'code id_token token' + self.auth = HybridGrant(request_validator=self.mock_validator) + token = 'MOCKED_TOKEN' + self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token + self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token diff --git a/tests/openid/connect/core/test_request_validator.py b/tests/openid/connect/core/test_request_validator.py new file mode 100644 index 0000000..14a7c23 --- /dev/null +++ b/tests/openid/connect/core/test_request_validator.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +from oauthlib.openid.connect.core.request_validator import RequestValidator + +from ....unittest import TestCase + + +class RequestValidatorTest(TestCase): + + def test_method_contracts(self): + v = RequestValidator() + self.assertRaises( + NotImplementedError, + v.get_authorization_code_scopes, + 'client_id', 'code', 'redirect_uri', 'request' + ) + self.assertRaises( + NotImplementedError, + v.get_jwt_bearer_token, + 'token', 'token_handler', 'request' + ) + self.assertRaises( + NotImplementedError, + v.get_id_token, + 'token', 'token_handler', 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_jwt_bearer_token, + 'token', 'scopes', 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_id_token, + 'token', 'scopes', 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_silent_authorization, + 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_silent_login, + 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_user_match, + 'id_token_hint', 'scopes', 'claims', 'request' + ) diff --git a/tests/openid/connect/core/test_server.py b/tests/openid/connect/core/test_server.py new file mode 100644 index 0000000..83290db --- /dev/null +++ b/tests/openid/connect/core/test_server.py @@ -0,0 +1,178 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import json + +import mock + +from oauthlib.oauth2.rfc6749 import errors +from oauthlib.oauth2.rfc6749.endpoints.authorization import AuthorizationEndpoint +from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant +from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant +from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant + +from ....unittest import TestCase + + +class AuthorizationEndpointTest(TestCase): + + def setUp(self): + self.mock_validator = mock.MagicMock() + self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) + auth_code = AuthorizationCodeGrant(request_validator=self.mock_validator) + auth_code.save_authorization_code = mock.MagicMock() + implicit = ImplicitGrant( + request_validator=self.mock_validator) + implicit.save_token = mock.MagicMock() + hybrid = HybridGrant(self.mock_validator) + + response_types = { + 'code': auth_code, + 'token': implicit, + 'id_token': implicit, + 'id_token token': implicit, + 'code token': hybrid, + 'code id_token': hybrid, + 'code token id_token': hybrid, + 'none': auth_code + } + self.expires_in = 1800 + token = BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = AuthorizationEndpoint( + default_response_type='code', + default_token_type=token, + response_types=response_types + ) + + # TODO: Add hybrid grant test + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_authorization_grant(self): + uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz') + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_implicit_grant(self): + uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True) + + def test_none_grant(self): + uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True) + self.assertEqual(body, None) + self.assertEqual(status_code, 302) + + # and without the state parameter + uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me', parse_fragment=True) + self.assertEqual(body, None) + self.assertEqual(status_code, 302) + + def test_missing_type(self): + uri = 'http://i.b/l?client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + self.mock_validator.validate_request = mock.MagicMock( + side_effect=errors.InvalidRequestError()) + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.') + + def test_invalid_type(self): + uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + self.mock_validator.validate_request = mock.MagicMock( + side_effect=errors.UnsupportedResponseTypeError()) + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type') + + +class TokenEndpointTest(TestCase): + + def setUp(self): + def set_user(request): + request.user = mock.MagicMock() + request.client = mock.MagicMock() + request.client.client_id = 'mocked_client_id' + return True + + self.mock_validator = mock.MagicMock() + self.mock_validator.authenticate_client.side_effect = set_user + self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) + auth_code = AuthorizationCodeGrant( + request_validator=self.mock_validator) + supported_types = { + 'authorization_code': auth_code, + } + self.expires_in = 1800 + token = BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = TokenEndpoint( + 'authorization_code', + default_token_type=token, + grant_types=supported_types + ) + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_authorization_grant(self): + body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': 'abc', + 'refresh_token': 'abc', + 'scope': 'all of them', + 'state': 'xyz' + } + self.assertEqual(json.loads(body), token) + + body = 'grant_type=authorization_code&code=abc&state=xyz' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': 'abc', + 'refresh_token': 'abc', + 'state': 'xyz' + } + self.assertEqual(json.loads(body), token) + + def test_missing_type(self): + _, body, _ = self.endpoint.create_token_response('', body='') + token = {'error': 'unsupported_grant_type'} + self.assertEqual(json.loads(body), token) + + def test_invalid_type(self): + body = 'grant_type=invalid' + _, body, _ = self.endpoint.create_token_response('', body=body) + token = {'error': 'unsupported_grant_type'} + self.assertEqual(json.loads(body), token) diff --git a/tests/openid/connect/core/test_tokens.py b/tests/openid/connect/core/test_tokens.py new file mode 100644 index 0000000..12c75f1 --- /dev/null +++ b/tests/openid/connect/core/test_tokens.py @@ -0,0 +1,133 @@ +from __future__ import absolute_import, unicode_literals + +import mock + +from oauthlib.openid.connect.core.tokens import JWTToken + +from ....unittest import TestCase + + +class JWTTokenTestCase(TestCase): + + def test_create_token_callable_expires_in(self): + """ + Test retrieval of the expires in value by calling the callable expires_in property + """ + + expires_in_mock = mock.MagicMock() + request_mock = mock.MagicMock() + + token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) + token.create_token(request=request_mock) + + expires_in_mock.assert_called_once_with(request_mock) + + def test_create_token_non_callable_expires_in(self): + """ + When a non callable expires in is set this should just be set to the request + """ + + expires_in_mock = mock.NonCallableMagicMock() + request_mock = mock.MagicMock() + + token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) + token.create_token(request=request_mock) + + self.assertFalse(expires_in_mock.called) + self.assertEqual(request_mock.expires_in, expires_in_mock) + + def test_create_token_calls_get_id_token(self): + """ + When create_token is called the call should be forwarded to the get_id_token on the token validator + """ + request_mock = mock.MagicMock() + + with mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + + request_validator = RequestValidatorMock() + + token = JWTToken(expires_in=mock.MagicMock(), request_validator=request_validator) + token.create_token(request=request_mock) + + request_validator.get_jwt_bearer_token.assert_called_once_with(None, None, request_mock) + + def test_validate_request_token_from_headers(self): + """ + Bearer token get retrieved from headers. + """ + + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ + mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + request_validator_mock = RequestValidatorMock() + + token = JWTToken(request_validator=request_validator_mock) + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.scopes = mock.MagicMock() + request.headers = { + 'Authorization': 'Bearer some-token-from-header' + } + + token.validate_request(request=request) + + request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-header', + request.scopes, + request) + + def test_validate_token_from_request(self): + """ + Token get retrieved from request object. + """ + + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ + mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + request_validator_mock = RequestValidatorMock() + + token = JWTToken(request_validator=request_validator_mock) + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.scopes = mock.MagicMock() + request.access_token = 'some-token-from-request-object' + request.headers = {} + + token.validate_request(request=request) + + request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-request-object', + request.scopes, + request) + + def test_estimate_type(self): + """ + Estimate type results for a jwt token + """ + + def test_token(token, expected_result): + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock: + jwt_token = JWTToken() + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.headers = { + 'Authorization': 'Bearer {}'.format(token) + } + + result = jwt_token.estimate_type(request=request) + + self.assertEqual(result, expected_result) + + test_items = ( + ('eyfoo.foo.foo', 10), + ('eyfoo.foo.foo.foo.foo', 10), + ('eyfoobar', 0) + ) + + for token, expected_result in test_items: + test_token(token, expected_result) -- cgit v1.2.1 From 3eaf962311dfbc566dbfa66a988e0331b91184be Mon Sep 17 00:00:00 2001 From: Seth Davis Date: Sat, 30 Jun 2018 18:09:26 -0400 Subject: Remove handling of nonstandard parameter "expires" (#506) --- tests/oauth2/rfc6749/test_parameters.py | 11 ----------- 1 file changed, 11 deletions(-) (limited to 'tests') diff --git a/tests/oauth2/rfc6749/test_parameters.py b/tests/oauth2/rfc6749/test_parameters.py index 2a9cbe8..6ba98c0 100644 --- a/tests/oauth2/rfc6749/test_parameters.py +++ b/tests/oauth2/rfc6749/test_parameters.py @@ -115,13 +115,6 @@ class ParameterTests(TestCase): ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",' ' "example_parameter": "example_value" }') - json_expires = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",' - ' "token_type": "example",' - ' "expires": 3600,' - ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",' - ' "example_parameter": "example_value",' - ' "scope":"abc def"}') - json_dict = { 'access_token': '2YotnFZFEjr1zCsicMWpAA', 'token_type': 'example', @@ -264,7 +257,3 @@ class ParameterTests(TestCase): finally: signals.scope_changed.disconnect(record_scope_change) del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] - - def test_token_response_with_expires(self): - """Verify fallback for alternate spelling of expires_in. """ - self.assertEqual(parse_token_response(self.json_expires), self.json_dict) -- cgit v1.2.1