diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-28 21:55:34 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-03-28 21:55:34 -0700 |
commit | 91eacbbac918eca81ac9aa6d38e94b04d1a1a574 (patch) | |
tree | 048294f4ddd8275c569ad5c860cf7e01668e2891 | |
parent | 227a94663d6b0ab11c12236085f79b5b6ffd5568 (diff) | |
download | kafka-python-91eacbbac918eca81ac9aa6d38e94b04d1a1a574.tar.gz |
Do not block on writes to wake socket
-rw-r--r-- | kafka/client_async.py | 31 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 1 |
2 files changed, 19 insertions, 13 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 0d9e562..dbe23f5 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -160,7 +160,6 @@ class KafkaClient(object): 'bootstrap_topics_filter': set(), 'client_id': 'kafka-python-' + __version__, 'request_timeout_ms': 30000, - 'wakeup_timeout_ms': 3000, 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, 'reconnect_backoff_max_ms': 1000, @@ -210,10 +209,10 @@ class KafkaClient(object): self._refresh_on_disconnects = True self._last_bootstrap = 0 self._bootstrap_fails = 0 - self._wake_r, self._wake_w = socket.socketpair() - self._wake_r.setblocking(False) - self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0) + self._wake_r = None + self._wake_w = None self._wake_lock = threading.Lock() + self._init_wakeup_socketpair() self._lock = threading.RLock() @@ -222,7 +221,6 @@ class KafkaClient(object): # lock above. self._pending_completion = collections.deque() - self._selector.register(self._wake_r, selectors.EVENT_READ) self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms']) self._closed = False self._sensors = None @@ -238,6 +236,18 @@ class KafkaClient(object): check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 self.config['api_version'] = self.check_version(timeout=check_timeout) + def _init_wakeup_socketpair(self): + self._wake_r, self._wake_w = socket.socketpair() + self._wake_r.setblocking(False) + self._wake_w.setblocking(False) + log.debug("Wakeup socketpair (send): %s, bufsize: %s", + self._wake_w, + self._wake_w.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)) + log.debug("Wakeup socketpair (recv): %s, bufsize: %s", + self._wake_r, + self._wake_r.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)) + self._selector.register(self._wake_r, selectors.EVENT_READ) + def _can_bootstrap(self): effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts backoff_factor = 2 ** effective_failures @@ -881,18 +891,15 @@ class KafkaClient(object): def wakeup(self): with self._wake_lock: try: - self._wake_w.sendall(b'x') - except socket.timeout: - log.warning('Timeout to send to wakeup socket!') - raise Errors.KafkaTimeoutError() - except socket.error: - log.warning('Unable to send to wakeup socket!') + self._wake_w.send(b'x') + except socket.error as e: + log.warning('Unable to send to wakeup socket! (%s)', e) def _clear_wake_fd(self): # reading from wake socket should only happen in a single thread while True: try: - self._wake_r.recv(1024) + self._wake_r.recv(4096) except socket.error: break diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 2a306e0..f5a07ef 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -377,7 +377,6 @@ class KafkaProducer(object): self._metrics = Metrics(metric_config, reporters) client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', - wakeup_timeout_ms=self.config['max_block_ms'], **self.config) # Get auto-discovered version from client if necessary |