summaryrefslogtreecommitdiff
path: root/jwt/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'jwt/__init__.py')
-rw-r--r--jwt/__init__.py49
1 files changed, 41 insertions, 8 deletions
diff --git a/jwt/__init__.py b/jwt/__init__.py
index 4e1d5d2..07e77fe 100644
--- a/jwt/__init__.py
+++ b/jwt/__init__.py
@@ -24,6 +24,7 @@ __all__ = ['encode', 'decode', 'DecodeError']
if sys.version_info >= (3, 0, 0):
unicode = str
+ basestring = str
class DecodeError(Exception):
@@ -46,11 +47,26 @@ verify_methods = {
'HS512': lambda msg, key: hmac.new(key, msg, hashlib.sha512).digest()
}
+def prepare_HS_key(key):
+ if isinstance(key, basestring):
+ if isinstance(key, unicode):
+ key = key.encode('utf-8')
+ else:
+ raise TypeError("Expecting a string-formatted key.")
+ return key
+
+prepare_key_methods = {
+ 'HS256': prepare_HS_key,
+ 'HS384': prepare_HS_key,
+ 'HS512': prepare_HS_key
+}
+
try:
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA256
from Crypto.Hash import SHA384
from Crypto.Hash import SHA512
+ from Crypto.PublicKey import RSA
signing_methods.update({
'RS256': lambda msg, key: PKCS1_v1_5.new(key).sign(SHA256.new(msg)),
@@ -63,6 +79,24 @@ try:
'RS384': lambda msg, key, sig: PKCS1_v1_5.new(key).verify(SHA384.new(msg), sig),
'RS512': lambda msg, key, sig: PKCS1_v1_5.new(key).verify(SHA512.new(msg), sig)
})
+
+ def prepare_RS_key(key):
+ if isinstance(key, basestring):
+ if isinstance(key, unicode):
+ key = key.encode('utf-8')
+ key = RSA.importKey(key)
+ elif isinstance(key, RSA._RSAobj):
+ pass
+ else:
+ raise TypeError("Expecting a PEM- or RSA-formatted key.")
+ return key
+
+ prepare_key_methods.update({
+ 'RS256': prepare_RS_key,
+ 'RS384': prepare_RS_key,
+ 'RS512': prepare_RS_key
+ })
+
except ImportError:
pass
@@ -115,20 +149,20 @@ def encode(payload, key, algorithm='HS256'):
# Header
header = {"typ": "JWT", "alg": algorithm}
- json_header = json.dumps(header).encode('utf-8')
+ json_header = json.dumps(header, separators=(',', ':')).encode('utf-8')
segments.append(base64url_encode(json_header))
# Payload
- if isinstance(payload.get('exp'), datetime):
- payload['exp'] = timegm(payload['exp'].utctimetuple())
- json_payload = json.dumps(payload).encode('utf-8')
+ for time_claim in ['exp', 'iat', 'nbf']: # convert datetime to a intDate value in known time-format claims
+ if isinstance(payload.get(time_claim), datetime):
+ payload[time_claim] = timegm(payload[time_claim].utctimetuple())
+ json_payload = json.dumps(payload, separators=(',', ':')).encode('utf-8')
segments.append(base64url_encode(json_payload))
# Segments
signing_input = b'.'.join(segments)
try:
- if isinstance(key, unicode):
- key = key.encode('utf-8')
+ key = prepare_key_methods[algorithm](key)
signature = signing_methods[algorithm](signing_input, key)
except KeyError:
raise NotImplementedError("Algorithm not supported")
@@ -184,8 +218,7 @@ def load(jwt):
def verify_signature(payload, signing_input, header, signature, key='',
verify_expiration=True, leeway=0):
try:
- if isinstance(key, unicode):
- key = key.encode('utf-8')
+ key = prepare_key_methods[header['alg']](key)
if header['alg'].startswith('HS'):
expected = verify_methods[header['alg']](signing_input, key)
if not constant_time_compare(signature, expected):