summaryrefslogtreecommitdiff
path: root/test/test_producer.py
blob: f11bb05968020438e2b031b0a0bac06e4e8aad06 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import sys

import pytest

from kafka import KafkaConsumer, KafkaProducer
from kafka.producer.buffer import SimpleBufferPool
from test.conftest import version
from test.testutil import random_string


def test_buffer_pool():
    pool = SimpleBufferPool(1000, 1000)

    buf1 = pool.allocate(1000, 1000)
    message = ''.join(map(str, range(100)))
    buf1.write(message.encode('utf-8'))
    pool.deallocate(buf1)

    buf2 = pool.allocate(1000, 1000)
    assert buf2.read() == b''


@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
def test_end_to_end(kafka_broker, compression):

    if compression == 'lz4':
        # LZ4 requires 0.8.2
        if version() < (0, 8, 2):
            return
        # LZ4 python libs dont work on python2.6
        elif sys.version_info < (2, 7):
            return

    connect_str = 'localhost:' + str(kafka_broker.port)
    producer = KafkaProducer(bootstrap_servers=connect_str,
                             retries=5,
                             max_block_ms=10000,
                             compression_type=compression,
                             value_serializer=str.encode)
    consumer = KafkaConsumer(bootstrap_servers=connect_str,
                             group_id=None,
                             consumer_timeout_ms=10000,
                             auto_offset_reset='earliest',
                             value_deserializer=bytes.decode)

    topic = random_string(5)

    messages = 100
    futures = []
    for i in range(messages):
        futures.append(producer.send(topic, 'msg %d' % i))
    ret = [f.get(timeout=30) for f in futures]
    assert len(ret) == messages

    producer.close()

    consumer.subscribe([topic])
    msgs = set()
    for i in range(messages):
        try:
            msgs.add(next(consumer).value)
        except StopIteration:
            break

    assert msgs == set(['msg %d' % i for i in range(messages)])