summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarming Sam <garming@catalyst.net.nz>2016-01-28 12:54:58 +1300
committerKarolin Seeger <kseeger@samba.org>2016-02-24 11:43:42 +0100
commitad5e885c2b0f58888237b409076113d4b06686db (patch)
tree505ffe7a045ec4d65897045ee98772d818b2a698
parent2b4c7dbc15465911be35a1bdb1f10e396118fb1c (diff)
downloadsamba-ad5e885c2b0f58888237b409076113d4b06686db.tar.gz
CVE-2016-0771: tests/dns: RPC => DNS roundtrip test
Make sure that TXT entries stored via RPC come out the same in DNS. This has one caveat in that adding over RPC in Windows eats slashes, and so fails there. BUG: https://bugzilla.samba.org/show_bug.cgi?id=11128 BUG: https://bugzilla.samba.org/show_bug.cgi?id=11686 Signed-off-by: Garming Sam <garming@catalyst.net.nz> Reviewed-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
-rw-r--r--python/samba/tests/dns.py202
-rwxr-xr-xsource4/selftest/tests.py1
2 files changed, 190 insertions, 13 deletions
diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py
index 25897a77b1d..3e655c37bc3 100644
--- a/python/samba/tests/dns.py
+++ b/python/samba/tests/dns.py
@@ -23,6 +23,7 @@ import samba.ndr as ndr
from samba import credentials, param
from samba.tests import TestCase
from samba.dcerpc import dns, dnsp, dnsserver
+from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
# This timeout only has relevance when testing against Windows
# Format errors tend to return patchy responses, so a timeout is needed.
@@ -879,7 +880,7 @@ class TestZones(DNSTest):
self.creds = self.get_credentials(self.lp)
self.server = os.getenv("SERVER_IP")
self.zone = "test.lan"
- self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
+ self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
self.lp, self.creds)
def tearDown(self):
@@ -899,7 +900,7 @@ class TestZones(DNSTest):
zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
- self.server,
+ self.server_ip,
None,
0,
'ZoneCreate',
@@ -909,7 +910,7 @@ class TestZones(DNSTest):
def delete_zone(self, zone):
self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
- self.server,
+ self.server_ip,
zone,
0,
'DeleteZoneFromDs',
@@ -957,12 +958,31 @@ class TestRPCRoundtrip(DNSTest):
self.lp = self.get_loadparm()
self.creds = self.get_credentials(self.lp)
self.server = os.getenv("SERVER_IP")
- self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
+ self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
self.lp, self.creds)
def tearDown(self):
super(TestRPCRoundtrip, self).tearDown()
+ def test_update_add_txt_rpc_to_dns(self):
+ prefix, txt = 'rpctextrec', ['"This is a test"']
+
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_null_padded_txt_record(self):
"test adding records works"
prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
@@ -970,7 +990,7 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""'))
@@ -980,7 +1000,7 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"'))
@@ -990,11 +1010,66 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""'))
+ def test_update_add_padding_rpc_to_dns(self):
+ prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
+ prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
+ prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
# Test is incomplete due to strlen against txt records
def test_update_add_null_char_txt_record(self):
"test adding records works"
@@ -1003,7 +1078,7 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, ['NULL'])
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"NULL"'))
@@ -1013,11 +1088,30 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, ['NULL', 'NULL'])
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
+ def test_update_add_null_char_rpc_to_dns(self):
+ prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL"')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, ['NULL'])
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_hex_char_txt_record(self):
"test adding records works"
prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
@@ -1025,11 +1119,30 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
+ def test_update_add_hex_rpc_to_dns(self):
+ prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_slash_txt_record(self):
"test adding records works"
prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
@@ -1037,11 +1150,33 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
+ # This test fails against Windows as it eliminates slashes in RPC
+ # One typical use for a slash is in records like 'var=value' to
+ # escape '=' characters.
+ def test_update_add_slash_rpc_to_dns(self):
+ prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_two_txt_records(self):
"test adding two txt records works"
prefix, txt = 'textrec2', ['"This is a test"',
@@ -1050,12 +1185,34 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
' "\\"and this is a test, too\\""'))
+ def test_update_add_two_rpc_to_dns(self):
+ prefix, txt = 'textrec2', ['"This is a test"',
+ '"and this is a test, too"']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
+ '"\\"This is a test\\""' +
+ ' "\\"and this is a test, too\\""')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_empty_txt_records(self):
"test adding two txt records works"
prefix, txt = 'emptytextrec', []
@@ -1063,11 +1220,30 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, ''))
+ def test_update_add_empty_rpc_to_dns(self):
+ prefix, txt = 'rpcemptytextrec', []
+
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
if __name__ == "__main__":
import unittest
unittest.main()
diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py
index e284c2ca1fa..8f4976ca878 100755
--- a/source4/selftest/tests.py
+++ b/source4/selftest/tests.py
@@ -291,6 +291,7 @@ for f in sorted(os.listdir(os.path.join(samba4srcdir, "../pidl/tests"))):
# DNS tests
planpythontestsuite("fl2003dc:local", "samba.tests.dns")
+
for t in smbtorture4_testsuites("dns_internal."):
plansmbtorture4testsuite(t, "ad_dc_ntvfs:local", '//$SERVER/whavever')