summaryrefslogtreecommitdiff
path: root/tests/test_query.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_query.py')
-rw-r--r--tests/test_query.py26
1 files changed, 15 insertions, 11 deletions
diff --git a/tests/test_query.py b/tests/test_query.py
index 7a1ec71..8f2b65f 100644
--- a/tests/test_query.py
+++ b/tests/test_query.py
@@ -68,7 +68,7 @@ for (af, address) in ((socket.AF_INET, '8.8.8.8'),
except Exception:
pass
-keyring = dns.tsigkeyring.from_text({'name' : 'tDz6cfXXGtNivRpQ98hr6A=='})
+keyring = dns.tsigkeyring.from_text({'name': 'tDz6cfXXGtNivRpQ98hr6A=='})
@unittest.skipIf(not _network_available, "Internet not reachable")
class QueryTests(unittest.TestCase):
@@ -77,7 +77,7 @@ class QueryTests(unittest.TestCase):
for address in query_addresses:
qname = dns.name.from_text('dns.google.')
q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.udp(q, address)
+ response = dns.query.udp(q, address, timeout=2)
rrs = response.get_rrset(response.answer, qname,
dns.rdataclass.IN, dns.rdatatype.A)
self.assertTrue(rrs is not None)
@@ -92,7 +92,7 @@ class QueryTests(unittest.TestCase):
s.setblocking(0)
qname = dns.name.from_text('dns.google.')
q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.udp(q, address, sock=s)
+ response = dns.query.udp(q, address, sock=s, timeout=2)
rrs = response.get_rrset(response.answer, qname,
dns.rdataclass.IN, dns.rdatatype.A)
self.assertTrue(rrs is not None)
@@ -104,7 +104,7 @@ class QueryTests(unittest.TestCase):
for address in query_addresses:
qname = dns.name.from_text('dns.google.')
q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.tcp(q, address)
+ response = dns.query.tcp(q, address, timeout=2)
rrs = response.get_rrset(response.answer, qname,
dns.rdataclass.IN, dns.rdatatype.A)
self.assertTrue(rrs is not None)
@@ -117,11 +117,12 @@ class QueryTests(unittest.TestCase):
with socket.socket(dns.inet.af_for_address(address),
socket.SOCK_STREAM) as s:
ll = dns.inet.low_level_address_tuple((address, 53))
+ s.settimeout(2)
s.connect(ll)
s.setblocking(0)
qname = dns.name.from_text('dns.google.')
q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.tcp(q, None, sock=s)
+ response = dns.query.tcp(q, None, sock=s, timeout=2)
rrs = response.get_rrset(response.answer, qname,
dns.rdataclass.IN, dns.rdatatype.A)
self.assertTrue(rrs is not None)
@@ -133,7 +134,7 @@ class QueryTests(unittest.TestCase):
for address in query_addresses:
qname = dns.name.from_text('dns.google.')
q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.tls(q, address)
+ response = dns.query.tls(q, address, timeout=2)
rrs = response.get_rrset(response.answer, qname,
dns.rdataclass.IN, dns.rdatatype.A)
self.assertTrue(rrs is not None)
@@ -147,13 +148,14 @@ class QueryTests(unittest.TestCase):
with socket.socket(dns.inet.af_for_address(address),
socket.SOCK_STREAM) as base_s:
ll = dns.inet.low_level_address_tuple((address, 853))
+ base_s.settimeout(2)
base_s.connect(ll)
ctx = ssl.create_default_context()
with ctx.wrap_socket(base_s, server_hostname='dns.google') as s:
s.setblocking(0)
qname = dns.name.from_text('dns.google.')
q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.tls(q, None, sock=s)
+ response = dns.query.tls(q, None, sock=s, timeout=2)
rrs = response.get_rrset(response.answer, qname,
dns.rdataclass.IN, dns.rdatatype.A)
self.assertTrue(rrs is not None)
@@ -165,7 +167,7 @@ class QueryTests(unittest.TestCase):
for address in query_addresses:
qname = dns.name.from_text('.')
q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
- (_, tcp) = dns.query.udp_with_fallback(q, address)
+ (_, tcp) = dns.query.udp_with_fallback(q, address, timeout=2)
self.assertTrue(tcp)
def testQueryUDPFallbackWithSocket(self):
@@ -175,20 +177,22 @@ class QueryTests(unittest.TestCase):
udp_s.setblocking(0)
with socket.socket(af, socket.SOCK_STREAM) as tcp_s:
ll = dns.inet.low_level_address_tuple((address, 53))
+ tcp_s.settimeout(2)
tcp_s.connect(ll)
tcp_s.setblocking(0)
qname = dns.name.from_text('.')
q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
(_, tcp) = dns.query.udp_with_fallback(q, address,
- udp_sock=udp_s,
- tcp_sock=tcp_s)
+ udp_sock=udp_s,
+ tcp_sock=tcp_s,
+ timeout=2)
self.assertTrue(tcp)
def testQueryUDPFallbackNoFallback(self):
for address in query_addresses:
qname = dns.name.from_text('dns.google.')
q = dns.message.make_query(qname, dns.rdatatype.A)
- (_, tcp) = dns.query.udp_with_fallback(q, address)
+ (_, tcp) = dns.query.udp_with_fallback(q, address, timeout=2)
self.assertFalse(tcp)
def testUDPReceiveQuery(self):