summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBob Halley <halley@nominum.com>2010-01-12 13:31:58 -0800
committerBob Halley <halley@nominum.com>2010-01-12 13:31:58 -0800
commit45f795046415a44d41b203e2ba10b72a2abcf610 (patch)
tree8a0c5145bdfda87b2cc739bdaeddb450a616fbaf
parent7f397a5a53b65a5c434c6686318c501f8548cc18 (diff)
downloaddnspython-45f795046415a44d41b203e2ba10b72a2abcf610.tar.gz
make a proper token object to facilitate future tokenization work
-rw-r--r--dns/message.py47
-rw-r--r--dns/rdata.py10
-rw-r--r--dns/rdtypes/ANY/CERT.py16
-rw-r--r--dns/rdtypes/ANY/HIP.py6
-rw-r--r--dns/rdtypes/ANY/ISDN.py10
-rw-r--r--dns/rdtypes/ANY/LOC.py39
-rw-r--r--dns/rdtypes/ANY/NSEC.py16
-rw-r--r--dns/rdtypes/ANY/NSEC3.py6
-rw-r--r--dns/rdtypes/ANY/NXT.py10
-rw-r--r--dns/rdtypes/IN/A.py14
-rw-r--r--dns/rdtypes/IN/AAAA.py14
-rw-r--r--dns/rdtypes/IN/APL.py19
-rw-r--r--dns/rdtypes/IN/DHCID.py14
-rw-r--r--dns/rdtypes/IN/IPSECKEY.py6
-rw-r--r--dns/rdtypes/IN/WKS.py18
-rw-r--r--dns/rdtypes/dsbase.py6
-rw-r--r--dns/rdtypes/keybase.py16
-rw-r--r--dns/rdtypes/sigbase.py18
-rw-r--r--dns/rdtypes/txtbase.py19
-rw-r--r--dns/tokenizer.py192
-rw-r--r--dns/zone.py97
-rw-r--r--tests/tokenizer.py95
22 files changed, 375 insertions, 313 deletions
diff --git a/dns/message.py b/dns/message.py
index 20769d4..af6bb46 100644
--- a/dns/message.py
+++ b/dns/message.py
@@ -803,17 +803,18 @@ class _TextReader(object):
def _header_line(self, section):
"""Process one line from the text format header section."""
- (ttype, what) = self.tok.get()
+ token = self.tok.get()
+ what = token.value
if what == 'id':
self.message.id = self.tok.get_int()
elif what == 'flags':
while True:
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
self.tok.unget(token)
break
self.message.flags = self.message.flags | \
- dns.flags.from_text(token[1])
+ dns.flags.from_text(token.value)
if dns.opcode.is_update(self.message.flags):
self.updating = True
elif what == 'edns':
@@ -825,11 +826,11 @@ class _TextReader(object):
self.message.edns = 0
while True:
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
self.tok.unget(token)
break
self.message.ednsflags = self.message.ednsflags | \
- dns.flags.edns_from_text(token[1])
+ dns.flags.edns_from_text(token.value)
elif what == 'payload':
self.message.payload = self.tok.get_int()
if self.message.edns < 0:
@@ -849,24 +850,24 @@ class _TextReader(object):
"""Process one line from the text format question section."""
token = self.tok.get(want_leading = True)
- if token[0] != dns.tokenizer.WHITESPACE:
- self.last_name = dns.name.from_text(token[1], None)
+ if not token.is_whitespace():
+ self.last_name = dns.name.from_text(token.value, None)
name = self.last_name
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
# Class
try:
- rdclass = dns.rdataclass.from_text(token[1])
+ rdclass = dns.rdataclass.from_text(token.value)
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
except dns.exception.SyntaxError:
raise dns.exception.SyntaxError
except:
rdclass = dns.rdataclass.IN
# Type
- rdtype = dns.rdatatype.from_text(token[1])
+ rdtype = dns.rdatatype.from_text(token.value)
self.message.find_rrset(self.message.question, name,
rdclass, rdtype, create=True,
force_unique=True)
@@ -882,17 +883,17 @@ class _TextReader(object):
deleting = None
# Name
token = self.tok.get(want_leading = True)
- if token[0] != dns.tokenizer.WHITESPACE:
- self.last_name = dns.name.from_text(token[1], None)
+ if not token.is_whitespace():
+ self.last_name = dns.name.from_text(token.value, None)
name = self.last_name
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
# TTL
try:
- ttl = int(token[1], 0)
+ ttl = int(token.value, 0)
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
except dns.exception.SyntaxError:
raise dns.exception.SyntaxError
@@ -900,9 +901,9 @@ class _TextReader(object):
ttl = 0
# Class
try:
- rdclass = dns.rdataclass.from_text(token[1])
+ rdclass = dns.rdataclass.from_text(token.value)
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
if rdclass == dns.rdataclass.ANY or rdclass == dns.rdataclass.NONE:
deleting = rdclass
@@ -912,9 +913,9 @@ class _TextReader(object):
except:
rdclass = dns.rdataclass.IN
# Type
- rdtype = dns.rdatatype.from_text(token[1])
+ rdtype = dns.rdatatype.from_text(token.value)
token = self.tok.get()
- if token[0] != dns.tokenizer.EOL and token[0] != dns.tokenizer.EOF:
+ if not token.is_eol_or_eof():
self.tok.unget(token)
rd = dns.rdata.from_text(rdclass, rdtype, self.tok, None)
covers = rd.covers()
@@ -935,10 +936,10 @@ class _TextReader(object):
section = None
while 1:
token = self.tok.get(True, True)
- if token[0] == dns.tokenizer.EOL or token[0] == dns.tokenizer.EOF:
+ if token.is_eol_or_eof():
break
- if token[0] == dns.tokenizer.COMMENT:
- u = token[1].upper()
+ if token.is_comment():
+ u = token.value.upper()
if u == 'HEADER':
line_method = self._header_line
elif u == 'QUESTION' or u == 'ZONE':
diff --git a/dns/rdata.py b/dns/rdata.py
index f371fa7..3d1fd99 100644
--- a/dns/rdata.py
+++ b/dns/rdata.py
@@ -325,10 +325,10 @@ class GenericRdata(Rdata):
length = tok.get_int()
chunks = []
while 1:
- (ttype, value) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- chunks.append(value)
+ chunks.append(token.value)
hex = ''.join(chunks)
data = hex.decode('hex_codec')
if len(data) != length:
@@ -415,8 +415,8 @@ def from_text(rdclass, rdtype, tok, origin = None, relativize = True):
# peek at first token
token = tok.get()
tok.unget(token)
- if token[0] == dns.tokenizer.IDENTIFIER and \
- token[1] == r'\#':
+ if token.is_identifier and \
+ token.value == r'\#':
#
# Known type using the generic syntax. Extract the
# wire form from the generic syntax, and then run
diff --git a/dns/rdtypes/ANY/CERT.py b/dns/rdtypes/ANY/CERT.py
index f76e7ac..225a921 100644
--- a/dns/rdtypes/ANY/CERT.py
+++ b/dns/rdtypes/ANY/CERT.py
@@ -63,7 +63,7 @@ class CERT(dns.rdata.Rdata):
@see: RFC 2538"""
__slots__ = ['certificate_type', 'key_tag', 'algorithm', 'certificate']
-
+
def __init__(self, rdclass, rdtype, certificate_type, key_tag, algorithm,
certificate):
super(CERT, self).__init__(rdclass, rdtype)
@@ -77,7 +77,7 @@ class CERT(dns.rdata.Rdata):
return "%s %d %s %s" % (certificate_type, self.key_tag,
dns.dnssec.algorithm_to_text(self.algorithm),
dns.rdata._base64ify(self.certificate))
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
certificate_type = _ctype_from_text(tok.get_string())
key_tag = tok.get_uint16()
@@ -87,16 +87,16 @@ class CERT(dns.rdata.Rdata):
chunks = []
while 1:
t = tok.get()
- if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF:
+ if t.is_eol_or_eof():
break
- if t[0] != dns.tokenizer.IDENTIFIER:
+ if not t.is_identifier():
raise dns.exception.SyntaxError
- chunks.append(t[1])
+ chunks.append(t.value)
b64 = ''.join(chunks)
certificate = b64.decode('base64_codec')
return cls(rdclass, rdtype, certificate_type, key_tag,
algorithm, certificate)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
@@ -104,7 +104,7 @@ class CERT(dns.rdata.Rdata):
self.algorithm)
file.write(prefix)
file.write(self.certificate)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
prefix = wire[current : current + 5]
current += 5
@@ -127,5 +127,5 @@ class CERT(dns.rdata.Rdata):
other.to_wire(f)
wire2 = f.getvalue()
f.close()
-
+
return cmp(wire1, wire2)
diff --git a/dns/rdtypes/ANY/HIP.py b/dns/rdtypes/ANY/HIP.py
index 011ec43..53a2c6c 100644
--- a/dns/rdtypes/ANY/HIP.py
+++ b/dns/rdtypes/ANY/HIP.py
@@ -62,10 +62,10 @@ class HIP(dns.rdata.Rdata):
key = tok.get_string().decode('base64-codec')
servers = []
while 1:
- (ttype, value) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- server = dns.name.from_text(value, origin)
+ server = dns.name.from_text(token.value, origin)
server.choose_relativity(origin, relativize)
servers.append(server)
return cls(rdclass, rdtype, hit, algorithm, key, servers)
diff --git a/dns/rdtypes/ANY/ISDN.py b/dns/rdtypes/ANY/ISDN.py
index 6006ba0..f33b5ad 100644
--- a/dns/rdtypes/ANY/ISDN.py
+++ b/dns/rdtypes/ANY/ISDN.py
@@ -27,7 +27,7 @@ class ISDN(dns.rdata.Rdata):
@see: RFC 1183"""
__slots__ = ['address', 'subaddress']
-
+
def __init__(self, rdclass, rdtype, address, subaddress):
super(ISDN, self).__init__(rdclass, rdtype)
self.address = address
@@ -39,11 +39,11 @@ class ISDN(dns.rdata.Rdata):
dns.rdata._escapify(self.subaddress))
else:
return '"%s"' % dns.rdata._escapify(self.address)
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
address = tok.get_string()
t = tok.get()
- if t[0] != dns.tokenizer.EOL and t[0] != dns.tokenizer.EOF:
+ if not t.is_eol_or_eof():
tok.unget(t)
subaddress = tok.get_string()
else:
@@ -51,7 +51,7 @@ class ISDN(dns.rdata.Rdata):
subaddress = ''
tok.get_eol()
return cls(rdclass, rdtype, address, subaddress)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
@@ -66,7 +66,7 @@ class ISDN(dns.rdata.Rdata):
byte = chr(l)
file.write(byte)
file.write(self.subaddress)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
l = ord(wire[current])
current += 1
diff --git a/dns/rdtypes/ANY/LOC.py b/dns/rdtypes/ANY/LOC.py
index 733e966..7fe1a80 100644
--- a/dns/rdtypes/ANY/LOC.py
+++ b/dns/rdtypes/ANY/LOC.py
@@ -96,7 +96,7 @@ class LOC(dns.rdata.Rdata):
__slots__ = ['latitude', 'longitude', 'altitude', 'size',
'horizontal_precision', 'vertical_precision']
-
+
def __init__(self, rdclass, rdtype, latitude, longitude, altitude,
size=1.0, hprec=10000.0, vprec=10.0):
"""Initialize a LOC record instance.
@@ -105,7 +105,7 @@ class LOC(dns.rdata.Rdata):
of integers specifying (degrees, minutes, seconds, milliseconds),
or they may be floating point values specifying the number of
degrees. The other parameters are floats."""
-
+
super(LOC, self).__init__(rdclass, rdtype)
if isinstance(latitude, int) or isinstance(latitude, long):
latitude = float(latitude)
@@ -148,14 +148,14 @@ class LOC(dns.rdata.Rdata):
self.vertical_precision / 100.0
)
return text
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
latitude = [0, 0, 0, 0]
longitude = [0, 0, 0, 0]
size = 1.0
hprec = 10000.0
vprec = 10.0
-
+
latitude[0] = tok.get_int()
t = tok.get_string()
if t.isdigit():
@@ -229,30 +229,29 @@ class LOC(dns.rdata.Rdata):
t = t[0 : -1]
altitude = float(t) * 100.0 # m -> cm
- (ttype, value) = tok.get()
- if ttype != dns.tokenizer.EOL and ttype != dns.tokenizer.EOF:
+ token = tok.get()
+ if not token.is_eol_or_eof():
+ value = token.value
if value[-1] == 'm':
value = value[0 : -1]
size = float(value) * 100.0 # m -> cm
- (ttype, value) = tok.get()
- if ttype != dns.tokenizer.EOL and ttype != dns.tokenizer.EOF:
+ token = tok.get()
+ if not token.is_eol_or_eof():
+ value = token.value
if value[-1] == 'm':
value = value[0 : -1]
hprec = float(value) * 100.0 # m -> cm
- (ttype, value) = tok.get()
- if ttype != dns.tokenizer.EOL and ttype != dns.tokenizer.EOF:
+ token = tok.get()
+ if not token.is_eol_or_eof():
+ value = token.value
if value[-1] == 'm':
value = value[0 : -1]
vprec = float(value) * 100.0 # m -> cm
- (ttype, value) = tok.get()
- if ttype != dns.tokenizer.EOL and \
- ttype != dns.tokenizer.EOF:
- raise dns.exception.SyntaxError, \
- "expected EOL or EOF"
+ tok.get_eol()
return cls(rdclass, rdtype, latitude, longitude, altitude,
size, hprec, vprec)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
@@ -262,7 +261,7 @@ class LOC(dns.rdata.Rdata):
else:
sign = 1
degrees = long(self.latitude[0])
- milliseconds = (degrees * 3600000 +
+ milliseconds = (degrees * 3600000 +
self.latitude[1] * 60000 +
self.latitude[2] * 1000 +
self.latitude[3]) * sign
@@ -273,7 +272,7 @@ class LOC(dns.rdata.Rdata):
else:
sign = 1
degrees = long(self.longitude[0])
- milliseconds = (degrees * 3600000 +
+ milliseconds = (degrees * 3600000 +
self.longitude[1] * 60000 +
self.longitude[2] * 1000 +
self.longitude[3]) * sign
@@ -285,7 +284,7 @@ class LOC(dns.rdata.Rdata):
wire = struct.pack("!BBBBIII", 0, size, hprec, vprec, latitude,
longitude, altitude)
file.write(wire)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
(version, size, hprec, vprec, latitude, longitude, altitude) = \
struct.unpack("!BBBBIII", wire[current : current + rdlen])
@@ -319,7 +318,7 @@ class LOC(dns.rdata.Rdata):
other.to_wire(f)
wire2 = f.getvalue()
f.close()
-
+
return cmp(wire1, wire2)
def _get_float_latitude(self):
diff --git a/dns/rdtypes/ANY/NSEC.py b/dns/rdtypes/ANY/NSEC.py
index 2d8b367..ee9a009 100644
--- a/dns/rdtypes/ANY/NSEC.py
+++ b/dns/rdtypes/ANY/NSEC.py
@@ -29,7 +29,7 @@ class NSEC(dns.rdata.Rdata):
@type windows: list of (window number, string) tuples"""
__slots__ = ['next', 'windows']
-
+
def __init__(self, rdclass, rdtype, next, windows):
super(NSEC, self).__init__(rdclass, rdtype)
self.next = next
@@ -48,16 +48,16 @@ class NSEC(dns.rdata.Rdata):
i * 8 + j))
text += (' ' + ' '.join(bits))
return '%s%s' % (next, text)
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
next = tok.get_name()
next = next.choose_relativity(origin, relativize)
rdtypes = []
while 1:
- (ttype, value) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- nrdtype = dns.rdatatype.from_text(value)
+ nrdtype = dns.rdatatype.from_text(token.value)
if nrdtype == 0:
raise dns.exception.SyntaxError, "NSEC with bit 0"
if nrdtype > 65535:
@@ -85,7 +85,7 @@ class NSEC(dns.rdata.Rdata):
bitmap[byte] = chr(ord(bitmap[byte]) | (0x80 >> bit))
windows.append((window, ''.join(bitmap[0:octets])))
return cls(rdclass, rdtype, next, windows)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
@@ -94,7 +94,7 @@ class NSEC(dns.rdata.Rdata):
file.write(chr(window))
file.write(chr(len(bitmap)))
file.write(bitmap)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
(next, cused) = dns.name.from_wire(wire[: current + rdlen], current)
current += cused
@@ -123,7 +123,7 @@ class NSEC(dns.rdata.Rdata):
def choose_relativity(self, origin = None, relativize = True):
self.next = self.next.choose_relativity(origin, relativize)
-
+
def _cmp(self, other):
v = cmp(self.next, other.next)
if v == 0:
diff --git a/dns/rdtypes/ANY/NSEC3.py b/dns/rdtypes/ANY/NSEC3.py
index e568d7f..1d1782f 100644
--- a/dns/rdtypes/ANY/NSEC3.py
+++ b/dns/rdtypes/ANY/NSEC3.py
@@ -93,10 +93,10 @@ class NSEC3(dns.rdata.Rdata):
next = base64.b32decode(next)
rdtypes = []
while 1:
- (ttype, value) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- nrdtype = dns.rdatatype.from_text(value)
+ nrdtype = dns.rdatatype.from_text(token.value)
if nrdtype == 0:
raise dns.exception.SyntaxError, "NSEC3 with bit 0"
if nrdtype > 65535:
diff --git a/dns/rdtypes/ANY/NXT.py b/dns/rdtypes/ANY/NXT.py
index 4b2571c..10443a3 100644
--- a/dns/rdtypes/ANY/NXT.py
+++ b/dns/rdtypes/ANY/NXT.py
@@ -53,13 +53,13 @@ class NXT(dns.rdata.Rdata):
'\x00', '\x00', '\x00', '\x00',
'\x00', '\x00', '\x00', '\x00' ]
while 1:
- (ttype, value) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- if value.isdigit():
- nrdtype = int(value)
+ if token.value.isdigit():
+ nrdtype = int(token.value)
else:
- nrdtype = dns.rdatatype.from_text(value)
+ nrdtype = dns.rdatatype.from_text(token.value)
if nrdtype == 0:
raise dns.exception.SyntaxError, "NXT with bit 0"
if nrdtype > 127:
diff --git a/dns/rdtypes/IN/A.py b/dns/rdtypes/IN/A.py
index 098a275..71654f2 100644
--- a/dns/rdtypes/IN/A.py
+++ b/dns/rdtypes/IN/A.py
@@ -25,7 +25,7 @@ class A(dns.rdata.Rdata):
@type address: string (in the standard "dotted quad" format)"""
__slots__ = ['address']
-
+
def __init__(self, rdclass, rdtype, address):
super(A, self).__init__(rdclass, rdtype)
# check that it's OK
@@ -34,19 +34,17 @@ class A(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
return self.address
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
- (ttype, address) = tok.get()
- if ttype != dns.tokenizer.IDENTIFIER:
- raise dns.exception.SyntaxError
- t = tok.get_eol()
+ address = tok.get_identifier()
+ tok.get_eol()
return cls(rdclass, rdtype, address)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
file.write(dns.ipv4.inet_aton(self.address))
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
address = dns.ipv4.inet_ntoa(wire[current : current + rdlen])
return cls(rdclass, rdtype, address)
diff --git a/dns/rdtypes/IN/AAAA.py b/dns/rdtypes/IN/AAAA.py
index 086128e..dfa2606 100644
--- a/dns/rdtypes/IN/AAAA.py
+++ b/dns/rdtypes/IN/AAAA.py
@@ -25,7 +25,7 @@ class AAAA(dns.rdata.Rdata):
@type address: string (in the standard IPv6 format)"""
__slots__ = ['address']
-
+
def __init__(self, rdclass, rdtype, address):
super(AAAA, self).__init__(rdclass, rdtype)
# check that it's OK
@@ -34,19 +34,17 @@ class AAAA(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
return self.address
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
- (ttype, address) = tok.get()
- if ttype != dns.tokenizer.IDENTIFIER:
- raise dns.exception.SyntaxError
- t = tok.get_eol()
+ address = tok.get_identifier()
+ tok.get_eol()
return cls(rdclass, rdtype, address)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
file.write(dns.inet.inet_pton(dns.inet.AF_INET6, self.address))
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
address = dns.inet.inet_ntop(dns.inet.AF_INET6,
wire[current : current + rdlen])
diff --git a/dns/rdtypes/IN/APL.py b/dns/rdtypes/IN/APL.py
index 3cefb25..21c02d5 100644
--- a/dns/rdtypes/IN/APL.py
+++ b/dns/rdtypes/IN/APL.py
@@ -23,7 +23,7 @@ import dns.tokenizer
class APLItem(object):
"""An APL list item.
-
+
@ivar family: the address family (IANA address family registry)
@type family: int
@ivar negation: is this item negated?
@@ -35,7 +35,7 @@ class APLItem(object):
"""
__slots__ = ['family', 'negation', 'address', 'prefix']
-
+
def __init__(self, family, negation, address, prefix):
self.family = family
self.negation = negation
@@ -80,20 +80,21 @@ class APL(dns.rdata.Rdata):
@see: RFC 3123"""
__slots__ = ['items']
-
+
def __init__(self, rdclass, rdtype, items):
super(APL, self).__init__(rdclass, rdtype)
self.items = items
def to_text(self, origin=None, relativize=True, **kw):
return ' '.join(map(lambda x: str(x), self.items))
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
items = []
while 1:
- (ttype, item) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
+ item = token.value
if item[0] == '!':
negation = True
item = item[1:]
@@ -107,13 +108,13 @@ class APL(dns.rdata.Rdata):
items.append(item)
return cls(rdclass, rdtype, items)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
for item in self.items:
item.to_wire(file)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
items = []
while 1:
@@ -165,5 +166,5 @@ class APL(dns.rdata.Rdata):
other.to_wire(f)
wire2 = f.getvalue()
f.close()
-
+
return cmp(wire1, wire2)
diff --git a/dns/rdtypes/IN/DHCID.py b/dns/rdtypes/IN/DHCID.py
index 676e609..934d478 100644
--- a/dns/rdtypes/IN/DHCID.py
+++ b/dns/rdtypes/IN/DHCID.py
@@ -24,7 +24,7 @@ class DHCID(dns.rdata.Rdata):
@see: RFC 4701"""
__slots__ = ['data']
-
+
def __init__(self, rdclass, rdtype, data):
super(DHCID, self).__init__(rdclass, rdtype)
self.data = data
@@ -36,20 +36,20 @@ class DHCID(dns.rdata.Rdata):
chunks = []
while 1:
t = tok.get()
- if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF:
+ if t.is_eol_or_eof():
break
- if t[0] != dns.tokenizer.IDENTIFIER:
+ if not t.is_identifier():
raise dns.exception.SyntaxError
- chunks.append(t[1])
+ chunks.append(t.value)
b64 = ''.join(chunks)
data = b64.decode('base64_codec')
return cls(rdclass, rdtype, data)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
file.write(self.data)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
data = wire[current : current + rdlen]
return cls(rdclass, rdtype, data)
@@ -57,4 +57,4 @@ class DHCID(dns.rdata.Rdata):
from_wire = classmethod(from_wire)
def _cmp(self, other):
- return cmp(self.data, other.data)
+ return cmp(self.data, other.data)
diff --git a/dns/rdtypes/IN/IPSECKEY.py b/dns/rdtypes/IN/IPSECKEY.py
index 4aba53b..15eb546 100644
--- a/dns/rdtypes/IN/IPSECKEY.py
+++ b/dns/rdtypes/IN/IPSECKEY.py
@@ -87,11 +87,11 @@ class IPSECKEY(dns.rdata.Rdata):
chunks = []
while 1:
t = tok.get()
- if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF:
+ if t.is_eol_or_eof():
break
- if t[0] != dns.tokenizer.IDENTIFIER:
+ if not t.is_identifier():
raise dns.exception.SyntaxError
- chunks.append(t[1])
+ chunks.append(t.value)
b64 = ''.join(chunks)
key = b64.decode('base64_codec')
return cls(rdclass, rdtype, precedence, gateway_type, algorithm,
diff --git a/dns/rdtypes/IN/WKS.py b/dns/rdtypes/IN/WKS.py
index 9c15840..4fdded6 100644
--- a/dns/rdtypes/IN/WKS.py
+++ b/dns/rdtypes/IN/WKS.py
@@ -34,7 +34,7 @@ class WKS(dns.rdata.Rdata):
@see: RFC 1035"""
__slots__ = ['address', 'protocol', 'bitmap']
-
+
def __init__(self, rdclass, rdtype, address, protocol, bitmap):
super(WKS, self).__init__(rdclass, rdtype)
self.address = address
@@ -50,7 +50,7 @@ class WKS(dns.rdata.Rdata):
bits.append(str(i * 8 + j))
text = ' '.join(bits)
return '%s %d %s' % (self.address, self.protocol, text)
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
address = tok.get_string()
protocol = tok.get_string()
@@ -60,11 +60,11 @@ class WKS(dns.rdata.Rdata):
protocol = socket.getprotobyname(protocol)
bitmap = []
while 1:
- (ttype, value) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- if value.isdigit():
- serv = int(value)
+ if token.value.isdigit():
+ serv = int(token.value)
else:
if protocol != _proto_udp and protocol != _proto_tcp:
raise NotImplementedError, "protocol must be TCP or UDP"
@@ -72,7 +72,7 @@ class WKS(dns.rdata.Rdata):
protocol_text = "udp"
else:
protocol_text = "tcp"
- serv = socket.getservbyname(value, protocol_text)
+ serv = socket.getservbyname(token.value, protocol_text)
i = serv // 8
l = len(bitmap)
if l < i + 1:
@@ -81,7 +81,7 @@ class WKS(dns.rdata.Rdata):
bitmap[i] = chr(ord(bitmap[i]) | (0x80 >> (serv % 8)))
bitmap = dns.rdata._truncate_bitmap(bitmap)
return cls(rdclass, rdtype, address, protocol, bitmap)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
@@ -89,7 +89,7 @@ class WKS(dns.rdata.Rdata):
protocol = struct.pack('!B', self.protocol)
file.write(protocol)
file.write(self.bitmap)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
address = dns.ipv4.inet_ntoa(wire[current : current + 4])
protocol, = struct.unpack('!B', wire[current + 4 : current + 5])
diff --git a/dns/rdtypes/dsbase.py b/dns/rdtypes/dsbase.py
index 6ba2123..09b86cc 100644
--- a/dns/rdtypes/dsbase.py
+++ b/dns/rdtypes/dsbase.py
@@ -54,11 +54,11 @@ class DSBase(dns.rdata.Rdata):
chunks = []
while 1:
t = tok.get()
- if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF:
+ if t.is_eol_or_eof():
break
- if t[0] != dns.tokenizer.IDENTIFIER:
+ if not t.is_identifier():
raise dns.exception.SyntaxError
- chunks.append(t[1])
+ chunks.append(t.value)
digest = ''.join(chunks)
digest = digest.decode('hex_codec')
return cls(rdclass, rdtype, key_tag, algorithm, digest_type,
diff --git a/dns/rdtypes/keybase.py b/dns/rdtypes/keybase.py
index 763bd1e..52114f6 100644
--- a/dns/rdtypes/keybase.py
+++ b/dns/rdtypes/keybase.py
@@ -61,7 +61,7 @@ _protocol_from_text = {
'IPSEC' : 4,
'ALL' : 255,
}
-
+
class KEYBase(dns.rdata.Rdata):
"""KEY-like record base
@@ -75,7 +75,7 @@ class KEYBase(dns.rdata.Rdata):
@type key: string"""
__slots__ = ['flags', 'protocol', 'algorithm', 'key']
-
+
def __init__(self, rdclass, rdtype, flags, protocol, algorithm, key):
super(KEYBase, self).__init__(rdclass, rdtype)
self.flags = flags
@@ -108,27 +108,27 @@ class KEYBase(dns.rdata.Rdata):
if protocol is None:
raise dns.exception.SyntaxError, \
'unknown protocol %s' % protocol
-
+
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
chunks = []
while 1:
t = tok.get()
- if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF:
+ if t.is_eol_or_eof():
break
- if t[0] != dns.tokenizer.IDENTIFIER:
+ if not t.is_identifier():
raise dns.exception.SyntaxError
- chunks.append(t[1])
+ chunks.append(t.value)
b64 = ''.join(chunks)
key = b64.decode('base64_codec')
return cls(rdclass, rdtype, flags, protocol, algorithm, key)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
header = struct.pack("!HBB", self.flags, self.protocol, self.algorithm)
file.write(header)
file.write(self.key)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
if rdlen < 4:
raise dns.exception.FormError
diff --git a/dns/rdtypes/sigbase.py b/dns/rdtypes/sigbase.py
index b20e7da..e035bff 100644
--- a/dns/rdtypes/sigbase.py
+++ b/dns/rdtypes/sigbase.py
@@ -40,7 +40,7 @@ def sigtime_to_posixtime(what):
def posixtime_to_sigtime(what):
return time.strftime('%Y%m%d%H%M%S', time.gmtime(what))
-
+
class SIGBase(dns.rdata.Rdata):
"""SIG-like record base
@@ -66,7 +66,7 @@ class SIGBase(dns.rdata.Rdata):
__slots__ = ['type_covered', 'algorithm', 'labels', 'original_ttl',
'expiration', 'inception', 'key_tag', 'signer',
'signature']
-
+
def __init__(self, rdclass, rdtype, type_covered, algorithm, labels,
original_ttl, expiration, inception, key_tag, signer,
signature):
@@ -83,7 +83,7 @@ class SIGBase(dns.rdata.Rdata):
def covers(self):
return self.type_covered
-
+
def to_text(self, origin=None, relativize=True, **kw):
return '%s %d %d %d %s %s %d %s %s' % (
dns.rdatatype.to_text(self.type_covered),
@@ -110,17 +110,17 @@ class SIGBase(dns.rdata.Rdata):
chunks = []
while 1:
t = tok.get()
- if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF:
+ if t.is_eol_or_eof():
break
- if t[0] != dns.tokenizer.IDENTIFIER:
+ if not t.is_identifier():
raise dns.exception.SyntaxError
- chunks.append(t[1])
+ chunks.append(t.value)
b64 = ''.join(chunks)
signature = b64.decode('base64_codec')
return cls(rdclass, rdtype, type_covered, algorithm, labels,
original_ttl, expiration, inception, key_tag, signer,
signature)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
@@ -131,7 +131,7 @@ class SIGBase(dns.rdata.Rdata):
file.write(header)
self.signer.to_wire(file, None, origin)
file.write(self.signature)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
header = struct.unpack('!HBBIIIH', wire[current : current + 18])
current += 18
@@ -150,7 +150,7 @@ class SIGBase(dns.rdata.Rdata):
def choose_relativity(self, origin = None, relativize = True):
self.signer = self.signer.choose_relativity(origin, relativize)
-
+
def _cmp(self, other):
hs = struct.pack('!HBBIIIH', self.type_covered,
self.algorithm, self.labels,
diff --git a/dns/rdtypes/txtbase.py b/dns/rdtypes/txtbase.py
index 400a0e5..b3dd086 100644
--- a/dns/rdtypes/txtbase.py
+++ b/dns/rdtypes/txtbase.py
@@ -27,7 +27,7 @@ class TXTBase(dns.rdata.Rdata):
@see: RFC 1035"""
__slots__ = ['strings']
-
+
def __init__(self, rdclass, rdtype, strings):
super(TXTBase, self).__init__(rdclass, rdtype)
if isinstance(strings, str):
@@ -41,23 +41,22 @@ class TXTBase(dns.rdata.Rdata):
txt += '%s"%s"' % (prefix, dns.rdata._escapify(s))
prefix = ' '
return txt
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
strings = []
while 1:
- (ttype, s) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- if ttype != dns.tokenizer.QUOTED_STRING and \
- ttype != dns.tokenizer.IDENTIFIER:
+ if not (token.is_quoted_string() or token.is_identifier()):
raise dns.exception.SyntaxError, "expected a string"
- if len(s) > 255:
+ if len(token.value) > 255:
raise dns.exception.SyntaxError, "string too long"
- strings.append(s)
+ strings.append(token.value)
if len(strings) == 0:
raise dns.exception.UnexpectedEnd
return cls(rdclass, rdtype, strings)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
@@ -67,7 +66,7 @@ class TXTBase(dns.rdata.Rdata):
byte = chr(l)
file.write(byte)
file.write(s)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
strings = []
while rdlen > 0:
diff --git a/dns/tokenizer.py b/dns/tokenizer.py
index a2067d2..cc4566e 100644
--- a/dns/tokenizer.py
+++ b/dns/tokenizer.py
@@ -45,14 +45,76 @@ class UngetBufferFull(dns.exception.DNSException):
"""Raised when an attempt is made to unget a token when the unget
buffer is full."""
pass
-
+
+class Token(object):
+ """A DNS master file format token.
+
+ @ivar ttype: The token type
+ @type ttype: int
+ @ivar value: The token value
+ @type value: string
+ """
+
+ def __init__(self, ttype, value='', has_escape=False):
+ """Initialize a token instance.
+
+ @param ttype: The token type
+ @type ttype: int
+ @ivar value: The token value
+ @type value: string
+ @ivar has_escape: Does the token value contain escapes?
+ @type has_escape: bool
+ """
+ self.ttype = ttype
+ self.value = value
+ self.has_escape = has_escape
+
+ def is_eof(self):
+ return self.ttype == EOF
+
+ def is_eol(self):
+ return self.ttype == EOL
+
+ def is_whitespace(self):
+ return self.ttype == WHITESPACE
+
+ def is_identifier(self):
+ return self.ttype == IDENTIFIER
+
+ def is_quoted_string(self):
+ return self.ttype == QUOTED_STRING
+
+ def is_comment(self):
+ return self.ttype == COMMENT
+
+ def is_delimiter(self):
+ return self.ttype == DELIMITER
+
+ def is_eol_or_eof(self):
+ return (self.ttype == EOL or self.ttype == EOF)
+
+ def __eq__(self, other):
+ if not isinstance(other, Token):
+ return False
+ return (self.ttype == other.ttype and
+ self.value == other.value)
+
+ def __ne__(self, other):
+ if not isinstance(other, Token):
+ return True
+ return (self.ttype != other.ttype or
+ self.value != other.value)
+
+ def __str__(self):
+ return '%d "%s"' % (self.ttype, self.value)
+
class Tokenizer(object):
"""A DNS master file format tokenizer.
A token is a (type, value) tuple, where I{type} is an int, and
I{value} is a string. The valid types are EOF, EOL, WHITESPACE,
IDENTIFIER, QUOTED_STRING, COMMENT, and DELIMITER.
-
+
@ivar file: The file to tokenize
@type file: file
@ivar ungotten_char: The most recently ungotten character, or None.
@@ -75,7 +137,7 @@ class Tokenizer(object):
@ivar filename: A filename that will be returned by the L{where} method.
@type filename: string
"""
-
+
def __init__(self, f=sys.stdin, filename=None):
"""Initialize a tokenizer instance.
@@ -87,7 +149,7 @@ class Tokenizer(object):
will return.
@type filename: string
"""
-
+
if isinstance(f, str):
f = cStringIO.StringIO(f)
if filename is None:
@@ -112,7 +174,7 @@ class Tokenizer(object):
"""Read a character from input.
@rtype: string
"""
-
+
if self.ungotten_char is None:
if self.eof:
c = ''
@@ -133,21 +195,21 @@ class Tokenizer(object):
@rtype: (string, int) tuple. The first item is the filename of
the input, the second is the current line number.
"""
-
+
return (self.filename, self.line_number)
-
+
def _unget_char(self, c):
"""Unget a character.
The unget buffer for characters is only one character large; it is
an error to try to unget a character when the unget buffer is not
empty.
-
+
@param c: the character to unget
@type c: string
@raises UngetBufferFull: there is already an ungotten char
"""
-
+
if not self.ungotten_char is None:
raise UngetBufferFull
self.ungotten_char = c
@@ -162,7 +224,7 @@ class Tokenizer(object):
@rtype: int
"""
-
+
skipped = 0
while True:
c = self._get_char()
@@ -181,25 +243,25 @@ class Tokenizer(object):
@param want_comment: If True, return a COMMENT token if the
first token read is a comment. The default is False.
@type want_comment: bool
- @rtype: (int, string) tuple
+ @rtype: Token object
@raises dns.exception.UnexpectedEnd: input ended prematurely
@raises dns.exception.SyntaxError: input was badly formed
"""
-
+
if not self.ungotten_token is None:
token = self.ungotten_token
self.ungotten_token = None
- if token[0] == WHITESPACE:
+ if token.is_whitespace():
if want_leading:
return token
- elif token[0] == COMMENT:
+ elif token.is_comment():
if want_comment:
return token
else:
return token
skipped = self.skip_whitespace()
if want_leading and skipped > 0:
- return (WHITESPACE, ' ')
+ return Token(WHITESPACE, ' ')
token = ''
ttype = IDENTIFIER
while True:
@@ -230,7 +292,7 @@ class Tokenizer(object):
self.skip_whitespace()
continue
elif c == '\n':
- return (EOL, '\n')
+ return Token(EOL, '\n')
elif c == ';':
while 1:
c = self._get_char()
@@ -239,18 +301,18 @@ class Tokenizer(object):
token += c
if want_comment:
self._unget_char(c)
- return (COMMENT, token)
+ return Token(COMMENT, token)
elif c == '':
if self.multiline:
raise dns.exception.SyntaxError, \
'unbalanced parentheses'
- return (EOF, '')
+ return Token(EOF)
elif self.multiline:
self.skip_whitespace()
token = ''
continue
else:
- return (EOL, '\n')
+ return Token(EOL, '\n')
else:
# This code exists in case we ever want a
# delimiter to be returned. It never produces
@@ -279,7 +341,7 @@ class Tokenizer(object):
raise dns.exception.SyntaxError, 'newline in quoted string'
elif c == '\\':
#
- # Treat \ followed by a delimiter as the
+ # Treat \ followed by a delimiter as the
# delimiter, otherwise leave it alone.
#
c = self._get_char()
@@ -291,7 +353,7 @@ class Tokenizer(object):
if self.multiline:
raise dns.exception.SyntaxError, 'unbalanced parentheses'
ttype = EOF
- return (ttype, token)
+ return Token(ttype, token)
def unget(self, token):
"""Unget a token.
@@ -299,9 +361,9 @@ class Tokenizer(object):
The unget buffer for tokens is only one token large; it is
an error to try to unget a token when the unget buffer is not
empty.
-
+
@param token: the token to unget
- @type token: (int, string) token tuple
+ @type token: Token object
@raises UngetBufferFull: there is already an ungotten token
"""
@@ -313,9 +375,9 @@ class Tokenizer(object):
"""Return the next item in an iteration.
@rtype: (int, string)
"""
-
+
token = self.get()
- if token[0] == EOF:
+ if token.is_eof():
raise StopIteration
return token
@@ -326,26 +388,26 @@ class Tokenizer(object):
def get_int(self):
"""Read the next token and interpret it as an integer.
-
+
@raises dns.exception.SyntaxError:
@rtype: int
"""
-
- (ttype, value) = self.get()
- if ttype != IDENTIFIER:
+
+ token = self.get()
+ if not token.is_identifier():
raise dns.exception.SyntaxError, 'expecting an identifier'
- if not value.isdigit():
+ if not token.value.isdigit():
raise dns.exception.SyntaxError, 'expecting an integer'
- return int(value)
+ return int(token.value)
def get_uint8(self):
"""Read the next token and interpret it as an 8-bit unsigned
integer.
-
+
@raises dns.exception.SyntaxError:
@rtype: int
"""
-
+
value = self.get_int()
if value < 0 or value > 255:
raise dns.exception.SyntaxError, \
@@ -355,11 +417,11 @@ class Tokenizer(object):
def get_uint16(self):
"""Read the next token and interpret it as a 16-bit unsigned
integer.
-
+
@raises dns.exception.SyntaxError:
@rtype: int
"""
-
+
value = self.get_int()
if value < 0 or value > 65535:
raise dns.exception.SyntaxError, \
@@ -369,17 +431,17 @@ class Tokenizer(object):
def get_uint32(self):
"""Read the next token and interpret it as a 32-bit unsigned
integer.
-
+
@raises dns.exception.SyntaxError:
@rtype: int
"""
-
- (ttype, value) = self.get()
- if ttype != IDENTIFIER:
+
+ token = self.get()
+ if not token.is_identifier():
raise dns.exception.SyntaxError, 'expecting an identifier'
- if not value.isdigit():
+ if not token.value.isdigit():
raise dns.exception.SyntaxError, 'expecting an integer'
- value = long(value)
+ value = long(token.value)
if value < 0 or value > 4294967296L:
raise dns.exception.SyntaxError, \
'%d is not an unsigned 32-bit integer' % value
@@ -387,26 +449,38 @@ class Tokenizer(object):
def get_string(self, origin=None):
"""Read the next token and interpret it as a string.
-
+
@raises dns.exception.SyntaxError:
@rtype: string
"""
-
- (ttype, t) = self.get()
- if ttype != IDENTIFIER and ttype != QUOTED_STRING:
+
+ token = self.get()
+ if not (token.is_identifier() or token.is_quoted_string()):
raise dns.exception.SyntaxError, 'expecting a string'
- return t
+ return token.value
+
+ def get_identifier(self, origin=None):
+ """Read the next token and raise an exception if it is not an identifier.
+
+ @raises dns.exception.SyntaxError:
+ @rtype: string
+ """
+
+ token = self.get()
+ if not token.is_identifier():
+ raise dns.exception.SyntaxError, 'expecting an identifier'
+ return token.value
def get_name(self, origin=None):
"""Read the next token and interpret it as a DNS name.
-
+
@raises dns.exception.SyntaxError:
@rtype: dns.name.Name object"""
-
- (ttype, t) = self.get()
- if ttype != IDENTIFIER:
+
+ token = self.get()
+ if not token.is_identifier():
raise dns.exception.SyntaxError, 'expecting an identifier'
- return dns.name.from_text(t, origin)
+ return dns.name.from_text(token.value, origin)
def get_eol(self):
"""Read the next token and raise an exception if it isn't EOL or
@@ -415,15 +489,15 @@ class Tokenizer(object):
@raises dns.exception.SyntaxError:
@rtype: string
"""
-
- (ttype, t) = self.get()
- if ttype != EOL and ttype != EOF:
+
+ token = self.get()
+ if not token.is_eol_or_eof():
raise dns.exception.SyntaxError, \
- 'expected EOL or EOF, got %d "%s"' % (ttype, t)
- return t
+ 'expected EOL or EOF, got %d "%s"' % (token.ttype, token.value)
+ return token.value
def get_ttl(self):
- (ttype, t) = self.get()
- if ttype != IDENTIFIER:
+ token = self.get()
+ if not token.is_identifier():
raise dns.exception.SyntaxError, 'expecting an identifier'
- return dns.ttl.from_text(t)
+ return dns.ttl.from_text(token.value)
diff --git a/dns/zone.py b/dns/zone.py
index a40678f..6e1231f 100644
--- a/dns/zone.py
+++ b/dns/zone.py
@@ -54,7 +54,7 @@ class Zone(object):
dns.name.Name object, or it may be a string. In the either case,
if the name is relative it is treated as relative to the origin of
the zone.
-
+
@ivar rdclass: The zone's rdata class; the default is class IN.
@type rdclass: int
@ivar origin: The origin of the zone.
@@ -71,7 +71,7 @@ class Zone(object):
node_factory = dns.node.Node
__slots__ = ['rdclass', 'origin', 'nodes', 'relativize']
-
+
def __init__(self, origin, rdclass=dns.rdataclass.IN, relativize=True):
"""Initialize a zone object.
@@ -90,7 +90,7 @@ class Zone(object):
nodes.
@rtype: bool
"""
-
+
if not isinstance(other, Zone):
return False
if self.rdclass != other.rdclass or \
@@ -103,7 +103,7 @@ class Zone(object):
"""Are two zones not equal?
@rtype: bool
"""
-
+
return not self.__eq__(other)
def _validate_name(self, name):
@@ -119,7 +119,7 @@ class Zone(object):
if self.relativize:
name = name.relativize(self.origin)
return name
-
+
def __getitem__(self, key):
key = self._validate_name(key)
return self.nodes[key]
@@ -170,7 +170,7 @@ class Zone(object):
@raises KeyError: the name is not known and create was not specified.
@rtype: dns.node.Node object
"""
-
+
name = self._validate_name(name)
node = self.nodes.get(name)
if node is None:
@@ -186,7 +186,7 @@ class Zone(object):
This method is like L{find_node}, except it returns None instead
of raising an exception if the node does not exist and creation
has not been requested.
-
+
@param name: the name of the node to find
@type name: dns.name.Name object or string
@param create: should the node be created if it doesn't exist?
@@ -205,11 +205,11 @@ class Zone(object):
It is not an error if the node does not exist.
"""
-
+
name = self._validate_name(name)
if self.nodes.has_key(name):
del self.nodes[name]
-
+
def find_rdataset(self, name, rdtype, covers=dns.rdatatype.NONE,
create=False):
"""Look for rdata with the specified name and type in the zone,
@@ -221,7 +221,7 @@ class Zone(object):
The rdataset returned is not a copy; changes to it will change
the zone.
-
+
KeyError is raised if the name or type are not found.
Use L{get_rdataset} if you want to have None returned instead.
@@ -257,7 +257,7 @@ class Zone(object):
The rdataset returned is not a copy; changes to it will change
the zone.
-
+
None is returned if the name or type are not found.
Use L{find_rdataset} if you want to have KeyError raised instead.
@@ -314,7 +314,7 @@ class Zone(object):
def replace_rdataset(self, name, replacement):
"""Replace an rdataset at name.
-
+
It is not an error if there is no rdataset matching I{replacement}.
Ownership of the I{replacement} object is transferred to the zone;
@@ -341,7 +341,7 @@ class Zone(object):
The I{name}, I{rdtype}, and I{covers} parameters may be
strings, in which case they will be converted to their proper
type.
-
+
This method is less efficient than the similar
L{find_rdataset} because it creates an RRset instead of
returning the matching rdataset. It may be more convenient
@@ -381,7 +381,7 @@ class Zone(object):
The I{name}, I{rdtype}, and I{covers} parameters may be
strings, in which case they will be converted to their proper
type.
-
+
This method is less efficient than the similar L{get_rdataset}
because it creates an RRset instead of returning the matching
rdataset. It may be more convenient for some uses since it
@@ -420,7 +420,7 @@ class Zone(object):
@param covers: the covered type (defaults to None)
@type covers: int or string
"""
-
+
if isinstance(rdtype, str):
rdtype = dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
@@ -443,7 +443,7 @@ class Zone(object):
@param covers: the covered type (defaults to None)
@type covers: int or string
"""
-
+
if isinstance(rdtype, str):
rdtype = dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
@@ -457,7 +457,7 @@ class Zone(object):
def to_file(self, f, sorted=True, relativize=True, nl=None):
"""Write a zone to a file.
-
+
@param f: file or string. If I{f} is a string, it is treated
as the name of a file to open.
@param sorted: if True, the file will be written with the
@@ -566,22 +566,21 @@ class _MasterReader(object):
def _eat_line(self):
while 1:
- (ttype, t) = self.tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = self.tok.get()
+ if token.is_eol_or_eof():
break
-
+
def _rr_line(self):
"""Process one line from a DNS master file."""
# Name
if self.current_origin is None:
raise UnknownOrigin
token = self.tok.get(want_leading = True)
- if token[0] != dns.tokenizer.WHITESPACE:
- self.last_name = dns.name.from_text(token[1], self.current_origin)
+ if not token.is_whitespace():
+ self.last_name = dns.name.from_text(token.value, self.current_origin)
else:
token = self.tok.get()
- if token[0] == dns.tokenizer.EOL or \
- token[0] == dns.tokenizer.EOF:
+ if token.is_eol_or_eof():
# treat leading WS followed by EOL/EOF as if they were EOL/EOF.
return
self.tok.unget(token)
@@ -592,21 +591,21 @@ class _MasterReader(object):
if self.relativize:
name = name.relativize(self.zone.origin)
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
# TTL
try:
- ttl = dns.ttl.from_text(token[1])
+ ttl = dns.ttl.from_text(token.value)
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
except dns.ttl.BadTTL:
ttl = self.ttl
# Class
try:
- rdclass = dns.rdataclass.from_text(token[1])
+ rdclass = dns.rdataclass.from_text(token.value)
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
except dns.exception.SyntaxError:
raise dns.exception.SyntaxError
@@ -616,10 +615,10 @@ class _MasterReader(object):
raise dns.exception.SyntaxError, "RR class is not zone's class"
# Type
try:
- rdtype = dns.rdatatype.from_text(token[1])
+ rdtype = dns.rdatatype.from_text(token.value)
except:
raise dns.exception.SyntaxError, \
- "unknown rdatatype '%s'" % token[1]
+ "unknown rdatatype '%s'" % token.value
n = self.zone.nodes.get(name)
if n is None:
n = self.zone.node_factory()
@@ -637,7 +636,6 @@ class _MasterReader(object):
# correct, but it is correct almost all of the time.
# We convert them to syntax errors so that we can emit
# helpful filename:line info.
-
(ty, va) = sys.exc_info()[:2]
raise dns.exception.SyntaxError, \
"caught exception %s: %s" % (str(ty), str(va))
@@ -657,7 +655,7 @@ class _MasterReader(object):
try:
while 1:
token = self.tok.get(True, True)
- if token[0] == dns.tokenizer.EOF:
+ if token.is_eof():
if not self.current_file is None:
self.current_file.close()
if len(self.saved_state) > 0:
@@ -668,18 +666,18 @@ class _MasterReader(object):
self.ttl) = self.saved_state.pop(-1)
continue
break
- elif token[0] == dns.tokenizer.EOL:
+ elif token.is_eol():
continue
- elif token[0] == dns.tokenizer.COMMENT:
+ elif token.is_comment():
self.tok.get_eol()
continue
- elif token[1][0] == '$':
- u = token[1].upper()
+ elif token.value[0] == '$':
+ u = token.value.upper()
if u == '$TTL':
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError, "bad $TTL"
- self.ttl = dns.ttl.from_text(token[1])
+ self.ttl = dns.ttl.from_text(token.value)
self.tok.get_eol()
elif u == '$ORIGIN':
self.current_origin = self.tok.get_name()
@@ -688,17 +686,16 @@ class _MasterReader(object):
self.zone.origin = self.current_origin
elif u == '$INCLUDE' and self.allow_include:
token = self.tok.get()
- if token[0] != dns.tokenizer.QUOTED_STRING:
+ if not token.is_quoted_string():
raise dns.exception.SyntaxError, \
"bad filename in $INCLUDE"
- filename = token[1]
+ filename = token.value
token = self.tok.get()
- if token[0] == dns.tokenizer.IDENTIFIER:
- new_origin = dns.name.from_text(token[1], \
- self.current_origin)
+ if token.is_identifier():
+ new_origin = dns.name.from_text(token.value, \
+ self.current_origin)
self.tok.get_eol()
- elif token[0] != dns.tokenizer.EOL and \
- token[0] != dns.tokenizer.EOF:
+ elif not token.is_eol_or_eof():
raise dns.exception.SyntaxError, \
"bad origin in $INCLUDE"
else:
@@ -724,7 +721,7 @@ class _MasterReader(object):
detail = "syntax error"
raise dns.exception.SyntaxError, \
"%s:%d: %s" % (filename, line_number, detail)
-
+
# Now that we're done reading, do some basic checking of the zone.
if self.check_origin:
self.zone.check_origin()
@@ -819,7 +816,7 @@ def from_file(f, origin = None, rdclass = dns.rdataclass.IN,
if filename is None:
filename = '<file>'
want_close = False
-
+
try:
z = from_text(f, origin, rdclass, relativize, zone_factory,
filename, allow_include, check_origin)
@@ -830,7 +827,7 @@ def from_file(f, origin = None, rdclass = dns.rdataclass.IN,
def from_xfr(xfr, zone_factory=Zone, relativize=True):
"""Convert the output of a zone transfer generator into a zone object.
-
+
@param xfr: The xfr generator
@type xfr: generator of dns.message.Message objects
@param relativize: should names be relativized? The default is True.
@@ -841,7 +838,7 @@ def from_xfr(xfr, zone_factory=Zone, relativize=True):
@raises dns.zone.NoNS: No NS RRset was found at the zone origin
@rtype: dns.zone.Zone object
"""
-
+
z = None
for r in xfr:
if z is None:
diff --git a/tests/tokenizer.py b/tests/tokenizer.py
index 7ae981b..c5d41ac 100644
--- a/tests/tokenizer.py
+++ b/tests/tokenizer.py
@@ -18,112 +18,106 @@ import unittest
import dns.exception
import dns.tokenizer
+Token = dns.tokenizer.Token
+
class TokenizerTestCase(unittest.TestCase):
-
+
def testQuotedString1(self):
tok = dns.tokenizer.Tokenizer(r'"foo"')
- (ttype, value) = tok.get()
- self.failUnless(ttype == dns.tokenizer.QUOTED_STRING and
- value == 'foo')
+ token = tok.get()
+ self.failUnless(token == Token(dns.tokenizer.QUOTED_STRING, 'foo'))
def testQuotedString2(self):
tok = dns.tokenizer.Tokenizer(r'""')
- (ttype, value) = tok.get()
- self.failUnless(ttype == dns.tokenizer.QUOTED_STRING and
- value == '')
+ token = tok.get()
+ self.failUnless(token == Token(dns.tokenizer.QUOTED_STRING, ''))
def testQuotedString3(self):
tok = dns.tokenizer.Tokenizer(r'"\"foo\""')
- (ttype, value) = tok.get()
- self.failUnless(ttype == dns.tokenizer.QUOTED_STRING and
- value == '"foo"')
+ token = tok.get()
+ self.failUnless(token == Token(dns.tokenizer.QUOTED_STRING, '"foo"'))
def testQuotedString4(self):
tok = dns.tokenizer.Tokenizer(r'"foo\010bar"')
- (ttype, value) = tok.get()
- self.failUnless(ttype == dns.tokenizer.QUOTED_STRING and
- value == 'foo\x0abar')
+ token = tok.get()
+ self.failUnless(token == Token(dns.tokenizer.QUOTED_STRING, 'foo\x0abar'))
def testQuotedString5(self):
def bad():
tok = dns.tokenizer.Tokenizer(r'"foo')
- (ttype, value) = tok.get()
+ token = tok.get()
self.failUnlessRaises(dns.exception.UnexpectedEnd, bad)
def testQuotedString6(self):
def bad():
tok = dns.tokenizer.Tokenizer(r'"foo\01')
- (ttype, value) = tok.get()
+ token = tok.get()
self.failUnlessRaises(dns.exception.SyntaxError, bad)
def testQuotedString7(self):
def bad():
tok = dns.tokenizer.Tokenizer('"foo\nbar"')
- (ttype, value) = tok.get()
+ token = tok.get()
self.failUnlessRaises(dns.exception.SyntaxError, bad)
def testEmpty1(self):
tok = dns.tokenizer.Tokenizer('')
- (ttype, value) = tok.get()
- self.failUnless(ttype == dns.tokenizer.EOF)
+ token = tok.get()
+ self.failUnless(token.is_eof())
def testEmpty2(self):
tok = dns.tokenizer.Tokenizer('')
- (ttype1, value1) = tok.get()
- (ttype2, value2) = tok.get()
- self.failUnless(ttype1 == dns.tokenizer.EOF and
- ttype2 == dns.tokenizer.EOF)
+ token1 = tok.get()
+ token2 = tok.get()
+ self.failUnless(token1.is_eof() and token2.is_eof())
def testEOL(self):
tok = dns.tokenizer.Tokenizer('\n')
- (ttype1, value1) = tok.get()
- (ttype2, value2) = tok.get()
- self.failUnless(ttype1 == dns.tokenizer.EOL and
- ttype2 == dns.tokenizer.EOF)
+ token1 = tok.get()
+ token2 = tok.get()
+ self.failUnless(token1.is_eol() and token2.is_eof())
def testWS1(self):
tok = dns.tokenizer.Tokenizer(' \n')
- (ttype1, value1) = tok.get()
- self.failUnless(ttype1 == dns.tokenizer.EOL)
+ token1 = tok.get()
+ self.failUnless(token1.is_eol())
def testWS2(self):
tok = dns.tokenizer.Tokenizer(' \n')
- (ttype1, value1) = tok.get(want_leading=True)
- self.failUnless(ttype1 == dns.tokenizer.WHITESPACE)
+ token1 = tok.get(want_leading=True)
+ self.failUnless(token1.is_whitespace())
def testComment1(self):
tok = dns.tokenizer.Tokenizer(' ;foo\n')
- (ttype1, value1) = tok.get()
- self.failUnless(ttype1 == dns.tokenizer.EOL)
+ token1 = tok.get()
+ self.failUnless(token1.is_eol())
def testComment2(self):
tok = dns.tokenizer.Tokenizer(' ;foo\n')
- (ttype1, value1) = tok.get(want_comment = True)
- (ttype2, value2) = tok.get()
- self.failUnless(ttype1 == dns.tokenizer.COMMENT and
- value1 == 'foo' and
- ttype2 == dns.tokenizer.EOL)
+ token1 = tok.get(want_comment = True)
+ token2 = tok.get()
+ self.failUnless(token1 == Token(dns.tokenizer.COMMENT, 'foo') and
+ token2.is_eol())
def testComment3(self):
tok = dns.tokenizer.Tokenizer(' ;foo bar\n')
- (ttype1, value1) = tok.get(want_comment = True)
- (ttype2, value2) = tok.get()
- self.failUnless(ttype1 == dns.tokenizer.COMMENT and
- value1 == 'foo bar' and
- ttype2 == dns.tokenizer.EOL)
+ token1 = tok.get(want_comment = True)
+ token2 = tok.get()
+ self.failUnless(token1 == Token(dns.tokenizer.COMMENT, 'foo bar') and
+ token2.is_eol())
def testMultiline1(self):
tok = dns.tokenizer.Tokenizer('( foo\n\n bar\n)')
tokens = list(iter(tok))
- self.failUnless(tokens == [(dns.tokenizer.IDENTIFIER, 'foo'),
- (dns.tokenizer.IDENTIFIER, 'bar')])
+ self.failUnless(tokens == [Token(dns.tokenizer.IDENTIFIER, 'foo'),
+ Token(dns.tokenizer.IDENTIFIER, 'bar')])
def testMultiline2(self):
tok = dns.tokenizer.Tokenizer('( foo\n\n bar\n)\n')
tokens = list(iter(tok))
- self.failUnless(tokens == [(dns.tokenizer.IDENTIFIER, 'foo'),
- (dns.tokenizer.IDENTIFIER, 'bar'),
- (dns.tokenizer.EOL, '\n')])
+ self.failUnless(tokens == [Token(dns.tokenizer.IDENTIFIER, 'foo'),
+ Token(dns.tokenizer.IDENTIFIER, 'bar'),
+ Token(dns.tokenizer.EOL, '\n')])
def testMultiline3(self):
def bad():
tok = dns.tokenizer.Tokenizer('foo)')
@@ -141,7 +135,8 @@ class TokenizerTestCase(unittest.TestCase):
t1 = tok.get()
tok.unget(t1)
t2 = tok.get()
- self.failUnless(t1 == t2 and t1 == (dns.tokenizer.IDENTIFIER, 'foo'))
+ self.failUnless(t1 == t2 and t1.ttype == dns.tokenizer.IDENTIFIER and \
+ t1.value == 'foo')
def testUnget2(self):
def bad():
@@ -164,12 +159,12 @@ class TokenizerTestCase(unittest.TestCase):
def testEscapedDelimiter1(self):
tok = dns.tokenizer.Tokenizer(r'ch\ ld')
t = tok.get()
- self.failUnless(t == (dns.tokenizer.IDENTIFIER, r'ch ld'))
+ self.failUnless(t.ttype == dns.tokenizer.IDENTIFIER and t.value == r'ch ld')
def testEscapedDelimiter2(self):
tok = dns.tokenizer.Tokenizer(r'ch\0ld')
t = tok.get()
- self.failUnless(t == (dns.tokenizer.IDENTIFIER, r'ch\0ld'))
+ self.failUnless(t.ttype == dns.tokenizer.IDENTIFIER and t.value == r'ch\0ld')
if __name__ == '__main__':
unittest.main()