diff options
| author | Bob Halley <halley@dnspython.org> | 2020-06-15 17:20:00 -0700 |
|---|---|---|
| committer | Bob Halley <halley@dnspython.org> | 2020-06-15 17:20:00 -0700 |
| commit | 12e607a9878607df0590ab231d5e81040c1568c5 (patch) | |
| tree | a1ab920dc5e4b9b2650f188b4b86873e52e60081 /tests/test_query.py | |
| parent | a867ef1c8af00734138c8e72321782ef3a798013 (diff) | |
| download | dnspython-12e607a9878607df0590ab231d5e81040c1568c5.tar.gz | |
test IPv4 and IPv6 according to availability
Diffstat (limited to 'tests/test_query.py')
| -rw-r--r-- | tests/test_query.py | 162 |
1 files changed, 96 insertions, 66 deletions
diff --git a/tests/test_query.py b/tests/test_query.py index 90fee03..df52dfe 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -25,6 +25,7 @@ except Exception: have_ssl = False import dns.exception +import dns.inet import dns.message import dns.name import dns.rdataclass @@ -51,26 +52,27 @@ except ImportError: class Server(object): pass +# Probe for IPv4 and IPv6 +query_addresses = [] +for (af, address) in ((socket.AF_INET, '8.8.8.8'), + (socket.AF_INET6, '2001:4860:4860::8888')): + try: + with socket.socket(af, socket.SOCK_DGRAM) as s: + # Connecting a UDP socket is supposed to return ENETUNREACH if + # no route to the network is present. + s.connect((address, 53)) + query_addresses.append(address) + except Exception: + pass + @unittest.skipIf(not _network_available, "Internet not reachable") class QueryTests(unittest.TestCase): def testQueryUDP(self): - qname = dns.name.from_text('dns.google.') - q = dns.message.make_query(qname, dns.rdatatype.A) - response = dns.query.udp(q, '8.8.8.8') - rrs = response.get_rrset(response.answer, qname, - dns.rdataclass.IN, dns.rdatatype.A) - self.assertTrue(rrs is not None) - seen = set([rdata.address for rdata in rrs]) - self.assertTrue('8.8.8.8' in seen) - self.assertTrue('8.8.4.4' in seen) - - def testQueryUDPWithSocket(self): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.setblocking(0) + for address in query_addresses: qname = dns.name.from_text('dns.google.') q = dns.message.make_query(qname, dns.rdatatype.A) - response = dns.query.udp(q, '8.8.8.8', sock=s) + response = dns.query.udp(q, address) rrs = response.get_rrset(response.answer, qname, dns.rdataclass.IN, dns.rdatatype.A) self.assertTrue(rrs is not None) @@ -78,24 +80,26 @@ class QueryTests(unittest.TestCase): self.assertTrue('8.8.8.8' in seen) self.assertTrue('8.8.4.4' in seen) - def testQueryTCP(self): - qname = dns.name.from_text('dns.google.') - q = dns.message.make_query(qname, dns.rdatatype.A) - response = dns.query.tcp(q, '8.8.8.8') - rrs = response.get_rrset(response.answer, qname, - dns.rdataclass.IN, dns.rdatatype.A) - self.assertTrue(rrs is not None) - seen = set([rdata.address for rdata in rrs]) - self.assertTrue('8.8.8.8' in seen) - self.assertTrue('8.8.4.4' in seen) + def testQueryUDPWithSocket(self): + for address in query_addresses: + with socket.socket(dns.inet.af_for_address(address), + socket.SOCK_DGRAM) as s: + s.setblocking(0) + qname = dns.name.from_text('dns.google.') + q = dns.message.make_query(qname, dns.rdatatype.A) + response = dns.query.udp(q, address, sock=s) + rrs = response.get_rrset(response.answer, qname, + dns.rdataclass.IN, dns.rdatatype.A) + self.assertTrue(rrs is not None) + seen = set([rdata.address for rdata in rrs]) + self.assertTrue('8.8.8.8' in seen) + self.assertTrue('8.8.4.4' in seen) - def testQueryTCPWithSocket(self): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.connect(('8.8.8.8', 53)) - s.setblocking(0) + def testQueryTCP(self): + for address in query_addresses: qname = dns.name.from_text('dns.google.') q = dns.message.make_query(qname, dns.rdatatype.A) - response = dns.query.tcp(q, None, sock=s) + response = dns.query.tcp(q, address) rrs = response.get_rrset(response.answer, qname, dns.rdataclass.IN, dns.rdatatype.A) self.assertTrue(rrs is not None) @@ -103,27 +107,16 @@ class QueryTests(unittest.TestCase): self.assertTrue('8.8.8.8' in seen) self.assertTrue('8.8.4.4' in seen) - def testQueryTLS(self): - qname = dns.name.from_text('dns.google.') - q = dns.message.make_query(qname, dns.rdatatype.A) - response = dns.query.tls(q, '8.8.8.8') - rrs = response.get_rrset(response.answer, qname, - dns.rdataclass.IN, dns.rdatatype.A) - self.assertTrue(rrs is not None) - seen = set([rdata.address for rdata in rrs]) - self.assertTrue('8.8.8.8' in seen) - self.assertTrue('8.8.4.4' in seen) - - @unittest.skipUnless(have_ssl, "No SSL support") - def testQueryTLSWithSocket(self): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as base_s: - base_s.connect(('8.8.8.8', 853)) - ctx = ssl.create_default_context() - with ctx.wrap_socket(base_s, server_hostname='dns.google') as s: + def testQueryTCPWithSocket(self): + for address in query_addresses: + with socket.socket(dns.inet.af_for_address(address), + socket.SOCK_STREAM) as s: + ll = dns.inet.low_level_address_tuple((address, 53)) + s.connect(ll) s.setblocking(0) qname = dns.name.from_text('dns.google.') q = dns.message.make_query(qname, dns.rdatatype.A) - response = dns.query.tls(q, None, sock=s) + response = dns.query.tcp(q, None, sock=s) rrs = response.get_rrset(response.answer, qname, dns.rdataclass.IN, dns.rdatatype.A) self.assertTrue(rrs is not None) @@ -131,30 +124,67 @@ class QueryTests(unittest.TestCase): self.assertTrue('8.8.8.8' in seen) self.assertTrue('8.8.4.4' in seen) + def testQueryTLS(self): + for address in query_addresses: + qname = dns.name.from_text('dns.google.') + q = dns.message.make_query(qname, dns.rdatatype.A) + response = dns.query.tls(q, address) + rrs = response.get_rrset(response.answer, qname, + dns.rdataclass.IN, dns.rdatatype.A) + self.assertTrue(rrs is not None) + seen = set([rdata.address for rdata in rrs]) + self.assertTrue('8.8.8.8' in seen) + self.assertTrue('8.8.4.4' in seen) + + @unittest.skipUnless(have_ssl, "No SSL support") + def testQueryTLSWithSocket(self): + for address in query_addresses: + with socket.socket(dns.inet.af_for_address(address), + socket.SOCK_STREAM) as base_s: + ll = dns.inet.low_level_address_tuple((address, 853)) + base_s.connect(ll) + ctx = ssl.create_default_context() + with ctx.wrap_socket(base_s, server_hostname='dns.google') as s: + s.setblocking(0) + qname = dns.name.from_text('dns.google.') + q = dns.message.make_query(qname, dns.rdatatype.A) + response = dns.query.tls(q, None, sock=s) + rrs = response.get_rrset(response.answer, qname, + dns.rdataclass.IN, dns.rdatatype.A) + self.assertTrue(rrs is not None) + seen = set([rdata.address for rdata in rrs]) + self.assertTrue('8.8.8.8' in seen) + self.assertTrue('8.8.4.4' in seen) + def testQueryUDPFallback(self): - qname = dns.name.from_text('.') - q = dns.message.make_query(qname, dns.rdatatype.DNSKEY) - (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8') - self.assertTrue(tcp) + for address in query_addresses: + qname = dns.name.from_text('.') + q = dns.message.make_query(qname, dns.rdatatype.DNSKEY) + (_, tcp) = dns.query.udp_with_fallback(q, address) + self.assertTrue(tcp) def testQueryUDPFallbackWithSocket(self): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_s: - udp_s.setblocking(0) - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_s: - tcp_s.connect(('8.8.8.8', 53)) - tcp_s.setblocking(0) - qname = dns.name.from_text('.') - q = dns.message.make_query(qname, dns.rdatatype.DNSKEY) - (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8', - udp_sock=udp_s, - tcp_sock=tcp_s) - self.assertTrue(tcp) + for address in query_addresses: + af = dns.inet.af_for_address(address) + with socket.socket(af, socket.SOCK_DGRAM) as udp_s: + udp_s.setblocking(0) + with socket.socket(af, socket.SOCK_STREAM) as tcp_s: + ll = dns.inet.low_level_address_tuple((address, 53)) + tcp_s.connect(ll) + tcp_s.setblocking(0) + qname = dns.name.from_text('.') + q = dns.message.make_query(qname, dns.rdatatype.DNSKEY) + (_, tcp) = dns.query.udp_with_fallback(q, address, + udp_sock=udp_s, + tcp_sock=tcp_s) + self.assertTrue(tcp) def testQueryUDPFallbackNoFallback(self): - qname = dns.name.from_text('dns.google.') - q = dns.message.make_query(qname, dns.rdatatype.A) - (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8') - self.assertFalse(tcp) + for address in query_addresses: + qname = dns.name.from_text('dns.google.') + q = dns.message.make_query(qname, dns.rdatatype.A) + (_, tcp) = dns.query.udp_with_fallback(q, address) + self.assertFalse(tcp) axfr_zone = ''' |
