summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith So <keithks@gmail.com>2019-03-13 21:48:53 +0800
committerDana Powers <dana.powers@gmail.com>2019-03-13 06:48:53 -0700
commit7965460a7253a5f5c23e7343c0c06c40e40f471e (patch)
tree21071e62b5109824cb066a084c952a101c4054bf
parent1904b536b0a6fb83e006f3a61b2aa360797cf838 (diff)
downloadkafka-python-7965460a7253a5f5c23e7343c0c06c40e40f471e.tar.gz
1701 use last offset from fetch v4 if available (#1724)
-rw-r--r--kafka/consumer/fetcher.py19
-rw-r--r--kafka/consumer/subscription_state.py5
-rw-r--r--kafka/record/default_records.py4
3 files changed, 28 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c1eb03e..36e269f 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -439,6 +439,14 @@ class Fetcher(six.Iterator):
try:
batch = records.next_batch()
while batch is not None:
+
+ # LegacyRecordBatch cannot access either base_offset or last_offset_delta
+ try:
+ self._subscriptions.assignment[tp].last_offset_from_message_batch = batch.base_offset + \
+ batch.last_offset_delta
+ except AttributeError:
+ pass
+
for record in batch:
key_size = len(record.key) if record.key is not None else -1
value_size = len(record.value) if record.value is not None else -1
@@ -643,6 +651,17 @@ class Fetcher(six.Iterator):
for partition in self._fetchable_partitions():
node_id = self._client.cluster.leader_for_partition(partition)
+
+ # advance position for any deleted compacted messages if required
+ if self._subscriptions.assignment[partition].last_offset_from_message_batch:
+ next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_message_batch + 1
+ if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
+ log.debug(
+ "Advance position for partition %s from %s to %s (last message batch location plus one)"
+ " to correct for deleted compacted messages",
+ partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
+ self._subscriptions.assignment[partition].position = next_offset_from_batch_header
+
position = self._subscriptions.assignment[partition].position
# fetch if there is a leader and no in-flight requests
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 4b0b275..ef50166 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -382,6 +382,9 @@ class TopicPartitionState(object):
self._position = None # offset exposed to the user
self.highwater = None
self.drop_pending_message_set = False
+ # The last message offset hint available from a message batch with
+ # magic=2 which includes deleted compacted messages
+ self.last_offset_from_message_batch = None
def _set_position(self, offset):
assert self.has_valid_position, 'Valid position required'
@@ -396,6 +399,7 @@ class TopicPartitionState(object):
self.awaiting_reset = True
self.reset_strategy = strategy
self._position = None
+ self.last_offset_from_message_batch = None
self.has_valid_position = False
def seek(self, offset):
@@ -404,6 +408,7 @@ class TopicPartitionState(object):
self.reset_strategy = None
self.has_valid_position = True
self.drop_pending_message_set = True
+ self.last_offset_from_message_batch = None
def pause(self):
self.paused = True
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 955e3ee..7f0e2b3 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -141,6 +141,10 @@ class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
return self._header_data[5]
@property
+ def last_offset_delta(self):
+ return self._header_data[6]
+
+ @property
def compression_type(self):
return self.attributes & self.CODEC_MASK