diff options
author | Taras Voinarovskyi <voyn1991@gmail.com> | 2017-10-11 17:44:17 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-11 17:44:17 +0300 |
commit | f04435c5ed97fef0975a77a8dc7bae7c284bba63 (patch) | |
tree | da8e24a147f6643952856a92edd6c0bd3a3961e6 | |
parent | 1df58bf87da1a2c8a2f9e659dfabaed1cff7c0c2 (diff) | |
parent | bc573e3d63a687903a9be2e1b3da2f943a7208e1 (diff) | |
download | kafka-python-f04435c5ed97fef0975a77a8dc7bae7c284bba63.tar.gz |
Merge pull request #1245 from dpkp/KAFKA_3977_defer_fetch_parsing
KAFKA-3977: defer fetch response parsing and raise exceptions to user
-rw-r--r-- | kafka/consumer/fetcher.py | 498 | ||||
-rw-r--r-- | test/test_fetcher.py | 316 |
2 files changed, 537 insertions, 277 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index f552038..c4fa546 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -28,6 +28,11 @@ ConsumerRecord = collections.namedtuple("ConsumerRecord", "key", "value", "checksum", "serialized_key_size", "serialized_value_size"]) +CompletedFetch = collections.namedtuple("CompletedFetch", + ["topic_partition", "fetched_offset", "response_version", + "partition_data", "metric_aggregator"]) + + class NoOffsetForPartitionError(Errors.KafkaError): pass @@ -104,18 +109,15 @@ class Fetcher(six.Iterator): self._client = client self._subscriptions = subscriptions - self._records = collections.deque() # (offset, topic_partition, messages) - self._unauthorized_topics = set() - self._offset_out_of_range_partitions = dict() # {topic_partition: offset} - self._record_too_large_partitions = dict() # {topic_partition: offset} + self._completed_fetches = collections.deque() # Unparsed responses + self._next_partition_records = None # Holds a single PartitionRecords until fully consumed self._iterator = None self._fetch_futures = collections.deque() self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix']) def send_fetches(self): - """Send FetchRequests asynchronously for all assigned partitions. - - Note: noop if there are unconsumed records internal to the fetcher + """Send FetchRequests for all assigned partitions that do not already have + an in-flight fetch or pending fetch data. Returns: List of Futures: each future resolves to a FetchResponse @@ -125,7 +127,6 @@ class Fetcher(six.Iterator): if self._client.ready(node_id): log.debug("Sending FetchRequest to node %s", node_id) future = self._client.send(node_id, request) - future.error_on_callbacks=True future.add_callback(self._handle_fetch_response, request, time.time()) future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id) futures.append(future) @@ -285,67 +286,6 @@ class Fetcher(six.Iterator): raise Errors.KafkaTimeoutError( "Failed to get offsets by timestamps in %s ms" % timeout_ms) - def _raise_if_offset_out_of_range(self): - """Check FetchResponses for offset out of range. - - Raises: - OffsetOutOfRangeError: if any partition from previous FetchResponse - contains OffsetOutOfRangeError and the default_reset_policy is - None - """ - if not self._offset_out_of_range_partitions: - return - - current_out_of_range_partitions = {} - - # filter only the fetchable partitions - for partition, offset in six.iteritems(self._offset_out_of_range_partitions): - if not self._subscriptions.is_fetchable(partition): - log.debug("Ignoring fetched records for %s since it is no" - " longer fetchable", partition) - continue - position = self._subscriptions.assignment[partition].position - # ignore partition if the current position != offset in FetchResponse - # e.g. after seek() - if position is not None and offset == position: - current_out_of_range_partitions[partition] = position - - self._offset_out_of_range_partitions.clear() - if current_out_of_range_partitions: - raise Errors.OffsetOutOfRangeError(current_out_of_range_partitions) - - def _raise_if_unauthorized_topics(self): - """Check FetchResponses for topic authorization failures. - - Raises: - TopicAuthorizationFailedError - """ - if self._unauthorized_topics: - topics = set(self._unauthorized_topics) - self._unauthorized_topics.clear() - raise Errors.TopicAuthorizationFailedError(topics) - - def _raise_if_record_too_large(self): - """Check FetchResponses for messages larger than the max per partition. - - Raises: - RecordTooLargeError: if there is a message larger than fetch size - """ - if not self._record_too_large_partitions: - return - - copied_record_too_large_partitions = dict(self._record_too_large_partitions) - self._record_too_large_partitions.clear() - - raise RecordTooLargeError( - "There are some messages at [Partition=Offset]: %s " - " whose size is larger than the fetch size %s" - " and hence cannot be ever returned." - " Increase the fetch size, or decrease the maximum message" - " size the broker will allow.", - copied_record_too_large_partitions, - self.config['max_partition_fetch_bytes']) - def fetched_records(self, max_records=None): """Returns previously fetched records and updates consumed offsets. @@ -375,22 +315,25 @@ class Fetcher(six.Iterator): if self._subscriptions.needs_partition_assignment: return {}, False - self._raise_if_offset_out_of_range() - self._raise_if_unauthorized_topics() - self._raise_if_record_too_large() - drained = collections.defaultdict(list) - partial = bool(self._records and max_records) - while self._records and max_records > 0: - part = self._records.popleft() - max_records -= self._append(drained, part, max_records) - if part.has_more(): - self._records.appendleft(part) + records_remaining = max_records + + while records_remaining > 0: + if not self._next_partition_records: + if not self._completed_fetches: + break + completion = self._completed_fetches.popleft() + self._next_partition_records = self._parse_fetched_data(completion) else: - partial &= False - return dict(drained), partial + records_remaining -= self._append(drained, + self._next_partition_records, + records_remaining) + return dict(drained), bool(self._completed_fetches) def _append(self, drained, part, max_records): + if not part: + return 0 + tp = part.topic_partition fetch_offset = part.fetch_offset if not self._subscriptions.is_assigned(tp): @@ -409,9 +352,8 @@ class Fetcher(six.Iterator): " %s since it is no longer fetchable", tp) elif fetch_offset == position: + # we are ensured to have at least one record since we already checked for emptiness part_records = part.take(max_records) - if not part_records: - return 0 next_offset = part_records[-1].offset + 1 log.log(0, "Returning fetched records at offset %d for assigned" @@ -444,93 +386,72 @@ class Fetcher(six.Iterator): if self._subscriptions.needs_partition_assignment: raise StopIteration('Subscription needs partition assignment') - while self._records: + while self._next_partition_records or self._completed_fetches: - # Check on each iteration since this is a generator - self._raise_if_offset_out_of_range() - self._raise_if_unauthorized_topics() - self._raise_if_record_too_large() + if not self._next_partition_records: + completion = self._completed_fetches.popleft() + self._next_partition_records = self._parse_fetched_data(completion) + continue # Send additional FetchRequests when the internal queue is low # this should enable moderate pipelining - if len(self._records) <= self.config['iterator_refetch_records']: + if len(self._completed_fetches) <= self.config['iterator_refetch_records']: self.send_fetches() - part = self._records.popleft() - - tp = part.topic_partition - fetch_offset = part.fetch_offset - if not self._subscriptions.is_assigned(tp): - # this can happen when a rebalance happened before - # fetched records are returned - log.debug("Not returning fetched records for partition %s" - " since it is no longer assigned", tp) - continue - - # note that the position should always be available - # as long as the partition is still assigned - position = self._subscriptions.assignment[tp].position - if not self._subscriptions.is_fetchable(tp): - # this can happen when a partition is paused before - # fetched records are returned - log.debug("Not returning fetched records for assigned partition" - " %s since it is no longer fetchable", tp) + tp = self._next_partition_records.topic_partition - elif fetch_offset == position: - log.log(0, "Returning fetched records at offset %d for assigned" - " partition %s", position, tp) - - # We can ignore any prior signal to drop pending message sets - # because we are starting from a fresh one where fetch_offset == position - # i.e., the user seek()'d to this position - self._subscriptions.assignment[tp].drop_pending_message_set = False - - for msg in part.messages: - - # Because we are in a generator, it is possible for - # subscription state to change between yield calls - # so we need to re-check on each loop - # this should catch assignment changes, pauses - # and resets via seek_to_beginning / seek_to_end - if not self._subscriptions.is_fetchable(tp): - log.debug("Not returning fetched records for partition %s" - " since it is no longer fetchable", tp) - break - - # If there is a seek during message iteration, - # we should stop unpacking this message set and - # wait for a new fetch response that aligns with the - # new seek position - elif self._subscriptions.assignment[tp].drop_pending_message_set: - log.debug("Skipping remainder of message set for partition %s", tp) - self._subscriptions.assignment[tp].drop_pending_message_set = False - break - - # Compressed messagesets may include earlier messages - elif msg.offset < self._subscriptions.assignment[tp].position: - log.debug("Skipping message offset: %s (expecting %s)", - msg.offset, - self._subscriptions.assignment[tp].position) - continue + # We can ignore any prior signal to drop pending message sets + # because we are starting from a fresh one where fetch_offset == position + # i.e., the user seek()'d to this position + self._subscriptions.assignment[tp].drop_pending_message_set = False - self._subscriptions.assignment[tp].position = msg.offset + 1 - yield msg + for msg in self._next_partition_records.take(): - else: - # these records aren't next in line based on the last consumed - # position, ignore them they must be from an obsolete request - log.debug("Ignoring fetched records for %s at offset %s since" - " the current position is %d", tp, part.fetch_offset, - position) + # Because we are in a generator, it is possible for + # subscription state to change between yield calls + # so we need to re-check on each loop + # this should catch assignment changes, pauses + # and resets via seek_to_beginning / seek_to_end + if not self._subscriptions.is_fetchable(tp): + log.debug("Not returning fetched records for partition %s" + " since it is no longer fetchable", tp) + self._next_partition_records = None + break + + # If there is a seek during message iteration, + # we should stop unpacking this message set and + # wait for a new fetch response that aligns with the + # new seek position + elif self._subscriptions.assignment[tp].drop_pending_message_set: + log.debug("Skipping remainder of message set for partition %s", tp) + self._subscriptions.assignment[tp].drop_pending_message_set = False + self._next_partition_records = None + break + + # Compressed messagesets may include earlier messages + elif msg.offset < self._subscriptions.assignment[tp].position: + log.debug("Skipping message offset: %s (expecting %s)", + msg.offset, + self._subscriptions.assignment[tp].position) + continue + + self._subscriptions.assignment[tp].position = msg.offset + 1 + yield msg + + self._next_partition_records = None def _unpack_message_set(self, tp, messages): try: for offset, size, msg in messages: if self.config['check_crcs'] and not msg.validate_crc(): raise Errors.InvalidMessageError(msg) - elif msg.is_compressed(): - # If relative offset is used, we need to decompress the entire message first to compute - # the absolute offset. + + if not msg.is_compressed(): + yield self._parse_record(tp, offset, msg.timestamp, msg) + + else: + # If relative offset is used, we need to decompress the entire message first + # to compute the absolute offset. inner_mset = msg.decompress() # There should only ever be a single layer of compression @@ -569,31 +490,7 @@ class Fetcher(six.Iterator): if absolute_base_offset >= 0: inner_offset += absolute_base_offset - - key = self._deserialize( - self.config['key_deserializer'], - tp.topic, inner_msg.key) - value = self._deserialize( - self.config['value_deserializer'], - tp.topic, inner_msg.value) - yield ConsumerRecord(tp.topic, tp.partition, inner_offset, - inner_timestamp, msg.timestamp_type, - key, value, inner_msg.crc, - len(inner_msg.key) if inner_msg.key is not None else -1, - len(inner_msg.value) if inner_msg.value is not None else -1) - - else: - key = self._deserialize( - self.config['key_deserializer'], - tp.topic, msg.key) - value = self._deserialize( - self.config['value_deserializer'], - tp.topic, msg.value) - yield ConsumerRecord(tp.topic, tp.partition, offset, - msg.timestamp, msg.timestamp_type, - key, value, msg.crc, - len(msg.key) if msg.key is not None else -1, - len(msg.value) if msg.value is not None else -1) + yield self._parse_record(tp, inner_offset, inner_timestamp, inner_msg) # If unpacking raises StopIteration, it is erroneously # caught by the generator. We want all exceptions to be raised @@ -608,6 +505,15 @@ class Fetcher(six.Iterator): log.exception('AssertionError raised unpacking messageset: %s', e) raise + def _parse_record(self, tp, offset, timestamp, msg): + key = self._deserialize(self.config['key_deserializer'], tp.topic, msg.key) + value = self._deserialize(self.config['value_deserializer'], tp.topic, msg.value) + return ConsumerRecord(tp.topic, tp.partition, offset, + timestamp, msg.timestamp_type, + key, value, msg.crc, + len(msg.key) if msg.key is not None else -1, + len(msg.value) if msg.value is not None else -1) + def __iter__(self): # pylint: disable=non-iterator-returned return self @@ -764,8 +670,11 @@ class Fetcher(six.Iterator): def _fetchable_partitions(self): fetchable = self._subscriptions.fetchable_partitions() - pending = set([part.topic_partition for part in self._records]) - return fetchable.difference(pending) + if self._next_partition_records: + fetchable.remove(self._next_partition_records.topic_partition) + for fetch in self._completed_fetches: + fetchable.remove(fetch.topic_partition) + return fetchable def _create_fetch_requests(self): """Create fetch requests for all assigned partitions, grouped by node. @@ -835,93 +744,126 @@ class Fetcher(six.Iterator): def _handle_fetch_response(self, request, send_time, response): """The callback for fetch completion""" - total_bytes = 0 - total_count = 0 - recv_time = time.time() - fetch_offsets = {} for topic, partitions in request.topics: - for partition, offset, _ in partitions: + for partition_data in partitions: + partition, offset = partition_data[:2] fetch_offsets[TopicPartition(topic, partition)] = offset + partitions = set([TopicPartition(topic, partition_data[0]) + for topic, partitions in response.topics + for partition_data in partitions]) + metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions) + # randomized ordering should improve balance for short-lived consumers random.shuffle(response.topics) for topic, partitions in response.topics: random.shuffle(partitions) - for partition, error_code, highwater, messages in partitions: - tp = TopicPartition(topic, partition) - error_type = Errors.for_code(error_code) - if not self._subscriptions.is_fetchable(tp): - # this can happen when a rebalance happened or a partition - # consumption paused while fetch is still in-flight - log.debug("Ignoring fetched records for partition %s" - " since it is no longer fetchable", tp) + for partition_data in partitions: + tp = TopicPartition(topic, partition_data[0]) + completed_fetch = CompletedFetch( + tp, fetch_offsets[tp], + response.API_VERSION, + partition_data[1:], + metric_aggregator + ) + self._completed_fetches.append(completed_fetch) - elif error_type is Errors.NoError: - self._subscriptions.assignment[tp].highwater = highwater - - # we are interested in this fetch only if the beginning - # offset (of the *request*) matches the current consumed position - # Note that the *response* may return a messageset that starts - # earlier (e.g., compressed messages) or later (e.g., compacted topic) - fetch_offset = fetch_offsets[tp] - position = self._subscriptions.assignment[tp].position - if position is None or position != fetch_offset: - log.debug("Discarding fetch response for partition %s" - " since its offset %d does not match the" - " expected offset %d", tp, fetch_offset, - position) - continue + if response.API_VERSION >= 1: + self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms) + self._sensors.fetch_latency.record((time.time() - send_time) * 1000) + + def _parse_fetched_data(self, completed_fetch): + tp = completed_fetch.topic_partition + partition = completed_fetch.partition_data + fetch_offset = completed_fetch.fetched_offset + num_bytes = 0 + records_count = 0 + parsed_records = None - num_bytes = 0 - partial = None - if messages and isinstance(messages[-1][-1], PartialMessage): - partial = messages.pop() - - if messages: - log.debug("Adding fetched record for partition %s with" - " offset %d to buffered record list", tp, - position) - unpacked = list(self._unpack_message_set(tp, messages)) - self._records.append(self.PartitionRecords(fetch_offset, tp, unpacked)) - last_offset, _, _ = messages[-1] - self._sensors.records_fetch_lag.record(highwater - last_offset) - num_bytes = sum(msg[1] for msg in messages) - elif partial: - # we did not read a single message from a non-empty - # buffer because that message's size is larger than - # fetch size, in this case record this exception - self._record_too_large_partitions[tp] = fetch_offset - - self._sensors.record_topic_fetch_metrics(topic, num_bytes, len(messages)) - total_bytes += num_bytes - total_count += len(messages) - elif error_type in (Errors.NotLeaderForPartitionError, - Errors.UnknownTopicOrPartitionError): - self._client.cluster.request_update() - elif error_type is Errors.OffsetOutOfRangeError: - fetch_offset = fetch_offsets[tp] + error_code, highwater = completed_fetch.partition_data[:2] + error_type = Errors.for_code(error_code) + messages = completed_fetch.partition_data[-1] + + try: + if not self._subscriptions.is_fetchable(tp): + # this can happen when a rebalance happened or a partition + # consumption paused while fetch is still in-flight + log.debug("Ignoring fetched records for partition %s" + " since it is no longer fetchable", tp) + + elif error_type is Errors.NoError: + self._subscriptions.assignment[tp].highwater = highwater + + # we are interested in this fetch only if the beginning + # offset (of the *request*) matches the current consumed position + # Note that the *response* may return a messageset that starts + # earlier (e.g., compressed messages) or later (e.g., compacted topic) + position = self._subscriptions.assignment[tp].position + if position is None or position != fetch_offset: + log.debug("Discarding fetch response for partition %s" + " since its offset %d does not match the" + " expected offset %d", tp, fetch_offset, + position) + return None + + partial = None + if messages and isinstance(messages[-1][-1], PartialMessage): + partial = messages.pop() + + if messages: + log.debug("Adding fetched record for partition %s with" + " offset %d to buffered record list", tp, + position) + unpacked = list(self._unpack_message_set(tp, messages)) + parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked) + last_offset, _, _ = messages[-1] + self._sensors.records_fetch_lag.record(highwater - last_offset) + num_bytes = sum(msg[1] for msg in messages) + records_count = len(messages) + elif partial: + # we did not read a single message from a non-empty + # buffer because that message's size is larger than + # fetch size, in this case record this exception + record_too_large_partitions = {tp: fetch_offset} + raise RecordTooLargeError( + "There are some messages at [Partition=Offset]: %s " + " whose size is larger than the fetch size %s" + " and hence cannot be ever returned." + " Increase the fetch size, or decrease the maximum message" + " size the broker will allow." % ( + record_too_large_partitions, + self.config['max_partition_fetch_bytes']), + record_too_large_partitions) + self._sensors.record_topic_fetch_metrics(tp.topic, num_bytes, records_count) + + elif error_type in (Errors.NotLeaderForPartitionError, + Errors.UnknownTopicOrPartitionError): + self._client.cluster.request_update() + elif error_type is Errors.OffsetOutOfRangeError: + position = self._subscriptions.assignment[tp].position + if position is None or position != fetch_offset: + log.debug("Discarding stale fetch response for partition %s" + " since the fetched offset %d does not match the" + " current offset %d", tp, fetch_offset, position) + elif self._subscriptions.has_default_offset_reset_policy(): log.info("Fetch offset %s is out of range for topic-partition %s", fetch_offset, tp) - if self._subscriptions.has_default_offset_reset_policy(): - self._subscriptions.need_offset_reset(tp) - log.info("Resetting offset for topic-partition %s", tp) - else: - self._offset_out_of_range_partitions[tp] = fetch_offset - elif error_type is Errors.TopicAuthorizationFailedError: - log.warn("Not authorized to read from topic %s.", tp.topic) - self._unauthorized_topics.add(tp.topic) - elif error_type is Errors.UnknownError: - log.warn("Unknown error fetching data for topic-partition %s", tp) + self._subscriptions.need_offset_reset(tp) else: - raise error_type('Unexpected error while fetching data') + raise Errors.OffsetOutOfRangeError({tp: fetch_offset}) - # Because we are currently decompressing messages lazily, the sensors here - # will get compressed bytes / message set stats when compression is enabled - self._sensors.bytes_fetched.record(total_bytes) - self._sensors.records_fetched.record(total_count) - if response.API_VERSION >= 1: - self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms) - self._sensors.fetch_latency.record((recv_time - send_time) * 1000) + elif error_type is Errors.TopicAuthorizationFailedError: + log.warn("Not authorized to read from topic %s.", tp.topic) + raise Errors.TopicAuthorizationFailedError(set(tp.topic)) + elif error_type is Errors.UnknownError: + log.warn("Unknown error fetching data for topic-partition %s", tp) + else: + raise error_type('Unexpected error while fetching data') + + finally: + completed_fetch.metric_aggregator.record(tp, num_bytes, records_count) + + return parsed_records class PartitionRecords(object): def __init__(self, fetch_offset, tp, messages): @@ -935,21 +877,55 @@ class Fetcher(six.Iterator): if msg.offset == fetch_offset: self.message_idx = i + # For truthiness evaluation we need to define __len__ or __nonzero__ + def __len__(self): + if self.messages is None or self.message_idx >= len(self.messages): + return 0 + return len(self.messages) - self.message_idx + def discard(self): self.messages = None - def take(self, n): - if not self.has_more(): + def take(self, n=None): + if not len(self): return [] + if n is None or n > len(self): + n = len(self) next_idx = self.message_idx + n res = self.messages[self.message_idx:next_idx] self.message_idx = next_idx - if self.has_more(): + if len(self) > 0: self.fetch_offset = self.messages[self.message_idx].offset return res - def has_more(self): - return self.messages and self.message_idx < len(self.messages) + +class FetchResponseMetricAggregator(object): + """ + Since we parse the message data for each partition from each fetch + response lazily, fetch-level metrics need to be aggregated as the messages + from each partition are parsed. This class is used to facilitate this + incremental aggregation. + """ + def __init__(self, sensors, partitions): + self.sensors = sensors + self.unrecorded_partitions = partitions + self.total_bytes = 0 + self.total_records = 0 + + def record(self, partition, num_bytes, num_records): + """ + After each partition is parsed, we update the current metric totals + with the total bytes and number of records parsed. After all partitions + have reported, we write the metric. + """ + self.unrecorded_partitions.remove(partition) + self.total_bytes += num_bytes + self.total_records += num_records + + # once all expected partitions from the fetch have reported in, record the metrics + if not self.unrecorded_partitions: + self.sensors.bytes_fetched.record(self.total_bytes) + self.sensors.records_fetched.record(self.total_records) class FetchManagerMetrics(object): diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 86d154f..5da597c 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -3,20 +3,26 @@ from __future__ import absolute_import import pytest -import itertools from collections import OrderedDict +import itertools +import time from kafka.client_async import KafkaClient -from kafka.consumer.fetcher import ConsumerRecord, Fetcher, NoOffsetForPartitionError +from kafka.codec import gzip_encode +from kafka.consumer.fetcher import ( + CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError +) from kafka.consumer.subscription_state import SubscriptionState from kafka.metrics import Metrics -from kafka.protocol.fetch import FetchRequest +from kafka.protocol.fetch import FetchRequest, FetchResponse +from kafka.protocol.message import Message from kafka.protocol.offset import OffsetResponse +from kafka.protocol.types import Int64, Int32 from kafka.structs import TopicPartition from kafka.future import Future from kafka.errors import ( StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError, - UnknownTopicOrPartitionError + UnknownTopicOrPartitionError, OffsetOutOfRangeError ) @@ -31,28 +37,33 @@ def subscription_state(): @pytest.fixture -def fetcher(client, subscription_state): - subscription_state.subscribe(topics=['foobar']) - assignment = [TopicPartition('foobar', i) for i in range(3)] +def topic(): + return 'foobar' + + +@pytest.fixture +def fetcher(client, subscription_state, topic): + subscription_state.subscribe(topics=[topic]) + assignment = [TopicPartition(topic, i) for i in range(3)] subscription_state.assign_from_subscribed(assignment) for tp in assignment: subscription_state.seek(tp, 0) return Fetcher(client, subscription_state, Metrics()) -def test_send_fetches(fetcher, mocker): +def test_send_fetches(fetcher, topic, mocker): fetch_requests = [ FetchRequest[0]( -1, fetcher.config['fetch_max_wait_ms'], fetcher.config['fetch_min_bytes'], - [('foobar', [ + [(topic, [ (0, 0, fetcher.config['max_partition_fetch_bytes']), (1, 0, fetcher.config['max_partition_fetch_bytes']), ])]), FetchRequest[0]( -1, fetcher.config['fetch_max_wait_ms'], fetcher.config['fetch_min_bytes'], - [('foobar', [ + [(topic, [ (2, 0, fetcher.config['max_partition_fetch_bytes']), ])]) ] @@ -80,9 +91,9 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version): assert all([isinstance(r, FetchRequest[fetch_version]) for r in requests]) -def test_update_fetch_positions(fetcher, mocker): +def test_update_fetch_positions(fetcher, topic, mocker): mocker.patch.object(fetcher, '_reset_offset') - partition = TopicPartition('foobar', 0) + partition = TopicPartition(topic, 0) # unassigned partition fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)]) @@ -285,7 +296,7 @@ def test__handle_offset_response(fetcher, mocker): def test_partition_records_offset(): - """Test that compressed messagesets are handle correctly + """Test that compressed messagesets are handled correctly when fetch offset is in the middle of the message list """ batch_start = 120 @@ -296,12 +307,285 @@ def test_partition_records_offset(): None, None, 'key', 'value', 'checksum', 0, 0) for i in range(batch_start, batch_end)] records = Fetcher.PartitionRecords(fetch_offset, None, messages) - assert records.has_more() + assert len(records) > 0 msgs = records.take(1) assert msgs[0].offset == 123 assert records.fetch_offset == 124 msgs = records.take(2) assert len(msgs) == 2 - assert records.has_more() + assert len(records) > 0 records.discard() - assert not records.has_more() + assert len(records) == 0 + + +def test_fetched_records(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + msgs = [] + for i in range(10): + msg = Message(b'foo') + msgs.append((i, -1, msg)) + completed_fetch = CompletedFetch( + tp, 0, 0, [0, 100, msgs], + mocker.MagicMock() + ) + fetcher._completed_fetches.append(completed_fetch) + records, partial = fetcher.fetched_records() + assert tp in records + assert len(records[tp]) == len(msgs) + assert all(map(lambda x: isinstance(x, ConsumerRecord), records[tp])) + assert partial is False + + +@pytest.mark.parametrize(("fetch_request", "fetch_response", "num_partitions"), [ + ( + FetchRequest[0]( + -1, 100, 100, + [('foo', [(0, 0, 1000),])]), + FetchResponse[0]( + [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]), + 1, + ), + ( + FetchRequest[1]( + -1, 100, 100, + [('foo', [(0, 0, 1000), (1, 0, 1000),])]), + FetchResponse[1]( + 0, + [("foo", [ + (0, 0, 1000, [(0, b'xxx'),]), + (1, 0, 1000, [(0, b'xxx'),]), + ]),]), + 2, + ), + ( + FetchRequest[2]( + -1, 100, 100, + [('foo', [(0, 0, 1000),])]), + FetchResponse[2]( + 0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]), + 1, + ), + ( + FetchRequest[3]( + -1, 100, 100, 10000, + [('foo', [(0, 0, 1000),])]), + FetchResponse[3]( + 0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]), + 1, + ), + ( + FetchRequest[4]( + -1, 100, 100, 10000, 0, + [('foo', [(0, 0, 1000),])]), + FetchResponse[4]( + 0, [("foo", [(0, 0, 1000, 0, [], [(0, b'xxx'),])]),]), + 1, + ), + ( + # This may only be used in broker-broker api calls + FetchRequest[5]( + -1, 100, 100, 10000, 0, + [('foo', [(0, 0, 1000),])]), + FetchResponse[5]( + 0, [("foo", [(0, 0, 1000, 0, 0, [], [(0, b'xxx'),])]),]), + 1, + ), +]) +def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_partitions): + fetcher._handle_fetch_response(fetch_request, time.time(), fetch_response) + assert len(fetcher._completed_fetches) == num_partitions + + +def test__unpack_message_set(fetcher): + fetcher.config['check_crcs'] = False + tp = TopicPartition('foo', 0) + messages = [ + (0, None, Message(b'a')), + (1, None, Message(b'b')), + (2, None, Message(b'c')) + ] + records = list(fetcher._unpack_message_set(tp, messages)) + assert len(records) == 3 + assert all(map(lambda x: isinstance(x, ConsumerRecord), records)) + assert records[0].value == b'a' + assert records[1].value == b'b' + assert records[2].value == b'c' + assert records[0].offset == 0 + assert records[1].offset == 1 + assert records[2].offset == 2 + + +def test__unpack_message_set_compressed_v0(fetcher): + fetcher.config['check_crcs'] = False + tp = TopicPartition('foo', 0) + messages = [ + (0, None, Message(b'a')), + (1, None, Message(b'b')), + (2, None, Message(b'c')), + ] + message_bytes = [] + for offset, _, m in messages: + encoded = m.encode() + message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded) + compressed_bytes = gzip_encode(b''.join(message_bytes)) + compressed_base_offset = 0 + compressed_msgs = [ + (compressed_base_offset, None, + Message(compressed_bytes, + magic=0, + attributes=Message.CODEC_GZIP)) + ] + records = list(fetcher._unpack_message_set(tp, compressed_msgs)) + assert len(records) == 3 + assert all(map(lambda x: isinstance(x, ConsumerRecord), records)) + assert records[0].value == b'a' + assert records[1].value == b'b' + assert records[2].value == b'c' + assert records[0].offset == 0 + assert records[1].offset == 1 + assert records[2].offset == 2 + + +def test__unpack_message_set_compressed_v1(fetcher): + fetcher.config['check_crcs'] = False + tp = TopicPartition('foo', 0) + messages = [ + (0, None, Message(b'a')), + (1, None, Message(b'b')), + (2, None, Message(b'c')), + ] + message_bytes = [] + for offset, _, m in messages: + encoded = m.encode() + message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded) + compressed_bytes = gzip_encode(b''.join(message_bytes)) + compressed_base_offset = 10 + compressed_msgs = [ + (compressed_base_offset, None, + Message(compressed_bytes, + magic=1, + attributes=Message.CODEC_GZIP)) + ] + records = list(fetcher._unpack_message_set(tp, compressed_msgs)) + assert len(records) == 3 + assert all(map(lambda x: isinstance(x, ConsumerRecord), records)) + assert records[0].value == b'a' + assert records[1].value == b'b' + assert records[2].value == b'c' + assert records[0].offset == 8 + assert records[1].offset == 9 + assert records[2].offset == 10 + + +def test__parse_record(fetcher): + tp = TopicPartition('foo', 0) + record = fetcher._parse_record(tp, 123, 456, Message(b'abc')) + assert record.topic == 'foo' + assert record.partition == 0 + assert record.offset == 123 + assert record.timestamp == 456 + assert record.value == b'abc' + assert record.key is None + + +def test__message_generator(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + msgs = [] + for i in range(10): + msg = Message(b'foo') + msgs.append((i, -1, msg)) + completed_fetch = CompletedFetch( + tp, 0, 0, [0, 100, msgs], + mocker.MagicMock() + ) + fetcher._completed_fetches.append(completed_fetch) + for i in range(10): + msg = next(fetcher) + assert isinstance(msg, ConsumerRecord) + assert msg.offset == i + assert msg.value == b'foo' + + +def test__parse_fetched_data(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + msgs = [] + for i in range(10): + msg = Message(b'foo') + msgs.append((i, -1, msg)) + completed_fetch = CompletedFetch( + tp, 0, 0, [0, 100, msgs], + mocker.MagicMock() + ) + partition_record = fetcher._parse_fetched_data(completed_fetch) + assert isinstance(partition_record, fetcher.PartitionRecords) + assert len(partition_record) == 10 + + +def test__parse_fetched_data__paused(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + msgs = [] + for i in range(10): + msg = Message(b'foo') + msgs.append((i, -1, msg)) + completed_fetch = CompletedFetch( + tp, 0, 0, [0, 100, msgs], + mocker.MagicMock() + ) + fetcher._subscriptions.pause(tp) + partition_record = fetcher._parse_fetched_data(completed_fetch) + assert partition_record is None + + +def test__parse_fetched_data__stale_offset(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + msgs = [] + for i in range(10): + msg = Message(b'foo') + msgs.append((i, -1, msg)) + completed_fetch = CompletedFetch( + tp, 10, 0, [0, 100, msgs], + mocker.MagicMock() + ) + partition_record = fetcher._parse_fetched_data(completed_fetch) + assert partition_record is None + + +def test__parse_fetched_data__not_leader(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + completed_fetch = CompletedFetch( + tp, 0, 0, [NotLeaderForPartitionError.errno, -1, None], + mocker.MagicMock() + ) + partition_record = fetcher._parse_fetched_data(completed_fetch) + assert partition_record is None + fetcher._client.cluster.request_update.assert_called_with() + + +def test__parse_fetched_data__unknown_tp(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + completed_fetch = CompletedFetch( + tp, 0, 0, [UnknownTopicOrPartitionError.errno, -1, None], + mocker.MagicMock() + ) + partition_record = fetcher._parse_fetched_data(completed_fetch) + assert partition_record is None + fetcher._client.cluster.request_update.assert_called_with() + + +def test__parse_fetched_data__out_of_range(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + completed_fetch = CompletedFetch( + tp, 0, 0, [OffsetOutOfRangeError.errno, -1, None], + mocker.MagicMock() + ) + partition_record = fetcher._parse_fetched_data(completed_fetch) + assert partition_record is None + assert fetcher._subscriptions.assignment[tp].awaiting_reset is True |