summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosé Padilla <jpadilla@webapplicate.com>2014-10-15 23:51:04 -0400
committerJosé Padilla <jpadilla@webapplicate.com>2014-10-15 23:51:04 -0400
commite4ebd2e7b442f1e1805b8d030769136bb215db2c (patch)
tree1d205868b937d4856f3254919f713cdcafacd6c0
parenta99f4015d1f11ddc18ac8da5f6604d5b31aa125e (diff)
downloadpyjwt-e4ebd2e7b442f1e1805b8d030769136bb215db2c.tar.gz
Use single quotes instead of double quotes
-rw-r--r--jwt/__init__.py66
-rwxr-xr-xsetup.py33
-rw-r--r--tests/test_jwt.py84
3 files changed, 101 insertions, 82 deletions
diff --git a/jwt/__init__.py b/jwt/__init__.py
index 4ffb1c4..2f9e4d6 100644
--- a/jwt/__init__.py
+++ b/jwt/__init__.py
@@ -19,14 +19,15 @@ try:
except ImportError:
import simplejson as json
-__all__ = ['encode', 'decode', 'DecodeError']
-
if sys.version_info >= (3, 0, 0):
unicode = str
basestring = str
+__all__ = ['encode', 'decode', 'DecodeError']
+
+
class DecodeError(Exception):
pass
@@ -47,11 +48,14 @@ verify_methods = {
'HS512': lambda msg, key: hmac.new(key, msg, hashlib.sha512).digest()
}
+
def prepare_HS_key(key):
if not isinstance(key, basestring) and not isinstance(key, bytes):
- raise TypeError("Expecting a string- or bytes-formatted key.")
+ raise TypeError('Expecting a string- or bytes-formatted key.')
+
if isinstance(key, unicode):
key = key.encode('utf-8')
+
return key
prepare_key_methods = {
@@ -83,11 +87,13 @@ try:
if isinstance(key, basestring):
if isinstance(key, unicode):
key = key.encode('utf-8')
+
key = RSA.importKey(key)
elif isinstance(key, RSA._RSAobj):
pass
else:
- raise TypeError("Expecting a PEM- or RSA-formatted key.")
+ raise TypeError('Expecting a PEM- or RSA-formatted key.')
+
return key
prepare_key_methods.update({
@@ -108,20 +114,26 @@ def constant_time_compare(val1, val2):
"""
if len(val1) != len(val2):
return False
+
result = 0
- if sys.version_info >= (3, 0, 0): # bytes are numbers
+
+ if sys.version_info >= (3, 0, 0):
+ # Bytes are numbers
for x, y in zip(val1, val2):
result |= x ^ y
else:
for x, y in zip(val1, val2):
result |= ord(x) ^ ord(y)
+
return result == 0
def base64url_decode(input):
rem = len(input) % 4
+
if rem > 0:
input += b'=' * (4 - rem)
+
return base64.urlsafe_b64decode(input)
@@ -135,7 +147,7 @@ def header(jwt):
header_data = base64url_decode(header_segment).decode('utf-8')
return json.loads(header_data)
except (ValueError, TypeError):
- raise DecodeError("Invalid header encoding")
+ raise DecodeError('Invalid header encoding')
def encode(payload, key, algorithm='HS256', headers=None):
@@ -143,20 +155,23 @@ def encode(payload, key, algorithm='HS256', headers=None):
# Check that we get a mapping
if not isinstance(payload, Mapping):
- raise TypeError("Expecting a mapping object, as json web token only"
- "support json objects.")
+ raise TypeError('Expecting a mapping object, as json web token only'
+ 'support json objects.')
# Header
- header = {"typ": "JWT", "alg": algorithm}
+ header = {'typ': 'JWT', 'alg': algorithm}
if headers:
header.update(headers)
+
json_header = json.dumps(header, separators=(',', ':')).encode('utf-8')
segments.append(base64url_encode(json_header))
# Payload
- for time_claim in ['exp', 'iat', 'nbf']: # convert datetime to a intDate value in known time-format claims
+ for time_claim in ['exp', 'iat', 'nbf']:
+ # Convert datetime to a intDate value in known time-format claims
if isinstance(payload.get(time_claim), datetime):
payload[time_claim] = timegm(payload[time_claim].utctimetuple())
+
json_payload = json.dumps(payload, separators=(',', ':')).encode('utf-8')
segments.append(base64url_encode(json_payload))
@@ -166,8 +181,10 @@ def encode(payload, key, algorithm='HS256', headers=None):
key = prepare_key_methods[algorithm](key)
signature = signing_methods[algorithm](signing_input, key)
except KeyError:
- raise NotImplementedError("Algorithm not supported")
+ raise NotImplementedError('Algorithm not supported')
+
segments.append(base64url_encode(signature))
+
return b'.'.join(segments)
@@ -176,7 +193,7 @@ def decode(jwt, key='', verify=True, verify_expiration=True, leeway=0):
if verify:
verify_signature(payload, signing_input, header, signature, key,
- verify_expiration, leeway)
+ verify_expiration, leeway)
return payload
@@ -188,50 +205,53 @@ def load(jwt):
signing_input, crypto_segment = jwt.rsplit(b'.', 1)
header_segment, payload_segment = signing_input.split(b'.', 1)
except ValueError:
- raise DecodeError("Not enough segments")
+ raise DecodeError('Not enough segments')
try:
header_data = base64url_decode(header_segment)
except (TypeError, binascii.Error):
- raise DecodeError("Invalid header padding")
+ raise DecodeError('Invalid header padding')
try:
header = json.loads(header_data.decode('utf-8'))
except ValueError as e:
- raise DecodeError("Invalid header string: %s" % e)
+ raise DecodeError('Invalid header string: %s' % e)
try:
payload_data = base64url_decode(payload_segment)
except (TypeError, binascii.Error):
- raise DecodeError("Invalid payload padding")
+ raise DecodeError('Invalid payload padding')
try:
payload = json.loads(payload_data.decode('utf-8'))
except ValueError as e:
- raise DecodeError("Invalid payload string: %s" % e)
+ raise DecodeError('Invalid payload string: %s' % e)
try:
signature = base64url_decode(crypto_segment)
except (TypeError, binascii.Error):
- raise DecodeError("Invalid crypto padding")
+ raise DecodeError('Invalid crypto padding')
return (payload, signing_input, header, signature)
def verify_signature(payload, signing_input, header, signature, key='',
- verify_expiration=True, leeway=0):
+ verify_expiration=True, leeway=0):
try:
algorithm = header['alg'].upper()
key = prepare_key_methods[algorithm](key)
+
if algorithm.startswith('HS'):
expected = verify_methods[algorithm](signing_input, key)
+
if not constant_time_compare(signature, expected):
- raise DecodeError("Signature verification failed")
+ raise DecodeError('Signature verification failed')
else:
if not verify_methods[algorithm](signing_input, key, signature):
- raise DecodeError("Signature verification failed")
+ raise DecodeError('Signature verification failed')
except KeyError:
- raise DecodeError("Algorithm not supported")
+ raise DecodeError('Algorithm not supported')
if 'exp' in payload and verify_expiration:
utc_timestamp = timegm(datetime.utcnow().utctimetuple())
+
if payload['exp'] < (utc_timestamp - leeway):
- raise ExpiredSignature("Signature has expired")
+ raise ExpiredSignature('Signature has expired')
diff --git a/setup.py b/setup.py
index 42b2db2..0430c78 100755
--- a/setup.py
+++ b/setup.py
@@ -6,28 +6,27 @@ from setuptools import setup
with open(os.path.join(os.path.dirname(__file__), 'README.md')) as readme:
long_description = readme.read()
-
setup(
- name="PyJWT",
- version="0.2.2",
- author="Jeff Lindsay",
- author_email="progrium@gmail.com",
- description="JSON Web Token implementation in Python",
- license="MIT",
- keywords="jwt json web token security signing",
- url="http://github.com/progrium/pyjwt",
+ name='PyJWT',
+ version='0.2.2',
+ author='Jeff Lindsay',
+ author_email='progrium@gmail.com',
+ description='JSON Web Token implementation in Python',
+ license='MIT',
+ keywords='jwt json web token security signing',
+ url='http://github.com/progrium/pyjwt',
packages=['jwt'],
scripts=['bin/jwt'],
long_description=long_description,
classifiers=[
- "Development Status :: 3 - Alpha",
- "License :: OSI Approved :: MIT License",
- "Programming Language :: Python",
- "Programming Language :: Python :: 2.6",
- "Programming Language :: Python :: 2.7",
- "Programming Language :: Python :: 3.2",
- "Programming Language :: Python :: 3.3",
- "Topic :: Utilities",
+ 'Development Status :: 3 - Alpha',
+ 'License :: OSI Approved :: MIT License',
+ 'Programming Language :: Python',
+ 'Programming Language :: Python :: 2.6',
+ 'Programming Language :: Python :: 2.7',
+ 'Programming Language :: Python :: 3.2',
+ 'Programming Language :: Python :: 3.3',
+ 'Topic :: Utilities',
],
test_suite='tests.test_jwt'
)
diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index 29b88f0..a66b181 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -18,8 +18,8 @@ def utc_timestamp():
class TestJWT(unittest.TestCase):
def setUp(self):
- self.payload = {"iss": "jeff", "exp": utc_timestamp() + 1,
- "claim": "insanity"}
+ self.payload = {'iss': 'jeff', 'exp': utc_timestamp() + 1,
+ 'claim': 'insanity'}
def test_encode_decode(self):
secret = 'secret'
@@ -36,12 +36,12 @@ class TestJWT(unittest.TestCase):
self.assertRaises(TypeError, lambda: jwt.encode(t, 'secret'))
def test_encode_datetime(self):
- secret = "secret"
+ secret = 'secret'
current_datetime = datetime.utcnow()
payload = {
- "exp": current_datetime,
- "iat": current_datetime,
- "nbf": current_datetime
+ 'exp': current_datetime,
+ 'iat': current_datetime,
+ 'nbf': current_datetime
}
jwt_message = jwt.encode(payload, secret)
decoded_payload = jwt.decode(jwt_message, secret, leeway=1)
@@ -66,23 +66,23 @@ class TestJWT(unittest.TestCase):
lambda: jwt.decode(jwt_message, bad_secret))
def test_decodes_valid_jwt(self):
- example_payload = {"hello": "world"}
- example_secret = "secret"
+ example_payload = {'hello': 'world'}
+ example_secret = 'secret'
example_jwt = (
- b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
- b".eyJoZWxsbyI6ICJ3b3JsZCJ9"
- b".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8")
+ b'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
+ b'.eyJoZWxsbyI6ICJ3b3JsZCJ9'
+ b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
decoded_payload = jwt.decode(example_jwt, example_secret)
self.assertEqual(decoded_payload, example_payload)
def test_load_verify_valid_jwt(self):
- example_payload = {"hello": "world"}
- example_secret = "secret"
+ example_payload = {'hello': 'world'}
+ example_secret = 'secret'
example_jwt = (
- b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
- b".eyJoZWxsbyI6ICJ3b3JsZCJ9"
- b".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8")
+ b'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
+ b'.eyJoZWxsbyI6ICJ3b3JsZCJ9'
+ b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
decoded_payload, signing, header, signature = jwt.load(example_jwt)
@@ -134,7 +134,7 @@ class TestJWT(unittest.TestCase):
def test_invalid_crypto_alg(self):
self.assertRaises(NotImplementedError, jwt.encode, self.payload,
- "secret", "HS1024")
+ 'secret', 'HS1024')
def test_unicode_secret(self):
secret = '\xc2'
@@ -182,12 +182,12 @@ class TestJWT(unittest.TestCase):
self.assertEqual(decoded_payload, self.payload)
def test_decode_unicode_value(self):
- example_payload = {"hello": "world"}
- example_secret = "secret"
+ example_payload = {'hello': 'world'}
+ example_secret = 'secret'
example_jwt = (
- "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
- ".eyJoZWxsbyI6ICJ3b3JsZCJ9"
- ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8")
+ 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
+ '.eyJoZWxsbyI6ICJ3b3JsZCJ9'
+ '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
decoded_payload = jwt.decode(example_jwt, example_secret)
self.assertEqual(decoded_payload, example_payload)
@@ -196,10 +196,10 @@ class TestJWT(unittest.TestCase):
def test_decode_invalid_header_padding(self):
example_jwt = (
- "aeyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
- ".eyJoZWxsbyI6ICJ3b3JsZCJ9"
- ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8")
- example_secret = "secret"
+ 'aeyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
+ '.eyJoZWxsbyI6ICJ3b3JsZCJ9'
+ '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
+ example_secret = 'secret'
self.assertRaises(
jwt.DecodeError,
@@ -211,10 +211,10 @@ class TestJWT(unittest.TestCase):
def test_decode_invalid_header_string(self):
example_jwt = (
- "eyJhbGciOiAiSFMyNTbpIiwgInR5cCI6ICJKV1QifQ=="
- ".eyJoZWxsbyI6ICJ3b3JsZCJ9"
- ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8")
- example_secret = "secret"
+ 'eyJhbGciOiAiSFMyNTbpIiwgInR5cCI6ICJKV1QifQ=='
+ '.eyJoZWxsbyI6ICJ3b3JsZCJ9'
+ '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
+ example_secret = 'secret'
try:
jwt.load(example_jwt)
@@ -232,10 +232,10 @@ class TestJWT(unittest.TestCase):
def test_decode_invalid_payload_padding(self):
example_jwt = (
- "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
- ".aeyJoZWxsbyI6ICJ3b3JsZCJ9"
- ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8")
- example_secret = "secret"
+ 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
+ '.aeyJoZWxsbyI6ICJ3b3JsZCJ9'
+ '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
+ example_secret = 'secret'
self.assertRaises(
jwt.DecodeError,
@@ -247,10 +247,10 @@ class TestJWT(unittest.TestCase):
def test_decode_invalid_payload_string(self):
example_jwt = (
- "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
- ".eyJoZWxsb-kiOiAid29ybGQifQ=="
- ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8")
- example_secret = "secret"
+ 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
+ '.eyJoZWxsb-kiOiAid29ybGQifQ=='
+ '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
+ example_secret = 'secret'
try:
jwt.load(example_jwt)
@@ -268,10 +268,10 @@ class TestJWT(unittest.TestCase):
def test_decode_invalid_crypto_padding(self):
example_jwt = (
- "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
- ".eyJoZWxsbyI6ICJ3b3JsZCJ9"
- ".aatvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8")
- example_secret = "secret"
+ 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
+ '.eyJoZWxsbyI6ICJ3b3JsZCJ9'
+ '.aatvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
+ example_secret = 'secret'
self.assertRaises(
jwt.DecodeError,