diff options
Diffstat (limited to 'dns/dnssec.py')
-rw-r--r-- | dns/dnssec.py | 419 |
1 files changed, 380 insertions, 39 deletions
diff --git a/dns/dnssec.py b/dns/dnssec.py index 13415bd..c4aff95 100644 --- a/dns/dnssec.py +++ b/dns/dnssec.py @@ -20,9 +20,11 @@ from typing import Any, cast, Dict, List, Optional, Tuple, Union import hashlib +import math import struct import time import base64 +from datetime import datetime, timezone from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash @@ -36,17 +38,37 @@ import dns.rdataclass import dns.rrset from dns.rdtypes.ANY.DNSKEY import DNSKEY from dns.rdtypes.ANY.DS import DS -from dns.rdtypes.ANY.RRSIG import RRSIG +from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime +from dns.rdtypes.dnskeybase import Flag class UnsupportedAlgorithm(dns.exception.DNSException): """The DNSSEC algorithm is not supported.""" +class AlgorithmKeyMismatch(UnsupportedAlgorithm): + """The DNSSEC algorithm is not supported for the given key type.""" + + class ValidationFailure(dns.exception.DNSException): """The DNSSEC signature is invalid.""" +PublicKey = Union[ + "rsa.RSAPublicKey", + "ec.EllipticCurvePublicKey", + "ed25519.Ed25519PublicKey", + "ed448.Ed448PublicKey", +] + +PrivateKey = Union[ + "rsa.RSAPrivateKey", + "ec.EllipticCurvePrivateKey", + "ed25519.Ed25519PrivateKey", + "ed448.Ed448PrivateKey", +] + + def algorithm_from_text(text: str) -> Algorithm: """Convert text into a DNSSEC algorithm value. @@ -69,6 +91,20 @@ def algorithm_to_text(value: Union[Algorithm, int]) -> str: return Algorithm.to_text(value) +def to_timestamp(value: Union[datetime, str, float, int]) -> int: + """Convert various format to a timestamp""" + if isinstance(value, datetime): + return int(value.timestamp()) + elif isinstance(value, str): + return sigtime_to_posixtime(value) + elif isinstance(value, float): + return int(value) + elif isinstance(value, int): + return value + else: + raise TypeError("Unsupported timestamp type") + + def key_id(key: DNSKEY) -> int: """Return the key id (a 16-bit number) for the specified key. @@ -213,6 +249,35 @@ def _is_sha512(algorithm: int) -> bool: return algorithm == Algorithm.RSASHA512 +def _ensure_algorithm_key_combination(algorithm: int, key: PublicKey) -> None: + """Ensure algorithm is valid for key type, throwing an exception on + mismatch.""" + if isinstance(key, rsa.RSAPublicKey): + if _is_rsa(algorithm): + return + raise AlgorithmKeyMismatch('algorithm "%s" not valid for RSA key' % algorithm) + if isinstance(key, dsa.DSAPublicKey): + if _is_dsa(algorithm): + return + raise AlgorithmKeyMismatch('algorithm "%s" not valid for DSA key' % algorithm) + if isinstance(key, ec.EllipticCurvePublicKey): + if _is_ecdsa(algorithm): + return + raise AlgorithmKeyMismatch('algorithm "%s" not valid for ECDSA key' % algorithm) + if isinstance(key, ed25519.Ed25519PublicKey): + if algorithm == Algorithm.ED25519: + return + raise AlgorithmKeyMismatch( + 'algorithm "%s" not valid for ED25519 key' % algorithm + ) + if isinstance(key, ed448.Ed448PublicKey): + if algorithm == Algorithm.ED448: + return + raise AlgorithmKeyMismatch('algorithm "%s" not valid for ED448 key' % algorithm) + + raise TypeError("unsupported key type") + + def _make_hash(algorithm: int) -> Any: if _is_md5(algorithm): return hashes.MD5() @@ -353,22 +418,10 @@ def _validate_rrsig( dnspython but not implemented. """ - if isinstance(origin, str): - origin = dns.name.from_text(origin, dns.name.root) - candidate_keys = _find_candidate_keys(keys, rrsig) if candidate_keys is None: raise ValidationFailure("unknown key") - # For convenience, allow the rrset to be specified as a (name, - # rdataset) tuple as well as a proper rrset - if isinstance(rrset, tuple): - rrname = rrset[0] - rdataset = rrset[1] - else: - rrname = rrset.name - rdataset = rrset - if now is None: now = time.time() if rrsig.expiration < now: @@ -391,31 +444,7 @@ def _validate_rrsig( else: sig = rrsig.signature - data = b"" - data += rrsig.to_wire(origin=origin)[:18] - data += rrsig.signer.to_digestable(origin) - - # Derelativize the name before considering labels. - if not rrname.is_absolute(): - if origin is None: - raise ValidationFailure("relative RR name without an origin specified") - rrname = rrname.derelativize(origin) - - if len(rrname) - 1 < rrsig.labels: - raise ValidationFailure("owner name longer than RRSIG labels") - elif rrsig.labels < len(rrname) - 1: - suffix = rrname.split(rrsig.labels + 1)[1] - rrname = dns.name.from_text("*", suffix) - rrnamebuf = rrname.to_digestable() - rrfixed = struct.pack("!HHI", rdataset.rdtype, rdataset.rdclass, rrsig.original_ttl) - rdatas = [rdata.to_digestable(origin) for rdata in rdataset] - for rdata in sorted(rdatas): - data += rrnamebuf - data += rrfixed - rrlen = struct.pack("!H", len(rdata)) - data += rrlen - data += rdata - + data = _make_rrsig_signature_data(rrset, rrsig, origin) chosen_hash = _make_hash(rrsig.algorithm) for candidate_key in candidate_keys: @@ -495,6 +524,314 @@ def _validate( raise ValidationFailure("no RRSIGs validated") +def _sign( + rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], + private_key: PrivateKey, + signer: dns.name.Name, + dnskey: DNSKEY, + inception: Optional[Union[datetime, str, float]] = None, + expiration: Optional[Union[datetime, str, float]] = None, + lifetime: Optional[int] = None, + verify: bool = False, +) -> RRSIG: + """Sign RRset using private key. + + *rrset*, the RRset to validate. This can be a + ``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) + tuple. + + *private_key*, the private key to use for signing, a + ``cryptography.hazmat.primitives.asymmetric`` private key class applicable + for DNSSEC. + + *signer*, a ``dns.name.Name``, the Signer's name. + + *dnskey*, a ``DNSKEY`` matching ``private_key``. + + *inception*, a ``datetime``, ``str``, or ``float``, signature inception; defaults to now. + + *expiration*, a ``datetime``, ``str`` or ``float``, signature expiration. May be specified as lifetime. + + *lifetime*, an ``int`` specifiying the signature lifetime in seconds. + + *verify*, a ``bool`` set to ``True`` if the signer should verify issued signaures. + """ + + if isinstance(rrset, tuple): + rdclass = rrset[1].rdclass + rdtype = rrset[1].rdtype + rrname = rrset[0] + original_ttl = rrset[1].ttl + else: + rdclass = rrset.rdclass + rdtype = rrset.rdtype + rrname = rrset.name + original_ttl = rrset.ttl + + if inception is not None: + rrsig_inception = to_timestamp(inception) + else: + rrsig_inception = int(time.time()) + + if expiration is not None: + rrsig_expiration = to_timestamp(expiration) + elif lifetime is not None: + rrsig_expiration = int(time.time()) + lifetime + else: + raise ValueError("expiration or lifetime must be specified") + + rrsig_template = RRSIG( + rdclass=rdclass, + rdtype=dns.rdatatype.RRSIG, + type_covered=rdtype, + algorithm=dnskey.algorithm, + labels=len(rrname) - 1, + original_ttl=original_ttl, + expiration=rrsig_expiration, + inception=rrsig_inception, + key_tag=key_id(dnskey), + signer=signer, + signature=b"", + ) + + data = dns.dnssec._make_rrsig_signature_data(rrset, rrsig_template) + chosen_hash = _make_hash(rrsig_template.algorithm) + signature = None + + if isinstance(private_key, rsa.RSAPrivateKey): + if not _is_rsa(dnskey.algorithm): + raise ValueError("Invalid DNSKEY algorithm for RSA key") + signature = private_key.sign(data, padding.PKCS1v15(), chosen_hash) + if verify: + private_key.public_key().verify( + signature, data, padding.PKCS1v15(), chosen_hash + ) + elif isinstance(private_key, dsa.DSAPrivateKey): + if not _is_dsa(dnskey.algorithm): + raise ValueError("Invalid DNSKEY algorithm for DSA key") + public_dsa_key = private_key.public_key() + if public_dsa_key.key_size > 1024: + raise ValueError("DSA key size overflow") + der_signature = private_key.sign(data, chosen_hash) + if verify: + public_dsa_key.verify(der_signature, data, chosen_hash) + dsa_r, dsa_s = utils.decode_dss_signature(der_signature) + dsa_t = (public_dsa_key.key_size // 8 - 64) // 8 + octets = 20 + signature = ( + struct.pack("!B", dsa_t) + + int.to_bytes(dsa_r, length=octets, byteorder="big") + + int.to_bytes(dsa_s, length=octets, byteorder="big") + ) + elif isinstance(private_key, ec.EllipticCurvePrivateKey): + if not _is_ecdsa(dnskey.algorithm): + raise ValueError("Invalid DNSKEY algorithm for EC key") + der_signature = private_key.sign(data, ec.ECDSA(chosen_hash)) + if verify: + private_key.public_key().verify(der_signature, data, ec.ECDSA(chosen_hash)) + if dnskey.algorithm == Algorithm.ECDSAP256SHA256: + octets = 32 + else: + octets = 48 + dsa_r, dsa_s = utils.decode_dss_signature(der_signature) + signature = int.to_bytes(dsa_r, length=octets, byteorder="big") + int.to_bytes( + dsa_s, length=octets, byteorder="big" + ) + elif isinstance(private_key, ed25519.Ed25519PrivateKey): + if dnskey.algorithm != Algorithm.ED25519: + raise ValueError("Invalid DNSKEY algorithm for ED25519 key") + signature = private_key.sign(data) + if verify: + private_key.public_key().verify(signature, data) + elif isinstance(private_key, ed448.Ed448PrivateKey): + if dnskey.algorithm != Algorithm.ED448: + raise ValueError("Invalid DNSKEY algorithm for ED448 key") + signature = private_key.sign(data) + if verify: + private_key.public_key().verify(signature, data) + else: + raise TypeError("Unsupported key algorithm") + + return RRSIG( + rdclass=rrsig_template.rdclass, + rdtype=rrsig_template.rdtype, + type_covered=rrsig_template.type_covered, + algorithm=rrsig_template.algorithm, + labels=rrsig_template.labels, + original_ttl=rrsig_template.original_ttl, + expiration=rrsig_template.expiration, + inception=rrsig_template.inception, + key_tag=rrsig_template.key_tag, + signer=rrsig_template.signer, + signature=signature, + ) + + +def _make_rrsig_signature_data( + rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], + rrsig: RRSIG, + origin: Optional[dns.name.Name] = None, +) -> bytes: + """Create signature rdata. + + *rrset*, the RRset to sign/validate. This can be a + ``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) + tuple. + + *rrsig*, a ``dns.rdata.Rdata``, the signature to validate, or the + signature template used when signing. + + *origin*, a ``dns.name.Name`` or ``None``, the origin to use for relative + names. + + Raises ``UnsupportedAlgorithm`` if the algorithm is recognized by + dnspython but not implemented. + """ + + if isinstance(origin, str): + origin = dns.name.from_text(origin, dns.name.root) + + signer = rrsig.signer + if not signer.is_absolute(): + if origin is None: + raise ValidationFailure("relative RR name without an origin specified") + signer = signer.derelativize(origin) + + # For convenience, allow the rrset to be specified as a (name, + # rdataset) tuple as well as a proper rrset + if isinstance(rrset, tuple): + rrname = rrset[0] + rdataset = rrset[1] + else: + rrname = rrset.name + rdataset = rrset + + data = b"" + data += rrsig.to_wire(origin=signer)[:18] + data += rrsig.signer.to_digestable(signer) + + # Derelativize the name before considering labels. + if not rrname.is_absolute(): + if origin is None: + raise ValidationFailure("relative RR name without an origin specified") + rrname = rrname.derelativize(origin) + + if len(rrname) - 1 < rrsig.labels: + raise ValidationFailure("owner name longer than RRSIG labels") + elif rrsig.labels < len(rrname) - 1: + suffix = rrname.split(rrsig.labels + 1)[1] + rrname = dns.name.from_text("*", suffix) + rrnamebuf = rrname.to_digestable() + rrfixed = struct.pack("!HHI", rdataset.rdtype, rdataset.rdclass, rrsig.original_ttl) + rdatas = [rdata.to_digestable(origin) for rdata in rdataset] + for rdata in sorted(rdatas): + data += rrnamebuf + data += rrfixed + rrlen = struct.pack("!H", len(rdata)) + data += rrlen + data += rdata + + return data + + +def _make_dnskey( + public_key: PublicKey, + algorithm: Union[int, str], + flags: int = Flag.ZONE, + protocol: int = 3, +) -> DNSKEY: + """Convert a public key to DNSKEY Rdata + + *public_key*, the public key to convert, a + ``cryptography.hazmat.primitives.asymmetric`` public key class applicable + for DNSSEC. + + *algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm. + + *flags: DNSKEY flags field as an integer. + + *protocol*: DNSKEY protocol field as an integer. + + Raises ``ValueError`` if the specified key algorithm parameters are not + unsupported, ``TypeError`` if the key type is unsupported, + `UnsupportedAlgorithm` if the algorithm is unknown and + `AlgorithmKeyMismatch` if the algorithm does not match the key type. + + Return DNSKEY ``Rdata``. + """ + + def encode_rsa_public_key(public_key: "rsa.RSAPublicKey") -> bytes: + """Encode a public key per RFC 3110, section 2.""" + pn = public_key.public_numbers() + _exp_len = math.ceil(int.bit_length(pn.e) / 8) + exp = int.to_bytes(pn.e, length=_exp_len, byteorder="big") + if _exp_len > 255: + exp_header = b"\0" + struct.pack("!H", _exp_len) + else: + exp_header = struct.pack("!B", _exp_len) + if pn.n.bit_length() < 512 or pn.n.bit_length() > 4096: + raise ValueError("unsupported RSA key length") + return exp_header + exp + pn.n.to_bytes((pn.n.bit_length() + 7) // 8, "big") + + def encode_dsa_public_key(public_key: "dsa.DSAPublicKey") -> bytes: + """Encode a public key per RFC 2536, section 2.""" + pn = public_key.public_numbers() + dsa_t = (public_key.key_size // 8 - 64) // 8 + if dsa_t > 8: + raise ValueError("unsupported DSA key size") + octets = 64 + dsa_t * 8 + res = struct.pack("!B", dsa_t) + res += pn.parameter_numbers.q.to_bytes(20, "big") + res += pn.parameter_numbers.p.to_bytes(octets, "big") + res += pn.parameter_numbers.g.to_bytes(octets, "big") + res += pn.y.to_bytes(octets, "big") + return res + + def encode_ecdsa_public_key(public_key: "ec.EllipticCurvePublicKey") -> bytes: + """Encode a public key per RFC 6605, section 4.""" + pn = public_key.public_numbers() + if isinstance(public_key.curve, ec.SECP256R1): + return pn.x.to_bytes(32, "big") + pn.y.to_bytes(32, "big") + elif isinstance(public_key.curve, ec.SECP384R1): + return pn.x.to_bytes(48, "big") + pn.y.to_bytes(48, "big") + else: + raise ValueError("unsupported ECDSA curve") + + try: + if isinstance(algorithm, str): + algorithm = Algorithm[algorithm.upper()] + except Exception: + raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) + + _ensure_algorithm_key_combination(algorithm, public_key) + + if isinstance(public_key, rsa.RSAPublicKey): + key_bytes = encode_rsa_public_key(public_key) + elif isinstance(public_key, dsa.DSAPublicKey): + key_bytes = encode_dsa_public_key(public_key) + elif isinstance(public_key, ec.EllipticCurvePublicKey): + key_bytes = encode_ecdsa_public_key(public_key) + elif isinstance(public_key, ed25519.Ed25519PublicKey): + key_bytes = public_key.public_bytes( + encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw + ) + elif isinstance(public_key, ed448.Ed448PublicKey): + key_bytes = public_key.public_bytes( + encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw + ) + else: + raise TypeError("unsupported key algorithm") + + return DNSKEY( + rdclass=dns.rdataclass.IN, + rdtype=dns.rdatatype.DNSKEY, + flags=flags, + protocol=protocol, + algorithm=algorithm, + key=key_bytes, + ) + + def nsec3_hash( domain: Union[dns.name.Name, str], salt: Optional[Union[str, bytes]], @@ -565,7 +902,7 @@ def _need_pyca(*args, **kwargs): try: from cryptography.exceptions import InvalidSignature from cryptography.hazmat.backends import default_backend - from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.asymmetric import utils from cryptography.hazmat.primitives.asymmetric import dsa @@ -576,10 +913,14 @@ try: except ImportError: # pragma: no cover validate = _need_pyca validate_rrsig = _need_pyca + sign = _need_pyca + make_dnskey = _need_pyca _have_pyca = False else: validate = _validate # type: ignore validate_rrsig = _validate_rrsig # type: ignore + sign = _sign + make_dnskey = _make_dnskey _have_pyca = True ### BEGIN generated Algorithm constants |