diff options
Diffstat (limited to 'dns/dnssec.py')
-rw-r--r-- | dns/dnssec.py | 293 |
1 files changed, 141 insertions, 152 deletions
diff --git a/dns/dnssec.py b/dns/dnssec.py index 095fabd..f09ecd6 100644 --- a/dns/dnssec.py +++ b/dns/dnssec.py @@ -166,23 +166,15 @@ def make_ds(name, key, algorithm, origin=None): def _find_candidate_keys(keys, rrsig): - candidate_keys = [] value = keys.get(rrsig.signer) - if value is None: - return None if isinstance(value, dns.node.Node): - try: - rdataset = value.find_rdataset(dns.rdataclass.IN, - dns.rdatatype.DNSKEY) - except KeyError: - return None + rdataset = value.get_rdataset(dns.rdataclass.IN, dns.rdatatype.DNSKEY) else: rdataset = value - for rdata in rdataset: - if rdata.algorithm == rrsig.algorithm and \ - key_id(rdata) == rrsig.key_tag: - candidate_keys.append(rdata) - return candidate_keys + if rdataset is None: + return None + return [rd for rd in rdataset if + rd.algorithm == rrsig.algorithm and key_id(rd) == rrsig.key_tag] def _is_rsa(algorithm): @@ -251,6 +243,82 @@ def _bytes_to_long(b): return int.from_bytes(b, 'big') +def _validate_signature(sig, data, key, chosen_hash): + if _is_rsa(key.algorithm): + keyptr = key.key + (bytes_,) = struct.unpack('!B', keyptr[0:1]) + keyptr = keyptr[1:] + if bytes_ == 0: + (bytes_,) = struct.unpack('!H', keyptr[0:2]) + keyptr = keyptr[2:] + rsa_e = keyptr[0:bytes_] + rsa_n = keyptr[bytes_:] + try: + public_key = rsa.RSAPublicNumbers( + _bytes_to_long(rsa_e), + _bytes_to_long(rsa_n)).public_key(default_backend()) + except ValueError: + raise ValidationFailure('invalid public key') + public_key.verify(sig, data, padding.PKCS1v15(), chosen_hash) + elif _is_dsa(key.algorithm): + keyptr = key.key + (t,) = struct.unpack('!B', keyptr[0:1]) + keyptr = keyptr[1:] + octets = 64 + t * 8 + dsa_q = keyptr[0:20] + keyptr = keyptr[20:] + dsa_p = keyptr[0:octets] + keyptr = keyptr[octets:] + dsa_g = keyptr[0:octets] + keyptr = keyptr[octets:] + dsa_y = keyptr[0:octets] + try: + public_key = dsa.DSAPublicNumbers( + _bytes_to_long(dsa_y), + dsa.DSAParameterNumbers( + _bytes_to_long(dsa_p), + _bytes_to_long(dsa_q), + _bytes_to_long(dsa_g))).public_key(default_backend()) + except ValueError: + raise ValidationFailure('invalid public key') + public_key.verify(sig, data, chosen_hash) + elif _is_ecdsa(key.algorithm): + keyptr = key.key + if key.algorithm == Algorithm.ECDSAP256SHA256: + curve = ec.SECP256R1() + octets = 32 + else: + curve = ec.SECP384R1() + octets = 48 + ecdsa_x = keyptr[0:octets] + ecdsa_y = keyptr[octets:octets * 2] + try: + public_key = ec.EllipticCurvePublicNumbers( + curve=curve, + x=_bytes_to_long(ecdsa_x), + y=_bytes_to_long(ecdsa_y)).public_key(default_backend()) + except ValueError: + raise ValidationFailure('invalid public key') + public_key.verify(sig, data, ec.ECDSA(chosen_hash)) + elif _is_eddsa(key.algorithm): + keyptr = key.key + if key.algorithm == Algorithm.ED25519: + loader = ed25519.Ed25519PublicKey + else: + loader = ed448.Ed448PublicKey + try: + public_key = loader.from_public_bytes(keyptr) + except ValueError: + raise ValidationFailure('invalid public key') + public_key.verify(sig, data) + elif _is_gost(key.algorithm): + raise UnsupportedAlgorithm( + 'algorithm "%s" not supported by dnspython' % + algorithm_to_text(key.algorithm)) + else: + raise ValidationFailure('unknown algorithm %u' % key.algorithm) + + def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None): """Validate an RRset against a single signature rdata, throwing an exception if validation is not successful. @@ -288,148 +356,69 @@ def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None): if candidate_keys is None: raise ValidationFailure('unknown key') - for candidate_key in candidate_keys: - # For convenience, allow the rrset to be specified as a (name, - # rdataset) tuple as well as a proper rrset - if isinstance(rrset, tuple): - rrname = rrset[0] - rdataset = rrset[1] - else: - rrname = rrset.name - rdataset = rrset - - if now is None: - now = time.time() - if rrsig.expiration < now: - raise ValidationFailure('expired') - if rrsig.inception > now: - raise ValidationFailure('not yet valid') - - if _is_rsa(rrsig.algorithm): - keyptr = candidate_key.key - (bytes_,) = struct.unpack('!B', keyptr[0:1]) - keyptr = keyptr[1:] - if bytes_ == 0: - (bytes_,) = struct.unpack('!H', keyptr[0:2]) - keyptr = keyptr[2:] - rsa_e = keyptr[0:bytes_] - rsa_n = keyptr[bytes_:] - try: - public_key = rsa.RSAPublicNumbers( - _bytes_to_long(rsa_e), - _bytes_to_long(rsa_n)).public_key(default_backend()) - except ValueError: - raise ValidationFailure('invalid public key') - sig = rrsig.signature - elif _is_dsa(rrsig.algorithm): - keyptr = candidate_key.key - (t,) = struct.unpack('!B', keyptr[0:1]) - keyptr = keyptr[1:] - octets = 64 + t * 8 - dsa_q = keyptr[0:20] - keyptr = keyptr[20:] - dsa_p = keyptr[0:octets] - keyptr = keyptr[octets:] - dsa_g = keyptr[0:octets] - keyptr = keyptr[octets:] - dsa_y = keyptr[0:octets] - try: - public_key = dsa.DSAPublicNumbers( - _bytes_to_long(dsa_y), - dsa.DSAParameterNumbers( - _bytes_to_long(dsa_p), - _bytes_to_long(dsa_q), - _bytes_to_long(dsa_g))).public_key(default_backend()) - except ValueError: - raise ValidationFailure('invalid public key') - sig_r = rrsig.signature[1:21] - sig_s = rrsig.signature[21:] - sig = utils.encode_dss_signature(_bytes_to_long(sig_r), - _bytes_to_long(sig_s)) - elif _is_ecdsa(rrsig.algorithm): - keyptr = candidate_key.key - if rrsig.algorithm == Algorithm.ECDSAP256SHA256: - curve = ec.SECP256R1() - octets = 32 - else: - curve = ec.SECP384R1() - octets = 48 - ecdsa_x = keyptr[0:octets] - ecdsa_y = keyptr[octets:octets * 2] - try: - public_key = ec.EllipticCurvePublicNumbers( - curve=curve, - x=_bytes_to_long(ecdsa_x), - y=_bytes_to_long(ecdsa_y)).public_key(default_backend()) - except ValueError: - raise ValidationFailure('invalid public key') - sig_r = rrsig.signature[0:octets] - sig_s = rrsig.signature[octets:] - sig = utils.encode_dss_signature(_bytes_to_long(sig_r), - _bytes_to_long(sig_s)) - - elif _is_eddsa(rrsig.algorithm): - keyptr = candidate_key.key - if rrsig.algorithm == Algorithm.ED25519: - loader = ed25519.Ed25519PublicKey - else: - loader = ed448.Ed448PublicKey - try: - public_key = loader.from_public_bytes(keyptr) - except ValueError: - raise ValidationFailure('invalid public key') - sig = rrsig.signature - elif _is_gost(rrsig.algorithm): - raise UnsupportedAlgorithm( - 'algorithm "%s" not supported by dnspython' % - algorithm_to_text(rrsig.algorithm)) + # For convenience, allow the rrset to be specified as a (name, + # rdataset) tuple as well as a proper rrset + if isinstance(rrset, tuple): + rrname = rrset[0] + rdataset = rrset[1] + else: + rrname = rrset.name + rdataset = rrset + + if now is None: + now = time.time() + if rrsig.expiration < now: + raise ValidationFailure('expired') + if rrsig.inception > now: + raise ValidationFailure('not yet valid') + + if _is_dsa(rrsig.algorithm): + sig_r = rrsig.signature[1:21] + sig_s = rrsig.signature[21:] + sig = utils.encode_dss_signature(_bytes_to_long(sig_r), + _bytes_to_long(sig_s)) + elif _is_ecdsa(rrsig.algorithm): + if rrsig.algorithm == Algorithm.ECDSAP256SHA256: + octets = 32 else: - raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm) - - data = b'' - data += rrsig.to_wire(origin=origin)[:18] - data += rrsig.signer.to_digestable(origin) - - # Derelativize the name before considering labels. - rrname = rrname.derelativize(origin) - - if len(rrname) - 1 < rrsig.labels: - raise ValidationFailure('owner name longer than RRSIG labels') - elif rrsig.labels < len(rrname) - 1: - suffix = rrname.split(rrsig.labels + 1)[1] - rrname = dns.name.from_text('*', suffix) - rrnamebuf = rrname.to_digestable() - rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass, - rrsig.original_ttl) - rrlist = sorted(rdataset) - for rr in rrlist: - data += rrnamebuf - data += rrfixed - rrdata = rr.to_digestable(origin) - rrlen = struct.pack('!H', len(rrdata)) - data += rrlen - data += rrdata - - chosen_hash = _make_hash(rrsig.algorithm) + octets = 48 + sig_r = rrsig.signature[0:octets] + sig_s = rrsig.signature[octets:] + sig = utils.encode_dss_signature(_bytes_to_long(sig_r), + _bytes_to_long(sig_s)) + else: + sig = rrsig.signature + + data = b'' + data += rrsig.to_wire(origin=origin)[:18] + data += rrsig.signer.to_digestable(origin) + + # Derelativize the name before considering labels. + rrname = rrname.derelativize(origin) + + if len(rrname) - 1 < rrsig.labels: + raise ValidationFailure('owner name longer than RRSIG labels') + elif rrsig.labels < len(rrname) - 1: + suffix = rrname.split(rrsig.labels + 1)[1] + rrname = dns.name.from_text('*', suffix) + rrnamebuf = rrname.to_digestable() + rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass, + rrsig.original_ttl) + for rr in sorted(rdataset): + data += rrnamebuf + data += rrfixed + rrdata = rr.to_digestable(origin) + rrlen = struct.pack('!H', len(rrdata)) + data += rrlen + data += rrdata + + chosen_hash = _make_hash(rrsig.algorithm) + + for candidate_key in candidate_keys: try: - if _is_rsa(rrsig.algorithm): - public_key.verify(sig, data, padding.PKCS1v15(), chosen_hash) - elif _is_dsa(rrsig.algorithm): - public_key.verify(sig, data, chosen_hash) - elif _is_ecdsa(rrsig.algorithm): - public_key.verify(sig, data, ec.ECDSA(chosen_hash)) - elif _is_eddsa(rrsig.algorithm): - public_key.verify(sig, data) - else: - # Raise here for code clarity; this won't actually ever happen - # since if the algorithm is really unknown we'd already have - # raised an exception above - raise ValidationFailure('unknown algorithm %u' % - rrsig.algorithm) # pragma: no cover - # If we got here, we successfully verified so we can return - # without error + _validate_signature(sig, data, candidate_key, chosen_hash) return - except InvalidSignature: + except (InvalidSignature, ValidationFailure): # this happens on an individual validation failure continue # nothing verified -- raise failure: |