diff options
author | Patrick White <patrick@patrickwhite.org> | 2017-06-01 11:30:51 -0700 |
---|---|---|
committer | Patrick White <patrick@patrickwhite.org> | 2017-06-04 13:45:27 -0700 |
commit | 267e61b4323bc13505e8933fa9b89d0591af3a69 (patch) | |
tree | c46e6bf0fee07aa118c1b1a8420156c67aef1113 | |
parent | 95b2185c0741ed2616def1eefc2bbfd0d4d180ab (diff) | |
download | kazoo-267e61b4323bc13505e8933fa9b89d0591af3a69.tar.gz |
feat(core): use epoll when available to support fds > 1023
When epoll is available, and the highest fd in use is > 1023, route through epoll.
Otherwise, use the existing select() behavior so by and large nothing changes.
Closes #266, #171
-rw-r--r-- | kazoo/handlers/threading.py | 82 | ||||
-rw-r--r-- | kazoo/tests/test_threading_handler.py | 25 |
2 files changed, 105 insertions, 2 deletions
diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py index 4b5e0fa..3743c88 100644 --- a/kazoo/handlers/threading.py +++ b/kazoo/handlers/threading.py @@ -18,6 +18,10 @@ import select import socket import threading import time +import six + +from collections import defaultdict +from itertools import chain import kazoo.python2atexit as python2atexit @@ -33,6 +37,27 @@ _STOP = object() log = logging.getLogger(__name__) +_HAS_EPOLL = hasattr(select, "epoll") + + +def _to_fileno(obj): + if isinstance(obj, six.integer_types): + fd = int(obj) + elif hasattr(obj, "fileno"): + fd = obj.fileno() + if not isinstance(fd, six.integer_types): + raise TypeError("fileno() returned a non-integer") + fd = int(fd) + else: + raise TypeError("argument must be an int, or have a fileno() method.") + + if fd < 0: + raise ValueError( + "file descriptor cannot be a negative integer (%d)" % (fd,) + ) + + return fd + class KazooTimeoutError(Exception): pass @@ -143,8 +168,17 @@ class SequentialThreadingHandler(object): python2atexit.unregister(self.stop) def select(self, *args, **kwargs): - # select() takes no kwargs, so it will be in args - timeout = args[3] if len(args) == 4 else None + # if we have epoll, and select is not expected to work + # use an epoll-based "select". Otherwise don't touch + # anything to minimize changes + if _HAS_EPOLL: + # if the highest fd we've seen is > 1023 + if max(map(_to_fileno, chain(*args[:3]))) > 1023: + return self._epoll_select(*args, **kwargs) + return self._select(*args, **kwargs) + + def _select(self, *args, **kwargs): + timeout = kwargs.pop('timeout', None) # either the time to give up, or None end = (time.time() + timeout) if timeout else None while end is None or time.time() < end: @@ -167,6 +201,50 @@ class SequentialThreadingHandler(object): # if we hit our timeout, lets return as a timeout return ([], [], []) + def _epoll_select(self, rlist, wlist, xlist, timeout=None): + """epoll-based drop-in replacement for select to overcome select + limitation on a maximum filehandle value + """ + if timeout is None: + timeout = -1 + eventmasks = defaultdict(int) + rfd2obj = defaultdict(list) + wfd2obj = defaultdict(list) + xfd2obj = defaultdict(list) + read_evmask = select.EPOLLIN | select.EPOLLPRI # Just in case + + def store_evmasks(obj_list, evmask, fd2obj): + for obj in obj_list: + fileno = _to_fileno(obj) + eventmasks[fileno] |= evmask + fd2obj[fileno].append(obj) + + store_evmasks(rlist, read_evmask, rfd2obj) + store_evmasks(wlist, select.EPOLLOUT, wfd2obj) + store_evmasks(xlist, select.EPOLLERR, xfd2obj) + + poller = select.epoll() + + for fileno in eventmasks: + poller.register(fileno, eventmasks[fileno]) + + try: + events = poller.poll(timeout) + revents = [] + wevents = [] + xevents = [] + for fileno, event in events: + if event & read_evmask: + revents += rfd2obj.get(fileno, []) + if event & select.EPOLLOUT: + wevents += wfd2obj.get(fileno, []) + if event & select.EPOLLERR: + xevents += xfd2obj.get(fileno, []) + finally: + poller.close() + + return revents, wevents, xevents + def socket(self): return utils.create_tcp_socket(socket) diff --git a/kazoo/tests/test_threading_handler.py b/kazoo/tests/test_threading_handler.py index b4b93f1..269a1d2 100644 --- a/kazoo/tests/test_threading_handler.py +++ b/kazoo/tests/test_threading_handler.py @@ -46,6 +46,31 @@ class TestThreadingHandler(unittest.TestCase): h.stop() self.assertFalse(h._running) + def test_huge_file_descriptor(self): + from kazoo.handlers.threading import _HAS_EPOLL + if not _HAS_EPOLL: + self.skipTest('only run on systems with epoll()') + import resource + import socket + from kazoo.handlers.utils import create_tcp_socket + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096)) + except (ValueError, resource.error): + self.skipTest('couldnt raise fd limit high enough') + fd = 0 + socks = [] + while fd < 4000: + sock = create_tcp_socket(socket) + fd = sock.fileno() + socks.append(sock) + h = self._makeOne() + h.start() + h.select(socks, [], []) + with self.assertRaises(ValueError): + h._select(socks, [], []) + h._epoll_select(socks, [], []) + h.stop() + class TestThreadingAsync(unittest.TestCase): def _makeOne(self, *args): |