summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPedro Calleja <pecalleja@gmail.com>2020-09-16 23:57:01 -0500
committerGitHub <noreply@github.com>2020-09-16 21:57:01 -0700
commite485a6ee2a1f05f2333e22b0fbdbafb12badaf3f (patch)
treeaaf10b9bce5e1823d692f3e29d04ac1652960fa1
parentb32f369e5a4b9914e92b4818b4d1bc6152264ec5 (diff)
downloadkafka-python-e485a6ee2a1f05f2333e22b0fbdbafb12badaf3f.tar.gz
Fix initialization order in KafkaClient (#2119)
Fix initialization order in KafkaClient
-rw-r--r--kafka/client_async.py9
1 files changed, 6 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index caa88cf..58f22d4 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -201,10 +201,15 @@ class KafkaClient(object):
if key in configs:
self.config[key] = configs[key]
+ # these properties need to be set on top of the initialization pipeline
+ # because they are used when __del__ method is called
+ self._closed = False
+ self._wake_r, self._wake_w = socket.socketpair()
+ self._selector = self.config['selector']()
+
self.cluster = ClusterMetadata(**self.config)
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
- self._selector = self.config['selector']()
self._conns = Dict() # object to support weakrefs
self._api_versions = None
self._connecting = set()
@@ -212,7 +217,6 @@ class KafkaClient(object):
self._refresh_on_disconnects = True
self._last_bootstrap = 0
self._bootstrap_fails = 0
- self._wake_r, self._wake_w = socket.socketpair()
self._wake_r.setblocking(False)
self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0)
self._wake_lock = threading.Lock()
@@ -226,7 +230,6 @@ class KafkaClient(object):
self._selector.register(self._wake_r, selectors.EVENT_READ)
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
- self._closed = False
self._sensors = None
if self.config['metrics']:
self._sensors = KafkaClientMetrics(self.config['metrics'],