diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-10 10:01:30 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-10 10:01:30 -0700 |
commit | 043e9bb04f1149ab8f9f8942ee591227f659128d (patch) | |
tree | 04f39f253ee434b2cca1414319cc35e44018546e /test/test_failover_integration.py | |
parent | ded2ac7d321bab02c2b9fb3d8b03a60d6b9a5f84 (diff) | |
download | kafka-python-043e9bb04f1149ab8f9f8942ee591227f659128d.tar.gz |
Retry failed messages in failover integration tests; use module logger
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r-- | test/test_failover_integration.py | 29 |
1 files changed, 17 insertions, 12 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index f156152..e27f12b 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -2,8 +2,6 @@ import logging import os import time -from . import unittest - from kafka import KafkaClient, SimpleConsumer, KeyedProducer from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError from kafka.producer.base import Producer @@ -15,6 +13,9 @@ from test.testutil import ( ) +log = logging.getLogger(__name__) + + class TestFailover(KafkaIntegrationTestCase): create_client = False @@ -73,12 +74,12 @@ class TestFailover(KafkaIntegrationTestCase): timeout = 60 while not recovered and (time.time() - started) < timeout: try: - logging.debug("attempting to send 'success' message after leader killed") + log.debug("attempting to send 'success' message after leader killed") producer.send_messages(topic, partition, b'success') - logging.debug("success!") + log.debug("success!") recovered = True except (FailedPayloadsError, ConnectionError): - logging.debug("caught exception sending message -- will retry") + log.debug("caught exception sending message -- will retry") continue # Verify we successfully sent the message @@ -110,7 +111,7 @@ class TestFailover(KafkaIntegrationTestCase): # kill leader for partition self._kill_leader(topic, partition) - logging.debug("attempting to send 'success' message after leader killed") + log.debug("attempting to send 'success' message after leader killed") # in async mode, this should return immediately producer.send_messages(topic, partition, b'success') @@ -164,7 +165,7 @@ class TestFailover(KafkaIntegrationTestCase): if producer.partitioners[kafka_bytestring(topic)].partition(key) == 0: recovered = True except (FailedPayloadsError, ConnectionError): - logging.debug("caught exception sending message -- will retry") + log.debug("caught exception sending message -- will retry") continue # Verify we successfully sent the message @@ -187,12 +188,16 @@ class TestFailover(KafkaIntegrationTestCase): def _send_random_messages(self, producer, topic, partition, n): for j in range(n): - logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j) msg = 'msg {0}: {1}'.format(j, random_string(10)) - resp = producer.send_messages(topic, partition, msg.encode('utf-8')) - if len(resp) > 0: - self.assertEqual(resp[0].error, 0) - logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j) + log.debug('_send_random_message %s to %s:%d', msg, topic, partition) + while True: + try: + producer.send_messages(topic, partition, msg.encode('utf-8')) + except: + log.exception('failure in _send_random_messages - retrying') + continue + else: + break def _kill_leader(self, topic, partition): leader = self.client.topics_to_brokers[TopicAndPartition(kafka_bytestring(topic), partition)] |