diff options
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 58 |
1 files changed, 35 insertions, 23 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 344e7e3..5063579 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -35,7 +35,8 @@ class KafkaConsumer(six.Iterator): Arguments: *topics (str): optional list of topics to subscribe to. If not set, - call :meth:`.subscribe` or :meth:`.assign` before consuming records. + call :meth:`~kafka.KafkaConsumer.subscribe` or + :meth:`~kafka.KafkaConsumer.assign` before consuming records. Keyword Arguments: bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' @@ -127,7 +128,7 @@ class KafkaConsumer(six.Iterator): session_timeout_ms (int): The timeout used to detect failures when using Kafka's group management facilities. Default: 30000 max_poll_records (int): The maximum number of records returned in a - single call to :meth:`.poll`. Default: 500 + single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500 receive_buffer_bytes (int): The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). The java client defaults to 32768. @@ -172,6 +173,7 @@ class KafkaConsumer(six.Iterator): api_version (tuple): Specify which Kafka API version to use. If set to None, the client will attempt to infer the broker version by probing various APIs. Different versions enable different functionality. + Examples: (0, 9) enables full group coordination features with automatic partition assignment and rebalancing, @@ -181,6 +183,7 @@ class KafkaConsumer(six.Iterator): partition assignment only, (0, 8, 0) enables basic functionality but requires manual partition assignment and offset management. + For the full list of supported versions, see KafkaClient.API_VERSIONS. Default: None api_version_auto_timeout_ms (int): number of milliseconds to throw a @@ -336,11 +339,13 @@ class KafkaConsumer(six.Iterator): partitions (list of TopicPartition): Assignment for this instance. Raises: - IllegalStateError: If consumer has already called :meth:`.subscribe`. + IllegalStateError: If consumer has already called + :meth:`~kafka.KafkaConsumer.subscribe`. Warning: It is not possible to use both manual partition assignment with - :meth:`.assign` and group assignment with :meth:`.subscribe`. + :meth:`~kafka.KafkaConsumer.assign` and group assignment with + :meth:`~kafka.KafkaConsumer.subscribe`. Note: This interface does not support incremental assignment and will @@ -358,12 +363,13 @@ class KafkaConsumer(six.Iterator): def assignment(self): """Get the TopicPartitions currently assigned to this consumer. - If partitions were directly assigned using :meth:`.assign`, then this - will simply return the same partitions that were previously assigned. - If topics were subscribed using :meth:`.subscribe`, then this will give - the set of topic partitions currently assigned to the consumer (which - may be None if the assignment hasn't happened yet, or if the partitions - are in the process of being reassigned). + If partitions were directly assigned using + :meth:`~kafka.KafkaConsumer.assign`, then this will simply return the + same partitions that were previously assigned. If topics were + subscribed using :meth:`~kafka.KafkaConsumer.subscribe`, then this will + give the set of topic partitions currently assigned to the consumer + (which may be None if the assignment hasn't happened yet, or if the + partitions are in the process of being reassigned). Returns: set: {TopicPartition, ...} @@ -527,8 +533,8 @@ class KafkaConsumer(six.Iterator): with any records that are available currently in the buffer, else returns empty. Must not be negative. Default: 0 max_records (int, optional): The maximum number of records returned - in a single call to :meth:`.poll`. Default: Inherit value from - max_poll_records. + in a single call to :meth:`~kafka.KafkaConsumer.poll`. + Default: Inherit value from max_poll_records. Returns: dict: Topic to list of records since the last fetch for the @@ -639,10 +645,12 @@ class KafkaConsumer(six.Iterator): def pause(self, *partitions): """Suspend fetching from the requested partitions. - Future calls to :meth:`.poll` will not return any records from these - partitions until they have been resumed using :meth:`.resume`. Note that - this method does not affect partition subscription. In particular, it - does not cause a group rebalance when automatic assignment is used. + Future calls to :meth:`~kafka.KafkaConsumer.poll` will not return any + records from these partitions until they have been resumed using + :meth:`~kafka.KafkaConsumer.resume`. + + Note: This method does not affect partition subscription. In particular, + it does not cause a group rebalance when automatic assignment is used. Arguments: *partitions (TopicPartition): Partitions to pause. @@ -654,7 +662,8 @@ class KafkaConsumer(six.Iterator): self._subscription.pause(partition) def paused(self): - """Get the partitions that were previously paused using :meth:`.pause`. + """Get the partitions that were previously paused using + :meth:`~kafka.KafkaConsumer.pause`. Returns: set: {partition (TopicPartition), ...} @@ -677,10 +686,12 @@ class KafkaConsumer(six.Iterator): """Manually specify the fetch offset for a TopicPartition. Overrides the fetch offsets that the consumer will use on the next - :meth:`.poll`. If this API is invoked for the same partition more than - once, the latest offset will be used on the next :meth:`.poll`. Note - that you may lose data if this API is arbitrarily used in the middle of - consumption, to reset the fetch offsets. + :meth:`~kafka.KafkaConsumer.poll`. If this API is invoked for the same + partition more than once, the latest offset will be used on the next + :meth:`~kafka.KafkaConsumer.poll`. + + Note: You may lose data if this API is arbitrarily used in the middle of + consumption to reset the fetch offsets. Arguments: partition (TopicPartition): Partition for seek operation @@ -752,7 +763,7 @@ class KafkaConsumer(six.Iterator): Topic subscriptions are not incremental: this list will replace the current assignment (if there is one). - This method is incompatible with :meth:`.assign`. + This method is incompatible with :meth:`~kafka.KafkaConsumer.assign`. Arguments: topics (list): List of topics for subscription. @@ -781,7 +792,8 @@ class KafkaConsumer(six.Iterator): through this interface are from topics subscribed in this call. Raises: - IllegalStateError: If called after previously calling :meth:`.assign`. + IllegalStateError: If called after previously calling + :meth:`~kafka.KafkaConsumer.assign`. AssertionError: If neither topics or pattern is provided. TypeError: If listener is not a ConsumerRebalanceListener. """ |