summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-28 21:55:34 -0700
committerDana Powers <dana.powers@gmail.com>2019-03-28 21:55:34 -0700
commit91eacbbac918eca81ac9aa6d38e94b04d1a1a574 (patch)
tree048294f4ddd8275c569ad5c860cf7e01668e2891
parent227a94663d6b0ab11c12236085f79b5b6ffd5568 (diff)
downloadkafka-python-91eacbbac918eca81ac9aa6d38e94b04d1a1a574.tar.gz
Do not block on writes to wake socket
-rw-r--r--kafka/client_async.py31
-rw-r--r--kafka/producer/kafka.py1
2 files changed, 19 insertions, 13 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 0d9e562..dbe23f5 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -160,7 +160,6 @@ class KafkaClient(object):
'bootstrap_topics_filter': set(),
'client_id': 'kafka-python-' + __version__,
'request_timeout_ms': 30000,
- 'wakeup_timeout_ms': 3000,
'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
@@ -210,10 +209,10 @@ class KafkaClient(object):
self._refresh_on_disconnects = True
self._last_bootstrap = 0
self._bootstrap_fails = 0
- self._wake_r, self._wake_w = socket.socketpair()
- self._wake_r.setblocking(False)
- self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0)
+ self._wake_r = None
+ self._wake_w = None
self._wake_lock = threading.Lock()
+ self._init_wakeup_socketpair()
self._lock = threading.RLock()
@@ -222,7 +221,6 @@ class KafkaClient(object):
# lock above.
self._pending_completion = collections.deque()
- self._selector.register(self._wake_r, selectors.EVENT_READ)
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
self._closed = False
self._sensors = None
@@ -238,6 +236,18 @@ class KafkaClient(object):
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
self.config['api_version'] = self.check_version(timeout=check_timeout)
+ def _init_wakeup_socketpair(self):
+ self._wake_r, self._wake_w = socket.socketpair()
+ self._wake_r.setblocking(False)
+ self._wake_w.setblocking(False)
+ log.debug("Wakeup socketpair (send): %s, bufsize: %s",
+ self._wake_w,
+ self._wake_w.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF))
+ log.debug("Wakeup socketpair (recv): %s, bufsize: %s",
+ self._wake_r,
+ self._wake_r.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF))
+ self._selector.register(self._wake_r, selectors.EVENT_READ)
+
def _can_bootstrap(self):
effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts
backoff_factor = 2 ** effective_failures
@@ -881,18 +891,15 @@ class KafkaClient(object):
def wakeup(self):
with self._wake_lock:
try:
- self._wake_w.sendall(b'x')
- except socket.timeout:
- log.warning('Timeout to send to wakeup socket!')
- raise Errors.KafkaTimeoutError()
- except socket.error:
- log.warning('Unable to send to wakeup socket!')
+ self._wake_w.send(b'x')
+ except socket.error as e:
+ log.warning('Unable to send to wakeup socket! (%s)', e)
def _clear_wake_fd(self):
# reading from wake socket should only happen in a single thread
while True:
try:
- self._wake_r.recv(1024)
+ self._wake_r.recv(4096)
except socket.error:
break
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 2a306e0..f5a07ef 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -377,7 +377,6 @@ class KafkaProducer(object):
self._metrics = Metrics(metric_config, reporters)
client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
- wakeup_timeout_ms=self.config['max_block_ms'],
**self.config)
# Get auto-discovered version from client if necessary