diff options
author | Ask Solem <ask@celeryproject.org> | 2014-06-11 14:49:52 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2014-06-11 14:49:52 +0100 |
commit | 65a16dd07885d82e1a5cbe605a2b9f6666925c98 (patch) | |
tree | a8e820172af54056b8c260c8ef599d6301fbad9b | |
parent | 42655629865fe4e456b1186ea6435978412d3de6 (diff) | |
download | kombu-65a16dd07885d82e1a5cbe605a2b9f6666925c98.tar.gz |
Async consume test working
-rw-r--r-- | async_test_queue_consume.py | 70 | ||||
-rw-r--r-- | async_test_queue_declare.py | 19 | ||||
-rw-r--r-- | kombu/message.py | 23 | ||||
-rw-r--r-- | kombu/messaging.py | 28 |
4 files changed, 108 insertions, 32 deletions
diff --git a/async_test_queue_consume.py b/async_test_queue_consume.py new file mode 100644 index 00000000..cee66ef8 --- /dev/null +++ b/async_test_queue_consume.py @@ -0,0 +1,70 @@ +from amqp import barrier, promise +from kombu import Connection, Exchange, Queue, Consumer +from kombu.async import Hub + +TEST_QUEUE = Queue('test3', Exchange('test3')) + +program_finished = promise() +active_consumers = set() +closing_channels = set() + + +def on_channel_closed(channel): + print('closed channel: %r' % (channel.id, )) + program_finished() + +def on_consumer_cancelled(consumer): + print('Consumer cancelled: %r' % (consumer, )) + if consumer.channel not in closing_channels: + closing_channels.add(consumer.channel) + channel.close(callback=on_channel_closed) + +def stop(): + print('on program finished') + for consumer in active_consumers: + consumer.cancel(callback=on_consumer_cancelled) + + +def on_ack_sent(message): + print('ACK SENT FOR %r' % (message.delivery_tag, )) + if message.delivery_tag >= 10: + stop() + +def on_message(message): + message.ack(callback=promise(on_ack_sent, (message, ))) + +def on_qos_applied(consumer): + consumer.consume() + print('-' * 76) + +def consume_messages(channel, queue): + consumer = Consumer(channel, [queue], on_message=on_message, + auto_declare=False) + active_consumers.add(consumer) + consumer.qos(prefetch_count=10, callback=on_qos_applied) + +def on_queue_declared(queue): + print('queue and exchange declared: {0}'.format(queue)) + return consume_messages(queue.channel, queue) + +def on_channel_open(channel): + print('channel open: {0}'.format(channel)) + return TEST_QUEUE(channel).declare().then(on_queue_declared) + +def on_connected(connection): + print('connected: {0}'.format(connection)) + return connection.channel().then(on_channel_open) + +loop = Hub() +connection = Connection('pyamqp://') +connection.then(on_connected) + +#def declare_and_publish(): +# connection = yield Connection('pyamqp://') +# channel = yield connection.channel() +# queue = yield TEST_QUEUE(channel).declare() + +connection.register_with_event_loop(loop) +while not program_finished.ready: + loop.run_once() + diff --git a/async_test_queue_declare.py b/async_test_queue_declare.py index 3b41c9b8..7db76766 100644 --- a/async_test_queue_declare.py +++ b/async_test_queue_declare.py @@ -1,4 +1,4 @@ -from amqp import promise +from amqp import barrier, promise from kombu import Connection, Exchange, Queue, Producer from kombu.async import Hub @@ -7,16 +7,19 @@ TEST_QUEUE = Queue('test3', Exchange('test3')) program_finished = promise() -def publish_message(channel, message, **options): - print('sending message') - return Producer(channel).publish(message, **options) +def publish_messages(channel, messages, **options): + print('sending messages') + producer = Producer(channel) + return barrier([producer.publish(m, callback=promise(), **options) for m in messages], + program_finished) def on_queue_declared(queue): print('queue and exchange declared: {0}'.format(queue)) - return publish_message(queue.channel, {'hello': 'world'}, - exchange=queue.exchange, - routing_key=queue.routing_key, - on_sent=program_finished) + return publish_messages( + queue.channel, [{'hello': i} for i in range(10)], + exchange=queue.exchange, + routing_key=queue.routing_key, + ) def on_channel_open(channel): print('channel open: {0}'.format(channel)) diff --git a/kombu/message.py b/kombu/message.py index 5f7ae525..14290add 100644 --- a/kombu/message.py +++ b/kombu/message.py @@ -65,7 +65,7 @@ class Message(object): raise callback(self, exc) - def ack(self): + def ack(self, callback=None): """Acknowledge this message as being processed., This will remove the message from the queue. @@ -85,24 +85,24 @@ class Message(object): raise self.MessageStateError( 'Message already acknowledged with state: {0._state}'.format( self)) - self.channel.basic_ack(self.delivery_tag) self._state = 'ACK' + return self.channel.basic_ack(self.delivery_tag, on_sent=callback) - def ack_log_error(self, logger, errors): + def ack_log_error(self, logger, errors, callback=None): try: - self.ack() + self.ack(callback=callback) except errors as exc: logger.critical("Couldn't ack %r, reason:%r", self.delivery_tag, exc, exc_info=True) - def reject_log_error(self, logger, errors, requeue=False): + def reject_log_error(self, logger, errors, requeue=False, callback=None): try: - self.reject(requeue=requeue) + self.reject(requeue=requeue, callback=callback) except errors as exc: logger.critical("Couldn't reject %r, reason: %r", self.delivery_tag, exc, exc_info=True) - def reject(self, requeue=False): + def reject(self, requeue=False, callback=None): """Reject this message. The message will be discarded by the server. @@ -115,10 +115,12 @@ class Message(object): raise self.MessageStateError( 'Message already acknowledged with state: {0._state}'.format( self)) - self.channel.basic_reject(self.delivery_tag, requeue=requeue) self._state = 'REJECTED' + return self.channel.basic_reject( + self.delivery_tag, requeue=requeue, callback=callback, + ) - def requeue(self): + def requeue(self, callback=None): """Reject this message and put it back on the queue. You must not use this method as a means of selecting messages @@ -132,8 +134,9 @@ class Message(object): raise self.MessageStateError( 'Message already acknowledged with state: {0._state}'.format( self)) - self.channel.basic_reject(self.delivery_tag, requeue=True) self._state = 'REQUEUED' + return self.channel.basic_reject( + self.delivery_tag, requeue=True, callback=callback,) def decode(self): """Deserialize the message body, returning the original diff --git a/kombu/messaging.py b/kombu/messaging.py index 0681b6c9..76653007 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -11,7 +11,7 @@ import numbers from itertools import count -from amqp.promise import Thenable, barrier, promise +from amqp.promise import Thenable, barrier, ppartial, promise from .common import maybe_declare from .compression import compress @@ -168,11 +168,11 @@ class Producer(object): return publish(body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, exchange, declare, - callback=None) + callback=callback) def _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, - immediate, exchange, declare, callback): + immediate, exchange, declare, callback=None): channel = self.channel message = channel.prepare_message( body, priority, content_type, @@ -439,7 +439,7 @@ class Consumer(object): """ return self.add_queue(Queue.from_dict(queue, **options)) - def consume(self, no_ack=None, on_sent=None): + def consume(self, no_ack=None, on_sent=None, callback=None): """Start consuming messages. Can be called multiple times, but note that while it @@ -452,15 +452,16 @@ class Consumer(object): """ if self.queues: no_ack = self.no_ack if no_ack is None else no_ack - size = len(self.queues) for i, queue in enumerate(self.queues): if i == size - 1: - self._basic_consume( - queue, no_ack=no_ack, nowait=False, on_sent=on_sent, + return self._basic_consume( + queue, no_ack=no_ack, nowait=False, + on_sent=on_sent, on_reply=callback, ) else: self._basic_consume(queue, no_ack=no_ack, nowait=True) + return ensure_promise(callback) def cancel(self, callback=None, nowait=True): """End all active queue consumers. @@ -470,13 +471,11 @@ class Consumer(object): """ cancel = self.channel.basic_cancel - promises = [] - for tag in values(self._active_tags): - p = promise() - cancel(tag, nowait=nowait, callback=p) - promises.append(p) - self._active_tags.clear() - return barrier(promises, callback) + p = [cancel(tag, nowait=nowait) for tag in self._active_tags] + if p and isinstance(p[0], Thenable): + return barrier(p, ppartial(callback, self)) + p = ready_promise(callback, self) + return p close = cancel def cancel_by_queue(self, queue, nowait=True, callback=None): @@ -552,6 +551,7 @@ class Consumer(object): :param apply_global: Apply new settings globally on all channels. """ + callback = ppartial(callback, self) if callback else None return self.channel.basic_qos( prefetch_size, prefetch_count, apply_global, on_sent=on_sent, callback=callback, |