summaryrefslogtreecommitdiff
path: root/tests/test_xfr.py
diff options
context:
space:
mode:
authorBob Halley <halley@dnspython.org>2020-08-16 17:58:29 -0700
committerBob Halley <halley@dnspython.org>2020-08-21 07:40:45 -0700
commita7de0230bcbd9eb1a92cebe988394231cd6437da (patch)
tree80eaac1c15eda312309c0d87f904a19a55fafc1c /tests/test_xfr.py
parente2888f116e0c98748f63044e9801acd0d18defd5 (diff)
downloaddnspython-a7de0230bcbd9eb1a92cebe988394231cd6437da.tar.gz
Implement new inbound xfr design.xfr
Diffstat (limited to 'tests/test_xfr.py')
-rw-r--r--tests/test_xfr.py714
1 files changed, 714 insertions, 0 deletions
diff --git a/tests/test_xfr.py b/tests/test_xfr.py
new file mode 100644
index 0000000..fbda5fa
--- /dev/null
+++ b/tests/test_xfr.py
@@ -0,0 +1,714 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+import asyncio
+
+import pytest
+
+import dns.asyncbackend
+import dns.asyncquery
+import dns.message
+import dns.query
+import dns.tsigkeyring
+import dns.versioned
+import dns.xfr
+
+# Some tests use a "nano nameserver" for testing. It requires trio
+# and threading, so try to import it and if it doesn't work, skip
+# those tests.
+try:
+ from .nanonameserver import Server
+ _nanonameserver_available = True
+except ImportError:
+ _nanonameserver_available = False
+ class Server(object):
+ pass
+
+axfr = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN AXFR
+;ANSWER
+@ 3600 IN SOA foo bar 1 2 3 4 5
+@ 3600 IN NS ns1
+@ 3600 IN NS ns2
+bar.foo 300 IN MX 0 blaz.foo
+ns1 3600 IN A 10.0.0.1
+ns2 3600 IN A 10.0.0.2
+@ 3600 IN SOA foo bar 1 2 3 4 5
+'''
+
+axfr1 = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN AXFR
+;ANSWER
+@ 3600 IN SOA foo bar 1 2 3 4 5
+@ 3600 IN NS ns1
+@ 3600 IN NS ns2
+'''
+axfr2 = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;ANSWER
+bar.foo 300 IN MX 0 blaz.foo
+ns1 3600 IN A 10.0.0.1
+ns2 3600 IN A 10.0.0.2
+@ 3600 IN SOA foo bar 1 2 3 4 5
+'''
+
+base = """@ 3600 IN SOA foo bar 1 2 3 4 5
+@ 3600 IN NS ns1
+@ 3600 IN NS ns2
+bar.foo 300 IN MX 0 blaz.foo
+ns1 3600 IN A 10.0.0.1
+ns2 3600 IN A 10.0.0.2
+"""
+
+axfr_unexpected_origin = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN AXFR
+;ANSWER
+@ 3600 IN SOA foo bar 1 2 3 4 5
+@ 3600 IN SOA foo bar 1 2 3 4 7
+'''
+
+ixfr = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN IXFR
+;ANSWER
+@ 3600 IN SOA foo bar 4 2 3 4 5
+@ 3600 IN SOA foo bar 1 2 3 4 5
+bar.foo 300 IN MX 0 blaz.foo
+ns2 3600 IN A 10.0.0.2
+@ 3600 IN SOA foo bar 2 2 3 4 5
+ns2 3600 IN A 10.0.0.4
+@ 3600 IN SOA foo bar 2 2 3 4 5
+@ 3600 IN SOA foo bar 3 2 3 4 5
+ns3 3600 IN A 10.0.0.3
+@ 3600 IN SOA foo bar 3 2 3 4 5
+@ 3600 IN NS ns2
+@ 3600 IN SOA foo bar 4 2 3 4 5
+@ 3600 IN SOA foo bar 4 2 3 4 5
+'''
+
+compressed_ixfr = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN IXFR
+;ANSWER
+@ 3600 IN SOA foo bar 4 2 3 4 5
+@ 3600 IN SOA foo bar 1 2 3 4 5
+bar.foo 300 IN MX 0 blaz.foo
+ns2 3600 IN A 10.0.0.2
+@ 3600 IN NS ns2
+@ 3600 IN SOA foo bar 4 2 3 4 5
+ns2 3600 IN A 10.0.0.4
+ns3 3600 IN A 10.0.0.3
+@ 3600 IN SOA foo bar 4 2 3 4 5
+'''
+
+ixfr_expected = """@ 3600 IN SOA foo bar 4 2 3 4 5
+@ 3600 IN NS ns1
+ns1 3600 IN A 10.0.0.1
+ns2 3600 IN A 10.0.0.4
+ns3 3600 IN A 10.0.0.3
+"""
+
+ixfr_first_message = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN IXFR
+;ANSWER
+@ 3600 IN SOA foo bar 4 2 3 4 5
+'''
+
+ixfr_header = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;ANSWER
+'''
+
+ixfr_body = [
+ '@ 3600 IN SOA foo bar 1 2 3 4 5',
+ 'bar.foo 300 IN MX 0 blaz.foo',
+ 'ns2 3600 IN A 10.0.0.2',
+ '@ 3600 IN SOA foo bar 2 2 3 4 5',
+ 'ns2 3600 IN A 10.0.0.4',
+ '@ 3600 IN SOA foo bar 2 2 3 4 5',
+ '@ 3600 IN SOA foo bar 3 2 3 4 5',
+ 'ns3 3600 IN A 10.0.0.3',
+ '@ 3600 IN SOA foo bar 3 2 3 4 5',
+ '@ 3600 IN NS ns2',
+ '@ 3600 IN SOA foo bar 4 2 3 4 5',
+ '@ 3600 IN SOA foo bar 4 2 3 4 5',
+]
+
+ixfrs = [ixfr_first_message]
+ixfrs.extend([ixfr_header + l for l in ixfr_body])
+
+good_empty_ixfr = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN IXFR
+;ANSWER
+@ 3600 IN SOA foo bar 1 2 3 4 5
+'''
+
+retry_tcp_ixfr = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN IXFR
+;ANSWER
+@ 3600 IN SOA foo bar 5 2 3 4 5
+'''
+
+bad_empty_ixfr = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN IXFR
+;ANSWER
+@ 3600 IN SOA foo bar 4 2 3 4 5
+@ 3600 IN SOA foo bar 4 2 3 4 5
+'''
+
+unexpected_end_ixfr = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN IXFR
+;ANSWER
+@ 3600 IN SOA foo bar 4 2 3 4 5
+@ 3600 IN SOA foo bar 1 2 3 4 5
+bar.foo 300 IN MX 0 blaz.foo
+ns2 3600 IN A 10.0.0.2
+@ 3600 IN NS ns2
+@ 3600 IN SOA foo bar 3 2 3 4 5
+ns2 3600 IN A 10.0.0.4
+ns3 3600 IN A 10.0.0.3
+@ 3600 IN SOA foo bar 4 2 3 4 5
+'''
+
+bad_serial_ixfr = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN IXFR
+;ANSWER
+@ 3600 IN SOA foo bar 4 2 3 4 5
+@ 3600 IN SOA foo bar 2 2 3 4 5
+bar.foo 300 IN MX 0 blaz.foo
+ns2 3600 IN A 10.0.0.2
+@ 3600 IN NS ns2
+@ 3600 IN SOA foo bar 4 2 3 4 5
+ns2 3600 IN A 10.0.0.4
+ns3 3600 IN A 10.0.0.3
+@ 3600 IN SOA foo bar 4 2 3 4 5
+'''
+
+ixfr_axfr = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN IXFR
+;ANSWER
+@ 3600 IN SOA foo bar 1 2 3 4 5
+@ 3600 IN NS ns1
+@ 3600 IN NS ns2
+bar.foo 300 IN MX 0 blaz.foo
+ns1 3600 IN A 10.0.0.1
+ns2 3600 IN A 10.0.0.2
+@ 3600 IN SOA foo bar 1 2 3 4 5
+'''
+
+def test_basic_axfr():
+ z = dns.versioned.Zone('example.')
+ m = dns.message.from_text(axfr, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.AXFR) as xfr:
+ done = xfr.process_message(m)
+ assert done
+ ez = dns.zone.from_text(base, 'example.')
+ assert z == ez
+
+def test_basic_axfr_two_parts():
+ z = dns.versioned.Zone('example.')
+ m1 = dns.message.from_text(axfr1, origin=z.origin,
+ one_rr_per_rrset=True)
+ m2 = dns.message.from_text(axfr2, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.AXFR) as xfr:
+ done = xfr.process_message(m1)
+ assert not done
+ done = xfr.process_message(m2)
+ assert done
+ ez = dns.zone.from_text(base, 'example.')
+ assert z == ez
+
+def test_axfr_unexpected_origin():
+ z = dns.versioned.Zone('example.')
+ m = dns.message.from_text(axfr_unexpected_origin, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.AXFR) as xfr:
+ with pytest.raises(dns.exception.FormError):
+ xfr.process_message(m)
+
+def test_basic_ixfr():
+ z = dns.zone.from_text(base, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(ixfr, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
+ done = xfr.process_message(m)
+ assert done
+ ez = dns.zone.from_text(ixfr_expected, 'example.')
+ assert z == ez
+
+def test_compressed_ixfr():
+ z = dns.zone.from_text(base, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(compressed_ixfr, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
+ done = xfr.process_message(m)
+ assert done
+ ez = dns.zone.from_text(ixfr_expected, 'example.')
+ assert z == ez
+
+def test_basic_ixfr_many_parts():
+ z = dns.zone.from_text(base, 'example.',
+ zone_factory=dns.versioned.Zone)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
+ done = False
+ for text in ixfrs:
+ assert not done
+ m = dns.message.from_text(text, origin=z.origin,
+ one_rr_per_rrset=True)
+ done = xfr.process_message(m)
+ assert done
+ ez = dns.zone.from_text(ixfr_expected, 'example.')
+ assert z == ez
+
+def test_good_empty_ixfr():
+ z = dns.zone.from_text(ixfr_expected, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(good_empty_ixfr, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
+ done = xfr.process_message(m)
+ assert done
+ ez = dns.zone.from_text(ixfr_expected, 'example.')
+ assert z == ez
+
+def test_retry_tcp_ixfr():
+ z = dns.zone.from_text(ixfr_expected, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(retry_tcp_ixfr, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
+ with pytest.raises(dns.xfr.UseTCP):
+ xfr.process_message(m, True)
+
+def test_bad_empty_ixfr():
+ z = dns.zone.from_text(ixfr_expected, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(bad_empty_ixfr, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=3) as xfr:
+ with pytest.raises(dns.exception.FormError):
+ xfr.process_message(m)
+
+def test_serial_went_backwards_ixfr():
+ z = dns.zone.from_text(ixfr_expected, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(bad_empty_ixfr, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=5) as xfr:
+ with pytest.raises(dns.xfr.SerialWentBackwards):
+ xfr.process_message(m)
+
+def test_ixfr_is_axfr():
+ z = dns.zone.from_text(base, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(ixfr_axfr, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=0xffffffff) as xfr:
+ done = xfr.process_message(m)
+ assert done
+ ez = dns.zone.from_text(base, 'example.')
+ assert z == ez
+
+def test_ixfr_requires_serial():
+ z = dns.zone.from_text(base, 'example.',
+ zone_factory=dns.versioned.Zone)
+ with pytest.raises(ValueError):
+ dns.xfr.Inbound(z, dns.rdatatype.IXFR)
+
+def test_ixfr_unexpected_end():
+ z = dns.zone.from_text(base, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(unexpected_end_ixfr, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
+ with pytest.raises(dns.exception.FormError):
+ xfr.process_message(m)
+
+def test_ixfr_bad_serial():
+ z = dns.zone.from_text(base, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(bad_serial_ixfr, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
+ with pytest.raises(dns.exception.FormError):
+ xfr.process_message(m)
+
+refused = '''id 1
+opcode QUERY
+rcode REFUSED
+flags AA
+;QUESTION
+example. IN AXFR
+'''
+
+bad_qname = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+not-example. IN IXFR
+'''
+
+bad_qtype = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN AXFR
+'''
+
+soa_not_first = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN IXFR
+;ANSWER
+bar.foo 300 IN MX 0 blaz.foo
+'''
+
+soa_not_first_2 = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN IXFR
+;ANSWER
+@ 300 IN MX 0 blaz.foo
+'''
+
+no_answer = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN IXFR
+;ADDITIONAL
+bar.foo 300 IN MX 0 blaz.foo
+'''
+
+axfr_answers_after_final_soa = '''id 1
+opcode QUERY
+rcode NOERROR
+flags AA
+;QUESTION
+example. IN AXFR
+;ANSWER
+@ 3600 IN SOA foo bar 1 2 3 4 5
+@ 3600 IN NS ns1
+@ 3600 IN NS ns2
+bar.foo 300 IN MX 0 blaz.foo
+ns1 3600 IN A 10.0.0.1
+ns2 3600 IN A 10.0.0.2
+@ 3600 IN SOA foo bar 1 2 3 4 5
+ns3 3600 IN A 10.0.0.3
+'''
+
+def test_refused():
+ z = dns.zone.from_text(base, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(refused, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
+ with pytest.raises(dns.xfr.TransferError):
+ xfr.process_message(m)
+
+def test_bad_qname():
+ z = dns.zone.from_text(base, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(bad_qname, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
+ with pytest.raises(dns.exception.FormError):
+ xfr.process_message(m)
+
+def test_bad_qtype():
+ z = dns.zone.from_text(base, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(bad_qtype, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
+ with pytest.raises(dns.exception.FormError):
+ xfr.process_message(m)
+
+def test_soa_not_first():
+ z = dns.zone.from_text(base, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(soa_not_first, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
+ with pytest.raises(dns.exception.FormError):
+ xfr.process_message(m)
+ m = dns.message.from_text(soa_not_first_2, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
+ with pytest.raises(dns.exception.FormError):
+ xfr.process_message(m)
+
+def test_no_answer():
+ z = dns.zone.from_text(base, 'example.',
+ zone_factory=dns.versioned.Zone)
+ m = dns.message.from_text(no_answer, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
+ with pytest.raises(dns.exception.FormError):
+ xfr.process_message(m)
+
+def test_axfr_answers_after_final_soa():
+ z = dns.versioned.Zone('example.')
+ m = dns.message.from_text(axfr_answers_after_final_soa, origin=z.origin,
+ one_rr_per_rrset=True)
+ with dns.xfr.Inbound(z, dns.rdatatype.AXFR) as xfr:
+ with pytest.raises(dns.exception.FormError):
+ xfr.process_message(m)
+
+keyring = dns.tsigkeyring.from_text(
+ {
+ 'keyname.': 'NjHwPsMKjdN++dOfE5iAiQ=='
+ }
+)
+
+keyname = dns.name.from_text('keyname')
+
+def test_make_query_basic():
+ z = dns.versioned.Zone('example.')
+ (q, s) = dns.xfr.make_query(z)
+ assert q.question[0].rdtype == dns.rdatatype.AXFR
+ assert s is None
+ (q, s) = dns.xfr.make_query(z, serial=None)
+ assert q.question[0].rdtype == dns.rdatatype.AXFR
+ assert s is None
+ (q, s) = dns.xfr.make_query(z, serial=10)
+ assert q.question[0].rdtype == dns.rdatatype.IXFR
+ assert q.authority[0].rdtype == dns.rdatatype.SOA
+ assert q.authority[0][0].serial == 10
+ assert s == 10
+ with z.writer() as txn:
+ txn.add('@', 300, dns.rdata.from_text('in', 'soa', '. . 1 2 3 4 5'))
+ (q, s) = dns.xfr.make_query(z)
+ assert q.question[0].rdtype == dns.rdatatype.IXFR
+ assert q.authority[0].rdtype == dns.rdatatype.SOA
+ assert q.authority[0][0].serial == 1
+ assert s == 1
+ (q, s) = dns.xfr.make_query(z, keyring=keyring, keyname=keyname)
+ assert q.question[0].rdtype == dns.rdatatype.IXFR
+ assert q.authority[0].rdtype == dns.rdatatype.SOA
+ assert q.authority[0][0].serial == 1
+ assert s == 1
+ assert q.keyname == keyname
+
+
+def test_make_query_bad_serial():
+ z = dns.versioned.Zone('example.')
+ with pytest.raises(ValueError):
+ dns.xfr.make_query(z, serial='hi')
+ with pytest.raises(ValueError):
+ dns.xfr.make_query(z, serial=-1)
+ with pytest.raises(ValueError):
+ dns.xfr.make_query(z, serial=4294967296)
+
+
+class XFRNanoNameserver(Server):
+
+ def __init__(self):
+ super().__init__(origin=dns.name.from_text('example'))
+
+ def handle(self, request):
+ try:
+ if request.message.question[0].rdtype == dns.rdatatype.IXFR:
+ text = ixfr
+ else:
+ text = axfr
+ r = dns.message.from_text(text, one_rr_per_rrset=True,
+ origin=self.origin)
+ r.id = request.message.id
+ return r
+ except Exception:
+ pass
+
+@pytest.mark.skipif(not _nanonameserver_available,
+ reason="requires nanonameserver")
+def test_sync_inbound_xfr():
+ with XFRNanoNameserver() as ns:
+ zone = dns.versioned.Zone('example')
+ dns.query.inbound_xfr(ns.tcp_address[0], zone, port=ns.tcp_address[1],
+ udp_mode=dns.query.UDPMode.TRY_FIRST)
+ dns.query.inbound_xfr(ns.tcp_address[0], zone, port=ns.tcp_address[1],
+ udp_mode=dns.query.UDPMode.TRY_FIRST)
+ expected = dns.zone.from_text(ixfr_expected, 'example')
+ assert zone == expected
+
+async def async_inbound_xfr():
+ with XFRNanoNameserver() as ns:
+ zone = dns.versioned.Zone('example')
+ await dns.asyncquery.inbound_xfr(ns.tcp_address[0], zone,
+ port=ns.tcp_address[1],
+ udp_mode=dns.query.UDPMode.TRY_FIRST)
+ await dns.asyncquery.inbound_xfr(ns.tcp_address[0], zone,
+ port=ns.tcp_address[1],
+ udp_mode=dns.query.UDPMode.TRY_FIRST)
+ expected = dns.zone.from_text(ixfr_expected, 'example')
+ assert zone == expected
+
+@pytest.mark.skipif(not _nanonameserver_available,
+ reason="requires nanonameserver")
+def test_asyncio_inbound_xfr():
+ dns.asyncbackend.set_default_backend('asyncio')
+ async def run():
+ await async_inbound_xfr()
+ try:
+ runner = asyncio.run
+ except AttributeError:
+ # this is only needed for 3.6
+ def old_runner(awaitable):
+ loop = asyncio.get_event_loop()
+ return loop.run_until_complete(awaitable)
+ runner = old_runner
+ runner(run())
+
+#
+# We don't need to do this as it's all generic code, but
+# just for extra caution we do it for each backend.
+#
+
+try:
+ import trio
+
+ @pytest.mark.skipif(not _nanonameserver_available,
+ reason="requires nanonameserver")
+ def test_trio_inbound_xfr():
+ dns.asyncbackend.set_default_backend('trio')
+ async def run():
+ await async_inbound_xfr()
+ trio.run(run)
+except ImportError:
+ pass
+
+try:
+ import curio
+
+ @pytest.mark.skipif(not _nanonameserver_available,
+ reason="requires nanonameserver")
+ def test_curio_inbound_xfr():
+ dns.asyncbackend.set_default_backend('curio')
+ async def run():
+ await async_inbound_xfr()
+ curio.run(run)
+except ImportError:
+ pass
+
+
+class UDPXFRNanoNameserver(Server):
+
+ def __init__(self):
+ super().__init__(origin=dns.name.from_text('example'))
+ self.did_truncation = False
+
+ def handle(self, request):
+ try:
+ if request.message.question[0].rdtype == dns.rdatatype.IXFR:
+ if self.did_truncation:
+ text = ixfr
+ else:
+ text = retry_tcp_ixfr
+ self.did_truncation = True
+ else:
+ text = axfr
+ r = dns.message.from_text(text, one_rr_per_rrset=True,
+ origin=self.origin)
+ r.id = request.message.id
+ return r
+ except Exception:
+ pass
+
+@pytest.mark.skipif(not _nanonameserver_available,
+ reason="requires nanonameserver")
+def test_sync_retry_tcp_inbound_xfr():
+ with UDPXFRNanoNameserver() as ns:
+ zone = dns.versioned.Zone('example')
+ dns.query.inbound_xfr(ns.tcp_address[0], zone, port=ns.tcp_address[1],
+ udp_mode=dns.query.UDPMode.TRY_FIRST)
+ dns.query.inbound_xfr(ns.tcp_address[0], zone, port=ns.tcp_address[1],
+ udp_mode=dns.query.UDPMode.TRY_FIRST)
+ expected = dns.zone.from_text(ixfr_expected, 'example')
+ assert zone == expected
+
+async def udp_async_inbound_xfr():
+ with UDPXFRNanoNameserver() as ns:
+ zone = dns.versioned.Zone('example')
+ await dns.asyncquery.inbound_xfr(ns.tcp_address[0], zone,
+ port=ns.tcp_address[1],
+ udp_mode=dns.query.UDPMode.TRY_FIRST)
+ await dns.asyncquery.inbound_xfr(ns.tcp_address[0], zone,
+ port=ns.tcp_address[1],
+ udp_mode=dns.query.UDPMode.TRY_FIRST)
+ expected = dns.zone.from_text(ixfr_expected, 'example')
+ assert zone == expected
+
+@pytest.mark.skipif(not _nanonameserver_available,
+ reason="requires nanonameserver")
+def test_asyncio_retry_tcp_inbound_xfr():
+ dns.asyncbackend.set_default_backend('asyncio')
+ async def run():
+ await udp_async_inbound_xfr()
+ try:
+ runner = asyncio.run
+ except AttributeError:
+ def old_runner(awaitable):
+ loop = asyncio.get_event_loop()
+ return loop.run_until_complete(awaitable)
+ runner = old_runner
+ runner(run())