summaryrefslogtreecommitdiff
path: root/tests/oauth1/rfc5849
diff options
context:
space:
mode:
authorIb Lundgren <ib.lundgren@gmail.com>2013-06-18 21:23:05 +0100
committerIb Lundgren <ib.lundgren@gmail.com>2013-06-18 21:23:05 +0100
commit4d627ce3e0f1ebe346052b8dcae92d04a42af105 (patch)
tree04109160e73ac178cacacf4cbbf8726edd02eef2 /tests/oauth1/rfc5849
parent2261e99ae65fafd03aed337bf100faa6942108e3 (diff)
downloadoauthlib-4d627ce3e0f1ebe346052b8dcae92d04a42af105.tar.gz
Base endpoint for parameter checking and signature verification. #95
Diffstat (limited to 'tests/oauth1/rfc5849')
-rw-r--r--tests/oauth1/rfc5849/endpoints/test_base.py357
1 files changed, 357 insertions, 0 deletions
diff --git a/tests/oauth1/rfc5849/endpoints/test_base.py b/tests/oauth1/rfc5849/endpoints/test_base.py
new file mode 100644
index 0000000..f175e6d
--- /dev/null
+++ b/tests/oauth1/rfc5849/endpoints/test_base.py
@@ -0,0 +1,357 @@
+from __future__ import unicode_literals, absolute_import
+
+from mock import MagicMock
+from re import sub
+from ....unittest import TestCase
+
+from oauthlib.common import safe_string_equals
+from oauthlib.oauth1 import Client, RequestValidator
+from oauthlib.oauth1.rfc5849 import errors, SIGNATURE_RSA, SIGNATURE_HMAC
+from oauthlib.oauth1.rfc5849 import SIGNATURE_PLAINTEXT
+from oauthlib.oauth1.rfc5849.endpoints import RequestTokenEndpoint, BaseEndpoint
+
+
+URLENCODED = {"Content-Type": "application/x-www-form-urlencoded"}
+
+
+class BaseEndpointTest(TestCase):
+
+ def setUp(self):
+ self.validator = MagicMock(spec=RequestValidator)
+ self.validator.allowed_signature_methods = ['HMAC-SHA1']
+ self.validator.timestamp_lifetime = 600
+ self.endpoint = RequestTokenEndpoint(self.validator)
+ self.client = Client('foo', callback_uri='https://c.b/cb')
+ self.uri, self.headers, self.body = self.client.sign(
+ 'https://i.b/request_token')
+
+ def test_ssl_enforcement(self):
+ uri, headers, _ = self.client.sign('http://i.b/request_token')
+ u, h, b, s = self.endpoint.create_request_token_response(
+ uri, headers=headers)
+ self.assertEqual(s, 400)
+ self.assertIn('insecure_transport_protocol', b)
+
+ def test_missing_parameters(self):
+ u, h, b, s = self.endpoint.create_request_token_response(self.uri)
+ self.assertEqual(s, 400)
+ self.assertIn('invalid_request', b)
+
+ def test_signature_methods(self):
+ headers = {}
+ headers['Authorization'] = self.headers['Authorization'].replace(
+ 'HMAC', 'RSA')
+ u, h, b, s = self.endpoint.create_request_token_response(
+ self.uri, headers=headers)
+ self.assertEqual(s, 400)
+ self.assertIn('invalid_signature_method', b)
+
+ def test_invalid_version(self):
+ headers = {}
+ headers['Authorization'] = self.headers['Authorization'].replace(
+ '1.0', '2.0')
+ u, h, b, s = self.endpoint.create_request_token_response(
+ self.uri, headers=headers)
+ self.assertEqual(s, 400)
+ self.assertIn('invalid_request', b)
+
+ def test_expired_timestamp(self):
+ headers = {}
+ for pattern in ('12345678901', '4567890123', '123456789K'):
+ headers['Authorization'] = sub('timestamp="\d*k?"',
+ 'timestamp="%s"' % pattern,
+ self.headers['Authorization'])
+ u, h, b, s = self.endpoint.create_request_token_response(
+ self.uri, headers=headers)
+ self.assertEqual(s, 400)
+ self.assertIn('invalid_request', b)
+
+ def test_client_key_check(self):
+ self.validator.check_client_key.return_value = False
+ u, h, b, s = self.endpoint.create_request_token_response(
+ self.uri, headers=self.headers)
+ self.assertEqual(s, 400)
+ self.assertIn('invalid_request', b)
+
+ def test_noncecheck(self):
+ self.validator.check_nonce.return_value = False
+ u, h, b, s = self.endpoint.create_request_token_response(
+ self.uri, headers=self.headers)
+ self.assertEqual(s, 400)
+ self.assertIn('invalid_request', b)
+
+ def test_enforce_ssl(self):
+ """Ensure SSL is enforced by default."""
+ v = RequestValidator()
+ e = BaseEndpoint(v)
+ c = Client('foo')
+ u, h, b = c.sign('http://example.com')
+ r = e._create_request(u, 'GET', b, h)
+ self.assertRaises(errors.InsecureTransportError,
+ e._check_transport_security, r)
+
+ def test_multiple_source_params(self):
+ """Check for duplicate params"""
+ v = RequestValidator()
+ e = BaseEndpoint(v)
+ self.assertRaises(errors.InvalidRequestError, e._create_request,
+ 'https://a.b/?oauth_signature_method=HMAC-SHA1',
+ 'GET', 'oauth_version=foo', URLENCODED)
+ headers = {'Authorization': 'OAuth oauth_signature="foo"'}
+ headers.update(URLENCODED)
+ self.assertRaises(errors.InvalidRequestError, e._create_request,
+ 'https://a.b/?oauth_signature_method=HMAC-SHA1',
+ 'GET',
+ 'oauth_version=foo',
+ headers)
+ headers = {'Authorization': 'OAuth oauth_signature_method="foo"'}
+ headers.update(URLENCODED)
+ self.assertRaises(errors.InvalidRequestError, e._create_request,
+ 'https://a.b/',
+ 'GET',
+ 'oauth_signature=foo',
+ headers)
+
+ def test_duplicate_params(self):
+ """Ensure params are only supplied once"""
+ v = RequestValidator()
+ e = BaseEndpoint(v)
+ self.assertRaises(errors.InvalidRequestError, e._create_request,
+ 'https://a.b/?oauth_version=a&oauth_version=b',
+ 'GET', None, URLENCODED)
+ self.assertRaises(errors.InvalidRequestError, e._create_request,
+ 'https://a.b/', 'GET', 'oauth_version=a&oauth_version=b',
+ URLENCODED)
+
+ def test_mandated_params(self):
+ """Ensure all mandatory params are present."""
+ v = RequestValidator()
+ e = BaseEndpoint(v)
+ r = e._create_request('https://a.b/', 'GET',
+ 'oauth_signature=a&oauth_consumer_key=b&oauth_nonce',
+ URLENCODED)
+ self.assertRaises(errors.InvalidRequestError,
+ e._check_mandatory_parameters, r)
+
+ def test_oauth_version(self):
+ """OAuth version must be 1.0 if present."""
+ v = RequestValidator()
+ e = BaseEndpoint(v)
+ r = e._create_request('https://a.b/', 'GET',
+ ('oauth_signature=a&oauth_consumer_key=b&oauth_nonce=c&'
+ 'oauth_timestamp=a&oauth_signature_method=RSA-SHA1&'
+ 'oauth_version=2.0'),
+ URLENCODED)
+ self.assertRaises(errors.InvalidRequestError,
+ e._check_mandatory_parameters, r)
+
+ def test_oauth_timestamp(self):
+ """Check for a valid UNIX timestamp."""
+ v = RequestValidator()
+ e = BaseEndpoint(v)
+
+ # Invalid timestamp length, must be 10
+ r = e._create_request('https://a.b/', 'GET',
+ ('oauth_signature=a&oauth_consumer_key=b&oauth_nonce=c&'
+ 'oauth_version=1.0&oauth_signature_method=RSA-SHA1&'
+ 'oauth_timestamp=123456789'),
+ URLENCODED)
+ self.assertRaises(errors.InvalidRequestError,
+ e._check_mandatory_parameters, r)
+
+ # Invalid timestamp age, must be younger than 10 minutes
+ r = e._create_request('https://a.b/', 'GET',
+ ('oauth_signature=a&oauth_consumer_key=b&oauth_nonce=c&'
+ 'oauth_version=1.0&oauth_signature_method=RSA-SHA1&'
+ 'oauth_timestamp=1234567890'),
+ URLENCODED)
+ self.assertRaises(errors.InvalidRequestError,
+ e._check_mandatory_parameters, r)
+
+ # Timestamp must be an integer
+ r = e._create_request('https://a.b/', 'GET',
+ ('oauth_signature=a&oauth_consumer_key=b&oauth_nonce=c&'
+ 'oauth_version=1.0&oauth_signature_method=RSA-SHA1&'
+ 'oauth_timestamp=123456789a'),
+ URLENCODED)
+ self.assertRaises(errors.InvalidRequestError,
+ e._check_mandatory_parameters, r)
+
+ def test_signature_method_validation(self):
+ """Ensure valid signature method is used."""
+
+ body = ('oauth_signature=a&oauth_consumer_key=b&oauth_nonce=c&'
+ 'oauth_version=1.0&oauth_signature_method=%s&'
+ 'oauth_timestamp=1234567890')
+
+ uri = 'https://example.com/'
+
+ class HMACValidator(RequestValidator):
+
+ @property
+ def allowed_signature_methods(self):
+ return (SIGNATURE_HMAC,)
+
+ v = HMACValidator()
+ e = BaseEndpoint(v)
+ r = e._create_request(uri, 'GET', body % 'RSA-SHA1', URLENCODED)
+ self.assertRaises(errors.InvalidSignatureMethodError,
+ e._check_mandatory_parameters, r)
+ r = e._create_request(uri, 'GET', body % 'PLAINTEXT', URLENCODED)
+ self.assertRaises(errors.InvalidSignatureMethodError,
+ e._check_mandatory_parameters, r)
+ r = e._create_request(uri, 'GET', body % 'shibboleth', URLENCODED)
+ self.assertRaises(errors.InvalidSignatureMethodError,
+ e._check_mandatory_parameters, r)
+
+ class RSAValidator(RequestValidator):
+
+ @property
+ def allowed_signature_methods(self):
+ return (SIGNATURE_RSA,)
+
+ v = RSAValidator()
+ e = BaseEndpoint(v)
+ r = e._create_request(uri, 'GET', body % 'HMAC-SHA1', URLENCODED)
+ self.assertRaises(errors.InvalidSignatureMethodError,
+ e._check_mandatory_parameters, r)
+ r = e._create_request(uri, 'GET', body % 'PLAINTEXT', URLENCODED)
+ self.assertRaises(errors.InvalidSignatureMethodError,
+ e._check_mandatory_parameters, r)
+ r = e._create_request(uri, 'GET', body % 'shibboleth', URLENCODED)
+ self.assertRaises(errors.InvalidSignatureMethodError,
+ e._check_mandatory_parameters, r)
+
+ class PlainValidator(RequestValidator):
+
+ @property
+ def allowed_signature_methods(self):
+ return (SIGNATURE_PLAINTEXT,)
+
+ v = PlainValidator()
+ e = BaseEndpoint(v)
+ r = e._create_request(uri, 'GET', body % 'HMAC-SHA1', URLENCODED)
+ self.assertRaises(errors.InvalidSignatureMethodError,
+ e._check_mandatory_parameters, r)
+ r = e._create_request(uri, 'GET', body % 'RSA-SHA1', URLENCODED)
+ self.assertRaises(errors.InvalidSignatureMethodError,
+ e._check_mandatory_parameters, r)
+ r = e._create_request(uri, 'GET', body % 'shibboleth', URLENCODED)
+ self.assertRaises(errors.InvalidSignatureMethodError,
+ e._check_mandatory_parameters, r)
+
+
+class ClientValidator(RequestValidator):
+ clients = ['foo']
+ nonces = [('foo', 'once', '1234567891', 'fez')]
+ owners = {'foo': ['abcdefghijklmnopqrstuvxyz', 'fez']}
+ assigned_realms = {('foo', 'abcdefghijklmnopqrstuvxyz'): 'photos'}
+ verifiers = {('foo', 'fez'): 'shibboleth'}
+
+ @property
+ def client_key_length(self):
+ return 1, 30
+
+ @property
+ def request_token_length(self):
+ return 1, 30
+
+ @property
+ def access_token_length(self):
+ return 1, 30
+
+ @property
+ def nonce_length(self):
+ return 2, 30
+
+ @property
+ def verifier_length(self):
+ return 2, 30
+
+ @property
+ def realms(self):
+ return ['photos']
+
+ @property
+ def timestamp_lifetime(self):
+ # Disabled check to allow hardcoded verification signatures
+ return 1000000000
+
+ @property
+ def dummy_client(self):
+ return 'dummy'
+
+ @property
+ def dummy_request_token(self):
+ return 'dumbo'
+
+ @property
+ def dummy_access_token(self):
+ return 'dumbo'
+
+ def validate_timestamp_and_nonce(self, client_key, timestamp, nonce,
+ request, request_token=None, access_token=None):
+ resource_owner_key = request_token if request_token else access_token
+ return not (client_key, nonce, timestamp, resource_owner_key) in self.nonces
+
+ def validate_client_key(self, client_key):
+ return client_key in self.clients
+
+ def validate_access_token(self, client_key, access_token, request):
+ return (self.owners.get(client_key) and
+ access_token in self.owners.get(client_key))
+
+ def validate_request_token(self, client_key, request_token, request):
+ return (self.owners.get(client_key) and
+ request_token in self.owners.get(client_key))
+
+ def validate_requested_realm(self, client_key, realm, request):
+ return True
+
+ def validate_realm(self, client_key, access_token, request, uri=None,
+ required_realm=None):
+ return (client_key, access_token) in self.assigned_realms
+
+ def validate_verifier(self, client_key, request_token, verifier,
+ request):
+ return ((client_key, request_token) in self.verifiers and
+ safe_string_equals(verifier, self.verifiers.get(
+ (client_key, request_token))))
+
+ def validate_redirect_uri(self, client_key, redirect_uri, request):
+ return redirect_uri.startswith('http://client.example.com/')
+
+ def get_client_secret(self, client_key, request):
+ return 'super secret'
+
+ def get_access_token_secret(self, client_key, access_token, request):
+ return 'even more secret'
+
+ def get_request_token_secret(self, client_key, request_token, request):
+ return 'even more secret'
+
+
+class SignatureVerificationTest(TestCase):
+
+ def test_signature_verification(self):
+ v = ClientValidator()
+ e = BaseEndpoint(v)
+
+ uri = 'https://example.com/'
+ short_sig = ('oauth_signature=fmrXnTF4lO4o%2BD0%2FlZaJHP%2FXqEY&'
+ 'oauth_timestamp=1234567890&'
+ 'oauth_nonce=abcdefghijklmnopqrstuvwxyz&'
+ 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&'
+ 'oauth_token=abcdefghijklmnopqrstuvxyz&'
+ 'oauth_consumer_key=foo')
+ r = e._create_request(uri, 'GET', short_sig, URLENCODED)
+ self.assertFalse(e._check_signature(r))
+
+ plain = ('oauth_signature=correctlengthbutthewrongcontent1111&'
+ 'oauth_timestamp=1234567890&'
+ 'oauth_nonce=abcdefghijklmnopqrstuvwxyz&'
+ 'oauth_version=1.0&oauth_signature_method=PLAINTEXT&'
+ 'oauth_token=abcdefghijklmnopqrstuvxyz&'
+ 'oauth_consumer_key=foo')
+ r = e._create_request(uri, 'GET', plain, URLENCODED)
+ self.assertFalse(e._check_signature(r))