author     Taras Voinarovskyi <voyn1991@gmail.com>   2017-10-11 17:44:17 +0300
committer  GitHub <noreply@github.com>               2017-10-11 17:44:17 +0300
commit     f04435c5ed97fef0975a77a8dc7bae7c284bba63 (patch)
tree       da8e24a147f6643952856a92edd6c0bd3a3961e6
parent     1df58bf87da1a2c8a2f9e659dfabaed1cff7c0c2 (diff)
parent     bc573e3d63a687903a9be2e1b3da2f943a7208e1 (diff)
download   kafka-python-f04435c5ed97fef0975a77a8dc7bae7c284bba63.tar.gz
Merge pull request #1245 from dpkp/KAFKA_3977_defer_fetch_parsing
KAFKA-3977: defer fetch response parsing and raise exceptions to user
-rw-r--r--  kafka/consumer/fetcher.py  498
-rw-r--r--  test/test_fetcher.py       316
2 files changed, 537 insertions, 277 deletions
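
The user-visible effect of this merge: fetch-level errors are no longer accumulated by the fetcher's internal checks (_raise_if_offset_out_of_range, _raise_if_unauthorized_topics, _raise_if_record_too_large, all removed below) but are raised while the deferred fetch data is parsed, so they surface from fetched_records() and therefore from KafkaConsumer.poll() or iteration. A minimal application-level sketch of handling them, assuming a local broker, an illustrative topic name, and an auto_offset_reset value other than 'earliest'/'latest' so that no default reset policy applies:

from kafka import KafkaConsumer
from kafka.errors import OffsetOutOfRangeError, TopicAuthorizationFailedError

# 'example-topic' and the bootstrap address are placeholders.
consumer = KafkaConsumer('example-topic',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='none',  # assumption: disables the default reset policy
                         enable_auto_commit=False)
try:
    records = consumer.poll(timeout_ms=1000)
except OffsetOutOfRangeError as e:
    # Raised from _parse_fetched_data; e.args[0] is the {TopicPartition: offset}
    # mapping built when the fetched offset is out of range and no reset policy is set.
    for tp in e.args[0]:
        consumer.seek_to_beginning(tp)
except TopicAuthorizationFailedError:
    # Raised when the broker reports the topic as unauthorized.
    consumer.close()
    raise
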
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index f552038..c4fa546 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -28,6 +28,11 @@ ConsumerRecord = collections.namedtuple("ConsumerRecord",
"key", "value", "checksum", "serialized_key_size", "serialized_value_size"])
+CompletedFetch = collections.namedtuple("CompletedFetch",
+ ["topic_partition", "fetched_offset", "response_version",
+ "partition_data", "metric_aggregator"])
+
+
class NoOffsetForPartitionError(Errors.KafkaError):
pass
@@ -104,18 +109,15 @@ class Fetcher(six.Iterator):
self._client = client
self._subscriptions = subscriptions
- self._records = collections.deque() # (offset, topic_partition, messages)
- self._unauthorized_topics = set()
- self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
- self._record_too_large_partitions = dict() # {topic_partition: offset}
+ self._completed_fetches = collections.deque() # Unparsed responses
+ self._next_partition_records = None # Holds a single PartitionRecords until fully consumed
self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
def send_fetches(self):
- """Send FetchRequests asynchronously for all assigned partitions.
-
- Note: noop if there are unconsumed records internal to the fetcher
+ """Send FetchRequests for all assigned partitions that do not already have
+ an in-flight fetch or pending fetch data.
Returns:
List of Futures: each future resolves to a FetchResponse
@@ -125,7 +127,6 @@ class Fetcher(six.Iterator):
if self._client.ready(node_id):
log.debug("Sending FetchRequest to node %s", node_id)
future = self._client.send(node_id, request)
- future.error_on_callbacks=True
future.add_callback(self._handle_fetch_response, request, time.time())
future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
futures.append(future)
@@ -285,67 +286,6 @@ class Fetcher(six.Iterator):
raise Errors.KafkaTimeoutError(
"Failed to get offsets by timestamps in %s ms" % timeout_ms)
- def _raise_if_offset_out_of_range(self):
- """Check FetchResponses for offset out of range.
-
- Raises:
- OffsetOutOfRangeError: if any partition from previous FetchResponse
- contains OffsetOutOfRangeError and the default_reset_policy is
- None
- """
- if not self._offset_out_of_range_partitions:
- return
-
- current_out_of_range_partitions = {}
-
- # filter only the fetchable partitions
- for partition, offset in six.iteritems(self._offset_out_of_range_partitions):
- if not self._subscriptions.is_fetchable(partition):
- log.debug("Ignoring fetched records for %s since it is no"
- " longer fetchable", partition)
- continue
- position = self._subscriptions.assignment[partition].position
- # ignore partition if the current position != offset in FetchResponse
- # e.g. after seek()
- if position is not None and offset == position:
- current_out_of_range_partitions[partition] = position
-
- self._offset_out_of_range_partitions.clear()
- if current_out_of_range_partitions:
- raise Errors.OffsetOutOfRangeError(current_out_of_range_partitions)
-
- def _raise_if_unauthorized_topics(self):
- """Check FetchResponses for topic authorization failures.
-
- Raises:
- TopicAuthorizationFailedError
- """
- if self._unauthorized_topics:
- topics = set(self._unauthorized_topics)
- self._unauthorized_topics.clear()
- raise Errors.TopicAuthorizationFailedError(topics)
-
- def _raise_if_record_too_large(self):
- """Check FetchResponses for messages larger than the max per partition.
-
- Raises:
- RecordTooLargeError: if there is a message larger than fetch size
- """
- if not self._record_too_large_partitions:
- return
-
- copied_record_too_large_partitions = dict(self._record_too_large_partitions)
- self._record_too_large_partitions.clear()
-
- raise RecordTooLargeError(
- "There are some messages at [Partition=Offset]: %s "
- " whose size is larger than the fetch size %s"
- " and hence cannot be ever returned."
- " Increase the fetch size, or decrease the maximum message"
- " size the broker will allow.",
- copied_record_too_large_partitions,
- self.config['max_partition_fetch_bytes'])
-
def fetched_records(self, max_records=None):
"""Returns previously fetched records and updates consumed offsets.
@@ -375,22 +315,25 @@ class Fetcher(six.Iterator):
if self._subscriptions.needs_partition_assignment:
return {}, False
- self._raise_if_offset_out_of_range()
- self._raise_if_unauthorized_topics()
- self._raise_if_record_too_large()
-
drained = collections.defaultdict(list)
- partial = bool(self._records and max_records)
- while self._records and max_records > 0:
- part = self._records.popleft()
- max_records -= self._append(drained, part, max_records)
- if part.has_more():
- self._records.appendleft(part)
+ records_remaining = max_records
+
+ while records_remaining > 0:
+ if not self._next_partition_records:
+ if not self._completed_fetches:
+ break
+ completion = self._completed_fetches.popleft()
+ self._next_partition_records = self._parse_fetched_data(completion)
else:
- partial &= False
- return dict(drained), partial
+ records_remaining -= self._append(drained,
+ self._next_partition_records,
+ records_remaining)
+ return dict(drained), bool(self._completed_fetches)
def _append(self, drained, part, max_records):
+ if not part:
+ return 0
+
tp = part.topic_partition
fetch_offset = part.fetch_offset
if not self._subscriptions.is_assigned(tp):
@@ -409,9 +352,8 @@ class Fetcher(six.Iterator):
" %s since it is no longer fetchable", tp)
elif fetch_offset == position:
+ # we are ensured to have at least one record since we already checked for emptiness
part_records = part.take(max_records)
- if not part_records:
- return 0
next_offset = part_records[-1].offset + 1
log.log(0, "Returning fetched records at offset %d for assigned"
@@ -444,93 +386,72 @@ class Fetcher(six.Iterator):
if self._subscriptions.needs_partition_assignment:
raise StopIteration('Subscription needs partition assignment')
- while self._records:
+ while self._next_partition_records or self._completed_fetches:
- # Check on each iteration since this is a generator
- self._raise_if_offset_out_of_range()
- self._raise_if_unauthorized_topics()
- self._raise_if_record_too_large()
+ if not self._next_partition_records:
+ completion = self._completed_fetches.popleft()
+ self._next_partition_records = self._parse_fetched_data(completion)
+ continue
# Send additional FetchRequests when the internal queue is low
# this should enable moderate pipelining
- if len(self._records) <= self.config['iterator_refetch_records']:
+ if len(self._completed_fetches) <= self.config['iterator_refetch_records']:
self.send_fetches()
- part = self._records.popleft()
-
- tp = part.topic_partition
- fetch_offset = part.fetch_offset
- if not self._subscriptions.is_assigned(tp):
- # this can happen when a rebalance happened before
- # fetched records are returned
- log.debug("Not returning fetched records for partition %s"
- " since it is no longer assigned", tp)
- continue
-
- # note that the position should always be available
- # as long as the partition is still assigned
- position = self._subscriptions.assignment[tp].position
- if not self._subscriptions.is_fetchable(tp):
- # this can happen when a partition is paused before
- # fetched records are returned
- log.debug("Not returning fetched records for assigned partition"
- " %s since it is no longer fetchable", tp)
+ tp = self._next_partition_records.topic_partition
- elif fetch_offset == position:
- log.log(0, "Returning fetched records at offset %d for assigned"
- " partition %s", position, tp)
-
- # We can ignore any prior signal to drop pending message sets
- # because we are starting from a fresh one where fetch_offset == position
- # i.e., the user seek()'d to this position
- self._subscriptions.assignment[tp].drop_pending_message_set = False
-
- for msg in part.messages:
-
- # Because we are in a generator, it is possible for
- # subscription state to change between yield calls
- # so we need to re-check on each loop
- # this should catch assignment changes, pauses
- # and resets via seek_to_beginning / seek_to_end
- if not self._subscriptions.is_fetchable(tp):
- log.debug("Not returning fetched records for partition %s"
- " since it is no longer fetchable", tp)
- break
-
- # If there is a seek during message iteration,
- # we should stop unpacking this message set and
- # wait for a new fetch response that aligns with the
- # new seek position
- elif self._subscriptions.assignment[tp].drop_pending_message_set:
- log.debug("Skipping remainder of message set for partition %s", tp)
- self._subscriptions.assignment[tp].drop_pending_message_set = False
- break
-
- # Compressed messagesets may include earlier messages
- elif msg.offset < self._subscriptions.assignment[tp].position:
- log.debug("Skipping message offset: %s (expecting %s)",
- msg.offset,
- self._subscriptions.assignment[tp].position)
- continue
+ # We can ignore any prior signal to drop pending message sets
+ # because we are starting from a fresh one where fetch_offset == position
+ # i.e., the user seek()'d to this position
+ self._subscriptions.assignment[tp].drop_pending_message_set = False
- self._subscriptions.assignment[tp].position = msg.offset + 1
- yield msg
+ for msg in self._next_partition_records.take():
- else:
- # these records aren't next in line based on the last consumed
- # position, ignore them they must be from an obsolete request
- log.debug("Ignoring fetched records for %s at offset %s since"
- " the current position is %d", tp, part.fetch_offset,
- position)
+ # Because we are in a generator, it is possible for
+ # subscription state to change between yield calls
+ # so we need to re-check on each loop
+ # this should catch assignment changes, pauses
+ # and resets via seek_to_beginning / seek_to_end
+ if not self._subscriptions.is_fetchable(tp):
+ log.debug("Not returning fetched records for partition %s"
+ " since it is no longer fetchable", tp)
+ self._next_partition_records = None
+ break
+
+ # If there is a seek during message iteration,
+ # we should stop unpacking this message set and
+ # wait for a new fetch response that aligns with the
+ # new seek position
+ elif self._subscriptions.assignment[tp].drop_pending_message_set:
+ log.debug("Skipping remainder of message set for partition %s", tp)
+ self._subscriptions.assignment[tp].drop_pending_message_set = False
+ self._next_partition_records = None
+ break
+
+ # Compressed messagesets may include earlier messages
+ elif msg.offset < self._subscriptions.assignment[tp].position:
+ log.debug("Skipping message offset: %s (expecting %s)",
+ msg.offset,
+ self._subscriptions.assignment[tp].position)
+ continue
+
+ self._subscriptions.assignment[tp].position = msg.offset + 1
+ yield msg
+
+ self._next_partition_records = None
def _unpack_message_set(self, tp, messages):
try:
for offset, size, msg in messages:
if self.config['check_crcs'] and not msg.validate_crc():
raise Errors.InvalidMessageError(msg)
- elif msg.is_compressed():
- # If relative offset is used, we need to decompress the entire message first to compute
- # the absolute offset.
+
+ if not msg.is_compressed():
+ yield self._parse_record(tp, offset, msg.timestamp, msg)
+
+ else:
+ # If relative offset is used, we need to decompress the entire message first
+ # to compute the absolute offset.
inner_mset = msg.decompress()
# There should only ever be a single layer of compression
@@ -569,31 +490,7 @@ class Fetcher(six.Iterator):
if absolute_base_offset >= 0:
inner_offset += absolute_base_offset
-
- key = self._deserialize(
- self.config['key_deserializer'],
- tp.topic, inner_msg.key)
- value = self._deserialize(
- self.config['value_deserializer'],
- tp.topic, inner_msg.value)
- yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
- inner_timestamp, msg.timestamp_type,
- key, value, inner_msg.crc,
- len(inner_msg.key) if inner_msg.key is not None else -1,
- len(inner_msg.value) if inner_msg.value is not None else -1)
-
- else:
- key = self._deserialize(
- self.config['key_deserializer'],
- tp.topic, msg.key)
- value = self._deserialize(
- self.config['value_deserializer'],
- tp.topic, msg.value)
- yield ConsumerRecord(tp.topic, tp.partition, offset,
- msg.timestamp, msg.timestamp_type,
- key, value, msg.crc,
- len(msg.key) if msg.key is not None else -1,
- len(msg.value) if msg.value is not None else -1)
+ yield self._parse_record(tp, inner_offset, inner_timestamp, inner_msg)
# If unpacking raises StopIteration, it is erroneously
# caught by the generator. We want all exceptions to be raised
@@ -608,6 +505,15 @@ class Fetcher(six.Iterator):
log.exception('AssertionError raised unpacking messageset: %s', e)
raise
+ def _parse_record(self, tp, offset, timestamp, msg):
+ key = self._deserialize(self.config['key_deserializer'], tp.topic, msg.key)
+ value = self._deserialize(self.config['value_deserializer'], tp.topic, msg.value)
+ return ConsumerRecord(tp.topic, tp.partition, offset,
+ timestamp, msg.timestamp_type,
+ key, value, msg.crc,
+ len(msg.key) if msg.key is not None else -1,
+ len(msg.value) if msg.value is not None else -1)
+
def __iter__(self): # pylint: disable=non-iterator-returned
return self
@@ -764,8 +670,11 @@ class Fetcher(six.Iterator):
def _fetchable_partitions(self):
fetchable = self._subscriptions.fetchable_partitions()
- pending = set([part.topic_partition for part in self._records])
- return fetchable.difference(pending)
+ if self._next_partition_records:
+ fetchable.remove(self._next_partition_records.topic_partition)
+ for fetch in self._completed_fetches:
+ fetchable.remove(fetch.topic_partition)
+ return fetchable
def _create_fetch_requests(self):
"""Create fetch requests for all assigned partitions, grouped by node.
@@ -835,93 +744,126 @@ class Fetcher(six.Iterator):
def _handle_fetch_response(self, request, send_time, response):
"""The callback for fetch completion"""
- total_bytes = 0
- total_count = 0
- recv_time = time.time()
-
fetch_offsets = {}
for topic, partitions in request.topics:
- for partition, offset, _ in partitions:
+ for partition_data in partitions:
+ partition, offset = partition_data[:2]
fetch_offsets[TopicPartition(topic, partition)] = offset
+ partitions = set([TopicPartition(topic, partition_data[0])
+ for topic, partitions in response.topics
+ for partition_data in partitions])
+ metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions)
+
# randomized ordering should improve balance for short-lived consumers
random.shuffle(response.topics)
for topic, partitions in response.topics:
random.shuffle(partitions)
- for partition, error_code, highwater, messages in partitions:
- tp = TopicPartition(topic, partition)
- error_type = Errors.for_code(error_code)
- if not self._subscriptions.is_fetchable(tp):
- # this can happen when a rebalance happened or a partition
- # consumption paused while fetch is still in-flight
- log.debug("Ignoring fetched records for partition %s"
- " since it is no longer fetchable", tp)
+ for partition_data in partitions:
+ tp = TopicPartition(topic, partition_data[0])
+ completed_fetch = CompletedFetch(
+ tp, fetch_offsets[tp],
+ response.API_VERSION,
+ partition_data[1:],
+ metric_aggregator
+ )
+ self._completed_fetches.append(completed_fetch)
- elif error_type is Errors.NoError:
- self._subscriptions.assignment[tp].highwater = highwater
-
- # we are interested in this fetch only if the beginning
- # offset (of the *request*) matches the current consumed position
- # Note that the *response* may return a messageset that starts
- # earlier (e.g., compressed messages) or later (e.g., compacted topic)
- fetch_offset = fetch_offsets[tp]
- position = self._subscriptions.assignment[tp].position
- if position is None or position != fetch_offset:
- log.debug("Discarding fetch response for partition %s"
- " since its offset %d does not match the"
- " expected offset %d", tp, fetch_offset,
- position)
- continue
+ if response.API_VERSION >= 1:
+ self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
+ self._sensors.fetch_latency.record((time.time() - send_time) * 1000)
+
+ def _parse_fetched_data(self, completed_fetch):
+ tp = completed_fetch.topic_partition
+ partition = completed_fetch.partition_data
+ fetch_offset = completed_fetch.fetched_offset
+ num_bytes = 0
+ records_count = 0
+ parsed_records = None
- num_bytes = 0
- partial = None
- if messages and isinstance(messages[-1][-1], PartialMessage):
- partial = messages.pop()
-
- if messages:
- log.debug("Adding fetched record for partition %s with"
- " offset %d to buffered record list", tp,
- position)
- unpacked = list(self._unpack_message_set(tp, messages))
- self._records.append(self.PartitionRecords(fetch_offset, tp, unpacked))
- last_offset, _, _ = messages[-1]
- self._sensors.records_fetch_lag.record(highwater - last_offset)
- num_bytes = sum(msg[1] for msg in messages)
- elif partial:
- # we did not read a single message from a non-empty
- # buffer because that message's size is larger than
- # fetch size, in this case record this exception
- self._record_too_large_partitions[tp] = fetch_offset
-
- self._sensors.record_topic_fetch_metrics(topic, num_bytes, len(messages))
- total_bytes += num_bytes
- total_count += len(messages)
- elif error_type in (Errors.NotLeaderForPartitionError,
- Errors.UnknownTopicOrPartitionError):
- self._client.cluster.request_update()
- elif error_type is Errors.OffsetOutOfRangeError:
- fetch_offset = fetch_offsets[tp]
+ error_code, highwater = completed_fetch.partition_data[:2]
+ error_type = Errors.for_code(error_code)
+ messages = completed_fetch.partition_data[-1]
+
+ try:
+ if not self._subscriptions.is_fetchable(tp):
+ # this can happen when a rebalance happened or a partition
+ # consumption paused while fetch is still in-flight
+ log.debug("Ignoring fetched records for partition %s"
+ " since it is no longer fetchable", tp)
+
+ elif error_type is Errors.NoError:
+ self._subscriptions.assignment[tp].highwater = highwater
+
+ # we are interested in this fetch only if the beginning
+ # offset (of the *request*) matches the current consumed position
+ # Note that the *response* may return a messageset that starts
+ # earlier (e.g., compressed messages) or later (e.g., compacted topic)
+ position = self._subscriptions.assignment[tp].position
+ if position is None or position != fetch_offset:
+ log.debug("Discarding fetch response for partition %s"
+ " since its offset %d does not match the"
+ " expected offset %d", tp, fetch_offset,
+ position)
+ return None
+
+ partial = None
+ if messages and isinstance(messages[-1][-1], PartialMessage):
+ partial = messages.pop()
+
+ if messages:
+ log.debug("Adding fetched record for partition %s with"
+ " offset %d to buffered record list", tp,
+ position)
+ unpacked = list(self._unpack_message_set(tp, messages))
+ parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
+ last_offset, _, _ = messages[-1]
+ self._sensors.records_fetch_lag.record(highwater - last_offset)
+ num_bytes = sum(msg[1] for msg in messages)
+ records_count = len(messages)
+ elif partial:
+ # we did not read a single message from a non-empty
+ # buffer because that message's size is larger than
+ # fetch size, in this case record this exception
+ record_too_large_partitions = {tp: fetch_offset}
+ raise RecordTooLargeError(
+ "There are some messages at [Partition=Offset]: %s "
+ " whose size is larger than the fetch size %s"
+ " and hence cannot be ever returned."
+ " Increase the fetch size, or decrease the maximum message"
+ " size the broker will allow." % (
+ record_too_large_partitions,
+ self.config['max_partition_fetch_bytes']),
+ record_too_large_partitions)
+ self._sensors.record_topic_fetch_metrics(tp.topic, num_bytes, records_count)
+
+ elif error_type in (Errors.NotLeaderForPartitionError,
+ Errors.UnknownTopicOrPartitionError):
+ self._client.cluster.request_update()
+ elif error_type is Errors.OffsetOutOfRangeError:
+ position = self._subscriptions.assignment[tp].position
+ if position is None or position != fetch_offset:
+ log.debug("Discarding stale fetch response for partition %s"
+ " since the fetched offset %d does not match the"
+ " current offset %d", tp, fetch_offset, position)
+ elif self._subscriptions.has_default_offset_reset_policy():
log.info("Fetch offset %s is out of range for topic-partition %s", fetch_offset, tp)
- if self._subscriptions.has_default_offset_reset_policy():
- self._subscriptions.need_offset_reset(tp)
- log.info("Resetting offset for topic-partition %s", tp)
- else:
- self._offset_out_of_range_partitions[tp] = fetch_offset
- elif error_type is Errors.TopicAuthorizationFailedError:
- log.warn("Not authorized to read from topic %s.", tp.topic)
- self._unauthorized_topics.add(tp.topic)
- elif error_type is Errors.UnknownError:
- log.warn("Unknown error fetching data for topic-partition %s", tp)
+ self._subscriptions.need_offset_reset(tp)
else:
- raise error_type('Unexpected error while fetching data')
+ raise Errors.OffsetOutOfRangeError({tp: fetch_offset})
- # Because we are currently decompressing messages lazily, the sensors here
- # will get compressed bytes / message set stats when compression is enabled
- self._sensors.bytes_fetched.record(total_bytes)
- self._sensors.records_fetched.record(total_count)
- if response.API_VERSION >= 1:
- self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
- self._sensors.fetch_latency.record((recv_time - send_time) * 1000)
+ elif error_type is Errors.TopicAuthorizationFailedError:
+ log.warn("Not authorized to read from topic %s.", tp.topic)
+ raise Errors.TopicAuthorizationFailedError(set([tp.topic]))
+ elif error_type is Errors.UnknownError:
+ log.warn("Unknown error fetching data for topic-partition %s", tp)
+ else:
+ raise error_type('Unexpected error while fetching data')
+
+ finally:
+ completed_fetch.metric_aggregator.record(tp, num_bytes, records_count)
+
+ return parsed_records
class PartitionRecords(object):
def __init__(self, fetch_offset, tp, messages):
@@ -935,21 +877,55 @@ class Fetcher(six.Iterator):
if msg.offset == fetch_offset:
self.message_idx = i
+ # For truthiness evaluation we need to define __len__ or __nonzero__
+ def __len__(self):
+ if self.messages is None or self.message_idx >= len(self.messages):
+ return 0
+ return len(self.messages) - self.message_idx
+
def discard(self):
self.messages = None
- def take(self, n):
- if not self.has_more():
+ def take(self, n=None):
+ if not len(self):
return []
+ if n is None or n > len(self):
+ n = len(self)
next_idx = self.message_idx + n
res = self.messages[self.message_idx:next_idx]
self.message_idx = next_idx
- if self.has_more():
+ if len(self) > 0:
self.fetch_offset = self.messages[self.message_idx].offset
return res
- def has_more(self):
- return self.messages and self.message_idx < len(self.messages)
+
+class FetchResponseMetricAggregator(object):
+ """
+ Since we parse the message data for each partition from each fetch
+ response lazily, fetch-level metrics need to be aggregated as the messages
+ from each partition are parsed. This class is used to facilitate this
+ incremental aggregation.
+ """
+ def __init__(self, sensors, partitions):
+ self.sensors = sensors
+ self.unrecorded_partitions = partitions
+ self.total_bytes = 0
+ self.total_records = 0
+
+ def record(self, partition, num_bytes, num_records):
+ """
+ After each partition is parsed, we update the current metric totals
+ with the total bytes and number of records parsed. After all partitions
+ have reported, we write the metric.
+ """
+ self.unrecorded_partitions.remove(partition)
+ self.total_bytes += num_bytes
+ self.total_records += num_records
+
+ # once all expected partitions from the fetch have reported in, record the metrics
+ if not self.unrecorded_partitions:
+ self.sensors.bytes_fetched.record(self.total_bytes)
+ self.sensors.records_fetched.record(self.total_records)
class FetchManagerMetrics(object):
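
The FetchResponseMetricAggregator introduced above defers the bytes_fetched / records_fetched sensors until every partition from the originating fetch response has reported in. A small standalone sketch of that behaviour using stub sensors (the _Stub* classes are illustrative stand-ins, not part of kafka-python):

from kafka.consumer.fetcher import FetchResponseMetricAggregator
from kafka.structs import TopicPartition

class _StubSensor(object):
    def __init__(self):
        self.values = []
    def record(self, value):
        self.values.append(value)

class _StubSensors(object):
    def __init__(self):
        self.bytes_fetched = _StubSensor()
        self.records_fetched = _StubSensor()

tp0, tp1 = TopicPartition('foo', 0), TopicPartition('foo', 1)
sensors = _StubSensors()
aggregator = FetchResponseMetricAggregator(sensors, {tp0, tp1})

aggregator.record(tp0, num_bytes=100, num_records=3)
assert sensors.bytes_fetched.values == []      # still waiting on tp1
aggregator.record(tp1, num_bytes=50, num_records=2)
assert sensors.bytes_fetched.values == [150]   # flushed once all partitions reported
assert sensors.records_fetched.values == [5]
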
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 86d154f..5da597c 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -3,20 +3,26 @@ from __future__ import absolute_import
import pytest
-import itertools
from collections import OrderedDict
+import itertools
+import time
from kafka.client_async import KafkaClient
-from kafka.consumer.fetcher import ConsumerRecord, Fetcher, NoOffsetForPartitionError
+from kafka.codec import gzip_encode
+from kafka.consumer.fetcher import (
+ CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
+)
from kafka.consumer.subscription_state import SubscriptionState
from kafka.metrics import Metrics
-from kafka.protocol.fetch import FetchRequest
+from kafka.protocol.fetch import FetchRequest, FetchResponse
+from kafka.protocol.message import Message
from kafka.protocol.offset import OffsetResponse
+from kafka.protocol.types import Int64, Int32
from kafka.structs import TopicPartition
from kafka.future import Future
from kafka.errors import (
StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
- UnknownTopicOrPartitionError
+ UnknownTopicOrPartitionError, OffsetOutOfRangeError
)
@@ -31,28 +37,33 @@ def subscription_state():
@pytest.fixture
-def fetcher(client, subscription_state):
- subscription_state.subscribe(topics=['foobar'])
- assignment = [TopicPartition('foobar', i) for i in range(3)]
+def topic():
+ return 'foobar'
+
+
+@pytest.fixture
+def fetcher(client, subscription_state, topic):
+ subscription_state.subscribe(topics=[topic])
+ assignment = [TopicPartition(topic, i) for i in range(3)]
subscription_state.assign_from_subscribed(assignment)
for tp in assignment:
subscription_state.seek(tp, 0)
return Fetcher(client, subscription_state, Metrics())
-def test_send_fetches(fetcher, mocker):
+def test_send_fetches(fetcher, topic, mocker):
fetch_requests = [
FetchRequest[0](
-1, fetcher.config['fetch_max_wait_ms'],
fetcher.config['fetch_min_bytes'],
- [('foobar', [
+ [(topic, [
(0, 0, fetcher.config['max_partition_fetch_bytes']),
(1, 0, fetcher.config['max_partition_fetch_bytes']),
])]),
FetchRequest[0](
-1, fetcher.config['fetch_max_wait_ms'],
fetcher.config['fetch_min_bytes'],
- [('foobar', [
+ [(topic, [
(2, 0, fetcher.config['max_partition_fetch_bytes']),
])])
]
@@ -80,9 +91,9 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
assert all([isinstance(r, FetchRequest[fetch_version]) for r in requests])
-def test_update_fetch_positions(fetcher, mocker):
+def test_update_fetch_positions(fetcher, topic, mocker):
mocker.patch.object(fetcher, '_reset_offset')
- partition = TopicPartition('foobar', 0)
+ partition = TopicPartition(topic, 0)
# unassigned partition
fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)])
@@ -285,7 +296,7 @@ def test__handle_offset_response(fetcher, mocker):
def test_partition_records_offset():
- """Test that compressed messagesets are handle correctly
+ """Test that compressed messagesets are handled correctly
when fetch offset is in the middle of the message list
"""
batch_start = 120
@@ -296,12 +307,285 @@ def test_partition_records_offset():
None, None, 'key', 'value', 'checksum', 0, 0)
for i in range(batch_start, batch_end)]
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
- assert records.has_more()
+ assert len(records) > 0
msgs = records.take(1)
assert msgs[0].offset == 123
assert records.fetch_offset == 124
msgs = records.take(2)
assert len(msgs) == 2
- assert records.has_more()
+ assert len(records) > 0
records.discard()
- assert not records.has_more()
+ assert len(records) == 0
+
+
+def test_fetched_records(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ msgs = []
+ for i in range(10):
+ msg = Message(b'foo')
+ msgs.append((i, -1, msg))
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [0, 100, msgs],
+ mocker.MagicMock()
+ )
+ fetcher._completed_fetches.append(completed_fetch)
+ records, partial = fetcher.fetched_records()
+ assert tp in records
+ assert len(records[tp]) == len(msgs)
+ assert all(map(lambda x: isinstance(x, ConsumerRecord), records[tp]))
+ assert partial is False
+
+
+@pytest.mark.parametrize(("fetch_request", "fetch_response", "num_partitions"), [
+ (
+ FetchRequest[0](
+ -1, 100, 100,
+ [('foo', [(0, 0, 1000),])]),
+ FetchResponse[0](
+ [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
+ 1,
+ ),
+ (
+ FetchRequest[1](
+ -1, 100, 100,
+ [('foo', [(0, 0, 1000), (1, 0, 1000),])]),
+ FetchResponse[1](
+ 0,
+ [("foo", [
+ (0, 0, 1000, [(0, b'xxx'),]),
+ (1, 0, 1000, [(0, b'xxx'),]),
+ ]),]),
+ 2,
+ ),
+ (
+ FetchRequest[2](
+ -1, 100, 100,
+ [('foo', [(0, 0, 1000),])]),
+ FetchResponse[2](
+ 0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
+ 1,
+ ),
+ (
+ FetchRequest[3](
+ -1, 100, 100, 10000,
+ [('foo', [(0, 0, 1000),])]),
+ FetchResponse[3](
+ 0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
+ 1,
+ ),
+ (
+ FetchRequest[4](
+ -1, 100, 100, 10000, 0,
+ [('foo', [(0, 0, 1000),])]),
+ FetchResponse[4](
+ 0, [("foo", [(0, 0, 1000, 0, [], [(0, b'xxx'),])]),]),
+ 1,
+ ),
+ (
+ # This may only be used in broker-broker api calls
+ FetchRequest[5](
+ -1, 100, 100, 10000, 0,
+ [('foo', [(0, 0, 1000),])]),
+ FetchResponse[5](
+ 0, [("foo", [(0, 0, 1000, 0, 0, [], [(0, b'xxx'),])]),]),
+ 1,
+ ),
+])
+def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_partitions):
+ fetcher._handle_fetch_response(fetch_request, time.time(), fetch_response)
+ assert len(fetcher._completed_fetches) == num_partitions
+
+
+def test__unpack_message_set(fetcher):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition('foo', 0)
+ messages = [
+ (0, None, Message(b'a')),
+ (1, None, Message(b'b')),
+ (2, None, Message(b'c'))
+ ]
+ records = list(fetcher._unpack_message_set(tp, messages))
+ assert len(records) == 3
+ assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
+ assert records[0].value == b'a'
+ assert records[1].value == b'b'
+ assert records[2].value == b'c'
+ assert records[0].offset == 0
+ assert records[1].offset == 1
+ assert records[2].offset == 2
+
+
+def test__unpack_message_set_compressed_v0(fetcher):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition('foo', 0)
+ messages = [
+ (0, None, Message(b'a')),
+ (1, None, Message(b'b')),
+ (2, None, Message(b'c')),
+ ]
+ message_bytes = []
+ for offset, _, m in messages:
+ encoded = m.encode()
+ message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
+ compressed_bytes = gzip_encode(b''.join(message_bytes))
+ compressed_base_offset = 0
+ compressed_msgs = [
+ (compressed_base_offset, None,
+ Message(compressed_bytes,
+ magic=0,
+ attributes=Message.CODEC_GZIP))
+ ]
+ records = list(fetcher._unpack_message_set(tp, compressed_msgs))
+ assert len(records) == 3
+ assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
+ assert records[0].value == b'a'
+ assert records[1].value == b'b'
+ assert records[2].value == b'c'
+ assert records[0].offset == 0
+ assert records[1].offset == 1
+ assert records[2].offset == 2
+
+
+def test__unpack_message_set_compressed_v1(fetcher):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition('foo', 0)
+ messages = [
+ (0, None, Message(b'a')),
+ (1, None, Message(b'b')),
+ (2, None, Message(b'c')),
+ ]
+ message_bytes = []
+ for offset, _, m in messages:
+ encoded = m.encode()
+ message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
+ compressed_bytes = gzip_encode(b''.join(message_bytes))
+ compressed_base_offset = 10
+ compressed_msgs = [
+ (compressed_base_offset, None,
+ Message(compressed_bytes,
+ magic=1,
+ attributes=Message.CODEC_GZIP))
+ ]
+ records = list(fetcher._unpack_message_set(tp, compressed_msgs))
+ assert len(records) == 3
+ assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
+ assert records[0].value == b'a'
+ assert records[1].value == b'b'
+ assert records[2].value == b'c'
+ assert records[0].offset == 8
+ assert records[1].offset == 9
+ assert records[2].offset == 10
+
+
+def test__parse_record(fetcher):
+ tp = TopicPartition('foo', 0)
+ record = fetcher._parse_record(tp, 123, 456, Message(b'abc'))
+ assert record.topic == 'foo'
+ assert record.partition == 0
+ assert record.offset == 123
+ assert record.timestamp == 456
+ assert record.value == b'abc'
+ assert record.key is None
+
+
+def test__message_generator(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ msgs = []
+ for i in range(10):
+ msg = Message(b'foo')
+ msgs.append((i, -1, msg))
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [0, 100, msgs],
+ mocker.MagicMock()
+ )
+ fetcher._completed_fetches.append(completed_fetch)
+ for i in range(10):
+ msg = next(fetcher)
+ assert isinstance(msg, ConsumerRecord)
+ assert msg.offset == i
+ assert msg.value == b'foo'
+
+
+def test__parse_fetched_data(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ msgs = []
+ for i in range(10):
+ msg = Message(b'foo')
+ msgs.append((i, -1, msg))
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [0, 100, msgs],
+ mocker.MagicMock()
+ )
+ partition_record = fetcher._parse_fetched_data(completed_fetch)
+ assert isinstance(partition_record, fetcher.PartitionRecords)
+ assert len(partition_record) == 10
+
+
+def test__parse_fetched_data__paused(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ msgs = []
+ for i in range(10):
+ msg = Message(b'foo')
+ msgs.append((i, -1, msg))
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [0, 100, msgs],
+ mocker.MagicMock()
+ )
+ fetcher._subscriptions.pause(tp)
+ partition_record = fetcher._parse_fetched_data(completed_fetch)
+ assert partition_record is None
+
+
+def test__parse_fetched_data__stale_offset(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ msgs = []
+ for i in range(10):
+ msg = Message(b'foo')
+ msgs.append((i, -1, msg))
+ completed_fetch = CompletedFetch(
+ tp, 10, 0, [0, 100, msgs],
+ mocker.MagicMock()
+ )
+ partition_record = fetcher._parse_fetched_data(completed_fetch)
+ assert partition_record is None
+
+
+def test__parse_fetched_data__not_leader(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [NotLeaderForPartitionError.errno, -1, None],
+ mocker.MagicMock()
+ )
+ partition_record = fetcher._parse_fetched_data(completed_fetch)
+ assert partition_record is None
+ fetcher._client.cluster.request_update.assert_called_with()
+
+
+def test__parse_fetched_data__unknown_tp(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [UnknownTopicOrPartitionError.errno, -1, None],
+ mocker.MagicMock()
+ )
+ partition_record = fetcher._parse_fetched_data(completed_fetch)
+ assert partition_record is None
+ fetcher._client.cluster.request_update.assert_called_with()
+
+
+def test__parse_fetched_data__out_of_range(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [OffsetOutOfRangeError.errno, -1, None],
+ mocker.MagicMock()
+ )
+ partition_record = fetcher._parse_fetched_data(completed_fetch)
+ assert partition_record is None
+ assert fetcher._subscriptions.assignment[tp].awaiting_reset is True
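
The out-of-range test above covers the default-reset path. A hedged companion sketch (not part of this diff) for the opposite path, where _parse_fetched_data re-raises the error because no default reset policy is configured; no_reset_fetcher is a hypothetical fixture assumed to mirror the fetcher fixture above but construct its SubscriptionState without a default offset reset policy:

import pytest
from kafka.consumer.fetcher import CompletedFetch
from kafka.errors import OffsetOutOfRangeError
from kafka.structs import TopicPartition

def test__parse_fetched_data__out_of_range_no_reset(no_reset_fetcher, topic, mocker):
    tp = TopicPartition(topic, 0)
    completed_fetch = CompletedFetch(
        tp, 0, 0, [OffsetOutOfRangeError.errno, -1, None],
        mocker.MagicMock()
    )
    # With no default reset policy, the error propagates to the caller
    # instead of flagging the partition for an offset reset.
    with pytest.raises(OffsetOutOfRangeError):
        no_reset_fetcher._parse_fetched_data(completed_fetch)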