summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@gmail.com> 2017-12-08 14:36:36 -0800
committer: GitHub <noreply@github.com> 2017-12-08 14:36:36 -0800
commit: 92376cbe8004d2ae6e468a70bc268e420531e72e (patch)
tree: a550110f1408182be2ad91f54f39aaba8a9f8710
parent: 2c8748ccfd4feaa16206899599663ff3aac03c6a (diff)
download: kafka-python-92376cbe8004d2ae6e468a70bc268e420531e72e.tar.gz
Refactor dns lookup in BrokerConnection (#1312)
-rw-r--r-- kafka/conn.py 110
-rw-r--r-- test/test_conn.py 25
2 files changed, 74 insertions, 61 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index e20210a..2926e2f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -251,67 +251,42 @@ class BrokerConnection(object):
self._sasl_auth_future = None
self.last_attempt = 0
self._gai = None
- self._gai_index = 0
self._sensors = None
if self.config['metrics']:
self._sensors = BrokerConnectionMetrics(self.config['metrics'],
self.config['metric_group_prefix'],
self.node_id)
+ def _next_afi_host_port(self):
+ if not self._gai:
+ self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi)
+ if not self._gai:
+ log.error('DNS lookup failed for %s:%i (%s)',
+ self._init_host, self._init_port, self._init_afi)
+ return
+
+ afi, _, __, ___, sockaddr = self._gai.pop(0)
+ host, port = sockaddr[:2]
+ return (afi, host, port)
+
def connect(self):
"""Attempt to connect and return ConnectionState"""
if self.state is ConnectionStates.DISCONNECTED:
- log.debug('%s: creating new socket', self)
- # if self.afi is set to AF_UNSPEC, then we need to do a name
- # resolution and try all available address families
- if self._init_afi == socket.AF_UNSPEC:
- if self._gai is None:
- # XXX: all DNS functions in Python are blocking. If we really
- # want to be non-blocking here, we need to use a 3rd-party
- # library like python-adns, or move resolution onto its
- # own thread. This will be subject to the default libc
- # name resolution timeout (5s on most Linux boxes)
- try:
- self._gai = socket.getaddrinfo(self._init_host,
- self._init_port,
- socket.AF_UNSPEC,
- socket.SOCK_STREAM)
- except socket.gaierror as ex:
- log.warning('DNS lookup failed for %s:%d,'
- ' exception was %s. Is your'
- ' advertised.listeners (called'
- ' advertised.host.name before Kafka 9)'
- ' correct and resolvable?',
- self._init_host, self._init_port, ex)
- self._gai = []
- self._gai_index = 0
- else:
- # if self._gai already exists, then we should try the next
- # name
- self._gai_index += 1
- while True:
- if self._gai_index >= len(self._gai):
- error = 'Unable to connect to any of the names for {0}:{1}'.format(
- self._init_host, self._init_port)
- log.error(error)
- self.close(Errors.ConnectionError(error))
- return
- afi, _, __, ___, sockaddr = self._gai[self._gai_index]
- if afi not in (socket.AF_INET, socket.AF_INET6):
- self._gai_index += 1
- continue
- break
- self.host, self.port = sockaddr[:2]
- self._sock = socket.socket(afi, socket.SOCK_STREAM)
+ self.last_attempt = time.time()
+ next_lookup = self._next_afi_host_port()
+ if not next_lookup:
+ self.close(Errors.ConnectionError('DNS failure'))
+ return
else:
- self._sock = socket.socket(self._init_afi, socket.SOCK_STREAM)
+ log.debug('%s: creating new socket', self)
+ self.afi, self.host, self.port = next_lookup
+ self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
for option in self.config['socket_options']:
log.debug('%s: setting socket option %s', self, option)
self._sock.setsockopt(*option)
self._sock.setblocking(False)
- self.last_attempt = time.time()
self.state = ConnectionStates.CONNECTING
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
self._wrap_ssl()
@@ -328,11 +303,6 @@ class BrokerConnection(object):
ret = None
try:
ret = self._sock.connect_ex((self.host, self.port))
- # if we got here through a host lookup, we've found a host,port,af tuple
- # that works save it so we don't do a GAI lookup again
- if self._gai is not None:
- self.afi = self._sock.family
- self._gai = None
except socket.error as err:
ret = err.errno
@@ -645,23 +615,15 @@ class BrokerConnection(object):
will be failed with this exception.
Default: kafka.errors.ConnectionError.
"""
- if self.state is ConnectionStates.DISCONNECTED:
- if error is not None:
- if sys.version_info >= (3, 2):
- log.warning('%s: close() called on disconnected connection with error: %s', self, error, stack_info=True)
- else:
- log.warning('%s: close() called on disconnected connection with error: %s', self, error)
- return
-
log.info('%s: Closing connection. %s', self, error or '')
- self.state = ConnectionStates.DISCONNECTING
- self.config['state_change_callback'](self)
+ if self.state is not ConnectionStates.DISCONNECTED:
+ self.state = ConnectionStates.DISCONNECTING
+ self.config['state_change_callback'](self)
self._update_reconnect_backoff()
if self._sock:
self._sock.close()
self._sock = None
self.state = ConnectionStates.DISCONNECTED
- self.last_attempt = time.time()
self._sasl_auth_future = None
self._protocol = KafkaProtocol(
client_id=self.config['client_id'],
@@ -1170,3 +1132,29 @@ def collect_hosts(hosts, randomize=True):
shuffle(result)
return result
+
+
+def is_inet_4_or_6(gai):
+ """Given a getaddrinfo struct, return True iff ipv4 or ipv6"""
+ return gai[0] in (socket.AF_INET, socket.AF_INET6)
+
+
+def dns_lookup(host, port, afi=socket.AF_UNSPEC):
+ """Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)"""
+ # XXX: all DNS functions in Python are blocking. If we really
+ # want to be non-blocking here, we need to use a 3rd-party
+ # library like python-adns, or move resolution onto its
+ # own thread. This will be subject to the default libc
+ # name resolution timeout (5s on most Linux boxes)
+ try:
+ return list(filter(is_inet_4_or_6,
+ socket.getaddrinfo(host, port, afi,
+ socket.SOCK_STREAM)))
+ except socket.gaierror as ex:
+ log.warning('DNS lookup failed for %s:%d,'
+ ' exception was %s. Is your'
+ ' advertised.listeners (called'
+ ' advertised.host.name before Kafka 9)'
+ ' correct and resolvable?',
+ host, port, ex)
+ return []
diff --git a/test/test_conn.py b/test/test_conn.py
index 1621e60..ef7925a 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -267,3 +267,28 @@ def test_lookup_on_connect():
m.assert_called_once_with(hostname, port, 0, 1)
conn.close()
assert conn.host == ip2
+
+
+def test_relookup_on_failure():
+ hostname = 'example.org'
+ port = 9092
+ conn = BrokerConnection(hostname, port, socket.AF_UNSPEC)
+ assert conn.host == conn.hostname == hostname
+ mock_return1 = []
+ with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
+ last_attempt = conn.last_attempt
+ conn.connect()
+ m.assert_called_once_with(hostname, port, 0, 1)
+ assert conn.disconnected()
+ assert conn.last_attempt > last_attempt
+
+ ip2 = '127.0.0.2'
+ mock_return2 = [
+ (2, 2, 17, '', (ip2, 9092)),
+ ]
+
+ with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
+ conn.connect()
+ m.assert_called_once_with(hostname, port, 0, 1)
+ conn.close()
+ assert conn.host == ip2