author    Taras Voinarovskiy <voyn1991@gmail.com>    2017-07-30 15:42:27 +0000
committer Taras Voinarovskiy <voyn1991@gmail.com>    2017-08-07 09:34:08 +0000
commit    39f0e50b9441609e9dce4e60a1ab2c3f16680476 (patch)
tree      2b94ed93bec5ae4f072360c5072cc22b0685f8a1
parent    da25df6d3c6380e27bf638f3620613d05ac9fd03 (diff)
download  kafka-python-39f0e50b9441609e9dce4e60a1ab2c3f16680476.tar.gz
Added basic support for the offsets_for_times API. Still needs to group requests by node and send them in parallel.
-rw-r--r--  kafka/conn.py                       1
-rw-r--r--  kafka/consumer/fetcher.py          94
-rw-r--r--  kafka/consumer/group.py            42
-rw-r--r--  kafka/protocol/offset.py            4
-rw-r--r--  kafka/structs.py                    3
-rw-r--r--  test/test_consumer_integration.py  46
6 files changed, 169 insertions, 21 deletions
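For orientation before the diff: the API this patch introduces is called roughly as in the sketch below. The broker address, topic name, and timestamp are hypothetical, and a 0.10.1+ broker is assumed.

    from kafka import KafkaConsumer, TopicPartition

    # Hypothetical broker and topic, for illustration only.
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    tp = TopicPartition('my-topic', 0)

    # Timestamps are epoch milliseconds; each partition maps to an
    # OffsetAndTimestamp namedtuple in the result.
    offsets = consumer.offsets_for_times({tp: 1501430547000})
    print(offsets[tp].offset, offsets[tp].timestamp)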
diff --git a/kafka/conn.py b/kafka/conn.py
index ac8bb3d..d042300 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -19,6 +19,7 @@ from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
+from kafka.protocol.fetch import FetchRequest
from kafka.protocol.types import Int32
from kafka.version import __version__
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 8db89a1..cb80a6f 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -14,9 +14,11 @@ from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
-from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.protocol.offset import (
+ OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
+)
from kafka.serializer import Deserializer
-from kafka.structs import TopicPartition
+from kafka.structs import TopicPartition, OffsetAndTimestamp
log = logging.getLogger(__name__)
@@ -48,6 +50,7 @@ class Fetcher(six.Iterator):
'iterator_refetch_records': 1, # undocumented -- interface may change
'metric_group_prefix': 'consumer',
'api_version': (0, 8, 0),
+ 'retry_backoff_ms': 100
}
def __init__(self, client, subscriptions, metrics, **configs):
@@ -180,6 +183,14 @@ class Fetcher(six.Iterator):
" offset %s", tp, committed)
self._subscriptions.seek(tp, committed)
+ def get_offsets_by_times(self, timestamps, timeout_ms):
+ response = {}
+ for tp, timestamp in timestamps.items():
+ timestamp = int(timestamp)
+ offset, tmst = self._offset(tp, timestamp, timeout_ms=timeout_ms)
+ response[tp] = OffsetAndTimestamp(offset, tmst)
+ return response
+
def _reset_offset(self, partition):
"""Reset offsets for the given partition using the offset reset strategy.
@@ -199,14 +210,14 @@ class Fetcher(six.Iterator):
log.debug("Resetting offset for partition %s to %s offset.",
partition, strategy)
- offset = self._offset(partition, timestamp)
+ offset, _ = self._offset(partition, timestamp)
# we might lose the assignment while fetching the offset,
# so check it is still active
if self._subscriptions.is_assigned(partition):
self._subscriptions.seek(partition, offset)
- def _offset(self, partition, timestamp):
+ def _offset(self, partition, timestamp, timeout_ms=None):
"""Fetch a single offset before the given timestamp for the partition.
Blocks until offset is obtained, or a non-retriable exception is raised
@@ -218,21 +229,37 @@ class Fetcher(six.Iterator):
is treated as epoch seconds.
Returns:
- int: message offset
+ (int, int): message offset and timestamp. Timestamp is None for v0 responses; both are None if no offset is available
"""
+ start_time = time.time()
+ remaining_ms = timeout_ms
while True:
future = self._send_offset_request(partition, timestamp)
- self._client.poll(future=future)
+ self._client.poll(future=future, timeout_ms=remaining_ms)
if future.succeeded():
return future.value
-
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
+ if timeout_ms is not None:
+ remaining_ms = timeout_ms - (time.time() - start_time) * 1000
+ if remaining_ms < 0:
+ break
+
if future.exception.invalid_metadata:
refresh_future = self._client.cluster.request_update()
- self._client.poll(future=refresh_future, sleep=True)
+ self._client.poll(
+ future=refresh_future, sleep=True, timeout_ms=remaining_ms)
+ else:
+ time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+
+ if timeout_ms is not None:
+ remaining_ms = timeout_ms - (time.time() - start_time) * 1000
+
+ # Only reachable when timeout_ms is not None and has expired
+ raise Errors.KafkaTimeoutError(
+ "Failed to get offsets by times in %s ms" % timeout_ms)
def _raise_if_offset_out_of_range(self):
"""Check FetchResponses for offset out of range.
@@ -596,9 +623,15 @@ class Fetcher(six.Iterator):
" wait for metadata refresh", partition)
return Future().failure(Errors.LeaderNotAvailableError(partition))
- request = OffsetRequest[0](
- -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
- )
+ if self.config['api_version'] >= (0, 10, 1):
+ request = OffsetRequest[1](
+ -1, [(partition.topic, [(partition.partition, timestamp)])]
+ )
+ else:
+ request = OffsetRequest[0](
+ -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
+ )
+
# Client returns a future that only fails on network issues
# so create a separate future and attach a callback to update it
# based on response error codes
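The version split exists because the wire format changed in v1 (KIP-79, brokers 0.10.1+): a v0 partition entry is (partition, timestamp, max_num_offsets), while v1 drops max_num_offsets and the broker returns exactly one (timestamp, offset) pair. Roughly, the two payloads are built like this (topic and partition hypothetical; -1 is the LATEST sentinel):

    from kafka.protocol.offset import OffsetRequest

    # v0: replica_id, [(topic, [(partition, timestamp, max_num_offsets)])]
    req_v0 = OffsetRequest[0](-1, [('my-topic', [(0, -1, 1)])])

    # v1: replica_id, [(topic, [(partition, timestamp)])]
    req_v1 = OffsetRequest[1](-1, [('my-topic', [(0, 1501430547000)])])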
@@ -623,22 +656,47 @@ class Fetcher(six.Iterator):
assert len(response.topics) == 1 and len(partition_info) == 1, (
'OffsetResponse should only be for a single topic-partition')
- part, error_code, offsets = partition_info[0]
+ partition_info = partition_info[0]
+ part, error_code = partition_info[:2]
+
assert topic == partition.topic and part == partition.partition, (
'OffsetResponse partition does not match OffsetRequest partition')
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
- assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
- offset = offsets[0]
- log.debug("Fetched offset %d for partition %s", offset, partition)
- future.success(offset)
- elif error_type in (Errors.NotLeaderForPartitionError,
- Errors.UnknownTopicOrPartitionError):
+ if response.API_VERSION == 0:
+ offsets = partition_info[2]
+ assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
+ offset = offsets[0]
+ log.debug("Handling v0 ListOffsetResponse response for %s. "
+ "Fetched offset %s", partition, offset)
+ future.success((offset, None))
+ else:
+ timestamp, offset = partition_info[2:]
+ log.debug("Handling ListOffsetResponse response for %s. "
+ "Fetched offset %s, timestamp %s",
+ partition, offset, timestamp)
+ if offset != UNKNOWN_OFFSET:
+ future.success((offset, timestamp))
+ else:
+ future.success((None, None))
+ elif error_type is Errors.UnsupportedForMessageFormatError:
+ # The message format on the broker side is before 0.10.0, we simply
+ # put None in the response.
+ log.debug("Cannot search by timestamp for partition %s because the"
+ " message format version is before 0.10.0", partition)
+ future.success((None, None))
+ elif error_type is Errors.NotLeaderForPartitionError:
log.debug("Attempt to fetch offsets for partition %s failed due"
" to obsolete leadership information, retrying.",
partition)
future.failure(error_type(partition))
+ elif error_type is Errors.UnknownTopicOrPartitionError:
+ log.warn("Received unknown topic or partition error in ListOffset "
+ "request for partition %s. The topic/partition " +
+ "may not exist or the user may not have Describe access "
+ "to it.", partition)
+ future.failure(error_type(partition))
else:
log.warning("Attempt to fetch offsets for partition %s failed due to:"
" %s", partition, error_type)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 6adb154..f9b8f16 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -6,7 +6,7 @@ import socket
import sys
import time
-from kafka.errors import KafkaConfigurationError
+from kafka.errors import KafkaConfigurationError, UnsupportedVersionError
from kafka.vendor import six
@@ -861,6 +861,46 @@ class KafkaConsumer(six.Iterator):
metrics[k.group][k.name] = v.value()
return metrics
+ def offsets_for_times(self, timestamps):
+ """
+ Look up the offsets for the given partitions by timestamp. The returned
+ offset for each partition is the earliest offset whose timestamp is
+ greater than or equal to the given timestamp in the corresponding
+ partition.
+
+ This is a blocking call. The consumer does not have to be assigned the
+ partitions.
+
+ If the message format version in a partition is before 0.10.0, i.e.
+ the messages do not have timestamps, ``None`` will be returned for that
+ partition.
+
+ Note:
+ This method may block indefinitely if the partition does not
+ exist.
+
+ Arguments:
+ timestamps (dict): ``{TopicPartition: int}`` mapping from partition
+ to the timestamp to look up.
+
+ Raises:
+ ValueError: if the target timestamp is negative
+ UnsupportedVersionError: if the broker does not support looking
+ up the offsets by timestamp.
+ KafkaTimeoutError: if offsets could not be fetched within request_timeout_ms
+ """
+ if self.config['api_version'] <= (0, 10, 0):
+ raise UnsupportedVersionError(
+ "offsets_for_times API not supported for cluster version {}"
+ .format(self.config['api_version']))
+ for tp, ts in timestamps.items():
+ if ts < 0:
+ raise ValueError(
+ "The target time for partition {} is {}. The target time "
+ "cannot be negative.".format(tp, ts))
+ return self._fetcher.get_offsets_by_times(
+ timestamps, self.config['request_timeout_ms'])
+
def _use_consumer_group(self):
"""Return True iff this consumer can/should join a broker-coordinated group."""
if self.config['api_version'] < (0, 9):
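A typical use of the new method is rewinding a consumer to a point in time: look up the offsets, then seek. A minimal sketch, assuming hypothetical broker/topic names and a manually assigned partition:

    import time
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    tp = TopicPartition('my-topic', 0)
    consumer.assign([tp])

    # Rewind to one hour ago (epoch milliseconds).
    target = int(time.time() * 1000) - 3600 * 1000
    offsets = consumer.offsets_for_times({tp: target})
    if offsets[tp].offset is not None:
        consumer.seek(tp, offsets[tp].offset)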
diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py
index 8353f8c..5179658 100644
--- a/kafka/protocol/offset.py
+++ b/kafka/protocol/offset.py
@@ -3,6 +3,8 @@ from __future__ import absolute_import
from .api import Request, Response
from .types import Array, Int8, Int16, Int32, Int64, Schema, String
+UNKNOWN_OFFSET = -1
+
class OffsetResetStrategy(object):
LATEST = -1
@@ -91,7 +93,7 @@ class OffsetRequest_v2(Request):
RESPONSE_TYPE = OffsetResponse_v2
SCHEMA = Schema(
('replica_id', Int32),
- ('isolation_level', Int8),
+ ('isolation_level', Int8), # <- added isolation_level
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
diff --git a/kafka/structs.py b/kafka/structs.py
index 48321e7..62f36dd 100644
--- a/kafka/structs.py
+++ b/kafka/structs.py
@@ -74,6 +74,9 @@ PartitionMetadata = namedtuple("PartitionMetadata",
OffsetAndMetadata = namedtuple("OffsetAndMetadata",
["offset", "metadata"])
+OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
+ ["offset", "timestamp"])
+
# Deprecated structs
OffsetAndMessage = namedtuple("OffsetAndMessage",
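The new struct is the per-partition value that offsets_for_times returns, e.g. (values hypothetical):

    from kafka.structs import OffsetAndTimestamp, TopicPartition

    result = {TopicPartition('my-topic', 0):
              OffsetAndTimestamp(offset=42, timestamp=1501430547000)}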
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 193a570..218ed2c 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,12 +1,14 @@
import logging
import os
+import time
from six.moves import xrange
import six
from . import unittest
from kafka import (
- KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, create_gzip_message
+ KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message,
+ create_gzip_message, KafkaProducer
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
-from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError
+from kafka.errors import (
+ ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError
+)
@@ -88,6 +90,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
**configs)
return consumer
+ def kafka_producer(self, **configs):
+ brokers = '%s:%d' % (self.server.host, self.server.port)
+ producer = KafkaProducer(
+ bootstrap_servers=brokers, **configs)
+ return producer
+
def test_simple_consumer(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -624,3 +632,39 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
fetched_msgs = [next(consumer) for i in range(10)]
self.assertEqual(len(fetched_msgs), 10)
+
+ @kafka_versions('>=0.10.1')
+ def test_kafka_consumer_offsets_for_time(self):
+ late_time = int(time.time()) * 1000
+ middle_time = late_time - 1000
+ early_time = late_time - 2000
+ tp = TopicPartition(self.topic, 0)
+
+ kafka_producer = self.kafka_producer()
+ early_msg = kafka_producer.send(
+ self.topic, partition=0, value=b"first",
+ timestamp_ms=early_time).get()
+ late_msg = kafka_producer.send(
+ self.topic, partition=0, value=b"last",
+ timestamp_ms=late_time).get()
+
+ consumer = self.kafka_consumer()
+ offsets = consumer.offsets_for_times({tp: early_time})
+ self.assertEqual(offsets[tp].offset, early_msg.offset)
+ self.assertEqual(offsets[tp].timestamp, early_time)
+
+ offsets = consumer.offsets_for_times({tp: middle_time})
+ self.assertEqual(offsets[tp].offset, late_msg.offset)
+ self.assertEqual(offsets[tp].timestamp, late_time)
+
+ offsets = consumer.offsets_for_times({tp: late_time})
+ self.assertEqual(offsets[tp].offset, late_msg.offset)
+ self.assertEqual(offsets[tp].timestamp, late_time)
+
+ @kafka_versions('<0.10.1')
+ def test_kafka_consumer_offsets_for_time_old(self):
+ consumer = self.kafka_consumer()
+ tp = TopicPartition(self.topic, 0)
+
+ with self.assertRaises(UnsupportedVersionError):
+ consumer.offsets_for_times({tp: int(time.time()) * 1000})