summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-08 08:01:48 -0800
committerGitHub <noreply@github.com>2019-03-08 08:01:48 -0800
commit8c0792581d8a38822c01b40f5d3926c659b0c439 (patch)
tree8b39433a185984b71984a9301d3ed991bdf0fbe3
parent7a99013668b798aaa0acffcf382a7e48e7bd41c1 (diff)
downloadkafka-python-8c0792581d8a38822c01b40f5d3926c659b0c439.tar.gz
Do network connections and writes in KafkaClient.poll() (#1729)
* Add BrokerConnection.send_pending_requests to support async network sends * Send network requests during KafkaClient.poll() rather than in KafkaClient.send() * Don't acquire lock during KafkaClient.send if node is connected / ready * Move all network connection IO into KafkaClient.poll()
-rw-r--r--kafka/client_async.py59
-rw-r--r--kafka/conn.py49
-rw-r--r--kafka/consumer/group.py13
-rw-r--r--kafka/coordinator/base.py4
-rw-r--r--test/fixtures.py7
-rw-r--r--test/test_client_async.py9
6 files changed, 84 insertions, 57 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e2bdda9..d608e6a 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -304,7 +304,10 @@ class KafkaClient(object):
# SSL connections can enter this state 2x (second during Handshake)
if node_id not in self._connecting:
self._connecting.add(node_id)
+ try:
self._selector.register(conn._sock, selectors.EVENT_WRITE)
+ except KeyError:
+ self._selector.modify(conn._sock, selectors.EVENT_WRITE)
elif conn.connected():
log.debug("Node %s connected", node_id)
@@ -312,10 +315,10 @@ class KafkaClient(object):
self._connecting.remove(node_id)
try:
- self._selector.unregister(conn._sock)
+ self._selector.modify(conn._sock, selectors.EVENT_READ, conn)
except KeyError:
- pass
- self._selector.register(conn._sock, selectors.EVENT_READ, conn)
+ self._selector.register(conn._sock, selectors.EVENT_READ, conn)
+
if self._sensors:
self._sensors.connection_created.record()
@@ -336,6 +339,7 @@ class KafkaClient(object):
self._selector.unregister(conn._sock)
except KeyError:
pass
+
if self._sensors:
self._sensors.connection_closed.record()
@@ -348,6 +352,17 @@ class KafkaClient(object):
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()
+ def maybe_connect(self, node_id):
+ """Queues a node for asynchronous connection during the next .poll()"""
+ if self._can_connect(node_id):
+ self._connecting.add(node_id)
+ # Wakeup signal is useful in case another thread is
+ # blocked waiting for incoming network traffic while holding
+ # the client lock in poll().
+ self.wakeup()
+ return True
+ return False
+
def _maybe_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id."""
with self._lock:
@@ -397,7 +412,7 @@ class KafkaClient(object):
Returns:
bool: True if we are ready to send to the given node
"""
- self._maybe_connect(node_id)
+ self.maybe_connect(node_id)
return self.is_ready(node_id, metadata_priority=metadata_priority)
def connected(self, node_id):
@@ -499,14 +514,15 @@ class KafkaClient(object):
return True
def _can_send_request(self, node_id):
- with self._lock:
- if node_id not in self._conns:
- return False
- conn = self._conns[node_id]
- return conn.connected() and conn.can_send_more()
+ conn = self._conns.get(node_id)
+ if not conn:
+ return False
+ return conn.connected() and conn.can_send_more()
def send(self, node_id, request):
- """Send a request to a specific node.
+ """Send a request to a specific node. Bytes are placed on an
+ internal per-connection send-queue. Actual network I/O will be
+ triggered in a subsequent call to .poll()
Arguments:
node_id (int): destination node
@@ -518,11 +534,21 @@ class KafkaClient(object):
Returns:
Future: resolves to Response struct or Error
"""
- with self._lock:
- if not self._maybe_connect(node_id):
- return Future().failure(Errors.NodeNotReadyError(node_id))
+ if not self._can_send_request(node_id):
+ self.maybe_connect(node_id)
+ return Future().failure(Errors.NodeNotReadyError(node_id))
+
+ # conn.send will queue the request internally
+ # we will need to call send_pending_requests()
+ # to trigger network I/O
+ future = self._conns[node_id].send(request, blocking=False)
- return self._conns[node_id].send(request)
+ # Wakeup signal is useful in case another thread is
+ # blocked waiting for incoming network traffic while holding
+ # the client lock in poll().
+ self.wakeup()
+
+ return future
def poll(self, timeout_ms=None, future=None):
"""Try to read and write to sockets.
@@ -640,6 +666,8 @@ class KafkaClient(object):
conn.close(error=Errors.RequestTimedOutError(
'Request timed out after %s ms' %
conn.config['request_timeout_ms']))
+ else:
+ conn.send_pending_requests()
if self._sensors:
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
@@ -801,9 +829,8 @@ class KafkaClient(object):
# have such application level configuration, using request timeout instead.
return self.config['request_timeout_ms']
- if self._can_connect(node_id):
+ if self.maybe_connect(node_id):
log.debug("Initializing connection to node %s for metadata request", node_id)
- self._maybe_connect(node_id)
return self.config['reconnect_backoff_ms']
# connected but can't send more, OR connecting
diff --git a/kafka/conn.py b/kafka/conn.py
index 7dfc8bd..6b5aff9 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -733,11 +733,8 @@ class BrokerConnection(object):
future.failure(error)
self.config['state_change_callback'](self)
- def send(self, request):
- """send request, return Future()
-
- Can block on network if request is larger than send_buffer_bytes
- """
+ def send(self, request, blocking=True):
+ """Queue request for async network send, return Future()"""
future = Future()
if self.connecting():
return future.failure(Errors.NodeNotReadyError(str(self)))
@@ -745,35 +742,49 @@ class BrokerConnection(object):
return future.failure(Errors.KafkaConnectionError(str(self)))
elif not self.can_send_more():
return future.failure(Errors.TooManyInFlightRequests(str(self)))
- return self._send(request)
+ return self._send(request, blocking=blocking)
- def _send(self, request):
+ def _send(self, request, blocking=True):
assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
future = Future()
correlation_id = self._protocol.send_request(request)
+
+ # Attempt to replicate behavior from prior to introduction of
+ # send_pending_requests() / async sends
+ if blocking:
+ error = self.send_pending_requests()
+ if isinstance(error, Exception):
+ future.failure(error)
+ return future
+
+ log.debug('%s Request %d: %s', self, correlation_id, request)
+ if request.expect_response():
+ sent_time = time.time()
+ ifr = (correlation_id, future, sent_time)
+ self.in_flight_requests.append(ifr)
+ else:
+ future.success(None)
+ return future
+
+ def send_pending_requests(self):
+ """Can block on network if request is larger than send_buffer_bytes"""
+ if self.state not in (ConnectionStates.AUTHENTICATING,
+ ConnectionStates.CONNECTED):
+ return Errors.NodeNotReadyError(str(self))
data = self._protocol.send_bytes()
try:
# In the future we might manage an internal write buffer
# and send bytes asynchronously. For now, just block
# sending each request payload
- sent_time = time.time()
total_bytes = self._send_bytes_blocking(data)
if self._sensors:
self._sensors.bytes_sent.record(total_bytes)
+ return total_bytes
except ConnectionError as e:
- log.exception("Error sending %s to %s", request, self)
+ log.exception("Error sending request data to %s", self)
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
- return future.failure(error)
- log.debug('%s Request %d: %s', self, correlation_id, request)
-
- if request.expect_response():
- ifr = (correlation_id, future, sent_time)
- self.in_flight_requests.append(ifr)
- else:
- future.success(None)
-
- return future
+ return error
def can_send_more(self):
"""Return True unless there are max_in_flight_requests_per_connection."""
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 8d2c65e..531c107 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -1070,16 +1070,6 @@ class KafkaConsumer(six.Iterator):
# like heartbeats, auto-commits, and metadata refreshes
timeout_at = self._next_timeout()
- # Because the consumer client poll does not sleep unless blocking on
- # network IO, we need to explicitly sleep when we know we are idle
- # because we haven't been assigned any partitions to fetch / consume
- if self._use_consumer_group() and not self.assignment():
- sleep_time = max(timeout_at - time.time(), 0)
- if sleep_time > 0 and not self._client.in_flight_request_count():
- log.debug('No partitions assigned; sleeping for %s', sleep_time)
- time.sleep(sleep_time)
- continue
-
# Short-circuit the fetch iterator if we are already timed out
# to avoid any unintentional interaction with fetcher setup
if time.time() > timeout_at:
@@ -1090,8 +1080,7 @@ class KafkaConsumer(six.Iterator):
if time.time() > timeout_at:
log.debug("internal iterator timeout - breaking for poll")
break
- if self._client.in_flight_request_count():
- self._client.poll(timeout_ms=0)
+ self._client.poll(timeout_ms=0)
# An else block on a for loop only executes if there was no break
# so this should only be called on a StopIteration from the fetcher
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 1435183..664e8d2 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -252,7 +252,7 @@ class BaseCoordinator(object):
if self.config['api_version'] < (0, 8, 2):
self.coordinator_id = self._client.least_loaded_node()
if self.coordinator_id is not None:
- self._client.ready(self.coordinator_id)
+ self._client.maybe_connect(self.coordinator_id)
continue
future = self.lookup_coordinator()
@@ -686,7 +686,7 @@ class BaseCoordinator(object):
self.coordinator_id = response.coordinator_id
log.info("Discovered coordinator %s for group %s",
self.coordinator_id, self.group_id)
- self._client.ready(self.coordinator_id)
+ self._client.maybe_connect(self.coordinator_id)
self.heartbeat.reset_timeouts()
future.success(self.coordinator_id)
diff --git a/test/fixtures.py b/test/fixtures.py
index 34373e6..8b156e6 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -405,10 +405,11 @@ class KafkaFixture(Fixture):
retries = 10
while True:
node_id = self._client.least_loaded_node()
- for ready_retry in range(40):
- if self._client.ready(node_id, False):
+ for connect_retry in range(40):
+ self._client.maybe_connect(node_id)
+ if self._client.connected(node_id):
break
- time.sleep(.1)
+ self._client.poll(timeout_ms=100)
else:
raise RuntimeError('Could not connect to broker with node id %d' % (node_id,))
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 09781ac..1c8a50f 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -125,8 +125,7 @@ def test_conn_state_change(mocker, cli, conn):
conn.state = ConnectionStates.CONNECTED
cli._conn_state_change(node_id, conn)
assert node_id not in cli._connecting
- sel.unregister.assert_called_with(conn._sock)
- sel.register.assert_called_with(conn._sock, selectors.EVENT_READ, conn)
+ sel.modify.assert_called_with(conn._sock, selectors.EVENT_READ, conn)
# Failure to connect should trigger metadata update
assert cli.cluster._need_update is False
@@ -145,7 +144,7 @@ def test_conn_state_change(mocker, cli, conn):
def test_ready(mocker, cli, conn):
- maybe_connect = mocker.patch.object(cli, '_maybe_connect')
+ maybe_connect = mocker.patch.object(cli, 'maybe_connect')
node_id = 1
cli.ready(node_id)
maybe_connect.assert_called_with(node_id)
@@ -362,6 +361,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
mocker.patch.object(client, '_can_connect', return_value=True)
mocker.patch.object(client, '_maybe_connect', return_value=True)
+ mocker.patch.object(client, 'maybe_connect', return_value=True)
now = time.time()
t = mocker.patch('time.time')
@@ -370,8 +370,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
# first poll attempts connection
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(2.222) # reconnect backoff
- client._can_connect.assert_called_once_with('foobar')
- client._maybe_connect.assert_called_once_with('foobar')
+ client.maybe_connect.assert_called_once_with('foobar')
# poll while connecting should not attempt a new connection
client._connecting.add('foobar')