diff options
author | Andre Araujo <asdaraujo@gmail.com> | 2018-02-11 15:08:11 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2018-02-21 13:30:12 -0800 |
commit | fb279d7b968578cc389a699b812795d29248754d (patch) | |
tree | 9e63e734c4910aaf2c0bd9bf0fe22504dda879f7 | |
parent | a1869c4be5f47b4f6433610249aaf29af4ec95e5 (diff) | |
download | kafka-python-fb279d7b968578cc389a699b812795d29248754d.tar.gz |
Fixes racing condition when message is sent to broker before topic logs are created
-rw-r--r-- | test/testutil.py | 22 |
1 files changed, 19 insertions, 3 deletions
diff --git a/test/testutil.py b/test/testutil.py index 850e925..4e5db47 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -9,8 +9,10 @@ import pytest from . import unittest from kafka import SimpleClient, create_message -from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError -from kafka.structs import OffsetRequestPayload, ProduceRequestPayload +from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError +from kafka.structs import OffsetRequestPayload, ProduceRequestPayload, \ + NotLeaderForPartitionError, UnknownTopicOrPartitionError, \ + FailedPayloadsError from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order def kafka_versions(*versions): @@ -123,11 +125,25 @@ class KafkaIntegrationTestCase(unittest.TestCase): self.client.load_metadata_for_topics(self.topic, ignore_leadernotavailable=False) if self.client.has_metadata_for_topic(topic): break - except LeaderNotAvailableError: + except (LeaderNotAvailableError, InvalidTopicError): time.sleep(1) else: raise KafkaTimeoutError('Timeout loading topic metadata!') + + # Ensure topic partitions have been created on all brokers to avoid UnknownPartitionErrors + # TODO: It might be a good idea to move this to self.client.ensure_topic_exists + for partition in self.client.get_partition_ids_for_topic(self.topic): + while True: + try: + req = OffsetRequestPayload(self.topic, partition, -1, 100) + self.client.send_offset_request([req]) + break + except (NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError) as e: + if time.time() > timeout: + raise KafkaTimeoutError('Timeout loading topic metadata!') + time.sleep(.1) + self._messages = {} def tearDown(self): |