summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBob Halley <halley@dnspython.org>2021-12-03 11:46:44 -0800
committerGitHub <noreply@github.com>2021-12-03 11:46:44 -0800
commitf851f31d3558fc388722b34548d11e6fb1d060f2 (patch)
treea5dc06ed4c78470b170dcde66d254c6f1261060e
parentebceff4e86003fb4f44256011221228025ab3db0 (diff)
parentc46a91d5926e7ddaa32a466572c4b87dea46aa69 (diff)
downloaddnspython-f851f31d3558fc388722b34548d11e6fb1d060f2.tar.gz
Merge pull request #731 from rthalley/cname-and-other-data
First draft of CNAME and other data handling in zones.
-rw-r--r--dns/node.py137
-rw-r--r--dns/transaction.py97
-rw-r--r--dns/zone.py6
-rw-r--r--dns/zonefile.py35
-rw-r--r--doc/whatsnew.rst10
-rw-r--r--tests/test_zone.py153
6 files changed, 416 insertions, 22 deletions
diff --git a/dns/node.py b/dns/node.py
index 68c1526..63ce008 100644
--- a/dns/node.py
+++ b/dns/node.py
@@ -17,16 +17,69 @@
"""DNS nodes. A node is a set of rdatasets."""
+import enum
import io
+import dns.immutable
import dns.rdataset
import dns.rdatatype
import dns.renderer
+_cname_types = {
+ dns.rdatatype.CNAME,
+}
+
+# "neutral" types can coexist with a CNAME and thus are not "other data"
+_neutral_types = {
+ dns.rdatatype.NSEC, # RFC 4035 section 2.5
+ dns.rdatatype.NSEC3, # This is not likely to happen, but not impossible!
+ dns.rdatatype.KEY, # RFC 4035 section 2.5, RFC 3007
+}
+
+def _matches_type_or_its_signature(rdtypes, rdtype, covers):
+ return rdtype in rdtypes or \
+ (rdtype == dns.rdatatype.RRSIG and covers in rdtypes)
+
+
+@enum.unique
+class NodeKind(enum.Enum):
+ """Rdatasets in nodes
+ """
+ REGULAR = 0 # a.k.a "other data"
+ NEUTRAL = 1
+ CNAME = 2
+
+ @classmethod
+ def classify(cls, rdtype, covers):
+ if _matches_type_or_its_signature(_cname_types, rdtype, covers):
+ return NodeKind.CNAME
+ elif _matches_type_or_its_signature(_neutral_types, rdtype, covers):
+ return NodeKind.NEUTRAL
+ else:
+ return NodeKind.REGULAR
+
+ @classmethod
+ def classify_rdataset(cls, rdataset):
+ return cls.classify(rdataset.rdtype, rdataset.covers)
+
+
class Node:
- """A Node is a set of rdatasets."""
+ """A Node is a set of rdatasets.
+
+ A node is either a CNAME node or an "other data" node. A CNAME
+ node contains only CNAME, KEY, NSEC, and NSEC3 rdatasets along with their
+ covering RRSIG rdatasets. An "other data" node contains any
+ rdataset other than a CNAME or RRSIG(CNAME) rdataset. When
+ changes are made to a node, the CNAME or "other data" state is
+ always consistent with the update, i.e. the most recent change
+ wins. For example, if you have a node which contains a CNAME
+ rdataset, and then add an MX rdataset to it, then the CNAME
+ rdataset will be deleted. Likewise if you have a node containing
+ an MX rdataset and add a CNAME rdataset, the MX rdataset will be
+ deleted.
+ """
__slots__ = ['rdatasets']
@@ -78,6 +131,30 @@ class Node:
def __iter__(self):
return iter(self.rdatasets)
+ def _append_rdataset(self, rdataset):
+ """Append rdataset to the node with special handling for CNAME and
+ other data conditions.
+
+ Specifically, if the rdataset being appended has ``NodeKind.CNAME``,
+ then all rdatasets other than KEY, NSEC, NSEC3, and their covering
+ RRSIGs are deleted. If the rdataset being appended has
+ ``NodeKind.REGULAR`` then CNAME and RRSIG(CNAME) are deleted.
+ """
+ # Make having just one rdataset at the node fast.
+ if len(self.rdatasets) > 0:
+ kind = NodeKind.classify_rdataset(rdataset)
+ if kind == NodeKind.CNAME:
+ self.rdatasets = [rds for rds in self.rdatasets if
+ NodeKind.classify_rdataset(rds) !=
+ NodeKind.REGULAR]
+ elif kind == NodeKind.REGULAR:
+ self.rdatasets = [rds for rds in self.rdatasets if
+ NodeKind.classify_rdataset(rds) !=
+ NodeKind.CNAME]
+ # Otherwise the rdataset is NodeKind.NEUTRAL and we do not need to
+ # edit self.rdatasets.
+ self.rdatasets.append(rdataset)
+
def find_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
create=False):
"""Find an rdataset matching the specified properties in the
@@ -111,7 +188,7 @@ class Node:
if not create:
raise KeyError
rds = dns.rdataset.Rdataset(rdclass, rdtype, covers)
- self.rdatasets.append(rds)
+ self._append_rdataset(rds)
return rds
def get_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
@@ -186,4 +263,58 @@ class Node:
replacement = replacement.to_rdataset()
self.delete_rdataset(replacement.rdclass, replacement.rdtype,
replacement.covers)
- self.rdatasets.append(replacement)
+ self._append_rdataset(replacement)
+
+ def classify(self):
+ """Classify a node.
+
+ A node which contains a CNAME or RRSIG(CNAME) is a
+ ``NodeKind.CNAME`` node.
+
+ A node which contains only "neutral" types, i.e. types allowed to
+ co-exist with a CNAME, is a ``NodeKind.NEUTRAL`` node. The neutral
+ types are NSEC, NSEC3, KEY, and their associated RRSIGS. An empty node
+ is also considered neutral.
+
+ A node which contains some rdataset which is not a CNAME, RRSIG(CNAME),
+ or a neutral type is a a ``NodeKind.REGULAR`` node. Regular nodes are
+ also commonly referred to as "other data".
+ """
+ for rdataset in self.rdatasets:
+ kind = NodeKind.classify(rdataset.rdtype, rdataset.covers)
+ if kind != NodeKind.NEUTRAL:
+ return kind
+ return NodeKind.NEUTRAL
+
+ def is_immutable(self):
+ return False
+
+
+@dns.immutable.immutable
+class ImmutableNode(Node):
+ def __init__(self, node):
+ super().__init__()
+ self.rdatasets = tuple(
+ [dns.rdataset.ImmutableRdataset(rds) for rds in node.rdatasets]
+ )
+
+ def find_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
+ create=False):
+ if create:
+ raise TypeError("immutable")
+ return super().find_rdataset(rdclass, rdtype, covers, False)
+
+ def get_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
+ create=False):
+ if create:
+ raise TypeError("immutable")
+ return super().get_rdataset(rdclass, rdtype, covers, False)
+
+ def delete_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE):
+ raise TypeError("immutable")
+
+ def replace_rdataset(self, replacement):
+ raise TypeError("immutable")
+
+ def is_immutable(self):
+ return True
diff --git a/dns/transaction.py b/dns/transaction.py
index 8aec2e8..ae7417e 100644
--- a/dns/transaction.py
+++ b/dns/transaction.py
@@ -79,6 +79,17 @@ class AlreadyEnded(dns.exception.DNSException):
"""Tried to use an already-ended transaction."""
+def _ensure_immutable_rdataset(rdataset):
+ if rdataset is None or isinstance(rdataset, dns.rdataset.ImmutableRdataset):
+ return rdataset
+ return dns.rdataset.ImmutableRdataset(rdataset)
+
+def _ensure_immutable_node(node):
+ if node is None or node.is_immutable():
+ return node
+ return dns.node.ImmutableNode(node)
+
+
class Transaction:
def __init__(self, manager, replacement=False, read_only=False):
@@ -86,6 +97,9 @@ class Transaction:
self.replacement = replacement
self.read_only = read_only
self._ended = False
+ self._check_put_rdataset = []
+ self._check_delete_rdataset = []
+ self._check_delete_name = []
#
# This is the high level API
@@ -102,10 +116,14 @@ class Transaction:
name = dns.name.from_text(name, None)
rdtype = dns.rdatatype.RdataType.make(rdtype)
rdataset = self._get_rdataset(name, rdtype, covers)
- if rdataset is not None and \
- not isinstance(rdataset, dns.rdataset.ImmutableRdataset):
- rdataset = dns.rdataset.ImmutableRdataset(rdataset)
- return rdataset
+ return _ensure_immutable_rdataset(rdataset)
+
+ def get_node(self, name):
+ """Return the node at *name*, if any.
+
+ Returns an immutable node or ``None``.
+ """
+ return _ensure_immutable_node(self._get_node(name))
def _check_read_only(self):
if self.read_only:
@@ -271,6 +289,43 @@ class Transaction:
"""
self._end(False)
+ def check_put_rdataset(self, check):
+ """Call *check* before putting (storing) an rdataset.
+
+ The function is called with the transaction, the name, and the rdataset.
+
+ The check function may safely make non-mutating transaction method
+ calls, but behavior is undefined if mutating transaction methods are
+ called. The check function should raise an exception if it objects to
+ the put, and otherwise should return ``None``.
+ """
+ self._check_put_rdataset.append(check)
+
+ def check_delete_rdataset(self, check):
+ """Call *check* before deleting an rdataset.
+
+ The function is called with the transaction, the name, the rdatatype,
+ and the covered rdatatype.
+
+ The check function may safely make non-mutating transaction method
+ calls, but behavior is undefined if mutating transaction methods are
+ called. The check function should raise an exception if it objects to
+ the put, and otherwise should return ``None``.
+ """
+ self._check_delete_rdataset.append(check)
+
+ def check_delete_name(self, check):
+ """Call *check* before putting (storing) an rdataset.
+
+ The function is called with the transaction and the name.
+
+ The check function may safely make non-mutating transaction method
+ calls, but behavior is undefined if mutating transaction methods are
+ called. The check function should raise an exception if it objects to
+ the put, and otherwise should return ``None``.
+ """
+ self._check_delete_name.append(check)
+
#
# Helper methods
#
@@ -349,7 +404,7 @@ class Transaction:
trds.update(existing)
existing = trds
rdataset = existing.union(rdataset)
- self._put_rdataset(name, rdataset)
+ self._checked_put_rdataset(name, rdataset)
except IndexError:
raise TypeError(f'not enough parameters to {method}')
@@ -403,16 +458,16 @@ class Transaction:
raise DeleteNotExact(f'{method}: missing rdatas')
rdataset = existing.difference(rdataset)
if len(rdataset) == 0:
- self._delete_rdataset(name, rdataset.rdtype,
- rdataset.covers)
+ self._checked_delete_rdataset(name, rdataset.rdtype,
+ rdataset.covers)
else:
- self._put_rdataset(name, rdataset)
+ self._checked_put_rdataset(name, rdataset)
elif exact:
raise DeleteNotExact(f'{method}: missing rdataset')
else:
if exact and not self._name_exists(name):
raise DeleteNotExact(f'{method}: name not known')
- self._delete_name(name)
+ self._checked_delete_name(name)
except IndexError:
raise TypeError(f'not enough parameters to {method}')
@@ -429,6 +484,21 @@ class Transaction:
finally:
self._ended = True
+ def _checked_put_rdataset(self, name, rdataset):
+ for check in self._check_put_rdataset:
+ check(self, name, rdataset)
+ self._put_rdataset(name, rdataset)
+
+ def _checked_delete_rdataset(self, name, rdtype, covers):
+ for check in self._check_delete_rdataset:
+ check(self, name, rdtype, covers)
+ self._delete_rdataset(name, rdtype, covers)
+
+ def _checked_delete_name(self, name):
+ for check in self._check_delete_name:
+ check(self, name)
+ self._delete_name(name)
+
#
# Transactions are context managers.
#
@@ -462,7 +532,7 @@ class Transaction:
def _delete_name(self, name):
"""Delete all data associated with *name*.
- It is not an error if the rdataset does not exist.
+ It is not an error if the name does not exist.
"""
raise NotImplementedError # pragma: no cover
@@ -506,7 +576,12 @@ class Transaction:
def _iterate_rdatasets(self):
"""Return an iterator that yields (name, rdataset) tuples.
+ """
+ raise NotImplementedError # pragma: no cover
+
+ def _get_node(self, name):
+ """Return the node at *name*, if any.
- Not all Transaction subclasses implement this.
+ Returns a node or ``None``.
"""
raise NotImplementedError # pragma: no cover
diff --git a/dns/zone.py b/dns/zone.py
index 510be2d..dc4274a 100644
--- a/dns/zone.py
+++ b/dns/zone.py
@@ -854,6 +854,9 @@ class ImmutableVersionedNode(VersionedNode):
def replace_rdataset(self, replacement):
raise TypeError("immutable")
+ def is_immutable(self):
+ return True
+
class Version:
def __init__(self, zone, id, nodes=None, origin=None):
@@ -1024,6 +1027,9 @@ class Transaction(dns.transaction.Transaction):
for rdataset in node:
yield (name, rdataset)
+ def _get_node(self, name):
+ return self.version.get_node(name)
+
def from_text(text, origin=None, rdclass=dns.rdataclass.IN,
relativize=True, zone_factory=Zone, filename=None,
diff --git a/dns/zonefile.py b/dns/zonefile.py
index 39c7a38..ce16abb 100644
--- a/dns/zonefile.py
+++ b/dns/zonefile.py
@@ -38,6 +38,29 @@ class UnknownOrigin(dns.exception.DNSException):
"""Unknown origin"""
+class CNAMEAndOtherData(dns.exception.DNSException):
+ """A node has a CNAME and other data"""
+
+
+def _check_cname_and_other_data(txn, name, rdataset):
+ rdataset_kind = dns.node.NodeKind.classify_rdataset(rdataset)
+ node = txn.get_node(name)
+ if node is None:
+ # empty nodes are neutral.
+ return
+ node_kind = node.classify()
+ if node_kind == dns.node.NodeKind.CNAME and \
+ rdataset_kind == dns.node.NodeKind.REGULAR:
+ raise CNAMEAndOtherData('rdataset type is not compatible with a '
+ 'CNAME node')
+ elif node_kind == dns.node.NodeKind.REGULAR and \
+ rdataset_kind == dns.node.NodeKind.CNAME:
+ raise CNAMEAndOtherData('CNAME rdataset is not compatible with a '
+ 'regular data node')
+ # Otherwise at least one of the node and the rdataset is neutral, so
+ # adding the rdataset is ok
+
+
class Reader:
"""Read a DNS zone file into a transaction."""
@@ -71,6 +94,7 @@ class Reader:
self.force_ttl = force_ttl
self.force_rdclass = force_rdclass
self.force_rdtype = force_rdtype
+ self.txn.check_put_rdataset(_check_cname_and_other_data)
def _eat_line(self):
while 1:
@@ -445,6 +469,17 @@ class RRsetsReaderTransaction(dns.transaction.Transaction):
def _get_rdataset(self, name, rdtype, covers):
return self.rdatasets.get((name, rdtype, covers))
+ def _get_node(self, name):
+ rdatasets = []
+ for (rdataset_name, _, _), rdataset in self.rdatasets.items():
+ if name == rdataset_name:
+ rdatasets.append(rdataset)
+ if len(rdatasets) == 0:
+ return None
+ node = dns.node.Node()
+ node.rdatasets = rdatasets
+ return node
+
def _put_rdataset(self, name, rdataset):
self.rdatasets[(name, rdataset.rdtype, rdataset.covers)] = rdataset
diff --git a/doc/whatsnew.rst b/doc/whatsnew.rst
index e5b1667..7f7f125 100644
--- a/doc/whatsnew.rst
+++ b/doc/whatsnew.rst
@@ -34,6 +34,16 @@ What's New in dnspython
* The CDS rdatatype now allows digest type 0.
+* Dnspython zones now enforces that a node is either a CNAME node or
+ an "other data" node. A CNAME node contains only CNAME,
+ RRSIG(CNAME), NSEC, RRSIG(NSEC), NSEC3, or RRSIG(NSEC3) rdatasets.
+ An "other data" node contains any rdataset other than a CNAME or
+ RRSIG(CNAME) rdataset. The enforcement is "last update wins". For
+ example, if you have a node which contains a CNAME rdataset, and
+ then add an MX rdataset to it, then the CNAME rdataset will be deleted.
+ Likewise if you have a node containing an MX rdataset and add a
+ CNAME rdataset, the MX rdataset will be deleted.
+
2.1.0
----------------------
diff --git a/tests/test_zone.py b/tests/test_zone.py
index 3e34e72..bdc99a3 100644
--- a/tests/test_zone.py
+++ b/tests/test_zone.py
@@ -192,6 +192,55 @@ ns1 3600 IN A 10.0.0.1 ; comment1
ns2 3600 IN A 10.0.0.2 ; comment2
"""
+
+example_cname = """$TTL 3600
+$ORIGIN example.
+@ soa foo bar (1 2 3 4 5)
+@ ns ns1
+@ ns ns2
+ns1 a 10.0.0.1
+ns2 a 10.0.0.2
+www a 10.0.0.3
+web cname www
+ nsec @ CNAME RRSIG
+ rrsig NSEC 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY=
+ rrsig CNAME 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY=
+web2 cname www
+ nsec3 1 1 12 aabbccdd 2t7b4g4vsa5smi47k61mv5bv1a22bojr CNAME RRSIG
+ rrsig NSEC3 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY=
+ rrsig CNAME 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY=
+"""
+
+
+example_other_data = """$TTL 3600
+$ORIGIN example.
+@ soa foo bar (1 2 3 4 5)
+@ ns ns1
+@ ns ns2
+ns1 a 10.0.0.1
+ns2 a 10.0.0.2
+www a 10.0.0.3
+web a 10.0.0.4
+ nsec @ A RRSIG
+ rrsig A 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY=
+ rrsig NSEC 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY=
+"""
+
+example_cname_and_other_data = """$TTL 3600
+$ORIGIN example.
+@ soa foo bar (1 2 3 4 5)
+@ ns ns1
+@ ns ns2
+ns1 a 10.0.0.1
+ns2 a 10.0.0.2
+www a 10.0.0.3
+web a 10.0.0.4
+ cname www
+ nsec @ A RRSIG
+ rrsig A 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY=
+ rrsig NSEC 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY=
+"""
+
_keep_output = True
def _rdata_sort(a):
@@ -839,6 +888,60 @@ class ZoneTestCase(unittest.TestCase):
self.assertTrue(rds is not rrs)
self.assertFalse(isinstance(rds, dns.rrset.RRset))
+ def testCnameAndOtherDataAddOther(self):
+ z = dns.zone.from_text(example_cname, 'example.', relativize=True)
+ rds = dns.rdataset.from_text('in', 'a', 300, '10.0.0.1')
+ z.replace_rdataset('web', rds)
+ z.replace_rdataset('web2', rds.copy())
+ n = z.find_node('web')
+ self.assertEqual(len(n.rdatasets), 3)
+ self.assertEqual(n.find_rdataset(dns.rdataclass.IN, dns.rdatatype.A),
+ rds)
+ self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.NSEC))
+ self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.RRSIG,
+ dns.rdatatype.NSEC))
+ n = z.find_node('web2')
+ self.assertEqual(len(n.rdatasets), 3)
+ self.assertEqual(n.find_rdataset(dns.rdataclass.IN, dns.rdatatype.A),
+ rds)
+ self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.NSEC3))
+ self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.RRSIG,
+ dns.rdatatype.NSEC3))
+
+ def testCnameAndOtherDataAddCname(self):
+ z = dns.zone.from_text(example_other_data, 'example.', relativize=True)
+ rds = dns.rdataset.from_text('in', 'cname', 300, 'www')
+ z.replace_rdataset('web', rds)
+ n = z.find_node('web')
+ self.assertEqual(len(n.rdatasets), 3)
+ self.assertEqual(n.find_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.CNAME),
+ rds)
+ self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.NSEC))
+ self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.RRSIG,
+ dns.rdatatype.NSEC))
+
+ def testCnameAndOtherDataInZonefile(self):
+ with self.assertRaises(dns.zonefile.CNAMEAndOtherData):
+ dns.zone.from_text(example_cname_and_other_data, 'example.',
+ relativize=True)
+
+ def testNameInZoneWithStr(self):
+ z = dns.zone.from_text(example_text, 'example.', relativize=False)
+ self.assertTrue('ns1.example.' in z)
+ self.assertTrue('bar.foo.example.' in z)
+
+ def testNameInZoneWhereNameIsNotValid(self):
+ z = dns.zone.from_text(example_text, 'example.', relativize=False)
+ with self.assertRaises(KeyError):
+ self.assertTrue(1 in z)
+
class VersionedZoneTestCase(unittest.TestCase):
def testUseTransaction(self):
@@ -909,15 +1012,49 @@ class VersionedZoneTestCase(unittest.TestCase):
rds = txn.get('example.', 'soa')
self.assertEqual(rds[0].serial, 1)
- def testNameInZoneWithStr(self):
- z = dns.zone.from_text(example_text, 'example.', relativize=False)
- self.assertTrue('ns1.example.' in z)
- self.assertTrue('bar.foo.example.' in z)
+ def testCnameAndOtherDataAddOther(self):
+ z = dns.zone.from_text(example_cname, 'example.', relativize=True,
+ zone_factory=dns.versioned.Zone)
+ rds = dns.rdataset.from_text('in', 'a', 300, '10.0.0.1')
+ with z.writer() as txn:
+ txn.replace('web', rds)
+ txn.replace('web2', rds.copy())
+ n = z.find_node('web')
+ self.assertEqual(len(n.rdatasets), 3)
+ self.assertEqual(n.find_rdataset(dns.rdataclass.IN, dns.rdatatype.A),
+ rds)
+ self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.NSEC))
+ self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.RRSIG,
+ dns.rdatatype.NSEC))
+ n = z.find_node('web2')
+ self.assertEqual(len(n.rdatasets), 3)
+ self.assertEqual(n.find_rdataset(dns.rdataclass.IN, dns.rdatatype.A),
+ rds)
+ self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.NSEC3))
+ self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.RRSIG,
+ dns.rdatatype.NSEC3))
+
+ def testCnameAndOtherDataAddCname(self):
+ z = dns.zone.from_text(example_other_data, 'example.', relativize=True,
+ zone_factory=dns.versioned.Zone)
+ rds = dns.rdataset.from_text('in', 'cname', 300, 'www')
+ with z.writer() as txn:
+ txn.replace('web', rds)
+ n = z.find_node('web')
+ self.assertEqual(len(n.rdatasets), 3)
+ self.assertEqual(n.find_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.CNAME),
+ rds)
+ self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.NSEC))
+ self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.RRSIG,
+ dns.rdatatype.NSEC))
- def testNameInZoneWhereNameIsNotValid(self):
- z = dns.zone.from_text(example_text, 'example.', relativize=False)
- with self.assertRaises(KeyError):
- self.assertTrue(1 in z)
if __name__ == '__main__':
unittest.main()