summaryrefslogtreecommitdiff
path: root/kafka/consumer/subscription_state.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-08 15:06:37 -0700
committerDana Powers <dana.powers@gmail.com>2016-07-08 15:06:37 -0700
commit7b5ade10a5f4197ec19fce5d77484100c6dc1273 (patch)
tree7c8b62b0dc168a45a405bae2288a5fcf0eb72395 /kafka/consumer/subscription_state.py
parent003bb0a8308e749cf0f63cd60bc2c020b2c96083 (diff)
downloadkafka-python-compacted_offsets.tar.gz
Use explicit subscription state flag to handle seek() during message iterationcompacted_offsets
Diffstat (limited to 'kafka/consumer/subscription_state.py')
-rw-r--r--kafka/consumer/subscription_state.py2
1 files changed, 2 insertions, 0 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 1c045aa..fa09a06 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -350,6 +350,7 @@ class TopicPartitionState(object):
self.reset_strategy = None # the reset strategy if awaitingReset is set
self._position = None # offset exposed to the user
self.highwater = None
+ self.drop_pending_message_set = False
def _set_position(self, offset):
assert self.has_valid_position, 'Valid position required'
@@ -371,6 +372,7 @@ class TopicPartitionState(object):
self.awaiting_reset = False
self.reset_strategy = None
self.has_valid_position = True
+ self.drop_pending_message_set = True
def pause(self):
self.paused = True