author     Dana Powers <dana.powers@rd.io>    2014-09-14 10:42:04 -0700
committer  Dana Powers <dana.powers@rd.io>    2014-12-15 12:42:54 -0800
commit     e702880bda02f5f8c142afe34ce7924a08516389 (patch)
tree       c302029a1e36ac07fd6ad3c4c306afde29237ced /kafka
parent     62a71892a687d99bf0076ed3a3d9c614f16a112c (diff)
download   kafka-python-e702880bda02f5f8c142afe34ce7924a08516389.tar.gz
self._topics is private; fixup topic iterations for new TopicAndPartition list; add more type checks to set_topic_and_partitions
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/consumer/new.py  111
1 file changed, 67 insertions, 44 deletions
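
For orientation before the diff: the patch normalizes every accepted argument form of set_topic_partitions() into a private list of TopicAndPartition namedtuples and adds type checks on the way in. A minimal usage sketch, assuming an already-constructed KafkaConsumer instance named `consumer` and hypothetical topic names; the import path for TopicAndPartition is assumed and consumer construction is not part of this patch:

from kafka.common import TopicAndPartition

# Each argument form below is normalized into the private consumer._topics
# list as TopicAndPartition(topic, partition) entries:
consumer.set_topic_partitions(
    "topic1",                        # str: every partition of topic1
    ("topic2", 2),                   # (topic, partition) tuple
    {"topic3": 0},                   # dict value as a single partition int
    {"topic4": [0, 1]},              # dict value as a list/tuple of partition ints
    TopicAndPartition("topic5", 1),  # namedtuple passed through unchanged
)

# The new type checks reject malformed arguments up front, e.g.:
consumer.set_topic_partitions(("topic1", "0"))  # raises KafkaConfigurationError
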
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index 54b1922..e7c9c55 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -148,51 +148,48 @@ class KafkaConsumer(object):
def _fetch_stored_offsets(self):
logger.info("Consumer fetching stored offsets")
- for topic in self.topics:
- for partition in self.client.get_partition_ids_for_topic(topic):
-
- (resp,) = self.client.send_offset_fetch_request(
- self._config['group_id'],
- [OffsetFetchRequest(topic, partition)],
- fail_on_error=False)
- try:
- check_error(resp)
- # API spec says server wont set an error here
- # but 0.8.1.1 does actually...
- except UnknownTopicOrPartitionError:
- pass
-
- # -1 offset signals no commit is currently stored
- if resp.offset == -1:
- self._offsets.commit[topic][partition] = None
- self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition)
-
- # Otherwise we committed the stored offset
- # and need to fetch the next one
- else:
- self._offsets.commit[topic][partition] = resp.offset
- self._offsets.fetch[topic][partition] = resp.offset
+ for topic, partition in self._topics:
+ (resp,) = self.client.send_offset_fetch_request(
+ self._config['group_id'],
+ [OffsetFetchRequest(topic, partition)],
+ fail_on_error=False)
+ try:
+ check_error(resp)
+ # API spec says server won't set an error here
+ # but 0.8.1.1 does actually...
+ except UnknownTopicOrPartitionError:
+ pass
+
+ # -1 offset signals no commit is currently stored
+ if resp.offset == -1:
+ self._offsets.commit[topic][partition] = None
+ self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition)
+
+ # Otherwise we committed the stored offset
+ # and need to fetch the next one
+ else:
+ self._offsets.commit[topic][partition] = resp.offset
+ self._offsets.fetch[topic][partition] = resp.offset
def _auto_reset_offsets(self):
logger.info("Consumer auto-resetting offsets")
- for topic in self.topics:
- for partition in self.client.get_partition_ids_for_topic(topic):
+ for topic, partition in self._topics:
- self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition)
- self._offsets.commit[topic][partition] = None
+ self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition)
+ self._offsets.commit[topic][partition] = None
def _reset_highwater_offsets(self):
- for topic in self.topics:
- for partition in self.client.get_partition_ids_for_topic(topic):
- self._offsets.highwater[topic][partition] = None
+ for topic, partition in self._topics:
+ self._offsets.highwater[topic][partition] = None
def _reset_task_done_offsets(self):
- for topic in self.topics:
- for partition in self.client.get_partition_ids_for_topic(topic):
- self._offsets.task_done[topic][partition] = None
+ for topic, partition in self._topics:
+ self._offsets.task_done[topic][partition] = None
def __repr__(self):
- return '<KafkaConsumer topics=(%s)>' % ', '.join(self.topics)
+ return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
+ for topic_partition in
+ self._topics])
def __iter__(self):
return self
@@ -348,27 +345,50 @@ class KafkaConsumer(object):
# Consume topic1-all; topic2-partition2; topic3-partition0
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
"""
- self.topics = []
+ self._topics = []
self.client.load_metadata_for_topics()
+ # Handle different topic types
for arg in topics:
+
+ # Topic name str -- all partitions
if isinstance(arg, six.string_types):
for partition in self.client.get_partition_ids_for_topic(arg):
- self.topics.append(TopicAndPartition(arg, partition))
+ self._topics.append(TopicAndPartition(arg, partition))
+ # TopicAndPartition namedtuple
elif isinstance(arg, TopicAndPartition):
- self.topics.append(arg)
+ self._topics.append(arg)
+ # (topic, partition) tuple
elif isinstance(arg, tuple):
- self.topics.append(TopicAndPartition(*arg))
-
+ if not isinstance(arg[0], six.string_types):
+ raise KafkaConfigurationError('Unknown topic type in (topic, '
+ 'partition) tuple (expected string)')
+ if not isinstance(arg[1], int):
+ raise KafkaConfigurationError('Unknown partition type in (topic, '
+ 'partition) tuple (expected int)')
+ self._topics.append(TopicAndPartition(*arg))
+
+ # { topic: partitions, ... } dict
elif isinstance(arg, dict):
for topic in arg:
+
+ if not isinstance(topic, six.string_types):
+ raise KafkaConfigurationError('Unknown topic type in {topic: '
+ 'partitions} dict (expected string)')
+ # partition can either be a single partition int
if isinstance(arg[topic], int):
- self.topics.append(TopicAndPartition(topic, arg[topic]))
+ self._topics.append(TopicAndPartition(topic, arg[topic]))
+
+ # or a list/tuple of partition ints
elif isinstance(arg[topic], (list, tuple)):
for partition in arg[topic]:
- self.topics.append(TopicAndPartition(topic, partition))
+ if not isinstance(partition, int):
+ raise KafkaConfigurationError('Unknown partition type in {topic: '
+ 'partitions} dict (expected list '
+ 'or tuple of ints)')
+ self._topics.append(TopicAndPartition(topic, partition))
else:
raise KafkaConfigurationError('Unknown partition type (dict value must be '
'int or list/tuple of ints)')
@@ -378,10 +398,13 @@ class KafkaConsumer(object):
'(topic,partition) tuple, or {topic: '
'partitions} dict)')
- # Get initial topic metadata
- for topic_partitions in self.topics:
+ # Verify that all topic/partitions exist in metadata
+ for topic, partition in self._topics:
if topic not in self.client.topic_partitions:
raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
+ if partition not in self.client.get_partition_ids_for_topic(topic):
+ raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
+ "in broker metadata" % (partition, topic))
logger.info("Configuring consumer to fetch topic '%s'", topic)
def fetch_messages(self):
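
The final hunk above replaces the old metadata warm-up loop with explicit validation: every requested (topic, partition) pair must be present in broker metadata, otherwise configuration fails immediately. A hedged sketch of the new failure mode, reusing the assumed `consumer` instance from above and a hypothetical topic that only has partitions 0-2:

# A partition outside the topic's range now fails at configuration time
# rather than surfacing later during fetch_messages():
consumer.set_topic_partitions(("topic1", 99))
# raises UnknownTopicOrPartitionError:
#   Partition 99 not found in Topic topic1 in broker metadata
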