import json import time from calendar import timegm from datetime import datetime, timedelta from decimal import Decimal from jwt.api_jwt import PyJWT from jwt.exceptions import ( DecodeError, ExpiredSignatureError, ImmatureSignatureError, InvalidAudienceError, InvalidIssuedAtError, InvalidIssuerError, MissingRequiredClaimError ) import pytest from .test_api_jws import has_crypto from .utils import utc_timestamp @pytest.fixture def jwt(): return PyJWT() @pytest.fixture def payload(): """ Creates a sample JWT claimset for use as a payload during tests """ return { 'iss': 'jeff', 'exp': utc_timestamp() + 15, 'claim': 'insanity' } class TestJWT: def test_decodes_valid_jwt(self, jwt): example_payload = {'hello': 'world'} example_secret = 'secret' example_jwt = ( b'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' b'.eyJoZWxsbyI6ICJ3b3JsZCJ9' b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') decoded_payload = jwt.decode(example_jwt, example_secret) assert decoded_payload == example_payload def test_load_verify_valid_jwt(self, jwt): example_payload = {'hello': 'world'} example_secret = 'secret' example_jwt = ( b'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' b'.eyJoZWxsbyI6ICJ3b3JsZCJ9' b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') decoded_payload = jwt.decode(example_jwt, key=example_secret) assert decoded_payload == example_payload def test_decode_invalid_payload_string(self, jwt): example_jwt = ( 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.aGVsb' 'G8gd29ybGQ.SIr03zM64awWRdPrAM_61QWsZchAtgDV' '3pphfHPPWkI') example_secret = 'secret' with pytest.raises(DecodeError) as exc: jwt.decode(example_jwt, example_secret) assert 'Invalid payload string' in str(exc.value) def test_decode_with_non_mapping_payload_throws_exception(self, jwt): secret = 'secret' example_jwt = ('eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.' 'MQ.' # == 1 'AbcSR3DWum91KOgfKxUHm78rLs_DrrZ1CrDgpUFFzls') with pytest.raises(DecodeError) as context: jwt.decode(example_jwt, secret) exception = context.value assert str(exception) == 'Invalid payload string: must be a json object' def test_decode_with_invalid_audience_param_throws_exception(self, jwt): secret = 'secret' example_jwt = ('eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' '.eyJoZWxsbyI6ICJ3b3JsZCJ9' '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') with pytest.raises(TypeError) as context: jwt.decode(example_jwt, secret, audience=1) exception = context.value assert str(exception) == 'audience must be a string or None' def test_decode_with_nonlist_aud_claim_throws_exception(self, jwt): secret = 'secret' example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9' '.eyJoZWxsbyI6IndvcmxkIiwiYXVkIjoxfQ' # aud = 1 '.Rof08LBSwbm8Z_bhA2N3DFY-utZR1Gi9rbIS5Zthnnc') with pytest.raises(InvalidAudienceError) as context: jwt.decode(example_jwt, secret, audience='my_audience') exception = context.value assert str(exception) == 'Invalid claim format in token' def test_decode_with_invalid_aud_list_member_throws_exception(self, jwt): secret = 'secret' example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9' '.eyJoZWxsbyI6IndvcmxkIiwiYXVkIjpbMV19' '.iQgKpJ8shetwNMIosNXWBPFB057c2BHs-8t1d2CCM2A') with pytest.raises(InvalidAudienceError) as context: jwt.decode(example_jwt, secret, audience='my_audience') exception = context.value assert str(exception) == 'Invalid claim format in token' def test_encode_bad_type(self, jwt): types = ['string', tuple(), list(), 42, set()] for t in types: pytest.raises(TypeError, lambda: jwt.encode(t, 'secret')) def test_decode_raises_exception_if_exp_is_not_int(self, jwt): # >>> jwt.encode({'exp': 'not-an-int'}, 'secret') example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.' 'eyJleHAiOiJub3QtYW4taW50In0.' 'P65iYgoHtBqB07PMtBSuKNUEIPPPfmjfJG217cEE66s') with pytest.raises(DecodeError) as exc: jwt.decode(example_jwt, 'secret') assert 'exp' in str(exc.value) def test_decode_raises_exception_if_iat_is_not_int(self, jwt): # >>> jwt.encode({'iat': 'not-an-int'}, 'secret') example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.' 'eyJpYXQiOiJub3QtYW4taW50In0.' 'H1GmcQgSySa5LOKYbzGm--b1OmRbHFkyk8pq811FzZM') with pytest.raises(DecodeError): jwt.decode(example_jwt, 'secret') def test_decode_raises_exception_if_nbf_is_not_int(self, jwt): # >>> jwt.encode({'nbf': 'not-an-int'}, 'secret') example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.' 'eyJuYmYiOiJub3QtYW4taW50In0.' 'c25hldC8G2ZamC8uKpax9sYMTgdZo3cxrmzFHaAAluw') with pytest.raises(DecodeError): jwt.decode(example_jwt, 'secret') def test_decode_raises_exception_if_iat_in_the_future(self, jwt): now = datetime.utcnow() token = jwt.encode({'iat': now + timedelta(days=1)}, key='secret') with pytest.raises(InvalidIssuedAtError): jwt.decode(token, 'secret') def test_encode_datetime(self, jwt): secret = 'secret' current_datetime = datetime.utcnow() payload = { 'exp': current_datetime, 'iat': current_datetime, 'nbf': current_datetime } jwt_message = jwt.encode(payload, secret) decoded_payload = jwt.decode(jwt_message, secret, leeway=1) assert (decoded_payload['exp'] == timegm(current_datetime.utctimetuple())) assert (decoded_payload['iat'] == timegm(current_datetime.utctimetuple())) assert (decoded_payload['nbf'] == timegm(current_datetime.utctimetuple())) # 'Control' Elliptic Curve JWT created by another library. # Used to test for regressions that could affect both # encoding / decoding operations equally (causing tests # to still pass). @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library") def test_decodes_valid_es384_jwt(self, jwt): example_payload = {'hello': 'world'} with open('tests/keys/testkey_ec.pub', 'r') as fp: example_pubkey = fp.read() example_jwt = ( b'eyJhbGciOiJFUzM4NCIsInR5cCI6IkpXVCJ9' b'.eyJoZWxsbyI6IndvcmxkIn0' b'.AddMgkmRhzqptDYqlmy_f2dzM6O9YZmVo-txs_CeAJD' b'NoD8LN7YiPeLmtIhkO5_VZeHHKvtQcGc4lsq-Y72c4dK' b'pANr1f6HEYhjpBc03u_bv06PYMcr5N2-9k97-qf-JCSb' b'zqW6R250Q7gNCX5R7NrCl7MTM4DTBZkGbUlqsFUleiGlj') decoded_payload = jwt.decode(example_jwt, example_pubkey) assert decoded_payload == example_payload # 'Control' RSA JWT created by another library. # Used to test for regressions that could affect both # encoding / decoding operations equally (causing tests # to still pass). @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library") def test_decodes_valid_rs384_jwt(self, jwt): example_payload = {'hello': 'world'} with open('tests/keys/testkey_rsa.pub', 'r') as fp: example_pubkey = fp.read() example_jwt = ( b'eyJhbGciOiJSUzM4NCIsInR5cCI6IkpXVCJ9' b'.eyJoZWxsbyI6IndvcmxkIn0' b'.yNQ3nI9vEDs7lEh-Cp81McPuiQ4ZRv6FL4evTYYAh1X' b'lRTTR3Cz8pPA9Stgso8Ra9xGB4X3rlra1c8Jz10nTUju' b'O06OMm7oXdrnxp1KIiAJDerWHkQ7l3dlizIk1bmMA457' b'W2fNzNfHViuED5ISM081dgf_a71qBwJ_yShMMrSOfxDx' b'mX9c4DjRogRJG8SM5PvpLqI_Cm9iQPGMvmYK7gzcq2cJ' b'urHRJDJHTqIdpLWXkY7zVikeen6FhuGyn060Dz9gYq9t' b'uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr' b'qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A') decoded_payload = jwt.decode(example_jwt, example_pubkey) assert decoded_payload == example_payload def test_decode_with_expiration(self, jwt, payload): payload['exp'] = utc_timestamp() - 1 secret = 'secret' jwt_message = jwt.encode(payload, secret) with pytest.raises(ExpiredSignatureError): jwt.decode(jwt_message, secret) def test_decode_with_notbefore(self, jwt, payload): payload['nbf'] = utc_timestamp() + 10 secret = 'secret' jwt_message = jwt.encode(payload, secret) with pytest.raises(ImmatureSignatureError): jwt.decode(jwt_message, secret) def test_decode_skip_expiration_verification(self, jwt, payload): payload['exp'] = time.time() - 1 secret = 'secret' jwt_message = jwt.encode(payload, secret) jwt.decode(jwt_message, secret, options={'verify_exp': False}) def test_decode_skip_notbefore_verification(self, jwt, payload): payload['nbf'] = time.time() + 10 secret = 'secret' jwt_message = jwt.encode(payload, secret) jwt.decode(jwt_message, secret, options={'verify_nbf': False}) def test_decode_with_expiration_with_leeway(self, jwt, payload): payload['exp'] = utc_timestamp() - 2 secret = 'secret' jwt_message = jwt.encode(payload, secret) decoded_payload, signing, header, signature = jwt._load(jwt_message) # With 3 seconds leeway, should be ok for leeway in (3, timedelta(seconds=3)): jwt.decode(jwt_message, secret, leeway=leeway) # With 1 seconds, should fail for leeway in (1, timedelta(seconds=1)): with pytest.raises(ExpiredSignatureError): jwt.decode(jwt_message, secret, leeway=leeway) def test_decode_with_notbefore_with_leeway(self, jwt, payload): payload['nbf'] = utc_timestamp() + 10 secret = 'secret' jwt_message = jwt.encode(payload, secret) # With 13 seconds leeway, should be ok jwt.decode(jwt_message, secret, leeway=13) with pytest.raises(ImmatureSignatureError): jwt.decode(jwt_message, secret, leeway=1) def test_check_audience_when_valid(self, jwt): payload = { 'some': 'payload', 'aud': 'urn:me' } token = jwt.encode(payload, 'secret') jwt.decode(token, 'secret', audience='urn:me') def test_check_audience_in_array_when_valid(self, jwt): payload = { 'some': 'payload', 'aud': ['urn:me', 'urn:someone-else'] } token = jwt.encode(payload, 'secret') jwt.decode(token, 'secret', audience='urn:me') def test_raise_exception_invalid_audience(self, jwt): payload = { 'some': 'payload', 'aud': 'urn:someone-else' } token = jwt.encode(payload, 'secret') with pytest.raises(InvalidAudienceError): jwt.decode(token, 'secret', audience='urn-me') def test_raise_exception_invalid_audience_in_array(self, jwt): payload = { 'some': 'payload', 'aud': ['urn:someone', 'urn:someone-else'] } token = jwt.encode(payload, 'secret') with pytest.raises(InvalidAudienceError): jwt.decode(token, 'secret', audience='urn:me') def test_raise_exception_token_without_issuer(self, jwt): issuer = 'urn:wrong' payload = { 'some': 'payload' } token = jwt.encode(payload, 'secret') with pytest.raises(MissingRequiredClaimError) as exc: jwt.decode(token, 'secret', issuer=issuer) assert exc.value.claim == 'iss' def test_raise_exception_token_without_audience(self, jwt): payload = { 'some': 'payload', } token = jwt.encode(payload, 'secret') with pytest.raises(MissingRequiredClaimError) as exc: jwt.decode(token, 'secret', audience='urn:me') assert exc.value.claim == 'aud' def test_check_issuer_when_valid(self, jwt): issuer = 'urn:foo' payload = { 'some': 'payload', 'iss': 'urn:foo' } token = jwt.encode(payload, 'secret') jwt.decode(token, 'secret', issuer=issuer) def test_raise_exception_invalid_issuer(self, jwt): issuer = 'urn:wrong' payload = { 'some': 'payload', 'iss': 'urn:foo' } token = jwt.encode(payload, 'secret') with pytest.raises(InvalidIssuerError): jwt.decode(token, 'secret', issuer=issuer) def test_skip_check_audience(self, jwt): payload = { 'some': 'payload', 'aud': 'urn:me', } token = jwt.encode(payload, 'secret') jwt.decode(token, 'secret', options={'verify_aud': False}) def test_skip_check_exp(self, jwt): payload = { 'some': 'payload', 'exp': datetime.utcnow() - timedelta(days=1) } token = jwt.encode(payload, 'secret') jwt.decode(token, 'secret', options={'verify_exp': False}) def test_decode_should_raise_error_if_exp_required_but_not_present(self, jwt): payload = { 'some': 'payload', # exp not present } token = jwt.encode(payload, 'secret') with pytest.raises(MissingRequiredClaimError) as exc: jwt.decode(token, 'secret', options={'require_exp': True}) assert exc.value.claim == 'exp' def test_decode_should_raise_error_if_iat_required_but_not_present(self, jwt): payload = { 'some': 'payload', # iat not present } token = jwt.encode(payload, 'secret') with pytest.raises(MissingRequiredClaimError) as exc: jwt.decode(token, 'secret', options={'require_iat': True}) assert exc.value.claim == 'iat' def test_decode_should_raise_error_if_nbf_required_but_not_present(self, jwt): payload = { 'some': 'payload', # nbf not present } token = jwt.encode(payload, 'secret') with pytest.raises(MissingRequiredClaimError) as exc: jwt.decode(token, 'secret', options={'require_nbf': True}) assert exc.value.claim == 'nbf' def test_skip_check_signature(self, jwt): token = ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" ".eyJzb21lIjoicGF5bG9hZCJ9" ".4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZA") jwt.decode(token, 'secret', options={'verify_signature': False}) def test_skip_check_iat(self, jwt): payload = { 'some': 'payload', 'iat': datetime.utcnow() + timedelta(days=1) } token = jwt.encode(payload, 'secret') jwt.decode(token, 'secret', options={'verify_iat': False}) def test_skip_check_nbf(self, jwt): payload = { 'some': 'payload', 'nbf': datetime.utcnow() + timedelta(days=1) } token = jwt.encode(payload, 'secret') jwt.decode(token, 'secret', options={'verify_nbf': False}) def test_custom_json_encoder(self, jwt): class CustomJSONEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, Decimal): return 'it worked' return super(CustomJSONEncoder, self).default(o) data = { 'some_decimal': Decimal('2.2') } with pytest.raises(TypeError): jwt.encode(data, 'secret') token = jwt.encode(data, 'secret', json_encoder=CustomJSONEncoder) payload = jwt.decode(token, 'secret') assert payload == {'some_decimal': 'it worked'} def test_decode_with_verify_expiration_kwarg(self, jwt, payload): payload['exp'] = utc_timestamp() - 1 secret = 'secret' jwt_message = jwt.encode(payload, secret) pytest.deprecated_call( jwt.decode, jwt_message, secret, verify_expiration=False ) with pytest.raises(ExpiredSignatureError): pytest.deprecated_call( jwt.decode, jwt_message, secret, verify_expiration=True )