diff options
Diffstat (limited to 'kombu/async/hub.py')
-rw-r--r-- | kombu/async/hub.py | 66 |
1 files changed, 35 insertions, 31 deletions
diff --git a/kombu/async/hub.py b/kombu/async/hub.py index 25c71cdb..aee8adef 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -10,17 +10,15 @@ from __future__ import absolute_import import errno -from collections import deque from contextlib import contextmanager from time import sleep from types import GeneratorType as generator -from amqp.promise import promise, Thenable +from amqp.promise import Thenable, promise from kombu.five import Empty, range from kombu.log import get_logger from kombu.utils import cached_property, fileno -from kombu.utils.compat import get_errno from kombu.utils.eventio import READ, WRITE, ERR, poll from .timer import Timer @@ -80,7 +78,7 @@ class Hub(object): self.writers = {} self.on_tick = set() self.on_close = set() - self._ready = deque() + self._ready = set() self._running = False self._loop = None @@ -139,7 +137,7 @@ class Hub(object): except (MemoryError, AssertionError): raise except OSError as exc: - if get_errno(exc) == errno.ENOMEM: + if exc.errno == errno.ENOMEM: raise logger.error('Error in timer: %r', exc, exc_info=1) except Exception as exc: @@ -186,7 +184,7 @@ class Hub(object): def call_soon(self, callback, *args): if not isinstance(callback, Thenable): callback = promise(callback, args) - self._ready.append(callback) + self._ready.add(callback) return callback def call_later(self, delay, callback, *args): @@ -273,53 +271,59 @@ class Hub(object): tick_callback() while todo: - item = todo.popleft() + item = todo.pop() if item: item() poll_timeout = fire_timers(propagate=propagate) if scheduled else 1 - #print('[[[HUB]]]: %s' % (self.repr_active(), )) + # print('[[[HUB]]]: %s' % (self.repr_active(), )) if readers or writers: to_consolidate = [] try: events = poll(poll_timeout) - #print('[EVENTS]: %s' % (self.nepr_events(events or []), )) + # print('[EVENTS]: %s' % (self.repr_events(events), )) except ValueError: # Issue 882 raise StopIteration() - for fileno, event in events or (): - if fileno in consolidate and \ - writers.get(fileno) is None: - to_consolidate.append(fileno) + for fd, event in events or (): + if fd in consolidate and \ + writers.get(fd) is None: + to_consolidate.append(fd) continue cb = cbargs = None - try: - if event & READ: - cb, cbargs = readers[fileno] - elif event & WRITE: - cb, cbargs = writers[fileno] - elif event & ERR: - try: - cb, cbargs = (readers.get(fileno) or - writers.get(fileno)) - except TypeError: - pass - except (KeyError, Empty): - hub_remove(fileno) - continue + + if event & READ: + try: + cb, cbargs = readers[fd] + except KeyError: + self.remove_reader(fd) + continue + elif event & WRITE: + try: + cb, cbargs = writers[fd] + except KeyError: + self.remove_writer(fd) + continue + elif event & ERR: + try: + cb, cbargs = (readers.get(fd) or + writers.get(fd)) + except TypeError: + pass + if cb is None: continue if isinstance(cb, generator): try: next(cb) except OSError as exc: - if get_errno(exc) != errno.EBADF: + if exc.errno != errno.EBADF: raise - hub_remove(fileno) + hub_remove(fd) except StopIteration: pass except Exception: - hub_remove(fileno) + hub_remove(fd) raise else: try: @@ -339,7 +343,7 @@ class Hub(object): def repr_events(self, events): from .debug import repr_events - return repr_events(self, events) + return repr_events(self, events or []) @cached_property def scheduler(self): |