summaryrefslogtreecommitdiff
path: root/dns/dnssec.py
diff options
context:
space:
mode:
Diffstat (limited to 'dns/dnssec.py')
-rw-r--r--dns/dnssec.py106
1 files changed, 104 insertions, 2 deletions
diff --git a/dns/dnssec.py b/dns/dnssec.py
index 11f0701..4cfb75e 100644
--- a/dns/dnssec.py
+++ b/dns/dnssec.py
@@ -24,7 +24,7 @@ import math
import struct
import time
import base64
-from datetime import datetime, timezone
+from datetime import datetime
from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash
@@ -54,6 +54,10 @@ class ValidationFailure(dns.exception.DNSException):
"""The DNSSEC signature is invalid."""
+class DeniedByPolicy(dns.exception.DNSException):
+ """Denied by DNSSEC policy."""
+
+
PublicKey = Union[
"rsa.RSAPublicKey",
"ec.EllipticCurvePublicKey",
@@ -126,11 +130,64 @@ def key_id(key: DNSKEY) -> int:
return total & 0xFFFF
+class Policy:
+ def __init__(self):
+ pass
+
+ def ok_to_sign(self, _: DNSKEY) -> bool: # pragma: no cover
+ return False
+
+ def ok_to_validate(self, _: DNSKEY) -> bool: # pragma: no cover
+ return False
+
+ def ok_to_create_ds(self, _: DSDigest) -> bool: # pragma: no cover
+ return False
+
+ def ok_to_validate_ds(self, _: DSDigest) -> bool: # pragma: no cover
+ return False
+
+
+class SimpleDeny(Policy):
+ def __init__(self, deny_sign, deny_validate, deny_create_ds, deny_validate_ds):
+ super().__init__()
+ self._deny_sign = deny_sign
+ self._deny_validate = deny_validate
+ self._deny_create_ds = deny_create_ds
+ self._deny_validate_ds = deny_validate_ds
+
+ def ok_to_sign(self, key: DNSKEY) -> bool:
+ return key.algorithm not in self._deny_sign
+
+ def ok_to_validate(self, key: DNSKEY) -> bool:
+ return key.algorithm not in self._deny_validate
+
+ def ok_to_create_ds(self, algorithm: DSDigest) -> bool:
+ return algorithm not in self._deny_create_ds
+
+ def ok_to_validate_ds(self, algorithm: DSDigest) -> bool:
+ return algorithm not in self._deny_validate_ds
+
+
+rfc_8624_policy = SimpleDeny(
+ {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1, Algorithm.ECCGOST},
+ {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1},
+ {DSDigest.NULL, DSDigest.SHA1, DSDigest.GOST},
+ {DSDigest.NULL},
+)
+
+allow_all_policy = SimpleDeny(set(), set(), set(), set())
+
+
+default_policy = rfc_8624_policy
+
+
def make_ds(
name: Union[dns.name.Name, str],
key: dns.rdata.Rdata,
algorithm: Union[DSDigest, str],
origin: Optional[dns.name.Name] = None,
+ policy: Optional[Policy] = None,
+ validating: bool = False,
) -> DS:
"""Create a DS record for a DNSSEC key.
@@ -145,16 +202,34 @@ def make_ds(
*origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name,
then it will be made absolute using the specified origin.
+ *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy,
+ ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
+ *validating*, a ``bool``. If ``True``, then policy is checked in
+ validating mode, i.e. "Is it ok to validate using this digest algorithm?".
+ Otherwise the policy is checked in creating mode, i.e. "Is it ok to create a DS with
+ this digest algorithm?".
+
Raises ``UnsupportedAlgorithm`` if the algorithm is unknown.
+ Raises ``DeniedByPolicy`` if the algorithm is denied by policy.
+
Returns a ``dns.rdtypes.ANY.DS.DS``
"""
+ if policy is None:
+ policy = default_policy
try:
if isinstance(algorithm, str):
algorithm = DSDigest[algorithm.upper()]
except Exception:
raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)
+ if validating:
+ check = policy.ok_to_validate_ds
+ else:
+ check = policy.ok_to_create_ds
+ if not check(algorithm):
+ raise DeniedByPolicy
if not isinstance(key, DNSKEY):
raise ValueError("key is not a DNSKEY")
if algorithm == DSDigest.SHA1:
@@ -388,6 +463,7 @@ def _validate_rrsig(
keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
origin: Optional[dns.name.Name] = None,
now: Optional[float] = None,
+ policy: Optional[Policy] = None,
) -> None:
"""Validate an RRset against a single signature rdata, throwing an
exception if validation is not successful.
@@ -410,6 +486,9 @@ def _validate_rrsig(
use as the current time when validating. If ``None``, the actual current
time is used.
+ *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy,
+ ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
Raises ``ValidationFailure`` if the signature is expired, not yet valid,
the public key is invalid, the algorithm is unknown, the verification
fails, etc.
@@ -418,6 +497,9 @@ def _validate_rrsig(
dnspython but not implemented.
"""
+ if policy is None:
+ policy = default_policy
+
candidate_keys = _find_candidate_keys(keys, rrsig)
if candidate_keys is None:
raise ValidationFailure("unknown key")
@@ -448,6 +530,8 @@ def _validate_rrsig(
chosen_hash = _make_hash(rrsig.algorithm)
for candidate_key in candidate_keys:
+ if not policy.ok_to_validate(candidate_key):
+ continue
try:
_validate_signature(sig, data, candidate_key, chosen_hash)
return
@@ -464,6 +548,7 @@ def _validate(
keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
origin: Optional[dns.name.Name] = None,
now: Optional[float] = None,
+ policy: Optional[Policy] = None,
) -> None:
"""Validate an RRset against a signature RRset, throwing an exception
if none of the signatures validate.
@@ -488,11 +573,17 @@ def _validate(
use as the current time when validating. If ``None``, the actual current
time is used.
+ *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy,
+ ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
Raises ``ValidationFailure`` if the signature is expired, not yet valid,
the public key is invalid, the algorithm is unknown, the verification
fails, etc.
"""
+ if policy is None:
+ policy = default_policy
+
if isinstance(origin, str):
origin = dns.name.from_text(origin, dns.name.root)
@@ -517,7 +608,7 @@ def _validate(
if not isinstance(rrsig, RRSIG):
raise ValidationFailure("expected an RRSIG")
try:
- _validate_rrsig(rrset, rrsig, keys, origin, now)
+ _validate_rrsig(rrset, rrsig, keys, origin, now, policy)
return
except (ValidationFailure, UnsupportedAlgorithm):
pass
@@ -533,6 +624,7 @@ def _sign(
expiration: Optional[Union[datetime, str, int, float]] = None,
lifetime: Optional[int] = None,
verify: bool = False,
+ policy: Optional[Policy] = None,
) -> RRSIG:
"""Sign RRset using private key.
@@ -564,8 +656,18 @@ def _sign(
*verify*, a ``bool``. If set to ``True``, the signer will verify signatures
after they are created; the default is ``False``.
+
+ *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy,
+ ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
+ Raises ``DeniedByPolicy`` if the signature is denied by policy.
"""
+ if policy is None:
+ policy = default_policy
+ if not policy.ok_to_sign(dnskey):
+ raise DeniedByPolicy
+
if isinstance(rrset, tuple):
rdclass = rrset[1].rdclass
rdtype = rrset[1].rdtype