summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIb Lundgren <ib.lundgren@gmail.com>2013-09-12 10:05:33 +0100
committerIb Lundgren <ib.lundgren@gmail.com>2013-09-12 10:05:33 +0100
commit62058f2d031d91bb6173fe06a1f6f11e22a9f03e (patch)
tree91f947a737da32e53ef5bd16500285440499fbc8
parent1122945efbf3d1be6fed0e2279dfb81f785ad706 (diff)
downloadoauthlib-62058f2d031d91bb6173fe06a1f6f11e22a9f03e.tar.gz
Restructure OAuth2 tests.
-rw-r--r--oauthlib/oauth2/rfc6749/request_validator.py3
-rw-r--r--tests/oauth2/rfc6749/endpoints/__init__.py0
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_base_endpoint.py70
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_client_authentication.py103
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py116
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_error_responses.py372
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_extra_credentials.py69
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py106
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_scope_handling.py182
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_utils.py14
-rw-r--r--tests/oauth2/rfc6749/grant_types/__init__.py0
-rw-r--r--tests/oauth2/rfc6749/grant_types/test_authorization_code.py78
-rw-r--r--tests/oauth2/rfc6749/grant_types/test_client_credentials.py39
-rw-r--r--tests/oauth2/rfc6749/grant_types/test_implicit.py41
-rw-r--r--tests/oauth2/rfc6749/grant_types/test_refresh_token.py74
-rw-r--r--tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py41
-rw-r--r--tests/oauth2/rfc6749/test_grant_types.py260
-rw-r--r--tests/oauth2/rfc6749/test_request_validator.py45
-rw-r--r--tests/oauth2/rfc6749/test_servers.py890
-rw-r--r--tests/oauth2/rfc6749/test_utils.py13
20 files changed, 1365 insertions, 1151 deletions
diff --git a/oauthlib/oauth2/rfc6749/request_validator.py b/oauthlib/oauth2/rfc6749/request_validator.py
index 5fde7ee..8612366 100644
--- a/oauthlib/oauth2/rfc6749/request_validator.py
+++ b/oauthlib/oauth2/rfc6749/request_validator.py
@@ -60,7 +60,8 @@ class RequestValidator(object):
"""
raise NotImplementedError('Subclasses must implement this method.')
- def confirm_redirect_uri(self, client_id, code, redirect_uri, client, *args, **kwargs):
+ def confirm_redirect_uri(self, client_id, code, redirect_uri, client,
+ request, *args, **kwargs):
"""Ensure client is authorized to redirect to the redirect_uri requested.
If the client specifies a redirect_uri when obtaining code then
diff --git a/tests/oauth2/rfc6749/endpoints/__init__.py b/tests/oauth2/rfc6749/endpoints/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/__init__.py
diff --git a/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py b/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py
new file mode 100644
index 0000000..79124e3
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+from ....unittest import TestCase
+
+from oauthlib.oauth2.rfc6749 import BaseEndpoint, catch_errors_and_unavailability
+from oauthlib.oauth2 import Server, RequestValidator, FatalClientError, OAuth2Error
+
+
+class BaseEndpointTest(TestCase):
+
+ def test_default_config(self):
+ endpoint = BaseEndpoint()
+ self.assertFalse(endpoint.catch_errors)
+ self.assertTrue(endpoint.available)
+ endpoint.catch_errors = True
+ self.assertTrue(endpoint.catch_errors)
+ endpoint.available = False
+ self.assertFalse(endpoint.available)
+
+ def test_error_catching(self):
+ validator = RequestValidator()
+ server = Server(validator)
+ server.catch_errors = True
+ h, b, s = server.create_authorization_response('https://example.com')
+ self.assertIn("server_error", b)
+ self.assertEqual(s, 500)
+
+ def test_unavailability(self):
+ validator = RequestValidator()
+ server = Server(validator)
+ server.available = False
+ h, b, s = server.create_authorization_response('https://example.com')
+ self.assertIn("temporarily_unavailable", b)
+ self.assertEqual(s, 503)
+
+ def test_wrapper(self):
+
+ class TestServer(Server):
+
+ @catch_errors_and_unavailability
+ def throw_error(self, uri):
+ raise ValueError()
+
+ @catch_errors_and_unavailability
+ def throw_oauth_error(self, uri):
+ raise OAuth2Error()
+
+ @catch_errors_and_unavailability
+ def throw_fatal_oauth_error(self, uri):
+ raise FatalClientError()
+
+ validator = RequestValidator()
+ server = TestServer(validator)
+
+ server.catch_errors = True
+ h, b, s = server.throw_error('a')
+ self.assertIn("server_error", b)
+ self.assertEqual(s, 500)
+
+ server.available = False
+ h, b, s = server.throw_error('a')
+ self.assertIn("temporarily_unavailable", b)
+ self.assertEqual(s, 503)
+
+ server.available = True
+ self.assertRaises(OAuth2Error, server.throw_oauth_error, 'a')
+ self.assertRaises(FatalClientError, server.throw_fatal_oauth_error, 'a')
+ server.catch_errors = False
+ self.assertRaises(OAuth2Error, server.throw_oauth_error, 'a')
+ self.assertRaises(FatalClientError, server.throw_fatal_oauth_error, 'a')
diff --git a/tests/oauth2/rfc6749/endpoints/test_client_authentication.py b/tests/oauth2/rfc6749/endpoints/test_client_authentication.py
new file mode 100644
index 0000000..fdd9665
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_client_authentication.py
@@ -0,0 +1,103 @@
+"""Client authentication tests across all endpoints.
+
+Client authentication in OAuth2 serve two purposes, to authenticate
+confidential clients and to ensure public clients are in fact public. The
+latter is achieved with authenticate_client_id and the former with
+authenticate_client.
+
+We make sure authentication is done by requiring a client object to be set
+on the request object with a client_id parameter. The client_id attribute
+prevents this check from being circumvented with a client form parameter.
+"""
+from __future__ import absolute_import, unicode_literals
+import json
+import mock
+
+from .test_utils import get_fragment_credentials
+from ....unittest import TestCase
+
+from oauthlib.oauth2 import RequestValidator
+from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer
+from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer
+
+
+class ClientAuthenticationTest(TestCase):
+
+ def inspect_client(self, request, refresh_token=False):
+ if not request.client or not request.client.client_id:
+ raise ValueError()
+ return 'abc'
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = 'http://i.b./path'
+ self.web = WebApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.mobile = MobileApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.legacy = LegacyApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.backend = BackendApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def set_client_id(self, client_id, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def set_username(self, username, password, client, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def test_client_id_authentication(self):
+ token_uri = 'http://example.com/path'
+
+ # authorization code grant
+ self.validator.authenticate_client.return_value = False
+ self.validator.authenticate_client_id.return_value = False
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=mock')
+ self.assertEqual(json.loads(body)['error'], 'invalid_client')
+
+ self.validator.authenticate_client_id.return_value = True
+ self.validator.authenticate_client.side_effect = self.set_client
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=mock')
+ self.assertIn('access_token', json.loads(body))
+
+ # implicit grant
+ auth_uri = 'http://example.com/path?client_id=abc&response_type=token'
+ self.assertRaises(ValueError, self.mobile.create_authorization_response,
+ auth_uri, scopes=['random'])
+
+ self.validator.validate_client_id.side_effect = self.set_client_id
+ h, _, s = self.mobile.create_authorization_response(auth_uri, scopes=['random'])
+ self.assertEqual(302, s)
+ self.assertIn('Location', h)
+ self.assertIn('access_token', get_fragment_credentials(h['Location']))
+
+ def test_custom_authentication(self):
+ token_uri = 'http://example.com/path'
+
+ # authorization code grant
+ self.assertRaises(NotImplementedError,
+ self.web.create_token_response, token_uri,
+ body='grant_type=authorization_code&code=mock')
+
+ # password grant
+ self.validator.authenticate_client.return_value = True
+ self.assertRaises(NotImplementedError,
+ self.legacy.create_token_response, token_uri,
+ body='grant_type=password&username=abc&password=secret')
+
+ # client credentials grant
+ self.validator.authenticate_client.return_value = True
+ self.assertRaises(NotImplementedError,
+ self.backend.create_token_response, token_uri,
+ body='grant_type=client_credentials')
diff --git a/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py b/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py
new file mode 100644
index 0000000..98a132a
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py
@@ -0,0 +1,116 @@
+"""Ensure credentials are preserved through the authorization.
+
+The Authorization Code Grant will need to preserve state as well as redirect
+uri and the Implicit Grant will need to preserve state.
+"""
+from __future__ import absolute_import, unicode_literals
+import json
+import mock
+
+from .test_utils import get_query_credentials, get_fragment_credentials
+from ....unittest import TestCase
+
+from oauthlib.oauth2 import RequestValidator
+from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer
+from oauthlib.oauth2.rfc6749 import errors
+
+
+class PreservationTest(TestCase):
+
+ DEFAULT_REDIRECT_URI = 'http://i.b./path'
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = self.DEFAULT_REDIRECT_URI
+ self.validator.authenticate_client.side_effect = self.set_client
+ self.web = WebApplicationServer(self.validator)
+ self.mobile = MobileApplicationServer(self.validator)
+
+ def set_state(self, state):
+ def set_request_state(client_id, code, client, request):
+ request.state = state
+ return True
+ return set_request_state
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def test_state_preservation(self):
+ auth_uri = 'http://example.com/path?state=xyz&client_id=abc&response_type='
+ token_uri = 'http://example.com/path'
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + 'code', scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ code = get_query_credentials(h['Location'])['code'][0]
+ self.validator.validate_code.side_effect = self.set_state('xyz')
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['state'], 'xyz')
+
+ # implicit grant
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + 'token', scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertEqual(get_fragment_credentials(h['Location'])['state'][0], 'xyz')
+
+ def test_redirect_uri_preservation(self):
+ auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc'
+ redirect_uri = 'http://i.b/path'
+ token_uri = 'http://example.com/path'
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + '&response_type=code', scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertTrue(h['Location'].startswith(redirect_uri))
+
+ # confirm_redirect_uri should return false if the redirect uri
+ # was given in the authorization but not in the token request.
+ self.validator.confirm_redirect_uri.return_value = False
+ code = get_query_credentials(h['Location'])['code'][0]
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['error'], 'access_denied')
+
+ # implicit grant
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + '&response_type=token', scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertTrue(h['Location'].startswith(redirect_uri))
+
+ def test_invalid_redirect_uri(self):
+ auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc'
+ self.validator.validate_redirect_uri.return_value = False
+
+ # authorization grant
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.web.create_authorization_response,
+ auth_uri + '&response_type=code', scopes=['random'])
+
+ # implicit grant
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.mobile.create_authorization_response,
+ auth_uri + '&response_type=token', scopes=['random'])
+
+ def test_default_uri(self):
+ auth_uri = 'http://example.com/path?state=xyz&client_id=abc'
+
+ self.validator.get_default_redirect_uri.return_value = None
+
+ # authorization grant
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.web.create_authorization_response,
+ auth_uri + '&response_type=code', scopes=['random'])
+
+ # implicit grant
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.mobile.create_authorization_response,
+ auth_uri + '&response_type=token', scopes=['random'])
diff --git a/tests/oauth2/rfc6749/endpoints/test_error_responses.py b/tests/oauth2/rfc6749/endpoints/test_error_responses.py
new file mode 100644
index 0000000..5f65de3
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_error_responses.py
@@ -0,0 +1,372 @@
+"""Ensure the correct error responses are returned for all defined error types.
+"""
+from __future__ import absolute_import, unicode_literals
+import json
+import mock
+
+from ....unittest import TestCase
+
+from oauthlib.oauth2 import RequestValidator
+from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer
+from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer
+from oauthlib.oauth2.rfc6749 import errors
+
+
+class ErrorResponseTest(TestCase):
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = None
+ self.web = WebApplicationServer(self.validator)
+ self.mobile = MobileApplicationServer(self.validator)
+ self.legacy = LegacyApplicationServer(self.validator)
+ self.backend = BackendApplicationServer(self.validator)
+
+ def test_invalid_redirect_uri(self):
+ uri = 'https://example.com/authorize?client_id=foo&redirect_uri=wrong'
+ # Authorization code grant
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.web.validate_authorization_request, uri)
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.web.create_authorization_response, uri, scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.mobile.validate_authorization_request, uri)
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.mobile.create_authorization_response, uri, scopes=['foo'])
+
+ def test_missing_redirect_uri(self):
+ uri = 'https://example.com/authorize?client_id=foo'
+ # Authorization code grant
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.web.validate_authorization_request, uri)
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.web.create_authorization_response, uri, scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.mobile.validate_authorization_request, uri)
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.mobile.create_authorization_response, uri, scopes=['foo'])
+
+ def test_mismatching_redirect_uri(self):
+ uri = 'https://example.com/authorize?client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback'
+ # Authorization code grant
+ self.validator.validate_redirect_uri.return_value = False
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.web.validate_authorization_request, uri)
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.web.create_authorization_response, uri, scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.mobile.validate_authorization_request, uri)
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.mobile.create_authorization_response, uri, scopes=['foo'])
+
+ def test_missing_client_id(self):
+ uri = 'https://example.com/authorize?redirect_uri=https%3A%2F%2Fi.b%2Fback'
+ # Authorization code grant
+ self.validator.validate_redirect_uri.return_value = False
+ self.assertRaises(errors.MissingClientIdError,
+ self.web.validate_authorization_request, uri)
+ self.assertRaises(errors.MissingClientIdError,
+ self.web.create_authorization_response, uri, scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.MissingClientIdError,
+ self.mobile.validate_authorization_request, uri)
+ self.assertRaises(errors.MissingClientIdError,
+ self.mobile.create_authorization_response, uri, scopes=['foo'])
+
+ def test_invalid_client_id(self):
+ uri = 'https://example.com/authorize?client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback'
+ # Authorization code grant
+ self.validator.validate_client_id.return_value = False
+ self.assertRaises(errors.InvalidClientIdError,
+ self.web.validate_authorization_request, uri)
+ self.assertRaises(errors.InvalidClientIdError,
+ self.web.create_authorization_response, uri, scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.InvalidClientIdError,
+ self.mobile.validate_authorization_request, uri)
+ self.assertRaises(errors.InvalidClientIdError,
+ self.mobile.create_authorization_response, uri, scopes=['foo'])
+
+ def test_invalid_request(self):
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ token_uri = 'https://i.b/token'
+ invalid_uris = [
+ # Duplicate parameters
+ 'https://i.b/auth?client_id=foo&client_id=bar&response_type={0}',
+ # Missing response type
+ 'https://i.b/auth?client_id=foo',
+ ]
+
+ # Authorization code grant
+ for uri in invalid_uris:
+ self.assertRaises(errors.InvalidRequestError,
+ self.web.validate_authorization_request,
+ uri.format('code'))
+ h, _, s = self.web.create_authorization_response(
+ uri.format('code'), scopes=['foo'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertIn('error=invalid_request', h['Location'])
+ invalid_bodies = [
+ # duplicate params
+ 'grant_type=authorization_code&client_id=nope&client_id=nope&code=foo'
+ ]
+ for body in invalid_bodies:
+ _, body, _ = self.web.create_token_response(token_uri,
+ body=body)
+ self.assertEqual('invalid_request', json.loads(body)['error'])
+
+ # Implicit grant
+ for uri in invalid_uris:
+ self.assertRaises(errors.InvalidRequestError,
+ self.mobile.validate_authorization_request,
+ uri.format('token'))
+ h, _, s = self.mobile.create_authorization_response(
+ uri.format('token'), scopes=['foo'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertIn('error=invalid_request', h['Location'])
+
+ # Password credentials grant
+ invalid_bodies = [
+ # duplicate params
+ 'grant_type=password&username=foo&username=bar&password=baz'
+ # missing username
+ 'grant_type=password&password=baz'
+ # missing password
+ 'grant_type=password&username=foo'
+ ]
+ self.validator.authenticate_client.side_effect = self.set_client
+ for body in invalid_bodies:
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body=body)
+ self.assertEqual('invalid_request', json.loads(body)['error'])
+
+ # Client credentials grant
+ invalid_bodies = [
+ # duplicate params
+ 'grant_type=client_credentials&scope=foo&scope=bar'
+ ]
+ for body in invalid_bodies:
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body=body)
+ self.assertEqual('invalid_request', json.loads(body)['error'])
+
+ def test_unauthorized_client(self):
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ self.validator.validate_grant_type.return_value = False
+ self.validator.validate_response_type.return_value = False
+ self.validator.authenticate_client.side_effect = self.set_client
+ token_uri = 'https://i.b/token'
+
+ # Authorization code grant
+ self.assertRaises(errors.UnauthorizedClientError,
+ self.web.validate_authorization_request,
+ 'https://i.b/auth?response_type=code&client_id=foo')
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=foo')
+ self.assertEqual('unauthorized_client', json.loads(body)['error'])
+
+ # Implicit grant
+ self.assertRaises(errors.UnauthorizedClientError,
+ self.mobile.validate_authorization_request,
+ 'https://i.b/auth?response_type=token&client_id=foo')
+
+ # Password credentials grant
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body='grant_type=password&username=foo&password=bar')
+ self.assertEqual('unauthorized_client', json.loads(body)['error'])
+
+ # Client credentials grant
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body='grant_type=client_credentials')
+ self.assertEqual('unauthorized_client', json.loads(body)['error'])
+
+ def test_access_denied(self):
+ self.validator.authenticate_client.side_effect = self.set_client
+ self.validator.confirm_redirect_uri.return_value = False
+ token_uri = 'https://i.b/token'
+ # Authorization code grant
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=foo')
+ self.assertEqual('access_denied', json.loads(body)['error'])
+
+ def test_unsupported_response_type(self):
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+
+ # Authorization code grant
+ self.assertRaises(errors.UnsupportedResponseTypeError,
+ self.web.validate_authorization_request,
+ 'https://i.b/auth?response_type=foo&client_id=foo')
+
+ # Implicit grant
+ self.assertRaises(errors.UnsupportedResponseTypeError,
+ self.mobile.validate_authorization_request,
+ 'https://i.b/auth?response_type=foo&client_id=foo')
+
+ def test_invalid_scope(self):
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ self.validator.validate_scopes.return_value = False
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ # Authorization code grant
+ self.assertRaises(errors.InvalidScopeError,
+ self.web.validate_authorization_request,
+ 'https://i.b/auth?response_type=code&client_id=foo')
+
+ # Implicit grant
+ self.assertRaises(errors.InvalidScopeError,
+ self.mobile.validate_authorization_request,
+ 'https://i.b/auth?response_type=token&client_id=foo')
+
+ # Password credentials grant
+ _, body, _ = self.legacy.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=password&username=foo&password=bar')
+ self.assertEqual('invalid_scope', json.loads(body)['error'])
+
+ # Client credentials grant
+ _, body, _ = self.backend.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=client_credentials')
+ self.assertEqual('invalid_scope', json.loads(body)['error'])
+
+ def test_server_error(self):
+ def raise_error(*args, **kwargs):
+ raise ValueError()
+
+ self.validator.validate_client_id.side_effect = raise_error
+ self.validator.authenticate_client.side_effect = raise_error
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+
+ # Authorization code grant
+ self.web.catch_errors = True
+ _, _, s = self.web.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=code',
+ scopes=['foo'])
+ self.assertEqual(s, 500)
+ _, _, s = self.web.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=authorization_code&code=foo',
+ scopes=['foo'])
+ self.assertEqual(s, 500)
+
+ # Implicit grant
+ self.mobile.catch_errors = True
+ _, _, s = self.mobile.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=token',
+ scopes=['foo'])
+ self.assertEqual(s, 500)
+
+ # Password credentials grant
+ self.legacy.catch_errors = True
+ _, _, s = self.legacy.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=password&username=foo&password=foo')
+ self.assertEqual(s, 500)
+
+ # Client credentials grant
+ self.backend.catch_errors = True
+ _, _, s = self.backend.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=client_credentials')
+ self.assertEqual(s, 500)
+
+ def test_temporarily_unavailable(self):
+ # Authorization code grant
+ self.web.available = False
+ _, _, s = self.web.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=code',
+ scopes=['foo'])
+ self.assertEqual(s, 503)
+ _, _, s = self.web.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=authorization_code&code=foo',
+ scopes=['foo'])
+ self.assertEqual(s, 503)
+
+ # Implicit grant
+ self.mobile.available = False
+ _, _, s = self.mobile.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=token',
+ scopes=['foo'])
+ self.assertEqual(s, 503)
+
+ # Password credentials grant
+ self.legacy.available = False
+ _, _, s = self.legacy.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=password&username=foo&password=foo')
+ self.assertEqual(s, 503)
+
+ # Client credentials grant
+ self.backend.available = False
+ _, _, s = self.backend.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=client_credentials')
+ self.assertEqual(s, 503)
+
+ def test_invalid_client(self):
+ self.validator.authenticate_client.return_value = False
+ self.validator.authenticate_client_id.return_value = False
+
+ # Authorization code grant
+ _, body, _ = self.web.create_token_response('https://i.b/token',
+ body='grant_type=authorization_code&code=foo')
+ self.assertEqual('invalid_client', json.loads(body)['error'])
+
+ # Password credentials grant
+ _, body, _ = self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=password&username=foo&password=bar')
+ self.assertEqual('invalid_client', json.loads(body)['error'])
+
+ # Client credentials grant
+ _, body, _ = self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=client_credentials')
+ self.assertEqual('invalid_client', json.loads(body)['error'])
+
+ def test_invalid_grant(self):
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ # Authorization code grant
+ self.validator.validate_code.return_value = False
+ _, body, _ = self.web.create_token_response('https://i.b/token',
+ body='grant_type=authorization_code&code=foo')
+ self.assertEqual('invalid_grant', json.loads(body)['error'])
+
+ # Password credentials grant
+ self.validator.validate_user.return_value = False
+ _, body, _ = self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=password&username=foo&password=bar')
+ self.assertEqual('invalid_grant', json.loads(body)['error'])
+
+ def test_unsupported_grant_type(self):
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ # Authorization code grant
+ _, body, _ = self.web.create_token_response('https://i.b/token',
+ body='grant_type=bar&code=foo')
+ self.assertEqual('unsupported_grant_type', json.loads(body)['error'])
+
+ # Password credentials grant
+ _, body, _ = self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=bar&username=foo&password=bar')
+ self.assertEqual('unsupported_grant_type', json.loads(body)['error'])
+
+ # Client credentials grant
+ _, body, _ = self.backend.create_token_response('https://i.b/token',
+ body='grant_type=bar')
+ self.assertEqual('unsupported_grant_type', json.loads(body)['error'])
diff --git a/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py b/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py
new file mode 100644
index 0000000..b43fa02
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py
@@ -0,0 +1,69 @@
+"""Ensure extra credentials can be supplied for inclusion in tokens.
+"""
+from __future__ import absolute_import, unicode_literals
+import mock
+
+from ....unittest import TestCase
+
+from oauthlib.oauth2 import RequestValidator
+from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer
+from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer
+
+
+class ExtraCredentialsTest(TestCase):
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ self.web = WebApplicationServer(self.validator)
+ self.mobile = MobileApplicationServer(self.validator)
+ self.legacy = LegacyApplicationServer(self.validator)
+ self.backend = BackendApplicationServer(self.validator)
+
+ def test_post_authorization_request(self):
+ def save_code(client_id, token, request):
+ self.assertEqual('creds', request.extra)
+
+ def save_token(token, request):
+ self.assertEqual('creds', request.extra)
+
+ # Authorization code grant
+ self.validator.save_authorization_code.side_effect = save_code
+ self.web.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=code',
+ scopes=['foo'],
+ credentials={'extra': 'creds'})
+
+ # Implicit grant
+ self.validator.save_bearer_token.side_effect = save_token
+ self.web.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=token',
+ scopes=['foo'],
+ credentials={'extra': 'creds'})
+
+ def test_token_request(self):
+ def save_token(token, request):
+ self.assertIn('extra', token)
+
+ self.validator.save_bearer_token.side_effect = save_token
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ # Authorization code grant
+ self.web.create_token_response('https://i.b/token',
+ body='grant_type=authorization_code&code=foo',
+ credentials={'extra': 'creds'})
+
+ # Password credentials grant
+ self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=password&username=foo&password=bar',
+ credentials={'extra': 'creds'})
+
+ # Client credentials grant
+ self.backend.create_token_response('https://i.b/token',
+ body='grant_type=client_credentials',
+ credentials={'extra': 'creds'})
diff --git a/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py b/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py
new file mode 100644
index 0000000..6b3137a
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py
@@ -0,0 +1,106 @@
+"""Ensure all tokens are associated with a resource owner.
+"""
+from __future__ import absolute_import, unicode_literals
+import json
+import mock
+
+from .test_utils import get_query_credentials, get_fragment_credentials
+from ....unittest import TestCase
+
+from oauthlib.oauth2 import RequestValidator
+from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer
+from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer
+
+
+class ResourceOwnerAssociationTest(TestCase):
+
+ auth_uri = 'http://example.com/path?client_id=abc'
+ token_uri = 'http://example.com/path'
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def set_user(self, client_id, code, client, request):
+ request.user = 'test'
+ return True
+
+ def set_user_from_username(self, username, password, client, request):
+ request.user = 'test'
+ return True
+
+ def set_user_from_credentials(self, request):
+ request.user = 'test'
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def inspect_client(self, request, refresh_token=False):
+ if not request.user:
+ raise ValueError()
+ return 'abc'
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = 'http://i.b./path'
+ self.validator.authenticate_client.side_effect = self.set_client
+ self.web = WebApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.mobile = MobileApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.legacy = LegacyApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.backend = BackendApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+
+ def test_web_application(self):
+ # TODO: code generator + intercept test
+ h, _, s = self.web.create_authorization_response(
+ self.auth_uri + '&response_type=code',
+ credentials={'user': 'test'}, scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ code = get_query_credentials(h['Location'])['code'][0]
+ self.assertRaises(ValueError,
+ self.web.create_token_response, self.token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+
+ self.validator.validate_code.side_effect = self.set_user
+ _, body, _ = self.web.create_token_response(self.token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['access_token'], 'abc')
+
+ def test_mobile_application(self):
+ self.assertRaises(ValueError,
+ self.mobile.create_authorization_response,
+ self.auth_uri + '&response_type=token')
+
+ h, _, s = self.mobile.create_authorization_response(
+ self.auth_uri + '&response_type=token',
+ credentials={'user': 'test'}, scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertEqual(get_fragment_credentials(h['Location'])['access_token'][0], 'abc')
+
+ def test_legacy_application(self):
+ body = 'grant_type=password&username=abc&password=secret'
+ self.assertRaises(ValueError,
+ self.legacy.create_token_response,
+ self.token_uri, body=body)
+
+ self.validator.validate_user.side_effect = self.set_user_from_username
+ _, body, _ = self.legacy.create_token_response(
+ self.token_uri, body=body)
+ self.assertEqual(json.loads(body)['access_token'], 'abc')
+
+ def test_backend_application(self):
+ body = 'grant_type=client_credentials'
+ self.assertRaises(ValueError,
+ self.backend.create_token_response,
+ self.token_uri, body=body)
+
+ self.validator.authenticate_client.side_effect = self.set_user_from_credentials
+ _, body, _ = self.backend.create_token_response(
+ self.token_uri, body=body)
+ self.assertEqual(json.loads(body)['access_token'], 'abc')
diff --git a/tests/oauth2/rfc6749/endpoints/test_scope_handling.py b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py
new file mode 100644
index 0000000..f48a4f9
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py
@@ -0,0 +1,182 @@
+"""Ensure scope is preserved across authorization.
+
+Fairly trivial in all grants except the Authorization Code Grant where scope
+need to be persisted temporarily in an authorization code.
+"""
+from __future__ import absolute_import, unicode_literals
+import json
+import mock
+
+from .test_utils import get_query_credentials, get_fragment_credentials
+from ....unittest import TestCase
+
+from oauthlib.oauth2 import RequestValidator
+from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer
+from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer
+
+
+class TestScopeHandling(TestCase):
+
+ DEFAULT_REDIRECT_URI = 'http://i.b./path'
+
+ def set_scopes(self, scopes):
+ def set_request_scopes(client_id, code, client, request):
+ request.scopes = scopes
+ return True
+ return set_request_scopes
+
+ def set_user(self, request):
+ request.user = 'foo'
+ request.client_id = 'bar'
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = TestScopeHandling.DEFAULT_REDIRECT_URI
+ self.validator.authenticate_client.side_effect = self.set_client
+ self.web = WebApplicationServer(self.validator)
+ self.mobile = MobileApplicationServer(self.validator)
+ self.legacy = LegacyApplicationServer(self.validator)
+ self.backend = BackendApplicationServer(self.validator)
+
+ def test_scope_extraction(self):
+ scopes = (
+ ('images', ['images']),
+ ('images+videos', ['images', 'videos']),
+ ('http%3A%2f%2fa.b%2fvideos', ['http://a.b/videos']),
+ ('http%3A%2f%2fa.b%2fvideos+pics', ['http://a.b/videos', 'pics']),
+ ('pics+http%3A%2f%2fa.b%2fvideos', ['pics', 'http://a.b/videos']),
+ ('http%3A%2f%2fa.b%2fvideos+https%3A%2f%2fc.d%2Fsecret', ['http://a.b/videos', 'https://c.d/secret']),
+ )
+
+ uri = 'http://example.com/path?client_id=abc&scope=%s&response_type=%s'
+ for scope, correct_scopes in scopes:
+ scopes, _ = self.web.validate_authorization_request(
+ uri % (scope, 'code'))
+ self.assertItemsEqual(scopes, correct_scopes)
+ scopes, _ = self.mobile.validate_authorization_request(
+ uri % (scope, 'token'))
+ self.assertItemsEqual(scopes, correct_scopes)
+
+ def test_scope_preservation(self):
+ scope = 'pics+http%3A%2f%2fa.b%2fvideos'
+ decoded_scope = 'pics http://a.b/videos'
+ auth_uri = 'http://example.com/path?client_id=abc&response_type='
+ token_uri = 'http://example.com/path'
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + 'code', scopes=decoded_scope.split(' '))
+ self.validator.validate_code.side_effect = self.set_scopes(decoded_scope.split(' '))
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ code = get_query_credentials(h['Location'])['code'][0]
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ # implicit grant
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + 'token', scopes=decoded_scope.split(' '))
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope)
+
+ # resource owner password credentials grant
+ body = 'grant_type=password&username=abc&password=secret&scope=%s'
+
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ # client credentials grant
+ body = 'grant_type=client_credentials&scope=%s'
+ self.validator.authenticate_client.side_effect = self.set_user
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ def test_scope_changed(self):
+ scope = 'pics+http%3A%2f%2fa.b%2fvideos'
+ scopes = ['images', 'http://a.b/videos']
+ decoded_scope = 'images http://a.b/videos'
+ auth_uri = 'http://example.com/path?client_id=abc&response_type='
+ token_uri = 'http://example.com/path'
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + 'code', scopes=scopes)
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ code = get_query_credentials(h['Location'])['code'][0]
+ self.validator.validate_code.side_effect = self.set_scopes(scopes)
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ # implicit grant
+ self.validator.validate_scopes.side_effect = self.set_scopes(scopes)
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + 'token', scopes=scopes)
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope)
+
+ # resource owner password credentials grant
+ self.validator.validate_scopes.side_effect = self.set_scopes(scopes)
+ body = 'grant_type=password&username=abc&password=secret&scope=%s'
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ # client credentials grant
+ self.validator.validate_scopes.side_effect = self.set_scopes(scopes)
+ self.validator.authenticate_client.side_effect = self.set_user
+ body = 'grant_type=client_credentials&scope=%s'
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body=body % scope)
+
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ def test_invalid_scope(self):
+ scope = 'pics+http%3A%2f%2fa.b%2fvideos'
+ auth_uri = 'http://example.com/path?client_id=abc&response_type='
+ token_uri = 'http://example.com/path'
+
+ self.validator.validate_scopes.return_value = False
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + 'code', scopes=['invalid'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ error = get_query_credentials(h['Location'])['error'][0]
+ self.assertEqual(error, 'invalid_scope')
+
+ # implicit grant
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + 'token', scopes=['invalid'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ error = get_fragment_credentials(h['Location'])['error'][0]
+ self.assertEqual(error, 'invalid_scope')
+
+ # resource owner password credentials grant
+ body = 'grant_type=password&username=abc&password=secret&scope=%s'
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['error'], 'invalid_scope')
+
+ # client credentials grant
+ self.validator.authenticate_client.side_effect = self.set_user
+ body = 'grant_type=client_credentials&scope=%s'
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['error'], 'invalid_scope')
diff --git a/tests/oauth2/rfc6749/endpoints/test_utils.py b/tests/oauth2/rfc6749/endpoints/test_utils.py
new file mode 100644
index 0000000..6b7cff8
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_utils.py
@@ -0,0 +1,14 @@
+try:
+ import urlparse
+except ImportError:
+ import urllib.parse as urlparse
+
+
+def get_query_credentials(uri):
+ return urlparse.parse_qs(urlparse.urlparse(uri).query,
+ keep_blank_values=True)
+
+
+def get_fragment_credentials(uri):
+ return urlparse.parse_qs(urlparse.urlparse(uri).fragment,
+ keep_blank_values=True)
diff --git a/tests/oauth2/rfc6749/grant_types/__init__.py b/tests/oauth2/rfc6749/grant_types/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/oauth2/rfc6749/grant_types/__init__.py
diff --git a/tests/oauth2/rfc6749/grant_types/test_authorization_code.py b/tests/oauth2/rfc6749/grant_types/test_authorization_code.py
new file mode 100644
index 0000000..a9c3e51
--- /dev/null
+++ b/tests/oauth2/rfc6749/grant_types/test_authorization_code.py
@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+from ....unittest import TestCase
+
+import json
+import mock
+from oauthlib.common import Request
+from oauthlib.oauth2.rfc6749.errors import UnsupportedGrantTypeError
+from oauthlib.oauth2.rfc6749.errors import InvalidRequestError
+from oauthlib.oauth2.rfc6749.errors import InvalidClientError
+from oauthlib.oauth2.rfc6749.errors import InvalidGrantError
+from oauthlib.oauth2.rfc6749.grant_types import AuthorizationCodeGrant
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+
+class AuthorizationCodeGrantTest(TestCase):
+
+ def setUp(self):
+ self.request = Request('http://a.b/path')
+ self.request.scopes = ('hello', 'world')
+ self.request.expires_in = 1800
+ self.request.client = 'batman'
+ self.request.client_id = 'abcdef'
+ self.request.code = '1234'
+ self.request.response_type = 'code'
+ self.request.grant_type = 'authorization_code'
+
+ self.request_state = Request('http://a.b/path')
+ self.request_state.state = 'abc'
+
+ self.mock_validator = mock.MagicMock()
+ self.mock_validator.authenticate_client.side_effect = self.set_client
+ self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator)
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def test_create_authorization_grant(self):
+ grant = self.auth.create_authorization_code(self.request)
+ self.assertIn('code', grant)
+
+ grant = self.auth.create_authorization_code(self.request_state)
+ self.assertIn('code', grant)
+ self.assertIn('state', grant)
+
+ def test_create_token_response(self):
+ bearer = BearerToken(self.mock_validator)
+ h, token, s = self.auth.create_token_response(self.request, bearer)
+ token = json.loads(token)
+ self.assertIn('access_token', token)
+ self.assertIn('refresh_token', token)
+ self.assertIn('expires_in', token)
+ self.assertIn('scope', token)
+
+ def test_validate_token_request(self):
+ mock_validator = mock.MagicMock()
+ auth = AuthorizationCodeGrant(request_validator=mock_validator)
+ request = Request('http://a.b/path')
+ self.assertRaises(UnsupportedGrantTypeError,
+ auth.validate_token_request, request)
+
+ request.grant_type = 'authorization_code'
+ self.assertRaises(InvalidRequestError,
+ auth.validate_token_request, request)
+
+ mock_validator.authenticate_client.return_value = False
+ mock_validator.authenticate_client_id.return_value = False
+ request.code = 'waffles'
+ self.assertRaises(InvalidClientError,
+ auth.validate_token_request, request)
+
+ request.client = 'batman'
+ mock_validator.authenticate_client = self.set_client
+ mock_validator.validate_code.return_value = False
+ self.assertRaises(InvalidGrantError,
+ auth.validate_token_request, request)
diff --git a/tests/oauth2/rfc6749/grant_types/test_client_credentials.py b/tests/oauth2/rfc6749/grant_types/test_client_credentials.py
new file mode 100644
index 0000000..80b92d3
--- /dev/null
+++ b/tests/oauth2/rfc6749/grant_types/test_client_credentials.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+from ....unittest import TestCase
+
+import json
+import mock
+from oauthlib.common import Request
+from oauthlib.oauth2.rfc6749.grant_types import ClientCredentialsGrant
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+
+class ClientCredentialsGrantTest(TestCase):
+
+ def setUp(self):
+ mock_client = mock.MagicMock()
+ mock_client.user.return_value = 'mocked user'
+ self.request = Request('http://a.b/path')
+ self.request.grant_type = 'client_credentials'
+ self.request.client = mock_client
+ self.request.scopes = ('mocked', 'scopes')
+ self.mock_validator = mock.MagicMock()
+ self.auth = ClientCredentialsGrant(
+ request_validator=self.mock_validator)
+
+ def test_create_token_response(self):
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertIn('access_token', token)
+ self.assertIn('token_type', token)
+ self.assertIn('expires_in', token)
+
+ def test_error_response(self):
+ pass
+
+ def test_validate_token_response(self):
+ # wrong grant type, scope
+ pass
diff --git a/tests/oauth2/rfc6749/grant_types/test_implicit.py b/tests/oauth2/rfc6749/grant_types/test_implicit.py
new file mode 100644
index 0000000..df30c9a
--- /dev/null
+++ b/tests/oauth2/rfc6749/grant_types/test_implicit.py
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+from ....unittest import TestCase
+
+import mock
+from oauthlib import common
+from oauthlib.common import Request
+from oauthlib.oauth2.rfc6749.grant_types import ImplicitGrant
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+
+class ImplicitGrantTest(TestCase):
+
+ def setUp(self):
+ mock_client = mock.MagicMock()
+ mock_client.user.return_value = 'mocked user'
+ self.request = Request('http://a.b/path')
+ self.request.scopes = ('hello', 'world')
+ self.request.client = mock_client
+ self.request.client_id = 'abcdef'
+ self.request.response_type = 'token'
+ self.request.state = 'xyz'
+ self.request.redirect_uri = 'https://b.c/p'
+
+ self.mock_validator = mock.MagicMock()
+ self.auth = ImplicitGrant(request_validator=self.mock_validator)
+
+ def test_create_token_response(self):
+ bearer = BearerToken(self.mock_validator, expires_in=1800)
+ orig_generate_token = common.generate_token
+ self.addCleanup(setattr, common, 'generate_token', orig_generate_token)
+ common.generate_token = lambda *args, **kwargs: '1234'
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ correct_uri = 'https://b.c/p#access_token=1234&token_type=Bearer&expires_in=1800&state=xyz&scope=hello+world'
+ self.assertEqual(status_code, 302)
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], correct_uri, parse_fragment=True)
+
+ def test_error_response(self):
+ pass
diff --git a/tests/oauth2/rfc6749/grant_types/test_refresh_token.py b/tests/oauth2/rfc6749/grant_types/test_refresh_token.py
new file mode 100644
index 0000000..25c261c
--- /dev/null
+++ b/tests/oauth2/rfc6749/grant_types/test_refresh_token.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+from ....unittest import TestCase
+
+import json
+import mock
+from oauthlib.common import Request
+from oauthlib.oauth2.rfc6749.grant_types import RefreshTokenGrant
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+
+class RefreshTokenGrantTest(TestCase):
+
+ def setUp(self):
+ mock_client = mock.MagicMock()
+ mock_client.user.return_value = 'mocked user'
+ self.request = Request('http://a.b/path')
+ self.request.grant_type = 'refresh_token'
+ self.request.refresh_token = 'lsdkfhj230'
+ self.request.client = mock_client
+ self.request.scope = 'foo'
+ self.mock_validator = mock.MagicMock()
+ self.auth = RefreshTokenGrant(
+ request_validator=self.mock_validator)
+
+ def test_create_token_response(self):
+ self.mock_validator.get_original_scopes.return_value = ['foo', 'bar']
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertIn('access_token', token)
+ self.assertIn('token_type', token)
+ self.assertIn('expires_in', token)
+ self.assertEqual(token['scope'], 'foo')
+
+ def test_create_token_inherit_scope(self):
+ self.request.scope = None
+ self.mock_validator.get_original_scopes.return_value = ['foo', 'bar']
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertIn('access_token', token)
+ self.assertIn('token_type', token)
+ self.assertIn('expires_in', token)
+ self.assertEqual(token['scope'], 'foo bar')
+
+ def test_invalid_scope(self):
+ self.mock_validator.get_original_scopes.return_value = ['baz']
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertEqual(token['error'], 'invalid_scope')
+ self.assertEqual(status_code, 401)
+
+ def test_invalid_token(self):
+ self.mock_validator.validate_refresh_token.return_value = False
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertEqual(token['error'], 'invalid_grant')
+ self.assertEqual(status_code, 400)
+
+ def test_invalid_client(self):
+ self.mock_validator.authenticate_client.return_value = False
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertEqual(token['error'], 'invalid_client')
+ self.assertEqual(status_code, 401)
diff --git a/tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py b/tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py
new file mode 100644
index 0000000..aaea440
--- /dev/null
+++ b/tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+from ....unittest import TestCase
+
+import json
+import mock
+from oauthlib.common import Request
+from oauthlib.oauth2.rfc6749.grant_types import ResourceOwnerPasswordCredentialsGrant
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+
+class ResourceOwnerPasswordCredentialsGrantTest(TestCase):
+
+ def setUp(self):
+ mock_client = mock.MagicMock()
+ mock_client.user.return_value = 'mocked user'
+ self.request = Request('http://a.b/path')
+ self.request.grant_type = 'password'
+ self.request.username = 'john'
+ self.request.password = 'doe'
+ self.request.client = mock_client
+ self.request.scopes = ('mocked', 'scopes')
+ self.mock_validator = mock.MagicMock()
+ self.auth = ResourceOwnerPasswordCredentialsGrant(
+ request_validator=self.mock_validator)
+
+ def test_create_token_response(self):
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertIn('access_token', token)
+ self.assertIn('token_type', token)
+ self.assertIn('expires_in', token)
+ self.assertIn('refresh_token', token)
+
+ def test_error_response(self):
+ pass
+
+ def test_scopes(self):
+ pass
diff --git a/tests/oauth2/rfc6749/test_grant_types.py b/tests/oauth2/rfc6749/test_grant_types.py
deleted file mode 100644
index 6aee0e8..0000000
--- a/tests/oauth2/rfc6749/test_grant_types.py
+++ /dev/null
@@ -1,260 +0,0 @@
-# -*- coding: utf-8 -*-
-from __future__ import absolute_import, unicode_literals
-from ...unittest import TestCase
-
-import json
-import mock
-from oauthlib import common
-from oauthlib.common import Request
-from oauthlib.oauth2.rfc6749.errors import UnsupportedGrantTypeError
-from oauthlib.oauth2.rfc6749.errors import InvalidRequestError
-from oauthlib.oauth2.rfc6749.errors import InvalidClientError
-from oauthlib.oauth2.rfc6749.errors import InvalidGrantError
-from oauthlib.oauth2.rfc6749.grant_types import AuthorizationCodeGrant
-from oauthlib.oauth2.rfc6749.grant_types import ImplicitGrant
-from oauthlib.oauth2.rfc6749.grant_types import ResourceOwnerPasswordCredentialsGrant
-from oauthlib.oauth2.rfc6749.grant_types import ClientCredentialsGrant
-from oauthlib.oauth2.rfc6749.grant_types import RefreshTokenGrant
-from oauthlib.oauth2.rfc6749.tokens import BearerToken
-
-
-class RequestValidatorTest(TestCase):
-
- def test_client_id(self):
- pass
-
- def test_client(self):
- pass
-
- def test_response_type(self):
- pass
-
- def test_scopes(self):
- pass
-
- def test_redirect_uri(self):
- pass
-
-
-class AuthorizationCodeGrantTest(TestCase):
-
- def setUp(self):
- self.request = Request('http://a.b/path')
- self.request.scopes = ('hello', 'world')
- self.request.expires_in = 1800
- self.request.client = 'batman'
- self.request.client_id = 'abcdef'
- self.request.code = '1234'
- self.request.response_type = 'code'
- self.request.grant_type = 'authorization_code'
-
- self.request_state = Request('http://a.b/path')
- self.request_state.state = 'abc'
-
- self.mock_validator = mock.MagicMock()
- self.mock_validator.authenticate_client.side_effect = self.set_client
- self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator)
-
- def set_client(self, request):
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked'
- return True
-
- def test_create_authorization_grant(self):
- grant = self.auth.create_authorization_code(self.request)
- self.assertIn('code', grant)
-
- grant = self.auth.create_authorization_code(self.request_state)
- self.assertIn('code', grant)
- self.assertIn('state', grant)
-
- def test_create_token_response(self):
- bearer = BearerToken(self.mock_validator)
- h, token, s = self.auth.create_token_response(self.request, bearer)
- token = json.loads(token)
- self.assertIn('access_token', token)
- self.assertIn('refresh_token', token)
- self.assertIn('expires_in', token)
- self.assertIn('scope', token)
-
- def test_validate_token_request(self):
- mock_validator = mock.MagicMock()
- auth = AuthorizationCodeGrant(request_validator=mock_validator)
- request = Request('http://a.b/path')
- self.assertRaises(UnsupportedGrantTypeError,
- auth.validate_token_request, request)
-
- request.grant_type = 'authorization_code'
- self.assertRaises(InvalidRequestError,
- auth.validate_token_request, request)
-
- mock_validator.authenticate_client.return_value = False
- mock_validator.authenticate_client_id.return_value = False
- request.code = 'waffles'
- self.assertRaises(InvalidClientError,
- auth.validate_token_request, request)
-
- request.client = 'batman'
- mock_validator.authenticate_client = self.set_client
- mock_validator.validate_code.return_value = False
- self.assertRaises(InvalidGrantError,
- auth.validate_token_request, request)
-
-
-class ImplicitGrantTest(TestCase):
-
- def setUp(self):
- mock_client = mock.MagicMock()
- mock_client.user.return_value = 'mocked user'
- self.request = Request('http://a.b/path')
- self.request.scopes = ('hello', 'world')
- self.request.client = mock_client
- self.request.client_id = 'abcdef'
- self.request.response_type = 'token'
- self.request.state = 'xyz'
- self.request.redirect_uri = 'https://b.c/p'
-
- self.mock_validator = mock.MagicMock()
- self.auth = ImplicitGrant(request_validator=self.mock_validator)
-
- def test_create_token_response(self):
- bearer = BearerToken(self.mock_validator, expires_in=1800)
- orig_generate_token = common.generate_token
- self.addCleanup(setattr, common, 'generate_token', orig_generate_token)
- common.generate_token = lambda *args, **kwargs: '1234'
- headers, body, status_code = self.auth.create_token_response(
- self.request, bearer)
- correct_uri = 'https://b.c/p#access_token=1234&token_type=Bearer&expires_in=1800&state=xyz&scope=hello+world'
- self.assertEqual(status_code, 302)
- self.assertIn('Location', headers)
- self.assertURLEqual(headers['Location'], correct_uri, parse_fragment=True)
-
- def test_error_response(self):
- pass
-
-
-class ResourceOwnerPasswordCredentialsGrantTest(TestCase):
-
- def setUp(self):
- mock_client = mock.MagicMock()
- mock_client.user.return_value = 'mocked user'
- self.request = Request('http://a.b/path')
- self.request.grant_type = 'password'
- self.request.username = 'john'
- self.request.password = 'doe'
- self.request.client = mock_client
- self.request.scopes = ('mocked', 'scopes')
- self.mock_validator = mock.MagicMock()
- self.auth = ResourceOwnerPasswordCredentialsGrant(
- request_validator=self.mock_validator)
-
- def test_create_token_response(self):
- bearer = BearerToken(self.mock_validator)
- headers, body, status_code = self.auth.create_token_response(
- self.request, bearer)
- token = json.loads(body)
- self.assertIn('access_token', token)
- self.assertIn('token_type', token)
- self.assertIn('expires_in', token)
- self.assertIn('refresh_token', token)
-
- def test_error_response(self):
- pass
-
- def test_scopes(self):
- pass
-
-
-class ClientCredentialsGrantTest(TestCase):
-
- def setUp(self):
- mock_client = mock.MagicMock()
- mock_client.user.return_value = 'mocked user'
- self.request = Request('http://a.b/path')
- self.request.grant_type = 'client_credentials'
- self.request.client = mock_client
- self.request.scopes = ('mocked', 'scopes')
- self.mock_validator = mock.MagicMock()
- self.auth = ClientCredentialsGrant(
- request_validator=self.mock_validator)
-
- def test_create_token_response(self):
- bearer = BearerToken(self.mock_validator)
- headers, body, status_code = self.auth.create_token_response(
- self.request, bearer)
- token = json.loads(body)
- self.assertIn('access_token', token)
- self.assertIn('token_type', token)
- self.assertIn('expires_in', token)
-
- def test_error_response(self):
- pass
-
- def test_validate_token_response(self):
- # wrong grant type, scope
- pass
-
-
-class RefreshTokenGrantTest(TestCase):
-
- def setUp(self):
- mock_client = mock.MagicMock()
- mock_client.user.return_value = 'mocked user'
- self.request = Request('http://a.b/path')
- self.request.grant_type = 'refresh_token'
- self.request.refresh_token = 'lsdkfhj230'
- self.request.client = mock_client
- self.request.scope = 'foo'
- self.mock_validator = mock.MagicMock()
- self.auth = RefreshTokenGrant(
- request_validator=self.mock_validator)
-
- def test_create_token_response(self):
- self.mock_validator.get_original_scopes.return_value = ['foo', 'bar']
- bearer = BearerToken(self.mock_validator)
- headers, body, status_code = self.auth.create_token_response(
- self.request, bearer)
- token = json.loads(body)
- self.assertIn('access_token', token)
- self.assertIn('token_type', token)
- self.assertIn('expires_in', token)
- self.assertEqual(token['scope'], 'foo')
-
- def test_create_token_inherit_scope(self):
- self.request.scope = None
- self.mock_validator.get_original_scopes.return_value = ['foo', 'bar']
- bearer = BearerToken(self.mock_validator)
- headers, body, status_code = self.auth.create_token_response(
- self.request, bearer)
- token = json.loads(body)
- self.assertIn('access_token', token)
- self.assertIn('token_type', token)
- self.assertIn('expires_in', token)
- self.assertEqual(token['scope'], 'foo bar')
-
- def test_invalid_scope(self):
- self.mock_validator.get_original_scopes.return_value = ['baz']
- bearer = BearerToken(self.mock_validator)
- headers, body, status_code = self.auth.create_token_response(
- self.request, bearer)
- token = json.loads(body)
- self.assertEqual(token['error'], 'invalid_scope')
- self.assertEqual(status_code, 401)
-
- def test_invalid_token(self):
- self.mock_validator.validate_refresh_token.return_value = False
- bearer = BearerToken(self.mock_validator)
- headers, body, status_code = self.auth.create_token_response(
- self.request, bearer)
- token = json.loads(body)
- self.assertEqual(token['error'], 'invalid_grant')
- self.assertEqual(status_code, 400)
-
- def test_invalid_client(self):
- self.mock_validator.authenticate_client.return_value = False
- bearer = BearerToken(self.mock_validator)
- headers, body, status_code = self.auth.create_token_response(
- self.request, bearer)
- token = json.loads(body)
- self.assertEqual(token['error'], 'invalid_client')
- self.assertEqual(status_code, 401)
diff --git a/tests/oauth2/rfc6749/test_request_validator.py b/tests/oauth2/rfc6749/test_request_validator.py
new file mode 100644
index 0000000..1adbe70
--- /dev/null
+++ b/tests/oauth2/rfc6749/test_request_validator.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+from ...unittest import TestCase
+
+from oauthlib.oauth2 import RequestValidator
+
+class RequestValidatorTest(TestCase):
+
+ def test_method_contracts(self):
+ v = RequestValidator()
+ self.assertRaises(NotImplementedError, v.authenticate_client, 'r')
+ self.assertRaises(NotImplementedError, v.authenticate_client_id,
+ 'client_id', 'r')
+ self.assertRaises(NotImplementedError, v.confirm_redirect_uri,
+ 'client_id', 'code', 'redirect_uri', 'client', 'request')
+ self.assertRaises(NotImplementedError, v.get_default_redirect_uri,
+ 'client_id', 'request')
+ self.assertRaises(NotImplementedError, v.get_default_scopes,
+ 'client_id', 'request')
+ self.assertRaises(NotImplementedError, v.get_original_scopes,
+ 'refresh_token', 'request')
+ self.assertRaises(NotImplementedError, v.invalidate_authorization_code,
+ 'client_id', 'code', 'request')
+ self.assertRaises(NotImplementedError, v.save_authorization_code,
+ 'client_id', 'code', 'request')
+ self.assertRaises(NotImplementedError, v.save_bearer_token,
+ 'token', 'request')
+ self.assertRaises(NotImplementedError, v.validate_bearer_token,
+ 'token', 'scopes', 'request')
+ self.assertRaises(NotImplementedError, v.validate_client_id,
+ 'client_id', 'request')
+ self.assertRaises(NotImplementedError, v.validate_code,
+ 'client_id', 'code', 'client', 'request')
+ self.assertRaises(NotImplementedError, v.validate_grant_type,
+ 'client_id', 'grant_type', 'client', 'request')
+ self.assertRaises(NotImplementedError, v.validate_redirect_uri,
+ 'client_id', 'redirect_uri', 'request')
+ self.assertRaises(NotImplementedError, v.validate_refresh_token,
+ 'refresh_token', 'client', 'request')
+ self.assertRaises(NotImplementedError, v.validate_response_type,
+ 'client_id', 'response_type', 'client', 'request')
+ self.assertRaises(NotImplementedError, v.validate_scopes,
+ 'client_id', 'scopes', 'client', 'request')
+ self.assertRaises(NotImplementedError, v.validate_user,
+ 'username', 'password', 'client', 'request')
diff --git a/tests/oauth2/rfc6749/test_servers.py b/tests/oauth2/rfc6749/test_servers.py
deleted file mode 100644
index 1a26d7c..0000000
--- a/tests/oauth2/rfc6749/test_servers.py
+++ /dev/null
@@ -1,890 +0,0 @@
-
-"""Test for the various oauth 2 provider endpoints.
-
-The tests focus on shared functionality between the different endpoints to
-ensure consistency in both behaviour and provided interfaces.
-"""
-from __future__ import absolute_import, unicode_literals
-import json
-import mock
-try:
- import urlparse
-except ImportError:
- import urllib.parse as urlparse
-from ...unittest import TestCase
-
-from oauthlib.oauth2 import RequestValidator
-from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer
-from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer
-from oauthlib.oauth2.rfc6749 import errors
-
-
-def get_query_credentials(uri):
- return urlparse.parse_qs(urlparse.urlparse(uri).query,
- keep_blank_values=True)
-
-
-def get_fragment_credentials(uri):
- return urlparse.parse_qs(urlparse.urlparse(uri).fragment,
- keep_blank_values=True)
-
-
-class TestScopeHandling(TestCase):
-
- DEFAULT_REDIRECT_URI = 'http://i.b./path'
-
- def set_scopes(self, scopes):
- def set_request_scopes(client_id, code, client, request):
- request.scopes = scopes
- return True
- return set_request_scopes
-
- def set_user(self, request):
- request.user = 'foo'
- request.client_id = 'bar'
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked'
- return True
-
- def set_client(self, request):
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked'
- return True
-
- def setUp(self):
- self.validator = mock.MagicMock(spec=RequestValidator)
- self.validator.get_default_redirect_uri.return_value = TestScopeHandling.DEFAULT_REDIRECT_URI
- self.validator.authenticate_client.side_effect = self.set_client
- self.web = WebApplicationServer(self.validator)
- self.mobile = MobileApplicationServer(self.validator)
- self.legacy = LegacyApplicationServer(self.validator)
- self.backend = BackendApplicationServer(self.validator)
-
- def test_scope_extraction(self):
- scopes = (
- ('images', ['images']),
- ('images+videos', ['images', 'videos']),
- ('http%3A%2f%2fa.b%2fvideos', ['http://a.b/videos']),
- ('http%3A%2f%2fa.b%2fvideos+pics', ['http://a.b/videos', 'pics']),
- ('pics+http%3A%2f%2fa.b%2fvideos', ['pics', 'http://a.b/videos']),
- ('http%3A%2f%2fa.b%2fvideos+https%3A%2f%2fc.d%2Fsecret', ['http://a.b/videos', 'https://c.d/secret']),
- )
-
- uri = 'http://example.com/path?client_id=abc&scope=%s&response_type=%s'
- for scope, correct_scopes in scopes:
- scopes, _ = self.web.validate_authorization_request(
- uri % (scope, 'code'))
- self.assertItemsEqual(scopes, correct_scopes)
- scopes, _ = self.mobile.validate_authorization_request(
- uri % (scope, 'token'))
- self.assertItemsEqual(scopes, correct_scopes)
-
- def test_scope_preservation(self):
- scope = 'pics+http%3A%2f%2fa.b%2fvideos'
- decoded_scope = 'pics http://a.b/videos'
- auth_uri = 'http://example.com/path?client_id=abc&response_type='
- token_uri = 'http://example.com/path'
-
- # authorization grant
- h, _, s = self.web.create_authorization_response(
- auth_uri + 'code', scopes=decoded_scope.split(' '))
- self.validator.validate_code.side_effect = self.set_scopes(decoded_scope.split(' '))
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- code = get_query_credentials(h['Location'])['code'][0]
- _, body, _ = self.web.create_token_response(token_uri,
- body='grant_type=authorization_code&code=%s' % code)
- self.assertEqual(json.loads(body)['scope'], decoded_scope)
-
- # implicit grant
- h, _, s = self.mobile.create_authorization_response(
- auth_uri + 'token', scopes=decoded_scope.split(' '))
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope)
-
- # resource owner password credentials grant
- body = 'grant_type=password&username=abc&password=secret&scope=%s'
-
- _, body, _ = self.legacy.create_token_response(token_uri,
- body=body % scope)
- self.assertEqual(json.loads(body)['scope'], decoded_scope)
-
- # client credentials grant
- body = 'grant_type=client_credentials&scope=%s'
- self.validator.authenticate_client.side_effect = self.set_user
- _, body, _ = self.backend.create_token_response(token_uri,
- body=body % scope)
- self.assertEqual(json.loads(body)['scope'], decoded_scope)
-
- def test_scope_changed(self):
- scope = 'pics+http%3A%2f%2fa.b%2fvideos'
- scopes = ['images', 'http://a.b/videos']
- decoded_scope = 'images http://a.b/videos'
- auth_uri = 'http://example.com/path?client_id=abc&response_type='
- token_uri = 'http://example.com/path'
-
- # authorization grant
- h, _, s = self.web.create_authorization_response(
- auth_uri + 'code', scopes=scopes)
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- code = get_query_credentials(h['Location'])['code'][0]
- self.validator.validate_code.side_effect = self.set_scopes(scopes)
- _, body, _ = self.web.create_token_response(token_uri,
- body='grant_type=authorization_code&code=%s' % code)
- self.assertEqual(json.loads(body)['scope'], decoded_scope)
-
- # implicit grant
- self.validator.validate_scopes.side_effect = self.set_scopes(scopes)
- h, _, s = self.mobile.create_authorization_response(
- auth_uri + 'token', scopes=scopes)
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope)
-
- # resource owner password credentials grant
- self.validator.validate_scopes.side_effect = self.set_scopes(scopes)
- body = 'grant_type=password&username=abc&password=secret&scope=%s'
- _, body, _ = self.legacy.create_token_response(token_uri,
- body=body % scope)
- self.assertEqual(json.loads(body)['scope'], decoded_scope)
-
- # client credentials grant
- self.validator.validate_scopes.side_effect = self.set_scopes(scopes)
- self.validator.authenticate_client.side_effect = self.set_user
- body = 'grant_type=client_credentials&scope=%s'
- _, body, _ = self.backend.create_token_response(token_uri,
- body=body % scope)
-
- self.assertEqual(json.loads(body)['scope'], decoded_scope)
-
- def test_invalid_scope(self):
- scope = 'pics+http%3A%2f%2fa.b%2fvideos'
- auth_uri = 'http://example.com/path?client_id=abc&response_type='
- token_uri = 'http://example.com/path'
-
- self.validator.validate_scopes.return_value = False
-
- # authorization grant
- h, _, s = self.web.create_authorization_response(
- auth_uri + 'code', scopes=['invalid'])
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- error = get_query_credentials(h['Location'])['error'][0]
- self.assertEqual(error, 'invalid_scope')
-
- # implicit grant
- h, _, s = self.mobile.create_authorization_response(
- auth_uri + 'token', scopes=['invalid'])
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- error = get_fragment_credentials(h['Location'])['error'][0]
- self.assertEqual(error, 'invalid_scope')
-
- # resource owner password credentials grant
- body = 'grant_type=password&username=abc&password=secret&scope=%s'
- _, body, _ = self.legacy.create_token_response(token_uri,
- body=body % scope)
- self.assertEqual(json.loads(body)['error'], 'invalid_scope')
-
- # client credentials grant
- self.validator.authenticate_client.side_effect = self.set_user
- body = 'grant_type=client_credentials&scope=%s'
- _, body, _ = self.backend.create_token_response(token_uri,
- body=body % scope)
- self.assertEqual(json.loads(body)['error'], 'invalid_scope')
-
-
-class PreservationTest(TestCase):
-
- def setUp(self):
- self.validator = mock.MagicMock(spec=RequestValidator)
- self.validator.get_default_redirect_uri.return_value = TestScopeHandling.DEFAULT_REDIRECT_URI
- self.validator.authenticate_client.side_effect = self.set_client
- self.web = WebApplicationServer(self.validator)
- self.mobile = MobileApplicationServer(self.validator)
-
- def set_state(self, state):
- def set_request_state(client_id, code, client, request):
- request.state = state
- return True
- return set_request_state
-
- def set_client(self, request):
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked'
- return True
-
- def test_state_preservation(self):
- auth_uri = 'http://example.com/path?state=xyz&client_id=abc&response_type='
- token_uri = 'http://example.com/path'
-
- # authorization grant
- h, _, s = self.web.create_authorization_response(
- auth_uri + 'code', scopes=['random'])
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- code = get_query_credentials(h['Location'])['code'][0]
- self.validator.validate_code.side_effect = self.set_state('xyz')
- _, body, _ = self.web.create_token_response(token_uri,
- body='grant_type=authorization_code&code=%s' % code)
- self.assertEqual(json.loads(body)['state'], 'xyz')
-
- # implicit grant
- h, _, s = self.mobile.create_authorization_response(
- auth_uri + 'token', scopes=['random'])
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- self.assertEqual(get_fragment_credentials(h['Location'])['state'][0], 'xyz')
-
- def test_redirect_uri_preservation(self):
- auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc'
- redirect_uri = 'http://i.b/path'
- token_uri = 'http://example.com/path'
-
- # authorization grant
- h, _, s = self.web.create_authorization_response(
- auth_uri + '&response_type=code', scopes=['random'])
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- self.assertTrue(h['Location'].startswith(redirect_uri))
-
- # confirm_redirect_uri should return false if the redirect uri
- # was given in the authorization but not in the token request.
- self.validator.confirm_redirect_uri.return_value = False
- code = get_query_credentials(h['Location'])['code'][0]
- _, body, _ = self.web.create_token_response(token_uri,
- body='grant_type=authorization_code&code=%s' % code)
- self.assertEqual(json.loads(body)['error'], 'access_denied')
-
- # implicit grant
- h, _, s = self.mobile.create_authorization_response(
- auth_uri + '&response_type=token', scopes=['random'])
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- self.assertTrue(h['Location'].startswith(redirect_uri))
-
- def test_invalid_redirect_uri(self):
- auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc'
- self.validator.validate_redirect_uri.return_value = False
-
- # authorization grant
- self.assertRaises(errors.MismatchingRedirectURIError,
- self.web.create_authorization_response,
- auth_uri + '&response_type=code', scopes=['random'])
-
- # implicit grant
- self.assertRaises(errors.MismatchingRedirectURIError,
- self.mobile.create_authorization_response,
- auth_uri + '&response_type=token', scopes=['random'])
-
- def test_default_uri(self):
- auth_uri = 'http://example.com/path?state=xyz&client_id=abc'
-
- self.validator.get_default_redirect_uri.return_value = None
-
- # authorization grant
- self.assertRaises(errors.MissingRedirectURIError,
- self.web.create_authorization_response,
- auth_uri + '&response_type=code', scopes=['random'])
-
- # implicit grant
- self.assertRaises(errors.MissingRedirectURIError,
- self.mobile.create_authorization_response,
- auth_uri + '&response_type=token', scopes=['random'])
-
-
-class ClientAuthenticationTest(TestCase):
-
- def inspect_client(self, request, refresh_token=False):
- if not request.client or not request.client.client_id:
- raise ValueError()
- return 'abc'
-
- def setUp(self):
- self.validator = mock.MagicMock(spec=RequestValidator)
- self.validator.get_default_redirect_uri.return_value = 'http://i.b./path'
- self.web = WebApplicationServer(self.validator,
- token_generator=self.inspect_client)
- self.mobile = MobileApplicationServer(self.validator,
- token_generator=self.inspect_client)
- self.legacy = LegacyApplicationServer(self.validator,
- token_generator=self.inspect_client)
- self.backend = BackendApplicationServer(self.validator,
- token_generator=self.inspect_client)
-
- def set_client(self, request):
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked'
- return True
-
- def set_client_id(self, client_id, request):
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked'
- return True
-
- def set_username(self, username, password, client, request):
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked'
- return True
-
- def test_client_id_authentication(self):
- token_uri = 'http://example.com/path'
-
- # authorization code grant
- self.validator.authenticate_client.return_value = False
- self.validator.authenticate_client_id.return_value = False
- _, body, _ = self.web.create_token_response(token_uri,
- body='grant_type=authorization_code&code=mock')
- self.assertEqual(json.loads(body)['error'], 'invalid_client')
-
- self.validator.authenticate_client_id.return_value = True
- self.validator.authenticate_client.side_effect = self.set_client
- _, body, _ = self.web.create_token_response(token_uri,
- body='grant_type=authorization_code&code=mock')
- self.assertIn('access_token', json.loads(body))
-
- # implicit grant
- auth_uri = 'http://example.com/path?client_id=abc&response_type=token'
- self.assertRaises(ValueError, self.mobile.create_authorization_response,
- auth_uri, scopes=['random'])
-
- self.validator.validate_client_id.side_effect = self.set_client_id
- h, _, s = self.mobile.create_authorization_response(auth_uri, scopes=['random'])
- self.assertEqual(302, s)
- self.assertIn('Location', h)
- self.assertIn('access_token', get_fragment_credentials(h['Location']))
-
- def test_custom_authentication(self):
- token_uri = 'http://example.com/path'
-
- # authorization code grant
- self.assertRaises(NotImplementedError,
- self.web.create_token_response, token_uri,
- body='grant_type=authorization_code&code=mock')
-
- # password grant
- self.validator.authenticate_client.return_value = True
- self.assertRaises(NotImplementedError,
- self.legacy.create_token_response, token_uri,
- body='grant_type=password&username=abc&password=secret')
-
- # client credentials grant
- self.validator.authenticate_client.return_value = True
- self.assertRaises(NotImplementedError,
- self.backend.create_token_response, token_uri,
- body='grant_type=client_credentials')
-
-
-class ResourceOwnerAssociationTest(TestCase):
-
- auth_uri = 'http://example.com/path?client_id=abc'
- token_uri = 'http://example.com/path'
-
- def set_client(self, request):
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked'
- return True
-
- def set_user(self, client_id, code, client, request):
- request.user = 'test'
- return True
-
- def set_user_from_username(self, username, password, client, request):
- request.user = 'test'
- return True
-
- def set_user_from_credentials(self, request):
- request.user = 'test'
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked'
- return True
-
- def inspect_client(self, request, refresh_token=False):
- if not request.user:
- raise ValueError()
- return 'abc'
-
- def setUp(self):
- self.validator = mock.MagicMock(spec=RequestValidator)
- self.validator.get_default_redirect_uri.return_value = 'http://i.b./path'
- self.validator.authenticate_client.side_effect = self.set_client
- self.web = WebApplicationServer(self.validator,
- token_generator=self.inspect_client)
- self.mobile = MobileApplicationServer(self.validator,
- token_generator=self.inspect_client)
- self.legacy = LegacyApplicationServer(self.validator,
- token_generator=self.inspect_client)
- self.backend = BackendApplicationServer(self.validator,
- token_generator=self.inspect_client)
-
- def test_web_application(self):
- # TODO: code generator + intercept test
- h, _, s = self.web.create_authorization_response(
- self.auth_uri + '&response_type=code',
- credentials={'user': 'test'}, scopes=['random'])
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- code = get_query_credentials(h['Location'])['code'][0]
- self.assertRaises(ValueError,
- self.web.create_token_response, self.token_uri,
- body='grant_type=authorization_code&code=%s' % code)
-
- self.validator.validate_code.side_effect = self.set_user
- _, body, _ = self.web.create_token_response(self.token_uri,
- body='grant_type=authorization_code&code=%s' % code)
- self.assertEqual(json.loads(body)['access_token'], 'abc')
-
- def test_mobile_application(self):
- self.assertRaises(ValueError,
- self.mobile.create_authorization_response,
- self.auth_uri + '&response_type=token')
-
- h, _, s = self.mobile.create_authorization_response(
- self.auth_uri + '&response_type=token',
- credentials={'user': 'test'}, scopes=['random'])
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- self.assertEqual(get_fragment_credentials(h['Location'])['access_token'][0], 'abc')
-
- def test_legacy_application(self):
- body = 'grant_type=password&username=abc&password=secret'
- self.assertRaises(ValueError,
- self.legacy.create_token_response,
- self.token_uri, body=body)
-
- self.validator.validate_user.side_effect = self.set_user_from_username
- _, body, _ = self.legacy.create_token_response(
- self.token_uri, body=body)
- self.assertEqual(json.loads(body)['access_token'], 'abc')
-
- def test_backend_application(self):
- body = 'grant_type=client_credentials'
- self.assertRaises(ValueError,
- self.backend.create_token_response,
- self.token_uri, body=body)
-
- self.validator.authenticate_client.side_effect = self.set_user_from_credentials
- _, body, _ = self.backend.create_token_response(
- self.token_uri, body=body)
- self.assertEqual(json.loads(body)['access_token'], 'abc')
-
-
-class ErrorResponseTest(TestCase):
-
- def set_client(self, request):
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked'
- return True
-
- def setUp(self):
- self.validator = mock.MagicMock(spec=RequestValidator)
- self.validator.get_default_redirect_uri.return_value = None
- self.web = WebApplicationServer(self.validator)
- self.mobile = MobileApplicationServer(self.validator)
- self.legacy = LegacyApplicationServer(self.validator)
- self.backend = BackendApplicationServer(self.validator)
-
- def test_invalid_redirect_uri(self):
- uri = 'https://example.com/authorize?client_id=foo&redirect_uri=wrong'
- # Authorization code grant
- self.assertRaises(errors.InvalidRedirectURIError,
- self.web.validate_authorization_request, uri)
- self.assertRaises(errors.InvalidRedirectURIError,
- self.web.create_authorization_response, uri, scopes=['foo'])
-
- # Implicit grant
- self.assertRaises(errors.InvalidRedirectURIError,
- self.mobile.validate_authorization_request, uri)
- self.assertRaises(errors.InvalidRedirectURIError,
- self.mobile.create_authorization_response, uri, scopes=['foo'])
-
- def test_missing_redirect_uri(self):
- uri = 'https://example.com/authorize?client_id=foo'
- # Authorization code grant
- self.assertRaises(errors.MissingRedirectURIError,
- self.web.validate_authorization_request, uri)
- self.assertRaises(errors.MissingRedirectURIError,
- self.web.create_authorization_response, uri, scopes=['foo'])
-
- # Implicit grant
- self.assertRaises(errors.MissingRedirectURIError,
- self.mobile.validate_authorization_request, uri)
- self.assertRaises(errors.MissingRedirectURIError,
- self.mobile.create_authorization_response, uri, scopes=['foo'])
-
- def test_mismatching_redirect_uri(self):
- uri = 'https://example.com/authorize?client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback'
- # Authorization code grant
- self.validator.validate_redirect_uri.return_value = False
- self.assertRaises(errors.MismatchingRedirectURIError,
- self.web.validate_authorization_request, uri)
- self.assertRaises(errors.MismatchingRedirectURIError,
- self.web.create_authorization_response, uri, scopes=['foo'])
-
- # Implicit grant
- self.assertRaises(errors.MismatchingRedirectURIError,
- self.mobile.validate_authorization_request, uri)
- self.assertRaises(errors.MismatchingRedirectURIError,
- self.mobile.create_authorization_response, uri, scopes=['foo'])
-
- def test_missing_client_id(self):
- uri = 'https://example.com/authorize?redirect_uri=https%3A%2F%2Fi.b%2Fback'
- # Authorization code grant
- self.validator.validate_redirect_uri.return_value = False
- self.assertRaises(errors.MissingClientIdError,
- self.web.validate_authorization_request, uri)
- self.assertRaises(errors.MissingClientIdError,
- self.web.create_authorization_response, uri, scopes=['foo'])
-
- # Implicit grant
- self.assertRaises(errors.MissingClientIdError,
- self.mobile.validate_authorization_request, uri)
- self.assertRaises(errors.MissingClientIdError,
- self.mobile.create_authorization_response, uri, scopes=['foo'])
-
- def test_invalid_client_id(self):
- uri = 'https://example.com/authorize?client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback'
- # Authorization code grant
- self.validator.validate_client_id.return_value = False
- self.assertRaises(errors.InvalidClientIdError,
- self.web.validate_authorization_request, uri)
- self.assertRaises(errors.InvalidClientIdError,
- self.web.create_authorization_response, uri, scopes=['foo'])
-
- # Implicit grant
- self.assertRaises(errors.InvalidClientIdError,
- self.mobile.validate_authorization_request, uri)
- self.assertRaises(errors.InvalidClientIdError,
- self.mobile.create_authorization_response, uri, scopes=['foo'])
-
- def test_invalid_request(self):
- self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
- token_uri = 'https://i.b/token'
- invalid_uris = [
- # Duplicate parameters
- 'https://i.b/auth?client_id=foo&client_id=bar&response_type={0}',
- # Missing response type
- 'https://i.b/auth?client_id=foo',
- ]
-
- # Authorization code grant
- for uri in invalid_uris:
- self.assertRaises(errors.InvalidRequestError,
- self.web.validate_authorization_request,
- uri.format('code'))
- h, _, s = self.web.create_authorization_response(
- uri.format('code'), scopes=['foo'])
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- self.assertIn('error=invalid_request', h['Location'])
- invalid_bodies = [
- # duplicate params
- 'grant_type=authorization_code&client_id=nope&client_id=nope&code=foo'
- ]
- for body in invalid_bodies:
- _, body, _ = self.web.create_token_response(token_uri,
- body=body)
- self.assertEqual('invalid_request', json.loads(body)['error'])
-
- # Implicit grant
- for uri in invalid_uris:
- self.assertRaises(errors.InvalidRequestError,
- self.mobile.validate_authorization_request,
- uri.format('token'))
- h, _, s = self.mobile.create_authorization_response(
- uri.format('token'), scopes=['foo'])
- self.assertEqual(s, 302)
- self.assertIn('Location', h)
- self.assertIn('error=invalid_request', h['Location'])
-
- # Password credentials grant
- invalid_bodies = [
- # duplicate params
- 'grant_type=password&username=foo&username=bar&password=baz'
- # missing username
- 'grant_type=password&password=baz'
- # missing password
- 'grant_type=password&username=foo'
- ]
- self.validator.authenticate_client.side_effect = self.set_client
- for body in invalid_bodies:
- _, body, _ = self.legacy.create_token_response(token_uri,
- body=body)
- self.assertEqual('invalid_request', json.loads(body)['error'])
-
- # Client credentials grant
- invalid_bodies = [
- # duplicate params
- 'grant_type=client_credentials&scope=foo&scope=bar'
- ]
- for body in invalid_bodies:
- _, body, _ = self.backend.create_token_response(token_uri,
- body=body)
- self.assertEqual('invalid_request', json.loads(body)['error'])
-
- def test_unauthorized_client(self):
- self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
- self.validator.validate_grant_type.return_value = False
- self.validator.validate_response_type.return_value = False
- self.validator.authenticate_client.side_effect = self.set_client
- token_uri = 'https://i.b/token'
-
- # Authorization code grant
- self.assertRaises(errors.UnauthorizedClientError,
- self.web.validate_authorization_request,
- 'https://i.b/auth?response_type=code&client_id=foo')
- _, body, _ = self.web.create_token_response(token_uri,
- body='grant_type=authorization_code&code=foo')
- self.assertEqual('unauthorized_client', json.loads(body)['error'])
-
- # Implicit grant
- self.assertRaises(errors.UnauthorizedClientError,
- self.mobile.validate_authorization_request,
- 'https://i.b/auth?response_type=token&client_id=foo')
-
- # Password credentials grant
- _, body, _ = self.legacy.create_token_response(token_uri,
- body='grant_type=password&username=foo&password=bar')
- self.assertEqual('unauthorized_client', json.loads(body)['error'])
-
- # Client credentials grant
- _, body, _ = self.backend.create_token_response(token_uri,
- body='grant_type=client_credentials')
- self.assertEqual('unauthorized_client', json.loads(body)['error'])
-
- def test_access_denied(self):
- self.validator.authenticate_client.side_effect = self.set_client
- self.validator.confirm_redirect_uri.return_value = False
- token_uri = 'https://i.b/token'
- # Authorization code grant
- _, body, _ = self.web.create_token_response(token_uri,
- body='grant_type=authorization_code&code=foo')
- self.assertEqual('access_denied', json.loads(body)['error'])
-
- def test_unsupported_response_type(self):
- self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
-
- # Authorization code grant
- self.assertRaises(errors.UnsupportedResponseTypeError,
- self.web.validate_authorization_request,
- 'https://i.b/auth?response_type=foo&client_id=foo')
-
- # Implicit grant
- self.assertRaises(errors.UnsupportedResponseTypeError,
- self.mobile.validate_authorization_request,
- 'https://i.b/auth?response_type=foo&client_id=foo')
-
- def test_invalid_scope(self):
- self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
- self.validator.validate_scopes.return_value = False
- self.validator.authenticate_client.side_effect = self.set_client
-
- # Authorization code grant
- self.assertRaises(errors.InvalidScopeError,
- self.web.validate_authorization_request,
- 'https://i.b/auth?response_type=code&client_id=foo')
-
- # Implicit grant
- self.assertRaises(errors.InvalidScopeError,
- self.mobile.validate_authorization_request,
- 'https://i.b/auth?response_type=token&client_id=foo')
-
- # Password credentials grant
- _, body, _ = self.legacy.create_token_response(
- 'https://i.b/token',
- body='grant_type=password&username=foo&password=bar')
- self.assertEqual('invalid_scope', json.loads(body)['error'])
-
- # Client credentials grant
- _, body, _ = self.backend.create_token_response(
- 'https://i.b/token',
- body='grant_type=client_credentials')
- self.assertEqual('invalid_scope', json.loads(body)['error'])
-
- def test_server_error(self):
- def raise_error(*args, **kwargs):
- raise ValueError()
-
- self.validator.validate_client_id.side_effect = raise_error
- self.validator.authenticate_client.side_effect = raise_error
- self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
-
- # Authorization code grant
- self.web.catch_errors = True
- _, _, s = self.web.create_authorization_response(
- 'https://i.b/auth?client_id=foo&response_type=code',
- scopes=['foo'])
- self.assertEqual(s, 500)
- _, _, s = self.web.create_token_response(
- 'https://i.b/token',
- body='grant_type=authorization_code&code=foo',
- scopes=['foo'])
- self.assertEqual(s, 500)
-
- # Implicit grant
- self.mobile.catch_errors = True
- _, _, s = self.mobile.create_authorization_response(
- 'https://i.b/auth?client_id=foo&response_type=token',
- scopes=['foo'])
- self.assertEqual(s, 500)
-
- # Password credentials grant
- self.legacy.catch_errors = True
- _, _, s = self.legacy.create_token_response(
- 'https://i.b/token',
- body='grant_type=password&username=foo&password=foo')
- self.assertEqual(s, 500)
-
- # Client credentials grant
- self.backend.catch_errors = True
- _, _, s = self.backend.create_token_response(
- 'https://i.b/token',
- body='grant_type=client_credentials')
- self.assertEqual(s, 500)
-
- def test_temporarily_unavailable(self):
- # Authorization code grant
- self.web.available = False
- _, _, s = self.web.create_authorization_response(
- 'https://i.b/auth?client_id=foo&response_type=code',
- scopes=['foo'])
- self.assertEqual(s, 503)
- _, _, s = self.web.create_token_response(
- 'https://i.b/token',
- body='grant_type=authorization_code&code=foo',
- scopes=['foo'])
- self.assertEqual(s, 503)
-
- # Implicit grant
- self.mobile.available = False
- _, _, s = self.mobile.create_authorization_response(
- 'https://i.b/auth?client_id=foo&response_type=token',
- scopes=['foo'])
- self.assertEqual(s, 503)
-
- # Password credentials grant
- self.legacy.available = False
- _, _, s = self.legacy.create_token_response(
- 'https://i.b/token',
- body='grant_type=password&username=foo&password=foo')
- self.assertEqual(s, 503)
-
- # Client credentials grant
- self.backend.available = False
- _, _, s = self.backend.create_token_response(
- 'https://i.b/token',
- body='grant_type=client_credentials')
- self.assertEqual(s, 503)
-
- def test_invalid_client(self):
- self.validator.authenticate_client.return_value = False
- self.validator.authenticate_client_id.return_value = False
-
- # Authorization code grant
- _, body, _ = self.web.create_token_response('https://i.b/token',
- body='grant_type=authorization_code&code=foo')
- self.assertEqual('invalid_client', json.loads(body)['error'])
-
- # Password credentials grant
- _, body, _ = self.legacy.create_token_response('https://i.b/token',
- body='grant_type=password&username=foo&password=bar')
- self.assertEqual('invalid_client', json.loads(body)['error'])
-
- # Client credentials grant
- _, body, _ = self.legacy.create_token_response('https://i.b/token',
- body='grant_type=client_credentials')
- self.assertEqual('invalid_client', json.loads(body)['error'])
-
- def test_invalid_grant(self):
- self.validator.authenticate_client.side_effect = self.set_client
-
- # Authorization code grant
- self.validator.validate_code.return_value = False
- _, body, _ = self.web.create_token_response('https://i.b/token',
- body='grant_type=authorization_code&code=foo')
- self.assertEqual('invalid_grant', json.loads(body)['error'])
-
- # Password credentials grant
- self.validator.validate_user.return_value = False
- _, body, _ = self.legacy.create_token_response('https://i.b/token',
- body='grant_type=password&username=foo&password=bar')
- self.assertEqual('invalid_grant', json.loads(body)['error'])
-
- def test_unsupported_grant_type(self):
- self.validator.authenticate_client.side_effect = self.set_client
-
- # Authorization code grant
- _, body, _ = self.web.create_token_response('https://i.b/token',
- body='grant_type=bar&code=foo')
- self.assertEqual('unsupported_grant_type', json.loads(body)['error'])
-
- # Password credentials grant
- _, body, _ = self.legacy.create_token_response('https://i.b/token',
- body='grant_type=bar&username=foo&password=bar')
- self.assertEqual('unsupported_grant_type', json.loads(body)['error'])
-
- # Client credentials grant
- _, body, _ = self.backend.create_token_response('https://i.b/token',
- body='grant_type=bar')
- self.assertEqual('unsupported_grant_type', json.loads(body)['error'])
-
-
-class ExtraCredentialsTest(TestCase):
-
- def set_client(self, request):
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked'
- return True
-
- def setUp(self):
- self.validator = mock.MagicMock(spec=RequestValidator)
- self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
- self.web = WebApplicationServer(self.validator)
- self.mobile = MobileApplicationServer(self.validator)
- self.legacy = LegacyApplicationServer(self.validator)
- self.backend = BackendApplicationServer(self.validator)
-
- def test_post_authorization_request(self):
- def save_code(client_id, token, request):
- self.assertEqual('creds', request.extra)
-
- def save_token(token, request):
- self.assertEqual('creds', request.extra)
-
- # Authorization code grant
- self.validator.save_authorization_code.side_effect = save_code
- self.web.create_authorization_response(
- 'https://i.b/auth?client_id=foo&response_type=code',
- scopes=['foo'],
- credentials={'extra': 'creds'})
-
- # Implicit grant
- self.validator.save_bearer_token.side_effect = save_token
- self.web.create_authorization_response(
- 'https://i.b/auth?client_id=foo&response_type=token',
- scopes=['foo'],
- credentials={'extra': 'creds'})
-
- def test_token_request(self):
- def save_token(token, request):
- self.assertIn('extra', token)
-
- self.validator.save_bearer_token.side_effect = save_token
- self.validator.authenticate_client.side_effect = self.set_client
-
- # Authorization code grant
- self.web.create_token_response('https://i.b/token',
- body='grant_type=authorization_code&code=foo',
- credentials={'extra': 'creds'})
-
- # Password credentials grant
- self.legacy.create_token_response('https://i.b/token',
- body='grant_type=password&username=foo&password=bar',
- credentials={'extra': 'creds'})
-
- # Client credentials grant
- self.backend.create_token_response('https://i.b/token',
- body='grant_type=client_credentials',
- credentials={'extra': 'creds'})
diff --git a/tests/oauth2/rfc6749/test_utils.py b/tests/oauth2/rfc6749/test_utils.py
index 6e713a7..ff7afd3 100644
--- a/tests/oauth2/rfc6749/test_utils.py
+++ b/tests/oauth2/rfc6749/test_utils.py
@@ -1,9 +1,13 @@
from __future__ import absolute_import, unicode_literals
+import datetime
import os
+
from ...unittest import TestCase
from oauthlib.oauth2.rfc6749.utils import escape, host_from_uri
+from oauthlib.oauth2.rfc6749.utils import generate_age
from oauthlib.oauth2.rfc6749.utils import is_secure_transport
+from oauthlib.oauth2.rfc6749.utils import params_from_uri
class UtilsTests(TestCase):
@@ -35,3 +39,12 @@ class UtilsTests(TestCase):
os.environ['DEBUG'] = '1'
self.assertTrue(is_secure_transport('http://example.com'))
del os.environ['DEBUG']
+
+ def test_params_from_uri(self):
+ self.assertEqual(params_from_uri('http://i.b/?foo=bar&g&scope=a+d'),
+ {'foo': 'bar', 'g': '', 'scope': ['a', 'd']})
+
+ def test_generate_age(self):
+ issue_time = datetime.datetime.now() - datetime.timedelta(
+ days=3, minutes=1, seconds=4)
+ self.assertGreater(float(generate_age(issue_time)), 259263.0)