summaryrefslogtreecommitdiff
path: root/dns/zone.py
diff options
context:
space:
mode:
Diffstat (limited to 'dns/zone.py')
-rw-r--r--dns/zone.py525
1 files changed, 113 insertions, 412 deletions
diff --git a/dns/zone.py b/dns/zone.py
index d5bb305..2ca9bc2 100644
--- a/dns/zone.py
+++ b/dns/zone.py
@@ -20,10 +20,9 @@
import contextlib
import io
import os
-import re
-import sys
import dns.exception
+import dns.masterfile
import dns.name
import dns.node
import dns.rdataclass
@@ -32,6 +31,7 @@ import dns.rdata
import dns.rdtypes.ANY.SOA
import dns.rrset
import dns.tokenizer
+import dns.transaction
import dns.ttl
import dns.grange
@@ -56,7 +56,7 @@ class UnknownOrigin(BadZone):
"""The DNS zone's origin is unknown."""
-class Zone:
+class Zone(dns.transaction.TransactionManager):
"""A DNS zone.
@@ -642,415 +642,108 @@ class Zone:
if self.get_rdataset(name, dns.rdatatype.NS) is None:
raise NoNS
+ def reader(self):
+ return Transaction(False, True, self)
-class _MasterReader:
-
- """Read a DNS master file
-
- @ivar tok: The tokenizer
- @type tok: dns.tokenizer.Tokenizer object
- @ivar last_ttl: The last seen explicit TTL for an RR
- @type last_ttl: int
- @ivar last_ttl_known: Has last TTL been detected
- @type last_ttl_known: bool
- @ivar default_ttl: The default TTL from a $TTL directive or SOA RR
- @type default_ttl: int
- @ivar default_ttl_known: Has default TTL been detected
- @type default_ttl_known: bool
- @ivar last_name: The last name read
- @type last_name: dns.name.Name object
- @ivar current_origin: The current origin
- @type current_origin: dns.name.Name object
- @ivar relativize: should names in the zone be relativized?
- @type relativize: bool
- @ivar zone: the zone
- @type zone: dns.zone.Zone object
- @ivar saved_state: saved reader state (used when processing $INCLUDE)
- @type saved_state: list of (tokenizer, current_origin, last_name, file,
- last_ttl, last_ttl_known, default_ttl, default_ttl_known) tuples.
- @ivar current_file: the file object of the $INCLUDed file being parsed
- (None if no $INCLUDE is active).
- @ivar allow_include: is $INCLUDE allowed?
- @type allow_include: bool
- @ivar check_origin: should sanity checks of the origin node be done?
- The default is True.
- @type check_origin: bool
- """
+ def writer(self, replacement=False):
+ return Transaction(replacement, False, self)
- def __init__(self, tok, origin, rdclass, relativize, zone_factory=Zone,
- allow_include=False, check_origin=True):
- if isinstance(origin, str):
- origin = dns.name.from_text(origin)
- self.tok = tok
- self.current_origin = origin
- self.relativize = relativize
- self.last_ttl = 0
- self.last_ttl_known = False
- self.default_ttl = 0
- self.default_ttl_known = False
- self.last_name = self.current_origin
- self.zone = zone_factory(origin, rdclass, relativize=relativize)
- self.saved_state = []
- self.current_file = None
- self.allow_include = allow_include
- self.check_origin = check_origin
-
- def _eat_line(self):
- while 1:
- token = self.tok.get()
- if token.is_eol_or_eof():
- break
-
- def _rr_line(self):
- """Process one line from a DNS master file."""
- # Name
- if self.current_origin is None:
- raise UnknownOrigin
- token = self.tok.get(want_leading=True)
- if not token.is_whitespace():
- self.last_name = self.tok.as_name(token, self.current_origin)
- else:
- token = self.tok.get()
- if token.is_eol_or_eof():
- # treat leading WS followed by EOL/EOF as if they were EOL/EOF.
- return
- self.tok.unget(token)
- name = self.last_name
- if not name.is_subdomain(self.zone.origin):
- self._eat_line()
- return
- if self.relativize:
- name = name.relativize(self.zone.origin)
- token = self.tok.get()
- if not token.is_identifier():
- raise dns.exception.SyntaxError
- # TTL
- ttl = None
- try:
- ttl = dns.ttl.from_text(token.value)
- self.last_ttl = ttl
- self.last_ttl_known = True
- token = self.tok.get()
- if not token.is_identifier():
- raise dns.exception.SyntaxError
- except dns.ttl.BadTTL:
- if self.default_ttl_known:
- ttl = self.default_ttl
- elif self.last_ttl_known:
- ttl = self.last_ttl
-
- # Class
- try:
- rdclass = dns.rdataclass.from_text(token.value)
- token = self.tok.get()
- if not token.is_identifier():
- raise dns.exception.SyntaxError
- except dns.exception.SyntaxError:
- raise
- except Exception:
- rdclass = self.zone.rdclass
- if rdclass != self.zone.rdclass:
- raise dns.exception.SyntaxError("RR class is not zone's class")
- # Type
- try:
- rdtype = dns.rdatatype.from_text(token.value)
- except Exception:
- raise dns.exception.SyntaxError(
- "unknown rdatatype '%s'" % token.value)
- n = self.zone.nodes.get(name)
- if n is None:
- n = self.zone.node_factory()
- self.zone.nodes[name] = n
- try:
- rd = dns.rdata.from_text(rdclass, rdtype, self.tok,
- self.current_origin, self.relativize,
- self.zone.origin)
- except dns.exception.SyntaxError:
- # Catch and reraise.
- raise
- except Exception:
- # All exceptions that occur in the processing of rdata
- # are treated as syntax errors. This is not strictly
- # correct, but it is correct almost all of the time.
- # We convert them to syntax errors so that we can emit
- # helpful filename:line info.
- (ty, va) = sys.exc_info()[:2]
- raise dns.exception.SyntaxError(
- "caught exception {}: {}".format(str(ty), str(va)))
-
- if not self.default_ttl_known and rdtype == dns.rdatatype.SOA:
- # The pre-RFC2308 and pre-BIND9 behavior inherits the zone default
- # TTL from the SOA minttl if no $TTL statement is present before the
- # SOA is parsed.
- self.default_ttl = rd.minimum
- self.default_ttl_known = True
- if ttl is None:
- # if we didn't have a TTL on the SOA, set it!
- ttl = rd.minimum
-
- # TTL check. We had to wait until now to do this as the SOA RR's
- # own TTL can be inferred from its minimum.
- if ttl is None:
- raise dns.exception.SyntaxError("Missing default TTL value")
-
- covers = rd.covers()
- rds = n.find_rdataset(rdclass, rdtype, covers, True)
- rds.add(rd, ttl)
-
- def _parse_modify(self, side):
- # Here we catch everything in '{' '}' in a group so we can replace it
- # with ''.
- is_generate1 = re.compile(r"^.*\$({(\+|-?)(\d+),(\d+),(.)}).*$")
- is_generate2 = re.compile(r"^.*\$({(\+|-?)(\d+)}).*$")
- is_generate3 = re.compile(r"^.*\$({(\+|-?)(\d+),(\d+)}).*$")
- # Sometimes there are modifiers in the hostname. These come after
- # the dollar sign. They are in the form: ${offset[,width[,base]]}.
- # Make names
- g1 = is_generate1.match(side)
- if g1:
- mod, sign, offset, width, base = g1.groups()
- if sign == '':
- sign = '+'
- g2 = is_generate2.match(side)
- if g2:
- mod, sign, offset = g2.groups()
- if sign == '':
- sign = '+'
- width = 0
- base = 'd'
- g3 = is_generate3.match(side)
- if g3:
- mod, sign, offset, width = g3.groups()
- if sign == '':
- sign = '+'
- base = 'd'
-
- if not (g1 or g2 or g3):
- mod = ''
- sign = '+'
- offset = 0
- width = 0
- base = 'd'
-
- if base != 'd':
- raise NotImplementedError()
-
- return mod, sign, offset, width, base
-
- def _generate_line(self):
- # range lhs [ttl] [class] type rhs [ comment ]
- """Process one line containing the GENERATE statement from a DNS
- master file."""
- if self.current_origin is None:
- raise UnknownOrigin
-
- token = self.tok.get()
- # Range (required)
- try:
- start, stop, step = dns.grange.from_text(token.value)
- token = self.tok.get()
- if not token.is_identifier():
- raise dns.exception.SyntaxError
- except Exception:
- raise dns.exception.SyntaxError
-
- # lhs (required)
- try:
- lhs = token.value
- token = self.tok.get()
- if not token.is_identifier():
- raise dns.exception.SyntaxError
- except Exception:
- raise dns.exception.SyntaxError
-
- # TTL
- try:
- ttl = dns.ttl.from_text(token.value)
- self.last_ttl = ttl
- self.last_ttl_known = True
- token = self.tok.get()
- if not token.is_identifier():
- raise dns.exception.SyntaxError
- except dns.ttl.BadTTL:
- if not (self.last_ttl_known or self.default_ttl_known):
- raise dns.exception.SyntaxError("Missing default TTL value")
- if self.default_ttl_known:
- ttl = self.default_ttl
- elif self.last_ttl_known:
- ttl = self.last_ttl
- # Class
- try:
- rdclass = dns.rdataclass.from_text(token.value)
- token = self.tok.get()
- if not token.is_identifier():
- raise dns.exception.SyntaxError
- except dns.exception.SyntaxError:
- raise dns.exception.SyntaxError
- except Exception:
- rdclass = self.zone.rdclass
+class Transaction(dns.transaction.Transaction):
+
+ _deleted_rdataset = dns.rdataset.Rdataset(dns.rdataclass.ANY,
+ dns.rdatatype.ANY)
+
+ def __init__(self, replacement, read_only, zone):
+ super().__init__(replacement, read_only)
+ self.zone = zone
+ self.rdatasets = {}
+
+ def _get_rdataset(self, name, rdclass, rdtype, covers):
if rdclass != self.zone.rdclass:
- raise dns.exception.SyntaxError("RR class is not zone's class")
- # Type
- try:
- rdtype = dns.rdatatype.from_text(token.value)
- token = self.tok.get()
- if not token.is_identifier():
- raise dns.exception.SyntaxError
- except Exception:
- raise dns.exception.SyntaxError("unknown rdatatype '%s'" %
- token.value)
-
- # rhs (required)
- rhs = token.value
-
- # The code currently only supports base 'd', so the last value
- # in the tuple _parse_modify returns is ignored
- lmod, lsign, loffset, lwidth, _ = self._parse_modify(lhs)
- rmod, rsign, roffset, rwidth, _ = self._parse_modify(rhs)
- for i in range(start, stop + 1, step):
- # +1 because bind is inclusive and python is exclusive
-
- if lsign == '+':
- lindex = i + int(loffset)
- elif lsign == '-':
- lindex = i - int(loffset)
-
- if rsign == '-':
- rindex = i - int(roffset)
- elif rsign == '+':
- rindex = i + int(roffset)
-
- lzfindex = str(lindex).zfill(int(lwidth))
- rzfindex = str(rindex).zfill(int(rwidth))
-
- name = lhs.replace('$%s' % (lmod), lzfindex)
- rdata = rhs.replace('$%s' % (rmod), rzfindex)
-
- self.last_name = dns.name.from_text(name, self.current_origin,
- self.tok.idna_codec)
- name = self.last_name
- if not name.is_subdomain(self.zone.origin):
- self._eat_line()
- return
- if self.relativize:
- name = name.relativize(self.zone.origin)
-
- n = self.zone.nodes.get(name)
- if n is None:
- n = self.zone.node_factory()
- self.zone.nodes[name] = n
- try:
- rd = dns.rdata.from_text(rdclass, rdtype, rdata,
- self.current_origin, self.relativize,
- self.zone.origin)
- except dns.exception.SyntaxError:
- # Catch and reraise.
- raise
- except Exception:
- # All exceptions that occur in the processing of rdata
- # are treated as syntax errors. This is not strictly
- # correct, but it is correct almost all of the time.
- # We convert them to syntax errors so that we can emit
- # helpful filename:line info.
- (ty, va) = sys.exc_info()[:2]
- raise dns.exception.SyntaxError("caught exception %s: %s" %
- (str(ty), str(va)))
-
- covers = rd.covers()
- rds = n.find_rdataset(rdclass, rdtype, covers, True)
- rds.add(rd, ttl)
-
- def read(self):
- """Read a DNS master file and build a zone object.
-
- @raises dns.zone.NoSOA: No SOA RR was found at the zone origin
- @raises dns.zone.NoNS: No NS RRset was found at the zone origin
- """
+ raise ValueError(f'class {rdclass} != ' +
+ f'zone class {self.zone.rdclass}')
+ rdataset = self.rdatasets.get((name, rdtype, covers))
+ if rdataset is self._deleted_rdataset:
+ return None
+ elif rdataset is None:
+ rdataset = self.zone.get_rdataset(name, rdtype, covers)
+ return rdataset
+ def _put_rdataset(self, name, rdataset):
+ assert not self.read_only
+ self.zone._validate_name(name)
+ if rdataset.rdclass != self.zone.rdclass:
+ raise ValueError(f'rdataset class {rdataset.rdclass} != ' +
+ f'zone class {self.zone.rdclass}')
+ self.rdatasets[(name, rdataset.rdtype, rdataset.covers)] = rdataset
+
+ def _delete_name(self, name):
+ assert not self.read_only
+ # First remove any changes involving the name
+ remove = []
+ for key in self.rdatasets:
+ if key[0] == name:
+ remove.append(key)
+ if len(remove) > 0:
+ for key in remove:
+ del self.rdatasets[key]
+ # Next add deletion records for any rdatasets matching the
+ # name in the zone
+ node = self.zone.get_node(name)
+ if node is not None:
+ for rdataset in node.rdatasets:
+ self.rdatasets[(name, rdataset.rdtype, rdataset.covers)] = \
+ self._deleted_rdataset
+
+ def _delete_rdataset(self, name, rdclass, rdtype, covers):
+ assert not self.read_only
+ # The high-level code always does a _get_rdataset() before any
+ # situation where it would call _delete_rdataset(), so we don't
+ # need to check if rdclass != self.zone.rdclass.
try:
- while 1:
- token = self.tok.get(True, True)
- if token.is_eof():
- if self.current_file is not None:
- self.current_file.close()
- if len(self.saved_state) > 0:
- (self.tok,
- self.current_origin,
- self.last_name,
- self.current_file,
- self.last_ttl,
- self.last_ttl_known,
- self.default_ttl,
- self.default_ttl_known) = self.saved_state.pop(-1)
- continue
- break
- elif token.is_eol():
- continue
- elif token.is_comment():
- self.tok.get_eol()
- continue
- elif token.value[0] == '$':
- c = token.value.upper()
- if c == '$TTL':
- token = self.tok.get()
- if not token.is_identifier():
- raise dns.exception.SyntaxError("bad $TTL")
- self.default_ttl = dns.ttl.from_text(token.value)
- self.default_ttl_known = True
- self.tok.get_eol()
- elif c == '$ORIGIN':
- self.current_origin = self.tok.get_name()
- self.tok.get_eol()
- if self.zone.origin is None:
- self.zone.origin = self.current_origin
- elif c == '$INCLUDE' and self.allow_include:
- token = self.tok.get()
- filename = token.value
- token = self.tok.get()
- if token.is_identifier():
- new_origin =\
- dns.name.from_text(token.value,
- self.current_origin,
- self.tok.idna_codec)
- self.tok.get_eol()
- elif not token.is_eol_or_eof():
- raise dns.exception.SyntaxError(
- "bad origin in $INCLUDE")
- else:
- new_origin = self.current_origin
- self.saved_state.append((self.tok,
- self.current_origin,
- self.last_name,
- self.current_file,
- self.last_ttl,
- self.last_ttl_known,
- self.default_ttl,
- self.default_ttl_known))
- self.current_file = open(filename, 'r')
- self.tok = dns.tokenizer.Tokenizer(self.current_file,
- filename)
- self.current_origin = new_origin
- elif c == '$GENERATE':
- self._generate_line()
- else:
- raise dns.exception.SyntaxError(
- "Unknown master file directive '" + c + "'")
- continue
- self.tok.unget(token)
- self._rr_line()
- except dns.exception.SyntaxError as detail:
- (filename, line_number) = self.tok.where()
- if detail is None:
- detail = "syntax error"
- ex = dns.exception.SyntaxError(
- "%s:%d: %s" % (filename, line_number, detail))
- tb = sys.exc_info()[2]
- raise ex.with_traceback(tb) from None
-
- # Now that we're done reading, do some basic checking of the zone.
- if self.check_origin:
- self.zone.check_origin()
+ del self.rdatasets[(name, rdtype, covers)]
+ except KeyError:
+ pass
+ rdataset = self.zone.get_rdataset(name, rdtype, covers)
+ if rdataset is not None:
+ self.rdatasets[(name, rdataset.rdtype, rdataset.covers)] = \
+ self._deleted_rdataset
+
+ def _name_exists(self, name):
+ for key, rdataset in self.rdatasets.items():
+ if key[0] == name:
+ if rdataset != self._deleted_rdataset:
+ return True
+ else:
+ return None
+ self.zone._validate_name(name)
+ if self.zone.get_node(name):
+ return True
+ return False
+
+ def _end_transaction(self, commit):
+ if commit and not self.read_only:
+ for (name, rdtype, covers), rdataset in \
+ self.rdatasets.items():
+ if rdataset is self._deleted_rdataset:
+ self.zone.delete_rdataset(name, rdtype, covers)
+ else:
+ self.zone.replace_rdataset(name, rdataset)
+
+ def _set_origin(self, origin):
+ if self.zone.origin is None:
+ self.zone.origin = origin
+
+ def _iterate_rdatasets(self):
+ # Expensive but simple! Use a versioned zone for efficient txn
+ # iteration.
+ rdatasets = {}
+ for (name, rdataset) in self.zone.iterate_rdatasets():
+ rdatasets[(name, rdataset.rdtype, rdataset.covers)] = rdataset
+ rdatasets.update(self.rdatasets)
+ for (name, _, _), rdataset in rdatasets.items():
+ yield (name, rdataset)
def from_text(text, origin=None, rdclass=dns.rdataclass.IN,
@@ -1103,12 +796,20 @@ def from_text(text, origin=None, rdclass=dns.rdataclass.IN,
if filename is None:
filename = '<string>'
- tok = dns.tokenizer.Tokenizer(text, filename, idna_codec=idna_codec)
- reader = _MasterReader(tok, origin, rdclass, relativize, zone_factory,
- allow_include=allow_include,
- check_origin=check_origin)
- reader.read()
- return reader.zone
+ zone = zone_factory(origin, rdclass, relativize=relativize)
+ with zone.writer(True) as txn:
+ tok = dns.tokenizer.Tokenizer(text, filename, idna_codec=idna_codec)
+ reader = dns.masterfile.Reader(tok, origin, rdclass, relativize, txn,
+ allow_include=allow_include)
+ try:
+ reader.read()
+ except dns.masterfile.UnknownOrigin:
+ # for backwards compatibility
+ raise dns.zone.UnknownOrigin
+ # Now that we're done reading, do some basic checking of the zone.
+ if check_origin:
+ zone.check_origin()
+ return zone
def from_file(f, origin=None, rdclass=dns.rdataclass.IN,