summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2014-06-11 14:49:52 +0100
committerAsk Solem <ask@celeryproject.org>2014-06-11 14:49:52 +0100
commit65a16dd07885d82e1a5cbe605a2b9f6666925c98 (patch)
treea8e820172af54056b8c260c8ef599d6301fbad9b
parent42655629865fe4e456b1186ea6435978412d3de6 (diff)
downloadkombu-65a16dd07885d82e1a5cbe605a2b9f6666925c98.tar.gz
Async consume test working
-rw-r--r--async_test_queue_consume.py70
-rw-r--r--async_test_queue_declare.py19
-rw-r--r--kombu/message.py23
-rw-r--r--kombu/messaging.py28
4 files changed, 108 insertions, 32 deletions
diff --git a/async_test_queue_consume.py b/async_test_queue_consume.py
new file mode 100644
index 00000000..cee66ef8
--- /dev/null
+++ b/async_test_queue_consume.py
@@ -0,0 +1,70 @@
+from amqp import barrier, promise
+from kombu import Connection, Exchange, Queue, Consumer
+from kombu.async import Hub
+
+TEST_QUEUE = Queue('test3', Exchange('test3'))
+
+program_finished = promise()
+active_consumers = set()
+closing_channels = set()
+
+
+def on_channel_closed(channel):
+ print('closed channel: %r' % (channel.id, ))
+ program_finished()
+
+def on_consumer_cancelled(consumer):
+ print('Consumer cancelled: %r' % (consumer, ))
+ if consumer.channel not in closing_channels:
+ closing_channels.add(consumer.channel)
+ channel.close(callback=on_channel_closed)
+
+def stop():
+ print('on program finished')
+ for consumer in active_consumers:
+ consumer.cancel(callback=on_consumer_cancelled)
+
+
+def on_ack_sent(message):
+ print('ACK SENT FOR %r' % (message.delivery_tag, ))
+ if message.delivery_tag >= 10:
+ stop()
+
+def on_message(message):
+ message.ack(callback=promise(on_ack_sent, (message, )))
+
+def on_qos_applied(consumer):
+ consumer.consume()
+ print('-' * 76)
+
+def consume_messages(channel, queue):
+ consumer = Consumer(channel, [queue], on_message=on_message,
+ auto_declare=False)
+ active_consumers.add(consumer)
+ consumer.qos(prefetch_count=10, callback=on_qos_applied)
+
+def on_queue_declared(queue):
+ print('queue and exchange declared: {0}'.format(queue))
+ return consume_messages(queue.channel, queue)
+
+def on_channel_open(channel):
+ print('channel open: {0}'.format(channel))
+ return TEST_QUEUE(channel).declare().then(on_queue_declared)
+
+def on_connected(connection):
+ print('connected: {0}'.format(connection))
+ return connection.channel().then(on_channel_open)
+
+loop = Hub()
+connection = Connection('pyamqp://')
+connection.then(on_connected)
+
+#def declare_and_publish():
+# connection = yield Connection('pyamqp://')
+# channel = yield connection.channel()
+# queue = yield TEST_QUEUE(channel).declare()
+
+connection.register_with_event_loop(loop)
+while not program_finished.ready:
+ loop.run_once()
+
diff --git a/async_test_queue_declare.py b/async_test_queue_declare.py
index 3b41c9b8..7db76766 100644
--- a/async_test_queue_declare.py
+++ b/async_test_queue_declare.py
@@ -1,4 +1,4 @@
-from amqp import promise
+from amqp import barrier, promise
from kombu import Connection, Exchange, Queue, Producer
from kombu.async import Hub
@@ -7,16 +7,19 @@ TEST_QUEUE = Queue('test3', Exchange('test3'))
program_finished = promise()
-def publish_message(channel, message, **options):
- print('sending message')
- return Producer(channel).publish(message, **options)
+def publish_messages(channel, messages, **options):
+ print('sending messages')
+ producer = Producer(channel)
+ return barrier([producer.publish(m, callback=promise(), **options) for m in messages],
+ program_finished)
def on_queue_declared(queue):
print('queue and exchange declared: {0}'.format(queue))
- return publish_message(queue.channel, {'hello': 'world'},
- exchange=queue.exchange,
- routing_key=queue.routing_key,
- on_sent=program_finished)
+ return publish_messages(
+ queue.channel, [{'hello': i} for i in range(10)],
+ exchange=queue.exchange,
+ routing_key=queue.routing_key,
+ )
def on_channel_open(channel):
print('channel open: {0}'.format(channel))
diff --git a/kombu/message.py b/kombu/message.py
index 5f7ae525..14290add 100644
--- a/kombu/message.py
+++ b/kombu/message.py
@@ -65,7 +65,7 @@ class Message(object):
raise
callback(self, exc)
- def ack(self):
+ def ack(self, callback=None):
"""Acknowledge this message as being processed.,
This will remove the message from the queue.
@@ -85,24 +85,24 @@ class Message(object):
raise self.MessageStateError(
'Message already acknowledged with state: {0._state}'.format(
self))
- self.channel.basic_ack(self.delivery_tag)
self._state = 'ACK'
+ return self.channel.basic_ack(self.delivery_tag, on_sent=callback)
- def ack_log_error(self, logger, errors):
+ def ack_log_error(self, logger, errors, callback=None):
try:
- self.ack()
+ self.ack(callback=callback)
except errors as exc:
logger.critical("Couldn't ack %r, reason:%r",
self.delivery_tag, exc, exc_info=True)
- def reject_log_error(self, logger, errors, requeue=False):
+ def reject_log_error(self, logger, errors, requeue=False, callback=None):
try:
- self.reject(requeue=requeue)
+ self.reject(requeue=requeue, callback=callback)
except errors as exc:
logger.critical("Couldn't reject %r, reason: %r",
self.delivery_tag, exc, exc_info=True)
- def reject(self, requeue=False):
+ def reject(self, requeue=False, callback=None):
"""Reject this message.
The message will be discarded by the server.
@@ -115,10 +115,12 @@ class Message(object):
raise self.MessageStateError(
'Message already acknowledged with state: {0._state}'.format(
self))
- self.channel.basic_reject(self.delivery_tag, requeue=requeue)
self._state = 'REJECTED'
+ return self.channel.basic_reject(
+ self.delivery_tag, requeue=requeue, callback=callback,
+ )
- def requeue(self):
+ def requeue(self, callback=None):
"""Reject this message and put it back on the queue.
You must not use this method as a means of selecting messages
@@ -132,8 +134,9 @@ class Message(object):
raise self.MessageStateError(
'Message already acknowledged with state: {0._state}'.format(
self))
- self.channel.basic_reject(self.delivery_tag, requeue=True)
self._state = 'REQUEUED'
+ return self.channel.basic_reject(
+ self.delivery_tag, requeue=True, callback=callback,)
def decode(self):
"""Deserialize the message body, returning the original
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 0681b6c9..76653007 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -11,7 +11,7 @@ import numbers
from itertools import count
-from amqp.promise import Thenable, barrier, promise
+from amqp.promise import Thenable, barrier, ppartial, promise
from .common import maybe_declare
from .compression import compress
@@ -168,11 +168,11 @@ class Producer(object):
return publish(body, priority, content_type,
content_encoding, headers, properties,
routing_key, mandatory, immediate, exchange, declare,
- callback=None)
+ callback=callback)
def _publish(self, body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory,
- immediate, exchange, declare, callback):
+ immediate, exchange, declare, callback=None):
channel = self.channel
message = channel.prepare_message(
body, priority, content_type,
@@ -439,7 +439,7 @@ class Consumer(object):
"""
return self.add_queue(Queue.from_dict(queue, **options))
- def consume(self, no_ack=None, on_sent=None):
+ def consume(self, no_ack=None, on_sent=None, callback=None):
"""Start consuming messages.
Can be called multiple times, but note that while it
@@ -452,15 +452,16 @@ class Consumer(object):
"""
if self.queues:
no_ack = self.no_ack if no_ack is None else no_ack
-
size = len(self.queues)
for i, queue in enumerate(self.queues):
if i == size - 1:
- self._basic_consume(
- queue, no_ack=no_ack, nowait=False, on_sent=on_sent,
+ return self._basic_consume(
+ queue, no_ack=no_ack, nowait=False,
+ on_sent=on_sent, on_reply=callback,
)
else:
self._basic_consume(queue, no_ack=no_ack, nowait=True)
+ return ensure_promise(callback)
def cancel(self, callback=None, nowait=True):
"""End all active queue consumers.
@@ -470,13 +471,11 @@ class Consumer(object):
"""
cancel = self.channel.basic_cancel
- promises = []
- for tag in values(self._active_tags):
- p = promise()
- cancel(tag, nowait=nowait, callback=p)
- promises.append(p)
- self._active_tags.clear()
- return barrier(promises, callback)
+ p = [cancel(tag, nowait=nowait) for tag in self._active_tags]
+ if p and isinstance(p[0], Thenable):
+ return barrier(p, ppartial(callback, self))
+ p = ready_promise(callback, self)
+ return p
close = cancel
def cancel_by_queue(self, queue, nowait=True, callback=None):
@@ -552,6 +551,7 @@ class Consumer(object):
:param apply_global: Apply new settings globally on all channels.
"""
+ callback = ppartial(callback, self) if callback else None
return self.channel.basic_qos(
prefetch_size, prefetch_count, apply_global,
on_sent=on_sent, callback=callback,