summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTaras Voinarovskiy <voyn1991@gmail.com>2017-08-05 17:19:54 +0000
committerTaras Voinarovskiy <voyn1991@gmail.com>2017-08-07 09:34:09 +0000
commit1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5 (patch)
treeb7521c2a10dedc958bdc506a3fc5d5ce420e4ba5
parentefc03d083d323e35a2d32bcbdbccc053f737836e (diff)
downloadkafka-python-1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5.tar.gz
Added `beginning_offsets` and `end_offsets` API's and fixed @jeffwidman review issues
-rw-r--r--kafka/conn.py2
-rw-r--r--kafka/consumer/fetcher.py23
-rw-r--r--kafka/consumer/group.py87
-rw-r--r--test/test_consumer_integration.py47
4 files changed, 142 insertions, 17 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index d042300..61d63bf 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -887,7 +887,7 @@ class BrokerConnection(object):
def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
- # in descending order. As soon as we find one that works, return it
+ # in reverse order. As soon as we find one that works, return it
test_cases = [
# format (<broker verion>, <needed struct>)
((0, 11, 0), MetadataRequest[4]),
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 1a3dfd5..6a7b794 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -193,6 +193,21 @@ class Fetcher(six.Iterator):
offsets[tp] = OffsetAndTimestamp(offset, timestamp)
return offsets
+ def beginning_offsets(self, partitions, timeout_ms):
+ return self.beginning_or_end_offset(
+ partitions, OffsetResetStrategy.EARLIEST, timeout_ms)
+
+ def end_offsets(self, partitions, timeout_ms):
+ return self.beginning_or_end_offset(
+ partitions, OffsetResetStrategy.LATEST, timeout_ms)
+
+ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
+ timestamps = dict([(tp, timestamp) for tp in partitions])
+ offsets = self._retrieve_offsets(timestamps, timeout_ms)
+ for tp in timestamps:
+ offsets[tp] = offsets[tp][0]
+ return offsets
+
def _reset_offset(self, partition):
"""Reset offsets for the given partition using the offset reset strategy.
@@ -222,10 +237,10 @@ class Fetcher(six.Iterator):
self._subscriptions.seek(partition, offset)
def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
- """ Fetch offset for each partition passed in ``timestamps`` map.
+ """Fetch offset for each partition passed in ``timestamps`` map.
Blocks until offsets are obtained, a non-retriable exception is raised
- or ``timeout_ms`` passed (if it's not ``None``).
+ or ``timeout_ms`` passed.
Arguments:
timestamps: {TopicPartition: int} dict with timestamps to fetch
@@ -268,7 +283,7 @@ class Fetcher(six.Iterator):
remaining_ms = timeout_ms - elapsed_ms
raise Errors.KafkaTimeoutError(
- "Failed to get offsets by times in %s ms" % timeout_ms)
+ "Failed to get offsets by timestamps in %s ms" % timeout_ms)
def _raise_if_offset_out_of_range(self):
"""Check FetchResponses for offset out of range.
@@ -613,7 +628,7 @@ class Fetcher(six.Iterator):
return f(bytes_)
def _send_offset_requests(self, timestamps):
- """ Fetch offsets for each partition in timestamps dict. This may send
+ """Fetch offsets for each partition in timestamps dict. This may send
request to multiple nodes, based on who is Leader for partition.
Arguments:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 48a88b2..54a3711 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -862,33 +862,37 @@ class KafkaConsumer(six.Iterator):
return metrics
def offsets_for_times(self, timestamps):
- """
- Look up the offsets for the given partitions by timestamp. The returned
- offset for each partition is the earliest offset whose timestamp is
- greater than or equal to the given timestamp in the corresponding
- partition.
+ """Look up the offsets for the given partitions by timestamp. The
+ returned offset for each partition is the earliest offset whose
+ timestamp is greater than or equal to the given timestamp in the
+ corresponding partition.
This is a blocking call. The consumer does not have to be assigned the
partitions.
If the message format version in a partition is before 0.10.0, i.e.
the messages do not have timestamps, ``None`` will be returned for that
- partition.
+ partition. ``None`` will also be returned for the partition if there
+ are no messages in it.
Note:
- Notice that this method may block indefinitely if the partition
- does not exist.
+ This method may block indefinitely if the partition does not exist.
Arguments:
timestamps (dict): ``{TopicPartition: int}`` mapping from partition
to the timestamp to look up. Unit should be milliseconds since
beginning of the epoch (midnight Jan 1, 1970 (UTC))
+ Returns:
+ ``{TopicPartition: OffsetAndTimestamp}``: mapping from partition
+ to the timestamp and offset of the first message with timestamp
+ greater than or equal to the target timestamp.
+
Raises:
- ValueError: if the target timestamp is negative
- UnsupportedVersionError: if the broker does not support looking
+ ValueError: If the target timestamp is negative
+ UnsupportedVersionError: If the broker does not support looking
up the offsets by timestamp.
- KafkaTimeoutError: if fetch failed in request_timeout_ms
+ KafkaTimeoutError: If fetch failed in request_timeout_ms
"""
if self.config['api_version'] <= (0, 10, 0):
raise UnsupportedVersionError(
@@ -903,6 +907,67 @@ class KafkaConsumer(six.Iterator):
return self._fetcher.get_offsets_by_times(
timestamps, self.config['request_timeout_ms'])
+ def beginning_offsets(self, partitions):
+ """Get the first offset for the given partitions.
+
+ This method does not change the current consumer position of the
+ partitions.
+
+ Note:
+ This method may block indefinitely if the partition does not exist.
+
+ Arguments:
+ partitions (list): List of TopicPartition instances to fetch
+ offsets for.
+
+ Returns:
+ ``{TopicPartition: int}``: The earliest available offsets for the
+ given partitions.
+
+ Raises:
+ UnsupportedVersionError: If the broker does not support looking
+ up the offsets by timestamp.
+ KafkaTimeoutError: If fetch failed in request_timeout_ms.
+ """
+ if self.config['api_version'] <= (0, 10, 0):
+ raise UnsupportedVersionError(
+ "offsets_for_times API not supported for cluster version {}"
+ .format(self.config['api_version']))
+ offsets = self._fetcher.beginning_offsets(
+ partitions, self.config['request_timeout_ms'])
+ return offsets
+
+ def end_offsets(self, partitions):
+ """Get the last offset for the given partitions. The last offset of a
+ partition is the offset of the upcoming message, i.e. the offset of the
+ last available message + 1.
+
+ This method does not change the current consumer position of the
+ partitions.
+
+ Note:
+ This method may block indefinitely if the partition does not exist.
+
+ Arguments:
+ partitions (list): List of TopicPartition instances to fetch
+ offsets for.
+
+ Returns:
+ ``{TopicPartition: int}``: The end offsets for the given partitions.
+
+ Raises:
+ UnsupportedVersionError: If the broker does not support looking
+ up the offsets by timestamp.
+ KafkaTimeoutError: If fetch failed in request_timeout_ms
+ """
+ if self.config['api_version'] <= (0, 10, 0):
+ raise UnsupportedVersionError(
+ "offsets_for_times API not supported for cluster version {}"
+ .format(self.config['api_version']))
+ offsets = self._fetcher.end_offsets(
+ partitions, self.config['request_timeout_ms'])
+ return offsets
+
def _use_consumer_group(self):
"""Return True iff this consumer can/should join a broker-coordinated group."""
if self.config['api_version'] < (0, 9):
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index eab93be..803b16a 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -12,7 +12,8 @@ from kafka import (
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import (
- ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError
+ ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
+ KafkaTimeoutError
)
from kafka.structs import (
ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
@@ -666,6 +667,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertEqual(offsets[tp].offset, late_msg.offset)
self.assertEqual(offsets[tp].timestamp, late_time)
+ offsets = consumer.offsets_for_times({})
+ self.assertEqual(offsets, {})
+
# Out of bound timestamps check
offsets = consumer.offsets_for_times({tp: 0})
@@ -675,6 +679,17 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
offsets = consumer.offsets_for_times({tp: 9999999999999})
self.assertEqual(offsets[tp], None)
+ # Beginning/End offsets
+
+ offsets = consumer.beginning_offsets([tp])
+ self.assertEqual(offsets, {
+ tp: early_msg.offset,
+ })
+ offsets = consumer.end_offsets([tp])
+ self.assertEqual(offsets, {
+ tp: late_msg.offset + 1
+ })
+
@kafka_versions('>=0.10.1')
def test_kafka_consumer_offsets_search_many_partitions(self):
tp0 = TopicPartition(self.topic, 0)
@@ -700,6 +715,18 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
tp1: OffsetAndTimestamp(p1msg.offset, send_time)
})
+ offsets = consumer.beginning_offsets([tp0, tp1])
+ self.assertEqual(offsets, {
+ tp0: p0msg.offset,
+ tp1: p1msg.offset
+ })
+
+ offsets = consumer.end_offsets([tp0, tp1])
+ self.assertEqual(offsets, {
+ tp0: p0msg.offset + 1,
+ tp1: p1msg.offset + 1
+ })
+
@kafka_versions('<0.10.1')
def test_kafka_consumer_offsets_for_time_old(self):
consumer = self.kafka_consumer()
@@ -707,3 +734,21 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
with self.assertRaises(UnsupportedVersionError):
consumer.offsets_for_times({tp: int(time.time())})
+
+ with self.assertRaises(UnsupportedVersionError):
+ consumer.beginning_offsets([tp])
+
+ with self.assertRaises(UnsupportedVersionError):
+ consumer.end_offsets([tp])
+
+ @kafka_versions('<0.10.1')
+ def test_kafka_consumer_offsets_for_times_errors(self):
+ consumer = self.kafka_consumer()
+ tp = TopicPartition(self.topic, 0)
+ bad_tp = TopicPartition(self.topic, 100)
+
+ with self.assertRaises(ValueError):
+ consumer.offsets_for_times({tp: -1})
+
+ with self.assertRaises(KafkaTimeoutError):
+ consumer.offsets_for_times({bad_tp: 0})