diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-08 16:01:08 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-07-08 16:01:08 -0700 |
commit | 77971ac093add6d5b292e5f91bcefd3f8df56102 (patch) | |
tree | 7c8b62b0dc168a45a405bae2288a5fcf0eb72395 | |
parent | 58991c5fa24076a644a9e682cb865b48b8a736ba (diff) | |
parent | 7b5ade10a5f4197ec19fce5d77484100c6dc1273 (diff) | |
download | kafka-python-77971ac093add6d5b292e5f91bcefd3f8df56102.tar.gz |
Merge pull request #752 from dpkp/compacted_offsets
Fix consumer iteration on compacted topics
-rw-r--r-- | kafka/consumer/fetcher.py | 18 | ||||
-rw-r--r-- | kafka/consumer/subscription_state.py | 2 |
2 files changed, 18 insertions, 2 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 9c06aba..5f3eb1d 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -424,6 +424,12 @@ class Fetcher(six.Iterator):
             elif fetch_offset == position:
                 log.log(0, "Returning fetched records at offset %d for assigned"
                            " partition %s", position, tp)
+
+                # We can ignore any prior signal to drop pending message sets
+                # because we are starting from a fresh one where fetch_offset == position
+                # i.e., the user seek()'d to this position
+                self._subscriptions.assignment[tp].drop_pending_message_set = False
+
                 for msg in self._unpack_message_set(tp, messages):
 
                     # Because we are in a generator, it is possible for
@@ -436,9 +442,17 @@ class Fetcher(six.Iterator):
                                       " since it is no longer fetchable", tp)
                         break
 
+                    # If there is a seek during message iteration,
+                    # we should stop unpacking this message set and
+                    # wait for a new fetch response that aligns with the
+                    # new seek position
+                    elif self._subscriptions.assignment[tp].drop_pending_message_set:
+                        log.debug("Skipping remainder of message set for partition %s", tp)
+                        self._subscriptions.assignment[tp].drop_pending_message_set = False
+                        break
+
                     # Compressed messagesets may include earlier messages
-                    # It is also possible that the user called seek()
-                    elif msg.offset != self._subscriptions.assignment[tp].position:
+                    elif msg.offset < self._subscriptions.assignment[tp].position:
                         log.debug("Skipping message offset: %s (expecting %s)",
                                   msg.offset,
                                   self._subscriptions.assignment[tp].position)
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 1c045aa..fa09a06 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -350,6 +350,7 @@ class TopicPartitionState(object):
         self.reset_strategy = None # the reset strategy if awaitingReset is set
         self._position = None # offset exposed to the user
         self.highwater = None
+        self.drop_pending_message_set = False
 
     def _set_position(self, offset):
         assert self.has_valid_position, 'Valid position required'
@@ -371,6 +372,7 @@ class TopicPartitionState(object):
         self.awaiting_reset = False
         self.reset_strategy = None
         self.has_valid_position = True
+        self.drop_pending_message_set = True
 
     def pause(self):
         self.paused = True