summaryrefslogtreecommitdiff
path: root/test/test_failover_integration.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-08 23:13:46 -0700
committerDana Powers <dana.powers@rd.io>2015-06-08 23:27:09 -0700
commit53d8251a18d9c033269e105854a7c4cc9730930a (patch)
tree0b1c96f1f80fbc65153d5512de2295dd856f0b17 /test/test_failover_integration.py
parentddb536d87e7c6514d33a8b783cd955af05ed9b2f (diff)
downloadkafka-python-53d8251a18d9c033269e105854a7c4cc9730930a.tar.gz
Produce messages to both partitions in async producer leader switch test
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r--test/test_failover_integration.py10
1 files changed, 9 insertions, 1 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 5082d7c..91e22cf 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -98,10 +98,14 @@ class TestFailover(KafkaIntegrationTestCase):
# Test the base class Producer -- send_messages to a specific partition
producer = Producer(self.client, async=True,
- req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
+ batch_send_every_n=15,
+ batch_send_every_t=3,
+ req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
+ async_log_messages_on_error=False)
# Send 10 random messages
self._send_random_messages(producer, topic, partition, 10)
+ self._send_random_messages(producer, topic, partition + 1, 10)
# kill leader for partition
self._kill_leader(topic, partition)
@@ -110,9 +114,11 @@ class TestFailover(KafkaIntegrationTestCase):
# in async mode, this should return immediately
producer.send_messages(topic, partition, b'success')
+ producer.send_messages(topic, partition + 1, b'success')
# send to new leader
self._send_random_messages(producer, topic, partition, 10)
+ self._send_random_messages(producer, topic, partition + 1, 10)
# Stop the producer and wait for it to shutdown
producer.stop()
@@ -129,6 +135,8 @@ class TestFailover(KafkaIntegrationTestCase):
# Should be equal to 10 before + 1 recovery + 10 after
self.assert_message_count(topic, 21, partitions=(partition,),
at_least=True)
+ self.assert_message_count(topic, 21, partitions=(partition + 1,),
+ at_least=True)
@kafka_versions("all")
def test_switch_leader_keyed_producer(self):