summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2015-10-21 18:17:50 -0700
committerAsk Solem <ask@celeryproject.org>2015-10-21 18:17:50 -0700
commitf8d2a4517549d5728b93760c9d0042a8dc6b98fa (patch)
tree43d17588d4bd2c1c0eb4c49fd47d9e760cf5f2b8 /examples
parentdb9e2f563adafa6fc62112c1e0fbf7ec8609d59f (diff)
downloadkombu-f8d2a4517549d5728b93760c9d0042a8dc6b98fa.tar.gz
Adds ConsumerProducerMixin (Issue #530)
Diffstat (limited to 'examples')
-rw-r--r--examples/rpc-tut6/rpc_server.py38
1 files changed, 13 insertions, 25 deletions
diff --git a/examples/rpc-tut6/rpc_server.py b/examples/rpc-tut6/rpc_server.py
index 43636b6f..3bd536db 100644
--- a/examples/rpc-tut6/rpc_server.py
+++ b/examples/rpc-tut6/rpc_server.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
from kombu import Connection, Producer, Queue
-from kombu.mixins import ConsumerMixin
+from kombu.mixins import ConsumerProducerMixin
rpc_queue = Queue('rpc_queue')
@@ -15,8 +15,7 @@ def fib(n):
return fib(n-1) + fib(n-2)
-class Worker(ConsumerMixin):
- _producer_connection = None
+class Worker(ConsumerProducerMixin):
def __init__(self, connection):
self.connection = connection
@@ -34,29 +33,15 @@ class Worker(ConsumerMixin):
print(' [.] fib({0})'.format(n))
result = fib(n)
- with Producer(self.producer_connection) as producer:
- producer.publish(
- {'result': result},
- exchange='', routing_key=message.properties['reply_to'],
- correlation_id=message.properties['correlation_id'],
- serializer='json',
- )
+ self.producer.publish(
+ {'result': result},
+ exchange='', routing_key=message.properties['reply_to'],
+ correlation_id=message.properties['correlation_id'],
+ serializer='json',
+ retry=True,
+ )
message.ack()
- def on_consume_end(self, connection, channel):
- if self._producer_connection is not None:
- self._producer_connection.close()
- self._producer_connection = None
-
- @property
- def producer_connection(self):
- if self._producer_connection is None:
- conn = self.connection.clone()
- conn.ensure_connection(self.on_connection_error,
- self.connect_max_retries)
- self._producer_connection = conn
- return self._producer_connection
-
def start_worker(broker_url):
connection = Connection(broker_url)
@@ -66,4 +51,7 @@ def start_worker(broker_url):
if __name__ == '__main__':
- start_worker('pyamqp://')
+ try:
+ start_worker('pyamqp://')
+ except KeyboardInterrupt:
+ pass