diff options
Diffstat (limited to 'branches/2.1.x/ipaddr.py')
-rw-r--r-- | branches/2.1.x/ipaddr.py | 450 |
1 files changed, 194 insertions, 256 deletions
diff --git a/branches/2.1.x/ipaddr.py b/branches/2.1.x/ipaddr.py index 2cc4f59..ad27ae9 100644 --- a/branches/2.1.x/ipaddr.py +++ b/branches/2.1.x/ipaddr.py @@ -22,7 +22,7 @@ and networks. """ -__version__ = '2.1.9' +__version__ = '2.1.10' import struct @@ -134,7 +134,7 @@ def v4_int_to_packed(address): """ if address > _BaseV4._ALL_ONES: raise ValueError('Address too large for IPv4') - return struct.pack('!I', address) + return Bytes(struct.pack('!I', address)) def v6_int_to_packed(address): @@ -146,7 +146,7 @@ def v6_int_to_packed(address): Returns: The binary representation of this address. """ - return struct.pack('!QQ', address >> 64, address & (2**64 - 1)) + return Bytes(struct.pack('!QQ', address >> 64, address & (2**64 - 1))) def _find_address_range(addresses): @@ -270,12 +270,12 @@ def _collapse_address_list_recursive(addresses): Example: - ip1 = IPv4Network'1.1.0.0/24') - ip2 = IPv4Network'1.1.1.0/24') - ip3 = IPv4Network'1.1.2.0/24') - ip4 = IPv4Network'1.1.3.0/24') - ip5 = IPv4Network'1.1.4.0/24') - ip6 = IPv4Network'1.1.0.1/22') + ip1 = IPv4Network('1.1.0.0/24') + ip2 = IPv4Network('1.1.1.0/24') + ip3 = IPv4Network('1.1.2.0/24') + ip4 = IPv4Network('1.1.3.0/24') + ip5 = IPv4Network('1.1.4.0/24') + ip6 = IPv4Network('1.1.0.1/22') _collapse_address_list_recursive([ip1, ip2, ip3, ip4, ip5, ip6]) -> [IPv4Network('1.1.0.0/22'), IPv4Network('1.1.4.0/24')] @@ -368,15 +368,27 @@ def collapse_address_list(addresses): # backwards compatibility CollapseAddrList = collapse_address_list -# Test whether this Python implementation supports byte objects that -# are not identical to str ones. -# We need to exclude platforms where bytes == str so that we can -# distinguish between packed representations and strings, for example -# b'12::' (the IPv4 address 49.50.58.58) and '12::' (an IPv6 address). +# We need to distinguish between the string and packed-bytes representations +# of an IP address. For example, b'0::1' is the IPv4 address 48.58.58.49, +# while '0::1' is an IPv6 address. +# +# In Python 3, the native 'bytes' type already provides this functionality, +# so we use it directly. For earlier implementations where bytes is not a +# distinct type, we create a subclass of str to serve as a tag. +# +# Usage example (Python 2): +# ip = ipaddr.IPAddress(ipaddr.Bytes('xxxx')) +# +# Usage example (Python 3): +# ip = ipaddr.IPAddress(b'xxxx') try: - _compat_has_real_bytes = bytes is not str -except NameError: # <Python2.6 - _compat_has_real_bytes = False + if bytes is str: + raise TypeError("bytes is not a distinct type") + Bytes = bytes +except (NameError, TypeError): + class Bytes(str): + def __repr__(self): + return 'Bytes(%s)' % str.__repr__(self) def get_mixed_type_key(obj): """Return a key suitable for sorting between networks and addresses. @@ -435,11 +447,6 @@ class _BaseIP(_IPAddrBase): """ - def __init__(self, address): - if (not (_compat_has_real_bytes and isinstance(address, bytes)) - and '/' in str(address)): - raise AddressValueError(address) - def __eq__(self, other): try: return (self._ip == other._ip @@ -1009,15 +1016,14 @@ class _BaseV4(object): # Equivalent to 255.255.255.255 or 32 bits of 1's. _ALL_ONES = (2**IPV4LENGTH) - 1 + _DECIMAL_DIGITS = frozenset('0123456789') def __init__(self, address): self._version = 4 self._max_prefixlen = IPV4LENGTH - def _explode_shorthand_ip_string(self, ip_str=None): - if not ip_str: - ip_str = str(self) - return ip_str + def _explode_shorthand_ip_string(self): + return str(self) def _ip_int_from_string(self, ip_str): """Turn the given IP string into an integer for comparison. @@ -1029,20 +1035,44 @@ class _BaseV4(object): The IP ip_str as an integer. Raises: - AddressValueError: if the string isn't a valid IP string. + AddressValueError: if ip_str isn't a valid IPv4 Address. """ - packed_ip = 0 octets = ip_str.split('.') if len(octets) != 4: raise AddressValueError(ip_str) + + packed_ip = 0 for oc in octets: try: - packed_ip = (packed_ip << 8) | int(oc) + packed_ip = (packed_ip << 8) | self._parse_octet(oc) except ValueError: raise AddressValueError(ip_str) return packed_ip + def _parse_octet(self, octet_str): + """Convert a decimal octet into an integer. + + Args: + octet_str: A string, the number to parse. + + Returns: + The octet as an integer. + + Raises: + ValueError: if the octet isn't strictly a decimal from [0..255]. + + """ + # Whitelist the characters, since int() allows a lot of bizarre stuff. + if not self._DECIMAL_DIGITS.issuperset(octet_str): + raise ValueError + octet_int = int(octet_str, 10) + # Disallow leading zeroes, because no clear standard exists on + # whether these should be interpreted as decimal or octal. + if octet_int > 255 or (octet_str[0] == '0' and len(octet_str) > 1): + raise ValueError + return octet_int + def _string_from_ip_int(self, ip_int): """Turns a 32-bit integer into dotted decimal notation. @@ -1059,37 +1089,6 @@ class _BaseV4(object): ip_int >>= 8 return '.'.join(octets) - def _is_valid_ip(self, address): - """Validate the dotted decimal notation IP/netmask string. - - Args: - address: A string, either representing a quad-dotted ip - or an integer which is a valid IPv4 IP address. - - Returns: - A boolean, True if the string is a valid dotted decimal IP - string. - - """ - octets = address.split('.') - if len(octets) == 1: - # We have an integer rather than a dotted decimal IP. - try: - return int(address) >= 0 and int(address) <= self._ALL_ONES - except ValueError: - return False - - if len(octets) != 4: - return False - - for octet in octets: - try: - if not 0 <= int(octet) <= 255: - return False - except ValueError: - return False - return True - @property def max_prefixlen(self): return self._max_prefixlen @@ -1190,7 +1189,6 @@ class IPv4Address(_BaseV4, _BaseIP): AddressValueError: If ipaddr isn't a valid IPv4 address. """ - _BaseIP.__init__(self, address) _BaseV4.__init__(self, address) # Efficient constructor from integer. @@ -1201,17 +1199,16 @@ class IPv4Address(_BaseV4, _BaseIP): return # Constructing from a packed address - if _compat_has_real_bytes: - if isinstance(address, bytes) and len(address) == 4: - self._ip = struct.unpack('!I', address)[0] - return + if isinstance(address, Bytes): + try: + self._ip, = struct.unpack('!I', address) + except struct.error: + raise AddressValueError(address) # Wrong length. + return # Assume input argument to be string or any object representation # which converts into a formatted IP string. addr_str = str(address) - if not self._is_valid_ip(addr_str): - raise AddressValueError(addr_str) - self._ip = self._ip_int_from_string(addr_str) @@ -1276,25 +1273,14 @@ class IPv4Network(_BaseV4, _BaseNet): _BaseNet.__init__(self, address) _BaseV4.__init__(self, address) - # Efficient constructor from integer. - if isinstance(address, (int, long)): - self._ip = address - self.ip = IPv4Address(self._ip) + # Constructing from an integer or packed bytes. + if isinstance(address, (int, long, Bytes)): + self.ip = IPv4Address(address) + self._ip = self.ip._ip self._prefixlen = self._max_prefixlen self.netmask = IPv4Address(self._ALL_ONES) - if address < 0 or address > self._ALL_ONES: - raise AddressValueError(address) return - # Constructing from a packed address - if _compat_has_real_bytes: - if isinstance(address, bytes) and len(address) == 4: - self._ip = struct.unpack('!I', address)[0] - self.ip = IPv4Address(self._ip) - self._prefixlen = self._max_prefixlen - self.netmask = IPv4Address(self._ALL_ONES) - return - # Assume input argument to be string or any object representation # which converts into a formatted IP prefix string. addr = str(address).split('/') @@ -1302,9 +1288,6 @@ class IPv4Network(_BaseV4, _BaseNet): if len(addr) > 2: raise AddressValueError(address) - if not self._is_valid_ip(addr[0]): - raise AddressValueError(addr[0]) - self._ip = self._ip_int_from_string(addr[0]) self.ip = IPv4Address(self._ip) @@ -1338,6 +1321,8 @@ class IPv4Network(_BaseV4, _BaseNet): if self.ip != self.network: raise ValueError('%s has host bits set' % self.ip) + if self._prefixlen == (self._max_prefixlen - 1): + self.iterhosts = self.__iter__ def _is_hostmask(self, ip_str): """Test if the IP string is a hostmask (rather than a netmask). @@ -1403,12 +1388,14 @@ class _BaseV6(object): """ _ALL_ONES = (2**IPV6LENGTH) - 1 + _HEXTET_COUNT = 8 + _HEX_DIGITS = frozenset('0123456789ABCDEFabcdef') def __init__(self, address): self._version = 6 self._max_prefixlen = IPV6LENGTH - def _ip_int_from_string(self, ip_str=None): + def _ip_int_from_string(self, ip_str): """Turn an IPv6 ip_str into an integer. Args: @@ -1418,35 +1405,95 @@ class _BaseV6(object): A long, the IPv6 ip_str. Raises: - AddressValueError: if ip_str isn't a valid IP Address. + AddressValueError: if ip_str isn't a valid IPv6 Address. """ - if not ip_str: - ip_str = str(self.ip) + parts = ip_str.split(':') - ip_int = 0 - - # Do we have an IPv4 mapped (::ffff:a.b.c.d) or compact (::a.b.c.d) - # ip_str? - fields = ip_str.split(':') - if fields[-1].count('.') == 3: - ipv4_string = fields.pop() - ipv4_int = IPv4Network(ipv4_string)._ip - octets = [] - for _ in xrange(2): - octets.append(hex(ipv4_int & 0xFFFF).lstrip('0x').rstrip('L')) - ipv4_int >>= 16 - fields.extend(reversed(octets)) - ip_str = ':'.join(fields) - - fields = self._explode_shorthand_ip_string(ip_str).split(':') - for field in fields: - try: - ip_int = (ip_int << 16) + int(field or '0', 16) - except ValueError: + # An IPv6 address needs at least 2 colons (3 parts). + if len(parts) < 3: + raise AddressValueError(ip_str) + + # If the address has an IPv4-style suffix, convert it to hexadecimal. + if '.' in parts[-1]: + ipv4_int = IPv4Address(parts.pop())._ip + parts.append('%x' % ((ipv4_int >> 16) & 0xFFFF)) + parts.append('%x' % (ipv4_int & 0xFFFF)) + + # An IPv6 address can't have more than 8 colons (9 parts). + if len(parts) > self._HEXTET_COUNT + 1: + raise AddressValueError(ip_str) + + # Disregarding the endpoints, find '::' with nothing in between. + # This indicates that a run of zeroes has been skipped. + try: + skip_index, = ( + [i for i in xrange(1, len(parts) - 1) if not parts[i]] or + [None]) + except ValueError: + # Can't have more than one '::' + raise AddressValueError(ip_str) + + # parts_hi is the number of parts to copy from above/before the '::' + # parts_lo is the number of parts to copy from below/after the '::' + if skip_index is not None: + # If we found a '::', then check if it also covers the endpoints. + parts_hi = skip_index + parts_lo = len(parts) - skip_index - 1 + if not parts[0]: + parts_hi -= 1 + if parts_hi: + raise AddressValueError(ip_str) # ^: requires ^:: + if not parts[-1]: + parts_lo -= 1 + if parts_lo: + raise AddressValueError(ip_str) # :$ requires ::$ + parts_skipped = self._HEXTET_COUNT - (parts_hi + parts_lo) + if parts_skipped < 1: + raise AddressValueError(ip_str) + else: + # Otherwise, allocate the entire address to parts_hi. The endpoints + # could still be empty, but _parse_hextet() will check for that. + if len(parts) != self._HEXTET_COUNT: raise AddressValueError(ip_str) + parts_hi = len(parts) + parts_lo = 0 + parts_skipped = 0 + + try: + # Now, parse the hextets into a 128-bit integer. + ip_int = 0L + for i in xrange(parts_hi): + ip_int <<= 16 + ip_int |= self._parse_hextet(parts[i]) + ip_int <<= 16 * parts_skipped + for i in xrange(-parts_lo, 0): + ip_int <<= 16 + ip_int |= self._parse_hextet(parts[i]) + return ip_int + except ValueError: + raise AddressValueError(ip_str) - return ip_int + def _parse_hextet(self, hextet_str): + """Convert an IPv6 hextet string into an integer. + + Args: + hextet_str: A string, the number to parse. + + Returns: + The hextet as an integer. + + Raises: + ValueError: if the input isn't strictly a hex number from [0..FFFF]. + + """ + # Whitelist the characters, since int() allows a lot of bizarre stuff. + if not self._HEX_DIGITS.issuperset(hextet_str): + raise ValueError + hextet_int = int(hextet_str, 16) + if hextet_int > 0xFFFF: + raise ValueError + return hextet_int def _compress_hextets(self, hextets): """Compresses a list of hextets. @@ -1522,7 +1569,7 @@ class _BaseV6(object): hextets = self._compress_hextets(hextets) return ':'.join(hextets) - def _explode_shorthand_ip_string(self, ip_str=None): + def _explode_shorthand_ip_string(self): """Expand a shortened IPv6 address. Args: @@ -1532,108 +1579,20 @@ class _BaseV6(object): A string, the expanded IPv6 address. """ - if not ip_str: + if isinstance(self, _BaseNet): + ip_str = str(self.ip) + else: ip_str = str(self) - if isinstance(self, _BaseNet): - ip_str = str(self.ip) - - if self._is_shorthand_ip(ip_str): - new_ip = [] - hextet = ip_str.split('::') - - if len(hextet) > 1: - sep = len(hextet[0].split(':')) + len(hextet[1].split(':')) - new_ip = hextet[0].split(':') - - for _ in xrange(8 - sep): - new_ip.append('0000') - new_ip += hextet[1].split(':') - - else: - new_ip = ip_str.split(':') - # Now need to make sure every hextet is 4 lower case characters. - # If a hextet is < 4 characters, we've got missing leading 0's. - ret_ip = [] - for hextet in new_ip: - ret_ip.append(('0' * (4 - len(hextet)) + hextet).lower()) - return ':'.join(ret_ip) - # We've already got a longhand ip_str. - return ip_str - - def _is_valid_ip(self, ip_str): - """Ensure we have a valid IPv6 address. - - Probably not as exhaustive as it should be. - - Args: - ip_str: A string, the IPv6 address. - - Returns: - A boolean, True if this is a valid IPv6 address. - - """ - # We need to have at least one ':'. - if ':' not in ip_str: - return False - - # We can only have one '::' shortener. - if ip_str.count('::') > 1: - return False - - # '::' should be encompassed by start, digits or end. - if ':::' in ip_str: - return False - - # A single colon can neither start nor end an address. - if ((ip_str.startswith(':') and not ip_str.startswith('::')) or - (ip_str.endswith(':') and not ip_str.endswith('::'))): - return False - - # If we have no concatenation, we need to have 8 fields with 7 ':'. - if '::' not in ip_str and ip_str.count(':') != 7: - # We might have an IPv4 mapped address. - if ip_str.count('.') != 3: - return False - - ip_str = self._explode_shorthand_ip_string(ip_str) - - # Now that we have that all squared away, let's check that each of the - # hextets are between 0x0 and 0xFFFF. - for hextet in ip_str.split(':'): - if hextet.count('.') == 3: - # If we have an IPv4 mapped address, the IPv4 portion has to - # be at the end of the IPv6 portion. - if not ip_str.split(':')[-1] == hextet: - return False - try: - IPv4Network(hextet) - except AddressValueError: - return False - else: - try: - # a value error here means that we got a bad hextet, - # something like 0xzzzz - if int(hextet, 16) < 0x0 or int(hextet, 16) > 0xFFFF: - return False - except ValueError: - return False - return True - - def _is_shorthand_ip(self, ip_str=None): - """Determine if the address is shortened. - Args: - ip_str: A string, the IPv6 address. - - Returns: - A boolean, True if the address is shortened. - - """ - if ip_str.count('::') == 1: - return True - if filter(lambda x: len(x) < 4, ip_str.split(':')): - return True - return False + ip_int = self._ip_int_from_string(ip_str) + parts = [] + for i in xrange(self._HEXTET_COUNT): + parts.append('%04x' % (ip_int & 0xFFFF)) + ip_int >>= 16 + parts.reverse() + if isinstance(self, _BaseNet): + return '%s/%d' % (':'.join(parts), self.prefixlen) + return ':'.join(parts) @property def max_prefixlen(self): @@ -1749,13 +1708,9 @@ class _BaseV6(object): IPv4 mapped address. Return None otherwise. """ - hextets = self._explode_shorthand_ip_string().split(':') - if hextets[-3] != 'ffff': - return None - try: - return IPv4Address(int('%s%s' % (hextets[-2], hextets[-1]), 16)) - except AddressValueError: + if (self._ip >> 32) != 0xFFFF: return None + return IPv4Address(self._ip & 0xFFFFFFFF) @property def teredo(self): @@ -1764,14 +1719,13 @@ class _BaseV6(object): Returns: Tuple of the (server, client) IPs or None if the address doesn't appear to be a teredo address (doesn't start with - 2001) + 2001::/32) """ - bits = self._explode_shorthand_ip_string().split(':') - if not bits[0] == '2001': + if (self._ip >> 96) != 0x20010000: return None - return (IPv4Address(int(''.join(bits[2:4]), 16)), - IPv4Address(int(''.join(bits[6:]), 16) ^ 0xFFFFFFFF)) + return (IPv4Address((self._ip >> 64) & 0xFFFFFFFF), + IPv4Address(~self._ip & 0xFFFFFFFF)) @property def sixtofour(self): @@ -1782,10 +1736,9 @@ class _BaseV6(object): address doesn't appear to contain a 6to4 embedded address. """ - bits = self._explode_shorthand_ip_string().split(':') - if not bits[0] == '2002': + if (self._ip >> 112) != 0x2002: return None - return IPv4Address(int(''.join(bits[1:3]), 16)) + return IPv4Address((self._ip >> 80) & 0xFFFFFFFF) class IPv6Address(_BaseV6, _BaseIP): @@ -1810,7 +1763,6 @@ class IPv6Address(_BaseV6, _BaseIP): AddressValueError: If address isn't a valid IPv6 address. """ - _BaseIP.__init__(self, address) _BaseV6.__init__(self, address) # Efficient constructor from integer. @@ -1821,11 +1773,13 @@ class IPv6Address(_BaseV6, _BaseIP): return # Constructing from a packed address - if _compat_has_real_bytes: - if isinstance(address, bytes) and len(address) == 16: - tmp = struct.unpack('!QQ', address) - self._ip = (tmp[0] << 64) | tmp[1] - return + if isinstance(address, Bytes): + try: + hi, lo = struct.unpack('!QQ', address) + except struct.error: + raise AddressValueError(address) # Wrong length. + self._ip = (hi << 64) | lo + return # Assume input argument to be string or any object representation # which converts into a formatted IP string. @@ -1833,9 +1787,6 @@ class IPv6Address(_BaseV6, _BaseIP): if not addr_str: raise AddressValueError('') - if not self._is_valid_ip(addr_str): - raise AddressValueError(addr_str) - self._ip = self._ip_int_from_string(addr_str) @@ -1889,26 +1840,14 @@ class IPv6Network(_BaseV6, _BaseNet): _BaseNet.__init__(self, address) _BaseV6.__init__(self, address) - # Efficient constructor from integer. - if isinstance(address, (int, long)): - self._ip = address - self.ip = IPv6Address(self._ip) + # Constructing from an integer or packed bytes. + if isinstance(address, (int, long, Bytes)): + self.ip = IPv6Address(address) + self._ip = self.ip._ip self._prefixlen = self._max_prefixlen self.netmask = IPv6Address(self._ALL_ONES) - if address < 0 or address > self._ALL_ONES: - raise AddressValueError(address) return - # Constructing from a packed address - if _compat_has_real_bytes: - if isinstance(address, bytes) and len(address) == 16: - tmp = struct.unpack('!QQ', address) - self._ip = (tmp[0] << 64) | tmp[1] - self.ip = IPv6Address(self._ip) - self._prefixlen = self._max_prefixlen - self.netmask = IPv6Address(self._ALL_ONES) - return - # Assume input argument to be string or any object representation # which converts into a formatted IP prefix string. addr = str(address).split('/') @@ -1916,8 +1855,8 @@ class IPv6Network(_BaseV6, _BaseNet): if len(addr) > 2: raise AddressValueError(address) - if not self._is_valid_ip(addr[0]): - raise AddressValueError(addr[0]) + self._ip = self._ip_int_from_string(addr[0]) + self.ip = IPv6Address(self._ip) if len(addr) == 2: if self._is_valid_netmask(addr[1]): @@ -1929,13 +1868,12 @@ class IPv6Network(_BaseV6, _BaseNet): self.netmask = IPv6Address(self._ip_int_from_prefix(self._prefixlen)) - self._ip = self._ip_int_from_string(addr[0]) - self.ip = IPv6Address(self._ip) - if strict: if self.ip != self.network: raise ValueError('%s has host bits set' % self.ip) + if self._prefixlen == (self._max_prefixlen - 1): + self.iterhosts = self.__iter__ def _is_valid_netmask(self, prefixlen): """Verify that the netmask/prefixlen is valid. |