summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHarel Ben-Attia <harelba@gmail.com>2017-02-28 20:13:28 +0200
committerDana Powers <dana.powers@gmail.com>2017-02-28 10:13:28 -0800
commit432f00eb669550c75fa75e8efa56d5d80cda18a5 (patch)
treef52c04dc061b9d70d8fe435483391bbebbd4a2e3
parentbcb4009b935fb74e3ca71206466c68ad74bc7b3c (diff)
downloadkafka-python-432f00eb669550c75fa75e8efa56d5d80cda18a5.tar.gz
Fail-fast on timeout constraint violations during KafkaConsumer creation (#986)
-rw-r--r--kafka/consumer/group.py13
-rw-r--r--test/test_consumer.py8
2 files changed, 21 insertions, 0 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 47c721f..a300c83 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -6,6 +6,8 @@ import socket
import sys
import time
+from kafka.errors import KafkaConfigurationError
+
from kafka.vendor import six
from kafka.client_async import KafkaClient, selectors
@@ -267,6 +269,17 @@ class KafkaConsumer(six.Iterator):
new_config, self.config['auto_offset_reset'])
self.config['auto_offset_reset'] = new_config
+ request_timeout_ms = self.config['request_timeout_ms']
+ session_timeout_ms = self.config['session_timeout_ms']
+ fetch_max_wait_ms = self.config['fetch_max_wait_ms']
+ if request_timeout_ms <= session_timeout_ms:
+ raise KafkaConfigurationError(
+ "Request timeout (%s) must be larger than session timeout (%s)" %
+ (request_timeout_ms, session_timeout_ms))
+ if request_timeout_ms <= fetch_max_wait_ms:
+ raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" %
+ (request_timeout_ms, fetch_max_wait_ms))
+
metrics_tags = {'client-id': self.config['client_id']}
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
time_window_ms=self.config['metrics_sample_window_ms'],
diff --git a/test/test_consumer.py b/test/test_consumer.py
index f3dad16..073a3af 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -16,6 +16,14 @@ class TestKafkaConsumer(unittest.TestCase):
with self.assertRaises(AssertionError):
SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
+ def test_session_timeout_larger_than_request_timeout_raises(self):
+ with self.assertRaises(KafkaConfigurationError):
+ KafkaConsumer(bootstrap_servers='localhost:9092', session_timeout_ms=60000, request_timeout_ms=40000)
+
+ def test_fetch_max_wait_larger_than_request_timeout_raises(self):
+ with self.assertRaises(KafkaConfigurationError):
+ KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)
+
class TestMultiProcessConsumer(unittest.TestCase):
@unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows')