diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-06 19:11:23 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-06 19:11:23 -0800 |
commit | 7a99013668b798aaa0acffcf382a7e48e7bd41c1 (patch) | |
tree | 1a6815529211d0f9f99ae05f4bc93dddcbd00baf /kafka | |
parent | 37699be51f868bd736e0fd595f2afc4c03b00ca4 (diff) | |
download | kafka-python-7a99013668b798aaa0acffcf382a7e48e7bd41c1.tar.gz |
Do not require client lock for read-only operations (#1730)
In an effort to reduce the surface area of lock coordination, and thereby hopefully reduce lock contention, I think we can remove locking from the read-only KafkaClient methods: connected, is_disconnected, in_flight_request_count, and least_loaded_node . Given that the read data could change after the lock is released but before the caller uses it, the value of acquiring a lock here does not seem high to me.
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client_async.py | 100 |
1 files changed, 50 insertions, 50 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index b0d1f5e..e2bdda9 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -402,10 +402,10 @@ class KafkaClient(object): def connected(self, node_id): """Return True iff the node_id is connected.""" - with self._lock: - if node_id not in self._conns: - return False - return self._conns[node_id].connected() + conn = self._conns.get(node_id) + if conn is None: + return False + return conn.connected() def _close(self): if not self._closed: @@ -448,10 +448,10 @@ class KafkaClient(object): Returns: bool: True iff the node exists and is disconnected """ - with self._lock: - if node_id not in self._conns: - return False - return self._conns[node_id].disconnected() + conn = self._conns.get(node_id) + if conn is None: + return False + return conn.disconnected() def connection_delay(self, node_id): """ @@ -467,10 +467,10 @@ class KafkaClient(object): Returns: int: The number of milliseconds to wait. """ - with self._lock: - if node_id not in self._conns: - return 0 - return self._conns[node_id].connection_delay() + conn = self._conns.get(node_id) + if conn is None: + return 0 + return conn.connection_delay() def is_ready(self, node_id, metadata_priority=True): """Check whether a node is ready to send more requests. @@ -656,13 +656,14 @@ class KafkaClient(object): Returns: int: pending in-flight requests for the node, or all nodes if None """ - with self._lock: - if node_id is not None: - if node_id not in self._conns: - return 0 - return len(self._conns[node_id].in_flight_requests) - else: - return sum([len(conn.in_flight_requests) for conn in self._conns.values()]) + if node_id is not None: + conn = self._conns.get(node_id) + if conn is None: + return 0 + return len(conn.in_flight_requests) + else: + return sum([len(conn.in_flight_requests) + for conn in list(self._conns.values())]) def _fire_pending_completed_requests(self): responses = [] @@ -689,38 +690,37 @@ class KafkaClient(object): Returns: node_id or None if no suitable node was found """ - with self._lock: - nodes = [broker.nodeId for broker in self.cluster.brokers()] - random.shuffle(nodes) - - inflight = float('inf') - found = None - for node_id in nodes: - conn = self._conns.get(node_id) - connected = conn is not None and conn.connected() - blacked_out = conn is not None and conn.blacked_out() - curr_inflight = len(conn.in_flight_requests) if conn is not None else 0 - if connected and curr_inflight == 0: - # if we find an established connection - # with no in-flight requests, we can stop right away - return node_id - elif not blacked_out and curr_inflight < inflight: - # otherwise if this is the best we have found so far, record that - inflight = curr_inflight - found = node_id - - if found is not None: - return found - - # some broker versions return an empty list of broker metadata - # if there are no topics created yet. the bootstrap process - # should detect this and keep a 'bootstrap' node alive until - # a non-bootstrap node is connected and non-empty broker - # metadata is available - elif 'bootstrap' in self._conns: - return 'bootstrap' + nodes = [broker.nodeId for broker in self.cluster.brokers()] + random.shuffle(nodes) - return None + inflight = float('inf') + found = None + for node_id in nodes: + conn = self._conns.get(node_id) + connected = conn is not None and conn.connected() + blacked_out = conn is not None and conn.blacked_out() + curr_inflight = len(conn.in_flight_requests) if conn is not None else 0 + if connected and curr_inflight == 0: + # if we find an established connection + # with no in-flight requests, we can stop right away + return node_id + elif not blacked_out and curr_inflight < inflight: + # otherwise if this is the best we have found so far, record that + inflight = curr_inflight + found = node_id + + if found is not None: + return found + + # some broker versions return an empty list of broker metadata + # if there are no topics created yet. the bootstrap process + # should detect this and keep a 'bootstrap' node alive until + # a non-bootstrap node is connected and non-empty broker + # metadata is available + elif 'bootstrap' in self._conns: + return 'bootstrap' + + return None def set_topics(self, topics): """Set specific topics to track for metadata. |