summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-06 19:11:23 -0800
committerGitHub <noreply@github.com>2019-03-06 19:11:23 -0800
commit7a99013668b798aaa0acffcf382a7e48e7bd41c1 (patch)
tree1a6815529211d0f9f99ae05f4bc93dddcbd00baf
parent37699be51f868bd736e0fd595f2afc4c03b00ca4 (diff)
downloadkafka-python-7a99013668b798aaa0acffcf382a7e48e7bd41c1.tar.gz
Do not require client lock for read-only operations (#1730)
In an effort to reduce the surface area of lock coordination, and thereby hopefully reduce lock contention, I think we can remove locking from the read-only KafkaClient methods: connected, is_disconnected, in_flight_request_count, and least_loaded_node . Given that the read data could change after the lock is released but before the caller uses it, the value of acquiring a lock here does not seem high to me.
-rw-r--r--kafka/client_async.py100
1 files changed, 50 insertions, 50 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index b0d1f5e..e2bdda9 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -402,10 +402,10 @@ class KafkaClient(object):
def connected(self, node_id):
"""Return True iff the node_id is connected."""
- with self._lock:
- if node_id not in self._conns:
- return False
- return self._conns[node_id].connected()
+ conn = self._conns.get(node_id)
+ if conn is None:
+ return False
+ return conn.connected()
def _close(self):
if not self._closed:
@@ -448,10 +448,10 @@ class KafkaClient(object):
Returns:
bool: True iff the node exists and is disconnected
"""
- with self._lock:
- if node_id not in self._conns:
- return False
- return self._conns[node_id].disconnected()
+ conn = self._conns.get(node_id)
+ if conn is None:
+ return False
+ return conn.disconnected()
def connection_delay(self, node_id):
"""
@@ -467,10 +467,10 @@ class KafkaClient(object):
Returns:
int: The number of milliseconds to wait.
"""
- with self._lock:
- if node_id not in self._conns:
- return 0
- return self._conns[node_id].connection_delay()
+ conn = self._conns.get(node_id)
+ if conn is None:
+ return 0
+ return conn.connection_delay()
def is_ready(self, node_id, metadata_priority=True):
"""Check whether a node is ready to send more requests.
@@ -656,13 +656,14 @@ class KafkaClient(object):
Returns:
int: pending in-flight requests for the node, or all nodes if None
"""
- with self._lock:
- if node_id is not None:
- if node_id not in self._conns:
- return 0
- return len(self._conns[node_id].in_flight_requests)
- else:
- return sum([len(conn.in_flight_requests) for conn in self._conns.values()])
+ if node_id is not None:
+ conn = self._conns.get(node_id)
+ if conn is None:
+ return 0
+ return len(conn.in_flight_requests)
+ else:
+ return sum([len(conn.in_flight_requests)
+ for conn in list(self._conns.values())])
def _fire_pending_completed_requests(self):
responses = []
@@ -689,38 +690,37 @@ class KafkaClient(object):
Returns:
node_id or None if no suitable node was found
"""
- with self._lock:
- nodes = [broker.nodeId for broker in self.cluster.brokers()]
- random.shuffle(nodes)
-
- inflight = float('inf')
- found = None
- for node_id in nodes:
- conn = self._conns.get(node_id)
- connected = conn is not None and conn.connected()
- blacked_out = conn is not None and conn.blacked_out()
- curr_inflight = len(conn.in_flight_requests) if conn is not None else 0
- if connected and curr_inflight == 0:
- # if we find an established connection
- # with no in-flight requests, we can stop right away
- return node_id
- elif not blacked_out and curr_inflight < inflight:
- # otherwise if this is the best we have found so far, record that
- inflight = curr_inflight
- found = node_id
-
- if found is not None:
- return found
-
- # some broker versions return an empty list of broker metadata
- # if there are no topics created yet. the bootstrap process
- # should detect this and keep a 'bootstrap' node alive until
- # a non-bootstrap node is connected and non-empty broker
- # metadata is available
- elif 'bootstrap' in self._conns:
- return 'bootstrap'
+ nodes = [broker.nodeId for broker in self.cluster.brokers()]
+ random.shuffle(nodes)
- return None
+ inflight = float('inf')
+ found = None
+ for node_id in nodes:
+ conn = self._conns.get(node_id)
+ connected = conn is not None and conn.connected()
+ blacked_out = conn is not None and conn.blacked_out()
+ curr_inflight = len(conn.in_flight_requests) if conn is not None else 0
+ if connected and curr_inflight == 0:
+ # if we find an established connection
+ # with no in-flight requests, we can stop right away
+ return node_id
+ elif not blacked_out and curr_inflight < inflight:
+ # otherwise if this is the best we have found so far, record that
+ inflight = curr_inflight
+ found = node_id
+
+ if found is not None:
+ return found
+
+ # some broker versions return an empty list of broker metadata
+ # if there are no topics created yet. the bootstrap process
+ # should detect this and keep a 'bootstrap' node alive until
+ # a non-bootstrap node is connected and non-empty broker
+ # metadata is available
+ elif 'bootstrap' in self._conns:
+ return 'bootstrap'
+
+ return None
def set_topics(self, topics):
"""Set specific topics to track for metadata.