summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-31 10:12:26 -0700
committerDana Powers <dana.powers@gmail.com>2019-03-31 16:31:57 -0700
commit7a568a9681e67941d2d9f9f8f1740d538f9f1f42 (patch)
treed2ac9d2d6e03705d5073626f796c652d0c9090c6
parentb1effa24aca3a6bcf2268354caae12ee82d6b36d (diff)
downloadkafka-python-lock_client_check_version.tar.gz
lock client.check_versionlock_client_check_version
-rw-r--r--kafka/client_async.py5
1 files changed, 5 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index b6adb77..ba5c960 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -845,6 +845,7 @@ class KafkaClient(object):
UnrecognizedBrokerVersion: please file bug if seen!
AssertionError (if strict=True): please file bug if seen!
"""
+ self._lock.acquire()
end = time.time() + timeout
while time.time() < end:
@@ -852,6 +853,7 @@ class KafkaClient(object):
# which can block for an increasing backoff period
try_node = node_id or self.least_loaded_node()
if try_node is None:
+ self._lock.release()
raise Errors.NoBrokersAvailable()
self._maybe_connect(try_node)
conn = self._conns[try_node]
@@ -866,16 +868,19 @@ class KafkaClient(object):
# cache the api versions map if it's available (starting
# in 0.10 cluster version)
self._api_versions = conn.get_api_versions()
+ self._lock.release()
return version
except Errors.NodeNotReadyError:
# Only raise to user if this is a node-specific request
if node_id is not None:
+ self._lock.release()
raise
finally:
self._refresh_on_disconnects = True
# Timeout
else:
+ self._lock.release()
raise Errors.NoBrokersAvailable()
def wakeup(self):