summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-16 22:56:11 -0700
committerGitHub <noreply@github.com>2016-07-16 22:56:11 -0700
commit43bbdf1434615390800783fc8da56000cf9acd10 (patch)
treec60951de40069eb69078940fbf742117a5a10b73
parent5ab4d5c274112a4e2024dea415a0ec4b79009a28 (diff)
downloadkafka-python-43bbdf1434615390800783fc8da56000cf9acd10.tar.gz
Protect writes to wakeup socket with threading lock (#763 / #709)
-rw-r--r--kafka/client_async.py8
1 files changed, 6 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e064d51..2700069 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -6,6 +6,7 @@ import heapq
import itertools
import logging
import random
+import threading
# selectors in stdlib as of py3.4
try:
@@ -158,6 +159,7 @@ class KafkaClient(object):
self._bootstrap_fails = 0
self._wake_r, self._wake_w = socket.socketpair()
self._wake_r.setblocking(False)
+ self._wake_lock = threading.Lock()
self._selector.register(self._wake_r, selectors.EVENT_READ)
self._closed = False
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
@@ -758,10 +760,12 @@ class KafkaClient(object):
raise Errors.NoBrokersAvailable()
def wakeup(self):
- if self._wake_w.send(b'x') != 1:
- log.warning('Unable to send to wakeup socket!')
+ with self._wake_lock:
+ if self._wake_w.send(b'x') != 1:
+ log.warning('Unable to send to wakeup socket!')
def _clear_wake_fd(self):
+ # reading from wake socket should only happen in a single thread
while True:
try:
self._wake_r.recv(1024)