summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Dana Powers <dana.powers@gmail.com> 2019-06-19 13:41:59 -0700
committer Jeff Widman <jeff@jeffwidman.com> 2019-06-19 13:41:59 -0700
commit 91f4642e92afc208531f66cea1ed7ef32bcfa4d1 (patch)
tree 82e08036d43a297c865d4766b4198bac951ee9bb
parent f126e5bfcc8f41ee5ea29b41ec6eabbc3f441647 (diff)
download kafka-python-91f4642e92afc208531f66cea1ed7ef32bcfa4d1.tar.gz
Use dedicated connection for group coordinator (#1822)
This changes the coordinator_id to be a unique string, e.g., `coordinator-1`, so that the group coordinator gets a dedicated connection. This won't eliminate lock contention, because the client lock applies to all connections, but it should reduce in-flight-request contention.
-rw-r--r-- kafka/cluster.py 36
-rw-r--r-- kafka/coordinator/base.py 6
2 files changed, 17 insertions, 25 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 4169549..19137de 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -65,6 +65,7 @@ class ClusterMetadata(object):
self.config[key] = configs[key]
self._bootstrap_brokers = self._generate_bootstrap_brokers()
+ self._coordinator_brokers = {}
def _generate_bootstrap_brokers(self):
# collect_hosts does not perform DNS, so we should be fine to re-use
@@ -96,7 +97,11 @@ class ClusterMetadata(object):
Returns:
BrokerMetadata or None if not found
"""
- return self._brokers.get(broker_id) or self._bootstrap_brokers.get(broker_id)
+ return (
+ self._brokers.get(broker_id) or
+ self._bootstrap_brokers.get(broker_id) or
+ self._coordinator_brokers.get(broker_id)
+ )
def partitions_for_topic(self, topic):
"""Return set of all partitions for topic (whether available or not)
@@ -341,41 +346,28 @@ class ClusterMetadata(object):
response (GroupCoordinatorResponse): broker response
Returns:
- bool: True if metadata is updated, False on error
+ string: coordinator node_id if metadata is updated, None on error
"""
log.debug("Updating coordinator for %s: %s", group, response)
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
log.error("GroupCoordinatorResponse error: %s", error_type)
self._groups[group] = -1
- return False
+ return
- node_id = response.coordinator_id
+ # Use a coordinator-specific node id so that group requests
+ # get a dedicated connection
+ node_id = 'coordinator-{}'.format(response.coordinator_id)
coordinator = BrokerMetadata(
- response.coordinator_id,
+ node_id,
response.host,
response.port,
None)
- # Assume that group coordinators are just brokers
- # (this is true now, but could diverge in future)
- if node_id not in self._brokers:
- self._brokers[node_id] = coordinator
-
- # If this happens, either brokers have moved without
- # changing IDs, or our assumption above is wrong
- else:
- node = self._brokers[node_id]
- if coordinator.host != node.host or coordinator.port != node.port:
- log.error("GroupCoordinator metadata conflicts with existing"
- " broker metadata. Coordinator: %s, Broker: %s",
- coordinator, node)
- self._groups[group] = node_id
- return False
-
log.info("Group coordinator for %s is %s", group, coordinator)
+ self._coordinator_brokers[node_id] = coordinator
self._groups[group] = node_id
- return True
+ return node_id
def with_partitions(self, partitions_to_add):
"""Returns a copy of cluster metadata with partitions added"""
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index e538fda..421360e 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -676,14 +676,14 @@ class BaseCoordinator(object):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
with self._client._lock, self._lock:
- ok = self._client.cluster.add_group_coordinator(self.group_id, response)
- if not ok:
+ coordinator_id = self._client.cluster.add_group_coordinator(self.group_id, response)
+ if not coordinator_id:
# This could happen if coordinator metadata is different
# than broker metadata
future.failure(Errors.IllegalStateError())
return
- self.coordinator_id = response.coordinator_id
+ self.coordinator_id = coordinator_id
log.info("Discovered coordinator %s for group %s",
self.coordinator_id, self.group_id)
self._client.maybe_connect(self.coordinator_id)