summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBob Halley <halley@dnspython.org>2021-12-02 14:44:08 -0800
committerBob Halley <halley@dnspython.org>2021-12-02 14:44:08 -0800
commita5cf8b0817d27abb0f27bfc6b8a78a518c534a35 (patch)
tree257d33cc48e9ca0972c98dddafe450b50ef4ba82
parent7ce649d1a1a4fe692c9896dc8b2287971b480314 (diff)
downloaddnspython-cname-and-other-data.tar.gz
refactor to have a get_node() in the txn APIcname-and-other-data
-rw-r--r--dns/node.py123
-rw-r--r--dns/rdataset.py38
-rw-r--r--dns/transaction.py24
-rw-r--r--dns/zone.py10
-rw-r--r--dns/zonefile.py37
5 files changed, 147 insertions, 85 deletions
diff --git a/dns/node.py b/dns/node.py
index 1739154..b8141c4 100644
--- a/dns/node.py
+++ b/dns/node.py
@@ -17,13 +17,53 @@
"""DNS nodes. A node is a set of rdatasets."""
+import enum
import io
+import dns.immutable
import dns.rdataset
import dns.rdatatype
import dns.renderer
+_cname_types = {
+ dns.rdatatype.CNAME,
+}
+
+# "neutral" types can coexist with a CNAME and thus are not "other data"
+_neutral_types = {
+ dns.rdatatype.NSEC, # RFC 4035 section 2.5
+ dns.rdatatype.NSEC3, # This is not likely to happen, but not impossible!
+ dns.rdatatype.KEY, # RFC 4035 section 2.5, RFC 3007
+}
+
+def _matches_type_or_its_signature(rdtypes, rdtype, covers):
+ return rdtype in rdtypes or \
+ (rdtype == dns.rdatatype.RRSIG and covers in rdtypes)
+
+
+@enum.unique
+class NodeKind(enum.Enum):
+ """Rdatasets in nodes
+ """
+ REGULAR = 0 # a.k.a "other data"
+ NEUTRAL = 1
+ CNAME = 2
+
+ @classmethod
+ def classify(cls, rdtype, covers):
+ if _matches_type_or_its_signature(_cname_types, rdtype, covers):
+ return NodeKind.CNAME
+ elif _matches_type_or_its_signature(_neutral_types, rdtype, covers):
+ return NodeKind.NEUTRAL
+ else:
+ return NodeKind.REGULAR
+
+ @classmethod
+ def classify_rdataset(cls, rdataset):
+ return cls.classify(rdataset.rdtype, rdataset.covers)
+
+
class Node:
"""A Node is a set of rdatasets.
@@ -95,22 +135,26 @@ class Node:
"""Append rdataset to the node with special handling for CNAME and
other data conditions.
- Specifically, if the rdataset being appended is a CNAME, then
- all rdatasets other than NSEC, NSEC3, and their covering RRSIGs
- are deleted. If the rdataset being appended is NOT a CNAME, then
- CNAME and RRSIG(CNAME) are deleted.
+ Specifically, if the rdataset being appended has ``NodeKind.CNAME``,
+ then all rdatasets other than KEY, NSEC, NSEC3, and their covering
+ RRSIGs are deleted. If the rdataset being appended has
+ ``NodeKind.REGUALAR`` then CNAME and RRSIG(CNAME) are deleted.
"""
# Make having just one rdataset at the node fast.
if len(self.rdatasets) > 0:
- if rdataset.implies_cname():
- self.rdatasets = [rds for rds in self.rdatasets
- if rds.ok_for_cname()]
- elif rdataset.implies_other_data():
- self.rdatasets = [rds for rds in self.rdatasets
- if rds.ok_for_other_data()]
+ kind = NodeKind.classify_rdataset(rdataset)
+ if kind == NodeKind.CNAME:
+ self.rdatasets = [rds for rds in self.rdatasets if
+ NodeKind.classify_rdataset(rds) !=
+ NodeKind.REGULAR]
+ elif kind == NodeKind.REGULAR:
+ self.rdatasets = [rds for rds in self.rdatasets if
+ NodeKind.classify_rdataset(rds) !=
+ NodeKind.CNAME]
+ # Otherwise the rdataset is NodeKind.NEUTRAL and we do not need to
+ # edit self.rdatasets.
self.rdatasets.append(rdataset)
-
def find_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
create=False):
"""Find an rdataset matching the specified properties in the
@@ -221,11 +265,56 @@ class Node:
replacement.covers)
self._append_rdataset(replacement)
- def is_cname(self):
- """Is this a CNAME node?
+ def classify(self):
+ """Classify a node.
+
+ A node which contains a CNAME or RRSIG(CNAME) is a
+ ``NodeKind.CNAME`` node.
- If the node has a CNAME or an RRSIG(CNAME) it is considered a CNAME
- node for CNAME-and-other-data purposes, and ``True`` is returned.
- Otherwise the node is an "other data" node, and ``False`` is returned.
+ A node which contains only "neutral" types, i.e. types allowed to
+ co-exist with a CNAME, is a ``NodeKind.NEUTRAL`` node. The neutral
+ types are NSEC, NSEC3, KEY, and their associated RRSIGS. An empty node
+ is also considered neutral.
+
+ A node which contains some rdataset which is not a CNAME, RRSIG(CNAME),
+ or a neutral type is a a ``NodeKind.REGULAR`` node. Regular nodes are
+ also commonly referred to as "other data".
"""
- return any(rdataset.implies_cname() for rdataset in self.rdatasets)
+ for rdataset in self.rdatasets:
+ kind = NodeKind.classify(rdataset.rdtype, rdataset.covers)
+ if kind != NodeKind.NEUTRAL:
+ return kind
+ return NodeKind.NEUTRAL
+
+ def is_immutable(self):
+ return False
+
+
+dns.immutable.immutable
+class ImmutableNode(Node):
+ def __init__(self, node):
+ super().__init__()
+ self.rdatasets = tuple(
+ [dns.rdataset.ImmutableRdataset(rds) for rds in node.rdatasets]
+ )
+
+ def find_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
+ create=False):
+ if create:
+ raise TypeError("immutable")
+ return super().find_rdataset(rdclass, rdtype, covers, False)
+
+ def get_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
+ create=False):
+ if create:
+ raise TypeError("immutable")
+ return super().get_rdataset(rdclass, rdtype, covers, False)
+
+ def delete_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE):
+ raise TypeError("immutable")
+
+ def replace_rdataset(self, replacement):
+ raise TypeError("immutable")
+
+ def is_immutable(self):
+ return True
diff --git a/dns/rdataset.py b/dns/rdataset.py
index f948d76..e69ee23 100644
--- a/dns/rdataset.py
+++ b/dns/rdataset.py
@@ -41,14 +41,6 @@ class IncompatibleTypes(dns.exception.DNSException):
"""An attempt was made to add DNS RR data of an incompatible type."""
-_ok_for_cname = {
- dns.rdatatype.CNAME,
- dns.rdatatype.NSEC, # RFC 4035 section 2.5
- dns.rdatatype.NSEC3, # This is not likely to happen, but not impossible!
- dns.rdatatype.KEY, # RFC 4035 section 2.5, RFC 3007
-}
-
-
class Rdataset(dns.set.Set):
"""A DNS rdataset."""
@@ -331,36 +323,6 @@ class Rdataset(dns.set.Set):
else:
return self[0]._processing_order(iter(self))
- def ok_for_cname(self):
- """Is this rdataset compatible with a CNAME node?"""
- return self.rdtype in _ok_for_cname or \
- (self.rdtype == dns.rdatatype.RRSIG and
- self.covers in _ok_for_cname)
-
- def implies_cname(self):
- """Does this rdataset imply a node is a CNAME node?
-
- If the rdataset's type is CNAME or RRSIG(CNAME) then it implies a
- node is a CNAME node, and ``True`` is returned. Otherwise it implies
- the node is an an "other data" node, and ``False`` is returned.
- """
- return self.rdtype == dns.rdatatype.CNAME or \
- (self.rdtype == dns.rdatatype.RRSIG and
- self.covers == dns.rdatatype.CNAME)
-
- def ok_for_other_data(self):
- """Is this rdataset compatible with an 'other data' (i.e. not CNAME)
- node?"""
- return not self.implies_cname()
-
- def implies_other_data(self):
- """Does this rdataset imply a node is an other data node?
-
- Note that implies_other_data() is not simply "not implies_cname()" as
- some types, e.g. NSEC and RRSIG(NSEC) are neutral.
- """
- return not self.ok_for_cname()
-
@dns.immutable.immutable
class ImmutableRdataset(Rdataset):
diff --git a/dns/transaction.py b/dns/transaction.py
index 0df1e56..ae7417e 100644
--- a/dns/transaction.py
+++ b/dns/transaction.py
@@ -79,11 +79,16 @@ class AlreadyEnded(dns.exception.DNSException):
"""Tried to use an already-ended transaction."""
-def _ensure_immutable(rdataset):
+def _ensure_immutable_rdataset(rdataset):
if rdataset is None or isinstance(rdataset, dns.rdataset.ImmutableRdataset):
return rdataset
return dns.rdataset.ImmutableRdataset(rdataset)
+def _ensure_immutable_node(node):
+ if node is None or node.is_immutable():
+ return node
+ return dns.node.ImmutableNode(node)
+
class Transaction:
@@ -111,15 +116,14 @@ class Transaction:
name = dns.name.from_text(name, None)
rdtype = dns.rdatatype.RdataType.make(rdtype)
rdataset = self._get_rdataset(name, rdtype, covers)
- return _ensure_immutable(rdataset)
+ return _ensure_immutable_rdataset(rdataset)
- def get_rdatasets(self, name):
- """Return the rdatasets at *name*, if any.
+ def get_node(self, name):
+ """Return the node at *name*, if any.
- The returned rdatasets are immutable.
- An empty list is returned if the name doesn't exist.
+ Returns an immutable node or ``None``.
"""
- return [_ensure_immutable(rds) for rds in self._get_rdatasets(name)]
+ return _ensure_immutable_node(self._get_node(name))
def _check_read_only(self):
if self.read_only:
@@ -575,9 +579,9 @@ class Transaction:
"""
raise NotImplementedError # pragma: no cover
- def _get_rdatasets(self, name):
- """Return the rdatasets at *name*, if any.
+ def _get_node(self, name):
+ """Return the node at *name*, if any.
- An empty list is returned if the name doesn't exist.
+ Returns a node or ``None``.
"""
raise NotImplementedError # pragma: no cover
diff --git a/dns/zone.py b/dns/zone.py
index 9d999c7..dc4274a 100644
--- a/dns/zone.py
+++ b/dns/zone.py
@@ -854,6 +854,9 @@ class ImmutableVersionedNode(VersionedNode):
def replace_rdataset(self, replacement):
raise TypeError("immutable")
+ def is_immutable(self):
+ return True
+
class Version:
def __init__(self, zone, id, nodes=None, origin=None):
@@ -1024,11 +1027,8 @@ class Transaction(dns.transaction.Transaction):
for rdataset in node:
yield (name, rdataset)
- def _get_rdatasets(self, name):
- node = self.version.get_node(name)
- if node is None:
- return []
- return node.rdatasets
+ def _get_node(self, name):
+ return self.version.get_node(name)
def from_text(text, origin=None, rdclass=dns.rdataclass.IN,
diff --git a/dns/zonefile.py b/dns/zonefile.py
index 4d72c71..bcafe1d 100644
--- a/dns/zonefile.py
+++ b/dns/zonefile.py
@@ -43,19 +43,22 @@ class CNAMEAndOtherData(dns.exception.DNSException):
def _check_cname_and_other_data(txn, name, rdataset):
- rdatasets = txn.get_rdatasets(name)
- if any(rds.implies_cname() for rds in rdatasets):
- # This is a CNAME node.
- if not rdataset.ok_for_cname():
- raise CNAMEAndOtherData('rdataset not ok for CNAME node')
- elif any(rds.implies_other_data() for rds in rdatasets):
- # This is an other data node
- if not rdataset.ok_for_other_data():
- raise CNAMEAndOtherData('rdataset is a CNAME but node '
- 'has other data')
- # Otherwise the node consists of neutral types that can be
- # present at either a CNAME or an other data node, e.g. NSEC or
- # RRSIG(NSEC)
+ rdataset_kind = dns.node.NodeKind.classify_rdataset(rdataset)
+ node = txn.get_node(name)
+ if node is not None:
+ node_kind = node.classify()
+ else:
+ node_kind = dns.node.NodeKind.NEUTRAL
+ if node_kind == dns.node.NodeKind.CNAME and \
+ rdataset_kind == dns.node.NodeKind.REGULAR:
+ raise CNAMEAndOtherData('rdataset type is not compatible with a '
+ 'CNAME node')
+ elif node_kind == dns.node.NodeKind.REGULAR and \
+ rdataset_kind == dns.node.NodeKind.CNAME:
+ raise CNAMEAndOtherData('CNAME rdataset is not compatible with a '
+ 'regular data node')
+ # Otherwise at least one of the node and the rdataset is neutral, so
+ # adding the rdataset is ok
class Reader:
@@ -466,12 +469,16 @@ class RRsetsReaderTransaction(dns.transaction.Transaction):
def _get_rdataset(self, name, rdtype, covers):
return self.rdatasets.get((name, rdtype, covers))
- def _get_rdatasets(self, name):
+ def _get_node(self, name):
rdatasets = []
for (rdataset_name, _, _), rdataset in self.rdatasets.items():
if name == rdataset_name:
rdatasets.append(rdataset)
- return rdatasets
+ if len(rdatasets) == 0:
+ return None
+ node = dns.node.Node()
+ node.rdatasets = rdatasets
+ return node
def _put_rdataset(self, name, rdataset):
self.rdatasets[(name, rdataset.rdtype, rdataset.covers)] = rdataset