diff options
-rw-r--r-- | kafka/consumer/group.py | 10 | ||||
-rw-r--r-- | test/test_consumer.py | 6 |
2 files changed, 12 insertions, 4 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 531c107..f521891 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -313,11 +313,15 @@ class KafkaConsumer(six.Iterator): new_config, self.config['auto_offset_reset']) self.config['auto_offset_reset'] = new_config + connections_max_idle_ms = self.config['connections_max_idle_ms'] request_timeout_ms = self.config['request_timeout_ms'] fetch_max_wait_ms = self.config['fetch_max_wait_ms'] - if request_timeout_ms <= fetch_max_wait_ms: - raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" % - (request_timeout_ms, fetch_max_wait_ms)) + if not (fetch_max_wait_ms < request_timeout_ms < connections_max_idle_ms): + raise KafkaConfigurationError( + "connections_max_idle_ms ({}) must be larger than " + "request_timeout_ms ({}) which must be larger than " + "fetch_max_wait_ms ({})." + .format(connections_max_idle_ms, request_timeout_ms, fetch_max_wait_ms)) metrics_tags = {'client-id': self.config['client_id']} metric_config = MetricConfig(samples=self.config['metrics_num_samples'], diff --git a/test/test_consumer.py b/test/test_consumer.py index 4ea01c8..0b0d708 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -15,12 +15,16 @@ from kafka.structs import ( class TestKafkaConsumer: def test_session_timeout_larger_than_request_timeout_raises(self): with pytest.raises(KafkaConfigurationError): - KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0,9), group_id='foo', session_timeout_ms=60000, request_timeout_ms=40000) + KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), group_id='foo', session_timeout_ms=60000, request_timeout_ms=40000) def test_fetch_max_wait_larger_than_request_timeout_raises(self): with pytest.raises(KafkaConfigurationError): KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000) + def test_connections_max_idle_ms_smaller_than_request_timeout_raises(self): + with pytest.raises(KafkaConfigurationError): + KafkaConsumer(bootstrap_servers='localhost:9092', connections_max_idle_ms=69000, request_timeout_ms=40000) + def test_subscription_copy(self): consumer = KafkaConsumer('foo', api_version=(0, 10)) sub = consumer.subscription() |