diff options
author | Jakob Schlyter <jakob@kirei.se> | 2023-03-21 02:14:59 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-20 18:14:59 -0700 |
commit | 454d21c232b1531a0a4565a66f6def25e5cfd95a (patch) | |
tree | 601255b3c67e8d471239c868f05be34c36443284 | |
parent | 8231eaabac049e74298856c957645dac6d196dc8 (diff) | |
download | dnspython-454d21c232b1531a0a4565a66f6def25e5cfd95a.tar.gz |
Zone signer (#911)
* first cut at NSEC support
* use transactions, fix delegations
* rename to add_nsec_to_zone
* optimize NSEC generation
* split out function to get all secure names (could be useful for NSEC3 later)
* add `Bitmap.from_rdtypes()` and add missing typing
* more typing
* add missing import
* add more typing
* fix tok type
* remove _get_secure_names, optimize
* better zone testing (compare as text)
add test example with delegation below other delegation
* include NSEC itself in the bitmap
* lint
* Add names iteration to transactions via iterate_names().
Also make rdataset iteration more obvious by adding an
explicit iterate_rdatasets() API.
* use iterate_names()
* typo
* black
* use single iteration
* better type fix
* add optional transaction to add_nsec_to_zone
* idea for zone signer
* do not sign RRSIGs
* fix signer
* correctly sign DS
* simplify
* simplify by passing rrset to signer
* fix typing
* nit
* add DS
* add more test
* rewrite zone signer
* compact
* simplify
* make easier to read
* bring back rrset_signer
* move default RRset signer
* more
* more
* prettier context handling (mypy issue pending)
* make NSEC zone signer less complex
* update
* fix txn, sign as defined by SEP
* docs
* add back missing dnskey_include
* rename dnskey_include to add_dnskey
* check KSK/ZSK key tags in signed zone
---------
Co-authored-by: Bob Halley <halley@dnspython.org>
-rw-r--r-- | dns/dnssec.py | 227 | ||||
-rw-r--r-- | tests/test_dnssec.py | 125 |
2 files changed, 351 insertions, 1 deletions
diff --git a/dns/dnssec.py b/dns/dnssec.py index 5dc2622..74ab4c2 100644 --- a/dns/dnssec.py +++ b/dns/dnssec.py @@ -17,8 +17,11 @@ """Common DNSSEC-related functions and constants.""" -from typing import Any, cast, Dict, List, Optional, Set, Tuple, Union +from typing import Any, cast, Callable, Dict, List, Optional, Set, Tuple, Union + +import contextlib +import functools import hashlib import math import struct @@ -36,10 +39,14 @@ import dns.rdata import dns.rdatatype import dns.rdataclass import dns.rrset +import dns.transaction +import dns.zone from dns.rdtypes.ANY.CDNSKEY import CDNSKEY from dns.rdtypes.ANY.CDS import CDS from dns.rdtypes.ANY.DNSKEY import DNSKEY from dns.rdtypes.ANY.DS import DS +from dns.rdtypes.ANY.NSEC import NSEC, Bitmap +from dns.rdtypes.ANY.NSEC3PARAM import NSEC3PARAM from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime from dns.rdtypes.dnskeybase import Flag @@ -74,6 +81,8 @@ PrivateKey = Union[ "ed448.Ed448PrivateKey", ] +RRsetSigner = Callable[[dns.transaction.Transaction, dns.rrset.RRset], None] + def algorithm_from_text(text: str) -> Algorithm: """Convert text into a DNSSEC algorithm value. @@ -1216,6 +1225,222 @@ def dnskey_rdataset_to_cdnskey_rdataset( return dns.rdataset.from_rdata_list(rdataset.ttl, res) +def default_rrset_signer( + txn: dns.transaction.Transaction, + rrset: dns.rrset.RRset, + signer: dns.name.Name, + ksks: List[Tuple[PrivateKey, DNSKEY]], + zsks: List[Tuple[PrivateKey, DNSKEY]], + inception: Optional[Union[datetime, str, int, float]] = None, + expiration: Optional[Union[datetime, str, int, float]] = None, + lifetime: Optional[int] = None, +) -> None: + """Default RRset signer""" + + if rrset.rdtype in set( + [ + dns.rdatatype.RdataType.DNSKEY, + dns.rdatatype.RdataType.CDS, + dns.rdatatype.RdataType.CDNSKEY, + ] + ): + keys = ksks + else: + keys = zsks + + for (private_key, dnskey) in keys: + rrsig = dns.dnssec.sign( + rrset=rrset, + private_key=private_key, + dnskey=dnskey, + inception=inception, + expiration=expiration, + lifetime=lifetime, + signer=signer, + ) + txn.add(rrset.name, rrset.ttl, rrsig) + + +def sign_zone( + zone: dns.zone.Zone, + txn: Optional[dns.transaction.Transaction] = None, + keys: Optional[List[Tuple[PrivateKey, DNSKEY]]] = None, + add_dnskey: bool = True, + dnskey_ttl: Optional[int] = None, + inception: Optional[Union[datetime, str, int, float]] = None, + expiration: Optional[Union[datetime, str, int, float]] = None, + lifetime: Optional[int] = None, + nsec3: Optional[NSEC3PARAM] = None, + rrset_signer: Optional[RRsetSigner] = None, +) -> None: + """Sign zone. + + *zone*, a ``dns.zone.Zone``, the zone to sign. + + *txn*, a ``dns.transaction.Transaction``, an optional transaction to use + for signing. + + *keys*, a list of (``PrivateKey``, ``DNSKEY``) tuples, to use for signing. + KSK/ZSK roles are assigned automatically if the SEP flag is used, otherwise + all RRsets are signed by all keys. + + *add_dnskey*, a ``bool``. If ``True``, the default, all specified + DNSKEYs are automatically added to the zone on signing. + + *dnskey_ttl*, a``int``, specifies the TTL for DNSKEY RRs. If not specified + the TTL of the existing DNSKEY RRset used or the TTL of the SOA RRset. + + *inception*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the + signature inception time. If ``None``, the current time is used. If a ``str``, the + format is "YYYYMMDDHHMMSS" or alternatively the number of seconds since the UNIX + epoch in text form; this is the same the RRSIG rdata's text form. + Values of type `int` or `float` are interpreted as seconds since the UNIX epoch. + + *expiration*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature + expiration time. If ``None``, the expiration time will be the inception time plus + the value of the *lifetime* parameter. See the description of *inception* above + for how the various parameter types are interpreted. + + *lifetime*, an ``int`` or ``None``, the signature lifetime in seconds. This + parameter is only meaningful if *expiration* is ``None``. + + *nsec3*, a ``NSEC3PARAM`` Rdata, configures signing using NSEC3. Not yet implemented. + + *rrset_signer*, a ``Callable``, an optional function for signing RRsets. + The function requires two arguments: transaction and RRset. If the not + specified, ``dns.dnssec.default_rrset_signer`` will be used. + + Returns ``None``. + """ + + ksks = [] + zsks = [] + + # if we have both KSKs and ZSKs, split by SEP flag. if not, sign all + # records with all keys + if keys: + for key in keys: + if key[1].flags & Flag.SEP: + ksks.append(key) + else: + zsks.append(key) + if not ksks: + ksks = keys + if not zsks: + zsks = keys + else: + keys = [] + + if txn: + cm: contextlib.AbstractContextManager = contextlib.nullcontext(txn) + else: + cm = zone.writer() + + with cm as _txn: + if add_dnskey: + if dnskey_ttl is None: + dnskey = _txn.get(zone.origin, dns.rdatatype.DNSKEY) + if dnskey: + dnskey_ttl = dnskey.ttl + else: + soa = _txn.get(zone.origin, dns.rdatatype.SOA) + dnskey_ttl = soa.ttl + for (_, dnskey) in keys: + _txn.add(zone.origin, dnskey_ttl, dnskey) + + if nsec3: + raise NotImplementedError("Signing with NSEC3 not yet implemented") + else: + _rrset_signer = rrset_signer or functools.partial( + default_rrset_signer, + signer=zone.origin, + ksks=ksks, + zsks=zsks, + inception=inception, + expiration=expiration, + lifetime=lifetime, + ) + return _sign_zone_nsec(zone, _txn, _rrset_signer) + + +def _sign_zone_nsec( + zone: dns.zone.Zone, + txn: dns.transaction.Transaction, + rrset_signer: Optional[RRsetSigner] = None, +) -> None: + """NSEC zone signer""" + + def _txn_add_nsec( + txn: dns.transaction.Transaction, + name: dns.name.Name, + next_secure: Optional[dns.name.Name], + rdclass: dns.rdataclass.RdataClass, + ttl: int, + rrset_signer: Optional[RRsetSigner] = None, + ) -> None: + """NSEC zone signer helper""" + mandatory_types = set( + [dns.rdatatype.RdataType.RRSIG, dns.rdatatype.RdataType.NSEC] + ) + node = txn.get_node(name) + if node and next_secure: + types = ( + set([rdataset.rdtype for rdataset in node.rdatasets]) | mandatory_types + ) + windows = Bitmap.from_rdtypes(list(types)) + rrset = dns.rrset.from_rdata( + name, + ttl, + NSEC( + rdclass=rdclass, + rdtype=dns.rdatatype.RdataType.NSEC, + next=next_secure, + windows=windows, + ), + ) + txn.add(rrset) + if rrset_signer: + rrset_signer(txn, rrset) + + rrsig_ttl = zone.get_soa().minimum + delegation = None + last_secure = None + + for name in sorted(txn.iterate_names()): + if delegation and name.is_subdomain(delegation): + # names below delegations are not secure + continue + elif txn.get(name, dns.rdatatype.NS) and name != zone.origin: + # inside delegation + delegation = name + else: + # outside delegation + delegation = None + + if rrset_signer: + node = txn.get_node(name) + if node: + for rdataset in node.rdatasets: + if rdataset.rdtype == dns.rdatatype.RRSIG: + # do not sign RRSIGs + continue + elif delegation and rdataset.rdtype != dns.rdatatype.DS: + # do not sign delegations except DS records + continue + else: + rrset = dns.rrset.from_rdata(name, rdataset.ttl, *rdataset) + rrset_signer(txn, rrset) + + if last_secure: + _txn_add_nsec(txn, last_secure, name, zone.rdclass, rrsig_ttl, rrset_signer) + last_secure = name + + if last_secure: + _txn_add_nsec( + txn, last_secure, zone.origin, zone.rdclass, rrsig_ttl, rrset_signer + ) + + def _need_pyca(*args, **kwargs): raise ImportError( "DNSSEC validation requires " + "python cryptography" diff --git a/tests/test_dnssec.py b/tests/test_dnssec.py index f52f980..3074741 100644 --- a/tests/test_dnssec.py +++ b/tests/test_dnssec.py @@ -18,6 +18,7 @@ from datetime import datetime, timedelta, timezone from typing import Any +import functools import unittest import dns.dnssec @@ -30,6 +31,9 @@ import dns.rdtypes.ANY.CDS import dns.rdtypes.ANY.DNSKEY import dns.rdtypes.ANY.DS import dns.rrset +import dns.zone + +from dns.rdtypes.dnskeybase import Flag from .keys import test_dnskeys @@ -580,6 +584,58 @@ fake_gost_ns_rrsig = dns.rrset.from_text( " SXTV9hCLVFWU4PS+/fxxfOHCetsY5tWWSxZi zSHfgpGfsHWzQoAamag4XYDyykc=", ) +test_zone_sans_nsec = """ +example. 3600 IN SOA foo.example. bar.example. 1 2 3 4 5 +example. 3600 IN NS ns1.example. +example. 3600 IN NS ns2.example. +bar.foo.example. 3600 IN MX 0 blaz.foo.example. +ns1.example. 3600 IN A 10.0.0.1 +ns2.example. 3600 IN A 10.0.0.2 +sub.example. 3600 IN NS ns1.example. +sub.example. 3600 IN NS ns2.example. +sub.example. 3600 IN NS ns3.sub.example. +sub.example. 3600 IN DS 12345 13 2 0100D208742A23024DF3C8827DFF3EB3E25126E9B72850E99D6055E18913CB2F +sub.sub.example. 3600 IN NS ns3.sub.example. +ns3.sub.example. 3600 IN A 10.0.0.3 +""" + +test_zone_rrsigs = set( + [ + ("example.", dns.rdatatype.DNSKEY), + ("example.", dns.rdatatype.NS), + ("example.", dns.rdatatype.NSEC), + ("example.", dns.rdatatype.SOA), + ("bar.foo.example.", dns.rdatatype.MX), + ("bar.foo.example.", dns.rdatatype.NSEC), + ("ns1.example.", dns.rdatatype.A), + ("ns1.example.", dns.rdatatype.NSEC), + ("ns2.example.", dns.rdatatype.A), + ("ns2.example.", dns.rdatatype.NSEC), + ("sub.example.", dns.rdatatype.DS), + ("sub.example.", dns.rdatatype.NSEC), + ] +) + +test_zone_with_nsec = """ +example. 3600 IN SOA foo.example. bar.example. 1 2 3 4 5 +example. 3600 IN NS ns1.example. +example. 3600 IN NS ns2.example. +example. 5 IN NSEC bar.foo.example. NS NSEC SOA RRSIG +bar.foo.example. 3600 IN MX 0 blaz.foo.example. +bar.foo.example. 5 IN NSEC ns1.example. MX NSEC RRSIG +ns1.example. 3600 IN A 10.0.0.1 +ns1.example. 5 IN NSEC ns2.example. A NSEC RRSIG +ns2.example. 3600 IN A 10.0.0.2 +ns2.example. 5 IN NSEC sub.example. A NSEC RRSIG +sub.example. 3600 IN NS ns1.example. +sub.example. 3600 IN NS ns2.example. +sub.example. 3600 IN NS ns3.sub.example. +sub.example. 3600 IN DS 12345 13 2 0100D208742A23024DF3C8827DFF3EB3E25126E9B72850E99D6055E18913CB2F +sub.example. 5 IN NSEC example. DS NS NSEC RRSIG +sub.sub.example. 3600 IN NS ns3.sub.example. +ns3.sub.example. 3600 IN A 10.0.0.3 +""" + @unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported") class DNSSECValidatorTestCase(unittest.TestCase): @@ -880,6 +936,75 @@ class DNSSECMiscTestCase(unittest.TestCase): ts = dns.dnssec.to_timestamp(441812220) self.assertEqual(ts, REFERENCE_TIMESTAMP) + def test_sign_zone(self): + zone = dns.zone.from_text(test_zone_sans_nsec, "example.", relativize=False) + + algorithm = dns.dnssec.Algorithm.ED25519 + lifetime = 3600 + + ksk_private_key = ed25519.Ed25519PrivateKey.generate() + ksk_dnskey = dns.dnssec.make_dnskey( + public_key=ksk_private_key.public_key(), + algorithm=algorithm, + flags=Flag.ZONE | Flag.SEP, + ) + + zsk_private_key = ed25519.Ed25519PrivateKey.generate() + zsk_dnskey = dns.dnssec.make_dnskey( + public_key=zsk_private_key.public_key(), + algorithm=algorithm, + flags=Flag.ZONE, + ) + + keys = [(ksk_private_key, ksk_dnskey), (zsk_private_key, zsk_dnskey)] + + with zone.writer() as txn: + dns.dnssec.sign_zone( + zone=zone, + txn=txn, + keys=keys, + lifetime=lifetime, + ) + + rrsigs = set( + [ + (str(name), rdataset.covers) + for (name, rdataset) in zone.iterate_rdatasets() + if rdataset.rdtype == dns.rdatatype.RRSIG + ] + ) + self.assertEqual(rrsigs, test_zone_rrsigs) + + signers = set( + [ + (str(name), rdataset.covers, rdataset[0].key_tag) + for (name, rdataset) in zone.iterate_rdatasets() + if rdataset.rdtype == dns.rdatatype.RRSIG + ] + ) + for name, covers, key_tag in signers: + if covers in [ + dns.rdatatype.DNSKEY, + dns.rdatatype.CDNSKEY, + dns.rdatatype.CDS, + ]: + self.assertEqual(key_tag, dns.dnssec.key_id(ksk_dnskey)) + else: + self.assertEqual(key_tag, dns.dnssec.key_id(zsk_dnskey)) + + def test_sign_zone_nsec_null_signer(self): + def rrset_signer( + txn: dns.transaction.Transaction, + rrset: dns.rrset.RRset, + ) -> None: + pass + + zone1 = dns.zone.from_text(test_zone_sans_nsec, "example.", relativize=False) + dns.dnssec.sign_zone(zone1, rrset_signer=rrset_signer) + + zone2 = dns.zone.from_text(test_zone_with_nsec, "example.", relativize=False) + self.assertEqual(zone1.to_text(), zone2.to_text()) + class DNSSECMakeDSTestCase(unittest.TestCase): def testMnemonicParser(self): |