summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Schlyter <jakob@kirei.se>2023-03-21 02:14:59 +0100
committerGitHub <noreply@github.com>2023-03-20 18:14:59 -0700
commit454d21c232b1531a0a4565a66f6def25e5cfd95a (patch)
tree601255b3c67e8d471239c868f05be34c36443284
parent8231eaabac049e74298856c957645dac6d196dc8 (diff)
downloaddnspython-454d21c232b1531a0a4565a66f6def25e5cfd95a.tar.gz
Zone signer (#911)
* first cut at NSEC support * use transactions, fix delegations * rename to add_nsec_to_zone * optimize NSEC generation * split out function to get all secure names (could be useful for NSEC3 later) * add `Bitmap.from_rdtypes()` and add missing typing * more typing * add missing import * add more typing * fix tok type * remove _get_secure_names, optimize * better zone testing (compare as text) add test example with delegation below other delegation * include NSEC itself in the bitmap * lint * Add names iteration to transactions via iterate_names(). Also make rdataset iteration more obvious by adding an explicit iterate_rdatasets() API. * use iterate_names() * typo * black * use single iteration * better type fix * add optional transaction to add_nsec_to_zone * idea for zone signer * do not sign RRSIGs * fix signer * correctly sign DS * simplify * simplify by passing rrset to signer * fix typing * nit * add DS * add more test * rewrite zone signer * compact * simplify * make easier to read * bring back rrset_signer * move default RRset signer * more * more * prettier context handling (mypy issue pending) * make NSEC zone signer less complex * update * fix txn, sign as defined by SEP * docs * add back missing dnskey_include * rename dnskey_include to add_dnskey * check KSK/ZSK key tags in signed zone --------- Co-authored-by: Bob Halley <halley@dnspython.org>
-rw-r--r--dns/dnssec.py227
-rw-r--r--tests/test_dnssec.py125
2 files changed, 351 insertions, 1 deletions
diff --git a/dns/dnssec.py b/dns/dnssec.py
index 5dc2622..74ab4c2 100644
--- a/dns/dnssec.py
+++ b/dns/dnssec.py
@@ -17,8 +17,11 @@
"""Common DNSSEC-related functions and constants."""
-from typing import Any, cast, Dict, List, Optional, Set, Tuple, Union
+from typing import Any, cast, Callable, Dict, List, Optional, Set, Tuple, Union
+
+import contextlib
+import functools
import hashlib
import math
import struct
@@ -36,10 +39,14 @@ import dns.rdata
import dns.rdatatype
import dns.rdataclass
import dns.rrset
+import dns.transaction
+import dns.zone
from dns.rdtypes.ANY.CDNSKEY import CDNSKEY
from dns.rdtypes.ANY.CDS import CDS
from dns.rdtypes.ANY.DNSKEY import DNSKEY
from dns.rdtypes.ANY.DS import DS
+from dns.rdtypes.ANY.NSEC import NSEC, Bitmap
+from dns.rdtypes.ANY.NSEC3PARAM import NSEC3PARAM
from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime
from dns.rdtypes.dnskeybase import Flag
@@ -74,6 +81,8 @@ PrivateKey = Union[
"ed448.Ed448PrivateKey",
]
+RRsetSigner = Callable[[dns.transaction.Transaction, dns.rrset.RRset], None]
+
def algorithm_from_text(text: str) -> Algorithm:
"""Convert text into a DNSSEC algorithm value.
@@ -1216,6 +1225,222 @@ def dnskey_rdataset_to_cdnskey_rdataset(
return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+def default_rrset_signer(
+ txn: dns.transaction.Transaction,
+ rrset: dns.rrset.RRset,
+ signer: dns.name.Name,
+ ksks: List[Tuple[PrivateKey, DNSKEY]],
+ zsks: List[Tuple[PrivateKey, DNSKEY]],
+ inception: Optional[Union[datetime, str, int, float]] = None,
+ expiration: Optional[Union[datetime, str, int, float]] = None,
+ lifetime: Optional[int] = None,
+) -> None:
+ """Default RRset signer"""
+
+ if rrset.rdtype in set(
+ [
+ dns.rdatatype.RdataType.DNSKEY,
+ dns.rdatatype.RdataType.CDS,
+ dns.rdatatype.RdataType.CDNSKEY,
+ ]
+ ):
+ keys = ksks
+ else:
+ keys = zsks
+
+ for (private_key, dnskey) in keys:
+ rrsig = dns.dnssec.sign(
+ rrset=rrset,
+ private_key=private_key,
+ dnskey=dnskey,
+ inception=inception,
+ expiration=expiration,
+ lifetime=lifetime,
+ signer=signer,
+ )
+ txn.add(rrset.name, rrset.ttl, rrsig)
+
+
+def sign_zone(
+ zone: dns.zone.Zone,
+ txn: Optional[dns.transaction.Transaction] = None,
+ keys: Optional[List[Tuple[PrivateKey, DNSKEY]]] = None,
+ add_dnskey: bool = True,
+ dnskey_ttl: Optional[int] = None,
+ inception: Optional[Union[datetime, str, int, float]] = None,
+ expiration: Optional[Union[datetime, str, int, float]] = None,
+ lifetime: Optional[int] = None,
+ nsec3: Optional[NSEC3PARAM] = None,
+ rrset_signer: Optional[RRsetSigner] = None,
+) -> None:
+ """Sign zone.
+
+ *zone*, a ``dns.zone.Zone``, the zone to sign.
+
+ *txn*, a ``dns.transaction.Transaction``, an optional transaction to use
+ for signing.
+
+ *keys*, a list of (``PrivateKey``, ``DNSKEY``) tuples, to use for signing.
+ KSK/ZSK roles are assigned automatically if the SEP flag is used, otherwise
+ all RRsets are signed by all keys.
+
+ *add_dnskey*, a ``bool``. If ``True``, the default, all specified
+ DNSKEYs are automatically added to the zone on signing.
+
+ *dnskey_ttl*, a``int``, specifies the TTL for DNSKEY RRs. If not specified
+ the TTL of the existing DNSKEY RRset used or the TTL of the SOA RRset.
+
+ *inception*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the
+ signature inception time. If ``None``, the current time is used. If a ``str``, the
+ format is "YYYYMMDDHHMMSS" or alternatively the number of seconds since the UNIX
+ epoch in text form; this is the same the RRSIG rdata's text form.
+ Values of type `int` or `float` are interpreted as seconds since the UNIX epoch.
+
+ *expiration*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature
+ expiration time. If ``None``, the expiration time will be the inception time plus
+ the value of the *lifetime* parameter. See the description of *inception* above
+ for how the various parameter types are interpreted.
+
+ *lifetime*, an ``int`` or ``None``, the signature lifetime in seconds. This
+ parameter is only meaningful if *expiration* is ``None``.
+
+ *nsec3*, a ``NSEC3PARAM`` Rdata, configures signing using NSEC3. Not yet implemented.
+
+ *rrset_signer*, a ``Callable``, an optional function for signing RRsets.
+ The function requires two arguments: transaction and RRset. If the not
+ specified, ``dns.dnssec.default_rrset_signer`` will be used.
+
+ Returns ``None``.
+ """
+
+ ksks = []
+ zsks = []
+
+ # if we have both KSKs and ZSKs, split by SEP flag. if not, sign all
+ # records with all keys
+ if keys:
+ for key in keys:
+ if key[1].flags & Flag.SEP:
+ ksks.append(key)
+ else:
+ zsks.append(key)
+ if not ksks:
+ ksks = keys
+ if not zsks:
+ zsks = keys
+ else:
+ keys = []
+
+ if txn:
+ cm: contextlib.AbstractContextManager = contextlib.nullcontext(txn)
+ else:
+ cm = zone.writer()
+
+ with cm as _txn:
+ if add_dnskey:
+ if dnskey_ttl is None:
+ dnskey = _txn.get(zone.origin, dns.rdatatype.DNSKEY)
+ if dnskey:
+ dnskey_ttl = dnskey.ttl
+ else:
+ soa = _txn.get(zone.origin, dns.rdatatype.SOA)
+ dnskey_ttl = soa.ttl
+ for (_, dnskey) in keys:
+ _txn.add(zone.origin, dnskey_ttl, dnskey)
+
+ if nsec3:
+ raise NotImplementedError("Signing with NSEC3 not yet implemented")
+ else:
+ _rrset_signer = rrset_signer or functools.partial(
+ default_rrset_signer,
+ signer=zone.origin,
+ ksks=ksks,
+ zsks=zsks,
+ inception=inception,
+ expiration=expiration,
+ lifetime=lifetime,
+ )
+ return _sign_zone_nsec(zone, _txn, _rrset_signer)
+
+
+def _sign_zone_nsec(
+ zone: dns.zone.Zone,
+ txn: dns.transaction.Transaction,
+ rrset_signer: Optional[RRsetSigner] = None,
+) -> None:
+ """NSEC zone signer"""
+
+ def _txn_add_nsec(
+ txn: dns.transaction.Transaction,
+ name: dns.name.Name,
+ next_secure: Optional[dns.name.Name],
+ rdclass: dns.rdataclass.RdataClass,
+ ttl: int,
+ rrset_signer: Optional[RRsetSigner] = None,
+ ) -> None:
+ """NSEC zone signer helper"""
+ mandatory_types = set(
+ [dns.rdatatype.RdataType.RRSIG, dns.rdatatype.RdataType.NSEC]
+ )
+ node = txn.get_node(name)
+ if node and next_secure:
+ types = (
+ set([rdataset.rdtype for rdataset in node.rdatasets]) | mandatory_types
+ )
+ windows = Bitmap.from_rdtypes(list(types))
+ rrset = dns.rrset.from_rdata(
+ name,
+ ttl,
+ NSEC(
+ rdclass=rdclass,
+ rdtype=dns.rdatatype.RdataType.NSEC,
+ next=next_secure,
+ windows=windows,
+ ),
+ )
+ txn.add(rrset)
+ if rrset_signer:
+ rrset_signer(txn, rrset)
+
+ rrsig_ttl = zone.get_soa().minimum
+ delegation = None
+ last_secure = None
+
+ for name in sorted(txn.iterate_names()):
+ if delegation and name.is_subdomain(delegation):
+ # names below delegations are not secure
+ continue
+ elif txn.get(name, dns.rdatatype.NS) and name != zone.origin:
+ # inside delegation
+ delegation = name
+ else:
+ # outside delegation
+ delegation = None
+
+ if rrset_signer:
+ node = txn.get_node(name)
+ if node:
+ for rdataset in node.rdatasets:
+ if rdataset.rdtype == dns.rdatatype.RRSIG:
+ # do not sign RRSIGs
+ continue
+ elif delegation and rdataset.rdtype != dns.rdatatype.DS:
+ # do not sign delegations except DS records
+ continue
+ else:
+ rrset = dns.rrset.from_rdata(name, rdataset.ttl, *rdataset)
+ rrset_signer(txn, rrset)
+
+ if last_secure:
+ _txn_add_nsec(txn, last_secure, name, zone.rdclass, rrsig_ttl, rrset_signer)
+ last_secure = name
+
+ if last_secure:
+ _txn_add_nsec(
+ txn, last_secure, zone.origin, zone.rdclass, rrsig_ttl, rrset_signer
+ )
+
+
def _need_pyca(*args, **kwargs):
raise ImportError(
"DNSSEC validation requires " + "python cryptography"
diff --git a/tests/test_dnssec.py b/tests/test_dnssec.py
index f52f980..3074741 100644
--- a/tests/test_dnssec.py
+++ b/tests/test_dnssec.py
@@ -18,6 +18,7 @@
from datetime import datetime, timedelta, timezone
from typing import Any
+import functools
import unittest
import dns.dnssec
@@ -30,6 +31,9 @@ import dns.rdtypes.ANY.CDS
import dns.rdtypes.ANY.DNSKEY
import dns.rdtypes.ANY.DS
import dns.rrset
+import dns.zone
+
+from dns.rdtypes.dnskeybase import Flag
from .keys import test_dnskeys
@@ -580,6 +584,58 @@ fake_gost_ns_rrsig = dns.rrset.from_text(
" SXTV9hCLVFWU4PS+/fxxfOHCetsY5tWWSxZi zSHfgpGfsHWzQoAamag4XYDyykc=",
)
+test_zone_sans_nsec = """
+example. 3600 IN SOA foo.example. bar.example. 1 2 3 4 5
+example. 3600 IN NS ns1.example.
+example. 3600 IN NS ns2.example.
+bar.foo.example. 3600 IN MX 0 blaz.foo.example.
+ns1.example. 3600 IN A 10.0.0.1
+ns2.example. 3600 IN A 10.0.0.2
+sub.example. 3600 IN NS ns1.example.
+sub.example. 3600 IN NS ns2.example.
+sub.example. 3600 IN NS ns3.sub.example.
+sub.example. 3600 IN DS 12345 13 2 0100D208742A23024DF3C8827DFF3EB3E25126E9B72850E99D6055E18913CB2F
+sub.sub.example. 3600 IN NS ns3.sub.example.
+ns3.sub.example. 3600 IN A 10.0.0.3
+"""
+
+test_zone_rrsigs = set(
+ [
+ ("example.", dns.rdatatype.DNSKEY),
+ ("example.", dns.rdatatype.NS),
+ ("example.", dns.rdatatype.NSEC),
+ ("example.", dns.rdatatype.SOA),
+ ("bar.foo.example.", dns.rdatatype.MX),
+ ("bar.foo.example.", dns.rdatatype.NSEC),
+ ("ns1.example.", dns.rdatatype.A),
+ ("ns1.example.", dns.rdatatype.NSEC),
+ ("ns2.example.", dns.rdatatype.A),
+ ("ns2.example.", dns.rdatatype.NSEC),
+ ("sub.example.", dns.rdatatype.DS),
+ ("sub.example.", dns.rdatatype.NSEC),
+ ]
+)
+
+test_zone_with_nsec = """
+example. 3600 IN SOA foo.example. bar.example. 1 2 3 4 5
+example. 3600 IN NS ns1.example.
+example. 3600 IN NS ns2.example.
+example. 5 IN NSEC bar.foo.example. NS NSEC SOA RRSIG
+bar.foo.example. 3600 IN MX 0 blaz.foo.example.
+bar.foo.example. 5 IN NSEC ns1.example. MX NSEC RRSIG
+ns1.example. 3600 IN A 10.0.0.1
+ns1.example. 5 IN NSEC ns2.example. A NSEC RRSIG
+ns2.example. 3600 IN A 10.0.0.2
+ns2.example. 5 IN NSEC sub.example. A NSEC RRSIG
+sub.example. 3600 IN NS ns1.example.
+sub.example. 3600 IN NS ns2.example.
+sub.example. 3600 IN NS ns3.sub.example.
+sub.example. 3600 IN DS 12345 13 2 0100D208742A23024DF3C8827DFF3EB3E25126E9B72850E99D6055E18913CB2F
+sub.example. 5 IN NSEC example. DS NS NSEC RRSIG
+sub.sub.example. 3600 IN NS ns3.sub.example.
+ns3.sub.example. 3600 IN A 10.0.0.3
+"""
+
@unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported")
class DNSSECValidatorTestCase(unittest.TestCase):
@@ -880,6 +936,75 @@ class DNSSECMiscTestCase(unittest.TestCase):
ts = dns.dnssec.to_timestamp(441812220)
self.assertEqual(ts, REFERENCE_TIMESTAMP)
+ def test_sign_zone(self):
+ zone = dns.zone.from_text(test_zone_sans_nsec, "example.", relativize=False)
+
+ algorithm = dns.dnssec.Algorithm.ED25519
+ lifetime = 3600
+
+ ksk_private_key = ed25519.Ed25519PrivateKey.generate()
+ ksk_dnskey = dns.dnssec.make_dnskey(
+ public_key=ksk_private_key.public_key(),
+ algorithm=algorithm,
+ flags=Flag.ZONE | Flag.SEP,
+ )
+
+ zsk_private_key = ed25519.Ed25519PrivateKey.generate()
+ zsk_dnskey = dns.dnssec.make_dnskey(
+ public_key=zsk_private_key.public_key(),
+ algorithm=algorithm,
+ flags=Flag.ZONE,
+ )
+
+ keys = [(ksk_private_key, ksk_dnskey), (zsk_private_key, zsk_dnskey)]
+
+ with zone.writer() as txn:
+ dns.dnssec.sign_zone(
+ zone=zone,
+ txn=txn,
+ keys=keys,
+ lifetime=lifetime,
+ )
+
+ rrsigs = set(
+ [
+ (str(name), rdataset.covers)
+ for (name, rdataset) in zone.iterate_rdatasets()
+ if rdataset.rdtype == dns.rdatatype.RRSIG
+ ]
+ )
+ self.assertEqual(rrsigs, test_zone_rrsigs)
+
+ signers = set(
+ [
+ (str(name), rdataset.covers, rdataset[0].key_tag)
+ for (name, rdataset) in zone.iterate_rdatasets()
+ if rdataset.rdtype == dns.rdatatype.RRSIG
+ ]
+ )
+ for name, covers, key_tag in signers:
+ if covers in [
+ dns.rdatatype.DNSKEY,
+ dns.rdatatype.CDNSKEY,
+ dns.rdatatype.CDS,
+ ]:
+ self.assertEqual(key_tag, dns.dnssec.key_id(ksk_dnskey))
+ else:
+ self.assertEqual(key_tag, dns.dnssec.key_id(zsk_dnskey))
+
+ def test_sign_zone_nsec_null_signer(self):
+ def rrset_signer(
+ txn: dns.transaction.Transaction,
+ rrset: dns.rrset.RRset,
+ ) -> None:
+ pass
+
+ zone1 = dns.zone.from_text(test_zone_sans_nsec, "example.", relativize=False)
+ dns.dnssec.sign_zone(zone1, rrset_signer=rrset_signer)
+
+ zone2 = dns.zone.from_text(test_zone_with_nsec, "example.", relativize=False)
+ self.assertEqual(zone1.to_text(), zone2.to_text())
+
class DNSSECMakeDSTestCase(unittest.TestCase):
def testMnemonicParser(self):