import gc
import platform
import time
import threading

import pytest

from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from kafka.producer.buffer import SimpleBufferPool
from test.conftest import version
from test.fixtures import random_string


def test_buffer_pool():
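    """A buffer re-allocated from SimpleBufferPool should read back empty,
    even after the previously allocated buffer was written to and deallocated."""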
    pool = SimpleBufferPool(1000, 1000)

    buf1 = pool.allocate(1000, 1000)
    message = ''.join(map(str, range(100)))
    buf1.write(message.encode('utf-8'))
    pool.deallocate(buf1)

    buf2 = pool.allocate(1000, 1000)
    assert buf2.read() == b''


@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
def test_end_to_end(kafka_broker, compression):
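    """Round-trip 100 messages through a live broker for each compression
    codec and assert the consumer reads back exactly the same set."""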
    if compression == 'lz4':
        # LZ4 compression requires broker version 0.8.2 or later
        if version() < (0, 8, 2):
            return
        # python-lz4 crashes on older versions of PyPy
        elif platform.python_implementation() == 'PyPy':
            return

    connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
    producer = KafkaProducer(bootstrap_servers=connect_str,
                             retries=5,
                             max_block_ms=30000,
                             compression_type=compression,
                             value_serializer=str.encode)
    consumer = KafkaConsumer(bootstrap_servers=connect_str,
                             group_id=None,
                             consumer_timeout_ms=30000,
                             auto_offset_reset='earliest',
                             value_deserializer=bytes.decode)

    topic = random_string(5)

    messages = 100
    futures = []
    for i in range(messages):
        futures.append(producer.send(topic, 'msg %d' % i))
    ret = [f.get(timeout=30) for f in futures]
    assert len(ret) == messages
    producer.close()

    consumer.subscribe([topic])
    msgs = set()
    for i in range(messages):
        try:
            msgs.add(next(consumer).value)
        except StopIteration:
            break

    assert msgs == set(['msg %d' % (i,) for i in range(messages)])
    consumer.close()


@pytest.mark.skipif(platform.python_implementation() != 'CPython',
                    reason='Test relies on CPython-specific gc policies')
def test_kafka_producer_gc_cleanup():
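    """Creating a KafkaProducer starts one background thread; once the
    producer is deleted and garbage collected, the thread count should
    drop back to its previous value."""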
    gc.collect()
    threads = threading.active_count()
    producer = KafkaProducer(api_version='0.9')  # set api_version explicitly to avoid auto-detection
    assert threading.active_count() == threads + 1
    del producer
    gc.collect()
    assert threading.active_count() == threads


@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
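    """The RecordMetadata returned from send() should report the correct
    topic, partition, offset, timestamp, checksum, and serialized sizes
    for each message format version (magic 0, 1, and 2)."""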
    connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
    producer = KafkaProducer(bootstrap_servers=connect_str,
                             retries=5,
                             max_block_ms=30000,
                             compression_type=compression)
    magic = producer._max_usable_produce_magic()

    # record headers are supported from broker 0.11.0 onwards
    if version() < (0, 11, 0):
        headers = None
    else:
        headers = [("Header Key", b"Header Value")]

    topic = random_string(5)
    future = producer.send(
        topic,
        value=b"Simple value", key=b"Simple key", headers=headers, timestamp_ms=9999999,
        partition=0)
    record = future.get(timeout=5)
    assert record is not None
    assert record.topic == topic
    assert record.partition == 0
    assert record.topic_partition == TopicPartition(topic, 0)
    assert record.offset == 0
    if magic >= 1:
        assert record.timestamp == 9999999
    else:
        assert record.timestamp == -1  # NO_TIMESTAMP

    if magic >= 2:
        assert record.checksum is None
    elif magic == 1:
        assert record.checksum == 1370034956
    else:
        assert record.checksum == 3296137851

    assert record.serialized_key_size == 10
    assert record.serialized_value_size == 12
    if headers:
        assert record.serialized_header_size == 22

    # generated timestamp case is skipped for broker 0.9 and below
    if magic == 0:
        return

    send_time = time.time() * 1000
    future = producer.send(
        topic,
        value=b"Simple value", key=b"Simple key", timestamp_ms=None,
        partition=0)
    record = future.get(timeout=5)
    assert abs(record.timestamp - send_time) <= 1000  # Allow 1s deviation