summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-05-29 20:49:32 -0700
committerGitHub <noreply@github.com>2019-05-29 20:49:32 -0700
commit5bb1abd3495ce81a0522b2a66e6c5d2731dae77b (patch)
tree0982d52437b30d281f42d9f15240af8898133e40
parent21b00c30ecc159a5df389fe96287898660f659d2 (diff)
downloadkafka-python-5bb1abd3495ce81a0522b2a66e6c5d2731dae77b.tar.gz
Catch TimeoutError in BrokerConnection send/recv (#1820)
-rw-r--r--kafka/conn.py13
1 files changed, 7 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 044d2d5..825406c 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -36,6 +36,7 @@ from kafka.version import __version__
if six.PY2:
ConnectionError = socket.error
+ TimeoutError = socket.error
BlockingIOError = Exception
log = logging.getLogger(__name__)
@@ -498,7 +499,7 @@ class BrokerConnection(object):
# old ssl in python2.6 will swallow all SSLErrors here...
except (SSLWantReadError, SSLWantWriteError):
pass
- except (SSLZeroReturnError, ConnectionError, SSLEOFError):
+ except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError):
log.warning('SSL connection closed by server during handshake.')
self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
# Other SSLErrors will be raised to user
@@ -599,7 +600,7 @@ class BrokerConnection(object):
# The connection is closed on failure
data = self._recv_bytes_blocking(4)
- except ConnectionError as e:
+ except (ConnectionError, TimeoutError) as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
@@ -665,7 +666,7 @@ class BrokerConnection(object):
size = Int32.encode(len(msg))
self._send_bytes_blocking(size + msg)
- except ConnectionError as e:
+ except (ConnectionError, TimeoutError) as e:
self._lock.release()
log.exception("%s: Error receiving reply from server", self)
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
@@ -695,7 +696,7 @@ class BrokerConnection(object):
# The connection is closed on failure
data = self._recv_bytes_blocking(4)
- except ConnectionError as e:
+ except (ConnectionError, TimeoutError) as e:
self._lock.release()
log.exception("%s: Error receiving reply from server", self)
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
@@ -886,7 +887,7 @@ class BrokerConnection(object):
if self._sensors:
self._sensors.bytes_sent.record(total_bytes)
return total_bytes
- except ConnectionError as e:
+ except (ConnectionError, TimeoutError) as e:
log.exception("Error sending request data to %s", self)
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
@@ -954,7 +955,7 @@ class BrokerConnection(object):
except SSLWantReadError:
break
- except ConnectionError as e:
+ except (ConnectionError, TimeoutError) as e:
if six.PY2 and e.errno == errno.EWOULDBLOCK:
break
log.exception('%s: Error receiving network data'