diff options
Diffstat (limited to 'tests/oauth2')
-rw-r--r-- | tests/oauth2/rfc6749/clients/test_base.py | 72 | ||||
-rw-r--r-- | tests/oauth2/rfc6749/clients/test_service_application.py | 70 | ||||
-rw-r--r-- | tests/oauth2/rfc6749/clients/test_web_application.py | 2 | ||||
-rw-r--r-- | tests/oauth2/rfc6749/endpoints/test_claims_handling.py | 105 | ||||
-rw-r--r-- | tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py | 132 | ||||
-rw-r--r-- | tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py | 85 | ||||
-rw-r--r-- | tests/oauth2/rfc6749/endpoints/test_scope_handling.py | 2 | ||||
-rw-r--r-- | tests/oauth2/rfc6749/grant_types/test_openid_connect.py | 291 | ||||
-rw-r--r-- | tests/oauth2/rfc6749/test_parameters.py | 11 | ||||
-rw-r--r-- | tests/oauth2/rfc6749/test_server.py | 107 | ||||
-rw-r--r-- | tests/oauth2/rfc6749/test_tokens.py | 20 |
11 files changed, 342 insertions, 555 deletions
diff --git a/tests/oauth2/rfc6749/clients/test_base.py b/tests/oauth2/rfc6749/clients/test_base.py index c788bc1..d48a944 100644 --- a/tests/oauth2/rfc6749/clients/test_base.py +++ b/tests/oauth2/rfc6749/clients/test_base.py @@ -4,7 +4,7 @@ from __future__ import absolute_import, unicode_literals import datetime from oauthlib import common -from oauthlib.oauth2 import Client, InsecureTransportError +from oauthlib.oauth2 import Client, InsecureTransportError, TokenExpiredError from oauthlib.oauth2.rfc6749 import utils from oauthlib.oauth2.rfc6749.clients import AUTH_HEADER, BODY, URI_QUERY @@ -51,10 +51,26 @@ class ClientTest(TestCase): self.assertFormBodyEqual(body, self.body) self.assertEqual(headers, self.bearer_header) + # Non-HTTPS + insecure_uri = 'http://example.com/path?query=world' + client = Client(self.client_id, access_token=self.access_token, token_type="Bearer") + self.assertRaises(InsecureTransportError, client.add_token, insecure_uri, + body=self.body, + headers=self.headers) + # Missing access token client = Client(self.client_id) self.assertRaises(ValueError, client.add_token, self.uri) + # Expired token + expired = 523549800 + expired_token = { + 'expires_at': expired, + } + client = Client(self.client_id, token=expired_token, access_token=self.access_token, token_type="Bearer") + self.assertRaises(TokenExpiredError, client.add_token, self.uri, + body=self.body, headers=self.headers) + # The default token placement, bearer in auth header client = Client(self.client_id, access_token=self.access_token) uri, headers, body = client.add_token(self.uri, body=self.body, @@ -150,8 +166,26 @@ class ClientTest(TestCase): self.assertEqual(uri, self.uri) self.assertEqual(body, self.body) self.assertEqual(headers, self.mac_00_header) + # Non-HTTPS + insecure_uri = 'http://example.com/path?query=world' + self.assertRaises(InsecureTransportError, client.add_token, insecure_uri, + body=self.body, + headers=self.headers, + issue_time=datetime.datetime.now()) + # Expired Token + expired = 523549800 + expired_token = { + 'expires_at': expired, + } + client = Client(self.client_id, token=expired_token, token_type="MAC", + access_token=self.access_token, mac_key=self.mac_key, + mac_algorithm="hmac-sha-1") + self.assertRaises(TokenExpiredError, client.add_token, self.uri, + body=self.body, + headers=self.headers, + issue_time=datetime.datetime.now()) - # Add the Authorization header (draft 00) + # Add the Authorization header (draft 01) client = Client(self.client_id, token_type="MAC", access_token=self.access_token, mac_key=self.mac_key, mac_algorithm="hmac-sha-1") @@ -160,7 +194,24 @@ class ClientTest(TestCase): self.assertEqual(uri, self.uri) self.assertEqual(body, self.body) self.assertEqual(headers, self.mac_01_header) - + # Non-HTTPS + insecure_uri = 'http://example.com/path?query=world' + self.assertRaises(InsecureTransportError, client.add_token, insecure_uri, + body=self.body, + headers=self.headers, + draft=1) + # Expired Token + expired = 523549800 + expired_token = { + 'expires_at': expired, + } + client = Client(self.client_id, token=expired_token, token_type="MAC", + access_token=self.access_token, mac_key=self.mac_key, + mac_algorithm="hmac-sha-1") + self.assertRaises(TokenExpiredError, client.add_token, self.uri, + body=self.body, + headers=self.headers, + draft=1) def test_revocation_request(self): client = Client(self.client_id) @@ -208,6 +259,21 @@ class ClientTest(TestCase): # NotImplementedError self.assertRaises(NotImplementedError, client.prepare_authorization_request, auth_url) + def test_prepare_token_request(self): + redirect_url = 'https://example.com/callback/' + scopes = 'read' + token_url = 'https://example.com/token/' + state = 'fake_state' + + client = Client(self.client_id, scope=scopes, state=state) + + # Non-HTTPS + self.assertRaises(InsecureTransportError, + client.prepare_token_request, 'http://example.com/token/') + + # NotImplementedError + self.assertRaises(NotImplementedError, client.prepare_token_request, token_url) + def test_prepare_refresh_token_request(self): client = Client(self.client_id) diff --git a/tests/oauth2/rfc6749/clients/test_service_application.py b/tests/oauth2/rfc6749/clients/test_service_application.py index 2dc633a..dc337cf 100644 --- a/tests/oauth2/rfc6749/clients/test_service_application.py +++ b/tests/oauth2/rfc6749/clients/test_service_application.py @@ -89,8 +89,8 @@ mfvGGg3xNjTMO7IdrwIDAQAB audience=self.audience, body=self.body) r = Request('https://a.b', body=body) - self.assertEqual(r.isnot, 'empty') - self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) + self.assertEqual(r.isnot, 'empty') + self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256']) @@ -98,6 +98,72 @@ mfvGGg3xNjTMO7IdrwIDAQAB # audience verification is handled during decode now self.assertEqual(claim['sub'], self.subject) self.assertEqual(claim['iat'], int(t.return_value)) + self.assertNotIn('nbf', claim) + self.assertNotIn('jti', claim) + + # Missing issuer parameter + self.assertRaises(ValueError, client.prepare_request_body, + issuer=None, subject=self.subject, audience=self.audience, body=self.body) + + # Missing subject parameter + self.assertRaises(ValueError, client.prepare_request_body, + issuer=self.issuer, subject=None, audience=self.audience, body=self.body) + + # Missing audience parameter + self.assertRaises(ValueError, client.prepare_request_body, + issuer=self.issuer, subject=self.subject, audience=None, body=self.body) + + # Optional kwargs + not_before = time() - 3600 + jwt_id = '8zd15df4s35f43sd' + body = client.prepare_request_body(issuer=self.issuer, + subject=self.subject, + audience=self.audience, + body=self.body, + not_before=not_before, + jwt_id=jwt_id) + + r = Request('https://a.b', body=body) + self.assertEqual(r.isnot, 'empty') + self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) + + claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256']) + + self.assertEqual(claim['iss'], self.issuer) + # audience verification is handled during decode now + self.assertEqual(claim['sub'], self.subject) + self.assertEqual(claim['iat'], int(t.return_value)) + self.assertEqual(claim['nbf'], not_before) + self.assertEqual(claim['jti'], jwt_id) + + @patch('time.time') + def test_request_body_no_initial_private_key(self, t): + t.return_value = time() + self.token['expires_at'] = self.token['expires_in'] + t.return_value + + client = ServiceApplicationClient( + self.client_id, private_key=None) + + # Basic with private key provided + body = client.prepare_request_body(issuer=self.issuer, + subject=self.subject, + audience=self.audience, + body=self.body, + private_key=self.private_key) + r = Request('https://a.b', body=body) + self.assertEqual(r.isnot, 'empty') + self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) + + claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256']) + + self.assertEqual(claim['iss'], self.issuer) + # audience verification is handled during decode now + self.assertEqual(claim['sub'], self.subject) + self.assertEqual(claim['iat'], int(t.return_value)) + + # No private key provided + self.assertRaises(ValueError, client.prepare_request_body, + issuer=self.issuer, subject=self.subject, audience=self.audience, body=self.body) @patch('time.time') def test_parse_token_response(self, t): diff --git a/tests/oauth2/rfc6749/clients/test_web_application.py b/tests/oauth2/rfc6749/clients/test_web_application.py index fa6643e..4ecc3b3 100644 --- a/tests/oauth2/rfc6749/clients/test_web_application.py +++ b/tests/oauth2/rfc6749/clients/test_web_application.py @@ -38,7 +38,7 @@ class WebApplicationClientTest(TestCase): code = "zzzzaaaa" body = "not=empty" - body_code = "not=empty&grant_type=authorization_code&code=%s&client_id=%s" % (code, client_id) + body_code = "not=empty&grant_type=authorization_code&code=%s" % code body_redirect = body_code + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback" body_kwargs = body_code + "&some=providers&require=extra+arguments" diff --git a/tests/oauth2/rfc6749/endpoints/test_claims_handling.py b/tests/oauth2/rfc6749/endpoints/test_claims_handling.py deleted file mode 100644 index 9795c80..0000000 --- a/tests/oauth2/rfc6749/endpoints/test_claims_handling.py +++ /dev/null @@ -1,105 +0,0 @@ -"""Ensure OpenID Connect Authorization Request 'claims' are preserved across authorization. - -The claims parameter is an optional query param for the Authorization Request endpoint - but if it is provided and is valid it needs to be deserialized (from urlencoded JSON) - and persisted with the authorization code itself, then in the subsequent Access Token - request the claims should be transferred (via the oauthlib request) to be persisted - with the Access Token when it is created. -""" -from __future__ import absolute_import, unicode_literals - -import mock - -from oauthlib.oauth2 import InvalidRequestError, RequestValidator, Server - -from ....unittest import TestCase -from .test_utils import get_fragment_credentials, get_query_credentials - - -class TestClaimsHandling(TestCase): - - DEFAULT_REDIRECT_URI = 'http://i.b./path' - - def set_scopes(self, scopes): - def set_request_scopes(client_id, code, client, request): - request.scopes = scopes - return True - return set_request_scopes - - def set_user(self, request): - request.user = 'foo' - request.client_id = 'bar' - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def save_claims_with_code(self, client_id, code, request, *args, **kwargs): - # a real validator would save the claims with the code during save_authorization_code() - self.claims_from_auth_code_request = request.claims - self.scopes = request.scopes.split() - - def retrieve_claims_saved_with_code(self, client_id, code, client, request, *args, **kwargs): - request.claims = self.claims_from_auth_code_request - request.scopes = self.scopes - - return True - - def save_claims_with_bearer_token(self, token, request, *args, **kwargs): - # a real validator would save the claims with the access token during save_bearer_token() - self.claims_saved_with_bearer_token = request.claims - - def setUp(self): - self.validator = mock.MagicMock(spec=RequestValidator) - self.validator.get_default_redirect_uri.return_value = TestClaimsHandling.DEFAULT_REDIRECT_URI - self.validator.authenticate_client.side_effect = self.set_client - - self.validator.save_authorization_code.side_effect = self.save_claims_with_code - self.validator.validate_code.side_effect = self.retrieve_claims_saved_with_code - self.validator.save_token.side_effect = self.save_claims_with_bearer_token - - self.server = Server(self.validator) - - def test_claims_stored_on_code_creation(self): - - claims = { - "id_token": { - "claim_1": None, - "claim_2": { - "essential": True - } - }, - "userinfo": { - "claim_3": { - "essential": True - }, - "claim_4": None - } - } - - claims_urlquoted='%7B%22id_token%22%3A%20%7B%22claim_2%22%3A%20%7B%22essential%22%3A%20true%7D%2C%20%22claim_1%22%3A%20null%7D%2C%20%22userinfo%22%3A%20%7B%22claim_4%22%3A%20null%2C%20%22claim_3%22%3A%20%7B%22essential%22%3A%20true%7D%7D%7D' - uri = 'http://example.com/path?client_id=abc&scope=openid+test_scope&response_type=code&claims=%s' - - h, b, s = self.server.create_authorization_response(uri % claims_urlquoted, scopes='openid test_scope') - - self.assertDictEqual(self.claims_from_auth_code_request, claims) - - code = get_query_credentials(h['Location'])['code'][0] - token_uri = 'http://example.com/path' - _, body, _ = self.server.create_token_response(token_uri, - body='grant_type=authorization_code&code=%s' % code) - - self.assertDictEqual(self.claims_saved_with_bearer_token, claims) - - def test_invalid_claims(self): - uri = 'http://example.com/path?client_id=abc&scope=openid+test_scope&response_type=code&claims=this-is-not-json' - - h, b, s = self.server.create_authorization_response(uri, scopes='openid test_scope') - error = get_query_credentials(h['Location'])['error'][0] - error_desc = get_query_credentials(h['Location'])['error_description'][0] - self.assertEqual(error, 'invalid_request') - self.assertEqual(error_desc, "Malformed claims parameter") diff --git a/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py b/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py new file mode 100644 index 0000000..7ec8190 --- /dev/null +++ b/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +from json import loads + +from mock import MagicMock + +from oauthlib.common import urlencode +from oauthlib.oauth2 import RequestValidator, IntrospectEndpoint + +from ....unittest import TestCase + + +class IntrospectEndpointTest(TestCase): + + def setUp(self): + self.validator = MagicMock(wraps=RequestValidator()) + self.validator.client_authentication_required.return_value = True + self.validator.authenticate_client.return_value = True + self.validator.validate_bearer_token.return_value = True + self.validator.introspect_token.return_value = {} + self.endpoint = IntrospectEndpoint(self.validator) + + self.uri = 'should_not_matter' + self.headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + } + self.resp_h = { + 'Cache-Control': 'no-store', + 'Content-Type': 'application/json', + 'Pragma': 'no-cache' + } + self.resp_b = { + "active": True + } + + def test_introspect_token(self): + for token_type in ('access_token', 'refresh_token', 'invalid'): + body = urlencode([('token', 'foo'), + ('token_type_hint', token_type)]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), self.resp_b) + self.assertEqual(s, 200) + + def test_introspect_token_nohint(self): + # don't specify token_type_hint + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), self.resp_b) + self.assertEqual(s, 200) + + def test_introspect_token_false(self): + self.validator.introspect_token.return_value = None + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), {"active": False}) + self.assertEqual(s, 200) + + def test_introspect_token_claims(self): + self.validator.introspect_token.return_value = {"foo": "bar"} + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), {"active": True, "foo": "bar"}) + self.assertEqual(s, 200) + + def test_introspect_token_claims_spoof_active(self): + self.validator.introspect_token.return_value = {"foo": "bar", "active": False} + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), {"active": True, "foo": "bar"}) + self.assertEqual(s, 200) + + def test_introspect_token_client_authentication_failed(self): + self.validator.authenticate_client.return_value = False + body = urlencode([('token', 'foo'), + ('token_type_hint', 'access_token')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, {}) + self.assertEqual(loads(b)['error'], 'invalid_client') + self.assertEqual(s, 401) + + def test_introspect_token_public_client_authentication(self): + self.validator.client_authentication_required.return_value = False + self.validator.authenticate_client_id.return_value = True + for token_type in ('access_token', 'refresh_token', 'invalid'): + body = urlencode([('token', 'foo'), + ('token_type_hint', token_type)]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), self.resp_b) + self.assertEqual(s, 200) + + def test_introspect_token_public_client_authentication_failed(self): + self.validator.client_authentication_required.return_value = False + self.validator.authenticate_client_id.return_value = False + body = urlencode([('token', 'foo'), + ('token_type_hint', 'access_token')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, {}) + self.assertEqual(loads(b)['error'], 'invalid_client') + self.assertEqual(s, 401) + + + def test_introspect_unsupported_token(self): + endpoint = IntrospectEndpoint(self.validator, + supported_token_types=['access_token']) + body = urlencode([('token', 'foo'), + ('token_type_hint', 'refresh_token')]) + h, b, s = endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, {}) + self.assertEqual(loads(b)['error'], 'unsupported_token_type') + self.assertEqual(s, 400) + + h, b, s = endpoint.create_introspect_response(self.uri, + headers=self.headers, body='') + self.assertEqual(h, {}) + self.assertEqual(loads(b)['error'], 'invalid_request') + self.assertEqual(s, 400) diff --git a/tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py b/tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py deleted file mode 100644 index 89431b6..0000000 --- a/tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py +++ /dev/null @@ -1,85 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -import mock - -from oauthlib.oauth2 import InvalidRequestError -from oauthlib.oauth2.rfc6749.endpoints.authorization import \ - AuthorizationEndpoint -from oauthlib.oauth2.rfc6749.grant_types import OpenIDConnectAuthCode -from oauthlib.oauth2.rfc6749.tokens import BearerToken - -from ....unittest import TestCase - -try: - from urllib.parse import urlencode -except ImportError: - from urllib import urlencode - - - - -class OpenIDConnectEndpointTest(TestCase): - - def setUp(self): - self.mock_validator = mock.MagicMock() - self.mock_validator.authenticate_client.side_effect = self.set_client - grant = OpenIDConnectAuthCode(request_validator=self.mock_validator) - bearer = BearerToken(self.mock_validator) - self.endpoint = AuthorizationEndpoint(grant, bearer, - response_types={'code': grant}) - params = { - 'prompt': 'consent', - 'display': 'touch', - 'nonce': 'abcd', - 'state': 'abc', - 'redirect_uri': 'https://a.b/cb', - 'response_type': 'code', - 'client_id': 'abcdef', - 'scope': 'hello openid' - } - self.url = 'http://a.b/path?' + urlencode(params) - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - @mock.patch('oauthlib.common.generate_token') - def test_authorization_endpoint_handles_prompt(self, generate_token): - generate_token.return_value = "MOCK_CODE" - # In the GET view: - scopes, creds = self.endpoint.validate_authorization_request(self.url) - # In the POST view: - creds['scopes'] = scopes - h, b, s = self.endpoint.create_authorization_response(self.url, - credentials=creds) - expected = 'https://a.b/cb?state=abc&code=MOCK_CODE' - self.assertURLEqual(h['Location'], expected) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - def test_prompt_none_exclusiveness(self): - """ - Test that prompt=none can't be used with another prompt value. - """ - params = { - 'prompt': 'none consent', - 'state': 'abc', - 'redirect_uri': 'https://a.b/cb', - 'response_type': 'code', - 'client_id': 'abcdef', - 'scope': 'hello openid' - } - url = 'http://a.b/path?' + urlencode(params) - with self.assertRaises(InvalidRequestError): - self.endpoint.validate_authorization_request(url) - - def test_oidc_params_preservation(self): - """ - Test that the nonce parameter is passed through. - """ - scopes, creds = self.endpoint.validate_authorization_request(self.url) - - self.assertEqual(creds['prompt'], {'consent'}) - self.assertEqual(creds['nonce'], 'abcd') - self.assertEqual(creds['display'], 'touch') diff --git a/tests/oauth2/rfc6749/endpoints/test_scope_handling.py b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py index 87781b3..8490c03 100644 --- a/tests/oauth2/rfc6749/endpoints/test_scope_handling.py +++ b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py @@ -87,7 +87,7 @@ class TestScopeHandling(TestCase): self.assertIn('Location', h) code = get_query_credentials(h['Location'])['code'][0] _, body, _ = getattr(self, backend_server_type).create_token_response(token_uri, - body='grant_type=authorization_code&code=%s' % code) + body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code) self.assertEqual(json.loads(body)['scope'], decoded_scope) # implicit grant diff --git a/tests/oauth2/rfc6749/grant_types/test_openid_connect.py b/tests/oauth2/rfc6749/grant_types/test_openid_connect.py deleted file mode 100644 index f10d36c..0000000 --- a/tests/oauth2/rfc6749/grant_types/test_openid_connect.py +++ /dev/null @@ -1,291 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, unicode_literals - -import json - -import mock - -from oauthlib.common import Request -from oauthlib.oauth2.rfc6749.grant_types import (OIDCNoPrompt, - OpenIDConnectAuthCode, - OpenIDConnectHybrid, - OpenIDConnectImplicit) -from oauthlib.oauth2.rfc6749.tokens import BearerToken - -from ....unittest import TestCase -from .test_authorization_code import AuthorizationCodeGrantTest -from .test_implicit import ImplicitGrantTest - - -class OpenIDAuthCodeInterferenceTest(AuthorizationCodeGrantTest): - """Test that OpenID don't interfere with normal OAuth 2 flows.""" - - def setUp(self): - super(OpenIDAuthCodeInterferenceTest, self).setUp() - self.auth = OpenIDConnectAuthCode(request_validator=self.mock_validator) - -class OpenIDImplicitInterferenceTest(ImplicitGrantTest): - """Test that OpenID don't interfere with normal OAuth 2 flows.""" - - def setUp(self): - super(OpenIDImplicitInterferenceTest, self).setUp() - self.auth = OpenIDConnectImplicit(request_validator=self.mock_validator) - - -class OpenIDHybridInterferenceTest(AuthorizationCodeGrantTest): - """Test that OpenID don't interfere with normal OAuth 2 flows.""" - - def setUp(self): - super(OpenIDHybridInterferenceTest, self).setUp() - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - - -def get_id_token_mock(token, token_handler, request): - return "MOCKED_TOKEN" - - -class OpenIDAuthCodeTest(TestCase): - - def setUp(self): - self.request = Request('http://a.b/path') - self.request.scopes = ('hello', 'openid') - self.request.expires_in = 1800 - self.request.client_id = 'abcdef' - self.request.code = '1234' - self.request.response_type = 'code' - self.request.grant_type = 'authorization_code' - self.request.redirect_uri = 'https://a.b/cb' - self.request.state = 'abc' - - self.mock_validator = mock.MagicMock() - self.mock_validator.authenticate_client.side_effect = self.set_client - self.mock_validator.get_id_token.side_effect = get_id_token_mock - self.auth = OpenIDConnectAuthCode(request_validator=self.mock_validator) - - self.url_query = 'https://a.b/cb?code=abc&state=abc' - self.url_fragment = 'https://a.b/cb#code=abc&state=abc' - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - @mock.patch('oauthlib.common.generate_token') - def test_authorization(self, generate_token): - - scope, info = self.auth.validate_authorization_request(self.request) - - generate_token.return_value = 'abc' - bearer = BearerToken(self.mock_validator) - self.request.response_mode = 'query' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_query) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.response_mode = 'fragment' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - @mock.patch('oauthlib.common.generate_token') - def test_no_prompt_authorization(self, generate_token): - generate_token.return_value = 'abc' - scope, info = self.auth.validate_authorization_request(self.request) - self.request.prompt = 'none' - self.assertRaises(OIDCNoPrompt, - self.auth.validate_authorization_request, - self.request) - - # prompt == none requires id token hint - bearer = BearerToken(self.mock_validator) - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=invalid_request', h['Location']) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.response_mode = 'query' - self.request.id_token_hint = 'me@email.com' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_query) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - # Test alernative response modes - self.request.response_mode = 'fragment' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - - # Ensure silent authentication and authorization is done - self.mock_validator.validate_silent_login.return_value = False - self.mock_validator.validate_silent_authorization.return_value = True - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - self.mock_validator.validate_silent_login.return_value = True - self.mock_validator.validate_silent_authorization.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=consent_required', h['Location']) - - # ID token hint must match logged in user - self.mock_validator.validate_silent_authorization.return_value = True - self.mock_validator.validate_user_match.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - def set_scopes(self, client_id, code, client, request): - request.scopes = self.request.scopes - request.state = self.request.state - request.user = 'bob' - return True - - def test_create_token_response(self): - self.request.response_type = None - self.mock_validator.validate_code.side_effect = self.set_scopes - - bearer = BearerToken(self.mock_validator) - - h, token, s = self.auth.create_token_response(self.request, bearer) - token = json.loads(token) - self.assertEqual(self.mock_validator.save_token.call_count, 1) - self.assertIn('access_token', token) - self.assertIn('refresh_token', token) - self.assertIn('expires_in', token) - self.assertIn('scope', token) - self.assertIn('id_token', token) - self.assertIn('openid', token['scope']) - - self.mock_validator.reset_mock() - - self.request.scopes = ('hello', 'world') - h, token, s = self.auth.create_token_response(self.request, bearer) - token = json.loads(token) - self.assertEqual(self.mock_validator.save_token.call_count, 1) - self.assertIn('access_token', token) - self.assertIn('refresh_token', token) - self.assertIn('expires_in', token) - self.assertIn('scope', token) - self.assertNotIn('id_token', token) - self.assertNotIn('openid', token['scope']) - - -class OpenIDImplicitTest(TestCase): - - def setUp(self): - self.request = Request('http://a.b/path') - self.request.scopes = ('hello', 'openid') - self.request.expires_in = 1800 - self.request.client_id = 'abcdef' - self.request.response_type = 'id_token token' - self.request.redirect_uri = 'https://a.b/cb' - self.request.nonce = 'zxc' - self.request.state = 'abc' - - self.mock_validator = mock.MagicMock() - self.mock_validator.get_id_token.side_effect = get_id_token_mock - self.auth = OpenIDConnectImplicit(request_validator=self.mock_validator) - - token = 'MOCKED_TOKEN' - self.url_query = 'https://a.b/cb?state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - self.url_fragment = 'https://a.b/cb#state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - - @mock.patch('oauthlib.common.generate_token') - def test_authorization(self, generate_token): - scope, info = self.auth.validate_authorization_request(self.request) - - generate_token.return_value = 'abc' - bearer = BearerToken(self.mock_validator) - - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.response_type = 'id_token' - token = 'MOCKED_TOKEN' - url = 'https://a.b/cb#state=abc&id_token=%s' % token - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], url, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.nonce = None - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=invalid_request', h['Location']) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - @mock.patch('oauthlib.common.generate_token') - def test_no_prompt_authorization(self, generate_token): - generate_token.return_value = 'abc' - scope, info = self.auth.validate_authorization_request(self.request) - self.request.prompt = 'none' - self.assertRaises(OIDCNoPrompt, - self.auth.validate_authorization_request, - self.request) - - # prompt == none requires id token hint - bearer = BearerToken(self.mock_validator) - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=invalid_request', h['Location']) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.id_token_hint = 'me@email.com' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - # Test alernative response modes - self.request.response_mode = 'query' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_query) - - # Ensure silent authentication and authorization is done - self.mock_validator.validate_silent_login.return_value = False - self.mock_validator.validate_silent_authorization.return_value = True - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - self.mock_validator.validate_silent_login.return_value = True - self.mock_validator.validate_silent_authorization.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=consent_required', h['Location']) - - # ID token hint must match logged in user - self.mock_validator.validate_silent_authorization.return_value = True - self.mock_validator.validate_user_match.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - -class OpenIDHybridCodeTokenTest(OpenIDAuthCodeTest): - - def setUp(self): - super(OpenIDHybridCodeTokenTest, self).setUp() - self.request.response_type = 'code token' - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' - self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' - -class OpenIDHybridCodeIdTokenTest(OpenIDAuthCodeTest): - - def setUp(self): - super(OpenIDHybridCodeIdTokenTest, self).setUp() - self.request.response_type = 'code id_token' - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - token = 'MOCKED_TOKEN' - self.url_query = 'https://a.b/cb?code=abc&state=abc&id_token=%s' % token - self.url_fragment = 'https://a.b/cb#code=abc&state=abc&id_token=%s' % token - -class OpenIDHybridCodeIdTokenTokenTest(OpenIDAuthCodeTest): - - def setUp(self): - super(OpenIDHybridCodeIdTokenTokenTest, self).setUp() - self.request.response_type = 'code id_token token' - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - token = 'MOCKED_TOKEN' - self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token diff --git a/tests/oauth2/rfc6749/test_parameters.py b/tests/oauth2/rfc6749/test_parameters.py index 1cac879..2a11d33 100644 --- a/tests/oauth2/rfc6749/test_parameters.py +++ b/tests/oauth2/rfc6749/test_parameters.py @@ -116,13 +116,6 @@ class ParameterTests(TestCase): ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",' ' "example_parameter": "example_value" }') - json_expires = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",' - ' "token_type": "example",' - ' "expires": 3600,' - ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",' - ' "example_parameter": "example_value",' - ' "scope":"abc def"}') - json_dict = { 'access_token': '2YotnFZFEjr1zCsicMWpAA', 'token_type': 'example', @@ -268,7 +261,3 @@ class ParameterTests(TestCase): finally: signals.scope_changed.disconnect(record_scope_change) del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] - - def test_token_response_with_expires(self): - """Verify fallback for alternate spelling of expires_in. """ - self.assertEqual(parse_token_response(self.json_expires), self.json_dict) diff --git a/tests/oauth2/rfc6749/test_server.py b/tests/oauth2/rfc6749/test_server.py index 305b795..bc7a2b7 100644 --- a/tests/oauth2/rfc6749/test_server.py +++ b/tests/oauth2/rfc6749/test_server.py @@ -3,21 +3,17 @@ from __future__ import absolute_import, unicode_literals import json -import jwt import mock from oauthlib import common from oauthlib.oauth2.rfc6749 import errors, tokens from oauthlib.oauth2.rfc6749.endpoints import Server -from oauthlib.oauth2.rfc6749.endpoints.authorization import \ - AuthorizationEndpoint +from oauthlib.oauth2.rfc6749.endpoints.authorization import AuthorizationEndpoint from oauthlib.oauth2.rfc6749.endpoints.resource import ResourceEndpoint from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint from oauthlib.oauth2.rfc6749.grant_types import (AuthorizationCodeGrant, ClientCredentialsGrant, ImplicitGrant, - OpenIDConnectAuthCode, - OpenIDConnectImplicit, ResourceOwnerPasswordCredentialsGrant) from ...unittest import TestCase @@ -29,40 +25,34 @@ class AuthorizationEndpointTest(TestCase): self.mock_validator = mock.MagicMock() self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) auth_code = AuthorizationCodeGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) auth_code.save_authorization_code = mock.MagicMock() implicit = ImplicitGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) implicit.save_token = mock.MagicMock() - openid_connect_auth = OpenIDConnectAuthCode(self.mock_validator) - openid_connect_implicit = OpenIDConnectImplicit(self.mock_validator) - response_types = { - 'code': auth_code, - 'token': implicit, - - 'id_token': openid_connect_implicit, - 'id_token token': openid_connect_implicit, - 'code token': openid_connect_auth, - 'code id_token': openid_connect_auth, - 'code token id_token': openid_connect_auth, - 'none': auth_code + 'code': auth_code, + 'token': implicit, + 'none': auth_code } self.expires_in = 1800 - token = tokens.BearerToken(self.mock_validator, - expires_in=self.expires_in) + token = tokens.BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) self.endpoint = AuthorizationEndpoint( - default_response_type='code', - default_token_type=token, - response_types=response_types) + default_response_type='code', + default_token_type=token, + response_types=response_types + ) @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') def test_authorization_grant(self): uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz') @@ -71,7 +61,7 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True) @@ -79,7 +69,7 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True) self.assertEqual(body, None) @@ -99,9 +89,9 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?client_id=me&scope=all+of+them' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' self.mock_validator.validate_request = mock.MagicMock( - side_effect=errors.InvalidRequestError()) + side_effect=errors.InvalidRequestError()) headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.') @@ -109,9 +99,9 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' self.mock_validator.validate_request = mock.MagicMock( - side_effect=errors.UnsupportedResponseTypeError()) + side_effect=errors.UnsupportedResponseTypeError()) headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type') @@ -129,27 +119,32 @@ class TokenEndpointTest(TestCase): self.mock_validator.authenticate_client.side_effect = set_user self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) auth_code = AuthorizationCodeGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) password = ResourceOwnerPasswordCredentialsGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) client = ClientCredentialsGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) supported_types = { - 'authorization_code': auth_code, - 'password': password, - 'client_credentials': client, + 'authorization_code': auth_code, + 'password': password, + 'client_credentials': client, } self.expires_in = 1800 - token = tokens.BearerToken(self.mock_validator, - expires_in=self.expires_in) - self.endpoint = TokenEndpoint('authorization_code', - default_token_type=token, grant_types=supported_types) + token = tokens.BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = TokenEndpoint( + 'authorization_code', + default_token_type=token, + grant_types=supported_types + ) @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') def test_authorization_grant(self): body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) token = { 'token_type': 'Bearer', 'expires_in': self.expires_in, @@ -176,7 +171,7 @@ class TokenEndpointTest(TestCase): def test_password_grant(self): body = 'grant_type=password&username=a&password=hello&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) token = { 'token_type': 'Bearer', 'expires_in': self.expires_in, @@ -190,7 +185,7 @@ class TokenEndpointTest(TestCase): def test_client_grant(self): body = 'grant_type=client_credentials&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) token = { 'token_type': 'Bearer', 'expires_in': self.expires_in, @@ -279,9 +274,9 @@ twIDAQAB @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') def test_authorization_grant(self): - body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' + body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -293,9 +288,9 @@ twIDAQAB } self.assertEqual(body, token) - body = 'grant_type=authorization_code&code=abc&state=xyz' + body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&state=xyz' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -310,7 +305,7 @@ twIDAQAB def test_password_grant(self): body = 'grant_type=password&username=a&password=hello&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -325,7 +320,7 @@ twIDAQAB def test_scopes_and_user_id_stored_in_access_token(self): body = 'grant_type=password&username=a&password=hello&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) access_token = json.loads(body)['access_token'] @@ -338,7 +333,7 @@ twIDAQAB def test_client_grant(self): body = 'grant_type=client_credentials&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -349,12 +344,12 @@ twIDAQAB self.assertEqual(body, token) def test_missing_type(self): - _, body, _ = self.endpoint.create_token_response('', body='') + _, body, _ = self.endpoint.create_token_response('', body='client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&code=abc') token = {'error': 'unsupported_grant_type'} self.assertEqual(json.loads(body), token) def test_invalid_type(self): - body = 'grant_type=invalid' + body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=invalid&code=abc' _, body, _ = self.endpoint.create_token_response('', body=body) token = {'error': 'unsupported_grant_type'} self.assertEqual(json.loads(body), token) @@ -366,8 +361,10 @@ class ResourceEndpointTest(TestCase): self.mock_validator = mock.MagicMock() self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) token = tokens.BearerToken(request_validator=self.mock_validator) - self.endpoint = ResourceEndpoint(default_token='Bearer', - token_types={'Bearer': token}) + self.endpoint = ResourceEndpoint( + default_token='Bearer', + token_types={'Bearer': token} + ) def test_defaults(self): uri = 'http://a.b/path?some=query' diff --git a/tests/oauth2/rfc6749/test_tokens.py b/tests/oauth2/rfc6749/test_tokens.py index e2e558d..061754f 100644 --- a/tests/oauth2/rfc6749/test_tokens.py +++ b/tests/oauth2/rfc6749/test_tokens.py @@ -1,6 +1,11 @@ from __future__ import absolute_import, unicode_literals -from oauthlib.oauth2.rfc6749.tokens import * +from oauthlib.oauth2.rfc6749.tokens import ( + prepare_mac_header, + prepare_bearer_headers, + prepare_bearer_body, + prepare_bearer_uri, +) from ...unittest import TestCase @@ -59,9 +64,22 @@ class TokenTest(TestCase): bearer_headers = { 'Authorization': 'Bearer vF9dft4qmT' } + fake_bearer_headers = [ + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BeavervF9dft4qmT'}, + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BearerF9dft4qmT'}, + {'Authorization': 'Bearer vF9d ft4qmT'}, + ] + valid_header_with_multiple_spaces = {'Authorization': 'Bearer vF9dft4qmT'} bearer_body = 'access_token=vF9dft4qmT' bearer_uri = 'http://server.example.com/resource?access_token=vF9dft4qmT' + def _mocked_validate_bearer_token(self, token, scopes, request): + if not token: + return False + return True + def test_prepare_mac_header(self): """Verify mac signatures correctness |