summaryrefslogtreecommitdiff
path: root/netaddr/fbsocket.py
blob: dd23a8ca37035ccb0796b5fc81ae2da92cc57a06 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
#-----------------------------------------------------------------------------
#   Copyright (c) 2008-2015, David P. D. Moss. All rights reserved.
#
#   Released under the BSD license. See the LICENSE file for details.
#-----------------------------------------------------------------------------
"""Fallback routines for Python's standard library socket module"""

from struct import unpack as _unpack, pack as _pack

from netaddr.compat import _bytes_join, _is_str

AF_INET = 2
AF_INET6 = 10


def inet_ntoa(packed_ip):
    """
    Convert an IP address from 32-bit packed binary format to string format.
    """
    if not _is_str(packed_ip):
        raise TypeError('string type expected, not %s' % str(type(packed_ip)))

    if len(packed_ip) != 4:
        raise ValueError('invalid length of packed IP address string')

    return '%d.%d.%d.%d' % _unpack('4B', packed_ip)


def _compact_ipv6_tokens(tokens):
    new_tokens = []

    positions = []
    start_index = None
    num_tokens = 0

    #   Discover all runs of zeros.
    for idx, token in enumerate(tokens):
        if token == '0':
            if start_index is None:
                start_index = idx
            num_tokens += 1
        else:
            if num_tokens > 1:
                positions.append((num_tokens, start_index))
            start_index = None
            num_tokens = 0

        new_tokens.append(token)

    #   Store any position not saved before loop exit.
    if num_tokens > 1:
        positions.append((num_tokens, start_index))

    #   Replace first longest run with an empty string.
    if len(positions) != 0:
        #   Locate longest, left-most run of zeros.
        positions.sort(key=lambda x: x[1])
        best_position = positions[0]
        for position in positions:
            if position[0] > best_position[0]:
                best_position = position
        #   Replace chosen zero run.
        (length, start_idx) = best_position
        new_tokens = new_tokens[0:start_idx] + [''] + new_tokens[start_idx + length:]

        #   Add start and end blanks so join creates '::'.
        if new_tokens[0] == '':
            new_tokens.insert(0, '')

        if new_tokens[-1] == '':
            new_tokens.append('')

    return new_tokens


def inet_ntop(af, packed_ip):
    """Convert an packed IP address of the given family to string format."""
    if af == AF_INET:
        #   IPv4.
        return inet_ntoa(packed_ip)
    elif af == AF_INET6:
        #   IPv6.
        if len(packed_ip) != 16 or not _is_str(packed_ip):
            raise ValueError('invalid length of packed IP address string')

        tokens = ['%x' % i for i in _unpack('>8H', packed_ip)]

        #   Convert packed address to an integer value.
        words = list(_unpack('>8H', packed_ip))
        int_val = 0
        for i, num in enumerate(reversed(words)):
            word = num
            word = word << 16 * i
            int_val = int_val | word

        if 0xffff < int_val <= 0xffffffff or int_val >> 32 == 0xffff:
            #   IPv4 compatible / mapped IPv6.
            packed_ipv4 = _pack('>2H', *[int(i, 16) for i in tokens[-2:]])
            ipv4_str = inet_ntoa(packed_ipv4)
            tokens = tokens[0:-2] + [ipv4_str]

        return ':'.join(_compact_ipv6_tokens(tokens))
    else:
        raise ValueError('unknown address family %d' % af)


def _inet_pton_af_inet(ip_string):
    """
    Convert an IP address in string format (123.45.67.89) to the 32-bit packed
    binary format used in low-level network functions. Differs from inet_aton
    by only support decimal octets. Using octal or hexadecimal values will
    raise a ValueError exception.
    """
    #TODO: optimise this ... use inet_aton with mods if available ...
    if _is_str(ip_string):
        invalid_addr = ValueError('illegal IP address string %r' % ip_string)
        #   Support for hexadecimal and octal octets.
        tokens = ip_string.split('.')

        #   Pack octets.
        if len(tokens) == 4:
            words = []
            for token in tokens:
                if token.startswith('0x') or (token.startswith('0') and len(token) > 1):
                    raise invalid_addr
                try:
                    octet = int(token)
                except ValueError:
                    raise invalid_addr

                if (octet >> 8) != 0:
                    raise invalid_addr
                words.append(_pack('B', octet))
            return _bytes_join(words)
        else:
            raise invalid_addr

    raise ValueError('argument should be a string, not %s' % type(ip_string))


def inet_pton(af, ip_string):
    """
    Convert an IP address from string format to a packed string suitable for
    use with low-level network functions.
    """
    if af == AF_INET:
        #   IPv4.
        return _inet_pton_af_inet(ip_string)
    elif af == AF_INET6:
        invalid_addr = ValueError('illegal IP address string %r' % ip_string)
        #   IPv6.
        values = []

        if not _is_str(ip_string):
            raise invalid_addr

        if 'x' in ip_string:
            #   Don't accept hextets with the 0x prefix.
            raise invalid_addr

        if '::' in ip_string:
            if ip_string == '::':
                #   Unspecified address.
                return '\x00'.encode() * 16
            #   IPv6 compact mode.
            try:
                prefix, suffix = ip_string.split('::')
            except ValueError:
                raise invalid_addr

            l_prefix = []
            l_suffix = []

            if prefix != '':
                l_prefix = prefix.split(':')

            if suffix != '':
                l_suffix = suffix.split(':')

            #   IPv6 compact IPv4 compatibility mode.
            if len(l_suffix) and '.' in l_suffix[-1]:
                ipv4_str = _inet_pton_af_inet(l_suffix.pop())
                l_suffix.append('%x' % _unpack('>H', ipv4_str[0:2])[0])
                l_suffix.append('%x' % _unpack('>H', ipv4_str[2:4])[0])

            token_count = len(l_prefix) + len(l_suffix)

            if not 0 <= token_count <= 8 - 1:
                raise invalid_addr

            gap_size = 8 - ( len(l_prefix) + len(l_suffix) )

            values = (
                [_pack('>H', int(i, 16)) for i in l_prefix] +
                ['\x00\x00'.encode() for i in range(gap_size)] +
                [_pack('>H', int(i, 16)) for i in l_suffix]
            )
            try:
                for token in l_prefix + l_suffix:
                    word = int(token, 16)
                    if not 0 <= word <= 0xffff:
                        raise invalid_addr
            except ValueError:
                raise invalid_addr
        else:
            #   IPv6 verbose mode.
            if ':' in ip_string:
                tokens = ip_string.split(':')

                if '.' in ip_string:
                    ipv6_prefix = tokens[:-1]
                    if ipv6_prefix[:-1] != ['0', '0', '0', '0', '0']:
                        raise invalid_addr

                    if ipv6_prefix[-1].lower() not in ('0', 'ffff'):
                        raise invalid_addr

                    #   IPv6 verbose IPv4 compatibility mode.
                    if len(tokens) != 7:
                        raise invalid_addr

                    ipv4_str = _inet_pton_af_inet(tokens.pop())
                    tokens.append('%x' % _unpack('>H', ipv4_str[0:2])[0])
                    tokens.append('%x' % _unpack('>H', ipv4_str[2:4])[0])

                    values = [_pack('>H', int(i, 16)) for i in tokens]
                else:
                    #   IPv6 verbose mode.
                    if len(tokens) != 8:
                        raise invalid_addr
                try:
                    tokens = [int(token, 16) for token in tokens]
                    for token in tokens:
                        if not 0 <= token <= 0xffff:
                            raise invalid_addr

                except ValueError:
                    raise invalid_addr

                values = [_pack('>H', i) for i in tokens]
            else:
                raise invalid_addr

        return _bytes_join(values)
    else:
        raise ValueError('Unknown address family %d' % af)