summaryrefslogtreecommitdiff
path: root/kombu
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2013-10-07 13:10:51 +0100
committerAsk Solem <ask@celeryproject.org>2013-10-07 13:12:39 +0100
commit221ae11044dc6d66e53801938a20cdb8b52548a5 (patch)
tree4911df7e35f4c0818a3204fa366ad32a07b86bc6 /kombu
parent3d340d574adcca68d791d82df7f4b5470dac11f5 (diff)
downloadkombu-221ae11044dc6d66e53801938a20cdb8b52548a5.tar.gz
Adds loop.call_soon and loop.run_forever
Diffstat (limited to 'kombu')
-rw-r--r--kombu/async/hub.py67
-rw-r--r--kombu/messaging.py1
-rw-r--r--kombu/tests/test_common.py1
3 files changed, 59 insertions, 10 deletions
diff --git a/kombu/async/hub.py b/kombu/async/hub.py
index fe0ca376..e3e2924f 100644
--- a/kombu/async/hub.py
+++ b/kombu/async/hub.py
@@ -8,10 +8,13 @@ Event loop implementation.
"""
from __future__ import absolute_import
+from collections import deque
from contextlib import contextmanager
from time import sleep
from types import GeneratorType as generator
+from amqp import promise
+
from kombu.five import Empty, items, range
from kombu.log import get_logger
from kombu.utils import cached_property, fileno, reprcall
@@ -25,6 +28,14 @@ logger = get_logger(__name__)
_current_loop = None
+class Stop(BaseException):
+ """Stops the event loop."""
+
+
+def _raise_stop_error():
+ raise Stop()
+
+
@contextmanager
def _dummy_context(*args, **kwargs):
yield
@@ -83,6 +94,10 @@ class Hub(object):
self.writers = {}
self.on_tick = set()
self.on_close = set()
+ self._ready = deque()
+
+ self._running = False
+ self._loop = None
# The eventloop (in celery.worker.loops)
# will merge fds in this set and then instead of calling
@@ -94,6 +109,8 @@ class Hub(object):
self.consolidate = set()
self.consolidate_callback = None
+ self.propagate_errors = ()
+
self._create_poller()
def reset(self):
@@ -108,11 +125,9 @@ class Hub(object):
def _close_poller(self):
if self.poller is not None:
self.poller.close()
- self.poller = None
def stop(self):
- self._close_poller()
- self.close()
+ self.call_soon(_raise_stop_error)
def __repr__(self):
return '<Hub@{0:#x}: R:{1} W:{2}>'.format(
@@ -132,6 +147,8 @@ class Hub(object):
entry()
except propagate:
raise
+ except MemoryError:
+ raise
except Exception as exc:
logger.error('Error in timer: %r', exc, exc_info=1)
return min(max(delay or 0, min_delay), max_delay)
@@ -154,6 +171,28 @@ class Hub(object):
self._unregister(fd)
self._discard(fd)
+ def run_forever(self):
+ self._running = True
+ try:
+ while 1:
+ try:
+ self.run_once()
+ except Stop:
+ break
+ finally:
+ self._running = False
+
+ def run_once(self):
+ try:
+ next(self.loop)
+ except StopIteration:
+ self._loop = None
+
+ def call_soon(self, callback, *args):
+ handle = promise(callback, args)
+ self._ready.append(handle)
+ return handle
+
def call_later(self, delay, callback, *args):
return self.timer.call_after(delay, callback, args)
@@ -198,6 +237,7 @@ class Hub(object):
pass
def close(self, *args):
+ self._close_poller()
[self._unregister(fd) for fd in self.readers]
self.readers.clear()
[self._unregister(fd) for fd in self.writers]
@@ -212,10 +252,10 @@ class Hub(object):
self.writers.pop(fd, None)
self.consolidate.discard(fd)
- def _loop(self, propagate=None,
- generator=generator, sleep=sleep, min=min, next=next,
- Empty=Empty, StopIteration=StopIteration, KeyError=KeyError,
- READ=READ, WRITE=WRITE, ERR=ERR):
+ def create_loop(self,
+ generator=generator, sleep=sleep, min=min, next=next,
+ Empty=Empty, StopIteration=StopIteration,
+ KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):
readers, writers = self.readers, self.writers
poll = self.poller.poll
fire_timers = self.fire_timers
@@ -225,6 +265,8 @@ class Hub(object):
consolidate_callback = self.consolidate_callback
on_tick = self.on_tick
remove_ticks = on_tick.difference_update
+ todo = self._ready
+ propagate = self.propagate_errors
while 1:
outdated_ticks = set()
@@ -238,6 +280,11 @@ class Hub(object):
outdated_ticks.add(tick_callback)
remove_ticks(outdated_ticks)
+ while todo:
+ item = todo.popleft()
+ if item:
+ item()
+
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
#print('[[[HUB]]]: %s' % (self.repr_active(), ))
if readers or writers:
@@ -323,6 +370,8 @@ class Hub(object):
def scheduler(self):
return iter(self.timer)
- @cached_property
+ @property
def loop(self):
- self._loop()
+ if self._loop is None:
+ self._loop = self.create_loop()
+ return self._loop
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 1ee5f19e..5a9343ce 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -19,6 +19,7 @@ from .utils import ChannelPromise, maybe_list
__all__ = ['Exchange', 'Queue', 'Producer', 'Consumer']
+
class Producer(object):
"""Message Producer.
diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py
index 82e7da81..a393d69f 100644
--- a/kombu/tests/test_common.py
+++ b/kombu/tests/test_common.py
@@ -141,7 +141,6 @@ class test_replies(Case):
'retry_policy': None,
'content_encoding': 'binary'})
-
@patch('kombu.common.itermessages')
def test_collect_replies_with_ack(self, itermessages):
conn, channel, queue = Mock(), Mock(), Mock()