diff options
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/new.py | 278 |
1 files changed, 151 insertions, 127 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index 39db60c..edf7843 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -1,4 +1,4 @@ -from collections import defaultdict, namedtuple +from collections import namedtuple from copy import deepcopy import logging import sys @@ -11,8 +11,7 @@ from kafka.common import ( OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest, check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, - FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError, - TopicAndPartition + FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError ) logger = logging.getLogger(__name__) @@ -125,30 +124,12 @@ class KafkaConsumer(object): self.set_topic_partitions(*topics) self._msg_iter = None - # Setup offsets - self._offsets = OffsetsStruct(fetch=defaultdict(dict), - commit=defaultdict(dict), - highwater= defaultdict(dict), - task_done=defaultdict(dict)) - - # If we have a consumer group, try to fetch stored offsets - if self._config['group_id']: - self._fetch_stored_offsets() - else: - self._auto_reset_offsets() - - # highwater marks (received from server on fetch response) - # and task_done (set locally by user) - # should always get initialized to None - self._reset_highwater_offsets() - self._reset_task_done_offsets() - - def _fetch_stored_offsets(self): + def _get_commit_offsets(self): logger.info("Consumer fetching stored offsets") - for topic, partition in self._topics: + for topic_partition in self._topics: (resp,) = self._client.send_offset_fetch_request( self._config['group_id'], - [OffsetFetchRequest(topic, partition)], + [OffsetFetchRequest(topic_partition[0], topic_partition[1])], fail_on_error=False) try: check_error(resp) @@ -159,29 +140,20 @@ class KafkaConsumer(object): # -1 offset signals no commit is currently stored if resp.offset == -1: - self._offsets.commit[topic][partition] = None - self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition) + self._offsets.commit[topic_partition] = None # Otherwise we committed the stored offset # and need to fetch the next one else: - self._offsets.commit[topic][partition] = resp.offset - self._offsets.fetch[topic][partition] = resp.offset - - def _auto_reset_offsets(self): - logger.info("Consumer auto-resetting offsets") - for topic, partition in self._topics: - - self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition) - self._offsets.commit[topic][partition] = None + self._offsets.commit[topic_partition] = resp.offset def _reset_highwater_offsets(self): - for topic, partition in self._topics: - self._offsets.highwater[topic][partition] = None + for topic_partition in self._topics: + self._offsets.highwater[topic_partition] = None def _reset_task_done_offsets(self): - for topic, partition in self._topics: - self._offsets.task_done[topic][partition] = None + for topic_partition in self._topics: + self._offsets.task_done[topic_partition] = None def __repr__(self): return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition @@ -224,12 +196,16 @@ class KafkaConsumer(object): return dict(deepcopy(getattr(self._offsets, group))) def task_done(self, message): - topic = message.topic - partition = message.partition + """ + Mark a fetched message as consumed. + Offsets for messages marked as "task_done" will be stored back + to the kafka cluster for this consumer group on commit() + """ + topic_partition = (message.topic, message.partition) offset = message.offset # Warn on non-contiguous offsets - prev_done = self._offsets.task_done[topic][partition] + prev_done = self._offsets.task_done[topic_partition] if prev_done is not None and offset != (prev_done + 1): logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1', offset, prev_done) @@ -237,12 +213,12 @@ class KafkaConsumer(object): # Warn on smaller offsets than previous commit # "commit" offsets are actually the offset of the next # message to fetch. # so task_done should be compared with (commit - 1) - prev_done = (self._offsets.commit[topic][partition] - 1) + prev_done = (self._offsets.commit[topic_partition] - 1) if prev_done is not None and (offset <= prev_done): logger.warning('Marking task_done on a previously committed offset?: %d <= %d', offset, prev_done) - self._offsets.task_done[topic][partition] = offset + self._offsets.task_done[topic_partition] = offset def should_auto_commit(self): if not self._config['auto_commit_enable']: @@ -267,24 +243,23 @@ class KafkaConsumer(object): offsets = self._offsets.task_done commits = [] - for topic, partitions in offsets.iteritems(): - for partition, task_done in partitions.iteritems(): + for topic_partition, task_done_offset in offsets.iteritems(): - # Skip if None - if task_done is None: - continue + # Skip if None + if task_done_offset is None: + continue - # Commit offsets as the next offset to fetch - # which is consistent with the Java Client - # task_done is marked by messages consumed, - # so add one to mark the next message for fetching - commit_offset = (task_done + 1) + # Commit offsets as the next offset to fetch + # which is consistent with the Java Client + # task_done is marked by messages consumed, + # so add one to mark the next message for fetching + commit_offset = (task_done_offset + 1) - # Skip if no change from previous committed - if commit_offset == self._offsets.commit[topic][partition]: - continue + # Skip if no change from previous committed + if commit_offset == self._offsets.commit[topic_partition]: + continue - commits.append(OffsetCommitRequest(topic, partition, commit_offset, metadata)) + commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1], commit_offset, metadata)) if commits: logger.info('committing consumer offsets to group %s', self._config['group_id']) @@ -294,8 +269,9 @@ class KafkaConsumer(object): for r in resps: check_error(r) - task_done = self._offsets.task_done[r.topic][r.partition] - self._offsets.commit[r.topic][r.partition] = (task_done + 1) + topic_partition = (r.topic, r.partition) + task_done = self._offsets.task_done[topic_partition] + self._offsets.commit[topic_partition] = (task_done + 1) if self._config['auto_commit_enable']: self._set_next_auto_commit_time() @@ -335,82 +311,128 @@ class KafkaConsumer(object): def set_topic_partitions(self, *topics): """ Set the topic/partitions to consume + Optionally specify offsets to start from Accepts types: - str - topic name, will consume all available partitions - TopicAndPartition namedtuple - will consume topic/partition - tuple - will consume (topic, partition) - dict - will consume { topic: partition } - { topic: [partition list] } - { topic: (partition tuple,) } + str: topic name (will consume all available partitions) + tuple: (topic, partition) + dict: { topic: partition } + { topic: [partition list] } + { topic: (partition tuple,) } + + Optionally, offsets can be specified directly: + tuple: (topic, partition, offset) + dict: { (topic, partition): offset, ... } Ex: kafka = KafkaConsumer() # Consume topic1-all; topic2-partition2; topic3-partition0 kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) + + # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456 + # using tuples -- + kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456)) + + # using dict -- + kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 }) """ self._topics = [] self._client.load_metadata_for_topics() + # Setup offsets + self._offsets = OffsetsStruct(fetch=dict(), + commit=dict(), + highwater=dict(), + task_done=dict()) + # Handle different topic types for arg in topics: # Topic name str -- all partitions if isinstance(arg, six.string_types): - for partition in self.client.get_partition_ids_for_topic(arg): - self._topics.append(TopicAndPartition(arg, partition)) - - # TopicAndPartition namedtuple - elif isinstance(arg, TopicAndPartition): - self._topics.append(arg) + topic = arg + for partition in self._client.get_partition_ids_for_topic(arg): + self._consume_topic_partition(topic, partition) - # (topic, partition) tuple + # (topic, partition [, offset]) tuple elif isinstance(arg, tuple): - if not isinstance(arg[0], six.string_types): - raise KafkaConfigurationError('Unknown topic type in (topic, ' - 'partition) tuple (expected string)') - if not isinstance(arg[1], int): - raise KafkaConfigurationError('Unknown partition type in (topic, ' - 'partition) tuple (expected int)') - self._topics.append(TopicAndPartition(*arg)) + (topic, partition) = arg[0:2] + if len(arg) == 3: + offset = arg[2] + self._offsets.fetch[(topic, partition)] = offset + self._consume_topic_partition(topic, partition) # { topic: partitions, ... } dict elif isinstance(arg, dict): - for topic in arg: - - if not isinstance(topic, six.string_types): - raise KafkaConfigurationError('Unknown topic type in {topic: ' - 'partitions} dict (expected string)') - # partition can either be a single partition int - if isinstance(arg[topic], int): - self._topics.append(TopicAndPartition(topic, arg[topic])) - - # or a list/tuple of partition ints - elif isinstance(arg[topic], (list, tuple)): - for partition in arg[topic]: - if not isinstance(arg[topic], int): - raise KafkaConfigurationError('Unknown partition type in {topic: ' - 'partitions} dict (expected list ' - 'or tuple of ints)') - self._topics.append(TopicAndPartition(topic, partition)) - else: - raise KafkaConfigurationError('Unknown topic type (dict key must be ' - 'int or list/tuple of ints)') + for key, value in arg.iteritems(): + + # key can be string (a topic) + if isinstance(key, six.string_types): + + # topic: partition + if isinstance(value, int): + self._consume_topic_partition(key, value) + + # topic: [ partition1, partition2, ... ] + elif isinstance(value, (list, tuple)): + for partition in value: + self._consume_topic_partition(key, partition) + else: + raise KafkaConfigurationError('Unknown topic type (dict key must be ' + 'int or list/tuple of ints)') + + # (topic, partition): offset + elif isinstance(key, tuple): + self._consume_topic_partition(*key) + self._offsets.fetch[key] = value + else: - raise KafkaConfigurationError('Unknown topic type (topic must be ' - 'string, TopicAndPartition, ' - '(topic,partition) tuple, or {topic: ' - 'partitions} dict)') - - # Verify that all topic/partitions exist in metadata - for topic, partition in self._topics: - if topic not in self.client.topic_partitions: - raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic) - if partition not in self.client.get_partition_ids_for_topic(topic): - raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s " - "in broker metadata" % (partition, topic)) - logger.info("Configuring consumer to fetch topic '%s'", topic) + raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg)) + + # If we have a consumer group, try to fetch stored offsets + if self._config['group_id']: + self._get_commit_offsets() + + # Update missing fetch/commit offsets + for topic_partition in self._topics: + + # Commit offsets default is None + if topic_partition not in self._offsets.commit: + self._offsets.commit[topic_partition] = None + + # Skip if we already have a fetch offset from user args + if topic_partition not in self._offsets.fetch: + + # Fetch offsets default is (1) commit + if self._offsets.commit[topic_partition] is not None: + self._offsets.fetch[topic_partition] = self._offsets.commit[topic_partition] + + # or (2) auto reset + else: + self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition) + + # highwater marks (received from server on fetch response) + # and task_done (set locally by user) + # should always get initialized to None + self._reset_highwater_offsets() + self._reset_task_done_offsets() + + def _consume_topic_partition(self, topic, partition): + if not isinstance(topic, six.string_types): + raise KafkaConfigurationError('Unknown topic type (%s) ' + '-- expected string' % type(topic)) + if not isinstance(partition, int): + raise KafkaConfigurationError('Unknown partition type (%s) ' + '-- expected int' % type(partition)) + + if topic not in self._client.topic_partitions: + raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic) + if partition not in self._client.get_partition_ids_for_topic(topic): + raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s " + "in broker metadata" % (partition, topic)) + logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition) + self._topics.append((topic, partition)) def fetch_messages(self): @@ -420,9 +442,8 @@ class KafkaConsumer(object): fetches = [] offsets = self._offsets.fetch - for topic, partitions in offsets.iteritems(): - for partition, offset in partitions.iteritems(): - fetches.append(FetchRequest(topic, partition, offset, max_bytes)) + for topic_partition, offset in offsets.iteritems(): + fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes)) # client.send_fetch_request will collect topic/partition requests by leader # and send each group as a single FetchRequest to the correct broker @@ -437,48 +458,51 @@ class KafkaConsumer(object): return for resp in responses: - topic = resp.topic - partition = resp.partition + topic_partition = (resp.topic, resp.partition) try: check_error(resp) except OffsetOutOfRangeError: logger.warning('OffsetOutOfRange: topic %s, partition %d, offset %d ' '(Highwatermark: %d)', - topic, partition, offsets[topic][partition], - resp.highwaterMark) + resp.topic, resp.partition, + offsets[topic_partition], resp.highwaterMark) # Reset offset - self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition) + self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition) continue except NotLeaderForPartitionError: logger.warning("NotLeaderForPartitionError for %s - %d. " "Metadata may be out of date", - topic, partition) + resp.topic, resp.partition) self._refresh_metadata_on_error() continue except RequestTimedOutError: - logger.warning("RequestTimedOutError for %s - %d", topic, partition) + logger.warning("RequestTimedOutError for %s - %d", + resp.topic, resp.partition) continue # Track server highwater mark - self._offsets.highwater[topic][partition] = resp.highwaterMark + self._offsets.highwater[topic_partition] = resp.highwaterMark # Yield each message # Kafka-python could raise an exception during iteration # we are not catching -- user will need to address for (offset, message) in resp.messages: # deserializer_class could raise an exception here - msg = KafkaMessage(topic, partition, offset, message.key, + msg = KafkaMessage(resp.topic, + resp.partition, + offset, message.key, self._config['deserializer_class'](message.value)) # Only increment fetch offset if we safely got the message and deserialized - self._offsets.fetch[topic][partition] = offset + 1 + self._offsets.fetch[topic_partition] = offset + 1 # Then yield to user yield msg - def _reset_partition_offset(self, topic, partition): + def _reset_partition_offset(self, topic_partition): + (topic, partition) = topic_partition LATEST = -1 EARLIEST = -2 |