diff options
author | Dana Powers <dana.powers@gmail.com> | 2018-03-09 14:54:06 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-09 14:54:06 -0500 |
commit | 1ffdd5caf7f10fb5372780cb9a5ac4a906cac342 (patch) | |
tree | 927d74134f29bab100d5170711b339ca31484132 | |
parent | 4cbeb2e591447ba25271c4924393e602ba49b324 (diff) | |
download | kafka-python-1ffdd5caf7f10fb5372780cb9a5ac4a906cac342.tar.gz |
Add BrokerConnection.connect_blocking() (#1411)
-rw-r--r-- | kafka/client.py | 12 | ||||
-rw-r--r-- | kafka/client_async.py | 6 | ||||
-rw-r--r-- | kafka/conn.py | 64 | ||||
-rw-r--r-- | test/conftest.py | 1 | ||||
-rw-r--r-- | test/test_client_async.py | 8 |
5 files changed, 55 insertions, 36 deletions
diff --git a/kafka/client.py b/kafka/client.py index 369dc97..10b1724 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -71,17 +71,7 @@ class SimpleClient(object): ) conn = self._conns[host_key] - conn.connect() - if conn.connected(): - return conn - - timeout = time.time() + self.timeout - while time.time() < timeout and conn.connecting(): - if conn.connect() is ConnectionStates.CONNECTED: - break - else: - time.sleep(0.05) - else: + if not conn.connect_blocking(self.timeout): conn.close() raise ConnectionError("%s:%s (%s)" % (host, port, afi)) return conn diff --git a/kafka/client_async.py b/kafka/client_async.py index 58155b8..857e4b7 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -257,11 +257,7 @@ class KafkaClient(object): state_change_callback=cb, node_id='bootstrap', **self.config) - bootstrap.connect() - while bootstrap.connecting(): - self._selector.select(1) - bootstrap.connect() - if not bootstrap.connected(): + if not bootstrap.connect_blocking(): bootstrap.close() continue future = bootstrap.send(metadata_request) diff --git a/kafka/conn.py b/kafka/conn.py index b0d6029..4bbd744 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -271,18 +271,58 @@ class BrokerConnection(object): self.config['metric_group_prefix'], self.node_id) + def _dns_lookup(self): + self._gai = dns_lookup(self.host, self.port, self.afi) + if not self._gai: + log.error('DNS lookup failed for %s:%i (%s)', + self.host, self.port, self.afi) + return False + return True + def _next_afi_host_port(self): if not self._gai: - self._gai = dns_lookup(self.host, self.port, self.afi) - if not self._gai: - log.error('DNS lookup failed for %s:%i (%s)', - self.host, self.port, self.afi) + if not self._dns_lookup(): return - afi, _, __, ___, sockaddr = self._gai.pop(0) host, port = sockaddr[:2] return (afi, host, port) + def connect_blocking(self, timeout=float('inf')): + if self.connected(): + return True + timeout += time.time() + # First attempt to perform dns lookup + # note that the underlying interface, socket.getaddrinfo, + # has no explicit timeout so we may exceed the user-specified timeout + while time.time() < timeout: + if self._dns_lookup(): + break + else: + return False + + # Loop once over all returned dns entries + selector = None + while self._gai: + while time.time() < timeout: + self.connect() + if self.connected(): + if selector is not None: + selector.close() + return True + elif self.connecting(): + if selector is None: + selector = self.config['selector']() + selector.register(self._sock, selectors.EVENT_WRITE) + selector.select(1) + elif self.disconnected(): + if selector is not None: + selector.close() + selector = None + break + else: + break + return False + def connect(self): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out(): @@ -903,19 +943,9 @@ class BrokerConnection(object): ((0, 8, 0), MetadataRequest[0]([])), ] - def connect(): - self.connect() - if self.connected(): - return - timeout_at = time.time() + timeout - while time.time() < timeout_at and self.connecting(): - if self.connect() is ConnectionStates.CONNECTED: - return - time.sleep(0.05) - raise Errors.NodeNotReadyError() - for version, request in test_cases: - connect() + if not self.connect_blocking(timeout): + raise Errors.NodeNotReadyError() f = self.send(request) # HACK: sleeping to wait for socket to send bytes time.sleep(0.1) diff --git a/test/conftest.py b/test/conftest.py index d53ff23..52ebfb4 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -128,6 +128,7 @@ def conn(mocker): return state conn._set_conn_state = _set_conn_state conn.connect.side_effect = lambda: conn.state + conn.connect_blocking.return_value = True conn.connecting = lambda: conn.state in (ConnectionStates.CONNECTING, ConnectionStates.HANDSHAKE) conn.connected = lambda: conn.state is ConnectionStates.CONNECTED diff --git a/test/test_client_async.py b/test/test_client_async.py index eece139..eccb564 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -55,21 +55,22 @@ def test_bootstrap_success(conn): kwargs.pop('state_change_callback') kwargs.pop('node_id') assert kwargs == cli.config - conn.connect.assert_called_with() + conn.connect_blocking.assert_called_with() conn.send.assert_called_once_with(MetadataRequest[0]([])) assert cli._bootstrap_fails == 0 assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None), BrokerMetadata(1, 'bar', 34, None)]) + def test_bootstrap_failure(conn): - conn.state = ConnectionStates.DISCONNECTED + conn.connect_blocking.return_value = False cli = KafkaClient(api_version=(0, 9)) args, kwargs = conn.call_args assert args == ('localhost', 9092, socket.AF_UNSPEC) kwargs.pop('state_change_callback') kwargs.pop('node_id') assert kwargs == cli.config - conn.connect.assert_called_with() + conn.connect_blocking.assert_called_with() conn.close.assert_called_with() assert cli._bootstrap_fails == 1 assert cli.cluster.brokers() == set() @@ -95,6 +96,7 @@ def test_can_connect(cli, conn): conn.blacked_out.return_value = True assert not cli._can_connect(0) + def test_maybe_connect(cli, conn): try: # Node not in metadata, raises AssertionError |