summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-15 12:31:10 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-15 12:31:18 -0800
commit7c779b31dbdd6a69741de5d300d52cc52479054f (patch)
treef5f8726c9ecabd960a2c8468093f17c81dfdc517
parentc6b9f84e49675a69ccabc18e8e8fbcbf428a55f2 (diff)
downloadkafka-python-7c779b31dbdd6a69741de5d300d52cc52479054f.tar.gz
Issue 545: Convert deserializer StopIteration errors to raw Exceptions
-rw-r--r--kafka/consumer/fetcher.py9
-rw-r--r--kafka/consumer/group.py2
2 files changed, 9 insertions, 2 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index e136ea2..f406a30 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -356,7 +356,14 @@ class Fetcher(six.Iterator):
for record in self._unpack_message_set(tp, msg.decompress()):
yield record
else:
- key, value = self._deserialize(msg)
+ try:
+ key, value = self._deserialize(msg)
+ # If the deserializer raises StopIteration, it is erroneously
+ # caught by the generator. We want all exceptions to be raised
+ # back to the user. See Issue 545
+ except StopIteration as e:
+ log.exception('Deserializer raised StopIteration: %s', e)
+ raise Exception('Deserializer raised StopIteration')
yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
def _message_generator(self):
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index c153063..efd3bcc 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -49,7 +49,7 @@ class KafkaConsumer(six.Iterator):
Default: 'kafka-python-default-group'
key_deserializer (callable): Any callable that takes a
raw message key and returns a deserialized key.
- value_deserializer (callable, optional): Any callable that takes a
+ value_deserializer (callable): Any callable that takes a
raw message value and returns a deserialized value.
fetch_min_bytes (int): Minimum amount of data the server should
return for a fetch request, otherwise wait up to