diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-15 22:36:53 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:42:55 -0800 |
commit | 742af4f7e0bad6159e63ed4b369e34426ab9f670 (patch) | |
tree | 4798f62823e6e814eeb7ceabec9a08827bceb97b | |
parent | 07ff623392d1398f801c95d9af3e0a388b049068 (diff) | |
download | kafka-python-742af4f7e0bad6159e63ed4b369e34426ab9f670.tar.gz |
Raise KafkaConfigurationError during fetch_messages if not topics/partitions configured
-rw-r--r-- | kafka/consumer/new.py | 8 |
1 files changed, 7 insertions, 1 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index ad45387..bad1f3d 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -330,8 +330,14 @@ class KafkaConsumer(object): max_wait_time = self._config['fetch_wait_max_ms'] min_bytes = self._config['fetch_min_bytes'] - fetches = [] + # Get current fetch offsets offsets = self._offsets.fetch + if not offsets: + if not self._topics: + raise KafkaConfigurationError('No topics or partitions configured') + raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages') + + fetches = [] for topic_partition, offset in offsets.iteritems(): fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes)) |