author     Dana Powers <dana.powers@gmail.com>    2017-10-11 17:11:31 -0700
committer  GitHub <noreply@github.com>            2017-10-11 17:11:31 -0700
commit     cfddc6bd179e236874e00a899e9349d5c9a54400 (patch)
tree       5b3c851f0d127f53adfb1c58680f6c2e6ff2fa9f
parent     f04435c5ed97fef0975a77a8dc7bae7c284bba63 (diff)
download   kafka-python-cfddc6bd179e236874e00a899e9349d5c9a54400.tar.gz
KAFKA-4034: Avoid unnecessary consumer coordinator lookup (#1254)
-rw-r--r--  kafka/consumer/fetcher.py      | 15
-rw-r--r--  kafka/consumer/group.py        | 29
-rw-r--r--  kafka/coordinator/base.py      | 23
-rw-r--r--  kafka/coordinator/consumer.py  | 28
-rw-r--r--  test/test_coordinator.py       |  9
5 files changed, 78 insertions(+), 26 deletions(-)
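Context for the change: the scenario this commit optimizes is a consumer that assigns partitions manually and manages its own offsets, where a group coordinator lookup is pure overhead. A minimal sketch using the public kafka-python API (broker address and topic name below are placeholders, not taken from this commit):

from kafka import KafkaConsumer, TopicPartition

# Placeholder broker/topic; no group membership, offsets managed by the caller.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         group_id=None,
                         enable_auto_commit=False)
tp = TopicPartition('my-topic', 0)
consumer.assign([tp])           # manual assignment, no group join
consumer.seek_to_beginning(tp)  # flags the partition as needing an offset reset

# After this change, poll() resolves the pending reset directly via the fetcher
# instead of first refreshing committed offsets through the group coordinator.
records = consumer.poll(timeout_ms=1000)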
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c4fa546..1800863 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -134,6 +134,18 @@ class Fetcher(six.Iterator):
self._clean_done_fetch_futures()
return futures
+ def reset_offsets_if_needed(self, partitions):
+ """Lookup and set offsets for any partitions which are awaiting an
+ explicit reset.
+
+ Arguments:
+ partitions (set of TopicPartitions): the partitions to reset
+ """
+ for tp in partitions:
+ # TODO: If there are several offsets to reset, we could submit offset requests in parallel
+ if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
+ self._reset_offset(tp)
+
def _clean_done_fetch_futures(self):
while True:
if not self._fetch_futures:
@@ -168,9 +180,6 @@ class Fetcher(six.Iterator):
" update", tp)
continue
- # TODO: If there are several offsets to reset,
- # we could submit offset requests in parallel
- # for now, each call to _reset_offset will block
if self._subscriptions.is_offset_reset_needed(tp):
self._reset_offset(tp)
elif self._subscriptions.assignment[tp].committed is None:
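For orientation, the reset flag that this new Fetcher method consumes is set on the subscription state by the seek helpers. A simplified sketch using kafka-python internals (it assumes subscriptions, fetcher, and tp are the consumer's already-constructed SubscriptionState, Fetcher, and a TopicPartition; the module path is as it existed at the time of this commit):

from kafka.protocol.offset import OffsetResetStrategy

# seek_to_beginning()/seek_to_end() ultimately mark partitions for reset...
subscriptions.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)
assert subscriptions.is_offset_reset_needed(tp)

# ...and reset_offsets_if_needed() later resolves only the marked partitions,
# without consulting the group coordinator.
fetcher.reset_offsets_if_needed(subscriptions.assigned_partitions())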
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index a83d5da..cbfd720 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -585,12 +585,11 @@ class KafkaConsumer(six.Iterator):
dict: Map of topic to list of records (may be empty).
"""
if self._use_consumer_group():
- self._coordinator.ensure_coordinator_known()
self._coordinator.ensure_active_group()
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_coordinator_ready()
# Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
@@ -835,6 +834,8 @@ class KafkaConsumer(six.Iterator):
Returns:
set: {topic, ...}
"""
+ if self._subscription.subscription is None:
+ return None
return self._subscription.subscription.copy()
def unsubscribe(self):
@@ -988,26 +989,34 @@ class KafkaConsumer(six.Iterator):
NoOffsetForPartitionError: If no offset is stored for a given
partition and no offset reset policy is defined.
"""
- if (self.config['api_version'] >= (0, 8, 1) and
- self.config['group_id'] is not None):
+ # Lookup any positions for partitions which are awaiting reset (which may be the
+ # case if the user called seekToBeginning or seekToEnd). We do this check first to
+ # avoid an unnecessary lookup of committed offsets (which typically occurs when
+ # the user is manually assigning partitions and managing their own offsets).
+ self._fetcher.reset_offsets_if_needed(partitions)
- # Refresh commits for all assigned partitions
- self._coordinator.refresh_committed_offsets_if_needed()
+ if not self._subscription.has_all_fetch_positions():
+ # if we still don't have offsets for all partitions, then we should either seek
+ # to the last committed position or reset using the auto reset policy
+ if (self.config['api_version'] >= (0, 8, 1) and
+ self.config['group_id'] is not None):
+ # first refresh commits for all assigned partitions
+ self._coordinator.refresh_committed_offsets_if_needed()
- # Then, do any offset lookups in case some positions are not known
- self._fetcher.update_fetch_positions(partitions)
+ # Then, do any offset lookups in case some positions are not known
+ self._fetcher.update_fetch_positions(partitions)
def _message_generator(self):
assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
while time.time() < self._consumer_timeout:
if self._use_consumer_group():
- self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_coordinator_ready()
self._coordinator.ensure_active_group()
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_coordinator_ready()
# Fetch offsets for any subscribed partitions that we aren't tracking yet
if not self._subscription.has_all_fetch_positions():
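Putting the group.py changes together, the reordered position-resolution logic reads roughly like this (a simplified sketch of KafkaConsumer._update_fetch_positions, omitting config checks and error handling; not the verbatim method body):

def _update_fetch_positions_sketch(consumer, partitions):
    # 1. Resolve partitions explicitly marked for reset (seek_to_beginning /
    #    seek_to_end) without touching the group coordinator.
    consumer._fetcher.reset_offsets_if_needed(partitions)

    # 2. Only if some positions are still unknown, fall back to committed
    #    offsets (which requires a group coordinator) and the auto reset policy.
    if not consumer._subscription.has_all_fetch_positions():
        if (consumer.config['api_version'] >= (0, 8, 1) and
                consumer.config['group_id'] is not None):
            consumer._coordinator.refresh_committed_offsets_if_needed()
        consumer._fetcher.update_fetch_positions(partitions)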
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index af0936c..53b3e1d 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -88,6 +88,7 @@ class BaseCoordinator(object):
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.group_id = self.config['group_id']
self.coordinator_id = None
+ self._find_coordinator_future = None
self.rejoin_needed = True
self.rejoining = False
self.heartbeat = Heartbeat(**self.config)
@@ -195,12 +196,11 @@ class BaseCoordinator(object):
return False
- def ensure_coordinator_known(self):
+ def ensure_coordinator_ready(self):
"""Block until the coordinator for this group is known
(and we have an active connection -- java client uses unsent queue).
"""
while self.coordinator_unknown():
-
# Prior to 0.8.2 there was no group coordinator
# so we will just pick a node at random and treat
# it as the "coordinator"
@@ -210,7 +210,7 @@ class BaseCoordinator(object):
self._client.ready(self.coordinator_id)
continue
- future = self._send_group_coordinator_request()
+ future = self.lookup_coordinator()
self._client.poll(future=future)
if future.failed():
@@ -224,6 +224,16 @@ class BaseCoordinator(object):
else:
raise future.exception # pylint: disable-msg=raising-bad-type
+ def _reset_find_coordinator_future(self, result):
+ self._find_coordinator_future = None
+
+ def lookup_coordinator(self):
+ if self._find_coordinator_future is None:
+ self._find_coordinator_future = self._send_group_coordinator_request()
+
+ self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
+ return self._find_coordinator_future
+
def need_rejoin(self):
"""Check whether the group should be rejoined (e.g. if metadata changes)
@@ -234,6 +244,11 @@ class BaseCoordinator(object):
def ensure_active_group(self):
"""Ensure that the group is active (i.e. joined and synced)"""
+ # always ensure that the coordinator is ready because we may have been
+ # disconnected when sending heartbeats, which does not necessarily require
+ # us to rejoin the group.
+ self.ensure_coordinator_ready()
+
if not self.need_rejoin():
return
@@ -242,7 +257,7 @@ class BaseCoordinator(object):
self.rejoining = True
while self.need_rejoin():
- self.ensure_coordinator_known()
+ self.ensure_coordinator_ready()
# ensure that there are no pending requests to the coordinator.
# This is important in particular to avoid resending a pending
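The new lookup_coordinator() guarantees at most one in-flight coordinator lookup by memoizing the Future. The pattern in isolation looks roughly like this (a generic sketch, where _send_request() is a placeholder standing in for _send_group_coordinator_request()):

class SingleInFlightLookup(object):
    # Generic sketch of the memoized-Future pattern used by lookup_coordinator().
    def __init__(self):
        self._pending = None

    def lookup(self):
        if self._pending is None:
            self._pending = self._send_request()
            # Clear the cached future once it completes (success or failure)
            # so a later caller can trigger a fresh lookup.
            self._pending.add_both(self._clear)
        return self._pending

    def _clear(self, result):
        self._pending = None

    def _send_request(self):
        raise NotImplementedError  # placeholder for the real lookup request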
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 84c62df..0328837 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -315,7 +315,7 @@ class ConsumerCoordinator(BaseCoordinator):
return {}
while True:
- self.ensure_coordinator_known()
+ self.ensure_coordinator_ready()
# contact coordinator to fetch committed offsets
future = self._send_offset_fetch_request(partitions)
@@ -353,9 +353,29 @@ class ConsumerCoordinator(BaseCoordinator):
response will be either an Exception or an OffsetCommitResponse
struct. This callback can be used to trigger custom actions when
a commit request completes.
- Returns:
- Future: indicating whether the commit was successful or not
"""
+ if not self.coordinator_unknown():
+ self._do_commit_offsets_async(offsets, callback)
+ else:
+ # we don't know the current coordinator, so try to find it and then
+ # send the commit or fail (we don't want recursive retries which can
+ # cause offset commits to arrive out of order). Note that there may
+ # be multiple offset commits chained to the same coordinator lookup
+ # request. This is fine because the listeners will be invoked in the
+ # same order that they were added. Note also that BaseCoordinator
+ # prevents multiple concurrent coordinator lookup requests.
+ future = self.lookup_coordinator()
+ future.add_callback(self._do_commit_offsets_async, offsets, callback)
+ if callback:
+ future.add_errback(callback)
+
+ # ensure the commit has a chance to be transmitted (without blocking on
+ # its completion). Note that commits are treated as heartbeats by the
+ # coordinator, so there is no need to explicitly allow heartbeats
+ # through delayed task execution.
+ self._client.poll() # no wakeup if we add that feature
+
+ def _do_commit_offsets_async(self, offsets, callback=None):
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
@@ -386,7 +406,7 @@ class ConsumerCoordinator(BaseCoordinator):
return
while True:
- self.ensure_coordinator_known()
+ self.ensure_coordinator_ready()
future = self._send_offset_commit_request(offsets)
self._client.poll(future=future)
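The non-blocking commit path in consumer.py chains the real send onto the shared coordinator lookup Future rather than blocking or retrying recursively. Reduced to its essentials, the control flow looks like this (a hedged sketch of the branching added in this commit, not the verbatim method):

def commit_offsets_async_sketch(coordinator, offsets, callback=None):
    if not coordinator.coordinator_unknown():
        # Coordinator already known: send the commit immediately.
        coordinator._do_commit_offsets_async(offsets, callback)
    else:
        # Piggyback the send on the (single, shared) coordinator lookup;
        # on lookup failure, surface the error to the user callback.
        future = coordinator.lookup_coordinator()
        future.add_callback(coordinator._do_commit_offsets_async, offsets, callback)
        if callback:
            future.add_errback(callback)

    # Give the request a chance to be transmitted without blocking on it.
    coordinator._client.poll()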
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 4115c03..aea2662 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -234,7 +234,7 @@ def test_fetch_committed_offsets(mocker, coordinator):
assert coordinator._client.poll.call_count == 0
# general case -- send offset fetch request, get successful future
- mocker.patch.object(coordinator, 'ensure_coordinator_known')
+ mocker.patch.object(coordinator, 'ensure_coordinator_ready')
mocker.patch.object(coordinator, '_send_offset_fetch_request',
return_value=Future().success('foobar'))
partitions = [TopicPartition('foobar', 0)]
@@ -295,16 +295,15 @@ def offsets():
def test_commit_offsets_async(mocker, coordinator, offsets):
mocker.patch.object(coordinator._client, 'poll')
- mocker.patch.object(coordinator, 'ensure_coordinator_known')
+ mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
mocker.patch.object(coordinator, '_send_offset_commit_request',
return_value=Future().success('fizzbuzz'))
- ret = coordinator.commit_offsets_async(offsets)
- assert isinstance(ret, Future)
+ coordinator.commit_offsets_async(offsets)
assert coordinator._send_offset_commit_request.call_count == 1
def test_commit_offsets_sync(mocker, coordinator, offsets):
- mocker.patch.object(coordinator, 'ensure_coordinator_known')
+ mocker.patch.object(coordinator, 'ensure_coordinator_ready')
mocker.patch.object(coordinator, '_send_offset_commit_request',
return_value=Future().success('fizzbuzz'))
cli = coordinator._client
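A complementary test for the new unknown-coordinator branch (hypothetical, not part of this commit; it assumes the same fixtures and Future import already used in test_coordinator.py) might look like:

def test_commit_offsets_async_unknown_coordinator(mocker, coordinator, offsets):
    # Hypothetical test: when the coordinator is unknown, the commit should be
    # deferred until the shared lookup future completes.
    mocker.patch.object(coordinator._client, 'poll')
    mocker.patch.object(coordinator, 'coordinator_unknown', return_value=True)
    lookup = Future()
    mocker.patch.object(coordinator, 'lookup_coordinator', return_value=lookup)
    mocker.patch.object(coordinator, '_do_commit_offsets_async')

    coordinator.commit_offsets_async(offsets)
    assert coordinator._do_commit_offsets_async.call_count == 0

    lookup.success('fizzbuzz')
    assert coordinator._do_commit_offsets_async.call_count == 1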