summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
authorTaras Voinarovskiy <voyn1991@gmail.com>2017-08-05 17:19:54 +0000
committerTaras Voinarovskiy <voyn1991@gmail.com>2017-08-07 09:34:09 +0000
commit1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5 (patch)
treeb7521c2a10dedc958bdc506a3fc5d5ce420e4ba5 /kafka/consumer/group.py
parentefc03d083d323e35a2d32bcbdbccc053f737836e (diff)
downloadkafka-python-1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5.tar.gz
Added `beginning_offsets` and `end_offsets` API's and fixed @jeffwidman review issues
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py87
1 files changed, 76 insertions, 11 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 48a88b2..54a3711 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -862,33 +862,37 @@ class KafkaConsumer(six.Iterator):
return metrics
def offsets_for_times(self, timestamps):
- """
- Look up the offsets for the given partitions by timestamp. The returned
- offset for each partition is the earliest offset whose timestamp is
- greater than or equal to the given timestamp in the corresponding
- partition.
+ """Look up the offsets for the given partitions by timestamp. The
+ returned offset for each partition is the earliest offset whose
+ timestamp is greater than or equal to the given timestamp in the
+ corresponding partition.
This is a blocking call. The consumer does not have to be assigned the
partitions.
If the message format version in a partition is before 0.10.0, i.e.
the messages do not have timestamps, ``None`` will be returned for that
- partition.
+ partition. ``None`` will also be returned for the partition if there
+ are no messages in it.
Note:
- Notice that this method may block indefinitely if the partition
- does not exist.
+ This method may block indefinitely if the partition does not exist.
Arguments:
timestamps (dict): ``{TopicPartition: int}`` mapping from partition
to the timestamp to look up. Unit should be milliseconds since
beginning of the epoch (midnight Jan 1, 1970 (UTC))
+ Returns:
+ ``{TopicPartition: OffsetAndTimestamp}``: mapping from partition
+ to the timestamp and offset of the first message with timestamp
+ greater than or equal to the target timestamp.
+
Raises:
- ValueError: if the target timestamp is negative
- UnsupportedVersionError: if the broker does not support looking
+ ValueError: If the target timestamp is negative
+ UnsupportedVersionError: If the broker does not support looking
up the offsets by timestamp.
- KafkaTimeoutError: if fetch failed in request_timeout_ms
+ KafkaTimeoutError: If fetch failed in request_timeout_ms
"""
if self.config['api_version'] <= (0, 10, 0):
raise UnsupportedVersionError(
@@ -903,6 +907,67 @@ class KafkaConsumer(six.Iterator):
return self._fetcher.get_offsets_by_times(
timestamps, self.config['request_timeout_ms'])
+ def beginning_offsets(self, partitions):
+ """Get the first offset for the given partitions.
+
+ This method does not change the current consumer position of the
+ partitions.
+
+ Note:
+ This method may block indefinitely if the partition does not exist.
+
+ Arguments:
+ partitions (list): List of TopicPartition instances to fetch
+ offsets for.
+
+ Returns:
+ ``{TopicPartition: int}``: The earliest available offsets for the
+ given partitions.
+
+ Raises:
+ UnsupportedVersionError: If the broker does not support looking
+ up the offsets by timestamp.
+ KafkaTimeoutError: If fetch failed in request_timeout_ms.
+ """
+ if self.config['api_version'] <= (0, 10, 0):
+ raise UnsupportedVersionError(
+ "offsets_for_times API not supported for cluster version {}"
+ .format(self.config['api_version']))
+ offsets = self._fetcher.beginning_offsets(
+ partitions, self.config['request_timeout_ms'])
+ return offsets
+
+ def end_offsets(self, partitions):
+ """Get the last offset for the given partitions. The last offset of a
+ partition is the offset of the upcoming message, i.e. the offset of the
+ last available message + 1.
+
+ This method does not change the current consumer position of the
+ partitions.
+
+ Note:
+ This method may block indefinitely if the partition does not exist.
+
+ Arguments:
+ partitions (list): List of TopicPartition instances to fetch
+ offsets for.
+
+ Returns:
+ ``{TopicPartition: int}``: The end offsets for the given partitions.
+
+ Raises:
+ UnsupportedVersionError: If the broker does not support looking
+ up the offsets by timestamp.
+ KafkaTimeoutError: If fetch failed in request_timeout_ms
+ """
+ if self.config['api_version'] <= (0, 10, 0):
+ raise UnsupportedVersionError(
+ "offsets_for_times API not supported for cluster version {}"
+ .format(self.config['api_version']))
+ offsets = self._fetcher.end_offsets(
+ partitions, self.config['request_timeout_ms'])
+ return offsets
+
def _use_consumer_group(self):
"""Return True iff this consumer can/should join a broker-coordinated group."""
if self.config['api_version'] < (0, 9):