diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-09-29 17:04:17 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-09-29 17:04:17 -0700 |
commit | 392d674be6641078717a4d87e471916c9a4bbb22 (patch) | |
tree | baee4165315b4c99939a944a6a7b893f0c17e36b | |
parent | 9de12d3f03236988a60e6cd79a50ffa5165cf735 (diff) | |
download | kafka-python-392d674be6641078717a4d87e471916c9a4bbb22.tar.gz |
Send socket data via non-blocking IO with send buffer (#1912)
-rw-r--r-- | kafka/client_async.py | 29 | ||||
-rw-r--r-- | kafka/conn.py | 80 | ||||
-rw-r--r-- | kafka/consumer/group.py | 8 | ||||
-rw-r--r-- | test/test_client_async.py | 4 |
4 files changed, 108 insertions, 13 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index ac2d364..9b9cb8f 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -207,6 +207,7 @@ class KafkaClient(object): self._conns = Dict() # object to support weakrefs self._api_versions = None self._connecting = set() + self._sending = set() self._refresh_on_disconnects = True self._last_bootstrap = 0 self._bootstrap_fails = 0 @@ -532,6 +533,7 @@ class KafkaClient(object): # we will need to call send_pending_requests() # to trigger network I/O future = conn.send(request, blocking=False) + self._sending.add(conn) # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding @@ -604,14 +606,23 @@ class KafkaClient(object): return responses + def _register_send_sockets(self): + while self._sending: + conn = self._sending.pop() + try: + key = self._selector.get_key(conn._sock) + events = key.events | selectors.EVENT_WRITE + self._selector.modify(key.fileobj, events, key.data) + except KeyError: + self._selector.register(conn._sock, selectors.EVENT_WRITE, conn) + def _poll(self, timeout): # This needs to be locked, but since it is only called from within the # locked section of poll(), there is no additional lock acquisition here processed = set() # Send pending requests first, before polling for responses - for conn in six.itervalues(self._conns): - conn.send_pending_requests() + self._register_send_sockets() start_select = time.time() ready = self._selector.select(timeout) @@ -623,10 +634,24 @@ class KafkaClient(object): if key.fileobj is self._wake_r: self._clear_wake_fd() continue + + # Send pending requests if socket is ready to write if events & selectors.EVENT_WRITE: conn = key.data if conn.connecting(): conn.connect() + else: + if conn.send_pending_requests_v2(): + # If send is complete, we dont need to track write readiness + # for this socket anymore + if key.events ^ selectors.EVENT_WRITE: + self._selector.modify( + key.fileobj, + key.events ^ selectors.EVENT_WRITE, + key.data) + else: + self._selector.unregister(key.fileobj) + if not (events & selectors.EVENT_READ): continue conn = key.data diff --git a/kafka/conn.py b/kafka/conn.py index 5ea5436..815065b 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -289,6 +289,7 @@ class BrokerConnection(object): self.state = ConnectionStates.DISCONNECTED self._reset_reconnect_backoff() self._sock = None + self._send_buffer = b'' self._ssl_context = None if self.config['ssl_context'] is not None: self._ssl_context = self.config['ssl_context'] @@ -557,6 +558,32 @@ class BrokerConnection(object): 'kafka-python does not support SASL mechanism %s' % self.config['sasl_mechanism'])) + def _send_bytes(self, data): + """Send some data via non-blocking IO + + Note: this method is not synchronized internally; you should + always hold the _lock before calling + + Returns: number of bytes + Raises: socket exception + """ + total_sent = 0 + while total_sent < len(data): + try: + sent_bytes = self._sock.send(data[total_sent:]) + total_sent += sent_bytes + except (SSLWantReadError, SSLWantWriteError): + break + except (ConnectionError, TimeoutError) as e: + if six.PY2 and e.errno == errno.EWOULDBLOCK: + break + raise + except BlockingIOError: + if six.PY3: + break + raise + return total_sent + def _send_bytes_blocking(self, data): self._sock.settimeout(self.config['request_timeout_ms'] / 1000) total_sent = 0 @@ -839,6 +866,7 @@ class BrokerConnection(object): self._protocol = KafkaProtocol( client_id=self.config['client_id'], api_version=self.config['api_version']) + self._send_buffer = b'' if error is None: error = Errors.Cancelled(str(self)) ifrs = list(self.in_flight_requests.items()) @@ -901,24 +929,60 @@ class BrokerConnection(object): return future def send_pending_requests(self): - """Can block on network if request is larger than send_buffer_bytes""" + """Attempts to send pending requests messages via blocking IO + If all requests have been sent, return True + Otherwise, if the socket is blocked and there are more bytes to send, + return False. + """ try: with self._lock: if not self._can_send_recv(): - return Errors.NodeNotReadyError(str(self)) - # In the future we might manage an internal write buffer - # and send bytes asynchronously. For now, just block - # sending each request payload + return False data = self._protocol.send_bytes() total_bytes = self._send_bytes_blocking(data) + if self._sensors: self._sensors.bytes_sent.record(total_bytes) - return total_bytes + return True + except (ConnectionError, TimeoutError) as e: log.exception("Error sending request data to %s", self) error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) - return error + return False + + def send_pending_requests_v2(self): + """Attempts to send pending requests messages via non-blocking IO + If all requests have been sent, return True + Otherwise, if the socket is blocked and there are more bytes to send, + return False. + """ + try: + with self._lock: + if not self._can_send_recv(): + return False + + # _protocol.send_bytes returns encoded requests to send + # we send them via _send_bytes() + # and hold leftover bytes in _send_buffer + if not self._send_buffer: + self._send_buffer = self._protocol.send_bytes() + + total_bytes = 0 + if self._send_buffer: + total_bytes = self._send_bytes(self._send_buffer) + self._send_buffer = self._send_buffer[total_bytes:] + + if self._sensors: + self._sensors.bytes_sent.record(total_bytes) + # Return True iff send buffer is empty + return len(self._send_buffer) == 0 + + except (ConnectionError, TimeoutError, Exception) as e: + log.exception("Error sending request data to %s", self) + error = Errors.KafkaConnectionError("%s: %s" % (self, e)) + self.close(error=error) + return False def can_send_more(self): """Return True unless there are max_in_flight_requests_per_connection.""" @@ -979,7 +1043,7 @@ class BrokerConnection(object): else: recvd.append(data) - except SSLWantReadError: + except (SSLWantReadError, SSLWantWriteError): break except (ConnectionError, TimeoutError) as e: if six.PY2 and e.errno == errno.EWOULDBLOCK: diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 77b0b96..231fc8a 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -674,11 +674,15 @@ class KafkaConsumer(six.Iterator): # responses to enable pipelining while the user is handling the # fetched records. if not partial: - self._fetcher.send_fetches() + futures = self._fetcher.send_fetches() + if len(futures): + self._client.poll(timeout_ms=0) return records # Send any new fetches (won't resend pending fetches) - self._fetcher.send_fetches() + futures = self._fetcher.send_fetches() + if len(futures): + self._client.poll(timeout_ms=0) timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll() * 1000) self._client.poll(timeout_ms=timeout_ms) diff --git a/test/test_client_async.py b/test/test_client_async.py index 8bb2028..74da66a 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -25,6 +25,7 @@ from kafka.structs import BrokerMetadata @pytest.fixture def cli(mocker, conn): client = KafkaClient(api_version=(0, 9)) + mocker.patch.object(client, '_selector') client.poll(future=client.cluster.request_update()) return client @@ -32,6 +33,7 @@ def cli(mocker, conn): def test_bootstrap(mocker, conn): conn.state = ConnectionStates.CONNECTED cli = KafkaClient(api_version=(0, 9)) + mocker.patch.object(cli, '_selector') future = cli.cluster.request_update() cli.poll(future=future) @@ -86,7 +88,7 @@ def test_maybe_connect(cli, conn): def test_conn_state_change(mocker, cli, conn): - sel = mocker.patch.object(cli, '_selector') + sel = cli._selector node_id = 0 cli._conns[node_id] = conn |