diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-04-02 21:27:49 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-04-02 21:27:49 -0700 |
commit | 91d31494d02ea636a991abb4dfb25dd904eefd45 (patch) | |
tree | 5fa58d3648afc1335cc03facf08255da85416267 | |
parent | 27cd93be3e7f2e3f3baca04d2126cf3bb6374668 (diff) | |
download | kafka-python-91d31494d02ea636a991abb4dfb25dd904eefd45.tar.gz |
Do not call state_change_callback with lock (#1775)
-rw-r--r-- | kafka/client_async.py | 16 | ||||
-rw-r--r-- | kafka/conn.py | 34 | ||||
-rw-r--r-- | test/test_client_async.py | 21 |
3 files changed, 40 insertions, 31 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index a86ab55..77efac8 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -260,16 +260,16 @@ class KafkaClient(object): conn = self._conns[node_id] return conn.disconnected() and not conn.blacked_out() - def _conn_state_change(self, node_id, conn): + def _conn_state_change(self, node_id, sock, conn): with self._lock: if conn.connecting(): # SSL connections can enter this state 2x (second during Handshake) if node_id not in self._connecting: self._connecting.add(node_id) try: - self._selector.register(conn._sock, selectors.EVENT_WRITE) + self._selector.register(sock, selectors.EVENT_WRITE) except KeyError: - self._selector.modify(conn._sock, selectors.EVENT_WRITE) + self._selector.modify(sock, selectors.EVENT_WRITE) if self.cluster.is_bootstrap(node_id): self._last_bootstrap = time.time() @@ -280,9 +280,9 @@ class KafkaClient(object): self._connecting.remove(node_id) try: - self._selector.modify(conn._sock, selectors.EVENT_READ, conn) + self._selector.modify(sock, selectors.EVENT_READ, conn) except KeyError: - self._selector.register(conn._sock, selectors.EVENT_READ, conn) + self._selector.register(sock, selectors.EVENT_READ, conn) if self._sensors: self._sensors.connection_created.record() @@ -298,11 +298,11 @@ class KafkaClient(object): self._conns.pop(node_id).close() # Connection failures imply that our metadata is stale, so let's refresh - elif conn.state is ConnectionStates.DISCONNECTING: + elif conn.state is ConnectionStates.DISCONNECTED: if node_id in self._connecting: self._connecting.remove(node_id) try: - self._selector.unregister(conn._sock) + self._selector.unregister(sock) except KeyError: pass @@ -369,7 +369,7 @@ class KafkaClient(object): log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) host, port, afi = get_ip_port_afi(broker.host) - cb = functools.partial(WeakMethod(self._conn_state_change), node_id) + cb = WeakMethod(self._conn_state_change) conn = BrokerConnection(host, broker.port, afi, state_change_callback=cb, node_id=node_id, diff --git a/kafka/conn.py b/kafka/conn.py index a00206f..044d2d5 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -212,7 +212,7 @@ class BrokerConnection(object): 'ssl_ciphers': None, 'api_version': (0, 8, 2), # default to most restrictive 'selector': selectors.DefaultSelector, - 'state_change_callback': lambda conn: True, + 'state_change_callback': lambda node_id, sock, conn: True, 'metrics': None, 'metric_group_prefix': '', 'sasl_mechanism': None, @@ -357,6 +357,7 @@ class BrokerConnection(object): return self.state else: log.debug('%s: creating new socket', self) + assert self._sock is None self._sock_afi, self._sock_addr = next_lookup self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) @@ -366,7 +367,7 @@ class BrokerConnection(object): self._sock.setblocking(False) self.state = ConnectionStates.CONNECTING - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) log.info('%s: connecting to %s:%d [%s %s]', self, self.host, self.port, self._sock_addr, AFI_NAMES[self._sock_afi]) @@ -386,21 +387,21 @@ class BrokerConnection(object): if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): log.debug('%s: initiating SSL handshake', self) self.state = ConnectionStates.HANDSHAKE - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) # _wrap_ssl can alter the connection state -- disconnects on failure self._wrap_ssl() elif self.config['security_protocol'] == 'SASL_PLAINTEXT': log.debug('%s: initiating SASL authentication', self) self.state = ConnectionStates.AUTHENTICATING - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) else: # security_protocol PLAINTEXT log.info('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) # Connection failed # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems @@ -425,7 +426,7 @@ class BrokerConnection(object): log.info('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) if self.state is ConnectionStates.AUTHENTICATING: assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') @@ -435,7 +436,7 @@ class BrokerConnection(object): log.info('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) if self.state not in (ConnectionStates.CONNECTED, ConnectionStates.DISCONNECTED): @@ -802,15 +803,13 @@ class BrokerConnection(object): will be failed with this exception. Default: kafka.errors.KafkaConnectionError. """ + if self.state is ConnectionStates.DISCONNECTED: + return with self._lock: if self.state is ConnectionStates.DISCONNECTED: return log.info('%s: Closing connection. %s', self, error or '') - self.state = ConnectionStates.DISCONNECTING - self.config['state_change_callback'](self) self._update_reconnect_backoff() - self._close_socket() - self.state = ConnectionStates.DISCONNECTED self._sasl_auth_future = None self._protocol = KafkaProtocol( client_id=self.config['client_id'], @@ -819,9 +818,18 @@ class BrokerConnection(object): error = Errors.Cancelled(str(self)) ifrs = list(self.in_flight_requests.items()) self.in_flight_requests.clear() - self.config['state_change_callback'](self) + self.state = ConnectionStates.DISCONNECTED + # To avoid race conditions and/or deadlocks + # keep a reference to the socket but leave it + # open until after the state_change_callback + # This should give clients a change to deregister + # the socket fd from selectors cleanly. + sock = self._sock + self._sock = None - # drop lock before processing futures + # drop lock before state change callback and processing futures + self.config['state_change_callback'](self.node_id, sock, self) + sock.close() for (_correlation_id, (future, _timestamp)) in ifrs: future.failure(error) diff --git a/test/test_client_async.py b/test/test_client_async.py index 0951cb4..2132c8e 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -95,28 +95,29 @@ def test_conn_state_change(mocker, cli, conn): node_id = 0 cli._conns[node_id] = conn conn.state = ConnectionStates.CONNECTING - cli._conn_state_change(node_id, conn) + sock = conn._sock + cli._conn_state_change(node_id, sock, conn) assert node_id in cli._connecting - sel.register.assert_called_with(conn._sock, selectors.EVENT_WRITE) + sel.register.assert_called_with(sock, selectors.EVENT_WRITE) conn.state = ConnectionStates.CONNECTED - cli._conn_state_change(node_id, conn) + cli._conn_state_change(node_id, sock, conn) assert node_id not in cli._connecting - sel.modify.assert_called_with(conn._sock, selectors.EVENT_READ, conn) + sel.modify.assert_called_with(sock, selectors.EVENT_READ, conn) # Failure to connect should trigger metadata update assert cli.cluster._need_update is False - conn.state = ConnectionStates.DISCONNECTING - cli._conn_state_change(node_id, conn) + conn.state = ConnectionStates.DISCONNECTED + cli._conn_state_change(node_id, sock, conn) assert node_id not in cli._connecting assert cli.cluster._need_update is True - sel.unregister.assert_called_with(conn._sock) + sel.unregister.assert_called_with(sock) conn.state = ConnectionStates.CONNECTING - cli._conn_state_change(node_id, conn) + cli._conn_state_change(node_id, sock, conn) assert node_id in cli._connecting - conn.state = ConnectionStates.DISCONNECTING - cli._conn_state_change(node_id, conn) + conn.state = ConnectionStates.DISCONNECTED + cli._conn_state_change(node_id, sock, conn) assert node_id not in cli._connecting |