diff options
author | Keith So <keithks@gmail.com> | 2019-03-13 21:48:53 +0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-03-13 06:48:53 -0700 |
commit | 7965460a7253a5f5c23e7343c0c06c40e40f471e (patch) | |
tree | 21071e62b5109824cb066a084c952a101c4054bf | |
parent | 1904b536b0a6fb83e006f3a61b2aa360797cf838 (diff) | |
download | kafka-python-7965460a7253a5f5c23e7343c0c06c40e40f471e.tar.gz |
1701 use last offset from fetch v4 if available (#1724)
-rw-r--r-- | kafka/consumer/fetcher.py | 19 | ||||
-rw-r--r-- | kafka/consumer/subscription_state.py | 5 | ||||
-rw-r--r-- | kafka/record/default_records.py | 4 |
3 files changed, 28 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c1eb03e..36e269f 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -439,6 +439,14 @@ class Fetcher(six.Iterator): try: batch = records.next_batch() while batch is not None: + + # LegacyRecordBatch cannot access either base_offset or last_offset_delta + try: + self._subscriptions.assignment[tp].last_offset_from_message_batch = batch.base_offset + \ + batch.last_offset_delta + except AttributeError: + pass + for record in batch: key_size = len(record.key) if record.key is not None else -1 value_size = len(record.value) if record.value is not None else -1 @@ -643,6 +651,17 @@ class Fetcher(six.Iterator): for partition in self._fetchable_partitions(): node_id = self._client.cluster.leader_for_partition(partition) + + # advance position for any deleted compacted messages if required + if self._subscriptions.assignment[partition].last_offset_from_message_batch: + next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_message_batch + 1 + if next_offset_from_batch_header > self._subscriptions.assignment[partition].position: + log.debug( + "Advance position for partition %s from %s to %s (last message batch location plus one)" + " to correct for deleted compacted messages", + partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header) + self._subscriptions.assignment[partition].position = next_offset_from_batch_header + position = self._subscriptions.assignment[partition].position # fetch if there is a leader and no in-flight requests diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 4b0b275..ef50166 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -382,6 +382,9 @@ class TopicPartitionState(object): self._position = None # offset exposed to the user self.highwater = None self.drop_pending_message_set = False + # The last message offset hint available from a message batch with + # magic=2 which includes deleted compacted messages + self.last_offset_from_message_batch = None def _set_position(self, offset): assert self.has_valid_position, 'Valid position required' @@ -396,6 +399,7 @@ class TopicPartitionState(object): self.awaiting_reset = True self.reset_strategy = strategy self._position = None + self.last_offset_from_message_batch = None self.has_valid_position = False def seek(self, offset): @@ -404,6 +408,7 @@ class TopicPartitionState(object): self.reset_strategy = None self.has_valid_position = True self.drop_pending_message_set = True + self.last_offset_from_message_batch = None def pause(self): self.paused = True diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 955e3ee..7f0e2b3 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -141,6 +141,10 @@ class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch): return self._header_data[5] @property + def last_offset_delta(self): + return self._header_data[6] + + @property def compression_type(self): return self.attributes & self.CODEC_MASK |