summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBob Halley <halley@play-bow.org>2013-03-31 03:33:29 -0700
committerBob Halley <halley@play-bow.org>2013-03-31 03:33:29 -0700
commit4cf53b0d70bddaef6b499d2c5d0776f5cb7ef107 (patch)
treea03ed0cba3723781d1f99799441487fe5bfc559a
parentba7597ebf91cd2fed363e7a9733522c8307d5264 (diff)
parentf341a20e58a96c6ceea0feec8b9eba6ff503f063 (diff)
downloaddnspython-4cf53b0d70bddaef6b499d2c5d0776f5cb7ef107.tar.gz
Merge pull request #26 from mmmucky/master
Remove DNSKEY keytag uniqueness assumption (RFC 4034, section 8)
-rw-r--r--dns/dnssec.py194
-rw-r--r--tests/dnssec.py10
2 files changed, 108 insertions, 96 deletions
diff --git a/dns/dnssec.py b/dns/dnssec.py
index dd6a27a..4f6fc98 100644
--- a/dns/dnssec.py
+++ b/dns/dnssec.py
@@ -126,7 +126,8 @@ def make_ds(name, key, algorithm, origin=None):
return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0,
len(dsrdata))
-def _find_key(keys, rrsig):
+def _find_candidate_keys(keys, rrsig):
+ candidate_keys=[]
value = keys.get(rrsig.signer)
if value is None:
return None
@@ -141,8 +142,8 @@ def _find_key(keys, rrsig):
for rdata in rdataset:
if rdata.algorithm == rrsig.algorithm and \
key_id(rdata) == rrsig.key_tag:
- return rdata
- return None
+ candidate_keys.append(rdata)
+ return candidate_keys
def _is_rsa(algorithm):
return algorithm in (RSAMD5, RSASHA1,
@@ -217,100 +218,101 @@ def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
if isinstance(origin, (str, unicode)):
origin = dns.name.from_text(origin, dns.name.root)
- key = _find_key(keys, rrsig)
- if not key:
- raise ValidationFailure, 'unknown key'
-
- # For convenience, allow the rrset to be specified as a (name, rdataset)
- # tuple as well as a proper rrset
- if isinstance(rrset, tuple):
- rrname = rrset[0]
- rdataset = rrset[1]
- else:
- rrname = rrset.name
- rdataset = rrset
-
- if now is None:
- now = time.time()
- if rrsig.expiration < now:
- raise ValidationFailure, 'expired'
- if rrsig.inception > now:
- raise ValidationFailure, 'not yet valid'
-
- hash = _make_hash(rrsig.algorithm)
-
- if _is_rsa(rrsig.algorithm):
- keyptr = key.key
- (bytes,) = struct.unpack('!B', keyptr[0:1])
- keyptr = keyptr[1:]
- if bytes == 0:
- (bytes,) = struct.unpack('!H', keyptr[0:2])
- keyptr = keyptr[2:]
- rsa_e = keyptr[0:bytes]
- rsa_n = keyptr[bytes:]
- keylen = len(rsa_n) * 8
- pubkey = Crypto.PublicKey.RSA.construct(
- (Crypto.Util.number.bytes_to_long(rsa_n),
- Crypto.Util.number.bytes_to_long(rsa_e)))
- sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
- elif _is_dsa(rrsig.algorithm):
- keyptr = key.key
- (t,) = struct.unpack('!B', keyptr[0:1])
- keyptr = keyptr[1:]
- octets = 64 + t * 8
- dsa_q = keyptr[0:20]
- keyptr = keyptr[20:]
- dsa_p = keyptr[0:octets]
- keyptr = keyptr[octets:]
- dsa_g = keyptr[0:octets]
- keyptr = keyptr[octets:]
- dsa_y = keyptr[0:octets]
- pubkey = Crypto.PublicKey.DSA.construct(
- (Crypto.Util.number.bytes_to_long(dsa_y),
- Crypto.Util.number.bytes_to_long(dsa_g),
- Crypto.Util.number.bytes_to_long(dsa_p),
- Crypto.Util.number.bytes_to_long(dsa_q)))
- (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
- sig = (Crypto.Util.number.bytes_to_long(dsa_r),
- Crypto.Util.number.bytes_to_long(dsa_s))
- else:
- raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
-
- hash.update(_to_rdata(rrsig, origin)[:18])
- hash.update(rrsig.signer.to_digestable(origin))
-
- if rrsig.labels < len(rrname) - 1:
- suffix = rrname.split(rrsig.labels + 1)[1]
- rrname = dns.name.from_text('*', suffix)
- rrnamebuf = rrname.to_digestable(origin)
- rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
- rrsig.original_ttl)
- rrlist = sorted(rdataset);
- for rr in rrlist:
- hash.update(rrnamebuf)
- hash.update(rrfixed)
- rrdata = rr.to_digestable(origin)
- rrlen = struct.pack('!H', len(rrdata))
- hash.update(rrlen)
- hash.update(rrdata)
-
- digest = hash.digest()
-
- if _is_rsa(rrsig.algorithm):
- # PKCS1 algorithm identifier goop
- digest = _make_algorithm_id(rrsig.algorithm) + digest
- padlen = keylen // 8 - len(digest) - 3
- digest = chr(0) + chr(1) + chr(0xFF) * padlen + chr(0) + digest
- elif _is_dsa(rrsig.algorithm):
- pass
- else:
- # Raise here for code clarity; this won't actually ever happen
- # since if the algorithm is really unknown we'd already have
- # raised an exception above
- raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
+ for candidate_key in _find_candidate_keys(keys, rrsig):
+ if not candidate_key:
+ raise ValidationFailure, 'unknown key'
+
+ # For convenience, allow the rrset to be specified as a (name, rdataset)
+ # tuple as well as a proper rrset
+ if isinstance(rrset, tuple):
+ rrname = rrset[0]
+ rdataset = rrset[1]
+ else:
+ rrname = rrset.name
+ rdataset = rrset
+
+ if now is None:
+ now = time.time()
+ if rrsig.expiration < now:
+ raise ValidationFailure, 'expired'
+ if rrsig.inception > now:
+ raise ValidationFailure, 'not yet valid'
+
+ hash = _make_hash(rrsig.algorithm)
+
+ if _is_rsa(rrsig.algorithm):
+ keyptr = candidate_key.key
+ (bytes,) = struct.unpack('!B', keyptr[0:1])
+ keyptr = keyptr[1:]
+ if bytes == 0:
+ (bytes,) = struct.unpack('!H', keyptr[0:2])
+ keyptr = keyptr[2:]
+ rsa_e = keyptr[0:bytes]
+ rsa_n = keyptr[bytes:]
+ keylen = len(rsa_n) * 8
+ pubkey = Crypto.PublicKey.RSA.construct(
+ (Crypto.Util.number.bytes_to_long(rsa_n),
+ Crypto.Util.number.bytes_to_long(rsa_e)))
+ sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
+ elif _is_dsa(rrsig.algorithm):
+ keyptr = candidate_key.key
+ (t,) = struct.unpack('!B', keyptr[0:1])
+ keyptr = keyptr[1:]
+ octets = 64 + t * 8
+ dsa_q = keyptr[0:20]
+ keyptr = keyptr[20:]
+ dsa_p = keyptr[0:octets]
+ keyptr = keyptr[octets:]
+ dsa_g = keyptr[0:octets]
+ keyptr = keyptr[octets:]
+ dsa_y = keyptr[0:octets]
+ pubkey = Crypto.PublicKey.DSA.construct(
+ (Crypto.Util.number.bytes_to_long(dsa_y),
+ Crypto.Util.number.bytes_to_long(dsa_g),
+ Crypto.Util.number.bytes_to_long(dsa_p),
+ Crypto.Util.number.bytes_to_long(dsa_q)))
+ (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
+ sig = (Crypto.Util.number.bytes_to_long(dsa_r),
+ Crypto.Util.number.bytes_to_long(dsa_s))
+ else:
+ raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
+
+ hash.update(_to_rdata(rrsig, origin)[:18])
+ hash.update(rrsig.signer.to_digestable(origin))
+
+ if rrsig.labels < len(rrname) - 1:
+ suffix = rrname.split(rrsig.labels + 1)[1]
+ rrname = dns.name.from_text('*', suffix)
+ rrnamebuf = rrname.to_digestable(origin)
+ rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
+ rrsig.original_ttl)
+ rrlist = sorted(rdataset);
+ for rr in rrlist:
+ hash.update(rrnamebuf)
+ hash.update(rrfixed)
+ rrdata = rr.to_digestable(origin)
+ rrlen = struct.pack('!H', len(rrdata))
+ hash.update(rrlen)
+ hash.update(rrdata)
+
+ digest = hash.digest()
+
+ if _is_rsa(rrsig.algorithm):
+ # PKCS1 algorithm identifier goop
+ digest = _make_algorithm_id(rrsig.algorithm) + digest
+ padlen = keylen // 8 - len(digest) - 3
+ digest = chr(0) + chr(1) + chr(0xFF) * padlen + chr(0) + digest
+ elif _is_dsa(rrsig.algorithm):
+ pass
+ else:
+ # Raise here for code clarity; this won't actually ever happen
+ # since if the algorithm is really unknown we'd already have
+ # raised an exception above
+ raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
- if not pubkey.verify(digest, sig):
- raise ValidationFailure, 'verify failure'
+ if pubkey.verify(digest, sig):
+ return
+ raise ValidationFailure, 'verify failure'
def _validate(rrset, rrsigset, keys, origin=None, now=None):
"""Validate an RRset
diff --git a/tests/dnssec.py b/tests/dnssec.py
index 7b4546a..b997b17 100644
--- a/tests/dnssec.py
+++ b/tests/dnssec.py
@@ -30,6 +30,13 @@ abs_keys = { abs_dnspython_org :
'256 3 5 AwEAAdSSghOGjU33IQZgwZM2Hh771VGXX05olJK49FxpSyuEAjDBXY58 LGU9R2Zgeecnk/b9EAhFu/vCV9oECtiTCvwuVAkt9YEweqYDluQInmgP NGMJCKdSLlnX93DkjDw8rMYv5dqXCuSGPlKChfTJOLQxIAxGloS7lL+c 0CTZydAF')
}
+abs_keys_duplicate_keytag = { abs_dnspython_org :
+ dns.rrset.from_text('dnspython.org.', 3600, 'IN', 'DNSKEY',
+ '257 3 5 AwEAAenVTr9L1OMlL1/N2ta0Qj9LLLnnmFWIr1dJoAsWM9BQfsbV7kFZ XbAkER/FY9Ji2o7cELxBwAsVBuWn6IUUAJXLH74YbC1anY0lifjgt29z SwDzuB7zmC7yVYZzUunBulVW4zT0tg1aePbpVL2EtTL8VzREqbJbE25R KuQYHZtFwG8S4iBxJUmT2Bbd0921LLxSQgVoFXlQx/gFV2+UERXcJ5ce iX6A6wc02M/pdg/YbJd2rBa0MYL3/Fz/Xltre0tqsImZGxzi6YtYDs45 NC8gH+44egz82e2DATCVM1ICPmRDjXYTLldQiWA2ZXIWnK0iitl5ue24 7EsWJefrIhE=',
+ '256 3 5 AwEAAdSSg++++THIS/IS/NOT/THE/CORRECT/KEY++++++++++++++++ ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ AaOSydAF',
+ '256 3 5 AwEAAdSSghOGjU33IQZgwZM2Hh771VGXX05olJK49FxpSyuEAjDBXY58 LGU9R2Zgeecnk/b9EAhFu/vCV9oECtiTCvwuVAkt9YEweqYDluQInmgP NGMJCKdSLlnX93DkjDw8rMYv5dqXCuSGPlKChfTJOLQxIAxGloS7lL+c 0CTZydAF')
+ }
+
rel_keys = { dns.name.empty :
dns.rrset.from_text('@', 3600, 'IN', 'DNSKEY',
'257 3 5 AwEAAenVTr9L1OMlL1/N2ta0Qj9LLLnnmFWIr1dJoAsWM9BQfsbV7kFZ XbAkER/FY9Ji2o7cELxBwAsVBuWn6IUUAJXLH74YbC1anY0lifjgt29z SwDzuB7zmC7yVYZzUunBulVW4zT0tg1aePbpVL2EtTL8VzREqbJbE25R KuQYHZtFwG8S4iBxJUmT2Bbd0921LLxSQgVoFXlQx/gFV2+UERXcJ5ce iX6A6wc02M/pdg/YbJd2rBa0MYL3/Fz/Xltre0tqsImZGxzi6YtYDs45 NC8gH+44egz82e2DATCVM1ICPmRDjXYTLldQiWA2ZXIWnK0iitl5ue24 7EsWJefrIhE=',
@@ -95,6 +102,9 @@ class DNSSECValidatorTestCase(unittest.TestCase):
def testAbsoluteRSAGood(self):
dns.dnssec.validate(abs_soa, abs_soa_rrsig, abs_keys, None, when)
+ def testDuplicateKeytag(self):
+ dns.dnssec.validate(abs_soa, abs_soa_rrsig, abs_keys_duplicate_keytag, None, when)
+
def testAbsoluteRSABad(self):
def bad():
dns.dnssec.validate(abs_other_soa, abs_soa_rrsig, abs_keys, None,