summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jwt/__init__.py59
-rw-r--r--tests/test_jwt.py76
-rw-r--r--tox.ini2
3 files changed, 84 insertions, 53 deletions
diff --git a/jwt/__init__.py b/jwt/__init__.py
index f28c067..195aeee 100644
--- a/jwt/__init__.py
+++ b/jwt/__init__.py
@@ -76,35 +76,66 @@ prepare_key_methods = {
}
try:
- from Crypto.Signature import PKCS1_v1_5
- from Crypto.Hash import SHA256
- from Crypto.Hash import SHA384
- from Crypto.Hash import SHA512
- from Crypto.PublicKey import RSA
+
+ from cryptography.hazmat.primitives import interfaces, hashes
+ from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key, load_ssh_public_key
+ from cryptography.hazmat.primitives.asymmetric import rsa, padding
+ from cryptography.hazmat.backends import default_backend
+ from cryptography.exceptions import InvalidSignature
+
+ def sign_rsa(msg, key, hashalg):
+ signer = key.signer(
+ padding.PKCS1v15(),
+ hashalg
+ )
+
+ signer.update(msg)
+ return signer.finalize()
+
+ def verify_rsa(msg, key, hashalg, sig):
+ verifier = key.verifier(
+ sig,
+ padding.PKCS1v15(),
+ hashalg
+ )
+
+ verifier.update(msg)
+
+ try:
+ verifier.verify()
+ return True
+ except InvalidSignature:
+ return False
signing_methods.update({
- 'RS256': lambda msg, key: PKCS1_v1_5.new(key).sign(SHA256.new(msg)),
- 'RS384': lambda msg, key: PKCS1_v1_5.new(key).sign(SHA384.new(msg)),
- 'RS512': lambda msg, key: PKCS1_v1_5.new(key).sign(SHA512.new(msg))
+ 'RS256': lambda msg, key: sign_rsa(msg, key, hashes.SHA256()),
+ 'RS384': lambda msg, key: sign_rsa(msg, key, hashes.SHA384()),
+ 'RS512': lambda msg, key: sign_rsa(msg, key, hashes.SHA512())
})
verify_methods.update({
- 'RS256': lambda msg, key, sig: PKCS1_v1_5.new(key).verify(SHA256.new(msg), sig),
- 'RS384': lambda msg, key, sig: PKCS1_v1_5.new(key).verify(SHA384.new(msg), sig),
- 'RS512': lambda msg, key, sig: PKCS1_v1_5.new(key).verify(SHA512.new(msg), sig)
+ 'RS256': lambda msg, key, sig: verify_rsa(msg, key, hashes.SHA256(), sig),
+ 'RS384': lambda msg, key, sig: verify_rsa(msg, key, hashes.SHA384(), sig),
+ 'RS512': lambda msg, key, sig: verify_rsa(msg, key, hashes.SHA512(), sig)
})
def prepare_RS_key(key):
- if isinstance(key, RSA._RSAobj):
+ if isinstance(key, interfaces.RSAPrivateKey) or isinstance(key, interfaces.RSAPublicKey):
return key
if isinstance(key, basestring):
if isinstance(key, unicode):
key = key.encode('utf-8')
- key = RSA.importKey(key)
+ try:
+ if key.startswith(b'ssh-rsa'):
+ key = load_ssh_public_key(key, backend=default_backend())
+ else:
+ key = load_pem_private_key(key, password=None, backend=default_backend())
+ except ValueError:
+ key = load_pem_public_key(key, backend=default_backend())
else:
- raise TypeError('Expecting a PEM- or RSA-formatted key.')
+ raise TypeError('Expecting a PEM-formatted key.')
return key
diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index 6b285f8..f63a62a 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -19,7 +19,8 @@ if sys.version_info >= (3, 0, 0):
unicode = str
try:
- from Crypto.PublicKey import RSA
+ from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key, load_ssh_public_key
+ from cryptography.hazmat.backends import default_backend
has_rsa = True
except ImportError:
has_rsa = False
@@ -35,6 +36,13 @@ def utc_timestamp():
return timegm(datetime.utcnow().utctimetuple())
+def ensure_bytes(key):
+ if isinstance(key, unicode):
+ key = key.encode('utf-8')
+
+ return key
+
+
class TestJWT(unittest.TestCase):
def setUp(self):
@@ -457,12 +465,12 @@ class TestJWT(unittest.TestCase):
def test_encode_decode_with_rsa_sha256(self):
# PEM-formatted RSA key
with open('tests/testkey_rsa', 'r') as rsa_priv_file:
- priv_rsakey = RSA.importKey(rsa_priv_file.read())
+ priv_rsakey = load_pem_private_key(ensure_bytes(rsa_priv_file.read()), password=None, backend=default_backend())
jwt_message = jwt.encode(self.payload, priv_rsakey,
algorithm='RS256')
with open('tests/testkey_rsa.pub', 'r') as rsa_pub_file:
- pub_rsakey = RSA.importKey(rsa_pub_file.read())
+ pub_rsakey = load_ssh_public_key(ensure_bytes(rsa_pub_file.read()), backend=default_backend())
assert jwt.decode(jwt_message, pub_rsakey)
load_output = jwt.load(jwt_message)
@@ -485,17 +493,14 @@ class TestJWT(unittest.TestCase):
def test_encode_decode_with_rsa_sha384(self):
# PEM-formatted RSA key
with open('tests/testkey_rsa', 'r') as rsa_priv_file:
- priv_rsakey = RSA.importKey(rsa_priv_file.read())
+ priv_rsakey = load_pem_private_key(ensure_bytes(rsa_priv_file.read()), password=None, backend=default_backend())
jwt_message = jwt.encode(self.payload, priv_rsakey,
algorithm='RS384')
with open('tests/testkey_rsa.pub', 'r') as rsa_pub_file:
- pub_rsakey = RSA.importKey(rsa_pub_file.read())
+ pub_rsakey = load_ssh_public_key(ensure_bytes(rsa_pub_file.read()), backend=default_backend())
assert jwt.decode(jwt_message, pub_rsakey)
- load_output = jwt.load(jwt_message)
- jwt.verify_signature(key=pub_rsakey, *load_output)
-
# string-formatted key
with open('tests/testkey_rsa', 'r') as rsa_priv_file:
priv_rsakey = rsa_priv_file.read()
@@ -511,36 +516,31 @@ class TestJWT(unittest.TestCase):
@unittest.skipIf(not has_rsa, 'Not supported without crypto library')
def test_encode_decode_with_rsa_sha512(self):
- try:
- from Crypto.PublicKey import RSA
-
- # PEM-formatted RSA key
- with open('tests/testkey_rsa', 'r') as rsa_priv_file:
- priv_rsakey = RSA.importKey(rsa_priv_file.read())
- jwt_message = jwt.encode(self.payload, priv_rsakey,
- algorithm='RS512')
-
- with open('tests/testkey_rsa.pub', 'r') as rsa_pub_file:
- pub_rsakey = RSA.importKey(rsa_pub_file.read())
- assert jwt.decode(jwt_message, pub_rsakey)
-
- load_output = jwt.load(jwt_message)
- jwt.verify_signature(key=pub_rsakey, *load_output)
-
- # string-formatted key
- with open('tests/testkey_rsa', 'r') as rsa_priv_file:
- priv_rsakey = rsa_priv_file.read()
- jwt_message = jwt.encode(self.payload, priv_rsakey,
- algorithm='RS512')
-
- with open('tests/testkey_rsa.pub', 'r') as rsa_pub_file:
- pub_rsakey = rsa_pub_file.read()
- assert jwt.decode(jwt_message, pub_rsakey)
-
- load_output = jwt.load(jwt_message)
- jwt.verify_signature(key=pub_rsakey, *load_output)
- except ImportError:
- pass
+ # PEM-formatted RSA key
+ with open('tests/testkey_rsa', 'r') as rsa_priv_file:
+ priv_rsakey = load_pem_private_key(ensure_bytes(rsa_priv_file.read()), password=None, backend=default_backend())
+ jwt_message = jwt.encode(self.payload, priv_rsakey,
+ algorithm='RS512')
+
+ with open('tests/testkey_rsa.pub', 'r') as rsa_pub_file:
+ pub_rsakey = load_ssh_public_key(ensure_bytes(rsa_pub_file.read()), backend=default_backend())
+ assert jwt.decode(jwt_message, pub_rsakey)
+
+ load_output = jwt.load(jwt_message)
+ jwt.verify_signature(key=pub_rsakey, *load_output)
+
+ # string-formatted key
+ with open('tests/testkey_rsa', 'r') as rsa_priv_file:
+ priv_rsakey = rsa_priv_file.read()
+ jwt_message = jwt.encode(self.payload, priv_rsakey,
+ algorithm='RS512')
+
+ with open('tests/testkey_rsa.pub', 'r') as rsa_pub_file:
+ pub_rsakey = rsa_pub_file.read()
+ assert jwt.decode(jwt_message, pub_rsakey)
+
+ load_output = jwt.load(jwt_message)
+ jwt.verify_signature(key=pub_rsakey, *load_output)
def test_rsa_related_signing_methods(self):
if has_rsa:
diff --git a/tox.ini b/tox.ini
index e8e9181..7c3d876 100644
--- a/tox.ini
+++ b/tox.ini
@@ -3,7 +3,7 @@ envlist = py26, py27, py32, py33, py34
[testenv]
deps =
- PyCrypto
+ cryptography
ecdsa
unittest2
commands = {envpython} setup.py test