summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndre Araujo <asdaraujo@gmail.com>2018-02-11 15:08:11 -0800
committerJeff Widman <jeff@jeffwidman.com>2018-02-21 13:30:12 -0800
commitfb279d7b968578cc389a699b812795d29248754d (patch)
tree9e63e734c4910aaf2c0bd9bf0fe22504dda879f7
parenta1869c4be5f47b4f6433610249aaf29af4ec95e5 (diff)
downloadkafka-python-fb279d7b968578cc389a699b812795d29248754d.tar.gz
Fixes racing condition when message is sent to broker before topic logs are created
-rw-r--r--test/testutil.py22
1 files changed, 19 insertions, 3 deletions
diff --git a/test/testutil.py b/test/testutil.py
index 850e925..4e5db47 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -9,8 +9,10 @@ import pytest
from . import unittest
from kafka import SimpleClient, create_message
-from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError
-from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
+from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError
+from kafka.structs import OffsetRequestPayload, ProduceRequestPayload, \
+ NotLeaderForPartitionError, UnknownTopicOrPartitionError, \
+ FailedPayloadsError
from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order
def kafka_versions(*versions):
@@ -123,11 +125,25 @@ class KafkaIntegrationTestCase(unittest.TestCase):
self.client.load_metadata_for_topics(self.topic, ignore_leadernotavailable=False)
if self.client.has_metadata_for_topic(topic):
break
- except LeaderNotAvailableError:
+ except (LeaderNotAvailableError, InvalidTopicError):
time.sleep(1)
else:
raise KafkaTimeoutError('Timeout loading topic metadata!')
+
+ # Ensure topic partitions have been created on all brokers to avoid UnknownPartitionErrors
+ # TODO: It might be a good idea to move this to self.client.ensure_topic_exists
+ for partition in self.client.get_partition_ids_for_topic(self.topic):
+ while True:
+ try:
+ req = OffsetRequestPayload(self.topic, partition, -1, 100)
+ self.client.send_offset_request([req])
+ break
+ except (NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError) as e:
+ if time.time() > timeout:
+ raise KafkaTimeoutError('Timeout loading topic metadata!')
+ time.sleep(.1)
+
self._messages = {}
def tearDown(self):