summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-12-29 16:06:24 -0800
committerGitHub <noreply@github.com>2019-12-29 16:06:24 -0800
commit2a86b23f477e5ed57aa987db97d11284a37d05a0 (patch)
tree391d4d2d99db38eb2f9869d76592851bbae57cf2
parent1a91a54688cb77fd77c342e719f24f346d5cee89 (diff)
downloadkafka-python-2a86b23f477e5ed57aa987db97d11284a37d05a0.tar.gz
Optionally return OffsetAndMetadata from consumer.committed(tp) (#1979)
-rw-r--r--kafka/consumer/fetcher.py2
-rw-r--r--kafka/consumer/group.py15
-rw-r--r--kafka/consumer/subscription_state.py2
-rw-r--r--kafka/coordinator/consumer.py6
-rw-r--r--test/test_coordinator.py4
-rw-r--r--test/test_fetcher.py4
6 files changed, 20 insertions, 13 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index f9d96b0..5cb25f2 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -185,7 +185,7 @@ class Fetcher(six.Iterator):
self._subscriptions.need_offset_reset(tp)
self._reset_offset(tp)
else:
- committed = self._subscriptions.assignment[tp].committed
+ committed = self._subscriptions.assignment[tp].committed.offset
log.debug("Resetting offset for partition %s to the committed"
" offset %s", tp, committed)
self._subscriptions.seek(tp, committed)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index eb7dff2..458e9fd 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -525,7 +525,7 @@ class KafkaConsumer(six.Iterator):
offsets = self._subscription.all_consumed_offsets()
self._coordinator.commit_offsets_sync(offsets)
- def committed(self, partition):
+ def committed(self, partition, metadata=False):
"""Get the last committed offset for the given partition.
This offset will be used as the position for the consumer
@@ -537,9 +537,11 @@ class KafkaConsumer(six.Iterator):
Arguments:
partition (TopicPartition): The partition to check.
+ metadata (bool, optional): If True, return OffsetAndMetadata struct
+ instead of offset int. Default: False.
Returns:
- The last committed offset, or None if there was no prior commit.
+ The last committed offset (int or OffsetAndMetadata), or None if there was no prior commit.
"""
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
assert self.config['group_id'] is not None, 'Requires group_id'
@@ -553,10 +555,15 @@ class KafkaConsumer(six.Iterator):
else:
commit_map = self._coordinator.fetch_committed_offsets([partition])
if partition in commit_map:
- committed = commit_map[partition].offset
+ committed = commit_map[partition]
else:
committed = None
- return committed
+
+ if committed is not None:
+ if metadata:
+ return committed
+ else:
+ return committed.offset
def _fetch_all_topic_metadata(self):
"""A blocking call that fetches topic metadata for all topics in the
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 76a6c50..08842d1 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -374,7 +374,7 @@ class SubscriptionState(object):
class TopicPartitionState(object):
def __init__(self):
- self.committed = None # last committed position
+ self.committed = None # last committed OffsetAndMetadata
self.has_valid_position = False # whether we have valid position
self.paused = False # whether this partition has been paused by the user
self.awaiting_reset = False # whether we are awaiting reset
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 30337c3..fda80aa 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -387,7 +387,7 @@ class ConsumerCoordinator(BaseCoordinator):
for partition, offset in six.iteritems(offsets):
# verify assignment is still active
if self._subscription.is_assigned(partition):
- self._subscription.assignment[partition].committed = offset.offset
+ self._subscription.assignment[partition].committed = offset
self._subscription.needs_fetch_committed_offsets = False
def fetch_committed_offsets(self, partitions):
@@ -641,7 +641,7 @@ class ConsumerCoordinator(BaseCoordinator):
log.debug("Group %s committed offset %s for partition %s",
self.group_id, offset, tp)
if self._subscription.is_assigned(tp):
- self._subscription.assignment[tp].committed = offset.offset
+ self._subscription.assignment[tp].committed = offset
elif error_type is Errors.GroupAuthorizationFailedError:
log.error("Not authorized to commit offsets for group %s",
self.group_id)
@@ -704,7 +704,7 @@ class ConsumerCoordinator(BaseCoordinator):
partitions (list of TopicPartition): the partitions to fetch
Returns:
- Future: resolves to dict of offsets: {TopicPartition: int}
+ Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata}
"""
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 4afdcd9..88ca4c1 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -20,7 +20,7 @@ from kafka.protocol.commit import (
OffsetCommitRequest, OffsetCommitResponse,
OffsetFetchRequest, OffsetFetchResponse)
from kafka.protocol.metadata import MetadataResponse
-from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.util import WeakMethod
@@ -211,7 +211,7 @@ def test_refresh_committed_offsets_if_needed(mocker, coordinator):
assert coordinator._subscription.needs_fetch_committed_offsets is True
coordinator.refresh_committed_offsets_if_needed()
assignment = coordinator._subscription.assignment
- assert assignment[TopicPartition('foobar', 0)].committed == 123
+ assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'')
assert TopicPartition('foobar', 1) not in assignment
assert coordinator._subscription.needs_fetch_committed_offsets is False
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index b61a0f0..697f8be 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -21,7 +21,7 @@ from kafka.errors import (
UnknownTopicOrPartitionError, OffsetOutOfRangeError
)
from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords
-from kafka.structs import TopicPartition
+from kafka.structs import OffsetAndMetadata, TopicPartition
@pytest.fixture
@@ -124,7 +124,7 @@ def test_update_fetch_positions(fetcher, topic, mocker):
fetcher._reset_offset.reset_mock()
fetcher._subscriptions.need_offset_reset(partition)
fetcher._subscriptions.assignment[partition].awaiting_reset = False
- fetcher._subscriptions.assignment[partition].committed = 123
+ fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, b'')
mocker.patch.object(fetcher._subscriptions, 'seek')
fetcher.update_fetch_positions([partition])
assert fetcher._reset_offset.call_count == 0