summaryrefslogtreecommitdiff
path: root/python/samba/tests/dns.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/samba/tests/dns.py')
-rw-r--r--python/samba/tests/dns.py202
1 files changed, 189 insertions, 13 deletions
diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py
index 25897a77b1d..3e655c37bc3 100644
--- a/python/samba/tests/dns.py
+++ b/python/samba/tests/dns.py
@@ -23,6 +23,7 @@ import samba.ndr as ndr
from samba import credentials, param
from samba.tests import TestCase
from samba.dcerpc import dns, dnsp, dnsserver
+from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
# This timeout only has relevance when testing against Windows
# Format errors tend to return patchy responses, so a timeout is needed.
@@ -879,7 +880,7 @@ class TestZones(DNSTest):
self.creds = self.get_credentials(self.lp)
self.server = os.getenv("SERVER_IP")
self.zone = "test.lan"
- self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
+ self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
self.lp, self.creds)
def tearDown(self):
@@ -899,7 +900,7 @@ class TestZones(DNSTest):
zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
- self.server,
+ self.server_ip,
None,
0,
'ZoneCreate',
@@ -909,7 +910,7 @@ class TestZones(DNSTest):
def delete_zone(self, zone):
self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
- self.server,
+ self.server_ip,
zone,
0,
'DeleteZoneFromDs',
@@ -957,12 +958,31 @@ class TestRPCRoundtrip(DNSTest):
self.lp = self.get_loadparm()
self.creds = self.get_credentials(self.lp)
self.server = os.getenv("SERVER_IP")
- self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
+ self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
self.lp, self.creds)
def tearDown(self):
super(TestRPCRoundtrip, self).tearDown()
+ def test_update_add_txt_rpc_to_dns(self):
+ prefix, txt = 'rpctextrec', ['"This is a test"']
+
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_null_padded_txt_record(self):
"test adding records works"
prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
@@ -970,7 +990,7 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""'))
@@ -980,7 +1000,7 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"'))
@@ -990,11 +1010,66 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""'))
+ def test_update_add_padding_rpc_to_dns(self):
+ prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
+ prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
+ prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
# Test is incomplete due to strlen against txt records
def test_update_add_null_char_txt_record(self):
"test adding records works"
@@ -1003,7 +1078,7 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, ['NULL'])
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"NULL"'))
@@ -1013,11 +1088,30 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, ['NULL', 'NULL'])
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
+ def test_update_add_null_char_rpc_to_dns(self):
+ prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL"')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, ['NULL'])
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_hex_char_txt_record(self):
"test adding records works"
prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
@@ -1025,11 +1119,30 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
+ def test_update_add_hex_rpc_to_dns(self):
+ prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_slash_txt_record(self):
"test adding records works"
prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
@@ -1037,11 +1150,33 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
+ # This test fails against Windows as it eliminates slashes in RPC
+ # One typical use for a slash is in records like 'var=value' to
+ # escape '=' characters.
+ def test_update_add_slash_rpc_to_dns(self):
+ prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_two_txt_records(self):
"test adding two txt records works"
prefix, txt = 'textrec2', ['"This is a test"',
@@ -1050,12 +1185,34 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
' "\\"and this is a test, too\\""'))
+ def test_update_add_two_rpc_to_dns(self):
+ prefix, txt = 'textrec2', ['"This is a test"',
+ '"and this is a test, too"']
+ prefix = 'rpc' + prefix
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
+ '"\\"This is a test\\""' +
+ ' "\\"and this is a test, too\\""')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
def test_update_add_empty_txt_records(self):
"test adding two txt records works"
prefix, txt = 'emptytextrec', []
@@ -1063,11 +1220,30 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
- self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+ self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, ''))
+ def test_update_add_empty_rpc_to_dns(self):
+ prefix, txt = 'rpcemptytextrec', []
+
+ name = "%s.%s" % (prefix, self.get_dns_domain())
+
+ rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, add_rec_buf, None)
+
+ self.check_query_txt(prefix, txt)
+ finally:
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip, self.get_dns_domain(),
+ name, None, add_rec_buf)
+
if __name__ == "__main__":
import unittest
unittest.main()