summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorBob Campbell <bobcampbell@catalyst.net.nz>2016-11-30 09:19:31 +1300
committerGarming Sam <garming@samba.org>2016-12-12 05:00:18 +0100
commit64a382576542d35b4abd64ae78f54713d9d621f9 (patch)
tree79644f9420205357e1afb960c9276aa38d78b7b8 /python
parentb9c99a3483d0de5508ec41698ae992764b2b9abe (diff)
downloadsamba-64a382576542d35b4abd64ae78f54713d9d621f9.tar.gz
python/tests: expand tests for dns server over rpc
Signed-off-by: Bob Campbell <bobcampbell@catalyst.net.nz> Reviewed-by: Garming Sam <garming@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'python')
-rw-r--r--python/samba/tests/dcerpc/dnsserver.py724
1 files changed, 653 insertions, 71 deletions
diff --git a/python/samba/tests/dcerpc/dnsserver.py b/python/samba/tests/dcerpc/dnsserver.py
index 7229877a313..ae19c24520c 100644
--- a/python/samba/tests/dcerpc/dnsserver.py
+++ b/python/samba/tests/dcerpc/dnsserver.py
@@ -17,20 +17,642 @@
"""Tests for samba.dcerpc.dnsserver"""
+import os
+import ldb
+
+from samba.auth import system_session
+from samba.samdb import SamDB
+from samba.ndr import ndr_unpack, ndr_pack
from samba.dcerpc import dnsp, dnsserver
from samba.tests import RpcInterfaceTestCase, env_get_var_value
-from samba.netcmd.dns import ARecord, NSRecord
+from samba.netcmd.dns import ARecord, AAAARecord, PTRRecord, CNameRecord, NSRecord, MXRecord, SRVRecord, TXTRecord
class DnsserverTests(RpcInterfaceTestCase):
+ @classmethod
+ def setUpClass(cls):
+ good_dns = ["SAMDOM.EXAMPLE.COM",
+ "1.EXAMPLE.COM",
+ "%sEXAMPLE.COM" % ("1."*100),
+ "EXAMPLE",
+ "\n.COM",
+ "!@#$%^&*()_",
+ "HIGH\xFFBYTE",
+ "@.EXAMPLE.COM",
+ "."]
+ bad_dns = ["...",
+ ".EXAMPLE.COM",
+ ".EXAMPLE.",
+ "",
+ "SAMDOM..EXAMPLE.COM"]
+
+ good_mx = ["SAMDOM.EXAMPLE.COM 65535"]
+ bad_mx = []
+
+ good_srv = ["SAMDOM.EXAMPLE.COM 65535 65535 65535"]
+ bad_srv = []
+
+ for bad_dn in bad_dns:
+ bad_mx.append("%s 1" % bad_dn)
+ bad_srv.append("%s 0 0 0" % bad_dn)
+ for good_dn in good_dns:
+ good_mx.append("%s 1" % good_dn)
+ good_srv.append("%s 0 0 0" % good_dn)
+
+ cls.good_records = {
+ "A": ["192.168.0.1",
+ "255.255.255.255"],
+ "AAAA": ["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
+ "0000:0000:0000:0000:0000:0000:0000:0000",
+ "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
+ "1234:1234:1234::",
+ "1234:1234:1234:1234:1234::",
+ "1234:5678:9ABC:DEF0::",
+ "0000:0000::0000",
+ "1234::5678:9ABC:0000:0000:0000:0000",
+ "::1",
+ "::",
+ "1:1:1:1:1:1:1:1"],
+ "PTR": good_dns,
+ "CNAME": good_dns,
+ "NS": good_dns,
+ "MX": good_mx,
+ "SRV": good_srv,
+ "TXT": ["text", "", "@#!", "\n"]
+ }
+
+ cls.bad_records = {
+ "A": ["192.168.0.500",
+ "255.255.255.255/32"],
+ "AAAA": ["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
+ "0000:0000:0000:0000:0000:0000:0000:0000/1",
+ "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
+ "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
+ "1234:5678:9ABC:DEF0:1234:5678:9ABC",
+ "1111::1111::1111"],
+ "PTR": bad_dns,
+ "CNAME": bad_dns,
+ "NS": bad_dns,
+ "MX": bad_mx,
+ "SRV": bad_srv
+ }
+
+ # Because we use uint16_t for these numbers, we can't
+ # actually create these records.
+ invalid_mx = ["SAMDOM.EXAMPLE.COM -1",
+ "SAMDOM.EXAMPLE.COM 65536",
+ "%s 1" % "A"*256]
+ invalid_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
+ "SAMDOM.EXAMPLE.COM 0 0 65536",
+ "SAMDOM.EXAMPLE.COM 65536 0 0"]
+ cls.invalid_records = {
+ "MX": invalid_mx,
+ "SRV": invalid_srv
+ }
+
def setUp(self):
super(DnsserverTests, self).setUp()
- self.server = env_get_var_value("SERVER_IP")
+ self.server = os.environ["DC_SERVER"]
self.zone = env_get_var_value("REALM").lower()
self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
self.get_loadparm(),
self.get_credentials())
+ self.samdb = SamDB(url="ldap://%s" % os.environ["DC_SERVER_IP"],
+ lp = self.get_loadparm(),
+ session_info=system_session(),
+ credentials=self.get_credentials())
+
+
+ self.custom_zone = "zone"
+ zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
+ zone_create_info.pszZoneName = self.custom_zone
+ zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
+ zone_create_info.fAging = 0
+ zone_create_info.fDsIntegrated = 1
+ zone_create_info.fLoadExisting = 1
+ zone_create_info.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
+
+ self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0,
+ self.server,
+ None,
+ 0,
+ 'ZoneCreate',
+ dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
+ zone_create_info)
+
+ def tearDown(self):
+ self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0,
+ self.server,
+ self.custom_zone,
+ 0,
+ 'DeleteZoneFromDs',
+ dnsserver.DNSSRV_TYPEID_NULL,
+ None)
+ super(DnsserverTests, self).tearDown()
+
+ # This test fails against Samba (but passes against Windows),
+ # because Samba does not return the record when we enum records.
+ # Records can be given DNS_RANK_NONE when the zone they are in
+ # does not have DNS_ZONE_TYPE_PRIMARY. Since such records can be
+ # deleted, however, we do not consider this urgent to fix and
+ # so this test is a knownfail.
+ def test_rank_none(self):
+ """
+ See what happens when we set a record's rank to
+ DNS_RANK_NONE.
+ """
+
+ record_str = "192.168.50.50"
+ record_type_str = "A"
+ self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
+
+ dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
+ record.rank = 0 # DNS_RANK_NONE
+ res = self.samdb.dns_replace_by_dn(dn, [record])
+ if res is not None:
+ self.fail("Unable to update dns record to have DNS_RANK_NONE.")
+
+ self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
+ self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
+ self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
+ self.assert_num_records(self.custom_zone, "testrecord", record_type_str, 0)
+
+ def test_dns_tombstoned(self):
+ """
+ See what happens when we set a record to be tombstoned.
+ """
+
+ record_str = "192.168.50.50"
+ record_type_str = "A"
+ self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
+
+ dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
+ record.wType = dnsp.DNS_TYPE_TOMBSTONE
+ res = self.samdb.dns_replace_by_dn(dn, [record])
+ if res is not None:
+ self.fail("Unable to update dns record to be tombstoned.")
+
+ self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
+ self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
+ self.assert_num_records(self.custom_zone, "testrecord", record_type_str, 0)
+
+ def get_record_from_db(self, zone_name, record_name):
+ """
+ Returns (dn of record, record)
+ """
+
+ zones = self.samdb.search(base="DC=DomainDnsZones,%s" % self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
+ expression="(objectClass=dnsZone)",
+ attrs=["cn"])
+
+ zone_dn = None
+ for zone in zones:
+ if zone_name in str(zone.dn):
+ zone_dn = zone.dn
+ break
+
+ if zone_dn is None:
+ raise AssertionError("Couldn't find zone '%s'." % zone_name)
+
+ records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
+ expression="(objectClass=dnsNode)",
+ attrs=["dnsRecord"])
+
+ for old_packed_record in records:
+ if record_name in str(old_packed_record.dn):
+ return (old_packed_record.dn, ndr_unpack(dnsp.DnssrvRpcRecord, old_packed_record["dnsRecord"][0]))
+
+ def test_duplicate_matching(self):
+ """
+ Make sure that records which should be distinct from each other or duplicate
+ to each other behave as expected.
+ """
+
+ distinct_dns = [("SAMDOM.EXAMPLE.COM",
+ "SAMDOM.EXAMPLE.CO",
+ "EXAMPLE.COM", "SAMDOM.EXAMPLE")]
+ duplicate_dns = [("SAMDOM.EXAMPLE.COM", "samdom.example.com", "SAMDOM.example.COM"),
+ ("EXAMPLE.", "EXAMPLE")]
+
+ # Every tuple has entries which should be considered duplicate to one another.
+ duplicates = {
+ "AAAA": [("AAAA::", "aaaa::"),
+ ("AAAA::", "AAAA:0000::"),
+ ("AAAA::", "AAAA:0000:0000:0000:0000:0000:0000:0000"),
+ ("AAAA::", "AAAA:0:0:0:0:0:0:0"),
+ ("0123::", "123::"),
+ ("::", "::0", "0000:0000:0000:0000:0000:0000:0000:0000")],
+ }
+
+ # Every tuple has entries which should be considered distinct from one another.
+ distinct = {
+ "A": [("192.168.1.0", "192.168.1.1", "192.168.2.0", "192.169.1.0", "193.168.1.0")],
+ "AAAA": [("AAAA::1234:5678:9ABC", "::AAAA:1234:5678:9ABC"),
+ ("1000::", "::1000"),
+ ("::1", "::11", "::1111"),
+ ("1234::", "0234::")],
+ "SRV": [("SAMDOM.EXAMPLE.COM 1 1 1", "SAMDOM.EXAMPLE.COM 1 1 0", "SAMDOM.EXAMPLE.COM 1 0 1",
+ "SAMDOM.EXAMPLE.COM 0 1 1", "SAMDOM.EXAMPLE.COM 2 1 0", "SAMDOM.EXAMPLE.COM 2 2 2")],
+ "MX": [("SAMDOM.EXAMPLE.COM 1", "SAMDOM.EXAMPLE.COM 0")],
+ "TXT": [("A RECORD", "B RECORD", "a record")]
+ }
+
+ for record_type_str in ("PTR", "CNAME", "NS"):
+ distinct[record_type_str] = distinct_dns
+ duplicates[record_type_str] = duplicate_dns
+
+ for record_type_str in duplicates:
+ for duplicate_tuple in duplicates[record_type_str]:
+ # Attempt to add duplicates and make sure that all after the first fails
+ self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
+ for record in duplicate_tuple:
+ self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
+ self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
+ self.delete_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
+
+ # Repeatedly: add the first duplicate, and attempt to remove all of the others, making sure this succeeds
+ for record in duplicate_tuple:
+ self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
+ self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
+
+ for record_type_str in distinct:
+ for distinct_tuple in distinct[record_type_str]:
+ # Attempt to add distinct and make sure that they all succeed within a tuple
+ i = 0
+ for record in distinct_tuple:
+ i = i + 1
+ try:
+ self.add_record(self.custom_zone, "testrecord", record_type_str, record)
+ # All records should have been added.
+ self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=i)
+ except AssertionError as e:
+ raise AssertionError("Failed to add %s, which should be distinct from all others in the set. "
+ "Original error: %s\nDistinct set: %s." % (record, e, distinct_tuple))
+ for record in distinct_tuple:
+ self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
+ # CNAMEs should not have been added, since they conflict.
+ if record_type_str == 'CNAME':
+ continue
+
+ # Add the first distinct and attempt to remove all of the others, making sure this fails
+ # Windows fails this test. This is probably due to weird tombstoning behavior.
+ self.add_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
+ for record in distinct_tuple:
+ if record == distinct_tuple[0]:
+ continue
+ try:
+ self.delete_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
+ except AssertionError as e:
+ raise AssertionError("Managed to remove %s by attempting to remove %s. Original error: %s"
+ % (distinct_tuple[0], record, e))
+ self.delete_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
+
+ def test_accept_valid_commands(self):
+ """
+ Make sure that we can add, update and delete a variety
+ of valid records.
+ """
+ for record_type_str in self.good_records:
+ for record_str in self.good_records[record_type_str]:
+ self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
+ self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
+ self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
+
+ def test_reject_invalid_commands(self):
+ """
+ Make sure that we can't add a variety of invalid records,
+ and that we can't update valid records to invalid ones.
+ """
+ num_failures = 0
+ for record_type_str in self.bad_records:
+ for record_str in self.bad_records[record_type_str]:
+ # Attempt to add the bad record, which should fail. Then, attempt to query for and delete
+ # it. Since it shouldn't exist, these should fail too.
+ try:
+ self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
+ self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0)
+ self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
+ except AssertionError as e:
+ print e
+ num_failures = num_failures + 1
+
+ # Also try to update valid records to invalid ones, making sure this fails
+ for record_type_str in self.bad_records:
+ for record_str in self.bad_records[record_type_str]:
+ good_record_str = self.good_records[record_type_str][0]
+ self.add_record(self.custom_zone, "testrecord", record_type_str, good_record_str)
+ try:
+ self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
+ except AssertionError as e:
+ print e
+ num_failures = num_failures + 1
+ self.delete_record(self.custom_zone, "testrecord", record_type_str, good_record_str)
+
+ self.assertTrue(num_failures == 0, "Failed to reject invalid commands. Total failures: %d." % num_failures)
+
+ def test_add_duplicate_different_type(self):
+ """
+ Attempt to add some values which have the same name as
+ existing ones, just a different type.
+ """
+ num_failures = 0
+ for record_type_str_1 in self.good_records:
+ record1 = self.good_records[record_type_str_1][0]
+ self.add_record(self.custom_zone, "testrecord", record_type_str_1, record1)
+ for record_type_str_2 in self.good_records:
+ if record_type_str_1 == record_type_str_2:
+ continue
+
+ record2 = self.good_records[record_type_str_2][0]
+
+ has_a = record_type_str_1 == 'A' or record_type_str_2 == 'A'
+ has_aaaa = record_type_str_1 == 'AAAA' or record_type_str_2 == 'AAAA'
+ has_cname = record_type_str_1 == 'CNAME' or record_type_str_2 == 'CNAME'
+ has_ptr = record_type_str_1 == 'PTR' or record_type_str_2 == 'PTR'
+ has_mx = record_type_str_1 == 'MX' or record_type_str_2 == 'MX'
+ has_srv = record_type_str_1 == 'SRV' or record_type_str_2 == 'SRV'
+ has_txt = record_type_str_1 == 'TXT' or record_type_str_2 == 'TXT'
+
+ # If we attempt to add any record except A or AAAA when we already have an NS record,
+ # the add should fail.
+ add_error_ok = False
+ if record_type_str_1 == 'NS' and not has_a and not has_aaaa:
+ add_error_ok = True
+ # If we attempt to add a CNAME when an A, PTR or MX record exists, the add should fail.
+ if record_type_str_2 == 'CNAME' and (has_ptr or has_mx or has_a or has_aaaa):
+ add_error_ok = True
+ # If we have a CNAME, adding an A, AAAA, SRV or TXT record should fail.
+ # If we have an A, AAAA, SRV or TXT record, adding a CNAME should fail.
+ if has_cname and (has_a or has_aaaa or has_srv or has_txt):
+ add_error_ok = True
+
+ try:
+ self.add_record(self.custom_zone, "testrecord", record_type_str_2, record2)
+ if add_error_ok:
+ num_failures = num_failures + 1
+ print("Expected error when adding %s while a %s existed."
+ % (record_type_str_2, record_type_str_1))
+ except AssertionError as e:
+ if not add_error_ok:
+ num_failures = num_failures + 1
+ print("Didn't expect error when adding %s while a %s existed."
+ % (record_type_str_2, record_type_str_1))
+
+ if not add_error_ok:
+ # In the "normal" case, we expect the add to work and us to have one of each type of record afterwards.
+ expected_num_type_1 = 1
+ expected_num_type_2 = 1
+
+ # If we have an MX record, a PTR record should replace it when added.
+ # If we have a PTR record, an MX record should replace it when added.
+ if has_ptr and has_mx:
+ expected_num_type_1 = 0
+
+ # If we have a CNAME, SRV or TXT record, a PTR or MX record should replace it when added.
+ if (has_cname or has_srv or has_txt) and (record_type_str_2 == 'PTR' or record_type_str_2 == 'MX'):
+ expected_num_type_1 = 0
+
+ if (record_type_str_1 == 'NS' and (has_a or has_aaaa)):
+ expected_num_type_2 = 0
+
+ try:
+ self.assert_num_records(self.custom_zone, "testrecord", record_type_str_1, expected_num=expected_num_type_1)
+ except AssertionError as e:
+ num_failures = num_failures + 1
+ print("Expected %s %s records after adding a %s record and a %s record already existed."
+ % (expected_num_type_1, record_type_str_1, record_type_str_2, record_type_str_1))
+ try:
+ self.assert_num_records(self.custom_zone, "testrecord", record_type_str_2, expected_num=expected_num_type_2)
+ except AssertionError as e:
+ num_failures = num_failures + 1
+ print("Expected %s %s records after adding a %s record and a %s record already existed."
+ % (expected_num_type_2, record_type_str_2, record_type_str_2, record_type_str_1))
+
+ try:
+ self.delete_record(self.custom_zone, "testrecord", record_type_str_2, record2)
+ except AssertionError as e:
+ pass
+
+ self.delete_record(self.custom_zone, "testrecord", record_type_str_1, record1)
+
+ self.assertTrue(num_failures == 0, "Failed collision and replacement behavior. Total failures: %d." % num_failures)
+
+ # Windows fails this test in the same way we do.
+ def _test_cname(self):
+ """
+ Test some special properties of CNAME records.
+ """
+
+ # RFC 1912: When there is a CNAME record, there must not be any other records with the same alias
+ cname_record = self.good_records["CNAME"][1]
+ self.add_record(self.custom_zone, "testrecord", "CNAME", cname_record)
+
+ for record_type_str in self.good_records:
+ other_record = self.good_records[record_type_str][0]
+ self.add_record(self.custom_zone, "testrecord", record_type_str, other_record, assertion=False)
+ self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0)
+
+ # RFC 2181: MX & NS records must not be allowed to point to a CNAME alias
+ mx_record = "testrecord 1"
+ ns_record = "testrecord"
+
+ self.add_record(self.custom_zone, "mxrec", "MX", mx_record, assertion=False)
+ self.add_record(self.custom_zone, "nsrec", "NS", ns_record, assertion=False)
+
+ self.delete_record(self.custom_zone, "testrecord", "CNAME", cname_record)
+
+ def test_add_duplicate_value(self):
+ """
+ Make sure that we can't add duplicate values of any type.
+ """
+ for record_type_str in self.good_records:
+ record = self.good_records[record_type_str][0]
+
+ self.add_record(self.custom_zone, "testrecord", record_type_str, record)
+ self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
+ self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
+ self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
+
+ def test_add_similar_value(self):
+ """
+ Attempt to add values with the same name and type in the same
+ zone. This should work, and should result in both values
+ existing (except with some types).
+ """
+ for record_type_str in self.good_records:
+ for i in range(1, len(self.good_records[record_type_str])):
+ record1 = self.good_records[record_type_str][i-1]
+ record2 = self.good_records[record_type_str][i]
+
+ if record_type_str == 'CNAME':
+ continue
+ # We expect CNAME records to override one another, as
+ # an alias can only map to one CNAME record.
+ # Also, on Windows, when the empty string is added and
+ # another record is added afterwards, the empty string
+ # will be silently overriden by the new one, so it
+ # fails this test for the empty string.
+ expected_num = 1 if record_type_str == 'CNAME' else 2
+
+ self.add_record(self.custom_zone, "testrecord", record_type_str, record1)
+ self.add_record(self.custom_zone, "testrecord", record_type_str, record2)
+ self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=expected_num)
+ self.delete_record(self.custom_zone, "testrecord", record_type_str, record1)
+ self.delete_record(self.custom_zone, "testrecord", record_type_str, record2)
+
+ def assert_record(self, zone, name, record_type_str, expected_record_str,
+ assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
+ """
+ Asserts whether or not the given record with the given type exists in the
+ given zone.
+ """
+ try:
+ _, result = self.query_records(zone, name, record_type_str)
+ except RuntimeError as e:
+ if assertion:
+ raise AssertionError("Record '%s' of type '%s' was not present when it should have been."
+ % (expected_record_str, record_type_str))
+ else:
+ return
+
+ found = False
+ for record in result.rec[0].records:
+ if record.data == expected_record_str:
+ found = True
+ break
+
+ if found and not assertion:
+ raise AssertionError("Record '%s' of type '%s' was present when it shouldn't have been." % (expected_record_str, record_type_str))
+ elif not found and assertion:
+ raise AssertionError("Record '%s' of type '%s' was not present when it should have been." % (expected_record_str, record_type_str))
+
+ def assert_num_records(self, zone, name, record_type_str, expected_num=1,
+ client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
+ """
+ Asserts that there are a given amount of records with the given type in
+ the given zone.
+ """
+ try:
+ _, result = self.query_records(zone, name, record_type_str)
+ num_results = len(result.rec[0].records)
+ if not num_results == expected_num:
+ raise AssertionError("There were %d records of type '%s' with the name '%s' when %d were expected."
+ % (num_results, record_type_str, name, expected_num))
+ except RuntimeError:
+ if not expected_num == 0:
+ raise AssertionError("There were no records of type '%s' with the name '%s' when %d were expected."
+ % (record_type_str, name, expected_num))
+
+ def query_records(self, zone, name, record_type_str, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
+ return self.conn.DnssrvEnumRecords2(client_version,
+ 0,
+ self.server,
+ zone,
+ name,
+ None,
+ self.record_type_int(record_type_str),
+ dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA | dnsserver.DNS_RPC_VIEW_NO_CHILDREN,
+ None,
+ None)
+
+ def record_obj_from_str(self, record_type_str, record_str):
+ if record_type_str == 'A':
+ return ARecord(record_str)
+ elif record_type_str == 'AAAA':
+ return AAAARecord(record_str)
+ elif record_type_str == 'PTR':
+ return PTRRecord(record_str)
+ elif record_type_str == 'CNAME':
+ return CNameRecord(record_str)
+ elif record_type_str == 'NS':
+ return NSRecord(record_str)
+ elif record_type_str == 'MX':
+ split = record_str.split(' ')
+ return MXRecord(split[0], int(split[1]))
+ elif record_type_str == 'SRV':
+ split = record_str.split(' ')
+ target = split[0]
+ port = int(split[1])
+ priority = int(split[2])
+ weight = int(split[3])
+ return SRVRecord(target, port, priority, weight)
+ elif record_type_str == 'TXT':
+ return TXTRecord(record_str)
+
+ def record_type_int(self, record_type_str):
+ if record_type_str == 'A':
+ return dnsp.DNS_TYPE_A
+ elif record_type_str == 'AAAA':
+ return dnsp.DNS_TYPE_AAAA
+ elif record_type_str == 'PTR':
+ return dnsp.DNS_TYPE_PTR
+ elif record_type_str == 'CNAME':
+ return dnsp.DNS_TYPE_CNAME
+ elif record_type_str == 'NS':
+ return dnsp.DNS_TYPE_NS
+ elif record_type_str == 'MX':
+ return dnsp.DNS_TYPE_MX
+ elif record_type_str == 'SRV':
+ return dnsp.DNS_TYPE_SRV
+ elif record_type_str == 'TXT':
+ return dnsp.DNS_TYPE_TXT
+
+ def add_record(self, zone, name, record_type_str, record_str,
+ assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
+ """
+ Attempts to add a map from the given name to a record of the given type,
+ in the given zone.
+ Also asserts whether or not the add was successful.
+ This can also update existing records if they have the same name.
+ """
+ record = self.record_obj_from_str(record_type_str, record_str)
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = record
+
+ try:
+ self.conn.DnssrvUpdateRecord2(client_version,
+ 0,
+ self.server,
+ zone,
+ name,
+ add_rec_buf,
+ None)
+ if not assertion:
+ raise AssertionError("Successfully added record '%s' of type '%s', which should have failed."
+ % (record_str, record_type_str))
+ except RuntimeError as e:
+ if assertion:
+ raise AssertionError("Failed to add record '%s' of type '%s', which should have succeeded. Error was '%s'."
+ % (record_str, record_type_str, str(e)))
+
+ def delete_record(self, zone, name, record_type_str, record_str,
+ assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
+ """
+ Attempts to delete a record with the given name, record and record type
+ from the given zone.
+ Also asserts whether or not the deletion was successful.
+ """
+ record = self.record_obj_from_str(record_type_str, record_str)
+ del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ del_rec_buf.rec = record
+
+ try:
+ self.conn.DnssrvUpdateRecord2(client_version,
+ 0,
+ self.server,
+ zone,
+ name,
+ None,
+ del_rec_buf)
+ if not assertion:
+ raise AssertionError("Successfully deleted record '%s' of type '%s', which should have failed." % (record_str, record_type_str))
+ except RuntimeError as e:
+ if assertion:
+ raise AssertionError("Failed to delete record '%s' of type '%s', which should have succeeded. Error was '%s'." % (record_str, record_type_str, str(e)))
+
def test_query2(self):
typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_W2K,
0,
@@ -76,13 +698,13 @@ class DnsserverTests(RpcInterfaceTestCase):
request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
dnsserver.DNS_ZONE_REQUEST_PRIMARY)
- typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
- 0,
- self.server,
- None,
- 'EnumZones',
- dnsserver.DNSSRV_TYPEID_DWORD,
- request_filter)
+ _, zones = self.conn.DnssrvComplexOperation2(client_version,
+ 0,
+ self.server,
+ None,
+ 'EnumZones',
+ dnsserver.DNSSRV_TYPEID_DWORD,
+ request_filter)
self.assertEquals(1, zones.dwZoneCount)
# Delete zone
@@ -109,6 +731,7 @@ class DnsserverTests(RpcInterfaceTestCase):
client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
request_filter = (dnsserver.DNS_ZONE_REQUEST_FORWARD |
dnsserver.DNS_ZONE_REQUEST_PRIMARY)
+
typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
0,
self.server,
@@ -117,7 +740,7 @@ class DnsserverTests(RpcInterfaceTestCase):
dnsserver.DNSSRV_TYPEID_DWORD,
request_filter)
self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
- self.assertEquals(2, zones.dwZoneCount)
+ self.assertEquals(3, zones.dwZoneCount)
request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
dnsserver.DNS_ZONE_REQUEST_PRIMARY)
@@ -131,25 +754,23 @@ class DnsserverTests(RpcInterfaceTestCase):
self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
self.assertEquals(0, zones.dwZoneCount)
-
def test_enumrecords2(self):
client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
record_type = dnsp.DNS_TYPE_NS
select_flags = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA |
dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA)
- buflen, roothints = self.conn.DnssrvEnumRecords2(client_version,
- 0,
- self.server,
- '..RootHints',
- '.',
- None,
- record_type,
- select_flags,
- None,
- None)
+ _, roothints = self.conn.DnssrvEnumRecords2(client_version,
+ 0,
+ self.server,
+ '..RootHints',
+ '.',
+ None,
+ record_type,
+ select_flags,
+ None,
+ None)
self.assertEquals(14, roothints.count) # 1 NS + 13 A records (a-m)
-
def test_updaterecords2(self):
client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
record_type = dnsp.DNS_TYPE_A
@@ -170,16 +791,16 @@ class DnsserverTests(RpcInterfaceTestCase):
add_rec_buf,
None)
- buflen, result = self.conn.DnssrvEnumRecords2(client_version,
- 0,
- self.server,
- self.zone,
- name,
- None,
- record_type,
- select_flags,
- None,
- None)
+ _, result = self.conn.DnssrvEnumRecords2(client_version,
+ 0,
+ self.server,
+ self.zone,
+ name,
+ None,
+ record_type,
+ select_flags,
+ None,
+ None)
self.assertEquals(1, result.count)
self.assertEquals(1, result.rec[0].wRecordCount)
self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
@@ -235,42 +856,3 @@ class DnsserverTests(RpcInterfaceTestCase):
select_flags,
None,
None)
-
- def test_updaterecords2_soa(self):
- client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
- record_type = dnsp.DNS_TYPE_NS
- select_flags = (dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA |
- dnsserver.DNS_RPC_VIEW_NO_CHILDREN)
-
- nameserver = 'ns.example.local'
- rec = NSRecord(nameserver)
-
- # Add record
- add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
- add_rec_buf.rec = rec
- self.conn.DnssrvUpdateRecord2(client_version,
- 0,
- self.server,
- self.zone,
- '.',
- add_rec_buf,
- None)
-
- buflen, result = self.conn.DnssrvEnumRecords2(client_version,
- 0,
- self.server,
- self.zone,
- '@',
- None,
- record_type,
- select_flags,
- None,
- None)
- self.assertEquals(1, result.count)
- self.assertEquals(2, result.rec[0].wRecordCount)
- match = False
- for i in range(2):
- self.assertEquals(dnsp.DNS_TYPE_NS, result.rec[0].records[i].wType)
- if result.rec[0].records[i].data.str.rstrip('.') == nameserver:
- match = True
- self.assertEquals(match, True)