summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-01 00:17:42 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-01 00:17:42 -0800
commit894c9aac50ee9a0b0034ea396a7a13e3b5150114 (patch)
treea9f1654e4a05faad1e8f5623194135c289bffce3
parentd3d6ea939b85ca033293898e2c4c63eda2335aab (diff)
downloadkafka-python-894c9aac50ee9a0b0034ea396a7a13e3b5150114.tar.gz
Be sure to get all metadata when subscribing to a regex pattern.
-rw-r--r--kafka/consumer/group.py31
1 files changed, 19 insertions, 12 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index d77a27a..65bb670 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -594,20 +594,26 @@ class KafkaConsumer(six.Iterator):
any listener set in a previous call to subscribe. It is
guaranteed, however, that the partitions revoked/assigned
through this interface are from topics subscribed in this call.
+
+ Raises:
+ IllegalStateError: if called after previously calling assign()
+ AssertionError: if neither topics or pattern is provided
+ TypeError: if listener is not a ConsumerRebalanceListener
"""
- if not topics:
- self.unsubscribe()
+ # SubscriptionState handles error checking
+ self._subscription.subscribe(topics=topics,
+ pattern=pattern,
+ listener=listener)
+
+ # regex will need all topic metadata
+ if pattern is not None:
+ self._client.cluster.need_all_topic_metadata = True
+ self._client.set_topics([])
+ log.debug("Subscribed to topic pattern: %s", pattern)
else:
- self._subscription.subscribe(topics=topics,
- pattern=pattern,
- listener=listener)
- # regex will need all topic metadata
- if pattern is not None:
- self._client.set_topics([])
- log.debug("Subscribed to topic pattern: %s", topics)
- else:
- self._client.set_topics(self._subscription.group_subscription())
- log.debug("Subscribed to topic(s): %s", topics)
+ self._client.cluster.need_all_topic_metadata = False
+ self._client.set_topics(self._subscription.group_subscription())
+ log.debug("Subscribed to topic(s): %s", topics)
def subscription(self):
"""Get the current topic subscription.
@@ -621,6 +627,7 @@ class KafkaConsumer(six.Iterator):
"""Unsubscribe from all topics and clear all assigned partitions."""
self._subscription.unsubscribe()
self._coordinator.close()
+ self._client.cluster.need_all_topic_metadata = False
self._client.set_topics([])
log.debug("Unsubscribed all topics or patterns and assigned partitions")