summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Dana Powers <dana.powers@gmail.com> 2018-02-26 15:03:10 -0800
committer GitHub <noreply@github.com> 2018-02-26 15:03:10 -0800
commit 4cadaafb24c2bdad475a68e3df5a4e19ce043ce7 (patch)
tree 11382f60a18ec2a41e1f59af18a82b239e6c21d7
parent e66d8c42c9ebec612093b96950df81b7355e4aab (diff)
download kafka-python-4cadaafb24c2bdad475a68e3df5a4e19ce043ce7.tar.gz
Fix KafkaConsumer compacted offset handling (#1397)
-rw-r--r-- kafka/consumer/fetcher.py 17
-rw-r--r-- test/test_fetcher.py 21
2 files changed, 28 insertions, 10 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c9bbb97..4f2a543 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -372,11 +372,6 @@ class Fetcher(six.Iterator):
tp, next_offset)
for record in part_records:
- # Fetched compressed messages may include additional records
- if record.offset < fetch_offset:
- log.debug("Skipping message offset: %s (expecting %s)",
- record.offset, fetch_offset)
- continue
drained[tp].append(record)
self._subscriptions.assignment[tp].position = next_offset
@@ -843,10 +838,15 @@ class Fetcher(six.Iterator):
# When fetching an offset that is in the middle of a
# compressed batch, we will get all messages in the batch.
# But we want to start 'take' at the fetch_offset
+ # (or the next highest offset in case the message was compacted)
for i, msg in enumerate(messages):
- if msg.offset == fetch_offset:
+ if msg.offset < fetch_offset:
+ log.debug("Skipping message offset: %s (expecting %s)",
+ msg.offset, fetch_offset)
+ else:
self.message_idx = i
break
+
else:
self.message_idx = 0
self.messages = None
@@ -868,8 +868,9 @@ class Fetcher(six.Iterator):
next_idx = self.message_idx + n
res = self.messages[self.message_idx:next_idx]
self.message_idx = next_idx
- if len(self) > 0:
- self.fetch_offset = self.messages[self.message_idx].offset
+ # fetch_offset should be incremented by 1 to parallel the
+ # subscription position (also incremented by 1)
+ self.fetch_offset = max(self.fetch_offset, res[-1].offset + 1)
return res
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 4547222..fc031f7 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -514,8 +514,8 @@ def test_partition_records_offset():
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
assert len(records) > 0
msgs = records.take(1)
- assert msgs[0].offset == 123
- assert records.fetch_offset == 124
+ assert msgs[0].offset == fetch_offset
+ assert records.fetch_offset == fetch_offset + 1
msgs = records.take(2)
assert len(msgs) == 2
assert len(records) > 0
@@ -538,3 +538,20 @@ def test_partition_records_no_fetch_offset():
for i in range(batch_start, batch_end)]
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
assert len(records) == 0
+
+
+def test_partition_records_compacted_offset():
+ """Test that messagesets are handled correctly
+ when the fetch offset points to a message that has been compacted
+ """
+ batch_start = 0
+ batch_end = 100
+ fetch_offset = 42
+ tp = TopicPartition('foo', 0)
+ messages = [ConsumerRecord(tp.topic, tp.partition, i,
+ None, None, 'key', 'value', 'checksum', 0, 0)
+ for i in range(batch_start, batch_end) if i != fetch_offset]
+ records = Fetcher.PartitionRecords(fetch_offset, None, messages)
+ assert len(records) == batch_end - fetch_offset - 1
+ msgs = records.take(1)
+ assert msgs[0].offset == fetch_offset + 1