summary refs log tree commit diff
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@gmail.com> 2016-06-19 08:26:30 -0700
committer: GitHub <noreply@github.com> 2016-06-19 08:26:30 -0700
commit: 461ccbd9ecf06722c9ff73f6ed439be4b8391672 (patch)
tree: a4bc99969cce7b0f3b3f18ae85658cff6bc33117
parent: 0b5a49e58d16336c1a632a4f5e42bc4fbbb3d118 (diff)
download: kafka-python-461ccbd9ecf06722c9ff73f6ed439be4b8391672.tar.gz
check_version should scan nodes until version found or timeout (#731)
* Mute all connection logging during conn.check_version
* Always process pending MetadataRequest in conn.check_version
* KafkaClient.check_version: Scan all brokers until a version is identified or timeout
-rw-r--r-- kafka/client_async.py 53
-rw-r--r-- kafka/conn.py 10
2 files changed, 46 insertions, 17 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 1276743..8916a3e 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -662,20 +662,49 @@ class KafkaClient(object):
self._delayed_tasks.remove(task)
def check_version(self, node_id=None, timeout=2, strict=False):
- """Attempt to guess the broker version"""
- if node_id is None:
- node_id = self.least_loaded_node()
- if node_id is None:
+ """Attempt to guess a broker version
+
+ Note: it is possible that this method blocks longer than the
+ specified timeout. This can happen if the entire cluster
+ is down and the client enters a bootstrap backoff sleep.
+ This is only possible if node_id is None.
+
+ Returns: version str, i.e. '0.10', '0.9', '0.8.2', '0.8.1', '0.8.0'
+
+ Raises:
+ NodeNotReadyError (if node_id is provided)
+ NoBrokersAvailable (if node_id is None)
+ UnrecognizedBrokerVersion: please file bug if seen!
+ AssertionError (if strict=True): please file bug if seen!
+ """
+ end = time.time() + timeout
+ while time.time() < end:
+
+ # It is possible that least_loaded_node falls back to bootstrap,
+ # which can block for an increasing backoff period
+ try_node = node_id or self.least_loaded_node()
+ if try_node is None:
raise Errors.NoBrokersAvailable()
+ self._maybe_connect(try_node)
+ conn = self._conns[try_node]
- # We will be intentionally causing socket failures
- # and should not trigger metadata refresh
- self._refresh_on_disconnects = False
- self._maybe_connect(node_id)
- conn = self._conns[node_id]
- version = conn.check_version()
- self._refresh_on_disconnects = True
- return version
+ # We will intentionally cause socket failures
+ # These should not trigger metadata refresh
+ self._refresh_on_disconnects = False
+ try:
+ remaining = end - time.time()
+ version = conn.check_version(timeout=remaining, strict=strict)
+ return version
+ except Errors.NodeNotReadyError:
+ # Only raise to user if this is a node-specific request
+ if node_id is not None:
+ raise
+ finally:
+ self._refresh_on_disconnects = True
+
+ # Timeout
+ else:
+ raise Errors.NoBrokersAvailable()
def wakeup(self):
if self._wake_w.send(b'x') != 1:
diff --git a/kafka/conn.py b/kafka/conn.py
index c5d3be1..005dd7e 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -561,9 +561,9 @@ class BrokerConnection(object):
class ConnFilter(Filter):
def filter(self, record):
- if record.funcName in ('recv', 'send'):
- return False
- return True
+ if record.funcName == 'check_version':
+ return True
+ return False
log_filter = ConnFilter()
log.addFilter(log_filter)
@@ -598,11 +598,11 @@ class BrokerConnection(object):
# the attempt to write to a disconnected socket should
# immediately fail and allow us to infer that the prior
# request was unrecognized
- self.send(MetadataRequest[0]([]))
+ mr = self.send(MetadataRequest[0]([]))
if self._sock:
self._sock.setblocking(True)
- while not f.is_done:
+ while not (f.is_done and mr.is_done):
self.recv()
if self._sock:
self._sock.setblocking(False)