diff options
Diffstat (limited to 'dns/zone.py')
-rw-r--r-- | dns/zone.py | 525 |
1 files changed, 113 insertions, 412 deletions
diff --git a/dns/zone.py b/dns/zone.py index d5bb305..2ca9bc2 100644 --- a/dns/zone.py +++ b/dns/zone.py @@ -20,10 +20,9 @@ import contextlib import io import os -import re -import sys import dns.exception +import dns.masterfile import dns.name import dns.node import dns.rdataclass @@ -32,6 +31,7 @@ import dns.rdata import dns.rdtypes.ANY.SOA import dns.rrset import dns.tokenizer +import dns.transaction import dns.ttl import dns.grange @@ -56,7 +56,7 @@ class UnknownOrigin(BadZone): """The DNS zone's origin is unknown.""" -class Zone: +class Zone(dns.transaction.TransactionManager): """A DNS zone. @@ -642,415 +642,108 @@ class Zone: if self.get_rdataset(name, dns.rdatatype.NS) is None: raise NoNS + def reader(self): + return Transaction(False, True, self) -class _MasterReader: - - """Read a DNS master file - - @ivar tok: The tokenizer - @type tok: dns.tokenizer.Tokenizer object - @ivar last_ttl: The last seen explicit TTL for an RR - @type last_ttl: int - @ivar last_ttl_known: Has last TTL been detected - @type last_ttl_known: bool - @ivar default_ttl: The default TTL from a $TTL directive or SOA RR - @type default_ttl: int - @ivar default_ttl_known: Has default TTL been detected - @type default_ttl_known: bool - @ivar last_name: The last name read - @type last_name: dns.name.Name object - @ivar current_origin: The current origin - @type current_origin: dns.name.Name object - @ivar relativize: should names in the zone be relativized? - @type relativize: bool - @ivar zone: the zone - @type zone: dns.zone.Zone object - @ivar saved_state: saved reader state (used when processing $INCLUDE) - @type saved_state: list of (tokenizer, current_origin, last_name, file, - last_ttl, last_ttl_known, default_ttl, default_ttl_known) tuples. - @ivar current_file: the file object of the $INCLUDed file being parsed - (None if no $INCLUDE is active). - @ivar allow_include: is $INCLUDE allowed? - @type allow_include: bool - @ivar check_origin: should sanity checks of the origin node be done? - The default is True. - @type check_origin: bool - """ + def writer(self, replacement=False): + return Transaction(replacement, False, self) - def __init__(self, tok, origin, rdclass, relativize, zone_factory=Zone, - allow_include=False, check_origin=True): - if isinstance(origin, str): - origin = dns.name.from_text(origin) - self.tok = tok - self.current_origin = origin - self.relativize = relativize - self.last_ttl = 0 - self.last_ttl_known = False - self.default_ttl = 0 - self.default_ttl_known = False - self.last_name = self.current_origin - self.zone = zone_factory(origin, rdclass, relativize=relativize) - self.saved_state = [] - self.current_file = None - self.allow_include = allow_include - self.check_origin = check_origin - - def _eat_line(self): - while 1: - token = self.tok.get() - if token.is_eol_or_eof(): - break - - def _rr_line(self): - """Process one line from a DNS master file.""" - # Name - if self.current_origin is None: - raise UnknownOrigin - token = self.tok.get(want_leading=True) - if not token.is_whitespace(): - self.last_name = self.tok.as_name(token, self.current_origin) - else: - token = self.tok.get() - if token.is_eol_or_eof(): - # treat leading WS followed by EOL/EOF as if they were EOL/EOF. - return - self.tok.unget(token) - name = self.last_name - if not name.is_subdomain(self.zone.origin): - self._eat_line() - return - if self.relativize: - name = name.relativize(self.zone.origin) - token = self.tok.get() - if not token.is_identifier(): - raise dns.exception.SyntaxError - # TTL - ttl = None - try: - ttl = dns.ttl.from_text(token.value) - self.last_ttl = ttl - self.last_ttl_known = True - token = self.tok.get() - if not token.is_identifier(): - raise dns.exception.SyntaxError - except dns.ttl.BadTTL: - if self.default_ttl_known: - ttl = self.default_ttl - elif self.last_ttl_known: - ttl = self.last_ttl - - # Class - try: - rdclass = dns.rdataclass.from_text(token.value) - token = self.tok.get() - if not token.is_identifier(): - raise dns.exception.SyntaxError - except dns.exception.SyntaxError: - raise - except Exception: - rdclass = self.zone.rdclass - if rdclass != self.zone.rdclass: - raise dns.exception.SyntaxError("RR class is not zone's class") - # Type - try: - rdtype = dns.rdatatype.from_text(token.value) - except Exception: - raise dns.exception.SyntaxError( - "unknown rdatatype '%s'" % token.value) - n = self.zone.nodes.get(name) - if n is None: - n = self.zone.node_factory() - self.zone.nodes[name] = n - try: - rd = dns.rdata.from_text(rdclass, rdtype, self.tok, - self.current_origin, self.relativize, - self.zone.origin) - except dns.exception.SyntaxError: - # Catch and reraise. - raise - except Exception: - # All exceptions that occur in the processing of rdata - # are treated as syntax errors. This is not strictly - # correct, but it is correct almost all of the time. - # We convert them to syntax errors so that we can emit - # helpful filename:line info. - (ty, va) = sys.exc_info()[:2] - raise dns.exception.SyntaxError( - "caught exception {}: {}".format(str(ty), str(va))) - - if not self.default_ttl_known and rdtype == dns.rdatatype.SOA: - # The pre-RFC2308 and pre-BIND9 behavior inherits the zone default - # TTL from the SOA minttl if no $TTL statement is present before the - # SOA is parsed. - self.default_ttl = rd.minimum - self.default_ttl_known = True - if ttl is None: - # if we didn't have a TTL on the SOA, set it! - ttl = rd.minimum - - # TTL check. We had to wait until now to do this as the SOA RR's - # own TTL can be inferred from its minimum. - if ttl is None: - raise dns.exception.SyntaxError("Missing default TTL value") - - covers = rd.covers() - rds = n.find_rdataset(rdclass, rdtype, covers, True) - rds.add(rd, ttl) - - def _parse_modify(self, side): - # Here we catch everything in '{' '}' in a group so we can replace it - # with ''. - is_generate1 = re.compile(r"^.*\$({(\+|-?)(\d+),(\d+),(.)}).*$") - is_generate2 = re.compile(r"^.*\$({(\+|-?)(\d+)}).*$") - is_generate3 = re.compile(r"^.*\$({(\+|-?)(\d+),(\d+)}).*$") - # Sometimes there are modifiers in the hostname. These come after - # the dollar sign. They are in the form: ${offset[,width[,base]]}. - # Make names - g1 = is_generate1.match(side) - if g1: - mod, sign, offset, width, base = g1.groups() - if sign == '': - sign = '+' - g2 = is_generate2.match(side) - if g2: - mod, sign, offset = g2.groups() - if sign == '': - sign = '+' - width = 0 - base = 'd' - g3 = is_generate3.match(side) - if g3: - mod, sign, offset, width = g3.groups() - if sign == '': - sign = '+' - base = 'd' - - if not (g1 or g2 or g3): - mod = '' - sign = '+' - offset = 0 - width = 0 - base = 'd' - - if base != 'd': - raise NotImplementedError() - - return mod, sign, offset, width, base - - def _generate_line(self): - # range lhs [ttl] [class] type rhs [ comment ] - """Process one line containing the GENERATE statement from a DNS - master file.""" - if self.current_origin is None: - raise UnknownOrigin - - token = self.tok.get() - # Range (required) - try: - start, stop, step = dns.grange.from_text(token.value) - token = self.tok.get() - if not token.is_identifier(): - raise dns.exception.SyntaxError - except Exception: - raise dns.exception.SyntaxError - - # lhs (required) - try: - lhs = token.value - token = self.tok.get() - if not token.is_identifier(): - raise dns.exception.SyntaxError - except Exception: - raise dns.exception.SyntaxError - - # TTL - try: - ttl = dns.ttl.from_text(token.value) - self.last_ttl = ttl - self.last_ttl_known = True - token = self.tok.get() - if not token.is_identifier(): - raise dns.exception.SyntaxError - except dns.ttl.BadTTL: - if not (self.last_ttl_known or self.default_ttl_known): - raise dns.exception.SyntaxError("Missing default TTL value") - if self.default_ttl_known: - ttl = self.default_ttl - elif self.last_ttl_known: - ttl = self.last_ttl - # Class - try: - rdclass = dns.rdataclass.from_text(token.value) - token = self.tok.get() - if not token.is_identifier(): - raise dns.exception.SyntaxError - except dns.exception.SyntaxError: - raise dns.exception.SyntaxError - except Exception: - rdclass = self.zone.rdclass +class Transaction(dns.transaction.Transaction): + + _deleted_rdataset = dns.rdataset.Rdataset(dns.rdataclass.ANY, + dns.rdatatype.ANY) + + def __init__(self, replacement, read_only, zone): + super().__init__(replacement, read_only) + self.zone = zone + self.rdatasets = {} + + def _get_rdataset(self, name, rdclass, rdtype, covers): if rdclass != self.zone.rdclass: - raise dns.exception.SyntaxError("RR class is not zone's class") - # Type - try: - rdtype = dns.rdatatype.from_text(token.value) - token = self.tok.get() - if not token.is_identifier(): - raise dns.exception.SyntaxError - except Exception: - raise dns.exception.SyntaxError("unknown rdatatype '%s'" % - token.value) - - # rhs (required) - rhs = token.value - - # The code currently only supports base 'd', so the last value - # in the tuple _parse_modify returns is ignored - lmod, lsign, loffset, lwidth, _ = self._parse_modify(lhs) - rmod, rsign, roffset, rwidth, _ = self._parse_modify(rhs) - for i in range(start, stop + 1, step): - # +1 because bind is inclusive and python is exclusive - - if lsign == '+': - lindex = i + int(loffset) - elif lsign == '-': - lindex = i - int(loffset) - - if rsign == '-': - rindex = i - int(roffset) - elif rsign == '+': - rindex = i + int(roffset) - - lzfindex = str(lindex).zfill(int(lwidth)) - rzfindex = str(rindex).zfill(int(rwidth)) - - name = lhs.replace('$%s' % (lmod), lzfindex) - rdata = rhs.replace('$%s' % (rmod), rzfindex) - - self.last_name = dns.name.from_text(name, self.current_origin, - self.tok.idna_codec) - name = self.last_name - if not name.is_subdomain(self.zone.origin): - self._eat_line() - return - if self.relativize: - name = name.relativize(self.zone.origin) - - n = self.zone.nodes.get(name) - if n is None: - n = self.zone.node_factory() - self.zone.nodes[name] = n - try: - rd = dns.rdata.from_text(rdclass, rdtype, rdata, - self.current_origin, self.relativize, - self.zone.origin) - except dns.exception.SyntaxError: - # Catch and reraise. - raise - except Exception: - # All exceptions that occur in the processing of rdata - # are treated as syntax errors. This is not strictly - # correct, but it is correct almost all of the time. - # We convert them to syntax errors so that we can emit - # helpful filename:line info. - (ty, va) = sys.exc_info()[:2] - raise dns.exception.SyntaxError("caught exception %s: %s" % - (str(ty), str(va))) - - covers = rd.covers() - rds = n.find_rdataset(rdclass, rdtype, covers, True) - rds.add(rd, ttl) - - def read(self): - """Read a DNS master file and build a zone object. - - @raises dns.zone.NoSOA: No SOA RR was found at the zone origin - @raises dns.zone.NoNS: No NS RRset was found at the zone origin - """ + raise ValueError(f'class {rdclass} != ' + + f'zone class {self.zone.rdclass}') + rdataset = self.rdatasets.get((name, rdtype, covers)) + if rdataset is self._deleted_rdataset: + return None + elif rdataset is None: + rdataset = self.zone.get_rdataset(name, rdtype, covers) + return rdataset + def _put_rdataset(self, name, rdataset): + assert not self.read_only + self.zone._validate_name(name) + if rdataset.rdclass != self.zone.rdclass: + raise ValueError(f'rdataset class {rdataset.rdclass} != ' + + f'zone class {self.zone.rdclass}') + self.rdatasets[(name, rdataset.rdtype, rdataset.covers)] = rdataset + + def _delete_name(self, name): + assert not self.read_only + # First remove any changes involving the name + remove = [] + for key in self.rdatasets: + if key[0] == name: + remove.append(key) + if len(remove) > 0: + for key in remove: + del self.rdatasets[key] + # Next add deletion records for any rdatasets matching the + # name in the zone + node = self.zone.get_node(name) + if node is not None: + for rdataset in node.rdatasets: + self.rdatasets[(name, rdataset.rdtype, rdataset.covers)] = \ + self._deleted_rdataset + + def _delete_rdataset(self, name, rdclass, rdtype, covers): + assert not self.read_only + # The high-level code always does a _get_rdataset() before any + # situation where it would call _delete_rdataset(), so we don't + # need to check if rdclass != self.zone.rdclass. try: - while 1: - token = self.tok.get(True, True) - if token.is_eof(): - if self.current_file is not None: - self.current_file.close() - if len(self.saved_state) > 0: - (self.tok, - self.current_origin, - self.last_name, - self.current_file, - self.last_ttl, - self.last_ttl_known, - self.default_ttl, - self.default_ttl_known) = self.saved_state.pop(-1) - continue - break - elif token.is_eol(): - continue - elif token.is_comment(): - self.tok.get_eol() - continue - elif token.value[0] == '$': - c = token.value.upper() - if c == '$TTL': - token = self.tok.get() - if not token.is_identifier(): - raise dns.exception.SyntaxError("bad $TTL") - self.default_ttl = dns.ttl.from_text(token.value) - self.default_ttl_known = True - self.tok.get_eol() - elif c == '$ORIGIN': - self.current_origin = self.tok.get_name() - self.tok.get_eol() - if self.zone.origin is None: - self.zone.origin = self.current_origin - elif c == '$INCLUDE' and self.allow_include: - token = self.tok.get() - filename = token.value - token = self.tok.get() - if token.is_identifier(): - new_origin =\ - dns.name.from_text(token.value, - self.current_origin, - self.tok.idna_codec) - self.tok.get_eol() - elif not token.is_eol_or_eof(): - raise dns.exception.SyntaxError( - "bad origin in $INCLUDE") - else: - new_origin = self.current_origin - self.saved_state.append((self.tok, - self.current_origin, - self.last_name, - self.current_file, - self.last_ttl, - self.last_ttl_known, - self.default_ttl, - self.default_ttl_known)) - self.current_file = open(filename, 'r') - self.tok = dns.tokenizer.Tokenizer(self.current_file, - filename) - self.current_origin = new_origin - elif c == '$GENERATE': - self._generate_line() - else: - raise dns.exception.SyntaxError( - "Unknown master file directive '" + c + "'") - continue - self.tok.unget(token) - self._rr_line() - except dns.exception.SyntaxError as detail: - (filename, line_number) = self.tok.where() - if detail is None: - detail = "syntax error" - ex = dns.exception.SyntaxError( - "%s:%d: %s" % (filename, line_number, detail)) - tb = sys.exc_info()[2] - raise ex.with_traceback(tb) from None - - # Now that we're done reading, do some basic checking of the zone. - if self.check_origin: - self.zone.check_origin() + del self.rdatasets[(name, rdtype, covers)] + except KeyError: + pass + rdataset = self.zone.get_rdataset(name, rdtype, covers) + if rdataset is not None: + self.rdatasets[(name, rdataset.rdtype, rdataset.covers)] = \ + self._deleted_rdataset + + def _name_exists(self, name): + for key, rdataset in self.rdatasets.items(): + if key[0] == name: + if rdataset != self._deleted_rdataset: + return True + else: + return None + self.zone._validate_name(name) + if self.zone.get_node(name): + return True + return False + + def _end_transaction(self, commit): + if commit and not self.read_only: + for (name, rdtype, covers), rdataset in \ + self.rdatasets.items(): + if rdataset is self._deleted_rdataset: + self.zone.delete_rdataset(name, rdtype, covers) + else: + self.zone.replace_rdataset(name, rdataset) + + def _set_origin(self, origin): + if self.zone.origin is None: + self.zone.origin = origin + + def _iterate_rdatasets(self): + # Expensive but simple! Use a versioned zone for efficient txn + # iteration. + rdatasets = {} + for (name, rdataset) in self.zone.iterate_rdatasets(): + rdatasets[(name, rdataset.rdtype, rdataset.covers)] = rdataset + rdatasets.update(self.rdatasets) + for (name, _, _), rdataset in rdatasets.items(): + yield (name, rdataset) def from_text(text, origin=None, rdclass=dns.rdataclass.IN, @@ -1103,12 +796,20 @@ def from_text(text, origin=None, rdclass=dns.rdataclass.IN, if filename is None: filename = '<string>' - tok = dns.tokenizer.Tokenizer(text, filename, idna_codec=idna_codec) - reader = _MasterReader(tok, origin, rdclass, relativize, zone_factory, - allow_include=allow_include, - check_origin=check_origin) - reader.read() - return reader.zone + zone = zone_factory(origin, rdclass, relativize=relativize) + with zone.writer(True) as txn: + tok = dns.tokenizer.Tokenizer(text, filename, idna_codec=idna_codec) + reader = dns.masterfile.Reader(tok, origin, rdclass, relativize, txn, + allow_include=allow_include) + try: + reader.read() + except dns.masterfile.UnknownOrigin: + # for backwards compatibility + raise dns.zone.UnknownOrigin + # Now that we're done reading, do some basic checking of the zone. + if check_origin: + zone.check_origin() + return zone def from_file(f, origin=None, rdclass=dns.rdataclass.IN, |