From 16b55fd8499bf11b150dd0f96fc246a477d936cf Mon Sep 17 00:00:00 2001 From: Bob Halley Date: Thu, 15 Dec 2022 06:22:27 -0800 Subject: DNSSEC policy. (#869) --- dns/dnssec.py | 106 ++++++++++++++++++++++++++++++++++++++++++++- dns/dnssectypes.py | 2 + tests/test_dnssec.py | 119 ++++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 199 insertions(+), 28 deletions(-) diff --git a/dns/dnssec.py b/dns/dnssec.py index 11f0701..4cfb75e 100644 --- a/dns/dnssec.py +++ b/dns/dnssec.py @@ -24,7 +24,7 @@ import math import struct import time import base64 -from datetime import datetime, timezone +from datetime import datetime from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash @@ -54,6 +54,10 @@ class ValidationFailure(dns.exception.DNSException): """The DNSSEC signature is invalid.""" +class DeniedByPolicy(dns.exception.DNSException): + """Denied by DNSSEC policy.""" + + PublicKey = Union[ "rsa.RSAPublicKey", "ec.EllipticCurvePublicKey", @@ -126,11 +130,64 @@ def key_id(key: DNSKEY) -> int: return total & 0xFFFF +class Policy: + def __init__(self): + pass + + def ok_to_sign(self, _: DNSKEY) -> bool: # pragma: no cover + return False + + def ok_to_validate(self, _: DNSKEY) -> bool: # pragma: no cover + return False + + def ok_to_create_ds(self, _: DSDigest) -> bool: # pragma: no cover + return False + + def ok_to_validate_ds(self, _: DSDigest) -> bool: # pragma: no cover + return False + + +class SimpleDeny(Policy): + def __init__(self, deny_sign, deny_validate, deny_create_ds, deny_validate_ds): + super().__init__() + self._deny_sign = deny_sign + self._deny_validate = deny_validate + self._deny_create_ds = deny_create_ds + self._deny_validate_ds = deny_validate_ds + + def ok_to_sign(self, key: DNSKEY) -> bool: + return key.algorithm not in self._deny_sign + + def ok_to_validate(self, key: DNSKEY) -> bool: + return key.algorithm not in self._deny_validate + + def ok_to_create_ds(self, algorithm: DSDigest) -> bool: + return algorithm not in self._deny_create_ds + + def ok_to_validate_ds(self, algorithm: DSDigest) -> bool: + return algorithm not in self._deny_validate_ds + + +rfc_8624_policy = SimpleDeny( + {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1, Algorithm.ECCGOST}, + {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1}, + {DSDigest.NULL, DSDigest.SHA1, DSDigest.GOST}, + {DSDigest.NULL}, +) + +allow_all_policy = SimpleDeny(set(), set(), set(), set()) + + +default_policy = rfc_8624_policy + + def make_ds( name: Union[dns.name.Name, str], key: dns.rdata.Rdata, algorithm: Union[DSDigest, str], origin: Optional[dns.name.Name] = None, + policy: Optional[Policy] = None, + validating: bool = False, ) -> DS: """Create a DS record for a DNSSEC key. @@ -145,16 +202,34 @@ def make_ds( *origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name, then it will be made absolute using the specified origin. + *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, + ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. + + *validating*, a ``bool``. If ``True``, then policy is checked in + validating mode, i.e. "Is it ok to validate using this digest algorithm?". + Otherwise the policy is checked in creating mode, i.e. "Is it ok to create a DS with + this digest algorithm?". + Raises ``UnsupportedAlgorithm`` if the algorithm is unknown. + Raises ``DeniedByPolicy`` if the algorithm is denied by policy. + Returns a ``dns.rdtypes.ANY.DS.DS`` """ + if policy is None: + policy = default_policy try: if isinstance(algorithm, str): algorithm = DSDigest[algorithm.upper()] except Exception: raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) + if validating: + check = policy.ok_to_validate_ds + else: + check = policy.ok_to_create_ds + if not check(algorithm): + raise DeniedByPolicy if not isinstance(key, DNSKEY): raise ValueError("key is not a DNSKEY") if algorithm == DSDigest.SHA1: @@ -388,6 +463,7 @@ def _validate_rrsig( keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]], origin: Optional[dns.name.Name] = None, now: Optional[float] = None, + policy: Optional[Policy] = None, ) -> None: """Validate an RRset against a single signature rdata, throwing an exception if validation is not successful. @@ -410,6 +486,9 @@ def _validate_rrsig( use as the current time when validating. If ``None``, the actual current time is used. + *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, + ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. + Raises ``ValidationFailure`` if the signature is expired, not yet valid, the public key is invalid, the algorithm is unknown, the verification fails, etc. @@ -418,6 +497,9 @@ def _validate_rrsig( dnspython but not implemented. """ + if policy is None: + policy = default_policy + candidate_keys = _find_candidate_keys(keys, rrsig) if candidate_keys is None: raise ValidationFailure("unknown key") @@ -448,6 +530,8 @@ def _validate_rrsig( chosen_hash = _make_hash(rrsig.algorithm) for candidate_key in candidate_keys: + if not policy.ok_to_validate(candidate_key): + continue try: _validate_signature(sig, data, candidate_key, chosen_hash) return @@ -464,6 +548,7 @@ def _validate( keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]], origin: Optional[dns.name.Name] = None, now: Optional[float] = None, + policy: Optional[Policy] = None, ) -> None: """Validate an RRset against a signature RRset, throwing an exception if none of the signatures validate. @@ -488,11 +573,17 @@ def _validate( use as the current time when validating. If ``None``, the actual current time is used. + *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, + ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. + Raises ``ValidationFailure`` if the signature is expired, not yet valid, the public key is invalid, the algorithm is unknown, the verification fails, etc. """ + if policy is None: + policy = default_policy + if isinstance(origin, str): origin = dns.name.from_text(origin, dns.name.root) @@ -517,7 +608,7 @@ def _validate( if not isinstance(rrsig, RRSIG): raise ValidationFailure("expected an RRSIG") try: - _validate_rrsig(rrset, rrsig, keys, origin, now) + _validate_rrsig(rrset, rrsig, keys, origin, now, policy) return except (ValidationFailure, UnsupportedAlgorithm): pass @@ -533,6 +624,7 @@ def _sign( expiration: Optional[Union[datetime, str, int, float]] = None, lifetime: Optional[int] = None, verify: bool = False, + policy: Optional[Policy] = None, ) -> RRSIG: """Sign RRset using private key. @@ -564,8 +656,18 @@ def _sign( *verify*, a ``bool``. If set to ``True``, the signer will verify signatures after they are created; the default is ``False``. + + *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, + ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. + + Raises ``DeniedByPolicy`` if the signature is denied by policy. """ + if policy is None: + policy = default_policy + if not policy.ok_to_sign(dnskey): + raise DeniedByPolicy + if isinstance(rrset, tuple): rdclass = rrset[1].rdclass rdtype = rrset[1].rdtype diff --git a/dns/dnssectypes.py b/dns/dnssectypes.py index 2a74716..02131e0 100644 --- a/dns/dnssectypes.py +++ b/dns/dnssectypes.py @@ -50,8 +50,10 @@ class Algorithm(dns.enum.IntEnum): class DSDigest(dns.enum.IntEnum): """DNSSEC Delegation Signer Digest Algorithm""" + NULL = 0 SHA1 = 1 SHA256 = 2 + GOST = 3 SHA384 = 4 @classmethod diff --git a/tests/test_dnssec.py b/tests/test_dnssec.py index 9aed879..4a25cd2 100644 --- a/tests/test_dnssec.py +++ b/tests/test_dnssec.py @@ -576,7 +576,20 @@ fake_gost_ns_rrsig = dns.rrset.from_text( @unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported") class DNSSECValidatorTestCase(unittest.TestCase): def testAbsoluteRSAMD5Good(self): # type: () -> None - dns.dnssec.validate(rsamd5_ns, rsamd5_ns_rrsig, rsamd5_keys, None, rsamd5_when) + dns.dnssec.validate( + rsamd5_ns, + rsamd5_ns_rrsig, + rsamd5_keys, + None, + rsamd5_when, + policy=dns.dnssec.allow_all_policy, + ) + + def testAbsoluteRSAMD5GoodDeniedByPolicy(self): # type: () -> None + with self.assertRaises(dns.dnssec.ValidationFailure): + dns.dnssec.validate( + rsamd5_ns, rsamd5_ns_rrsig, rsamd5_keys, None, rsamd5_when + ) def testRSAMD5Keyid(self): self.assertEqual(dns.dnssec.key_id(rsamd5_keys[abs_example][0]), 30239) @@ -610,12 +623,30 @@ class DNSSECValidatorTestCase(unittest.TestCase): self.assertRaises(dns.dnssec.ValidationFailure, bad) def testAbsoluteDSAGood(self): # type: () -> None - dns.dnssec.validate(abs_dsa_soa, abs_dsa_soa_rrsig, abs_dsa_keys, None, when2) + dns.dnssec.validate( + abs_dsa_soa, + abs_dsa_soa_rrsig, + abs_dsa_keys, + None, + when2, + policy=dns.dnssec.allow_all_policy, + ) + + def testAbsoluteDSAGoodDeniedByPolicy(self): # type: () -> None + with self.assertRaises(dns.dnssec.ValidationFailure): + dns.dnssec.validate( + abs_dsa_soa, abs_dsa_soa_rrsig, abs_dsa_keys, None, when2 + ) def testAbsoluteDSABad(self): # type: () -> None def bad(): # type: () -> None dns.dnssec.validate( - abs_other_dsa_soa, abs_dsa_soa_rrsig, abs_dsa_keys, None, when2 + abs_other_dsa_soa, + abs_dsa_soa_rrsig, + abs_dsa_keys, + None, + when2, + policy=dns.dnssec.allow_all_policy, ) self.assertRaises(dns.dnssec.ValidationFailure, bad) @@ -855,9 +886,39 @@ class DNSSECMakeDSTestCase(unittest.TestCase): def testMakeExampleSHA1DS(self): # type: () -> None algorithm: Any for algorithm in ("SHA1", "sha1", dns.dnssec.DSDigest.SHA1): - ds = dns.dnssec.make_ds(abs_example, example_sep_key, algorithm) + ds = dns.dnssec.make_ds( + abs_example, + example_sep_key, + algorithm, + policy=dns.dnssec.allow_all_policy, + ) + self.assertEqual(ds, example_ds_sha1) + ds = dns.dnssec.make_ds( + "example.", + example_sep_key, + algorithm, + policy=dns.dnssec.allow_all_policy, + ) + self.assertEqual(ds, example_ds_sha1) + + def testMakeExampleSHA1DSValidationOkByPolicy(self): # type: () -> None + algorithm: Any + for algorithm in ("SHA1", "sha1", dns.dnssec.DSDigest.SHA1): + ds = dns.dnssec.make_ds( + abs_example, + example_sep_key, + algorithm, + policy=dns.dnssec.allow_all_policy, + ) + self.assertEqual(ds, example_ds_sha1) + ds = dns.dnssec.make_ds( + "example.", example_sep_key, algorithm, validating=True + ) self.assertEqual(ds, example_ds_sha1) - ds = dns.dnssec.make_ds("example.", example_sep_key, algorithm) + + def testMakeExampleSHA1DSDeniedByPolicy(self): # type: () -> None + with self.assertRaises(dns.dnssec.DeniedByPolicy): + ds = dns.dnssec.make_ds(abs_example, example_sep_key, "SHA1") self.assertEqual(ds, example_ds_sha1) def testMakeExampleSHA256DS(self): # type: () -> None @@ -974,24 +1035,27 @@ class DNSSECMakeDNSKEYTestCase(unittest.TestCase): with self.assertRaises(ValueError): dns.dnssec.make_dnskey(key.public_key(), dns.dnssec.Algorithm.DSA) - def testRSALargeExponent(self): # type: () -> None - for key_size, public_exponent, dnskey_key_length in [ - (1024, 3, 130), - (1024, 65537, 132), - (2048, 3, 258), - (2048, 65537, 260), - (4096, 3, 514), - (4096, 65537, 516), - ]: - key = rsa.generate_private_key( - public_exponent=public_exponent, - key_size=key_size, - backend=default_backend(), - ) - dnskey = dns.dnssec.make_dnskey( - key.public_key(), algorithm=dns.dnssec.Algorithm.RSASHA256 - ) - self.assertEqual(len(dnskey.key), dnskey_key_length) + # XXXRTH This test is fine but is noticably slow, so I have commented it out for + # now + + # def testRSALargeExponent(self): # type: () -> None + # for key_size, public_exponent, dnskey_key_length in [ + # (1024, 3, 130), + # (1024, 65537, 132), + # (2048, 3, 258), + # (2048, 65537, 260), + # (4096, 3, 514), + # (4096, 65537, 516), + # ]: + # key = rsa.generate_private_key( + # public_exponent=public_exponent, + # key_size=key_size, + # backend=default_backend(), + # ) + # dnskey = dns.dnssec.make_dnskey( + # key.public_key(), algorithm=dns.dnssec.Algorithm.RSASHA256 + # ) + # self.assertEqual(len(dnskey.key), dnskey_key_length) @unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported") @@ -1014,7 +1078,9 @@ class DNSSECSignatureTestCase(unittest.TestCase): def testSignatureDSA(self): # type: () -> None key = dsa.generate_private_key(key_size=1024) - self._test_signature(key, dns.dnssec.Algorithm.DSA, abs_soa) + self._test_signature( + key, dns.dnssec.Algorithm.DSA, abs_soa, policy=dns.dnssec.allow_all_policy + ) def testSignatureECDSAP256SHA256(self): # type: () -> None key = ec.generate_private_key(curve=ec.SECP256R1, backend=default_backend()) @@ -1039,7 +1105,7 @@ class DNSSECSignatureTestCase(unittest.TestCase): rrset = (name, rdataset) self._test_signature(key, dns.dnssec.Algorithm.ED448, rrset) - def _test_signature(self, key, algorithm, rrset, signer=None): # type: () -> None + def _test_signature(self, key, algorithm, rrset, signer=None, policy=None): ttl = 60 lifetime = 3600 if isinstance(rrset, tuple): @@ -1058,10 +1124,11 @@ class DNSSECSignatureTestCase(unittest.TestCase): lifetime=lifetime, signer=signer, verify=True, + policy=policy, ) keys = {signer: dnskey_rrset} rrsigset = dns.rrset.from_rdata(rrname, ttl, rrsig) - dns.dnssec.validate(rrset=rrset, rrsigset=rrsigset, keys=keys) + dns.dnssec.validate(rrset=rrset, rrsigset=rrsigset, keys=keys, policy=policy) if __name__ == "__main__": -- cgit v1.2.1