diff options
Diffstat (limited to 'kazoo/handlers/utils.py')
-rw-r--r-- | kazoo/handlers/utils.py | 89 |
1 files changed, 89 insertions, 0 deletions
diff --git a/kazoo/handlers/utils.py b/kazoo/handlers/utils.py index fa561fe..9647a24 100644 --- a/kazoo/handlers/utils.py +++ b/kazoo/handlers/utils.py @@ -2,11 +2,21 @@ import errno import functools +import os import select import ssl import socket import time +from collections import defaultdict + +import six + +if six.PY34: + import selectors +else: + import selectors2 as selectors + HAS_FNCTL = True try: import fcntl @@ -19,6 +29,7 @@ _NONE = object() class AsyncResult(object): """A one-time event that stores a value or an exception""" + def __init__(self, handler, condition_factory, timeout_factory): self._handler = handler self._exception = _NONE @@ -126,6 +137,7 @@ class AsyncResult(object): else: functools.partial(callback, self)() + def _set_fd_cloexec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) @@ -272,6 +284,7 @@ def capture_exceptions(async_result): :param async_result: An async result implementing :class:`IAsyncResult` """ + def capture(function): @functools.wraps(function) def captured_function(*args, **kwargs): @@ -279,7 +292,9 @@ def capture_exceptions(async_result): return function(*args, **kwargs) except Exception as exc: async_result.set_exception(exc) + return captured_function + return capture @@ -291,6 +306,7 @@ def wrap(async_result): :param async_result: An async result implementing :class:`IAsyncResult` """ + def capture(function): @capture_exceptions(async_result) def captured_function(*args, **kwargs): @@ -298,5 +314,78 @@ def wrap(async_result): if value is not None: async_result.set(value) return value + return captured_function + return capture + + +def fileobj_to_fd(fileobj): + """Return a file descriptor from a file object. + + Parameters: + fileobj -- file object or file descriptor + + Returns: + corresponding file descriptor + + Raises: + TypeError if the object is invalid + """ + if isinstance(fileobj, int): + fd = fileobj + else: + try: + fd = int(fileobj.fileno()) + except (AttributeError, TypeError, ValueError): + raise TypeError("Invalid file object: " + "{!r}".format(fileobj)) + if fd < 0: + raise TypeError("Invalid file descriptor: {}".format(fd)) + os.fstat(fd) + return fd + + +def selector_select(rlist, wlist, xlist, timeout=None, + selectors_module=selectors): + """Selector-based drop-in replacement for select to overcome select + limitation on a maximum filehandle value. + + Need backport selectors2 package in python 2. + """ + if timeout is not None: + if not (isinstance(timeout, six.integer_types) or isinstance( + timeout, float)): + raise TypeError('timeout must be a number') + if timeout < 0: + raise ValueError('timeout must be non-negative') + + events_mapping = {selectors_module.EVENT_READ: rlist, + selectors_module.EVENT_WRITE: wlist} + fd_events = defaultdict(int) + fd_fileobjs = defaultdict(list) + + for event, fileobjs in events_mapping.items(): + for fileobj in fileobjs: + fd = fileobj_to_fd(fileobj) + fd_events[fd] |= event + fd_fileobjs[fd].append(fileobj) + + selector = selectors_module.DefaultSelector() + for fd, events in fd_events.items(): + selector.register(fd, events) + + revents, wevents, xevents = [], [], [] + try: + ready = selector.select(timeout) + finally: + selector.close() + + for info in ready: + k, events = info + if events & selectors.EVENT_READ: + revents.extend(fd_fileobjs[k.fd]) + elif events & selectors.EVENT_WRITE: + wevents.extend(fd_fileobjs[k.fd]) + + return revents, wevents, xevents |