author | Dana Powers <dana.powers@rd.io> | 2014-09-11 11:41:51 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:42:54 -0800 |
commit | 82b3e011fad44c92188ce7645738dea691fa5849 (patch) | |
tree | ac40e806d95dbb83957df5af2dc3f9d64367bae5 /kafka | |
parent | fa6738b8f49735df1812e2e9068e227fcaca961d (diff) | |
download | kafka-python-82b3e011fad44c92188ce7645738dea691fa5849.tar.gz | |
Handle FailedPayloadsError on client.send_fetch_request; permit offsets() with no group argument; update docstring
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer/new.py | 51 |
1 file changed, 35 insertions(+), 16 deletions(-)
```diff
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index 5790a31..5ef5921 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -8,7 +8,8 @@ from kafka.client import KafkaClient
 from kafka.common import (
     OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
     check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
-    OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout
+    OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
+    FailedPayloadsError, KafkaUnavailableError
 )

 logger = logging.getLogger(__name__)
@@ -90,11 +91,6 @@ class KafkaConsumer(object):

     Configuration parameters are described in more detail at
     http://kafka.apache.org/documentation.html#highlevelconsumerapi
-
-    Message fetching proceeds in batches, with each topic/partition
-    queried for a maximum of `fetch_message_max_bytes` data
-    After consuming all messages in a batch, StopIteration is raised
-    Iterating again after StopIteration will trigger another batch to be fetched
     """

     DEFAULT_CONSUMER_CONFIG = {
@@ -237,8 +233,16 @@ class KafkaConsumer(object):
         if consumer_timeout and time.time() > consumer_timeout:
             raise ConsumerTimeout('Consumer timed out waiting to fetch messages')

-    def offsets(self, group):
-        return dict(deepcopy(getattr(self._offsets, group)))
+    def offsets(self, group=None):
+        if not group:
+            return {
+                'fetch': self.offsets('fetch'),
+                'commit': self.offsets('commit'),
+                'task_done': self.offsets('task_done'),
+                'highwater': self.offsets('highwater')
+            }
+        else:
+            return dict(deepcopy(getattr(self._offsets, group)))

     def task_done(self, message):
         topic = message.topic
@@ -340,10 +344,15 @@ class KafkaConsumer(object):

         # client.send_fetch_request will collect topic/partition requests by leader
         # and send each group as a single FetchRequest to the correct broker
-        responses = self.client.send_fetch_request(fetches,
-                                                   max_wait_time=max_wait_time,
-                                                   min_bytes=min_bytes,
-                                                   fail_on_error=False)
+        try:
+            responses = self.client.send_fetch_request(fetches,
+                                                       max_wait_time=max_wait_time,
+                                                       min_bytes=min_bytes,
+                                                       fail_on_error=False)
+        except FailedPayloadsError:
+            logger.warning('FailedPayloadsError attempting to fetch data from kafka')
+            self._refresh_metadata_on_error()
+            return

         for resp in responses:
             topic = resp.topic
@@ -363,10 +372,7 @@ class KafkaConsumer(object):

                 logger.warning("NotLeaderForPartitionError for %s - %d. "
                                "Metadata may be out of date", topic, partition)
-                sleep_ms = self._get_config('refresh_leader_backoff_ms')
-                logger.warning("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
-                time.sleep(sleep_ms / 1000.0)
-                self.client.load_metadata_for_topics()
+                self._refresh_metadata_on_error()
                 continue

             except RequestTimedOutError:
@@ -430,3 +436,16 @@ class KafkaConsumer(object):
             assert resp.partition == partition

         return resp.offsets
+
+    def _refresh_metadata_on_error(self):
+        sleep_ms = self._get_config('refresh_leader_backoff_ms')
+        while True:
+            logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
+            time.sleep(sleep_ms / 1000.0)
+            try:
+                self.client.load_metadata_for_topics()
+            except KafkaUnavailableError:
+                logger.warning("Unable to refresh topic metadata... cluster unavailable")
+            else:
+                logger.info("Topic metadata refreshed")
+                return
```
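The most visible API change is that `offsets()` now takes an optional group name. Below is a minimal usage sketch, not part of the commit: the broker address, topic, and group id are hypothetical placeholders, and the import path assumes the module still lives at `kafka/consumer/new.py` as it does in this commit.

```python
# Usage sketch for the updated offsets() API (hypothetical broker/topic/group;
# import path reflects this commit's module location, kafka/consumer/new.py).
from kafka.common import ConsumerTimeout
from kafka.consumer.new import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         metadata_broker_list=['localhost:9092'],
                         consumer_timeout_ms=5000)

try:
    for message in consumer:
        consumer.task_done(message)  # mark each message as processed
except ConsumerTimeout:
    pass  # no more messages arrived within consumer_timeout_ms

# New in this commit: calling offsets() with no argument returns all four
# offset groups at once, keyed by group name:
snapshot = consumer.offsets()
# e.g. {'fetch': {...}, 'commit': {...}, 'task_done': {...}, 'highwater': {...}}

# The original single-group form is unchanged.
fetch_offsets = consumer.offsets('fetch')
```

The fetch-path change is complementary: a `FailedPayloadsError` raised by `send_fetch_request` no longer propagates out of the iterator. Instead the consumer sleeps for `refresh_leader_backoff_ms` and retries `load_metadata_for_topics()` in `_refresh_metadata_on_error()` until the cluster is reachable again, the same recovery now shared with the `NotLeaderForPartitionError` path.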