diff options
author | Ib Lundgren <ib.lundgren@gmail.com> | 2013-09-12 10:05:33 +0100 |
---|---|---|
committer | Ib Lundgren <ib.lundgren@gmail.com> | 2013-09-12 10:05:33 +0100 |
commit | 62058f2d031d91bb6173fe06a1f6f11e22a9f03e (patch) | |
tree | 91f947a737da32e53ef5bd16500285440499fbc8 | |
parent | 1122945efbf3d1be6fed0e2279dfb81f785ad706 (diff) | |
download | oauthlib-62058f2d031d91bb6173fe06a1f6f11e22a9f03e.tar.gz |
Restructure OAuth2 tests.
20 files changed, 1365 insertions, 1151 deletions
diff --git a/oauthlib/oauth2/rfc6749/request_validator.py b/oauthlib/oauth2/rfc6749/request_validator.py index 5fde7ee..8612366 100644 --- a/oauthlib/oauth2/rfc6749/request_validator.py +++ b/oauthlib/oauth2/rfc6749/request_validator.py @@ -60,7 +60,8 @@ class RequestValidator(object): """ raise NotImplementedError('Subclasses must implement this method.') - def confirm_redirect_uri(self, client_id, code, redirect_uri, client, *args, **kwargs): + def confirm_redirect_uri(self, client_id, code, redirect_uri, client, + request, *args, **kwargs): """Ensure client is authorized to redirect to the redirect_uri requested. If the client specifies a redirect_uri when obtaining code then diff --git a/tests/oauth2/rfc6749/endpoints/__init__.py b/tests/oauth2/rfc6749/endpoints/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/oauth2/rfc6749/endpoints/__init__.py diff --git a/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py b/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py new file mode 100644 index 0000000..79124e3 --- /dev/null +++ b/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +from ....unittest import TestCase + +from oauthlib.oauth2.rfc6749 import BaseEndpoint, catch_errors_and_unavailability +from oauthlib.oauth2 import Server, RequestValidator, FatalClientError, OAuth2Error + + +class BaseEndpointTest(TestCase): + + def test_default_config(self): + endpoint = BaseEndpoint() + self.assertFalse(endpoint.catch_errors) + self.assertTrue(endpoint.available) + endpoint.catch_errors = True + self.assertTrue(endpoint.catch_errors) + endpoint.available = False + self.assertFalse(endpoint.available) + + def test_error_catching(self): + validator = RequestValidator() + server = Server(validator) + server.catch_errors = True + h, b, s = server.create_authorization_response('https://example.com') + self.assertIn("server_error", b) + self.assertEqual(s, 500) + + def test_unavailability(self): + validator = RequestValidator() + server = Server(validator) + server.available = False + h, b, s = server.create_authorization_response('https://example.com') + self.assertIn("temporarily_unavailable", b) + self.assertEqual(s, 503) + + def test_wrapper(self): + + class TestServer(Server): + + @catch_errors_and_unavailability + def throw_error(self, uri): + raise ValueError() + + @catch_errors_and_unavailability + def throw_oauth_error(self, uri): + raise OAuth2Error() + + @catch_errors_and_unavailability + def throw_fatal_oauth_error(self, uri): + raise FatalClientError() + + validator = RequestValidator() + server = TestServer(validator) + + server.catch_errors = True + h, b, s = server.throw_error('a') + self.assertIn("server_error", b) + self.assertEqual(s, 500) + + server.available = False + h, b, s = server.throw_error('a') + self.assertIn("temporarily_unavailable", b) + self.assertEqual(s, 503) + + server.available = True + self.assertRaises(OAuth2Error, server.throw_oauth_error, 'a') + self.assertRaises(FatalClientError, server.throw_fatal_oauth_error, 'a') + server.catch_errors = False + self.assertRaises(OAuth2Error, server.throw_oauth_error, 'a') + self.assertRaises(FatalClientError, server.throw_fatal_oauth_error, 'a') diff --git a/tests/oauth2/rfc6749/endpoints/test_client_authentication.py b/tests/oauth2/rfc6749/endpoints/test_client_authentication.py new file mode 100644 index 0000000..fdd9665 --- /dev/null +++ b/tests/oauth2/rfc6749/endpoints/test_client_authentication.py @@ -0,0 +1,103 @@ +"""Client authentication tests across all endpoints. + +Client authentication in OAuth2 serve two purposes, to authenticate +confidential clients and to ensure public clients are in fact public. The +latter is achieved with authenticate_client_id and the former with +authenticate_client. + +We make sure authentication is done by requiring a client object to be set +on the request object with a client_id parameter. The client_id attribute +prevents this check from being circumvented with a client form parameter. +""" +from __future__ import absolute_import, unicode_literals +import json +import mock + +from .test_utils import get_fragment_credentials +from ....unittest import TestCase + +from oauthlib.oauth2 import RequestValidator +from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer +from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer + + +class ClientAuthenticationTest(TestCase): + + def inspect_client(self, request, refresh_token=False): + if not request.client or not request.client.client_id: + raise ValueError() + return 'abc' + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = 'http://i.b./path' + self.web = WebApplicationServer(self.validator, + token_generator=self.inspect_client) + self.mobile = MobileApplicationServer(self.validator, + token_generator=self.inspect_client) + self.legacy = LegacyApplicationServer(self.validator, + token_generator=self.inspect_client) + self.backend = BackendApplicationServer(self.validator, + token_generator=self.inspect_client) + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def set_client_id(self, client_id, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def set_username(self, username, password, client, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def test_client_id_authentication(self): + token_uri = 'http://example.com/path' + + # authorization code grant + self.validator.authenticate_client.return_value = False + self.validator.authenticate_client_id.return_value = False + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=mock') + self.assertEqual(json.loads(body)['error'], 'invalid_client') + + self.validator.authenticate_client_id.return_value = True + self.validator.authenticate_client.side_effect = self.set_client + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=mock') + self.assertIn('access_token', json.loads(body)) + + # implicit grant + auth_uri = 'http://example.com/path?client_id=abc&response_type=token' + self.assertRaises(ValueError, self.mobile.create_authorization_response, + auth_uri, scopes=['random']) + + self.validator.validate_client_id.side_effect = self.set_client_id + h, _, s = self.mobile.create_authorization_response(auth_uri, scopes=['random']) + self.assertEqual(302, s) + self.assertIn('Location', h) + self.assertIn('access_token', get_fragment_credentials(h['Location'])) + + def test_custom_authentication(self): + token_uri = 'http://example.com/path' + + # authorization code grant + self.assertRaises(NotImplementedError, + self.web.create_token_response, token_uri, + body='grant_type=authorization_code&code=mock') + + # password grant + self.validator.authenticate_client.return_value = True + self.assertRaises(NotImplementedError, + self.legacy.create_token_response, token_uri, + body='grant_type=password&username=abc&password=secret') + + # client credentials grant + self.validator.authenticate_client.return_value = True + self.assertRaises(NotImplementedError, + self.backend.create_token_response, token_uri, + body='grant_type=client_credentials') diff --git a/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py b/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py new file mode 100644 index 0000000..98a132a --- /dev/null +++ b/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py @@ -0,0 +1,116 @@ +"""Ensure credentials are preserved through the authorization. + +The Authorization Code Grant will need to preserve state as well as redirect +uri and the Implicit Grant will need to preserve state. +""" +from __future__ import absolute_import, unicode_literals +import json +import mock + +from .test_utils import get_query_credentials, get_fragment_credentials +from ....unittest import TestCase + +from oauthlib.oauth2 import RequestValidator +from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer +from oauthlib.oauth2.rfc6749 import errors + + +class PreservationTest(TestCase): + + DEFAULT_REDIRECT_URI = 'http://i.b./path' + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = self.DEFAULT_REDIRECT_URI + self.validator.authenticate_client.side_effect = self.set_client + self.web = WebApplicationServer(self.validator) + self.mobile = MobileApplicationServer(self.validator) + + def set_state(self, state): + def set_request_state(client_id, code, client, request): + request.state = state + return True + return set_request_state + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def test_state_preservation(self): + auth_uri = 'http://example.com/path?state=xyz&client_id=abc&response_type=' + token_uri = 'http://example.com/path' + + # authorization grant + h, _, s = self.web.create_authorization_response( + auth_uri + 'code', scopes=['random']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + code = get_query_credentials(h['Location'])['code'][0] + self.validator.validate_code.side_effect = self.set_state('xyz') + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=%s' % code) + self.assertEqual(json.loads(body)['state'], 'xyz') + + # implicit grant + h, _, s = self.mobile.create_authorization_response( + auth_uri + 'token', scopes=['random']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertEqual(get_fragment_credentials(h['Location'])['state'][0], 'xyz') + + def test_redirect_uri_preservation(self): + auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc' + redirect_uri = 'http://i.b/path' + token_uri = 'http://example.com/path' + + # authorization grant + h, _, s = self.web.create_authorization_response( + auth_uri + '&response_type=code', scopes=['random']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertTrue(h['Location'].startswith(redirect_uri)) + + # confirm_redirect_uri should return false if the redirect uri + # was given in the authorization but not in the token request. + self.validator.confirm_redirect_uri.return_value = False + code = get_query_credentials(h['Location'])['code'][0] + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=%s' % code) + self.assertEqual(json.loads(body)['error'], 'access_denied') + + # implicit grant + h, _, s = self.mobile.create_authorization_response( + auth_uri + '&response_type=token', scopes=['random']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertTrue(h['Location'].startswith(redirect_uri)) + + def test_invalid_redirect_uri(self): + auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc' + self.validator.validate_redirect_uri.return_value = False + + # authorization grant + self.assertRaises(errors.MismatchingRedirectURIError, + self.web.create_authorization_response, + auth_uri + '&response_type=code', scopes=['random']) + + # implicit grant + self.assertRaises(errors.MismatchingRedirectURIError, + self.mobile.create_authorization_response, + auth_uri + '&response_type=token', scopes=['random']) + + def test_default_uri(self): + auth_uri = 'http://example.com/path?state=xyz&client_id=abc' + + self.validator.get_default_redirect_uri.return_value = None + + # authorization grant + self.assertRaises(errors.MissingRedirectURIError, + self.web.create_authorization_response, + auth_uri + '&response_type=code', scopes=['random']) + + # implicit grant + self.assertRaises(errors.MissingRedirectURIError, + self.mobile.create_authorization_response, + auth_uri + '&response_type=token', scopes=['random']) diff --git a/tests/oauth2/rfc6749/endpoints/test_error_responses.py b/tests/oauth2/rfc6749/endpoints/test_error_responses.py new file mode 100644 index 0000000..5f65de3 --- /dev/null +++ b/tests/oauth2/rfc6749/endpoints/test_error_responses.py @@ -0,0 +1,372 @@ +"""Ensure the correct error responses are returned for all defined error types. +""" +from __future__ import absolute_import, unicode_literals +import json +import mock + +from ....unittest import TestCase + +from oauthlib.oauth2 import RequestValidator +from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer +from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer +from oauthlib.oauth2.rfc6749 import errors + + +class ErrorResponseTest(TestCase): + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = None + self.web = WebApplicationServer(self.validator) + self.mobile = MobileApplicationServer(self.validator) + self.legacy = LegacyApplicationServer(self.validator) + self.backend = BackendApplicationServer(self.validator) + + def test_invalid_redirect_uri(self): + uri = 'https://example.com/authorize?client_id=foo&redirect_uri=wrong' + # Authorization code grant + self.assertRaises(errors.InvalidRedirectURIError, + self.web.validate_authorization_request, uri) + self.assertRaises(errors.InvalidRedirectURIError, + self.web.create_authorization_response, uri, scopes=['foo']) + + # Implicit grant + self.assertRaises(errors.InvalidRedirectURIError, + self.mobile.validate_authorization_request, uri) + self.assertRaises(errors.InvalidRedirectURIError, + self.mobile.create_authorization_response, uri, scopes=['foo']) + + def test_missing_redirect_uri(self): + uri = 'https://example.com/authorize?client_id=foo' + # Authorization code grant + self.assertRaises(errors.MissingRedirectURIError, + self.web.validate_authorization_request, uri) + self.assertRaises(errors.MissingRedirectURIError, + self.web.create_authorization_response, uri, scopes=['foo']) + + # Implicit grant + self.assertRaises(errors.MissingRedirectURIError, + self.mobile.validate_authorization_request, uri) + self.assertRaises(errors.MissingRedirectURIError, + self.mobile.create_authorization_response, uri, scopes=['foo']) + + def test_mismatching_redirect_uri(self): + uri = 'https://example.com/authorize?client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback' + # Authorization code grant + self.validator.validate_redirect_uri.return_value = False + self.assertRaises(errors.MismatchingRedirectURIError, + self.web.validate_authorization_request, uri) + self.assertRaises(errors.MismatchingRedirectURIError, + self.web.create_authorization_response, uri, scopes=['foo']) + + # Implicit grant + self.assertRaises(errors.MismatchingRedirectURIError, + self.mobile.validate_authorization_request, uri) + self.assertRaises(errors.MismatchingRedirectURIError, + self.mobile.create_authorization_response, uri, scopes=['foo']) + + def test_missing_client_id(self): + uri = 'https://example.com/authorize?redirect_uri=https%3A%2F%2Fi.b%2Fback' + # Authorization code grant + self.validator.validate_redirect_uri.return_value = False + self.assertRaises(errors.MissingClientIdError, + self.web.validate_authorization_request, uri) + self.assertRaises(errors.MissingClientIdError, + self.web.create_authorization_response, uri, scopes=['foo']) + + # Implicit grant + self.assertRaises(errors.MissingClientIdError, + self.mobile.validate_authorization_request, uri) + self.assertRaises(errors.MissingClientIdError, + self.mobile.create_authorization_response, uri, scopes=['foo']) + + def test_invalid_client_id(self): + uri = 'https://example.com/authorize?client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback' + # Authorization code grant + self.validator.validate_client_id.return_value = False + self.assertRaises(errors.InvalidClientIdError, + self.web.validate_authorization_request, uri) + self.assertRaises(errors.InvalidClientIdError, + self.web.create_authorization_response, uri, scopes=['foo']) + + # Implicit grant + self.assertRaises(errors.InvalidClientIdError, + self.mobile.validate_authorization_request, uri) + self.assertRaises(errors.InvalidClientIdError, + self.mobile.create_authorization_response, uri, scopes=['foo']) + + def test_invalid_request(self): + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + token_uri = 'https://i.b/token' + invalid_uris = [ + # Duplicate parameters + 'https://i.b/auth?client_id=foo&client_id=bar&response_type={0}', + # Missing response type + 'https://i.b/auth?client_id=foo', + ] + + # Authorization code grant + for uri in invalid_uris: + self.assertRaises(errors.InvalidRequestError, + self.web.validate_authorization_request, + uri.format('code')) + h, _, s = self.web.create_authorization_response( + uri.format('code'), scopes=['foo']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertIn('error=invalid_request', h['Location']) + invalid_bodies = [ + # duplicate params + 'grant_type=authorization_code&client_id=nope&client_id=nope&code=foo' + ] + for body in invalid_bodies: + _, body, _ = self.web.create_token_response(token_uri, + body=body) + self.assertEqual('invalid_request', json.loads(body)['error']) + + # Implicit grant + for uri in invalid_uris: + self.assertRaises(errors.InvalidRequestError, + self.mobile.validate_authorization_request, + uri.format('token')) + h, _, s = self.mobile.create_authorization_response( + uri.format('token'), scopes=['foo']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertIn('error=invalid_request', h['Location']) + + # Password credentials grant + invalid_bodies = [ + # duplicate params + 'grant_type=password&username=foo&username=bar&password=baz' + # missing username + 'grant_type=password&password=baz' + # missing password + 'grant_type=password&username=foo' + ] + self.validator.authenticate_client.side_effect = self.set_client + for body in invalid_bodies: + _, body, _ = self.legacy.create_token_response(token_uri, + body=body) + self.assertEqual('invalid_request', json.loads(body)['error']) + + # Client credentials grant + invalid_bodies = [ + # duplicate params + 'grant_type=client_credentials&scope=foo&scope=bar' + ] + for body in invalid_bodies: + _, body, _ = self.backend.create_token_response(token_uri, + body=body) + self.assertEqual('invalid_request', json.loads(body)['error']) + + def test_unauthorized_client(self): + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + self.validator.validate_grant_type.return_value = False + self.validator.validate_response_type.return_value = False + self.validator.authenticate_client.side_effect = self.set_client + token_uri = 'https://i.b/token' + + # Authorization code grant + self.assertRaises(errors.UnauthorizedClientError, + self.web.validate_authorization_request, + 'https://i.b/auth?response_type=code&client_id=foo') + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=foo') + self.assertEqual('unauthorized_client', json.loads(body)['error']) + + # Implicit grant + self.assertRaises(errors.UnauthorizedClientError, + self.mobile.validate_authorization_request, + 'https://i.b/auth?response_type=token&client_id=foo') + + # Password credentials grant + _, body, _ = self.legacy.create_token_response(token_uri, + body='grant_type=password&username=foo&password=bar') + self.assertEqual('unauthorized_client', json.loads(body)['error']) + + # Client credentials grant + _, body, _ = self.backend.create_token_response(token_uri, + body='grant_type=client_credentials') + self.assertEqual('unauthorized_client', json.loads(body)['error']) + + def test_access_denied(self): + self.validator.authenticate_client.side_effect = self.set_client + self.validator.confirm_redirect_uri.return_value = False + token_uri = 'https://i.b/token' + # Authorization code grant + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=foo') + self.assertEqual('access_denied', json.loads(body)['error']) + + def test_unsupported_response_type(self): + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + + # Authorization code grant + self.assertRaises(errors.UnsupportedResponseTypeError, + self.web.validate_authorization_request, + 'https://i.b/auth?response_type=foo&client_id=foo') + + # Implicit grant + self.assertRaises(errors.UnsupportedResponseTypeError, + self.mobile.validate_authorization_request, + 'https://i.b/auth?response_type=foo&client_id=foo') + + def test_invalid_scope(self): + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + self.validator.validate_scopes.return_value = False + self.validator.authenticate_client.side_effect = self.set_client + + # Authorization code grant + self.assertRaises(errors.InvalidScopeError, + self.web.validate_authorization_request, + 'https://i.b/auth?response_type=code&client_id=foo') + + # Implicit grant + self.assertRaises(errors.InvalidScopeError, + self.mobile.validate_authorization_request, + 'https://i.b/auth?response_type=token&client_id=foo') + + # Password credentials grant + _, body, _ = self.legacy.create_token_response( + 'https://i.b/token', + body='grant_type=password&username=foo&password=bar') + self.assertEqual('invalid_scope', json.loads(body)['error']) + + # Client credentials grant + _, body, _ = self.backend.create_token_response( + 'https://i.b/token', + body='grant_type=client_credentials') + self.assertEqual('invalid_scope', json.loads(body)['error']) + + def test_server_error(self): + def raise_error(*args, **kwargs): + raise ValueError() + + self.validator.validate_client_id.side_effect = raise_error + self.validator.authenticate_client.side_effect = raise_error + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + + # Authorization code grant + self.web.catch_errors = True + _, _, s = self.web.create_authorization_response( + 'https://i.b/auth?client_id=foo&response_type=code', + scopes=['foo']) + self.assertEqual(s, 500) + _, _, s = self.web.create_token_response( + 'https://i.b/token', + body='grant_type=authorization_code&code=foo', + scopes=['foo']) + self.assertEqual(s, 500) + + # Implicit grant + self.mobile.catch_errors = True + _, _, s = self.mobile.create_authorization_response( + 'https://i.b/auth?client_id=foo&response_type=token', + scopes=['foo']) + self.assertEqual(s, 500) + + # Password credentials grant + self.legacy.catch_errors = True + _, _, s = self.legacy.create_token_response( + 'https://i.b/token', + body='grant_type=password&username=foo&password=foo') + self.assertEqual(s, 500) + + # Client credentials grant + self.backend.catch_errors = True + _, _, s = self.backend.create_token_response( + 'https://i.b/token', + body='grant_type=client_credentials') + self.assertEqual(s, 500) + + def test_temporarily_unavailable(self): + # Authorization code grant + self.web.available = False + _, _, s = self.web.create_authorization_response( + 'https://i.b/auth?client_id=foo&response_type=code', + scopes=['foo']) + self.assertEqual(s, 503) + _, _, s = self.web.create_token_response( + 'https://i.b/token', + body='grant_type=authorization_code&code=foo', + scopes=['foo']) + self.assertEqual(s, 503) + + # Implicit grant + self.mobile.available = False + _, _, s = self.mobile.create_authorization_response( + 'https://i.b/auth?client_id=foo&response_type=token', + scopes=['foo']) + self.assertEqual(s, 503) + + # Password credentials grant + self.legacy.available = False + _, _, s = self.legacy.create_token_response( + 'https://i.b/token', + body='grant_type=password&username=foo&password=foo') + self.assertEqual(s, 503) + + # Client credentials grant + self.backend.available = False + _, _, s = self.backend.create_token_response( + 'https://i.b/token', + body='grant_type=client_credentials') + self.assertEqual(s, 503) + + def test_invalid_client(self): + self.validator.authenticate_client.return_value = False + self.validator.authenticate_client_id.return_value = False + + # Authorization code grant + _, body, _ = self.web.create_token_response('https://i.b/token', + body='grant_type=authorization_code&code=foo') + self.assertEqual('invalid_client', json.loads(body)['error']) + + # Password credentials grant + _, body, _ = self.legacy.create_token_response('https://i.b/token', + body='grant_type=password&username=foo&password=bar') + self.assertEqual('invalid_client', json.loads(body)['error']) + + # Client credentials grant + _, body, _ = self.legacy.create_token_response('https://i.b/token', + body='grant_type=client_credentials') + self.assertEqual('invalid_client', json.loads(body)['error']) + + def test_invalid_grant(self): + self.validator.authenticate_client.side_effect = self.set_client + + # Authorization code grant + self.validator.validate_code.return_value = False + _, body, _ = self.web.create_token_response('https://i.b/token', + body='grant_type=authorization_code&code=foo') + self.assertEqual('invalid_grant', json.loads(body)['error']) + + # Password credentials grant + self.validator.validate_user.return_value = False + _, body, _ = self.legacy.create_token_response('https://i.b/token', + body='grant_type=password&username=foo&password=bar') + self.assertEqual('invalid_grant', json.loads(body)['error']) + + def test_unsupported_grant_type(self): + self.validator.authenticate_client.side_effect = self.set_client + + # Authorization code grant + _, body, _ = self.web.create_token_response('https://i.b/token', + body='grant_type=bar&code=foo') + self.assertEqual('unsupported_grant_type', json.loads(body)['error']) + + # Password credentials grant + _, body, _ = self.legacy.create_token_response('https://i.b/token', + body='grant_type=bar&username=foo&password=bar') + self.assertEqual('unsupported_grant_type', json.loads(body)['error']) + + # Client credentials grant + _, body, _ = self.backend.create_token_response('https://i.b/token', + body='grant_type=bar') + self.assertEqual('unsupported_grant_type', json.loads(body)['error']) diff --git a/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py b/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py new file mode 100644 index 0000000..b43fa02 --- /dev/null +++ b/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py @@ -0,0 +1,69 @@ +"""Ensure extra credentials can be supplied for inclusion in tokens. +""" +from __future__ import absolute_import, unicode_literals +import mock + +from ....unittest import TestCase + +from oauthlib.oauth2 import RequestValidator +from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer +from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer + + +class ExtraCredentialsTest(TestCase): + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + self.web = WebApplicationServer(self.validator) + self.mobile = MobileApplicationServer(self.validator) + self.legacy = LegacyApplicationServer(self.validator) + self.backend = BackendApplicationServer(self.validator) + + def test_post_authorization_request(self): + def save_code(client_id, token, request): + self.assertEqual('creds', request.extra) + + def save_token(token, request): + self.assertEqual('creds', request.extra) + + # Authorization code grant + self.validator.save_authorization_code.side_effect = save_code + self.web.create_authorization_response( + 'https://i.b/auth?client_id=foo&response_type=code', + scopes=['foo'], + credentials={'extra': 'creds'}) + + # Implicit grant + self.validator.save_bearer_token.side_effect = save_token + self.web.create_authorization_response( + 'https://i.b/auth?client_id=foo&response_type=token', + scopes=['foo'], + credentials={'extra': 'creds'}) + + def test_token_request(self): + def save_token(token, request): + self.assertIn('extra', token) + + self.validator.save_bearer_token.side_effect = save_token + self.validator.authenticate_client.side_effect = self.set_client + + # Authorization code grant + self.web.create_token_response('https://i.b/token', + body='grant_type=authorization_code&code=foo', + credentials={'extra': 'creds'}) + + # Password credentials grant + self.legacy.create_token_response('https://i.b/token', + body='grant_type=password&username=foo&password=bar', + credentials={'extra': 'creds'}) + + # Client credentials grant + self.backend.create_token_response('https://i.b/token', + body='grant_type=client_credentials', + credentials={'extra': 'creds'}) diff --git a/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py b/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py new file mode 100644 index 0000000..6b3137a --- /dev/null +++ b/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py @@ -0,0 +1,106 @@ +"""Ensure all tokens are associated with a resource owner. +""" +from __future__ import absolute_import, unicode_literals +import json +import mock + +from .test_utils import get_query_credentials, get_fragment_credentials +from ....unittest import TestCase + +from oauthlib.oauth2 import RequestValidator +from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer +from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer + + +class ResourceOwnerAssociationTest(TestCase): + + auth_uri = 'http://example.com/path?client_id=abc' + token_uri = 'http://example.com/path' + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def set_user(self, client_id, code, client, request): + request.user = 'test' + return True + + def set_user_from_username(self, username, password, client, request): + request.user = 'test' + return True + + def set_user_from_credentials(self, request): + request.user = 'test' + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def inspect_client(self, request, refresh_token=False): + if not request.user: + raise ValueError() + return 'abc' + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = 'http://i.b./path' + self.validator.authenticate_client.side_effect = self.set_client + self.web = WebApplicationServer(self.validator, + token_generator=self.inspect_client) + self.mobile = MobileApplicationServer(self.validator, + token_generator=self.inspect_client) + self.legacy = LegacyApplicationServer(self.validator, + token_generator=self.inspect_client) + self.backend = BackendApplicationServer(self.validator, + token_generator=self.inspect_client) + + def test_web_application(self): + # TODO: code generator + intercept test + h, _, s = self.web.create_authorization_response( + self.auth_uri + '&response_type=code', + credentials={'user': 'test'}, scopes=['random']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + code = get_query_credentials(h['Location'])['code'][0] + self.assertRaises(ValueError, + self.web.create_token_response, self.token_uri, + body='grant_type=authorization_code&code=%s' % code) + + self.validator.validate_code.side_effect = self.set_user + _, body, _ = self.web.create_token_response(self.token_uri, + body='grant_type=authorization_code&code=%s' % code) + self.assertEqual(json.loads(body)['access_token'], 'abc') + + def test_mobile_application(self): + self.assertRaises(ValueError, + self.mobile.create_authorization_response, + self.auth_uri + '&response_type=token') + + h, _, s = self.mobile.create_authorization_response( + self.auth_uri + '&response_type=token', + credentials={'user': 'test'}, scopes=['random']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertEqual(get_fragment_credentials(h['Location'])['access_token'][0], 'abc') + + def test_legacy_application(self): + body = 'grant_type=password&username=abc&password=secret' + self.assertRaises(ValueError, + self.legacy.create_token_response, + self.token_uri, body=body) + + self.validator.validate_user.side_effect = self.set_user_from_username + _, body, _ = self.legacy.create_token_response( + self.token_uri, body=body) + self.assertEqual(json.loads(body)['access_token'], 'abc') + + def test_backend_application(self): + body = 'grant_type=client_credentials' + self.assertRaises(ValueError, + self.backend.create_token_response, + self.token_uri, body=body) + + self.validator.authenticate_client.side_effect = self.set_user_from_credentials + _, body, _ = self.backend.create_token_response( + self.token_uri, body=body) + self.assertEqual(json.loads(body)['access_token'], 'abc') diff --git a/tests/oauth2/rfc6749/endpoints/test_scope_handling.py b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py new file mode 100644 index 0000000..f48a4f9 --- /dev/null +++ b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py @@ -0,0 +1,182 @@ +"""Ensure scope is preserved across authorization. + +Fairly trivial in all grants except the Authorization Code Grant where scope +need to be persisted temporarily in an authorization code. +""" +from __future__ import absolute_import, unicode_literals +import json +import mock + +from .test_utils import get_query_credentials, get_fragment_credentials +from ....unittest import TestCase + +from oauthlib.oauth2 import RequestValidator +from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer +from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer + + +class TestScopeHandling(TestCase): + + DEFAULT_REDIRECT_URI = 'http://i.b./path' + + def set_scopes(self, scopes): + def set_request_scopes(client_id, code, client, request): + request.scopes = scopes + return True + return set_request_scopes + + def set_user(self, request): + request.user = 'foo' + request.client_id = 'bar' + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = TestScopeHandling.DEFAULT_REDIRECT_URI + self.validator.authenticate_client.side_effect = self.set_client + self.web = WebApplicationServer(self.validator) + self.mobile = MobileApplicationServer(self.validator) + self.legacy = LegacyApplicationServer(self.validator) + self.backend = BackendApplicationServer(self.validator) + + def test_scope_extraction(self): + scopes = ( + ('images', ['images']), + ('images+videos', ['images', 'videos']), + ('http%3A%2f%2fa.b%2fvideos', ['http://a.b/videos']), + ('http%3A%2f%2fa.b%2fvideos+pics', ['http://a.b/videos', 'pics']), + ('pics+http%3A%2f%2fa.b%2fvideos', ['pics', 'http://a.b/videos']), + ('http%3A%2f%2fa.b%2fvideos+https%3A%2f%2fc.d%2Fsecret', ['http://a.b/videos', 'https://c.d/secret']), + ) + + uri = 'http://example.com/path?client_id=abc&scope=%s&response_type=%s' + for scope, correct_scopes in scopes: + scopes, _ = self.web.validate_authorization_request( + uri % (scope, 'code')) + self.assertItemsEqual(scopes, correct_scopes) + scopes, _ = self.mobile.validate_authorization_request( + uri % (scope, 'token')) + self.assertItemsEqual(scopes, correct_scopes) + + def test_scope_preservation(self): + scope = 'pics+http%3A%2f%2fa.b%2fvideos' + decoded_scope = 'pics http://a.b/videos' + auth_uri = 'http://example.com/path?client_id=abc&response_type=' + token_uri = 'http://example.com/path' + + # authorization grant + h, _, s = self.web.create_authorization_response( + auth_uri + 'code', scopes=decoded_scope.split(' ')) + self.validator.validate_code.side_effect = self.set_scopes(decoded_scope.split(' ')) + self.assertEqual(s, 302) + self.assertIn('Location', h) + code = get_query_credentials(h['Location'])['code'][0] + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=%s' % code) + self.assertEqual(json.loads(body)['scope'], decoded_scope) + + # implicit grant + h, _, s = self.mobile.create_authorization_response( + auth_uri + 'token', scopes=decoded_scope.split(' ')) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope) + + # resource owner password credentials grant + body = 'grant_type=password&username=abc&password=secret&scope=%s' + + _, body, _ = self.legacy.create_token_response(token_uri, + body=body % scope) + self.assertEqual(json.loads(body)['scope'], decoded_scope) + + # client credentials grant + body = 'grant_type=client_credentials&scope=%s' + self.validator.authenticate_client.side_effect = self.set_user + _, body, _ = self.backend.create_token_response(token_uri, + body=body % scope) + self.assertEqual(json.loads(body)['scope'], decoded_scope) + + def test_scope_changed(self): + scope = 'pics+http%3A%2f%2fa.b%2fvideos' + scopes = ['images', 'http://a.b/videos'] + decoded_scope = 'images http://a.b/videos' + auth_uri = 'http://example.com/path?client_id=abc&response_type=' + token_uri = 'http://example.com/path' + + # authorization grant + h, _, s = self.web.create_authorization_response( + auth_uri + 'code', scopes=scopes) + self.assertEqual(s, 302) + self.assertIn('Location', h) + code = get_query_credentials(h['Location'])['code'][0] + self.validator.validate_code.side_effect = self.set_scopes(scopes) + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=%s' % code) + self.assertEqual(json.loads(body)['scope'], decoded_scope) + + # implicit grant + self.validator.validate_scopes.side_effect = self.set_scopes(scopes) + h, _, s = self.mobile.create_authorization_response( + auth_uri + 'token', scopes=scopes) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope) + + # resource owner password credentials grant + self.validator.validate_scopes.side_effect = self.set_scopes(scopes) + body = 'grant_type=password&username=abc&password=secret&scope=%s' + _, body, _ = self.legacy.create_token_response(token_uri, + body=body % scope) + self.assertEqual(json.loads(body)['scope'], decoded_scope) + + # client credentials grant + self.validator.validate_scopes.side_effect = self.set_scopes(scopes) + self.validator.authenticate_client.side_effect = self.set_user + body = 'grant_type=client_credentials&scope=%s' + _, body, _ = self.backend.create_token_response(token_uri, + body=body % scope) + + self.assertEqual(json.loads(body)['scope'], decoded_scope) + + def test_invalid_scope(self): + scope = 'pics+http%3A%2f%2fa.b%2fvideos' + auth_uri = 'http://example.com/path?client_id=abc&response_type=' + token_uri = 'http://example.com/path' + + self.validator.validate_scopes.return_value = False + + # authorization grant + h, _, s = self.web.create_authorization_response( + auth_uri + 'code', scopes=['invalid']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + error = get_query_credentials(h['Location'])['error'][0] + self.assertEqual(error, 'invalid_scope') + + # implicit grant + h, _, s = self.mobile.create_authorization_response( + auth_uri + 'token', scopes=['invalid']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + error = get_fragment_credentials(h['Location'])['error'][0] + self.assertEqual(error, 'invalid_scope') + + # resource owner password credentials grant + body = 'grant_type=password&username=abc&password=secret&scope=%s' + _, body, _ = self.legacy.create_token_response(token_uri, + body=body % scope) + self.assertEqual(json.loads(body)['error'], 'invalid_scope') + + # client credentials grant + self.validator.authenticate_client.side_effect = self.set_user + body = 'grant_type=client_credentials&scope=%s' + _, body, _ = self.backend.create_token_response(token_uri, + body=body % scope) + self.assertEqual(json.loads(body)['error'], 'invalid_scope') diff --git a/tests/oauth2/rfc6749/endpoints/test_utils.py b/tests/oauth2/rfc6749/endpoints/test_utils.py new file mode 100644 index 0000000..6b7cff8 --- /dev/null +++ b/tests/oauth2/rfc6749/endpoints/test_utils.py @@ -0,0 +1,14 @@ +try: + import urlparse +except ImportError: + import urllib.parse as urlparse + + +def get_query_credentials(uri): + return urlparse.parse_qs(urlparse.urlparse(uri).query, + keep_blank_values=True) + + +def get_fragment_credentials(uri): + return urlparse.parse_qs(urlparse.urlparse(uri).fragment, + keep_blank_values=True) diff --git a/tests/oauth2/rfc6749/grant_types/__init__.py b/tests/oauth2/rfc6749/grant_types/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/oauth2/rfc6749/grant_types/__init__.py diff --git a/tests/oauth2/rfc6749/grant_types/test_authorization_code.py b/tests/oauth2/rfc6749/grant_types/test_authorization_code.py new file mode 100644 index 0000000..a9c3e51 --- /dev/null +++ b/tests/oauth2/rfc6749/grant_types/test_authorization_code.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +from ....unittest import TestCase + +import json +import mock +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749.errors import UnsupportedGrantTypeError +from oauthlib.oauth2.rfc6749.errors import InvalidRequestError +from oauthlib.oauth2.rfc6749.errors import InvalidClientError +from oauthlib.oauth2.rfc6749.errors import InvalidGrantError +from oauthlib.oauth2.rfc6749.grant_types import AuthorizationCodeGrant +from oauthlib.oauth2.rfc6749.tokens import BearerToken + + +class AuthorizationCodeGrantTest(TestCase): + + def setUp(self): + self.request = Request('http://a.b/path') + self.request.scopes = ('hello', 'world') + self.request.expires_in = 1800 + self.request.client = 'batman' + self.request.client_id = 'abcdef' + self.request.code = '1234' + self.request.response_type = 'code' + self.request.grant_type = 'authorization_code' + + self.request_state = Request('http://a.b/path') + self.request_state.state = 'abc' + + self.mock_validator = mock.MagicMock() + self.mock_validator.authenticate_client.side_effect = self.set_client + self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator) + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def test_create_authorization_grant(self): + grant = self.auth.create_authorization_code(self.request) + self.assertIn('code', grant) + + grant = self.auth.create_authorization_code(self.request_state) + self.assertIn('code', grant) + self.assertIn('state', grant) + + def test_create_token_response(self): + bearer = BearerToken(self.mock_validator) + h, token, s = self.auth.create_token_response(self.request, bearer) + token = json.loads(token) + self.assertIn('access_token', token) + self.assertIn('refresh_token', token) + self.assertIn('expires_in', token) + self.assertIn('scope', token) + + def test_validate_token_request(self): + mock_validator = mock.MagicMock() + auth = AuthorizationCodeGrant(request_validator=mock_validator) + request = Request('http://a.b/path') + self.assertRaises(UnsupportedGrantTypeError, + auth.validate_token_request, request) + + request.grant_type = 'authorization_code' + self.assertRaises(InvalidRequestError, + auth.validate_token_request, request) + + mock_validator.authenticate_client.return_value = False + mock_validator.authenticate_client_id.return_value = False + request.code = 'waffles' + self.assertRaises(InvalidClientError, + auth.validate_token_request, request) + + request.client = 'batman' + mock_validator.authenticate_client = self.set_client + mock_validator.validate_code.return_value = False + self.assertRaises(InvalidGrantError, + auth.validate_token_request, request) diff --git a/tests/oauth2/rfc6749/grant_types/test_client_credentials.py b/tests/oauth2/rfc6749/grant_types/test_client_credentials.py new file mode 100644 index 0000000..80b92d3 --- /dev/null +++ b/tests/oauth2/rfc6749/grant_types/test_client_credentials.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +from ....unittest import TestCase + +import json +import mock +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749.grant_types import ClientCredentialsGrant +from oauthlib.oauth2.rfc6749.tokens import BearerToken + + +class ClientCredentialsGrantTest(TestCase): + + def setUp(self): + mock_client = mock.MagicMock() + mock_client.user.return_value = 'mocked user' + self.request = Request('http://a.b/path') + self.request.grant_type = 'client_credentials' + self.request.client = mock_client + self.request.scopes = ('mocked', 'scopes') + self.mock_validator = mock.MagicMock() + self.auth = ClientCredentialsGrant( + request_validator=self.mock_validator) + + def test_create_token_response(self): + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertIn('access_token', token) + self.assertIn('token_type', token) + self.assertIn('expires_in', token) + + def test_error_response(self): + pass + + def test_validate_token_response(self): + # wrong grant type, scope + pass diff --git a/tests/oauth2/rfc6749/grant_types/test_implicit.py b/tests/oauth2/rfc6749/grant_types/test_implicit.py new file mode 100644 index 0000000..df30c9a --- /dev/null +++ b/tests/oauth2/rfc6749/grant_types/test_implicit.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +from ....unittest import TestCase + +import mock +from oauthlib import common +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749.grant_types import ImplicitGrant +from oauthlib.oauth2.rfc6749.tokens import BearerToken + + +class ImplicitGrantTest(TestCase): + + def setUp(self): + mock_client = mock.MagicMock() + mock_client.user.return_value = 'mocked user' + self.request = Request('http://a.b/path') + self.request.scopes = ('hello', 'world') + self.request.client = mock_client + self.request.client_id = 'abcdef' + self.request.response_type = 'token' + self.request.state = 'xyz' + self.request.redirect_uri = 'https://b.c/p' + + self.mock_validator = mock.MagicMock() + self.auth = ImplicitGrant(request_validator=self.mock_validator) + + def test_create_token_response(self): + bearer = BearerToken(self.mock_validator, expires_in=1800) + orig_generate_token = common.generate_token + self.addCleanup(setattr, common, 'generate_token', orig_generate_token) + common.generate_token = lambda *args, **kwargs: '1234' + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + correct_uri = 'https://b.c/p#access_token=1234&token_type=Bearer&expires_in=1800&state=xyz&scope=hello+world' + self.assertEqual(status_code, 302) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], correct_uri, parse_fragment=True) + + def test_error_response(self): + pass diff --git a/tests/oauth2/rfc6749/grant_types/test_refresh_token.py b/tests/oauth2/rfc6749/grant_types/test_refresh_token.py new file mode 100644 index 0000000..25c261c --- /dev/null +++ b/tests/oauth2/rfc6749/grant_types/test_refresh_token.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +from ....unittest import TestCase + +import json +import mock +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749.grant_types import RefreshTokenGrant +from oauthlib.oauth2.rfc6749.tokens import BearerToken + + +class RefreshTokenGrantTest(TestCase): + + def setUp(self): + mock_client = mock.MagicMock() + mock_client.user.return_value = 'mocked user' + self.request = Request('http://a.b/path') + self.request.grant_type = 'refresh_token' + self.request.refresh_token = 'lsdkfhj230' + self.request.client = mock_client + self.request.scope = 'foo' + self.mock_validator = mock.MagicMock() + self.auth = RefreshTokenGrant( + request_validator=self.mock_validator) + + def test_create_token_response(self): + self.mock_validator.get_original_scopes.return_value = ['foo', 'bar'] + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertIn('access_token', token) + self.assertIn('token_type', token) + self.assertIn('expires_in', token) + self.assertEqual(token['scope'], 'foo') + + def test_create_token_inherit_scope(self): + self.request.scope = None + self.mock_validator.get_original_scopes.return_value = ['foo', 'bar'] + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertIn('access_token', token) + self.assertIn('token_type', token) + self.assertIn('expires_in', token) + self.assertEqual(token['scope'], 'foo bar') + + def test_invalid_scope(self): + self.mock_validator.get_original_scopes.return_value = ['baz'] + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertEqual(token['error'], 'invalid_scope') + self.assertEqual(status_code, 401) + + def test_invalid_token(self): + self.mock_validator.validate_refresh_token.return_value = False + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertEqual(token['error'], 'invalid_grant') + self.assertEqual(status_code, 400) + + def test_invalid_client(self): + self.mock_validator.authenticate_client.return_value = False + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertEqual(token['error'], 'invalid_client') + self.assertEqual(status_code, 401) diff --git a/tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py b/tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py new file mode 100644 index 0000000..aaea440 --- /dev/null +++ b/tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +from ....unittest import TestCase + +import json +import mock +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749.grant_types import ResourceOwnerPasswordCredentialsGrant +from oauthlib.oauth2.rfc6749.tokens import BearerToken + + +class ResourceOwnerPasswordCredentialsGrantTest(TestCase): + + def setUp(self): + mock_client = mock.MagicMock() + mock_client.user.return_value = 'mocked user' + self.request = Request('http://a.b/path') + self.request.grant_type = 'password' + self.request.username = 'john' + self.request.password = 'doe' + self.request.client = mock_client + self.request.scopes = ('mocked', 'scopes') + self.mock_validator = mock.MagicMock() + self.auth = ResourceOwnerPasswordCredentialsGrant( + request_validator=self.mock_validator) + + def test_create_token_response(self): + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertIn('access_token', token) + self.assertIn('token_type', token) + self.assertIn('expires_in', token) + self.assertIn('refresh_token', token) + + def test_error_response(self): + pass + + def test_scopes(self): + pass diff --git a/tests/oauth2/rfc6749/test_grant_types.py b/tests/oauth2/rfc6749/test_grant_types.py deleted file mode 100644 index 6aee0e8..0000000 --- a/tests/oauth2/rfc6749/test_grant_types.py +++ /dev/null @@ -1,260 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, unicode_literals -from ...unittest import TestCase - -import json -import mock -from oauthlib import common -from oauthlib.common import Request -from oauthlib.oauth2.rfc6749.errors import UnsupportedGrantTypeError -from oauthlib.oauth2.rfc6749.errors import InvalidRequestError -from oauthlib.oauth2.rfc6749.errors import InvalidClientError -from oauthlib.oauth2.rfc6749.errors import InvalidGrantError -from oauthlib.oauth2.rfc6749.grant_types import AuthorizationCodeGrant -from oauthlib.oauth2.rfc6749.grant_types import ImplicitGrant -from oauthlib.oauth2.rfc6749.grant_types import ResourceOwnerPasswordCredentialsGrant -from oauthlib.oauth2.rfc6749.grant_types import ClientCredentialsGrant -from oauthlib.oauth2.rfc6749.grant_types import RefreshTokenGrant -from oauthlib.oauth2.rfc6749.tokens import BearerToken - - -class RequestValidatorTest(TestCase): - - def test_client_id(self): - pass - - def test_client(self): - pass - - def test_response_type(self): - pass - - def test_scopes(self): - pass - - def test_redirect_uri(self): - pass - - -class AuthorizationCodeGrantTest(TestCase): - - def setUp(self): - self.request = Request('http://a.b/path') - self.request.scopes = ('hello', 'world') - self.request.expires_in = 1800 - self.request.client = 'batman' - self.request.client_id = 'abcdef' - self.request.code = '1234' - self.request.response_type = 'code' - self.request.grant_type = 'authorization_code' - - self.request_state = Request('http://a.b/path') - self.request_state.state = 'abc' - - self.mock_validator = mock.MagicMock() - self.mock_validator.authenticate_client.side_effect = self.set_client - self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator) - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def test_create_authorization_grant(self): - grant = self.auth.create_authorization_code(self.request) - self.assertIn('code', grant) - - grant = self.auth.create_authorization_code(self.request_state) - self.assertIn('code', grant) - self.assertIn('state', grant) - - def test_create_token_response(self): - bearer = BearerToken(self.mock_validator) - h, token, s = self.auth.create_token_response(self.request, bearer) - token = json.loads(token) - self.assertIn('access_token', token) - self.assertIn('refresh_token', token) - self.assertIn('expires_in', token) - self.assertIn('scope', token) - - def test_validate_token_request(self): - mock_validator = mock.MagicMock() - auth = AuthorizationCodeGrant(request_validator=mock_validator) - request = Request('http://a.b/path') - self.assertRaises(UnsupportedGrantTypeError, - auth.validate_token_request, request) - - request.grant_type = 'authorization_code' - self.assertRaises(InvalidRequestError, - auth.validate_token_request, request) - - mock_validator.authenticate_client.return_value = False - mock_validator.authenticate_client_id.return_value = False - request.code = 'waffles' - self.assertRaises(InvalidClientError, - auth.validate_token_request, request) - - request.client = 'batman' - mock_validator.authenticate_client = self.set_client - mock_validator.validate_code.return_value = False - self.assertRaises(InvalidGrantError, - auth.validate_token_request, request) - - -class ImplicitGrantTest(TestCase): - - def setUp(self): - mock_client = mock.MagicMock() - mock_client.user.return_value = 'mocked user' - self.request = Request('http://a.b/path') - self.request.scopes = ('hello', 'world') - self.request.client = mock_client - self.request.client_id = 'abcdef' - self.request.response_type = 'token' - self.request.state = 'xyz' - self.request.redirect_uri = 'https://b.c/p' - - self.mock_validator = mock.MagicMock() - self.auth = ImplicitGrant(request_validator=self.mock_validator) - - def test_create_token_response(self): - bearer = BearerToken(self.mock_validator, expires_in=1800) - orig_generate_token = common.generate_token - self.addCleanup(setattr, common, 'generate_token', orig_generate_token) - common.generate_token = lambda *args, **kwargs: '1234' - headers, body, status_code = self.auth.create_token_response( - self.request, bearer) - correct_uri = 'https://b.c/p#access_token=1234&token_type=Bearer&expires_in=1800&state=xyz&scope=hello+world' - self.assertEqual(status_code, 302) - self.assertIn('Location', headers) - self.assertURLEqual(headers['Location'], correct_uri, parse_fragment=True) - - def test_error_response(self): - pass - - -class ResourceOwnerPasswordCredentialsGrantTest(TestCase): - - def setUp(self): - mock_client = mock.MagicMock() - mock_client.user.return_value = 'mocked user' - self.request = Request('http://a.b/path') - self.request.grant_type = 'password' - self.request.username = 'john' - self.request.password = 'doe' - self.request.client = mock_client - self.request.scopes = ('mocked', 'scopes') - self.mock_validator = mock.MagicMock() - self.auth = ResourceOwnerPasswordCredentialsGrant( - request_validator=self.mock_validator) - - def test_create_token_response(self): - bearer = BearerToken(self.mock_validator) - headers, body, status_code = self.auth.create_token_response( - self.request, bearer) - token = json.loads(body) - self.assertIn('access_token', token) - self.assertIn('token_type', token) - self.assertIn('expires_in', token) - self.assertIn('refresh_token', token) - - def test_error_response(self): - pass - - def test_scopes(self): - pass - - -class ClientCredentialsGrantTest(TestCase): - - def setUp(self): - mock_client = mock.MagicMock() - mock_client.user.return_value = 'mocked user' - self.request = Request('http://a.b/path') - self.request.grant_type = 'client_credentials' - self.request.client = mock_client - self.request.scopes = ('mocked', 'scopes') - self.mock_validator = mock.MagicMock() - self.auth = ClientCredentialsGrant( - request_validator=self.mock_validator) - - def test_create_token_response(self): - bearer = BearerToken(self.mock_validator) - headers, body, status_code = self.auth.create_token_response( - self.request, bearer) - token = json.loads(body) - self.assertIn('access_token', token) - self.assertIn('token_type', token) - self.assertIn('expires_in', token) - - def test_error_response(self): - pass - - def test_validate_token_response(self): - # wrong grant type, scope - pass - - -class RefreshTokenGrantTest(TestCase): - - def setUp(self): - mock_client = mock.MagicMock() - mock_client.user.return_value = 'mocked user' - self.request = Request('http://a.b/path') - self.request.grant_type = 'refresh_token' - self.request.refresh_token = 'lsdkfhj230' - self.request.client = mock_client - self.request.scope = 'foo' - self.mock_validator = mock.MagicMock() - self.auth = RefreshTokenGrant( - request_validator=self.mock_validator) - - def test_create_token_response(self): - self.mock_validator.get_original_scopes.return_value = ['foo', 'bar'] - bearer = BearerToken(self.mock_validator) - headers, body, status_code = self.auth.create_token_response( - self.request, bearer) - token = json.loads(body) - self.assertIn('access_token', token) - self.assertIn('token_type', token) - self.assertIn('expires_in', token) - self.assertEqual(token['scope'], 'foo') - - def test_create_token_inherit_scope(self): - self.request.scope = None - self.mock_validator.get_original_scopes.return_value = ['foo', 'bar'] - bearer = BearerToken(self.mock_validator) - headers, body, status_code = self.auth.create_token_response( - self.request, bearer) - token = json.loads(body) - self.assertIn('access_token', token) - self.assertIn('token_type', token) - self.assertIn('expires_in', token) - self.assertEqual(token['scope'], 'foo bar') - - def test_invalid_scope(self): - self.mock_validator.get_original_scopes.return_value = ['baz'] - bearer = BearerToken(self.mock_validator) - headers, body, status_code = self.auth.create_token_response( - self.request, bearer) - token = json.loads(body) - self.assertEqual(token['error'], 'invalid_scope') - self.assertEqual(status_code, 401) - - def test_invalid_token(self): - self.mock_validator.validate_refresh_token.return_value = False - bearer = BearerToken(self.mock_validator) - headers, body, status_code = self.auth.create_token_response( - self.request, bearer) - token = json.loads(body) - self.assertEqual(token['error'], 'invalid_grant') - self.assertEqual(status_code, 400) - - def test_invalid_client(self): - self.mock_validator.authenticate_client.return_value = False - bearer = BearerToken(self.mock_validator) - headers, body, status_code = self.auth.create_token_response( - self.request, bearer) - token = json.loads(body) - self.assertEqual(token['error'], 'invalid_client') - self.assertEqual(status_code, 401) diff --git a/tests/oauth2/rfc6749/test_request_validator.py b/tests/oauth2/rfc6749/test_request_validator.py new file mode 100644 index 0000000..1adbe70 --- /dev/null +++ b/tests/oauth2/rfc6749/test_request_validator.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +from ...unittest import TestCase + +from oauthlib.oauth2 import RequestValidator + +class RequestValidatorTest(TestCase): + + def test_method_contracts(self): + v = RequestValidator() + self.assertRaises(NotImplementedError, v.authenticate_client, 'r') + self.assertRaises(NotImplementedError, v.authenticate_client_id, + 'client_id', 'r') + self.assertRaises(NotImplementedError, v.confirm_redirect_uri, + 'client_id', 'code', 'redirect_uri', 'client', 'request') + self.assertRaises(NotImplementedError, v.get_default_redirect_uri, + 'client_id', 'request') + self.assertRaises(NotImplementedError, v.get_default_scopes, + 'client_id', 'request') + self.assertRaises(NotImplementedError, v.get_original_scopes, + 'refresh_token', 'request') + self.assertRaises(NotImplementedError, v.invalidate_authorization_code, + 'client_id', 'code', 'request') + self.assertRaises(NotImplementedError, v.save_authorization_code, + 'client_id', 'code', 'request') + self.assertRaises(NotImplementedError, v.save_bearer_token, + 'token', 'request') + self.assertRaises(NotImplementedError, v.validate_bearer_token, + 'token', 'scopes', 'request') + self.assertRaises(NotImplementedError, v.validate_client_id, + 'client_id', 'request') + self.assertRaises(NotImplementedError, v.validate_code, + 'client_id', 'code', 'client', 'request') + self.assertRaises(NotImplementedError, v.validate_grant_type, + 'client_id', 'grant_type', 'client', 'request') + self.assertRaises(NotImplementedError, v.validate_redirect_uri, + 'client_id', 'redirect_uri', 'request') + self.assertRaises(NotImplementedError, v.validate_refresh_token, + 'refresh_token', 'client', 'request') + self.assertRaises(NotImplementedError, v.validate_response_type, + 'client_id', 'response_type', 'client', 'request') + self.assertRaises(NotImplementedError, v.validate_scopes, + 'client_id', 'scopes', 'client', 'request') + self.assertRaises(NotImplementedError, v.validate_user, + 'username', 'password', 'client', 'request') diff --git a/tests/oauth2/rfc6749/test_servers.py b/tests/oauth2/rfc6749/test_servers.py deleted file mode 100644 index 1a26d7c..0000000 --- a/tests/oauth2/rfc6749/test_servers.py +++ /dev/null @@ -1,890 +0,0 @@ - -"""Test for the various oauth 2 provider endpoints. - -The tests focus on shared functionality between the different endpoints to -ensure consistency in both behaviour and provided interfaces. -""" -from __future__ import absolute_import, unicode_literals -import json -import mock -try: - import urlparse -except ImportError: - import urllib.parse as urlparse -from ...unittest import TestCase - -from oauthlib.oauth2 import RequestValidator -from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer -from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer -from oauthlib.oauth2.rfc6749 import errors - - -def get_query_credentials(uri): - return urlparse.parse_qs(urlparse.urlparse(uri).query, - keep_blank_values=True) - - -def get_fragment_credentials(uri): - return urlparse.parse_qs(urlparse.urlparse(uri).fragment, - keep_blank_values=True) - - -class TestScopeHandling(TestCase): - - DEFAULT_REDIRECT_URI = 'http://i.b./path' - - def set_scopes(self, scopes): - def set_request_scopes(client_id, code, client, request): - request.scopes = scopes - return True - return set_request_scopes - - def set_user(self, request): - request.user = 'foo' - request.client_id = 'bar' - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def setUp(self): - self.validator = mock.MagicMock(spec=RequestValidator) - self.validator.get_default_redirect_uri.return_value = TestScopeHandling.DEFAULT_REDIRECT_URI - self.validator.authenticate_client.side_effect = self.set_client - self.web = WebApplicationServer(self.validator) - self.mobile = MobileApplicationServer(self.validator) - self.legacy = LegacyApplicationServer(self.validator) - self.backend = BackendApplicationServer(self.validator) - - def test_scope_extraction(self): - scopes = ( - ('images', ['images']), - ('images+videos', ['images', 'videos']), - ('http%3A%2f%2fa.b%2fvideos', ['http://a.b/videos']), - ('http%3A%2f%2fa.b%2fvideos+pics', ['http://a.b/videos', 'pics']), - ('pics+http%3A%2f%2fa.b%2fvideos', ['pics', 'http://a.b/videos']), - ('http%3A%2f%2fa.b%2fvideos+https%3A%2f%2fc.d%2Fsecret', ['http://a.b/videos', 'https://c.d/secret']), - ) - - uri = 'http://example.com/path?client_id=abc&scope=%s&response_type=%s' - for scope, correct_scopes in scopes: - scopes, _ = self.web.validate_authorization_request( - uri % (scope, 'code')) - self.assertItemsEqual(scopes, correct_scopes) - scopes, _ = self.mobile.validate_authorization_request( - uri % (scope, 'token')) - self.assertItemsEqual(scopes, correct_scopes) - - def test_scope_preservation(self): - scope = 'pics+http%3A%2f%2fa.b%2fvideos' - decoded_scope = 'pics http://a.b/videos' - auth_uri = 'http://example.com/path?client_id=abc&response_type=' - token_uri = 'http://example.com/path' - - # authorization grant - h, _, s = self.web.create_authorization_response( - auth_uri + 'code', scopes=decoded_scope.split(' ')) - self.validator.validate_code.side_effect = self.set_scopes(decoded_scope.split(' ')) - self.assertEqual(s, 302) - self.assertIn('Location', h) - code = get_query_credentials(h['Location'])['code'][0] - _, body, _ = self.web.create_token_response(token_uri, - body='grant_type=authorization_code&code=%s' % code) - self.assertEqual(json.loads(body)['scope'], decoded_scope) - - # implicit grant - h, _, s = self.mobile.create_authorization_response( - auth_uri + 'token', scopes=decoded_scope.split(' ')) - self.assertEqual(s, 302) - self.assertIn('Location', h) - self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope) - - # resource owner password credentials grant - body = 'grant_type=password&username=abc&password=secret&scope=%s' - - _, body, _ = self.legacy.create_token_response(token_uri, - body=body % scope) - self.assertEqual(json.loads(body)['scope'], decoded_scope) - - # client credentials grant - body = 'grant_type=client_credentials&scope=%s' - self.validator.authenticate_client.side_effect = self.set_user - _, body, _ = self.backend.create_token_response(token_uri, - body=body % scope) - self.assertEqual(json.loads(body)['scope'], decoded_scope) - - def test_scope_changed(self): - scope = 'pics+http%3A%2f%2fa.b%2fvideos' - scopes = ['images', 'http://a.b/videos'] - decoded_scope = 'images http://a.b/videos' - auth_uri = 'http://example.com/path?client_id=abc&response_type=' - token_uri = 'http://example.com/path' - - # authorization grant - h, _, s = self.web.create_authorization_response( - auth_uri + 'code', scopes=scopes) - self.assertEqual(s, 302) - self.assertIn('Location', h) - code = get_query_credentials(h['Location'])['code'][0] - self.validator.validate_code.side_effect = self.set_scopes(scopes) - _, body, _ = self.web.create_token_response(token_uri, - body='grant_type=authorization_code&code=%s' % code) - self.assertEqual(json.loads(body)['scope'], decoded_scope) - - # implicit grant - self.validator.validate_scopes.side_effect = self.set_scopes(scopes) - h, _, s = self.mobile.create_authorization_response( - auth_uri + 'token', scopes=scopes) - self.assertEqual(s, 302) - self.assertIn('Location', h) - self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope) - - # resource owner password credentials grant - self.validator.validate_scopes.side_effect = self.set_scopes(scopes) - body = 'grant_type=password&username=abc&password=secret&scope=%s' - _, body, _ = self.legacy.create_token_response(token_uri, - body=body % scope) - self.assertEqual(json.loads(body)['scope'], decoded_scope) - - # client credentials grant - self.validator.validate_scopes.side_effect = self.set_scopes(scopes) - self.validator.authenticate_client.side_effect = self.set_user - body = 'grant_type=client_credentials&scope=%s' - _, body, _ = self.backend.create_token_response(token_uri, - body=body % scope) - - self.assertEqual(json.loads(body)['scope'], decoded_scope) - - def test_invalid_scope(self): - scope = 'pics+http%3A%2f%2fa.b%2fvideos' - auth_uri = 'http://example.com/path?client_id=abc&response_type=' - token_uri = 'http://example.com/path' - - self.validator.validate_scopes.return_value = False - - # authorization grant - h, _, s = self.web.create_authorization_response( - auth_uri + 'code', scopes=['invalid']) - self.assertEqual(s, 302) - self.assertIn('Location', h) - error = get_query_credentials(h['Location'])['error'][0] - self.assertEqual(error, 'invalid_scope') - - # implicit grant - h, _, s = self.mobile.create_authorization_response( - auth_uri + 'token', scopes=['invalid']) - self.assertEqual(s, 302) - self.assertIn('Location', h) - error = get_fragment_credentials(h['Location'])['error'][0] - self.assertEqual(error, 'invalid_scope') - - # resource owner password credentials grant - body = 'grant_type=password&username=abc&password=secret&scope=%s' - _, body, _ = self.legacy.create_token_response(token_uri, - body=body % scope) - self.assertEqual(json.loads(body)['error'], 'invalid_scope') - - # client credentials grant - self.validator.authenticate_client.side_effect = self.set_user - body = 'grant_type=client_credentials&scope=%s' - _, body, _ = self.backend.create_token_response(token_uri, - body=body % scope) - self.assertEqual(json.loads(body)['error'], 'invalid_scope') - - -class PreservationTest(TestCase): - - def setUp(self): - self.validator = mock.MagicMock(spec=RequestValidator) - self.validator.get_default_redirect_uri.return_value = TestScopeHandling.DEFAULT_REDIRECT_URI - self.validator.authenticate_client.side_effect = self.set_client - self.web = WebApplicationServer(self.validator) - self.mobile = MobileApplicationServer(self.validator) - - def set_state(self, state): - def set_request_state(client_id, code, client, request): - request.state = state - return True - return set_request_state - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def test_state_preservation(self): - auth_uri = 'http://example.com/path?state=xyz&client_id=abc&response_type=' - token_uri = 'http://example.com/path' - - # authorization grant - h, _, s = self.web.create_authorization_response( - auth_uri + 'code', scopes=['random']) - self.assertEqual(s, 302) - self.assertIn('Location', h) - code = get_query_credentials(h['Location'])['code'][0] - self.validator.validate_code.side_effect = self.set_state('xyz') - _, body, _ = self.web.create_token_response(token_uri, - body='grant_type=authorization_code&code=%s' % code) - self.assertEqual(json.loads(body)['state'], 'xyz') - - # implicit grant - h, _, s = self.mobile.create_authorization_response( - auth_uri + 'token', scopes=['random']) - self.assertEqual(s, 302) - self.assertIn('Location', h) - self.assertEqual(get_fragment_credentials(h['Location'])['state'][0], 'xyz') - - def test_redirect_uri_preservation(self): - auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc' - redirect_uri = 'http://i.b/path' - token_uri = 'http://example.com/path' - - # authorization grant - h, _, s = self.web.create_authorization_response( - auth_uri + '&response_type=code', scopes=['random']) - self.assertEqual(s, 302) - self.assertIn('Location', h) - self.assertTrue(h['Location'].startswith(redirect_uri)) - - # confirm_redirect_uri should return false if the redirect uri - # was given in the authorization but not in the token request. - self.validator.confirm_redirect_uri.return_value = False - code = get_query_credentials(h['Location'])['code'][0] - _, body, _ = self.web.create_token_response(token_uri, - body='grant_type=authorization_code&code=%s' % code) - self.assertEqual(json.loads(body)['error'], 'access_denied') - - # implicit grant - h, _, s = self.mobile.create_authorization_response( - auth_uri + '&response_type=token', scopes=['random']) - self.assertEqual(s, 302) - self.assertIn('Location', h) - self.assertTrue(h['Location'].startswith(redirect_uri)) - - def test_invalid_redirect_uri(self): - auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc' - self.validator.validate_redirect_uri.return_value = False - - # authorization grant - self.assertRaises(errors.MismatchingRedirectURIError, - self.web.create_authorization_response, - auth_uri + '&response_type=code', scopes=['random']) - - # implicit grant - self.assertRaises(errors.MismatchingRedirectURIError, - self.mobile.create_authorization_response, - auth_uri + '&response_type=token', scopes=['random']) - - def test_default_uri(self): - auth_uri = 'http://example.com/path?state=xyz&client_id=abc' - - self.validator.get_default_redirect_uri.return_value = None - - # authorization grant - self.assertRaises(errors.MissingRedirectURIError, - self.web.create_authorization_response, - auth_uri + '&response_type=code', scopes=['random']) - - # implicit grant - self.assertRaises(errors.MissingRedirectURIError, - self.mobile.create_authorization_response, - auth_uri + '&response_type=token', scopes=['random']) - - -class ClientAuthenticationTest(TestCase): - - def inspect_client(self, request, refresh_token=False): - if not request.client or not request.client.client_id: - raise ValueError() - return 'abc' - - def setUp(self): - self.validator = mock.MagicMock(spec=RequestValidator) - self.validator.get_default_redirect_uri.return_value = 'http://i.b./path' - self.web = WebApplicationServer(self.validator, - token_generator=self.inspect_client) - self.mobile = MobileApplicationServer(self.validator, - token_generator=self.inspect_client) - self.legacy = LegacyApplicationServer(self.validator, - token_generator=self.inspect_client) - self.backend = BackendApplicationServer(self.validator, - token_generator=self.inspect_client) - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def set_client_id(self, client_id, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def set_username(self, username, password, client, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def test_client_id_authentication(self): - token_uri = 'http://example.com/path' - - # authorization code grant - self.validator.authenticate_client.return_value = False - self.validator.authenticate_client_id.return_value = False - _, body, _ = self.web.create_token_response(token_uri, - body='grant_type=authorization_code&code=mock') - self.assertEqual(json.loads(body)['error'], 'invalid_client') - - self.validator.authenticate_client_id.return_value = True - self.validator.authenticate_client.side_effect = self.set_client - _, body, _ = self.web.create_token_response(token_uri, - body='grant_type=authorization_code&code=mock') - self.assertIn('access_token', json.loads(body)) - - # implicit grant - auth_uri = 'http://example.com/path?client_id=abc&response_type=token' - self.assertRaises(ValueError, self.mobile.create_authorization_response, - auth_uri, scopes=['random']) - - self.validator.validate_client_id.side_effect = self.set_client_id - h, _, s = self.mobile.create_authorization_response(auth_uri, scopes=['random']) - self.assertEqual(302, s) - self.assertIn('Location', h) - self.assertIn('access_token', get_fragment_credentials(h['Location'])) - - def test_custom_authentication(self): - token_uri = 'http://example.com/path' - - # authorization code grant - self.assertRaises(NotImplementedError, - self.web.create_token_response, token_uri, - body='grant_type=authorization_code&code=mock') - - # password grant - self.validator.authenticate_client.return_value = True - self.assertRaises(NotImplementedError, - self.legacy.create_token_response, token_uri, - body='grant_type=password&username=abc&password=secret') - - # client credentials grant - self.validator.authenticate_client.return_value = True - self.assertRaises(NotImplementedError, - self.backend.create_token_response, token_uri, - body='grant_type=client_credentials') - - -class ResourceOwnerAssociationTest(TestCase): - - auth_uri = 'http://example.com/path?client_id=abc' - token_uri = 'http://example.com/path' - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def set_user(self, client_id, code, client, request): - request.user = 'test' - return True - - def set_user_from_username(self, username, password, client, request): - request.user = 'test' - return True - - def set_user_from_credentials(self, request): - request.user = 'test' - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def inspect_client(self, request, refresh_token=False): - if not request.user: - raise ValueError() - return 'abc' - - def setUp(self): - self.validator = mock.MagicMock(spec=RequestValidator) - self.validator.get_default_redirect_uri.return_value = 'http://i.b./path' - self.validator.authenticate_client.side_effect = self.set_client - self.web = WebApplicationServer(self.validator, - token_generator=self.inspect_client) - self.mobile = MobileApplicationServer(self.validator, - token_generator=self.inspect_client) - self.legacy = LegacyApplicationServer(self.validator, - token_generator=self.inspect_client) - self.backend = BackendApplicationServer(self.validator, - token_generator=self.inspect_client) - - def test_web_application(self): - # TODO: code generator + intercept test - h, _, s = self.web.create_authorization_response( - self.auth_uri + '&response_type=code', - credentials={'user': 'test'}, scopes=['random']) - self.assertEqual(s, 302) - self.assertIn('Location', h) - code = get_query_credentials(h['Location'])['code'][0] - self.assertRaises(ValueError, - self.web.create_token_response, self.token_uri, - body='grant_type=authorization_code&code=%s' % code) - - self.validator.validate_code.side_effect = self.set_user - _, body, _ = self.web.create_token_response(self.token_uri, - body='grant_type=authorization_code&code=%s' % code) - self.assertEqual(json.loads(body)['access_token'], 'abc') - - def test_mobile_application(self): - self.assertRaises(ValueError, - self.mobile.create_authorization_response, - self.auth_uri + '&response_type=token') - - h, _, s = self.mobile.create_authorization_response( - self.auth_uri + '&response_type=token', - credentials={'user': 'test'}, scopes=['random']) - self.assertEqual(s, 302) - self.assertIn('Location', h) - self.assertEqual(get_fragment_credentials(h['Location'])['access_token'][0], 'abc') - - def test_legacy_application(self): - body = 'grant_type=password&username=abc&password=secret' - self.assertRaises(ValueError, - self.legacy.create_token_response, - self.token_uri, body=body) - - self.validator.validate_user.side_effect = self.set_user_from_username - _, body, _ = self.legacy.create_token_response( - self.token_uri, body=body) - self.assertEqual(json.loads(body)['access_token'], 'abc') - - def test_backend_application(self): - body = 'grant_type=client_credentials' - self.assertRaises(ValueError, - self.backend.create_token_response, - self.token_uri, body=body) - - self.validator.authenticate_client.side_effect = self.set_user_from_credentials - _, body, _ = self.backend.create_token_response( - self.token_uri, body=body) - self.assertEqual(json.loads(body)['access_token'], 'abc') - - -class ErrorResponseTest(TestCase): - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def setUp(self): - self.validator = mock.MagicMock(spec=RequestValidator) - self.validator.get_default_redirect_uri.return_value = None - self.web = WebApplicationServer(self.validator) - self.mobile = MobileApplicationServer(self.validator) - self.legacy = LegacyApplicationServer(self.validator) - self.backend = BackendApplicationServer(self.validator) - - def test_invalid_redirect_uri(self): - uri = 'https://example.com/authorize?client_id=foo&redirect_uri=wrong' - # Authorization code grant - self.assertRaises(errors.InvalidRedirectURIError, - self.web.validate_authorization_request, uri) - self.assertRaises(errors.InvalidRedirectURIError, - self.web.create_authorization_response, uri, scopes=['foo']) - - # Implicit grant - self.assertRaises(errors.InvalidRedirectURIError, - self.mobile.validate_authorization_request, uri) - self.assertRaises(errors.InvalidRedirectURIError, - self.mobile.create_authorization_response, uri, scopes=['foo']) - - def test_missing_redirect_uri(self): - uri = 'https://example.com/authorize?client_id=foo' - # Authorization code grant - self.assertRaises(errors.MissingRedirectURIError, - self.web.validate_authorization_request, uri) - self.assertRaises(errors.MissingRedirectURIError, - self.web.create_authorization_response, uri, scopes=['foo']) - - # Implicit grant - self.assertRaises(errors.MissingRedirectURIError, - self.mobile.validate_authorization_request, uri) - self.assertRaises(errors.MissingRedirectURIError, - self.mobile.create_authorization_response, uri, scopes=['foo']) - - def test_mismatching_redirect_uri(self): - uri = 'https://example.com/authorize?client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback' - # Authorization code grant - self.validator.validate_redirect_uri.return_value = False - self.assertRaises(errors.MismatchingRedirectURIError, - self.web.validate_authorization_request, uri) - self.assertRaises(errors.MismatchingRedirectURIError, - self.web.create_authorization_response, uri, scopes=['foo']) - - # Implicit grant - self.assertRaises(errors.MismatchingRedirectURIError, - self.mobile.validate_authorization_request, uri) - self.assertRaises(errors.MismatchingRedirectURIError, - self.mobile.create_authorization_response, uri, scopes=['foo']) - - def test_missing_client_id(self): - uri = 'https://example.com/authorize?redirect_uri=https%3A%2F%2Fi.b%2Fback' - # Authorization code grant - self.validator.validate_redirect_uri.return_value = False - self.assertRaises(errors.MissingClientIdError, - self.web.validate_authorization_request, uri) - self.assertRaises(errors.MissingClientIdError, - self.web.create_authorization_response, uri, scopes=['foo']) - - # Implicit grant - self.assertRaises(errors.MissingClientIdError, - self.mobile.validate_authorization_request, uri) - self.assertRaises(errors.MissingClientIdError, - self.mobile.create_authorization_response, uri, scopes=['foo']) - - def test_invalid_client_id(self): - uri = 'https://example.com/authorize?client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback' - # Authorization code grant - self.validator.validate_client_id.return_value = False - self.assertRaises(errors.InvalidClientIdError, - self.web.validate_authorization_request, uri) - self.assertRaises(errors.InvalidClientIdError, - self.web.create_authorization_response, uri, scopes=['foo']) - - # Implicit grant - self.assertRaises(errors.InvalidClientIdError, - self.mobile.validate_authorization_request, uri) - self.assertRaises(errors.InvalidClientIdError, - self.mobile.create_authorization_response, uri, scopes=['foo']) - - def test_invalid_request(self): - self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' - token_uri = 'https://i.b/token' - invalid_uris = [ - # Duplicate parameters - 'https://i.b/auth?client_id=foo&client_id=bar&response_type={0}', - # Missing response type - 'https://i.b/auth?client_id=foo', - ] - - # Authorization code grant - for uri in invalid_uris: - self.assertRaises(errors.InvalidRequestError, - self.web.validate_authorization_request, - uri.format('code')) - h, _, s = self.web.create_authorization_response( - uri.format('code'), scopes=['foo']) - self.assertEqual(s, 302) - self.assertIn('Location', h) - self.assertIn('error=invalid_request', h['Location']) - invalid_bodies = [ - # duplicate params - 'grant_type=authorization_code&client_id=nope&client_id=nope&code=foo' - ] - for body in invalid_bodies: - _, body, _ = self.web.create_token_response(token_uri, - body=body) - self.assertEqual('invalid_request', json.loads(body)['error']) - - # Implicit grant - for uri in invalid_uris: - self.assertRaises(errors.InvalidRequestError, - self.mobile.validate_authorization_request, - uri.format('token')) - h, _, s = self.mobile.create_authorization_response( - uri.format('token'), scopes=['foo']) - self.assertEqual(s, 302) - self.assertIn('Location', h) - self.assertIn('error=invalid_request', h['Location']) - - # Password credentials grant - invalid_bodies = [ - # duplicate params - 'grant_type=password&username=foo&username=bar&password=baz' - # missing username - 'grant_type=password&password=baz' - # missing password - 'grant_type=password&username=foo' - ] - self.validator.authenticate_client.side_effect = self.set_client - for body in invalid_bodies: - _, body, _ = self.legacy.create_token_response(token_uri, - body=body) - self.assertEqual('invalid_request', json.loads(body)['error']) - - # Client credentials grant - invalid_bodies = [ - # duplicate params - 'grant_type=client_credentials&scope=foo&scope=bar' - ] - for body in invalid_bodies: - _, body, _ = self.backend.create_token_response(token_uri, - body=body) - self.assertEqual('invalid_request', json.loads(body)['error']) - - def test_unauthorized_client(self): - self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' - self.validator.validate_grant_type.return_value = False - self.validator.validate_response_type.return_value = False - self.validator.authenticate_client.side_effect = self.set_client - token_uri = 'https://i.b/token' - - # Authorization code grant - self.assertRaises(errors.UnauthorizedClientError, - self.web.validate_authorization_request, - 'https://i.b/auth?response_type=code&client_id=foo') - _, body, _ = self.web.create_token_response(token_uri, - body='grant_type=authorization_code&code=foo') - self.assertEqual('unauthorized_client', json.loads(body)['error']) - - # Implicit grant - self.assertRaises(errors.UnauthorizedClientError, - self.mobile.validate_authorization_request, - 'https://i.b/auth?response_type=token&client_id=foo') - - # Password credentials grant - _, body, _ = self.legacy.create_token_response(token_uri, - body='grant_type=password&username=foo&password=bar') - self.assertEqual('unauthorized_client', json.loads(body)['error']) - - # Client credentials grant - _, body, _ = self.backend.create_token_response(token_uri, - body='grant_type=client_credentials') - self.assertEqual('unauthorized_client', json.loads(body)['error']) - - def test_access_denied(self): - self.validator.authenticate_client.side_effect = self.set_client - self.validator.confirm_redirect_uri.return_value = False - token_uri = 'https://i.b/token' - # Authorization code grant - _, body, _ = self.web.create_token_response(token_uri, - body='grant_type=authorization_code&code=foo') - self.assertEqual('access_denied', json.loads(body)['error']) - - def test_unsupported_response_type(self): - self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' - - # Authorization code grant - self.assertRaises(errors.UnsupportedResponseTypeError, - self.web.validate_authorization_request, - 'https://i.b/auth?response_type=foo&client_id=foo') - - # Implicit grant - self.assertRaises(errors.UnsupportedResponseTypeError, - self.mobile.validate_authorization_request, - 'https://i.b/auth?response_type=foo&client_id=foo') - - def test_invalid_scope(self): - self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' - self.validator.validate_scopes.return_value = False - self.validator.authenticate_client.side_effect = self.set_client - - # Authorization code grant - self.assertRaises(errors.InvalidScopeError, - self.web.validate_authorization_request, - 'https://i.b/auth?response_type=code&client_id=foo') - - # Implicit grant - self.assertRaises(errors.InvalidScopeError, - self.mobile.validate_authorization_request, - 'https://i.b/auth?response_type=token&client_id=foo') - - # Password credentials grant - _, body, _ = self.legacy.create_token_response( - 'https://i.b/token', - body='grant_type=password&username=foo&password=bar') - self.assertEqual('invalid_scope', json.loads(body)['error']) - - # Client credentials grant - _, body, _ = self.backend.create_token_response( - 'https://i.b/token', - body='grant_type=client_credentials') - self.assertEqual('invalid_scope', json.loads(body)['error']) - - def test_server_error(self): - def raise_error(*args, **kwargs): - raise ValueError() - - self.validator.validate_client_id.side_effect = raise_error - self.validator.authenticate_client.side_effect = raise_error - self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' - - # Authorization code grant - self.web.catch_errors = True - _, _, s = self.web.create_authorization_response( - 'https://i.b/auth?client_id=foo&response_type=code', - scopes=['foo']) - self.assertEqual(s, 500) - _, _, s = self.web.create_token_response( - 'https://i.b/token', - body='grant_type=authorization_code&code=foo', - scopes=['foo']) - self.assertEqual(s, 500) - - # Implicit grant - self.mobile.catch_errors = True - _, _, s = self.mobile.create_authorization_response( - 'https://i.b/auth?client_id=foo&response_type=token', - scopes=['foo']) - self.assertEqual(s, 500) - - # Password credentials grant - self.legacy.catch_errors = True - _, _, s = self.legacy.create_token_response( - 'https://i.b/token', - body='grant_type=password&username=foo&password=foo') - self.assertEqual(s, 500) - - # Client credentials grant - self.backend.catch_errors = True - _, _, s = self.backend.create_token_response( - 'https://i.b/token', - body='grant_type=client_credentials') - self.assertEqual(s, 500) - - def test_temporarily_unavailable(self): - # Authorization code grant - self.web.available = False - _, _, s = self.web.create_authorization_response( - 'https://i.b/auth?client_id=foo&response_type=code', - scopes=['foo']) - self.assertEqual(s, 503) - _, _, s = self.web.create_token_response( - 'https://i.b/token', - body='grant_type=authorization_code&code=foo', - scopes=['foo']) - self.assertEqual(s, 503) - - # Implicit grant - self.mobile.available = False - _, _, s = self.mobile.create_authorization_response( - 'https://i.b/auth?client_id=foo&response_type=token', - scopes=['foo']) - self.assertEqual(s, 503) - - # Password credentials grant - self.legacy.available = False - _, _, s = self.legacy.create_token_response( - 'https://i.b/token', - body='grant_type=password&username=foo&password=foo') - self.assertEqual(s, 503) - - # Client credentials grant - self.backend.available = False - _, _, s = self.backend.create_token_response( - 'https://i.b/token', - body='grant_type=client_credentials') - self.assertEqual(s, 503) - - def test_invalid_client(self): - self.validator.authenticate_client.return_value = False - self.validator.authenticate_client_id.return_value = False - - # Authorization code grant - _, body, _ = self.web.create_token_response('https://i.b/token', - body='grant_type=authorization_code&code=foo') - self.assertEqual('invalid_client', json.loads(body)['error']) - - # Password credentials grant - _, body, _ = self.legacy.create_token_response('https://i.b/token', - body='grant_type=password&username=foo&password=bar') - self.assertEqual('invalid_client', json.loads(body)['error']) - - # Client credentials grant - _, body, _ = self.legacy.create_token_response('https://i.b/token', - body='grant_type=client_credentials') - self.assertEqual('invalid_client', json.loads(body)['error']) - - def test_invalid_grant(self): - self.validator.authenticate_client.side_effect = self.set_client - - # Authorization code grant - self.validator.validate_code.return_value = False - _, body, _ = self.web.create_token_response('https://i.b/token', - body='grant_type=authorization_code&code=foo') - self.assertEqual('invalid_grant', json.loads(body)['error']) - - # Password credentials grant - self.validator.validate_user.return_value = False - _, body, _ = self.legacy.create_token_response('https://i.b/token', - body='grant_type=password&username=foo&password=bar') - self.assertEqual('invalid_grant', json.loads(body)['error']) - - def test_unsupported_grant_type(self): - self.validator.authenticate_client.side_effect = self.set_client - - # Authorization code grant - _, body, _ = self.web.create_token_response('https://i.b/token', - body='grant_type=bar&code=foo') - self.assertEqual('unsupported_grant_type', json.loads(body)['error']) - - # Password credentials grant - _, body, _ = self.legacy.create_token_response('https://i.b/token', - body='grant_type=bar&username=foo&password=bar') - self.assertEqual('unsupported_grant_type', json.loads(body)['error']) - - # Client credentials grant - _, body, _ = self.backend.create_token_response('https://i.b/token', - body='grant_type=bar') - self.assertEqual('unsupported_grant_type', json.loads(body)['error']) - - -class ExtraCredentialsTest(TestCase): - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - def setUp(self): - self.validator = mock.MagicMock(spec=RequestValidator) - self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' - self.web = WebApplicationServer(self.validator) - self.mobile = MobileApplicationServer(self.validator) - self.legacy = LegacyApplicationServer(self.validator) - self.backend = BackendApplicationServer(self.validator) - - def test_post_authorization_request(self): - def save_code(client_id, token, request): - self.assertEqual('creds', request.extra) - - def save_token(token, request): - self.assertEqual('creds', request.extra) - - # Authorization code grant - self.validator.save_authorization_code.side_effect = save_code - self.web.create_authorization_response( - 'https://i.b/auth?client_id=foo&response_type=code', - scopes=['foo'], - credentials={'extra': 'creds'}) - - # Implicit grant - self.validator.save_bearer_token.side_effect = save_token - self.web.create_authorization_response( - 'https://i.b/auth?client_id=foo&response_type=token', - scopes=['foo'], - credentials={'extra': 'creds'}) - - def test_token_request(self): - def save_token(token, request): - self.assertIn('extra', token) - - self.validator.save_bearer_token.side_effect = save_token - self.validator.authenticate_client.side_effect = self.set_client - - # Authorization code grant - self.web.create_token_response('https://i.b/token', - body='grant_type=authorization_code&code=foo', - credentials={'extra': 'creds'}) - - # Password credentials grant - self.legacy.create_token_response('https://i.b/token', - body='grant_type=password&username=foo&password=bar', - credentials={'extra': 'creds'}) - - # Client credentials grant - self.backend.create_token_response('https://i.b/token', - body='grant_type=client_credentials', - credentials={'extra': 'creds'}) diff --git a/tests/oauth2/rfc6749/test_utils.py b/tests/oauth2/rfc6749/test_utils.py index 6e713a7..ff7afd3 100644 --- a/tests/oauth2/rfc6749/test_utils.py +++ b/tests/oauth2/rfc6749/test_utils.py @@ -1,9 +1,13 @@ from __future__ import absolute_import, unicode_literals +import datetime import os + from ...unittest import TestCase from oauthlib.oauth2.rfc6749.utils import escape, host_from_uri +from oauthlib.oauth2.rfc6749.utils import generate_age from oauthlib.oauth2.rfc6749.utils import is_secure_transport +from oauthlib.oauth2.rfc6749.utils import params_from_uri class UtilsTests(TestCase): @@ -35,3 +39,12 @@ class UtilsTests(TestCase): os.environ['DEBUG'] = '1' self.assertTrue(is_secure_transport('http://example.com')) del os.environ['DEBUG'] + + def test_params_from_uri(self): + self.assertEqual(params_from_uri('http://i.b/?foo=bar&g&scope=a+d'), + {'foo': 'bar', 'g': '', 'scope': ['a', 'd']}) + + def test_generate_age(self): + issue_time = datetime.datetime.now() - datetime.timedelta( + days=3, minutes=1, seconds=4) + self.assertGreater(float(generate_age(issue_time)), 259263.0) |