diff options
author | José Padilla <jpadilla@webapplicate.com> | 2014-03-17 12:33:55 -0400 |
---|---|---|
committer | José Padilla <jpadilla@webapplicate.com> | 2014-03-17 12:33:55 -0400 |
commit | 0894d64c648266c289cb9123a1787be388f86a85 (patch) | |
tree | 6b248fbcb6b0431144dae60d7d78e397ff745469 | |
parent | d9a121d0c35926bdce8939390d55dd94d91a2d2d (diff) | |
parent | 66daa8265eae5e7b0b1431024439578bb747b698 (diff) | |
download | pyjwt-0894d64c648266c289cb9123a1787be388f86a85.tar.gz |
Merge pull request #29 from gullpong/change
Added some convenience features
-rw-r--r-- | jwt/__init__.py | 49 | ||||
-rw-r--r-- | tests/test_jwt.py | 68 |
2 files changed, 107 insertions, 10 deletions
diff --git a/jwt/__init__.py b/jwt/__init__.py index 4e1d5d2..07e77fe 100644 --- a/jwt/__init__.py +++ b/jwt/__init__.py @@ -24,6 +24,7 @@ __all__ = ['encode', 'decode', 'DecodeError'] if sys.version_info >= (3, 0, 0): unicode = str + basestring = str class DecodeError(Exception): @@ -46,11 +47,26 @@ verify_methods = { 'HS512': lambda msg, key: hmac.new(key, msg, hashlib.sha512).digest() } +def prepare_HS_key(key): + if isinstance(key, basestring): + if isinstance(key, unicode): + key = key.encode('utf-8') + else: + raise TypeError("Expecting a string-formatted key.") + return key + +prepare_key_methods = { + 'HS256': prepare_HS_key, + 'HS384': prepare_HS_key, + 'HS512': prepare_HS_key +} + try: from Crypto.Signature import PKCS1_v1_5 from Crypto.Hash import SHA256 from Crypto.Hash import SHA384 from Crypto.Hash import SHA512 + from Crypto.PublicKey import RSA signing_methods.update({ 'RS256': lambda msg, key: PKCS1_v1_5.new(key).sign(SHA256.new(msg)), @@ -63,6 +79,24 @@ try: 'RS384': lambda msg, key, sig: PKCS1_v1_5.new(key).verify(SHA384.new(msg), sig), 'RS512': lambda msg, key, sig: PKCS1_v1_5.new(key).verify(SHA512.new(msg), sig) }) + + def prepare_RS_key(key): + if isinstance(key, basestring): + if isinstance(key, unicode): + key = key.encode('utf-8') + key = RSA.importKey(key) + elif isinstance(key, RSA._RSAobj): + pass + else: + raise TypeError("Expecting a PEM- or RSA-formatted key.") + return key + + prepare_key_methods.update({ + 'RS256': prepare_RS_key, + 'RS384': prepare_RS_key, + 'RS512': prepare_RS_key + }) + except ImportError: pass @@ -115,20 +149,20 @@ def encode(payload, key, algorithm='HS256'): # Header header = {"typ": "JWT", "alg": algorithm} - json_header = json.dumps(header).encode('utf-8') + json_header = json.dumps(header, separators=(',', ':')).encode('utf-8') segments.append(base64url_encode(json_header)) # Payload - if isinstance(payload.get('exp'), datetime): - payload['exp'] = timegm(payload['exp'].utctimetuple()) - json_payload = json.dumps(payload).encode('utf-8') + for time_claim in ['exp', 'iat', 'nbf']: # convert datetime to a intDate value in known time-format claims + if isinstance(payload.get(time_claim), datetime): + payload[time_claim] = timegm(payload[time_claim].utctimetuple()) + json_payload = json.dumps(payload, separators=(',', ':')).encode('utf-8') segments.append(base64url_encode(json_payload)) # Segments signing_input = b'.'.join(segments) try: - if isinstance(key, unicode): - key = key.encode('utf-8') + key = prepare_key_methods[algorithm](key) signature = signing_methods[algorithm](signing_input, key) except KeyError: raise NotImplementedError("Algorithm not supported") @@ -184,8 +218,7 @@ def load(jwt): def verify_signature(payload, signing_input, header, signature, key='', verify_expiration=True, leeway=0): try: - if isinstance(key, unicode): - key = key.encode('utf-8') + key = prepare_key_methods[header['alg']](key) if header['alg'].startswith('HS'): expected = verify_methods[header['alg']](signing_input, key) if not constant_time_compare(signature, expected): diff --git a/tests/test_jwt.py b/tests/test_jwt.py index 0ee5422..5dd7374 100644 --- a/tests/test_jwt.py +++ b/tests/test_jwt.py @@ -35,16 +35,26 @@ class TestJWT(unittest.TestCase): for t in types: self.assertRaises(TypeError, lambda: jwt.encode(t, 'secret')) - def test_encode_expiration_datetime(self): + def test_encode_datetime(self): secret = "secret" current_datetime = datetime.utcnow() - payload = {"exp": current_datetime} + payload = { + "exp": current_datetime, + "iat": current_datetime, + "nbf": current_datetime + } jwt_message = jwt.encode(payload, secret) decoded_payload = jwt.decode(jwt_message, secret, leeway=1) self.assertEqual( decoded_payload['exp'], timegm(current_datetime.utctimetuple())) + self.assertEqual( + decoded_payload['iat'], + timegm(current_datetime.utctimetuple())) + self.assertEqual( + decoded_payload['nbf'], + timegm(current_datetime.utctimetuple())) def test_bad_secret(self): right_secret = 'foo' @@ -301,6 +311,7 @@ class TestJWT(unittest.TestCase): try: from Crypto.PublicKey import RSA + # RSA-formatted key with open('tests/testkey', 'r') as rsa_priv_file: priv_rsakey = RSA.importKey(rsa_priv_file.read()) jwt_message = jwt.encode(self.payload, priv_rsakey, @@ -312,6 +323,20 @@ class TestJWT(unittest.TestCase): load_output = jwt.load(jwt_message) jwt.verify_signature(key=pub_rsakey, *load_output) + + # string-formatted key + with open('tests/testkey', 'r') as rsa_priv_file: + priv_rsakey = rsa_priv_file.read() + jwt_message = jwt.encode(self.payload, priv_rsakey, + algorithm='RS256') + + with open('tests/testkey.pub', 'r') as rsa_pub_file: + pub_rsakey = rsa_pub_file.read() + assert jwt.decode(jwt_message, pub_rsakey) + + load_output = jwt.load(jwt_message) + jwt.verify_signature(key=pub_rsakey, *load_output) + except ImportError: pass @@ -319,6 +344,7 @@ class TestJWT(unittest.TestCase): try: from Crypto.PublicKey import RSA + # RSA-formatted key with open('tests/testkey', 'r') as rsa_priv_file: priv_rsakey = RSA.importKey(rsa_priv_file.read()) jwt_message = jwt.encode(self.payload, priv_rsakey, @@ -330,6 +356,19 @@ class TestJWT(unittest.TestCase): load_output = jwt.load(jwt_message) jwt.verify_signature(key=pub_rsakey, *load_output) + + # string-formatted key + with open('tests/testkey', 'r') as rsa_priv_file: + priv_rsakey = rsa_priv_file.read() + jwt_message = jwt.encode(self.payload, priv_rsakey, + algorithm='RS384') + + with open('tests/testkey.pub', 'r') as rsa_pub_file: + pub_rsakey = rsa_pub_file.read() + assert jwt.decode(jwt_message, pub_rsakey) + + load_output = jwt.load(jwt_message) + jwt.verify_signature(key=pub_rsakey, *load_output) except ImportError: pass @@ -337,6 +376,7 @@ class TestJWT(unittest.TestCase): try: from Crypto.PublicKey import RSA + # RSA-formatted key with open('tests/testkey', 'r') as rsa_priv_file: priv_rsakey = RSA.importKey(rsa_priv_file.read()) jwt_message = jwt.encode(self.payload, priv_rsakey, @@ -348,6 +388,19 @@ class TestJWT(unittest.TestCase): load_output = jwt.load(jwt_message) jwt.verify_signature(key=pub_rsakey, *load_output) + + # string-formatted key + with open('tests/testkey', 'r') as rsa_priv_file: + priv_rsakey = rsa_priv_file.read() + jwt_message = jwt.encode(self.payload, priv_rsakey, + algorithm='RS512') + + with open('tests/testkey.pub', 'r') as rsa_pub_file: + pub_rsakey = rsa_pub_file.read() + assert jwt.decode(jwt_message, pub_rsakey) + + load_output = jwt.load(jwt_message) + jwt.verify_signature(key=pub_rsakey, *load_output) except ImportError: pass @@ -373,6 +426,17 @@ class TestJWT(unittest.TestCase): self.assertFalse('RS384' in jwt.verify_methods) self.assertFalse('RS512' in jwt.verify_methods) + def test_crypto_related_key_preparation_methods(self): + try: + import Crypto + self.assertTrue('RS256' in jwt.prepare_key_methods) + self.assertTrue('RS384' in jwt.prepare_key_methods) + self.assertTrue('RS512' in jwt.prepare_key_methods) + except ImportError: + self.assertFalse('RS256' in jwt.prepare_key_methods) + self.assertFalse('RS384' in jwt.prepare_key_methods) + self.assertFalse('RS512' in jwt.prepare_key_methods) + if __name__ == '__main__': unittest.main() |