From 0cbb7be7f349913718fe6b0be751320fc2b49e41 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 22 Mar 2019 09:05:58 -0700 Subject: Send pending requests before waiting for responses --- kafka/client_async.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index fa150db..dd48673 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -592,6 +592,10 @@ class KafkaClient(object): # locked section of poll(), there is no additional lock acquisition here processed = set() + # Send pending requests first, before polling for responses + for conn in six.itervalues(self._conns): + conn.send_pending_requests() + start_select = time.time() ready = self._selector.select(timeout) end_select = time.time() @@ -644,8 +648,6 @@ class KafkaClient(object): conn.close(error=Errors.RequestTimedOutError( 'Request timed out after %s ms' % conn.config['request_timeout_ms'])) - else: - conn.send_pending_requests() if self._sensors: self._sensors.io_time.record((time.time() - end_select) * 1000000000) -- cgit v1.2.1