summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-15 22:36:53 -0700
committerDana Powers <dana.powers@rd.io>2014-12-15 12:42:55 -0800
commit742af4f7e0bad6159e63ed4b369e34426ab9f670 (patch)
tree4798f62823e6e814eeb7ceabec9a08827bceb97b
parent07ff623392d1398f801c95d9af3e0a388b049068 (diff)
downloadkafka-python-742af4f7e0bad6159e63ed4b369e34426ab9f670.tar.gz
Raise KafkaConfigurationError during fetch_messages if not topics/partitions configured
-rw-r--r--kafka/consumer/new.py8
1 files changed, 7 insertions, 1 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index ad45387..bad1f3d 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -330,8 +330,14 @@ class KafkaConsumer(object):
max_wait_time = self._config['fetch_wait_max_ms']
min_bytes = self._config['fetch_min_bytes']
- fetches = []
+ # Get current fetch offsets
offsets = self._offsets.fetch
+ if not offsets:
+ if not self._topics:
+ raise KafkaConfigurationError('No topics or partitions configured')
+ raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')
+
+ fetches = []
for topic_partition, offset in offsets.iteritems():
fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes))