summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-25 18:59:37 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-25 21:45:18 -0700
commitfa59d4da590e851a137cb0cf4c93f0089cae6890 (patch)
tree7e198a0f5be06d60db76d12d44f1cb81d4a8115f
parent959e57fb7d87860c310946602bcd802c8c21bf14 (diff)
downloadkafka-python-fa59d4da590e851a137cb0cf4c93f0089cae6890.tar.gz
Improve socket disconnect handlingdisconnects
-rw-r--r--kafka/client_async.py2
-rw-r--r--kafka/conn.py26
2 files changed, 23 insertions, 5 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 7fe0272..7719426 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -225,7 +225,7 @@ class KafkaClient(object):
except KeyError:
pass
if self._refresh_on_disconnects:
- log.warning("Node %s connect failed -- refreshing metadata", node_id)
+ log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()
def _maybe_connect(self, node_id):
diff --git a/kafka/conn.py b/kafka/conn.py
index 3571e90..b5c7ba0 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -381,9 +381,17 @@ class BrokerConnection(object):
# Not receiving is the state of reading the payload header
if not self._receiving:
try:
- # An extremely small, but non-zero, probability that there are
- # more than 0 but not yet 4 bytes available to read
- self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell()))
+ bytes_to_read = 4 - self._rbuffer.tell()
+ data = self._sock.recv(bytes_to_read)
+ # We expect socket.recv to raise an exception if there is not
+ # enough data to read the full bytes_to_read
+ # but if the socket is disconnected, we will get empty data
+ # without an exception raised
+ if not data:
+ log.error('%s: socket disconnected', self)
+ self.close(error=Errors.ConnectionError('socket disconnected'))
+ return None
+ self._rbuffer.write(data)
except ssl.SSLWantReadError:
return None
except ConnectionError as e:
@@ -411,7 +419,17 @@ class BrokerConnection(object):
if self._receiving:
staged_bytes = self._rbuffer.tell()
try:
- self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes))
+ bytes_to_read = self._next_payload_bytes - staged_bytes
+ data = self._sock.recv(bytes_to_read)
+ # We expect socket.recv to raise an exception if there is not
+ # enough data to read the full bytes_to_read
+ # but if the socket is disconnected, we will get empty data
+ # without an exception raised
+ if not data:
+ log.error('%s: socket disconnected', self)
+ self.close(error=Errors.ConnectionError('socket disconnected'))
+ return None
+ self._rbuffer.write(data)
except ssl.SSLWantReadError:
return None
except ConnectionError as e: