diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-15 12:31:10 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-15 12:31:18 -0800 |
commit | 7c779b31dbdd6a69741de5d300d52cc52479054f (patch) | |
tree | f5f8726c9ecabd960a2c8468093f17c81dfdc517 | |
parent | c6b9f84e49675a69ccabc18e8e8fbcbf428a55f2 (diff) | |
download | kafka-python-7c779b31dbdd6a69741de5d300d52cc52479054f.tar.gz |
Issue 545: Convert deserializer StopIteration errors to raw Exceptions
-rw-r--r-- | kafka/consumer/fetcher.py | 9 | ||||
-rw-r--r-- | kafka/consumer/group.py | 2 |
2 files changed, 9 insertions, 2 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index e136ea2..f406a30 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -356,7 +356,14 @@ class Fetcher(six.Iterator): for record in self._unpack_message_set(tp, msg.decompress()): yield record else: - key, value = self._deserialize(msg) + try: + key, value = self._deserialize(msg) + # If the deserializer raises StopIteration, it is erroneously + # caught by the generator. We want all exceptions to be raised + # back to the user. See Issue 545 + except StopIteration as e: + log.exception('Deserializer raised StopIteration: %s', e) + raise Exception('Deserializer raised StopIteration') yield ConsumerRecord(tp.topic, tp.partition, offset, key, value) def _message_generator(self): diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index c153063..efd3bcc 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -49,7 +49,7 @@ class KafkaConsumer(six.Iterator): Default: 'kafka-python-default-group' key_deserializer (callable): Any callable that takes a raw message key and returns a deserialized key. - value_deserializer (callable, optional): Any callable that takes a + value_deserializer (callable): Any callable that takes a raw message value and returns a deserialized value. fetch_min_bytes (int): Minimum amount of data the server should return for a fetch request, otherwise wait up to |