diff options
author | Bob Halley <halley@nominum.com> | 2010-01-12 13:31:58 -0800 |
---|---|---|
committer | Bob Halley <halley@nominum.com> | 2010-01-12 13:31:58 -0800 |
commit | 45f795046415a44d41b203e2ba10b72a2abcf610 (patch) | |
tree | 8a0c5145bdfda87b2cc739bdaeddb450a616fbaf | |
parent | 7f397a5a53b65a5c434c6686318c501f8548cc18 (diff) | |
download | dnspython-45f795046415a44d41b203e2ba10b72a2abcf610.tar.gz |
make a proper token object to facilitate future tokenization work
-rw-r--r-- | dns/message.py | 47 | ||||
-rw-r--r-- | dns/rdata.py | 10 | ||||
-rw-r--r-- | dns/rdtypes/ANY/CERT.py | 16 | ||||
-rw-r--r-- | dns/rdtypes/ANY/HIP.py | 6 | ||||
-rw-r--r-- | dns/rdtypes/ANY/ISDN.py | 10 | ||||
-rw-r--r-- | dns/rdtypes/ANY/LOC.py | 39 | ||||
-rw-r--r-- | dns/rdtypes/ANY/NSEC.py | 16 | ||||
-rw-r--r-- | dns/rdtypes/ANY/NSEC3.py | 6 | ||||
-rw-r--r-- | dns/rdtypes/ANY/NXT.py | 10 | ||||
-rw-r--r-- | dns/rdtypes/IN/A.py | 14 | ||||
-rw-r--r-- | dns/rdtypes/IN/AAAA.py | 14 | ||||
-rw-r--r-- | dns/rdtypes/IN/APL.py | 19 | ||||
-rw-r--r-- | dns/rdtypes/IN/DHCID.py | 14 | ||||
-rw-r--r-- | dns/rdtypes/IN/IPSECKEY.py | 6 | ||||
-rw-r--r-- | dns/rdtypes/IN/WKS.py | 18 | ||||
-rw-r--r-- | dns/rdtypes/dsbase.py | 6 | ||||
-rw-r--r-- | dns/rdtypes/keybase.py | 16 | ||||
-rw-r--r-- | dns/rdtypes/sigbase.py | 18 | ||||
-rw-r--r-- | dns/rdtypes/txtbase.py | 19 | ||||
-rw-r--r-- | dns/tokenizer.py | 192 | ||||
-rw-r--r-- | dns/zone.py | 97 | ||||
-rw-r--r-- | tests/tokenizer.py | 95 |
22 files changed, 375 insertions, 313 deletions
diff --git a/dns/message.py b/dns/message.py index 20769d4..af6bb46 100644 --- a/dns/message.py +++ b/dns/message.py @@ -803,17 +803,18 @@ class _TextReader(object): def _header_line(self, section): """Process one line from the text format header section.""" - (ttype, what) = self.tok.get() + token = self.tok.get() + what = token.value if what == 'id': self.message.id = self.tok.get_int() elif what == 'flags': while True: token = self.tok.get() - if token[0] != dns.tokenizer.IDENTIFIER: + if not token.is_identifier(): self.tok.unget(token) break self.message.flags = self.message.flags | \ - dns.flags.from_text(token[1]) + dns.flags.from_text(token.value) if dns.opcode.is_update(self.message.flags): self.updating = True elif what == 'edns': @@ -825,11 +826,11 @@ class _TextReader(object): self.message.edns = 0 while True: token = self.tok.get() - if token[0] != dns.tokenizer.IDENTIFIER: + if not token.is_identifier(): self.tok.unget(token) break self.message.ednsflags = self.message.ednsflags | \ - dns.flags.edns_from_text(token[1]) + dns.flags.edns_from_text(token.value) elif what == 'payload': self.message.payload = self.tok.get_int() if self.message.edns < 0: @@ -849,24 +850,24 @@ class _TextReader(object): """Process one line from the text format question section.""" token = self.tok.get(want_leading = True) - if token[0] != dns.tokenizer.WHITESPACE: - self.last_name = dns.name.from_text(token[1], None) + if not token.is_whitespace(): + self.last_name = dns.name.from_text(token.value, None) name = self.last_name token = self.tok.get() - if token[0] != dns.tokenizer.IDENTIFIER: + if not token.is_identifier(): raise dns.exception.SyntaxError # Class try: - rdclass = dns.rdataclass.from_text(token[1]) + rdclass = dns.rdataclass.from_text(token.value) token = self.tok.get() - if token[0] != dns.tokenizer.IDENTIFIER: + if not token.is_identifier(): raise dns.exception.SyntaxError except dns.exception.SyntaxError: raise dns.exception.SyntaxError except: rdclass = dns.rdataclass.IN # Type - rdtype = dns.rdatatype.from_text(token[1]) + rdtype = dns.rdatatype.from_text(token.value) self.message.find_rrset(self.message.question, name, rdclass, rdtype, create=True, force_unique=True) @@ -882,17 +883,17 @@ class _TextReader(object): deleting = None # Name token = self.tok.get(want_leading = True) - if token[0] != dns.tokenizer.WHITESPACE: - self.last_name = dns.name.from_text(token[1], None) + if not token.is_whitespace(): + self.last_name = dns.name.from_text(token.value, None) name = self.last_name token = self.tok.get() - if token[0] != dns.tokenizer.IDENTIFIER: + if not token.is_identifier(): raise dns.exception.SyntaxError # TTL try: - ttl = int(token[1], 0) + ttl = int(token.value, 0) token = self.tok.get() - if token[0] != dns.tokenizer.IDENTIFIER: + if not token.is_identifier(): raise dns.exception.SyntaxError except dns.exception.SyntaxError: raise dns.exception.SyntaxError @@ -900,9 +901,9 @@ class _TextReader(object): ttl = 0 # Class try: - rdclass = dns.rdataclass.from_text(token[1]) + rdclass = dns.rdataclass.from_text(token.value) token = self.tok.get() - if token[0] != dns.tokenizer.IDENTIFIER: + if not token.is_identifier(): raise dns.exception.SyntaxError if rdclass == dns.rdataclass.ANY or rdclass == dns.rdataclass.NONE: deleting = rdclass @@ -912,9 +913,9 @@ class _TextReader(object): except: rdclass = dns.rdataclass.IN # Type - rdtype = dns.rdatatype.from_text(token[1]) + rdtype = dns.rdatatype.from_text(token.value) token = self.tok.get() - if token[0] != dns.tokenizer.EOL and token[0] != dns.tokenizer.EOF: + if not token.is_eol_or_eof(): self.tok.unget(token) rd = dns.rdata.from_text(rdclass, rdtype, self.tok, None) covers = rd.covers() @@ -935,10 +936,10 @@ class _TextReader(object): section = None while 1: token = self.tok.get(True, True) - if token[0] == dns.tokenizer.EOL or token[0] == dns.tokenizer.EOF: + if token.is_eol_or_eof(): break - if token[0] == dns.tokenizer.COMMENT: - u = token[1].upper() + if token.is_comment(): + u = token.value.upper() if u == 'HEADER': line_method = self._header_line elif u == 'QUESTION' or u == 'ZONE': diff --git a/dns/rdata.py b/dns/rdata.py index f371fa7..3d1fd99 100644 --- a/dns/rdata.py +++ b/dns/rdata.py @@ -325,10 +325,10 @@ class GenericRdata(Rdata): length = tok.get_int() chunks = [] while 1: - (ttype, value) = tok.get() - if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF: + token = tok.get() + if token.is_eol_or_eof(): break - chunks.append(value) + chunks.append(token.value) hex = ''.join(chunks) data = hex.decode('hex_codec') if len(data) != length: @@ -415,8 +415,8 @@ def from_text(rdclass, rdtype, tok, origin = None, relativize = True): # peek at first token token = tok.get() tok.unget(token) - if token[0] == dns.tokenizer.IDENTIFIER and \ - token[1] == r'\#': + if token.is_identifier and \ + token.value == r'\#': # # Known type using the generic syntax. Extract the # wire form from the generic syntax, and then run diff --git a/dns/rdtypes/ANY/CERT.py b/dns/rdtypes/ANY/CERT.py index f76e7ac..225a921 100644 --- a/dns/rdtypes/ANY/CERT.py +++ b/dns/rdtypes/ANY/CERT.py @@ -63,7 +63,7 @@ class CERT(dns.rdata.Rdata): @see: RFC 2538""" __slots__ = ['certificate_type', 'key_tag', 'algorithm', 'certificate'] - + def __init__(self, rdclass, rdtype, certificate_type, key_tag, algorithm, certificate): super(CERT, self).__init__(rdclass, rdtype) @@ -77,7 +77,7 @@ class CERT(dns.rdata.Rdata): return "%s %d %s %s" % (certificate_type, self.key_tag, dns.dnssec.algorithm_to_text(self.algorithm), dns.rdata._base64ify(self.certificate)) - + def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True): certificate_type = _ctype_from_text(tok.get_string()) key_tag = tok.get_uint16() @@ -87,16 +87,16 @@ class CERT(dns.rdata.Rdata): chunks = [] while 1: t = tok.get() - if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF: + if t.is_eol_or_eof(): break - if t[0] != dns.tokenizer.IDENTIFIER: + if not t.is_identifier(): raise dns.exception.SyntaxError - chunks.append(t[1]) + chunks.append(t.value) b64 = ''.join(chunks) certificate = b64.decode('base64_codec') return cls(rdclass, rdtype, certificate_type, key_tag, algorithm, certificate) - + from_text = classmethod(from_text) def to_wire(self, file, compress = None, origin = None): @@ -104,7 +104,7 @@ class CERT(dns.rdata.Rdata): self.algorithm) file.write(prefix) file.write(self.certificate) - + def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None): prefix = wire[current : current + 5] current += 5 @@ -127,5 +127,5 @@ class CERT(dns.rdata.Rdata): other.to_wire(f) wire2 = f.getvalue() f.close() - + return cmp(wire1, wire2) diff --git a/dns/rdtypes/ANY/HIP.py b/dns/rdtypes/ANY/HIP.py index 011ec43..53a2c6c 100644 --- a/dns/rdtypes/ANY/HIP.py +++ b/dns/rdtypes/ANY/HIP.py @@ -62,10 +62,10 @@ class HIP(dns.rdata.Rdata): key = tok.get_string().decode('base64-codec') servers = [] while 1: - (ttype, value) = tok.get() - if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF: + token = tok.get() + if token.is_eol_or_eof(): break - server = dns.name.from_text(value, origin) + server = dns.name.from_text(token.value, origin) server.choose_relativity(origin, relativize) servers.append(server) return cls(rdclass, rdtype, hit, algorithm, key, servers) diff --git a/dns/rdtypes/ANY/ISDN.py b/dns/rdtypes/ANY/ISDN.py index 6006ba0..f33b5ad 100644 --- a/dns/rdtypes/ANY/ISDN.py +++ b/dns/rdtypes/ANY/ISDN.py @@ -27,7 +27,7 @@ class ISDN(dns.rdata.Rdata): @see: RFC 1183""" __slots__ = ['address', 'subaddress'] - + def __init__(self, rdclass, rdtype, address, subaddress): super(ISDN, self).__init__(rdclass, rdtype) self.address = address @@ -39,11 +39,11 @@ class ISDN(dns.rdata.Rdata): dns.rdata._escapify(self.subaddress)) else: return '"%s"' % dns.rdata._escapify(self.address) - + def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True): address = tok.get_string() t = tok.get() - if t[0] != dns.tokenizer.EOL and t[0] != dns.tokenizer.EOF: + if not t.is_eol_or_eof(): tok.unget(t) subaddress = tok.get_string() else: @@ -51,7 +51,7 @@ class ISDN(dns.rdata.Rdata): subaddress = '' tok.get_eol() return cls(rdclass, rdtype, address, subaddress) - + from_text = classmethod(from_text) def to_wire(self, file, compress = None, origin = None): @@ -66,7 +66,7 @@ class ISDN(dns.rdata.Rdata): byte = chr(l) file.write(byte) file.write(self.subaddress) - + def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None): l = ord(wire[current]) current += 1 diff --git a/dns/rdtypes/ANY/LOC.py b/dns/rdtypes/ANY/LOC.py index 733e966..7fe1a80 100644 --- a/dns/rdtypes/ANY/LOC.py +++ b/dns/rdtypes/ANY/LOC.py @@ -96,7 +96,7 @@ class LOC(dns.rdata.Rdata): __slots__ = ['latitude', 'longitude', 'altitude', 'size', 'horizontal_precision', 'vertical_precision'] - + def __init__(self, rdclass, rdtype, latitude, longitude, altitude, size=1.0, hprec=10000.0, vprec=10.0): """Initialize a LOC record instance. @@ -105,7 +105,7 @@ class LOC(dns.rdata.Rdata): of integers specifying (degrees, minutes, seconds, milliseconds), or they may be floating point values specifying the number of degrees. The other parameters are floats.""" - + super(LOC, self).__init__(rdclass, rdtype) if isinstance(latitude, int) or isinstance(latitude, long): latitude = float(latitude) @@ -148,14 +148,14 @@ class LOC(dns.rdata.Rdata): self.vertical_precision / 100.0 ) return text - + def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True): latitude = [0, 0, 0, 0] longitude = [0, 0, 0, 0] size = 1.0 hprec = 10000.0 vprec = 10.0 - + latitude[0] = tok.get_int() t = tok.get_string() if t.isdigit(): @@ -229,30 +229,29 @@ class LOC(dns.rdata.Rdata): t = t[0 : -1] altitude = float(t) * 100.0 # m -> cm - (ttype, value) = tok.get() - if ttype != dns.tokenizer.EOL and ttype != dns.tokenizer.EOF: + token = tok.get() + if not token.is_eol_or_eof(): + value = token.value if value[-1] == 'm': value = value[0 : -1] size = float(value) * 100.0 # m -> cm - (ttype, value) = tok.get() - if ttype != dns.tokenizer.EOL and ttype != dns.tokenizer.EOF: + token = tok.get() + if not token.is_eol_or_eof(): + value = token.value if value[-1] == 'm': value = value[0 : -1] hprec = float(value) * 100.0 # m -> cm - (ttype, value) = tok.get() - if ttype != dns.tokenizer.EOL and ttype != dns.tokenizer.EOF: + token = tok.get() + if not token.is_eol_or_eof(): + value = token.value if value[-1] == 'm': value = value[0 : -1] vprec = float(value) * 100.0 # m -> cm - (ttype, value) = tok.get() - if ttype != dns.tokenizer.EOL and \ - ttype != dns.tokenizer.EOF: - raise dns.exception.SyntaxError, \ - "expected EOL or EOF" + tok.get_eol() return cls(rdclass, rdtype, latitude, longitude, altitude, size, hprec, vprec) - + from_text = classmethod(from_text) def to_wire(self, file, compress = None, origin = None): @@ -262,7 +261,7 @@ class LOC(dns.rdata.Rdata): else: sign = 1 degrees = long(self.latitude[0]) - milliseconds = (degrees * 3600000 + + milliseconds = (degrees * 3600000 + self.latitude[1] * 60000 + self.latitude[2] * 1000 + self.latitude[3]) * sign @@ -273,7 +272,7 @@ class LOC(dns.rdata.Rdata): else: sign = 1 degrees = long(self.longitude[0]) - milliseconds = (degrees * 3600000 + + milliseconds = (degrees * 3600000 + self.longitude[1] * 60000 + self.longitude[2] * 1000 + self.longitude[3]) * sign @@ -285,7 +284,7 @@ class LOC(dns.rdata.Rdata): wire = struct.pack("!BBBBIII", 0, size, hprec, vprec, latitude, longitude, altitude) file.write(wire) - + def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None): (version, size, hprec, vprec, latitude, longitude, altitude) = \ struct.unpack("!BBBBIII", wire[current : current + rdlen]) @@ -319,7 +318,7 @@ class LOC(dns.rdata.Rdata): other.to_wire(f) wire2 = f.getvalue() f.close() - + return cmp(wire1, wire2) def _get_float_latitude(self): diff --git a/dns/rdtypes/ANY/NSEC.py b/dns/rdtypes/ANY/NSEC.py index 2d8b367..ee9a009 100644 --- a/dns/rdtypes/ANY/NSEC.py +++ b/dns/rdtypes/ANY/NSEC.py @@ -29,7 +29,7 @@ class NSEC(dns.rdata.Rdata): @type windows: list of (window number, string) tuples""" __slots__ = ['next', 'windows'] - + def __init__(self, rdclass, rdtype, next, windows): super(NSEC, self).__init__(rdclass, rdtype) self.next = next @@ -48,16 +48,16 @@ class NSEC(dns.rdata.Rdata): i * 8 + j)) text += (' ' + ' '.join(bits)) return '%s%s' % (next, text) - + def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True): next = tok.get_name() next = next.choose_relativity(origin, relativize) rdtypes = [] while 1: - (ttype, value) = tok.get() - if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF: + token = tok.get() + if token.is_eol_or_eof(): break - nrdtype = dns.rdatatype.from_text(value) + nrdtype = dns.rdatatype.from_text(token.value) if nrdtype == 0: raise dns.exception.SyntaxError, "NSEC with bit 0" if nrdtype > 65535: @@ -85,7 +85,7 @@ class NSEC(dns.rdata.Rdata): bitmap[byte] = chr(ord(bitmap[byte]) | (0x80 >> bit)) windows.append((window, ''.join(bitmap[0:octets]))) return cls(rdclass, rdtype, next, windows) - + from_text = classmethod(from_text) def to_wire(self, file, compress = None, origin = None): @@ -94,7 +94,7 @@ class NSEC(dns.rdata.Rdata): file.write(chr(window)) file.write(chr(len(bitmap))) file.write(bitmap) - + def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None): (next, cused) = dns.name.from_wire(wire[: current + rdlen], current) current += cused @@ -123,7 +123,7 @@ class NSEC(dns.rdata.Rdata): def choose_relativity(self, origin = None, relativize = True): self.next = self.next.choose_relativity(origin, relativize) - + def _cmp(self, other): v = cmp(self.next, other.next) if v == 0: diff --git a/dns/rdtypes/ANY/NSEC3.py b/dns/rdtypes/ANY/NSEC3.py index e568d7f..1d1782f 100644 --- a/dns/rdtypes/ANY/NSEC3.py +++ b/dns/rdtypes/ANY/NSEC3.py @@ -93,10 +93,10 @@ class NSEC3(dns.rdata.Rdata): next = base64.b32decode(next) rdtypes = [] while 1: - (ttype, value) = tok.get() - if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF: + token = tok.get() + if token.is_eol_or_eof(): break - nrdtype = dns.rdatatype.from_text(value) + nrdtype = dns.rdatatype.from_text(token.value) if nrdtype == 0: raise dns.exception.SyntaxError, "NSEC3 with bit 0" if nrdtype > 65535: diff --git a/dns/rdtypes/ANY/NXT.py b/dns/rdtypes/ANY/NXT.py index 4b2571c..10443a3 100644 --- a/dns/rdtypes/ANY/NXT.py +++ b/dns/rdtypes/ANY/NXT.py @@ -53,13 +53,13 @@ class NXT(dns.rdata.Rdata): '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00' ] while 1: - (ttype, value) = tok.get() - if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF: + token = tok.get() + if token.is_eol_or_eof(): break - if value.isdigit(): - nrdtype = int(value) + if token.value.isdigit(): + nrdtype = int(token.value) else: - nrdtype = dns.rdatatype.from_text(value) + nrdtype = dns.rdatatype.from_text(token.value) if nrdtype == 0: raise dns.exception.SyntaxError, "NXT with bit 0" if nrdtype > 127: diff --git a/dns/rdtypes/IN/A.py b/dns/rdtypes/IN/A.py index 098a275..71654f2 100644 --- a/dns/rdtypes/IN/A.py +++ b/dns/rdtypes/IN/A.py @@ -25,7 +25,7 @@ class A(dns.rdata.Rdata): @type address: string (in the standard "dotted quad" format)""" __slots__ = ['address'] - + def __init__(self, rdclass, rdtype, address): super(A, self).__init__(rdclass, rdtype) # check that it's OK @@ -34,19 +34,17 @@ class A(dns.rdata.Rdata): def to_text(self, origin=None, relativize=True, **kw): return self.address - + def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True): - (ttype, address) = tok.get() - if ttype != dns.tokenizer.IDENTIFIER: - raise dns.exception.SyntaxError - t = tok.get_eol() + address = tok.get_identifier() + tok.get_eol() return cls(rdclass, rdtype, address) - + from_text = classmethod(from_text) def to_wire(self, file, compress = None, origin = None): file.write(dns.ipv4.inet_aton(self.address)) - + def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None): address = dns.ipv4.inet_ntoa(wire[current : current + rdlen]) return cls(rdclass, rdtype, address) diff --git a/dns/rdtypes/IN/AAAA.py b/dns/rdtypes/IN/AAAA.py index 086128e..dfa2606 100644 --- a/dns/rdtypes/IN/AAAA.py +++ b/dns/rdtypes/IN/AAAA.py @@ -25,7 +25,7 @@ class AAAA(dns.rdata.Rdata): @type address: string (in the standard IPv6 format)""" __slots__ = ['address'] - + def __init__(self, rdclass, rdtype, address): super(AAAA, self).__init__(rdclass, rdtype) # check that it's OK @@ -34,19 +34,17 @@ class AAAA(dns.rdata.Rdata): def to_text(self, origin=None, relativize=True, **kw): return self.address - + def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True): - (ttype, address) = tok.get() - if ttype != dns.tokenizer.IDENTIFIER: - raise dns.exception.SyntaxError - t = tok.get_eol() + address = tok.get_identifier() + tok.get_eol() return cls(rdclass, rdtype, address) - + from_text = classmethod(from_text) def to_wire(self, file, compress = None, origin = None): file.write(dns.inet.inet_pton(dns.inet.AF_INET6, self.address)) - + def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None): address = dns.inet.inet_ntop(dns.inet.AF_INET6, wire[current : current + rdlen]) diff --git a/dns/rdtypes/IN/APL.py b/dns/rdtypes/IN/APL.py index 3cefb25..21c02d5 100644 --- a/dns/rdtypes/IN/APL.py +++ b/dns/rdtypes/IN/APL.py @@ -23,7 +23,7 @@ import dns.tokenizer class APLItem(object): """An APL list item. - + @ivar family: the address family (IANA address family registry) @type family: int @ivar negation: is this item negated? @@ -35,7 +35,7 @@ class APLItem(object): """ __slots__ = ['family', 'negation', 'address', 'prefix'] - + def __init__(self, family, negation, address, prefix): self.family = family self.negation = negation @@ -80,20 +80,21 @@ class APL(dns.rdata.Rdata): @see: RFC 3123""" __slots__ = ['items'] - + def __init__(self, rdclass, rdtype, items): super(APL, self).__init__(rdclass, rdtype) self.items = items def to_text(self, origin=None, relativize=True, **kw): return ' '.join(map(lambda x: str(x), self.items)) - + def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True): items = [] while 1: - (ttype, item) = tok.get() - if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF: + token = tok.get() + if token.is_eol_or_eof(): break + item = token.value if item[0] == '!': negation = True item = item[1:] @@ -107,13 +108,13 @@ class APL(dns.rdata.Rdata): items.append(item) return cls(rdclass, rdtype, items) - + from_text = classmethod(from_text) def to_wire(self, file, compress = None, origin = None): for item in self.items: item.to_wire(file) - + def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None): items = [] while 1: @@ -165,5 +166,5 @@ class APL(dns.rdata.Rdata): other.to_wire(f) wire2 = f.getvalue() f.close() - + return cmp(wire1, wire2) diff --git a/dns/rdtypes/IN/DHCID.py b/dns/rdtypes/IN/DHCID.py index 676e609..934d478 100644 --- a/dns/rdtypes/IN/DHCID.py +++ b/dns/rdtypes/IN/DHCID.py @@ -24,7 +24,7 @@ class DHCID(dns.rdata.Rdata): @see: RFC 4701""" __slots__ = ['data'] - + def __init__(self, rdclass, rdtype, data): super(DHCID, self).__init__(rdclass, rdtype) self.data = data @@ -36,20 +36,20 @@ class DHCID(dns.rdata.Rdata): chunks = [] while 1: t = tok.get() - if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF: + if t.is_eol_or_eof(): break - if t[0] != dns.tokenizer.IDENTIFIER: + if not t.is_identifier(): raise dns.exception.SyntaxError - chunks.append(t[1]) + chunks.append(t.value) b64 = ''.join(chunks) data = b64.decode('base64_codec') return cls(rdclass, rdtype, data) - + from_text = classmethod(from_text) def to_wire(self, file, compress = None, origin = None): file.write(self.data) - + def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None): data = wire[current : current + rdlen] return cls(rdclass, rdtype, data) @@ -57,4 +57,4 @@ class DHCID(dns.rdata.Rdata): from_wire = classmethod(from_wire) def _cmp(self, other): - return cmp(self.data, other.data) + return cmp(self.data, other.data) diff --git a/dns/rdtypes/IN/IPSECKEY.py b/dns/rdtypes/IN/IPSECKEY.py index 4aba53b..15eb546 100644 --- a/dns/rdtypes/IN/IPSECKEY.py +++ b/dns/rdtypes/IN/IPSECKEY.py @@ -87,11 +87,11 @@ class IPSECKEY(dns.rdata.Rdata): chunks = [] while 1: t = tok.get() - if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF: + if t.is_eol_or_eof(): break - if t[0] != dns.tokenizer.IDENTIFIER: + if not t.is_identifier(): raise dns.exception.SyntaxError - chunks.append(t[1]) + chunks.append(t.value) b64 = ''.join(chunks) key = b64.decode('base64_codec') return cls(rdclass, rdtype, precedence, gateway_type, algorithm, diff --git a/dns/rdtypes/IN/WKS.py b/dns/rdtypes/IN/WKS.py index 9c15840..4fdded6 100644 --- a/dns/rdtypes/IN/WKS.py +++ b/dns/rdtypes/IN/WKS.py @@ -34,7 +34,7 @@ class WKS(dns.rdata.Rdata): @see: RFC 1035""" __slots__ = ['address', 'protocol', 'bitmap'] - + def __init__(self, rdclass, rdtype, address, protocol, bitmap): super(WKS, self).__init__(rdclass, rdtype) self.address = address @@ -50,7 +50,7 @@ class WKS(dns.rdata.Rdata): bits.append(str(i * 8 + j)) text = ' '.join(bits) return '%s %d %s' % (self.address, self.protocol, text) - + def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True): address = tok.get_string() protocol = tok.get_string() @@ -60,11 +60,11 @@ class WKS(dns.rdata.Rdata): protocol = socket.getprotobyname(protocol) bitmap = [] while 1: - (ttype, value) = tok.get() - if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF: + token = tok.get() + if token.is_eol_or_eof(): break - if value.isdigit(): - serv = int(value) + if token.value.isdigit(): + serv = int(token.value) else: if protocol != _proto_udp and protocol != _proto_tcp: raise NotImplementedError, "protocol must be TCP or UDP" @@ -72,7 +72,7 @@ class WKS(dns.rdata.Rdata): protocol_text = "udp" else: protocol_text = "tcp" - serv = socket.getservbyname(value, protocol_text) + serv = socket.getservbyname(token.value, protocol_text) i = serv // 8 l = len(bitmap) if l < i + 1: @@ -81,7 +81,7 @@ class WKS(dns.rdata.Rdata): bitmap[i] = chr(ord(bitmap[i]) | (0x80 >> (serv % 8))) bitmap = dns.rdata._truncate_bitmap(bitmap) return cls(rdclass, rdtype, address, protocol, bitmap) - + from_text = classmethod(from_text) def to_wire(self, file, compress = None, origin = None): @@ -89,7 +89,7 @@ class WKS(dns.rdata.Rdata): protocol = struct.pack('!B', self.protocol) file.write(protocol) file.write(self.bitmap) - + def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None): address = dns.ipv4.inet_ntoa(wire[current : current + 4]) protocol, = struct.unpack('!B', wire[current + 4 : current + 5]) diff --git a/dns/rdtypes/dsbase.py b/dns/rdtypes/dsbase.py index 6ba2123..09b86cc 100644 --- a/dns/rdtypes/dsbase.py +++ b/dns/rdtypes/dsbase.py @@ -54,11 +54,11 @@ class DSBase(dns.rdata.Rdata): chunks = [] while 1: t = tok.get() - if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF: + if t.is_eol_or_eof(): break - if t[0] != dns.tokenizer.IDENTIFIER: + if not t.is_identifier(): raise dns.exception.SyntaxError - chunks.append(t[1]) + chunks.append(t.value) digest = ''.join(chunks) digest = digest.decode('hex_codec') return cls(rdclass, rdtype, key_tag, algorithm, digest_type, diff --git a/dns/rdtypes/keybase.py b/dns/rdtypes/keybase.py index 763bd1e..52114f6 100644 --- a/dns/rdtypes/keybase.py +++ b/dns/rdtypes/keybase.py @@ -61,7 +61,7 @@ _protocol_from_text = { 'IPSEC' : 4, 'ALL' : 255, } - + class KEYBase(dns.rdata.Rdata): """KEY-like record base @@ -75,7 +75,7 @@ class KEYBase(dns.rdata.Rdata): @type key: string""" __slots__ = ['flags', 'protocol', 'algorithm', 'key'] - + def __init__(self, rdclass, rdtype, flags, protocol, algorithm, key): super(KEYBase, self).__init__(rdclass, rdtype) self.flags = flags @@ -108,27 +108,27 @@ class KEYBase(dns.rdata.Rdata): if protocol is None: raise dns.exception.SyntaxError, \ 'unknown protocol %s' % protocol - + algorithm = dns.dnssec.algorithm_from_text(tok.get_string()) chunks = [] while 1: t = tok.get() - if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF: + if t.is_eol_or_eof(): break - if t[0] != dns.tokenizer.IDENTIFIER: + if not t.is_identifier(): raise dns.exception.SyntaxError - chunks.append(t[1]) + chunks.append(t.value) b64 = ''.join(chunks) key = b64.decode('base64_codec') return cls(rdclass, rdtype, flags, protocol, algorithm, key) - + from_text = classmethod(from_text) def to_wire(self, file, compress = None, origin = None): header = struct.pack("!HBB", self.flags, self.protocol, self.algorithm) file.write(header) file.write(self.key) - + def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None): if rdlen < 4: raise dns.exception.FormError diff --git a/dns/rdtypes/sigbase.py b/dns/rdtypes/sigbase.py index b20e7da..e035bff 100644 --- a/dns/rdtypes/sigbase.py +++ b/dns/rdtypes/sigbase.py @@ -40,7 +40,7 @@ def sigtime_to_posixtime(what): def posixtime_to_sigtime(what): return time.strftime('%Y%m%d%H%M%S', time.gmtime(what)) - + class SIGBase(dns.rdata.Rdata): """SIG-like record base @@ -66,7 +66,7 @@ class SIGBase(dns.rdata.Rdata): __slots__ = ['type_covered', 'algorithm', 'labels', 'original_ttl', 'expiration', 'inception', 'key_tag', 'signer', 'signature'] - + def __init__(self, rdclass, rdtype, type_covered, algorithm, labels, original_ttl, expiration, inception, key_tag, signer, signature): @@ -83,7 +83,7 @@ class SIGBase(dns.rdata.Rdata): def covers(self): return self.type_covered - + def to_text(self, origin=None, relativize=True, **kw): return '%s %d %d %d %s %s %d %s %s' % ( dns.rdatatype.to_text(self.type_covered), @@ -110,17 +110,17 @@ class SIGBase(dns.rdata.Rdata): chunks = [] while 1: t = tok.get() - if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF: + if t.is_eol_or_eof(): break - if t[0] != dns.tokenizer.IDENTIFIER: + if not t.is_identifier(): raise dns.exception.SyntaxError - chunks.append(t[1]) + chunks.append(t.value) b64 = ''.join(chunks) signature = b64.decode('base64_codec') return cls(rdclass, rdtype, type_covered, algorithm, labels, original_ttl, expiration, inception, key_tag, signer, signature) - + from_text = classmethod(from_text) def to_wire(self, file, compress = None, origin = None): @@ -131,7 +131,7 @@ class SIGBase(dns.rdata.Rdata): file.write(header) self.signer.to_wire(file, None, origin) file.write(self.signature) - + def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None): header = struct.unpack('!HBBIIIH', wire[current : current + 18]) current += 18 @@ -150,7 +150,7 @@ class SIGBase(dns.rdata.Rdata): def choose_relativity(self, origin = None, relativize = True): self.signer = self.signer.choose_relativity(origin, relativize) - + def _cmp(self, other): hs = struct.pack('!HBBIIIH', self.type_covered, self.algorithm, self.labels, diff --git a/dns/rdtypes/txtbase.py b/dns/rdtypes/txtbase.py index 400a0e5..b3dd086 100644 --- a/dns/rdtypes/txtbase.py +++ b/dns/rdtypes/txtbase.py @@ -27,7 +27,7 @@ class TXTBase(dns.rdata.Rdata): @see: RFC 1035""" __slots__ = ['strings'] - + def __init__(self, rdclass, rdtype, strings): super(TXTBase, self).__init__(rdclass, rdtype) if isinstance(strings, str): @@ -41,23 +41,22 @@ class TXTBase(dns.rdata.Rdata): txt += '%s"%s"' % (prefix, dns.rdata._escapify(s)) prefix = ' ' return txt - + def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True): strings = [] while 1: - (ttype, s) = tok.get() - if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF: + token = tok.get() + if token.is_eol_or_eof(): break - if ttype != dns.tokenizer.QUOTED_STRING and \ - ttype != dns.tokenizer.IDENTIFIER: + if not (token.is_quoted_string() or token.is_identifier()): raise dns.exception.SyntaxError, "expected a string" - if len(s) > 255: + if len(token.value) > 255: raise dns.exception.SyntaxError, "string too long" - strings.append(s) + strings.append(token.value) if len(strings) == 0: raise dns.exception.UnexpectedEnd return cls(rdclass, rdtype, strings) - + from_text = classmethod(from_text) def to_wire(self, file, compress = None, origin = None): @@ -67,7 +66,7 @@ class TXTBase(dns.rdata.Rdata): byte = chr(l) file.write(byte) file.write(s) - + def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None): strings = [] while rdlen > 0: diff --git a/dns/tokenizer.py b/dns/tokenizer.py index a2067d2..cc4566e 100644 --- a/dns/tokenizer.py +++ b/dns/tokenizer.py @@ -45,14 +45,76 @@ class UngetBufferFull(dns.exception.DNSException): """Raised when an attempt is made to unget a token when the unget buffer is full.""" pass - + +class Token(object): + """A DNS master file format token. + + @ivar ttype: The token type + @type ttype: int + @ivar value: The token value + @type value: string + """ + + def __init__(self, ttype, value='', has_escape=False): + """Initialize a token instance. + + @param ttype: The token type + @type ttype: int + @ivar value: The token value + @type value: string + @ivar has_escape: Does the token value contain escapes? + @type has_escape: bool + """ + self.ttype = ttype + self.value = value + self.has_escape = has_escape + + def is_eof(self): + return self.ttype == EOF + + def is_eol(self): + return self.ttype == EOL + + def is_whitespace(self): + return self.ttype == WHITESPACE + + def is_identifier(self): + return self.ttype == IDENTIFIER + + def is_quoted_string(self): + return self.ttype == QUOTED_STRING + + def is_comment(self): + return self.ttype == COMMENT + + def is_delimiter(self): + return self.ttype == DELIMITER + + def is_eol_or_eof(self): + return (self.ttype == EOL or self.ttype == EOF) + + def __eq__(self, other): + if not isinstance(other, Token): + return False + return (self.ttype == other.ttype and + self.value == other.value) + + def __ne__(self, other): + if not isinstance(other, Token): + return True + return (self.ttype != other.ttype or + self.value != other.value) + + def __str__(self): + return '%d "%s"' % (self.ttype, self.value) + class Tokenizer(object): """A DNS master file format tokenizer. A token is a (type, value) tuple, where I{type} is an int, and I{value} is a string. The valid types are EOF, EOL, WHITESPACE, IDENTIFIER, QUOTED_STRING, COMMENT, and DELIMITER. - + @ivar file: The file to tokenize @type file: file @ivar ungotten_char: The most recently ungotten character, or None. @@ -75,7 +137,7 @@ class Tokenizer(object): @ivar filename: A filename that will be returned by the L{where} method. @type filename: string """ - + def __init__(self, f=sys.stdin, filename=None): """Initialize a tokenizer instance. @@ -87,7 +149,7 @@ class Tokenizer(object): will return. @type filename: string """ - + if isinstance(f, str): f = cStringIO.StringIO(f) if filename is None: @@ -112,7 +174,7 @@ class Tokenizer(object): """Read a character from input. @rtype: string """ - + if self.ungotten_char is None: if self.eof: c = '' @@ -133,21 +195,21 @@ class Tokenizer(object): @rtype: (string, int) tuple. The first item is the filename of the input, the second is the current line number. """ - + return (self.filename, self.line_number) - + def _unget_char(self, c): """Unget a character. The unget buffer for characters is only one character large; it is an error to try to unget a character when the unget buffer is not empty. - + @param c: the character to unget @type c: string @raises UngetBufferFull: there is already an ungotten char """ - + if not self.ungotten_char is None: raise UngetBufferFull self.ungotten_char = c @@ -162,7 +224,7 @@ class Tokenizer(object): @rtype: int """ - + skipped = 0 while True: c = self._get_char() @@ -181,25 +243,25 @@ class Tokenizer(object): @param want_comment: If True, return a COMMENT token if the first token read is a comment. The default is False. @type want_comment: bool - @rtype: (int, string) tuple + @rtype: Token object @raises dns.exception.UnexpectedEnd: input ended prematurely @raises dns.exception.SyntaxError: input was badly formed """ - + if not self.ungotten_token is None: token = self.ungotten_token self.ungotten_token = None - if token[0] == WHITESPACE: + if token.is_whitespace(): if want_leading: return token - elif token[0] == COMMENT: + elif token.is_comment(): if want_comment: return token else: return token skipped = self.skip_whitespace() if want_leading and skipped > 0: - return (WHITESPACE, ' ') + return Token(WHITESPACE, ' ') token = '' ttype = IDENTIFIER while True: @@ -230,7 +292,7 @@ class Tokenizer(object): self.skip_whitespace() continue elif c == '\n': - return (EOL, '\n') + return Token(EOL, '\n') elif c == ';': while 1: c = self._get_char() @@ -239,18 +301,18 @@ class Tokenizer(object): token += c if want_comment: self._unget_char(c) - return (COMMENT, token) + return Token(COMMENT, token) elif c == '': if self.multiline: raise dns.exception.SyntaxError, \ 'unbalanced parentheses' - return (EOF, '') + return Token(EOF) elif self.multiline: self.skip_whitespace() token = '' continue else: - return (EOL, '\n') + return Token(EOL, '\n') else: # This code exists in case we ever want a # delimiter to be returned. It never produces @@ -279,7 +341,7 @@ class Tokenizer(object): raise dns.exception.SyntaxError, 'newline in quoted string' elif c == '\\': # - # Treat \ followed by a delimiter as the + # Treat \ followed by a delimiter as the # delimiter, otherwise leave it alone. # c = self._get_char() @@ -291,7 +353,7 @@ class Tokenizer(object): if self.multiline: raise dns.exception.SyntaxError, 'unbalanced parentheses' ttype = EOF - return (ttype, token) + return Token(ttype, token) def unget(self, token): """Unget a token. @@ -299,9 +361,9 @@ class Tokenizer(object): The unget buffer for tokens is only one token large; it is an error to try to unget a token when the unget buffer is not empty. - + @param token: the token to unget - @type token: (int, string) token tuple + @type token: Token object @raises UngetBufferFull: there is already an ungotten token """ @@ -313,9 +375,9 @@ class Tokenizer(object): """Return the next item in an iteration. @rtype: (int, string) """ - + token = self.get() - if token[0] == EOF: + if token.is_eof(): raise StopIteration return token @@ -326,26 +388,26 @@ class Tokenizer(object): def get_int(self): """Read the next token and interpret it as an integer. - + @raises dns.exception.SyntaxError: @rtype: int """ - - (ttype, value) = self.get() - if ttype != IDENTIFIER: + + token = self.get() + if not token.is_identifier(): raise dns.exception.SyntaxError, 'expecting an identifier' - if not value.isdigit(): + if not token.value.isdigit(): raise dns.exception.SyntaxError, 'expecting an integer' - return int(value) + return int(token.value) def get_uint8(self): """Read the next token and interpret it as an 8-bit unsigned integer. - + @raises dns.exception.SyntaxError: @rtype: int """ - + value = self.get_int() if value < 0 or value > 255: raise dns.exception.SyntaxError, \ @@ -355,11 +417,11 @@ class Tokenizer(object): def get_uint16(self): """Read the next token and interpret it as a 16-bit unsigned integer. - + @raises dns.exception.SyntaxError: @rtype: int """ - + value = self.get_int() if value < 0 or value > 65535: raise dns.exception.SyntaxError, \ @@ -369,17 +431,17 @@ class Tokenizer(object): def get_uint32(self): """Read the next token and interpret it as a 32-bit unsigned integer. - + @raises dns.exception.SyntaxError: @rtype: int """ - - (ttype, value) = self.get() - if ttype != IDENTIFIER: + + token = self.get() + if not token.is_identifier(): raise dns.exception.SyntaxError, 'expecting an identifier' - if not value.isdigit(): + if not token.value.isdigit(): raise dns.exception.SyntaxError, 'expecting an integer' - value = long(value) + value = long(token.value) if value < 0 or value > 4294967296L: raise dns.exception.SyntaxError, \ '%d is not an unsigned 32-bit integer' % value @@ -387,26 +449,38 @@ class Tokenizer(object): def get_string(self, origin=None): """Read the next token and interpret it as a string. - + @raises dns.exception.SyntaxError: @rtype: string """ - - (ttype, t) = self.get() - if ttype != IDENTIFIER and ttype != QUOTED_STRING: + + token = self.get() + if not (token.is_identifier() or token.is_quoted_string()): raise dns.exception.SyntaxError, 'expecting a string' - return t + return token.value + + def get_identifier(self, origin=None): + """Read the next token and raise an exception if it is not an identifier. + + @raises dns.exception.SyntaxError: + @rtype: string + """ + + token = self.get() + if not token.is_identifier(): + raise dns.exception.SyntaxError, 'expecting an identifier' + return token.value def get_name(self, origin=None): """Read the next token and interpret it as a DNS name. - + @raises dns.exception.SyntaxError: @rtype: dns.name.Name object""" - - (ttype, t) = self.get() - if ttype != IDENTIFIER: + + token = self.get() + if not token.is_identifier(): raise dns.exception.SyntaxError, 'expecting an identifier' - return dns.name.from_text(t, origin) + return dns.name.from_text(token.value, origin) def get_eol(self): """Read the next token and raise an exception if it isn't EOL or @@ -415,15 +489,15 @@ class Tokenizer(object): @raises dns.exception.SyntaxError: @rtype: string """ - - (ttype, t) = self.get() - if ttype != EOL and ttype != EOF: + + token = self.get() + if not token.is_eol_or_eof(): raise dns.exception.SyntaxError, \ - 'expected EOL or EOF, got %d "%s"' % (ttype, t) - return t + 'expected EOL or EOF, got %d "%s"' % (token.ttype, token.value) + return token.value def get_ttl(self): - (ttype, t) = self.get() - if ttype != IDENTIFIER: + token = self.get() + if not token.is_identifier(): raise dns.exception.SyntaxError, 'expecting an identifier' - return dns.ttl.from_text(t) + return dns.ttl.from_text(token.value) diff --git a/dns/zone.py b/dns/zone.py index a40678f..6e1231f 100644 --- a/dns/zone.py +++ b/dns/zone.py @@ -54,7 +54,7 @@ class Zone(object): dns.name.Name object, or it may be a string. In the either case, if the name is relative it is treated as relative to the origin of the zone. - + @ivar rdclass: The zone's rdata class; the default is class IN. @type rdclass: int @ivar origin: The origin of the zone. @@ -71,7 +71,7 @@ class Zone(object): node_factory = dns.node.Node __slots__ = ['rdclass', 'origin', 'nodes', 'relativize'] - + def __init__(self, origin, rdclass=dns.rdataclass.IN, relativize=True): """Initialize a zone object. @@ -90,7 +90,7 @@ class Zone(object): nodes. @rtype: bool """ - + if not isinstance(other, Zone): return False if self.rdclass != other.rdclass or \ @@ -103,7 +103,7 @@ class Zone(object): """Are two zones not equal? @rtype: bool """ - + return not self.__eq__(other) def _validate_name(self, name): @@ -119,7 +119,7 @@ class Zone(object): if self.relativize: name = name.relativize(self.origin) return name - + def __getitem__(self, key): key = self._validate_name(key) return self.nodes[key] @@ -170,7 +170,7 @@ class Zone(object): @raises KeyError: the name is not known and create was not specified. @rtype: dns.node.Node object """ - + name = self._validate_name(name) node = self.nodes.get(name) if node is None: @@ -186,7 +186,7 @@ class Zone(object): This method is like L{find_node}, except it returns None instead of raising an exception if the node does not exist and creation has not been requested. - + @param name: the name of the node to find @type name: dns.name.Name object or string @param create: should the node be created if it doesn't exist? @@ -205,11 +205,11 @@ class Zone(object): It is not an error if the node does not exist. """ - + name = self._validate_name(name) if self.nodes.has_key(name): del self.nodes[name] - + def find_rdataset(self, name, rdtype, covers=dns.rdatatype.NONE, create=False): """Look for rdata with the specified name and type in the zone, @@ -221,7 +221,7 @@ class Zone(object): The rdataset returned is not a copy; changes to it will change the zone. - + KeyError is raised if the name or type are not found. Use L{get_rdataset} if you want to have None returned instead. @@ -257,7 +257,7 @@ class Zone(object): The rdataset returned is not a copy; changes to it will change the zone. - + None is returned if the name or type are not found. Use L{find_rdataset} if you want to have KeyError raised instead. @@ -314,7 +314,7 @@ class Zone(object): def replace_rdataset(self, name, replacement): """Replace an rdataset at name. - + It is not an error if there is no rdataset matching I{replacement}. Ownership of the I{replacement} object is transferred to the zone; @@ -341,7 +341,7 @@ class Zone(object): The I{name}, I{rdtype}, and I{covers} parameters may be strings, in which case they will be converted to their proper type. - + This method is less efficient than the similar L{find_rdataset} because it creates an RRset instead of returning the matching rdataset. It may be more convenient @@ -381,7 +381,7 @@ class Zone(object): The I{name}, I{rdtype}, and I{covers} parameters may be strings, in which case they will be converted to their proper type. - + This method is less efficient than the similar L{get_rdataset} because it creates an RRset instead of returning the matching rdataset. It may be more convenient for some uses since it @@ -420,7 +420,7 @@ class Zone(object): @param covers: the covered type (defaults to None) @type covers: int or string """ - + if isinstance(rdtype, str): rdtype = dns.rdatatype.from_text(rdtype) if isinstance(covers, str): @@ -443,7 +443,7 @@ class Zone(object): @param covers: the covered type (defaults to None) @type covers: int or string """ - + if isinstance(rdtype, str): rdtype = dns.rdatatype.from_text(rdtype) if isinstance(covers, str): @@ -457,7 +457,7 @@ class Zone(object): def to_file(self, f, sorted=True, relativize=True, nl=None): """Write a zone to a file. - + @param f: file or string. If I{f} is a string, it is treated as the name of a file to open. @param sorted: if True, the file will be written with the @@ -566,22 +566,21 @@ class _MasterReader(object): def _eat_line(self): while 1: - (ttype, t) = self.tok.get() - if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF: + token = self.tok.get() + if token.is_eol_or_eof(): break - + def _rr_line(self): """Process one line from a DNS master file.""" # Name if self.current_origin is None: raise UnknownOrigin token = self.tok.get(want_leading = True) - if token[0] != dns.tokenizer.WHITESPACE: - self.last_name = dns.name.from_text(token[1], self.current_origin) + if not token.is_whitespace(): + self.last_name = dns.name.from_text(token.value, self.current_origin) else: token = self.tok.get() - if token[0] == dns.tokenizer.EOL or \ - token[0] == dns.tokenizer.EOF: + if token.is_eol_or_eof(): # treat leading WS followed by EOL/EOF as if they were EOL/EOF. return self.tok.unget(token) @@ -592,21 +591,21 @@ class _MasterReader(object): if self.relativize: name = name.relativize(self.zone.origin) token = self.tok.get() - if token[0] != dns.tokenizer.IDENTIFIER: + if not token.is_identifier(): raise dns.exception.SyntaxError # TTL try: - ttl = dns.ttl.from_text(token[1]) + ttl = dns.ttl.from_text(token.value) token = self.tok.get() - if token[0] != dns.tokenizer.IDENTIFIER: + if not token.is_identifier(): raise dns.exception.SyntaxError except dns.ttl.BadTTL: ttl = self.ttl # Class try: - rdclass = dns.rdataclass.from_text(token[1]) + rdclass = dns.rdataclass.from_text(token.value) token = self.tok.get() - if token[0] != dns.tokenizer.IDENTIFIER: + if not token.is_identifier(): raise dns.exception.SyntaxError except dns.exception.SyntaxError: raise dns.exception.SyntaxError @@ -616,10 +615,10 @@ class _MasterReader(object): raise dns.exception.SyntaxError, "RR class is not zone's class" # Type try: - rdtype = dns.rdatatype.from_text(token[1]) + rdtype = dns.rdatatype.from_text(token.value) except: raise dns.exception.SyntaxError, \ - "unknown rdatatype '%s'" % token[1] + "unknown rdatatype '%s'" % token.value n = self.zone.nodes.get(name) if n is None: n = self.zone.node_factory() @@ -637,7 +636,6 @@ class _MasterReader(object): # correct, but it is correct almost all of the time. # We convert them to syntax errors so that we can emit # helpful filename:line info. - (ty, va) = sys.exc_info()[:2] raise dns.exception.SyntaxError, \ "caught exception %s: %s" % (str(ty), str(va)) @@ -657,7 +655,7 @@ class _MasterReader(object): try: while 1: token = self.tok.get(True, True) - if token[0] == dns.tokenizer.EOF: + if token.is_eof(): if not self.current_file is None: self.current_file.close() if len(self.saved_state) > 0: @@ -668,18 +666,18 @@ class _MasterReader(object): self.ttl) = self.saved_state.pop(-1) continue break - elif token[0] == dns.tokenizer.EOL: + elif token.is_eol(): continue - elif token[0] == dns.tokenizer.COMMENT: + elif token.is_comment(): self.tok.get_eol() continue - elif token[1][0] == '$': - u = token[1].upper() + elif token.value[0] == '$': + u = token.value.upper() if u == '$TTL': token = self.tok.get() - if token[0] != dns.tokenizer.IDENTIFIER: + if not token.is_identifier(): raise dns.exception.SyntaxError, "bad $TTL" - self.ttl = dns.ttl.from_text(token[1]) + self.ttl = dns.ttl.from_text(token.value) self.tok.get_eol() elif u == '$ORIGIN': self.current_origin = self.tok.get_name() @@ -688,17 +686,16 @@ class _MasterReader(object): self.zone.origin = self.current_origin elif u == '$INCLUDE' and self.allow_include: token = self.tok.get() - if token[0] != dns.tokenizer.QUOTED_STRING: + if not token.is_quoted_string(): raise dns.exception.SyntaxError, \ "bad filename in $INCLUDE" - filename = token[1] + filename = token.value token = self.tok.get() - if token[0] == dns.tokenizer.IDENTIFIER: - new_origin = dns.name.from_text(token[1], \ - self.current_origin) + if token.is_identifier(): + new_origin = dns.name.from_text(token.value, \ + self.current_origin) self.tok.get_eol() - elif token[0] != dns.tokenizer.EOL and \ - token[0] != dns.tokenizer.EOF: + elif not token.is_eol_or_eof(): raise dns.exception.SyntaxError, \ "bad origin in $INCLUDE" else: @@ -724,7 +721,7 @@ class _MasterReader(object): detail = "syntax error" raise dns.exception.SyntaxError, \ "%s:%d: %s" % (filename, line_number, detail) - + # Now that we're done reading, do some basic checking of the zone. if self.check_origin: self.zone.check_origin() @@ -819,7 +816,7 @@ def from_file(f, origin = None, rdclass = dns.rdataclass.IN, if filename is None: filename = '<file>' want_close = False - + try: z = from_text(f, origin, rdclass, relativize, zone_factory, filename, allow_include, check_origin) @@ -830,7 +827,7 @@ def from_file(f, origin = None, rdclass = dns.rdataclass.IN, def from_xfr(xfr, zone_factory=Zone, relativize=True): """Convert the output of a zone transfer generator into a zone object. - + @param xfr: The xfr generator @type xfr: generator of dns.message.Message objects @param relativize: should names be relativized? The default is True. @@ -841,7 +838,7 @@ def from_xfr(xfr, zone_factory=Zone, relativize=True): @raises dns.zone.NoNS: No NS RRset was found at the zone origin @rtype: dns.zone.Zone object """ - + z = None for r in xfr: if z is None: diff --git a/tests/tokenizer.py b/tests/tokenizer.py index 7ae981b..c5d41ac 100644 --- a/tests/tokenizer.py +++ b/tests/tokenizer.py @@ -18,112 +18,106 @@ import unittest import dns.exception import dns.tokenizer +Token = dns.tokenizer.Token + class TokenizerTestCase(unittest.TestCase): - + def testQuotedString1(self): tok = dns.tokenizer.Tokenizer(r'"foo"') - (ttype, value) = tok.get() - self.failUnless(ttype == dns.tokenizer.QUOTED_STRING and - value == 'foo') + token = tok.get() + self.failUnless(token == Token(dns.tokenizer.QUOTED_STRING, 'foo')) def testQuotedString2(self): tok = dns.tokenizer.Tokenizer(r'""') - (ttype, value) = tok.get() - self.failUnless(ttype == dns.tokenizer.QUOTED_STRING and - value == '') + token = tok.get() + self.failUnless(token == Token(dns.tokenizer.QUOTED_STRING, '')) def testQuotedString3(self): tok = dns.tokenizer.Tokenizer(r'"\"foo\""') - (ttype, value) = tok.get() - self.failUnless(ttype == dns.tokenizer.QUOTED_STRING and - value == '"foo"') + token = tok.get() + self.failUnless(token == Token(dns.tokenizer.QUOTED_STRING, '"foo"')) def testQuotedString4(self): tok = dns.tokenizer.Tokenizer(r'"foo\010bar"') - (ttype, value) = tok.get() - self.failUnless(ttype == dns.tokenizer.QUOTED_STRING and - value == 'foo\x0abar') + token = tok.get() + self.failUnless(token == Token(dns.tokenizer.QUOTED_STRING, 'foo\x0abar')) def testQuotedString5(self): def bad(): tok = dns.tokenizer.Tokenizer(r'"foo') - (ttype, value) = tok.get() + token = tok.get() self.failUnlessRaises(dns.exception.UnexpectedEnd, bad) def testQuotedString6(self): def bad(): tok = dns.tokenizer.Tokenizer(r'"foo\01') - (ttype, value) = tok.get() + token = tok.get() self.failUnlessRaises(dns.exception.SyntaxError, bad) def testQuotedString7(self): def bad(): tok = dns.tokenizer.Tokenizer('"foo\nbar"') - (ttype, value) = tok.get() + token = tok.get() self.failUnlessRaises(dns.exception.SyntaxError, bad) def testEmpty1(self): tok = dns.tokenizer.Tokenizer('') - (ttype, value) = tok.get() - self.failUnless(ttype == dns.tokenizer.EOF) + token = tok.get() + self.failUnless(token.is_eof()) def testEmpty2(self): tok = dns.tokenizer.Tokenizer('') - (ttype1, value1) = tok.get() - (ttype2, value2) = tok.get() - self.failUnless(ttype1 == dns.tokenizer.EOF and - ttype2 == dns.tokenizer.EOF) + token1 = tok.get() + token2 = tok.get() + self.failUnless(token1.is_eof() and token2.is_eof()) def testEOL(self): tok = dns.tokenizer.Tokenizer('\n') - (ttype1, value1) = tok.get() - (ttype2, value2) = tok.get() - self.failUnless(ttype1 == dns.tokenizer.EOL and - ttype2 == dns.tokenizer.EOF) + token1 = tok.get() + token2 = tok.get() + self.failUnless(token1.is_eol() and token2.is_eof()) def testWS1(self): tok = dns.tokenizer.Tokenizer(' \n') - (ttype1, value1) = tok.get() - self.failUnless(ttype1 == dns.tokenizer.EOL) + token1 = tok.get() + self.failUnless(token1.is_eol()) def testWS2(self): tok = dns.tokenizer.Tokenizer(' \n') - (ttype1, value1) = tok.get(want_leading=True) - self.failUnless(ttype1 == dns.tokenizer.WHITESPACE) + token1 = tok.get(want_leading=True) + self.failUnless(token1.is_whitespace()) def testComment1(self): tok = dns.tokenizer.Tokenizer(' ;foo\n') - (ttype1, value1) = tok.get() - self.failUnless(ttype1 == dns.tokenizer.EOL) + token1 = tok.get() + self.failUnless(token1.is_eol()) def testComment2(self): tok = dns.tokenizer.Tokenizer(' ;foo\n') - (ttype1, value1) = tok.get(want_comment = True) - (ttype2, value2) = tok.get() - self.failUnless(ttype1 == dns.tokenizer.COMMENT and - value1 == 'foo' and - ttype2 == dns.tokenizer.EOL) + token1 = tok.get(want_comment = True) + token2 = tok.get() + self.failUnless(token1 == Token(dns.tokenizer.COMMENT, 'foo') and + token2.is_eol()) def testComment3(self): tok = dns.tokenizer.Tokenizer(' ;foo bar\n') - (ttype1, value1) = tok.get(want_comment = True) - (ttype2, value2) = tok.get() - self.failUnless(ttype1 == dns.tokenizer.COMMENT and - value1 == 'foo bar' and - ttype2 == dns.tokenizer.EOL) + token1 = tok.get(want_comment = True) + token2 = tok.get() + self.failUnless(token1 == Token(dns.tokenizer.COMMENT, 'foo bar') and + token2.is_eol()) def testMultiline1(self): tok = dns.tokenizer.Tokenizer('( foo\n\n bar\n)') tokens = list(iter(tok)) - self.failUnless(tokens == [(dns.tokenizer.IDENTIFIER, 'foo'), - (dns.tokenizer.IDENTIFIER, 'bar')]) + self.failUnless(tokens == [Token(dns.tokenizer.IDENTIFIER, 'foo'), + Token(dns.tokenizer.IDENTIFIER, 'bar')]) def testMultiline2(self): tok = dns.tokenizer.Tokenizer('( foo\n\n bar\n)\n') tokens = list(iter(tok)) - self.failUnless(tokens == [(dns.tokenizer.IDENTIFIER, 'foo'), - (dns.tokenizer.IDENTIFIER, 'bar'), - (dns.tokenizer.EOL, '\n')]) + self.failUnless(tokens == [Token(dns.tokenizer.IDENTIFIER, 'foo'), + Token(dns.tokenizer.IDENTIFIER, 'bar'), + Token(dns.tokenizer.EOL, '\n')]) def testMultiline3(self): def bad(): tok = dns.tokenizer.Tokenizer('foo)') @@ -141,7 +135,8 @@ class TokenizerTestCase(unittest.TestCase): t1 = tok.get() tok.unget(t1) t2 = tok.get() - self.failUnless(t1 == t2 and t1 == (dns.tokenizer.IDENTIFIER, 'foo')) + self.failUnless(t1 == t2 and t1.ttype == dns.tokenizer.IDENTIFIER and \ + t1.value == 'foo') def testUnget2(self): def bad(): @@ -164,12 +159,12 @@ class TokenizerTestCase(unittest.TestCase): def testEscapedDelimiter1(self): tok = dns.tokenizer.Tokenizer(r'ch\ ld') t = tok.get() - self.failUnless(t == (dns.tokenizer.IDENTIFIER, r'ch ld')) + self.failUnless(t.ttype == dns.tokenizer.IDENTIFIER and t.value == r'ch ld') def testEscapedDelimiter2(self): tok = dns.tokenizer.Tokenizer(r'ch\0ld') t = tok.get() - self.failUnless(t == (dns.tokenizer.IDENTIFIER, r'ch\0ld')) + self.failUnless(t.ttype == dns.tokenizer.IDENTIFIER and t.value == r'ch\0ld') if __name__ == '__main__': unittest.main() |