author     Taras Voinarovskiy <voyn1991@gmail.com>    2017-07-30 15:42:27 +0000
committer  Taras Voinarovskiy <voyn1991@gmail.com>    2017-08-07 09:34:08 +0000
commit     39f0e50b9441609e9dce4e60a1ab2c3f16680476 (patch)
tree       2b94ed93bec5ae4f072360c5072cc22b0685f8a1
parent     da25df6d3c6380e27bf638f3620613d05ac9fd03 (diff)
download   kafka-python-39f0e50b9441609e9dce4e60a1ab2c3f16680476.tar.gz
Added basic support for the offsets_for_times API. Requests still need to be grouped by node and sent in parallel.
-rw-r--r--  kafka/conn.py                       1
-rw-r--r--  kafka/consumer/fetcher.py          94
-rw-r--r--  kafka/consumer/group.py            42
-rw-r--r--  kafka/protocol/offset.py            4
-rw-r--r--  kafka/structs.py                    3
-rw-r--r--  test/test_consumer_integration.py  46
6 files changed, 169 insertions, 21 deletions
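
For orientation before the diff: the new consumer-facing API would be used roughly as follows. This is a hypothetical sketch, not code from the commit; the broker address, topic name, and timestamp are illustrative. Note that in this version a lookup miss yields an OffsetAndTimestamp whose fields are None, not a missing dict entry.

```python
from kafka import KafkaConsumer, TopicPartition

# Hypothetical usage sketch -- broker address, topic, and timestamp are
# illustrative, not taken from the commit.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
tp = TopicPartition('my-topic', 0)

# Earliest offset whose message timestamp is >= the target timestamp.
offsets = consumer.offsets_for_times({tp: 1501426947000})

result = offsets[tp]
if result.offset is not None:
    consumer.assign([tp])
    consumer.seek(tp, result.offset)  # resume consuming from that point
```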
diff --git a/kafka/conn.py b/kafka/conn.py
index ac8bb3d..d042300 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -19,6 +19,7 @@ from kafka.protocol.api import RequestHeader
 from kafka.protocol.admin import SaslHandShakeRequest
 from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
 from kafka.protocol.metadata import MetadataRequest
+from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.types import Int32
 from kafka.version import __version__
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 8db89a1..cb80a6f 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -14,9 +14,11 @@ from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.message import PartialMessage
-from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.protocol.offset import (
+    OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
+)
 from kafka.serializer import Deserializer
-from kafka.structs import TopicPartition
+from kafka.structs import TopicPartition, OffsetAndTimestamp

 log = logging.getLogger(__name__)
@@ -48,6 +50,7 @@ class Fetcher(six.Iterator):
         'iterator_refetch_records': 1,  # undocumented -- interface may change
         'metric_group_prefix': 'consumer',
         'api_version': (0, 8, 0),
+        'retry_backoff_ms': 100
     }

     def __init__(self, client, subscriptions, metrics, **configs):
@@ -180,6 +183,14 @@
                           " offset %s", tp, committed)
                 self._subscriptions.seek(tp, committed)

+    def get_offsets_by_times(self, timestamps, timeout_ms):
+        response = {}
+        for tp, timestamp in timestamps.items():
+            timestamp = int(timestamp)
+            offset, tmst = self._offset(tp, timestamp, timeout_ms=timeout_ms)
+            response[tp] = OffsetAndTimestamp(offset, tmst)
+        return response
+
     def _reset_offset(self, partition):
         """Reset offsets for the given partition using the offset reset strategy.
@@ -199,14 +210,14 @@
         log.debug("Resetting offset for partition %s to %s offset.",
                   partition, strategy)
-        offset = self._offset(partition, timestamp)
+        offset, _ = self._offset(partition, timestamp)

         # we might lose the assignment while fetching the offset,
         # so check it is still active
         if self._subscriptions.is_assigned(partition):
             self._subscriptions.seek(partition, offset)

-    def _offset(self, partition, timestamp):
+    def _offset(self, partition, timestamp, timeout_ms=None):
         """Fetch a single offset before the given timestamp for the partition.

         Blocks until offset is obtained, or a non-retriable exception is raised
@@ -218,21 +229,37 @@
             is treated as epoch seconds.

         Returns:
-            int: message offset
+            (int, int): message offset and timestamp. None if not available
         """
+        start_time = time.time()
+        remaining_ms = timeout_ms
         while True:
             future = self._send_offset_request(partition, timestamp)
-            self._client.poll(future=future)
+            self._client.poll(future=future, timeout_ms=remaining_ms)

             if future.succeeded():
                 return future.value
-
             if not future.retriable():
                 raise future.exception  # pylint: disable-msg=raising-bad-type

+            if timeout_ms is not None:
+                remaining_ms = timeout_ms - (time.time() - start_time) * 1000
+                if remaining_ms < 0:
+                    break
+
             if future.exception.invalid_metadata:
                 refresh_future = self._client.cluster.request_update()
-                self._client.poll(future=refresh_future, sleep=True)
+                self._client.poll(
+                    future=refresh_future, sleep=True, timeout_ms=remaining_ms)
+            else:
+                time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+
+            if timeout_ms is not None:
+                remaining_ms = timeout_ms - (time.time() - start_time) * 1000
+
+        # Will only happen when timeout_ms != None
+        raise Errors.KafkaTimeoutError(
+            "Failed to get offsets by times in %s ms" % timeout_ms)

     def _raise_if_offset_out_of_range(self):
         """Check FetchResponses for offset out of range.
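
The reworked `_offset` above follows a deadline-tracking retry pattern: recompute the remaining budget after every blocking step, bail out once it goes negative, and sleep `retry_backoff_ms` between attempts that failed for reasons other than stale metadata. A minimal standalone sketch of the same pattern, with illustrative names that are not part of kafka-python:

```python
import time

def retry_with_deadline(attempt, timeout_ms=None, backoff_ms=100):
    # `attempt` is any callable that takes the remaining budget in ms
    # (or None for "no limit") and returns (succeeded, value).
    start = time.time()
    remaining_ms = timeout_ms
    while True:
        ok, value = attempt(remaining_ms)
        if ok:
            return value
        if timeout_ms is not None:
            remaining_ms = timeout_ms - (time.time() - start) * 1000
            if remaining_ms < 0:
                break  # budget exhausted; fall through to the error
        time.sleep(backoff_ms / 1000.0)
    raise TimeoutError("no result within %s ms" % timeout_ms)
```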
@@ -596,9 +623,15 @@
                       " wait for metadata refresh", partition)
             return Future().failure(Errors.LeaderNotAvailableError(partition))

-        request = OffsetRequest[0](
-            -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
-        )
+        if self.config['api_version'] >= (0, 10, 1):
+            request = OffsetRequest[1](
+                -1, [(partition.topic, [(partition.partition, timestamp)])]
+            )
+        else:
+            request = OffsetRequest[0](
+                -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
+            )
+
         # Client returns a future that only fails on network issues
         # so create a separate future and attach a callback to update it
         # based on response error codes
@@ -623,22 +656,47 @@
         assert len(response.topics) == 1 and len(partition_info) == 1, (
             'OffsetResponse should only be for a single topic-partition')

-        part, error_code, offsets = partition_info[0]
+        partition_info = partition_info[0]
+        part, error_code = partition_info[:2]
+
         assert topic == partition.topic and part == partition.partition, (
             'OffsetResponse partition does not match OffsetRequest partition')

         error_type = Errors.for_code(error_code)
         if error_type is Errors.NoError:
-            assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
-            offset = offsets[0]
-            log.debug("Fetched offset %d for partition %s", offset, partition)
-            future.success(offset)
-        elif error_type in (Errors.NotLeaderForPartitionError,
-                            Errors.UnknownTopicOrPartitionError):
+            if response.API_VERSION == 0:
+                offsets = partition_info[2]
+                assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
+                offset = offsets[0]
+                log.debug("Handling v0 ListOffsetResponse response for %s. "
+                          "Fetched offset %s", partition, offset)
+                future.success((offset, None))
+            else:
+                timestamp, offset = partition_info[2:]
+                log.debug("Handling ListOffsetResponse response for %s. "
+                          "Fetched offset %s, timestamp %s",
+                          partition, offset, timestamp)
+                if offset != UNKNOWN_OFFSET:
+                    future.success((offset, timestamp))
+                else:
+                    future.success((None, None))
+        elif error_type is Errors.UnsupportedForMessageFormatError:
+            # The message format on the broker side is before 0.10.0, we simply
+            # put None in the response.
+            log.debug("Cannot search by timestamp for partition %s because the"
+                      " message format version is before 0.10.0", partition)
+            future.success((None, None))
+        elif error_type is Errors.NotLeaderForPartitionError:
             log.debug("Attempt to fetch offsets for partition %s failed due"
                       " to obsolete leadership information, retrying.",
                       partition)
             future.failure(error_type(partition))
+        elif error_type is Errors.UnknownTopicOrPartitionError:
+            log.warn("Received unknown topic or partition error in ListOffset "
+                     "request for partition %s. The topic/partition "
+                     "may not exist or the user may not have Describe access "
+                     "to it.", partition)
+            future.failure(error_type(partition))
         else:
             log.warning("Attempt to fetch offsets for partition %s failed due to:"
                         " %s", partition, error_type)
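
The version branch in `_send_offset_request` reflects the ListOffsets protocol change that shipped in Kafka 0.10.1: v0 carries a max_num_offsets count per partition and the broker answers with a list of offsets, while v1 drops the count, treats the timestamp as a lookup target, and answers with a single (timestamp, offset) pair. A sketch of both payloads as this code builds them (topic, partition, and timestamp values are illustrative):

```python
from kafka.protocol.offset import OffsetRequest

REPLICA_ID = -1  # -1 marks the request as coming from an ordinary consumer

# v0 (brokers before 0.10.1): (partition, timestamp, max_num_offsets);
# the broker replies with a list of offsets per partition.
req_v0 = OffsetRequest[0](
    REPLICA_ID, [('my-topic', [(0, -1, 1)])])

# v1 (brokers >= 0.10.1): (partition, timestamp); the broker replies with
# one (timestamp, offset) pair, offset == -1 (UNKNOWN_OFFSET) on a miss.
req_v1 = OffsetRequest[1](
    REPLICA_ID, [('my-topic', [(0, 1501426947000)])])
```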
+ log.debug("Cannot search by timestamp for partition %s because the" + " message format version is before 0.10.0", partition) + future.success((None, None)) + elif error_type is Errors.NotLeaderForPartitionError: log.debug("Attempt to fetch offsets for partition %s failed due" " to obsolete leadership information, retrying.", partition) future.failure(error_type(partition)) + elif error_type is Errors.UnknownTopicOrPartitionError: + log.warn("Received unknown topic or partition error in ListOffset " + "request for partition %s. The topic/partition " + + "may not exist or the user may not have Describe access " + "to it.", partition) + future.failure(error_type(partition)) else: log.warning("Attempt to fetch offsets for partition %s failed due to:" " %s", partition, error_type) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 6adb154..f9b8f16 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -6,7 +6,7 @@ import socket import sys import time -from kafka.errors import KafkaConfigurationError +from kafka.errors import KafkaConfigurationError, UnsupportedVersionError from kafka.vendor import six @@ -861,6 +861,46 @@ class KafkaConsumer(six.Iterator): metrics[k.group][k.name] = v.value() return metrics + def offsets_for_times(self, timestamps): + """ + Look up the offsets for the given partitions by timestamp. The returned + offset for each partition is the earliest offset whose timestamp is + greater than or equal to the given timestamp in the corresponding + partition. + + This is a blocking call. The consumer does not have to be assigned the + partitions. + + If the message format version in a partition is before 0.10.0, i.e. + the messages do not have timestamps, ``None`` will be returned for that + partition. + + Note: + Notice that this method may block indefinitely if the partition + does not exist. + + Arguments: + timestamps (dict): ``{TopicPartition: int}`` mapping from partition + to the timestamp to look up. + + Raises: + ValueError: if the target timestamp is negative + UnsupportedVersionError: if the broker does not support looking + up the offsets by timestamp. + KafkaTimeoutError: if fetch failed in request_timeout_ms + """ + if self.config['api_version'] <= (0, 10, 0): + raise UnsupportedVersionError( + "offsets_for_times API not supported for cluster version {}" + .format(self.config['api_version'])) + for tp, ts in timestamps.items(): + if ts < 0: + raise ValueError( + "The target time for partition {} is {}. 
The target time " + "cannot be negative.".format(tp, ts)) + return self._fetcher.get_offsets_by_times( + timestamps, self.config['request_timeout_ms']) + def _use_consumer_group(self): """Return True iff this consumer can/should join a broker-coordinated group.""" if self.config['api_version'] < (0, 9): diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 8353f8c..5179658 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -3,6 +3,8 @@ from __future__ import absolute_import from .api import Request, Response from .types import Array, Int8, Int16, Int32, Int64, Schema, String +UNKNOWN_OFFSET = -1 + class OffsetResetStrategy(object): LATEST = -1 @@ -91,7 +93,7 @@ class OffsetRequest_v2(Request): RESPONSE_TYPE = OffsetResponse_v2 SCHEMA = Schema( ('replica_id', Int32), - ('isolation_level', Int8), + ('isolation_level', Int8), # <- added isolation_level ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( diff --git a/kafka/structs.py b/kafka/structs.py index 48321e7..62f36dd 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -74,6 +74,9 @@ PartitionMetadata = namedtuple("PartitionMetadata", OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"]) +OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", + ["offset", "timestamp"]) + # Deprecated structs OffsetAndMessage = namedtuple("OffsetAndMessage", diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 193a570..218ed2c 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,12 +1,14 @@ import logging import os +import time from six.moves import xrange import six from . import unittest from kafka import ( - KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, create_gzip_message + KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, + create_gzip_message, KafkaProducer ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError @@ -88,6 +90,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): **configs) return consumer + def kafka_producer(self, **configs): + brokers = '%s:%d' % (self.server.host, self.server.port) + producer = KafkaProducer( + bootstrap_servers=brokers, **configs) + return producer + def test_simple_consumer(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -624,3 +632,39 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): fetched_msgs = [next(consumer) for i in range(10)] self.assertEqual(len(fetched_msgs), 10) + + @kafka_versions('>=0.10.1') + def test_kafka_consumer_offsets_for_time(self): + late_time = int(time.time()) + middle_time = late_time - 1 + early_time = late_time - 2 + tp = TopicPartition(self.topic, 0) + + kafka_producer = self.kafka_producer() + early_msg = kafka_producer.send( + self.topic, partition=0, value=b"first", + timestamp_ms=early_time).get() + late_msg = kafka_producer.send( + self.topic, partition=0, value=b"last", + timestamp_ms=late_time).get() + + consumer = self.kafka_consumer() + offsets = consumer.offsets_for_times({tp: early_time}) + self.assertEqual(offsets[tp].offset, early_msg.offset) + self.assertEqual(offsets[tp].timestamp, early_time) + + offsets = consumer.offsets_for_times({tp: middle_time}) + self.assertEqual(offsets[tp].offset, late_msg.offset) + self.assertEqual(offsets[tp].timestamp, late_time) + + offsets = consumer.offsets_for_times({tp: late_time}) + 
+        self.assertEqual(offsets[tp].offset, late_msg.offset)
+        self.assertEqual(offsets[tp].timestamp, late_time)
+
+    @kafka_versions('<0.10.1')
+    def test_kafka_consumer_offsets_for_time_old(self):
+        consumer = self.kafka_consumer()
+        tp = TopicPartition(self.topic, 0)
+
+        with self.assertRaises(UnsupportedVersionError):
+            consumer.offsets_for_times({tp: int(time.time())})
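
For reference, the producer pattern the new test leans on: send() is asynchronous and returns a future, .get() blocks until the broker acknowledges and yields RecordMetadata, and timestamp_ms attaches an explicit message timestamp. A standalone sketch; the broker address, topic, and timestamp are illustrative:

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# .get() blocks for RecordMetadata; the test compares its .offset against
# what offsets_for_times() later returns for the same timestamp.
metadata = producer.send('my-topic', partition=0, value=b'first',
                         timestamp_ms=1501426947000).get()
print(metadata.topic, metadata.partition, metadata.offset)
```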