summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-12-17 10:37:28 -0800
committerDana Powers <dana.powers@gmail.com>2017-01-19 11:10:04 -0800
commitd0f4abe05d02458ad8a4a19e75d1ec86fb67ab3e (patch)
tree3d124cc5b2d4c3a668404f57ffd76846cea38e06
parent7df120d7ce4f3cb429cdfb7b246d41d744b30e6c (diff)
downloadkafka-python-d0f4abe05d02458ad8a4a19e75d1ec86fb67ab3e.tar.gz
Pass error to BrokerConnection.close()
-rw-r--r--kafka/client_async.py2
-rw-r--r--kafka/conn.py46
2 files changed, 28 insertions, 20 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index bb96578..6179eba 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -557,7 +557,7 @@ class KafkaClient(object):
log.warning('Protocol out of sync on %r, closing', conn)
except socket.error:
pass
- conn.close()
+ conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests'))
continue
# Accumulate as many responses as the connection has pending
diff --git a/kafka/conn.py b/kafka/conn.py
index cbecfa7..50dc4d9 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -9,6 +9,7 @@ from random import shuffle
import socket
import ssl
import time
+import traceback
from kafka.vendor import six
@@ -236,10 +237,10 @@ class BrokerConnection(object):
self._gai_index += 1
while True:
if self._gai_index >= len(self._gai):
- log.error('Unable to connect to any of the names for {0}:{1}'.format(
- self._init_host, self._init_port
- ))
- self.close()
+ error = 'Unable to connect to any of the names for {0}:{1}'.format(
+ self._init_host, self._init_port)
+ log.error(error)
+ self.close(Errors.ConnectionError(error))
return
afi, _, __, ___, sockaddr = self._gai[self._gai_index]
if afi not in (socket.AF_INET, socket.AF_INET6):
@@ -293,12 +294,12 @@ class BrokerConnection(object):
elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
log.error('Connect attempt to %s returned error %s.'
' Disconnecting.', self, ret)
- self.close()
+ self.close(Errors.ConnectionError(ret))
# Connection timed out
elif time.time() > request_timeout + self.last_attempt:
log.error('Connection attempt to %s timed out', self)
- self.close() # error=TimeoutError ?
+ self.close(Errors.ConnectionError('timeout'))
# Needs retry
else:
@@ -345,9 +346,9 @@ class BrokerConnection(object):
password=self.config['ssl_password'])
if self.config['ssl_crlfile']:
if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'):
- log.error('%s: No CRL support with this version of Python.'
- ' Disconnecting.', self)
- self.close()
+ error = 'No CRL support with this version of Python.'
+ log.error('%s: %s Disconnecting.', self, error)
+ self.close(Errors.ConnectionError(error))
return
log.info('%s: Loading SSL CRL from %s', str(self), self.config['ssl_crlfile'])
self._ssl_context.load_verify_locations(self.config['ssl_crlfile'])
@@ -359,9 +360,9 @@ class BrokerConnection(object):
self._sock,
server_hostname=self.hostname,
do_handshake_on_connect=False)
- except ssl.SSLError:
+ except ssl.SSLError as e:
log.exception('%s: Failed to wrap socket in SSLContext!', str(self))
- self.close()
+ self.close(e)
self.last_failure = time.time()
def _try_handshake(self):
@@ -374,7 +375,7 @@ class BrokerConnection(object):
pass
except ssl.SSLZeroReturnError:
log.warning('SSL connection closed by server during handshake.')
- self.close()
+ self.close(Errors.ConnectionError('SSL connection closed by server during handshake'))
# Other SSLErrors will be raised to user
return False
@@ -482,9 +483,15 @@ class BrokerConnection(object):
will be failed with this exception.
Default: kafka.errors.ConnectionError.
"""
- if self.state is not ConnectionStates.DISCONNECTED:
- self.state = ConnectionStates.DISCONNECTING
- self.config['state_change_callback'](self)
+ if self.state is ConnectionStates.DISCONNECTED:
+ if error is not None:
+ log.warning('%s: close() called on disconnected connection with error: %s', self, error)
+ traceback.print_stack()
+ return
+
+ log.info('%s: Closing connection. %s', self, error or '')
+ self.state = ConnectionStates.DISCONNECTING
+ self.config['state_change_callback'](self)
if self._sock:
self._sock.close()
self._sock = None
@@ -572,7 +579,7 @@ class BrokerConnection(object):
# If requests are pending, we should close the socket and
# fail all the pending request futures
if self.in_flight_requests:
- self.close()
+ self.close(Errors.ConnectionError('Socket not connected during recv with in-flight-requests'))
return None
elif not self.in_flight_requests:
@@ -699,7 +706,7 @@ class BrokerConnection(object):
'%s: Correlation IDs do not match: sent %d, recv %d'
% (str(self), ifr.correlation_id, recv_correlation_id))
ifr.future.failure(error)
- self.close()
+ self.close(error)
self._processing = False
return None
@@ -713,8 +720,9 @@ class BrokerConnection(object):
' Unable to decode %d-byte buffer: %r', self,
ifr.correlation_id, ifr.response_type,
ifr.request, len(buf), buf)
- ifr.future.failure(Errors.UnknownError('Unable to decode response'))
- self.close()
+ error = Errors.UnknownError('Unable to decode response')
+ ifr.future.failure(error)
+ self.close(error)
self._processing = False
return None