| author | Dana Powers <dana.powers@gmail.com> | 2016-03-12 17:31:10 -0800 |
| --- | --- | --- |
| committer | Dana Powers <dana.powers@gmail.com> | 2016-03-12 18:21:32 -0800 |
| commit | 047a65f1d9965f5b6913b18fabb3f44f8a726430 (patch) | |
| tree | ca0b11863db1eff404b7dd9cf760cdc4edf8bb64 | |
| parent | 86d98c00fdda7f0d9f2cccb64e2128977bd5ee8d (diff) | |
| download | kafka-python-047a65f1d9965f5b6913b18fabb3f44f8a726430.tar.gz | |
factor group checking logic to KafkaConsumer._use_consumer_group()
-rw-r--r-- | kafka/consumer/group.py | 61 |
1 file changed, 37 insertions, 24 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index b43b0f4..637ef93 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -439,14 +439,14 @@ class KafkaConsumer(six.Iterator):
         Returns:
             dict: map of topic to list of records (may be empty)
         """
-        if self.config['group_id'] is not None:
-            if self.config['api_version'] >= (0, 8, 2):
-                self._coordinator.ensure_coordinator_known()
+        if self._use_consumer_group():
+            self._coordinator.ensure_coordinator_known()
+            self._coordinator.ensure_active_group()
+
+        # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+        elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+            self._coordinator.ensure_coordinator_known()
 
-            if self.config['api_version'] >= (0, 9):
-                # ensure we have partitions assigned if we expect to
-                if self._subscription.partitions_auto_assigned():
-                    self._coordinator.ensure_active_group()
 
         # fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
@@ -665,6 +665,16 @@ class KafkaConsumer(six.Iterator):
         self._client.set_topics([])
         log.debug("Unsubscribed all topics or patterns and assigned partitions")
 
+    def _use_consumer_group(self):
+        """Return True iff this consumer can/should join a broker-coordinated group."""
+        if self.config['api_version'] < (0, 9):
+            return False
+        elif self.config['group_id'] is None:
+            return False
+        elif not self._subscription.partitions_auto_assigned():
+            return False
+        return True
+
     def _update_fetch_positions(self, partitions):
         """
         Set the fetch position to the committed position (if there is one)
@@ -690,17 +700,16 @@ class KafkaConsumer(six.Iterator):
 
     def _message_generator(self):
         assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
         while time.time() < self._consumer_timeout:
-            if self.config['group_id'] is not None:
-                if self.config['api_version'] >= (0, 8, 2):
-                    self._coordinator.ensure_coordinator_known()
-                if self.config['api_version'] >= (0, 9):
-                    # ensure we have partitions assigned if we expect to
-                    if self._subscription.partitions_auto_assigned():
-                        self._coordinator.ensure_active_group()
+            if self._use_consumer_group():
+                self._coordinator.ensure_coordinator_known()
+                self._coordinator.ensure_active_group()
 
-            # fetch positions if we have partitions we're subscribed to that we
-            # don't know the offset for
+            # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+            elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+                self._coordinator.ensure_coordinator_known()
+
+            # fetch offsets for any subscribed partitions that we arent tracking yet
             if not self._subscription.has_all_fetch_positions():
                 partitions = self._subscription.missing_fetch_positions()
                 self._update_fetch_positions(partitions)
@@ -714,14 +723,18 @@ class KafkaConsumer(six.Iterator):
             # like heartbeats, auto-commits, and metadata refreshes
            timeout_at = self._next_timeout()
 
-            if self.config['api_version'] >= (0, 9):
-                if self.config['group_id'] is not None and not self.assignment():
-                    sleep_time = max(timeout_at - time.time(), 0)
-                    if sleep_time > 0 and not self._client.in_flight_request_count():
-                        log.debug('No partitions assigned; sleeping for %s', sleep_time)
-                        time.sleep(sleep_time)
-                        continue
-
+            # Because the consumer client poll does not sleep unless blocking on
+            # network IO, we need to explicitly sleep when we know we are idle
+            # because we haven't been assigned any partitions to fetch / consume
+            if self._use_consumer_group() and not self.assignment():
+                sleep_time = max(timeout_at - time.time(), 0)
+                if sleep_time > 0 and not self._client.in_flight_request_count():
+                    log.debug('No partitions assigned; sleeping for %s', sleep_time)
+                    time.sleep(sleep_time)
+                    continue
+
+            # Short-circuit the fetch iterator if we are already timed out
+            # to avoid any unintentional interaction with fetcher setup
             if time.time() > timeout_at:
                 continue
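All three call sites in the diff now reduce to the same predicate, which is what the new `_use_consumer_group()` helper captures. Below is a minimal, hypothetical sketch of that check written as a free function for illustration only: the stub `config` dict and the plain `partitions_auto_assigned` boolean stand in for `self.config` and `self._subscription.partitions_auto_assigned()` on the real `KafkaConsumer`.

```python
# Hypothetical, condensed restatement of the check introduced by this commit.
def use_consumer_group(config, partitions_auto_assigned):
    """True iff the consumer can/should join a broker-coordinated group."""
    if config['api_version'] < (0, 9):
        return False  # group coordination APIs require 0.9+ brokers
    if config['group_id'] is None:
        return False  # no consumer group configured
    if not partitions_auto_assigned:
        return False  # manual assign() bypasses the coordinator
    return True

# subscribe()-based assignment on a 0.9 broker joins the group...
assert use_consumer_group({'api_version': (0, 9), 'group_id': 'my-group'}, True)
# ...but on an 0.8.2 broker only kafka-backed offset storage applies, no group join
assert not use_consumer_group({'api_version': (0, 8, 2), 'group_id': 'my-group'}, True)
```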