summaryrefslogtreecommitdiff
path: root/test/test_sasl_integration.py
blob: e3a4813aec33d53f4807eec4435b9d6a2d9da8e9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import logging
import uuid

import pytest

from kafka.admin import NewTopic
from kafka.protocol.metadata import MetadataRequest_v1
from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore


@pytest.fixture(
    params=[
        pytest.param(
            "PLAIN", marks=pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Requires KAFKA_VERSION >= 0.10")
        ),
        pytest.param(
            "SCRAM-SHA-256",
            marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"),
        ),
        pytest.param(
            "SCRAM-SHA-512",
            marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"),
        ),
    ]
)
def sasl_kafka(request, kafka_broker_factory):
    sasl_kafka = kafka_broker_factory(transport="SASL_PLAINTEXT", sasl_mechanism=request.param)[0]
    yield sasl_kafka
    sasl_kafka.child.dump_logs()


def test_admin(request, sasl_kafka):
    topic_name = special_to_underscore(request.node.name + random_string(4))
    admin, = sasl_kafka.get_admin_clients(1)
    admin.create_topics([NewTopic(topic_name, 1, 1)])
    assert topic_name in sasl_kafka.get_topic_names()


def test_produce_and_consume(request, sasl_kafka):
    topic_name = special_to_underscore(request.node.name + random_string(4))
    sasl_kafka.create_topics([topic_name], num_partitions=2)
    producer, = sasl_kafka.get_producers(1)

    messages_and_futures = []  # [(message, produce_future),]
    for i in range(100):
        encoded_msg = "{}-{}-{}".format(i, request.node.name, uuid.uuid4()).encode("utf-8")
        future = producer.send(topic_name, value=encoded_msg, partition=i % 2)
        messages_and_futures.append((encoded_msg, future))
    producer.flush()

    for (msg, f) in messages_and_futures:
        assert f.succeeded()

    consumer, = sasl_kafka.get_consumers(1, [topic_name])
    messages = {0: [], 1: []}
    for i, message in enumerate(consumer, 1):
        logging.debug("Consumed message %s", repr(message))
        messages[message.partition].append(message)
        if i >= 100:
            break

    assert_message_count(messages[0], 50)
    assert_message_count(messages[1], 50)


def test_client(request, sasl_kafka):
    topic_name = special_to_underscore(request.node.name + random_string(4))
    sasl_kafka.create_topics([topic_name], num_partitions=1)

    client, = sasl_kafka.get_clients(1)
    request = MetadataRequest_v1(None)
    client.send(0, request)
    for _ in range(10):
        result = client.poll(timeout_ms=10000)
        if len(result) > 0:
            break
    else:
        raise RuntimeError("Couldn't fetch topic response from Broker.")
    result = result[0]
    assert topic_name in [t[1] for t in result.topics]