diff options
author | lovetox <philipp@hoerist.com> | 2022-05-20 14:44:59 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-20 14:44:59 +0200 |
commit | dd98e31ad6f9e33f351d47246d6d09c804ba8c29 (patch) | |
tree | b356ff30f5d8d45ef0023f6985059575522364ca /src/OpenSSL/crypto.py | |
parent | 45ebb73416a67cb87b7ca0bfcfe7902b4f38250a (diff) | |
download | pyopenssl-git-dd98e31ad6f9e33f351d47246d6d09c804ba8c29.tar.gz |
Add inline type annotations (#1089)
* crypto: Add type annotations
* Don’t redefine var
mypy complains about the redefinition
* _util: Add type annotations
* rand: Add type annotations
* Prepare package & CI for running mypy
* fix toxenv name
Co-authored-by: Maximilian Hils <github@maximilianhils.com>
Diffstat (limited to 'src/OpenSSL/crypto.py')
-rw-r--r-- | src/OpenSSL/crypto.py | 466 |
1 files changed, 274 insertions, 192 deletions
diff --git a/src/OpenSSL/crypto.py b/src/OpenSSL/crypto.py index 7f95d20..ff1fd08 100644 --- a/src/OpenSSL/crypto.py +++ b/src/OpenSSL/crypto.py @@ -3,6 +3,20 @@ import datetime import functools from base64 import b16encode from functools import partial +from os import PathLike +from typing import ( + Any, + Callable, + Iterable, + List, + NoReturn, + Optional, + Sequence, + Set, + Tuple, + Type, + Union, +) from cryptography import utils, x509 from cryptography.hazmat.primitives.asymmetric import ( @@ -64,16 +78,24 @@ __all__ = [ "load_pkcs12", ] -FILETYPE_PEM = _lib.SSL_FILETYPE_PEM -FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1 + +_Key = Union[ + dsa.DSAPrivateKey, dsa.DSAPublicKey, rsa.RSAPrivateKey, rsa.RSAPublicKey +] +StrOrBytesPath = Union[str, bytes, PathLike] +PassphraseCallableT = Union[bytes, Callable[..., bytes]] + + +FILETYPE_PEM: int = _lib.SSL_FILETYPE_PEM +FILETYPE_ASN1: int = _lib.SSL_FILETYPE_ASN1 # TODO This was an API mistake. OpenSSL has no such constant. FILETYPE_TEXT = 2**16 - 1 -TYPE_RSA = _lib.EVP_PKEY_RSA -TYPE_DSA = _lib.EVP_PKEY_DSA -TYPE_DH = _lib.EVP_PKEY_DH -TYPE_EC = _lib.EVP_PKEY_EC +TYPE_RSA: int = _lib.EVP_PKEY_RSA +TYPE_DSA: int = _lib.EVP_PKEY_DSA +TYPE_DH: int = _lib.EVP_PKEY_DH +TYPE_EC: int = _lib.EVP_PKEY_EC class Error(Exception): @@ -86,7 +108,7 @@ _raise_current_error = partial(_exception_from_error_queue, Error) _openssl_assert = _make_assert(Error) -def _untested_error(where): +def _untested_error(where: str) -> NoReturn: """ An OpenSSL API failed somehow. Additionally, the failure which was encountered isn't one that's exercised by the test suite so future behavior @@ -95,7 +117,7 @@ def _untested_error(where): raise RuntimeError("Unknown %s failure" % (where,)) -def _new_mem_buf(buffer=None): +def _new_mem_buf(buffer: Optional[bytes] = None) -> Any: """ Allocate a new OpenSSL memory BIO. @@ -112,7 +134,7 @@ def _new_mem_buf(buffer=None): bio = _lib.BIO_new_mem_buf(data, len(buffer)) # Keep the memory alive as long as the bio is alive! - def free(bio, ref=data): + def free(bio: Any, ref: Any = data) -> Any: return _lib.BIO_free(bio) _openssl_assert(bio != _ffi.NULL) @@ -121,7 +143,7 @@ def _new_mem_buf(buffer=None): return bio -def _bio_to_string(bio): +def _bio_to_string(bio: Any) -> bytes: """ Copy the contents of an OpenSSL BIO object into a Python byte string. """ @@ -130,7 +152,7 @@ def _bio_to_string(bio): return _ffi.buffer(result_buffer[0], buffer_length)[:] -def _set_asn1_time(boundary, when): +def _set_asn1_time(boundary: Any, when: bytes) -> None: """ The the time value of an ASN1 time object. @@ -152,7 +174,7 @@ def _set_asn1_time(boundary, when): raise ValueError("Invalid string") -def _get_asn1_time(timestamp): +def _get_asn1_time(timestamp: Any) -> Optional[bytes]: """ Retrieve the time value of an ASN1 time object. @@ -194,13 +216,13 @@ def _get_asn1_time(timestamp): class _X509NameInvalidator: - def __init__(self): - self._names = [] + def __init__(self) -> None: + self._names: List[X509Name] = [] - def add(self, name): + def add(self, name: "X509Name") -> None: self._names.append(name) - def clear(self): + def clear(self) -> None: for name in self._names: # Breaks the object, but also prevents UAF! del name._name @@ -214,12 +236,12 @@ class PKey: _only_public = False _initialized = True - def __init__(self): + def __init__(self) -> None: pkey = _lib.EVP_PKEY_new() self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free) self._initialized = False - def to_cryptography_key(self): + def to_cryptography_key(self) -> _Key: """ Export as a ``cryptography`` key. @@ -243,7 +265,7 @@ class PKey: return load_der_private_key(der, None) @classmethod - def from_cryptography_key(cls, crypto_key): + def from_cryptography_key(cls, crypto_key: _Key) -> "PKey": """ Construct based on a ``cryptography`` *crypto_key*. @@ -288,7 +310,7 @@ class PKey: ) return load_privatekey(FILETYPE_ASN1, der) - def generate_key(self, type, bits): + def generate_key(self, type: int, bits: int) -> None: """ Generate a key pair of the given type, with the given number of bits. @@ -344,7 +366,7 @@ class PKey: self._initialized = True - def check(self): + def check(self) -> bool: """ Check the consistency of an RSA private key. @@ -370,7 +392,7 @@ class PKey: return True _raise_current_error() - def type(self): + def type(self) -> int: """ Returns the type of the key @@ -378,7 +400,7 @@ class PKey: """ return _lib.EVP_PKEY_id(self._pkey) - def bits(self): + def bits(self) -> int: """ Returns the number of bits of the key @@ -399,7 +421,7 @@ class _EllipticCurve: _curves = None - def __ne__(self, other): + def __ne__(self, other: Any) -> bool: """ Implement cooperation with the right-hand side argument of ``!=``. @@ -411,7 +433,7 @@ class _EllipticCurve: return NotImplemented @classmethod - def _load_elliptic_curves(cls, lib): + def _load_elliptic_curves(cls, lib: Any) -> Set["_EllipticCurve"]: """ Get the curves supported by OpenSSL. @@ -429,7 +451,7 @@ class _EllipticCurve: return set(cls.from_nid(lib, c.nid) for c in builtin_curves) @classmethod - def _get_elliptic_curves(cls, lib): + def _get_elliptic_curves(cls, lib: Any) -> Set["_EllipticCurve"]: """ Get, cache, and return the curves supported by OpenSSL. @@ -443,7 +465,7 @@ class _EllipticCurve: return cls._curves @classmethod - def from_nid(cls, lib, nid): + def from_nid(cls, lib: Any, nid: int) -> "_EllipticCurve": """ Instantiate a new :py:class:`_EllipticCurve` associated with the given OpenSSL NID. @@ -459,7 +481,7 @@ class _EllipticCurve: """ return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii")) - def __init__(self, lib, nid, name): + def __init__(self, lib: Any, nid: int, name: str) -> None: """ :param _lib: The :py:mod:`cryptography` binding instance used to interface with OpenSSL. @@ -476,10 +498,10 @@ class _EllipticCurve: self._nid = nid self.name = name - def __repr__(self): + def __repr__(self) -> str: return "<Curve %r>" % (self.name,) - def _to_EC_KEY(self): + def _to_EC_KEY(self) -> Any: """ Create a new OpenSSL EC_KEY structure initialized to use this curve. @@ -490,7 +512,7 @@ class _EllipticCurve: return _ffi.gc(key, _lib.EC_KEY_free) -def get_elliptic_curves(): +def get_elliptic_curves() -> Set["_EllipticCurve"]: """ Return a set of objects representing the elliptic curves supported in the OpenSSL build in use. @@ -505,7 +527,7 @@ def get_elliptic_curves(): return _EllipticCurve._get_elliptic_curves(_lib) -def get_elliptic_curve(name): +def get_elliptic_curve(name: str) -> _EllipticCurve: """ Return a single curve object selected by name. @@ -549,7 +571,7 @@ class X509Name: :ivar emailAddress: The e-mail address of the entity. """ - def __init__(self, name): + def __init__(self, name: "X509Name") -> None: """ Create a new X509Name, copying the given X509Name instance. @@ -557,9 +579,9 @@ class X509Name: :type name: :py:class:`X509Name` """ name = _lib.X509_NAME_dup(name._name) - self._name = _ffi.gc(name, _lib.X509_NAME_free) + self._name: Any = _ffi.gc(name, _lib.X509_NAME_free) - def __setattr__(self, name, value): + def __setattr__(self, name: str, value: Any) -> None: if name.startswith("_"): return super(X509Name, self).__setattr__(name, value) @@ -598,7 +620,7 @@ class X509Name: if not add_result: _raise_current_error() - def __getattr__(self, name): + def __getattr__(self, name: str) -> Optional[str]: """ Find attribute. An X509Name object has the following attributes: countryName (alias C), stateOrProvince (alias ST), locality (alias L), @@ -638,19 +660,19 @@ class X509Name: _lib.OPENSSL_free(result_buffer[0]) return result - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: if not isinstance(other, X509Name): return NotImplemented return _lib.X509_NAME_cmp(self._name, other._name) == 0 - def __lt__(self, other): + def __lt__(self, other: Any) -> bool: if not isinstance(other, X509Name): return NotImplemented return _lib.X509_NAME_cmp(self._name, other._name) < 0 - def __repr__(self): + def __repr__(self) -> str: """ String representation of an X509Name """ @@ -664,7 +686,7 @@ class X509Name: _ffi.string(result_buffer).decode("utf-8"), ) - def hash(self): + def hash(self) -> int: """ Return an integer representation of the first four bytes of the MD5 digest of the DER representation of the name. @@ -676,7 +698,7 @@ class X509Name: """ return _lib.X509_NAME_hash(self._name) - def der(self): + def der(self) -> bytes: """ Return the DER encoding of this name. @@ -691,7 +713,7 @@ class X509Name: _lib.OPENSSL_free(result_buffer[0]) return string_result - def get_components(self): + def get_components(self) -> List[Tuple[bytes, bytes]]: """ Returns the components of this name, as a sequence of 2-tuples. @@ -723,7 +745,14 @@ class X509Extension: An X.509 v3 certificate extension. """ - def __init__(self, type_name, critical, value, subject=None, issuer=None): + def __init__( + self, + type_name: bytes, + critical: bool, + value: bytes, + subject: Optional["X509"] = None, + issuer: Optional["X509"] = None, + ) -> None: """ Initializes an X509 extension. @@ -785,7 +814,7 @@ class X509Extension: self._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) @property - def _nid(self): + def _nid(self) -> Any: return _lib.OBJ_obj2nid( _lib.X509_EXTENSION_get_object(self._extension) ) @@ -796,7 +825,7 @@ class X509Extension: _lib.GEN_URI: "URI", } - def _subjectAltNameString(self): + def _subjectAltNameString(self) -> str: names = _ffi.cast( "GENERAL_NAMES*", _lib.X509V3_EXT_d2i(self._extension) ) @@ -818,7 +847,7 @@ class X509Extension: parts.append(label + ":" + value) return ", ".join(parts) - def __str__(self): + def __str__(self) -> str: """ :return: a nice text representation of the extension """ @@ -831,7 +860,7 @@ class X509Extension: return _bio_to_string(bio).decode("utf-8") - def get_critical(self): + def get_critical(self) -> bool: """ Returns the critical field of this X.509 extension. @@ -839,7 +868,7 @@ class X509Extension: """ return _lib.X509_EXTENSION_get_critical(self._extension) - def get_short_name(self): + def get_short_name(self) -> bytes: """ Returns the short type name of this X.509 extension. @@ -854,7 +883,7 @@ class X509Extension: nid = _lib.OBJ_obj2nid(obj) return _ffi.string(_lib.OBJ_nid2sn(nid)) - def get_data(self): + def get_data(self) -> bytes: """ Returns the data of the X509 extension, encoded as ASN.1. @@ -875,13 +904,13 @@ class X509Req: An X.509 certificate signing requests. """ - def __init__(self): + def __init__(self) -> None: req = _lib.X509_REQ_new() self._req = _ffi.gc(req, _lib.X509_REQ_free) # Default to version 0. self.set_version(0) - def to_cryptography(self): + def to_cryptography(self) -> x509.CertificateSigningRequest: """ Export as a ``cryptography`` certificate signing request. @@ -896,7 +925,9 @@ class X509Req: return load_der_x509_csr(der) @classmethod - def from_cryptography(cls, crypto_req): + def from_cryptography( + cls, crypto_req: x509.CertificateSigningRequest + ) -> "X509Req": """ Construct based on a ``cryptography`` *crypto_req*. @@ -915,7 +946,7 @@ class X509Req: der = crypto_req.public_bytes(Encoding.DER) return load_certificate_request(FILETYPE_ASN1, der) - def set_pubkey(self, pkey): + def set_pubkey(self, pkey: PKey) -> None: """ Set the public key of the certificate signing request. @@ -927,7 +958,7 @@ class X509Req: set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey) _openssl_assert(set_result == 1) - def get_pubkey(self): + def get_pubkey(self) -> PKey: """ Get the public key of the certificate signing request. @@ -941,7 +972,7 @@ class X509Req: pkey._only_public = True return pkey - def set_version(self, version): + def set_version(self, version: int) -> None: """ Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate request. @@ -952,7 +983,7 @@ class X509Req: set_result = _lib.X509_REQ_set_version(self._req, version) _openssl_assert(set_result == 1) - def get_version(self): + def get_version(self) -> int: """ Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate request. @@ -962,7 +993,7 @@ class X509Req: """ return _lib.X509_REQ_get_version(self._req) - def get_subject(self): + def get_subject(self) -> X509Name: """ Return the subject of this certificate signing request. @@ -984,7 +1015,7 @@ class X509Req: return name - def add_extensions(self, extensions): + def add_extensions(self, extensions: Iterable[X509Extension]) -> None: """ Add extensions to the certificate signing request. @@ -1007,7 +1038,7 @@ class X509Req: add_result = _lib.X509_REQ_add_extensions(self._req, stack) _openssl_assert(add_result == 1) - def get_extensions(self): + def get_extensions(self) -> List[X509Extension]: """ Get X.509 extensions in the certificate signing request. @@ -1035,7 +1066,7 @@ class X509Req: exts.append(ext) return exts - def sign(self, pkey, digest): + def sign(self, pkey: PKey, digest: str) -> None: """ Sign the certificate signing request with this key and digest type. @@ -1059,7 +1090,7 @@ class X509Req: sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj) _openssl_assert(sign_result > 0) - def verify(self, pkey): + def verify(self, pkey: PKey) -> bool: """ Verifies the signature on this certificate signing request. @@ -1086,7 +1117,7 @@ class X509: An X.509 certificate. """ - def __init__(self): + def __init__(self) -> None: x509 = _lib.X509_new() _openssl_assert(x509 != _ffi.NULL) self._x509 = _ffi.gc(x509, _lib.X509_free) @@ -1095,14 +1126,14 @@ class X509: self._subject_invalidator = _X509NameInvalidator() @classmethod - def _from_raw_x509_ptr(cls, x509): + def _from_raw_x509_ptr(cls, x509: Any) -> "X509": cert = cls.__new__(cls) cert._x509 = _ffi.gc(x509, _lib.X509_free) cert._issuer_invalidator = _X509NameInvalidator() cert._subject_invalidator = _X509NameInvalidator() return cert - def to_cryptography(self): + def to_cryptography(self) -> x509.Certificate: """ Export as a ``cryptography`` certificate. @@ -1116,7 +1147,7 @@ class X509: return load_der_x509_certificate(der) @classmethod - def from_cryptography(cls, crypto_cert): + def from_cryptography(cls, crypto_cert: x509.Certificate) -> "X509": """ Construct based on a ``cryptography`` *crypto_cert*. @@ -1135,7 +1166,7 @@ class X509: der = crypto_cert.public_bytes(Encoding.DER) return load_certificate(FILETYPE_ASN1, der) - def set_version(self, version): + def set_version(self, version: int) -> None: """ Set the version number of the certificate. Note that the version value is zero-based, eg. a value of 0 is V1. @@ -1150,7 +1181,7 @@ class X509: _openssl_assert(_lib.X509_set_version(self._x509, version) == 1) - def get_version(self): + def get_version(self) -> int: """ Return the version number of the certificate. @@ -1159,7 +1190,7 @@ class X509: """ return _lib.X509_get_version(self._x509) - def get_pubkey(self): + def get_pubkey(self) -> PKey: """ Get the public key of the certificate. @@ -1174,7 +1205,7 @@ class X509: pkey._only_public = True return pkey - def set_pubkey(self, pkey): + def set_pubkey(self, pkey: PKey) -> None: """ Set the public key of the certificate. @@ -1189,7 +1220,7 @@ class X509: set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey) _openssl_assert(set_result == 1) - def sign(self, pkey, digest): + def sign(self, pkey: PKey, digest: str) -> None: """ Sign the certificate with this key and digest type. @@ -1217,7 +1248,7 @@ class X509: sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md) _openssl_assert(sign_result > 0) - def get_signature_algorithm(self): + def get_signature_algorithm(self) -> bytes: """ Return the signature algorithm used in the certificate. @@ -1234,7 +1265,7 @@ class X509: raise ValueError("Undefined signature algorithm") return _ffi.string(_lib.OBJ_nid2ln(nid)) - def digest(self, digest_name): + def digest(self, digest_name: str) -> bytes: """ Return the digest of the X509 object. @@ -1265,7 +1296,7 @@ class X509: ] ) - def subject_name_hash(self): + def subject_name_hash(self) -> bytes: """ Return the hash of the X509 subject. @@ -1274,7 +1305,7 @@ class X509: """ return _lib.X509_subject_name_hash(self._x509) - def set_serial_number(self, serial): + def set_serial_number(self, serial: int) -> None: """ Set the serial number of the certificate. @@ -1313,7 +1344,7 @@ class X509: set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial) _openssl_assert(set_result == 1) - def get_serial_number(self): + def get_serial_number(self) -> int: """ Return the serial number of this certificate. @@ -1333,7 +1364,7 @@ class X509: finally: _lib.BN_free(bignum_serial) - def gmtime_adj_notAfter(self, amount): + def gmtime_adj_notAfter(self, amount: int) -> None: """ Adjust the time stamp on which the certificate stops being valid. @@ -1347,7 +1378,7 @@ class X509: notAfter = _lib.X509_getm_notAfter(self._x509) _lib.X509_gmtime_adj(notAfter, amount) - def gmtime_adj_notBefore(self, amount): + def gmtime_adj_notBefore(self, amount: int) -> None: """ Adjust the timestamp on which the certificate starts being valid. @@ -1360,25 +1391,25 @@ class X509: notBefore = _lib.X509_getm_notBefore(self._x509) _lib.X509_gmtime_adj(notBefore, amount) - def has_expired(self): + def has_expired(self) -> bool: """ Check whether the certificate has expired. :return: ``True`` if the certificate has expired, ``False`` otherwise. :rtype: bool """ - time_string = self.get_notAfter() - if time_string is None: + time_bytes = self.get_notAfter() + if time_bytes is None: raise ValueError("Unable to determine notAfter") - time_string = time_string.decode("utf-8") + time_string = time_bytes.decode("utf-8") not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") return not_after < datetime.datetime.utcnow() - def _get_boundary_time(self, which): + def _get_boundary_time(self, which: Any) -> Optional[bytes]: return _get_asn1_time(which(self._x509)) - def get_notBefore(self): + def get_notBefore(self) -> Optional[bytes]: """ Get the timestamp at which the certificate starts being valid. @@ -1391,10 +1422,12 @@ class X509: """ return self._get_boundary_time(_lib.X509_getm_notBefore) - def _set_boundary_time(self, which, when): + def _set_boundary_time( + self, which: Callable[..., Any], when: bytes + ) -> None: return _set_asn1_time(which(self._x509), when) - def set_notBefore(self, when): + def set_notBefore(self, when: bytes) -> None: """ Set the timestamp at which the certificate starts being valid. @@ -1407,7 +1440,7 @@ class X509: """ return self._set_boundary_time(_lib.X509_getm_notBefore, when) - def get_notAfter(self): + def get_notAfter(self) -> Optional[bytes]: """ Get the timestamp at which the certificate stops being valid. @@ -1420,7 +1453,7 @@ class X509: """ return self._get_boundary_time(_lib.X509_getm_notAfter) - def set_notAfter(self, when): + def set_notAfter(self, when: bytes) -> None: """ Set the timestamp at which the certificate stops being valid. @@ -1433,7 +1466,7 @@ class X509: """ return self._set_boundary_time(_lib.X509_getm_notAfter, when) - def _get_name(self, which): + def _get_name(self, which: Any) -> X509Name: name = X509Name.__new__(X509Name) name._name = which(self._x509) _openssl_assert(name._name != _ffi.NULL) @@ -1444,13 +1477,13 @@ class X509: return name - def _set_name(self, which, name): + def _set_name(self, which: Any, name: X509Name) -> None: if not isinstance(name, X509Name): raise TypeError("name must be an X509Name") set_result = which(self._x509, name._name) _openssl_assert(set_result == 1) - def get_issuer(self): + def get_issuer(self) -> X509Name: """ Return the issuer of this certificate. @@ -1466,7 +1499,7 @@ class X509: self._issuer_invalidator.add(name) return name - def set_issuer(self, issuer): + def set_issuer(self, issuer: X509Name) -> None: """ Set the issuer of this certificate. @@ -1478,7 +1511,7 @@ class X509: self._set_name(_lib.X509_set_issuer_name, issuer) self._issuer_invalidator.clear() - def get_subject(self): + def get_subject(self) -> X509Name: """ Return the subject of this certificate. @@ -1494,7 +1527,7 @@ class X509: self._subject_invalidator.add(name) return name - def set_subject(self, subject): + def set_subject(self, subject: X509Name) -> None: """ Set the subject of this certificate. @@ -1506,7 +1539,7 @@ class X509: self._set_name(_lib.X509_set_subject_name, subject) self._subject_invalidator.clear() - def get_extension_count(self): + def get_extension_count(self) -> int: """ Get the number of extensions on this certificate. @@ -1517,7 +1550,7 @@ class X509: """ return _lib.X509_get_ext_count(self._x509) - def add_extensions(self, extensions): + def add_extensions(self, extensions: Iterable[X509Extension]) -> None: """ Add extensions to the certificate. @@ -1533,7 +1566,7 @@ class X509: if not add_result: _raise_current_error() - def get_extension(self, index): + def get_extension(self, index: int) -> X509Extension: """ Get a specific extension of the certificate by index. @@ -1568,16 +1601,16 @@ class X509StoreFlags: https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html """ - CRL_CHECK = _lib.X509_V_FLAG_CRL_CHECK - CRL_CHECK_ALL = _lib.X509_V_FLAG_CRL_CHECK_ALL - IGNORE_CRITICAL = _lib.X509_V_FLAG_IGNORE_CRITICAL - X509_STRICT = _lib.X509_V_FLAG_X509_STRICT - ALLOW_PROXY_CERTS = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS - POLICY_CHECK = _lib.X509_V_FLAG_POLICY_CHECK - EXPLICIT_POLICY = _lib.X509_V_FLAG_EXPLICIT_POLICY - INHIBIT_MAP = _lib.X509_V_FLAG_INHIBIT_MAP - NOTIFY_POLICY = _lib.X509_V_FLAG_NOTIFY_POLICY - CHECK_SS_SIGNATURE = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE + CRL_CHECK: int = _lib.X509_V_FLAG_CRL_CHECK + CRL_CHECK_ALL: int = _lib.X509_V_FLAG_CRL_CHECK_ALL + IGNORE_CRITICAL: int = _lib.X509_V_FLAG_IGNORE_CRITICAL + X509_STRICT: int = _lib.X509_V_FLAG_X509_STRICT + ALLOW_PROXY_CERTS: int = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS + POLICY_CHECK: int = _lib.X509_V_FLAG_POLICY_CHECK + EXPLICIT_POLICY: int = _lib.X509_V_FLAG_EXPLICIT_POLICY + INHIBIT_MAP: int = _lib.X509_V_FLAG_INHIBIT_MAP + NOTIFY_POLICY: int = _lib.X509_V_FLAG_NOTIFY_POLICY + CHECK_SS_SIGNATURE: int = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE class X509Store: @@ -1594,11 +1627,11 @@ class X509Store: :class:`X509StoreContext`. """ - def __init__(self): + def __init__(self) -> None: store = _lib.X509_STORE_new() self._store = _ffi.gc(store, _lib.X509_STORE_free) - def add_cert(self, cert): + def add_cert(self, cert: X509) -> None: """ Adds a trusted certificate to this store. @@ -1620,7 +1653,7 @@ class X509Store: res = _lib.X509_STORE_add_cert(self._store, cert._x509) _openssl_assert(res == 1) - def add_crl(self, crl): + def add_crl(self, crl: "CRL") -> None: """ Add a certificate revocation list to this store. @@ -1636,7 +1669,7 @@ class X509Store: """ _openssl_assert(_lib.X509_STORE_add_crl(self._store, crl._crl) != 0) - def set_flags(self, flags): + def set_flags(self, flags: int) -> None: """ Set verification flags to this store. @@ -1660,7 +1693,7 @@ class X509Store: """ _openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0) - def set_time(self, vfy_time): + def set_time(self, vfy_time: datetime.datetime) -> None: """ Set the time against which the certificates are verified. @@ -1684,7 +1717,9 @@ class X509Store: ) _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0) - def load_locations(self, cafile, capath=None): + def load_locations( + self, cafile: StrOrBytesPath, capath: Optional[StrOrBytesPath] = None + ) -> None: """ Let X509Store know where we can find trusted certificates for the certificate chain. Note that the certificates have to be in PEM @@ -1741,7 +1776,7 @@ class X509StoreContextError(Exception): :type certificate: :class:`X509` """ - def __init__(self, message, certificate): + def __init__(self, message: Any, certificate: X509) -> None: super(X509StoreContextError, self).__init__(message) self.certificate = certificate @@ -1768,7 +1803,12 @@ class X509StoreContext: :type chain: :class:`list` of :class:`X509` """ - def __init__(self, store, certificate, chain=None): + def __init__( + self, + store: X509Store, + certificate: X509, + chain: Optional[Sequence[X509]] = None, + ) -> None: store_ctx = _lib.X509_STORE_CTX_new() self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free) self._store = store @@ -1780,8 +1820,10 @@ class X509StoreContext: self._init() @staticmethod - def _build_certificate_stack(certificates): - def cleanup(s): + def _build_certificate_stack( + certificates: Optional[Sequence[X509]], + ) -> None: + def cleanup(s: Any) -> None: # Equivalent to sk_X509_pop_free, but we don't # currently have a CFFI binding for that available for i in range(_lib.sk_X509_num(s)): @@ -1807,7 +1849,7 @@ class X509StoreContext: return stack - def _init(self): + def _init(self) -> None: """ Set up the store context for a subsequent verification operation. @@ -1820,7 +1862,7 @@ class X509StoreContext: if ret <= 0: _raise_current_error() - def _cleanup(self): + def _cleanup(self) -> None: """ Internally cleans up the store context. @@ -1828,7 +1870,7 @@ class X509StoreContext: """ _lib.X509_STORE_CTX_cleanup(self._store_ctx) - def _exception_from_context(self): + def _exception_from_context(self) -> X509StoreContextError: """ Convert an OpenSSL native context error failure into a Python exception. @@ -1852,7 +1894,7 @@ class X509StoreContext: pycert = X509._from_raw_x509_ptr(_cert) return X509StoreContextError(errors, pycert) - def set_store(self, store): + def set_store(self, store: X509Store) -> None: """ Set the context's X.509 store. @@ -1863,7 +1905,7 @@ class X509StoreContext: """ self._store = store - def verify_certificate(self): + def verify_certificate(self) -> None: """ Verify a certificate in a context. @@ -1885,7 +1927,7 @@ class X509StoreContext: if ret <= 0: raise self._exception_from_context() - def get_verified_chain(self): + def get_verified_chain(self) -> List[X509]: """ Verify a certificate in a context and return the complete validated chain. @@ -1925,7 +1967,7 @@ class X509StoreContext: return result -def load_certificate(type, buffer): +def load_certificate(type: int, buffer: bytes) -> X509: """ Load a certificate (X509) from the string *buffer* encoded with the type *type*. @@ -1954,7 +1996,7 @@ def load_certificate(type, buffer): return X509._from_raw_x509_ptr(x509) -def dump_certificate(type, cert): +def dump_certificate(type: int, cert: X509) -> bytes: """ Dump the certificate *cert* into a buffer string encoded with the type *type*. @@ -1982,7 +2024,7 @@ def dump_certificate(type, cert): return _bio_to_string(bio) -def dump_publickey(type, pkey): +def dump_publickey(type: int, pkey: PKey) -> bytes: """ Dump a public key to a buffer. @@ -2007,7 +2049,12 @@ def dump_publickey(type, pkey): return _bio_to_string(bio) -def dump_privatekey(type, pkey, cipher=None, passphrase=None): +def dump_privatekey( + type: int, + pkey: PKey, + cipher: Optional[str] = None, + passphrase: Optional[PassphraseCallableT] = None, +) -> bytes: """ Dump the private key *pkey* into a buffer string encoded with the type *type*. Optionally (if *type* is :const:`FILETYPE_PEM`) encrypting it @@ -2091,11 +2138,11 @@ class Revoked: # b"removeFromCRL", ] - def __init__(self): + def __init__(self) -> None: revoked = _lib.X509_REVOKED_new() self._revoked = _ffi.gc(revoked, _lib.X509_REVOKED_free) - def set_serial(self, hex_str): + def set_serial(self, hex_str: bytes) -> None: """ Set the serial number. @@ -2119,7 +2166,7 @@ class Revoked: ) _lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial) - def get_serial(self): + def get_serial(self) -> bytes: """ Get the serial number. @@ -2137,7 +2184,7 @@ class Revoked: _openssl_assert(result >= 0) return _bio_to_string(bio) - def _delete_reason(self): + def _delete_reason(self) -> None: for i in range(_lib.X509_REVOKED_get_ext_count(self._revoked)): ext = _lib.X509_REVOKED_get_ext(self._revoked, i) obj = _lib.X509_EXTENSION_get_object(ext) @@ -2146,7 +2193,7 @@ class Revoked: _lib.X509_REVOKED_delete_ext(self._revoked, i) break - def set_reason(self, reason): + def set_reason(self, reason: Optional[bytes]) -> None: """ Set the reason of this revocation. @@ -2183,7 +2230,7 @@ class Revoked: ) _openssl_assert(add_result == 1) - def get_reason(self): + def get_reason(self) -> Optional[bytes]: """ Get the reason of this revocation. @@ -2209,8 +2256,9 @@ class Revoked: _openssl_assert(print_result != 0) return _bio_to_string(bio) + return None - def all_reasons(self): + def all_reasons(self) -> List[bytes]: """ Return a list of all the supported reason strings. @@ -2222,7 +2270,7 @@ class Revoked: """ return self._crl_reasons[:] - def set_rev_date(self, when): + def set_rev_date(self, when: bytes) -> None: """ Set the revocation timestamp. @@ -2233,7 +2281,7 @@ class Revoked: dt = _lib.X509_REVOKED_get0_revocationDate(self._revoked) return _set_asn1_time(dt, when) - def get_rev_date(self): + def get_rev_date(self) -> Optional[bytes]: """ Get the revocation timestamp. @@ -2249,11 +2297,11 @@ class CRL: A certificate revocation list. """ - def __init__(self): + def __init__(self) -> None: crl = _lib.X509_CRL_new() self._crl = _ffi.gc(crl, _lib.X509_CRL_free) - def to_cryptography(self): + def to_cryptography(self) -> x509.CertificateRevocationList: """ Export as a ``cryptography`` CRL. @@ -2267,7 +2315,9 @@ class CRL: return load_der_x509_crl(der) @classmethod - def from_cryptography(cls, crypto_crl): + def from_cryptography( + cls, crypto_crl: x509.CertificateRevocationList + ) -> "CRL": """ Construct based on a ``cryptography`` *crypto_crl*. @@ -2286,7 +2336,7 @@ class CRL: der = crypto_crl.public_bytes(Encoding.DER) return load_crl(FILETYPE_ASN1, der) - def get_revoked(self): + def get_revoked(self) -> Optional[Tuple[Revoked, ...]]: """ Return the revocations in this certificate revocation list. @@ -2306,8 +2356,9 @@ class CRL: results.append(pyrev) if results: return tuple(results) + return None - def add_revoked(self, revoked): + def add_revoked(self, revoked: Revoked) -> None: """ Add a revoked (by value not reference) to the CRL structure @@ -2324,7 +2375,7 @@ class CRL: add_result = _lib.X509_CRL_add0_revoked(self._crl, copy) _openssl_assert(add_result != 0) - def get_issuer(self): + def get_issuer(self) -> X509Name: """ Get the CRL's issuer. @@ -2339,7 +2390,7 @@ class CRL: issuer._name = _issuer return issuer - def set_version(self, version): + def set_version(self, version: int) -> None: """ Set the CRL version. @@ -2350,10 +2401,12 @@ class CRL: """ _openssl_assert(_lib.X509_CRL_set_version(self._crl, version) != 0) - def _set_boundary_time(self, which, when): + def _set_boundary_time( + self, which: Callable[..., Any], when: bytes + ) -> None: return _set_asn1_time(which(self._crl), when) - def set_lastUpdate(self, when): + def set_lastUpdate(self, when: bytes) -> None: """ Set when the CRL was last updated. @@ -2368,7 +2421,7 @@ class CRL: """ return self._set_boundary_time(_lib.X509_CRL_get0_lastUpdate, when) - def set_nextUpdate(self, when): + def set_nextUpdate(self, when: bytes) -> None: """ Set when the CRL will next be updated. @@ -2383,7 +2436,7 @@ class CRL: """ return self._set_boundary_time(_lib.X509_CRL_get0_nextUpdate, when) - def sign(self, issuer_cert, issuer_key, digest): + def sign(self, issuer_cert: X509, issuer_key: PKey, digest: bytes) -> None: """ Sign the CRL. @@ -2410,8 +2463,13 @@ class CRL: _openssl_assert(result != 0) def export( - self, cert, key, type=FILETYPE_PEM, days=100, digest=_UNSPECIFIED - ): + self, + cert: X509, + key: PKey, + type: int = FILETYPE_PEM, + days: int = 100, + digest: bytes = _UNSPECIFIED, # type: ignore + ) -> bytes: """ Export the CRL as a string. @@ -2465,7 +2523,10 @@ class CRL: class PKCS7: - def type_is_signed(self): + + _pkcs7: Any + + def type_is_signed(self) -> bool: """ Check if this NID_pkcs7_signed object @@ -2473,7 +2534,7 @@ class PKCS7: """ return bool(_lib.PKCS7_type_is_signed(self._pkcs7)) - def type_is_enveloped(self): + def type_is_enveloped(self) -> bool: """ Check if this NID_pkcs7_enveloped object @@ -2481,7 +2542,7 @@ class PKCS7: """ return bool(_lib.PKCS7_type_is_enveloped(self._pkcs7)) - def type_is_signedAndEnveloped(self): + def type_is_signedAndEnveloped(self) -> bool: """ Check if this NID_pkcs7_signedAndEnveloped object @@ -2489,7 +2550,7 @@ class PKCS7: """ return bool(_lib.PKCS7_type_is_signedAndEnveloped(self._pkcs7)) - def type_is_data(self): + def type_is_data(self) -> bool: """ Check if this NID_pkcs7_data object @@ -2497,7 +2558,7 @@ class PKCS7: """ return bool(_lib.PKCS7_type_is_data(self._pkcs7)) - def get_type_name(self): + def get_type_name(self) -> str: """ Returns the type name of the PKCS7 structure @@ -2513,13 +2574,13 @@ class PKCS12: A PKCS #12 archive. """ - def __init__(self): - self._pkey = None - self._cert = None - self._cacerts = None - self._friendlyname = None + def __init__(self) -> None: + self._pkey: Optional[PKey] = None + self._cert: Optional[X509] = None + self._cacerts: Optional[List[X509]] = None + self._friendlyname: Optional[bytes] = None - def get_certificate(self): + def get_certificate(self) -> Optional[X509]: """ Get the certificate in the PKCS #12 structure. @@ -2528,7 +2589,7 @@ class PKCS12: """ return self._cert - def set_certificate(self, cert): + def set_certificate(self, cert: X509) -> None: """ Set the certificate in the PKCS #12 structure. @@ -2541,7 +2602,7 @@ class PKCS12: raise TypeError("cert must be an X509 instance") self._cert = cert - def get_privatekey(self): + def get_privatekey(self) -> Optional[PKey]: """ Get the private key in the PKCS #12 structure. @@ -2550,7 +2611,7 @@ class PKCS12: """ return self._pkey - def set_privatekey(self, pkey): + def set_privatekey(self, pkey: PKey) -> None: """ Set the certificate portion of the PKCS #12 structure. @@ -2563,7 +2624,7 @@ class PKCS12: raise TypeError("pkey must be a PKey instance") self._pkey = pkey - def get_ca_certificates(self): + def get_ca_certificates(self) -> Optional[Tuple[X509, ...]]: """ Get the CA certificates in the PKCS #12 structure. @@ -2573,8 +2634,9 @@ class PKCS12: """ if self._cacerts is not None: return tuple(self._cacerts) + return None - def set_ca_certificates(self, cacerts): + def set_ca_certificates(self, cacerts: Optional[Iterable[X509]]) -> None: """ Replace or set the CA certificates within the PKCS12 object. @@ -2595,7 +2657,7 @@ class PKCS12: ) self._cacerts = cacerts - def set_friendlyname(self, name): + def set_friendlyname(self, name: Optional[bytes]) -> None: """ Set the friendly name in the PKCS #12 structure. @@ -2612,7 +2674,7 @@ class PKCS12: ) self._friendlyname = name - def get_friendlyname(self): + def get_friendlyname(self) -> Optional[bytes]: """ Get the friendly name in the PKCS# 12 structure. @@ -2621,7 +2683,12 @@ class PKCS12: """ return self._friendlyname - def export(self, passphrase=None, iter=2048, maciter=1): + def export( + self, + passphrase: Optional[bytes] = None, + iter: int = 2048, + maciter: int = 1, + ) -> bytes: """ Dump a PKCS12 object as a string. @@ -2694,11 +2761,11 @@ class NetscapeSPKI: A Netscape SPKI object. """ - def __init__(self): + def __init__(self) -> None: spki = _lib.NETSCAPE_SPKI_new() self._spki = _ffi.gc(spki, _lib.NETSCAPE_SPKI_free) - def sign(self, pkey, digest): + def sign(self, pkey: PKey, digest: str) -> None: """ Sign the certificate request with this key and digest type. @@ -2725,7 +2792,7 @@ class NetscapeSPKI: ) _openssl_assert(sign_result > 0) - def verify(self, key): + def verify(self, key: PKey) -> bool: """ Verifies a signature on a certificate request. @@ -2742,7 +2809,7 @@ class NetscapeSPKI: _raise_current_error() return True - def b64_encode(self): + def b64_encode(self) -> bytes: """ Generate a base64 encoded representation of this SPKI object. @@ -2754,7 +2821,7 @@ class NetscapeSPKI: _lib.OPENSSL_free(encoded) return result - def get_pubkey(self): + def get_pubkey(self) -> PKey: """ Get the public key of this certificate. @@ -2768,7 +2835,7 @@ class NetscapeSPKI: pkey._only_public = True return pkey - def set_pubkey(self, pkey): + def set_pubkey(self, pkey: PKey) -> None: """ Set the public key of the certificate @@ -2780,7 +2847,13 @@ class NetscapeSPKI: class _PassphraseHelper: - def __init__(self, type, passphrase, more_args=False, truncate=False): + def __init__( + self, + type: int, + passphrase: Optional[PassphraseCallableT], + more_args: bool = False, + truncate: bool = False, + ) -> None: if type != FILETYPE_PEM and passphrase is not None: raise ValueError( "only FILETYPE_PEM key format supports encryption" @@ -2788,10 +2861,10 @@ class _PassphraseHelper: self._passphrase = passphrase self._more_args = more_args self._truncate = truncate - self._problems = [] + self._problems: List[Exception] = [] @property - def callback(self): + def callback(self) -> Any: if self._passphrase is None: return _ffi.NULL elif isinstance(self._passphrase, bytes) or callable(self._passphrase): @@ -2802,7 +2875,7 @@ class _PassphraseHelper: ) @property - def callback_args(self): + def callback_args(self) -> Any: if self._passphrase is None: return _ffi.NULL elif isinstance(self._passphrase, bytes) or callable(self._passphrase): @@ -2812,7 +2885,7 @@ class _PassphraseHelper: "Last argument must be a byte string or a callable." ) - def raise_if_problem(self, exceptionType=Error): + def raise_if_problem(self, exceptionType: Type[Exception] = Error) -> None: if self._problems: # Flush the OpenSSL error queue @@ -2823,7 +2896,9 @@ class _PassphraseHelper: raise self._problems.pop(0) - def _read_passphrase(self, buf, size, rwflag, userdata): + def _read_passphrase( + self, buf: Any, size: int, rwflag: Any, userdata: Any + ) -> int: try: if callable(self._passphrase): if self._more_args: @@ -2831,6 +2906,7 @@ class _PassphraseHelper: else: result = self._passphrase(rwflag) else: + assert self._passphrase is not None result = self._passphrase if not isinstance(result, bytes): raise ValueError("Bytes expected") @@ -2849,7 +2925,7 @@ class _PassphraseHelper: return 0 -def load_publickey(type, buffer): +def load_publickey(type: int, buffer: Union[str, bytes]) -> PKey: """ Load a public key from a buffer. @@ -2883,7 +2959,11 @@ def load_publickey(type, buffer): return pkey -def load_privatekey(type, buffer, passphrase=None): +def load_privatekey( + type: int, + buffer: Union[str, bytes], + passphrase: Optional[PassphraseCallableT] = None, +) -> PKey: """ Load a private key (PKey) from the string *buffer* encoded with the type *type*. @@ -2920,7 +3000,7 @@ def load_privatekey(type, buffer, passphrase=None): return pkey -def dump_certificate_request(type, req): +def dump_certificate_request(type: int, req: X509Req) -> bytes: """ Dump the certificate request *req* into a buffer string encoded with the type *type*. @@ -2948,7 +3028,7 @@ def dump_certificate_request(type, req): return _bio_to_string(bio) -def load_certificate_request(type, buffer): +def load_certificate_request(type: int, buffer: bytes) -> X509Req: """ Load a certificate request (X509Req) from the string *buffer* encoded with the type *type*. @@ -2976,7 +3056,7 @@ def load_certificate_request(type, buffer): return x509req -def sign(pkey, data, digest): +def sign(pkey: PKey, data: Union[str, bytes], digest: str) -> bytes: """ Sign a data string using the given key and message digest. @@ -3011,7 +3091,9 @@ def sign(pkey, data, digest): return _ffi.buffer(signature_buffer, signature_length[0])[:] -def verify(cert, signature, data, digest): +def verify( + cert: X509, signature: bytes, data: Union[str, bytes], digest: str +) -> None: """ Verify the signature for a data string. @@ -3047,7 +3129,7 @@ def verify(cert, signature, data, digest): _raise_current_error() -def dump_crl(type, crl): +def dump_crl(type: int, crl: CRL) -> bytes: """ Dump a certificate revocation list to a buffer. @@ -3076,7 +3158,7 @@ def dump_crl(type, crl): return _bio_to_string(bio) -def load_crl(type, buffer): +def load_crl(type: int, buffer: Union[str, bytes]) -> CRL: """ Load Certificate Revocation List (CRL) data from a string *buffer*. *buffer* encoded with the type *type*. @@ -3106,7 +3188,7 @@ def load_crl(type, buffer): return result -def load_pkcs7_data(type, buffer): +def load_pkcs7_data(type: int, buffer: Union[str, bytes]) -> PKCS7: """ Load pkcs7 data from the string *buffer* encoded with the type *type*. @@ -3146,7 +3228,9 @@ load_pkcs7_data = utils.deprecated( ) -def load_pkcs12(buffer, passphrase=None): +def load_pkcs12( + buffer: Union[str, bytes], passphrase: Optional[bytes] = None +) -> PKCS12: """ Load pkcs12 data from the string *buffer*. If the pkcs12 structure is encrypted, a *passphrase* must be included. The MAC is always @@ -3222,13 +3306,11 @@ def load_pkcs12(buffer, passphrase=None): x509 = _lib.sk_X509_value(cacerts, i) pycacert = X509._from_raw_x509_ptr(x509) pycacerts.append(pycacert) - if not pycacerts: - pycacerts = None pkcs12 = PKCS12.__new__(PKCS12) pkcs12._pkey = pykey pkcs12._cert = pycert - pkcs12._cacerts = pycacerts + pkcs12._cacerts = pycacerts if pycacerts else None pkcs12._friendlyname = friendlyname return pkcs12 |