diff options
author | James Brown <jbrown@easypost.com> | 2016-04-11 16:52:35 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-24 16:20:22 -0700 |
commit | 434802dfa17f8c9d6b17b02d12bf0f7bee6240cd (patch) | |
tree | 9eced0688e60bb9f16f28aa6a99676be2bc4e54d | |
parent | a12be0af80a1c0903eb92566e75a63bcec988806 (diff) | |
download | kafka-python-434802dfa17f8c9d6b17b02d12bf0f7bee6240cd.tar.gz |
More thorough IPv6 support that uses getaddrinfo to resolve names
Fixes #641
-rw-r--r-- | kafka/conn.py | 123 | ||||
-rw-r--r-- | test/fixtures.py | 18 | ||||
-rw-r--r-- | test/test_client.py | 9 | ||||
-rw-r--r-- | test/test_client_async.py | 14 | ||||
-rw-r--r-- | test/test_conn_legacy.py | 16 |
5 files changed, 137 insertions, 43 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 084450b..3571e90 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -97,13 +97,47 @@ class BrokerConnection(object): self.last_failure = 0 self._processing = False self._correlation_id = 0 + self._gai = None + self._gai_index = 0 def connect(self): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED: self.close() log.debug('%s: creating new socket', str(self)) - self._sock = socket.socket(self.afi, socket.SOCK_STREAM) + # if self.afi is set to AF_UNSPEC, then we need to do a name + # resolution and try all available address families + if self.afi == socket.AF_UNSPEC: + if self._gai is None: + # XXX: all DNS functions in Python are blocking. If we really + # want to be non-blocking here, we need to use a 3rd-party + # library like python-adns, or move resolution onto its + # own thread. This will be subject to the default libc + # name resolution timeout (5s on most Linux boxes) + self._gai = socket.getaddrinfo(self.host, self.port, + socket.AF_UNSPEC, + socket.SOCK_STREAM) + self._gai_index = 0 + else: + # if self._gai already exists, then we should try the next + # name + self._gai_index += 1 + while True: + if self._gai_index >= len(self._gai): + log.error('Unable to connect to any of the names for {0}:{1}'.format( + self.host, self.port + )) + self.close() + return + afi, _, __, ___, sockaddr = self._gai[self._gai_index] + if afi not in (socket.AF_INET, socket.AF_INET6): + self._gai_index += 1 + continue + break + self.host, self.port = sockaddr[:2] + self._sock = socket.socket(afi, socket.SOCK_STREAM) + else: + self._sock = socket.socket(self.afi, socket.SOCK_STREAM) if self.config['receive_buffer_bytes'] is not None: self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.config['receive_buffer_bytes']) @@ -121,10 +155,16 @@ class BrokerConnection(object): # in non-blocking mode, use repeated calls to socket.connect_ex # to check connection status request_timeout = self.config['request_timeout_ms'] / 1000.0 + ret = None try: ret = self._sock.connect_ex((self.host, self.port)) - except socket.error as ret: - pass + # if we got here through a host lookup, we've found a host,port,af tuple + # that works save it so we don't do a GAI lookup again + if self._gai is not None: + self.afi = self._sock.family + self._gai = None + except socket.error as err: + ret = err # Connection succeeded if not ret or ret == errno.EISCONN: @@ -468,6 +508,7 @@ class BrokerConnection(object): # Socket errors are logged as exceptions and can alarm users. Mute them from logging import Filter + class ConnFilter(Filter): def filter(self, record): if record.funcName in ('recv', 'send'): @@ -548,37 +589,71 @@ class BrokerConnection(object): return "<BrokerConnection host=%s port=%d>" % (self.host, self.port) -def get_ip_port_afi(host_and_port_str): +def _address_family(address): """ - Parse the IP and port from a string in the format of: + Attempt to determine the family of an address (or hostname) - * host_or_ip <- Can be either IPv4 or IPv6 address or hostname/fqdn - * host_or_ip:port <- This is only for IPv4 - * [host_or_ip]:port. <- This is only for IPv6 + :return: either socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC if the address family + could not be determined + """ + if address.startswith('[') and address.endswith(']'): + return socket.AF_INET6 + for af in (socket.AF_INET, socket.AF_INET6): + try: + socket.inet_pton(af, address) + return af + except (ValueError, AttributeError, socket.error): + continue + return socket.AF_UNSPEC - .. note:: If the port is not specified, default will be returned. - :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 +def get_ip_port_afi(host_and_port_str): """ - afi = socket.AF_INET + Parse the IP and port from a string in the format of: - if host_and_port_str.strip()[0] == '[': - afi = socket.AF_INET6 - res = host_and_port_str.split("]:") - res[0] = res[0].replace("[", "") - res[0] = res[0].replace("]", "") + * host_or_ip <- Can be either IPv4 address literal or hostname/fqdn + * host_or_ipv4:port <- Can be either IPv4 address literal or hostname/fqdn + * [host_or_ip] <- IPv6 address literal + * [host_or_ip]:port. <- IPv6 address literal - elif host_and_port_str.count(":") > 1: - afi = socket.AF_INET6 - res = [host_and_port_str] + .. note:: IPv6 address literals with ports *must* be enclosed in brackets - else: - res = host_and_port_str.split(':') + .. note:: If the port is not specified, default will be returned. - host = res[0] - port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT + :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC + """ + host_and_port_str = host_and_port_str.strip() + if host_and_port_str.startswith('['): + af = socket.AF_INET6 + host, rest = host_and_port_str[1:].split(']') + if rest: + port = int(rest[1:]) + else: + port = DEFAULT_KAFKA_PORT + return host, port, af + else: + if ':' not in host_and_port_str: + af = _address_family(host_and_port_str) + return host_and_port_str, DEFAULT_KAFKA_PORT, af + else: + # now we have something with a colon in it and no square brackets. It could be + # either an IPv6 address literal (e.g., "::1") or an IP:port pair or a host:port pair + try: + # if it decodes as an IPv6 address, use that + socket.inet_pton(socket.AF_INET6, host_and_port_str) + return host_and_port_str, DEFAULT_KAFKA_PORT, socket.AF_INET6 + except AttributeError: + log.warning('socket.inet_pton not available on this platform.' + ' consider pip install win_inet_pton') + pass + except (ValueError, socket.error): + # it's a host:port pair + pass + host, port = host_and_port_str.rsplit(':', 1) + port = int(port) - return host.strip(), port, afi + af = _address_family(host) + return host, port, af def collect_hosts(hosts, randomize=True): diff --git a/test/fixtures.py b/test/fixtures.py index 826d037..654e636 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -9,7 +9,7 @@ import time import uuid from six.moves import urllib -from six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 +from six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 from test.service import ExternalService, SpawnedService from test.testutil import get_open_port @@ -193,7 +193,21 @@ class KafkaFixture(Fixture): else: if port is None: port = get_open_port() - host = "127.0.0.1" + # force IPv6 here because of a confusing point: + # + # - if the string "localhost" is passed, Kafka will *only* bind to the IPv4 address of localhost + # (127.0.0.1); however, kafka-python will attempt to connect on ::1 and fail + # + # - if the address literal 127.0.0.1 is passed, the metadata request during bootstrap will return + # the name "localhost" and we'll go back to the first case. This is odd! + # + # Ideally, Kafka would bind to all loopback addresses when we tell it to listen on "localhost" the + # way it makes an IPv6 socket bound to both 0.0.0.0/0 and ::/0 when we tell it to bind to "" (that is + # to say, when we make a listener of PLAINTEXT://:port. + # + # Note that even though we specify the bind host in bracket notation, Kafka responds to the bootstrap + # metadata request without square brackets later. + host = "[::1]" fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, transport=transport, diff --git a/test/test_client.py b/test/test_client.py index 38235fd..4b5a3a8 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -38,7 +38,8 @@ class TestSimpleClient(unittest.TestCase): client = SimpleClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092']) self.assertEqual( - sorted([('kafka01', 9092, socket.AF_INET), ('kafka02', 9092, socket.AF_INET), ('kafka03', 9092, socket.AF_INET)]), + sorted([('kafka01', 9092, socket.AF_UNSPEC), ('kafka02', 9092, socket.AF_UNSPEC), + ('kafka03', 9092, socket.AF_UNSPEC)]), sorted(client.hosts)) def test_init_with_csv(self): @@ -46,7 +47,8 @@ class TestSimpleClient(unittest.TestCase): client = SimpleClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092') self.assertEqual( - sorted([('kafka01', 9092, socket.AF_INET), ('kafka02', 9092, socket.AF_INET), ('kafka03', 9092, socket.AF_INET)]), + sorted([('kafka01', 9092, socket.AF_UNSPEC), ('kafka02', 9092, socket.AF_UNSPEC), + ('kafka03', 9092, socket.AF_UNSPEC)]), sorted(client.hosts)) def test_init_with_unicode_csv(self): @@ -54,7 +56,8 @@ class TestSimpleClient(unittest.TestCase): client = SimpleClient(hosts=u'kafka01:9092,kafka02:9092,kafka03:9092') self.assertEqual( - sorted([('kafka01', 9092, socket.AF_INET), ('kafka02', 9092, socket.AF_INET), ('kafka03', 9092, socket.AF_INET)]), + sorted([('kafka01', 9092, socket.AF_UNSPEC), ('kafka02', 9092, socket.AF_UNSPEC), + ('kafka03', 9092, socket.AF_UNSPEC)]), sorted(client.hosts)) @patch.object(SimpleClient, '_get_conn') diff --git a/test/test_client_async.py b/test/test_client_async.py index 922e43c..605ef1a 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -20,11 +20,11 @@ from kafka.structs import BrokerMetadata @pytest.mark.parametrize("bootstrap,expected_hosts", [ - (None, [('localhost', 9092, socket.AF_INET)]), - ('foobar:1234', [('foobar', 1234, socket.AF_INET)]), - ('fizzbuzz', [('fizzbuzz', 9092, socket.AF_INET)]), - ('foo:12,bar:34', [('foo', 12, socket.AF_INET), ('bar', 34, socket.AF_INET)]), - (['fizz:56', 'buzz'], [('fizz', 56, socket.AF_INET), ('buzz', 9092, socket.AF_INET)]), + (None, [('localhost', 9092, socket.AF_UNSPEC)]), + ('foobar:1234', [('foobar', 1234, socket.AF_UNSPEC)]), + ('fizzbuzz', [('fizzbuzz', 9092, socket.AF_UNSPEC)]), + ('foo:12,bar:34', [('foo', 12, socket.AF_UNSPEC), ('bar', 34, socket.AF_UNSPEC)]), + (['fizz:56', 'buzz'], [('fizz', 56, socket.AF_UNSPEC), ('buzz', 9092, socket.AF_UNSPEC)]), ]) def test_bootstrap_servers(mocker, bootstrap, expected_hosts): mocker.patch.object(KafkaClient, '_bootstrap') @@ -42,7 +42,7 @@ def test_bootstrap_success(conn): conn.state = ConnectionStates.CONNECTED cli = KafkaClient() args, kwargs = conn.call_args - assert args == ('localhost', 9092, socket.AF_INET) + assert args == ('localhost', 9092, socket.AF_UNSPEC) kwargs.pop('state_change_callback') assert kwargs == cli.config conn.connect.assert_called_with() @@ -55,7 +55,7 @@ def test_bootstrap_failure(conn): conn.state = ConnectionStates.DISCONNECTED cli = KafkaClient() args, kwargs = conn.call_args - assert args == ('localhost', 9092, socket.AF_INET) + assert args == ('localhost', 9092, socket.AF_UNSPEC) kwargs.pop('state_change_callback') assert kwargs == cli.config conn.connect.assert_called_with() diff --git a/test/test_conn_legacy.py b/test/test_conn_legacy.py index 347588e..820c4e7 100644 --- a/test/test_conn_legacy.py +++ b/test/test_conn_legacy.py @@ -48,12 +48,12 @@ class ConnTest(unittest.TestCase): self.MockCreateConn.reset_mock() def test_collect_hosts__happy_path(self): - hosts = "localhost:1234,localhost" + hosts = "127.0.0.1:1234,127.0.0.1" results = collect_hosts(hosts) self.assertEqual(set(results), set([ - ('localhost', 1234, socket.AF_INET), - ('localhost', 9092, socket.AF_INET), + ('127.0.0.1', 1234, socket.AF_INET), + ('127.0.0.1', 9092, socket.AF_INET), ])) def test_collect_hosts__ipv6(self): @@ -72,16 +72,18 @@ class ConnTest(unittest.TestCase): 'localhost', '[localhost]', '2001::1', + '[2001::1]', '[2001::1]:1234', ] results = collect_hosts(hosts) self.assertEqual(set(results), set([ - ('localhost', 1234, socket.AF_INET), - ('localhost', 9092, socket.AF_INET), + ('localhost', 1234, socket.AF_UNSPEC), + ('localhost', 9092, socket.AF_UNSPEC), ('localhost', 9092, socket.AF_INET6), ('2001::1', 9092, socket.AF_INET6), + ('2001::1', 9092, socket.AF_INET6), ('2001::1', 1234, socket.AF_INET6), ])) @@ -90,8 +92,8 @@ class ConnTest(unittest.TestCase): results = collect_hosts(hosts) self.assertEqual(set(results), set([ - ('localhost', 1234, socket.AF_INET), - ('localhost', 9092, socket.AF_INET), + ('localhost', 1234, socket.AF_UNSPEC), + ('localhost', 9092, socket.AF_UNSPEC), ])) |