diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-03 16:00:16 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-03 16:00:16 -0800 |
commit | fae1a227b1eb67fda2264d81c36cdbe39b49e057 (patch) | |
tree | 79c4c97cafc92f5ab4936269c101ea04e36b2553 /kafka | |
parent | 30fefa9b4f6922b97536b5641ec696dcc8257601 (diff) | |
download | kafka-python-fae1a227b1eb67fda2264d81c36cdbe39b49e057.tar.gz |
Add api_version config to KafkaConsumer; disable features inline by version
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer/group.py | 47 |
1 files changed, 36 insertions, 11 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index b8b5bde..a9a4ac0 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -42,6 +42,7 @@ class KafkaConsumer(six.Iterator): 'session_timeout_ms': 30000, 'send_buffer_bytes': 128 * 1024, 'receive_buffer_bytes': 32 * 1024, + 'api_version': 'auto', 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet #'metric_reporters': None, #'metrics_num_samples': 2, @@ -144,6 +145,12 @@ class KafkaConsumer(six.Iterator): (SO_SNDBUF) to use when sending data. Default: 131072 receive_buffer_bytes (int): The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: 32768 + api_version (str): specify which kafka API version to use. + 0.9 enables full group coordination features; 0.8.2 enables + kafka-storage offset commits; 0.8.1 enables zookeeper-storage + offset commits; 0.8.0 is what is left. If set to 'auto', will + attempt to infer the broker version by probing various APIs. + Default: auto Configuration parameters are described in more detail at https://kafka.apache.org/090/configuration.html#newconsumerconfigs @@ -157,6 +164,16 @@ class KafkaConsumer(six.Iterator): assert not configs, 'Unrecognized configs: %s' % configs self._client = KafkaClient(**self.config) + + # Check Broker Version if not set explicitly + if self.config['api_version'] == 'auto': + self.config['api_version'] = self._client.check_version() + assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0') + + # Convert api_version config to tuple for easy comparisons + self.config['api_version'] = tuple( + map(int, self.config['api_version'].split('.'))) + self._subscription = SubscriptionState(self.config['auto_offset_reset']) self._fetcher = Fetcher( self._client, self._subscription, **self.config) @@ -250,6 +267,7 @@ class KafkaConsumer(six.Iterator): Returns: kafka.future.Future """ + assert self.config['api_version'] >= (0, 8, 1) if offsets is None: offsets = self._subscription.all_consumed_offsets() log.debug("Committing offsets: %s", offsets) @@ -275,6 +293,7 @@ class KafkaConsumer(six.Iterator): to commit with the configured group_id. Defaults to current consumed offsets for all subscribed partitions. """ + assert self.config['api_version'] >= (0, 8, 1) if offsets is None: offsets = self._subscription.all_consumed_offsets() self._coordinator.commit_offsets_sync(offsets) @@ -295,6 +314,7 @@ class KafkaConsumer(six.Iterator): Returns: The last committed offset, or None if there was no prior commit. """ + assert self.config['api_version'] >= (0, 8, 1) if self._subscription.is_assigned(partition): committed = self._subscription.assignment[partition].committed if committed is None: @@ -382,12 +402,14 @@ class KafkaConsumer(six.Iterator): Returns: dict: map of topic to list of records (may be empty) """ - # TODO: Sub-requests should take into account the poll timeout (KAFKA-1894) - self._coordinator.ensure_coordinator_known() + if self.config['api_version'] >= (0, 8, 2): + # TODO: Sub-requests should take into account the poll timeout (KAFKA-1894) + self._coordinator.ensure_coordinator_known() - # ensure we have partitions assigned if we expect to - if self._subscription.partitions_auto_assigned(): - self._coordinator.ensure_active_group() + if self.config['api_version'] >= (0, 9): + # ensure we have partitions assigned if we expect to + if self._subscription.partitions_auto_assigned(): + self._coordinator.ensure_active_group() # fetch positions if we have partitions we're subscribed to that we # don't know the offset for @@ -565,19 +587,22 @@ class KafkaConsumer(six.Iterator): NoOffsetForPartitionError: If no offset is stored for a given partition and no offset reset policy is defined """ - # refresh commits for all assigned partitions - self._coordinator.refresh_committed_offsets_if_needed() + if self.config['api_version'] >= (0, 8, 1): + # refresh commits for all assigned partitions + self._coordinator.refresh_committed_offsets_if_needed() # then do any offset lookups in case some positions are not known self._fetcher.update_fetch_positions(partitions) def _message_generator(self): while True: - self._coordinator.ensure_coordinator_known() + if self.config['api_version'] >= (0, 8, 2): + self._coordinator.ensure_coordinator_known() - # ensure we have partitions assigned if we expect to - if self._subscription.partitions_auto_assigned(): - self._coordinator.ensure_active_group() + if self.config['api_version'] >= (0, 9): + # ensure we have partitions assigned if we expect to + if self._subscription.partitions_auto_assigned(): + self._coordinator.ensure_active_group() # fetch positions if we have partitions we're subscribed to that we # don't know the offset for |