summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorBrian Wellington <bwelling@xbill.org>2020-06-16 13:42:31 -0700
committerBrian Wellington <bwelling@xbill.org>2020-06-16 13:43:16 -0700
commite3c7fa8b24b0f7c2d5889f49dc4b72b749d331d3 (patch)
tree423f0742c447166ca9004899111d7d800d085b81 /tests
parenta13678ed8afdaf09502abaadcfa85399e6d978f2 (diff)
downloaddnspython-e3c7fa8b24b0f7c2d5889f49dc4b72b749d331d3.tar.gz
Test sending a query with TSIG.
Diffstat (limited to 'tests')
-rw-r--r--tests/test_query.py39
1 files changed, 39 insertions, 0 deletions
diff --git a/tests/test_query.py b/tests/test_query.py
index df52dfe..ec490f1 100644
--- a/tests/test_query.py
+++ b/tests/test_query.py
@@ -31,6 +31,7 @@ import dns.name
import dns.rdataclass
import dns.rdatatype
import dns.query
+import dns.tsigkeyring
import dns.zone
# Some tests require the internet to be available to run, so let's
@@ -65,6 +66,8 @@ for (af, address) in ((socket.AF_INET, '8.8.8.8'),
except Exception:
pass
+keyring = dns.tsigkeyring.from_text({'name' : 'tDz6cfXXGtNivRpQ98hr6A=='})
+
@unittest.skipIf(not _network_available, "Internet not reachable")
class QueryTests(unittest.TestCase):
@@ -392,3 +395,39 @@ class XfrTests(unittest.TestCase):
relativize=False)
l = list(xfr)
self.assertRaises(dns.exception.FormError, bad)
+
+class TSIGNanoNameserver(Server):
+
+ def handle(self, message, peer, connection_type):
+ response = dns.message.make_response(message)
+ response.set_rcode(dns.rcode.REFUSED)
+ response.flags |= dns.flags.RA
+ try:
+ if message.question[0].rdtype == dns.rdatatype.A and \
+ message.question[0].rdclass == dns.rdataclass.IN:
+ rrs = dns.rrset.from_text(message.question[0].name, 300,
+ 'IN', 'A', '1.2.3.4')
+ response.answer.append(rrs)
+ response.set_rcode(dns.rcode.NOERROR)
+ response.flags |= dns.flags.AA
+ except Exception:
+ pass
+ return response
+
+@unittest.skipIf(not _nanonameserver_available,
+ "Internet and nanonameserver required")
+class TsigTests(unittest.TestCase):
+
+ def test_tsig(self):
+ with TSIGNanoNameserver(keyring=keyring) as ns:
+ qname = dns.name.from_text('example.com')
+ q = dns.message.make_query(qname, 'A')
+ q.use_tsig(keyring=keyring, keyname='name')
+ response = dns.query.udp(q, ns.udp_address[0],
+ port=ns.udp_address[1])
+ self.assertTrue(response.had_tsig)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('1.2.3.4' in seen)