summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTaras <voyn1991@gmail.com>2017-10-11 20:02:18 +0300
committerTaras <voyn1991@gmail.com>2017-10-12 11:10:44 +0300
commite992fbfad926486766ff7b63a499f9cf29984fec (patch)
tree5d0ede2b60c84d06cecc9e9c2a1ee914d64f4bef
parent0557983b2ae05adc2f1076d5e670d693c8327ab9 (diff)
downloadkafka-python-e992fbfad926486766ff7b63a499f9cf29984fec.tar.gz
Fix tests and rebase problems
-rw-r--r--kafka/consumer/fetcher.py3
-rw-r--r--test/test_fetcher.py122
2 files changed, 28 insertions, 97 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 493c1ff..dd90c2e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -728,7 +728,6 @@ class Fetcher(six.Iterator):
def _parse_fetched_data(self, completed_fetch):
tp = completed_fetch.topic_partition
- partition = completed_fetch.partition_data
fetch_offset = completed_fetch.fetched_offset
num_bytes = 0
records_count = 0
@@ -736,7 +735,6 @@ class Fetcher(six.Iterator):
error_code, highwater = completed_fetch.partition_data[:2]
error_type = Errors.for_code(error_code)
- records = MemoryRecords(partition_data[-1])
try:
if not self._subscriptions.is_fetchable(tp):
@@ -760,6 +758,7 @@ class Fetcher(six.Iterator):
position)
return None
+ records = MemoryRecords(completed_fetch.partition_data[-1])
if records.has_next():
log.debug("Adding fetched record for partition %s with"
" offset %d to buffered record list", tp,
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 5da597c..364a808 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -8,22 +8,20 @@ import itertools
import time
from kafka.client_async import KafkaClient
-from kafka.codec import gzip_encode
from kafka.consumer.fetcher import (
CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
)
from kafka.consumer.subscription_state import SubscriptionState
from kafka.metrics import Metrics
from kafka.protocol.fetch import FetchRequest, FetchResponse
-from kafka.protocol.message import Message
from kafka.protocol.offset import OffsetResponse
-from kafka.protocol.types import Int64, Int32
from kafka.structs import TopicPartition
from kafka.future import Future
from kafka.errors import (
StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
UnknownTopicOrPartitionError, OffsetOutOfRangeError
)
+from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords
@pytest.fixture
@@ -51,6 +49,16 @@ def fetcher(client, subscription_state, topic):
return Fetcher(client, subscription_state, Metrics())
+def _build_record_batch(msgs, compression=0):
+ builder = MemoryRecordsBuilder(
+        magic=1, compression_type=compression, batch_size=9999999)
+ for msg in msgs:
+ key, value, timestamp = msg
+ builder.append(key=key, value=value, timestamp=timestamp)
+ builder.close()
+ return builder.buffer()
+
+
def test_send_fetches(fetcher, topic, mocker):
fetch_requests = [
FetchRequest[0](
@@ -321,12 +329,12 @@ def test_partition_records_offset():
def test_fetched_records(fetcher, topic, mocker):
fetcher.config['check_crcs'] = False
tp = TopicPartition(topic, 0)
+
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 0, 0, [0, 100, msgs],
+ tp, 0, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
fetcher._completed_fetches.append(completed_fetch)
@@ -401,11 +409,12 @@ def test__unpack_message_set(fetcher):
fetcher.config['check_crcs'] = False
tp = TopicPartition('foo', 0)
messages = [
- (0, None, Message(b'a')),
- (1, None, Message(b'b')),
- (2, None, Message(b'c'))
+ (None, b"a", None),
+ (None, b"b", None),
+ (None, b"c", None),
]
- records = list(fetcher._unpack_message_set(tp, messages))
+ memory_records = MemoryRecords(_build_record_batch(messages))
+ records = list(fetcher._unpack_message_set(tp, memory_records))
assert len(records) == 3
assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
assert records[0].value == b'a'
@@ -416,88 +425,14 @@ def test__unpack_message_set(fetcher):
assert records[2].offset == 2
-def test__unpack_message_set_compressed_v0(fetcher):
- fetcher.config['check_crcs'] = False
- tp = TopicPartition('foo', 0)
- messages = [
- (0, None, Message(b'a')),
- (1, None, Message(b'b')),
- (2, None, Message(b'c')),
- ]
- message_bytes = []
- for offset, _, m in messages:
- encoded = m.encode()
- message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
- compressed_bytes = gzip_encode(b''.join(message_bytes))
- compressed_base_offset = 0
- compressed_msgs = [
- (compressed_base_offset, None,
- Message(compressed_bytes,
- magic=0,
- attributes=Message.CODEC_GZIP))
- ]
- records = list(fetcher._unpack_message_set(tp, compressed_msgs))
- assert len(records) == 3
- assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
- assert records[0].value == b'a'
- assert records[1].value == b'b'
- assert records[2].value == b'c'
- assert records[0].offset == 0
- assert records[1].offset == 1
- assert records[2].offset == 2
-
-
-def test__unpack_message_set_compressed_v1(fetcher):
- fetcher.config['check_crcs'] = False
- tp = TopicPartition('foo', 0)
- messages = [
- (0, None, Message(b'a')),
- (1, None, Message(b'b')),
- (2, None, Message(b'c')),
- ]
- message_bytes = []
- for offset, _, m in messages:
- encoded = m.encode()
- message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
- compressed_bytes = gzip_encode(b''.join(message_bytes))
- compressed_base_offset = 10
- compressed_msgs = [
- (compressed_base_offset, None,
- Message(compressed_bytes,
- magic=1,
- attributes=Message.CODEC_GZIP))
- ]
- records = list(fetcher._unpack_message_set(tp, compressed_msgs))
- assert len(records) == 3
- assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
- assert records[0].value == b'a'
- assert records[1].value == b'b'
- assert records[2].value == b'c'
- assert records[0].offset == 8
- assert records[1].offset == 9
- assert records[2].offset == 10
-
-
-def test__parse_record(fetcher):
- tp = TopicPartition('foo', 0)
- record = fetcher._parse_record(tp, 123, 456, Message(b'abc'))
- assert record.topic == 'foo'
- assert record.partition == 0
- assert record.offset == 123
- assert record.timestamp == 456
- assert record.value == b'abc'
- assert record.key is None
-
-
def test__message_generator(fetcher, topic, mocker):
fetcher.config['check_crcs'] = False
tp = TopicPartition(topic, 0)
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 0, 0, [0, 100, msgs],
+ tp, 0, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
fetcher._completed_fetches.append(completed_fetch)
@@ -513,10 +448,9 @@ def test__parse_fetched_data(fetcher, topic, mocker):
tp = TopicPartition(topic, 0)
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 0, 0, [0, 100, msgs],
+ tp, 0, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
partition_record = fetcher._parse_fetched_data(completed_fetch)
@@ -529,10 +463,9 @@ def test__parse_fetched_data__paused(fetcher, topic, mocker):
tp = TopicPartition(topic, 0)
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 0, 0, [0, 100, msgs],
+ tp, 0, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
fetcher._subscriptions.pause(tp)
@@ -545,10 +478,9 @@ def test__parse_fetched_data__stale_offset(fetcher, topic, mocker):
tp = TopicPartition(topic, 0)
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 10, 0, [0, 100, msgs],
+ tp, 10, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
partition_record = fetcher._parse_fetched_data(completed_fetch)