summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2019-01-03 15:11:48 -0800
committerJeff Widman <jeff@jeffwidman.com>2019-03-14 09:34:39 -0700
commitb13bcf5c3602e0f4bee866070fc1337c1197bd5f (patch)
treed13c1a6e17bb857481d4bffac6f78a6857a953a9
parent703f06590be2daa7e4592b3d82df6d719a6829bb (diff)
downloadkafka-python-verify-timeouts-set-correctly.tar.gz
Error if connections_max_idle_ms not larger than request_timeout_msverify-timeouts-set-correctly
`connections_max_idle_ms` must always be larger than `request_timeout_ms` to avoid potentially unexpected behavior. Fix #1680.
-rw-r--r--kafka/consumer/group.py10
-rw-r--r--test/test_consumer.py6
2 files changed, 12 insertions, 4 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 531c107..f521891 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -313,11 +313,15 @@ class KafkaConsumer(six.Iterator):
new_config, self.config['auto_offset_reset'])
self.config['auto_offset_reset'] = new_config
+ connections_max_idle_ms = self.config['connections_max_idle_ms']
request_timeout_ms = self.config['request_timeout_ms']
fetch_max_wait_ms = self.config['fetch_max_wait_ms']
- if request_timeout_ms <= fetch_max_wait_ms:
- raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" %
- (request_timeout_ms, fetch_max_wait_ms))
+ if not (fetch_max_wait_ms < request_timeout_ms < connections_max_idle_ms):
+ raise KafkaConfigurationError(
+ "connections_max_idle_ms ({}) must be larger than "
+ "request_timeout_ms ({}) which must be larger than "
+ "fetch_max_wait_ms ({})."
+ .format(connections_max_idle_ms, request_timeout_ms, fetch_max_wait_ms))
metrics_tags = {'client-id': self.config['client_id']}
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
diff --git a/test/test_consumer.py b/test/test_consumer.py
index 4ea01c8..0b0d708 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -15,12 +15,16 @@ from kafka.structs import (
class TestKafkaConsumer:
def test_session_timeout_larger_than_request_timeout_raises(self):
with pytest.raises(KafkaConfigurationError):
- KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0,9), group_id='foo', session_timeout_ms=60000, request_timeout_ms=40000)
+ KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), group_id='foo', session_timeout_ms=60000, request_timeout_ms=40000)
def test_fetch_max_wait_larger_than_request_timeout_raises(self):
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)
+ def test_connections_max_idle_ms_smaller_than_request_timeout_raises(self):
+ with pytest.raises(KafkaConfigurationError):
+ KafkaConsumer(bootstrap_servers='localhost:9092', connections_max_idle_ms=69000, request_timeout_ms=40000)
+
def test_subscription_copy(self):
consumer = KafkaConsumer('foo', api_version=(0, 10))
sub = consumer.subscription()