diff options
-rw-r--r-- | polling.py | 94 |
1 files changed, 74 insertions, 20 deletions
@@ -18,18 +18,18 @@ import select import time -class PollsterMixin: +class PollMixin: def __init__(self): super().__init__() self.readers = {} # {fd: (callback, args), ...}. self.writers = {} # {fd: (callback, args), ...}. - self.pollster = select.poll() + self._pollster = select.poll() def pollable(self): return bool(self.readers or self.writers) - def update(self, fd): + def _update(self, fd): assert isinstance(fd, int), fd flags = 0 if fd in self.readers: @@ -37,48 +37,96 @@ class PollsterMixin: if fd in self.writers: flags |= select.POLLOUT if flags: - self.pollster.register(fd, flags) + self._pollster.register(fd, flags) else: - self.pollster.unregister(fd) + self._pollster.unregister(fd) def add_reader(self, fd, callback, *args): self.readers[fd] = (callback, args) - self.update(fd) + self._update(fd) def add_writer(self, fd, callback, *args): self.writers[fd] = (callback, args) - self.update(fd) + self._update(fd) def remove_reader(self, fd): del self.readers[fd] - self.update(fd) + self._update(fd) def remove_writer(self, fd): del self.writers[fd] - self.update(fd) + self._update(fd) - def rawpoll(self, timeout=None): + def poll(self, timeout=None): # Timeout is in seconds, but poll() takes milliseconds. :-( msecs = None if timeout is None else int(1000 * timeout) - quads = [] - for fd, flags in self.pollster.poll(msecs): + events = [] # TODO: Do we need fd and flags in events? + for fd, flags in self._pollster.poll(msecs): if flags & (select.POLLIN | select.POLLHUP): if fd in self.readers: callback, args = self.readers[fd] - quads.append((fd, flags, callback, args)) + events.append((fd, flags, callback, args)) if flags & (select.POLLOUT | select.POLLHUP): if fd in self.writers: callback, args = self.writers[fd] - quads.append((fd, flags, callback, args)) - return quads + events.append((fd, flags, callback, args)) + return events + + +class KqueueMixin: + + def __init__(self): + super().__init__() + self.readers = {} + self.writers = {} + self.kqueue = select.kqueue() + + def pollable(self): + return bool(self.readers or self.writers) + + def add_reader(self, fd, callback, *args): + if fd not in self.readers: + kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD) + self.kqueue.control([kev], 0, 0) + self.readers[fd] = (callback, args) + + def add_writer(self, fd, callback, *args): + if fd not in self.readers: + kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) + self.kqueue.control([kev], 0, 0) + self.writers[fd] = (callback, args) + + def remove_reader(self, fd): + del self.readers[fd] + kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE) + self.kqueue.control([kev], 0, 0) + + def remove_writer(self, fd): + del self.writers[fd] + kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE) + self.kqueue.control([kev], 0, 0) + + def poll(self, timeout=None): + events = [] + max_ev = len(self.readers) + len(self.writers) + for kev in self.kqueue.control(None, max_ev, timeout): + fd = kev.ident + flag = kev.filter + if flag == select.KQ_FILTER_READ and fd in self.readers: + callback, args = self.readers[fd] + events.append((fd, flag, callback, args)) + elif flag == select.KQ_FILTER_WRITE and fd in self.writers: + callback, args = self.writers[fd] + events.append((fd, flag, callback, args)) + return events class EventLoopMixin: def __init__(self): + super().__init__() self.ready = collections.deque() # [(callback, args), ...] self.scheduled = [] # [(when, callback, args), ...] - super().__init__() def call_soon(self, callback, *args): self.ready.append((callback, args)) @@ -110,8 +158,8 @@ class EventLoopMixin: timeout = max(0, when - time.time()) else: timeout = None - quads = self.rawpoll(timeout) - for fd, flag, callback, args in quads: + events = self.poll(timeout) + for fd, flag, callback, args in events: self.call_soon(callback, *args) # Handle 'later' callbacks that are ready. @@ -127,8 +175,14 @@ class EventLoopMixin: self.run_once() -class Pollster(EventLoopMixin, PollsterMixin): - pass +if hasattr(select, 'kqueue'): + class Pollster(EventLoopMixin, KqueueMixin): + pass +elif hasattr(select, 'poll'): + class Pollster(EventLoopMixin, PollMixin): + pass +else: + raise ImportError('Neither poll() not kqueue() supported') class ThreadRunner: |