summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorAndrew Bartlett <abartlet@samba.org>2017-06-08 15:25:23 +1200
committerAndrew Bartlett <abartlet@samba.org>2017-06-10 21:48:22 +0200
commitdfe739a252e994c6091aea0c6220134ed6fa2f72 (patch)
tree6e23ceb64ea310b795c4e5a9ef1b6ff0e3cc1b4e /python
parente36d90810654b69094662ab9e49f417bc8951496 (diff)
downloadsamba-dfe739a252e994c6091aea0c6220134ed6fa2f72.tar.gz
selftest: Add test confirming join-created DNS entries can be modified as the DC
This ensures that samba_dnsupdate can run in the long term against the new DNS entries Signed-off-by: Andrew Bartlett <abartlet@samba.org> Reviewed-by: Garming Sam <garming@catalyst.net.nz>
Diffstat (limited to 'python')
-rw-r--r--python/samba/tests/join.py74
1 files changed, 68 insertions, 6 deletions
diff --git a/python/samba/tests/join.py b/python/samba/tests/join.py
index f18c9fd95ce..1f9fab1d72a 100644
--- a/python/samba/tests/join.py
+++ b/python/samba/tests/join.py
@@ -20,9 +20,10 @@ import samba
import sys
import shutil
import os
-from samba.tests.dns_base import DNSTest
+from samba.tests.dns_base import DNSTKeyTest
from samba.join import dc_join
from samba.dcerpc import drsuapi, misc, dns
+from samba.credentials import Credentials
def get_logger(name="subunit"):
"""Get a logger object."""
@@ -31,11 +32,11 @@ def get_logger(name="subunit"):
logger.addHandler(logging.StreamHandler(sys.stderr))
return logger
-class JoinTestCase(DNSTest):
+class JoinTestCase(DNSTKeyTest):
def setUp(self):
- super(JoinTestCase, self).setUp()
self.server = samba.tests.env_get_var_value("SERVER")
self.server_ip = samba.tests.env_get_var_value("SERVER_IP")
+ super(JoinTestCase, self).setUp()
self.lp = samba.tests.env_loadparm()
self.creds = self.get_credentials()
self.netbios_name = "jointest1"
@@ -58,6 +59,8 @@ class JoinTestCase(DNSTest):
self.join_ctx.force_all_ips = True
+ self.join_ctx.do_join()
+
def tearDown(self):
try:
paths = self.join_ctx.paths
@@ -76,9 +79,7 @@ class JoinTestCase(DNSTest):
super(JoinTestCase, self).tearDown()
- def test_join(self):
-
- self.join_ctx.do_join()
+ def test_join_makes_records(self):
"create a query packet containing one query record via TCP"
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
@@ -111,3 +112,64 @@ class JoinTestCase(DNSTest):
self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
self.assertEquals(response.answers[0].rdata, self.join_ctx.dnshostname)
self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
+
+
+ def test_join_records_can_update(self):
+ dc_creds = Credentials()
+ dc_creds.guess(self.join_ctx.lp)
+ dc_creds.set_machine_account(self.join_ctx.lp)
+
+ self.tkey_trans(creds=dc_creds)
+
+ p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
+ q = self.make_name_question(self.join_ctx.dnsdomain,
+ dns.DNS_QTYPE_SOA,
+ dns.DNS_QCLASS_IN)
+ questions = []
+ questions.append(q)
+ self.finish_name_packet(p, questions)
+
+ updates = []
+ # Delete the old expected IPs
+ IPs = samba.interface_ips(self.lp)
+ for IP in IPs[1:]:
+ if ":" in IP:
+ r = dns.res_rec()
+ r.name = self.join_ctx.dnshostname
+ r.rr_type = dns.DNS_QTYPE_AAAA
+ r.rr_class = dns.DNS_QCLASS_NONE
+ r.ttl = 0
+ r.length = 0xffff
+ rdata = IP
+ else:
+ r = dns.res_rec()
+ r.name = self.join_ctx.dnshostname
+ r.rr_type = dns.DNS_QTYPE_A
+ r.rr_class = dns.DNS_QCLASS_NONE
+ r.ttl = 0
+ r.length = 0xffff
+ rdata = IP
+
+ r.rdata = rdata
+ updates.append(r)
+
+ p.nscount = len(updates)
+ p.nsrecs = updates
+
+ mac = self.sign_packet(p, self.key_name)
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+ questions = []
+
+ name = self.join_ctx.dnshostname
+ q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
+ questions.append(q)
+
+ self.finish_name_packet(p, questions)
+ (response, response_packet) = self.dns_transaction_tcp(p, host=self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
+ self.assertEquals(response.ancount, 1)