diff options
Diffstat (limited to 'jwt/__init__.py')
-rw-r--r-- | jwt/__init__.py | 49 |
1 files changed, 41 insertions, 8 deletions
diff --git a/jwt/__init__.py b/jwt/__init__.py index 4e1d5d2..07e77fe 100644 --- a/jwt/__init__.py +++ b/jwt/__init__.py @@ -24,6 +24,7 @@ __all__ = ['encode', 'decode', 'DecodeError'] if sys.version_info >= (3, 0, 0): unicode = str + basestring = str class DecodeError(Exception): @@ -46,11 +47,26 @@ verify_methods = { 'HS512': lambda msg, key: hmac.new(key, msg, hashlib.sha512).digest() } +def prepare_HS_key(key): + if isinstance(key, basestring): + if isinstance(key, unicode): + key = key.encode('utf-8') + else: + raise TypeError("Expecting a string-formatted key.") + return key + +prepare_key_methods = { + 'HS256': prepare_HS_key, + 'HS384': prepare_HS_key, + 'HS512': prepare_HS_key +} + try: from Crypto.Signature import PKCS1_v1_5 from Crypto.Hash import SHA256 from Crypto.Hash import SHA384 from Crypto.Hash import SHA512 + from Crypto.PublicKey import RSA signing_methods.update({ 'RS256': lambda msg, key: PKCS1_v1_5.new(key).sign(SHA256.new(msg)), @@ -63,6 +79,24 @@ try: 'RS384': lambda msg, key, sig: PKCS1_v1_5.new(key).verify(SHA384.new(msg), sig), 'RS512': lambda msg, key, sig: PKCS1_v1_5.new(key).verify(SHA512.new(msg), sig) }) + + def prepare_RS_key(key): + if isinstance(key, basestring): + if isinstance(key, unicode): + key = key.encode('utf-8') + key = RSA.importKey(key) + elif isinstance(key, RSA._RSAobj): + pass + else: + raise TypeError("Expecting a PEM- or RSA-formatted key.") + return key + + prepare_key_methods.update({ + 'RS256': prepare_RS_key, + 'RS384': prepare_RS_key, + 'RS512': prepare_RS_key + }) + except ImportError: pass @@ -115,20 +149,20 @@ def encode(payload, key, algorithm='HS256'): # Header header = {"typ": "JWT", "alg": algorithm} - json_header = json.dumps(header).encode('utf-8') + json_header = json.dumps(header, separators=(',', ':')).encode('utf-8') segments.append(base64url_encode(json_header)) # Payload - if isinstance(payload.get('exp'), datetime): - payload['exp'] = timegm(payload['exp'].utctimetuple()) - json_payload = json.dumps(payload).encode('utf-8') + for time_claim in ['exp', 'iat', 'nbf']: # convert datetime to a intDate value in known time-format claims + if isinstance(payload.get(time_claim), datetime): + payload[time_claim] = timegm(payload[time_claim].utctimetuple()) + json_payload = json.dumps(payload, separators=(',', ':')).encode('utf-8') segments.append(base64url_encode(json_payload)) # Segments signing_input = b'.'.join(segments) try: - if isinstance(key, unicode): - key = key.encode('utf-8') + key = prepare_key_methods[algorithm](key) signature = signing_methods[algorithm](signing_input, key) except KeyError: raise NotImplementedError("Algorithm not supported") @@ -184,8 +218,7 @@ def load(jwt): def verify_signature(payload, signing_input, header, signature, key='', verify_expiration=True, leeway=0): try: - if isinstance(key, unicode): - key = key.encode('utf-8') + key = prepare_key_methods[header['alg']](key) if header['alg'].startswith('HS'): expected = verify_methods[header['alg']](signing_input, key) if not constant_time_compare(signature, expected): |