author     Dana Powers <dana.powers@gmail.com>        2018-02-02 16:36:30 -0800
committer  GitHub <noreply@github.com>                2018-02-02 16:36:30 -0800
commit     618c5051493693c1305aa9f08e8a0583d5fcf0e3 (patch)
tree       3a2fcec8260915a83f19a603671c4a0e5461cca0
parent     08a7fb7b754a754c6c64e96d4ba5c4f56cf38a5f (diff)
download   kafka-python-618c5051493693c1305aa9f08e8a0583d5fcf0e3.tar.gz
KAFKA-3949: Avoid race condition when subscription changes during rebalance (#1364)
-rw-r--r--  kafka/cluster.py                        7
-rw-r--r--  kafka/consumer/fetcher.py               6
-rw-r--r--  kafka/consumer/group.py                10
-rw-r--r--  kafka/consumer/subscription_state.py   30
-rw-r--r--  kafka/coordinator/base.py              24
-rw-r--r--  kafka/coordinator/consumer.py         102
-rw-r--r--  test/test_coordinator.py               58

7 files changed, 128 insertions, 109 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index d646fdf..1ab4218 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -291,6 +291,13 @@ class ClusterMetadata(object):
for listener in self._listeners:
listener(self)
+ if self.need_all_topic_metadata:
+ # the listener may change the interested topics,
+ # which could cause another metadata refresh.
+ # If we have already fetched all topics, however,
+ # another fetch should be unnecessary.
+ self._need_update = False
+
def add_listener(self, listener):
"""Add a callback function to be called on each metadata update"""
self._listeners.add(listener)
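
The hunk above guards against a redundant refresh: a metadata listener may change the set of interested topics, which normally schedules another update, but when the client already fetches metadata for all topics that second round trip adds nothing. A minimal sketch of the idea, using a hypothetical stand-in rather than the real ClusterMetadata:

class MiniClusterMetadata(object):
    """Hypothetical stand-in for kafka.cluster.ClusterMetadata (sketch only)."""
    def __init__(self, need_all_topic_metadata=False):
        self.need_all_topic_metadata = need_all_topic_metadata
        self._need_update = False
        self._listeners = set()

    def add_listener(self, listener):
        self._listeners.add(listener)

    def request_update(self):
        # listeners call this when their interested-topic set changes
        self._need_update = True

    def update_metadata(self):
        for listener in self._listeners:
            listener(self)
        if self.need_all_topic_metadata:
            # all topics were fetched already, so the extra refresh is skipped
            self._need_update = False

meta = MiniClusterMetadata(need_all_topic_metadata=True)
meta.add_listener(lambda cluster: cluster.request_update())
meta.update_metadata()
assert meta._need_update is False   # no redundant refresh scheduled
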
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index afb8f52..f9fcb37 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -326,9 +326,6 @@ class Fetcher(six.Iterator):
max_records = self.config['max_poll_records']
assert max_records > 0
- if self._subscriptions.needs_partition_assignment:
- return {}, False
-
drained = collections.defaultdict(list)
records_remaining = max_records
@@ -397,9 +394,6 @@ class Fetcher(six.Iterator):
def _message_generator(self):
"""Iterate over fetched_records"""
- if self._subscriptions.needs_partition_assignment:
- raise StopIteration('Subscription needs partition assignment')
-
while self._next_partition_records or self._completed_fetches:
if not self._next_partition_records:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 0224d16..1c1f1e8 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -644,6 +644,11 @@ class KafkaConsumer(six.Iterator):
timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll())
self._client.poll(timeout_ms=timeout_ms)
+ # after the long poll, we should check whether the group needs to rebalance
+ # prior to returning data so that the group can stabilize faster
+ if self._coordinator.need_rejoin():
+ return {}
+
records, _ = self._fetcher.fetched_records(max_records)
return records
@@ -1055,6 +1060,11 @@ class KafkaConsumer(six.Iterator):
poll_ms = 0
self._client.poll(timeout_ms=poll_ms)
+ # after the long poll, we should check whether the group needs to rebalance
+ # prior to returning data so that the group can stabilize faster
+ if self._coordinator.need_rejoin():
+ continue
+
# We need to make sure we at least keep up with scheduled tasks,
# like heartbeats, auto-commits, and metadata refreshes
timeout_at = self._next_timeout()
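
The two hunks above apply the same pattern in poll() and in the iterator path: after the long network poll, check whether the group needs to rejoin before handing records back, so a pending rebalance is not delayed by another fetch cycle. A rough sketch of that control flow with hypothetical stubs (not the real KafkaConsumer internals):

class StubClient(object):
    def poll(self, timeout_ms=0):
        pass   # pretend network I/O happened here

class StubCoordinator(object):
    def __init__(self, rejoin_after_poll):
        self._rejoin = rejoin_after_poll
    def time_to_next_poll(self):
        return 100
    def need_rejoin(self):
        return self._rejoin

class StubFetcher(object):
    def fetched_records(self, max_records):
        return {('foo', 0): ['record']}, False

def poll_once(client, coordinator, fetcher, timeout_ms=100, max_records=500):
    timeout_ms = min(timeout_ms, coordinator.time_to_next_poll())
    client.poll(timeout_ms=timeout_ms)
    # a rebalance may have become necessary during the poll; return early
    # so the group can stabilize instead of serving a stale assignment
    if coordinator.need_rejoin():
        return {}
    records, _ = fetcher.fetched_records(max_records)
    return records

assert poll_once(StubClient(), StubCoordinator(True), StubFetcher()) == {}
assert poll_once(StubClient(), StubCoordinator(False), StubFetcher()) != {}
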
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 3d4dfef..10d722e 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -68,7 +68,6 @@ class SubscriptionState(object):
self._group_subscription = set()
self._user_assignment = set()
self.assignment = dict()
- self.needs_partition_assignment = False
self.listener = None
# initialize to true for the consumers to fetch offset upon starting up
@@ -172,7 +171,6 @@ class SubscriptionState(object):
log.info('Updating subscribed topics to: %s', topics)
self.subscription = set(topics)
self._group_subscription.update(topics)
- self.needs_partition_assignment = True
# Remove any assigned partitions which are no longer subscribed to
for tp in set(self.assignment.keys()):
@@ -192,12 +190,12 @@ class SubscriptionState(object):
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
self._group_subscription.update(topics)
- def mark_for_reassignment(self):
+ def reset_group_subscription(self):
+ """Reset the group's subscription to only contain topics subscribed by this consumer."""
if self._user_assignment:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
assert self.subscription is not None, 'Subscription required'
self._group_subscription.intersection_update(self.subscription)
- self.needs_partition_assignment = True
def assign_from_user(self, partitions):
"""Manually assign a list of TopicPartitions to this consumer.
@@ -220,18 +218,17 @@ class SubscriptionState(object):
if self.subscription is not None:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
- self._user_assignment.clear()
- self._user_assignment.update(partitions)
+ if self._user_assignment != set(partitions):
+ self._user_assignment = set(partitions)
- for partition in partitions:
- if partition not in self.assignment:
- self._add_assigned_partition(partition)
+ for partition in partitions:
+ if partition not in self.assignment:
+ self._add_assigned_partition(partition)
- for tp in set(self.assignment.keys()) - self._user_assignment:
- del self.assignment[tp]
+ for tp in set(self.assignment.keys()) - self._user_assignment:
+ del self.assignment[tp]
- self.needs_partition_assignment = False
- self.needs_fetch_committed_offsets = True
+ self.needs_fetch_committed_offsets = True
def assign_from_subscribed(self, assignments):
"""Update the assignment to the specified partitions
@@ -245,16 +242,18 @@ class SubscriptionState(object):
assignments (list of TopicPartition): partitions to assign to this
consumer instance.
"""
- if self.subscription is None:
+ if not self.partitions_auto_assigned():
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
for tp in assignments:
if tp.topic not in self.subscription:
raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp))
+
+ # after rebalancing, we always reinitialize the assignment state
self.assignment.clear()
for tp in assignments:
self._add_assigned_partition(tp)
- self.needs_partition_assignment = False
+ self.needs_fetch_committed_offsets = True
log.info("Updated partition assignment: %s", assignments)
def unsubscribe(self):
@@ -262,7 +261,6 @@ class SubscriptionState(object):
self.subscription = None
self._user_assignment.clear()
self.assignment.clear()
- self.needs_partition_assignment = True
self.subscribed_pattern = None
def group_subscription(self):
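
The renamed reset_group_subscription() no longer sets a needs_partition_assignment flag; it only shrinks the group-wide topic set back to what this consumer itself subscribes to (rejoin tracking now lives in the coordinator). The operation itself is just a set intersection, sketched here with plain sets rather than the real SubscriptionState:

subscription = {'foo', 'bar'}                 # topics this consumer wants
group_subscription = {'foo', 'bar', 'baz'}    # plus topics other members added

# reset_group_subscription(): keep only our own topics after partitions are revoked
group_subscription &= subscription
assert group_subscription == {'foo', 'bar'}
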
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 301c06d..820fc1f 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -344,23 +344,25 @@ class BaseCoordinator(object):
def ensure_active_group(self):
"""Ensure that the group is active (i.e. joined and synced)"""
with self._lock:
- if not self.need_rejoin():
- return
-
- # call on_join_prepare if needed. We set a flag to make sure that
- # we do not call it a second time if the client is woken up before
- # a pending rebalance completes.
- if not self.rejoining:
- self._on_join_prepare(self._generation.generation_id,
- self._generation.member_id)
- self.rejoining = True
-
if self._heartbeat_thread is None:
self._start_heartbeat_thread()
while self.need_rejoin():
self.ensure_coordinator_ready()
+ # call on_join_prepare if needed. We set a flag
+ # to make sure that we do not call it a second
+ # time if the client is woken up before a pending
+ # rebalance completes. This must be called on each
+ # iteration of the loop because an event requiring
+ # a rebalance (such as a metadata refresh which
+ # changes the matched subscription set) can occur
+ # while another rebalance is still in progress.
+ if not self.rejoining:
+ self._on_join_prepare(self._generation.generation_id,
+ self._generation.member_id)
+ self.rejoining = True
+
# ensure that there are no pending requests to the coordinator.
# This is important in particular to avoid resending a pending
# JoinGroup request.
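
Moving the on_join_prepare call inside the while loop is the heart of this hunk: an event that requires another rebalance (for example a metadata refresh that changes the matched subscription) can arrive while a join is already in flight, so the prepare step must be able to run again on the next pass. A minimal sketch with a hypothetical coordinator stand-in:

class MiniCoordinator(object):
    def __init__(self, scripted_need_rejoin):
        self._answers = list(scripted_need_rejoin)  # scripted need_rejoin() results
        self.rejoining = False
        self.prepare_calls = 0

    def need_rejoin(self):
        return self._answers.pop(0) if self._answers else False

    def _on_join_prepare(self):
        self.prepare_calls += 1

    def _join_and_sync(self):
        self.rejoining = False   # pretend the join round completed

    def ensure_active_group(self):
        while self.need_rejoin():
            # checked on every iteration, not just once before the loop
            if not self.rejoining:
                self._on_join_prepare()
                self.rejoining = True
            self._join_and_sync()

coord = MiniCoordinator([True, True, False])  # second rebalance triggered mid-flight
coord.ensure_active_group()
assert coord.prepare_calls == 2   # prepare ran again for the second rebalance
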
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index ab30883..9438a7e 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -84,6 +84,8 @@ class ConsumerCoordinator(BaseCoordinator):
self.config[key] = configs[key]
self._subscription = subscription
+ self._is_leader = False
+ self._joined_subscription = set()
self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
self._assignment_snapshot = None
self._cluster = client.cluster
@@ -132,11 +134,22 @@ class ConsumerCoordinator(BaseCoordinator):
def group_protocols(self):
"""Returns list of preferred (protocols, metadata)"""
- topics = self._subscription.subscription
- assert topics is not None, 'Consumer has not subscribed to topics'
+ if self._subscription.subscription is None:
+ raise Errors.IllegalStateError('Consumer has not subscribed to topics')
+ # dpkp note: I really dislike this.
+ # why? because we are using this strange method group_protocols,
+ # which is seemingly innocuous, to set internal state (_joined_subscription)
+ # that is later used to check whether metadata has changed since we joined a group
+ # but there is no guarantee that this method, group_protocols, will get called
+ # in the correct sequence or that it will only be called when we want it to be.
+ # So this really should be moved elsewhere, but I don't have the energy to
+ # work that out right now. If you read this at some later date after the mutable
+ # state has bitten you... I'm sorry! It mimics the java client, and that's the
+ # best I've got for now.
+ self._joined_subscription = set(self._subscription.subscription)
metadata_list = []
for assignor in self.config['assignors']:
- metadata = assignor.metadata(topics)
+ metadata = assignor.metadata(self._joined_subscription)
group_protocol = (assignor.name, metadata)
metadata_list.append(group_protocol)
return metadata_list
@@ -158,21 +171,29 @@ class ConsumerCoordinator(BaseCoordinator):
# check if there are any changes to the metadata which should trigger
# a rebalance
- if self._subscription_metadata_changed(cluster):
-
- if (self.config['api_version'] >= (0, 9)
- and self.config['group_id'] is not None):
-
- self._subscription.mark_for_reassignment()
-
- # If we haven't got group coordinator support,
- # just assign all partitions locally
- else:
- self._subscription.assign_from_subscribed([
- TopicPartition(topic, partition)
- for topic in self._subscription.subscription
- for partition in self._metadata_snapshot[topic]
- ])
+ if self._subscription.partitions_auto_assigned():
+ metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
+ if self._metadata_snapshot != metadata_snapshot:
+ self._metadata_snapshot = metadata_snapshot
+
+ # If we haven't got group coordinator support,
+ # just assign all partitions locally
+ if self._auto_assign_all_partitions():
+ self._subscription.assign_from_subscribed([
+ TopicPartition(topic, partition)
+ for topic in self._subscription.subscription
+ for partition in self._metadata_snapshot[topic]
+ ])
+
+ def _auto_assign_all_partitions(self):
+ # For users that use "subscribe" without group support,
+ # we will simply assign all partitions to this consumer
+ if self.config['api_version'] < (0, 9):
+ return True
+ elif self.config['group_id'] is None:
+ return True
+ else:
+ return False
def _build_metadata_snapshot(self, subscription, cluster):
metadata_snapshot = {}
@@ -181,16 +202,6 @@ class ConsumerCoordinator(BaseCoordinator):
metadata_snapshot[topic] = set(partitions)
return metadata_snapshot
- def _subscription_metadata_changed(self, cluster):
- if not self._subscription.partitions_auto_assigned():
- return False
-
- metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
- if self._metadata_snapshot != metadata_snapshot:
- self._metadata_snapshot = metadata_snapshot
- return True
- return False
-
def _lookup_assignor(self, name):
for assignor in self.config['assignors']:
if assignor.name == name:
@@ -199,12 +210,10 @@ class ConsumerCoordinator(BaseCoordinator):
def _on_join_complete(self, generation, member_id, protocol,
member_assignment_bytes):
- # if we were the assignor, then we need to make sure that there have
- # been no metadata updates since the rebalance begin. Otherwise, we
- # won't rebalance again until the next metadata change
- if self._assignment_snapshot is not None and self._assignment_snapshot != self._metadata_snapshot:
- self._subscription.mark_for_reassignment()
- return
+ # only the leader is responsible for monitoring for metadata changes
+ # (i.e. partition changes)
+ if not self._is_leader:
+ self._assignment_snapshot = None
assignor = self._lookup_assignor(protocol)
assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
@@ -307,6 +316,7 @@ class ConsumerCoordinator(BaseCoordinator):
# keep track of the metadata used for assignment so that we can check
# after rebalance completion whether anything has changed
self._cluster.request_update()
+ self._is_leader = True
self._assignment_snapshot = self._metadata_snapshot
log.debug("Performing assignment for group %s using strategy %s"
@@ -338,8 +348,8 @@ class ConsumerCoordinator(BaseCoordinator):
" for group %s failed on_partitions_revoked",
self._subscription.listener, self.group_id)
- self._assignment_snapshot = None
- self._subscription.mark_for_reassignment()
+ self._is_leader = False
+ self._subscription.reset_group_subscription()
def need_rejoin(self):
"""Check whether the group should be rejoined
@@ -347,9 +357,23 @@ class ConsumerCoordinator(BaseCoordinator):
Returns:
bool: True if consumer should rejoin group, False otherwise
"""
- return (self._subscription.partitions_auto_assigned() and
- (super(ConsumerCoordinator, self).need_rejoin() or
- self._subscription.needs_partition_assignment))
+ if not self._subscription.partitions_auto_assigned():
+ return False
+
+ if self._auto_assign_all_partitions():
+ return False
+
+ # we need to rejoin if we performed the assignment and metadata has changed
+ if (self._assignment_snapshot is not None
+ and self._assignment_snapshot != self._metadata_snapshot):
+ return True
+
+ # we need to join if our subscription has changed since the last join
+ if (self._joined_subscription is not None
+ and self._joined_subscription != self._subscription.subscription):
+ return True
+
+ return super(ConsumerCoordinator, self).need_rejoin()
def refresh_committed_offsets_if_needed(self):
"""Fetch committed offsets for assigned partitions."""
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index e094b9c..7a2627e 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -62,7 +62,7 @@ def test_group_protocols(coordinator):
# Requires a subscription
try:
coordinator.group_protocols()
- except AssertionError:
+ except Errors.IllegalStateError:
pass
else:
assert False, 'Exception not raised when expected'
@@ -85,8 +85,7 @@ def test_pattern_subscription(coordinator, api_version):
coordinator.config['api_version'] = api_version
coordinator._subscription.subscribe(pattern='foo')
assert coordinator._subscription.subscription == set([])
- assert coordinator._subscription_metadata_changed({}) is False
- assert coordinator._subscription.needs_partition_assignment is False
+ assert coordinator._metadata_snapshot == coordinator._build_metadata_snapshot(coordinator._subscription, {})
cluster = coordinator._client.cluster
cluster.update_metadata(MetadataResponse[0](
@@ -100,12 +99,10 @@ def test_pattern_subscription(coordinator, api_version):
# 0.9 consumers should trigger dynamic partition assignment
if api_version >= (0, 9):
- assert coordinator._subscription.needs_partition_assignment is True
assert coordinator._subscription.assignment == {}
# earlier consumers get all partitions assigned locally
else:
- assert coordinator._subscription.needs_partition_assignment is False
assert set(coordinator._subscription.assignment.keys()) == set([
TopicPartition('foo1', 0),
TopicPartition('foo2', 0)])
@@ -195,7 +192,6 @@ def test_perform_assignment(mocker, coordinator):
def test_on_join_prepare(coordinator):
coordinator._subscription.subscribe(topics=['foobar'])
coordinator._on_join_prepare(0, 'member-foo')
- assert coordinator._subscription.needs_partition_assignment is True
def test_need_rejoin(coordinator):
@@ -205,13 +201,6 @@ def test_need_rejoin(coordinator):
coordinator._subscription.subscribe(topics=['foobar'])
assert coordinator.need_rejoin() is True
- coordinator._subscription.needs_partition_assignment = False
- coordinator.rejoin_needed = False
- assert coordinator.need_rejoin() is False
-
- coordinator._subscription.needs_partition_assignment = True
- assert coordinator.need_rejoin() is True
-
def test_refresh_committed_offsets_if_needed(mocker, coordinator):
mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets',
@@ -388,7 +377,6 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
@pytest.fixture
def patched_coord(mocker, coordinator):
coordinator._subscription.subscribe(topics=['foobar'])
- coordinator._subscription.needs_partition_assignment = False
mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
coordinator.coordinator_id = 0
mocker.patch.object(coordinator, 'coordinator', return_value=0)
@@ -461,47 +449,39 @@ def test_send_offset_commit_request_success(mocker, patched_coord, offsets):
offsets, future, mocker.ANY, response)
-@pytest.mark.parametrize('response,error,dead,reassign', [
+@pytest.mark.parametrize('response,error,dead', [
(OffsetCommitResponse[0]([('foobar', [(0, 30), (1, 30)])]),
- Errors.GroupAuthorizationFailedError, False, False),
+ Errors.GroupAuthorizationFailedError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 12), (1, 12)])]),
- Errors.OffsetMetadataTooLargeError, False, False),
+ Errors.OffsetMetadataTooLargeError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]),
- Errors.InvalidCommitOffsetSizeError, False, False),
+ Errors.InvalidCommitOffsetSizeError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]),
- Errors.GroupLoadInProgressError, False, False),
+ Errors.GroupLoadInProgressError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]),
- Errors.GroupCoordinatorNotAvailableError, True, False),
+ Errors.GroupCoordinatorNotAvailableError, True),
(OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]),
- Errors.NotCoordinatorForGroupError, True, False),
+ Errors.NotCoordinatorForGroupError, True),
(OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]),
- Errors.RequestTimedOutError, True, False),
+ Errors.RequestTimedOutError, True),
(OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]),
- Errors.CommitFailedError, False, True),
+ Errors.CommitFailedError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 22), (1, 22)])]),
- Errors.CommitFailedError, False, True),
+ Errors.CommitFailedError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 27), (1, 27)])]),
- Errors.CommitFailedError, False, True),
+ Errors.CommitFailedError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 17), (1, 17)])]),
- Errors.InvalidTopicError, False, False),
+ Errors.InvalidTopicError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]),
- Errors.TopicAuthorizationFailedError, False, False),
+ Errors.TopicAuthorizationFailedError, False),
])
def test_handle_offset_commit_response(mocker, patched_coord, offsets,
- response, error, dead, reassign):
+ response, error, dead):
future = Future()
patched_coord._handle_offset_commit_response(offsets, future, time.time(),
response)
assert isinstance(future.exception, error)
assert patched_coord.coordinator_id is (None if dead else 0)
- if reassign:
- assert patched_coord._generation is Generation.NO_GENERATION
- assert patched_coord.rejoin_needed is True
- assert patched_coord.state is MemberState.UNJOINED
- else:
- assert patched_coord._generation is not Generation.NO_GENERATION
- assert patched_coord.rejoin_needed is False
- assert patched_coord.state is MemberState.STABLE
@pytest.fixture
@@ -570,6 +550,10 @@ def test_send_offset_fetch_request_success(patched_coord, partitions):
Errors.GroupLoadInProgressError, False),
(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]),
Errors.NotCoordinatorForGroupError, True),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]),
+ Errors.UnknownMemberIdError, False),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]),
+ Errors.IllegalGenerationError, False),
(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]),
Errors.TopicAuthorizationFailedError, False),
(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]),
@@ -627,7 +611,7 @@ def test_ensure_active_group(mocker, coordinator):
coordinator._subscription.subscribe(topics=['foobar'])
mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
mocker.patch.object(coordinator, '_send_join_group_request', return_value=Future().success(True))
- mocker.patch.object(coordinator, 'need_rejoin', side_effect=[True, True, False])
+ mocker.patch.object(coordinator, 'need_rejoin', side_effect=[True, False])
mocker.patch.object(coordinator, '_on_join_complete')
mocker.patch.object(coordinator, '_heartbeat_thread')