diff options
author | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-05 17:19:54 +0000 |
---|---|---|
committer | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-07 09:34:09 +0000 |
commit | 1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5 (patch) | |
tree | b7521c2a10dedc958bdc506a3fc5d5ce420e4ba5 | |
parent | efc03d083d323e35a2d32bcbdbccc053f737836e (diff) | |
download | kafka-python-1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5.tar.gz |
Added `beginning_offsets` and `end_offsets` APIs and fixed @jeffwidman review issues
-rw-r--r-- | kafka/conn.py | 2 | ||||
-rw-r--r-- | kafka/consumer/fetcher.py | 23 | ||||
-rw-r--r-- | kafka/consumer/group.py | 87 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 47 |
4 files changed, 142 insertions, 17 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index d042300..61d63bf 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -887,7 +887,7 @@ class BrokerConnection(object): def _infer_broker_version_from_api_versions(self, api_versions): # The logic here is to check the list of supported request versions - # in descending order. As soon as we find one that works, return it + # in reverse order. As soon as we find one that works, return it test_cases = [ # format (<broker verion>, <needed struct>) ((0, 11, 0), MetadataRequest[4]), diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 1a3dfd5..6a7b794 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -193,6 +193,21 @@ class Fetcher(six.Iterator): offsets[tp] = OffsetAndTimestamp(offset, timestamp) return offsets + def beginning_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.EARLIEST, timeout_ms) + + def end_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.LATEST, timeout_ms) + + def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): + timestamps = dict([(tp, timestamp) for tp in partitions]) + offsets = self._retrieve_offsets(timestamps, timeout_ms) + for tp in timestamps: + offsets[tp] = offsets[tp][0] + return offsets + def _reset_offset(self, partition): """Reset offsets for the given partition using the offset reset strategy. @@ -222,10 +237,10 @@ class Fetcher(six.Iterator): self._subscriptions.seek(partition, offset) def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): - """ Fetch offset for each partition passed in ``timestamps`` map. + """Fetch offset for each partition passed in ``timestamps`` map. Blocks until offsets are obtained, a non-retriable exception is raised - or ``timeout_ms`` passed (if it's not ``None``). + or ``timeout_ms`` passed. 
Arguments: timestamps: {TopicPartition: int} dict with timestamps to fetch @@ -268,7 +283,7 @@ class Fetcher(six.Iterator): remaining_ms = timeout_ms - elapsed_ms raise Errors.KafkaTimeoutError( - "Failed to get offsets by times in %s ms" % timeout_ms) + "Failed to get offsets by timestamps in %s ms" % timeout_ms) def _raise_if_offset_out_of_range(self): """Check FetchResponses for offset out of range. @@ -613,7 +628,7 @@ class Fetcher(six.Iterator): return f(bytes_) def _send_offset_requests(self, timestamps): - """ Fetch offsets for each partition in timestamps dict. This may send + """Fetch offsets for each partition in timestamps dict. This may send request to multiple nodes, based on who is Leader for partition. Arguments: diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 48a88b2..54a3711 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -862,33 +862,37 @@ class KafkaConsumer(six.Iterator): return metrics def offsets_for_times(self, timestamps): - """ - Look up the offsets for the given partitions by timestamp. The returned - offset for each partition is the earliest offset whose timestamp is - greater than or equal to the given timestamp in the corresponding - partition. + """Look up the offsets for the given partitions by timestamp. The + returned offset for each partition is the earliest offset whose + timestamp is greater than or equal to the given timestamp in the + corresponding partition. This is a blocking call. The consumer does not have to be assigned the partitions. If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, ``None`` will be returned for that - partition. + partition. ``None`` will also be returned for the partition if there + are no messages in it. Note: - Notice that this method may block indefinitely if the partition - does not exist. + This method may block indefinitely if the partition does not exist. 
Arguments: timestamps (dict): ``{TopicPartition: int}`` mapping from partition to the timestamp to look up. Unit should be milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)) + Returns: + ``{TopicPartition: OffsetAndTimestamp}``: mapping from partition + to the timestamp and offset of the first message with timestamp + greater than or equal to the target timestamp. + Raises: - ValueError: if the target timestamp is negative - UnsupportedVersionError: if the broker does not support looking + ValueError: If the target timestamp is negative + UnsupportedVersionError: If the broker does not support looking up the offsets by timestamp. - KafkaTimeoutError: if fetch failed in request_timeout_ms + KafkaTimeoutError: If fetch failed in request_timeout_ms """ if self.config['api_version'] <= (0, 10, 0): raise UnsupportedVersionError( @@ -903,6 +907,67 @@ class KafkaConsumer(six.Iterator): return self._fetcher.get_offsets_by_times( timestamps, self.config['request_timeout_ms']) + def beginning_offsets(self, partitions): + """Get the first offset for the given partitions. + + This method does not change the current consumer position of the + partitions. + + Note: + This method may block indefinitely if the partition does not exist. + + Arguments: + partitions (list): List of TopicPartition instances to fetch + offsets for. + + Returns: + ``{TopicPartition: int}``: The earliest available offsets for the + given partitions. + + Raises: + UnsupportedVersionError: If the broker does not support looking + up the offsets by timestamp. + KafkaTimeoutError: If fetch failed in request_timeout_ms. 
+ """ + if self.config['api_version'] <= (0, 10, 0): + raise UnsupportedVersionError( + "offsets_for_times API not supported for cluster version {}" + .format(self.config['api_version'])) + offsets = self._fetcher.beginning_offsets( + partitions, self.config['request_timeout_ms']) + return offsets + + def end_offsets(self, partitions): + """Get the last offset for the given partitions. The last offset of a + partition is the offset of the upcoming message, i.e. the offset of the + last available message + 1. + + This method does not change the current consumer position of the + partitions. + + Note: + This method may block indefinitely if the partition does not exist. + + Arguments: + partitions (list): List of TopicPartition instances to fetch + offsets for. + + Returns: + ``{TopicPartition: int}``: The end offsets for the given partitions. + + Raises: + UnsupportedVersionError: If the broker does not support looking + up the offsets by timestamp. + KafkaTimeoutError: If fetch failed in request_timeout_ms + """ + if self.config['api_version'] <= (0, 10, 0): + raise UnsupportedVersionError( + "offsets_for_times API not supported for cluster version {}" + .format(self.config['api_version'])) + offsets = self._fetcher.end_offsets( + partitions, self.config['request_timeout_ms']) + return offsets + def _use_consumer_group(self): """Return True iff this consumer can/should join a broker-coordinated group.""" if self.config['api_version'] < (0, 9): diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index eab93be..803b16a 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -12,7 +12,8 @@ from kafka import ( ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from kafka.errors import ( - ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError + ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError, + KafkaTimeoutError ) from kafka.structs import ( 
ProduceRequestPayload, TopicPartition, OffsetAndTimestamp @@ -666,6 +667,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(offsets[tp].offset, late_msg.offset) self.assertEqual(offsets[tp].timestamp, late_time) + offsets = consumer.offsets_for_times({}) + self.assertEqual(offsets, {}) + # Out of bound timestamps check offsets = consumer.offsets_for_times({tp: 0}) @@ -675,6 +679,17 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): offsets = consumer.offsets_for_times({tp: 9999999999999}) self.assertEqual(offsets[tp], None) + # Beginning/End offsets + + offsets = consumer.beginning_offsets([tp]) + self.assertEqual(offsets, { + tp: early_msg.offset, + }) + offsets = consumer.end_offsets([tp]) + self.assertEqual(offsets, { + tp: late_msg.offset + 1 + }) + @kafka_versions('>=0.10.1') def test_kafka_consumer_offsets_search_many_partitions(self): tp0 = TopicPartition(self.topic, 0) @@ -700,6 +715,18 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): tp1: OffsetAndTimestamp(p1msg.offset, send_time) }) + offsets = consumer.beginning_offsets([tp0, tp1]) + self.assertEqual(offsets, { + tp0: p0msg.offset, + tp1: p1msg.offset + }) + + offsets = consumer.end_offsets([tp0, tp1]) + self.assertEqual(offsets, { + tp0: p0msg.offset + 1, + tp1: p1msg.offset + 1 + }) + @kafka_versions('<0.10.1') def test_kafka_consumer_offsets_for_time_old(self): consumer = self.kafka_consumer() @@ -707,3 +734,21 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(UnsupportedVersionError): consumer.offsets_for_times({tp: int(time.time())}) + + with self.assertRaises(UnsupportedVersionError): + consumer.beginning_offsets([tp]) + + with self.assertRaises(UnsupportedVersionError): + consumer.end_offsets([tp]) + + @kafka_versions('<0.10.1') + def test_kafka_consumer_offsets_for_times_errors(self): + consumer = self.kafka_consumer() + tp = TopicPartition(self.topic, 0) + bad_tp = TopicPartition(self.topic, 100) + + with 
self.assertRaises(ValueError): + consumer.offsets_for_times({tp: -1}) + + with self.assertRaises(KafkaTimeoutError): + consumer.offsets_for_times({bad_tp: 0}) |