summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-08 16:01:08 -0700
committerGitHub <noreply@github.com>2016-07-08 16:01:08 -0700
commit77971ac093add6d5b292e5f91bcefd3f8df56102 (patch)
tree7c8b62b0dc168a45a405bae2288a5fcf0eb72395
parent58991c5fa24076a644a9e682cb865b48b8a736ba (diff)
parent7b5ade10a5f4197ec19fce5d77484100c6dc1273 (diff)
downloadkafka-python-77971ac093add6d5b292e5f91bcefd3f8df56102.tar.gz
Merge pull request #752 from dpkp/compacted_offsets
Fix consumer iteration on compacted topics
-rw-r--r--kafka/consumer/fetcher.py18
-rw-r--r--kafka/consumer/subscription_state.py2
2 files changed, 18 insertions, 2 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 9c06aba..5f3eb1d 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -424,6 +424,12 @@ class Fetcher(six.Iterator):
elif fetch_offset == position:
log.log(0, "Returning fetched records at offset %d for assigned"
" partition %s", position, tp)
+
+ # We can ignore any prior signal to drop pending message sets
+ # because we are starting from a fresh one where fetch_offset == position
+ # i.e., the user seek()'d to this position
+ self._subscriptions.assignment[tp].drop_pending_message_set = False
+
for msg in self._unpack_message_set(tp, messages):
# Because we are in a generator, it is possible for
@@ -436,9 +442,17 @@ class Fetcher(six.Iterator):
" since it is no longer fetchable", tp)
break
+ # If there is a seek during message iteration,
+ # we should stop unpacking this message set and
+ # wait for a new fetch response that aligns with the
+ # new seek position
+ elif self._subscriptions.assignment[tp].drop_pending_message_set:
+ log.debug("Skipping remainder of message set for partition %s", tp)
+ self._subscriptions.assignment[tp].drop_pending_message_set = False
+ break
+
# Compressed messagesets may include earlier messages
- # It is also possible that the user called seek()
- elif msg.offset != self._subscriptions.assignment[tp].position:
+ elif msg.offset < self._subscriptions.assignment[tp].position:
log.debug("Skipping message offset: %s (expecting %s)",
msg.offset,
self._subscriptions.assignment[tp].position)
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 1c045aa..fa09a06 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -350,6 +350,7 @@ class TopicPartitionState(object):
self.reset_strategy = None # the reset strategy if awaitingReset is set
self._position = None # offset exposed to the user
self.highwater = None
+ self.drop_pending_message_set = False
def _set_position(self, offset):
assert self.has_valid_position, 'Valid position required'
@@ -371,6 +372,7 @@ class TopicPartitionState(object):
self.awaiting_reset = False
self.reset_strategy = None
self.has_valid_position = True
+ self.drop_pending_message_set = True
def pause(self):
self.paused = True