summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosé Padilla <jpadilla@webapplicate.com>2014-03-17 12:33:55 -0400
committerJosé Padilla <jpadilla@webapplicate.com>2014-03-17 12:33:55 -0400
commit0894d64c648266c289cb9123a1787be388f86a85 (patch)
tree6b248fbcb6b0431144dae60d7d78e397ff745469
parentd9a121d0c35926bdce8939390d55dd94d91a2d2d (diff)
parent66daa8265eae5e7b0b1431024439578bb747b698 (diff)
downloadpyjwt-0894d64c648266c289cb9123a1787be388f86a85.tar.gz
Merge pull request #29 from gullpong/change
Added some convenience features
-rw-r--r--jwt/__init__.py49
-rw-r--r--tests/test_jwt.py68
2 files changed, 107 insertions, 10 deletions
diff --git a/jwt/__init__.py b/jwt/__init__.py
index 4e1d5d2..07e77fe 100644
--- a/jwt/__init__.py
+++ b/jwt/__init__.py
@@ -24,6 +24,7 @@ __all__ = ['encode', 'decode', 'DecodeError']
if sys.version_info >= (3, 0, 0):
unicode = str
+ basestring = str
class DecodeError(Exception):
@@ -46,11 +47,26 @@ verify_methods = {
'HS512': lambda msg, key: hmac.new(key, msg, hashlib.sha512).digest()
}
+def prepare_HS_key(key):
+ if isinstance(key, basestring):
+ if isinstance(key, unicode):
+ key = key.encode('utf-8')
+ else:
+ raise TypeError("Expecting a string-formatted key.")
+ return key
+
+prepare_key_methods = {
+ 'HS256': prepare_HS_key,
+ 'HS384': prepare_HS_key,
+ 'HS512': prepare_HS_key
+}
+
try:
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA256
from Crypto.Hash import SHA384
from Crypto.Hash import SHA512
+ from Crypto.PublicKey import RSA
signing_methods.update({
'RS256': lambda msg, key: PKCS1_v1_5.new(key).sign(SHA256.new(msg)),
@@ -63,6 +79,24 @@ try:
'RS384': lambda msg, key, sig: PKCS1_v1_5.new(key).verify(SHA384.new(msg), sig),
'RS512': lambda msg, key, sig: PKCS1_v1_5.new(key).verify(SHA512.new(msg), sig)
})
+
+ def prepare_RS_key(key):
+ if isinstance(key, basestring):
+ if isinstance(key, unicode):
+ key = key.encode('utf-8')
+ key = RSA.importKey(key)
+ elif isinstance(key, RSA._RSAobj):
+ pass
+ else:
+ raise TypeError("Expecting a PEM- or RSA-formatted key.")
+ return key
+
+ prepare_key_methods.update({
+ 'RS256': prepare_RS_key,
+ 'RS384': prepare_RS_key,
+ 'RS512': prepare_RS_key
+ })
+
except ImportError:
pass
@@ -115,20 +149,20 @@ def encode(payload, key, algorithm='HS256'):
# Header
header = {"typ": "JWT", "alg": algorithm}
- json_header = json.dumps(header).encode('utf-8')
+ json_header = json.dumps(header, separators=(',', ':')).encode('utf-8')
segments.append(base64url_encode(json_header))
# Payload
- if isinstance(payload.get('exp'), datetime):
- payload['exp'] = timegm(payload['exp'].utctimetuple())
- json_payload = json.dumps(payload).encode('utf-8')
+ for time_claim in ['exp', 'iat', 'nbf']: # convert datetime to a intDate value in known time-format claims
+ if isinstance(payload.get(time_claim), datetime):
+ payload[time_claim] = timegm(payload[time_claim].utctimetuple())
+ json_payload = json.dumps(payload, separators=(',', ':')).encode('utf-8')
segments.append(base64url_encode(json_payload))
# Segments
signing_input = b'.'.join(segments)
try:
- if isinstance(key, unicode):
- key = key.encode('utf-8')
+ key = prepare_key_methods[algorithm](key)
signature = signing_methods[algorithm](signing_input, key)
except KeyError:
raise NotImplementedError("Algorithm not supported")
@@ -184,8 +218,7 @@ def load(jwt):
def verify_signature(payload, signing_input, header, signature, key='',
verify_expiration=True, leeway=0):
try:
- if isinstance(key, unicode):
- key = key.encode('utf-8')
+ key = prepare_key_methods[header['alg']](key)
if header['alg'].startswith('HS'):
expected = verify_methods[header['alg']](signing_input, key)
if not constant_time_compare(signature, expected):
diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index 0ee5422..5dd7374 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -35,16 +35,26 @@ class TestJWT(unittest.TestCase):
for t in types:
self.assertRaises(TypeError, lambda: jwt.encode(t, 'secret'))
- def test_encode_expiration_datetime(self):
+ def test_encode_datetime(self):
secret = "secret"
current_datetime = datetime.utcnow()
- payload = {"exp": current_datetime}
+ payload = {
+ "exp": current_datetime,
+ "iat": current_datetime,
+ "nbf": current_datetime
+ }
jwt_message = jwt.encode(payload, secret)
decoded_payload = jwt.decode(jwt_message, secret, leeway=1)
self.assertEqual(
decoded_payload['exp'],
timegm(current_datetime.utctimetuple()))
+ self.assertEqual(
+ decoded_payload['iat'],
+ timegm(current_datetime.utctimetuple()))
+ self.assertEqual(
+ decoded_payload['nbf'],
+ timegm(current_datetime.utctimetuple()))
def test_bad_secret(self):
right_secret = 'foo'
@@ -301,6 +311,7 @@ class TestJWT(unittest.TestCase):
try:
from Crypto.PublicKey import RSA
+ # RSA-formatted key
with open('tests/testkey', 'r') as rsa_priv_file:
priv_rsakey = RSA.importKey(rsa_priv_file.read())
jwt_message = jwt.encode(self.payload, priv_rsakey,
@@ -312,6 +323,20 @@ class TestJWT(unittest.TestCase):
load_output = jwt.load(jwt_message)
jwt.verify_signature(key=pub_rsakey, *load_output)
+
+ # string-formatted key
+ with open('tests/testkey', 'r') as rsa_priv_file:
+ priv_rsakey = rsa_priv_file.read()
+ jwt_message = jwt.encode(self.payload, priv_rsakey,
+ algorithm='RS256')
+
+ with open('tests/testkey.pub', 'r') as rsa_pub_file:
+ pub_rsakey = rsa_pub_file.read()
+ assert jwt.decode(jwt_message, pub_rsakey)
+
+ load_output = jwt.load(jwt_message)
+ jwt.verify_signature(key=pub_rsakey, *load_output)
+
except ImportError:
pass
@@ -319,6 +344,7 @@ class TestJWT(unittest.TestCase):
try:
from Crypto.PublicKey import RSA
+ # RSA-formatted key
with open('tests/testkey', 'r') as rsa_priv_file:
priv_rsakey = RSA.importKey(rsa_priv_file.read())
jwt_message = jwt.encode(self.payload, priv_rsakey,
@@ -330,6 +356,19 @@ class TestJWT(unittest.TestCase):
load_output = jwt.load(jwt_message)
jwt.verify_signature(key=pub_rsakey, *load_output)
+
+ # string-formatted key
+ with open('tests/testkey', 'r') as rsa_priv_file:
+ priv_rsakey = rsa_priv_file.read()
+ jwt_message = jwt.encode(self.payload, priv_rsakey,
+ algorithm='RS384')
+
+ with open('tests/testkey.pub', 'r') as rsa_pub_file:
+ pub_rsakey = rsa_pub_file.read()
+ assert jwt.decode(jwt_message, pub_rsakey)
+
+ load_output = jwt.load(jwt_message)
+ jwt.verify_signature(key=pub_rsakey, *load_output)
except ImportError:
pass
@@ -337,6 +376,7 @@ class TestJWT(unittest.TestCase):
try:
from Crypto.PublicKey import RSA
+ # RSA-formatted key
with open('tests/testkey', 'r') as rsa_priv_file:
priv_rsakey = RSA.importKey(rsa_priv_file.read())
jwt_message = jwt.encode(self.payload, priv_rsakey,
@@ -348,6 +388,19 @@ class TestJWT(unittest.TestCase):
load_output = jwt.load(jwt_message)
jwt.verify_signature(key=pub_rsakey, *load_output)
+
+ # string-formatted key
+ with open('tests/testkey', 'r') as rsa_priv_file:
+ priv_rsakey = rsa_priv_file.read()
+ jwt_message = jwt.encode(self.payload, priv_rsakey,
+ algorithm='RS512')
+
+ with open('tests/testkey.pub', 'r') as rsa_pub_file:
+ pub_rsakey = rsa_pub_file.read()
+ assert jwt.decode(jwt_message, pub_rsakey)
+
+ load_output = jwt.load(jwt_message)
+ jwt.verify_signature(key=pub_rsakey, *load_output)
except ImportError:
pass
@@ -373,6 +426,17 @@ class TestJWT(unittest.TestCase):
self.assertFalse('RS384' in jwt.verify_methods)
self.assertFalse('RS512' in jwt.verify_methods)
+ def test_crypto_related_key_preparation_methods(self):
+ try:
+ import Crypto
+ self.assertTrue('RS256' in jwt.prepare_key_methods)
+ self.assertTrue('RS384' in jwt.prepare_key_methods)
+ self.assertTrue('RS512' in jwt.prepare_key_methods)
+ except ImportError:
+ self.assertFalse('RS256' in jwt.prepare_key_methods)
+ self.assertFalse('RS384' in jwt.prepare_key_methods)
+ self.assertFalse('RS512' in jwt.prepare_key_methods)
+
if __name__ == '__main__':
unittest.main()