summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-06-01 09:24:53 -0700
committerDana Powers <dana.powers@gmail.com>2016-06-01 09:37:12 -0700
commitebaa5b6eb6680750d242c7abf880b6d6eb4dabad (patch)
tree75b490148a687b3e9d0477aa9586a79d756ab7f8
parent3a971ea95e43341d105a5a7def6c0cb383be9e62 (diff)
downloadkafka-python-decode_errors.tar.gz
Catch response decode errors and log detailsdecode_errors
-rw-r--r--kafka/conn.py15
1 files changed, 14 insertions, 1 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index cf5dce3..c5d3be1 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -507,7 +507,20 @@ class BrokerConnection(object):
return None
# decode response
- response = ifr.response_type.decode(read_buffer)
+ try:
+ response = ifr.response_type.decode(read_buffer)
+ except ValueError:
+ read_buffer.seek(0)
+ buf = read_buffer.read()
+ log.error('%s Response %d [ResponseType: %s Request: %s]:'
+ ' Unable to decode %d-byte buffer: %r', self,
+ ifr.correlation_id, ifr.response_type,
+ ifr.request, len(buf), buf)
+ ifr.future.failure(Errors.UnknownError('Unable to decode response'))
+ self.close()
+ self._processing = False
+ return None
+
log.debug('%s Response %d: %s', self, ifr.correlation_id, response)
ifr.future.success(response)
self._processing = False