diff options
author | Dana Powers <dana.powers@gmail.com> | 2018-01-10 17:25:33 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-10 17:25:33 -0800 |
commit | a69320b8e3199fa9d7cfa3947a242e699a045c3b (patch) | |
tree | b9a3dce448c76605e9543f6ab61f6ea41d891d61 | |
parent | 0a7492443c78d4791cfdf3d6384c02f1c7757c7b (diff) | |
download | kafka-python-a69320b8e3199fa9d7cfa3947a242e699a045c3b.tar.gz |
Read all available socket bytes (#1332)
* Recv all available network bytes before parsing
* Add experimental support for configuring socket chunking parameters
-rw-r--r-- | kafka/client_async.py | 2 | ||||
-rw-r--r-- | kafka/conn.py | 44 | ||||
-rw-r--r-- | kafka/consumer/group.py | 2 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 2 |
4 files changed, 28 insertions, 22 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 0058cf3..29cb8c0 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -154,6 +154,8 @@ class KafkaClient(object):
         'receive_buffer_bytes': None,
         'send_buffer_bytes': None,
         'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
+        'sock_chunk_bytes': 4096, # undocumented experimental option
+        'sock_chunk_buffer_count': 1000, # undocumented experimental option
         'retry_backoff_ms': 100,
         'metadata_max_age_ms': 300000,
         'security_protocol': 'PLAINTEXT',
diff --git a/kafka/conn.py b/kafka/conn.py
index 1e6770f..1243bdb 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -180,6 +180,8 @@ class BrokerConnection(object):
         'receive_buffer_bytes': None,
         'send_buffer_bytes': None,
         'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
+        'sock_chunk_bytes': 4096, # undocumented experimental option
+        'sock_chunk_buffer_count': 1000, # undocumented experimental option
         'security_protocol': 'PLAINTEXT',
         'ssl_context': None,
         'ssl_check_hostname': True,
@@ -748,19 +750,21 @@ class BrokerConnection(object):
         return responses
 
     def _recv(self):
-        responses = []
-        SOCK_CHUNK_BYTES = 4096
-        while True:
+        """Take all available bytes from socket, return list of any responses from parser"""
+        recvd = []
+        while len(recvd) < self.config['sock_chunk_buffer_count']:
             try:
-                data = self._sock.recv(SOCK_CHUNK_BYTES)
-                # We expect socket.recv to raise an exception if there is not
-                # enough data to read the full bytes_to_read
+                data = self._sock.recv(self.config['sock_chunk_bytes'])
+                # We expect socket.recv to raise an exception if there are no
+                # bytes available to read from the socket in non-blocking mode.
                 # but if the socket is disconnected, we will get empty data
                 # without an exception raised
                 if not data:
                     log.error('%s: socket disconnected', self)
                     self.close(error=Errors.ConnectionError('socket disconnected'))
-                    break
+                    return []
+                else:
+                    recvd.append(data)
 
             except SSLWantReadError:
                 break
@@ -770,27 +774,23 @@ class BrokerConnection(object):
                 log.exception('%s: Error receiving network data'
                               ' closing socket', self)
                 self.close(error=Errors.ConnectionError(e))
-                break
+                return []
             except BlockingIOError:
                 if six.PY3:
                     break
                 raise
 
-            if self._sensors:
-                self._sensors.bytes_received.record(len(data))
-
-            try:
-                more_responses = self._protocol.receive_bytes(data)
-            except Errors.KafkaProtocolError as e:
-                self.close(e)
-                break
-            else:
-                responses.extend([resp for (_, resp) in more_responses])
-
-            if len(data) < SOCK_CHUNK_BYTES:
-                break
+        recvd_data = b''.join(recvd)
+        if self._sensors:
+            self._sensors.bytes_received.record(len(recvd_data))
 
-        return responses
+        try:
+            responses = self._protocol.receive_bytes(recvd_data)
+        except Errors.KafkaProtocolError as e:
+            self.close(e)
+            return []
+        else:
+            return [resp for (_, resp) in responses] # drop correlation id
 
     def requests_timed_out(self):
         if self.in_flight_requests:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 7c345e7..0224d16 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -270,6 +270,8 @@ class KafkaConsumer(six.Iterator):
         'receive_buffer_bytes': None,
         'send_buffer_bytes': None,
         'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
+        'sock_chunk_bytes': 4096, # undocumented experimental option
+        'sock_chunk_buffer_count': 1000, # undocumented experimental option
         'consumer_timeout_ms': float('inf'),
         'skip_double_compressed_messages': False,
         'security_protocol': 'PLAINTEXT',
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index e0c8a41..d24236a 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -292,6 +292,8 @@ class KafkaProducer(object):
         'receive_buffer_bytes': None,
         'send_buffer_bytes': None,
         'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
+        'sock_chunk_bytes': 4096, # undocumented experimental option
+        'sock_chunk_buffer_count': 1000, # undocumented experimental option
         'reconnect_backoff_ms': 50,
         'reconnect_backoff_max': 1000,
         'max_in_flight_requests_per_connection': 5,