summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-07 16:23:24 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-08 09:23:57 -0700
commit9b71b0da624ebcd6e3e06b4f325e0ddffd1c8f47 (patch)
tree3448d82d16533ea516cdbd3904e2c96da1a897f3
parented053660a4fc1341402e6ecd2c5739c252503ef2 (diff)
downloadkafka-python-9b71b0da624ebcd6e3e06b4f325e0ddffd1c8f47.tar.gz
Make _wake_r socket non-blocking; drop select from _clear_wake_fd
-rw-r--r--kafka/client_async.py7
1 files changed, 4 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e921fa4..ca51987 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -98,6 +98,7 @@ class KafkaClient(object):
self._bootstrap_fails = 0
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
self._wake_r, self._wake_w = socket.socketpair()
+ self._wake_r.setblocking(False)
def __del__(self):
self._wake_r.close()
@@ -682,10 +683,10 @@ class KafkaClient(object):
def _clear_wake_fd(self):
while True:
- fds, _, _ = select.select([self._wake_r], [], [], 0)
- if not fds:
+ try:
+ self._wake_r.recv(1)
+ except:
break
- self._wake_r.recv(1)
class DelayedTaskQueue(object):