summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-11 11:41:51 -0700
committerDana Powers <dana.powers@rd.io>2014-12-15 12:42:54 -0800
commit82b3e011fad44c92188ce7645738dea691fa5849 (patch)
treeac40e806d95dbb83957df5af2dc3f9d64367bae5 /kafka
parentfa6738b8f49735df1812e2e9068e227fcaca961d (diff)
downloadkafka-python-82b3e011fad44c92188ce7645738dea691fa5849.tar.gz
Handle FailedPayloadsError on client.send_fetch_request; permit offsets(); update docstring
Diffstat (limited to 'kafka')
-rw-r--r--kafka/consumer/new.py51
1 file changed, 35 insertions, 16 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index 5790a31..5ef5921 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -8,7 +8,8 @@ from kafka.client import KafkaClient
from kafka.common import (
OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
- OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout
+ OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
+ FailedPayloadsError, KafkaUnavailableError
)
logger = logging.getLogger(__name__)
@@ -90,11 +91,6 @@ class KafkaConsumer(object):
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
-
- Message fetching proceeds in batches, with each topic/partition
- queried for a maximum of `fetch_message_max_bytes` data
- After consuming all messages in a batch, StopIteration is raised
- Iterating again after StopIteration will trigger another batch to be fetched
"""
DEFAULT_CONSUMER_CONFIG = {
@@ -237,8 +233,16 @@ class KafkaConsumer(object):
if consumer_timeout and time.time() > consumer_timeout:
raise ConsumerTimeout('Consumer timed out waiting to fetch messages')
- def offsets(self, group):
- return dict(deepcopy(getattr(self._offsets, group)))
+ def offsets(self, group=None):
+ if not group:
+ return {
+ 'fetch': self.offsets('fetch'),
+ 'commit': self.offsets('commit'),
+ 'task_done': self.offsets('task_done'),
+ 'highwater': self.offsets('highwater')
+ }
+ else:
+ return dict(deepcopy(getattr(self._offsets, group)))
def task_done(self, message):
topic = message.topic
@@ -340,10 +344,15 @@ class KafkaConsumer(object):
# client.send_fetch_request will collect topic/partition requests by leader
# and send each group as a single FetchRequest to the correct broker
- responses = self.client.send_fetch_request(fetches,
- max_wait_time=max_wait_time,
- min_bytes=min_bytes,
- fail_on_error=False)
+ try:
+ responses = self.client.send_fetch_request(fetches,
+ max_wait_time=max_wait_time,
+ min_bytes=min_bytes,
+ fail_on_error=False)
+ except FailedPayloadsError:
+ logger.warning('FailedPayloadsError attempting to fetch data from kafka')
+ self._refresh_metadata_on_error()
+ return
for resp in responses:
topic = resp.topic
@@ -363,10 +372,7 @@ class KafkaConsumer(object):
logger.warning("NotLeaderForPartitionError for %s - %d. "
"Metadata may be out of date",
topic, partition)
- sleep_ms = self._get_config('refresh_leader_backoff_ms')
- logger.warning("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
- time.sleep(sleep_ms / 1000.0)
- self.client.load_metadata_for_topics()
+ self._refresh_metadata_on_error()
continue
except RequestTimedOutError:
@@ -430,3 +436,16 @@ class KafkaConsumer(object):
assert resp.partition == partition
return resp.offsets
+
+ def _refresh_metadata_on_error(self):
+ sleep_ms = self._get_config('refresh_leader_backoff_ms')
+ while True:
+ logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
+ time.sleep(sleep_ms / 1000.0)
+ try:
+ self.client.load_metadata_for_topics()
+ except KafkaUnavailableError:
+ logger.warning("Unable to refresh topic metadata... cluster unavailable")
+ else:
+ logger.info("Topic metadata refreshed")
+ return