summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py123
1 files changed, 99 insertions, 24 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 084450b..3571e90 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -97,13 +97,47 @@ class BrokerConnection(object):
self.last_failure = 0
self._processing = False
self._correlation_id = 0
+ self._gai = None
+ self._gai_index = 0
def connect(self):
"""Attempt to connect and return ConnectionState"""
if self.state is ConnectionStates.DISCONNECTED:
self.close()
log.debug('%s: creating new socket', str(self))
- self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
+ # if self.afi is set to AF_UNSPEC, then we need to do a name
+ # resolution and try all available address families
+ if self.afi == socket.AF_UNSPEC:
+ if self._gai is None:
+ # XXX: all DNS functions in Python are blocking. If we really
+ # want to be non-blocking here, we need to use a 3rd-party
+ # library like python-adns, or move resolution onto its
+ # own thread. This will be subject to the default libc
+ # name resolution timeout (5s on most Linux boxes)
+ self._gai = socket.getaddrinfo(self.host, self.port,
+ socket.AF_UNSPEC,
+ socket.SOCK_STREAM)
+ self._gai_index = 0
+ else:
+ # if self._gai already exists, then we should try the next
+ # name
+ self._gai_index += 1
+ while True:
+ if self._gai_index >= len(self._gai):
+ log.error('Unable to connect to any of the names for {0}:{1}'.format(
+ self.host, self.port
+ ))
+ self.close()
+ return
+ afi, _, __, ___, sockaddr = self._gai[self._gai_index]
+ if afi not in (socket.AF_INET, socket.AF_INET6):
+ self._gai_index += 1
+ continue
+ break
+ self.host, self.port = sockaddr[:2]
+ self._sock = socket.socket(afi, socket.SOCK_STREAM)
+ else:
+ self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
if self.config['receive_buffer_bytes'] is not None:
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
self.config['receive_buffer_bytes'])
@@ -121,10 +155,16 @@ class BrokerConnection(object):
# in non-blocking mode, use repeated calls to socket.connect_ex
# to check connection status
request_timeout = self.config['request_timeout_ms'] / 1000.0
+ ret = None
try:
ret = self._sock.connect_ex((self.host, self.port))
- except socket.error as ret:
- pass
+ # if we got here through a host lookup, we've found a host,port,af tuple
+ # that works save it so we don't do a GAI lookup again
+ if self._gai is not None:
+ self.afi = self._sock.family
+ self._gai = None
+ except socket.error as err:
+ ret = err
# Connection succeeded
if not ret or ret == errno.EISCONN:
@@ -468,6 +508,7 @@ class BrokerConnection(object):
# Socket errors are logged as exceptions and can alarm users. Mute them
from logging import Filter
+
class ConnFilter(Filter):
def filter(self, record):
if record.funcName in ('recv', 'send'):
@@ -548,37 +589,71 @@ class BrokerConnection(object):
return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)
-def get_ip_port_afi(host_and_port_str):
+def _address_family(address):
"""
- Parse the IP and port from a string in the format of:
+ Attempt to determine the family of an address (or hostname)
- * host_or_ip <- Can be either IPv4 or IPv6 address or hostname/fqdn
- * host_or_ip:port <- This is only for IPv4
- * [host_or_ip]:port. <- This is only for IPv6
+ :return: either socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC if the address family
+ could not be determined
+ """
+ if address.startswith('[') and address.endswith(']'):
+ return socket.AF_INET6
+ for af in (socket.AF_INET, socket.AF_INET6):
+ try:
+ socket.inet_pton(af, address)
+ return af
+ except (ValueError, AttributeError, socket.error):
+ continue
+ return socket.AF_UNSPEC
- .. note:: If the port is not specified, default will be returned.
- :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6
+def get_ip_port_afi(host_and_port_str):
"""
- afi = socket.AF_INET
+ Parse the IP and port from a string in the format of:
- if host_and_port_str.strip()[0] == '[':
- afi = socket.AF_INET6
- res = host_and_port_str.split("]:")
- res[0] = res[0].replace("[", "")
- res[0] = res[0].replace("]", "")
+ * host_or_ip <- Can be either IPv4 address literal or hostname/fqdn
+ * host_or_ipv4:port <- Can be either IPv4 address literal or hostname/fqdn
+ * [host_or_ip] <- IPv6 address literal
+ * [host_or_ip]:port. <- IPv6 address literal
- elif host_and_port_str.count(":") > 1:
- afi = socket.AF_INET6
- res = [host_and_port_str]
+ .. note:: IPv6 address literals with ports *must* be enclosed in brackets
- else:
- res = host_and_port_str.split(':')
+ .. note:: If the port is not specified, default will be returned.
- host = res[0]
- port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
+ :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC
+ """
+ host_and_port_str = host_and_port_str.strip()
+ if host_and_port_str.startswith('['):
+ af = socket.AF_INET6
+ host, rest = host_and_port_str[1:].split(']')
+ if rest:
+ port = int(rest[1:])
+ else:
+ port = DEFAULT_KAFKA_PORT
+ return host, port, af
+ else:
+ if ':' not in host_and_port_str:
+ af = _address_family(host_and_port_str)
+ return host_and_port_str, DEFAULT_KAFKA_PORT, af
+ else:
+ # now we have something with a colon in it and no square brackets. It could be
+ # either an IPv6 address literal (e.g., "::1") or an IP:port pair or a host:port pair
+ try:
+ # if it decodes as an IPv6 address, use that
+ socket.inet_pton(socket.AF_INET6, host_and_port_str)
+ return host_and_port_str, DEFAULT_KAFKA_PORT, socket.AF_INET6
+ except AttributeError:
+ log.warning('socket.inet_pton not available on this platform.'
+ ' consider pip install win_inet_pton')
+ pass
+ except (ValueError, socket.error):
+ # it's a host:port pair
+ pass
+ host, port = host_and_port_str.rsplit(':', 1)
+ port = int(port)
- return host.strip(), port, afi
+ af = _address_family(host)
+ return host, port, af
def collect_hosts(hosts, randomize=True):