summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBob Halley <halley@dnspython.org>2022-12-15 06:22:27 -0800
committerGitHub <noreply@github.com>2022-12-15 06:22:27 -0800
commit16b55fd8499bf11b150dd0f96fc246a477d936cf (patch)
tree8c36138486d9e0afa29ccb0c9940659fdf8a4b38
parent7d5c69abfb57b395900c12b01eb423a5089a2040 (diff)
downloaddnspython-16b55fd8499bf11b150dd0f96fc246a477d936cf.tar.gz
DNSSEC policy. (#869)
-rw-r--r--dns/dnssec.py106
-rw-r--r--dns/dnssectypes.py2
-rw-r--r--tests/test_dnssec.py119
3 files changed, 199 insertions, 28 deletions
diff --git a/dns/dnssec.py b/dns/dnssec.py
index 11f0701..4cfb75e 100644
--- a/dns/dnssec.py
+++ b/dns/dnssec.py
@@ -24,7 +24,7 @@ import math
import struct
import time
import base64
-from datetime import datetime, timezone
+from datetime import datetime
from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash
@@ -54,6 +54,10 @@ class ValidationFailure(dns.exception.DNSException):
"""The DNSSEC signature is invalid."""
+class DeniedByPolicy(dns.exception.DNSException):
+ """Denied by DNSSEC policy."""
+
+
PublicKey = Union[
"rsa.RSAPublicKey",
"ec.EllipticCurvePublicKey",
@@ -126,11 +130,64 @@ def key_id(key: DNSKEY) -> int:
return total & 0xFFFF
+class Policy:
+ def __init__(self):
+ pass
+
+ def ok_to_sign(self, _: DNSKEY) -> bool: # pragma: no cover
+ return False
+
+ def ok_to_validate(self, _: DNSKEY) -> bool: # pragma: no cover
+ return False
+
+ def ok_to_create_ds(self, _: DSDigest) -> bool: # pragma: no cover
+ return False
+
+ def ok_to_validate_ds(self, _: DSDigest) -> bool: # pragma: no cover
+ return False
+
+
+class SimpleDeny(Policy):
+ def __init__(self, deny_sign, deny_validate, deny_create_ds, deny_validate_ds):
+ super().__init__()
+ self._deny_sign = deny_sign
+ self._deny_validate = deny_validate
+ self._deny_create_ds = deny_create_ds
+ self._deny_validate_ds = deny_validate_ds
+
+ def ok_to_sign(self, key: DNSKEY) -> bool:
+ return key.algorithm not in self._deny_sign
+
+ def ok_to_validate(self, key: DNSKEY) -> bool:
+ return key.algorithm not in self._deny_validate
+
+ def ok_to_create_ds(self, algorithm: DSDigest) -> bool:
+ return algorithm not in self._deny_create_ds
+
+ def ok_to_validate_ds(self, algorithm: DSDigest) -> bool:
+ return algorithm not in self._deny_validate_ds
+
+
+rfc_8624_policy = SimpleDeny(
+ {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1, Algorithm.ECCGOST},
+ {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1},
+ {DSDigest.NULL, DSDigest.SHA1, DSDigest.GOST},
+ {DSDigest.NULL},
+)
+
+allow_all_policy = SimpleDeny(set(), set(), set(), set())
+
+
+default_policy = rfc_8624_policy
+
+
def make_ds(
name: Union[dns.name.Name, str],
key: dns.rdata.Rdata,
algorithm: Union[DSDigest, str],
origin: Optional[dns.name.Name] = None,
+ policy: Optional[Policy] = None,
+ validating: bool = False,
) -> DS:
"""Create a DS record for a DNSSEC key.
@@ -145,16 +202,34 @@ def make_ds(
*origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name,
then it will be made absolute using the specified origin.
+ *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy,
+ ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
+ *validating*, a ``bool``. If ``True``, then policy is checked in
+ validating mode, i.e. "Is it ok to validate using this digest algorithm?".
+ Otherwise the policy is checked in creating mode, i.e. "Is it ok to create a DS with
+ this digest algorithm?".
+
Raises ``UnsupportedAlgorithm`` if the algorithm is unknown.
+ Raises ``DeniedByPolicy`` if the algorithm is denied by policy.
+
Returns a ``dns.rdtypes.ANY.DS.DS``
"""
+ if policy is None:
+ policy = default_policy
try:
if isinstance(algorithm, str):
algorithm = DSDigest[algorithm.upper()]
except Exception:
raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)
+ if validating:
+ check = policy.ok_to_validate_ds
+ else:
+ check = policy.ok_to_create_ds
+ if not check(algorithm):
+ raise DeniedByPolicy
if not isinstance(key, DNSKEY):
raise ValueError("key is not a DNSKEY")
if algorithm == DSDigest.SHA1:
@@ -388,6 +463,7 @@ def _validate_rrsig(
keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
origin: Optional[dns.name.Name] = None,
now: Optional[float] = None,
+ policy: Optional[Policy] = None,
) -> None:
"""Validate an RRset against a single signature rdata, throwing an
exception if validation is not successful.
@@ -410,6 +486,9 @@ def _validate_rrsig(
use as the current time when validating. If ``None``, the actual current
time is used.
+ *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy,
+ ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
Raises ``ValidationFailure`` if the signature is expired, not yet valid,
the public key is invalid, the algorithm is unknown, the verification
fails, etc.
@@ -418,6 +497,9 @@ def _validate_rrsig(
dnspython but not implemented.
"""
+ if policy is None:
+ policy = default_policy
+
candidate_keys = _find_candidate_keys(keys, rrsig)
if candidate_keys is None:
raise ValidationFailure("unknown key")
@@ -448,6 +530,8 @@ def _validate_rrsig(
chosen_hash = _make_hash(rrsig.algorithm)
for candidate_key in candidate_keys:
+ if not policy.ok_to_validate(candidate_key):
+ continue
try:
_validate_signature(sig, data, candidate_key, chosen_hash)
return
@@ -464,6 +548,7 @@ def _validate(
keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
origin: Optional[dns.name.Name] = None,
now: Optional[float] = None,
+ policy: Optional[Policy] = None,
) -> None:
"""Validate an RRset against a signature RRset, throwing an exception
if none of the signatures validate.
@@ -488,11 +573,17 @@ def _validate(
use as the current time when validating. If ``None``, the actual current
time is used.
+ *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy,
+ ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
Raises ``ValidationFailure`` if the signature is expired, not yet valid,
the public key is invalid, the algorithm is unknown, the verification
fails, etc.
"""
+ if policy is None:
+ policy = default_policy
+
if isinstance(origin, str):
origin = dns.name.from_text(origin, dns.name.root)
@@ -517,7 +608,7 @@ def _validate(
if not isinstance(rrsig, RRSIG):
raise ValidationFailure("expected an RRSIG")
try:
- _validate_rrsig(rrset, rrsig, keys, origin, now)
+ _validate_rrsig(rrset, rrsig, keys, origin, now, policy)
return
except (ValidationFailure, UnsupportedAlgorithm):
pass
@@ -533,6 +624,7 @@ def _sign(
expiration: Optional[Union[datetime, str, int, float]] = None,
lifetime: Optional[int] = None,
verify: bool = False,
+ policy: Optional[Policy] = None,
) -> RRSIG:
"""Sign RRset using private key.
@@ -564,8 +656,18 @@ def _sign(
*verify*, a ``bool``. If set to ``True``, the signer will verify signatures
after they are created; the default is ``False``.
+
+ *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy,
+ ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
+ Raises ``DeniedByPolicy`` if the signature is denied by policy.
"""
+ if policy is None:
+ policy = default_policy
+ if not policy.ok_to_sign(dnskey):
+ raise DeniedByPolicy
+
if isinstance(rrset, tuple):
rdclass = rrset[1].rdclass
rdtype = rrset[1].rdtype
diff --git a/dns/dnssectypes.py b/dns/dnssectypes.py
index 2a74716..02131e0 100644
--- a/dns/dnssectypes.py
+++ b/dns/dnssectypes.py
@@ -50,8 +50,10 @@ class Algorithm(dns.enum.IntEnum):
class DSDigest(dns.enum.IntEnum):
"""DNSSEC Delegation Signer Digest Algorithm"""
+ NULL = 0
SHA1 = 1
SHA256 = 2
+ GOST = 3
SHA384 = 4
@classmethod
diff --git a/tests/test_dnssec.py b/tests/test_dnssec.py
index 9aed879..4a25cd2 100644
--- a/tests/test_dnssec.py
+++ b/tests/test_dnssec.py
@@ -576,7 +576,20 @@ fake_gost_ns_rrsig = dns.rrset.from_text(
@unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported")
class DNSSECValidatorTestCase(unittest.TestCase):
def testAbsoluteRSAMD5Good(self): # type: () -> None
- dns.dnssec.validate(rsamd5_ns, rsamd5_ns_rrsig, rsamd5_keys, None, rsamd5_when)
+ dns.dnssec.validate(
+ rsamd5_ns,
+ rsamd5_ns_rrsig,
+ rsamd5_keys,
+ None,
+ rsamd5_when,
+ policy=dns.dnssec.allow_all_policy,
+ )
+
+ def testAbsoluteRSAMD5GoodDeniedByPolicy(self): # type: () -> None
+ with self.assertRaises(dns.dnssec.ValidationFailure):
+ dns.dnssec.validate(
+ rsamd5_ns, rsamd5_ns_rrsig, rsamd5_keys, None, rsamd5_when
+ )
def testRSAMD5Keyid(self):
self.assertEqual(dns.dnssec.key_id(rsamd5_keys[abs_example][0]), 30239)
@@ -610,12 +623,30 @@ class DNSSECValidatorTestCase(unittest.TestCase):
self.assertRaises(dns.dnssec.ValidationFailure, bad)
def testAbsoluteDSAGood(self): # type: () -> None
- dns.dnssec.validate(abs_dsa_soa, abs_dsa_soa_rrsig, abs_dsa_keys, None, when2)
+ dns.dnssec.validate(
+ abs_dsa_soa,
+ abs_dsa_soa_rrsig,
+ abs_dsa_keys,
+ None,
+ when2,
+ policy=dns.dnssec.allow_all_policy,
+ )
+
+ def testAbsoluteDSAGoodDeniedByPolicy(self): # type: () -> None
+ with self.assertRaises(dns.dnssec.ValidationFailure):
+ dns.dnssec.validate(
+ abs_dsa_soa, abs_dsa_soa_rrsig, abs_dsa_keys, None, when2
+ )
def testAbsoluteDSABad(self): # type: () -> None
def bad(): # type: () -> None
dns.dnssec.validate(
- abs_other_dsa_soa, abs_dsa_soa_rrsig, abs_dsa_keys, None, when2
+ abs_other_dsa_soa,
+ abs_dsa_soa_rrsig,
+ abs_dsa_keys,
+ None,
+ when2,
+ policy=dns.dnssec.allow_all_policy,
)
self.assertRaises(dns.dnssec.ValidationFailure, bad)
@@ -855,9 +886,39 @@ class DNSSECMakeDSTestCase(unittest.TestCase):
def testMakeExampleSHA1DS(self): # type: () -> None
algorithm: Any
for algorithm in ("SHA1", "sha1", dns.dnssec.DSDigest.SHA1):
- ds = dns.dnssec.make_ds(abs_example, example_sep_key, algorithm)
+ ds = dns.dnssec.make_ds(
+ abs_example,
+ example_sep_key,
+ algorithm,
+ policy=dns.dnssec.allow_all_policy,
+ )
+ self.assertEqual(ds, example_ds_sha1)
+ ds = dns.dnssec.make_ds(
+ "example.",
+ example_sep_key,
+ algorithm,
+ policy=dns.dnssec.allow_all_policy,
+ )
+ self.assertEqual(ds, example_ds_sha1)
+
+ def testMakeExampleSHA1DSValidationOkByPolicy(self): # type: () -> None
+ algorithm: Any
+ for algorithm in ("SHA1", "sha1", dns.dnssec.DSDigest.SHA1):
+ ds = dns.dnssec.make_ds(
+ abs_example,
+ example_sep_key,
+ algorithm,
+ policy=dns.dnssec.allow_all_policy,
+ )
+ self.assertEqual(ds, example_ds_sha1)
+ ds = dns.dnssec.make_ds(
+ "example.", example_sep_key, algorithm, validating=True
+ )
self.assertEqual(ds, example_ds_sha1)
- ds = dns.dnssec.make_ds("example.", example_sep_key, algorithm)
+
+ def testMakeExampleSHA1DSDeniedByPolicy(self): # type: () -> None
+ with self.assertRaises(dns.dnssec.DeniedByPolicy):
+ ds = dns.dnssec.make_ds(abs_example, example_sep_key, "SHA1")
self.assertEqual(ds, example_ds_sha1)
def testMakeExampleSHA256DS(self): # type: () -> None
@@ -974,24 +1035,27 @@ class DNSSECMakeDNSKEYTestCase(unittest.TestCase):
with self.assertRaises(ValueError):
dns.dnssec.make_dnskey(key.public_key(), dns.dnssec.Algorithm.DSA)
- def testRSALargeExponent(self): # type: () -> None
- for key_size, public_exponent, dnskey_key_length in [
- (1024, 3, 130),
- (1024, 65537, 132),
- (2048, 3, 258),
- (2048, 65537, 260),
- (4096, 3, 514),
- (4096, 65537, 516),
- ]:
- key = rsa.generate_private_key(
- public_exponent=public_exponent,
- key_size=key_size,
- backend=default_backend(),
- )
- dnskey = dns.dnssec.make_dnskey(
- key.public_key(), algorithm=dns.dnssec.Algorithm.RSASHA256
- )
- self.assertEqual(len(dnskey.key), dnskey_key_length)
+ # XXXRTH This test is fine but is noticably slow, so I have commented it out for
+ # now
+
+ # def testRSALargeExponent(self): # type: () -> None
+ # for key_size, public_exponent, dnskey_key_length in [
+ # (1024, 3, 130),
+ # (1024, 65537, 132),
+ # (2048, 3, 258),
+ # (2048, 65537, 260),
+ # (4096, 3, 514),
+ # (4096, 65537, 516),
+ # ]:
+ # key = rsa.generate_private_key(
+ # public_exponent=public_exponent,
+ # key_size=key_size,
+ # backend=default_backend(),
+ # )
+ # dnskey = dns.dnssec.make_dnskey(
+ # key.public_key(), algorithm=dns.dnssec.Algorithm.RSASHA256
+ # )
+ # self.assertEqual(len(dnskey.key), dnskey_key_length)
@unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported")
@@ -1014,7 +1078,9 @@ class DNSSECSignatureTestCase(unittest.TestCase):
def testSignatureDSA(self): # type: () -> None
key = dsa.generate_private_key(key_size=1024)
- self._test_signature(key, dns.dnssec.Algorithm.DSA, abs_soa)
+ self._test_signature(
+ key, dns.dnssec.Algorithm.DSA, abs_soa, policy=dns.dnssec.allow_all_policy
+ )
def testSignatureECDSAP256SHA256(self): # type: () -> None
key = ec.generate_private_key(curve=ec.SECP256R1, backend=default_backend())
@@ -1039,7 +1105,7 @@ class DNSSECSignatureTestCase(unittest.TestCase):
rrset = (name, rdataset)
self._test_signature(key, dns.dnssec.Algorithm.ED448, rrset)
- def _test_signature(self, key, algorithm, rrset, signer=None): # type: () -> None
+ def _test_signature(self, key, algorithm, rrset, signer=None, policy=None):
ttl = 60
lifetime = 3600
if isinstance(rrset, tuple):
@@ -1058,10 +1124,11 @@ class DNSSECSignatureTestCase(unittest.TestCase):
lifetime=lifetime,
signer=signer,
verify=True,
+ policy=policy,
)
keys = {signer: dnskey_rrset}
rrsigset = dns.rrset.from_rdata(rrname, ttl, rrsig)
- dns.dnssec.validate(rrset=rrset, rrsigset=rrsigset, keys=keys)
+ dns.dnssec.validate(rrset=rrset, rrsigset=rrsigset, keys=keys, policy=policy)
if __name__ == "__main__":