commit     cfddc6bd179e236874e00a899e9349d5c9a54400
tree       5b3c851f0d127f53adfb1c58680f6c2e6ff2fa9f
parent     f04435c5ed97fef0975a77a8dc7bae7c284bba63
author     Dana Powers <dana.powers@gmail.com>  2017-10-11 17:11:31 -0700
committer  GitHub <noreply@github.com>          2017-10-11 17:11:31 -0700
KAFKA-4034: Avoid unnecessary consumer coordinator lookup (#1254)
 kafka/consumer/fetcher.py     | 15
 kafka/consumer/group.py       | 29
 kafka/coordinator/base.py     | 23
 kafka/coordinator/consumer.py | 28
 test/test_coordinator.py      |  9
 5 files changed, 78 insertions(+), 26 deletions(-)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c4fa546..1800863 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -134,6 +134,18 @@ class Fetcher(six.Iterator):
         self._clean_done_fetch_futures()
         return futures
 
+    def reset_offsets_if_needed(self, partitions):
+        """Lookup and set offsets for any partitions which are awaiting an
+        explicit reset.
+
+        Arguments:
+            partitions (set of TopicPartitions): the partitions to reset
+        """
+        for tp in partitions:
+            # TODO: If there are several offsets to reset, we could submit offset requests in parallel
+            if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
+                self._reset_offset(tp)
+
     def _clean_done_fetch_futures(self):
         while True:
             if not self._fetch_futures:
@@ -168,9 +180,6 @@ class Fetcher(six.Iterator):
                           " update", tp)
                 continue
 
-            # TODO: If there are several offsets to reset,
-            # we could submit offset requests in parallel
-            # for now, each call to _reset_offset will block
             if self._subscriptions.is_offset_reset_needed(tp):
                 self._reset_offset(tp)
             elif self._subscriptions.assignment[tp].committed is None:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index a83d5da..cbfd720 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -585,12 +585,11 @@ class KafkaConsumer(six.Iterator):
             dict: Map of topic to list of records (may be empty).
         """
         if self._use_consumer_group():
-            self._coordinator.ensure_coordinator_known()
             self._coordinator.ensure_active_group()
 
         # 0.8.2 brokers support kafka-backed offset storage via group coordinator
         elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
-            self._coordinator.ensure_coordinator_known()
+            self._coordinator.ensure_coordinator_ready()
 
         # Fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
@@ -835,6 +834,8 @@ class KafkaConsumer(six.Iterator):
         Returns:
             set: {topic, ...}
         """
+        if self._subscription.subscription is None:
+            return None
         return self._subscription.subscription.copy()
 
     def unsubscribe(self):
@@ -988,26 +989,34 @@ class KafkaConsumer(six.Iterator):
                 NoOffsetForPartitionError: If no offset is stored for a given
                     partition and no offset reset policy is defined.
         """
-        if (self.config['api_version'] >= (0, 8, 1) and
-                self.config['group_id'] is not None):
+        # Lookup any positions for partitions which are awaiting reset (which may be the
+        # case if the user called seekToBeginning or seekToEnd. We do this check first to
+        # avoid an unnecessary lookup of committed offsets (which typically occurs when
+        # the user is manually assigning partitions and managing their own offsets).
+        self._fetcher.reset_offsets_if_needed(partitions)
 
-            # Refresh commits for all assigned partitions
-            self._coordinator.refresh_committed_offsets_if_needed()
+        if not self._subscription.has_all_fetch_positions():
+            # if we still don't have offsets for all partitions, then we should either seek
+            # to the last committed position or reset using the auto reset policy
+            if (self.config['api_version'] >= (0, 8, 1) and
+                    self.config['group_id'] is not None):
+                # first refresh commits for all assigned partitions
+                self._coordinator.refresh_committed_offsets_if_needed()
 
-        # Then, do any offset lookups in case some positions are not known
-        self._fetcher.update_fetch_positions(partitions)
+            # Then, do any offset lookups in case some positions are not known
+            self._fetcher.update_fetch_positions(partitions)
 
     def _message_generator(self):
         assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
         while time.time() < self._consumer_timeout:
 
             if self._use_consumer_group():
-                self._coordinator.ensure_coordinator_known()
+                self._coordinator.ensure_coordinator_ready()
                 self._coordinator.ensure_active_group()
 
             # 0.8.2 brokers support kafka-backed offset storage via group coordinator
             elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
-                self._coordinator.ensure_coordinator_known()
+                self._coordinator.ensure_coordinator_ready()
 
             # Fetch offsets for any subscribed partitions that we arent tracking yet
             if not self._subscription.has_all_fetch_positions():
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index af0936c..53b3e1d 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -88,6 +88,7 @@ class BaseCoordinator(object):
         self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
         self.group_id = self.config['group_id']
         self.coordinator_id = None
+        self._find_coordinator_future = None
         self.rejoin_needed = True
         self.rejoining = False
         self.heartbeat = Heartbeat(**self.config)
@@ -195,12 +196,11 @@ class BaseCoordinator(object):
 
         return False
 
-    def ensure_coordinator_known(self):
+    def ensure_coordinator_ready(self):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
         """
         while self.coordinator_unknown():
-
             # Prior to 0.8.2 there was no group coordinator
             # so we will just pick a node at random and treat
             # it as the "coordinator"
@@ -210,7 +210,7 @@ class BaseCoordinator(object):
                     self._client.ready(self.coordinator_id)
                 continue
 
-            future = self._send_group_coordinator_request()
+            future = self.lookup_coordinator()
             self._client.poll(future=future)
 
             if future.failed():
@@ -224,6 +224,16 @@ class BaseCoordinator(object):
             else:
                 raise future.exception # pylint: disable-msg=raising-bad-type
 
+    def _reset_find_coordinator_future(self, result):
+        self._find_coordinator_future = None
+
+    def lookup_coordinator(self):
+        if self._find_coordinator_future is None:
+            self._find_coordinator_future = self._send_group_coordinator_request()
+
+        self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
+        return self._find_coordinator_future
+
     def need_rejoin(self):
         """Check whether the group should be rejoined (e.g. if metadata changes)
 
@@ -234,6 +244,11 @@ class BaseCoordinator(object):
 
     def ensure_active_group(self):
         """Ensure that the group is active (i.e. joined and synced)"""
+        # always ensure that the coordinator is ready because we may have been
+        # disconnected when sending heartbeats and does not necessarily require
+        # us to rejoin the group.
+        self.ensure_coordinator_ready()
+
         if not self.need_rejoin():
             return
 
@@ -242,7 +257,7 @@ class BaseCoordinator(object):
             self.rejoining = True
 
         while self.need_rejoin():
-            self.ensure_coordinator_known()
+            self.ensure_coordinator_ready()
 
             # ensure that there are no pending requests to the coordinator.
             # This is important in particular to avoid resending a pending
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 84c62df..0328837 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -315,7 +315,7 @@ class ConsumerCoordinator(BaseCoordinator):
             return {}
 
         while True:
-            self.ensure_coordinator_known()
+            self.ensure_coordinator_ready()
 
             # contact coordinator to fetch committed offsets
             future = self._send_offset_fetch_request(partitions)
@@ -353,9 +353,29 @@ class ConsumerCoordinator(BaseCoordinator):
                 response will be either an Exception or a OffsetCommitResponse
                 struct. This callback can be used to trigger custom actions when
                 a commit request completes.
-
-        Returns:
-            Future: indicating whether the commit was successful or not
         """
+        if not self.coordinator_unknown():
+            self._do_commit_offsets_async(offsets, callback)
+        else:
+            # we don't know the current coordinator, so try to find it and then
+            # send the commit or fail (we don't want recursive retries which can
+            # cause offset commits to arrive out of order). Note that there may
+            # be multiple offset commits chained to the same coordinator lookup
+            # request. This is fine because the listeners will be invoked in the
+            # same order that they were added. Note also that BaseCoordinator
+            # prevents multiple concurrent coordinator lookup requests.
+            future = self.lookup_coordinator()
+            future.add_callback(self._do_commit_offsets_async, offsets, callback)
+            if callback:
+                future.add_errback(callback)
+
+        # ensure the commit has a chance to be transmitted (without blocking on
+        # its completion). Note that commits are treated as heartbeats by the
+        # coordinator, so there is no need to explicitly allow heartbeats
+        # through delayed task execution.
+        self._client.poll() # no wakeup if we add that feature
+
+    def _do_commit_offsets_async(self, offsets, callback=None):
         assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
         assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
         assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
@@ -386,7 +406,7 @@ class ConsumerCoordinator(BaseCoordinator):
             return
 
         while True:
-            self.ensure_coordinator_known()
+            self.ensure_coordinator_ready()
 
             future = self._send_offset_commit_request(offsets)
             self._client.poll(future=future)
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 4115c03..aea2662 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -234,7 +234,7 @@ def test_fetch_committed_offsets(mocker, coordinator):
     assert coordinator._client.poll.call_count == 0
 
     # general case -- send offset fetch request, get successful future
-    mocker.patch.object(coordinator, 'ensure_coordinator_known')
+    mocker.patch.object(coordinator, 'ensure_coordinator_ready')
     mocker.patch.object(coordinator, '_send_offset_fetch_request',
                         return_value=Future().success('foobar'))
     partitions = [TopicPartition('foobar', 0)]
@@ -295,16 +295,15 @@ def offsets():
 
 def test_commit_offsets_async(mocker, coordinator, offsets):
     mocker.patch.object(coordinator._client, 'poll')
-    mocker.patch.object(coordinator, 'ensure_coordinator_known')
+    mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
    mocker.patch.object(coordinator, '_send_offset_commit_request',
                         return_value=Future().success('fizzbuzz'))
-    ret = coordinator.commit_offsets_async(offsets)
-    assert isinstance(ret, Future)
+    coordinator.commit_offsets_async(offsets)
     assert coordinator._send_offset_commit_request.call_count == 1
 
 
 def test_commit_offsets_sync(mocker, coordinator, offsets):
-    mocker.patch.object(coordinator, 'ensure_coordinator_known')
+    mocker.patch.object(coordinator, 'ensure_coordinator_ready')
     mocker.patch.object(coordinator, '_send_offset_commit_request',
                         return_value=Future().success('fizzbuzz'))
     cli = coordinator._client
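
A note on the core mechanism: BaseCoordinator.lookup_coordinator() memoizes the in-flight coordinator request so that concurrent callers (async offset commits, the consumer poll loop) share a single lookup rather than each issuing their own. Below is a minimal, runnable sketch of that pattern; the Future class is a simplified stand-in for kafka.future.Future, and everything outside lookup_coordinator() is illustrative rather than the library's actual API.

class Future(object):
    # simplified stand-in for kafka.future.Future: just enough machinery
    # to run completion callbacks for this sketch
    def __init__(self):
        self.is_done = False
        self.value = None
        self._callbacks = []

    def add_both(self, f, *args):
        # run f when the future completes, on success or failure
        if self.is_done:
            f(self.value, *args)
        else:
            self._callbacks.append((f, args))
        return self

    def success(self, value):
        self.is_done = True
        self.value = value
        for f, args in self._callbacks:
            f(self.value, *args)
        return self


class Coordinator(object):
    def __init__(self):
        self._find_coordinator_future = None

    def _send_group_coordinator_request(self):
        # placeholder for the real GroupCoordinatorRequest round-trip
        return Future()

    def _reset_find_coordinator_future(self, result):
        # once a lookup completes (either way), permit a new one
        self._find_coordinator_future = None

    def lookup_coordinator(self):
        # memoize the in-flight request: concurrent callers share one future,
        # which is what prevents multiple concurrent coordinator lookups
        if self._find_coordinator_future is None:
            self._find_coordinator_future = self._send_group_coordinator_request()

        self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
        return self._find_coordinator_future


coord = Coordinator()
f1 = coord.lookup_coordinator()
f2 = coord.lookup_coordinator()
assert f1 is f2                # both callers share the same in-flight lookup
f1.success('node-1')           # completion clears the cached future...
assert coord._find_coordinator_future is None  # ...so the next call starts fresh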
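
And a hedged usage sketch of what the reordered KafkaConsumer.update_fetch_positions() buys: when a consumer manually assigns partitions and seeks explicitly, its positions are resolved by reset_offsets_if_needed() before any committed-offset refresh, so the OffsetFetch round-trip that the old ordering issued first is skipped. The broker address, topic, and group names below are illustrative.

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         group_id='demo-group',
                         enable_auto_commit=False)
tp = TopicPartition('demo-topic', 0)
consumer.assign([tp])           # manual assignment -- no group join needed
consumer.seek_to_beginning(tp)  # marks the partition as awaiting reset

# reset_offsets_if_needed() resolves the seek with an offset request to the
# partition leader; has_all_fetch_positions() is then true, so poll() skips
# refresh_committed_offsets_if_needed() and never fetches committed offsets
records = consumer.poll(timeout_ms=500)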