diff options
| author | Bob Halley <halley@dnspython.org> | 2020-07-21 07:32:27 -0700 |
|---|---|---|
| committer | Bob Halley <halley@dnspython.org> | 2020-07-21 07:32:27 -0700 |
| commit | 8c63cfd484a70097312acc368f7b10d80cd5ce87 (patch) | |
| tree | 590628fb9288e508c94c481666eb11e1a26730f9 /tests/test_resolution.py | |
| parent | 00a22ad03ffbcc45c184cbb99bf0aa17e4fc3902 (diff) | |
| download | dnspython-8c63cfd484a70097312acc368f7b10d80cd5ce87.tar.gz | |
unify chaining code
Diffstat (limited to 'tests/test_resolution.py')
| -rw-r--r-- | tests/test_resolution.py | 46 |
1 files changed, 46 insertions, 0 deletions
diff --git a/tests/test_resolution.py b/tests/test_resolution.py index 9145f16..db42d46 100644 --- a/tests/test_resolution.py +++ b/tests/test_resolution.py @@ -83,6 +83,22 @@ class ResolutionTestCase(unittest.TestCase): r.set_rcode(dns.rcode.NXDOMAIN) return r + def make_long_chain_response(self, q, count): + r = dns.message.make_response(q) + name = self.qname + for i in range(count): + rrs = r.get_rrset(r.answer, name, dns.rdataclass.IN, + dns.rdatatype.CNAME, create=True) + tname = dns.name.from_text(f'target{i}.') + rrs.add(dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.CNAME, + str(tname)), 300) + name = tname + rrs = r.get_rrset(r.answer, name, dns.rdataclass.IN, + dns.rdatatype.A, create=True) + rrs.add(dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.A, + '10.0.0.1'), 300) + return r + def test_next_request_cache_hit(self): self.resolver.cache = dns.resolver.Cache() q = dns.message.make_query(self.qname, dns.rdatatype.A) @@ -353,6 +369,36 @@ class ResolutionTestCase(unittest.TestCase): self.assertTrue(answer is None) self.assertTrue(done) + def test_query_result_nxdomain_but_has_answer(self): + q = dns.message.make_query(self.qname, dns.rdatatype.A) + r = self.make_address_response(q) + r.set_rcode(dns.rcode.NXDOMAIN) + (_, _) = self.resn.next_request() + (nameserver, _, _, _) = self.resn.next_nameserver() + (answer, done) = self.resn.query_result(r, None) + self.assertIsNone(answer) + self.assertFalse(done) + self.assertTrue(nameserver not in self.resn.nameservers) + + def test_query_result_chain_not_too_long(self): + q = dns.message.make_query(self.qname, dns.rdatatype.A) + r = self.make_long_chain_response(q, 15) + (_, _) = self.resn.next_request() + (_, _, _, _) = self.resn.next_nameserver() + (answer, done) = self.resn.query_result(r, None) + self.assertIsNotNone(answer) + self.assertTrue(done) + + def test_query_result_chain_too_long(self): + q = dns.message.make_query(self.qname, dns.rdatatype.A) + r = self.make_long_chain_response(q, 16) + (_, _) = self.resn.next_request() + (nameserver, _, _, _) = self.resn.next_nameserver() + (answer, done) = self.resn.query_result(r, None) + self.assertIsNone(answer) + self.assertFalse(done) + self.assertTrue(nameserver not in self.resn.nameservers) + def test_query_result_nxdomain_cached(self): self.resolver.cache = dns.resolver.Cache() q = dns.message.make_query(self.qname, dns.rdatatype.A) |
