summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarming Sam <garming@catalyst.net.nz>2016-01-27 17:41:44 +1300
committerKarolin Seeger <kseeger@samba.org>2016-02-24 11:43:59 +0100
commit1a97ee31a8702ed7b06d0b07355615e314d29106 (patch)
tree2b871cbd33acef156aa98c83c275ff9930a33f89
parent006551db9d9a3feeadb0ebd31c1d91c766533827 (diff)
downloadsamba-1a97ee31a8702ed7b06d0b07355615e314d29106.tar.gz
CVE-2016-0771: tests/dns: modify tests to check via RPC
This checks that TXT records added over DNS, look the same over RPC. BUG: https://bugzilla.samba.org/show_bug.cgi?id=11128 BUG: https://bugzilla.samba.org/show_bug.cgi?id=11686 Signed-off-by: Garming Sam <garming@catalyst.net.nz> Reviewed-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
-rw-r--r--python/samba/tests/dns.py271
1 files changed, 165 insertions, 106 deletions
diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py
index e153a2b1b94..784485e9b38 100644
--- a/python/samba/tests/dns.py
+++ b/python/samba/tests/dns.py
@@ -146,6 +146,47 @@ class DNSTest(TestCase):
N+=length
return result
+ def make_txt_update(self, prefix, txt_array):
+ p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
+ updates = []
+
+ name = self.get_dns_domain()
+ u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
+ updates.append(u)
+ self.finish_name_packet(p, updates)
+
+ updates = []
+ r = dns.res_rec()
+ r.name = "%s.%s" % (prefix, self.get_dns_domain())
+ r.rr_type = dns.DNS_QTYPE_TXT
+ r.rr_class = dns.DNS_QCLASS_IN
+ r.ttl = 900
+ r.length = 0xffff
+ rdata = make_txt_record(txt_array)
+ r.rdata = rdata
+ updates.append(r)
+ p.nscount = len(updates)
+ p.nsrecs = updates
+
+ return p
+
+ def check_query_txt(self, prefix, txt_array):
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+ p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+ questions = []
+
+ q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
+ questions.append(q)
+
+ self.finish_name_packet(p, questions)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.assertEquals(response.ancount, 1)
+ self.assertEquals(response.answers[0].rdata.txt.str, txt_array)
+
+ def assertIsNotNone(self, item):
+ self.assertTrue(item is not None)
+
class TestSimpleQueries(DNSTest):
def test_one_a_query(self):
@@ -415,44 +456,6 @@ class TestDNSUpdates(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
- def make_txt_update(self, prefix, txt_array):
- p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
- updates = []
-
- name = self.get_dns_domain()
- u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
- updates.append(u)
- self.finish_name_packet(p, updates)
-
- updates = []
- r = dns.res_rec()
- r.name = "%s.%s" % (prefix, self.get_dns_domain())
- r.rr_type = dns.DNS_QTYPE_TXT
- r.rr_class = dns.DNS_QCLASS_IN
- r.ttl = 900
- r.length = 0xffff
- rdata = make_txt_record(txt_array)
- r.rdata = rdata
- updates.append(r)
- p.nscount = len(updates)
- p.nsrecs = updates
-
- return p
-
- def check_query_txt(self, prefix, txt_array):
- name = "%s.%s" % (prefix, self.get_dns_domain())
- p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
- questions = []
-
- q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
- questions.append(q)
-
- self.finish_name_packet(p, questions)
- response = self.dns_transaction_udp(p)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
- self.assertEquals(response.ancount, 1)
- self.assertEquals(response.answers[0].rdata.txt.str, txt_array)
-
def test_update_add_txt_record(self):
"test adding records works"
prefix, txt = 'textrec', ['"This is a test"']
@@ -461,74 +464,6 @@ class TestDNSUpdates(DNSTest):
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- def test_update_add_null_padded_txt_record(self):
- "test adding records works"
- prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
- p = self.make_txt_update(prefix, txt)
- response = self.dns_transaction_udp(p)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
- self.check_query_txt(prefix, txt)
-
- prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
- p = self.make_txt_update(prefix, txt)
- response = self.dns_transaction_udp(p)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
- self.check_query_txt(prefix, txt)
-
- prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
- p = self.make_txt_update(prefix, txt)
- response = self.dns_transaction_udp(p)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
- self.check_query_txt(prefix, txt)
-
- # Test is incomplete due to strlen against txt records
- def test_update_add_null_char_txt_record(self):
- "test adding records works"
- prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
- p = self.make_txt_update(prefix, txt)
- response = self.dns_transaction_udp(p)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
- self.check_query_txt(prefix, ['NULL'])
-
- prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
- p = self.make_txt_update(prefix, txt)
- response = self.dns_transaction_udp(p)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
- self.check_query_txt(prefix, ['NULL', 'NULL'])
-
- def test_update_add_hex_char_txt_record(self):
- "test adding records works"
- prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
- p = self.make_txt_update(prefix, txt)
- response = self.dns_transaction_udp(p)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
- self.check_query_txt(prefix, txt)
-
- def test_update_add_slash_txt_record(self):
- "test adding records works"
- prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
- p = self.make_txt_update(prefix, txt)
- response = self.dns_transaction_udp(p)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
- self.check_query_txt(prefix, txt)
-
- def test_update_add_two_txt_records(self):
- "test adding two txt records works"
- prefix, txt = 'textrec2', ['"This is a test"',
- '"and this is a test, too"']
- p = self.make_txt_update(prefix, txt)
- response = self.dns_transaction_udp(p)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
- self.check_query_txt(prefix, txt)
-
- def test_update_add_empty_txt_records(self):
- "test adding two txt records works"
- prefix, txt = 'emptytextrec', []
- p = self.make_txt_update(prefix, txt)
- response = self.dns_transaction_udp(p)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
- self.check_query_txt(prefix, txt)
-
def test_delete_record(self):
"Test if deleting records works"
@@ -901,6 +836,130 @@ class TestInvalidQueries(DNSTest):
if s is not None:
s.close()
+class TestRPCRoundtrip(DNSTest):
+ def get_credentials(self, lp):
+ creds = credentials.Credentials()
+ creds.guess(lp)
+ creds.set_machine_account(lp)
+ creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
+ return creds
+
+ def setUp(self):
+ super(TestRPCRoundtrip, self).setUp()
+ self.lp = self.get_loadparm()
+ self.creds = self.get_credentials(self.lp)
+ self.server = os.getenv("SERVER_IP")
+ self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
+ self.lp, self.creds)
+
+ def tearDown(self):
+ super(TestRPCRoundtrip, self).tearDown()
+
+ def test_update_add_null_padded_txt_record(self):
+ "test adding records works"
+ prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
+ p = self.make_txt_update(prefix, txt)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.check_query_txt(prefix, txt)
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.get_dns_domain(),
+ "%s.%s" % (prefix, self.get_dns_domain()),
+ dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""'))
+
+ prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
+ p = self.make_txt_update(prefix, txt)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.check_query_txt(prefix, txt)
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.get_dns_domain(),
+ "%s.%s" % (prefix, self.get_dns_domain()),
+ dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"'))
+
+ prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
+ p = self.make_txt_update(prefix, txt)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.check_query_txt(prefix, txt)
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.get_dns_domain(),
+ "%s.%s" % (prefix, self.get_dns_domain()),
+ dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""'))
+
+ # Test is incomplete due to strlen against txt records
+ def test_update_add_null_char_txt_record(self):
+ "test adding records works"
+ prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
+ p = self.make_txt_update(prefix, txt)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.check_query_txt(prefix, ['NULL'])
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.get_dns_domain(),
+ "%s.%s" % (prefix, self.get_dns_domain()),
+ dnsp.DNS_TYPE_TXT, '"NULL"'))
+
+ prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
+ p = self.make_txt_update(prefix, txt)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.check_query_txt(prefix, ['NULL', 'NULL'])
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.get_dns_domain(),
+ "%s.%s" % (prefix, self.get_dns_domain()),
+ dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
+
+ def test_update_add_hex_char_txt_record(self):
+ "test adding records works"
+ prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
+ p = self.make_txt_update(prefix, txt)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.check_query_txt(prefix, txt)
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.get_dns_domain(),
+ "%s.%s" % (prefix, self.get_dns_domain()),
+ dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
+
+ def test_update_add_slash_txt_record(self):
+ "test adding records works"
+ prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
+ p = self.make_txt_update(prefix, txt)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.check_query_txt(prefix, txt)
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.get_dns_domain(),
+ "%s.%s" % (prefix, self.get_dns_domain()),
+ dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
+
+ def test_update_add_two_txt_records(self):
+ "test adding two txt records works"
+ prefix, txt = 'textrec2', ['"This is a test"',
+ '"and this is a test, too"']
+ p = self.make_txt_update(prefix, txt)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.check_query_txt(prefix, txt)
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.get_dns_domain(),
+ "%s.%s" % (prefix, self.get_dns_domain()),
+ dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
+ ' "\\"and this is a test, too\\""'))
+
+ def test_update_add_empty_txt_records(self):
+ "test adding two txt records works"
+ prefix, txt = 'emptytextrec', []
+ p = self.make_txt_update(prefix, txt)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.check_query_txt(prefix, txt)
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.get_dns_domain(),
+ "%s.%s" % (prefix, self.get_dns_domain()),
+ dnsp.DNS_TYPE_TXT, ''))
+
if __name__ == "__main__":
import unittest
unittest.main()