summaryrefslogtreecommitdiff
path: root/test/test_consumer_group.py
blob: 33676179d0a03796474619bc58e9dcd619a2bfd2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import collections
import logging
import threading
import time

import pytest
from kafka.vendor import six

from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
from kafka.coordinator.base import MemberState
from kafka.structs import TopicPartition

from test.testutil import env_kafka_version, random_string


def get_connect_str(kafka_broker):
    """Return the broker fixture's address as a ``host:port`` string."""
    host_and_port = (kafka_broker.host, str(kafka_broker.port))
    return ':'.join(host_and_port)


@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_consumer(kafka_broker, topic):
    """Check that a polled consumer holds at least one connected broker conn.

    The `topic` fixture is included because 0.8.2 brokers need a topic to
    function well.
    """
    consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
    try:
        # Necessary to drive the IO
        consumer.poll(500)
        assert len(consumer._client._conns) > 0
        node_id = list(consumer._client._conns.keys())[0]
        assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED
    finally:
        # Close even when an assertion fails so the consumer is not leaked
        # into the tests that run afterwards.
        consumer.close()

@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_consumer_topics(kafka_broker, topic):
    """Check that `topic` is listed by topics() and has at least one partition."""
    consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
    try:
        # Necessary to drive the IO
        consumer.poll(500)
        assert topic in consumer.topics()
        assert len(consumer.partitions_for_topic(topic)) > 0
    finally:
        # Close even when an assertion fails so the consumer is not leaked.
        consumer.close()

@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
def test_group(kafka_broker, topic):
    """Run several consumers in one group and verify the partition assignment.

    Starts ``num_consumers`` consumer threads that share a random group id and
    waits (up to 35 seconds) until every consumer has an assignment, all of
    them report the same generation id and none is rejoining.  It then asserts
    that the assignments are non-empty, mutually disjoint and together equal
    partitions ``0 .. num_partitions - 1`` of `topic`.
    """
    num_partitions = 4
    connect_str = get_connect_str(kafka_broker)
    consumers = {}
    stop = {}
    threads = {}
    # consumer index -> TopicPartition -> list of records
    messages = collections.defaultdict(lambda: collections.defaultdict(list))
    group_id = 'test-group-' + random_string(6)

    def consumer_thread(i):
        """Poll with consumer `i` until its stop event is set, then close it."""
        assert i not in consumers
        consumers[i] = KafkaConsumer(topic,
                                     bootstrap_servers=connect_str,
                                     group_id=group_id,
                                     heartbeat_interval_ms=500)
        try:
            while not stop[i].is_set():
                # Iterate (partition, records) pairs; iterating only the
                # values cannot be unpacked into a partition and its records.
                for tp, records in six.iteritems(consumers[i].poll(100)):
                    messages[i][tp].extend(records)
        finally:
            # Close the consumer even if poll() raised.
            consumers[i].close()
        consumers[i] = None

    num_consumers = 4
    for i in range(num_consumers):
        # Create the stop event before the thread starts so the cleanup in
        # the `finally` clause below can always signal it.
        stop[i] = threading.Event()
        t = threading.Thread(target=consumer_thread, args=(i,))
        t.start()
        threads[i] = t

    try:
        timeout = time.time() + 35
        while True:
            # Verify all consumers have been created and have an assignment
            all_assigned = all(c in consumers and consumers[c].assignment()
                               for c in range(num_consumers))

            if not all_assigned:
                # Back off briefly instead of spinning while the consumer
                # threads are still starting up / joining the group.
                time.sleep(0.1)

            # If all consumers exist and have an assignment
            else:

                logging.info('All consumers have assignment... checking for stable group')
                # Verify all consumers are in the same generation
                # then log state and break while loop
                generations = set([consumer._coordinator._generation.generation_id
                                   for consumer in list(consumers.values())])

                # New generation assignment is not complete until
                # coordinator.rejoining = False
                rejoining = any([consumer._coordinator.rejoining
                                 for consumer in list(consumers.values())])

                if not rejoining and len(generations) == 1:
                    for c, consumer in list(consumers.items()):
                        logging.info("[%s] %s %s: %s", c,
                                     consumer._coordinator._generation.generation_id,
                                     consumer._coordinator._generation.member_id,
                                     consumer.assignment())
                    break
                else:
                    logging.info('Rejoining: %s, generations: %s', rejoining, generations)
                    time.sleep(1)
            assert time.time() < timeout, "timeout waiting for assignments"

        logging.info('Group stabilized; verifying assignment')
        group_assignment = set()
        for c in range(num_consumers):
            assert len(consumers[c].assignment()) != 0
            assert set.isdisjoint(consumers[c].assignment(), group_assignment)
            group_assignment.update(consumers[c].assignment())

        assert group_assignment == set([
            TopicPartition(topic, partition)
            for partition in range(num_partitions)])
        logging.info('Assignment looks good!')

    finally:
        logging.info('Shutting down %s consumers', num_consumers)
        for c in range(num_consumers):
            logging.info('Stopping consumer %s', c)
            stop[c].set()
            threads[c].join()
            threads[c] = None


@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_paused(kafka_broker, topic):
    """Check that paused() tracks pause(), resume() and unsubscribe()."""
    consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
    try:
        topics = [TopicPartition(topic, 1)]
        consumer.assign(topics)
        assert set(topics) == consumer.assignment()
        assert set() == consumer.paused()

        consumer.pause(topics[0])
        assert set([topics[0]]) == consumer.paused()

        consumer.resume(topics[0])
        assert set() == consumer.paused()

        consumer.unsubscribe()
        assert set() == consumer.paused()
    finally:
        # Close even when an assertion fails so the consumer is not leaked.
        consumer.close()


@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
def test_heartbeat_thread(kafka_broker, topic):
    """Check that a heartbeat is sent while the test itself is not polling.

    After the consumer has an assignment, the test only sleeps until the
    heartbeat's ``last_send`` advances, asserts that ``last_poll`` did not
    change in the meantime, and then asserts that an explicit poll() advances
    ``last_poll``.

    Raises:
        RuntimeError: if no assignment or no new heartbeat is observed
            within 30 seconds.
    """
    group_id = 'test-group-' + random_string(6)
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=get_connect_str(kafka_broker),
                             group_id=group_id,
                             heartbeat_interval_ms=500)
    try:
        # poll until we have joined group / have assignment; bounded so a
        # broker problem fails the test instead of hanging the whole run
        timeout = time.time() + 30
        while not consumer.assignment():
            if time.time() > timeout:
                raise RuntimeError('timeout waiting for assignment')
            consumer.poll(timeout_ms=100)

        assert consumer._coordinator.state is MemberState.STABLE
        last_poll = consumer._coordinator.heartbeat.last_poll
        last_beat = consumer._coordinator.heartbeat.last_send

        timeout = time.time() + 30
        while True:
            if time.time() > timeout:
                raise RuntimeError('timeout waiting for heartbeat')
            if consumer._coordinator.heartbeat.last_send > last_beat:
                break
            time.sleep(0.5)

        # No poll() was issued while waiting, so last_poll must be unchanged
        assert consumer._coordinator.heartbeat.last_poll == last_poll
        consumer.poll(timeout_ms=100)
        assert consumer._coordinator.heartbeat.last_poll > last_poll
    finally:
        # Close even on failure so the consumer is not leaked.
        consumer.close()