From 7b5ade10a5f4197ec19fce5d77484100c6dc1273 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 8 Jul 2016 15:06:37 -0700 Subject: Use explicit subscription state flag to handle seek() during message iteration --- kafka/consumer/subscription_state.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'kafka/consumer/subscription_state.py') diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 1c045aa..fa09a06 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -350,6 +350,7 @@ class TopicPartitionState(object): self.reset_strategy = None # the reset strategy if awaitingReset is set self._position = None # offset exposed to the user self.highwater = None + self.drop_pending_message_set = False def _set_position(self, offset): assert self.has_valid_position, 'Valid position required' @@ -371,6 +372,7 @@ class TopicPartitionState(object): self.awaiting_reset = False self.reset_strategy = None self.has_valid_position = True + self.drop_pending_message_set = True def pause(self): self.paused = True -- cgit v1.2.1