summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWang <taptube@gmail.com>2022-02-03 07:59:08 +0800
committerGitHub <noreply@github.com>2022-02-02 18:59:08 -0500
commit4042a8505cdac94dba5718b6a89f82d478fef0d6 (patch)
tree18cd3b7f656979907d2513dc1e6fc80e994603cd
parentf585d605eea0a37a08aae95a8cc259b80da2ecf0 (diff)
downloadkazoo-4042a8505cdac94dba5718b6a89f82d478fef0d6.tar.gz
fix(core): use selectors to poll connections instead of raw select in threading,gevent,eventlet (#656)
Co-authored-by: lawrentwang <lawrentwang@tencent.com> Solve the select limitation on a maximum file handler value and dynamic choose the best poller in the system.
-rw-r--r--kazoo/handlers/eventlet.py8
-rw-r--r--kazoo/handlers/gevent.py9
-rw-r--r--kazoo/handlers/threading.py87
-rw-r--r--kazoo/handlers/utils.py89
-rw-r--r--kazoo/tests/test_eventlet_handler.py22
-rw-r--r--kazoo/tests/test_gevent_handler.py22
-rw-r--r--kazoo/tests/test_selectors_select.py91
-rw-r--r--kazoo/tests/test_threading_handler.py11
-rw-r--r--requirements.txt1
-rw-r--r--setup.py3
10 files changed, 245 insertions, 98 deletions
diff --git a/kazoo/handlers/eventlet.py b/kazoo/handlers/eventlet.py
index c13f886..3f67ccd 100644
--- a/kazoo/handlers/eventlet.py
+++ b/kazoo/handlers/eventlet.py
@@ -5,15 +5,15 @@ import contextlib
import logging
import eventlet
-from eventlet.green import select as green_select
from eventlet.green import socket as green_socket
from eventlet.green import time as green_time
from eventlet.green import threading as green_threading
+from eventlet.green import selectors as green_selectors
from eventlet import queue as green_queue
from kazoo.handlers import utils
import kazoo.python2atexit as python2atexit
-
+from kazoo.handlers.utils import selector_select
LOG = logging.getLogger(__name__)
@@ -41,6 +41,7 @@ class TimeoutError(Exception):
class AsyncResult(utils.AsyncResult):
"""A one-time event that stores a value or an exception"""
+
def __init__(self, handler):
super(AsyncResult, self).__init__(handler,
green_threading.Condition,
@@ -164,7 +165,8 @@ class SequentialEventletHandler(object):
def select(self, *args, **kwargs):
with _yield_before_after():
- return green_select.select(*args, **kwargs)
+ return selector_select(*args, selectors_module=green_selectors,
+ **kwargs)
def async_result(self):
return AsyncResult(self)
diff --git a/kazoo/handlers/gevent.py b/kazoo/handlers/gevent.py
index 96ee765..7f2948e 100644
--- a/kazoo/handlers/gevent.py
+++ b/kazoo/handlers/gevent.py
@@ -9,6 +9,10 @@ import gevent.event
import gevent.queue
import gevent.select
import gevent.thread
+import gevent.selectors
+
+from kazoo.handlers.utils import selector_select
+
try:
from gevent.lock import Semaphore, RLock
except ImportError:
@@ -17,7 +21,6 @@ except ImportError:
from kazoo.handlers import utils
from kazoo import python2atexit
-
_using_libevent = gevent.__version__.startswith('0.')
log = logging.getLogger(__name__)
@@ -84,6 +87,7 @@ class SequentialGeventHandler(object):
del func # release before possible idle
except self.queue_empty:
continue
+
return gevent.spawn(greenlet_worker)
def start(self):
@@ -122,7 +126,8 @@ class SequentialGeventHandler(object):
python2atexit.unregister(self.stop)
def select(self, *args, **kwargs):
- return gevent.select.select(*args, **kwargs)
+ return selector_select(*args, selectors_module=gevent.selectors,
+ **kwargs)
def socket(self, *args, **kwargs):
return utils.create_tcp_socket(socket)
diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py
index 2192523..2389f33 100644
--- a/kazoo/handlers/threading.py
+++ b/kazoo/handlers/threading.py
@@ -12,11 +12,7 @@ environments that use threads.
"""
from __future__ import absolute_import
-from collections import defaultdict
-import errno
-from itertools import chain
import logging
-import select
import socket
import threading
import time
@@ -25,20 +21,18 @@ import six
import kazoo.python2atexit as python2atexit
from kazoo.handlers import utils
+from kazoo.handlers.utils import selector_select
try:
import Queue
except ImportError: # pragma: nocover
import queue as Queue
-
# sentinel objects
_STOP = object()
log = logging.getLogger(__name__)
-_HAS_EPOLL = hasattr(select, "epoll")
-
def _to_fileno(obj):
if isinstance(obj, six.integer_types):
@@ -65,6 +59,7 @@ class KazooTimeoutError(Exception):
class AsyncResult(utils.AsyncResult):
"""A one-time event that stores a value or an exception"""
+
def __init__(self, handler):
super(AsyncResult, self).__init__(handler,
threading.Condition,
@@ -133,6 +128,7 @@ class SequentialThreadingHandler(object):
del func # release before possible idle
except self.queue_empty:
continue
+
t = self.spawn(_thread_worker)
return t
@@ -173,82 +169,7 @@ class SequentialThreadingHandler(object):
python2atexit.unregister(self.stop)
def select(self, *args, **kwargs):
- # if we have epoll, and select is not expected to work
- # use an epoll-based "select". Otherwise don't touch
- # anything to minimize changes
- if _HAS_EPOLL:
- # if the highest fd we've seen is > 1023
- if max(map(_to_fileno, chain.from_iterable(args[:3]))) > 1023:
- return self._epoll_select(*args, **kwargs)
- return self._select(*args, **kwargs)
-
- def _select(self, *args, **kwargs):
- timeout = kwargs.pop('timeout', None)
- # either the time to give up, or None
- end = (time.time() + timeout) if timeout else None
- while end is None or time.time() < end:
- if end is not None:
- # make a list, since tuples aren't mutable
- args = list(args)
-
- # set the timeout to the remaining time
- args[3] = end - time.time()
- try:
- return select.select(*args, **kwargs)
- except select.error as ex:
- # if the system call was interrupted, we'll retry until timeout
- # in Python 3, system call interruptions are a native exception
- # in Python 2, they are not
- errnum = ex.errno if isinstance(ex, OSError) else ex[0]
- if errnum == errno.EINTR:
- continue
- raise
- # if we hit our timeout, lets return as a timeout
- return ([], [], [])
-
- def _epoll_select(self, rlist, wlist, xlist, timeout=None):
- """epoll-based drop-in replacement for select to overcome select
- limitation on a maximum filehandle value
- """
- if timeout is None:
- timeout = -1
- eventmasks = defaultdict(int)
- rfd2obj = defaultdict(list)
- wfd2obj = defaultdict(list)
- xfd2obj = defaultdict(list)
- read_evmask = select.EPOLLIN | select.EPOLLPRI # Just in case
-
- def store_evmasks(obj_list, evmask, fd2obj):
- for obj in obj_list:
- fileno = _to_fileno(obj)
- eventmasks[fileno] |= evmask
- fd2obj[fileno].append(obj)
-
- store_evmasks(rlist, read_evmask, rfd2obj)
- store_evmasks(wlist, select.EPOLLOUT, wfd2obj)
- store_evmasks(xlist, select.EPOLLERR, xfd2obj)
-
- poller = select.epoll()
-
- for fileno in eventmasks:
- poller.register(fileno, eventmasks[fileno])
-
- try:
- events = poller.poll(timeout)
- revents = []
- wevents = []
- xevents = []
- for fileno, event in events:
- if event & read_evmask:
- revents += rfd2obj.get(fileno, [])
- if event & select.EPOLLOUT:
- wevents += wfd2obj.get(fileno, [])
- if event & select.EPOLLERR:
- xevents += xfd2obj.get(fileno, [])
- finally:
- poller.close()
-
- return revents, wevents, xevents
+ return selector_select(*args, **kwargs)
def socket(self):
return utils.create_tcp_socket(socket)
diff --git a/kazoo/handlers/utils.py b/kazoo/handlers/utils.py
index fa561fe..9647a24 100644
--- a/kazoo/handlers/utils.py
+++ b/kazoo/handlers/utils.py
@@ -2,11 +2,21 @@
import errno
import functools
+import os
import select
import ssl
import socket
import time
+from collections import defaultdict
+
+import six
+
+if six.PY34:
+ import selectors
+else:
+ import selectors2 as selectors
+
HAS_FNCTL = True
try:
import fcntl
@@ -19,6 +29,7 @@ _NONE = object()
class AsyncResult(object):
"""A one-time event that stores a value or an exception"""
+
def __init__(self, handler, condition_factory, timeout_factory):
self._handler = handler
self._exception = _NONE
@@ -126,6 +137,7 @@ class AsyncResult(object):
else:
functools.partial(callback, self)()
+
def _set_fd_cloexec(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
@@ -272,6 +284,7 @@ def capture_exceptions(async_result):
:param async_result: An async result implementing :class:`IAsyncResult`
"""
+
def capture(function):
@functools.wraps(function)
def captured_function(*args, **kwargs):
@@ -279,7 +292,9 @@ def capture_exceptions(async_result):
return function(*args, **kwargs)
except Exception as exc:
async_result.set_exception(exc)
+
return captured_function
+
return capture
@@ -291,6 +306,7 @@ def wrap(async_result):
:param async_result: An async result implementing :class:`IAsyncResult`
"""
+
def capture(function):
@capture_exceptions(async_result)
def captured_function(*args, **kwargs):
@@ -298,5 +314,78 @@ def wrap(async_result):
if value is not None:
async_result.set(value)
return value
+
return captured_function
+
return capture
+
+
+def fileobj_to_fd(fileobj):
+ """Return a file descriptor from a file object.
+
+ Parameters:
+ fileobj -- file object or file descriptor
+
+ Returns:
+ corresponding file descriptor
+
+ Raises:
+ TypeError if the object is invalid
+ """
+ if isinstance(fileobj, int):
+ fd = fileobj
+ else:
+ try:
+ fd = int(fileobj.fileno())
+ except (AttributeError, TypeError, ValueError):
+ raise TypeError("Invalid file object: "
+ "{!r}".format(fileobj))
+ if fd < 0:
+ raise TypeError("Invalid file descriptor: {}".format(fd))
+ os.fstat(fd)
+ return fd
+
+
+def selector_select(rlist, wlist, xlist, timeout=None,
+ selectors_module=selectors):
+ """Selector-based drop-in replacement for select to overcome select
+ limitation on a maximum filehandle value.
+
+ Need backport selectors2 package in python 2.
+ """
+ if timeout is not None:
+ if not (isinstance(timeout, six.integer_types) or isinstance(
+ timeout, float)):
+ raise TypeError('timeout must be a number')
+ if timeout < 0:
+ raise ValueError('timeout must be non-negative')
+
+ events_mapping = {selectors_module.EVENT_READ: rlist,
+ selectors_module.EVENT_WRITE: wlist}
+ fd_events = defaultdict(int)
+ fd_fileobjs = defaultdict(list)
+
+ for event, fileobjs in events_mapping.items():
+ for fileobj in fileobjs:
+ fd = fileobj_to_fd(fileobj)
+ fd_events[fd] |= event
+ fd_fileobjs[fd].append(fileobj)
+
+ selector = selectors_module.DefaultSelector()
+ for fd, events in fd_events.items():
+ selector.register(fd, events)
+
+ revents, wevents, xevents = [], [], []
+ try:
+ ready = selector.select(timeout)
+ finally:
+ selector.close()
+
+ for info in ready:
+ k, events = info
+ if events & selectors.EVENT_READ:
+ revents.extend(fd_fileobjs[k.fd])
+ elif events & selectors.EVENT_WRITE:
+ wevents.extend(fd_fileobjs[k.fd])
+
+ return revents, wevents, xevents
diff --git a/kazoo/tests/test_eventlet_handler.py b/kazoo/tests/test_eventlet_handler.py
index 69af400..43ec4f9 100644
--- a/kazoo/tests/test_eventlet_handler.py
+++ b/kazoo/tests/test_eventlet_handler.py
@@ -130,6 +130,28 @@ class TestEventletHandler(unittest.TestCase):
with pytest.raises(IOError):
r.get()
+ def test_huge_file_descriptor(self):
+ import resource
+ from eventlet.green import socket
+ from kazoo.handlers.utils import create_tcp_socket
+
+ try:
+ resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
+ except (ValueError, resource.error):
+ self.skipTest('couldnt raise fd limit high enough')
+ fd = 0
+ socks = []
+ while fd < 4000:
+ sock = create_tcp_socket(socket)
+ fd = sock.fileno()
+ socks.append(sock)
+ with start_stop_one() as h:
+ h.start()
+ h.select(socks, [], [], 0)
+ h.stop()
+ for sock in socks:
+ sock.close()
+
class TestEventletClient(test_client.TestClient):
def setUp(self):
diff --git a/kazoo/tests/test_gevent_handler.py b/kazoo/tests/test_gevent_handler.py
index 9bf1029..5515114 100644
--- a/kazoo/tests/test_gevent_handler.py
+++ b/kazoo/tests/test_gevent_handler.py
@@ -142,6 +142,28 @@ class TestBasicGeventClient(KazooTestCase):
ev.wait()
client.stop()
+ def test_huge_file_descriptor(self):
+ import resource
+ from gevent import socket
+ from kazoo.handlers.utils import create_tcp_socket
+
+ try:
+ resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
+ except (ValueError, resource.error):
+ self.skipTest('couldnt raise fd limit high enough')
+ fd = 0
+ socks = []
+ while fd < 4000:
+ sock = create_tcp_socket(socket)
+ fd = sock.fileno()
+ socks.append(sock)
+ h = self._makeOne()
+ h.start()
+ h.select(socks, [], [], 0)
+ h.stop()
+ for sock in socks:
+ sock.close()
+
class TestGeventClient(test_client.TestClient):
def setUp(self):
diff --git a/kazoo/tests/test_selectors_select.py b/kazoo/tests/test_selectors_select.py
new file mode 100644
index 0000000..b5cdd33
--- /dev/null
+++ b/kazoo/tests/test_selectors_select.py
@@ -0,0 +1,91 @@
+"""
+The official python select function test case copied from python source
+ to test the selector_select function.
+"""
+
+import errno
+import os
+import socket
+import sys
+import unittest
+from test import support
+from kazoo.handlers.utils import selector_select
+
+select = selector_select
+
+
+@unittest.skipIf((sys.platform[:3] == 'win'),
+ "can't easily test on this system")
+class SelectTestCase(unittest.TestCase):
+ class Nope:
+ pass
+
+ class Almost:
+ def fileno(self):
+ return 'fileno'
+
+ def test_error_conditions(self):
+ self.assertRaises(TypeError, select, 1, 2, 3)
+ self.assertRaises(TypeError, select, [self.Nope()], [], [])
+ self.assertRaises(TypeError, select, [self.Almost()], [], [])
+ self.assertRaises(TypeError, select, [], [], [], "not a number")
+ self.assertRaises(ValueError, select, [], [], [], -1)
+
+ # Issue #12367: http://www.freebsd.org/cgi/query-pr.cgi?pr=kern/155606
+ @unittest.skipIf(sys.platform.startswith('freebsd'),
+ 'skip because of a FreeBSD bug: kern/155606')
+ def test_errno(self):
+ with open(__file__, 'rb') as fp:
+ fd = fp.fileno()
+ fp.close()
+ try:
+ select([fd], [], [], 0)
+ except OSError as err:
+ self.assertEqual(err.errno, errno.EBADF)
+ else:
+ self.fail("exception not raised")
+
+ def test_returned_list_identity(self):
+ # See issue #8329
+ r, w, x = select([], [], [], 1)
+ self.assertIsNot(r, w)
+ self.assertIsNot(r, x)
+ self.assertIsNot(w, x)
+
+ def test_select(self):
+ cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
+ p = os.popen(cmd, 'r')
+ for tout in (0, 1, 2, 4, 8, 16) + (None,) * 10:
+ if support.verbose:
+ print('timeout =', tout)
+ rfd, wfd, xfd = select([p], [], [], tout)
+ if (rfd, wfd, xfd) == ([], [], []):
+ continue
+ if (rfd, wfd, xfd) == ([p], [], []):
+ line = p.readline()
+ if support.verbose:
+ print(repr(line))
+ if not line:
+ if support.verbose:
+ print('EOF')
+ break
+ continue
+ self.fail('Unexpected return values from select():', rfd, wfd, xfd)
+ p.close()
+
+ # Issue 16230: Crash on select resized list
+ def test_select_mutated(self):
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ a = []
+
+ class F:
+ def fileno(self):
+ del a[-1]
+ return s.fileno()
+
+ a[:] = [F()] * 10
+ self.assertEqual(select([], a, []), ([], a[:5], []))
+
+
+def tearDownModule():
+ support.reap_children()
diff --git a/kazoo/tests/test_threading_handler.py b/kazoo/tests/test_threading_handler.py
index 4376957..dbdccd7 100644
--- a/kazoo/tests/test_threading_handler.py
+++ b/kazoo/tests/test_threading_handler.py
@@ -45,10 +45,6 @@ class TestThreadingHandler(unittest.TestCase):
assert h._running is False
def test_huge_file_descriptor(self):
- from kazoo.handlers.threading import _HAS_EPOLL
-
- if not _HAS_EPOLL:
- self.skipTest('only run on systems with epoll()')
import resource
import socket
from kazoo.handlers.utils import create_tcp_socket
@@ -65,11 +61,10 @@ class TestThreadingHandler(unittest.TestCase):
socks.append(sock)
h = self._makeOne()
h.start()
- h.select(socks, [], [])
- with pytest.raises(ValueError):
- h._select(socks, [], [])
- h._epoll_select(socks, [], [])
+ h.select(socks, [], [], 0)
h.stop()
+ for sock in socks:
+ sock.close()
class TestThreadingAsync(unittest.TestCase):
diff --git a/requirements.txt b/requirements.txt
index ffe2fce..83adeb7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,2 @@
six
+selectors2>=2.0.2; python_version < "3.4.0" \ No newline at end of file
diff --git a/setup.py b/setup.py
index 15a7a13..c9f9dc2 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,6 @@ import re
from setuptools import setup, find_packages
import sys
-
here = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(here, 'README.md')) as f:
README = f.read()
@@ -16,7 +15,7 @@ with open(os.path.join(here, 'kazoo', 'version.py')) as f:
PYPY = getattr(sys, 'pypy_version_info', False) and True or False
-install_requires = ['six']
+install_requires = ['six', 'selectors2>=2.0.2; python_version < "3.4.0"']
tests_require = install_requires + [
'mock',