diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-14 10:42:04 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:42:54 -0800 |
commit | e702880bda02f5f8c142afe34ce7924a08516389 (patch) | |
tree | c302029a1e36ac07fd6ad3c4c306afde29237ced /kafka | |
parent | 62a71892a687d99bf0076ed3a3d9c614f16a112c (diff) | |
download | kafka-python-e702880bda02f5f8c142afe34ce7924a08516389.tar.gz |
self._topics is private; fixup topic iterations for new TopicAndPartition list; add more type checks to set_topic_and_partitions
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer/new.py | 111 |
1 files changed, 67 insertions, 44 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index 54b1922..e7c9c55 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -148,51 +148,48 @@ class KafkaConsumer(object): def _fetch_stored_offsets(self): logger.info("Consumer fetching stored offsets") - for topic in self.topics: - for partition in self.client.get_partition_ids_for_topic(topic): - - (resp,) = self.client.send_offset_fetch_request( - self._config['group_id'], - [OffsetFetchRequest(topic, partition)], - fail_on_error=False) - try: - check_error(resp) - # API spec says server wont set an error here - # but 0.8.1.1 does actually... - except UnknownTopicOrPartitionError: - pass - - # -1 offset signals no commit is currently stored - if resp.offset == -1: - self._offsets.commit[topic][partition] = None - self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition) - - # Otherwise we committed the stored offset - # and need to fetch the next one - else: - self._offsets.commit[topic][partition] = resp.offset - self._offsets.fetch[topic][partition] = resp.offset + for topic, partition in self._topics: + (resp,) = self.client.send_offset_fetch_request( + self._config['group_id'], + [OffsetFetchRequest(topic, partition)], + fail_on_error=False) + try: + check_error(resp) + # API spec says server wont set an error here + # but 0.8.1.1 does actually... + except UnknownTopicOrPartitionError: + pass + + # -1 offset signals no commit is currently stored + if resp.offset == -1: + self._offsets.commit[topic][partition] = None + self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition) + + # Otherwise we committed the stored offset + # and need to fetch the next one + else: + self._offsets.commit[topic][partition] = resp.offset + self._offsets.fetch[topic][partition] = resp.offset def _auto_reset_offsets(self): logger.info("Consumer auto-resetting offsets") - for topic in self.topics: - for partition in self.client.get_partition_ids_for_topic(topic): + for topic, partition in self._topics: - self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition) - self._offsets.commit[topic][partition] = None + self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition) + self._offsets.commit[topic][partition] = None def _reset_highwater_offsets(self): - for topic in self.topics: - for partition in self.client.get_partition_ids_for_topic(topic): - self._offsets.highwater[topic][partition] = None + for topic, partition in self._topics: + self._offsets.highwater[topic][partition] = None def _reset_task_done_offsets(self): - for topic in self.topics: - for partition in self.client.get_partition_ids_for_topic(topic): - self._offsets.task_done[topic][partition] = None + for topic, partition in self._topics: + self._offsets.task_done[topic][partition] = None def __repr__(self): - return '<KafkaConsumer topics=(%s)>' % ', '.join(self.topics) + return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition + for topic_partition in + self._topics]) def __iter__(self): return self @@ -348,27 +345,50 @@ class KafkaConsumer(object): # Consume topic1-all; topic2-partition2; topic3-partition0 kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) """ - self.topics = [] + self._topics = [] self.client.load_metadata_for_topics() + # Handle different topic types for arg in topics: + + # Topic name str -- all partitions if isinstance(arg, six.string_types): for partition in self.client.get_partition_ids_for_topic(arg): - self.topics.append(TopicAndPartition(arg, partition)) + self._topics.append(TopicAndPartition(arg, partition)) + # TopicAndPartition namedtuple elif isinstance(arg, TopicAndPartition): - self.topics.append(arg) + self._topics.append(arg) + # (topic, partition) tuple elif isinstance(arg, tuple): - self.topics.append(TopicAndPartition(*arg)) - + if not isinstance(arg[0], six.string_types): + raise KafkaConfigurationError('Unknown topic type in (topic, ' + 'partition) tuple (expected string)') + if not isinstance(arg[1], int): + raise KafkaConfigurationError('Unknown partition type in (topic, ' + 'partition) tuple (expected int)') + self._topics.append(TopicAndPartition(*arg)) + + # { topic: partitions, ... } dict elif isinstance(arg, dict): for topic in arg: + + if not isinstance(topic, six.string_types): + raise KafkaConfigurationError('Unknown topic type in {topic: ' + 'partitions} dict (expected string)') + # partition can either be a single partition int if isinstance(arg[topic], int): - self.topics.append(TopicAndPartition(topic, arg[topic])) + self._topics.append(TopicAndPartition(topic, arg[topic])) + + # or a list/tuple of partition ints elif isinstance(arg[topic], (list, tuple)): for partition in arg[topic]: - self.topics.append(TopicAndPartition(topic, partition)) + if not isinstance(arg[topic], int): + raise KafkaConfigurationError('Unknown partition type in {topic: ' + 'partitions} dict (expected list ' + 'or tuple of ints)') + self._topics.append(TopicAndPartition(topic, partition)) else: raise KafkaConfigurationError('Unknown topic type (dict key must be ' 'int or list/tuple of ints)') @@ -378,10 +398,13 @@ class KafkaConsumer(object): '(topic,partition) tuple, or {topic: ' 'partitions} dict)') - # Get initial topic metadata - for topic_partitions in self.topics: + # Verify that all topic/partitions exist in metadata + for topic, partition in self._topics: if topic not in self.client.topic_partitions: raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic) + if partition not in self.client.get_partition_ids_for_topic(topic): + raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s " + "in broker metadata" % (partition, topic)) logger.info("Configuring consumer to fetch topic '%s'", topic) def fetch_messages(self): |