summaryrefslogtreecommitdiff
path: root/test/test_failover_integration.py
blob: 58e9463a834f1ea038c672064e015fe4e4cc2c32 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import logging
import os
import time

from kafka import SimpleClient, SimpleConsumer, KeyedProducer
from kafka.errors import (
    FailedPayloadsError, ConnectionError, RequestTimedOutError)
from kafka.producer.base import Producer
from kafka.structs import TopicPartition

from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, random_string


# Module-level logger named after this module; the tests below use it for
# debug traces and for logging exceptions hit while retrying sends.
log = logging.getLogger(__name__)


class TestFailover(KafkaIntegrationTestCase):
    """Integration tests for client behaviour when a partition leader dies.

    ``setUp`` starts a ZooKeeper fixture plus three Kafka broker fixtures
    (3 replicas, 3 partitions).  Each test then closes the broker that
    currently leads a partition and checks that the producer or consumer
    under test keeps working afterwards.
    """

    # NOTE(review): presumably tells KafkaIntegrationTestCase.setUp not to
    # build its own client, since setUp below assigns self.client -- confirm
    # against the base class.
    create_client = False

    @staticmethod
    def _async_kwargs(enabled):
        """Return the producer's ``async`` option as a kwargs dict.

        ``async`` is a reserved word from Python 3.7 onwards, so writing
        ``Producer(..., async=False)`` is a SyntaxError there.  Unpacking a
        dict passes the very same keyword without spelling it as an
        identifier, so this module parses on every Python version.

        NOTE(review): newer kafka-python releases name this option
        ``async_send``; confirm which spelling the installed version takes.
        """
        return {'async': enabled}

    def setUp(self):
        """Start the fixtures and connect ``self.client`` to all brokers.

        The test is skipped when the ``KAFKA_VERSION`` environment variable
        is unset or empty.
        """
        if not os.environ.get('KAFKA_VERSION'):
            self.skipTest('integration test requires KAFKA_VERSION')

        zk_chroot = random_string(10)
        replicas = 3
        partitions = 3

        # mini zookeeper, 3 kafka brokers
        self.zk = ZookeeperFixture.instance()
        kk_args = [self.zk.host, self.zk.port]
        kk_kwargs = {'zk_chroot': zk_chroot, 'replicas': replicas,
                     'partitions': partitions}
        self.brokers = [KafkaFixture.instance(i, *kk_args, **kk_kwargs)
                        for i in range(replicas)]

        hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
        self.client = SimpleClient(hosts, timeout=2)
        super(TestFailover, self).setUp()

    def tearDown(self):
        """Close the client, every broker fixture and the ZooKeeper fixture."""
        super(TestFailover, self).tearDown()
        if not os.environ.get('KAFKA_VERSION'):
            return

        self.client.close()
        for broker in self.brokers:
            broker.close()
        self.zk.close()

    def test_switch_leader(self):
        """A synchronous producer resumes sending after the leader is killed."""
        topic = self.topic
        partition = 0

        # Testing the base Producer class here so that we can easily send
        # messages to a specific partition, kill the leader for that partition
        # and check that after another broker takes leadership the producer
        # is able to resume sending messages

        # require that the server commit messages to all in-sync replicas
        # so that failover doesn't lose any messages on server-side
        # and we can assert that server-side message count equals client-side
        producer = Producer(self.client,
                            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
                            **self._async_kwargs(False))

        # Send 100 random messages to a specific partition
        self._send_random_messages(producer, topic, partition, 100)

        # kill leader for partition
        self._kill_leader(topic, partition)

        # expect failure, but don't wait more than 60 secs to recover
        recovered = False
        started = time.time()
        timeout = 60
        while not recovered and (time.time() - started) < timeout:
            try:
                log.debug("attempting to send 'success' message after leader killed")
                producer.send_messages(topic, partition, b'success')
                log.debug("success!")
                recovered = True
            except (FailedPayloadsError, ConnectionError, RequestTimedOutError):
                log.debug("caught exception sending message -- will retry")

        # Verify we successfully sent the message
        self.assertTrue(recovered)

        # send some more messages to new leader
        self._send_random_messages(producer, topic, partition, 100)

        # count number of messages
        # Should be equal to 100 before + 1 recovery + 100 after
        # at_least=True because exactly once delivery isn't really a thing
        self.assert_message_count(topic, 201, partitions=(partition,),
                                  at_least=True)

    def test_switch_leader_async(self):
        """An async producer delivers its queued messages across a failover."""
        topic = self.topic
        partition = 0

        # Test the base class Producer -- send_messages to a specific partition
        producer = Producer(self.client,
                            batch_send_every_n=15,
                            batch_send_every_t=3,
                            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
                            async_log_messages_on_error=False,
                            **self._async_kwargs(True))

        # Send 10 random messages
        self._send_random_messages(producer, topic, partition, 10)
        self._send_random_messages(producer, topic, partition + 1, 10)

        # kill leader for partition
        self._kill_leader(topic, partition)

        log.debug("attempting to send 'success' message after leader killed")

        # in async mode, this should return immediately
        producer.send_messages(topic, partition, b'success')
        producer.send_messages(topic, partition + 1, b'success')

        # send to new leader
        self._send_random_messages(producer, topic, partition, 10)
        self._send_random_messages(producer, topic, partition + 1, 10)

        # Stop the producer and wait for it to shutdown
        producer.stop()
        started = time.time()
        timeout = 60
        while (time.time() - started) < timeout:
            if not producer.thread.is_alive():
                break
            time.sleep(0.1)
        else:
            # Only reached when the loop condition expired, i.e. the
            # producer thread was still alive after `timeout` seconds.
            self.fail('timeout waiting for producer queue to empty')

        # count number of messages
        # Should be equal to 10 before + 1 recovery + 10 after
        # at_least=True because exactly once delivery isn't really a thing
        self.assert_message_count(topic, 21, partitions=(partition,),
                                  at_least=True)
        self.assert_message_count(topic, 21, partitions=(partition + 1,),
                                  at_least=True)

    def test_switch_leader_keyed_producer(self):
        """A keyed producer can reach partition 0 again after its leader dies."""
        topic = self.topic

        producer = KeyedProducer(self.client, **self._async_kwargs(False))

        # Send 10 random messages
        for _ in range(10):
            key = random_string(3).encode('utf-8')
            msg = random_string(10).encode('utf-8')
            producer.send_messages(topic, key, msg)

        # kill leader for partition 0
        self._kill_leader(topic, 0)

        recovered = False
        started = time.time()
        timeout = 60
        while not recovered and (time.time() - started) < timeout:
            try:
                key = random_string(3).encode('utf-8')
                msg = random_string(10).encode('utf-8')
                producer.send_messages(topic, key, msg)
                # Only a successful send whose key maps to partition 0 (the
                # one whose leader was killed) counts as recovery.
                if producer.partitioners[topic].partition(key) == 0:
                    recovered = True
            except (FailedPayloadsError, ConnectionError, RequestTimedOutError):
                # Same retryable errors as test_switch_leader.
                log.debug("caught exception sending message -- will retry")

        # Verify we successfully sent the message
        self.assertTrue(recovered)

        # send some more messages just to make sure no more exceptions
        for _ in range(10):
            key = random_string(3).encode('utf-8')
            msg = random_string(10).encode('utf-8')
            producer.send_messages(topic, key, msg)

    def test_switch_leader_simple_consumer(self):
        """A SimpleConsumer fetch does not raise after the leader is killed."""
        producer = Producer(self.client, **self._async_kwargs(False))
        consumer = SimpleConsumer(self.client, None, self.topic,
                                  partitions=None, auto_commit=False,
                                  iter_timeout=10)
        self._send_random_messages(producer, self.topic, 0, 2)
        consumer.get_messages()
        self._kill_leader(self.topic, 0)
        consumer.get_messages()

    def _send_random_messages(self, producer, topic, partition, n):
        """Send ``n`` random messages to ``topic``/``partition``.

        Each message is retried until ``producer.send_messages`` returns
        without raising; every failure is logged with its traceback.
        """
        for j in range(n):
            msg = 'msg {0}: {1}'.format(j, random_string(10))
            log.debug('_send_random_message %s to %s:%d', msg, topic, partition)
            payload = msg.encode('utf-8')
            while True:
                try:
                    producer.send_messages(topic, partition, payload)
                except Exception:
                    # Not a bare ``except:`` -- KeyboardInterrupt/SystemExit
                    # must still be able to abort a stuck test run.
                    log.exception('failure in _send_random_messages - retrying')
                else:
                    break

    def _kill_leader(self, topic, partition):
        """Close the broker fixture leading ``topic``/``partition``; return it.

        The leader is looked up in ``self.client.topics_to_brokers`` and its
        ``nodeId`` is used as an index into ``self.brokers``.
        """
        leader = self.client.topics_to_brokers[TopicPartition(topic, partition)]
        broker = self.brokers[leader.nodeId]
        broker.close()
        return broker

    def assert_message_count(self, topic, check_count, timeout=10,
                             partitions=None, at_least=False):
        """Fail the test unless ``topic`` holds the expected pending count.

        A temporary client and consumer poll ``consumer.pending(partitions)``
        every 0.5 seconds until it reaches ``check_count`` or ``timeout``
        seconds have elapsed.

        :param topic: topic to inspect.
        :param check_count: expected number of pending messages.
        :param timeout: polling deadline in seconds; also passed to the
            consumer as ``iter_timeout``.
        :param partitions: partitions to count, or None (passed through to
            the consumer unchanged).
        :param at_least: when true, more than ``check_count`` pending
            messages is accepted; otherwise the count must match exactly.
        :returns: True when the check passes.
        """
        hosts = ','.join(['%s:%d' % (broker.host, broker.port)
                          for broker in self.brokers])

        pending = -1
        client = SimpleClient(hosts)
        try:
            consumer = SimpleConsumer(client, None, topic,
                                      partitions=partitions,
                                      auto_commit=False,
                                      iter_timeout=timeout)
            try:
                started_at = time.time()
                while (pending < check_count and
                       (time.time() - started_at < timeout)):
                    try:
                        pending = consumer.pending(partitions)
                    except FailedPayloadsError:
                        # Tolerated: keep polling until the deadline.
                        pass
                    time.sleep(0.5)
            finally:
                # Release the temporary consumer even on unexpected errors.
                consumer.stop()
        finally:
            client.close()

        if pending < check_count:
            self.fail('Too few pending messages: found %d, expected %d' %
                      (pending, check_count))
        elif pending > check_count and not at_least:
            self.fail('Too many pending messages: found %d, expected %d' %
                      (pending, check_count))
        return True