summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorAaron Haslett <aaronhaslett@catalyst.net.nz>2018-05-09 18:02:28 +1200
committerAndrew Bartlett <abartlet@samba.org>2018-07-12 04:31:52 +0200
commitc1552c70c5a34584ffb23a9a48b6bf1501e1eea4 (patch)
tree0c409d705b83ecc8e61266e7641907b262484b2e /python
parentd871e0c84c761877563652558f44d8a3df4c49a3 (diff)
downloadsamba-c1552c70c5a34584ffb23a9a48b6bf1501e1eea4.tar.gz
dns: record aging tests
First basic DNS record aging tests. These check that we can turn aging on and off, and that timestamps are written on DNS add and update calls, but not RPC calls. BUG: https://bugzilla.samba.org/show_bug.cgi?id=10812 Signed-off-by: Aaron Haslett <aaronhaslett@catalyst.net.nz> Reviewed-by: Gary Lockyer <gary@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'python')
-rw-r--r--python/samba/tests/dns.py212
-rw-r--r--python/samba/tests/dns_base.py10
2 files changed, 208 insertions, 14 deletions
diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py
index 4082b5375d1..083dc460afd 100644
--- a/python/samba/tests/dns.py
+++ b/python/samba/tests/dns.py
@@ -16,6 +16,12 @@
#
from __future__ import print_function
+
+from samba import dsdb
+from samba.ndr import ndr_unpack, ndr_pack
+from samba.samdb import SamDB
+from samba.auth import system_session
+import ldb
import os
import sys
import struct
@@ -652,11 +658,9 @@ class TestComplexQueries(DNSTest):
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 900
r.length = 0xffff
- rdata = value
- r.rdata = rdata
- updates = [r]
+ r.rdata = value
p.nscount = 1
- p.nsrecs = updates
+ p.nsrecs = [r]
(response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
@@ -884,11 +888,22 @@ class TestZones(DNSTest):
self.timeout = timeout
self.zone = "test.lan"
- self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
+ self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %\
+ (self.server_ip),
self.lp, self.creds)
+ self.samdb = SamDB(url="ldap://" + self.server_ip,
+ lp = self.get_loadparm(),
+ session_info=system_session(),
+ credentials=self.creds)
+
+ self.zone_dn = "DC=" + self.zone +\
+ ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
+ str(self.samdb.get_default_basedn())
+
def tearDown(self):
super(TestZones, self).tearDown()
+
try:
self.delete_zone(self.zone)
except RuntimeError as e:
@@ -896,15 +911,18 @@ class TestZones(DNSTest):
if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
raise
- def create_zone(self, zone):
+ def create_zone(self, zone, aging_enabled=False):
zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
zone_create.pszZoneName = zone
zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
- zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
- zone_create.fAging = 0
+ zone_create.fAging = int(aging_enabled)
zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
+ zone_create.fDsIntegrated = 1
+ zone_create.fLoadExisting = 1
+ zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
try:
- self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
+ self.rpc_conn.DnssrvOperation2(client_version,
0,
self.server_ip,
None,
@@ -915,6 +933,182 @@ class TestZones(DNSTest):
except WERRORError as e:
self.fail(str(e))
+ def set_params(self, **kwargs):
+ zone = kwargs.pop('zone', None)
+ for key,val in kwargs.items():
+ name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
+ name_param.dwParam = val
+ name_param.pszNodeName = key
+
+ client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
+ nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
+ try:
+ self.rpc_conn.DnssrvOperation2(client_version, 0, self.server, zone,
+ 0, 'ResetDwordProperty', nap_type,
+ name_param)
+ except WERRORError as e:
+ self.fail(str(e))
+
+ def ldap_modify_dnsrecs(self, name, func):
+ dn = 'DC={},{}'.format(name, self.zone_dn)
+ dns_recs = self.ldap_get_dns_records(name)
+ for rec in dns_recs:
+ func(rec)
+ update_dict = {'dn':dn, 'dnsRecord':[ndr_pack(r) for r in dns_recs]}
+ self.samdb.modify(ldb.Message.from_dict(self.samdb,
+ update_dict,
+ ldb.FLAG_MOD_REPLACE))
+
+ def dns_update_record(self, prefix, txt):
+ p = self.make_txt_update(prefix, txt, self.zone)
+ (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
+ self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
+ recs = self.ldap_get_dns_records(prefix)
+ recs = [r for r in recs if r.data.str == txt]
+ self.assertEqual(len(recs), 1)
+ return recs[0]
+
+ def ldap_get_records(self, name):
+ dn = 'DC={},{}'.format(name, self.zone_dn)
+ expr = "(&(objectClass=dnsNode)(name={}))".format(name)
+ return self.samdb.search(base=dn, scope=ldb.SCOPE_SUBTREE,
+ expression=expr, attrs=["*"])
+
+ def ldap_get_dns_records(self, name):
+ records = self.ldap_get_records(name)
+ return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
+ for r in records[0].get('dnsRecord')]
+
+ def ldap_get_zone_settings(self):
+ records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
+ expression="(&(objectClass=dnsZone)"+\
+ "(name={}))".format(self.zone),
+ attrs=["dNSProperty"])
+ self.assertEqual(len(records), 1)
+ props = [ndr_unpack(dnsp.DnsProperty, r)
+ for r in records[0].get('dNSProperty')]
+
+ #We have no choice but to repeat these here.
+ zone_prop_ids = {0x00: "EMPTY",
+ 0x01: "TYPE",
+ 0x02: "ALLOW_UPDATE",
+ 0x08: "SECURE_TIME",
+ 0x10: "NOREFRESH_INTERVAL",
+ 0x11: "SCAVENGING_SERVERS",
+ 0x12: "AGING_ENABLED_TIME",
+ 0x20: "REFRESH_INTERVAL",
+ 0x40: "AGING_STATE",
+ 0x80: "DELETED_FROM_HOSTNAME",
+ 0x81: "MASTER_SERVERS",
+ 0x82: "AUTO_NS_SERVERS",
+ 0x83: "DCPROMO_CONVERT",
+ 0x90: "SCAVENGING_SERVERS_DA",
+ 0x91: "MASTER_SERVERS_DA",
+ 0x92: "NS_SERVERS_DA",
+ 0x100: "NODE_DBFLAGS"}
+ return {zone_prop_ids[p.id].lower(): p.data for p in props}
+
+ def set_aging(self, enable=False):
+ self.create_zone(self.zone, aging_enabled=enable)
+ self.set_params(NoRefreshInterval=1, RefreshInterval=1,
+ Aging=int(bool(enable)), zone=self.zone,
+ AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
+
+ def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
+ self.set_aging(enable=True)
+ settings = self.ldap_get_zone_settings()
+ self.assertTrue(settings['aging_state'] is not None)
+ self.assertTrue(settings['aging_state'])
+
+ rec = self.dns_update_record('agingtest', ['test txt'])
+ self.assertNotEqual(rec.dwTimeStamp, 0)
+
+ def test_set_aging_disabled(self):
+ self.set_aging(enable=False)
+ settings = self.ldap_get_zone_settings()
+ self.assertTrue(settings['aging_state'] is not None)
+ self.assertFalse(settings['aging_state'])
+
+ rec = self.dns_update_record('agingtest', ['test txt'])
+ self.assertNotEqual(rec.dwTimeStamp, 0)
+
+ def test_aging_update(self, enable=True):
+ name, txt = 'agingtest', ['test txt']
+ self.set_aging(enable=True)
+ before_mod = self.dns_update_record(name, txt)
+ if not enable:
+ self.set_params(zone=self.zone, Aging=0)
+ dec = 2
+ def mod_ts(rec):
+ rec.dwTimeStamp -= dec
+ self.ldap_modify_dnsrecs(name, mod_ts)
+ after_mod = self.ldap_get_dns_records(name)
+ self.assertEqual(len(after_mod), 1)
+ after_mod = after_mod[0]
+ self.assertEqual(after_mod.dwTimeStamp,
+ before_mod.dwTimeStamp - dec)
+ after_update = self.dns_update_record(name, txt)
+ after_should_equal = before_mod if enable else after_mod
+ self.assertEqual(after_should_equal.dwTimeStamp,
+ after_update.dwTimeStamp)
+
+ def test_aging_update_disabled(self):
+ self.test_aging_update(enable=False)
+
+ def test_aging_refresh(self):
+ name,txt = 'agingtest', ['test txt']
+ self.create_zone(self.zone, aging_enabled=True)
+ interval = 10
+ self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
+ Aging=1, zone=self.zone,
+ AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
+ before_mod = self.dns_update_record(name, txt)
+ def mod_ts(rec):
+ rec.dwTimeStamp -= interval/2
+ self.ldap_modify_dnsrecs(name, mod_ts)
+ update_during_norefresh = self.dns_update_record(name, txt)
+ def mod_ts(rec):
+ rec.dwTimeStamp -= interval + interval/2
+ self.ldap_modify_dnsrecs(name, mod_ts)
+ update_during_refresh = self.dns_update_record(name, txt)
+ self.assertEqual(update_during_norefresh.dwTimeStamp,
+ before_mod.dwTimeStamp - interval/2)
+ self.assertEqual(update_during_refresh.dwTimeStamp,
+ before_mod.dwTimeStamp)
+
+ def test_rpc_add_no_timestamp(self):
+ name,txt = 'agingtest', ['test txt']
+ self.set_aging(enable=True)
+ rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ rec_buf.rec = TXTRecord(txt)
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip,
+ self.zone, name, rec_buf, None)
+ recs = self.ldap_get_dns_records(name)
+ self.assertEqual(len(recs), 1)
+ self.assertEqual(recs[0].dwTimeStamp, 0)
+
+ def test_basic_scavenging(self):
+ self.create_zone(self.zone, aging_enabled=True)
+ interval = 1
+ self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
+ zone=self.zone, Aging=1,
+ AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
+ name, txt = 'agingtest', ['test txt']
+ rec = self.dns_update_record(name,txt)
+ rec = self.dns_update_record(name+'2',txt)
+ def mod_ts(rec):
+ rec.dwTimeStamp -= interval*5
+ self.ldap_modify_dnsrecs(name, mod_ts)
+ dsdb._scavenge_dns_records(self.samdb)
+
+ recs = self.ldap_get_dns_records(name)
+ self.assertEqual(len(recs), 1)
+ self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
+ recs = self.ldap_get_dns_records(name+'2')
+ self.assertEqual(len(recs), 1)
+ self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
+
def delete_zone(self, zone):
self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
diff --git a/python/samba/tests/dns_base.py b/python/samba/tests/dns_base.py
index 10c7a0a6af5..0709bbe1c40 100644
--- a/python/samba/tests/dns_base.py
+++ b/python/samba/tests/dns_base.py
@@ -166,18 +166,18 @@ class DNSTest(TestCaseInTempDir):
return (response, recv_packet[2:])
- def make_txt_update(self, prefix, txt_array):
+ def make_txt_update(self, prefix, txt_array, domain=None):
p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
updates = []
- name = self.get_dns_domain()
+ name = domain or self.get_dns_domain()
u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
updates.append(u)
self.finish_name_packet(p, updates)
updates = []
r = dns.res_rec()
- r.name = "%s.%s" % (prefix, self.get_dns_domain())
+ r.name = "%s.%s" % (prefix, name)
r.rr_type = dns.DNS_QTYPE_TXT
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 900
@@ -190,8 +190,8 @@ class DNSTest(TestCaseInTempDir):
return p
- def check_query_txt(self, prefix, txt_array):
- name = "%s.%s" % (prefix, self.get_dns_domain())
+ def check_query_txt(self, prefix, txt_array, zone=None):
+ name = "%s.%s" % (prefix, zone or self.get_dns_domain())
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
questions = []