import gc
import platform
import time
import threading

import pytest

from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from kafka.producer.buffer import SimpleBufferPool
from test.testutil import env_kafka_version, random_string


def test_buffer_pool():
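    # After deallocate(), the pool should hand the buffer back rewound and
    # truncated, so nothing written earlier leaks into the next allocation.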
    pool = SimpleBufferPool(1000, 1000)

    buf1 = pool.allocate(1000, 1000)
    message = ''.join(map(str, range(100)))
    buf1.write(message.encode('utf-8'))
    pool.deallocate(buf1)

    buf2 = pool.allocate(1000, 1000)
    assert buf2.read() == b''


@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_end_to_end(kafka_broker, compression):
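    # Round-trip test: produce 100 messages with the given compression codec,
    # then consume the topic from the beginning and verify that every message
    # arrives exactly once.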
    if compression == 'lz4':
        if env_kafka_version() < (0, 8, 2):
            pytest.skip('LZ4 requires kafka 0.8.2 or newer')
        elif platform.python_implementation() == 'PyPy':
            pytest.skip('python-lz4 crashes on older versions of PyPy')

    if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
        pytest.skip('zstd requires kafka 2.1.0 or newer')

    connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
    producer = KafkaProducer(bootstrap_servers=connect_str,
                             retries=5,
                             max_block_ms=30000,
                             compression_type=compression,
                             value_serializer=str.encode)
    consumer = KafkaConsumer(bootstrap_servers=connect_str,
                             group_id=None,
                             consumer_timeout_ms=30000,
                             auto_offset_reset='earliest',
                             value_deserializer=bytes.decode)

    topic = random_string(5)

    messages = 100
    futures = []
    for i in range(messages):
        futures.append(producer.send(topic, 'msg %d' % i))
    ret = [f.get(timeout=30) for f in futures]
    assert len(ret) == messages
    producer.close()

    consumer.subscribe([topic])
    msgs = set()
    for _ in range(messages):
        try:
            msgs.add(next(consumer).value)
        except StopIteration:
            break

    assert msgs == {'msg %d' % i for i in range(messages)}
    consumer.close()


@pytest.mark.skipif(platform.python_implementation() != 'CPython',
                    reason='Test relies on CPython-specific gc policies')
def test_kafka_producer_gc_cleanup():
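    # KafkaProducer spawns a background sender thread; dropping the last
    # reference should let gc close the producer and stop that thread.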
    gc.collect()
    threads = threading.active_count()
    producer = KafkaProducer(api_version='0.9')  # set api_version explicitly to avoid auto-detection
    assert threading.active_count() == threads + 1
    del producer
    gc.collect()
    assert threading.active_count() == threads


@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
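    # Exercise every RecordMetadata field returned by a successful send().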
    if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
        pytest.skip('zstd requires kafka 2.1.0 or newer')
    connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
    producer = KafkaProducer(bootstrap_servers=connect_str,
                             retries=5,
                             max_block_ms=30000,
                             compression_type=compression)
    magic = producer._max_usable_produce_magic()
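    # "magic" is the message format version negotiated with the broker:
    # v0 (0.9 and below), v1 (0.10.x, adds timestamps), v2 (0.11+, adds headers)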

    # record headers are supported in 0.11.0
    if env_kafka_version() < (0, 11, 0):
        headers = None
    else:
        headers = [("Header Key", b"Header Value")]

    topic = random_string(5)
    future = producer.send(
        topic,
        value=b"Simple value", key=b"Simple key", headers=headers, timestamp_ms=9999999,
        partition=0)
    record = future.get(timeout=5)
    assert record is not None
    assert record.topic == topic
    assert record.partition == 0
    assert record.topic_partition == TopicPartition(topic, 0)
    assert record.offset == 0
    if magic >= 1:
        assert record.timestamp == 9999999
    else:
        assert record.timestamp == -1  # NO_TIMESTAMP

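    # Message format v2 replaced per-record CRCs with a single batch-level CRC,
    # so record-level checksum metadata is None; v0/v1 expose the record CRC.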
    if magic >= 2:
        assert record.checksum is None
    elif magic == 1:
        assert record.checksum == 1370034956
    else:
        assert record.checksum == 3296137851

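    # b"Simple key" is 10 bytes and b"Simple value" is 12; the single header
    # serializes to len("Header Key") + len(b"Header Value") == 22 bytes.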
    assert record.serialized_key_size == 10
    assert record.serialized_value_size == 12
    if headers:
        assert record.serialized_header_size == 22

    if magic == 0:
        pytest.skip('message format v0 (broker 0.9 and below) does not support timestamps')
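
    # With timestamp_ms=None the producer stamps the record at send time,
    # so the broker-confirmed timestamp should land near the local clock.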
    send_time = time.time() * 1000
    future = producer.send(
        topic,
        value=b"Simple value", key=b"Simple key", timestamp_ms=None,
        partition=0)
    record = future.get(timeout=5)
    assert abs(record.timestamp - send_time) <= 1000  # Allow 1s deviation