diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-01 00:17:42 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-01 00:17:42 -0800 |
commit | 894c9aac50ee9a0b0034ea396a7a13e3b5150114 (patch) | |
tree | a9f1654e4a05faad1e8f5623194135c289bffce3 | |
parent | d3d6ea939b85ca033293898e2c4c63eda2335aab (diff) | |
download | kafka-python-894c9aac50ee9a0b0034ea396a7a13e3b5150114.tar.gz |
Be sure to get all metadata when subscribing to a regex pattern.
-rw-r--r-- | kafka/consumer/group.py | 31 |
1 files changed, 19 insertions, 12 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index d77a27a..65bb670 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -594,20 +594,26 @@ class KafkaConsumer(six.Iterator): any listener set in a previous call to subscribe. It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics subscribed in this call. + + Raises: + IllegalStateError: if called after previously calling assign() + AssertionError: if neither topics or pattern is provided + TypeError: if listener is not a ConsumerRebalanceListener """ - if not topics: - self.unsubscribe() + # SubscriptionState handles error checking + self._subscription.subscribe(topics=topics, + pattern=pattern, + listener=listener) + + # regex will need all topic metadata + if pattern is not None: + self._client.cluster.need_all_topic_metadata = True + self._client.set_topics([]) + log.debug("Subscribed to topic pattern: %s", pattern) else: - self._subscription.subscribe(topics=topics, - pattern=pattern, - listener=listener) - # regex will need all topic metadata - if pattern is not None: - self._client.set_topics([]) - log.debug("Subscribed to topic pattern: %s", topics) - else: - self._client.set_topics(self._subscription.group_subscription()) - log.debug("Subscribed to topic(s): %s", topics) + self._client.cluster.need_all_topic_metadata = False + self._client.set_topics(self._subscription.group_subscription()) + log.debug("Subscribed to topic(s): %s", topics) def subscription(self): """Get the current topic subscription. @@ -621,6 +627,7 @@ class KafkaConsumer(six.Iterator): """Unsubscribe from all topics and clear all assigned partitions.""" self._subscription.unsubscribe() self._coordinator.close() + self._client.cluster.need_all_topic_metadata = False self._client.set_topics([]) log.debug("Unsubscribed all topics or patterns and assigned partitions") |