summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-30 19:29:30 -0700
committerGitHub <noreply@github.com>2019-03-30 19:29:30 -0700
commitb1effa24aca3a6bcf2268354caae12ee82d6b36d (patch)
treea2e5c80a0c85468ba2aff9b1fcc6e974a96fed53
parentde6e9d3cc31db2d513e8d8f9dde4d77d400325ce (diff)
downloadkafka-python-b1effa24aca3a6bcf2268354caae12ee82d6b36d.tar.gz
Dont wakeup during maybe_refresh_metadata -- it is only called by poll() (#1769)
-rw-r--r--kafka/client_async.py8
-rw-r--r--test/test_client_async.py4
2 files changed, 6 insertions, 6 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 0d9e562..b6adb77 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -517,7 +517,7 @@ class KafkaClient(object):
Future: resolves to Response struct or Error
"""
if not self._can_send_request(node_id):
- self.maybe_connect(node_id)
+ self.maybe_connect(node_id, wakeup=wakeup)
return Future().failure(Errors.NodeNotReadyError(node_id))
# conn.send will queue the request internally
@@ -761,7 +761,7 @@ class KafkaClient(object):
return self.cluster.request_update()
# This method should be locked when running multi-threaded
- def _maybe_refresh_metadata(self):
+ def _maybe_refresh_metadata(self, wakeup=False):
"""Send a metadata request if needed.
Returns:
@@ -792,7 +792,7 @@ class KafkaClient(object):
api_version = 0 if self.config['api_version'] < (0, 10) else 1
request = MetadataRequest[api_version](topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
- future = self.send(node_id, request)
+ future = self.send(node_id, request, wakeup=wakeup)
future.add_callback(self.cluster.update_metadata)
future.add_errback(self.cluster.failed_update)
@@ -809,7 +809,7 @@ class KafkaClient(object):
if self._connecting:
return self.config['reconnect_backoff_ms']
- if self.maybe_connect(node_id):
+ if self.maybe_connect(node_id, wakeup=wakeup):
log.debug("Initializing connection to node %s for metadata request", node_id)
return self.config['reconnect_backoff_ms']
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 3588423..246e36c 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -332,7 +332,7 @@ def test_maybe_refresh_metadata_update(mocker, client):
client._poll.assert_called_with(9999.999) # request_timeout_ms
assert client._metadata_refresh_in_progress
request = MetadataRequest[0]([])
- send.assert_called_once_with('foobar', request)
+ send.assert_called_once_with('foobar', request, wakeup=False)
def test_maybe_refresh_metadata_cant_send(mocker, client):
@@ -348,7 +348,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
# first poll attempts connection
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(2.222) # reconnect backoff
- client.maybe_connect.assert_called_once_with('foobar')
+ client.maybe_connect.assert_called_once_with('foobar', wakeup=False)
# poll while connecting should not attempt a new connection
client._connecting.add('foobar')