From c1552c70c5a34584ffb23a9a48b6bf1501e1eea4 Mon Sep 17 00:00:00 2001 From: Aaron Haslett Date: Wed, 9 May 2018 18:02:28 +1200 Subject: dns: record aging tests First basic DNS record aging tests. These check that we can turn aging on and off, and that timestamps are written on DNS add and update calls, but not RPC calls. BUG: https://bugzilla.samba.org/show_bug.cgi?id=10812 Signed-off-by: Aaron Haslett Reviewed-by: Gary Lockyer Reviewed-by: Andrew Bartlett --- python/samba/tests/dns.py | 212 +++++++++++++++++++++++++++++++++++++++-- python/samba/tests/dns_base.py | 10 +- 2 files changed, 208 insertions(+), 14 deletions(-) (limited to 'python') diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py index 4082b5375d1..083dc460afd 100644 --- a/python/samba/tests/dns.py +++ b/python/samba/tests/dns.py @@ -16,6 +16,12 @@ # from __future__ import print_function + +from samba import dsdb +from samba.ndr import ndr_unpack, ndr_pack +from samba.samdb import SamDB +from samba.auth import system_session +import ldb import os import sys import struct @@ -652,11 +658,9 @@ class TestComplexQueries(DNSTest): r.rr_class = dns.DNS_QCLASS_IN r.ttl = 900 r.length = 0xffff - rdata = value - r.rdata = rdata - updates = [r] + r.rdata = value p.nscount = 1 - p.nsrecs = updates + p.nsrecs = [r] (response, response_packet) = self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) @@ -884,11 +888,22 @@ class TestZones(DNSTest): self.timeout = timeout self.zone = "test.lan" - self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip), + self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %\ + (self.server_ip), self.lp, self.creds) + self.samdb = SamDB(url="ldap://" + self.server_ip, + lp = self.get_loadparm(), + session_info=system_session(), + credentials=self.creds) + + self.zone_dn = "DC=" + self.zone +\ + ",CN=MicrosoftDNS,DC=DomainDNSZones," +\ + str(self.samdb.get_default_basedn()) + def tearDown(self): super(TestZones, self).tearDown() + try: self.delete_zone(self.zone) except RuntimeError as e: @@ -896,15 +911,18 @@ class TestZones(DNSTest): if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST: raise - def create_zone(self, zone): + def create_zone(self, zone, aging_enabled=False): zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() zone_create.pszZoneName = zone zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY - zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE - zone_create.fAging = 0 + zone_create.fAging = int(aging_enabled) zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT + zone_create.fDsIntegrated = 1 + zone_create.fLoadExisting = 1 + zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE try: - self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + self.rpc_conn.DnssrvOperation2(client_version, 0, self.server_ip, None, @@ -915,6 +933,182 @@ class TestZones(DNSTest): except WERRORError as e: self.fail(str(e)) + def set_params(self, **kwargs): + zone = kwargs.pop('zone', None) + for key,val in kwargs.items(): + name_param = dnsserver.DNS_RPC_NAME_AND_PARAM() + name_param.dwParam = val + name_param.pszNodeName = key + + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM + try: + self.rpc_conn.DnssrvOperation2(client_version, 0, self.server, zone, + 0, 'ResetDwordProperty', nap_type, + name_param) + except WERRORError as e: + self.fail(str(e)) + + def ldap_modify_dnsrecs(self, name, func): + dn = 'DC={},{}'.format(name, self.zone_dn) + dns_recs = self.ldap_get_dns_records(name) + for rec in dns_recs: + func(rec) + update_dict = {'dn':dn, 'dnsRecord':[ndr_pack(r) for r in dns_recs]} + self.samdb.modify(ldb.Message.from_dict(self.samdb, + update_dict, + ldb.FLAG_MOD_REPLACE)) + + def dns_update_record(self, prefix, txt): + p = self.make_txt_update(prefix, txt, self.zone) + (code, response) = self.dns_transaction_udp(p, host=self.server_ip) + self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK) + recs = self.ldap_get_dns_records(prefix) + recs = [r for r in recs if r.data.str == txt] + self.assertEqual(len(recs), 1) + return recs[0] + + def ldap_get_records(self, name): + dn = 'DC={},{}'.format(name, self.zone_dn) + expr = "(&(objectClass=dnsNode)(name={}))".format(name) + return self.samdb.search(base=dn, scope=ldb.SCOPE_SUBTREE, + expression=expr, attrs=["*"]) + + def ldap_get_dns_records(self, name): + records = self.ldap_get_records(name) + return [ndr_unpack(dnsp.DnssrvRpcRecord, r) + for r in records[0].get('dnsRecord')] + + def ldap_get_zone_settings(self): + records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE, + expression="(&(objectClass=dnsZone)"+\ + "(name={}))".format(self.zone), + attrs=["dNSProperty"]) + self.assertEqual(len(records), 1) + props = [ndr_unpack(dnsp.DnsProperty, r) + for r in records[0].get('dNSProperty')] + + #We have no choice but to repeat these here. + zone_prop_ids = {0x00: "EMPTY", + 0x01: "TYPE", + 0x02: "ALLOW_UPDATE", + 0x08: "SECURE_TIME", + 0x10: "NOREFRESH_INTERVAL", + 0x11: "SCAVENGING_SERVERS", + 0x12: "AGING_ENABLED_TIME", + 0x20: "REFRESH_INTERVAL", + 0x40: "AGING_STATE", + 0x80: "DELETED_FROM_HOSTNAME", + 0x81: "MASTER_SERVERS", + 0x82: "AUTO_NS_SERVERS", + 0x83: "DCPROMO_CONVERT", + 0x90: "SCAVENGING_SERVERS_DA", + 0x91: "MASTER_SERVERS_DA", + 0x92: "NS_SERVERS_DA", + 0x100: "NODE_DBFLAGS"} + return {zone_prop_ids[p.id].lower(): p.data for p in props} + + def set_aging(self, enable=False): + self.create_zone(self.zone, aging_enabled=enable) + self.set_params(NoRefreshInterval=1, RefreshInterval=1, + Aging=int(bool(enable)), zone=self.zone, + AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE) + + def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']): + self.set_aging(enable=True) + settings = self.ldap_get_zone_settings() + self.assertTrue(settings['aging_state'] is not None) + self.assertTrue(settings['aging_state']) + + rec = self.dns_update_record('agingtest', ['test txt']) + self.assertNotEqual(rec.dwTimeStamp, 0) + + def test_set_aging_disabled(self): + self.set_aging(enable=False) + settings = self.ldap_get_zone_settings() + self.assertTrue(settings['aging_state'] is not None) + self.assertFalse(settings['aging_state']) + + rec = self.dns_update_record('agingtest', ['test txt']) + self.assertNotEqual(rec.dwTimeStamp, 0) + + def test_aging_update(self, enable=True): + name, txt = 'agingtest', ['test txt'] + self.set_aging(enable=True) + before_mod = self.dns_update_record(name, txt) + if not enable: + self.set_params(zone=self.zone, Aging=0) + dec = 2 + def mod_ts(rec): + rec.dwTimeStamp -= dec + self.ldap_modify_dnsrecs(name, mod_ts) + after_mod = self.ldap_get_dns_records(name) + self.assertEqual(len(after_mod), 1) + after_mod = after_mod[0] + self.assertEqual(after_mod.dwTimeStamp, + before_mod.dwTimeStamp - dec) + after_update = self.dns_update_record(name, txt) + after_should_equal = before_mod if enable else after_mod + self.assertEqual(after_should_equal.dwTimeStamp, + after_update.dwTimeStamp) + + def test_aging_update_disabled(self): + self.test_aging_update(enable=False) + + def test_aging_refresh(self): + name,txt = 'agingtest', ['test txt'] + self.create_zone(self.zone, aging_enabled=True) + interval = 10 + self.set_params(NoRefreshInterval=interval, RefreshInterval=interval, + Aging=1, zone=self.zone, + AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE) + before_mod = self.dns_update_record(name, txt) + def mod_ts(rec): + rec.dwTimeStamp -= interval/2 + self.ldap_modify_dnsrecs(name, mod_ts) + update_during_norefresh = self.dns_update_record(name, txt) + def mod_ts(rec): + rec.dwTimeStamp -= interval + interval/2 + self.ldap_modify_dnsrecs(name, mod_ts) + update_during_refresh = self.dns_update_record(name, txt) + self.assertEqual(update_during_norefresh.dwTimeStamp, + before_mod.dwTimeStamp - interval/2) + self.assertEqual(update_during_refresh.dwTimeStamp, + before_mod.dwTimeStamp) + + def test_rpc_add_no_timestamp(self): + name,txt = 'agingtest', ['test txt'] + self.set_aging(enable=True) + rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + rec_buf.rec = TXTRecord(txt) + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, + self.zone, name, rec_buf, None) + recs = self.ldap_get_dns_records(name) + self.assertEqual(len(recs), 1) + self.assertEqual(recs[0].dwTimeStamp, 0) + + def test_basic_scavenging(self): + self.create_zone(self.zone, aging_enabled=True) + interval = 1 + self.set_params(NoRefreshInterval=interval, RefreshInterval=interval, + zone=self.zone, Aging=1, + AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE) + name, txt = 'agingtest', ['test txt'] + rec = self.dns_update_record(name,txt) + rec = self.dns_update_record(name+'2',txt) + def mod_ts(rec): + rec.dwTimeStamp -= interval*5 + self.ldap_modify_dnsrecs(name, mod_ts) + dsdb._scavenge_dns_records(self.samdb) + + recs = self.ldap_get_dns_records(name) + self.assertEqual(len(recs), 1) + self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE) + recs = self.ldap_get_dns_records(name+'2') + self.assertEqual(len(recs), 1) + self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT) + def delete_zone(self, zone): self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, diff --git a/python/samba/tests/dns_base.py b/python/samba/tests/dns_base.py index 10c7a0a6af5..0709bbe1c40 100644 --- a/python/samba/tests/dns_base.py +++ b/python/samba/tests/dns_base.py @@ -166,18 +166,18 @@ class DNSTest(TestCaseInTempDir): return (response, recv_packet[2:]) - def make_txt_update(self, prefix, txt_array): + def make_txt_update(self, prefix, txt_array, domain=None): p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) updates = [] - name = self.get_dns_domain() + name = domain or self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) updates.append(u) self.finish_name_packet(p, updates) updates = [] r = dns.res_rec() - r.name = "%s.%s" % (prefix, self.get_dns_domain()) + r.name = "%s.%s" % (prefix, name) r.rr_type = dns.DNS_QTYPE_TXT r.rr_class = dns.DNS_QCLASS_IN r.ttl = 900 @@ -190,8 +190,8 @@ class DNSTest(TestCaseInTempDir): return p - def check_query_txt(self, prefix, txt_array): - name = "%s.%s" % (prefix, self.get_dns_domain()) + def check_query_txt(self, prefix, txt_array, zone=None): + name = "%s.%s" % (prefix, zone or self.get_dns_domain()) p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] -- cgit v1.2.1