diff options
author | José Padilla <jpadilla@webapplicate.com> | 2014-10-15 23:51:04 -0400 |
---|---|---|
committer | José Padilla <jpadilla@webapplicate.com> | 2014-10-15 23:51:04 -0400 |
commit | e4ebd2e7b442f1e1805b8d030769136bb215db2c (patch) | |
tree | 1d205868b937d4856f3254919f713cdcafacd6c0 | |
parent | a99f4015d1f11ddc18ac8da5f6604d5b31aa125e (diff) | |
download | pyjwt-e4ebd2e7b442f1e1805b8d030769136bb215db2c.tar.gz |
Use single quotes instead of double quotes
-rw-r--r-- | jwt/__init__.py | 66 | ||||
-rwxr-xr-x | setup.py | 33 | ||||
-rw-r--r-- | tests/test_jwt.py | 84 |
3 files changed, 101 insertions, 82 deletions
diff --git a/jwt/__init__.py b/jwt/__init__.py index 4ffb1c4..2f9e4d6 100644 --- a/jwt/__init__.py +++ b/jwt/__init__.py @@ -19,14 +19,15 @@ try: except ImportError: import simplejson as json -__all__ = ['encode', 'decode', 'DecodeError'] - if sys.version_info >= (3, 0, 0): unicode = str basestring = str +__all__ = ['encode', 'decode', 'DecodeError'] + + class DecodeError(Exception): pass @@ -47,11 +48,14 @@ verify_methods = { 'HS512': lambda msg, key: hmac.new(key, msg, hashlib.sha512).digest() } + def prepare_HS_key(key): if not isinstance(key, basestring) and not isinstance(key, bytes): - raise TypeError("Expecting a string- or bytes-formatted key.") + raise TypeError('Expecting a string- or bytes-formatted key.') + if isinstance(key, unicode): key = key.encode('utf-8') + return key prepare_key_methods = { @@ -83,11 +87,13 @@ try: if isinstance(key, basestring): if isinstance(key, unicode): key = key.encode('utf-8') + key = RSA.importKey(key) elif isinstance(key, RSA._RSAobj): pass else: - raise TypeError("Expecting a PEM- or RSA-formatted key.") + raise TypeError('Expecting a PEM- or RSA-formatted key.') + return key prepare_key_methods.update({ @@ -108,20 +114,26 @@ def constant_time_compare(val1, val2): """ if len(val1) != len(val2): return False + result = 0 - if sys.version_info >= (3, 0, 0): # bytes are numbers + + if sys.version_info >= (3, 0, 0): + # Bytes are numbers for x, y in zip(val1, val2): result |= x ^ y else: for x, y in zip(val1, val2): result |= ord(x) ^ ord(y) + return result == 0 def base64url_decode(input): rem = len(input) % 4 + if rem > 0: input += b'=' * (4 - rem) + return base64.urlsafe_b64decode(input) @@ -135,7 +147,7 @@ def header(jwt): header_data = base64url_decode(header_segment).decode('utf-8') return json.loads(header_data) except (ValueError, TypeError): - raise DecodeError("Invalid header encoding") + raise DecodeError('Invalid header encoding') def encode(payload, key, algorithm='HS256', headers=None): @@ -143,20 +155,23 @@ def encode(payload, key, algorithm='HS256', headers=None): # Check that we get a mapping if not isinstance(payload, Mapping): - raise TypeError("Expecting a mapping object, as json web token only" - "support json objects.") + raise TypeError('Expecting a mapping object, as json web token only' + 'support json objects.') # Header - header = {"typ": "JWT", "alg": algorithm} + header = {'typ': 'JWT', 'alg': algorithm} if headers: header.update(headers) + json_header = json.dumps(header, separators=(',', ':')).encode('utf-8') segments.append(base64url_encode(json_header)) # Payload - for time_claim in ['exp', 'iat', 'nbf']: # convert datetime to a intDate value in known time-format claims + for time_claim in ['exp', 'iat', 'nbf']: + # Convert datetime to a intDate value in known time-format claims if isinstance(payload.get(time_claim), datetime): payload[time_claim] = timegm(payload[time_claim].utctimetuple()) + json_payload = json.dumps(payload, separators=(',', ':')).encode('utf-8') segments.append(base64url_encode(json_payload)) @@ -166,8 +181,10 @@ def encode(payload, key, algorithm='HS256', headers=None): key = prepare_key_methods[algorithm](key) signature = signing_methods[algorithm](signing_input, key) except KeyError: - raise NotImplementedError("Algorithm not supported") + raise NotImplementedError('Algorithm not supported') + segments.append(base64url_encode(signature)) + return b'.'.join(segments) @@ -176,7 +193,7 @@ def decode(jwt, key='', verify=True, verify_expiration=True, leeway=0): if verify: verify_signature(payload, signing_input, header, signature, key, - verify_expiration, leeway) + verify_expiration, leeway) return payload @@ -188,50 +205,53 @@ def load(jwt): signing_input, crypto_segment = jwt.rsplit(b'.', 1) header_segment, payload_segment = signing_input.split(b'.', 1) except ValueError: - raise DecodeError("Not enough segments") + raise DecodeError('Not enough segments') try: header_data = base64url_decode(header_segment) except (TypeError, binascii.Error): - raise DecodeError("Invalid header padding") + raise DecodeError('Invalid header padding') try: header = json.loads(header_data.decode('utf-8')) except ValueError as e: - raise DecodeError("Invalid header string: %s" % e) + raise DecodeError('Invalid header string: %s' % e) try: payload_data = base64url_decode(payload_segment) except (TypeError, binascii.Error): - raise DecodeError("Invalid payload padding") + raise DecodeError('Invalid payload padding') try: payload = json.loads(payload_data.decode('utf-8')) except ValueError as e: - raise DecodeError("Invalid payload string: %s" % e) + raise DecodeError('Invalid payload string: %s' % e) try: signature = base64url_decode(crypto_segment) except (TypeError, binascii.Error): - raise DecodeError("Invalid crypto padding") + raise DecodeError('Invalid crypto padding') return (payload, signing_input, header, signature) def verify_signature(payload, signing_input, header, signature, key='', - verify_expiration=True, leeway=0): + verify_expiration=True, leeway=0): try: algorithm = header['alg'].upper() key = prepare_key_methods[algorithm](key) + if algorithm.startswith('HS'): expected = verify_methods[algorithm](signing_input, key) + if not constant_time_compare(signature, expected): - raise DecodeError("Signature verification failed") + raise DecodeError('Signature verification failed') else: if not verify_methods[algorithm](signing_input, key, signature): - raise DecodeError("Signature verification failed") + raise DecodeError('Signature verification failed') except KeyError: - raise DecodeError("Algorithm not supported") + raise DecodeError('Algorithm not supported') if 'exp' in payload and verify_expiration: utc_timestamp = timegm(datetime.utcnow().utctimetuple()) + if payload['exp'] < (utc_timestamp - leeway): - raise ExpiredSignature("Signature has expired") + raise ExpiredSignature('Signature has expired') @@ -6,28 +6,27 @@ from setuptools import setup with open(os.path.join(os.path.dirname(__file__), 'README.md')) as readme: long_description = readme.read() - setup( - name="PyJWT", - version="0.2.2", - author="Jeff Lindsay", - author_email="progrium@gmail.com", - description="JSON Web Token implementation in Python", - license="MIT", - keywords="jwt json web token security signing", - url="http://github.com/progrium/pyjwt", + name='PyJWT', + version='0.2.2', + author='Jeff Lindsay', + author_email='progrium@gmail.com', + description='JSON Web Token implementation in Python', + license='MIT', + keywords='jwt json web token security signing', + url='http://github.com/progrium/pyjwt', packages=['jwt'], scripts=['bin/jwt'], long_description=long_description, classifiers=[ - "Development Status :: 3 - Alpha", - "License :: OSI Approved :: MIT License", - "Programming Language :: Python", - "Programming Language :: Python :: 2.6", - "Programming Language :: Python :: 2.7", - "Programming Language :: Python :: 3.2", - "Programming Language :: Python :: 3.3", - "Topic :: Utilities", + 'Development Status :: 3 - Alpha', + 'License :: OSI Approved :: MIT License', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2.6', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3.2', + 'Programming Language :: Python :: 3.3', + 'Topic :: Utilities', ], test_suite='tests.test_jwt' ) diff --git a/tests/test_jwt.py b/tests/test_jwt.py index 29b88f0..a66b181 100644 --- a/tests/test_jwt.py +++ b/tests/test_jwt.py @@ -18,8 +18,8 @@ def utc_timestamp(): class TestJWT(unittest.TestCase): def setUp(self): - self.payload = {"iss": "jeff", "exp": utc_timestamp() + 1, - "claim": "insanity"} + self.payload = {'iss': 'jeff', 'exp': utc_timestamp() + 1, + 'claim': 'insanity'} def test_encode_decode(self): secret = 'secret' @@ -36,12 +36,12 @@ class TestJWT(unittest.TestCase): self.assertRaises(TypeError, lambda: jwt.encode(t, 'secret')) def test_encode_datetime(self): - secret = "secret" + secret = 'secret' current_datetime = datetime.utcnow() payload = { - "exp": current_datetime, - "iat": current_datetime, - "nbf": current_datetime + 'exp': current_datetime, + 'iat': current_datetime, + 'nbf': current_datetime } jwt_message = jwt.encode(payload, secret) decoded_payload = jwt.decode(jwt_message, secret, leeway=1) @@ -66,23 +66,23 @@ class TestJWT(unittest.TestCase): lambda: jwt.decode(jwt_message, bad_secret)) def test_decodes_valid_jwt(self): - example_payload = {"hello": "world"} - example_secret = "secret" + example_payload = {'hello': 'world'} + example_secret = 'secret' example_jwt = ( - b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" - b".eyJoZWxsbyI6ICJ3b3JsZCJ9" - b".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8") + b'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' + b'.eyJoZWxsbyI6ICJ3b3JsZCJ9' + b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') decoded_payload = jwt.decode(example_jwt, example_secret) self.assertEqual(decoded_payload, example_payload) def test_load_verify_valid_jwt(self): - example_payload = {"hello": "world"} - example_secret = "secret" + example_payload = {'hello': 'world'} + example_secret = 'secret' example_jwt = ( - b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" - b".eyJoZWxsbyI6ICJ3b3JsZCJ9" - b".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8") + b'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' + b'.eyJoZWxsbyI6ICJ3b3JsZCJ9' + b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') decoded_payload, signing, header, signature = jwt.load(example_jwt) @@ -134,7 +134,7 @@ class TestJWT(unittest.TestCase): def test_invalid_crypto_alg(self): self.assertRaises(NotImplementedError, jwt.encode, self.payload, - "secret", "HS1024") + 'secret', 'HS1024') def test_unicode_secret(self): secret = '\xc2' @@ -182,12 +182,12 @@ class TestJWT(unittest.TestCase): self.assertEqual(decoded_payload, self.payload) def test_decode_unicode_value(self): - example_payload = {"hello": "world"} - example_secret = "secret" + example_payload = {'hello': 'world'} + example_secret = 'secret' example_jwt = ( - "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" - ".eyJoZWxsbyI6ICJ3b3JsZCJ9" - ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8") + 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' + '.eyJoZWxsbyI6ICJ3b3JsZCJ9' + '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') decoded_payload = jwt.decode(example_jwt, example_secret) self.assertEqual(decoded_payload, example_payload) @@ -196,10 +196,10 @@ class TestJWT(unittest.TestCase): def test_decode_invalid_header_padding(self): example_jwt = ( - "aeyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" - ".eyJoZWxsbyI6ICJ3b3JsZCJ9" - ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8") - example_secret = "secret" + 'aeyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' + '.eyJoZWxsbyI6ICJ3b3JsZCJ9' + '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') + example_secret = 'secret' self.assertRaises( jwt.DecodeError, @@ -211,10 +211,10 @@ class TestJWT(unittest.TestCase): def test_decode_invalid_header_string(self): example_jwt = ( - "eyJhbGciOiAiSFMyNTbpIiwgInR5cCI6ICJKV1QifQ==" - ".eyJoZWxsbyI6ICJ3b3JsZCJ9" - ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8") - example_secret = "secret" + 'eyJhbGciOiAiSFMyNTbpIiwgInR5cCI6ICJKV1QifQ==' + '.eyJoZWxsbyI6ICJ3b3JsZCJ9' + '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') + example_secret = 'secret' try: jwt.load(example_jwt) @@ -232,10 +232,10 @@ class TestJWT(unittest.TestCase): def test_decode_invalid_payload_padding(self): example_jwt = ( - "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" - ".aeyJoZWxsbyI6ICJ3b3JsZCJ9" - ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8") - example_secret = "secret" + 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' + '.aeyJoZWxsbyI6ICJ3b3JsZCJ9' + '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') + example_secret = 'secret' self.assertRaises( jwt.DecodeError, @@ -247,10 +247,10 @@ class TestJWT(unittest.TestCase): def test_decode_invalid_payload_string(self): example_jwt = ( - "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" - ".eyJoZWxsb-kiOiAid29ybGQifQ==" - ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8") - example_secret = "secret" + 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' + '.eyJoZWxsb-kiOiAid29ybGQifQ==' + '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') + example_secret = 'secret' try: jwt.load(example_jwt) @@ -268,10 +268,10 @@ class TestJWT(unittest.TestCase): def test_decode_invalid_crypto_padding(self): example_jwt = ( - "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" - ".eyJoZWxsbyI6ICJ3b3JsZCJ9" - ".aatvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8") - example_secret = "secret" + 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' + '.eyJoZWxsbyI6ICJ3b3JsZCJ9' + '.aatvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') + example_secret = 'secret' self.assertRaises( jwt.DecodeError, |