# pylint: skip-file
#
# Unit tests for kafka.consumer.fetcher.Fetcher. The Kafka client is replaced
# by a mock in every test, so nothing here talks to a real broker.
from __future__ import absolute_import

import pytest

from collections import OrderedDict
import itertools
import time

from kafka.client_async import KafkaClient
from kafka.codec import gzip_encode
from kafka.consumer.fetcher import (
    CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
)
from kafka.consumer.subscription_state import SubscriptionState
from kafka.metrics import Metrics
from kafka.protocol.fetch import FetchRequest, FetchResponse
from kafka.protocol.message import Message
from kafka.protocol.offset import OffsetResponse
from kafka.protocol.types import Int64, Int32
from kafka.structs import TopicPartition
from kafka.future import Future
from kafka.errors import (
    StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
    UnknownTopicOrPartitionError, OffsetOutOfRangeError
)


@pytest.fixture
def client(mocker):
    """Return a Mock that exposes the same attributes as a KafkaClient."""
    return mocker.Mock(spec=KafkaClient(bootstrap_servers=(), api_version=(0, 9)))


@pytest.fixture
def subscription_state():
    """Return a fresh, empty SubscriptionState."""
    return SubscriptionState()


@pytest.fixture
def topic():
    """Return the topic name shared by the fixtures and most tests."""
    return 'foobar'


@pytest.fixture
def fetcher(client, subscription_state, topic):
    """Return a Fetcher assigned partitions 0-2 of ``topic``, each at offset 0."""
    subscription_state.subscribe(topics=[topic])
    assignment = [TopicPartition(topic, i) for i in range(3)]
    subscription_state.assign_from_subscribed(assignment)
    for tp in assignment:
        subscription_state.seek(tp, 0)
    return Fetcher(client, subscription_state, Metrics())


def _build_messages(count=10, payload=b'foo'):
    """Build ``count`` message-set entries with offsets ``0..count-1``.

    Each entry is an ``(offset, -1, Message(payload))`` tuple, the shape the
    tests below place inside a CompletedFetch's partition data.
    """
    return [(offset, -1, Message(payload)) for offset in range(count)]


def _gzip_wrapped_message_set(messages, base_offset, magic):
    """Wrap ``messages`` in a single gzip-compressed Message.

    Every ``(offset, _, message)`` entry is serialized as
    ``Int64(offset) + Int32(len) + encoded message``; the concatenation is
    gzip-compressed and placed in one wrapper Message with the given
    ``magic``. Returns a one-element message set ``[(base_offset, None,
    wrapper)]``.
    """
    encoded_entries = []
    for offset, _, message in messages:
        encoded = message.encode()
        encoded_entries.append(
            Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
    compressed_bytes = gzip_encode(b''.join(encoded_entries))
    return [
        (base_offset, None,
         Message(compressed_bytes,
                 magic=magic, attributes=Message.CODEC_GZIP))
    ]


def test_send_fetches(fetcher, topic, mocker):
    """send_fetches() sends every request built by _create_fetch_requests()."""
    fetch_requests = [
        FetchRequest[0](
            -1, fetcher.config['fetch_max_wait_ms'],
            fetcher.config['fetch_min_bytes'],
            [(topic, [
                (0, 0, fetcher.config['max_partition_fetch_bytes']),
                (1, 0, fetcher.config['max_partition_fetch_bytes']),
            ])]),
        FetchRequest[0](
            -1, fetcher.config['fetch_max_wait_ms'],
            fetcher.config['fetch_min_bytes'],
            [(topic, [
                (2, 0, fetcher.config['max_partition_fetch_bytes']),
            ])])
    ]

    # Key the prepared requests by a fake node id (0, 1, ...).
    mocker.patch.object(fetcher, '_create_fetch_requests',
                        return_value=dict(enumerate(fetch_requests)))

    ret = fetcher.send_fetches()
    for node, request in enumerate(fetch_requests):
        fetcher._client.send.assert_any_call(node, request)
    assert len(ret) == len(fetch_requests)


@pytest.mark.parametrize(("api_version", "fetch_version"), [
    ((0, 10, 1), 3),
    ((0, 10, 0), 2),
    ((0, 9), 1),
    ((0, 8), 0)
])
def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
    """The FetchRequest version is selected from the configured api_version."""
    fetcher._client.in_flight_request_count.return_value = 0
    fetcher.config['api_version'] = api_version
    by_node = fetcher._create_fetch_requests()
    requests = by_node.values()
    assert all(isinstance(r, FetchRequest[fetch_version]) for r in requests)


def test_update_fetch_positions(fetcher, topic, mocker):
    """update_fetch_positions() resets or seeks only partitions that need it."""
    mocker.patch.object(fetcher, '_reset_offset')
    partition = TopicPartition(topic, 0)

    # unassigned partition
    fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)])
    assert fetcher._reset_offset.call_count == 0

    # fetchable partition (has offset, not paused)
    fetcher.update_fetch_positions([partition])
    assert fetcher._reset_offset.call_count == 0

    # partition needs reset, no committed offset
    fetcher._subscriptions.need_offset_reset(partition)
    fetcher._subscriptions.assignment[partition].awaiting_reset = False
    fetcher.update_fetch_positions([partition])
    fetcher._reset_offset.assert_called_with(partition)
    assert fetcher._subscriptions.assignment[partition].awaiting_reset is True
    fetcher.update_fetch_positions([partition])
    fetcher._reset_offset.assert_called_with(partition)

    # partition needs reset, has committed offset
    fetcher._reset_offset.reset_mock()
    fetcher._subscriptions.need_offset_reset(partition)
    fetcher._subscriptions.assignment[partition].awaiting_reset = False
    fetcher._subscriptions.assignment[partition].committed = 123
    mocker.patch.object(fetcher._subscriptions, 'seek')
    fetcher.update_fetch_positions([partition])
    assert fetcher._reset_offset.call_count == 0
    fetcher._subscriptions.seek.assert_called_with(partition, 123)


def test__reset_offset(fetcher, mocker):
    """_reset_offset() raises without an offset, else applies the one found."""
    tp = TopicPartition("topic", 0)
    # Pass a list, matching how the ``fetcher`` fixture subscribes.
    fetcher._subscriptions.subscribe(topics=["topic"])
    fetcher._subscriptions.assign_from_subscribed([tp])
    fetcher._subscriptions.need_offset_reset(tp)
    mocked = mocker.patch.object(fetcher, '_retrieve_offsets')

    # No offset retrieved for the partition -> error
    mocked.return_value = {}
    with pytest.raises(NoOffsetForPartitionError):
        fetcher._reset_offset(tp)

    # Offset retrieved -> reset completes and the position is updated
    mocked.return_value = {tp: (1001, None)}
    fetcher._reset_offset(tp)
    assert not fetcher._subscriptions.assignment[tp].awaiting_reset
    assert fetcher._subscriptions.assignment[tp].position == 1001


def test__send_offset_requests(fetcher, mocker):
    """_send_offset_requests() handles missing leaders and chains futures."""
    tp = TopicPartition("topic_send_offset", 1)
    mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
    send_futures = []

    def send_side_effect(*args, **kw):
        # Hand out a fresh Future per send and remember it so the test can
        # resolve it later.
        f = Future()
        send_futures.append(f)
        return f
    mocked_send.side_effect = send_side_effect

    mocked_leader = mocker.patch.object(
        fetcher._client.cluster, "leader_for_partition")
    # First we report unavailable leader 2 times different ways and later
    # always as available
    mocked_leader.side_effect = itertools.chain(
        [None, -1], itertools.cycle([0]))

    # Leader == None
    fut = fetcher._send_offset_requests({tp: 0})
    assert fut.failed()
    assert isinstance(fut.exception, StaleMetadata)
    assert not mocked_send.called

    # Leader == -1
    fut = fetcher._send_offset_requests({tp: 0})
    assert fut.failed()
    assert isinstance(fut.exception, LeaderNotAvailableError)
    assert not mocked_send.called

    # Leader == 0, send failed
    fut = fetcher._send_offset_requests({tp: 0})
    assert not fut.is_done
    assert mocked_send.call_count == 1
    # Check that we bound the futures correctly to chain failure
    send_futures.pop().failure(NotLeaderForPartitionError(tp))
    assert fut.failed()
    assert isinstance(fut.exception, NotLeaderForPartitionError)

    # Leader == 0, send success
    fut = fetcher._send_offset_requests({tp: 0})
    assert not fut.is_done
    # Count the calls: ``called`` is already True from the previous case and
    # would pass even if this request were never sent.
    assert mocked_send.call_count == 2
    # Check that we bound the futures correctly to chain success
    send_futures.pop().success({tp: (10, 10000)})
    assert fut.succeeded()
    assert fut.value == {tp: (10, 10000)}


def test__send_offset_requests_multiple_nodes(fetcher, mocker):
    """Offset requests are grouped per leader node and their results merged."""
    tp1 = TopicPartition("topic_send_offset", 1)
    tp2 = TopicPartition("topic_send_offset", 2)
    tp3 = TopicPartition("topic_send_offset", 3)
    tp4 = TopicPartition("topic_send_offset", 4)
    mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
    send_futures = []

    def send_side_effect(node_id, timestamps):
        # Record (node, request payload, future) so each future can be
        # resolved individually below.
        f = Future()
        send_futures.append((node_id, timestamps, f))
        return f
    mocked_send.side_effect = send_side_effect

    mocked_leader = mocker.patch.object(
        fetcher._client.cluster, "leader_for_partition")
    # Alternate leaders: tp1, tp3 -> node 0; tp2, tp4 -> node 1
    mocked_leader.side_effect = itertools.cycle([0, 1])

    # -- All node succeeded case
    tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)])
    fut = fetcher._send_offset_requests(tss)
    assert not fut.is_done
    assert mocked_send.call_count == 2

    req_by_node = {}
    second_future = None
    for node, timestamps, f in send_futures:
        req_by_node[node] = timestamps
        if node == 0:
            # Say tp3 does not have any messages so it's missing
            f.success({tp1: (11, 1001)})
        else:
            second_future = f
    assert req_by_node == {
        0: {tp1: 0, tp3: 0},
        1: {tp2: 0, tp4: 0}
    }

    # We only resolved 1 future so far, so result future is not yet ready
    assert not fut.is_done
    second_future.success({tp2: (12, 1002), tp4: (14, 1004)})
    assert fut.succeeded()
    assert fut.value == {tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)}

    # -- First succeeded second not
    del send_futures[:]
    fut = fetcher._send_offset_requests(tss)
    assert len(send_futures) == 2
    send_futures[0][2].success({tp1: (11, 1001)})
    send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1))
    assert fut.failed()
    assert isinstance(fut.exception, UnknownTopicOrPartitionError)

    # -- First fails second succeeded
    del send_futures[:]
    fut = fetcher._send_offset_requests(tss)
    assert len(send_futures) == 2
    send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1))
    send_futures[1][2].success({tp1: (11, 1001)})
    assert fut.failed()
    assert isinstance(fut.exception, UnknownTopicOrPartitionError)


def test__handle_offset_response(fetcher, mocker):
    """_handle_offset_response() maps broker error codes onto the future."""
    # Broker returns UnsupportedForMessageFormatError, will omit partition
    fut = Future()
    res = OffsetResponse[1]([
        ("topic", [(0, 43, -1, -1)]),
        ("topic", [(1, 0, 1000, 9999)])
    ])
    fetcher._handle_offset_response(fut, res)
    assert fut.succeeded()
    assert fut.value == {TopicPartition("topic", 1): (9999, 1000)}

    # Broker returns NotLeaderForPartitionError
    fut = Future()
    res = OffsetResponse[1]([
        ("topic", [(0, 6, -1, -1)]),
    ])
    fetcher._handle_offset_response(fut, res)
    assert fut.failed()
    assert isinstance(fut.exception, NotLeaderForPartitionError)

    # Broker returns UnknownTopicOrPartitionError
    fut = Future()
    res = OffsetResponse[1]([
        ("topic", [(0, 3, -1, -1)]),
    ])
    fetcher._handle_offset_response(fut, res)
    assert fut.failed()
    assert isinstance(fut.exception, UnknownTopicOrPartitionError)

    # Broker returns many errors and 1 result
    # Will fail on 1st error and return
    fut = Future()
    res = OffsetResponse[1]([
        ("topic", [(0, 43, -1, -1)]),
        ("topic", [(1, 6, -1, -1)]),
        ("topic", [(2, 3, -1, -1)]),
        ("topic", [(3, 0, 1000, 9999)])
    ])
    fetcher._handle_offset_response(fut, res)
    assert fut.failed()
    assert isinstance(fut.exception, NotLeaderForPartitionError)


def test_partition_records_offset():
    """Test that compressed messagesets are handled correctly
    when fetch offset is in the middle of the message list
    """
    batch_start = 120
    batch_end = 130
    fetch_offset = 123
    tp = TopicPartition('foo', 0)
    messages = [ConsumerRecord(tp.topic, tp.partition, i,
                               None, None, 'key', 'value', 'checksum', 0, 0)
                for i in range(batch_start, batch_end)]
    records = Fetcher.PartitionRecords(fetch_offset, None, messages)
    assert len(records) > 0
    msgs = records.take(1)
    # The first record handed out is the one at the fetch offset, not the
    # first record of the batch.
    assert msgs[0].offset == fetch_offset
    assert records.fetch_offset == fetch_offset + 1
    msgs = records.take(2)
    assert len(msgs) == 2
    assert len(records) > 0
    records.discard()
    assert len(records) == 0


def test_fetched_records(fetcher, topic, mocker):
    """fetched_records() returns every message of a completed fetch."""
    fetcher.config['check_crcs'] = False
    tp = TopicPartition(topic, 0)
    msgs = _build_messages()
    completed_fetch = CompletedFetch(
        tp, 0, 0, [0, 100, msgs],
        mocker.MagicMock()
    )
    fetcher._completed_fetches.append(completed_fetch)
    records, partial = fetcher.fetched_records()
    assert tp in records
    assert len(records[tp]) == len(msgs)
    assert all(isinstance(x, ConsumerRecord) for x in records[tp])
    assert partial is False


@pytest.mark.parametrize(("fetch_request", "fetch_response", "num_partitions"), [
    (
        FetchRequest[0](
            -1, 100, 100,
            [('foo', [(0, 0, 1000),])]),
        FetchResponse[0](
            [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
        1,
    ),
    (
        FetchRequest[1](
            -1, 100, 100,
            [('foo', [(0, 0, 1000), (1, 0, 1000),])]),
        FetchResponse[1](
            0,
            [("foo", [
                (0, 0, 1000, [(0, b'xxx'),]),
                (1, 0, 1000, [(0, b'xxx'),]),
            ]),]),
        2,
    ),
    (
        FetchRequest[2](
            -1, 100, 100,
            [('foo', [(0, 0, 1000),])]),
        FetchResponse[2](
            0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
        1,
    ),
    (
        FetchRequest[3](
            -1, 100, 100, 10000,
            [('foo', [(0, 0, 1000),])]),
        FetchResponse[3](
            0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
        1,
    ),
    (
        FetchRequest[4](
            -1, 100, 100, 10000, 0,
            [('foo', [(0, 0, 1000),])]),
        FetchResponse[4](
            0, [("foo", [(0, 0, 1000, 0, [], [(0, b'xxx'),])]),]),
        1,
    ),
    (
        # This may only be used in broker-broker api calls
        FetchRequest[5](
            -1, 100, 100, 10000, 0,
            [('foo', [(0, 0, 1000),])]),
        FetchResponse[5](
            0, [("foo", [(0, 0, 1000, 0, 0, [], [(0, b'xxx'),])]),]),
        1,
    ),
])
def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_partitions):
    """One completed fetch is queued per partition in the fetch response."""
    fetcher._handle_fetch_response(fetch_request, time.time(), fetch_response)
    assert len(fetcher._completed_fetches) == num_partitions


def test__unpack_message_set(fetcher):
    """Uncompressed messages unpack to records with matching value/offset."""
    fetcher.config['check_crcs'] = False
    tp = TopicPartition('foo', 0)
    messages = [
        (0, None, Message(b'a')),
        (1, None, Message(b'b')),
        (2, None, Message(b'c'))
    ]
    records = list(fetcher._unpack_message_set(tp, messages))
    assert len(records) == 3
    assert all(isinstance(x, ConsumerRecord) for x in records)
    assert records[0].value == b'a'
    assert records[1].value == b'b'
    assert records[2].value == b'c'
    assert records[0].offset == 0
    assert records[1].offset == 1
    assert records[2].offset == 2


def test__unpack_message_set_compressed_v0(fetcher):
    """A magic=0 gzip wrapper yields its inner messages at offsets 0, 1, 2."""
    fetcher.config['check_crcs'] = False
    tp = TopicPartition('foo', 0)
    messages = [
        (0, None, Message(b'a')),
        (1, None, Message(b'b')),
        (2, None, Message(b'c')),
    ]
    compressed_msgs = _gzip_wrapped_message_set(
        messages, base_offset=0, magic=0)
    records = list(fetcher._unpack_message_set(tp, compressed_msgs))
    assert len(records) == 3
    assert all(isinstance(x, ConsumerRecord) for x in records)
    assert records[0].value == b'a'
    assert records[1].value == b'b'
    assert records[2].value == b'c'
    assert records[0].offset == 0
    assert records[1].offset == 1
    assert records[2].offset == 2


def test__unpack_message_set_compressed_v1(fetcher):
    """A magic=1 gzip wrapper at offset 10 yields records at offsets 8, 9, 10."""
    fetcher.config['check_crcs'] = False
    tp = TopicPartition('foo', 0)
    messages = [
        (0, None, Message(b'a')),
        (1, None, Message(b'b')),
        (2, None, Message(b'c')),
    ]
    compressed_msgs = _gzip_wrapped_message_set(
        messages, base_offset=10, magic=1)
    records = list(fetcher._unpack_message_set(tp, compressed_msgs))
    assert len(records) == 3
    assert all(isinstance(x, ConsumerRecord) for x in records)
    assert records[0].value == b'a'
    assert records[1].value == b'b'
    assert records[2].value == b'c'
    # The wrapper offset (10) lands on the last inner message.
    assert records[0].offset == 8
    assert records[1].offset == 9
    assert records[2].offset == 10


def test__parse_record(fetcher):
    """_parse_record() copies topic/partition/offset/timestamp/value through."""
    tp = TopicPartition('foo', 0)
    record = fetcher._parse_record(tp, 123, 456, Message(b'abc'))
    assert record.topic == 'foo'
    assert record.partition == 0
    assert record.offset == 123
    assert record.timestamp == 456
    assert record.value == b'abc'
    assert record.key is None


def test__message_generator(fetcher, topic, mocker):
    """Iterating the fetcher yields the fetched records in offset order."""
    fetcher.config['check_crcs'] = False
    tp = TopicPartition(topic, 0)
    msgs = _build_messages()
    completed_fetch = CompletedFetch(
        tp, 0, 0, [0, 100, msgs],
        mocker.MagicMock()
    )
    fetcher._completed_fetches.append(completed_fetch)
    for i in range(10):
        msg = next(fetcher)
        assert isinstance(msg, ConsumerRecord)
        assert msg.offset == i
        assert msg.value == b'foo'


def test__parse_fetched_data(fetcher, topic, mocker):
    """A successful completed fetch parses into PartitionRecords."""
    fetcher.config['check_crcs'] = False
    tp = TopicPartition(topic, 0)
    msgs = _build_messages()
    completed_fetch = CompletedFetch(
        tp, 0, 0, [0, 100, msgs],
        mocker.MagicMock()
    )
    partition_record = fetcher._parse_fetched_data(completed_fetch)
    assert isinstance(partition_record, fetcher.PartitionRecords)
    assert len(partition_record) == 10


def test__parse_fetched_data__paused(fetcher, topic, mocker):
    """Fetched data for a paused partition is dropped (returns None)."""
    fetcher.config['check_crcs'] = False
    tp = TopicPartition(topic, 0)
    msgs = _build_messages()
    completed_fetch = CompletedFetch(
        tp, 0, 0, [0, 100, msgs],
        mocker.MagicMock()
    )
    fetcher._subscriptions.pause(tp)
    partition_record = fetcher._parse_fetched_data(completed_fetch)
    assert partition_record is None


def test__parse_fetched_data__stale_offset(fetcher, topic, mocker):
    """Fetched data whose offset differs from the position is dropped."""
    fetcher.config['check_crcs'] = False
    tp = TopicPartition(topic, 0)
    msgs = _build_messages()
    # The fetch was made at offset 10 while the fixture seeks the partition
    # to 0.
    completed_fetch = CompletedFetch(
        tp, 10, 0, [0, 100, msgs],
        mocker.MagicMock()
    )
    partition_record = fetcher._parse_fetched_data(completed_fetch)
    assert partition_record is None


def test__parse_fetched_data__not_leader(fetcher, topic, mocker):
    """A NotLeaderForPartition error yields None and a metadata update."""
    fetcher.config['check_crcs'] = False
    tp = TopicPartition(topic, 0)
    completed_fetch = CompletedFetch(
        tp, 0, 0, [NotLeaderForPartitionError.errno, -1, None],
        mocker.MagicMock()
    )
    partition_record = fetcher._parse_fetched_data(completed_fetch)
    assert partition_record is None
    fetcher._client.cluster.request_update.assert_called_with()


def test__parse_fetched_data__unknown_tp(fetcher, topic, mocker):
    """An UnknownTopicOrPartition error yields None and a metadata update."""
    fetcher.config['check_crcs'] = False
    tp = TopicPartition(topic, 0)
    completed_fetch = CompletedFetch(
        tp, 0, 0, [UnknownTopicOrPartitionError.errno, -1, None],
        mocker.MagicMock()
    )
    partition_record = fetcher._parse_fetched_data(completed_fetch)
    assert partition_record is None
    fetcher._client.cluster.request_update.assert_called_with()


def test__parse_fetched_data__out_of_range(fetcher, topic, mocker):
    """An OffsetOutOfRange error yields None and flags an offset reset."""
    fetcher.config['check_crcs'] = False
    tp = TopicPartition(topic, 0)
    completed_fetch = CompletedFetch(
        tp, 0, 0, [OffsetOutOfRangeError.errno, -1, None],
        mocker.MagicMock()
    )
    partition_record = fetcher._parse_fetched_data(completed_fetch)
    assert partition_record is None
    assert fetcher._subscriptions.assignment[tp].awaiting_reset is True