summaryrefslogtreecommitdiff
path: root/kazoo/handlers/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'kazoo/handlers/utils.py')
-rw-r--r--kazoo/handlers/utils.py89
1 files changed, 89 insertions, 0 deletions
diff --git a/kazoo/handlers/utils.py b/kazoo/handlers/utils.py
index fa561fe..9647a24 100644
--- a/kazoo/handlers/utils.py
+++ b/kazoo/handlers/utils.py
@@ -2,11 +2,21 @@
import errno
import functools
+import os
import select
import ssl
import socket
import time
+from collections import defaultdict
+
+import six
+
+if six.PY34:
+ import selectors
+else:
+ import selectors2 as selectors
+
HAS_FNCTL = True
try:
import fcntl
@@ -19,6 +29,7 @@ _NONE = object()
class AsyncResult(object):
"""A one-time event that stores a value or an exception"""
+
def __init__(self, handler, condition_factory, timeout_factory):
self._handler = handler
self._exception = _NONE
@@ -126,6 +137,7 @@ class AsyncResult(object):
else:
functools.partial(callback, self)()
+
def _set_fd_cloexec(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
@@ -272,6 +284,7 @@ def capture_exceptions(async_result):
:param async_result: An async result implementing :class:`IAsyncResult`
"""
+
def capture(function):
@functools.wraps(function)
def captured_function(*args, **kwargs):
@@ -279,7 +292,9 @@ def capture_exceptions(async_result):
return function(*args, **kwargs)
except Exception as exc:
async_result.set_exception(exc)
+
return captured_function
+
return capture
@@ -291,6 +306,7 @@ def wrap(async_result):
:param async_result: An async result implementing :class:`IAsyncResult`
"""
+
def capture(function):
@capture_exceptions(async_result)
def captured_function(*args, **kwargs):
@@ -298,5 +314,78 @@ def wrap(async_result):
if value is not None:
async_result.set(value)
return value
+
return captured_function
+
return capture
+
+
+def fileobj_to_fd(fileobj):
+ """Return a file descriptor from a file object.
+
+ Parameters:
+ fileobj -- file object or file descriptor
+
+ Returns:
+ corresponding file descriptor
+
+ Raises:
+ TypeError if the object is invalid
+ """
+ if isinstance(fileobj, int):
+ fd = fileobj
+ else:
+ try:
+ fd = int(fileobj.fileno())
+ except (AttributeError, TypeError, ValueError):
+ raise TypeError("Invalid file object: "
+ "{!r}".format(fileobj))
+ if fd < 0:
+ raise TypeError("Invalid file descriptor: {}".format(fd))
+ os.fstat(fd)
+ return fd
+
+
+def selector_select(rlist, wlist, xlist, timeout=None,
+ selectors_module=selectors):
+ """Selector-based drop-in replacement for select to overcome select
+ limitation on a maximum filehandle value.
+
+ Need backport selectors2 package in python 2.
+ """
+ if timeout is not None:
+ if not (isinstance(timeout, six.integer_types) or isinstance(
+ timeout, float)):
+ raise TypeError('timeout must be a number')
+ if timeout < 0:
+ raise ValueError('timeout must be non-negative')
+
+ events_mapping = {selectors_module.EVENT_READ: rlist,
+ selectors_module.EVENT_WRITE: wlist}
+ fd_events = defaultdict(int)
+ fd_fileobjs = defaultdict(list)
+
+ for event, fileobjs in events_mapping.items():
+ for fileobj in fileobjs:
+ fd = fileobj_to_fd(fileobj)
+ fd_events[fd] |= event
+ fd_fileobjs[fd].append(fileobj)
+
+ selector = selectors_module.DefaultSelector()
+ for fd, events in fd_events.items():
+ selector.register(fd, events)
+
+ revents, wevents, xevents = [], [], []
+ try:
+ ready = selector.select(timeout)
+ finally:
+ selector.close()
+
+ for info in ready:
+ k, events = info
+ if events & selectors.EVENT_READ:
+ revents.extend(fd_fileobjs[k.fd])
+ elif events & selectors.EVENT_WRITE:
+ wevents.extend(fd_fileobjs[k.fd])
+
+ return revents, wevents, xevents