From fb279d7b968578cc389a699b812795d29248754d Mon Sep 17 00:00:00 2001 From: Andre Araujo Date: Sun, 11 Feb 2018 15:08:11 -0800 Subject: Fixes racing condition when message is sent to broker before topic logs are created --- test/testutil.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) (limited to 'test') diff --git a/test/testutil.py b/test/testutil.py index 850e925..4e5db47 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -9,8 +9,10 @@ import pytest from . import unittest from kafka import SimpleClient, create_message -from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError -from kafka.structs import OffsetRequestPayload, ProduceRequestPayload +from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError +from kafka.structs import OffsetRequestPayload, ProduceRequestPayload, \ + NotLeaderForPartitionError, UnknownTopicOrPartitionError, \ + FailedPayloadsError from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order def kafka_versions(*versions): @@ -123,11 +125,25 @@ class KafkaIntegrationTestCase(unittest.TestCase): self.client.load_metadata_for_topics(self.topic, ignore_leadernotavailable=False) if self.client.has_metadata_for_topic(topic): break - except LeaderNotAvailableError: + except (LeaderNotAvailableError, InvalidTopicError): time.sleep(1) else: raise KafkaTimeoutError('Timeout loading topic metadata!') + + # Ensure topic partitions have been created on all brokers to avoid UnknownPartitionErrors + # TODO: It might be a good idea to move this to self.client.ensure_topic_exists + for partition in self.client.get_partition_ids_for_topic(self.topic): + while True: + try: + req = OffsetRequestPayload(self.topic, partition, -1, 100) + self.client.send_offset_request([req]) + break + except (NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError) as e: + if time.time() > timeout: + raise KafkaTimeoutError('Timeout loading topic metadata!') + time.sleep(.1) + self._messages = {} def tearDown(self): -- cgit v1.2.1