From 92376cbe8004d2ae6e468a70bc268e420531e72e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 8 Dec 2017 14:36:36 -0800 Subject: Refactor dns lookup in BrokerConnection (#1312) --- kafka/conn.py | 110 ++++++++++++++++++++++++------------------------------ test/test_conn.py | 25 +++++++++++++ 2 files changed, 74 insertions(+), 61 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index e20210a..2926e2f 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -251,67 +251,42 @@ class BrokerConnection(object): self._sasl_auth_future = None self.last_attempt = 0 self._gai = None - self._gai_index = 0 self._sensors = None if self.config['metrics']: self._sensors = BrokerConnectionMetrics(self.config['metrics'], self.config['metric_group_prefix'], self.node_id) + def _next_afi_host_port(self): + if not self._gai: + self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi) + if not self._gai: + log.error('DNS lookup failed for %s:%i (%s)', + self._init_host, self._init_port, self._init_afi) + return + + afi, _, __, ___, sockaddr = self._gai.pop(0) + host, port = sockaddr[:2] + return (afi, host, port) + def connect(self): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED: - log.debug('%s: creating new socket', self) - # if self.afi is set to AF_UNSPEC, then we need to do a name - # resolution and try all available address families - if self._init_afi == socket.AF_UNSPEC: - if self._gai is None: - # XXX: all DNS functions in Python are blocking. If we really - # want to be non-blocking here, we need to use a 3rd-party - # library like python-adns, or move resolution onto its - # own thread. This will be subject to the default libc - # name resolution timeout (5s on most Linux boxes) - try: - self._gai = socket.getaddrinfo(self._init_host, - self._init_port, - socket.AF_UNSPEC, - socket.SOCK_STREAM) - except socket.gaierror as ex: - log.warning('DNS lookup failed for %s:%d,' - ' exception was %s. Is your' - ' advertised.listeners (called' - ' advertised.host.name before Kafka 9)' - ' correct and resolvable?', - self._init_host, self._init_port, ex) - self._gai = [] - self._gai_index = 0 - else: - # if self._gai already exists, then we should try the next - # name - self._gai_index += 1 - while True: - if self._gai_index >= len(self._gai): - error = 'Unable to connect to any of the names for {0}:{1}'.format( - self._init_host, self._init_port) - log.error(error) - self.close(Errors.ConnectionError(error)) - return - afi, _, __, ___, sockaddr = self._gai[self._gai_index] - if afi not in (socket.AF_INET, socket.AF_INET6): - self._gai_index += 1 - continue - break - self.host, self.port = sockaddr[:2] - self._sock = socket.socket(afi, socket.SOCK_STREAM) + self.last_attempt = time.time() + next_lookup = self._next_afi_host_port() + if not next_lookup: + self.close(Errors.ConnectionError('DNS failure')) + return else: - self._sock = socket.socket(self._init_afi, socket.SOCK_STREAM) + log.debug('%s: creating new socket', self) + self.afi, self.host, self.port = next_lookup + self._sock = socket.socket(self.afi, socket.SOCK_STREAM) for option in self.config['socket_options']: log.debug('%s: setting socket option %s', self, option) self._sock.setsockopt(*option) self._sock.setblocking(False) - self.last_attempt = time.time() self.state = ConnectionStates.CONNECTING if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): self._wrap_ssl() @@ -328,11 +303,6 @@ class BrokerConnection(object): ret = None try: ret = self._sock.connect_ex((self.host, self.port)) - # if we got here through a host lookup, we've found a host,port,af tuple - # that works save it so we don't do a GAI lookup again - if self._gai is not None: - self.afi = self._sock.family - self._gai = None except socket.error as err: ret = err.errno @@ -645,23 +615,15 @@ class BrokerConnection(object): will be failed with this exception. Default: kafka.errors.ConnectionError. """ - if self.state is ConnectionStates.DISCONNECTED: - if error is not None: - if sys.version_info >= (3, 2): - log.warning('%s: close() called on disconnected connection with error: %s', self, error, stack_info=True) - else: - log.warning('%s: close() called on disconnected connection with error: %s', self, error) - return - log.info('%s: Closing connection. %s', self, error or '') - self.state = ConnectionStates.DISCONNECTING - self.config['state_change_callback'](self) + if self.state is not ConnectionStates.DISCONNECTED: + self.state = ConnectionStates.DISCONNECTING + self.config['state_change_callback'](self) self._update_reconnect_backoff() if self._sock: self._sock.close() self._sock = None self.state = ConnectionStates.DISCONNECTED - self.last_attempt = time.time() self._sasl_auth_future = None self._protocol = KafkaProtocol( client_id=self.config['client_id'], @@ -1170,3 +1132,29 @@ def collect_hosts(hosts, randomize=True): shuffle(result) return result + + +def is_inet_4_or_6(gai): + """Given a getaddrinfo struct, return True iff ipv4 or ipv6""" + return gai[0] in (socket.AF_INET, socket.AF_INET6) + + +def dns_lookup(host, port, afi=socket.AF_UNSPEC): + """Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)""" + # XXX: all DNS functions in Python are blocking. If we really + # want to be non-blocking here, we need to use a 3rd-party + # library like python-adns, or move resolution onto its + # own thread. This will be subject to the default libc + # name resolution timeout (5s on most Linux boxes) + try: + return list(filter(is_inet_4_or_6, + socket.getaddrinfo(host, port, afi, + socket.SOCK_STREAM))) + except socket.gaierror as ex: + log.warning('DNS lookup failed for %s:%d,' + ' exception was %s. Is your' + ' advertised.listeners (called' + ' advertised.host.name before Kafka 9)' + ' correct and resolvable?', + host, port, ex) + return [] diff --git a/test/test_conn.py b/test/test_conn.py index 1621e60..ef7925a 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -267,3 +267,28 @@ def test_lookup_on_connect(): m.assert_called_once_with(hostname, port, 0, 1) conn.close() assert conn.host == ip2 + + +def test_relookup_on_failure(): + hostname = 'example.org' + port = 9092 + conn = BrokerConnection(hostname, port, socket.AF_UNSPEC) + assert conn.host == conn.hostname == hostname + mock_return1 = [] + with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m: + last_attempt = conn.last_attempt + conn.connect() + m.assert_called_once_with(hostname, port, 0, 1) + assert conn.disconnected() + assert conn.last_attempt > last_attempt + + ip2 = '127.0.0.2' + mock_return2 = [ + (2, 2, 17, '', (ip2, 9092)), + ] + + with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m: + conn.connect() + m.assert_called_once_with(hostname, port, 0, 1) + conn.close() + assert conn.host == ip2 -- cgit v1.2.1