summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwebber <weguo0022@hotmail.com>2017-08-06 00:40:54 +0800
committerDana Powers <dana.powers@gmail.com>2017-08-05 09:40:54 -0700
commit6b97eaefb23f637bc5095e49fd4ab9ee6755ce6e (patch)
treeaed1f9cd4b011398d2ece6756bbba20e7ad1af40
parent3ff3d75004f94fd55fa089297d3e2376e33ccda7 (diff)
downloadkafka-python-6b97eaefb23f637bc5095e49fd4ab9ee6755ce6e.tar.gz
Fixed Issue 1033.Raise AssertionError when decompression unsupported. (#1159)
-rw-r--r--kafka/consumer/fetcher.py7
1 files changed, 7 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 2782057..8db89a1 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -122,6 +122,7 @@ class Fetcher(six.Iterator):
if self._client.ready(node_id):
log.debug("Sending FetchRequest to node %s", node_id)
future = self._client.send(node_id, request)
+ future.error_on_callbacks=True
future.add_callback(self._handle_fetch_response, request, time.time())
future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
futures.append(future)
@@ -550,6 +551,12 @@ class Fetcher(six.Iterator):
log.exception('StopIteration raised unpacking messageset: %s', e)
raise Exception('StopIteration raised unpacking messageset')
+ # If unpacking raises AssertionError, it means decompression unsupported
+ # See Issue 1033
+ except AssertionError as e:
+ log.exception('AssertionError raised unpacking messageset: %s', e)
+ raise
+
def __iter__(self): # pylint: disable=non-iterator-returned
return self