diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 123 |
1 files changed, 99 insertions, 24 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 084450b..3571e90 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -97,13 +97,47 @@ class BrokerConnection(object): self.last_failure = 0 self._processing = False self._correlation_id = 0 + self._gai = None + self._gai_index = 0 def connect(self): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED: self.close() log.debug('%s: creating new socket', str(self)) - self._sock = socket.socket(self.afi, socket.SOCK_STREAM) + # if self.afi is set to AF_UNSPEC, then we need to do a name + # resolution and try all available address families + if self.afi == socket.AF_UNSPEC: + if self._gai is None: + # XXX: all DNS functions in Python are blocking. If we really + # want to be non-blocking here, we need to use a 3rd-party + # library like python-adns, or move resolution onto its + # own thread. This will be subject to the default libc + # name resolution timeout (5s on most Linux boxes) + self._gai = socket.getaddrinfo(self.host, self.port, + socket.AF_UNSPEC, + socket.SOCK_STREAM) + self._gai_index = 0 + else: + # if self._gai already exists, then we should try the next + # name + self._gai_index += 1 + while True: + if self._gai_index >= len(self._gai): + log.error('Unable to connect to any of the names for {0}:{1}'.format( + self.host, self.port + )) + self.close() + return + afi, _, __, ___, sockaddr = self._gai[self._gai_index] + if afi not in (socket.AF_INET, socket.AF_INET6): + self._gai_index += 1 + continue + break + self.host, self.port = sockaddr[:2] + self._sock = socket.socket(afi, socket.SOCK_STREAM) + else: + self._sock = socket.socket(self.afi, socket.SOCK_STREAM) if self.config['receive_buffer_bytes'] is not None: self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.config['receive_buffer_bytes']) @@ -121,10 +155,16 @@ class BrokerConnection(object): # in non-blocking mode, use repeated calls to socket.connect_ex # to check connection status request_timeout = self.config['request_timeout_ms'] / 1000.0 + ret = None try: ret = self._sock.connect_ex((self.host, self.port)) - except socket.error as ret: - pass + # if we got here through a host lookup, we've found a host,port,af tuple + # that works save it so we don't do a GAI lookup again + if self._gai is not None: + self.afi = self._sock.family + self._gai = None + except socket.error as err: + ret = err # Connection succeeded if not ret or ret == errno.EISCONN: @@ -468,6 +508,7 @@ class BrokerConnection(object): # Socket errors are logged as exceptions and can alarm users. Mute them from logging import Filter + class ConnFilter(Filter): def filter(self, record): if record.funcName in ('recv', 'send'): @@ -548,37 +589,71 @@ class BrokerConnection(object): return "<BrokerConnection host=%s port=%d>" % (self.host, self.port) -def get_ip_port_afi(host_and_port_str): +def _address_family(address): """ - Parse the IP and port from a string in the format of: + Attempt to determine the family of an address (or hostname) - * host_or_ip <- Can be either IPv4 or IPv6 address or hostname/fqdn - * host_or_ip:port <- This is only for IPv4 - * [host_or_ip]:port. <- This is only for IPv6 + :return: either socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC if the address family + could not be determined + """ + if address.startswith('[') and address.endswith(']'): + return socket.AF_INET6 + for af in (socket.AF_INET, socket.AF_INET6): + try: + socket.inet_pton(af, address) + return af + except (ValueError, AttributeError, socket.error): + continue + return socket.AF_UNSPEC - .. note:: If the port is not specified, default will be returned. - :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 +def get_ip_port_afi(host_and_port_str): """ - afi = socket.AF_INET + Parse the IP and port from a string in the format of: - if host_and_port_str.strip()[0] == '[': - afi = socket.AF_INET6 - res = host_and_port_str.split("]:") - res[0] = res[0].replace("[", "") - res[0] = res[0].replace("]", "") + * host_or_ip <- Can be either IPv4 address literal or hostname/fqdn + * host_or_ipv4:port <- Can be either IPv4 address literal or hostname/fqdn + * [host_or_ip] <- IPv6 address literal + * [host_or_ip]:port. <- IPv6 address literal - elif host_and_port_str.count(":") > 1: - afi = socket.AF_INET6 - res = [host_and_port_str] + .. note:: IPv6 address literals with ports *must* be enclosed in brackets - else: - res = host_and_port_str.split(':') + .. note:: If the port is not specified, default will be returned. - host = res[0] - port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT + :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC + """ + host_and_port_str = host_and_port_str.strip() + if host_and_port_str.startswith('['): + af = socket.AF_INET6 + host, rest = host_and_port_str[1:].split(']') + if rest: + port = int(rest[1:]) + else: + port = DEFAULT_KAFKA_PORT + return host, port, af + else: + if ':' not in host_and_port_str: + af = _address_family(host_and_port_str) + return host_and_port_str, DEFAULT_KAFKA_PORT, af + else: + # now we have something with a colon in it and no square brackets. It could be + # either an IPv6 address literal (e.g., "::1") or an IP:port pair or a host:port pair + try: + # if it decodes as an IPv6 address, use that + socket.inet_pton(socket.AF_INET6, host_and_port_str) + return host_and_port_str, DEFAULT_KAFKA_PORT, socket.AF_INET6 + except AttributeError: + log.warning('socket.inet_pton not available on this platform.' + ' consider pip install win_inet_pton') + pass + except (ValueError, socket.error): + # it's a host:port pair + pass + host, port = host_and_port_str.rsplit(':', 1) + port = int(port) - return host.strip(), port, afi + af = _address_family(host) + return host, port, af def collect_hosts(hosts, randomize=True): |