diff options
author | Ask Solem <ask@celeryproject.org> | 2013-10-07 13:10:51 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2013-10-07 13:12:39 +0100 |
commit | 221ae11044dc6d66e53801938a20cdb8b52548a5 (patch) | |
tree | 4911df7e35f4c0818a3204fa366ad32a07b86bc6 /kombu/async/hub.py | |
parent | 3d340d574adcca68d791d82df7f4b5470dac11f5 (diff) | |
download | kombu-221ae11044dc6d66e53801938a20cdb8b52548a5.tar.gz |
Adds loop.call_soon and loop.run_forever
Diffstat (limited to 'kombu/async/hub.py')
-rw-r--r-- | kombu/async/hub.py | 67 |
1 files changed, 58 insertions, 9 deletions
diff --git a/kombu/async/hub.py b/kombu/async/hub.py index fe0ca376..e3e2924f 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -8,10 +8,13 @@ Event loop implementation. """ from __future__ import absolute_import +from collections import deque from contextlib import contextmanager from time import sleep from types import GeneratorType as generator +from amqp import promise + from kombu.five import Empty, items, range from kombu.log import get_logger from kombu.utils import cached_property, fileno, reprcall @@ -25,6 +28,14 @@ logger = get_logger(__name__) _current_loop = None +class Stop(BaseException): + """Stops the event loop.""" + + +def _raise_stop_error(): + raise Stop() + + @contextmanager def _dummy_context(*args, **kwargs): yield @@ -83,6 +94,10 @@ class Hub(object): self.writers = {} self.on_tick = set() self.on_close = set() + self._ready = deque() + + self._running = False + self._loop = None # The eventloop (in celery.worker.loops) # will merge fds in this set and then instead of calling @@ -94,6 +109,8 @@ class Hub(object): self.consolidate = set() self.consolidate_callback = None + self.propagate_errors = () + self._create_poller() def reset(self): @@ -108,11 +125,9 @@ class Hub(object): def _close_poller(self): if self.poller is not None: self.poller.close() - self.poller = None def stop(self): - self._close_poller() - self.close() + self.call_soon(_raise_stop_error) def __repr__(self): return '<Hub@{0:#x}: R:{1} W:{2}>'.format( @@ -132,6 +147,8 @@ class Hub(object): entry() except propagate: raise + except MemoryError: + raise except Exception as exc: logger.error('Error in timer: %r', exc, exc_info=1) return min(max(delay or 0, min_delay), max_delay) @@ -154,6 +171,28 @@ class Hub(object): self._unregister(fd) self._discard(fd) + def run_forever(self): + self._running = True + try: + while 1: + try: + self.run_once() + except Stop: + break + finally: + self._running = False + + def run_once(self): + try: + next(self.loop) + except StopIteration: + self._loop = None + + def call_soon(self, callback, *args): + handle = promise(callback, args) + self._ready.append(handle) + return handle + def call_later(self, delay, callback, *args): return self.timer.call_after(delay, callback, args) @@ -198,6 +237,7 @@ class Hub(object): pass def close(self, *args): + self._close_poller() [self._unregister(fd) for fd in self.readers] self.readers.clear() [self._unregister(fd) for fd in self.writers] @@ -212,10 +252,10 @@ class Hub(object): self.writers.pop(fd, None) self.consolidate.discard(fd) - def _loop(self, propagate=None, - generator=generator, sleep=sleep, min=min, next=next, - Empty=Empty, StopIteration=StopIteration, KeyError=KeyError, - READ=READ, WRITE=WRITE, ERR=ERR): + def create_loop(self, + generator=generator, sleep=sleep, min=min, next=next, + Empty=Empty, StopIteration=StopIteration, + KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR): readers, writers = self.readers, self.writers poll = self.poller.poll fire_timers = self.fire_timers @@ -225,6 +265,8 @@ class Hub(object): consolidate_callback = self.consolidate_callback on_tick = self.on_tick remove_ticks = on_tick.difference_update + todo = self._ready + propagate = self.propagate_errors while 1: outdated_ticks = set() @@ -238,6 +280,11 @@ class Hub(object): outdated_ticks.add(tick_callback) remove_ticks(outdated_ticks) + while todo: + item = todo.popleft() + if item: + item() + poll_timeout = fire_timers(propagate=propagate) if scheduled else 1 #print('[[[HUB]]]: %s' % (self.repr_active(), )) if readers or writers: @@ -323,6 +370,8 @@ class Hub(object): def scheduler(self): return iter(self.timer) - @cached_property + @property def loop(self): - self._loop() + if self._loop is None: + self._loop = self.create_loop() + return self._loop |