diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-08 23:13:46 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-08 23:27:09 -0700 |
commit | 53d8251a18d9c033269e105854a7c4cc9730930a (patch) | |
tree | 0b1c96f1f80fbc65153d5512de2295dd856f0b17 /test/test_failover_integration.py | |
parent | ddb536d87e7c6514d33a8b783cd955af05ed9b2f (diff) | |
download | kafka-python-53d8251a18d9c033269e105854a7c4cc9730930a.tar.gz |
Produce messages to both partitions in async producer leader switch test
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r-- | test/test_failover_integration.py | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 5082d7c..91e22cf 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -98,10 +98,14 @@ class TestFailover(KafkaIntegrationTestCase): # Test the base class Producer -- send_messages to a specific partition producer = Producer(self.client, async=True, - req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT) + batch_send_every_n=15, + batch_send_every_t=3, + req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT, + async_log_messages_on_error=False) # Send 10 random messages self._send_random_messages(producer, topic, partition, 10) + self._send_random_messages(producer, topic, partition + 1, 10) # kill leader for partition self._kill_leader(topic, partition) @@ -110,9 +114,11 @@ class TestFailover(KafkaIntegrationTestCase): # in async mode, this should return immediately producer.send_messages(topic, partition, b'success') + producer.send_messages(topic, partition + 1, b'success') # send to new leader self._send_random_messages(producer, topic, partition, 10) + self._send_random_messages(producer, topic, partition + 1, 10) # Stop the producer and wait for it to shutdown producer.stop() @@ -129,6 +135,8 @@ class TestFailover(KafkaIntegrationTestCase): # Should be equal to 10 before + 1 recovery + 10 after self.assert_message_count(topic, 21, partitions=(partition,), at_least=True) + self.assert_message_count(topic, 21, partitions=(partition + 1,), + at_least=True) @kafka_versions("all") def test_switch_leader_keyed_producer(self): |