diff options
author | Bob Halley <halley@dnspython.org> | 2020-08-16 17:58:29 -0700 |
---|---|---|
committer | Bob Halley <halley@dnspython.org> | 2020-08-21 07:40:45 -0700 |
commit | a7de0230bcbd9eb1a92cebe988394231cd6437da (patch) | |
tree | 80eaac1c15eda312309c0d87f904a19a55fafc1c /tests/test_xfr.py | |
parent | e2888f116e0c98748f63044e9801acd0d18defd5 (diff) | |
download | dnspython-a7de0230bcbd9eb1a92cebe988394231cd6437da.tar.gz |
Implement new inbound xfr design.xfr
Diffstat (limited to 'tests/test_xfr.py')
-rw-r--r-- | tests/test_xfr.py | 714 |
1 files changed, 714 insertions, 0 deletions
diff --git a/tests/test_xfr.py b/tests/test_xfr.py new file mode 100644 index 0000000..fbda5fa --- /dev/null +++ b/tests/test_xfr.py @@ -0,0 +1,714 @@ +# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license + +import asyncio + +import pytest + +import dns.asyncbackend +import dns.asyncquery +import dns.message +import dns.query +import dns.tsigkeyring +import dns.versioned +import dns.xfr + +# Some tests use a "nano nameserver" for testing. It requires trio +# and threading, so try to import it and if it doesn't work, skip +# those tests. +try: + from .nanonameserver import Server + _nanonameserver_available = True +except ImportError: + _nanonameserver_available = False + class Server(object): + pass + +axfr = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN AXFR +;ANSWER +@ 3600 IN SOA foo bar 1 2 3 4 5 +@ 3600 IN NS ns1 +@ 3600 IN NS ns2 +bar.foo 300 IN MX 0 blaz.foo +ns1 3600 IN A 10.0.0.1 +ns2 3600 IN A 10.0.0.2 +@ 3600 IN SOA foo bar 1 2 3 4 5 +''' + +axfr1 = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN AXFR +;ANSWER +@ 3600 IN SOA foo bar 1 2 3 4 5 +@ 3600 IN NS ns1 +@ 3600 IN NS ns2 +''' +axfr2 = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;ANSWER +bar.foo 300 IN MX 0 blaz.foo +ns1 3600 IN A 10.0.0.1 +ns2 3600 IN A 10.0.0.2 +@ 3600 IN SOA foo bar 1 2 3 4 5 +''' + +base = """@ 3600 IN SOA foo bar 1 2 3 4 5 +@ 3600 IN NS ns1 +@ 3600 IN NS ns2 +bar.foo 300 IN MX 0 blaz.foo +ns1 3600 IN A 10.0.0.1 +ns2 3600 IN A 10.0.0.2 +""" + +axfr_unexpected_origin = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN AXFR +;ANSWER +@ 3600 IN SOA foo bar 1 2 3 4 5 +@ 3600 IN SOA foo bar 1 2 3 4 7 +''' + +ixfr = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN IXFR +;ANSWER +@ 3600 IN SOA foo bar 4 2 3 4 5 +@ 3600 IN SOA foo bar 1 2 3 4 5 +bar.foo 300 IN MX 0 blaz.foo +ns2 3600 IN A 10.0.0.2 +@ 3600 IN SOA foo bar 2 2 3 4 5 +ns2 3600 IN A 10.0.0.4 +@ 3600 IN SOA foo bar 2 2 3 4 5 +@ 3600 IN SOA foo bar 3 2 3 4 5 +ns3 3600 IN A 10.0.0.3 +@ 3600 IN SOA foo bar 3 2 3 4 5 +@ 3600 IN NS ns2 +@ 3600 IN SOA foo bar 4 2 3 4 5 +@ 3600 IN SOA foo bar 4 2 3 4 5 +''' + +compressed_ixfr = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN IXFR +;ANSWER +@ 3600 IN SOA foo bar 4 2 3 4 5 +@ 3600 IN SOA foo bar 1 2 3 4 5 +bar.foo 300 IN MX 0 blaz.foo +ns2 3600 IN A 10.0.0.2 +@ 3600 IN NS ns2 +@ 3600 IN SOA foo bar 4 2 3 4 5 +ns2 3600 IN A 10.0.0.4 +ns3 3600 IN A 10.0.0.3 +@ 3600 IN SOA foo bar 4 2 3 4 5 +''' + +ixfr_expected = """@ 3600 IN SOA foo bar 4 2 3 4 5 +@ 3600 IN NS ns1 +ns1 3600 IN A 10.0.0.1 +ns2 3600 IN A 10.0.0.4 +ns3 3600 IN A 10.0.0.3 +""" + +ixfr_first_message = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN IXFR +;ANSWER +@ 3600 IN SOA foo bar 4 2 3 4 5 +''' + +ixfr_header = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;ANSWER +''' + +ixfr_body = [ + '@ 3600 IN SOA foo bar 1 2 3 4 5', + 'bar.foo 300 IN MX 0 blaz.foo', + 'ns2 3600 IN A 10.0.0.2', + '@ 3600 IN SOA foo bar 2 2 3 4 5', + 'ns2 3600 IN A 10.0.0.4', + '@ 3600 IN SOA foo bar 2 2 3 4 5', + '@ 3600 IN SOA foo bar 3 2 3 4 5', + 'ns3 3600 IN A 10.0.0.3', + '@ 3600 IN SOA foo bar 3 2 3 4 5', + '@ 3600 IN NS ns2', + '@ 3600 IN SOA foo bar 4 2 3 4 5', + '@ 3600 IN SOA foo bar 4 2 3 4 5', +] + +ixfrs = [ixfr_first_message] +ixfrs.extend([ixfr_header + l for l in ixfr_body]) + +good_empty_ixfr = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN IXFR +;ANSWER +@ 3600 IN SOA foo bar 1 2 3 4 5 +''' + +retry_tcp_ixfr = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN IXFR +;ANSWER +@ 3600 IN SOA foo bar 5 2 3 4 5 +''' + +bad_empty_ixfr = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN IXFR +;ANSWER +@ 3600 IN SOA foo bar 4 2 3 4 5 +@ 3600 IN SOA foo bar 4 2 3 4 5 +''' + +unexpected_end_ixfr = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN IXFR +;ANSWER +@ 3600 IN SOA foo bar 4 2 3 4 5 +@ 3600 IN SOA foo bar 1 2 3 4 5 +bar.foo 300 IN MX 0 blaz.foo +ns2 3600 IN A 10.0.0.2 +@ 3600 IN NS ns2 +@ 3600 IN SOA foo bar 3 2 3 4 5 +ns2 3600 IN A 10.0.0.4 +ns3 3600 IN A 10.0.0.3 +@ 3600 IN SOA foo bar 4 2 3 4 5 +''' + +bad_serial_ixfr = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN IXFR +;ANSWER +@ 3600 IN SOA foo bar 4 2 3 4 5 +@ 3600 IN SOA foo bar 2 2 3 4 5 +bar.foo 300 IN MX 0 blaz.foo +ns2 3600 IN A 10.0.0.2 +@ 3600 IN NS ns2 +@ 3600 IN SOA foo bar 4 2 3 4 5 +ns2 3600 IN A 10.0.0.4 +ns3 3600 IN A 10.0.0.3 +@ 3600 IN SOA foo bar 4 2 3 4 5 +''' + +ixfr_axfr = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN IXFR +;ANSWER +@ 3600 IN SOA foo bar 1 2 3 4 5 +@ 3600 IN NS ns1 +@ 3600 IN NS ns2 +bar.foo 300 IN MX 0 blaz.foo +ns1 3600 IN A 10.0.0.1 +ns2 3600 IN A 10.0.0.2 +@ 3600 IN SOA foo bar 1 2 3 4 5 +''' + +def test_basic_axfr(): + z = dns.versioned.Zone('example.') + m = dns.message.from_text(axfr, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.AXFR) as xfr: + done = xfr.process_message(m) + assert done + ez = dns.zone.from_text(base, 'example.') + assert z == ez + +def test_basic_axfr_two_parts(): + z = dns.versioned.Zone('example.') + m1 = dns.message.from_text(axfr1, origin=z.origin, + one_rr_per_rrset=True) + m2 = dns.message.from_text(axfr2, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.AXFR) as xfr: + done = xfr.process_message(m1) + assert not done + done = xfr.process_message(m2) + assert done + ez = dns.zone.from_text(base, 'example.') + assert z == ez + +def test_axfr_unexpected_origin(): + z = dns.versioned.Zone('example.') + m = dns.message.from_text(axfr_unexpected_origin, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.AXFR) as xfr: + with pytest.raises(dns.exception.FormError): + xfr.process_message(m) + +def test_basic_ixfr(): + z = dns.zone.from_text(base, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(ixfr, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr: + done = xfr.process_message(m) + assert done + ez = dns.zone.from_text(ixfr_expected, 'example.') + assert z == ez + +def test_compressed_ixfr(): + z = dns.zone.from_text(base, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(compressed_ixfr, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr: + done = xfr.process_message(m) + assert done + ez = dns.zone.from_text(ixfr_expected, 'example.') + assert z == ez + +def test_basic_ixfr_many_parts(): + z = dns.zone.from_text(base, 'example.', + zone_factory=dns.versioned.Zone) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr: + done = False + for text in ixfrs: + assert not done + m = dns.message.from_text(text, origin=z.origin, + one_rr_per_rrset=True) + done = xfr.process_message(m) + assert done + ez = dns.zone.from_text(ixfr_expected, 'example.') + assert z == ez + +def test_good_empty_ixfr(): + z = dns.zone.from_text(ixfr_expected, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(good_empty_ixfr, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr: + done = xfr.process_message(m) + assert done + ez = dns.zone.from_text(ixfr_expected, 'example.') + assert z == ez + +def test_retry_tcp_ixfr(): + z = dns.zone.from_text(ixfr_expected, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(retry_tcp_ixfr, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr: + with pytest.raises(dns.xfr.UseTCP): + xfr.process_message(m, True) + +def test_bad_empty_ixfr(): + z = dns.zone.from_text(ixfr_expected, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(bad_empty_ixfr, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=3) as xfr: + with pytest.raises(dns.exception.FormError): + xfr.process_message(m) + +def test_serial_went_backwards_ixfr(): + z = dns.zone.from_text(ixfr_expected, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(bad_empty_ixfr, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=5) as xfr: + with pytest.raises(dns.xfr.SerialWentBackwards): + xfr.process_message(m) + +def test_ixfr_is_axfr(): + z = dns.zone.from_text(base, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(ixfr_axfr, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=0xffffffff) as xfr: + done = xfr.process_message(m) + assert done + ez = dns.zone.from_text(base, 'example.') + assert z == ez + +def test_ixfr_requires_serial(): + z = dns.zone.from_text(base, 'example.', + zone_factory=dns.versioned.Zone) + with pytest.raises(ValueError): + dns.xfr.Inbound(z, dns.rdatatype.IXFR) + +def test_ixfr_unexpected_end(): + z = dns.zone.from_text(base, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(unexpected_end_ixfr, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr: + with pytest.raises(dns.exception.FormError): + xfr.process_message(m) + +def test_ixfr_bad_serial(): + z = dns.zone.from_text(base, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(bad_serial_ixfr, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr: + with pytest.raises(dns.exception.FormError): + xfr.process_message(m) + +refused = '''id 1 +opcode QUERY +rcode REFUSED +flags AA +;QUESTION +example. IN AXFR +''' + +bad_qname = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +not-example. IN IXFR +''' + +bad_qtype = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN AXFR +''' + +soa_not_first = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN IXFR +;ANSWER +bar.foo 300 IN MX 0 blaz.foo +''' + +soa_not_first_2 = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN IXFR +;ANSWER +@ 300 IN MX 0 blaz.foo +''' + +no_answer = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN IXFR +;ADDITIONAL +bar.foo 300 IN MX 0 blaz.foo +''' + +axfr_answers_after_final_soa = '''id 1 +opcode QUERY +rcode NOERROR +flags AA +;QUESTION +example. IN AXFR +;ANSWER +@ 3600 IN SOA foo bar 1 2 3 4 5 +@ 3600 IN NS ns1 +@ 3600 IN NS ns2 +bar.foo 300 IN MX 0 blaz.foo +ns1 3600 IN A 10.0.0.1 +ns2 3600 IN A 10.0.0.2 +@ 3600 IN SOA foo bar 1 2 3 4 5 +ns3 3600 IN A 10.0.0.3 +''' + +def test_refused(): + z = dns.zone.from_text(base, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(refused, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr: + with pytest.raises(dns.xfr.TransferError): + xfr.process_message(m) + +def test_bad_qname(): + z = dns.zone.from_text(base, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(bad_qname, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr: + with pytest.raises(dns.exception.FormError): + xfr.process_message(m) + +def test_bad_qtype(): + z = dns.zone.from_text(base, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(bad_qtype, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr: + with pytest.raises(dns.exception.FormError): + xfr.process_message(m) + +def test_soa_not_first(): + z = dns.zone.from_text(base, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(soa_not_first, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr: + with pytest.raises(dns.exception.FormError): + xfr.process_message(m) + m = dns.message.from_text(soa_not_first_2, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr: + with pytest.raises(dns.exception.FormError): + xfr.process_message(m) + +def test_no_answer(): + z = dns.zone.from_text(base, 'example.', + zone_factory=dns.versioned.Zone) + m = dns.message.from_text(no_answer, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr: + with pytest.raises(dns.exception.FormError): + xfr.process_message(m) + +def test_axfr_answers_after_final_soa(): + z = dns.versioned.Zone('example.') + m = dns.message.from_text(axfr_answers_after_final_soa, origin=z.origin, + one_rr_per_rrset=True) + with dns.xfr.Inbound(z, dns.rdatatype.AXFR) as xfr: + with pytest.raises(dns.exception.FormError): + xfr.process_message(m) + +keyring = dns.tsigkeyring.from_text( + { + 'keyname.': 'NjHwPsMKjdN++dOfE5iAiQ==' + } +) + +keyname = dns.name.from_text('keyname') + +def test_make_query_basic(): + z = dns.versioned.Zone('example.') + (q, s) = dns.xfr.make_query(z) + assert q.question[0].rdtype == dns.rdatatype.AXFR + assert s is None + (q, s) = dns.xfr.make_query(z, serial=None) + assert q.question[0].rdtype == dns.rdatatype.AXFR + assert s is None + (q, s) = dns.xfr.make_query(z, serial=10) + assert q.question[0].rdtype == dns.rdatatype.IXFR + assert q.authority[0].rdtype == dns.rdatatype.SOA + assert q.authority[0][0].serial == 10 + assert s == 10 + with z.writer() as txn: + txn.add('@', 300, dns.rdata.from_text('in', 'soa', '. . 1 2 3 4 5')) + (q, s) = dns.xfr.make_query(z) + assert q.question[0].rdtype == dns.rdatatype.IXFR + assert q.authority[0].rdtype == dns.rdatatype.SOA + assert q.authority[0][0].serial == 1 + assert s == 1 + (q, s) = dns.xfr.make_query(z, keyring=keyring, keyname=keyname) + assert q.question[0].rdtype == dns.rdatatype.IXFR + assert q.authority[0].rdtype == dns.rdatatype.SOA + assert q.authority[0][0].serial == 1 + assert s == 1 + assert q.keyname == keyname + + +def test_make_query_bad_serial(): + z = dns.versioned.Zone('example.') + with pytest.raises(ValueError): + dns.xfr.make_query(z, serial='hi') + with pytest.raises(ValueError): + dns.xfr.make_query(z, serial=-1) + with pytest.raises(ValueError): + dns.xfr.make_query(z, serial=4294967296) + + +class XFRNanoNameserver(Server): + + def __init__(self): + super().__init__(origin=dns.name.from_text('example')) + + def handle(self, request): + try: + if request.message.question[0].rdtype == dns.rdatatype.IXFR: + text = ixfr + else: + text = axfr + r = dns.message.from_text(text, one_rr_per_rrset=True, + origin=self.origin) + r.id = request.message.id + return r + except Exception: + pass + +@pytest.mark.skipif(not _nanonameserver_available, + reason="requires nanonameserver") +def test_sync_inbound_xfr(): + with XFRNanoNameserver() as ns: + zone = dns.versioned.Zone('example') + dns.query.inbound_xfr(ns.tcp_address[0], zone, port=ns.tcp_address[1], + udp_mode=dns.query.UDPMode.TRY_FIRST) + dns.query.inbound_xfr(ns.tcp_address[0], zone, port=ns.tcp_address[1], + udp_mode=dns.query.UDPMode.TRY_FIRST) + expected = dns.zone.from_text(ixfr_expected, 'example') + assert zone == expected + +async def async_inbound_xfr(): + with XFRNanoNameserver() as ns: + zone = dns.versioned.Zone('example') + await dns.asyncquery.inbound_xfr(ns.tcp_address[0], zone, + port=ns.tcp_address[1], + udp_mode=dns.query.UDPMode.TRY_FIRST) + await dns.asyncquery.inbound_xfr(ns.tcp_address[0], zone, + port=ns.tcp_address[1], + udp_mode=dns.query.UDPMode.TRY_FIRST) + expected = dns.zone.from_text(ixfr_expected, 'example') + assert zone == expected + +@pytest.mark.skipif(not _nanonameserver_available, + reason="requires nanonameserver") +def test_asyncio_inbound_xfr(): + dns.asyncbackend.set_default_backend('asyncio') + async def run(): + await async_inbound_xfr() + try: + runner = asyncio.run + except AttributeError: + # this is only needed for 3.6 + def old_runner(awaitable): + loop = asyncio.get_event_loop() + return loop.run_until_complete(awaitable) + runner = old_runner + runner(run()) + +# +# We don't need to do this as it's all generic code, but +# just for extra caution we do it for each backend. +# + +try: + import trio + + @pytest.mark.skipif(not _nanonameserver_available, + reason="requires nanonameserver") + def test_trio_inbound_xfr(): + dns.asyncbackend.set_default_backend('trio') + async def run(): + await async_inbound_xfr() + trio.run(run) +except ImportError: + pass + +try: + import curio + + @pytest.mark.skipif(not _nanonameserver_available, + reason="requires nanonameserver") + def test_curio_inbound_xfr(): + dns.asyncbackend.set_default_backend('curio') + async def run(): + await async_inbound_xfr() + curio.run(run) +except ImportError: + pass + + +class UDPXFRNanoNameserver(Server): + + def __init__(self): + super().__init__(origin=dns.name.from_text('example')) + self.did_truncation = False + + def handle(self, request): + try: + if request.message.question[0].rdtype == dns.rdatatype.IXFR: + if self.did_truncation: + text = ixfr + else: + text = retry_tcp_ixfr + self.did_truncation = True + else: + text = axfr + r = dns.message.from_text(text, one_rr_per_rrset=True, + origin=self.origin) + r.id = request.message.id + return r + except Exception: + pass + +@pytest.mark.skipif(not _nanonameserver_available, + reason="requires nanonameserver") +def test_sync_retry_tcp_inbound_xfr(): + with UDPXFRNanoNameserver() as ns: + zone = dns.versioned.Zone('example') + dns.query.inbound_xfr(ns.tcp_address[0], zone, port=ns.tcp_address[1], + udp_mode=dns.query.UDPMode.TRY_FIRST) + dns.query.inbound_xfr(ns.tcp_address[0], zone, port=ns.tcp_address[1], + udp_mode=dns.query.UDPMode.TRY_FIRST) + expected = dns.zone.from_text(ixfr_expected, 'example') + assert zone == expected + +async def udp_async_inbound_xfr(): + with UDPXFRNanoNameserver() as ns: + zone = dns.versioned.Zone('example') + await dns.asyncquery.inbound_xfr(ns.tcp_address[0], zone, + port=ns.tcp_address[1], + udp_mode=dns.query.UDPMode.TRY_FIRST) + await dns.asyncquery.inbound_xfr(ns.tcp_address[0], zone, + port=ns.tcp_address[1], + udp_mode=dns.query.UDPMode.TRY_FIRST) + expected = dns.zone.from_text(ixfr_expected, 'example') + assert zone == expected + +@pytest.mark.skipif(not _nanonameserver_available, + reason="requires nanonameserver") +def test_asyncio_retry_tcp_inbound_xfr(): + dns.asyncbackend.set_default_backend('asyncio') + async def run(): + await udp_async_inbound_xfr() + try: + runner = asyncio.run + except AttributeError: + def old_runner(awaitable): + loop = asyncio.get_event_loop() + return loop.run_until_complete(awaitable) + runner = old_runner + runner(run()) |