diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-01-24 21:24:21 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-01-24 21:24:21 -0800 |
commit | 2c7b7452a8ca761672e70ee56b3779e4a96c1997 (patch) | |
tree | 0c81eb7336ac4a8d8bc70704993b3b7d9738a5d4 | |
parent | 077dc4742ffa82584946379790424faf4c6ba47f (diff) | |
parent | c02b2711f1b18bba85155f8bf402b5b9824b6502 (diff) | |
download | kafka-python-2c7b7452a8ca761672e70ee56b3779e4a96c1997.tar.gz |
Merge pull request #516 from dpkp/group_id_none
Support group_id=None to disable offset commits and group membership
-rw-r--r-- | kafka/consumer/group.py | 48 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 25 | ||||
-rw-r--r-- | test/test_producer.py | 1 |
3 files changed, 46 insertions, 28 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 333ef64..0e03544 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -42,9 +42,11 @@ class KafkaConsumer(six.Iterator): server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: 'kafka-python-{version}' - group_id (str): name of the consumer group to join for dynamic + group_id (str or None): name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and - committing offsets. Default: 'kafka-python-default-group' + committing offsets. If None, auto-partition assignment (via + group coordinator) and offset commits are disabled. + Default: 'kafka-python-default-group' key_deserializer (callable): Any callable that takes a raw message key and returns a deserialized key. value_deserializer (callable, optional): Any callable that takes a @@ -283,7 +285,8 @@ class KafkaConsumer(six.Iterator): Returns: kafka.future.Future """ - assert self.config['api_version'] >= (0, 8, 1) + assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1' + assert self.config['group_id'] is not None, 'Requires group_id' if offsets is None: offsets = self._subscription.all_consumed_offsets() log.debug("Committing offsets: %s", offsets) @@ -309,7 +312,8 @@ class KafkaConsumer(six.Iterator): to commit with the configured group_id. Defaults to current consumed offsets for all subscribed partitions. """ - assert self.config['api_version'] >= (0, 8, 1) + assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1' + assert self.config['group_id'] is not None, 'Requires group_id' if offsets is None: offsets = self._subscription.all_consumed_offsets() self._coordinator.commit_offsets_sync(offsets) @@ -330,7 +334,8 @@ class KafkaConsumer(six.Iterator): Returns: The last committed offset, or None if there was no prior commit. """ - assert self.config['api_version'] >= (0, 8, 1) + assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1' + assert self.config['group_id'] is not None, 'Requires group_id' if self._subscription.is_assigned(partition): committed = self._subscription.assignment[partition].committed if committed is None: @@ -418,14 +423,14 @@ class KafkaConsumer(six.Iterator): Returns: dict: map of topic to list of records (may be empty) """ - if self.config['api_version'] >= (0, 8, 2): - # TODO: Sub-requests should take into account the poll timeout (KAFKA-1894) - self._coordinator.ensure_coordinator_known() + if self.config['group_id'] is not None: + if self.config['api_version'] >= (0, 8, 2): + self._coordinator.ensure_coordinator_known() - if self.config['api_version'] >= (0, 9): - # ensure we have partitions assigned if we expect to - if self._subscription.partitions_auto_assigned(): - self._coordinator.ensure_active_group() + if self.config['api_version'] >= (0, 9): + # ensure we have partitions assigned if we expect to + if self._subscription.partitions_auto_assigned(): + self._coordinator.ensure_active_group() # fetch positions if we have partitions we're subscribed to that we # don't know the offset for @@ -603,7 +608,9 @@ class KafkaConsumer(six.Iterator): NoOffsetForPartitionError: If no offset is stored for a given partition and no offset reset policy is defined """ - if self.config['api_version'] >= (0, 8, 1): + if (self.config['api_version'] >= (0, 8, 1) + and self.config['group_id'] is not None): + # refresh commits for all assigned partitions self._coordinator.refresh_committed_offsets_if_needed() @@ -613,13 +620,14 @@ class KafkaConsumer(six.Iterator): def _message_generator(self): assert self.assignment() or self.subscription() is not None while time.time() < self._consumer_timeout: - if self.config['api_version'] >= (0, 8, 2): - self._coordinator.ensure_coordinator_known() + if self.config['group_id'] is not None: + if self.config['api_version'] >= (0, 8, 2): + self._coordinator.ensure_coordinator_known() - if self.config['api_version'] >= (0, 9): - # ensure we have partitions assigned if we expect to - if self._subscription.partitions_auto_assigned(): - self._coordinator.ensure_active_group() + if self.config['api_version'] >= (0, 9): + # ensure we have partitions assigned if we expect to + if self._subscription.partitions_auto_assigned(): + self._coordinator.ensure_active_group() # fetch positions if we have partitions we're subscribed to that we # don't know the offset for @@ -634,7 +642,7 @@ class KafkaConsumer(six.Iterator): self._client.cluster.ttl() / 1000.0 + time.time()) if self.config['api_version'] >= (0, 9): - if not self.assignment(): + if self.config['group_id'] is not None and not self.assignment(): sleep_time = time.time() - timeout_at log.debug('No partitions assigned; sleeping for %s', sleep_time) time.sleep(sleep_time) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 7390ab3..263dac0 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -75,18 +75,24 @@ class ConsumerCoordinator(BaseCoordinator): if key in configs: self.config[key] = configs[key] - self._cluster = client.cluster + if self.config['api_version'] >= (0, 9) and self.config['group_id'] is not None: + assert self.config['assignors'], 'Coordinator requires assignors' + self._subscription = subscription self._partitions_per_topic = {} - self._auto_commit_task = None - if self.config['api_version'] >= (0, 9): - assert self.config['assignors'], 'Coordinator require assignors' - + self._cluster = client.cluster self._cluster.request_update() self._cluster.add_listener(self._handle_metadata_update) - if self.config['api_version'] >= (0, 8, 1): - if self.config['enable_auto_commit']: + self._auto_commit_task = None + if self.config['enable_auto_commit']: + if self.config['api_version'] < (0, 8, 1): + log.warning('Broker version (%s) does not support offset' + ' commits; disabling auto-commit.', + self.config['api_version']) + elif self.config['group_id'] is None: + log.warning('group_id is None: disabling auto-commit.') + else: interval = self.config['auto_commit_interval_ms'] / 1000.0 self._auto_commit_task = AutoCommitTask(self, interval) @@ -127,7 +133,10 @@ class ConsumerCoordinator(BaseCoordinator): # check if there are any changes to the metadata which should trigger # a rebalance if self._subscription_metadata_changed(): - if self.config['api_version'] >= (0, 9): + + if (self.config['api_version'] >= (0, 9) + and self.config['group_id'] is not None): + self._subscription.mark_for_reassignment() # If we haven't got group coordinator support, diff --git a/test/test_producer.py b/test/test_producer.py index b84feb4..263df11 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -12,6 +12,7 @@ def test_end_to_end(kafka_broker): max_block_ms=10000, value_serializer=str.encode) consumer = KafkaConsumer(bootstrap_servers=connect_str, + group_id=None, consumer_timeout_ms=10000, auto_offset_reset='earliest', value_deserializer=bytes.decode) |