import logging
import os
import time
from . import unittest
from kafka import KafkaClient, SimpleConsumer
from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
from kafka.producer.base import Producer
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
KafkaIntegrationTestCase, kafka_versions, random_string
)
class TestFailover(KafkaIntegrationTestCase):
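    # Integration tests that kill the leader broker for a partition and
    # verify a producer can keep sending once a new leader is elected.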
create_client = False
@classmethod
def setUpClass(cls): # noqa
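        # Skip fixture setup entirely when no KAFKA_VERSION is configured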
if not os.environ.get('KAFKA_VERSION'):
return
zk_chroot = random_string(10)
replicas = 2
partitions = 2
# mini zookeeper, 2 kafka brokers
cls.zk = ZookeeperFixture.instance()
kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
cls.client = KafkaClient(hosts)
@classmethod
def tearDownClass(cls):
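        # Shut down the client, brokers, and zookeeper started in setUpClass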
if not os.environ.get('KAFKA_VERSION'):
return
cls.client.close()
for broker in cls.brokers:
broker.close()
cls.zk.close()
@kafka_versions("all")
def test_switch_leader(self):
topic = self.topic
partition = 0
# Test the base class Producer -- send_messages to a specific partition
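        # ACK_AFTER_CLUSTER_COMMIT makes the broker wait for the full in-sync
        # replica set, so an acknowledged message should survive the leader
        # being killed below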
producer = Producer(self.client, async=False,
req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
        # Send 100 random messages
self._send_random_messages(producer, topic, partition, 100)
# kill leader for partition
self._kill_leader(topic, partition)
        # expect failure, but don't wait more than 60 secs to recover
recovered = False
started = time.time()
timeout = 60
while not recovered and (time.time() - started) < timeout:
try:
logging.debug("attempting to send 'success' message after leader killed")
producer.send_messages(topic, partition, b'success')
logging.debug("success!")
recovered = True
except (FailedPayloadsError, ConnectionError):
logging.debug("caught exception sending message -- will retry")
continue
# Verify we successfully sent the message
self.assertTrue(recovered)
# send some more messages to new leader
self._send_random_messages(producer, topic, partition, 100)
# count number of messages
        # Should be equal to 100 before + 1 recovery + 100 after
self.assert_message_count(topic, 201, partitions=(partition,))
#@kafka_versions("all")
@unittest.skip("async producer does not support reliable failover yet")
def test_switch_leader_async(self):
topic = self.topic
partition = 0
# Test the base class Producer -- send_messages to a specific partition
producer = Producer(self.client, async=True)
# Send 10 random messages
self._send_random_messages(producer, topic, partition, 10)
# kill leader for partition
self._kill_leader(topic, partition)
logging.debug("attempting to send 'success' message after leader killed")
# in async mode, this should return immediately
        producer.send_messages(topic, partition, b'success')
# send to new leader
self._send_random_messages(producer, topic, partition, 10)
# wait until producer queue is empty
while not producer.queue.empty():
time.sleep(0.1)
producer.stop()
# count number of messages
# Should be equal to 10 before + 1 recovery + 10 after
self.assert_message_count(topic, 21, partitions=(partition,))
def _send_random_messages(self, producer, topic, partition, n):
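        # Send n random 10-character payloads to the given partition; when a
        # synchronous ack is returned, assert the produce succeeded (error 0)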
for j in range(n):
logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
resp = producer.send_messages(topic, partition, random_string(10))
if len(resp) > 0:
self.assertEqual(resp[0].error, 0)
logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
def _kill_leader(self, topic, partition):
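        # Look up the broker currently leading this topic/partition in the
        # client's metadata and shut that fixture down, forcing a new election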
leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
broker = self.brokers[leader.nodeId]
broker.close()
return broker
def assert_message_count(self, topic, check_count, timeout=10, partitions=None):
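        # Count messages with a fresh client/consumer, polling the pending
        # offsets until they match check_count or the timeout expires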
hosts = ','.join(['%s:%d' % (broker.host, broker.port)
for broker in self.brokers])
client = KafkaClient(hosts)
group = random_string(10)
consumer = SimpleConsumer(client, group, topic,
partitions=partitions,
auto_commit=False,
iter_timeout=timeout)
started_at = time.time()
pending = consumer.pending(partitions)
# Keep checking if it isn't immediately correct, subject to timeout
while pending != check_count and (time.time() - started_at < timeout):
pending = consumer.pending(partitions)
consumer.stop()
client.close()
self.assertEqual(pending, check_count)