summaryrefslogtreecommitdiff
path: root/test/test_producer.py
blob: 263df11452f9c436eb961e7090583ed81a99decc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import pytest

from kafka import KafkaConsumer, KafkaProducer
from test.conftest import version
from test.testutil import random_string


@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
def test_end_to_end(kafka_broker):
    """Round-trip 1000 messages through a broker and check none are lost.

    A producer writes 'msg 0' .. 'msg 999' to a randomly named topic, then a
    fresh group-less consumer reads the topic from the earliest offset. The
    test passes when the set of consumed values equals the set produced.

    Both clients are closed in ``finally`` blocks so that a failure while
    producing or consuming does not leak connections into later tests.

    :param kafka_broker: fixture object exposing the broker's ``port``.
    """
    message_count = 1000
    connect_str = 'localhost:' + str(kafka_broker.port)
    topic = random_string(5)
    expected = set('msg %d' % i for i in range(message_count))

    producer = KafkaProducer(bootstrap_servers=connect_str,
                             max_block_ms=10000,
                             value_serializer=str.encode)
    try:
        for i in range(message_count):
            producer.send(topic, 'msg %d' % i)
        # send() is asynchronous; flush so everything is on the broker
        # before the consumer starts reading.
        producer.flush()
    finally:
        producer.close()

    consumer = KafkaConsumer(bootstrap_servers=connect_str,
                             group_id=None,
                             consumer_timeout_ms=10000,
                             auto_offset_reset='earliest',
                             value_deserializer=bytes.decode)
    try:
        consumer.subscribe([topic])
        msgs = set()
        for _ in range(message_count):
            try:
                msgs.add(next(consumer).value)
            except StopIteration:
                # Iteration ended early (presumably consumer_timeout_ms
                # expired with nothing left to read); stop and let the
                # assertion below report whatever is missing.
                break
    finally:
        consumer.close()

    assert msgs == expected