summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-04-01 19:38:48 -0700
committerGitHub <noreply@github.com>2019-04-01 19:38:48 -0700
commit51313d792a24059d003f5647ec531cfd9d62d7ab (patch)
tree2591e9ebfe8b5c23112074d08da647f2b7dffa0b
parentc02df4bcc6ee6920db1be259f44a8f958bb36791 (diff)
downloadkafka-python-51313d792a24059d003f5647ec531cfd9d62d7ab.tar.gz
Dont treat popped conn.close() as failure in state change callback (#1773)
-rw-r--r--kafka/client_async.py13
-rw-r--r--test/test_client_async.py5
2 files changed, 13 insertions, 5 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index dc685f9..a86ab55 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -314,7 +314,12 @@ class KafkaClient(object):
idle_disconnect = True
self._idle_expiry_manager.remove(node_id)
- if self.cluster.is_bootstrap(node_id):
+ # If the connection has already by popped from self._conns,
+ # we can assume the disconnect was intentional and not a failure
+ if node_id not in self._conns:
+ pass
+
+ elif self.cluster.is_bootstrap(node_id):
self._bootstrap_fails += 1
elif self._refresh_on_disconnects and not self._closed and not idle_disconnect:
@@ -419,10 +424,12 @@ class KafkaClient(object):
with self._lock:
if node_id is None:
self._close()
- for conn in self._conns.values():
+ conns = list(self._conns.values())
+ self._conns.clear()
+ for conn in conns:
conn.close()
elif node_id in self._conns:
- self._conns[node_id].close()
+ self._conns.pop(node_id).close()
else:
log.warning("Node %s not found in current connection list; skipping", node_id)
return
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 246e36c..0951cb4 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -93,6 +93,7 @@ def test_conn_state_change(mocker, cli, conn):
sel = mocker.patch.object(cli, '_selector')
node_id = 0
+ cli._conns[node_id] = conn
conn.state = ConnectionStates.CONNECTING
cli._conn_state_change(node_id, conn)
assert node_id in cli._connecting
@@ -180,8 +181,8 @@ def test_close(mocker, cli, conn):
# All node close
cli._maybe_connect(1)
cli.close()
- # +3 close: node 0, node 1, node bootstrap
- call_count += 3
+ # +2 close: node 1, node bootstrap (node 0 already closed)
+ call_count += 2
assert conn.close.call_count == call_count