summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBob Halley <halley@dnspython.org>2021-02-18 10:05:54 -0800
committerGitHub <noreply@github.com>2021-02-18 10:05:54 -0800
commit5e1a63047f5b0069e3992309fc2bafb0c0acfc1e (patch)
tree1e0c2746eb09519f8724962996eb29c6fe59e9cd
parent9ed0320f1f150065edfbd818be939d1b8d1729c3 (diff)
parent5bf70eab23bbade7473024e6146f8b757e8805fb (diff)
downloaddnspython-5e1a63047f5b0069e3992309fc2bafb0c0acfc1e.tar.gz
Merge pull request #634 from bwelling/refactor-dnssec
DNSSEC refactoring.
-rw-r--r--dns/dnssec.py293
1 files changed, 141 insertions, 152 deletions
diff --git a/dns/dnssec.py b/dns/dnssec.py
index 095fabd..f09ecd6 100644
--- a/dns/dnssec.py
+++ b/dns/dnssec.py
@@ -166,23 +166,15 @@ def make_ds(name, key, algorithm, origin=None):
def _find_candidate_keys(keys, rrsig):
- candidate_keys = []
value = keys.get(rrsig.signer)
- if value is None:
- return None
if isinstance(value, dns.node.Node):
- try:
- rdataset = value.find_rdataset(dns.rdataclass.IN,
- dns.rdatatype.DNSKEY)
- except KeyError:
- return None
+ rdataset = value.get_rdataset(dns.rdataclass.IN, dns.rdatatype.DNSKEY)
else:
rdataset = value
- for rdata in rdataset:
- if rdata.algorithm == rrsig.algorithm and \
- key_id(rdata) == rrsig.key_tag:
- candidate_keys.append(rdata)
- return candidate_keys
+ if rdataset is None:
+ return None
+ return [rd for rd in rdataset if
+ rd.algorithm == rrsig.algorithm and key_id(rd) == rrsig.key_tag]
def _is_rsa(algorithm):
@@ -251,6 +243,82 @@ def _bytes_to_long(b):
return int.from_bytes(b, 'big')
+def _validate_signature(sig, data, key, chosen_hash):
+ if _is_rsa(key.algorithm):
+ keyptr = key.key
+ (bytes_,) = struct.unpack('!B', keyptr[0:1])
+ keyptr = keyptr[1:]
+ if bytes_ == 0:
+ (bytes_,) = struct.unpack('!H', keyptr[0:2])
+ keyptr = keyptr[2:]
+ rsa_e = keyptr[0:bytes_]
+ rsa_n = keyptr[bytes_:]
+ try:
+ public_key = rsa.RSAPublicNumbers(
+ _bytes_to_long(rsa_e),
+ _bytes_to_long(rsa_n)).public_key(default_backend())
+ except ValueError:
+ raise ValidationFailure('invalid public key')
+ public_key.verify(sig, data, padding.PKCS1v15(), chosen_hash)
+ elif _is_dsa(key.algorithm):
+ keyptr = key.key
+ (t,) = struct.unpack('!B', keyptr[0:1])
+ keyptr = keyptr[1:]
+ octets = 64 + t * 8
+ dsa_q = keyptr[0:20]
+ keyptr = keyptr[20:]
+ dsa_p = keyptr[0:octets]
+ keyptr = keyptr[octets:]
+ dsa_g = keyptr[0:octets]
+ keyptr = keyptr[octets:]
+ dsa_y = keyptr[0:octets]
+ try:
+ public_key = dsa.DSAPublicNumbers(
+ _bytes_to_long(dsa_y),
+ dsa.DSAParameterNumbers(
+ _bytes_to_long(dsa_p),
+ _bytes_to_long(dsa_q),
+ _bytes_to_long(dsa_g))).public_key(default_backend())
+ except ValueError:
+ raise ValidationFailure('invalid public key')
+ public_key.verify(sig, data, chosen_hash)
+ elif _is_ecdsa(key.algorithm):
+ keyptr = key.key
+ if key.algorithm == Algorithm.ECDSAP256SHA256:
+ curve = ec.SECP256R1()
+ octets = 32
+ else:
+ curve = ec.SECP384R1()
+ octets = 48
+ ecdsa_x = keyptr[0:octets]
+ ecdsa_y = keyptr[octets:octets * 2]
+ try:
+ public_key = ec.EllipticCurvePublicNumbers(
+ curve=curve,
+ x=_bytes_to_long(ecdsa_x),
+ y=_bytes_to_long(ecdsa_y)).public_key(default_backend())
+ except ValueError:
+ raise ValidationFailure('invalid public key')
+ public_key.verify(sig, data, ec.ECDSA(chosen_hash))
+ elif _is_eddsa(key.algorithm):
+ keyptr = key.key
+ if key.algorithm == Algorithm.ED25519:
+ loader = ed25519.Ed25519PublicKey
+ else:
+ loader = ed448.Ed448PublicKey
+ try:
+ public_key = loader.from_public_bytes(keyptr)
+ except ValueError:
+ raise ValidationFailure('invalid public key')
+ public_key.verify(sig, data)
+ elif _is_gost(key.algorithm):
+ raise UnsupportedAlgorithm(
+ 'algorithm "%s" not supported by dnspython' %
+ algorithm_to_text(key.algorithm))
+ else:
+ raise ValidationFailure('unknown algorithm %u' % key.algorithm)
+
+
def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
"""Validate an RRset against a single signature rdata, throwing an
exception if validation is not successful.
@@ -288,148 +356,69 @@ def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
if candidate_keys is None:
raise ValidationFailure('unknown key')
- for candidate_key in candidate_keys:
- # For convenience, allow the rrset to be specified as a (name,
- # rdataset) tuple as well as a proper rrset
- if isinstance(rrset, tuple):
- rrname = rrset[0]
- rdataset = rrset[1]
- else:
- rrname = rrset.name
- rdataset = rrset
-
- if now is None:
- now = time.time()
- if rrsig.expiration < now:
- raise ValidationFailure('expired')
- if rrsig.inception > now:
- raise ValidationFailure('not yet valid')
-
- if _is_rsa(rrsig.algorithm):
- keyptr = candidate_key.key
- (bytes_,) = struct.unpack('!B', keyptr[0:1])
- keyptr = keyptr[1:]
- if bytes_ == 0:
- (bytes_,) = struct.unpack('!H', keyptr[0:2])
- keyptr = keyptr[2:]
- rsa_e = keyptr[0:bytes_]
- rsa_n = keyptr[bytes_:]
- try:
- public_key = rsa.RSAPublicNumbers(
- _bytes_to_long(rsa_e),
- _bytes_to_long(rsa_n)).public_key(default_backend())
- except ValueError:
- raise ValidationFailure('invalid public key')
- sig = rrsig.signature
- elif _is_dsa(rrsig.algorithm):
- keyptr = candidate_key.key
- (t,) = struct.unpack('!B', keyptr[0:1])
- keyptr = keyptr[1:]
- octets = 64 + t * 8
- dsa_q = keyptr[0:20]
- keyptr = keyptr[20:]
- dsa_p = keyptr[0:octets]
- keyptr = keyptr[octets:]
- dsa_g = keyptr[0:octets]
- keyptr = keyptr[octets:]
- dsa_y = keyptr[0:octets]
- try:
- public_key = dsa.DSAPublicNumbers(
- _bytes_to_long(dsa_y),
- dsa.DSAParameterNumbers(
- _bytes_to_long(dsa_p),
- _bytes_to_long(dsa_q),
- _bytes_to_long(dsa_g))).public_key(default_backend())
- except ValueError:
- raise ValidationFailure('invalid public key')
- sig_r = rrsig.signature[1:21]
- sig_s = rrsig.signature[21:]
- sig = utils.encode_dss_signature(_bytes_to_long(sig_r),
- _bytes_to_long(sig_s))
- elif _is_ecdsa(rrsig.algorithm):
- keyptr = candidate_key.key
- if rrsig.algorithm == Algorithm.ECDSAP256SHA256:
- curve = ec.SECP256R1()
- octets = 32
- else:
- curve = ec.SECP384R1()
- octets = 48
- ecdsa_x = keyptr[0:octets]
- ecdsa_y = keyptr[octets:octets * 2]
- try:
- public_key = ec.EllipticCurvePublicNumbers(
- curve=curve,
- x=_bytes_to_long(ecdsa_x),
- y=_bytes_to_long(ecdsa_y)).public_key(default_backend())
- except ValueError:
- raise ValidationFailure('invalid public key')
- sig_r = rrsig.signature[0:octets]
- sig_s = rrsig.signature[octets:]
- sig = utils.encode_dss_signature(_bytes_to_long(sig_r),
- _bytes_to_long(sig_s))
-
- elif _is_eddsa(rrsig.algorithm):
- keyptr = candidate_key.key
- if rrsig.algorithm == Algorithm.ED25519:
- loader = ed25519.Ed25519PublicKey
- else:
- loader = ed448.Ed448PublicKey
- try:
- public_key = loader.from_public_bytes(keyptr)
- except ValueError:
- raise ValidationFailure('invalid public key')
- sig = rrsig.signature
- elif _is_gost(rrsig.algorithm):
- raise UnsupportedAlgorithm(
- 'algorithm "%s" not supported by dnspython' %
- algorithm_to_text(rrsig.algorithm))
+ # For convenience, allow the rrset to be specified as a (name,
+ # rdataset) tuple as well as a proper rrset
+ if isinstance(rrset, tuple):
+ rrname = rrset[0]
+ rdataset = rrset[1]
+ else:
+ rrname = rrset.name
+ rdataset = rrset
+
+ if now is None:
+ now = time.time()
+ if rrsig.expiration < now:
+ raise ValidationFailure('expired')
+ if rrsig.inception > now:
+ raise ValidationFailure('not yet valid')
+
+ if _is_dsa(rrsig.algorithm):
+ sig_r = rrsig.signature[1:21]
+ sig_s = rrsig.signature[21:]
+ sig = utils.encode_dss_signature(_bytes_to_long(sig_r),
+ _bytes_to_long(sig_s))
+ elif _is_ecdsa(rrsig.algorithm):
+ if rrsig.algorithm == Algorithm.ECDSAP256SHA256:
+ octets = 32
else:
- raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)
-
- data = b''
- data += rrsig.to_wire(origin=origin)[:18]
- data += rrsig.signer.to_digestable(origin)
-
- # Derelativize the name before considering labels.
- rrname = rrname.derelativize(origin)
-
- if len(rrname) - 1 < rrsig.labels:
- raise ValidationFailure('owner name longer than RRSIG labels')
- elif rrsig.labels < len(rrname) - 1:
- suffix = rrname.split(rrsig.labels + 1)[1]
- rrname = dns.name.from_text('*', suffix)
- rrnamebuf = rrname.to_digestable()
- rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
- rrsig.original_ttl)
- rrlist = sorted(rdataset)
- for rr in rrlist:
- data += rrnamebuf
- data += rrfixed
- rrdata = rr.to_digestable(origin)
- rrlen = struct.pack('!H', len(rrdata))
- data += rrlen
- data += rrdata
-
- chosen_hash = _make_hash(rrsig.algorithm)
+ octets = 48
+ sig_r = rrsig.signature[0:octets]
+ sig_s = rrsig.signature[octets:]
+ sig = utils.encode_dss_signature(_bytes_to_long(sig_r),
+ _bytes_to_long(sig_s))
+ else:
+ sig = rrsig.signature
+
+ data = b''
+ data += rrsig.to_wire(origin=origin)[:18]
+ data += rrsig.signer.to_digestable(origin)
+
+ # Derelativize the name before considering labels.
+ rrname = rrname.derelativize(origin)
+
+ if len(rrname) - 1 < rrsig.labels:
+ raise ValidationFailure('owner name longer than RRSIG labels')
+ elif rrsig.labels < len(rrname) - 1:
+ suffix = rrname.split(rrsig.labels + 1)[1]
+ rrname = dns.name.from_text('*', suffix)
+ rrnamebuf = rrname.to_digestable()
+ rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
+ rrsig.original_ttl)
+ for rr in sorted(rdataset):
+ data += rrnamebuf
+ data += rrfixed
+ rrdata = rr.to_digestable(origin)
+ rrlen = struct.pack('!H', len(rrdata))
+ data += rrlen
+ data += rrdata
+
+ chosen_hash = _make_hash(rrsig.algorithm)
+
+ for candidate_key in candidate_keys:
try:
- if _is_rsa(rrsig.algorithm):
- public_key.verify(sig, data, padding.PKCS1v15(), chosen_hash)
- elif _is_dsa(rrsig.algorithm):
- public_key.verify(sig, data, chosen_hash)
- elif _is_ecdsa(rrsig.algorithm):
- public_key.verify(sig, data, ec.ECDSA(chosen_hash))
- elif _is_eddsa(rrsig.algorithm):
- public_key.verify(sig, data)
- else:
- # Raise here for code clarity; this won't actually ever happen
- # since if the algorithm is really unknown we'd already have
- # raised an exception above
- raise ValidationFailure('unknown algorithm %u' %
- rrsig.algorithm) # pragma: no cover
- # If we got here, we successfully verified so we can return
- # without error
+ _validate_signature(sig, data, candidate_key, chosen_hash)
return
- except InvalidSignature:
+ except (InvalidSignature, ValidationFailure):
# this happens on an individual validation failure
continue
# nothing verified -- raise failure: