diff options
Diffstat (limited to 'dns/dnssec.py')
-rw-r--r-- | dns/dnssec.py | 106 |
1 files changed, 104 insertions, 2 deletions
diff --git a/dns/dnssec.py b/dns/dnssec.py index 11f0701..4cfb75e 100644 --- a/dns/dnssec.py +++ b/dns/dnssec.py @@ -24,7 +24,7 @@ import math import struct import time import base64 -from datetime import datetime, timezone +from datetime import datetime from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash @@ -54,6 +54,10 @@ class ValidationFailure(dns.exception.DNSException): """The DNSSEC signature is invalid.""" +class DeniedByPolicy(dns.exception.DNSException): + """Denied by DNSSEC policy.""" + + PublicKey = Union[ "rsa.RSAPublicKey", "ec.EllipticCurvePublicKey", @@ -126,11 +130,64 @@ def key_id(key: DNSKEY) -> int: return total & 0xFFFF +class Policy: + def __init__(self): + pass + + def ok_to_sign(self, _: DNSKEY) -> bool: # pragma: no cover + return False + + def ok_to_validate(self, _: DNSKEY) -> bool: # pragma: no cover + return False + + def ok_to_create_ds(self, _: DSDigest) -> bool: # pragma: no cover + return False + + def ok_to_validate_ds(self, _: DSDigest) -> bool: # pragma: no cover + return False + + +class SimpleDeny(Policy): + def __init__(self, deny_sign, deny_validate, deny_create_ds, deny_validate_ds): + super().__init__() + self._deny_sign = deny_sign + self._deny_validate = deny_validate + self._deny_create_ds = deny_create_ds + self._deny_validate_ds = deny_validate_ds + + def ok_to_sign(self, key: DNSKEY) -> bool: + return key.algorithm not in self._deny_sign + + def ok_to_validate(self, key: DNSKEY) -> bool: + return key.algorithm not in self._deny_validate + + def ok_to_create_ds(self, algorithm: DSDigest) -> bool: + return algorithm not in self._deny_create_ds + + def ok_to_validate_ds(self, algorithm: DSDigest) -> bool: + return algorithm not in self._deny_validate_ds + + +rfc_8624_policy = SimpleDeny( + {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1, Algorithm.ECCGOST}, + {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1}, + {DSDigest.NULL, DSDigest.SHA1, DSDigest.GOST}, + {DSDigest.NULL}, +) + +allow_all_policy = SimpleDeny(set(), set(), set(), set()) + + +default_policy = rfc_8624_policy + + def make_ds( name: Union[dns.name.Name, str], key: dns.rdata.Rdata, algorithm: Union[DSDigest, str], origin: Optional[dns.name.Name] = None, + policy: Optional[Policy] = None, + validating: bool = False, ) -> DS: """Create a DS record for a DNSSEC key. @@ -145,16 +202,34 @@ def make_ds( *origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name, then it will be made absolute using the specified origin. + *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, + ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. + + *validating*, a ``bool``. If ``True``, then policy is checked in + validating mode, i.e. "Is it ok to validate using this digest algorithm?". + Otherwise the policy is checked in creating mode, i.e. "Is it ok to create a DS with + this digest algorithm?". + Raises ``UnsupportedAlgorithm`` if the algorithm is unknown. + Raises ``DeniedByPolicy`` if the algorithm is denied by policy. + Returns a ``dns.rdtypes.ANY.DS.DS`` """ + if policy is None: + policy = default_policy try: if isinstance(algorithm, str): algorithm = DSDigest[algorithm.upper()] except Exception: raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) + if validating: + check = policy.ok_to_validate_ds + else: + check = policy.ok_to_create_ds + if not check(algorithm): + raise DeniedByPolicy if not isinstance(key, DNSKEY): raise ValueError("key is not a DNSKEY") if algorithm == DSDigest.SHA1: @@ -388,6 +463,7 @@ def _validate_rrsig( keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]], origin: Optional[dns.name.Name] = None, now: Optional[float] = None, + policy: Optional[Policy] = None, ) -> None: """Validate an RRset against a single signature rdata, throwing an exception if validation is not successful. @@ -410,6 +486,9 @@ def _validate_rrsig( use as the current time when validating. If ``None``, the actual current time is used. + *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, + ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. + Raises ``ValidationFailure`` if the signature is expired, not yet valid, the public key is invalid, the algorithm is unknown, the verification fails, etc. @@ -418,6 +497,9 @@ def _validate_rrsig( dnspython but not implemented. """ + if policy is None: + policy = default_policy + candidate_keys = _find_candidate_keys(keys, rrsig) if candidate_keys is None: raise ValidationFailure("unknown key") @@ -448,6 +530,8 @@ def _validate_rrsig( chosen_hash = _make_hash(rrsig.algorithm) for candidate_key in candidate_keys: + if not policy.ok_to_validate(candidate_key): + continue try: _validate_signature(sig, data, candidate_key, chosen_hash) return @@ -464,6 +548,7 @@ def _validate( keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]], origin: Optional[dns.name.Name] = None, now: Optional[float] = None, + policy: Optional[Policy] = None, ) -> None: """Validate an RRset against a signature RRset, throwing an exception if none of the signatures validate. @@ -488,11 +573,17 @@ def _validate( use as the current time when validating. If ``None``, the actual current time is used. + *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, + ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. + Raises ``ValidationFailure`` if the signature is expired, not yet valid, the public key is invalid, the algorithm is unknown, the verification fails, etc. """ + if policy is None: + policy = default_policy + if isinstance(origin, str): origin = dns.name.from_text(origin, dns.name.root) @@ -517,7 +608,7 @@ def _validate( if not isinstance(rrsig, RRSIG): raise ValidationFailure("expected an RRSIG") try: - _validate_rrsig(rrset, rrsig, keys, origin, now) + _validate_rrsig(rrset, rrsig, keys, origin, now, policy) return except (ValidationFailure, UnsupportedAlgorithm): pass @@ -533,6 +624,7 @@ def _sign( expiration: Optional[Union[datetime, str, int, float]] = None, lifetime: Optional[int] = None, verify: bool = False, + policy: Optional[Policy] = None, ) -> RRSIG: """Sign RRset using private key. @@ -564,8 +656,18 @@ def _sign( *verify*, a ``bool``. If set to ``True``, the signer will verify signatures after they are created; the default is ``False``. + + *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, + ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. + + Raises ``DeniedByPolicy`` if the signature is denied by policy. """ + if policy is None: + policy = default_policy + if not policy.ok_to_sign(dnskey): + raise DeniedByPolicy + if isinstance(rrset, tuple): rdclass = rrset[1].rdclass rdtype = rrset[1].rdtype |