diff options
author | Ask Solem <ask@celeryproject.org> | 2015-10-21 18:17:50 -0700 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2015-10-21 18:17:50 -0700 |
commit | f8d2a4517549d5728b93760c9d0042a8dc6b98fa (patch) | |
tree | 43d17588d4bd2c1c0eb4c49fd47d9e760cf5f2b8 /examples | |
parent | db9e2f563adafa6fc62112c1e0fbf7ec8609d59f (diff) | |
download | kombu-f8d2a4517549d5728b93760c9d0042a8dc6b98fa.tar.gz |
Adds ConsumerProducerMixin (Issue #530)
Diffstat (limited to 'examples')
-rw-r--r-- | examples/rpc-tut6/rpc_server.py | 38 |
1 files changed, 13 insertions, 25 deletions
diff --git a/examples/rpc-tut6/rpc_server.py b/examples/rpc-tut6/rpc_server.py index 43636b6f..3bd536db 100644 --- a/examples/rpc-tut6/rpc_server.py +++ b/examples/rpc-tut6/rpc_server.py @@ -1,7 +1,7 @@ #!/usr/bin/env python from kombu import Connection, Producer, Queue -from kombu.mixins import ConsumerMixin +from kombu.mixins import ConsumerProducerMixin rpc_queue = Queue('rpc_queue') @@ -15,8 +15,7 @@ def fib(n): return fib(n-1) + fib(n-2) -class Worker(ConsumerMixin): - _producer_connection = None +class Worker(ConsumerProducerMixin): def __init__(self, connection): self.connection = connection @@ -34,29 +33,15 @@ class Worker(ConsumerMixin): print(' [.] fib({0})'.format(n)) result = fib(n) - with Producer(self.producer_connection) as producer: - producer.publish( - {'result': result}, - exchange='', routing_key=message.properties['reply_to'], - correlation_id=message.properties['correlation_id'], - serializer='json', - ) + self.producer.publish( + {'result': result}, + exchange='', routing_key=message.properties['reply_to'], + correlation_id=message.properties['correlation_id'], + serializer='json', + retry=True, + ) message.ack() - def on_consume_end(self, connection, channel): - if self._producer_connection is not None: - self._producer_connection.close() - self._producer_connection = None - - @property - def producer_connection(self): - if self._producer_connection is None: - conn = self.connection.clone() - conn.ensure_connection(self.on_connection_error, - self.connect_max_retries) - self._producer_connection = conn - return self._producer_connection - def start_worker(broker_url): connection = Connection(broker_url) @@ -66,4 +51,7 @@ def start_worker(broker_url): if __name__ == '__main__': - start_worker('pyamqp://') + try: + start_worker('pyamqp://') + except KeyboardInterrupt: + pass |