diff options
author | Samuel Cabrero <samuelcabrero@kernevil.me> | 2014-12-16 18:04:13 +0100 |
---|---|---|
committer | Garming Sam <garming@samba.org> | 2014-12-22 05:57:08 +0100 |
commit | 336ffb29b50298a0597c15b9f60416adb745bc3d (patch) | |
tree | 02d0a9ac9039acd0721cd4ec1d9d7a4183aec413 /python/samba/tests/dns.py | |
parent | 4fb29e9347271acd66833d471a84e39a525f4f18 (diff) | |
download | samba-336ffb29b50298a0597c15b9f60416adb745bc3d.tar.gz |
dns.py: Test dns server reload zones from DSDB when are created or deleted
Signed-off-by: Samuel Cabrero <samuelcabrero@kernevil.me>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
Diffstat (limited to 'python/samba/tests/dns.py')
-rw-r--r-- | python/samba/tests/dns.py | 78 |
1 files changed, 78 insertions, 0 deletions
diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py index 34edf6b66e6..ac946af4591 100644 --- a/python/samba/tests/dns.py +++ b/python/samba/tests/dns.py @@ -21,7 +21,9 @@ import random import socket import samba.ndr as ndr import samba.dcerpc.dns as dns +from samba import credentials, param from samba.tests import TestCase +from samba.dcerpc import dnsp, dnsserver FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)]) @@ -864,6 +866,82 @@ class TestInvalidQueries(DNSTest): if s is not None: s.close() +class TestZones(DNSTest): + def get_loadparm(self): + lp = param.LoadParm() + lp.load(os.getenv("SMB_CONF_PATH")) + return lp + + def get_credentials(self, lp): + creds = credentials.Credentials() + creds.guess(lp) + creds.set_machine_account(lp) + creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE) + return creds + + def setUp(self): + super(TestZones, self).setUp() + self.lp = self.get_loadparm() + self.creds = self.get_credentials(self.lp) + self.server = os.getenv("SERVER_IP") + self.zone = "test.lan" + self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server), + self.lp, self.creds) + + def create_zone(self, zone): + zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() + zone_create.pszZoneName = zone + zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY + zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE + zone_create.fAging = 0 + zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT + self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + None, + 0, + 'ZoneCreate', + dnsserver.DNSSRV_TYPEID_ZONE_CREATE, + zone_create) + + def delete_zone(self, zone): + self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + zone, + 0, + 'DeleteZoneFromDs', + dnsserver.DNSSRV_TYPEID_NULL, + None) + + def test_soa_query(self): + zone = "test.lan" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + questions.append(q) + self.finish_name_packet(p, questions) + + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEquals(response.ancount, 0) + + self.create_zone(zone) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEquals(response.ancount, 1) + self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA) + + self.delete_zone(zone) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEquals(response.ancount, 0) + + if __name__ == "__main__": import unittest |