summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Gaynor <alex.gaynor@gmail.com>2021-01-30 17:44:14 -0500
committerGitHub <noreply@github.com>2021-01-30 16:44:14 -0600
commitf16bff2cbd2855f0becb27724334e4bdb0679247 (patch)
treef5636309e64b6dab7055863fd7426d62a8816c55 /src
parent83c598ac3df8b06f4f68b0cc845de1fde0e454a3 (diff)
downloadcryptography-f16bff2cbd2855f0becb27724334e4bdb0679247.tar.gz
Apply type annotations to x509 ct and ocsp (#5712)
Diffstat (limited to 'src')
-rw-r--r--src/cryptography/fernet.py4
-rw-r--r--src/cryptography/hazmat/backends/openssl/ocsp.py76
-rw-r--r--src/cryptography/hazmat/backends/openssl/x509.py6
-rw-r--r--src/cryptography/x509/base.py4
-rw-r--r--src/cryptography/x509/certificate_transparency.py9
-rw-r--r--src/cryptography/x509/extensions.py9
-rw-r--r--src/cryptography/x509/general_name.py77
-rw-r--r--src/cryptography/x509/name.py52
-rw-r--r--src/cryptography/x509/ocsp.py395
9 files changed, 334 insertions, 298 deletions
diff --git a/src/cryptography/fernet.py b/src/cryptography/fernet.py
index c9464c5ad..57772fee4 100644
--- a/src/cryptography/fernet.py
+++ b/src/cryptography/fernet.py
@@ -157,8 +157,8 @@ class Fernet(object):
class MultiFernet(object):
- def __init__(self, fernets_it: typing.Iterator[Fernet]):
- fernets = list(fernets_it)
+ def __init__(self, fernets: typing.Iterable[Fernet]):
+ fernets = list(fernets)
if not fernets:
raise ValueError(
"MultiFernet requires at least one Fernet instance"
diff --git a/src/cryptography/hazmat/backends/openssl/ocsp.py b/src/cryptography/hazmat/backends/openssl/ocsp.py
index 231794c6b..9138e78db 100644
--- a/src/cryptography/hazmat/backends/openssl/ocsp.py
+++ b/src/cryptography/hazmat/backends/openssl/ocsp.py
@@ -2,6 +2,8 @@
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
+import datetime
+import typing
from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm
@@ -14,7 +16,7 @@ from cryptography.hazmat.backends.openssl.decode_asn1 import (
_parse_asn1_generalized_time,
)
from cryptography.hazmat.backends.openssl.x509 import _Certificate
-from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives import hashes, serialization
from cryptography.x509.ocsp import (
OCSPCertStatus,
OCSPRequest,
@@ -84,8 +86,7 @@ def _hash_algorithm(backend, cert_id):
)
-@utils.register_interface(OCSPResponse)
-class _OCSPResponse(object):
+class _OCSPResponse(OCSPResponse):
def __init__(self, backend, ocsp_response):
self._backend = backend
self._ocsp_response = ocsp_response
@@ -120,7 +121,7 @@ class _OCSPResponse(object):
response_status = utils.read_only_property("_status")
- def _requires_successful_response(self):
+ def _requires_successful_response(self) -> None:
if self.response_status != OCSPResponseStatus.SUCCESSFUL:
raise ValueError(
"OCSP response status is not successful so the property "
@@ -128,7 +129,7 @@ class _OCSPResponse(object):
)
@property
- def signature_algorithm_oid(self):
+ def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
self._requires_successful_response()
alg = self._backend._lib.OCSP_resp_get0_tbs_sigalg(self._basic)
self._backend.openssl_assert(alg != self._backend._ffi.NULL)
@@ -136,7 +137,9 @@ class _OCSPResponse(object):
return x509.ObjectIdentifier(oid)
@property
- def signature_hash_algorithm(self):
+ def signature_hash_algorithm(
+ self,
+ ) -> typing.Optional[hashes.HashAlgorithm]:
self._requires_successful_response()
oid = self.signature_algorithm_oid
try:
@@ -147,14 +150,14 @@ class _OCSPResponse(object):
)
@property
- def signature(self):
+ def signature(self) -> bytes:
self._requires_successful_response()
sig = self._backend._lib.OCSP_resp_get0_signature(self._basic)
self._backend.openssl_assert(sig != self._backend._ffi.NULL)
return _asn1_string_to_bytes(self._backend, sig)
@property
- def tbs_response_bytes(self):
+ def tbs_response_bytes(self) -> bytes:
self._requires_successful_response()
respdata = self._backend._lib.OCSP_resp_get0_respdata(self._basic)
self._backend.openssl_assert(respdata != self._backend._ffi.NULL)
@@ -168,25 +171,25 @@ class _OCSPResponse(object):
return self._backend._ffi.buffer(pp[0], res)[:]
@property
- def certificates(self):
+ def certificates(self) -> typing.List[x509.Certificate]:
self._requires_successful_response()
sk_x509 = self._backend._lib.OCSP_resp_get0_certs(self._basic)
num = self._backend._lib.sk_X509_num(sk_x509)
- certs = []
+ certs: typing.List[x509.Certificate] = []
for i in range(num):
- x509 = self._backend._lib.sk_X509_value(sk_x509, i)
- self._backend.openssl_assert(x509 != self._backend._ffi.NULL)
- cert = _Certificate(self._backend, x509)
+ x509_ptr = self._backend._lib.sk_X509_value(sk_x509, i)
+ self._backend.openssl_assert(x509_ptr != self._backend._ffi.NULL)
+ cert = _Certificate(self._backend, x509_ptr)
# We need to keep the OCSP response that the certificate came from
# alive until the Certificate object itself goes out of scope, so
# we give it a private reference.
- cert._ocsp_resp = self
+ cert._ocsp_resp_ref = self
certs.append(cert)
return certs
@property
- def responder_key_hash(self):
+ def responder_key_hash(self) -> typing.Optional[bytes]:
self._requires_successful_response()
_, asn1_string = self._responder_key_name()
if asn1_string == self._backend._ffi.NULL:
@@ -195,7 +198,7 @@ class _OCSPResponse(object):
return _asn1_string_to_bytes(self._backend, asn1_string)
@property
- def responder_name(self):
+ def responder_name(self) -> typing.Optional[x509.Name]:
self._requires_successful_response()
x509_name, _ = self._responder_key_name()
if x509_name == self._backend._ffi.NULL:
@@ -213,7 +216,7 @@ class _OCSPResponse(object):
return x509_name[0], asn1_string[0]
@property
- def produced_at(self):
+ def produced_at(self) -> datetime.datetime:
self._requires_successful_response()
produced_at = self._backend._lib.OCSP_resp_get0_produced_at(
self._basic
@@ -221,7 +224,7 @@ class _OCSPResponse(object):
return _parse_asn1_generalized_time(self._backend, produced_at)
@property
- def certificate_status(self):
+ def certificate_status(self) -> OCSPCertStatus:
self._requires_successful_response()
status = self._backend._lib.OCSP_single_get0_status(
self._single,
@@ -234,7 +237,7 @@ class _OCSPResponse(object):
return _CERT_STATUS_TO_ENUM[status]
@property
- def revocation_time(self):
+ def revocation_time(self) -> typing.Optional[datetime.datetime]:
self._requires_successful_response()
if self.certificate_status is not OCSPCertStatus.REVOKED:
return None
@@ -251,7 +254,7 @@ class _OCSPResponse(object):
return _parse_asn1_generalized_time(self._backend, asn1_time[0])
@property
- def revocation_reason(self):
+ def revocation_reason(self) -> typing.Optional[x509.ReasonFlags]:
self._requires_successful_response()
if self.certificate_status is not OCSPCertStatus.REVOKED:
return None
@@ -274,7 +277,7 @@ class _OCSPResponse(object):
return _CRL_ENTRY_REASON_CODE_TO_ENUM[reason_ptr[0]]
@property
- def this_update(self):
+ def this_update(self) -> datetime.datetime:
self._requires_successful_response()
asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
self._backend._lib.OCSP_single_get0_status(
@@ -288,7 +291,7 @@ class _OCSPResponse(object):
return _parse_asn1_generalized_time(self._backend, asn1_time[0])
@property
- def next_update(self):
+ def next_update(self) -> typing.Optional[datetime.datetime]:
self._requires_successful_response()
asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
self._backend._lib.OCSP_single_get0_status(
@@ -304,36 +307,36 @@ class _OCSPResponse(object):
return None
@property
- def issuer_key_hash(self):
+ def issuer_key_hash(self) -> bytes:
self._requires_successful_response()
return _issuer_key_hash(self._backend, self._cert_id)
@property
- def issuer_name_hash(self):
+ def issuer_name_hash(self) -> bytes:
self._requires_successful_response()
return _issuer_name_hash(self._backend, self._cert_id)
@property
- def hash_algorithm(self):
+ def hash_algorithm(self) -> hashes.HashAlgorithm:
self._requires_successful_response()
return _hash_algorithm(self._backend, self._cert_id)
@property
- def serial_number(self):
+ def serial_number(self) -> int:
self._requires_successful_response()
return _serial_number(self._backend, self._cert_id)
@utils.cached_property
- def extensions(self):
+ def extensions(self) -> x509.Extensions:
self._requires_successful_response()
return self._backend._ocsp_basicresp_ext_parser.parse(self._basic)
@utils.cached_property
- def single_extensions(self):
+ def single_extensions(self) -> x509.Extensions:
self._requires_successful_response()
return self._backend._ocsp_singleresp_ext_parser.parse(self._single)
- def public_bytes(self, encoding):
+ def public_bytes(self, encoding: serialization.Encoding) -> bytes:
if encoding is not serialization.Encoding.DER:
raise ValueError("The only allowed encoding value is Encoding.DER")
@@ -345,8 +348,7 @@ class _OCSPResponse(object):
return self._backend._read_mem_bio(bio)
-@utils.register_interface(OCSPRequest)
-class _OCSPRequest(object):
+class _OCSPRequest(OCSPRequest):
def __init__(self, backend, ocsp_request):
if backend._lib.OCSP_request_onereq_count(ocsp_request) > 1:
raise NotImplementedError(
@@ -362,26 +364,26 @@ class _OCSPRequest(object):
self._backend.openssl_assert(self._cert_id != self._backend._ffi.NULL)
@property
- def issuer_key_hash(self):
+ def issuer_key_hash(self) -> bytes:
return _issuer_key_hash(self._backend, self._cert_id)
@property
- def issuer_name_hash(self):
+ def issuer_name_hash(self) -> bytes:
return _issuer_name_hash(self._backend, self._cert_id)
@property
- def serial_number(self):
+ def serial_number(self) -> int:
return _serial_number(self._backend, self._cert_id)
@property
- def hash_algorithm(self):
+ def hash_algorithm(self) -> hashes.HashAlgorithm:
return _hash_algorithm(self._backend, self._cert_id)
@utils.cached_property
- def extensions(self):
+ def extensions(self) -> x509.Extensions:
return self._backend._ocsp_req_ext_parser.parse(self._ocsp_request)
- def public_bytes(self, encoding):
+ def public_bytes(self, encoding: serialization.Encoding) -> bytes:
if encoding is not serialization.Encoding.DER:
raise ValueError("The only allowed encoding value is Encoding.DER")
diff --git a/src/cryptography/hazmat/backends/openssl/x509.py b/src/cryptography/hazmat/backends/openssl/x509.py
index f460ed2da..07a1dd8a4 100644
--- a/src/cryptography/hazmat/backends/openssl/x509.py
+++ b/src/cryptography/hazmat/backends/openssl/x509.py
@@ -26,8 +26,10 @@ from cryptography.x509.base import _PUBLIC_KEY_TYPES
from cryptography.x509.name import _ASN1Type
-@utils.register_interface(x509.Certificate)
-class _Certificate(object):
+class _Certificate(x509.Certificate):
+ # Keep-alive reference used by OCSP
+ _ocsp_resp_ref: typing.Any
+
def __init__(self, backend, x509_cert):
self._backend = backend
self._x509 = x509_cert
diff --git a/src/cryptography/x509/base.py b/src/cryptography/x509/base.py
index c17fff8ce..5c5e6c6da 100644
--- a/src/cryptography/x509/base.py
+++ b/src/cryptography/x509/base.py
@@ -140,7 +140,9 @@ class Certificate(metaclass=abc.ABCMeta):
"""
@abc.abstractproperty
- def signature_hash_algorithm(self) -> hashes.HashAlgorithm:
+ def signature_hash_algorithm(
+ self,
+ ) -> typing.Optional[hashes.HashAlgorithm]:
"""
Returns a HashAlgorithm corresponding to the type of the digest signed
in the certificate.
diff --git a/src/cryptography/x509/certificate_transparency.py b/src/cryptography/x509/certificate_transparency.py
index 44e18803d..d51bee92e 100644
--- a/src/cryptography/x509/certificate_transparency.py
+++ b/src/cryptography/x509/certificate_transparency.py
@@ -4,6 +4,7 @@
import abc
+import datetime
from enum import Enum
@@ -18,25 +19,25 @@ class Version(Enum):
class SignedCertificateTimestamp(metaclass=abc.ABCMeta):
@abc.abstractproperty
- def version(self):
+ def version(self) -> Version:
"""
Returns the SCT version.
"""
@abc.abstractproperty
- def log_id(self):
+ def log_id(self) -> bytes:
"""
Returns an identifier indicating which log this SCT is for.
"""
@abc.abstractproperty
- def timestamp(self):
+ def timestamp(self) -> datetime.datetime:
"""
Returns the timestamp for this SCT.
"""
@abc.abstractproperty
- def entry_type(self):
+ def entry_type(self) -> LogEntryType:
"""
Returns whether this is an SCT for a certificate or pre-certificate.
"""
diff --git a/src/cryptography/x509/extensions.py b/src/cryptography/x509/extensions.py
index dfad69770..0602908e4 100644
--- a/src/cryptography/x509/extensions.py
+++ b/src/cryptography/x509/extensions.py
@@ -7,6 +7,7 @@ import abc
import datetime
import hashlib
import ipaddress
+import typing
from enum import Enum
from cryptography import utils
@@ -86,27 +87,27 @@ def _make_sequence_methods(field_name):
class DuplicateExtension(Exception):
- def __init__(self, msg, oid):
+ def __init__(self, msg: str, oid: ObjectIdentifier):
super(DuplicateExtension, self).__init__(msg)
self.oid = oid
class ExtensionNotFound(Exception):
- def __init__(self, msg, oid):
+ def __init__(self, msg: str, oid: ObjectIdentifier):
super(ExtensionNotFound, self).__init__(msg)
self.oid = oid
class ExtensionType(metaclass=abc.ABCMeta):
@abc.abstractproperty
- def oid(self):
+ def oid(self) -> ObjectIdentifier:
"""
Returns the oid associated with the given extension type.
"""
class Extensions(object):
- def __init__(self, extensions):
+ def __init__(self, extensions: typing.List[ExtensionType]):
self._extensions = extensions
def get_extension_for_oid(self, oid):
diff --git a/src/cryptography/x509/general_name.py b/src/cryptography/x509/general_name.py
index 7c1306e26..6683e9313 100644
--- a/src/cryptography/x509/general_name.py
+++ b/src/cryptography/x509/general_name.py
@@ -5,6 +5,7 @@
import abc
import ipaddress
+import typing
from email.utils import parseaddr
from cryptography import utils
@@ -41,7 +42,7 @@ class GeneralName(metaclass=abc.ABCMeta):
@utils.register_interface(GeneralName)
class RFC822Name(object):
- def __init__(self, value):
+ def __init__(self, value: str):
if isinstance(value, str):
try:
value.encode("ascii")
@@ -70,25 +71,25 @@ class RFC822Name(object):
instance._value = value
return instance
- def __repr__(self):
+ def __repr__(self) -> str:
return "<RFC822Name(value={0!r})>".format(self.value)
- def __eq__(self, other):
+ def __eq__(self, other: object) -> bool:
if not isinstance(other, RFC822Name):
return NotImplemented
return self.value == other.value
- def __ne__(self, other):
+ def __ne__(self, other: object) -> bool:
return not self == other
- def __hash__(self):
+ def __hash__(self) -> int:
return hash(self.value)
@utils.register_interface(GeneralName)
class DNSName(object):
- def __init__(self, value):
+ def __init__(self, value: str):
if isinstance(value, str):
try:
value.encode("ascii")
@@ -114,22 +115,22 @@ class DNSName(object):
def __repr__(self):
return "<DNSName(value={0!r})>".format(self.value)
- def __eq__(self, other):
+ def __eq__(self, other: object) -> bool:
if not isinstance(other, DNSName):
return NotImplemented
return self.value == other.value
- def __ne__(self, other):
+ def __ne__(self, other: object) -> bool:
return not self == other
- def __hash__(self):
+ def __hash__(self) -> int:
return hash(self.value)
@utils.register_interface(GeneralName)
class UniformResourceIdentifier(object):
- def __init__(self, value):
+ def __init__(self, value: str):
if isinstance(value, str):
try:
value.encode("ascii")
@@ -152,25 +153,25 @@ class UniformResourceIdentifier(object):
instance._value = value
return instance
- def __repr__(self):
+ def __repr__(self) -> str:
return "<UniformResourceIdentifier(value={0!r})>".format(self.value)
- def __eq__(self, other):
+ def __eq__(self, other: object) -> bool:
if not isinstance(other, UniformResourceIdentifier):
return NotImplemented
return self.value == other.value
- def __ne__(self, other):
+ def __ne__(self, other: object) -> bool:
return not self == other
- def __hash__(self):
+ def __hash__(self) -> int:
return hash(self.value)
@utils.register_interface(GeneralName)
class DirectoryName(object):
- def __init__(self, value):
+ def __init__(self, value: Name):
if not isinstance(value, Name):
raise TypeError("value must be a Name")
@@ -178,25 +179,25 @@ class DirectoryName(object):
value = utils.read_only_property("_value")
- def __repr__(self):
+ def __repr__(self) -> str:
return "<DirectoryName(value={})>".format(self.value)
- def __eq__(self, other):
+ def __eq__(self, other: object) -> bool:
if not isinstance(other, DirectoryName):
return NotImplemented
return self.value == other.value
- def __ne__(self, other):
+ def __ne__(self, other: object) -> bool:
return not self == other
- def __hash__(self):
+ def __hash__(self) -> int:
return hash(self.value)
@utils.register_interface(GeneralName)
class RegisteredID(object):
- def __init__(self, value):
+ def __init__(self, value: ObjectIdentifier):
if not isinstance(value, ObjectIdentifier):
raise TypeError("value must be an ObjectIdentifier")
@@ -204,25 +205,33 @@ class RegisteredID(object):
value = utils.read_only_property("_value")
- def __repr__(self):
+ def __repr__(self) -> str:
return "<RegisteredID(value={})>".format(self.value)
- def __eq__(self, other):
+ def __eq__(self, other: object) -> bool:
if not isinstance(other, RegisteredID):
return NotImplemented
return self.value == other.value
- def __ne__(self, other):
+ def __ne__(self, other: object) -> bool:
return not self == other
- def __hash__(self):
+ def __hash__(self) -> int:
return hash(self.value)
@utils.register_interface(GeneralName)
class IPAddress(object):
- def __init__(self, value):
+ def __init__(
+ self,
+ value: typing.Union[
+ ipaddress.IPv4Address,
+ ipaddress.IPv6Address,
+ ipaddress.IPv4Network,
+ ipaddress.IPv6Network,
+ ],
+ ):
if not isinstance(
value,
(
@@ -242,25 +251,25 @@ class IPAddress(object):
value = utils.read_only_property("_value")
- def __repr__(self):
+ def __repr__(self) -> str:
return "<IPAddress(value={})>".format(self.value)
- def __eq__(self, other):
+ def __eq__(self, other: object) -> bool:
if not isinstance(other, IPAddress):
return NotImplemented
return self.value == other.value
- def __ne__(self, other):
+ def __ne__(self, other: object) -> bool:
return not self == other
- def __hash__(self):
+ def __hash__(self) -> int:
return hash(self.value)
@utils.register_interface(GeneralName)
class OtherName(object):
- def __init__(self, type_id, value):
+ def __init__(self, type_id: ObjectIdentifier, value: bytes):
if not isinstance(type_id, ObjectIdentifier):
raise TypeError("type_id must be an ObjectIdentifier")
if not isinstance(value, bytes):
@@ -272,19 +281,19 @@ class OtherName(object):
type_id = utils.read_only_property("_type_id")
value = utils.read_only_property("_value")
- def __repr__(self):
+ def __repr__(self) -> str:
return "<OtherName(type_id={}, value={!r})>".format(
self.type_id, self.value
)
- def __eq__(self, other):
+ def __eq__(self, other: object) -> bool:
if not isinstance(other, OtherName):
return NotImplemented
return self.type_id == other.type_id and self.value == other.value
- def __ne__(self, other):
+ def __ne__(self, other: object) -> bool:
return not self == other
- def __hash__(self):
+ def __hash__(self) -> int:
return hash((self.type_id, self.value))
diff --git a/src/cryptography/x509/name.py b/src/cryptography/x509/name.py
index 54439ed41..c183160e0 100644
--- a/src/cryptography/x509/name.py
+++ b/src/cryptography/x509/name.py
@@ -2,7 +2,7 @@
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
-
+import typing
from enum import Enum
from cryptography import utils
@@ -74,7 +74,7 @@ def _escape_dn_value(val):
class NameAttribute(object):
- def __init__(self, oid, value, _type=_SENTINEL):
+ def __init__(self, oid: ObjectIdentifier, value: str, _type=_SENTINEL):
if not isinstance(oid, ObjectIdentifier):
raise TypeError(
"oid argument must be an ObjectIdentifier instance."
@@ -111,7 +111,7 @@ class NameAttribute(object):
oid = utils.read_only_property("_oid")
value = utils.read_only_property("_value")
- def rfc4514_string(self):
+ def rfc4514_string(self) -> str:
"""
Format as RFC4514 Distinguished Name string.
@@ -121,24 +121,24 @@ class NameAttribute(object):
key = _NAMEOID_TO_NAME.get(self.oid, self.oid.dotted_string)
return "%s=%s" % (key, _escape_dn_value(self.value))
- def __eq__(self, other):
+ def __eq__(self, other: object) -> bool:
if not isinstance(other, NameAttribute):
return NotImplemented
return self.oid == other.oid and self.value == other.value
- def __ne__(self, other):
+ def __ne__(self, other: object) -> bool:
return not self == other
- def __hash__(self):
+ def __hash__(self) -> int:
return hash((self.oid, self.value))
- def __repr__(self):
+ def __repr__(self) -> str:
return "<NameAttribute(oid={0.oid}, value={0.value!r})>".format(self)
class RelativeDistinguishedName(object):
- def __init__(self, attributes):
+ def __init__(self, attributes: typing.Iterable[NameAttribute]):
attributes = list(attributes)
if not attributes:
raise ValueError("a relative distinguished name cannot be empty")
@@ -152,10 +152,10 @@ class RelativeDistinguishedName(object):
if len(self._attribute_set) != len(attributes):
raise ValueError("duplicate attributes are not allowed")
- def get_attributes_for_oid(self, oid):
+ def get_attributes_for_oid(self, oid) -> typing.List[NameAttribute]:
return [i for i in self if i.oid == oid]
- def rfc4514_string(self):
+ def rfc4514_string(self) -> str:
"""
Format as RFC4514 Distinguished Name string.
@@ -164,25 +164,25 @@ class RelativeDistinguishedName(object):
"""
return "+".join(attr.rfc4514_string() for attr in self._attributes)
- def __eq__(self, other):
+ def __eq__(self, other: object) -> bool:
if not isinstance(other, RelativeDistinguishedName):
return NotImplemented
return self._attribute_set == other._attribute_set
- def __ne__(self, other):
+ def __ne__(self, other: object) -> bool:
return not self == other
- def __hash__(self):
+ def __hash__(self) -> int:
return hash(self._attribute_set)
- def __iter__(self):
+ def __iter__(self) -> typing.Iterator[NameAttribute]:
return iter(self._attributes)
- def __len__(self):
+ def __len__(self) -> int:
return len(self._attributes)
- def __repr__(self):
+ def __repr__(self) -> str:
return "<RelativeDistinguishedName({})>".format(self.rfc4514_string())
@@ -201,7 +201,7 @@ class Name(object):
" or a list RelativeDistinguishedName"
)
- def rfc4514_string(self):
+ def rfc4514_string(self) -> str:
"""
Format as RFC4514 Distinguished Name string.
For example 'CN=foobar.com,O=Foo Corp,C=US'
@@ -216,39 +216,39 @@ class Name(object):
attr.rfc4514_string() for attr in reversed(self._attributes)
)
- def get_attributes_for_oid(self, oid):
+ def get_attributes_for_oid(self, oid) -> typing.Iterable[NameAttribute]:
return [i for i in self if i.oid == oid]
@property
- def rdns(self):
+ def rdns(self) -> typing.Iterable[RelativeDistinguishedName]:
return self._attributes
- def public_bytes(self, backend=None):
+ def public_bytes(self, backend=None) -> bytes:
backend = _get_backend(backend)
return backend.x509_name_bytes(self)
- def __eq__(self, other):
+ def __eq__(self, other: object) -> bool:
if not isinstance(other, Name):
return NotImplemented
return self._attributes == other._attributes
- def __ne__(self, other):
+ def __ne__(self, other: object) -> bool:
return not self == other
- def __hash__(self):
+ def __hash__(self) -> int:
# TODO: this is relatively expensive, if this looks like a bottleneck
# for you, consider optimizing!
return hash(tuple(self._attributes))
- def __iter__(self):
+ def __iter__(self) -> typing.Iterator[NameAttribute]:
for rdn in self._attributes:
for ava in rdn:
yield ava
- def __len__(self):
+ def __len__(self) -> int:
return sum(len(rdn) for rdn in self._attributes)
- def __repr__(self):
+ def __repr__(self) -> str:
rdns = ",".join(attr.rfc4514_string() for attr in self._attributes)
return "<Name({})>".format(rdns)
diff --git a/src/cryptography/x509/ocsp.py b/src/cryptography/x509/ocsp.py
index 5d137d697..4d4992060 100644
--- a/src/cryptography/x509/ocsp.py
+++ b/src/cryptography/x509/ocsp.py
@@ -5,12 +5,14 @@
import abc
import datetime
+import typing
from enum import Enum
from cryptography import x509
-from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives import hashes, serialization
from cryptography.x509.base import (
_EARLIEST_UTC_TIME,
+ _PRIVATE_KEY_TYPES,
_convert_to_naive_utc_time,
_reject_duplicate_extension,
)
@@ -65,55 +67,6 @@ class OCSPCertStatus(Enum):
_CERT_STATUS_TO_ENUM = {x.value: x for x in OCSPCertStatus}
-def load_der_ocsp_request(data):
- from cryptography.hazmat.backends.openssl.backend import backend
-
- return backend.load_der_ocsp_request(data)
-
-
-def load_der_ocsp_response(data):
- from cryptography.hazmat.backends.openssl.backend import backend
-
- return backend.load_der_ocsp_response(data)
-
-
-class OCSPRequestBuilder(object):
- def __init__(self, request=None, extensions=[]):
- self._request = request
- self._extensions = extensions
-
- def add_certificate(self, cert, issuer, algorithm):
- if self._request is not None:
- raise ValueError("Only one certificate can be added to a request")
-
- _verify_algorithm(algorithm)
- if not isinstance(cert, x509.Certificate) or not isinstance(
- issuer, x509.Certificate
- ):
- raise TypeError("cert and issuer must be a Certificate")
-
- return OCSPRequestBuilder((cert, issuer, algorithm), self._extensions)
-
- def add_extension(self, extension, critical):
- if not isinstance(extension, x509.ExtensionType):
- raise TypeError("extension must be an ExtensionType")
-
- extension = x509.Extension(extension.oid, critical, extension)
- _reject_duplicate_extension(extension, self._extensions)
-
- return OCSPRequestBuilder(
- self._request, self._extensions + [extension]
- )
-
- def build(self):
- from cryptography.hazmat.backends.openssl.backend import backend
-
- if self._request is None:
- raise ValueError("You must add a certificate before building")
-
- return backend.create_ocsp_request(self)
-
-
class _SingleResponse(object):
def __init__(
self,
@@ -184,151 +137,39 @@ class _SingleResponse(object):
self._revocation_reason = revocation_reason
-class OCSPResponseBuilder(object):
- def __init__(
- self, response=None, responder_id=None, certs=None, extensions=[]
- ):
- self._response = response
- self._responder_id = responder_id
- self._certs = certs
- self._extensions = extensions
-
- def add_response(
- self,
- cert,
- issuer,
- algorithm,
- cert_status,
- this_update,
- next_update,
- revocation_time,
- revocation_reason,
- ):
- if self._response is not None:
- raise ValueError("Only one response per OCSPResponse.")
-
- singleresp = _SingleResponse(
- cert,
- issuer,
- algorithm,
- cert_status,
- this_update,
- next_update,
- revocation_time,
- revocation_reason,
- )
- return OCSPResponseBuilder(
- singleresp,
- self._responder_id,
- self._certs,
- self._extensions,
- )
-
- def responder_id(self, encoding, responder_cert):
- if self._responder_id is not None:
- raise ValueError("responder_id can only be set once")
- if not isinstance(responder_cert, x509.Certificate):
- raise TypeError("responder_cert must be a Certificate")
- if not isinstance(encoding, OCSPResponderEncoding):
- raise TypeError(
- "encoding must be an element from OCSPResponderEncoding"
- )
-
- return OCSPResponseBuilder(
- self._response,
- (responder_cert, encoding),
- self._certs,
- self._extensions,
- )
-
- def certificates(self, certs):
- if self._certs is not None:
- raise ValueError("certificates may only be set once")
- certs = list(certs)
- if len(certs) == 0:
- raise ValueError("certs must not be an empty list")
- if not all(isinstance(x, x509.Certificate) for x in certs):
- raise TypeError("certs must be a list of Certificates")
- return OCSPResponseBuilder(
- self._response,
- self._responder_id,
- certs,
- self._extensions,
- )
-
- def add_extension(self, extension, critical):
- if not isinstance(extension, x509.ExtensionType):
- raise TypeError("extension must be an ExtensionType")
-
- extension = x509.Extension(extension.oid, critical, extension)
- _reject_duplicate_extension(extension, self._extensions)
-
- return OCSPResponseBuilder(
- self._response,
- self._responder_id,
- self._certs,
- self._extensions + [extension],
- )
-
- def sign(self, private_key, algorithm):
- from cryptography.hazmat.backends.openssl.backend import backend
-
- if self._response is None:
- raise ValueError("You must add a response before signing")
- if self._responder_id is None:
- raise ValueError("You must add a responder_id before signing")
-
- return backend.create_ocsp_response(
- OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm
- )
-
- @classmethod
- def build_unsuccessful(cls, response_status):
- from cryptography.hazmat.backends.openssl.backend import backend
-
- if not isinstance(response_status, OCSPResponseStatus):
- raise TypeError(
- "response_status must be an item from OCSPResponseStatus"
- )
- if response_status is OCSPResponseStatus.SUCCESSFUL:
- raise ValueError("response_status cannot be SUCCESSFUL")
-
- return backend.create_ocsp_response(response_status, None, None, None)
-
-
class OCSPRequest(metaclass=abc.ABCMeta):
@abc.abstractproperty
- def issuer_key_hash(self):
+ def issuer_key_hash(self) -> bytes:
"""
The hash of the issuer public key
"""
@abc.abstractproperty
- def issuer_name_hash(self):
+ def issuer_name_hash(self) -> bytes:
"""
The hash of the issuer name
"""
@abc.abstractproperty
- def hash_algorithm(self):
+ def hash_algorithm(self) -> hashes.HashAlgorithm:
"""
The hash algorithm used in the issuer name and key hashes
"""
@abc.abstractproperty
- def serial_number(self):
+ def serial_number(self) -> int:
"""
The serial number of the cert whose status is being checked
"""
@abc.abstractmethod
- def public_bytes(self, encoding):
+ def public_bytes(self, encoding: serialization.Encoding) -> bytes:
"""
Serializes the request to DER
"""
@abc.abstractproperty
- def extensions(self):
+ def extensions(self) -> x509.Extensions:
"""
The list of request extensions. Not single request extensions.
"""
@@ -336,38 +177,40 @@ class OCSPRequest(metaclass=abc.ABCMeta):
class OCSPResponse(metaclass=abc.ABCMeta):
@abc.abstractproperty
- def response_status(self):
+ def response_status(self) -> OCSPResponseStatus:
"""
The status of the response. This is a value from the OCSPResponseStatus
enumeration
"""
@abc.abstractproperty
- def signature_algorithm_oid(self):
+ def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
"""
The ObjectIdentifier of the signature algorithm
"""
@abc.abstractproperty
- def signature_hash_algorithm(self):
+ def signature_hash_algorithm(
+ self,
+ ) -> typing.Optional[hashes.HashAlgorithm]:
"""
Returns a HashAlgorithm corresponding to the type of the digest signed
"""
@abc.abstractproperty
- def signature(self):
+ def signature(self) -> bytes:
"""
The signature bytes
"""
@abc.abstractproperty
- def tbs_response_bytes(self):
+ def tbs_response_bytes(self) -> bytes:
"""
The tbsResponseData bytes
"""
@abc.abstractproperty
- def certificates(self):
+ def certificates(self) -> typing.List[x509.Certificate]:
"""
A list of certificates used to help build a chain to verify the OCSP
response. This situation occurs when the OCSP responder uses a delegate
@@ -375,88 +218,264 @@ class OCSPResponse(metaclass=abc.ABCMeta):
"""
@abc.abstractproperty
- def responder_key_hash(self):
+ def responder_key_hash(self) -> typing.Optional[bytes]:
"""
The responder's key hash or None
"""
@abc.abstractproperty
- def responder_name(self):
+ def responder_name(self) -> typing.Optional[x509.Name]:
"""
The responder's Name or None
"""
@abc.abstractproperty
- def produced_at(self):
+ def produced_at(self) -> datetime.datetime:
"""
The time the response was produced
"""
@abc.abstractproperty
- def certificate_status(self):
+ def certificate_status(self) -> OCSPCertStatus:
"""
The status of the certificate (an element from the OCSPCertStatus enum)
"""
@abc.abstractproperty
- def revocation_time(self):
+ def revocation_time(self) -> typing.Optional[datetime.datetime]:
"""
The date of when the certificate was revoked or None if not
revoked.
"""
@abc.abstractproperty
- def revocation_reason(self):
+ def revocation_reason(self) -> typing.Optional[x509.ReasonFlags]:
"""
The reason the certificate was revoked or None if not specified or
not revoked.
"""
@abc.abstractproperty
- def this_update(self):
+ def this_update(self) -> datetime.datetime:
"""
The most recent time at which the status being indicated is known by
the responder to have been correct
"""
@abc.abstractproperty
- def next_update(self):
+ def next_update(self) -> typing.Optional[datetime.datetime]:
"""
The time when newer information will be available
"""
@abc.abstractproperty
- def issuer_key_hash(self):
+ def issuer_key_hash(self) -> bytes:
"""
The hash of the issuer public key
"""
@abc.abstractproperty
- def issuer_name_hash(self):
+ def issuer_name_hash(self) -> bytes:
"""
The hash of the issuer name
"""
@abc.abstractproperty
- def hash_algorithm(self):
+ def hash_algorithm(self) -> hashes.HashAlgorithm:
"""
The hash algorithm used in the issuer name and key hashes
"""
@abc.abstractproperty
- def serial_number(self):
+ def serial_number(self) -> int:
"""
The serial number of the cert whose status is being checked
"""
@abc.abstractproperty
- def extensions(self):
+ def extensions(self) -> x509.Extensions:
"""
The list of response extensions. Not single response extensions.
"""
@abc.abstractproperty
- def single_extensions(self):
+ def single_extensions(self) -> x509.Extensions:
"""
The list of single response extensions. Not response extensions.
"""
+
+
+class OCSPRequestBuilder(object):
+ def __init__(self, request=None, extensions=[]):
+ self._request = request
+ self._extensions = extensions
+
+ def add_certificate(
+ self,
+ cert: x509.Certificate,
+ issuer: x509.Certificate,
+ algorithm: hashes.HashAlgorithm,
+ ) -> "OCSPRequestBuilder":
+ if self._request is not None:
+ raise ValueError("Only one certificate can be added to a request")
+
+ _verify_algorithm(algorithm)
+ if not isinstance(cert, x509.Certificate) or not isinstance(
+ issuer, x509.Certificate
+ ):
+ raise TypeError("cert and issuer must be a Certificate")
+
+ return OCSPRequestBuilder((cert, issuer, algorithm), self._extensions)
+
+ def add_extension(
+ self, extval: x509.ExtensionType, critical: bool
+ ) -> "OCSPRequestBuilder":
+ if not isinstance(extval, x509.ExtensionType):
+ raise TypeError("extension must be an ExtensionType")
+
+ extension = x509.Extension(extval.oid, critical, extval)
+ _reject_duplicate_extension(extension, self._extensions)
+
+ return OCSPRequestBuilder(
+ self._request, self._extensions + [extension]
+ )
+
+ def build(self) -> OCSPRequest:
+ from cryptography.hazmat.backends.openssl.backend import backend
+
+ if self._request is None:
+ raise ValueError("You must add a certificate before building")
+
+ return backend.create_ocsp_request(self)
+
+
+class OCSPResponseBuilder(object):
+ def __init__(
+ self, response=None, responder_id=None, certs=None, extensions=[]
+ ):
+ self._response = response
+ self._responder_id = responder_id
+ self._certs = certs
+ self._extensions = extensions
+
+ def add_response(
+ self,
+ cert: x509.Certificate,
+ issuer: x509.Certificate,
+ algorithm: hashes.HashAlgorithm,
+ cert_status: OCSPCertStatus,
+ this_update: datetime.datetime,
+ next_update: datetime.datetime,
+ revocation_time: datetime.datetime,
+ revocation_reason: x509.ReasonFlags,
+ ) -> "OCSPResponseBuilder":
+ if self._response is not None:
+ raise ValueError("Only one response per OCSPResponse.")
+
+ singleresp = _SingleResponse(
+ cert,
+ issuer,
+ algorithm,
+ cert_status,
+ this_update,
+ next_update,
+ revocation_time,
+ revocation_reason,
+ )
+ return OCSPResponseBuilder(
+ singleresp,
+ self._responder_id,
+ self._certs,
+ self._extensions,
+ )
+
+ def responder_id(
+ self, encoding: OCSPResponderEncoding, responder_cert: x509.Certificate
+ ) -> "OCSPResponseBuilder":
+ if self._responder_id is not None:
+ raise ValueError("responder_id can only be set once")
+ if not isinstance(responder_cert, x509.Certificate):
+ raise TypeError("responder_cert must be a Certificate")
+ if not isinstance(encoding, OCSPResponderEncoding):
+ raise TypeError(
+ "encoding must be an element from OCSPResponderEncoding"
+ )
+
+ return OCSPResponseBuilder(
+ self._response,
+ (responder_cert, encoding),
+ self._certs,
+ self._extensions,
+ )
+
+ def certificates(
+ self, certs: typing.Iterable[x509.Certificate]
+ ) -> "OCSPResponseBuilder":
+ if self._certs is not None:
+ raise ValueError("certificates may only be set once")
+ certs = list(certs)
+ if len(certs) == 0:
+ raise ValueError("certs must not be an empty list")
+ if not all(isinstance(x, x509.Certificate) for x in certs):
+ raise TypeError("certs must be a list of Certificates")
+ return OCSPResponseBuilder(
+ self._response,
+ self._responder_id,
+ certs,
+ self._extensions,
+ )
+
+ def add_extension(
+ self, extval: x509.ExtensionType, critical: bool
+ ) -> "OCSPResponseBuilder":
+ if not isinstance(extval, x509.ExtensionType):
+ raise TypeError("extension must be an ExtensionType")
+
+ extension = x509.Extension(extval.oid, critical, extval)
+ _reject_duplicate_extension(extension, self._extensions)
+
+ return OCSPResponseBuilder(
+ self._response,
+ self._responder_id,
+ self._certs,
+ self._extensions + [extension],
+ )
+
+ def sign(
+ self, private_key: _PRIVATE_KEY_TYPES, algorithm: hashes.HashAlgorithm
+ ) -> OCSPResponse:
+ from cryptography.hazmat.backends.openssl.backend import backend
+
+ if self._response is None:
+ raise ValueError("You must add a response before signing")
+ if self._responder_id is None:
+ raise ValueError("You must add a responder_id before signing")
+
+ return backend.create_ocsp_response(
+ OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm
+ )
+
+ @classmethod
+ def build_unsuccessful(cls, response_status: OCSPResponse) -> OCSPResponse:
+ from cryptography.hazmat.backends.openssl.backend import backend
+
+ if not isinstance(response_status, OCSPResponseStatus):
+ raise TypeError(
+ "response_status must be an item from OCSPResponseStatus"
+ )
+ if response_status is OCSPResponseStatus.SUCCESSFUL:
+ raise ValueError("response_status cannot be SUCCESSFUL")
+
+ return backend.create_ocsp_response(response_status, None, None, None)
+
+
+def load_der_ocsp_request(data: bytes) -> OCSPRequest:
+ from cryptography.hazmat.backends.openssl.backend import backend
+
+ return backend.load_der_ocsp_request(data)
+
+
+def load_der_ocsp_response(data: bytes) -> OCSPResponse:
+ from cryptography.hazmat.backends.openssl.backend import backend
+
+ return backend.load_der_ocsp_response(data)