summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick White <patrick@patrickwhite.org>2017-06-01 11:30:51 -0700
committerPatrick White <patrick@patrickwhite.org>2017-06-04 13:45:27 -0700
commit267e61b4323bc13505e8933fa9b89d0591af3a69 (patch)
treec46e6bf0fee07aa118c1b1a8420156c67aef1113
parent95b2185c0741ed2616def1eefc2bbfd0d4d180ab (diff)
downloadkazoo-267e61b4323bc13505e8933fa9b89d0591af3a69.tar.gz
feat(core): use epoll when available to support fds > 1023
When epoll is available, and the highest fd in use is > 1023, route through epoll. Otherwise, use the existing select() behavior so by and large nothing changes. Closes #266, #171
-rw-r--r--kazoo/handlers/threading.py82
-rw-r--r--kazoo/tests/test_threading_handler.py25
2 files changed, 105 insertions, 2 deletions
diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py
index 4b5e0fa..3743c88 100644
--- a/kazoo/handlers/threading.py
+++ b/kazoo/handlers/threading.py
@@ -18,6 +18,10 @@ import select
import socket
import threading
import time
+import six
+
+from collections import defaultdict
+from itertools import chain
import kazoo.python2atexit as python2atexit
@@ -33,6 +37,27 @@ _STOP = object()
log = logging.getLogger(__name__)
+_HAS_EPOLL = hasattr(select, "epoll")
+
+
+def _to_fileno(obj):
+ if isinstance(obj, six.integer_types):
+ fd = int(obj)
+ elif hasattr(obj, "fileno"):
+ fd = obj.fileno()
+ if not isinstance(fd, six.integer_types):
+ raise TypeError("fileno() returned a non-integer")
+ fd = int(fd)
+ else:
+ raise TypeError("argument must be an int, or have a fileno() method.")
+
+ if fd < 0:
+ raise ValueError(
+ "file descriptor cannot be a negative integer (%d)" % (fd,)
+ )
+
+ return fd
+
class KazooTimeoutError(Exception):
pass
@@ -143,8 +168,17 @@ class SequentialThreadingHandler(object):
python2atexit.unregister(self.stop)
def select(self, *args, **kwargs):
- # select() takes no kwargs, so it will be in args
- timeout = args[3] if len(args) == 4 else None
+ # if we have epoll, and select is not expected to work
+ # use an epoll-based "select". Otherwise don't touch
+ # anything to minimize changes
+ if _HAS_EPOLL:
+ # if the highest fd we've seen is > 1023
+ if max(map(_to_fileno, chain(*args[:3]))) > 1023:
+ return self._epoll_select(*args, **kwargs)
+ return self._select(*args, **kwargs)
+
+ def _select(self, *args, **kwargs):
+ timeout = kwargs.pop('timeout', None)
# either the time to give up, or None
end = (time.time() + timeout) if timeout else None
while end is None or time.time() < end:
@@ -167,6 +201,50 @@ class SequentialThreadingHandler(object):
# if we hit our timeout, lets return as a timeout
return ([], [], [])
+ def _epoll_select(self, rlist, wlist, xlist, timeout=None):
+ """epoll-based drop-in replacement for select to overcome select
+ limitation on a maximum filehandle value
+ """
+ if timeout is None:
+ timeout = -1
+ eventmasks = defaultdict(int)
+ rfd2obj = defaultdict(list)
+ wfd2obj = defaultdict(list)
+ xfd2obj = defaultdict(list)
+ read_evmask = select.EPOLLIN | select.EPOLLPRI # Just in case
+
+ def store_evmasks(obj_list, evmask, fd2obj):
+ for obj in obj_list:
+ fileno = _to_fileno(obj)
+ eventmasks[fileno] |= evmask
+ fd2obj[fileno].append(obj)
+
+ store_evmasks(rlist, read_evmask, rfd2obj)
+ store_evmasks(wlist, select.EPOLLOUT, wfd2obj)
+ store_evmasks(xlist, select.EPOLLERR, xfd2obj)
+
+ poller = select.epoll()
+
+ for fileno in eventmasks:
+ poller.register(fileno, eventmasks[fileno])
+
+ try:
+ events = poller.poll(timeout)
+ revents = []
+ wevents = []
+ xevents = []
+ for fileno, event in events:
+ if event & read_evmask:
+ revents += rfd2obj.get(fileno, [])
+ if event & select.EPOLLOUT:
+ wevents += wfd2obj.get(fileno, [])
+ if event & select.EPOLLERR:
+ xevents += xfd2obj.get(fileno, [])
+ finally:
+ poller.close()
+
+ return revents, wevents, xevents
+
def socket(self):
return utils.create_tcp_socket(socket)
diff --git a/kazoo/tests/test_threading_handler.py b/kazoo/tests/test_threading_handler.py
index b4b93f1..269a1d2 100644
--- a/kazoo/tests/test_threading_handler.py
+++ b/kazoo/tests/test_threading_handler.py
@@ -46,6 +46,31 @@ class TestThreadingHandler(unittest.TestCase):
h.stop()
self.assertFalse(h._running)
+ def test_huge_file_descriptor(self):
+ from kazoo.handlers.threading import _HAS_EPOLL
+ if not _HAS_EPOLL:
+ self.skipTest('only run on systems with epoll()')
+ import resource
+ import socket
+ from kazoo.handlers.utils import create_tcp_socket
+ try:
+ resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
+ except (ValueError, resource.error):
+ self.skipTest('couldnt raise fd limit high enough')
+ fd = 0
+ socks = []
+ while fd < 4000:
+ sock = create_tcp_socket(socket)
+ fd = sock.fileno()
+ socks.append(sock)
+ h = self._makeOne()
+ h.start()
+ h.select(socks, [], [])
+ with self.assertRaises(ValueError):
+ h._select(socks, [], [])
+ h._epoll_select(socks, [], [])
+ h.stop()
+
class TestThreadingAsync(unittest.TestCase):
def _makeOne(self, *args):