summaryrefslogtreecommitdiff
path: root/tests/test_query.py
diff options
context:
space:
mode:
authorBob Halley <halley@dnspython.org>2020-06-15 17:20:00 -0700
committerBob Halley <halley@dnspython.org>2020-06-15 17:20:00 -0700
commit12e607a9878607df0590ab231d5e81040c1568c5 (patch)
treea1ab920dc5e4b9b2650f188b4b86873e52e60081 /tests/test_query.py
parenta867ef1c8af00734138c8e72321782ef3a798013 (diff)
downloaddnspython-12e607a9878607df0590ab231d5e81040c1568c5.tar.gz
test IPv4 and IPv6 according to availability
Diffstat (limited to 'tests/test_query.py')
-rw-r--r--tests/test_query.py162
1 files changed, 96 insertions, 66 deletions
diff --git a/tests/test_query.py b/tests/test_query.py
index 90fee03..df52dfe 100644
--- a/tests/test_query.py
+++ b/tests/test_query.py
@@ -25,6 +25,7 @@ except Exception:
have_ssl = False
import dns.exception
+import dns.inet
import dns.message
import dns.name
import dns.rdataclass
@@ -51,26 +52,27 @@ except ImportError:
class Server(object):
pass
+# Probe for IPv4 and IPv6
+query_addresses = []
+for (af, address) in ((socket.AF_INET, '8.8.8.8'),
+ (socket.AF_INET6, '2001:4860:4860::8888')):
+ try:
+ with socket.socket(af, socket.SOCK_DGRAM) as s:
+ # Connecting a UDP socket is supposed to return ENETUNREACH if
+ # no route to the network is present.
+ s.connect((address, 53))
+ query_addresses.append(address)
+ except Exception:
+ pass
+
@unittest.skipIf(not _network_available, "Internet not reachable")
class QueryTests(unittest.TestCase):
def testQueryUDP(self):
- qname = dns.name.from_text('dns.google.')
- q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.udp(q, '8.8.8.8')
- rrs = response.get_rrset(response.answer, qname,
- dns.rdataclass.IN, dns.rdatatype.A)
- self.assertTrue(rrs is not None)
- seen = set([rdata.address for rdata in rrs])
- self.assertTrue('8.8.8.8' in seen)
- self.assertTrue('8.8.4.4' in seen)
-
- def testQueryUDPWithSocket(self):
- with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
- s.setblocking(0)
+ for address in query_addresses:
qname = dns.name.from_text('dns.google.')
q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.udp(q, '8.8.8.8', sock=s)
+ response = dns.query.udp(q, address)
rrs = response.get_rrset(response.answer, qname,
dns.rdataclass.IN, dns.rdatatype.A)
self.assertTrue(rrs is not None)
@@ -78,24 +80,26 @@ class QueryTests(unittest.TestCase):
self.assertTrue('8.8.8.8' in seen)
self.assertTrue('8.8.4.4' in seen)
- def testQueryTCP(self):
- qname = dns.name.from_text('dns.google.')
- q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.tcp(q, '8.8.8.8')
- rrs = response.get_rrset(response.answer, qname,
- dns.rdataclass.IN, dns.rdatatype.A)
- self.assertTrue(rrs is not None)
- seen = set([rdata.address for rdata in rrs])
- self.assertTrue('8.8.8.8' in seen)
- self.assertTrue('8.8.4.4' in seen)
+ def testQueryUDPWithSocket(self):
+ for address in query_addresses:
+ with socket.socket(dns.inet.af_for_address(address),
+ socket.SOCK_DGRAM) as s:
+ s.setblocking(0)
+ qname = dns.name.from_text('dns.google.')
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ response = dns.query.udp(q, address, sock=s)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('8.8.8.8' in seen)
+ self.assertTrue('8.8.4.4' in seen)
- def testQueryTCPWithSocket(self):
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
- s.connect(('8.8.8.8', 53))
- s.setblocking(0)
+ def testQueryTCP(self):
+ for address in query_addresses:
qname = dns.name.from_text('dns.google.')
q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.tcp(q, None, sock=s)
+ response = dns.query.tcp(q, address)
rrs = response.get_rrset(response.answer, qname,
dns.rdataclass.IN, dns.rdatatype.A)
self.assertTrue(rrs is not None)
@@ -103,27 +107,16 @@ class QueryTests(unittest.TestCase):
self.assertTrue('8.8.8.8' in seen)
self.assertTrue('8.8.4.4' in seen)
- def testQueryTLS(self):
- qname = dns.name.from_text('dns.google.')
- q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.tls(q, '8.8.8.8')
- rrs = response.get_rrset(response.answer, qname,
- dns.rdataclass.IN, dns.rdatatype.A)
- self.assertTrue(rrs is not None)
- seen = set([rdata.address for rdata in rrs])
- self.assertTrue('8.8.8.8' in seen)
- self.assertTrue('8.8.4.4' in seen)
-
- @unittest.skipUnless(have_ssl, "No SSL support")
- def testQueryTLSWithSocket(self):
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as base_s:
- base_s.connect(('8.8.8.8', 853))
- ctx = ssl.create_default_context()
- with ctx.wrap_socket(base_s, server_hostname='dns.google') as s:
+ def testQueryTCPWithSocket(self):
+ for address in query_addresses:
+ with socket.socket(dns.inet.af_for_address(address),
+ socket.SOCK_STREAM) as s:
+ ll = dns.inet.low_level_address_tuple((address, 53))
+ s.connect(ll)
s.setblocking(0)
qname = dns.name.from_text('dns.google.')
q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.tls(q, None, sock=s)
+ response = dns.query.tcp(q, None, sock=s)
rrs = response.get_rrset(response.answer, qname,
dns.rdataclass.IN, dns.rdatatype.A)
self.assertTrue(rrs is not None)
@@ -131,30 +124,67 @@ class QueryTests(unittest.TestCase):
self.assertTrue('8.8.8.8' in seen)
self.assertTrue('8.8.4.4' in seen)
+ def testQueryTLS(self):
+ for address in query_addresses:
+ qname = dns.name.from_text('dns.google.')
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ response = dns.query.tls(q, address)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('8.8.8.8' in seen)
+ self.assertTrue('8.8.4.4' in seen)
+
+ @unittest.skipUnless(have_ssl, "No SSL support")
+ def testQueryTLSWithSocket(self):
+ for address in query_addresses:
+ with socket.socket(dns.inet.af_for_address(address),
+ socket.SOCK_STREAM) as base_s:
+ ll = dns.inet.low_level_address_tuple((address, 853))
+ base_s.connect(ll)
+ ctx = ssl.create_default_context()
+ with ctx.wrap_socket(base_s, server_hostname='dns.google') as s:
+ s.setblocking(0)
+ qname = dns.name.from_text('dns.google.')
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ response = dns.query.tls(q, None, sock=s)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('8.8.8.8' in seen)
+ self.assertTrue('8.8.4.4' in seen)
+
def testQueryUDPFallback(self):
- qname = dns.name.from_text('.')
- q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
- (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8')
- self.assertTrue(tcp)
+ for address in query_addresses:
+ qname = dns.name.from_text('.')
+ q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
+ (_, tcp) = dns.query.udp_with_fallback(q, address)
+ self.assertTrue(tcp)
def testQueryUDPFallbackWithSocket(self):
- with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_s:
- udp_s.setblocking(0)
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_s:
- tcp_s.connect(('8.8.8.8', 53))
- tcp_s.setblocking(0)
- qname = dns.name.from_text('.')
- q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
- (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8',
- udp_sock=udp_s,
- tcp_sock=tcp_s)
- self.assertTrue(tcp)
+ for address in query_addresses:
+ af = dns.inet.af_for_address(address)
+ with socket.socket(af, socket.SOCK_DGRAM) as udp_s:
+ udp_s.setblocking(0)
+ with socket.socket(af, socket.SOCK_STREAM) as tcp_s:
+ ll = dns.inet.low_level_address_tuple((address, 53))
+ tcp_s.connect(ll)
+ tcp_s.setblocking(0)
+ qname = dns.name.from_text('.')
+ q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
+ (_, tcp) = dns.query.udp_with_fallback(q, address,
+ udp_sock=udp_s,
+ tcp_sock=tcp_s)
+ self.assertTrue(tcp)
def testQueryUDPFallbackNoFallback(self):
- qname = dns.name.from_text('dns.google.')
- q = dns.message.make_query(qname, dns.rdatatype.A)
- (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8')
- self.assertFalse(tcp)
+ for address in query_addresses:
+ qname = dns.name.from_text('dns.google.')
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ (_, tcp) = dns.query.udp_with_fallback(q, address)
+ self.assertFalse(tcp)
axfr_zone = '''