diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-06-19 08:26:30 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-06-19 08:26:30 -0700 |
commit | 461ccbd9ecf06722c9ff73f6ed439be4b8391672 (patch) | |
tree | a4bc99969cce7b0f3b3f18ae85658cff6bc33117 | |
parent | 0b5a49e58d16336c1a632a4f5e42bc4fbbb3d118 (diff) | |
download | kafka-python-461ccbd9ecf06722c9ff73f6ed439be4b8391672.tar.gz |
check_version should scan nodes until version found or timeout (#731)
* Mute all connection logging during conn.check_version
* Always process pending MetadataRequest in conn.check_version
* KakfaClient.check_version: Scan all brokers until a version is identified or timeout
-rw-r--r-- | kafka/client_async.py | 53 | ||||
-rw-r--r-- | kafka/conn.py | 10 |
2 files changed, 46 insertions, 17 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 1276743..8916a3e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -662,20 +662,49 @@ class KafkaClient(object): self._delayed_tasks.remove(task) def check_version(self, node_id=None, timeout=2, strict=False): - """Attempt to guess the broker version""" - if node_id is None: - node_id = self.least_loaded_node() - if node_id is None: + """Attempt to guess a broker version + + Note: it is possible that this method blocks longer than the + specified timeout. This can happen if the entire cluster + is down and the client enters a bootstrap backoff sleep. + This is only possible if node_id is None. + + Returns: version str, i.e. '0.10', '0.9', '0.8.2', '0.8.1', '0.8.0' + + Raises: + NodeNotReadyError (if node_id is provided) + NoBrokersAvailable (if node_id is None) + UnrecognizedBrokerVersion: please file bug if seen! + AssertionError (if strict=True): please file bug if seen! + """ + end = time.time() + timeout + while time.time() < end: + + # It is possible that least_loaded_node falls back to bootstrap, + # which can block for an increasing backoff period + try_node = node_id or self.least_loaded_node() + if try_node is None: raise Errors.NoBrokersAvailable() + self._maybe_connect(try_node) + conn = self._conns[try_node] - # We will be intentionally causing socket failures - # and should not trigger metadata refresh - self._refresh_on_disconnects = False - self._maybe_connect(node_id) - conn = self._conns[node_id] - version = conn.check_version() - self._refresh_on_disconnects = True - return version + # We will intentionally cause socket failures + # These should not trigger metadata refresh + self._refresh_on_disconnects = False + try: + remaining = end - time.time() + version = conn.check_version(timeout=remaining, strict=strict) + return version + except Errors.NodeNotReadyError: + # Only raise to user if this is a node-specific request + if node_id is not None: + raise + finally: + self._refresh_on_disconnects = True + + # Timeout + else: + raise Errors.NoBrokersAvailable() def wakeup(self): if self._wake_w.send(b'x') != 1: diff --git a/kafka/conn.py b/kafka/conn.py index c5d3be1..005dd7e 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -561,9 +561,9 @@ class BrokerConnection(object): class ConnFilter(Filter): def filter(self, record): - if record.funcName in ('recv', 'send'): - return False - return True + if record.funcName == 'check_version': + return True + return False log_filter = ConnFilter() log.addFilter(log_filter) @@ -598,11 +598,11 @@ class BrokerConnection(object): # the attempt to write to a disconnected socket should # immediately fail and allow us to infer that the prior # request was unrecognized - self.send(MetadataRequest[0]([])) + mr = self.send(MetadataRequest[0]([])) if self._sock: self._sock.setblocking(True) - while not f.is_done: + while not (f.is_done and mr.is_done): self.recv() if self._sock: self._sock.setblocking(False) |