diff options
author | Ib Lundgren <ib.lundgren@gmail.com> | 2013-06-18 21:32:29 +0100 |
---|---|---|
committer | Ib Lundgren <ib.lundgren@gmail.com> | 2013-06-18 21:32:29 +0100 |
commit | 8ba2b3a6c7b5ba94eae93187a0f4ac6dbe80d22f (patch) | |
tree | bef7338534660618167e6f43e51b72b88dd310de | |
parent | 1571f0bbb3a22bdc70172246ac0a54793b538464 (diff) | |
download | oauthlib-8ba2b3a6c7b5ba94eae93187a0f4ac6dbe80d22f.tar.gz |
Request validator tests.
-rw-r--r-- | tests/oauth1/rfc5849/test_request_validator.py | 594 |
1 files changed, 59 insertions, 535 deletions
diff --git a/tests/oauth1/rfc5849/test_request_validator.py b/tests/oauth1/rfc5849/test_request_validator.py index a70a76e..34a950a 100644 --- a/tests/oauth1/rfc5849/test_request_validator.py +++ b/tests/oauth1/rfc5849/test_request_validator.py @@ -1,545 +1,69 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import, unicode_literals -import time -from oauthlib.common import safe_string_equals -from oauthlib.oauth1.rfc5849 import * from ...unittest import TestCase +from oauthlib.oauth1 import RequestValidator -class ServerTests(TestCase): - CLIENT_KEY = 'dpf43f3p2l4k3l03' - CLIENT_SECRET = 'kd94hf93k423kf44' - - RESOURCE_OWNER_KEY = 'kkk9d7dh3k39sjv7' - RESOURCE_OWNER_SECRET = 'just-a-string asdasd' - - RSA_KEY = "-----BEGIN PRIVATE KEY-----\n MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAMOZ519ZczgJiUPI\n J9Oac424LUvJw+HXqB2PqwFxdrcar+FDJihQbuHxGhz7bhhHADPG9KhNH45V5sDI\n /g4USqdd9wys8lAqxQAA9AxV2vXX+HK+id+WOZUfBM78OnzeOdvUzyxgmRean+ps\n A/U+PwsiToeGp0ywFkBCF7VJvd8pAgMBAAECgYBuQDWWHQlAsL9aIVuxfgFcBFAj\n w9pRVglAgFZXPek4VCaGxh6f4pZdbFTXuTDZJkwK4z3MD4yV4f1q9N+ed/mLVsZv\n XJb22jQmnNKhiz/thDWz9f97z+TTSocC85H0zdsUrmRKlxIR6+ys9hpBPe2HSKbJ\n zEcW1IKDkM0acJYm8QJBAP12rHp00IrIdrUsm9rO6dinLZpbGeVu2LFPM0Me7nYO\n Kc/GqrLHTSnm91BDbj9IgFrk45mEcSCCOUYutoKgPy8CQQDFjv9ZHd3BkCSbojG1\n RyRVyJQXfZHGMBabta5jjjTJlO7bMjELSfPsnZxoILjyf06qX/LoqsAXrV0Imf8n\n d/EnAkAcZSUheuC6C4cw+NRlCPUtrlzvg/E8wNRJ2OOXS2nPk/qfKlSJPsaoQRXH\n yiYZtNecVzQgSLQbvjsIX8dWjvlFAkAEuHwFhx8rZuRZC7EgYcjOe/J99TQshi2k\n Ht1B573/Kx3iAvsFCAlaGBIKsu14be5VR+GoCZx5dF0KvZNJQCZ1AkEAuYIpaPLf\n xyvKM8kDJ3uyJ2OHiuVlhMNe8g9GX3hHU4UWx3QdnaVm92mx84iuwRdaB1k6Yhk/\n 9jQrjQ0RmlMjpw==\n -----END PRIVATE KEY-----" - - PUB_RSA_KEY = "-----BEGIN PUBLIC KEY-----\nMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDDmedfWXM4CYlDyCfTmnONuC1L\nycPh16gdj6sBcXa3Gq/hQyYoUG7h8Roc+24YRwAzxvSoTR+OVebAyP4OFEqnXfcM\nrPJQKsUAAPQMVdr11/hyvonfljmVHwTO/Dp83jnb1M8sYJkXmp/qbAP1Pj8LIk6H\nhqdMsBZAQhe1Sb3fKQIDAQAB\n-----END PUBLIC KEY-----" - - URLENCODED = {"Content-Type": "application/x-www-form-urlencoded"} - - class TestServer(Server): - - @property - def client_key_length(self): - return 16, 16 - - @property - def request_token_length(self): - return 16, 16 - - @property - def access_token_length(self): - return 16, 16 - - @property - def enforce_ssl(self): - return False - - def get_client_secret(self, client_key): - return ServerTests.CLIENT_SECRET - - def get_access_token_secret(self, client_key, access_token): - return ServerTests.RESOURCE_OWNER_SECRET - - def get_request_token_secret(self, client_key, request_token): - return ServerTests.RESOURCE_OWNER_SECRET - - def get_rsa_key(self, client_key): - return ServerTests.PUB_RSA_KEY - - def validate_client_key(self, client_key): - return ServerTests.CLIENT_KEY == client_key - - def validate_access_token(self, client_key, access_token): - return (ServerTests.CLIENT_KEY == client_key and - ServerTests.RESOURCE_OWNER_KEY == access_token) - - def validate_request_token(self, client_key, request_token): - return (ServerTests.CLIENT_KEY == client_key and - ServerTests.RESOURCE_OWNER_KEY == request_token) - - def validate_timestamp_and_nonce(self, client_key, timestamp, nonce, - request_token=None, access_token=None): - return True - - def validate_requested_realm(self, client_key, realm): - return True - - def validate_realm(self, client_key, realm, uri, - request_token=None, access_token=None, required_realm=None): - return True - - def validate_verifier(self, client_key, request_token, verifier): - return True - - def validate_redirect_uri(self, client_key, redirect_uri): - return redirect_uri.startswith('http://client.example.com/') - - class ClientServer(Server): - clients = ['foo'] - nonces = [('foo', 'once', '1234567891', 'fez')] - owners = { 'foo' : ['abcdefghijklmnopqrstuvxyz', 'fez'] } - assigned_realms = { ('foo', 'abcdefghijklmnopqrstuvxyz') : 'photos' } - verifiers = { ('foo', 'fez') : 'shibboleth' } - - @property - def client_key_length(self): - return 1, 30 - - @property - def request_token_length(self): - return 1, 30 - - @property - def access_token_length(self): - return 1, 30 - - @property - def nonce_length(self): - return 2, 30 - - @property - def verifier_length(self): - return 2, 30 - - @property - def realms(self): - return ['photos'] - - @property - def timestamp_lifetime(self): - # Disabled check to allow hardcoded verification signatures - return 1000000000 - - @property - def dummy_client(self): - return 'dummy' - - @property - def dummy_request_token(self): - return 'dumbo' - - @property - def dummy_access_token(self): - return 'dumbo' - - def validate_timestamp_and_nonce(self, client_key, timestamp, nonce, - request_token=None, access_token=None): - resource_owner_key = request_token if request_token else access_token - return not (client_key, nonce, timestamp, resource_owner_key) in self.nonces - - def validate_client_key(self, client_key): - return client_key in self.clients - - def validate_access_token(self, client_key, access_token): - return (self.owners.get(client_key) and - access_token in self.owners.get(client_key)) - - def validate_request_token(self, client_key, request_token): - return (self.owners.get(client_key) and - request_token in self.owners.get(client_key)) - - def validate_requested_realm(self, client_key, realm): - return True - - def validate_realm(self, client_key, access_token, uri=None, required_realm=None): - return (client_key, access_token) in self.assigned_realms - - def validate_verifier(self, client_key, request_token, verifier): - return ((client_key, request_token) in self.verifiers and - safe_string_equals(verifier, self.verifiers.get( - (client_key, request_token)))) - - def validate_redirect_uri(self, client_key, redirect_uri): - return redirect_uri.startswith('http://client.example.com/') - - def get_client_secret(self, client_key): - return 'super secret' - - def get_access_token_secret(self, client_key, access_token): - return 'even more secret' - - def get_request_token_secret(self, client_key, request_token): - return 'even more secret' - - def test_basic_server_request(self): - c = Client(self.CLIENT_KEY, - client_secret=self.CLIENT_SECRET, - resource_owner_key=self.RESOURCE_OWNER_KEY, - resource_owner_secret=self.RESOURCE_OWNER_SECRET, - ) - - d = Client(self.CLIENT_KEY, - signature_method=SIGNATURE_RSA, - rsa_key=self.RSA_KEY, - resource_owner_key=self.RESOURCE_OWNER_KEY, - ) - - s = self.TestServer() - - uri, headers, body = c.sign('http://server.example.com:80/init') - self.assertTrue(s.verify_request(uri, body=body, headers=headers)[0]) - - uri, headers, body = d.sign('http://server.example.com:80/init') - self.assertTrue(s.verify_request(uri, body=body, headers=headers)[0]) - - def test_server_callback_request(self): - c = Client(self.CLIENT_KEY, - client_secret=self.CLIENT_SECRET, - resource_owner_key=self.RESOURCE_OWNER_KEY, - resource_owner_secret=self.RESOURCE_OWNER_SECRET, - callback_uri='http://client.example.com/callback' - ) - - uri, headers, body = c.sign('http://server.example.com:80/init') - - s = self.TestServer() - self.assertTrue(s.verify_request(uri, body=body, headers=headers, - require_callback=True)[0]) - - def test_server_invalid_callback_request(self): - c = Client(self.CLIENT_KEY, - client_secret=self.CLIENT_SECRET, - resource_owner_key=self.RESOURCE_OWNER_KEY, - resource_owner_secret=self.RESOURCE_OWNER_SECRET, - callback_uri='http://attacker.example.com/callback' - ) - - uri, headers, body = c.sign('http://server.example.com:80/init') - - s = self.TestServer() - self.assertFalse(s.verify_request(uri, body=body, headers=headers, - require_callback=True)[0]) - - def test_server_missing_callback_request(self): - c = Client(self.CLIENT_KEY, - resource_owner_key=self.RESOURCE_OWNER_KEY, - ) - - uri, headers, body = c.sign('http://server.example.com:80/init') - - s = self.TestServer() - self.assertRaises(ValueError, s.verify_request, uri, body=body, headers=headers, - require_callback=True) - - self.assertRaises(ValueError, s.verify_request_token_request, uri, body=body, - headers=headers) +class RequestValidatorTests(TestCase): def test_not_implemented(self): - s = Server() - self.assertRaises(NotImplementedError, s.get_client_secret, None) - self.assertRaises(NotImplementedError, s.get_request_token_secret, None, None) - self.assertRaises(NotImplementedError, s.get_access_token_secret, None, None) - self.assertRaises(NotImplementedError, lambda: s.dummy_client) - self.assertRaises(NotImplementedError, lambda: s.dummy_request_token) - self.assertRaises(NotImplementedError, lambda: s.dummy_access_token) - self.assertRaises(NotImplementedError, s.get_rsa_key, None) - self.assertRaises(NotImplementedError, s.validate_client_key, None) - self.assertRaises(NotImplementedError, s.validate_access_token, None, None) - self.assertRaises(NotImplementedError, s.validate_request_token, None, None) - self.assertRaises(NotImplementedError, s.validate_timestamp_and_nonce, - None, None, None) - self.assertRaises(NotImplementedError, s.validate_redirect_uri, None, None) - self.assertRaises(NotImplementedError, s.validate_realm, None, None, None, None) - self.assertRaises(NotImplementedError, s.validate_requested_realm, None, None) - self.assertRaises(NotImplementedError, s.validate_verifier, None, None, None) - - def test_enforce_ssl(self): - """Ensure SSL is enforced by default.""" - s = Server() - self.assertRaises(ValueError, s.verify_request, 'http://example.com') - - def test_multiple_source_params(self): - """Check for duplicate params""" - s = Server() - self.assertRaises(ValueError, s.verify_request, 'https://a.b/?oauth_signature_method=HMAC-SHA1', - body='oauth_version=foo') - self.assertRaises(ValueError, s.verify_request, 'https://a.b/?oauth_signature_method=HMAC-SHA1', - headers={'Authorization' : 'OAuth oauth_signature="foo"'}) - self.assertRaises(ValueError, s.verify_request, 'https://a.b/?oauth_signature_method=HMAC-SHA1', - body='oauth_version=foo', - headers={'Authorization' : 'OAuth oauth_signature="foo"'}) - self.assertRaises(ValueError, s.verify_request, 'https://a.b/', - body='oauth_signature=foo', - headers={'Authorization' : 'OAuth oauth_signature_method="foo"'}) - - def test_duplicate_params(self): - """Ensure params are only supplied once""" - - s = Server() - self.assertRaises(ValueError, s.verify_request, - 'https://a.b/?oauth_version=a&oauth_version=b') - self.assertRaises(ValueError, s.verify_request, 'https://a.b/', - body='oauth_version=a&oauth_version=b') - - def test_mandated_params(self): - """Ensure all mandatory params are present.""" - s = Server() - self.assertRaises(ValueError, s.verify_request, 'https://a.b/') - self.assertRaises(ValueError, s.verify_request, 'https://a.b/', - body=('oauth_signature=a&oauth_consumer_key=b&oauth_nonce')) - - def test_oauth_version(self): - """OAuth version must be 1.0 if present.""" - s = Server() - self.assertRaises(ValueError, s.verify_request, 'https://a.b/', - body=('oauth_signature=a&oauth_consumer_key=b&oauth_nonce=c&' - 'oauth_timestamp=a&oauth_signature_method=RSA-SHA1&' - 'oauth_version=2.0'), - headers=self.URLENCODED) - - def test_oauth_timestamp(self): - """Check for a valid UNIX timestamp.""" - s = Server() - - # Invalid timestamp length, must be 10 - self.assertRaises(ValueError, s.verify_request, 'https://a.b/', - body=('oauth_signature=a&oauth_consumer_key=b&oauth_nonce=c&' - 'oauth_version=1.0&oauth_signature_method=RSA-SHA1&' - 'oauth_timestamp=123456789'), - headers=self.URLENCODED) - - # Invalid timestamp age, must be younger than 10 minutes - self.assertRaises(ValueError, s.verify_request, 'https://a.b/', - body=('oauth_signature=a&oauth_consumer_key=b&oauth_nonce=c&' - 'oauth_version=1.0&oauth_signature_method=RSA-SHA1&' - 'oauth_timestamp=1234567890'), - headers=self.URLENCODED) - - # Timestamp must be an integer - self.assertRaises(ValueError, s.verify_request, 'https://a.b/', - body=('oauth_signature=a&oauth_consumer_key=b&oauth_nonce=c&' - 'oauth_version=1.0&oauth_signature_method=RSA-SHA1&' - 'oauth_timestamp=123456789a'), - headers=self.URLENCODED) - - def test_signature_method_validation(self): - """Ensure valid signature method is used.""" - - body=('oauth_signature=a&oauth_consumer_key=b&oauth_nonce=c&' - 'oauth_version=1.0&oauth_signature_method=%s&' - 'oauth_timestamp=1234567890') - - uri = 'https://example.com/' - - class HMACServer(Server): - - @property - def allowed_signature_methods(self): - return (SIGNATURE_HMAC,) - - s = HMACServer() - self.assertRaises(ValueError, s.verify_request, uri, body=body % 'RSA-SHA1', headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=body % 'PLAINTEXT', headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=body % 'shibboleth', headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=body % '', headers=self.URLENCODED) - - class RSAServer(Server): - - @property - def allowed_signature_methods(self): - return (SIGNATURE_RSA,) - - s = RSAServer() - self.assertRaises(ValueError, s.verify_request, uri, body=body % 'HMAC-SHA1', headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=body % 'PLAINTEXT', headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=body % 'shibboleth', headers=self.URLENCODED) - - class PlainServer(Server): - + v = RequestValidator() + self.assertRaises(NotImplementedError, v.get_client_secret, None, None) + self.assertRaises(NotImplementedError, v.get_request_token_secret, + None, None, None) + self.assertRaises(NotImplementedError, v.get_access_token_secret, + None, None, None) + self.assertRaises(NotImplementedError, lambda: v.dummy_client) + self.assertRaises(NotImplementedError, lambda: v.dummy_request_token) + self.assertRaises(NotImplementedError, lambda: v.dummy_access_token) + self.assertRaises(NotImplementedError, v.get_rsa_key, None, None) + self.assertRaises(NotImplementedError, v.get_default_realms, None, None) + self.assertRaises(NotImplementedError, v.get_realms, None, None) + self.assertRaises(NotImplementedError, v.get_redirect_uri, None, None) + self.assertRaises(NotImplementedError, v.validate_client_key, None, None) + self.assertRaises(NotImplementedError, v.validate_access_token, + None, None, None) + self.assertRaises(NotImplementedError, v.validate_request_token, + None, None, None) + self.assertRaises(NotImplementedError, v.verify_request_token, + None, None) + self.assertRaises(NotImplementedError, v.verify_realms, + None, None, None) + self.assertRaises(NotImplementedError, v.validate_timestamp_and_nonce, + None, None, None, None) + self.assertRaises(NotImplementedError, v.validate_redirect_uri, + None, None, None) + self.assertRaises(NotImplementedError, v.validate_realm, + None, None, None, None, None) + self.assertRaises(NotImplementedError, v.validate_requested_realm, + None, None, None) + self.assertRaises(NotImplementedError, v.validate_verifier, + None, None, None, None) + self.assertRaises(NotImplementedError, v.save_access_token, None, None) + self.assertRaises(NotImplementedError, v.save_request_token, None, None) + self.assertRaises(NotImplementedError, v.save_verifier, + None, None, None) + + def test_check_length(self): + v = RequestValidator() + + for method in (v.check_client_key, v.check_request_token, + v.check_access_token, v.check_nonce, v.check_verifier): + for not_valid in ('tooshort', 'invalid?characters!', + 'thisclientkeyisalittlebittoolong'): + self.assertFalse(method(not_valid)) + for valid in ('itsjustaboutlongenough',): + self.assertTrue(method(valid)) + + def test_check_realm(self): + v = RequestValidator() + self.assertFalse(v.check_realm(['foo'])) + + class FooRealmValidator(RequestValidator): @property - def allowed_signature_methods(self): - return (SIGNATURE_PLAINTEXT,) - - s = PlainServer() - self.assertRaises(ValueError, s.verify_request, uri, body=body % 'HMAC-SHA1', headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=body % 'RSA-SHA1', headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=body % 'shibboleth', headers=self.URLENCODED) - - def test_check_methods(self): - """Ensure values are correctly formatted. - - Default setting is to only allow alphanumeric characters and a length - between 20 and 30 characters. - """ - - ts = int(time.time()) - - client=('oauth_signature=a&oauth_timestamp=%s&oauth_nonce=c&' - 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&' - 'oauth_token=abcdefghijklmnopqrstuvxyz&' - 'oauth_consumer_key=%s') - - owner=('oauth_signature=a&oauth_timestamp=%s&' - 'oauth_nonce=abcdefghijklmnopqrstuvwxyz&' - 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&' - 'oauth_consumer_key=abcdefghijklmnopqrstuvxyz&' - 'oauth_token=%s') - - nonce=('oauth_signature=a&oauth_timestamp=%s&oauth_nonce=%s&' - 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&' - 'oauth_token=abcdefghijklmnopqrstuvxyz&' - 'oauth_consumer_key=abcdefghijklmnopqrstuvwxyz') - - realm=('oauth_signature=a&oauth_timestamp=%s&' - 'oauth_nonce=abcdefghijklmnopqrstuvwxyz&' - 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&' - 'oauth_consumer_key=abcdefghijklmnopqrstuvxyz&' - 'oauth_token=abcdefghijklmnopqrstuvxyz&' - 'realm=%s') - - verifier=('oauth_signature=a&oauth_timestamp=%s&' - 'oauth_nonce=abcdefghijklmnopqrstuvwxyz&' - 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&' - 'oauth_consumer_key=abcdefghijklmnopqrstuvxyz&' - 'oauth_token=abcdefghijklmnopqrstuvxyz&' - 'oauth_verifier=%s') - - noverifier=('oauth_signature=a&oauth_timestamp=%s&' - 'oauth_nonce=abcdefghijklmnopqrstuvwxyz&' - 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&' - 'oauth_consumer_key=abcdefghijklmnopqrstuvxyz&' - 'oauth_token=abcdefghijklmnopqrstuvxyz&') - - uri = 'https://example.com/' - s = Server() - - # Invalid characters - invalid = (ts, '%C3%A5abcdefghijklmnopqrstuvwxyz') - self.assertRaises(ValueError, s.verify_request, uri, body=client % invalid, headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=owner % invalid, headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=owner % invalid, headers=self.URLENCODED, require_verifier=True) - self.assertRaises(ValueError, s.verify_request, uri, body=nonce % invalid, headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=verifier % invalid, - require_verifier=True, headers=self.URLENCODED) - - # Too short - short = (ts, 'abcdefghi') - self.assertRaises(ValueError, s.verify_request, uri, body=client % short, headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=owner % short, headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=owner % short, headers=self.URLENCODED, require_verifier=True) - self.assertRaises(ValueError, s.verify_request, uri, body=nonce % short, headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=verifier % short, - require_verifier=True, headers=self.URLENCODED) - - # Too long - loong = (ts, 'abcdefghijklmnopqrstuvwxyz123456789') - self.assertRaises(ValueError, s.verify_request, uri, body=client % loong, headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=owner % loong, headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=owner % loong, headers=self.URLENCODED, require_verifier=True) - self.assertRaises(ValueError, s.verify_request, uri, body=nonce % loong, headers=self.URLENCODED) - self.assertRaises(ValueError, s.verify_request, uri, body=verifier % loong, - require_verifier=True, headers=self.URLENCODED) - - # By default no realms are allowed - test = (ts, 'shibboleth') - self.assertRaises(ValueError, s.verify_request, uri, body=realm % test, - require_realm=True, headers=self.URLENCODED) - - # Missing required owner - self.assertRaises(ValueError, s.verify_request, uri, body=owner % (ts, ''), headers=self.URLENCODED) - - # Missing required verifier - self.assertRaises(ValueError, s.verify_request, uri, body=noverifier % ts, - require_verifier=True, headers=self.URLENCODED) - - def test_client_validation(self): - uri = 'https://example.com/' - client = ('oauth_signature=fmrXnTF4lO4o%2BD0%2FlZaJHP%2FXqEY%3D&' - 'oauth_timestamp=1234567890&' - 'oauth_nonce=abcdefghijklmnopqrstuvwxyz&' - 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&' - 'oauth_token=abcdefghijklmnopqrstuvxyz&' - 'oauth_consumer_key={0}') - - s = self.ClientServer() - self.assertFalse(s.verify_request(uri, headers=self.URLENCODED, body=client.format('bar'))[0]) - self.assertFalse(s.verify_request(uri, headers=self.URLENCODED, body=client.format('bar'))[0]) - self.assertTrue(s.verify_request(uri, headers=self.URLENCODED, body=client.format('foo'))[0]) - - def test_nonce_and_timestamp_validation(self): - uri = 'https://example.com/' - replay = ('oauth_signature=fmrXnTF4lO4o%2BD0%2FlZaJHP%2FXqEY%3D&' - 'oauth_timestamp=1234567891&' - 'oauth_nonce=once&' - 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&' - 'oauth_token=fez&' - 'oauth_consumer_key=foo') - - s = self.ClientServer() - self.assertFalse(s.verify_request(uri, headers=self.URLENCODED, body=replay)[0]) - - def test_resource_owner_validation(self): - uri = 'https://example.com/' - - invalid_owner = ('oauth_signature=B0FUgxzDNOPzol0gTTlXREelYrU%3D&' - 'oauth_timestamp=1234567890&' - 'oauth_nonce=abcdefghijklmnopqrstuvwxyz&' - 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&' - 'oauth_token=invalid&' - 'oauth_consumer_key=foo') - - owner_optional = ('oauth_signature=GTmmjtDdyqhDYHL5dDWjVIDx%2BTo%3D&' - 'oauth_timestamp=1234567890&' - 'oauth_nonce=abcdefghijklmnopqrstuvwxyz&' - 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&' - 'oauth_consumer_key=foo') - - s = self.ClientServer() - self.assertFalse(s.verify_request(uri, headers=self.URLENCODED, - body=invalid_owner)[0]) - self.assertTrue(s.verify_request(uri, headers=self.URLENCODED, - body=owner_optional, require_resource_owner=False)[0]) - - def test_signature_verification(self): - uri = 'https://example.com/' - short_sig = ('oauth_signature=fmrXnTF4lO4o%2BD0%2FlZaJHP%2FXqEY&' - 'oauth_timestamp=1234567890&' - 'oauth_nonce=abcdefghijklmnopqrstuvwxyz&' - 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&' - 'oauth_token=abcdefghijklmnopqrstuvxyz&' - 'oauth_consumer_key=foo') - - plain = ('oauth_signature=correctlengthbutthewrongcontent1111&' - 'oauth_timestamp=1234567890&' - 'oauth_nonce=abcdefghijklmnopqrstuvwxyz&' - 'oauth_version=1.0&oauth_signature_method=PLAINTEXT&' - 'oauth_token=abcdefghijklmnopqrstuvxyz&' - 'oauth_consumer_key=foo') - - s = self.ClientServer() - self.assertFalse(s.verify_request(uri, headers=self.URLENCODED, body=short_sig)[0]) - self.assertFalse(s.verify_request(uri, headers=self.URLENCODED, body=plain)[0]) - - def test_realm_validation(self): - uri = 'https://example.com/' - realm = ('oauth_signature=fmrXnTF4lO4o%2BD0%2FlZaJHP%2FXqEY%3D&' - 'oauth_timestamp=1234567890&' - 'oauth_nonce=abcdefghijklmnopqrstuvwxyz&' - 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&' - 'oauth_token=abcdefghijklmnopqrstuvxyz&' - 'oauth_consumer_key=foo&realm=photos') - - s = self.ClientServer() - self.assertTrue(s.verify_request(uri, headers=self.URLENCODED, body=realm)[0]) - - def test_verifier_validation(self): - uri = 'https://example.com/' - verifier = ('oauth_signature=6AsWnRg%2BZnvfJOZKgaC5JKrF3Pk%3D&' - 'oauth_timestamp=1234567890&' - 'oauth_nonce=abcdefghijklmnopqrstuvwxyz&' - 'oauth_version=1.0&oauth_signature_method=HMAC-SHA1&' - 'oauth_token=fez&oauth_consumer_key=foo&' - 'oauth_verifier=shibboleth') - - s = self.ClientServer() - self.assertTrue(s.verify_request(uri, body=verifier, - headers=self.URLENCODED, require_verifier=True)[0]) + def realms(self): + return ['foo'] - def test_timing_attack(self): - """Ensure near constant time verification.""" - # TODO: - pass + v = FooRealmValidator() + self.assertTrue(v.check_realm(['foo'])) |