diff options
author | Bob Halley <halley@dnspython.org> | 2021-07-11 14:23:48 -0700 |
---|---|---|
committer | Bob Halley <halley@dnspython.org> | 2021-07-11 14:23:48 -0700 |
commit | 34342b0ae673cc3e8698e10ec92fe849b6b20ef6 (patch) | |
tree | 942fc3ceda96568711309f5a25ffda370d6f35db | |
parent | be479d0f89a0f3fa1e9a4ced7c4a756b346f4075 (diff) | |
download | dnspython-34342b0ae673cc3e8698e10ec92fe849b6b20ef6.tar.gz |
If a negative response has an SOA in the authority section, then
zone_for_name() will now use it to make the search more efficient.
zone_for_name() now has an optional lifetime parameter which limits the
total time that can be spent resolving.
-rw-r--r-- | dns/resolver.py | 51 | ||||
-rw-r--r-- | tests/test_resolver.py | 75 |
2 files changed, 120 insertions, 6 deletions
diff --git a/dns/resolver.py b/dns/resolver.py index 10b6ca5..6a9974d 100644 --- a/dns/resolver.py +++ b/dns/resolver.py @@ -170,6 +170,9 @@ class NoAnswer(dns.exception.DNSException): def _fmt_kwargs(self, **kwargs): return super()._fmt_kwargs(query=kwargs['response'].question) + def response(self): + return self.kwargs['response'] + class NoNameservers(dns.exception.DNSException): """All nameservers failed to answer the query. @@ -1368,7 +1371,8 @@ def canonical_name(name): return get_default_resolver().canonical_name(name) -def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None): +def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None, + lifetime=None): """Find the name of the zone which contains the specified name. *name*, an absolute ``dns.name.Name`` or ``str``, the query name. @@ -1378,12 +1382,19 @@ def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None): *tcp*, a ``bool``. If ``True``, use TCP to make the query. *resolver*, a ``dns.resolver.Resolver`` or ``None``, the resolver to use. - If ``None``, the default resolver is used. + If ``None``, the default, then the default resolver is used. + + *lifetime*, a ``float``, the total time to allow for the queries needed + to determine the zone. If ``None``, the default, then only the individual + query limits of the resolver apply. Raises ``dns.resolver.NoRootSOA`` if there is no SOA RR at the DNS root. (This is only likely to happen if you're using non-default root servers in your network and they are misconfigured.) + Raises ``dns.resolver.LifetimeTimeout`` if the answer could not be + found in the alotted lifetime. + Returns a ``dns.name.Name``. """ @@ -1393,14 +1404,44 @@ def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None): resolver = get_default_resolver() if not name.is_absolute(): raise NotAbsolute(name) + start = time.time() + if lifetime is not None: + expiration = start + lifetime + else: + expiration = None while 1: try: - answer = resolver.resolve(name, dns.rdatatype.SOA, rdclass, tcp) + if expiration: + rlifetime = expiration - time.time() + if rlifetime <= 0: + rlifetime = 0 + else: + rlifetime = None + answer = resolver.resolve(name, dns.rdatatype.SOA, rdclass, tcp, + lifetime=rlifetime) if answer.rrset.name == name: return name # otherwise we were CNAMEd or DNAMEd and need to look higher - except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): - pass + except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as e: + if isinstance(e, dns.resolver.NXDOMAIN): + response = e.responses().get(name) + else: + response = e.response() + if response: + for rrs in response.authority: + if rrs.rdtype == dns.rdatatype.SOA and \ + rrs.rdclass == rdclass: + (nr, _, _) = rrs.name.fullcompare(name) + if nr == dns.name.NAMERELN_SUPERDOMAIN: + # We're doing a proper superdomain check as + # if the name were equal we ought to have gotten + # it in the answer section! We are ignoring the + # possibility that the authority is insane and + # is including multiple SOA RRs for different + # authorities. + return rrs.name + # we couldn't extract anything useful from the response (e.g. it's + # a type 3 NXDOMAIN) try: name = name.parent() except dns.name.NoParent: diff --git a/tests/test_resolver.py b/tests/test_resolver.py index d151485..b2a47d2 100644 --- a/tests/test_resolver.py +++ b/tests/test_resolver.py @@ -947,9 +947,41 @@ class AlwaysType3NXDOMAINNanoNameserver(Server): response.flags |= dns.flags.RA return response + +class AlwaysNXDOMAINNanoNameserver(Server): + + def handle(self, request): + response = dns.message.make_response(request.message) + response.set_rcode(dns.rcode.NXDOMAIN) + response.flags |= dns.flags.RA + origin = dns.name.from_text('example.') + soa_rrset = response.find_rrset(response.authority, origin, + dns.rdataclass.IN, dns.rdatatype.SOA, + create=True) + rdata = dns.rdata.from_text('IN', 'SOA', + 'ns.example. root.example. 1 2 3 4 5') + soa_rrset.add(rdata) + soa_rrset.update_ttl(300) + return response + +class AlwaysNoErrorNoDataNanoNameserver(Server): + + def handle(self, request): + response = dns.message.make_response(request.message) + response.set_rcode(dns.rcode.NOERROR) + response.flags |= dns.flags.RA + origin = dns.name.from_text('example.') + soa_rrset = response.find_rrset(response.authority, origin, + dns.rdataclass.IN, dns.rdatatype.SOA, + create=True) + rdata = dns.rdata.from_text('IN', 'SOA', + 'ns.example. root.example. 1 2 3 4 5') + soa_rrset.add(rdata) + soa_rrset.update_ttl(300) + return response @unittest.skipIf(not (_network_available and _nanonameserver_available), "Internet and NanoAuth required") -class ZoneForNameNoParentTest(unittest.TestCase): +class ZoneForNameTests(unittest.TestCase): def testNoRootSOA(self): with AlwaysType3NXDOMAINNanoNameserver() as na: @@ -959,6 +991,25 @@ class ZoneForNameNoParentTest(unittest.TestCase): with self.assertRaises(dns.resolver.NoRootSOA): dns.resolver.zone_for_name('www.foo.bar.', resolver=res) + def testHelpfulNXDOMAIN(self): + with AlwaysNXDOMAINNanoNameserver() as na: + res = dns.resolver.Resolver(configure=False) + res.port = na.udp_address[1] + res.nameservers = [na.udp_address[0]] + expected = dns.name.from_text('example.') + name = dns.resolver.zone_for_name('1.2.3.4.5.6.7.8.9.10.example.', + resolver=res) + self.assertEqual(name, expected) + + def testHelpfulNoErrorNoData(self): + with AlwaysNoErrorNoDataNanoNameserver() as na: + res = dns.resolver.Resolver(configure=False) + res.port = na.udp_address[1] + res.nameservers = [na.udp_address[0]] + expected = dns.name.from_text('example.') + name = dns.resolver.zone_for_name('1.2.3.4.5.6.7.8.9.10.example.', + resolver=res) + self.assertEqual(name, expected) class DroppingNanoNameserver(Server): @@ -1019,3 +1070,25 @@ def testResolverNoNameservers(): assert not error[1] # not TCP assert error[2] == na.udp_address[1] # port assert error[3] == 'FORMERR' + + +class SlowAlwaysType3NXDOMAINNanoNameserver(Server): + + def handle(self, request): + response = dns.message.make_response(request.message) + response.set_rcode(dns.rcode.NXDOMAIN) + response.flags |= dns.flags.RA + time.sleep(0.2) + return response + + +@pytest.mark.skipif(not (_network_available and _nanonameserver_available), + reason="Internet and NanoAuth required") +def testZoneForNameLifetimeTimeout(): + with SlowAlwaysType3NXDOMAINNanoNameserver() as na: + res = dns.resolver.Resolver(configure=False) + res.port = na.udp_address[1] + res.nameservers = [na.udp_address[0]] + with pytest.raises(dns.resolver.LifetimeTimeout): + dns.resolver.zone_for_name('1.2.3.4.5.6.7.8.9.10.example.', + resolver=res, lifetime=1.0) |