summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBob Halley <halley@dnspython.org>2021-07-13 09:09:36 -0700
committerGitHub <noreply@github.com>2021-07-13 09:09:36 -0700
commit2ad1d9e5583c18900bea778f90cd7eb747abe599 (patch)
treec9fec3d4c3c8ae8c720da0e4397631cde0d394ca
parent275c6cb2303c7969b9ee5c6aaf0bd09866b31cc7 (diff)
parent34342b0ae673cc3e8698e10ec92fe849b6b20ef6 (diff)
downloaddnspython-2ad1d9e5583c18900bea778f90cd7eb747abe599.tar.gz
Merge pull request #673 from rthalley/zone_for_name_additions
zone_for_name() improvements
-rw-r--r--dns/resolver.py51
-rw-r--r--tests/test_resolver.py75
2 files changed, 120 insertions, 6 deletions
diff --git a/dns/resolver.py b/dns/resolver.py
index 10b6ca5..6a9974d 100644
--- a/dns/resolver.py
+++ b/dns/resolver.py
@@ -170,6 +170,9 @@ class NoAnswer(dns.exception.DNSException):
def _fmt_kwargs(self, **kwargs):
return super()._fmt_kwargs(query=kwargs['response'].question)
+ def response(self):
+ return self.kwargs['response']
+
class NoNameservers(dns.exception.DNSException):
"""All nameservers failed to answer the query.
@@ -1368,7 +1371,8 @@ def canonical_name(name):
return get_default_resolver().canonical_name(name)
-def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None):
+def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None,
+ lifetime=None):
"""Find the name of the zone which contains the specified name.
*name*, an absolute ``dns.name.Name`` or ``str``, the query name.
@@ -1378,12 +1382,19 @@ def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None):
*tcp*, a ``bool``. If ``True``, use TCP to make the query.
*resolver*, a ``dns.resolver.Resolver`` or ``None``, the resolver to use.
- If ``None``, the default resolver is used.
+ If ``None``, the default, then the default resolver is used.
+
+ *lifetime*, a ``float``, the total time to allow for the queries needed
+ to determine the zone. If ``None``, the default, then only the individual
+ query limits of the resolver apply.
Raises ``dns.resolver.NoRootSOA`` if there is no SOA RR at the DNS
root. (This is only likely to happen if you're using non-default
root servers in your network and they are misconfigured.)
+ Raises ``dns.resolver.LifetimeTimeout`` if the answer could not be
+ found in the alotted lifetime.
+
Returns a ``dns.name.Name``.
"""
@@ -1393,14 +1404,44 @@ def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None):
resolver = get_default_resolver()
if not name.is_absolute():
raise NotAbsolute(name)
+ start = time.time()
+ if lifetime is not None:
+ expiration = start + lifetime
+ else:
+ expiration = None
while 1:
try:
- answer = resolver.resolve(name, dns.rdatatype.SOA, rdclass, tcp)
+ if expiration:
+ rlifetime = expiration - time.time()
+ if rlifetime <= 0:
+ rlifetime = 0
+ else:
+ rlifetime = None
+ answer = resolver.resolve(name, dns.rdatatype.SOA, rdclass, tcp,
+ lifetime=rlifetime)
if answer.rrset.name == name:
return name
# otherwise we were CNAMEd or DNAMEd and need to look higher
- except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
- pass
+ except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as e:
+ if isinstance(e, dns.resolver.NXDOMAIN):
+ response = e.responses().get(name)
+ else:
+ response = e.response()
+ if response:
+ for rrs in response.authority:
+ if rrs.rdtype == dns.rdatatype.SOA and \
+ rrs.rdclass == rdclass:
+ (nr, _, _) = rrs.name.fullcompare(name)
+ if nr == dns.name.NAMERELN_SUPERDOMAIN:
+ # We're doing a proper superdomain check as
+ # if the name were equal we ought to have gotten
+ # it in the answer section! We are ignoring the
+ # possibility that the authority is insane and
+ # is including multiple SOA RRs for different
+ # authorities.
+ return rrs.name
+ # we couldn't extract anything useful from the response (e.g. it's
+ # a type 3 NXDOMAIN)
try:
name = name.parent()
except dns.name.NoParent:
diff --git a/tests/test_resolver.py b/tests/test_resolver.py
index d151485..b2a47d2 100644
--- a/tests/test_resolver.py
+++ b/tests/test_resolver.py
@@ -947,9 +947,41 @@ class AlwaysType3NXDOMAINNanoNameserver(Server):
response.flags |= dns.flags.RA
return response
+
+class AlwaysNXDOMAINNanoNameserver(Server):
+
+ def handle(self, request):
+ response = dns.message.make_response(request.message)
+ response.set_rcode(dns.rcode.NXDOMAIN)
+ response.flags |= dns.flags.RA
+ origin = dns.name.from_text('example.')
+ soa_rrset = response.find_rrset(response.authority, origin,
+ dns.rdataclass.IN, dns.rdatatype.SOA,
+ create=True)
+ rdata = dns.rdata.from_text('IN', 'SOA',
+ 'ns.example. root.example. 1 2 3 4 5')
+ soa_rrset.add(rdata)
+ soa_rrset.update_ttl(300)
+ return response
+
+class AlwaysNoErrorNoDataNanoNameserver(Server):
+
+ def handle(self, request):
+ response = dns.message.make_response(request.message)
+ response.set_rcode(dns.rcode.NOERROR)
+ response.flags |= dns.flags.RA
+ origin = dns.name.from_text('example.')
+ soa_rrset = response.find_rrset(response.authority, origin,
+ dns.rdataclass.IN, dns.rdatatype.SOA,
+ create=True)
+ rdata = dns.rdata.from_text('IN', 'SOA',
+ 'ns.example. root.example. 1 2 3 4 5')
+ soa_rrset.add(rdata)
+ soa_rrset.update_ttl(300)
+ return response
@unittest.skipIf(not (_network_available and _nanonameserver_available),
"Internet and NanoAuth required")
-class ZoneForNameNoParentTest(unittest.TestCase):
+class ZoneForNameTests(unittest.TestCase):
def testNoRootSOA(self):
with AlwaysType3NXDOMAINNanoNameserver() as na:
@@ -959,6 +991,25 @@ class ZoneForNameNoParentTest(unittest.TestCase):
with self.assertRaises(dns.resolver.NoRootSOA):
dns.resolver.zone_for_name('www.foo.bar.', resolver=res)
+ def testHelpfulNXDOMAIN(self):
+ with AlwaysNXDOMAINNanoNameserver() as na:
+ res = dns.resolver.Resolver(configure=False)
+ res.port = na.udp_address[1]
+ res.nameservers = [na.udp_address[0]]
+ expected = dns.name.from_text('example.')
+ name = dns.resolver.zone_for_name('1.2.3.4.5.6.7.8.9.10.example.',
+ resolver=res)
+ self.assertEqual(name, expected)
+
+ def testHelpfulNoErrorNoData(self):
+ with AlwaysNoErrorNoDataNanoNameserver() as na:
+ res = dns.resolver.Resolver(configure=False)
+ res.port = na.udp_address[1]
+ res.nameservers = [na.udp_address[0]]
+ expected = dns.name.from_text('example.')
+ name = dns.resolver.zone_for_name('1.2.3.4.5.6.7.8.9.10.example.',
+ resolver=res)
+ self.assertEqual(name, expected)
class DroppingNanoNameserver(Server):
@@ -1019,3 +1070,25 @@ def testResolverNoNameservers():
assert not error[1] # not TCP
assert error[2] == na.udp_address[1] # port
assert error[3] == 'FORMERR'
+
+
+class SlowAlwaysType3NXDOMAINNanoNameserver(Server):
+
+ def handle(self, request):
+ response = dns.message.make_response(request.message)
+ response.set_rcode(dns.rcode.NXDOMAIN)
+ response.flags |= dns.flags.RA
+ time.sleep(0.2)
+ return response
+
+
+@pytest.mark.skipif(not (_network_available and _nanonameserver_available),
+ reason="Internet and NanoAuth required")
+def testZoneForNameLifetimeTimeout():
+ with SlowAlwaysType3NXDOMAINNanoNameserver() as na:
+ res = dns.resolver.Resolver(configure=False)
+ res.port = na.udp_address[1]
+ res.nameservers = [na.udp_address[0]]
+ with pytest.raises(dns.resolver.LifetimeTimeout):
+ dns.resolver.zone_for_name('1.2.3.4.5.6.7.8.9.10.example.',
+ resolver=res, lifetime=1.0)