summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-24 16:22:14 -0800
committerDana Powers <dana.powers@rd.io>2016-01-24 18:38:26 -0800
commit2a1970138c233e1ebaa58b6db670b2ed0f8d8551 (patch)
tree357a3b2d0303a72801ca4b52ff540d12c116cd4d
parent077dc4742ffa82584946379790424faf4c6ba47f (diff)
downloadkafka-python-2a1970138c233e1ebaa58b6db670b2ed0f8d8551.tar.gz
Disable offset commits and auto-partition-assignment when group_id is None
-rw-r--r--kafka/consumer/group.py48
-rw-r--r--kafka/coordinator/consumer.py25
2 files changed, 45 insertions, 28 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 333ef64..0e03544 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -42,9 +42,11 @@ class KafkaConsumer(six.Iterator):
server-side log entries that correspond to this client. Also
submitted to GroupCoordinator for logging with respect to
consumer group administration. Default: 'kafka-python-{version}'
- group_id (str): name of the consumer group to join for dynamic
+ group_id (str or None): name of the consumer group to join for dynamic
partition assignment (if enabled), and to use for fetching and
- committing offsets. Default: 'kafka-python-default-group'
+ committing offsets. If None, auto-partition assignment (via
+ group coordinator) and offset commits are disabled.
+ Default: 'kafka-python-default-group'
key_deserializer (callable): Any callable that takes a
raw message key and returns a deserialized key.
value_deserializer (callable, optional): Any callable that takes a
@@ -283,7 +285,8 @@ class KafkaConsumer(six.Iterator):
Returns:
kafka.future.Future
"""
- assert self.config['api_version'] >= (0, 8, 1)
+ assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
+ assert self.config['group_id'] is not None, 'Requires group_id'
if offsets is None:
offsets = self._subscription.all_consumed_offsets()
log.debug("Committing offsets: %s", offsets)
@@ -309,7 +312,8 @@ class KafkaConsumer(six.Iterator):
to commit with the configured group_id. Defaults to current
consumed offsets for all subscribed partitions.
"""
- assert self.config['api_version'] >= (0, 8, 1)
+ assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
+ assert self.config['group_id'] is not None, 'Requires group_id'
if offsets is None:
offsets = self._subscription.all_consumed_offsets()
self._coordinator.commit_offsets_sync(offsets)
@@ -330,7 +334,8 @@ class KafkaConsumer(six.Iterator):
Returns:
The last committed offset, or None if there was no prior commit.
"""
- assert self.config['api_version'] >= (0, 8, 1)
+ assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
+ assert self.config['group_id'] is not None, 'Requires group_id'
if self._subscription.is_assigned(partition):
committed = self._subscription.assignment[partition].committed
if committed is None:
@@ -418,14 +423,14 @@ class KafkaConsumer(six.Iterator):
Returns:
dict: map of topic to list of records (may be empty)
"""
- if self.config['api_version'] >= (0, 8, 2):
- # TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
- self._coordinator.ensure_coordinator_known()
+ if self.config['group_id'] is not None:
+ if self.config['api_version'] >= (0, 8, 2):
+ self._coordinator.ensure_coordinator_known()
- if self.config['api_version'] >= (0, 9):
- # ensure we have partitions assigned if we expect to
- if self._subscription.partitions_auto_assigned():
- self._coordinator.ensure_active_group()
+ if self.config['api_version'] >= (0, 9):
+ # ensure we have partitions assigned if we expect to
+ if self._subscription.partitions_auto_assigned():
+ self._coordinator.ensure_active_group()
# fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
@@ -603,7 +608,9 @@ class KafkaConsumer(six.Iterator):
NoOffsetForPartitionError: If no offset is stored for a given
partition and no offset reset policy is defined
"""
- if self.config['api_version'] >= (0, 8, 1):
+ if (self.config['api_version'] >= (0, 8, 1)
+ and self.config['group_id'] is not None):
+
# refresh commits for all assigned partitions
self._coordinator.refresh_committed_offsets_if_needed()
@@ -613,13 +620,14 @@ class KafkaConsumer(six.Iterator):
def _message_generator(self):
assert self.assignment() or self.subscription() is not None
while time.time() < self._consumer_timeout:
- if self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
+ if self.config['group_id'] is not None:
+ if self.config['api_version'] >= (0, 8, 2):
+ self._coordinator.ensure_coordinator_known()
- if self.config['api_version'] >= (0, 9):
- # ensure we have partitions assigned if we expect to
- if self._subscription.partitions_auto_assigned():
- self._coordinator.ensure_active_group()
+ if self.config['api_version'] >= (0, 9):
+ # ensure we have partitions assigned if we expect to
+ if self._subscription.partitions_auto_assigned():
+ self._coordinator.ensure_active_group()
# fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
@@ -634,7 +642,7 @@ class KafkaConsumer(six.Iterator):
self._client.cluster.ttl() / 1000.0 + time.time())
if self.config['api_version'] >= (0, 9):
- if not self.assignment():
+ if self.config['group_id'] is not None and not self.assignment():
sleep_time = time.time() - timeout_at
log.debug('No partitions assigned; sleeping for %s', sleep_time)
time.sleep(sleep_time)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 7390ab3..263dac0 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -75,18 +75,24 @@ class ConsumerCoordinator(BaseCoordinator):
if key in configs:
self.config[key] = configs[key]
- self._cluster = client.cluster
+ if self.config['api_version'] >= (0, 9) and self.config['group_id'] is not None:
+ assert self.config['assignors'], 'Coordinator requires assignors'
+
self._subscription = subscription
self._partitions_per_topic = {}
- self._auto_commit_task = None
- if self.config['api_version'] >= (0, 9):
- assert self.config['assignors'], 'Coordinator require assignors'
-
+ self._cluster = client.cluster
self._cluster.request_update()
self._cluster.add_listener(self._handle_metadata_update)
- if self.config['api_version'] >= (0, 8, 1):
- if self.config['enable_auto_commit']:
+ self._auto_commit_task = None
+ if self.config['enable_auto_commit']:
+ if self.config['api_version'] < (0, 8, 1):
+ log.warning('Broker version (%s) does not support offset'
+ ' commits; disabling auto-commit.',
+ self.config['api_version'])
+ elif self.config['group_id'] is None:
+ log.warning('group_id is None: disabling auto-commit.')
+ else:
interval = self.config['auto_commit_interval_ms'] / 1000.0
self._auto_commit_task = AutoCommitTask(self, interval)
@@ -127,7 +133,10 @@ class ConsumerCoordinator(BaseCoordinator):
# check if there are any changes to the metadata which should trigger
# a rebalance
if self._subscription_metadata_changed():
- if self.config['api_version'] >= (0, 9):
+
+ if (self.config['api_version'] >= (0, 9)
+ and self.config['group_id'] is not None):
+
self._subscription.mark_for_reassignment()
# If we haven't got group coordinator support,