diff options
author | Andrew Bartlett <abartlet@samba.org> | 2017-06-08 15:25:23 +1200 |
---|---|---|
committer | Andrew Bartlett <abartlet@samba.org> | 2017-06-10 21:48:22 +0200 |
commit | dfe739a252e994c6091aea0c6220134ed6fa2f72 (patch) | |
tree | 6e23ceb64ea310b795c4e5a9ef1b6ff0e3cc1b4e /python | |
parent | e36d90810654b69094662ab9e49f417bc8951496 (diff) | |
download | samba-dfe739a252e994c6091aea0c6220134ed6fa2f72.tar.gz |
selftest: Add test confirming join-created DNS entries can be modified as the DC
This ensures that samba_dnsupdate can run in the long term against the new DNS entries
Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
Diffstat (limited to 'python')
-rw-r--r-- | python/samba/tests/join.py | 74 |
1 files changed, 68 insertions, 6 deletions
diff --git a/python/samba/tests/join.py b/python/samba/tests/join.py index f18c9fd95ce..1f9fab1d72a 100644 --- a/python/samba/tests/join.py +++ b/python/samba/tests/join.py @@ -20,9 +20,10 @@ import samba import sys import shutil import os -from samba.tests.dns_base import DNSTest +from samba.tests.dns_base import DNSTKeyTest from samba.join import dc_join from samba.dcerpc import drsuapi, misc, dns +from samba.credentials import Credentials def get_logger(name="subunit"): """Get a logger object.""" @@ -31,11 +32,11 @@ def get_logger(name="subunit"): logger.addHandler(logging.StreamHandler(sys.stderr)) return logger -class JoinTestCase(DNSTest): +class JoinTestCase(DNSTKeyTest): def setUp(self): - super(JoinTestCase, self).setUp() self.server = samba.tests.env_get_var_value("SERVER") self.server_ip = samba.tests.env_get_var_value("SERVER_IP") + super(JoinTestCase, self).setUp() self.lp = samba.tests.env_loadparm() self.creds = self.get_credentials() self.netbios_name = "jointest1" @@ -58,6 +59,8 @@ class JoinTestCase(DNSTest): self.join_ctx.force_all_ips = True + self.join_ctx.do_join() + def tearDown(self): try: paths = self.join_ctx.paths @@ -76,9 +79,7 @@ class JoinTestCase(DNSTest): super(JoinTestCase, self).tearDown() - def test_join(self): - - self.join_ctx.do_join() + def test_join_makes_records(self): "create a query packet containing one query record via TCP" p = self.make_name_packet(dns.DNS_OPCODE_QUERY) @@ -111,3 +112,64 @@ class JoinTestCase(DNSTest): self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME) self.assertEquals(response.answers[0].rdata, self.join_ctx.dnshostname) self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A) + + + def test_join_records_can_update(self): + dc_creds = Credentials() + dc_creds.guess(self.join_ctx.lp) + dc_creds.set_machine_account(self.join_ctx.lp) + + self.tkey_trans(creds=dc_creds) + + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + q = self.make_name_question(self.join_ctx.dnsdomain, + dns.DNS_QTYPE_SOA, + dns.DNS_QCLASS_IN) + questions = [] + questions.append(q) + self.finish_name_packet(p, questions) + + updates = [] + # Delete the old expected IPs + IPs = samba.interface_ips(self.lp) + for IP in IPs[1:]: + if ":" in IP: + r = dns.res_rec() + r.name = self.join_ctx.dnshostname + r.rr_type = dns.DNS_QTYPE_AAAA + r.rr_class = dns.DNS_QCLASS_NONE + r.ttl = 0 + r.length = 0xffff + rdata = IP + else: + r = dns.res_rec() + r.name = self.join_ctx.dnshostname + r.rr_type = dns.DNS_QTYPE_A + r.rr_class = dns.DNS_QCLASS_NONE + r.ttl = 0 + r.length = 0xffff + rdata = IP + + r.rdata = rdata + updates.append(r) + + p.nscount = len(updates) + p.nsrecs = updates + + mac = self.sign_packet(p, self.key_name) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = self.join_ctx.dnshostname + q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) = self.dns_transaction_tcp(p, host=self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEquals(response.ancount, 1) |