author     Dana Powers <dana.powers@rd.io>  2015-06-10 10:01:30 -0700
committer  Dana Powers <dana.powers@rd.io>  2015-06-10 10:01:30 -0700
commit     043e9bb04f1149ab8f9f8942ee591227f659128d (patch)
tree       04f39f253ee434b2cca1414319cc35e44018546e /test/test_failover_integration.py
parent     ded2ac7d321bab02c2b9fb3d8b03a60d6b9a5f84 (diff)
download   kafka-python-043e9bb04f1149ab8f9f8942ee591227f659128d.tar.gz
Retry failed messages in failover integration tests; use module logger
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r--  test/test_failover_integration.py  29
1 file changed, 17 insertions(+), 12 deletions(-)
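For context before the diff: the "use module logger" half of this commit replaces calls on the root logging module with a module-level logger, as the diff does with `log = logging.getLogger(__name__)`. A minimal standalone sketch of that pattern follows; the `send_one` helper and its arguments are illustrative, not part of the repository.

import logging

# Module-level logger: records are tagged with this module's name and obey
# whatever handlers and levels the test harness configures, unlike bare
# logging.debug(...) calls that always go through the root logger.
log = logging.getLogger(__name__)


def send_one(producer, topic, partition, payload):
    # Hypothetical helper, only to show the logger used in context.
    log.debug("attempting to send %r to %s:%d", payload, topic, partition)
    return producer.send_messages(topic, partition, payload)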
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index f156152..e27f12b 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -2,8 +2,6 @@ import logging
 import os
 import time

-from . import unittest
-
 from kafka import KafkaClient, SimpleConsumer, KeyedProducer
 from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
 from kafka.producer.base import Producer
@@ -15,6 +13,9 @@ from test.testutil import (
 )

+log = logging.getLogger(__name__)
+
+
 class TestFailover(KafkaIntegrationTestCase):
     create_client = False
@@ -73,12 +74,12 @@ class TestFailover(KafkaIntegrationTestCase):
         timeout = 60
         while not recovered and (time.time() - started) < timeout:
             try:
-                logging.debug("attempting to send 'success' message after leader killed")
+                log.debug("attempting to send 'success' message after leader killed")
                 producer.send_messages(topic, partition, b'success')
-                logging.debug("success!")
+                log.debug("success!")
                 recovered = True
             except (FailedPayloadsError, ConnectionError):
-                logging.debug("caught exception sending message -- will retry")
+                log.debug("caught exception sending message -- will retry")
                 continue

         # Verify we successfully sent the message
@@ -110,7 +111,7 @@ class TestFailover(KafkaIntegrationTestCase):
         # kill leader for partition
         self._kill_leader(topic, partition)

-        logging.debug("attempting to send 'success' message after leader killed")
+        log.debug("attempting to send 'success' message after leader killed")
         # in async mode, this should return immediately
         producer.send_messages(topic, partition, b'success')
@@ -164,7 +165,7 @@ class TestFailover(KafkaIntegrationTestCase):
                 if producer.partitioners[kafka_bytestring(topic)].partition(key) == 0:
                     recovered = True
             except (FailedPayloadsError, ConnectionError):
-                logging.debug("caught exception sending message -- will retry")
+                log.debug("caught exception sending message -- will retry")
                 continue

         # Verify we successfully sent the message
@@ -187,12 +188,16 @@ class TestFailover(KafkaIntegrationTestCase):
     def _send_random_messages(self, producer, topic, partition, n):
         for j in range(n):
-            logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
             msg = 'msg {0}: {1}'.format(j, random_string(10))
-            resp = producer.send_messages(topic, partition, msg.encode('utf-8'))
-            if len(resp) > 0:
-                self.assertEqual(resp[0].error, 0)
-            logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
+            log.debug('_send_random_message %s to %s:%d', msg, topic, partition)
+            while True:
+                try:
+                    producer.send_messages(topic, partition, msg.encode('utf-8'))
+                except:
+                    log.exception('failure in _send_random_messages - retrying')
+                    continue
+                else:
+                    break

     def _kill_leader(self, topic, partition):
         leader = self.client.topics_to_brokers[TopicAndPartition(kafka_bytestring(topic), partition)]
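For reference, the retry half of the commit keeps resending each message until the new partition leader accepts it. Below is a minimal sketch of the same retry-until-success idea lifted out of the test class; the helper name, the generic `send` callable, the `retryable` tuple, and the timeout are illustrative additions (the committed _send_random_messages loop itself uses a bare `except:` with no deadline).

import logging
import time

log = logging.getLogger(__name__)


def send_until_success(send, payload, retryable=(Exception,), timeout=60):
    # Keep calling send(payload) until it succeeds or `timeout` seconds elapse.
    # Illustrative only: the test retries inline and catches
    # FailedPayloadsError / ConnectionError in its send loops.
    started = time.time()
    while time.time() - started < timeout:
        try:
            return send(payload)
        except retryable:
            # Leader election may still be in progress; log and retry.
            log.exception("send failed -- retrying")
            continue
    raise RuntimeError("gave up retrying after %d seconds" % timeout)

Against a real producer this would be called as, for example, send_until_success(lambda m: producer.send_messages(topic, partition, m), b'success').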