1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
""" JSON Web Token implementation
Minimum implementation based on this spec:
http://self-issued.info/docs/draft-jones-json-web-token-01.html
"""
import base64
import hashlib
import hmac
from time import time
from datetime import datetime
from calendar import timegm
try:
import json
except ImportError:
import simplejson as json
__all__ = ['encode', 'decode', 'DecodeError']
class DecodeError(Exception):
pass
class ExpiredSignature(Exception):
pass
signing_methods = {
'HS256': lambda msg, key: hmac.new(key, msg, hashlib.sha256).digest(),
'HS384': lambda msg, key: hmac.new(key, msg, hashlib.sha384).digest(),
'HS512': lambda msg, key: hmac.new(key, msg, hashlib.sha512).digest(),
}
def base64url_decode(input):
rem = len(input) % 4
if rem > 0:
input += '=' * (4 - rem)
return base64.urlsafe_b64decode(input)
def base64url_encode(input):
return base64.urlsafe_b64encode(input).replace('=', '')
def header(jwt):
header_segment = jwt.split('.', 1)[0]
try:
return json.loads(base64url_decode(header_segment))
except (ValueError, TypeError):
raise DecodeError("Invalid header encoding")
def encode(payload, key, algorithm='HS256'):
segments = []
# Header
header = {"typ": "JWT", "alg": algorithm}
segments.append(base64url_encode(json.dumps(header)))
# Payload
if isinstance(payload.get('exp'), datetime):
payload['exp'] = timegm(payload['exp'].utctimetuple())
segments.append(base64url_encode(json.dumps(payload)))
# Segments
signing_input = '.'.join(segments)
try:
if isinstance(key, unicode):
key = key.encode('utf-8')
signature = signing_methods[algorithm](signing_input, key)
except KeyError:
raise NotImplementedError("Algorithm not supported")
segments.append(base64url_encode(signature))
return '.'.join(segments)
def decode(jwt, key='', verify=True, verify_expiration=True, leeway=0):
try:
signing_input, crypto_segment = str(jwt).rsplit('.', 1)
header_segment, payload_segment = signing_input.split('.', 1)
except ValueError:
raise DecodeError("Not enough segments")
try:
header = json.loads(base64url_decode(header_segment))
except TypeError:
raise DecodeError("Invalid header padding")
except ValueError as e:
raise DecodeError("Invalid header string: %s" % e)
try:
payload = json.loads(base64url_decode(payload_segment))
except TypeError:
raise DecodeError("Invalid payload padding")
except ValueError as e:
raise DecodeError("Invalid payload string: %s" % e)
try:
signature = base64url_decode(crypto_segment)
except TypeError:
raise DecodeError("Invalid crypto padding")
if verify:
try:
if isinstance(key, unicode):
key = key.encode('utf-8')
if not signature == signing_methods[header['alg']](signing_input, key):
raise DecodeError("Signature verification failed")
except KeyError:
raise DecodeError("Algorithm not supported")
if 'exp' in payload and verify_expiration:
utc_timestamp = timegm(datetime.utcnow().utctimetuple())
if payload['exp'] < (utc_timestamp - leeway):
raise ExpiredSignature("Signature has expired")
return payload
|