diff options
| author | Alex Gaynor <alex.gaynor@gmail.com> | 2021-01-30 17:44:14 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-01-30 16:44:14 -0600 |
| commit | f16bff2cbd2855f0becb27724334e4bdb0679247 (patch) | |
| tree | f5636309e64b6dab7055863fd7426d62a8816c55 /src | |
| parent | 83c598ac3df8b06f4f68b0cc845de1fde0e454a3 (diff) | |
| download | cryptography-f16bff2cbd2855f0becb27724334e4bdb0679247.tar.gz | |
Apply type annotations to x509 ct and ocsp (#5712)
Diffstat (limited to 'src')
| -rw-r--r-- | src/cryptography/fernet.py | 4 | ||||
| -rw-r--r-- | src/cryptography/hazmat/backends/openssl/ocsp.py | 76 | ||||
| -rw-r--r-- | src/cryptography/hazmat/backends/openssl/x509.py | 6 | ||||
| -rw-r--r-- | src/cryptography/x509/base.py | 4 | ||||
| -rw-r--r-- | src/cryptography/x509/certificate_transparency.py | 9 | ||||
| -rw-r--r-- | src/cryptography/x509/extensions.py | 9 | ||||
| -rw-r--r-- | src/cryptography/x509/general_name.py | 77 | ||||
| -rw-r--r-- | src/cryptography/x509/name.py | 52 | ||||
| -rw-r--r-- | src/cryptography/x509/ocsp.py | 395 |
9 files changed, 334 insertions, 298 deletions
diff --git a/src/cryptography/fernet.py b/src/cryptography/fernet.py index c9464c5ad..57772fee4 100644 --- a/src/cryptography/fernet.py +++ b/src/cryptography/fernet.py @@ -157,8 +157,8 @@ class Fernet(object): class MultiFernet(object): - def __init__(self, fernets_it: typing.Iterator[Fernet]): - fernets = list(fernets_it) + def __init__(self, fernets: typing.Iterable[Fernet]): + fernets = list(fernets) if not fernets: raise ValueError( "MultiFernet requires at least one Fernet instance" diff --git a/src/cryptography/hazmat/backends/openssl/ocsp.py b/src/cryptography/hazmat/backends/openssl/ocsp.py index 231794c6b..9138e78db 100644 --- a/src/cryptography/hazmat/backends/openssl/ocsp.py +++ b/src/cryptography/hazmat/backends/openssl/ocsp.py @@ -2,6 +2,8 @@ # 2.0, and the BSD License. See the LICENSE file in the root of this repository # for complete details. +import datetime +import typing from cryptography import utils, x509 from cryptography.exceptions import UnsupportedAlgorithm @@ -14,7 +16,7 @@ from cryptography.hazmat.backends.openssl.decode_asn1 import ( _parse_asn1_generalized_time, ) from cryptography.hazmat.backends.openssl.x509 import _Certificate -from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives import hashes, serialization from cryptography.x509.ocsp import ( OCSPCertStatus, OCSPRequest, @@ -84,8 +86,7 @@ def _hash_algorithm(backend, cert_id): ) -@utils.register_interface(OCSPResponse) -class _OCSPResponse(object): +class _OCSPResponse(OCSPResponse): def __init__(self, backend, ocsp_response): self._backend = backend self._ocsp_response = ocsp_response @@ -120,7 +121,7 @@ class _OCSPResponse(object): response_status = utils.read_only_property("_status") - def _requires_successful_response(self): + def _requires_successful_response(self) -> None: if self.response_status != OCSPResponseStatus.SUCCESSFUL: raise ValueError( "OCSP response status is not successful so the property " @@ -128,7 +129,7 @@ class _OCSPResponse(object): ) @property - def signature_algorithm_oid(self): + def signature_algorithm_oid(self) -> x509.ObjectIdentifier: self._requires_successful_response() alg = self._backend._lib.OCSP_resp_get0_tbs_sigalg(self._basic) self._backend.openssl_assert(alg != self._backend._ffi.NULL) @@ -136,7 +137,9 @@ class _OCSPResponse(object): return x509.ObjectIdentifier(oid) @property - def signature_hash_algorithm(self): + def signature_hash_algorithm( + self, + ) -> typing.Optional[hashes.HashAlgorithm]: self._requires_successful_response() oid = self.signature_algorithm_oid try: @@ -147,14 +150,14 @@ class _OCSPResponse(object): ) @property - def signature(self): + def signature(self) -> bytes: self._requires_successful_response() sig = self._backend._lib.OCSP_resp_get0_signature(self._basic) self._backend.openssl_assert(sig != self._backend._ffi.NULL) return _asn1_string_to_bytes(self._backend, sig) @property - def tbs_response_bytes(self): + def tbs_response_bytes(self) -> bytes: self._requires_successful_response() respdata = self._backend._lib.OCSP_resp_get0_respdata(self._basic) self._backend.openssl_assert(respdata != self._backend._ffi.NULL) @@ -168,25 +171,25 @@ class _OCSPResponse(object): return self._backend._ffi.buffer(pp[0], res)[:] @property - def certificates(self): + def certificates(self) -> typing.List[x509.Certificate]: self._requires_successful_response() sk_x509 = self._backend._lib.OCSP_resp_get0_certs(self._basic) num = self._backend._lib.sk_X509_num(sk_x509) - certs = [] + certs: typing.List[x509.Certificate] = [] for i in range(num): - x509 = self._backend._lib.sk_X509_value(sk_x509, i) - self._backend.openssl_assert(x509 != self._backend._ffi.NULL) - cert = _Certificate(self._backend, x509) + x509_ptr = self._backend._lib.sk_X509_value(sk_x509, i) + self._backend.openssl_assert(x509_ptr != self._backend._ffi.NULL) + cert = _Certificate(self._backend, x509_ptr) # We need to keep the OCSP response that the certificate came from # alive until the Certificate object itself goes out of scope, so # we give it a private reference. - cert._ocsp_resp = self + cert._ocsp_resp_ref = self certs.append(cert) return certs @property - def responder_key_hash(self): + def responder_key_hash(self) -> typing.Optional[bytes]: self._requires_successful_response() _, asn1_string = self._responder_key_name() if asn1_string == self._backend._ffi.NULL: @@ -195,7 +198,7 @@ class _OCSPResponse(object): return _asn1_string_to_bytes(self._backend, asn1_string) @property - def responder_name(self): + def responder_name(self) -> typing.Optional[x509.Name]: self._requires_successful_response() x509_name, _ = self._responder_key_name() if x509_name == self._backend._ffi.NULL: @@ -213,7 +216,7 @@ class _OCSPResponse(object): return x509_name[0], asn1_string[0] @property - def produced_at(self): + def produced_at(self) -> datetime.datetime: self._requires_successful_response() produced_at = self._backend._lib.OCSP_resp_get0_produced_at( self._basic @@ -221,7 +224,7 @@ class _OCSPResponse(object): return _parse_asn1_generalized_time(self._backend, produced_at) @property - def certificate_status(self): + def certificate_status(self) -> OCSPCertStatus: self._requires_successful_response() status = self._backend._lib.OCSP_single_get0_status( self._single, @@ -234,7 +237,7 @@ class _OCSPResponse(object): return _CERT_STATUS_TO_ENUM[status] @property - def revocation_time(self): + def revocation_time(self) -> typing.Optional[datetime.datetime]: self._requires_successful_response() if self.certificate_status is not OCSPCertStatus.REVOKED: return None @@ -251,7 +254,7 @@ class _OCSPResponse(object): return _parse_asn1_generalized_time(self._backend, asn1_time[0]) @property - def revocation_reason(self): + def revocation_reason(self) -> typing.Optional[x509.ReasonFlags]: self._requires_successful_response() if self.certificate_status is not OCSPCertStatus.REVOKED: return None @@ -274,7 +277,7 @@ class _OCSPResponse(object): return _CRL_ENTRY_REASON_CODE_TO_ENUM[reason_ptr[0]] @property - def this_update(self): + def this_update(self) -> datetime.datetime: self._requires_successful_response() asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") self._backend._lib.OCSP_single_get0_status( @@ -288,7 +291,7 @@ class _OCSPResponse(object): return _parse_asn1_generalized_time(self._backend, asn1_time[0]) @property - def next_update(self): + def next_update(self) -> typing.Optional[datetime.datetime]: self._requires_successful_response() asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") self._backend._lib.OCSP_single_get0_status( @@ -304,36 +307,36 @@ class _OCSPResponse(object): return None @property - def issuer_key_hash(self): + def issuer_key_hash(self) -> bytes: self._requires_successful_response() return _issuer_key_hash(self._backend, self._cert_id) @property - def issuer_name_hash(self): + def issuer_name_hash(self) -> bytes: self._requires_successful_response() return _issuer_name_hash(self._backend, self._cert_id) @property - def hash_algorithm(self): + def hash_algorithm(self) -> hashes.HashAlgorithm: self._requires_successful_response() return _hash_algorithm(self._backend, self._cert_id) @property - def serial_number(self): + def serial_number(self) -> int: self._requires_successful_response() return _serial_number(self._backend, self._cert_id) @utils.cached_property - def extensions(self): + def extensions(self) -> x509.Extensions: self._requires_successful_response() return self._backend._ocsp_basicresp_ext_parser.parse(self._basic) @utils.cached_property - def single_extensions(self): + def single_extensions(self) -> x509.Extensions: self._requires_successful_response() return self._backend._ocsp_singleresp_ext_parser.parse(self._single) - def public_bytes(self, encoding): + def public_bytes(self, encoding: serialization.Encoding) -> bytes: if encoding is not serialization.Encoding.DER: raise ValueError("The only allowed encoding value is Encoding.DER") @@ -345,8 +348,7 @@ class _OCSPResponse(object): return self._backend._read_mem_bio(bio) -@utils.register_interface(OCSPRequest) -class _OCSPRequest(object): +class _OCSPRequest(OCSPRequest): def __init__(self, backend, ocsp_request): if backend._lib.OCSP_request_onereq_count(ocsp_request) > 1: raise NotImplementedError( @@ -362,26 +364,26 @@ class _OCSPRequest(object): self._backend.openssl_assert(self._cert_id != self._backend._ffi.NULL) @property - def issuer_key_hash(self): + def issuer_key_hash(self) -> bytes: return _issuer_key_hash(self._backend, self._cert_id) @property - def issuer_name_hash(self): + def issuer_name_hash(self) -> bytes: return _issuer_name_hash(self._backend, self._cert_id) @property - def serial_number(self): + def serial_number(self) -> int: return _serial_number(self._backend, self._cert_id) @property - def hash_algorithm(self): + def hash_algorithm(self) -> hashes.HashAlgorithm: return _hash_algorithm(self._backend, self._cert_id) @utils.cached_property - def extensions(self): + def extensions(self) -> x509.Extensions: return self._backend._ocsp_req_ext_parser.parse(self._ocsp_request) - def public_bytes(self, encoding): + def public_bytes(self, encoding: serialization.Encoding) -> bytes: if encoding is not serialization.Encoding.DER: raise ValueError("The only allowed encoding value is Encoding.DER") diff --git a/src/cryptography/hazmat/backends/openssl/x509.py b/src/cryptography/hazmat/backends/openssl/x509.py index f460ed2da..07a1dd8a4 100644 --- a/src/cryptography/hazmat/backends/openssl/x509.py +++ b/src/cryptography/hazmat/backends/openssl/x509.py @@ -26,8 +26,10 @@ from cryptography.x509.base import _PUBLIC_KEY_TYPES from cryptography.x509.name import _ASN1Type -@utils.register_interface(x509.Certificate) -class _Certificate(object): +class _Certificate(x509.Certificate): + # Keep-alive reference used by OCSP + _ocsp_resp_ref: typing.Any + def __init__(self, backend, x509_cert): self._backend = backend self._x509 = x509_cert diff --git a/src/cryptography/x509/base.py b/src/cryptography/x509/base.py index c17fff8ce..5c5e6c6da 100644 --- a/src/cryptography/x509/base.py +++ b/src/cryptography/x509/base.py @@ -140,7 +140,9 @@ class Certificate(metaclass=abc.ABCMeta): """ @abc.abstractproperty - def signature_hash_algorithm(self) -> hashes.HashAlgorithm: + def signature_hash_algorithm( + self, + ) -> typing.Optional[hashes.HashAlgorithm]: """ Returns a HashAlgorithm corresponding to the type of the digest signed in the certificate. diff --git a/src/cryptography/x509/certificate_transparency.py b/src/cryptography/x509/certificate_transparency.py index 44e18803d..d51bee92e 100644 --- a/src/cryptography/x509/certificate_transparency.py +++ b/src/cryptography/x509/certificate_transparency.py @@ -4,6 +4,7 @@ import abc +import datetime from enum import Enum @@ -18,25 +19,25 @@ class Version(Enum): class SignedCertificateTimestamp(metaclass=abc.ABCMeta): @abc.abstractproperty - def version(self): + def version(self) -> Version: """ Returns the SCT version. """ @abc.abstractproperty - def log_id(self): + def log_id(self) -> bytes: """ Returns an identifier indicating which log this SCT is for. """ @abc.abstractproperty - def timestamp(self): + def timestamp(self) -> datetime.datetime: """ Returns the timestamp for this SCT. """ @abc.abstractproperty - def entry_type(self): + def entry_type(self) -> LogEntryType: """ Returns whether this is an SCT for a certificate or pre-certificate. """ diff --git a/src/cryptography/x509/extensions.py b/src/cryptography/x509/extensions.py index dfad69770..0602908e4 100644 --- a/src/cryptography/x509/extensions.py +++ b/src/cryptography/x509/extensions.py @@ -7,6 +7,7 @@ import abc import datetime import hashlib import ipaddress +import typing from enum import Enum from cryptography import utils @@ -86,27 +87,27 @@ def _make_sequence_methods(field_name): class DuplicateExtension(Exception): - def __init__(self, msg, oid): + def __init__(self, msg: str, oid: ObjectIdentifier): super(DuplicateExtension, self).__init__(msg) self.oid = oid class ExtensionNotFound(Exception): - def __init__(self, msg, oid): + def __init__(self, msg: str, oid: ObjectIdentifier): super(ExtensionNotFound, self).__init__(msg) self.oid = oid class ExtensionType(metaclass=abc.ABCMeta): @abc.abstractproperty - def oid(self): + def oid(self) -> ObjectIdentifier: """ Returns the oid associated with the given extension type. """ class Extensions(object): - def __init__(self, extensions): + def __init__(self, extensions: typing.List[ExtensionType]): self._extensions = extensions def get_extension_for_oid(self, oid): diff --git a/src/cryptography/x509/general_name.py b/src/cryptography/x509/general_name.py index 7c1306e26..6683e9313 100644 --- a/src/cryptography/x509/general_name.py +++ b/src/cryptography/x509/general_name.py @@ -5,6 +5,7 @@ import abc import ipaddress +import typing from email.utils import parseaddr from cryptography import utils @@ -41,7 +42,7 @@ class GeneralName(metaclass=abc.ABCMeta): @utils.register_interface(GeneralName) class RFC822Name(object): - def __init__(self, value): + def __init__(self, value: str): if isinstance(value, str): try: value.encode("ascii") @@ -70,25 +71,25 @@ class RFC822Name(object): instance._value = value return instance - def __repr__(self): + def __repr__(self) -> str: return "<RFC822Name(value={0!r})>".format(self.value) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, RFC822Name): return NotImplemented return self.value == other.value - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not self == other - def __hash__(self): + def __hash__(self) -> int: return hash(self.value) @utils.register_interface(GeneralName) class DNSName(object): - def __init__(self, value): + def __init__(self, value: str): if isinstance(value, str): try: value.encode("ascii") @@ -114,22 +115,22 @@ class DNSName(object): def __repr__(self): return "<DNSName(value={0!r})>".format(self.value) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, DNSName): return NotImplemented return self.value == other.value - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not self == other - def __hash__(self): + def __hash__(self) -> int: return hash(self.value) @utils.register_interface(GeneralName) class UniformResourceIdentifier(object): - def __init__(self, value): + def __init__(self, value: str): if isinstance(value, str): try: value.encode("ascii") @@ -152,25 +153,25 @@ class UniformResourceIdentifier(object): instance._value = value return instance - def __repr__(self): + def __repr__(self) -> str: return "<UniformResourceIdentifier(value={0!r})>".format(self.value) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, UniformResourceIdentifier): return NotImplemented return self.value == other.value - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not self == other - def __hash__(self): + def __hash__(self) -> int: return hash(self.value) @utils.register_interface(GeneralName) class DirectoryName(object): - def __init__(self, value): + def __init__(self, value: Name): if not isinstance(value, Name): raise TypeError("value must be a Name") @@ -178,25 +179,25 @@ class DirectoryName(object): value = utils.read_only_property("_value") - def __repr__(self): + def __repr__(self) -> str: return "<DirectoryName(value={})>".format(self.value) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, DirectoryName): return NotImplemented return self.value == other.value - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not self == other - def __hash__(self): + def __hash__(self) -> int: return hash(self.value) @utils.register_interface(GeneralName) class RegisteredID(object): - def __init__(self, value): + def __init__(self, value: ObjectIdentifier): if not isinstance(value, ObjectIdentifier): raise TypeError("value must be an ObjectIdentifier") @@ -204,25 +205,33 @@ class RegisteredID(object): value = utils.read_only_property("_value") - def __repr__(self): + def __repr__(self) -> str: return "<RegisteredID(value={})>".format(self.value) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, RegisteredID): return NotImplemented return self.value == other.value - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not self == other - def __hash__(self): + def __hash__(self) -> int: return hash(self.value) @utils.register_interface(GeneralName) class IPAddress(object): - def __init__(self, value): + def __init__( + self, + value: typing.Union[ + ipaddress.IPv4Address, + ipaddress.IPv6Address, + ipaddress.IPv4Network, + ipaddress.IPv6Network, + ], + ): if not isinstance( value, ( @@ -242,25 +251,25 @@ class IPAddress(object): value = utils.read_only_property("_value") - def __repr__(self): + def __repr__(self) -> str: return "<IPAddress(value={})>".format(self.value) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, IPAddress): return NotImplemented return self.value == other.value - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not self == other - def __hash__(self): + def __hash__(self) -> int: return hash(self.value) @utils.register_interface(GeneralName) class OtherName(object): - def __init__(self, type_id, value): + def __init__(self, type_id: ObjectIdentifier, value: bytes): if not isinstance(type_id, ObjectIdentifier): raise TypeError("type_id must be an ObjectIdentifier") if not isinstance(value, bytes): @@ -272,19 +281,19 @@ class OtherName(object): type_id = utils.read_only_property("_type_id") value = utils.read_only_property("_value") - def __repr__(self): + def __repr__(self) -> str: return "<OtherName(type_id={}, value={!r})>".format( self.type_id, self.value ) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, OtherName): return NotImplemented return self.type_id == other.type_id and self.value == other.value - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not self == other - def __hash__(self): + def __hash__(self) -> int: return hash((self.type_id, self.value)) diff --git a/src/cryptography/x509/name.py b/src/cryptography/x509/name.py index 54439ed41..c183160e0 100644 --- a/src/cryptography/x509/name.py +++ b/src/cryptography/x509/name.py @@ -2,7 +2,7 @@ # 2.0, and the BSD License. See the LICENSE file in the root of this repository # for complete details. - +import typing from enum import Enum from cryptography import utils @@ -74,7 +74,7 @@ def _escape_dn_value(val): class NameAttribute(object): - def __init__(self, oid, value, _type=_SENTINEL): + def __init__(self, oid: ObjectIdentifier, value: str, _type=_SENTINEL): if not isinstance(oid, ObjectIdentifier): raise TypeError( "oid argument must be an ObjectIdentifier instance." @@ -111,7 +111,7 @@ class NameAttribute(object): oid = utils.read_only_property("_oid") value = utils.read_only_property("_value") - def rfc4514_string(self): + def rfc4514_string(self) -> str: """ Format as RFC4514 Distinguished Name string. @@ -121,24 +121,24 @@ class NameAttribute(object): key = _NAMEOID_TO_NAME.get(self.oid, self.oid.dotted_string) return "%s=%s" % (key, _escape_dn_value(self.value)) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, NameAttribute): return NotImplemented return self.oid == other.oid and self.value == other.value - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not self == other - def __hash__(self): + def __hash__(self) -> int: return hash((self.oid, self.value)) - def __repr__(self): + def __repr__(self) -> str: return "<NameAttribute(oid={0.oid}, value={0.value!r})>".format(self) class RelativeDistinguishedName(object): - def __init__(self, attributes): + def __init__(self, attributes: typing.Iterable[NameAttribute]): attributes = list(attributes) if not attributes: raise ValueError("a relative distinguished name cannot be empty") @@ -152,10 +152,10 @@ class RelativeDistinguishedName(object): if len(self._attribute_set) != len(attributes): raise ValueError("duplicate attributes are not allowed") - def get_attributes_for_oid(self, oid): + def get_attributes_for_oid(self, oid) -> typing.List[NameAttribute]: return [i for i in self if i.oid == oid] - def rfc4514_string(self): + def rfc4514_string(self) -> str: """ Format as RFC4514 Distinguished Name string. @@ -164,25 +164,25 @@ class RelativeDistinguishedName(object): """ return "+".join(attr.rfc4514_string() for attr in self._attributes) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, RelativeDistinguishedName): return NotImplemented return self._attribute_set == other._attribute_set - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not self == other - def __hash__(self): + def __hash__(self) -> int: return hash(self._attribute_set) - def __iter__(self): + def __iter__(self) -> typing.Iterator[NameAttribute]: return iter(self._attributes) - def __len__(self): + def __len__(self) -> int: return len(self._attributes) - def __repr__(self): + def __repr__(self) -> str: return "<RelativeDistinguishedName({})>".format(self.rfc4514_string()) @@ -201,7 +201,7 @@ class Name(object): " or a list RelativeDistinguishedName" ) - def rfc4514_string(self): + def rfc4514_string(self) -> str: """ Format as RFC4514 Distinguished Name string. For example 'CN=foobar.com,O=Foo Corp,C=US' @@ -216,39 +216,39 @@ class Name(object): attr.rfc4514_string() for attr in reversed(self._attributes) ) - def get_attributes_for_oid(self, oid): + def get_attributes_for_oid(self, oid) -> typing.Iterable[NameAttribute]: return [i for i in self if i.oid == oid] @property - def rdns(self): + def rdns(self) -> typing.Iterable[RelativeDistinguishedName]: return self._attributes - def public_bytes(self, backend=None): + def public_bytes(self, backend=None) -> bytes: backend = _get_backend(backend) return backend.x509_name_bytes(self) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, Name): return NotImplemented return self._attributes == other._attributes - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not self == other - def __hash__(self): + def __hash__(self) -> int: # TODO: this is relatively expensive, if this looks like a bottleneck # for you, consider optimizing! return hash(tuple(self._attributes)) - def __iter__(self): + def __iter__(self) -> typing.Iterator[NameAttribute]: for rdn in self._attributes: for ava in rdn: yield ava - def __len__(self): + def __len__(self) -> int: return sum(len(rdn) for rdn in self._attributes) - def __repr__(self): + def __repr__(self) -> str: rdns = ",".join(attr.rfc4514_string() for attr in self._attributes) return "<Name({})>".format(rdns) diff --git a/src/cryptography/x509/ocsp.py b/src/cryptography/x509/ocsp.py index 5d137d697..4d4992060 100644 --- a/src/cryptography/x509/ocsp.py +++ b/src/cryptography/x509/ocsp.py @@ -5,12 +5,14 @@ import abc import datetime +import typing from enum import Enum from cryptography import x509 -from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import hashes, serialization from cryptography.x509.base import ( _EARLIEST_UTC_TIME, + _PRIVATE_KEY_TYPES, _convert_to_naive_utc_time, _reject_duplicate_extension, ) @@ -65,55 +67,6 @@ class OCSPCertStatus(Enum): _CERT_STATUS_TO_ENUM = {x.value: x for x in OCSPCertStatus} -def load_der_ocsp_request(data): - from cryptography.hazmat.backends.openssl.backend import backend - - return backend.load_der_ocsp_request(data) - - -def load_der_ocsp_response(data): - from cryptography.hazmat.backends.openssl.backend import backend - - return backend.load_der_ocsp_response(data) - - -class OCSPRequestBuilder(object): - def __init__(self, request=None, extensions=[]): - self._request = request - self._extensions = extensions - - def add_certificate(self, cert, issuer, algorithm): - if self._request is not None: - raise ValueError("Only one certificate can be added to a request") - - _verify_algorithm(algorithm) - if not isinstance(cert, x509.Certificate) or not isinstance( - issuer, x509.Certificate - ): - raise TypeError("cert and issuer must be a Certificate") - - return OCSPRequestBuilder((cert, issuer, algorithm), self._extensions) - - def add_extension(self, extension, critical): - if not isinstance(extension, x509.ExtensionType): - raise TypeError("extension must be an ExtensionType") - - extension = x509.Extension(extension.oid, critical, extension) - _reject_duplicate_extension(extension, self._extensions) - - return OCSPRequestBuilder( - self._request, self._extensions + [extension] - ) - - def build(self): - from cryptography.hazmat.backends.openssl.backend import backend - - if self._request is None: - raise ValueError("You must add a certificate before building") - - return backend.create_ocsp_request(self) - - class _SingleResponse(object): def __init__( self, @@ -184,151 +137,39 @@ class _SingleResponse(object): self._revocation_reason = revocation_reason -class OCSPResponseBuilder(object): - def __init__( - self, response=None, responder_id=None, certs=None, extensions=[] - ): - self._response = response - self._responder_id = responder_id - self._certs = certs - self._extensions = extensions - - def add_response( - self, - cert, - issuer, - algorithm, - cert_status, - this_update, - next_update, - revocation_time, - revocation_reason, - ): - if self._response is not None: - raise ValueError("Only one response per OCSPResponse.") - - singleresp = _SingleResponse( - cert, - issuer, - algorithm, - cert_status, - this_update, - next_update, - revocation_time, - revocation_reason, - ) - return OCSPResponseBuilder( - singleresp, - self._responder_id, - self._certs, - self._extensions, - ) - - def responder_id(self, encoding, responder_cert): - if self._responder_id is not None: - raise ValueError("responder_id can only be set once") - if not isinstance(responder_cert, x509.Certificate): - raise TypeError("responder_cert must be a Certificate") - if not isinstance(encoding, OCSPResponderEncoding): - raise TypeError( - "encoding must be an element from OCSPResponderEncoding" - ) - - return OCSPResponseBuilder( - self._response, - (responder_cert, encoding), - self._certs, - self._extensions, - ) - - def certificates(self, certs): - if self._certs is not None: - raise ValueError("certificates may only be set once") - certs = list(certs) - if len(certs) == 0: - raise ValueError("certs must not be an empty list") - if not all(isinstance(x, x509.Certificate) for x in certs): - raise TypeError("certs must be a list of Certificates") - return OCSPResponseBuilder( - self._response, - self._responder_id, - certs, - self._extensions, - ) - - def add_extension(self, extension, critical): - if not isinstance(extension, x509.ExtensionType): - raise TypeError("extension must be an ExtensionType") - - extension = x509.Extension(extension.oid, critical, extension) - _reject_duplicate_extension(extension, self._extensions) - - return OCSPResponseBuilder( - self._response, - self._responder_id, - self._certs, - self._extensions + [extension], - ) - - def sign(self, private_key, algorithm): - from cryptography.hazmat.backends.openssl.backend import backend - - if self._response is None: - raise ValueError("You must add a response before signing") - if self._responder_id is None: - raise ValueError("You must add a responder_id before signing") - - return backend.create_ocsp_response( - OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm - ) - - @classmethod - def build_unsuccessful(cls, response_status): - from cryptography.hazmat.backends.openssl.backend import backend - - if not isinstance(response_status, OCSPResponseStatus): - raise TypeError( - "response_status must be an item from OCSPResponseStatus" - ) - if response_status is OCSPResponseStatus.SUCCESSFUL: - raise ValueError("response_status cannot be SUCCESSFUL") - - return backend.create_ocsp_response(response_status, None, None, None) - - class OCSPRequest(metaclass=abc.ABCMeta): @abc.abstractproperty - def issuer_key_hash(self): + def issuer_key_hash(self) -> bytes: """ The hash of the issuer public key """ @abc.abstractproperty - def issuer_name_hash(self): + def issuer_name_hash(self) -> bytes: """ The hash of the issuer name """ @abc.abstractproperty - def hash_algorithm(self): + def hash_algorithm(self) -> hashes.HashAlgorithm: """ The hash algorithm used in the issuer name and key hashes """ @abc.abstractproperty - def serial_number(self): + def serial_number(self) -> int: """ The serial number of the cert whose status is being checked """ @abc.abstractmethod - def public_bytes(self, encoding): + def public_bytes(self, encoding: serialization.Encoding) -> bytes: """ Serializes the request to DER """ @abc.abstractproperty - def extensions(self): + def extensions(self) -> x509.Extensions: """ The list of request extensions. Not single request extensions. """ @@ -336,38 +177,40 @@ class OCSPRequest(metaclass=abc.ABCMeta): class OCSPResponse(metaclass=abc.ABCMeta): @abc.abstractproperty - def response_status(self): + def response_status(self) -> OCSPResponseStatus: """ The status of the response. This is a value from the OCSPResponseStatus enumeration """ @abc.abstractproperty - def signature_algorithm_oid(self): + def signature_algorithm_oid(self) -> x509.ObjectIdentifier: """ The ObjectIdentifier of the signature algorithm """ @abc.abstractproperty - def signature_hash_algorithm(self): + def signature_hash_algorithm( + self, + ) -> typing.Optional[hashes.HashAlgorithm]: """ Returns a HashAlgorithm corresponding to the type of the digest signed """ @abc.abstractproperty - def signature(self): + def signature(self) -> bytes: """ The signature bytes """ @abc.abstractproperty - def tbs_response_bytes(self): + def tbs_response_bytes(self) -> bytes: """ The tbsResponseData bytes """ @abc.abstractproperty - def certificates(self): + def certificates(self) -> typing.List[x509.Certificate]: """ A list of certificates used to help build a chain to verify the OCSP response. This situation occurs when the OCSP responder uses a delegate @@ -375,88 +218,264 @@ class OCSPResponse(metaclass=abc.ABCMeta): """ @abc.abstractproperty - def responder_key_hash(self): + def responder_key_hash(self) -> typing.Optional[bytes]: """ The responder's key hash or None """ @abc.abstractproperty - def responder_name(self): + def responder_name(self) -> typing.Optional[x509.Name]: """ The responder's Name or None """ @abc.abstractproperty - def produced_at(self): + def produced_at(self) -> datetime.datetime: """ The time the response was produced """ @abc.abstractproperty - def certificate_status(self): + def certificate_status(self) -> OCSPCertStatus: """ The status of the certificate (an element from the OCSPCertStatus enum) """ @abc.abstractproperty - def revocation_time(self): + def revocation_time(self) -> typing.Optional[datetime.datetime]: """ The date of when the certificate was revoked or None if not revoked. """ @abc.abstractproperty - def revocation_reason(self): + def revocation_reason(self) -> typing.Optional[x509.ReasonFlags]: """ The reason the certificate was revoked or None if not specified or not revoked. """ @abc.abstractproperty - def this_update(self): + def this_update(self) -> datetime.datetime: """ The most recent time at which the status being indicated is known by the responder to have been correct """ @abc.abstractproperty - def next_update(self): + def next_update(self) -> typing.Optional[datetime.datetime]: """ The time when newer information will be available """ @abc.abstractproperty - def issuer_key_hash(self): + def issuer_key_hash(self) -> bytes: """ The hash of the issuer public key """ @abc.abstractproperty - def issuer_name_hash(self): + def issuer_name_hash(self) -> bytes: """ The hash of the issuer name """ @abc.abstractproperty - def hash_algorithm(self): + def hash_algorithm(self) -> hashes.HashAlgorithm: """ The hash algorithm used in the issuer name and key hashes """ @abc.abstractproperty - def serial_number(self): + def serial_number(self) -> int: """ The serial number of the cert whose status is being checked """ @abc.abstractproperty - def extensions(self): + def extensions(self) -> x509.Extensions: """ The list of response extensions. Not single response extensions. """ @abc.abstractproperty - def single_extensions(self): + def single_extensions(self) -> x509.Extensions: """ The list of single response extensions. Not response extensions. """ + + +class OCSPRequestBuilder(object): + def __init__(self, request=None, extensions=[]): + self._request = request + self._extensions = extensions + + def add_certificate( + self, + cert: x509.Certificate, + issuer: x509.Certificate, + algorithm: hashes.HashAlgorithm, + ) -> "OCSPRequestBuilder": + if self._request is not None: + raise ValueError("Only one certificate can be added to a request") + + _verify_algorithm(algorithm) + if not isinstance(cert, x509.Certificate) or not isinstance( + issuer, x509.Certificate + ): + raise TypeError("cert and issuer must be a Certificate") + + return OCSPRequestBuilder((cert, issuer, algorithm), self._extensions) + + def add_extension( + self, extval: x509.ExtensionType, critical: bool + ) -> "OCSPRequestBuilder": + if not isinstance(extval, x509.ExtensionType): + raise TypeError("extension must be an ExtensionType") + + extension = x509.Extension(extval.oid, critical, extval) + _reject_duplicate_extension(extension, self._extensions) + + return OCSPRequestBuilder( + self._request, self._extensions + [extension] + ) + + def build(self) -> OCSPRequest: + from cryptography.hazmat.backends.openssl.backend import backend + + if self._request is None: + raise ValueError("You must add a certificate before building") + + return backend.create_ocsp_request(self) + + +class OCSPResponseBuilder(object): + def __init__( + self, response=None, responder_id=None, certs=None, extensions=[] + ): + self._response = response + self._responder_id = responder_id + self._certs = certs + self._extensions = extensions + + def add_response( + self, + cert: x509.Certificate, + issuer: x509.Certificate, + algorithm: hashes.HashAlgorithm, + cert_status: OCSPCertStatus, + this_update: datetime.datetime, + next_update: datetime.datetime, + revocation_time: datetime.datetime, + revocation_reason: x509.ReasonFlags, + ) -> "OCSPResponseBuilder": + if self._response is not None: + raise ValueError("Only one response per OCSPResponse.") + + singleresp = _SingleResponse( + cert, + issuer, + algorithm, + cert_status, + this_update, + next_update, + revocation_time, + revocation_reason, + ) + return OCSPResponseBuilder( + singleresp, + self._responder_id, + self._certs, + self._extensions, + ) + + def responder_id( + self, encoding: OCSPResponderEncoding, responder_cert: x509.Certificate + ) -> "OCSPResponseBuilder": + if self._responder_id is not None: + raise ValueError("responder_id can only be set once") + if not isinstance(responder_cert, x509.Certificate): + raise TypeError("responder_cert must be a Certificate") + if not isinstance(encoding, OCSPResponderEncoding): + raise TypeError( + "encoding must be an element from OCSPResponderEncoding" + ) + + return OCSPResponseBuilder( + self._response, + (responder_cert, encoding), + self._certs, + self._extensions, + ) + + def certificates( + self, certs: typing.Iterable[x509.Certificate] + ) -> "OCSPResponseBuilder": + if self._certs is not None: + raise ValueError("certificates may only be set once") + certs = list(certs) + if len(certs) == 0: + raise ValueError("certs must not be an empty list") + if not all(isinstance(x, x509.Certificate) for x in certs): + raise TypeError("certs must be a list of Certificates") + return OCSPResponseBuilder( + self._response, + self._responder_id, + certs, + self._extensions, + ) + + def add_extension( + self, extval: x509.ExtensionType, critical: bool + ) -> "OCSPResponseBuilder": + if not isinstance(extval, x509.ExtensionType): + raise TypeError("extension must be an ExtensionType") + + extension = x509.Extension(extval.oid, critical, extval) + _reject_duplicate_extension(extension, self._extensions) + + return OCSPResponseBuilder( + self._response, + self._responder_id, + self._certs, + self._extensions + [extension], + ) + + def sign( + self, private_key: _PRIVATE_KEY_TYPES, algorithm: hashes.HashAlgorithm + ) -> OCSPResponse: + from cryptography.hazmat.backends.openssl.backend import backend + + if self._response is None: + raise ValueError("You must add a response before signing") + if self._responder_id is None: + raise ValueError("You must add a responder_id before signing") + + return backend.create_ocsp_response( + OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm + ) + + @classmethod + def build_unsuccessful(cls, response_status: OCSPResponse) -> OCSPResponse: + from cryptography.hazmat.backends.openssl.backend import backend + + if not isinstance(response_status, OCSPResponseStatus): + raise TypeError( + "response_status must be an item from OCSPResponseStatus" + ) + if response_status is OCSPResponseStatus.SUCCESSFUL: + raise ValueError("response_status cannot be SUCCESSFUL") + + return backend.create_ocsp_response(response_status, None, None, None) + + +def load_der_ocsp_request(data: bytes) -> OCSPRequest: + from cryptography.hazmat.backends.openssl.backend import backend + + return backend.load_der_ocsp_request(data) + + +def load_der_ocsp_response(data: bytes) -> OCSPResponse: + from cryptography.hazmat.backends.openssl.backend import backend + + return backend.load_der_ocsp_response(data) |
