summaryrefslogtreecommitdiff
path: root/src/saml2/cert.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/saml2/cert.py')
-rw-r--r--src/saml2/cert.py99
1 files changed, 50 insertions, 49 deletions
diff --git a/src/saml2/cert.py b/src/saml2/cert.py
index aeb5b662..395c8e06 100644
--- a/src/saml2/cert.py
+++ b/src/saml2/cert.py
@@ -1,14 +1,14 @@
-__author__ = 'haho0032'
+__author__ = "haho0032"
import base64
import datetime
-import dateutil.parser
-import pytz
-import six
from os import remove
from os.path import join
from OpenSSL import crypto
+import dateutil.parser
+import pytz
+import six
import saml2.cryptography.pki
@@ -29,10 +29,19 @@ class OpenSSLWrapper(object):
def __init__(self):
pass
- def create_certificate(self, cert_info, request=False, valid_from=0,
- valid_to=315360000, sn=1, key_length=1024,
- hash_alg="sha256", write_to_file=False, cert_dir="",
- cipher_passphrase=None):
+ def create_certificate(
+ self,
+ cert_info,
+ request=False,
+ valid_from=0,
+ valid_to=315360000,
+ sn=1,
+ key_length=1024,
+ hash_alg="sha256",
+ write_to_file=False,
+ cert_dir="",
+ cipher_passphrase=None,
+ ):
"""
Can create certificate requests, to be signed later by another
certificate with the method
@@ -122,7 +131,6 @@ class OpenSSLWrapper(object):
c_f = join(cert_dir, cert_file)
k_f = join(cert_dir, key_file)
-
# create a key pair
k = crypto.PKey()
k.generate_key(crypto.TYPE_RSA, key_length)
@@ -133,7 +141,7 @@ class OpenSSLWrapper(object):
if request:
cert = crypto.X509Req()
- if (len(cert_info["country_code"]) != 2):
+ if len(cert_info["country_code"]) != 2:
raise WrongInput("Country code must be two letters!")
cert.get_subject().C = cert_info["country_code"]
cert.get_subject().ST = cert_info["state"]
@@ -143,45 +151,41 @@ class OpenSSLWrapper(object):
cert.get_subject().CN = cn
if not request:
cert.set_serial_number(sn)
- cert.gmtime_adj_notBefore(valid_from) #Valid before present time
- cert.gmtime_adj_notAfter(valid_to) #3 650 days
+ cert.gmtime_adj_notBefore(valid_from) # Valid before present time
+ cert.gmtime_adj_notAfter(valid_to) # 3 650 days
cert.set_issuer(cert.get_subject())
cert.set_pubkey(k)
cert.sign(k, hash_alg)
try:
if request:
- tmp_cert = crypto.dump_certificate_request(crypto.FILETYPE_PEM,
- cert)
+ tmp_cert = crypto.dump_certificate_request(crypto.FILETYPE_PEM, cert)
else:
tmp_cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
tmp_key = None
if cipher_passphrase is not None:
passphrase = cipher_passphrase["passphrase"]
- if isinstance(cipher_passphrase["passphrase"],
- six.string_types):
- passphrase = passphrase.encode('utf-8')
- tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k,
- cipher_passphrase["cipher"],
- passphrase)
+ if isinstance(cipher_passphrase["passphrase"], six.string_types):
+ passphrase = passphrase.encode("utf-8")
+ tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k, cipher_passphrase["cipher"], passphrase)
else:
tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k)
if write_to_file:
- with open(c_f, 'wt') as fc:
- fc.write(tmp_cert.decode('utf-8'))
- with open(k_f, 'wt') as fk:
- fk.write(tmp_key.decode('utf-8'))
+ with open(c_f, "wt") as fc:
+ fc.write(tmp_cert.decode("utf-8"))
+ with open(k_f, "wt") as fk:
+ fk.write(tmp_key.decode("utf-8"))
return c_f, k_f
return tmp_cert, tmp_key
except Exception as ex:
raise CertificateError("Certificate cannot be generated.", ex)
def write_str_to_file(self, file, str_data):
- with open(file, 'wt') as f:
+ with open(file, "wt") as f:
f.write(str_data)
def read_str_from_file(self, file, type="pem"):
- with open(file, 'rb') as f:
+ with open(file, "rb") as f:
str_data = f.read()
if type == "pem":
@@ -190,11 +194,17 @@ class OpenSSLWrapper(object):
if type in ["der", "cer", "crt"]:
return base64.b64encode(str(str_data))
-
- def create_cert_signed_certificate(self, sign_cert_str, sign_key_str,
- request_cert_str, hash_alg="sha256",
- valid_from=0, valid_to=315360000, sn=1,
- passphrase=None):
+ def create_cert_signed_certificate(
+ self,
+ sign_cert_str,
+ sign_key_str,
+ request_cert_str,
+ hash_alg="sha256",
+ valid_from=0,
+ valid_to=315360000,
+ sn=1,
+ passphrase=None,
+ ):
"""
Will sign a certificate request with a give certificate.
@@ -231,12 +241,10 @@ class OpenSSLWrapper(object):
ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, sign_cert_str)
ca_key = None
if passphrase is not None:
- ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str,
- passphrase)
+ ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str, passphrase)
else:
ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str)
- req_cert = crypto.load_certificate_request(crypto.FILETYPE_PEM,
- request_cert_str)
+ req_cert = crypto.load_certificate_request(crypto.FILETYPE_PEM, request_cert_str)
cert = crypto.X509()
cert.set_subject(req_cert.get_subject())
@@ -250,7 +258,7 @@ class OpenSSLWrapper(object):
cert_dump = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
if isinstance(cert_dump, six.string_types):
return cert_dump
- return cert_dump.decode('utf-8')
+ return cert_dump.decode("utf-8")
def verify_chain(self, cert_chain_str_list, cert_str):
"""
@@ -267,9 +275,7 @@ class OpenSSLWrapper(object):
return False, message
else:
cert_str = tmp_cert_str
- return (True,
- "Signed certificate is valid and correctly signed by CA "
- "certificate.")
+ return (True, "Signed certificate is valid and correctly signed by CA " "certificate.")
def certificate_not_valid_yet(self, cert):
starts_to_be_valid = dateutil.parser.parse(cert.get_notBefore())
@@ -278,7 +284,6 @@ class OpenSSLWrapper(object):
return False
return True
-
def verify(self, signing_cert_str, cert_str):
"""
Verifies if a certificate is valid and signed by a given certificate.
@@ -299,8 +304,7 @@ class OpenSSLWrapper(object):
Message = Why the validation failed.
"""
try:
- ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM,
- signing_cert_str)
+ ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, signing_cert_str)
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
if self.certificate_not_valid_yet(ca_cert):
@@ -316,20 +320,17 @@ class OpenSSLWrapper(object):
return False, "The signed certificate is not valid yet."
if ca_cert.get_subject().CN == cert.get_subject().CN:
- return False, ("CN may not be equal for CA certificate and the "
- "signed certificate.")
+ return False, ("CN may not be equal for CA certificate and the " "signed certificate.")
cert_algorithm = cert.get_signature_algorithm()
if six.PY3:
- cert_algorithm = cert_algorithm.decode('ascii')
- cert_str = cert_str.encode('ascii')
+ cert_algorithm = cert_algorithm.decode("ascii")
+ cert_str = cert_str.encode("ascii")
cert_crypto = saml2.cryptography.pki.load_pem_x509_certificate(cert_str)
try:
- crypto.verify(ca_cert, cert_crypto.signature,
- cert_crypto.tbs_certificate_bytes,
- cert_algorithm)
+ crypto.verify(ca_cert, cert_crypto.signature, cert_crypto.tbs_certificate_bytes, cert_algorithm)
return True, "Signed certificate is valid and correctly signed by CA certificate."
except crypto.Error as e:
return False, "Certificate is incorrectly signed."