summaryrefslogtreecommitdiff
path: root/python/samba/tests/dns.py
diff options
context:
space:
mode:
authorSamuel Cabrero <samuelcabrero@kernevil.me>2014-12-16 18:04:13 +0100
committerGarming Sam <garming@samba.org>2014-12-22 05:57:08 +0100
commit336ffb29b50298a0597c15b9f60416adb745bc3d (patch)
tree02d0a9ac9039acd0721cd4ec1d9d7a4183aec413 /python/samba/tests/dns.py
parent4fb29e9347271acd66833d471a84e39a525f4f18 (diff)
downloadsamba-336ffb29b50298a0597c15b9f60416adb745bc3d.tar.gz
dns.py: Test dns server reload zones from DSDB when are created or deleted
Signed-off-by: Samuel Cabrero <samuelcabrero@kernevil.me> Reviewed-by: Andrew Bartlett <abartlet@samba.org> Reviewed-by: Garming Sam <garming@catalyst.net.nz>
Diffstat (limited to 'python/samba/tests/dns.py')
-rw-r--r--python/samba/tests/dns.py78
1 files changed, 78 insertions, 0 deletions
diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py
index 34edf6b66e6..ac946af4591 100644
--- a/python/samba/tests/dns.py
+++ b/python/samba/tests/dns.py
@@ -21,7 +21,9 @@ import random
import socket
import samba.ndr as ndr
import samba.dcerpc.dns as dns
+from samba import credentials, param
from samba.tests import TestCase
+from samba.dcerpc import dnsp, dnsserver
FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
@@ -864,6 +866,82 @@ class TestInvalidQueries(DNSTest):
if s is not None:
s.close()
+class TestZones(DNSTest):
+ def get_loadparm(self):
+ lp = param.LoadParm()
+ lp.load(os.getenv("SMB_CONF_PATH"))
+ return lp
+
+ def get_credentials(self, lp):
+ creds = credentials.Credentials()
+ creds.guess(lp)
+ creds.set_machine_account(lp)
+ creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
+ return creds
+
+ def setUp(self):
+ super(TestZones, self).setUp()
+ self.lp = self.get_loadparm()
+ self.creds = self.get_credentials(self.lp)
+ self.server = os.getenv("SERVER_IP")
+ self.zone = "test.lan"
+ self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server),
+ self.lp, self.creds)
+
+ def create_zone(self, zone):
+ zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
+ zone_create.pszZoneName = zone
+ zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
+ zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
+ zone_create.fAging = 0
+ zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
+ self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0,
+ self.server,
+ None,
+ 0,
+ 'ZoneCreate',
+ dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
+ zone_create)
+
+ def delete_zone(self, zone):
+ self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0,
+ self.server,
+ zone,
+ 0,
+ 'DeleteZoneFromDs',
+ dnsserver.DNSSRV_TYPEID_NULL,
+ None)
+
+ def test_soa_query(self):
+ zone = "test.lan"
+ p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+ questions = []
+
+ q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
+ questions.append(q)
+ self.finish_name_packet(p, questions)
+
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
+ self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
+ self.assertEquals(response.ancount, 0)
+
+ self.create_zone(zone)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
+ self.assertEquals(response.ancount, 1)
+ self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
+
+ self.delete_zone(zone)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
+ self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
+ self.assertEquals(response.ancount, 0)
+
+
if __name__ == "__main__":
import unittest