summaryrefslogtreecommitdiff
path: root/dns/dnssec.py
diff options
context:
space:
mode:
Diffstat (limited to 'dns/dnssec.py')
-rw-r--r--dns/dnssec.py419
1 files changed, 380 insertions, 39 deletions
diff --git a/dns/dnssec.py b/dns/dnssec.py
index 13415bd..c4aff95 100644
--- a/dns/dnssec.py
+++ b/dns/dnssec.py
@@ -20,9 +20,11 @@
from typing import Any, cast, Dict, List, Optional, Tuple, Union
import hashlib
+import math
import struct
import time
import base64
+from datetime import datetime, timezone
from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash
@@ -36,17 +38,37 @@ import dns.rdataclass
import dns.rrset
from dns.rdtypes.ANY.DNSKEY import DNSKEY
from dns.rdtypes.ANY.DS import DS
-from dns.rdtypes.ANY.RRSIG import RRSIG
+from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime
+from dns.rdtypes.dnskeybase import Flag
class UnsupportedAlgorithm(dns.exception.DNSException):
"""The DNSSEC algorithm is not supported."""
+class AlgorithmKeyMismatch(UnsupportedAlgorithm):
+ """The DNSSEC algorithm is not supported for the given key type."""
+
+
class ValidationFailure(dns.exception.DNSException):
"""The DNSSEC signature is invalid."""
+PublicKey = Union[
+ "rsa.RSAPublicKey",
+ "ec.EllipticCurvePublicKey",
+ "ed25519.Ed25519PublicKey",
+ "ed448.Ed448PublicKey",
+]
+
+PrivateKey = Union[
+ "rsa.RSAPrivateKey",
+ "ec.EllipticCurvePrivateKey",
+ "ed25519.Ed25519PrivateKey",
+ "ed448.Ed448PrivateKey",
+]
+
+
def algorithm_from_text(text: str) -> Algorithm:
"""Convert text into a DNSSEC algorithm value.
@@ -69,6 +91,20 @@ def algorithm_to_text(value: Union[Algorithm, int]) -> str:
return Algorithm.to_text(value)
+def to_timestamp(value: Union[datetime, str, float, int]) -> int:
+ """Convert various format to a timestamp"""
+ if isinstance(value, datetime):
+ return int(value.timestamp())
+ elif isinstance(value, str):
+ return sigtime_to_posixtime(value)
+ elif isinstance(value, float):
+ return int(value)
+ elif isinstance(value, int):
+ return value
+ else:
+ raise TypeError("Unsupported timestamp type")
+
+
def key_id(key: DNSKEY) -> int:
"""Return the key id (a 16-bit number) for the specified key.
@@ -213,6 +249,35 @@ def _is_sha512(algorithm: int) -> bool:
return algorithm == Algorithm.RSASHA512
+def _ensure_algorithm_key_combination(algorithm: int, key: PublicKey) -> None:
+ """Ensure algorithm is valid for key type, throwing an exception on
+ mismatch."""
+ if isinstance(key, rsa.RSAPublicKey):
+ if _is_rsa(algorithm):
+ return
+ raise AlgorithmKeyMismatch('algorithm "%s" not valid for RSA key' % algorithm)
+ if isinstance(key, dsa.DSAPublicKey):
+ if _is_dsa(algorithm):
+ return
+ raise AlgorithmKeyMismatch('algorithm "%s" not valid for DSA key' % algorithm)
+ if isinstance(key, ec.EllipticCurvePublicKey):
+ if _is_ecdsa(algorithm):
+ return
+ raise AlgorithmKeyMismatch('algorithm "%s" not valid for ECDSA key' % algorithm)
+ if isinstance(key, ed25519.Ed25519PublicKey):
+ if algorithm == Algorithm.ED25519:
+ return
+ raise AlgorithmKeyMismatch(
+ 'algorithm "%s" not valid for ED25519 key' % algorithm
+ )
+ if isinstance(key, ed448.Ed448PublicKey):
+ if algorithm == Algorithm.ED448:
+ return
+ raise AlgorithmKeyMismatch('algorithm "%s" not valid for ED448 key' % algorithm)
+
+ raise TypeError("unsupported key type")
+
+
def _make_hash(algorithm: int) -> Any:
if _is_md5(algorithm):
return hashes.MD5()
@@ -353,22 +418,10 @@ def _validate_rrsig(
dnspython but not implemented.
"""
- if isinstance(origin, str):
- origin = dns.name.from_text(origin, dns.name.root)
-
candidate_keys = _find_candidate_keys(keys, rrsig)
if candidate_keys is None:
raise ValidationFailure("unknown key")
- # For convenience, allow the rrset to be specified as a (name,
- # rdataset) tuple as well as a proper rrset
- if isinstance(rrset, tuple):
- rrname = rrset[0]
- rdataset = rrset[1]
- else:
- rrname = rrset.name
- rdataset = rrset
-
if now is None:
now = time.time()
if rrsig.expiration < now:
@@ -391,31 +444,7 @@ def _validate_rrsig(
else:
sig = rrsig.signature
- data = b""
- data += rrsig.to_wire(origin=origin)[:18]
- data += rrsig.signer.to_digestable(origin)
-
- # Derelativize the name before considering labels.
- if not rrname.is_absolute():
- if origin is None:
- raise ValidationFailure("relative RR name without an origin specified")
- rrname = rrname.derelativize(origin)
-
- if len(rrname) - 1 < rrsig.labels:
- raise ValidationFailure("owner name longer than RRSIG labels")
- elif rrsig.labels < len(rrname) - 1:
- suffix = rrname.split(rrsig.labels + 1)[1]
- rrname = dns.name.from_text("*", suffix)
- rrnamebuf = rrname.to_digestable()
- rrfixed = struct.pack("!HHI", rdataset.rdtype, rdataset.rdclass, rrsig.original_ttl)
- rdatas = [rdata.to_digestable(origin) for rdata in rdataset]
- for rdata in sorted(rdatas):
- data += rrnamebuf
- data += rrfixed
- rrlen = struct.pack("!H", len(rdata))
- data += rrlen
- data += rdata
-
+ data = _make_rrsig_signature_data(rrset, rrsig, origin)
chosen_hash = _make_hash(rrsig.algorithm)
for candidate_key in candidate_keys:
@@ -495,6 +524,314 @@ def _validate(
raise ValidationFailure("no RRSIGs validated")
+def _sign(
+ rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+ private_key: PrivateKey,
+ signer: dns.name.Name,
+ dnskey: DNSKEY,
+ inception: Optional[Union[datetime, str, float]] = None,
+ expiration: Optional[Union[datetime, str, float]] = None,
+ lifetime: Optional[int] = None,
+ verify: bool = False,
+) -> RRSIG:
+ """Sign RRset using private key.
+
+ *rrset*, the RRset to validate. This can be a
+ ``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``)
+ tuple.
+
+ *private_key*, the private key to use for signing, a
+ ``cryptography.hazmat.primitives.asymmetric`` private key class applicable
+ for DNSSEC.
+
+ *signer*, a ``dns.name.Name``, the Signer's name.
+
+ *dnskey*, a ``DNSKEY`` matching ``private_key``.
+
+ *inception*, a ``datetime``, ``str``, or ``float``, signature inception; defaults to now.
+
+ *expiration*, a ``datetime``, ``str`` or ``float``, signature expiration. May be specified as lifetime.
+
+ *lifetime*, an ``int`` specifiying the signature lifetime in seconds.
+
+ *verify*, a ``bool`` set to ``True`` if the signer should verify issued signaures.
+ """
+
+ if isinstance(rrset, tuple):
+ rdclass = rrset[1].rdclass
+ rdtype = rrset[1].rdtype
+ rrname = rrset[0]
+ original_ttl = rrset[1].ttl
+ else:
+ rdclass = rrset.rdclass
+ rdtype = rrset.rdtype
+ rrname = rrset.name
+ original_ttl = rrset.ttl
+
+ if inception is not None:
+ rrsig_inception = to_timestamp(inception)
+ else:
+ rrsig_inception = int(time.time())
+
+ if expiration is not None:
+ rrsig_expiration = to_timestamp(expiration)
+ elif lifetime is not None:
+ rrsig_expiration = int(time.time()) + lifetime
+ else:
+ raise ValueError("expiration or lifetime must be specified")
+
+ rrsig_template = RRSIG(
+ rdclass=rdclass,
+ rdtype=dns.rdatatype.RRSIG,
+ type_covered=rdtype,
+ algorithm=dnskey.algorithm,
+ labels=len(rrname) - 1,
+ original_ttl=original_ttl,
+ expiration=rrsig_expiration,
+ inception=rrsig_inception,
+ key_tag=key_id(dnskey),
+ signer=signer,
+ signature=b"",
+ )
+
+ data = dns.dnssec._make_rrsig_signature_data(rrset, rrsig_template)
+ chosen_hash = _make_hash(rrsig_template.algorithm)
+ signature = None
+
+ if isinstance(private_key, rsa.RSAPrivateKey):
+ if not _is_rsa(dnskey.algorithm):
+ raise ValueError("Invalid DNSKEY algorithm for RSA key")
+ signature = private_key.sign(data, padding.PKCS1v15(), chosen_hash)
+ if verify:
+ private_key.public_key().verify(
+ signature, data, padding.PKCS1v15(), chosen_hash
+ )
+ elif isinstance(private_key, dsa.DSAPrivateKey):
+ if not _is_dsa(dnskey.algorithm):
+ raise ValueError("Invalid DNSKEY algorithm for DSA key")
+ public_dsa_key = private_key.public_key()
+ if public_dsa_key.key_size > 1024:
+ raise ValueError("DSA key size overflow")
+ der_signature = private_key.sign(data, chosen_hash)
+ if verify:
+ public_dsa_key.verify(der_signature, data, chosen_hash)
+ dsa_r, dsa_s = utils.decode_dss_signature(der_signature)
+ dsa_t = (public_dsa_key.key_size // 8 - 64) // 8
+ octets = 20
+ signature = (
+ struct.pack("!B", dsa_t)
+ + int.to_bytes(dsa_r, length=octets, byteorder="big")
+ + int.to_bytes(dsa_s, length=octets, byteorder="big")
+ )
+ elif isinstance(private_key, ec.EllipticCurvePrivateKey):
+ if not _is_ecdsa(dnskey.algorithm):
+ raise ValueError("Invalid DNSKEY algorithm for EC key")
+ der_signature = private_key.sign(data, ec.ECDSA(chosen_hash))
+ if verify:
+ private_key.public_key().verify(der_signature, data, ec.ECDSA(chosen_hash))
+ if dnskey.algorithm == Algorithm.ECDSAP256SHA256:
+ octets = 32
+ else:
+ octets = 48
+ dsa_r, dsa_s = utils.decode_dss_signature(der_signature)
+ signature = int.to_bytes(dsa_r, length=octets, byteorder="big") + int.to_bytes(
+ dsa_s, length=octets, byteorder="big"
+ )
+ elif isinstance(private_key, ed25519.Ed25519PrivateKey):
+ if dnskey.algorithm != Algorithm.ED25519:
+ raise ValueError("Invalid DNSKEY algorithm for ED25519 key")
+ signature = private_key.sign(data)
+ if verify:
+ private_key.public_key().verify(signature, data)
+ elif isinstance(private_key, ed448.Ed448PrivateKey):
+ if dnskey.algorithm != Algorithm.ED448:
+ raise ValueError("Invalid DNSKEY algorithm for ED448 key")
+ signature = private_key.sign(data)
+ if verify:
+ private_key.public_key().verify(signature, data)
+ else:
+ raise TypeError("Unsupported key algorithm")
+
+ return RRSIG(
+ rdclass=rrsig_template.rdclass,
+ rdtype=rrsig_template.rdtype,
+ type_covered=rrsig_template.type_covered,
+ algorithm=rrsig_template.algorithm,
+ labels=rrsig_template.labels,
+ original_ttl=rrsig_template.original_ttl,
+ expiration=rrsig_template.expiration,
+ inception=rrsig_template.inception,
+ key_tag=rrsig_template.key_tag,
+ signer=rrsig_template.signer,
+ signature=signature,
+ )
+
+
+def _make_rrsig_signature_data(
+ rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+ rrsig: RRSIG,
+ origin: Optional[dns.name.Name] = None,
+) -> bytes:
+ """Create signature rdata.
+
+ *rrset*, the RRset to sign/validate. This can be a
+ ``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``)
+ tuple.
+
+ *rrsig*, a ``dns.rdata.Rdata``, the signature to validate, or the
+ signature template used when signing.
+
+ *origin*, a ``dns.name.Name`` or ``None``, the origin to use for relative
+ names.
+
+ Raises ``UnsupportedAlgorithm`` if the algorithm is recognized by
+ dnspython but not implemented.
+ """
+
+ if isinstance(origin, str):
+ origin = dns.name.from_text(origin, dns.name.root)
+
+ signer = rrsig.signer
+ if not signer.is_absolute():
+ if origin is None:
+ raise ValidationFailure("relative RR name without an origin specified")
+ signer = signer.derelativize(origin)
+
+ # For convenience, allow the rrset to be specified as a (name,
+ # rdataset) tuple as well as a proper rrset
+ if isinstance(rrset, tuple):
+ rrname = rrset[0]
+ rdataset = rrset[1]
+ else:
+ rrname = rrset.name
+ rdataset = rrset
+
+ data = b""
+ data += rrsig.to_wire(origin=signer)[:18]
+ data += rrsig.signer.to_digestable(signer)
+
+ # Derelativize the name before considering labels.
+ if not rrname.is_absolute():
+ if origin is None:
+ raise ValidationFailure("relative RR name without an origin specified")
+ rrname = rrname.derelativize(origin)
+
+ if len(rrname) - 1 < rrsig.labels:
+ raise ValidationFailure("owner name longer than RRSIG labels")
+ elif rrsig.labels < len(rrname) - 1:
+ suffix = rrname.split(rrsig.labels + 1)[1]
+ rrname = dns.name.from_text("*", suffix)
+ rrnamebuf = rrname.to_digestable()
+ rrfixed = struct.pack("!HHI", rdataset.rdtype, rdataset.rdclass, rrsig.original_ttl)
+ rdatas = [rdata.to_digestable(origin) for rdata in rdataset]
+ for rdata in sorted(rdatas):
+ data += rrnamebuf
+ data += rrfixed
+ rrlen = struct.pack("!H", len(rdata))
+ data += rrlen
+ data += rdata
+
+ return data
+
+
+def _make_dnskey(
+ public_key: PublicKey,
+ algorithm: Union[int, str],
+ flags: int = Flag.ZONE,
+ protocol: int = 3,
+) -> DNSKEY:
+ """Convert a public key to DNSKEY Rdata
+
+ *public_key*, the public key to convert, a
+ ``cryptography.hazmat.primitives.asymmetric`` public key class applicable
+ for DNSSEC.
+
+ *algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm.
+
+ *flags: DNSKEY flags field as an integer.
+
+ *protocol*: DNSKEY protocol field as an integer.
+
+ Raises ``ValueError`` if the specified key algorithm parameters are not
+ unsupported, ``TypeError`` if the key type is unsupported,
+ `UnsupportedAlgorithm` if the algorithm is unknown and
+ `AlgorithmKeyMismatch` if the algorithm does not match the key type.
+
+ Return DNSKEY ``Rdata``.
+ """
+
+ def encode_rsa_public_key(public_key: "rsa.RSAPublicKey") -> bytes:
+ """Encode a public key per RFC 3110, section 2."""
+ pn = public_key.public_numbers()
+ _exp_len = math.ceil(int.bit_length(pn.e) / 8)
+ exp = int.to_bytes(pn.e, length=_exp_len, byteorder="big")
+ if _exp_len > 255:
+ exp_header = b"\0" + struct.pack("!H", _exp_len)
+ else:
+ exp_header = struct.pack("!B", _exp_len)
+ if pn.n.bit_length() < 512 or pn.n.bit_length() > 4096:
+ raise ValueError("unsupported RSA key length")
+ return exp_header + exp + pn.n.to_bytes((pn.n.bit_length() + 7) // 8, "big")
+
+ def encode_dsa_public_key(public_key: "dsa.DSAPublicKey") -> bytes:
+ """Encode a public key per RFC 2536, section 2."""
+ pn = public_key.public_numbers()
+ dsa_t = (public_key.key_size // 8 - 64) // 8
+ if dsa_t > 8:
+ raise ValueError("unsupported DSA key size")
+ octets = 64 + dsa_t * 8
+ res = struct.pack("!B", dsa_t)
+ res += pn.parameter_numbers.q.to_bytes(20, "big")
+ res += pn.parameter_numbers.p.to_bytes(octets, "big")
+ res += pn.parameter_numbers.g.to_bytes(octets, "big")
+ res += pn.y.to_bytes(octets, "big")
+ return res
+
+ def encode_ecdsa_public_key(public_key: "ec.EllipticCurvePublicKey") -> bytes:
+ """Encode a public key per RFC 6605, section 4."""
+ pn = public_key.public_numbers()
+ if isinstance(public_key.curve, ec.SECP256R1):
+ return pn.x.to_bytes(32, "big") + pn.y.to_bytes(32, "big")
+ elif isinstance(public_key.curve, ec.SECP384R1):
+ return pn.x.to_bytes(48, "big") + pn.y.to_bytes(48, "big")
+ else:
+ raise ValueError("unsupported ECDSA curve")
+
+ try:
+ if isinstance(algorithm, str):
+ algorithm = Algorithm[algorithm.upper()]
+ except Exception:
+ raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)
+
+ _ensure_algorithm_key_combination(algorithm, public_key)
+
+ if isinstance(public_key, rsa.RSAPublicKey):
+ key_bytes = encode_rsa_public_key(public_key)
+ elif isinstance(public_key, dsa.DSAPublicKey):
+ key_bytes = encode_dsa_public_key(public_key)
+ elif isinstance(public_key, ec.EllipticCurvePublicKey):
+ key_bytes = encode_ecdsa_public_key(public_key)
+ elif isinstance(public_key, ed25519.Ed25519PublicKey):
+ key_bytes = public_key.public_bytes(
+ encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw
+ )
+ elif isinstance(public_key, ed448.Ed448PublicKey):
+ key_bytes = public_key.public_bytes(
+ encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw
+ )
+ else:
+ raise TypeError("unsupported key algorithm")
+
+ return DNSKEY(
+ rdclass=dns.rdataclass.IN,
+ rdtype=dns.rdatatype.DNSKEY,
+ flags=flags,
+ protocol=protocol,
+ algorithm=algorithm,
+ key=key_bytes,
+ )
+
+
def nsec3_hash(
domain: Union[dns.name.Name, str],
salt: Optional[Union[str, bytes]],
@@ -565,7 +902,7 @@ def _need_pyca(*args, **kwargs):
try:
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import utils
from cryptography.hazmat.primitives.asymmetric import dsa
@@ -576,10 +913,14 @@ try:
except ImportError: # pragma: no cover
validate = _need_pyca
validate_rrsig = _need_pyca
+ sign = _need_pyca
+ make_dnskey = _need_pyca
_have_pyca = False
else:
validate = _validate # type: ignore
validate_rrsig = _validate_rrsig # type: ignore
+ sign = _sign
+ make_dnskey = _make_dnskey
_have_pyca = True
### BEGIN generated Algorithm constants