summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-01-10 17:25:33 -0800
committerGitHub <noreply@github.com>2018-01-10 17:25:33 -0800
commita69320b8e3199fa9d7cfa3947a242e699a045c3b (patch)
treeb9a3dce448c76605e9543f6ab61f6ea41d891d61
parent0a7492443c78d4791cfdf3d6384c02f1c7757c7b (diff)
downloadkafka-python-a69320b8e3199fa9d7cfa3947a242e699a045c3b.tar.gz
Read all available socket bytes (#1332)
* Recv all available network bytes before parsing * Add experimental support for configuring socket chunking parameters
-rw-r--r--kafka/client_async.py2
-rw-r--r--kafka/conn.py44
-rw-r--r--kafka/consumer/group.py2
-rw-r--r--kafka/producer/kafka.py2
4 files changed, 28 insertions, 22 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 0058cf3..29cb8c0 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -154,6 +154,8 @@ class KafkaClient(object):
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
+ 'sock_chunk_bytes': 4096, # undocumented experimental option
+ 'sock_chunk_buffer_count': 1000, # undocumented experimental option
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
'security_protocol': 'PLAINTEXT',
diff --git a/kafka/conn.py b/kafka/conn.py
index 1e6770f..1243bdb 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -180,6 +180,8 @@ class BrokerConnection(object):
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
+ 'sock_chunk_bytes': 4096, # undocumented experimental option
+ 'sock_chunk_buffer_count': 1000, # undocumented experimental option
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
@@ -748,19 +750,21 @@ class BrokerConnection(object):
return responses
def _recv(self):
- responses = []
- SOCK_CHUNK_BYTES = 4096
- while True:
+ """Take all available bytes from socket, return list of any responses from parser"""
+ recvd = []
+ while len(recvd) < self.config['sock_chunk_buffer_count']:
try:
- data = self._sock.recv(SOCK_CHUNK_BYTES)
- # We expect socket.recv to raise an exception if there is not
- # enough data to read the full bytes_to_read
+ data = self._sock.recv(self.config['sock_chunk_bytes'])
+ # We expect socket.recv to raise an exception if there are no
+ # bytes available to read from the socket in non-blocking mode.
# but if the socket is disconnected, we will get empty data
# without an exception raised
if not data:
log.error('%s: socket disconnected', self)
self.close(error=Errors.ConnectionError('socket disconnected'))
- break
+ return []
+ else:
+ recvd.append(data)
except SSLWantReadError:
break
@@ -770,27 +774,23 @@ class BrokerConnection(object):
log.exception('%s: Error receiving network data'
' closing socket', self)
self.close(error=Errors.ConnectionError(e))
- break
+ return []
except BlockingIOError:
if six.PY3:
break
raise
- if self._sensors:
- self._sensors.bytes_received.record(len(data))
-
- try:
- more_responses = self._protocol.receive_bytes(data)
- except Errors.KafkaProtocolError as e:
- self.close(e)
- break
- else:
- responses.extend([resp for (_, resp) in more_responses])
-
- if len(data) < SOCK_CHUNK_BYTES:
- break
+ recvd_data = b''.join(recvd)
+ if self._sensors:
+ self._sensors.bytes_received.record(len(recvd_data))
- return responses
+ try:
+ responses = self._protocol.receive_bytes(recvd_data)
+ except Errors.KafkaProtocolError as e:
+ self.close(e)
+ return []
+ else:
+ return [resp for (_, resp) in responses] # drop correlation id
def requests_timed_out(self):
if self.in_flight_requests:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 7c345e7..0224d16 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -270,6 +270,8 @@ class KafkaConsumer(six.Iterator):
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
+ 'sock_chunk_bytes': 4096, # undocumented experimental option
+ 'sock_chunk_buffer_count': 1000, # undocumented experimental option
'consumer_timeout_ms': float('inf'),
'skip_double_compressed_messages': False,
'security_protocol': 'PLAINTEXT',
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index e0c8a41..d24236a 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -292,6 +292,8 @@ class KafkaProducer(object):
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
+ 'sock_chunk_bytes': 4096, # undocumented experimental option
+ 'sock_chunk_buffer_count': 1000, # undocumented experimental option
'reconnect_backoff_ms': 50,
'reconnect_backoff_max': 1000,
'max_in_flight_requests_per_connection': 5,