summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-02-08 23:39:37 -0800
committerGitHub <noreply@github.com>2018-02-08 23:39:37 -0800
commit8655c75e6a147080235d3458ec82edb9e1ff78a6 (patch)
treeec9c607a83f74962d557117ab5f561b146332999
parent7d8f9a41e0b7a83624e6ebab368de68b87f71997 (diff)
downloadkafka-python-8655c75e6a147080235d3458ec82edb9e1ff78a6.tar.gz
Increase some integration test timeouts (#1374)
-rw-r--r--.gitignore3
-rw-r--r--test/fixtures.py6
-rw-r--r--test/test_consumer_integration.py10
-rw-r--r--test/test_producer.py6
-rw-r--r--test/testutil.py12
5 files changed, 26 insertions, 11 deletions
diff --git a/.gitignore b/.gitignore
index edb75c5..f3cd082 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,4 +13,5 @@ docs/_build
.cache*
.idea/
integration-test/
-tests-env/ \ No newline at end of file
+tests-env/
+.pytest_cache/
diff --git a/test/fixtures.py b/test/fixtures.py
index 62c6d50..1c418fd 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -141,7 +141,7 @@ class ZookeeperFixture(Fixture):
# Party!
timeout = 5
- max_timeout = 30
+ max_timeout = 120
backoff = 1
end_at = time.time() + max_timeout
tries = 1
@@ -161,6 +161,7 @@ class ZookeeperFixture(Fixture):
timeout *= 2
time.sleep(backoff)
tries += 1
+ backoff += 1
else:
raise RuntimeError('Failed to start Zookeeper before max_timeout')
self.out("Done!")
@@ -278,7 +279,7 @@ class KafkaFixture(Fixture):
env = self.kafka_run_class_env()
timeout = 5
- max_timeout = 30
+ max_timeout = 120
backoff = 1
end_at = time.time() + max_timeout
tries = 1
@@ -301,6 +302,7 @@ class KafkaFixture(Fixture):
timeout *= 2
time.sleep(backoff)
tries += 1
+ backoff += 1
else:
raise RuntimeError('Failed to start KafkaInstance before max_timeout')
self.out("Done!")
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index ded2314..40eec14 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -647,13 +647,14 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
early_time = late_time - 2000
tp = TopicPartition(self.topic, 0)
+ timeout = 10
kafka_producer = self.kafka_producer()
early_msg = kafka_producer.send(
self.topic, partition=0, value=b"first",
- timestamp_ms=early_time).get(1)
+ timestamp_ms=early_time).get(timeout)
late_msg = kafka_producer.send(
self.topic, partition=0, value=b"last",
- timestamp_ms=late_time).get(1)
+ timestamp_ms=late_time).get(timeout)
consumer = self.kafka_consumer()
offsets = consumer.offsets_for_times({tp: early_time})
@@ -699,12 +700,13 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
kafka_producer = self.kafka_producer()
send_time = int(time.time() * 1000)
+ timeout = 10
p0msg = kafka_producer.send(
self.topic, partition=0, value=b"XXX",
- timestamp_ms=send_time).get()
+ timestamp_ms=send_time).get(timeout)
p1msg = kafka_producer.send(
self.topic, partition=1, value=b"XXX",
- timestamp_ms=send_time).get()
+ timestamp_ms=send_time).get(timeout)
consumer = self.kafka_consumer()
offsets = consumer.offsets_for_times({
diff --git a/test/test_producer.py b/test/test_producer.py
index f7a5b68..80017a1 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -38,12 +38,12 @@ def test_end_to_end(kafka_broker, compression):
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
producer = KafkaProducer(bootstrap_servers=connect_str,
retries=5,
- max_block_ms=10000,
+ max_block_ms=30000,
compression_type=compression,
value_serializer=str.encode)
consumer = KafkaConsumer(bootstrap_servers=connect_str,
group_id=None,
- consumer_timeout_ms=10000,
+ consumer_timeout_ms=30000,
auto_offset_reset='earliest',
value_deserializer=bytes.decode)
@@ -87,7 +87,7 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
producer = KafkaProducer(bootstrap_servers=connect_str,
retries=5,
- max_block_ms=10000,
+ max_block_ms=30000,
compression_type=compression)
magic = producer._max_usable_produce_magic()
diff --git a/test/testutil.py b/test/testutil.py
index 0bacac4..0ec1cff 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -12,6 +12,7 @@ from six.moves import xrange
from . import unittest
from kafka import SimpleClient
+from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError
from kafka.structs import OffsetRequestPayload
__all__ = [
@@ -98,7 +99,16 @@ class KafkaIntegrationTestCase(unittest.TestCase):
if self.create_client:
self.client = SimpleClient('%s:%d' % (self.server.host, self.server.port))
- self.client.ensure_topic_exists(self.topic)
+ timeout = time.time() + 30
+ while time.time() < timeout:
+ try:
+ self.client.load_metadata_for_topics(self.topic, ignore_leadernotavailable=False)
+ if self.client.has_metadata_for_topic(topic):
+ break
+ except LeaderNotAvailableError:
+ time.sleep(1)
+ else:
+ raise KafkaTimeoutError('Timeout loading topic metadata!')
self._messages = {}