summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py58
1 files changed, 35 insertions, 23 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 344e7e3..5063579 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -35,7 +35,8 @@ class KafkaConsumer(six.Iterator):
Arguments:
*topics (str): optional list of topics to subscribe to. If not set,
- call :meth:`.subscribe` or :meth:`.assign` before consuming records.
+ call :meth:`~kafka.KafkaConsumer.subscribe` or
+ :meth:`~kafka.KafkaConsumer.assign` before consuming records.
Keyword Arguments:
bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
@@ -127,7 +128,7 @@ class KafkaConsumer(six.Iterator):
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group management facilities. Default: 30000
max_poll_records (int): The maximum number of records returned in a
- single call to :meth:`.poll`. Default: 500
+ single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on
system defaults). The java client defaults to 32768.
@@ -172,6 +173,7 @@ class KafkaConsumer(six.Iterator):
api_version (tuple): Specify which Kafka API version to use. If set to
None, the client will attempt to infer the broker version by probing
various APIs. Different versions enable different functionality.
+
Examples:
(0, 9) enables full group coordination features with automatic
partition assignment and rebalancing,
@@ -181,6 +183,7 @@ class KafkaConsumer(six.Iterator):
partition assignment only,
(0, 8, 0) enables basic functionality but requires manual
partition assignment and offset management.
+
For the full list of supported versions, see
KafkaClient.API_VERSIONS. Default: None
api_version_auto_timeout_ms (int): number of milliseconds to throw a
@@ -336,11 +339,13 @@ class KafkaConsumer(six.Iterator):
partitions (list of TopicPartition): Assignment for this instance.
Raises:
- IllegalStateError: If consumer has already called :meth:`.subscribe`.
+ IllegalStateError: If consumer has already called
+ :meth:`~kafka.KafkaConsumer.subscribe`.
Warning:
It is not possible to use both manual partition assignment with
- :meth:`.assign` and group assignment with :meth:`.subscribe`.
+ :meth:`~kafka.KafkaConsumer.assign` and group assignment with
+ :meth:`~kafka.KafkaConsumer.subscribe`.
Note:
This interface does not support incremental assignment and will
@@ -358,12 +363,13 @@ class KafkaConsumer(six.Iterator):
def assignment(self):
"""Get the TopicPartitions currently assigned to this consumer.
- If partitions were directly assigned using :meth:`.assign`, then this
- will simply return the same partitions that were previously assigned.
- If topics were subscribed using :meth:`.subscribe`, then this will give
- the set of topic partitions currently assigned to the consumer (which
- may be None if the assignment hasn't happened yet, or if the partitions
- are in the process of being reassigned).
+ If partitions were directly assigned using
+ :meth:`~kafka.KafkaConsumer.assign`, then this will simply return the
+ same partitions that were previously assigned. If topics were
+ subscribed using :meth:`~kafka.KafkaConsumer.subscribe`, then this will
+ give the set of topic partitions currently assigned to the consumer
+ (which may be None if the assignment hasn't happened yet, or if the
+ partitions are in the process of being reassigned).
Returns:
set: {TopicPartition, ...}
@@ -527,8 +533,8 @@ class KafkaConsumer(six.Iterator):
with any records that are available currently in the buffer,
else returns empty. Must not be negative. Default: 0
max_records (int, optional): The maximum number of records returned
- in a single call to :meth:`.poll`. Default: Inherit value from
- max_poll_records.
+ in a single call to :meth:`~kafka.KafkaConsumer.poll`.
+ Default: Inherit value from max_poll_records.
Returns:
dict: Topic to list of records since the last fetch for the
@@ -639,10 +645,12 @@ class KafkaConsumer(six.Iterator):
def pause(self, *partitions):
"""Suspend fetching from the requested partitions.
- Future calls to :meth:`.poll` will not return any records from these
- partitions until they have been resumed using :meth:`.resume`. Note that
- this method does not affect partition subscription. In particular, it
- does not cause a group rebalance when automatic assignment is used.
+ Future calls to :meth:`~kafka.KafkaConsumer.poll` will not return any
+ records from these partitions until they have been resumed using
+ :meth:`~kafka.KafkaConsumer.resume`.
+
+ Note: This method does not affect partition subscription. In particular,
+ it does not cause a group rebalance when automatic assignment is used.
Arguments:
*partitions (TopicPartition): Partitions to pause.
@@ -654,7 +662,8 @@ class KafkaConsumer(six.Iterator):
self._subscription.pause(partition)
def paused(self):
- """Get the partitions that were previously paused using :meth:`.pause`.
+ """Get the partitions that were previously paused using
+ :meth:`~kafka.KafkaConsumer.pause`.
Returns:
set: {partition (TopicPartition), ...}
@@ -677,10 +686,12 @@ class KafkaConsumer(six.Iterator):
"""Manually specify the fetch offset for a TopicPartition.
Overrides the fetch offsets that the consumer will use on the next
- :meth:`.poll`. If this API is invoked for the same partition more than
- once, the latest offset will be used on the next :meth:`.poll`. Note
- that you may lose data if this API is arbitrarily used in the middle of
- consumption, to reset the fetch offsets.
+ :meth:`~kafka.KafkaConsumer.poll`. If this API is invoked for the same
+ partition more than once, the latest offset will be used on the next
+ :meth:`~kafka.KafkaConsumer.poll`.
+
+ Note: You may lose data if this API is arbitrarily used in the middle of
+ consumption to reset the fetch offsets.
Arguments:
partition (TopicPartition): Partition for seek operation
@@ -752,7 +763,7 @@ class KafkaConsumer(six.Iterator):
Topic subscriptions are not incremental: this list will replace the
current assignment (if there is one).
- This method is incompatible with :meth:`.assign`.
+ This method is incompatible with :meth:`~kafka.KafkaConsumer.assign`.
Arguments:
topics (list): List of topics for subscription.
@@ -781,7 +792,8 @@ class KafkaConsumer(six.Iterator):
through this interface are from topics subscribed in this call.
Raises:
- IllegalStateError: If called after previously calling :meth:`.assign`.
+ IllegalStateError: If called after previously calling
+ :meth:`~kafka.KafkaConsumer.assign`.
AssertionError: If neither topics or pattern is provided.
TypeError: If listener is not a ConsumerRebalanceListener.
"""