summaryrefslogtreecommitdiff
path: root/kombu/async/hub.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/async/hub.py')
-rw-r--r--kombu/async/hub.py66
1 files changed, 35 insertions, 31 deletions
diff --git a/kombu/async/hub.py b/kombu/async/hub.py
index 25c71cdb..aee8adef 100644
--- a/kombu/async/hub.py
+++ b/kombu/async/hub.py
@@ -10,17 +10,15 @@ from __future__ import absolute_import
import errno
-from collections import deque
from contextlib import contextmanager
from time import sleep
from types import GeneratorType as generator
-from amqp.promise import promise, Thenable
+from amqp.promise import Thenable, promise
from kombu.five import Empty, range
from kombu.log import get_logger
from kombu.utils import cached_property, fileno
-from kombu.utils.compat import get_errno
from kombu.utils.eventio import READ, WRITE, ERR, poll
from .timer import Timer
@@ -80,7 +78,7 @@ class Hub(object):
self.writers = {}
self.on_tick = set()
self.on_close = set()
- self._ready = deque()
+ self._ready = set()
self._running = False
self._loop = None
@@ -139,7 +137,7 @@ class Hub(object):
except (MemoryError, AssertionError):
raise
except OSError as exc:
- if get_errno(exc) == errno.ENOMEM:
+ if exc.errno == errno.ENOMEM:
raise
logger.error('Error in timer: %r', exc, exc_info=1)
except Exception as exc:
@@ -186,7 +184,7 @@ class Hub(object):
def call_soon(self, callback, *args):
if not isinstance(callback, Thenable):
callback = promise(callback, args)
- self._ready.append(callback)
+ self._ready.add(callback)
return callback
def call_later(self, delay, callback, *args):
@@ -273,53 +271,59 @@ class Hub(object):
tick_callback()
while todo:
- item = todo.popleft()
+ item = todo.pop()
if item:
item()
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
- #print('[[[HUB]]]: %s' % (self.repr_active(), ))
+ # print('[[[HUB]]]: %s' % (self.repr_active(), ))
if readers or writers:
to_consolidate = []
try:
events = poll(poll_timeout)
- #print('[EVENTS]: %s' % (self.nepr_events(events or []), ))
+ # print('[EVENTS]: %s' % (self.repr_events(events), ))
except ValueError: # Issue 882
raise StopIteration()
- for fileno, event in events or ():
- if fileno in consolidate and \
- writers.get(fileno) is None:
- to_consolidate.append(fileno)
+ for fd, event in events or ():
+ if fd in consolidate and \
+ writers.get(fd) is None:
+ to_consolidate.append(fd)
continue
cb = cbargs = None
- try:
- if event & READ:
- cb, cbargs = readers[fileno]
- elif event & WRITE:
- cb, cbargs = writers[fileno]
- elif event & ERR:
- try:
- cb, cbargs = (readers.get(fileno) or
- writers.get(fileno))
- except TypeError:
- pass
- except (KeyError, Empty):
- hub_remove(fileno)
- continue
+
+ if event & READ:
+ try:
+ cb, cbargs = readers[fd]
+ except KeyError:
+ self.remove_reader(fd)
+ continue
+ elif event & WRITE:
+ try:
+ cb, cbargs = writers[fd]
+ except KeyError:
+ self.remove_writer(fd)
+ continue
+ elif event & ERR:
+ try:
+ cb, cbargs = (readers.get(fd) or
+ writers.get(fd))
+ except TypeError:
+ pass
+
if cb is None:
continue
if isinstance(cb, generator):
try:
next(cb)
except OSError as exc:
- if get_errno(exc) != errno.EBADF:
+ if exc.errno != errno.EBADF:
raise
- hub_remove(fileno)
+ hub_remove(fd)
except StopIteration:
pass
except Exception:
- hub_remove(fileno)
+ hub_remove(fd)
raise
else:
try:
@@ -339,7 +343,7 @@ class Hub(object):
def repr_events(self, events):
from .debug import repr_events
- return repr_events(self, events)
+ return repr_events(self, events or [])
@cached_property
def scheduler(self):