author     Dana Powers <dana.powers@gmail.com>   2019-09-03 08:47:10 -0700
committer  GitHub <noreply@github.com>           2019-09-03 08:47:10 -0700
commit     7a69952e956412f45b1eed1e217931e3ec33f2e7 (patch)
tree       2d8eca7d1f9e89a5abdfc5578cfb5df801b218df
parent     61fa0b27685c2d4e67d1b6575ca6797f36eb1bfa (diff)
download   kafka-python-7a69952e956412f45b1eed1e217931e3ec33f2e7.tar.gz
Improve connection lock handling; always use context manager (#1895)
-rw-r--r--  kafka/conn.py | 277
1 file changed, 151 insertions(+), 126 deletions(-)
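
The whole commit applies one pattern across BrokerConnection: instead of pairing self._lock.acquire() with a self._lock.release() on every exit path, each method does its socket I/O inside a with self._lock: block and records failures in local err/close flags, deferring self.close() and future.failure() until after the lock is released (both can fire callbacks or re-enter the lock). A minimal sketch of the pattern, with illustrative names that are not part of kafka-python:

import threading

class LockedCallSketch(object):
    """Illustrative only: the err/close flag pattern used throughout this diff."""
    def __init__(self):
        self._lock = threading.RLock()

    def _attempt(self, do_io, fail, close):
        err = None
        needs_close = False
        with self._lock:                          # released automatically, even on errors
            try:
                do_io()                           # blocking send/recv happens here
            except (ConnectionError, TimeoutError) as e:
                err = e                           # remember the failure...
                needs_close = True                # ...and act on it after the lock is gone

        # close() and future.failure() may invoke callbacks or re-acquire the lock,
        # so they run only once the with-block has released it.
        if err is not None:
            if needs_close:
                close(err)
            return fail(err)
        return True
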
diff --git a/kafka/conn.py b/kafka/conn.py
index 5ef141c..99466d9 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -593,21 +593,30 @@ class BrokerConnection(object):
self.config['sasl_plain_username'],
self.config['sasl_plain_password']]).encode('utf-8'))
size = Int32.encode(len(msg))
- try:
- with self._lock:
- if not self._can_send_recv():
- return future.failure(Errors.NodeNotReadyError(str(self)))
- self._send_bytes_blocking(size + msg)
- # The server will send a zero sized message (that is Int32(0)) on success.
- # The connection is closed on failure
- data = self._recv_bytes_blocking(4)
+ err = None
+ close = False
+ with self._lock:
+ if not self._can_send_recv():
+ err = Errors.NodeNotReadyError(str(self))
+ close = False
+ else:
+ try:
+ self._send_bytes_blocking(size + msg)
+
+ # The server will send a zero sized message (that is Int32(0)) on success.
+ # The connection is closed on failure
+ data = self._recv_bytes_blocking(4)
- except (ConnectionError, TimeoutError) as e:
- log.exception("%s: Error receiving reply from server", self)
- error = Errors.KafkaConnectionError("%s: %s" % (self, e))
- self.close(error=error)
- return future.failure(error)
+ except (ConnectionError, TimeoutError) as e:
+ log.exception("%s: Error receiving reply from server", self)
+ err = Errors.KafkaConnectionError("%s: %s" % (self, e))
+ close = True
+
+ if err is not None:
+ if close:
+ self.close(error=err)
+ return future.failure(err)
if data != b'\x00\x00\x00\x00':
error = Errors.AuthenticationFailedError('Unrecognized response during authentication')
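
For context on the check above: after the PLAIN credentials are sent, the broker replies with a zero-sized message, i.e. a 4-byte big-endian Int32 of 0, which is why success is literally data == b'\x00\x00\x00\x00'. A rough sketch of that framing using struct instead of kafka-python's Int32 helper (function names are illustrative):

import struct

def encode_plain_request(username, password, authzid=''):
    # RFC 4616 PLAIN: authzid NUL authcid NUL passwd, length-prefixed with an Int32
    msg = '\0'.join([authzid, username, password]).encode('utf-8')
    return struct.pack('>i', len(msg)) + msg

def plain_auth_succeeded(reply4):
    # reply4 is the 4 bytes read back; a zero-sized message means success
    (size,) = struct.unpack('>i', reply4)
    return size == 0
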
@@ -625,61 +634,67 @@ class BrokerConnection(object):
).canonicalize(gssapi.MechType.kerberos)
log.debug('%s: GSSAPI name: %s', self, gssapi_name)
- self._lock.acquire()
- if not self._can_send_recv():
- return future.failure(Errors.NodeNotReadyError(str(self)))
- # Establish security context and negotiate protection level
- # For reference RFC 2222, section 7.2.1
- try:
- # Exchange tokens until authentication either succeeds or fails
- client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
- received_token = None
- while not client_ctx.complete:
- # calculate an output token from kafka token (or None if first iteration)
- output_token = client_ctx.step(received_token)
-
- # pass output token to kafka, or send empty response if the security
- # context is complete (output token is None in that case)
- if output_token is None:
- self._send_bytes_blocking(Int32.encode(0))
- else:
- msg = output_token
+ err = None
+ close = False
+ with self._lock:
+ if not self._can_send_recv():
+ err = Errors.NodeNotReadyError(str(self))
+ close = False
+ else:
+ # Establish security context and negotiate protection level
+ # For reference RFC 2222, section 7.2.1
+ try:
+ # Exchange tokens until authentication either succeeds or fails
+ client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
+ received_token = None
+ while not client_ctx.complete:
+ # calculate an output token from kafka token (or None if first iteration)
+ output_token = client_ctx.step(received_token)
+
+ # pass output token to kafka, or send empty response if the security
+ # context is complete (output token is None in that case)
+ if output_token is None:
+ self._send_bytes_blocking(Int32.encode(0))
+ else:
+ msg = output_token
+ size = Int32.encode(len(msg))
+ self._send_bytes_blocking(size + msg)
+
+ # The server will send a token back. Processing of this token either
+ # establishes a security context, or it needs further token exchange.
+ # The gssapi will be able to identify the needed next step.
+ # The connection is closed on failure.
+ header = self._recv_bytes_blocking(4)
+ (token_size,) = struct.unpack('>i', header)
+ received_token = self._recv_bytes_blocking(token_size)
+
+ # Process the security layer negotiation token, sent by the server
+ # once the security context is established.
+
+ # unwraps message containing supported protection levels and msg size
+ msg = client_ctx.unwrap(received_token).message
+ # Kafka currently doesn't support integrity or confidentiality security layers, so we
+ # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
+ # by the server
+ msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))) + msg[1:]
+ # add authorization identity to the response, GSS-wrap and send it
+ msg = client_ctx.wrap(msg + auth_id.encode(), False).message
size = Int32.encode(len(msg))
self._send_bytes_blocking(size + msg)
- # The server will send a token back. Processing of this token either
- # establishes a security context, or it needs further token exchange.
- # The gssapi will be able to identify the needed next step.
- # The connection is closed on failure.
- header = self._recv_bytes_blocking(4)
- (token_size,) = struct.unpack('>i', header)
- received_token = self._recv_bytes_blocking(token_size)
-
- # Process the security layer negotiation token, sent by the server
- # once the security context is established.
-
- # unwraps message containing supported protection levels and msg size
- msg = client_ctx.unwrap(received_token).message
- # Kafka currently doesn't support integrity or confidentiality security layers, so we
- # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
- # by the server
- msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))) + msg[1:]
- # add authorization identity to the response, GSS-wrap and send it
- msg = client_ctx.wrap(msg + auth_id.encode(), False).message
- size = Int32.encode(len(msg))
- self._send_bytes_blocking(size + msg)
+ except (ConnectionError, TimeoutError) as e:
+ log.exception("%s: Error receiving reply from server", self)
+ err = Errors.KafkaConnectionError("%s: %s" % (self, e))
+ close = True
+ except Exception as e:
+ err = e
+ close = True
- except (ConnectionError, TimeoutError) as e:
- self._lock.release()
- log.exception("%s: Error receiving reply from server", self)
- error = Errors.KafkaConnectionError("%s: %s" % (self, e))
- self.close(error=error)
- return future.failure(error)
- except Exception as e:
- self._lock.release()
- return future.failure(e)
+ if err is not None:
+ if close:
+ self.close(error=err)
+ return future.failure(err)
- self._lock.release()
log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name)
return future.success(True)
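
The densest part of the GSSAPI hunk is the security-layer negotiation: the first octet of the unwrapped server token is a bitmask of offered QoP levels, the client masks it down to the plain 'auth' bit (value 1, SASL_QOP_AUTH in kafka/conn.py) because Kafka supports neither integrity nor confidentiality layers, and the remaining bytes (the server's proposed max message size) are echoed back before the authorization identity is appended and GSS-wrapped. The same byte manipulation written with struct instead of the Int8/io.BytesIO helpers (sketch only):

import struct

SASL_QOP_AUTH = 1  # 'auth' only: no integrity or confidentiality layer

def qop_auth_response(server_token, auth_id):
    # server_token is the GSS-unwrapped negotiation message from the broker
    (offered,) = struct.unpack('>b', server_token[:1])    # bitmask of offered QoP levels
    chosen = struct.pack('>b', SASL_QOP_AUTH & offered)   # keep only the auth bit
    # bytes 1..3 carry the server's max message size and are reused unchanged;
    # the authorization identity is appended before GSS-wrapping
    return chosen + server_token[1:] + auth_id.encode()
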
@@ -688,25 +703,31 @@ class BrokerConnection(object):
msg = bytes(self._build_oauth_client_request().encode("utf-8"))
size = Int32.encode(len(msg))
- self._lock.acquire()
- if not self._can_send_recv():
- return future.failure(Errors.NodeNotReadyError(str(self)))
- try:
- # Send SASL OAuthBearer request with OAuth token
- self._send_bytes_blocking(size + msg)
- # The server will send a zero sized message (that is Int32(0)) on success.
- # The connection is closed on failure
- data = self._recv_bytes_blocking(4)
+ err = None
+ close = False
+ with self._lock:
+ if not self._can_send_recv():
+ err = Errors.NodeNotReadyError(str(self))
+ close = False
+ else:
+ try:
+ # Send SASL OAuthBearer request with OAuth token
+ self._send_bytes_blocking(size + msg)
- except (ConnectionError, TimeoutError) as e:
- self._lock.release()
- log.exception("%s: Error receiving reply from server", self)
- error = Errors.KafkaConnectionError("%s: %s" % (self, e))
- self.close(error=error)
- return future.failure(error)
+ # The server will send a zero sized message (that is Int32(0)) on success.
+ # The connection is closed on failure
+ data = self._recv_bytes_blocking(4)
- self._lock.release()
+ except (ConnectionError, TimeoutError) as e:
+ log.exception("%s: Error receiving reply from server", self)
+ err = Errors.KafkaConnectionError("%s: %s" % (self, e))
+ close = True
+
+ if err is not None:
+ if close:
+ self.close(error=err)
+ return future.failure(err)
if data != b'\x00\x00\x00\x00':
error = Errors.AuthenticationFailedError('Unrecognized response during authentication')
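
The OAuth hunk follows the same err/close shape as the PLAIN one; the payload it sends (built by _build_oauth_client_request) is an OAUTHBEARER initial client response per RFC 7628: a GS2 header, \x01-separated key/value pairs, and a \x01\x01 terminator. A hedged sketch of that framing; the real method may also append extensions supplied by the configured token provider:

import struct

def oauthbearer_initial_response(token):
    # GS2 header "n,,", then kvpairs, terminated by two \x01 bytes (RFC 7628)
    client_resp = "n,,\x01auth=Bearer {}\x01\x01".format(token)
    msg = client_resp.encode('utf-8')
    return struct.pack('>i', len(msg)) + msg   # Int32 length prefix, as in the hunk
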
@@ -857,6 +878,9 @@ class BrokerConnection(object):
future = Future()
with self._lock:
if not self._can_send_recv():
+ # In this case, since we created the future above,
+ # we know there are no callbacks/errbacks that could fire w/
+ # lock. So failing + returning inline should be safe
return future.failure(Errors.NodeNotReadyError(str(self)))
correlation_id = self._protocol.send_request(request)
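
The new comment here deserves a gloss: future.failure() runs any registered errbacks synchronously, so calling it while holding self._lock is only safe because the Future was created a few lines above and nothing has had a chance to attach a callback yet; an errback added later fires on the caller's own thread, after this method has released the lock. A toy stand-in (not kafka.future.Future) showing why failing a brand-new future cannot run user code under the lock:

class Future(object):
    """Simplified stand-in used only to illustrate the comment above."""
    def __init__(self):
        self.exception = None
        self._errbacks = []              # empty at creation time

    def add_errback(self, fn):
        if self.exception is not None:
            fn(self.exception)           # already failed: fire on the caller's thread
        else:
            self._errbacks.append(fn)

    def failure(self, exc):
        self.exception = exc
        for fn in self._errbacks:        # no errbacks registered yet, so nothing runs
            fn(exc)
        return self
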
@@ -935,56 +959,57 @@ class BrokerConnection(object):
def _recv(self):
"""Take all available bytes from socket, return list of any responses from parser"""
recvd = []
- self._lock.acquire()
- if not self._can_send_recv():
- log.warning('%s cannot recv: socket not connected', self)
- self._lock.release()
- return ()
-
- while len(recvd) < self.config['sock_chunk_buffer_count']:
- try:
- data = self._sock.recv(self.config['sock_chunk_bytes'])
- # We expect socket.recv to raise an exception if there are no
- # bytes available to read from the socket in non-blocking mode.
- # but if the socket is disconnected, we will get empty data
- # without an exception raised
- if not data:
- log.error('%s: socket disconnected', self)
- self._lock.release()
- self.close(error=Errors.KafkaConnectionError('socket disconnected'))
- return []
- else:
- recvd.append(data)
+ err = None
+ with self._lock:
+ if not self._can_send_recv():
+ log.warning('%s cannot recv: socket not connected', self)
+ return ()
- except SSLWantReadError:
- break
- except (ConnectionError, TimeoutError) as e:
- if six.PY2 and e.errno == errno.EWOULDBLOCK:
+ while len(recvd) < self.config['sock_chunk_buffer_count']:
+ try:
+ data = self._sock.recv(self.config['sock_chunk_bytes'])
+ # We expect socket.recv to raise an exception if there are no
+ # bytes available to read from the socket in non-blocking mode.
+ # but if the socket is disconnected, we will get empty data
+ # without an exception raised
+ if not data:
+ log.error('%s: socket disconnected', self)
+ err = Errors.KafkaConnectionError('socket disconnected')
+ break
+ else:
+ recvd.append(data)
+
+ except SSLWantReadError:
break
- log.exception('%s: Error receiving network data'
- ' closing socket', self)
- self._lock.release()
- self.close(error=Errors.KafkaConnectionError(e))
- return []
- except BlockingIOError:
- if six.PY3:
+ except (ConnectionError, TimeoutError) as e:
+ if six.PY2 and e.errno == errno.EWOULDBLOCK:
+ break
+ log.exception('%s: Error receiving network data'
+ ' closing socket', self)
+ err = Errors.KafkaConnectionError(e)
break
- self._lock.release()
- raise
-
- recvd_data = b''.join(recvd)
- if self._sensors:
- self._sensors.bytes_received.record(len(recvd_data))
-
- try:
- responses = self._protocol.receive_bytes(recvd_data)
- except Errors.KafkaProtocolError as e:
- self._lock.release()
- self.close(e)
- return []
- else:
- self._lock.release()
- return responses
+ except BlockingIOError:
+ if six.PY3:
+ break
+ # For PY2 this is a catchall and should be re-raised
+ raise
+
+ # Only process bytes if there was no connection exception
+ if err is None:
+ recvd_data = b''.join(recvd)
+ if self._sensors:
+ self._sensors.bytes_received.record(len(recvd_data))
+
+ # We need to keep the lock through protocol receipt
+ # so that we ensure that the processed byte order is the
+ # same as the received byte order
+ try:
+ return self._protocol.receive_bytes(recvd_data)
+ except Errors.KafkaProtocolError as e:
+ err = e
+
+ self.close(error=err)
+ return ()
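
The reworked _recv keeps the same non-blocking drain loop but funnels every failure through a single err variable, so self.close() is called exactly once rather than from inside each error branch, and the lock is held through receive_bytes so the parser sees chunks in the same order they came off the socket. The would-block handling it has to paper over differs between interpreters; a standalone, Python 3-oriented sketch of just that part (helper name is illustrative):

import errno
import socket
import ssl

def drain_socket(sock, chunk_bytes=4096, max_chunks=1000):
    """Read whatever is immediately available from a non-blocking socket.
    Returns (chunks, disconnected)."""
    chunks = []
    for _ in range(max_chunks):
        try:
            data = sock.recv(chunk_bytes)
        except ssl.SSLWantReadError:
            break                        # TLS layer needs more bytes before it can decrypt
        except BlockingIOError:
            break                        # Python 3: nothing left to read right now
        except socket.error as e:
            if e.errno == errno.EWOULDBLOCK:
                break                    # Python 2 spelling of "would block"
            raise
        if not data:                     # an empty read means the peer closed the socket
            return chunks, True
        chunks.append(data)
    return chunks, False
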
def requests_timed_out(self):
with self._lock: