summaryrefslogtreecommitdiff
path: root/examples/rpc-tut6
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2015-10-21 18:09:30 -0700
committerAsk Solem <ask@celeryproject.org>2015-10-21 18:09:30 -0700
commitdb9e2f563adafa6fc62112c1e0fbf7ec8609d59f (patch)
tree97406740513a782434730f5bbcc82b977bceac8b /examples/rpc-tut6
parentf1899f46c58afdd1ace3c95311bcb3d17619e98c (diff)
downloadkombu-db9e2f563adafa6fc62112c1e0fbf7ec8609d59f.tar.gz
Implement kombu version of the RabbitMQ rpc-tut-6 example. Closes #530
Diffstat (limited to 'examples/rpc-tut6')
-rw-r--r--examples/rpc-tut6/rpc_client.py46
-rw-r--r--examples/rpc-tut6/rpc_server.py69
2 files changed, 115 insertions, 0 deletions
diff --git a/examples/rpc-tut6/rpc_client.py b/examples/rpc-tut6/rpc_client.py
new file mode 100644
index 00000000..99b53754
--- /dev/null
+++ b/examples/rpc-tut6/rpc_client.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+
+from kombu import Connection, Producer, Consumer, Queue
+from kombu.utils import uuid
+
+
+class FibonacciRpcClient(object):
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.callback_queue = Queue(uuid(), exclusive=True, auto_delete=True)
+
+ def on_response(self, message):
+ if message.properties['correlation_id'] == self.correlation_id:
+ self.response = message.payload['result']
+
+ def call(self, n):
+ self.response = None
+ self.correlation_id = uuid()
+ with Producer(self.connection) as producer:
+ producer.publish(
+ {'n': n},
+ exchange='',
+ routing_key='rpc_queue',
+ declare=[self.callback_queue],
+ reply_to=self.callback_queue,
+ correlation_id=self.correlation_id,
+ )
+ with Consumer(self.connection,
+ on_message=self.on_response,
+ queues=[self.callback_queue], no_ack=True):
+ while self.response is None:
+ self.connection.drain_events()
+ return self.response
+
+
+def main(broker_url):
+ connection = Connection(broker_url)
+ fibonacci_rpc = FibonacciRpcClient(connection)
+ print(' [x] Requesting fib(30)')
+ response = fibonacci_rpc.call(30)
+ print(' [.] Got {0!r}'.format(response))
+
+
+if __name__ == '__main__':
+ main('pyamqp://')
diff --git a/examples/rpc-tut6/rpc_server.py b/examples/rpc-tut6/rpc_server.py
new file mode 100644
index 00000000..43636b6f
--- /dev/null
+++ b/examples/rpc-tut6/rpc_server.py
@@ -0,0 +1,69 @@
+#!/usr/bin/env python
+
+from kombu import Connection, Producer, Queue
+from kombu.mixins import ConsumerMixin
+
+rpc_queue = Queue('rpc_queue')
+
+
+def fib(n):
+ if n == 0:
+ return 0
+ elif n == 1:
+ return 1
+ else:
+ return fib(n-1) + fib(n-2)
+
+
+class Worker(ConsumerMixin):
+ _producer_connection = None
+
+ def __init__(self, connection):
+ self.connection = connection
+
+ def get_consumers(self, Consumer, channel):
+ return [Consumer(
+ queues=[rpc_queue],
+ on_message=self.on_request,
+ accept={'application/json'},
+ prefetch_count=1,
+ )]
+
+ def on_request(self, message):
+ n = message.payload['n']
+ print(' [.] fib({0})'.format(n))
+ result = fib(n)
+
+ with Producer(self.producer_connection) as producer:
+ producer.publish(
+ {'result': result},
+ exchange='', routing_key=message.properties['reply_to'],
+ correlation_id=message.properties['correlation_id'],
+ serializer='json',
+ )
+ message.ack()
+
+ def on_consume_end(self, connection, channel):
+ if self._producer_connection is not None:
+ self._producer_connection.close()
+ self._producer_connection = None
+
+ @property
+ def producer_connection(self):
+ if self._producer_connection is None:
+ conn = self.connection.clone()
+ conn.ensure_connection(self.on_connection_error,
+ self.connect_max_retries)
+ self._producer_connection = conn
+ return self._producer_connection
+
+
+def start_worker(broker_url):
+ connection = Connection(broker_url)
+ print " [x] Awaiting RPC requests"
+ worker = Worker(connection)
+ worker.run()
+
+
+if __name__ == '__main__':
+ start_worker('pyamqp://')