summaryrefslogtreecommitdiff
path: root/test/test_failover_integration.py
blob: ca71f2ddbc58b0f5ecfee4d87bf9261a4186d1fb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import logging
import os
import time

from . import unittest

from kafka import KafkaClient, SimpleConsumer
from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
from kafka.producer.base import Producer

from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
    KafkaIntegrationTestCase, kafka_versions, random_string
)


class TestFailover(KafkaIntegrationTestCase):
    create_client = False

    @classmethod
    def setUpClass(cls):  # noqa
        if not os.environ.get('KAFKA_VERSION'):
            return

        zk_chroot = random_string(10)
        replicas = 2
        partitions = 2

        # mini zookeeper, 2 kafka brokers
        cls.zk = ZookeeperFixture.instance()
        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]

        hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
        cls.client = KafkaClient(hosts)

    @classmethod
    def tearDownClass(cls):
        if not os.environ.get('KAFKA_VERSION'):
            return

        cls.client.close()
        for broker in cls.brokers:
            broker.close()
        cls.zk.close()

    @kafka_versions("all")
    def test_switch_leader(self):
        topic = self.topic
        partition = 0

        # Test the base class Producer -- send_messages to a specific partition
        producer = Producer(self.client, async=False,
                            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)

        # Send 10 random messages
        self._send_random_messages(producer, topic, partition, 100)

        # kill leader for partition
        self._kill_leader(topic, partition)

        # expect failure, but dont wait more than 60 secs to recover
        recovered = False
        started = time.time()
        timeout = 60
        while not recovered and (time.time() - started) < timeout:
            try:
                logging.debug("attempting to send 'success' message after leader killed")
                producer.send_messages(topic, partition, b'success')
                logging.debug("success!")
                recovered = True
            except (FailedPayloadsError, ConnectionError):
                logging.debug("caught exception sending message -- will retry")
                continue

        # Verify we successfully sent the message
        self.assertTrue(recovered)

        # send some more messages to new leader
        self._send_random_messages(producer, topic, partition, 100)

        # count number of messages
        # Should be equal to 10 before + 1 recovery + 10 after
        self.assert_message_count(topic, 201, partitions=(partition,))


    #@kafka_versions("all")
    @unittest.skip("async producer does not support reliable failover yet")
    def test_switch_leader_async(self):
        topic = self.topic
        partition = 0

        # Test the base class Producer -- send_messages to a specific partition
        producer = Producer(self.client, async=True)

        # Send 10 random messages
        self._send_random_messages(producer, topic, partition, 10)

        # kill leader for partition
        self._kill_leader(topic, partition)

        logging.debug("attempting to send 'success' message after leader killed")

        # in async mode, this should return immediately
        producer.send_messages(topic, partition, 'success')

        # send to new leader
        self._send_random_messages(producer, topic, partition, 10)

        # wait until producer queue is empty
        while not producer.queue.empty():
            time.sleep(0.1)
        producer.stop()

        # count number of messages
        # Should be equal to 10 before + 1 recovery + 10 after
        self.assert_message_count(topic, 21, partitions=(partition,))

    def _send_random_messages(self, producer, topic, partition, n):
        for j in range(n):
            logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
            resp = producer.send_messages(topic, partition, random_string(10))
            if len(resp) > 0:
                self.assertEqual(resp[0].error, 0)
            logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)

    def _kill_leader(self, topic, partition):
        leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
        broker = self.brokers[leader.nodeId]
        broker.close()
        return broker

    def assert_message_count(self, topic, check_count, timeout=10, partitions=None):
        hosts = ','.join(['%s:%d' % (broker.host, broker.port)
                          for broker in self.brokers])

        client = KafkaClient(hosts)
        group = random_string(10)
        consumer = SimpleConsumer(client, group, topic,
                                  partitions=partitions,
                                  auto_commit=False,
                                  iter_timeout=timeout)

        started_at = time.time()
        pending = consumer.pending(partitions)

        # Keep checking if it isn't immediately correct, subject to timeout
        while pending != check_count and (time.time() - started_at < timeout):
            pending = consumer.pending(partitions)

        consumer.stop()
        client.close()

        self.assertEqual(pending, check_count)