summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorreAsOn2010 <the.reason.sake@gmail.com>2015-06-09 04:20:16 +0800
committerreAsOn2010 <the.reason.sake@gmail.com>2015-06-09 04:20:16 +0800
commit945fc048a8cc61e1a9390bd7a7fed371d2e23277 (patch)
tree4d5f584f1d33f89aaa119ad20418b5a91c5ecc55 /kafka
parentb1aad92a2e7dfded5f57ebc497dccc5ad3c56781 (diff)
downloadkafka-python-945fc048a8cc61e1a9390bd7a7fed371d2e23277.tar.gz
try to fix uncaught FailedPayloadsError
Diffstat (limited to 'kafka')
-rw-r--r--kafka/common.py2
-rw-r--r--kafka/consumer/simple.py9
2 files changed, 10 insertions, 1 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 2fdf7d2..66987ff 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -226,6 +226,8 @@ kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()])
def check_error(response):
+ if isinstance(response, Exception):
+ raise response
if response.error:
error_class = kafka_errors.get(response.error, UnknownError)
raise error_class(response)
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 2c2f820..88eeada 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -19,7 +19,7 @@ from kafka.common import (
FetchRequest, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
- OffsetOutOfRangeError, check_error
+ OffsetOutOfRangeError, FailedPayloadsError, check_error
)
from .base import (
Consumer,
@@ -355,6 +355,13 @@ class SimpleConsumer(Consumer):
# Retry this partition
retry_partitions[resp.partition] = partitions[resp.partition]
continue
+ except FailedPayloadsError as e:
+ log.warning("Failed payloads of %s"
+ "Resetting partition offset...",
+ e.payload)
+ # Retry this partition
+ retry_partitions[e.payload.partition] = partitions[e.payload.partition]
+ continue
partition = resp.partition
buffer_size = partitions[partition]