summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-09-28 19:30:09 -0700
committerGitHub <noreply@github.com>2019-09-28 19:30:09 -0700
commit89bf6a6ee51e8a54f909eae4785d04e485b91198 (patch)
treedcecac3f96738699441226c4f4a38f2cedc3632e
parent5d1d42429e07f4aa2959b488ea76efb6d0bafc79 (diff)
downloadkafka-python-89bf6a6ee51e8a54f909eae4785d04e485b91198.tar.gz
Rely on socket selector to detect completed connection attempts (#1909)
-rw-r--r--kafka/client_async.py10
-rw-r--r--kafka/conn.py10
-rw-r--r--kafka/producer/sender.py2
-rw-r--r--test/test_client_async.py2
-rw-r--r--test/test_conn.py2
5 files changed, 15 insertions, 11 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 96c0647..ac2d364 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -267,9 +267,9 @@ class KafkaClient(object):
if node_id not in self._connecting:
self._connecting.add(node_id)
try:
- self._selector.register(sock, selectors.EVENT_WRITE)
+ self._selector.register(sock, selectors.EVENT_WRITE, conn)
except KeyError:
- self._selector.modify(sock, selectors.EVENT_WRITE)
+ self._selector.modify(sock, selectors.EVENT_WRITE, conn)
if self.cluster.is_bootstrap(node_id):
self._last_bootstrap = time.time()
@@ -623,7 +623,11 @@ class KafkaClient(object):
if key.fileobj is self._wake_r:
self._clear_wake_fd()
continue
- elif not (events & selectors.EVENT_READ):
+ if events & selectors.EVENT_WRITE:
+ conn = key.data
+ if conn.connecting():
+ conn.connect()
+ if not (events & selectors.EVENT_READ):
continue
conn = key.data
processed.add(conn)
diff --git a/kafka/conn.py b/kafka/conn.py
index 99466d9..5ea5436 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -769,16 +769,16 @@ class BrokerConnection(object):
"""
Return the number of milliseconds to wait, based on the connection
state, before attempting to send data. When disconnected, this respects
- the reconnect backoff time. When connecting, returns 0 to allow
- non-blocking connect to finish. When connected, returns a very large
- number to handle slow/stalled connections.
+ the reconnect backoff time. When connecting or connected, returns a very
+ large number to handle slow/stalled connections.
"""
time_waited = time.time() - (self.last_attempt or 0)
if self.state is ConnectionStates.DISCONNECTED:
return max(self._reconnect_backoff - time_waited, 0) * 1000
- elif self.connecting():
- return 0
else:
+ # When connecting or connected, we should be able to delay
+ # indefinitely since other events (connection or data acked) will
+ # cause a wakeup once data can be sent.
return float('inf')
def connected(self):
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 064fee4..88ec07c 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -157,7 +157,7 @@ class Sender(threading.Thread):
# difference between now and its linger expiry time; otherwise the
# select time will be the time difference between now and the
# metadata expiry time
- self._client.poll(poll_timeout_ms)
+ self._client.poll(timeout_ms=poll_timeout_ms)
def initiate_close(self):
"""Start closing the sender (won't complete until all data is sent)."""
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 82d1467..8bb2028 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -94,7 +94,7 @@ def test_conn_state_change(mocker, cli, conn):
sock = conn._sock
cli._conn_state_change(node_id, sock, conn)
assert node_id in cli._connecting
- sel.register.assert_called_with(sock, selectors.EVENT_WRITE)
+ sel.register.assert_called_with(sock, selectors.EVENT_WRITE, conn)
conn.state = ConnectionStates.CONNECTED
cli._conn_state_change(node_id, sock, conn)
diff --git a/test/test_conn.py b/test/test_conn.py
index 6412cb6..966f7b3 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -85,7 +85,7 @@ def test_connection_delay(conn):
conn.last_attempt = 1000
assert conn.connection_delay() == conn.config['reconnect_backoff_ms']
conn.state = ConnectionStates.CONNECTING
- assert conn.connection_delay() == 0
+ assert conn.connection_delay() == float('inf')
conn.state = ConnectionStates.CONNECTED
assert conn.connection_delay() == float('inf')