From 742af4f7e0bad6159e63ed4b369e34426ab9f670 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 15 Sep 2014 22:36:53 -0700 Subject: Raise KafkaConfigurationError during fetch_messages if not topics/partitions configured --- kafka/consumer/new.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index ad45387..bad1f3d 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -330,8 +330,14 @@ class KafkaConsumer(object): max_wait_time = self._config['fetch_wait_max_ms'] min_bytes = self._config['fetch_min_bytes'] - fetches = [] + # Get current fetch offsets offsets = self._offsets.fetch + if not offsets: + if not self._topics: + raise KafkaConfigurationError('No topics or partitions configured') + raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages') + + fetches = [] for topic_partition, offset in offsets.iteritems(): fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes)) -- cgit v1.2.1