summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/samba/tests/dns.py196
-rwxr-xr-xsource4/selftest/tests.py1
2 files changed, 187 insertions, 10 deletions
diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py
index 784485e9b38..b16ea1d4625 100644
--- a/python/samba/tests/dns.py
+++ b/python/samba/tests/dns.py
@@ -23,6 +23,7 @@ import samba.ndr as ndr
from samba import credentials, param
from samba.tests import TestCase
from samba.dcerpc import dns, dnsp, dnsserver
+from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
@@ -849,12 +850,31 @@ class TestRPCRoundtrip(DNSTest):
self.lp = self.get_loadparm()
self.creds = self.get_credentials(self.lp)
self.server = os.getenv("SERVER_IP")
- self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
+ self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
self.lp, self.creds)
def tearDown(self):
super(TestRPCRoundtrip, self).tearDown()
+ def test_update_add_txt_rpc_to_dns(self):
+ prefix, txt = 'rpctextrec', ['"This is a test"']
+
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_null_padded_txt_record(self):
"test adding records works"
prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
@@ -862,7 +882,7 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""'))
@@ -872,7 +892,7 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"'))
@@ -882,11 +902,66 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""'))
+ def test_update_add_padding_rpc_to_dns(self):
+ prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
+ prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
+ prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
# Test is incomplete due to strlen against txt records
def test_update_add_null_char_txt_record(self):
"test adding records works"
@@ -895,7 +970,7 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, ['NULL'])
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"NULL"'))
@@ -905,11 +980,30 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, ['NULL', 'NULL'])
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
+ def test_update_add_null_char_rpc_to_dns(self):
+ prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL"')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, ['NULL'])
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_hex_char_txt_record(self):
"test adding records works"
prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
@@ -917,11 +1011,30 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
+ def test_update_add_hex_rpc_to_dns(self):
+ prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_slash_txt_record(self):
"test adding records works"
prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
@@ -929,11 +1042,33 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
+ # This test fails against Windows as it eliminates slashes in RPC
+ # One typical use for a slash is in records like 'var=value' to
+ # escape '=' characters.
+ def test_update_add_slash_rpc_to_dns(self):
+ prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_two_txt_records(self):
"test adding two txt records works"
prefix, txt = 'textrec2', ['"This is a test"',
@@ -942,12 +1077,34 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
' "\\"and this is a test, too\\""'))
+ def test_update_add_two_rpc_to_dns(self):
+ prefix, txt = 'textrec2', ['"This is a test"',
+ '"and this is a test, too"']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
+ '"\\"This is a test\\""' +
+ ' "\\"and this is a test, too\\""')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_empty_txt_records(self):
"test adding two txt records works"
prefix, txt = 'emptytextrec', []
@@ -955,11 +1112,30 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, ''))
+ def test_update_add_empty_rpc_to_dns(self):
+ prefix, txt = 'rpcemptytextrec', []
+
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
if __name__ == "__main__":
import unittest
unittest.main()
diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py
index 08b59a17de5..933f30b0966 100755
--- a/source4/selftest/tests.py
+++ b/source4/selftest/tests.py
@@ -284,6 +284,7 @@ for f in sorted(os.listdir(os.path.join(samba4srcdir, "../pidl/tests"))):
# DNS tests
planpythontestsuite("fl2003dc", "samba.tests.dns")
+
for t in smbtorture4_testsuites("dns_internal."):
plansmbtorture4testsuite(t, "dc:local", '//$SERVER/whavever')