author      Dana Powers <dana.powers@gmail.com>    2017-10-08 10:50:06 -0700
committer   Dana Powers <dana.powers@gmail.com>    2017-10-08 10:50:06 -0700
commit      bc573e3d63a687903a9be2e1b3da2f943a7208e1 (patch)
tree        bea57a0f395112acab103392988f26c62510c2f4
parent      a7d8ae5411cc74b119ca91a8ff84ddc68cd47c93 (diff)
download    kafka-python-KAFKA_3977_defer_fetch_parsing.tar.gz
-rw-r--r--  kafka/consumer/fetcher.py   5
-rw-r--r--  test/test_fetcher.py        210
2 files changed, 200 insertions, 15 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index d059a10..c4fa546 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -400,6 +400,11 @@ class Fetcher(six.Iterator):
tp = self._next_partition_records.topic_partition
+            # We can ignore any prior signal to drop pending message sets
+            # because we are starting from a fresh one where fetch_offset == position
+            # i.e., the user seek()'d to this position
+            self._subscriptions.assignment[tp].drop_pending_message_set = False
+
for msg in self._next_partition_records.take():
# Because we are in a generator, it is possible for
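
The cleared flag matters for the seek() path: drop_pending_message_set is the signal that a buffered (possibly compressed) message set should be discarded, and it can safely be reset here because the generator is starting a fresh PartitionRecords whose fetch_offset already equals the consumer position. A minimal sketch of the user-facing scenario this protects, assuming a local broker at localhost:9092 and a topic named 'foo' (neither is part of this change):

from kafka import KafkaConsumer, TopicPartition

tp = TopicPartition('foo', 0)
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         enable_auto_commit=False)
consumer.assign([tp])

first = next(consumer)     # fetches and buffers a message set for tp
consumer.seek(tp, 42)      # the "prior signal": pending buffered messages should be dropped
record = next(consumer)    # iteration resumes from offset 42, not from the stale buffer
assert record.offset >= 42
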
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 3bf26db..5da597c 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -8,6 +8,7 @@ import itertools
import time
from kafka.client_async import KafkaClient
+from kafka.codec import gzip_encode
from kafka.consumer.fetcher import (
CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
)
@@ -16,11 +17,12 @@ from kafka.metrics import Metrics
from kafka.protocol.fetch import FetchRequest, FetchResponse
from kafka.protocol.message import Message
from kafka.protocol.offset import OffsetResponse
+from kafka.protocol.types import Int64, Int32
from kafka.structs import TopicPartition
from kafka.future import Future
from kafka.errors import (
StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
- UnknownTopicOrPartitionError
+ UnknownTopicOrPartitionError, OffsetOutOfRangeError
)
@@ -294,7 +296,7 @@ def test__handle_offset_response(fetcher, mocker):
def test_partition_records_offset():
- """Test that compressed messagesets are handle correctly
+ """Test that compressed messagesets are handled correctly
when fetch offset is in the middle of the message list
"""
batch_start = 120
@@ -317,6 +319,7 @@ def test_partition_records_offset():
def test_fetched_records(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
tp = TopicPartition(topic, 0)
msgs = []
for i in range(10):
@@ -327,7 +330,6 @@ def test_fetched_records(fetcher, topic, mocker):
mocker.MagicMock()
)
fetcher._completed_fetches.append(completed_fetch)
- fetcher.config['check_crcs'] = False
records, partial = fetcher.fetched_records()
assert tp in records
assert len(records[tp]) == len(msgs)
@@ -341,7 +343,7 @@ def test_fetched_records(fetcher, topic, mocker):
-1, 100, 100,
[('foo', [(0, 0, 1000),])]),
FetchResponse[0](
- [("foo", [(0, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),])]),]),
+ [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
1,
),
(
@@ -351,8 +353,8 @@ def test_fetched_records(fetcher, topic, mocker):
FetchResponse[1](
0,
[("foo", [
- (0, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),]),
- (1, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),]),
+ (0, 0, 1000, [(0, b'xxx'),]),
+ (1, 0, 1000, [(0, b'xxx'),]),
]),]),
2,
),
@@ -361,7 +363,7 @@ def test_fetched_records(fetcher, topic, mocker):
-1, 100, 100,
[('foo', [(0, 0, 1000),])]),
FetchResponse[2](
- 0, [("foo", [(0, 0, 1000, [(0, Message(b'abc', magic=1)._encode_self()),])]),]),
+ 0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
1,
),
(
@@ -395,17 +397,195 @@ def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_part
assert len(fetcher._completed_fetches) == num_partitions
-def test__unpack_message_set():
- pass
+def test__unpack_message_set(fetcher):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition('foo', 0)
+ messages = [
+ (0, None, Message(b'a')),
+ (1, None, Message(b'b')),
+ (2, None, Message(b'c'))
+ ]
+ records = list(fetcher._unpack_message_set(tp, messages))
+ assert len(records) == 3
+ assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
+ assert records[0].value == b'a'
+ assert records[1].value == b'b'
+ assert records[2].value == b'c'
+ assert records[0].offset == 0
+ assert records[1].offset == 1
+ assert records[2].offset == 2
+
+
+def test__unpack_message_set_compressed_v0(fetcher):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition('foo', 0)
+ messages = [
+ (0, None, Message(b'a')),
+ (1, None, Message(b'b')),
+ (2, None, Message(b'c')),
+ ]
+ message_bytes = []
+ for offset, _, m in messages:
+ encoded = m.encode()
+ message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
+ compressed_bytes = gzip_encode(b''.join(message_bytes))
+ compressed_base_offset = 0
+ compressed_msgs = [
+ (compressed_base_offset, None,
+ Message(compressed_bytes,
+ magic=0,
+ attributes=Message.CODEC_GZIP))
+ ]
+ records = list(fetcher._unpack_message_set(tp, compressed_msgs))
+ assert len(records) == 3
+ assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
+ assert records[0].value == b'a'
+ assert records[1].value == b'b'
+ assert records[2].value == b'c'
+ assert records[0].offset == 0
+ assert records[1].offset == 1
+ assert records[2].offset == 2
+
+
+def test__unpack_message_set_compressed_v1(fetcher):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition('foo', 0)
+ messages = [
+ (0, None, Message(b'a')),
+ (1, None, Message(b'b')),
+ (2, None, Message(b'c')),
+ ]
+ message_bytes = []
+ for offset, _, m in messages:
+ encoded = m.encode()
+ message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
+ compressed_bytes = gzip_encode(b''.join(message_bytes))
+ compressed_base_offset = 10
+ compressed_msgs = [
+ (compressed_base_offset, None,
+ Message(compressed_bytes,
+ magic=1,
+ attributes=Message.CODEC_GZIP))
+ ]
+ records = list(fetcher._unpack_message_set(tp, compressed_msgs))
+ assert len(records) == 3
+ assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
+ assert records[0].value == b'a'
+ assert records[1].value == b'b'
+ assert records[2].value == b'c'
+ assert records[0].offset == 8
+ assert records[1].offset == 9
+ assert records[2].offset == 10
+
+
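
The v1 test above leans on the message-format-v1 rule that a compressed wrapper message carries the absolute offset of its last inner message, while the inner messages store offsets relative to the start of the batch. A small worked sketch of that arithmetic (variable names here are illustrative, not from the source):

wrapper_offset = 10            # absolute offset of the last inner message
relative_offsets = [0, 1, 2]   # offsets stored on the inner messages

absolute = [wrapper_offset - relative_offsets[-1] + rel for rel in relative_offsets]
assert absolute == [8, 9, 10]  # the offsets asserted by test__unpack_message_set_compressed_v1
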
+def test__parse_record(fetcher):
+ tp = TopicPartition('foo', 0)
+ record = fetcher._parse_record(tp, 123, 456, Message(b'abc'))
+ assert record.topic == 'foo'
+ assert record.partition == 0
+ assert record.offset == 123
+ assert record.timestamp == 456
+ assert record.value == b'abc'
+ assert record.key is None
-def test__parse_record():
- pass
+def test__message_generator(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ msgs = []
+ for i in range(10):
+ msg = Message(b'foo')
+ msgs.append((i, -1, msg))
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [0, 100, msgs],
+ mocker.MagicMock()
+ )
+ fetcher._completed_fetches.append(completed_fetch)
+ for i in range(10):
+ msg = next(fetcher)
+ assert isinstance(msg, ConsumerRecord)
+ assert msg.offset == i
+ assert msg.value == b'foo'
-def test_message_generator():
- pass
+def test__parse_fetched_data(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ msgs = []
+ for i in range(10):
+ msg = Message(b'foo')
+ msgs.append((i, -1, msg))
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [0, 100, msgs],
+ mocker.MagicMock()
+ )
+ partition_record = fetcher._parse_fetched_data(completed_fetch)
+ assert isinstance(partition_record, fetcher.PartitionRecords)
+ assert len(partition_record) == 10
-def test__parse_fetched_data():
- pass
+def test__parse_fetched_data__paused(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ msgs = []
+ for i in range(10):
+ msg = Message(b'foo')
+ msgs.append((i, -1, msg))
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [0, 100, msgs],
+ mocker.MagicMock()
+ )
+ fetcher._subscriptions.pause(tp)
+ partition_record = fetcher._parse_fetched_data(completed_fetch)
+ assert partition_record is None
+
+
+def test__parse_fetched_data__stale_offset(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ msgs = []
+ for i in range(10):
+ msg = Message(b'foo')
+ msgs.append((i, -1, msg))
+ completed_fetch = CompletedFetch(
+ tp, 10, 0, [0, 100, msgs],
+ mocker.MagicMock()
+ )
+ partition_record = fetcher._parse_fetched_data(completed_fetch)
+ assert partition_record is None
+
+
+def test__parse_fetched_data__not_leader(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [NotLeaderForPartitionError.errno, -1, None],
+ mocker.MagicMock()
+ )
+ partition_record = fetcher._parse_fetched_data(completed_fetch)
+ assert partition_record is None
+ fetcher._client.cluster.request_update.assert_called_with()
+
+
+def test__parse_fetched_data__unknown_tp(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [UnknownTopicOrPartitionError.errno, -1, None],
+ mocker.MagicMock()
+ )
+ partition_record = fetcher._parse_fetched_data(completed_fetch)
+ assert partition_record is None
+ fetcher._client.cluster.request_update.assert_called_with()
+
+
+def test__parse_fetched_data__out_of_range(fetcher, topic, mocker):
+ fetcher.config['check_crcs'] = False
+ tp = TopicPartition(topic, 0)
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [OffsetOutOfRangeError.errno, -1, None],
+ mocker.MagicMock()
+ )
+ partition_record = fetcher._parse_fetched_data(completed_fetch)
+ assert partition_record is None
+ assert fetcher._subscriptions.assignment[tp].awaiting_reset is True
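
In the out-of-range case the test expects the fetcher to mark the partition as awaiting an offset reset rather than raising immediately; the consumer's auto_offset_reset policy then determines where the position is re-resolved. A minimal usage sketch, assuming a local broker at localhost:9092 and a topic named 'foo':

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'foo',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',   # fall back to the oldest retained offset after OffsetOutOfRangeError
)
for record in consumer:
    print(record.offset, record.value)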