diff options
| author | Ask Solem <ask@celeryproject.org> | 2012-06-24 16:32:17 +0100 |
|---|---|---|
| committer | Ask Solem <ask@celeryproject.org> | 2012-06-24 16:32:17 +0100 |
| commit | fa816f6dc920aeb7b68b75f98fa1656da57d05c8 (patch) | |
| tree | c2f2d076cc62ea62405fb6f267accab61d1f67c9 /examples | |
| parent | 4922c4aaea77be7a32a7d904104b055159e0da3e (diff) | |
| download | kombu-fa816f6dc920aeb7b68b75f98fa1656da57d05c8.tar.gz | |
BrokerConnection is now Connection in docs
Diffstat (limited to 'examples')
| -rw-r--r-- | examples/complete_receive.py | 25 | ||||
| -rw-r--r-- | examples/complete_receive_manual.py | 45 | ||||
| -rw-r--r-- | examples/complete_send.py | 18 | ||||
| -rw-r--r-- | examples/complete_send_manual.py | 35 | ||||
| -rw-r--r-- | examples/simple_eventlet_receive.py | 41 | ||||
| -rw-r--r-- | examples/simple_eventlet_send.py | 32 | ||||
| -rw-r--r-- | examples/simple_receive.py | 6 | ||||
| -rw-r--r-- | examples/simple_send.py | 8 | ||||
| -rw-r--r-- | examples/simple_task_queue/client.py | 24 | ||||
| -rw-r--r-- | examples/simple_task_queue/queues.py | 8 | ||||
| -rw-r--r-- | examples/simple_task_queue/worker.py | 20 |
11 files changed, 93 insertions, 169 deletions
diff --git a/examples/complete_receive.py b/examples/complete_receive.py index 416fa2d6..aa3987bf 100644 --- a/examples/complete_receive.py +++ b/examples/complete_receive.py @@ -2,15 +2,16 @@ Example of simple consumer that waits for a single message, acknowledges it and exits. """ + from __future__ import with_statement -from kombu import Connection, Exchange, Queue +from kombu import Connection, Exchange, Queue, Consumer, eventloop from pprint import pformat #: By default messages sent to exchanges are persistent (delivery_mode=2), #: and queues and exchanges are durable. -exchange = Exchange("kombu_demo", type="direct") -queue = Queue("kombu_demo", exchange, routing_key="kombu_demo") +exchange = Exchange('kombu_demo', type='direct') +queue = Queue('kombu_demo', exchange, routing_key='kombu_demo') def pretty(obj): @@ -19,17 +20,23 @@ def pretty(obj): #: This is the callback applied when a message is received. def handle_message(body, message): - print("Received message: %r" % (body, )) - print(" properties:\n%s" % (pretty(message.properties), )) - print(" delivery_info:\n%s" % (pretty(message.delivery_info), )) + print('Received message: %r' % (body, )) + print(' properties:\n%s' % (pretty(message.properties), )) + print(' delivery_info:\n%s' % (pretty(message.delivery_info), )) message.ack() -with Connection("amqp://guest:guest@localhost:5672//") as connection: +#: Create a connection and a channel. +#: If hostname, userid, password and virtual_host is not specified +#: the values below are the default, but listed here so it can +#: be easily changed. +with Connection('amqp://guest:guest@localhost:5672//') as connection: + #: Create consumer using our callback and queue. #: Second argument can also be a list to consume from #: any number of queues. - with connection.Consumer(queue, callbacks=[handle_message]): + with Consumer(connection, queue, callbacks=[handle_message]): + #: This waits for a single event. Note that this event may not #: be a message, or a message that is to be delivered to the consumers #: channel, but any event received on the connection. - connection.drain_events(timeout=10) + eventloop(connection, limit=1, timeout=10.0) diff --git a/examples/complete_receive_manual.py b/examples/complete_receive_manual.py deleted file mode 100644 index bf618e46..00000000 --- a/examples/complete_receive_manual.py +++ /dev/null @@ -1,45 +0,0 @@ -""" -Example of simple consumer that waits for a single message, acknowledges it -and exits. -""" - -from kombu import BrokerConnection, Exchange, Queue, Consumer -from pprint import pformat - -#: By default messages sent to exchanges are persistent (delivery_mode=2), -#: and queues and exchanges are durable. -exchange = Exchange("kombu_demo", type="direct") -queue = Queue("kombu_demo", exchange, routing_key="kombu_demo") - - -def pretty(obj): - return pformat(obj, indent=4) - - -#: This is the callback applied when a message is received. -def handle_message(body, message): - print("Received message: %r" % (body, )) - print(" properties:\n%s" % (pretty(message.properties), )) - print(" delivery_info:\n%s" % (pretty(message.delivery_info), )) - message.ack() - -#: Create a connection and a channel. -#: If hostname, userid, password and virtual_host is not specified -#: the values below are the default, but listed here so it can -#: be easily changed. -connection = BrokerConnection(hostname="localhost", - userid="guest", - password="guest", - virtual_host="/") -channel = connection.channel() - -#: Create consumer using our callback and queue. -#: Second argument can also be a list to consume from -#: any number of queues. -consumer = Consumer(channel, queue, callbacks=[handle_message]) -consumer.consume() - -#: This waits for a single event. Note that this event may not -#: be a message, or a message that is to be delivered to the consumers -#: channel, but any event received on the connection. -connection.drain_events() diff --git a/examples/complete_send.py b/examples/complete_send.py index faa6a1a1..c81f8620 100644 --- a/examples/complete_send.py +++ b/examples/complete_send.py @@ -7,26 +7,26 @@ You can use `complete_receive.py` to receive the message sent. """ from __future__ import with_statement -from kombu import Connection, Exchange, Queue +from kombu import Connection, Producer, Exchange, Queue #: By default messages sent to exchanges are persistent (delivery_mode=2), #: and queues and exchanges are durable. -exchange = Exchange("kombu_demo", type="direct") -queue = Queue("kombu_demo", exchange, routing_key="kombu_demo") +exchange = Exchange('kombu_demo', type='direct') +queue = Queue('kombu_demo', exchange, routing_key='kombu_demo') -with Connection("amqp://guest:guest@localhost:5672//") as connection: +with Connection('amqp://guest:guest@localhost:5672//') as connection: #: Producers are used to publish messages. #: a default exchange and routing key can also be specifed #: as arguments the Producer, but we rather specify this explicitly #: at the publish call. - producer = connection.Producer() + producer = Producer(connection) #: Publish the message using the json serializer (which is the default), #: and zlib compression. The kombu consumer will automatically detect - #: encoding, serializiation and compression used and decode accordingly. - producer.publish({"hello": "world"}, + #: encoding, serialization and compression used and decode accordingly. + producer.publish({'hello': 'world'}, exchange=exchange, - routing_key="kombu_demo", - serializer="json", compression="zlib") + routing_key='kombu_demo', + serializer='json', compression='zlib') diff --git a/examples/complete_send_manual.py b/examples/complete_send_manual.py deleted file mode 100644 index 9e3b3252..00000000 --- a/examples/complete_send_manual.py +++ /dev/null @@ -1,35 +0,0 @@ -""" - -Example producer that sends a single message and exits. - -You can use `complete_receive.py` to receive the message sent. - -""" - -from kombu import BrokerConnection, Exchange, Queue, Producer - -#: By default messages sent to exchanges are persistent (delivery_mode=2), -#: and queues and exchanges are durable. -exchange = Exchange("kombu_demo", type="direct") -queue = Queue("kombu_demo", exchange, routing_key="kombu_demo") - - -#: Create connection and channel. -#: If hostname, userid, password and virtual_host is not specified -#: the values below are the default, but listed here so it can -#: be easily changed. -connection = BrokerConnection(hostname="localhost", - userid="guest", - password="guest", - virtual_host="/") -channel = connection.channel() - -#: Producers are used to publish messages. -#: Routing keys can also be specifed as an argument to `publish`. -producer = Producer(channel, exchange, routing_key="kombu_demo") - -#: Publish the message using the json serializer (which is the default), -#: and zlib compression. The kombu consumer will automatically detect -#: encoding, serializiation and compression used and decode accordingly. -producer.publish({"hello": "world"}, serializer="json", - compression="zlib") diff --git a/examples/simple_eventlet_receive.py b/examples/simple_eventlet_receive.py index cce518cc..a8208650 100644 --- a/examples/simple_eventlet_receive.py +++ b/examples/simple_eventlet_receive.py @@ -6,12 +6,13 @@ You can use `simple_receive.py` (or `complete_receive.py`) to receive the message sent. """ +from __future__ import with_statement + import eventlet -from eventlet import spawn from Queue import Empty -from kombu import BrokerConnection +from kombu import Connection eventlet.monkey_patch() @@ -22,25 +23,21 @@ def wait_many(timeout=1): #: If hostname, userid, password and virtual_host is not specified #: the values below are the default, but listed here so it can #: be easily changed. - connection = BrokerConnection("amqp://guest:guest@localhost:5672//") - - #: SimpleQueue mimics the interface of the Python Queue module. - #: First argument can either be a queue name or a kombu.Queue object. - #: If a name, then the queue will be declared with the name as the queue - #: name, exchange name and routing key. - queue = connection.SimpleQueue("kombu_demo") - - while True: - try: - message = queue.get(block=False, timeout=timeout) - except Empty: - break - else: - spawn(message.ack) - print(message.payload) - - queue.close() - connection.close() - + with Connection('amqp://guest:guest@localhost:5672//') as connection: + + #: SimpleQueue mimics the interface of the Python Queue module. + #: First argument can either be a queue name or a kombu.Queue object. + #: If a name, then the queue will be declared with the name as the + #: queue name, exchange name and routing key. + with connection.SimpleQueue('kombu_demo') as queue: + + while True: + try: + message = queue.get(block=False, timeout=timeout) + except Empty: + break + else: + message.ack() + print(message.payload) spawn(wait_many).wait() diff --git a/examples/simple_eventlet_send.py b/examples/simple_eventlet_send.py index 3d0641e1..f6259bf3 100644 --- a/examples/simple_eventlet_send.py +++ b/examples/simple_eventlet_send.py @@ -6,9 +6,11 @@ You can use `simple_receive.py` (or `complete_receive.py`) to receive the message sent. """ +from __future__ import with_statement + import eventlet -from kombu import BrokerConnection +from kombu import Connection eventlet.monkey_patch() @@ -19,24 +21,22 @@ def send_many(n): #: If hostname, userid, password and virtual_host is not specified #: the values below are the default, but listed here so it can #: be easily changed. - connection = BrokerConnection("amqp://guest:guest@localhost:5672//") + with Connection('amqp://guest:guest@localhost:5672//') as connection: - #: SimpleQueue mimics the interface of the Python Queue module. - #: First argument can either be a queue name or a kombu.Queue object. - #: If a name, then the queue will be declared with the name as the queue - #: name, exchange name and routing key. - queue = connection.SimpleQueue("kombu_demo") + #: SimpleQueue mimics the interface of the Python Queue module. + #: First argument can either be a queue name or a kombu.Queue object. + #: If a name, then the queue will be declared with the name as the + #: queue name, exchange name and routing key. + with connection.SimpleQueue('kombu_demo') as queue: - def send_message(i): - queue.put({"hello": "world%s" % (i, )}) + def send_message(i): + queue.put({'hello': 'world%s' % (i, )}) - pool = eventlet.GreenPool(10) - for i in xrange(n): - pool.spawn(send_message, i) - pool.waitall() + pool = eventlet.GreenPool(10) + for i in xrange(n): + pool.spawn(send_message, i) + pool.waitall() - queue.close() - connection.close() -if __name__ == "__main__": +if __name__ == '__main__': send_many(10) diff --git a/examples/simple_receive.py b/examples/simple_receive.py index 9d53aa35..e9b933b1 100644 --- a/examples/simple_receive.py +++ b/examples/simple_receive.py @@ -4,19 +4,19 @@ Example receiving a message using the SimpleQueue interface. from __future__ import with_statement -from kombu import BrokerConnection +from kombu import Connection #: Create connection #: If hostname, userid, password and virtual_host is not specified #: the values below are the default, but listed here so it can #: be easily changed. -with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: +with Connection('amqp://guest:guest@localhost:5672//') as conn: #: SimpleQueue mimics the interface of the Python Queue module. #: First argument can either be a queue name or a kombu.Queue object. #: If a name, then the queue will be declared with the name as the queue #: name, exchange name and routing key. - with conn.SimpleQueue("kombu_demo") as queue: + with conn.SimpleQueue('kombu_demo') as queue: message = queue.get(block=True, timeout=10) message.ack() print(message.payload) diff --git a/examples/simple_send.py b/examples/simple_send.py index 76cdb9b5..e76fbcf0 100644 --- a/examples/simple_send.py +++ b/examples/simple_send.py @@ -8,20 +8,20 @@ message sent. """ from __future__ import with_statement -from kombu import BrokerConnection +from kombu import Connection #: Create connection #: If hostname, userid, password and virtual_host is not specified #: the values below are the default, but listed here so it can #: be easily changed. -with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: +with Connection('amqp://guest:guest@localhost:5672//') as conn: #: SimpleQueue mimics the interface of the Python Queue module. #: First argument can either be a queue name or a kombu.Queue object. #: If a name, then the queue will be declared with the name as the queue #: name, exchange name and routing key. - with conn.SimpleQueue("kombu_demo") as queue: - queue.put({"hello": "world"}, serializer="json", compression="zlib") + with conn.SimpleQueue('kombu_demo') as queue: + queue.put({'hello': 'world'}, serializer='json', compression='zlib') ##### diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py index 1ab6175a..e07b8c45 100644 --- a/examples/simple_task_queue/client.py +++ b/examples/simple_task_queue/client.py @@ -5,25 +5,25 @@ from kombu.pools import producers from queues import task_exchange -priority_to_routing_key = {"high": "hipri", - "mid": "midpri", - "low": "lopri"} +priority_to_routing_key = {'high': 'hipri', + 'mid': 'midpri', + 'low': 'lopri'} -def send_as_task(connection, fun, args=(), kwargs={}, priority="mid"): - payload = {"fun": fun, "args": args, "kwargs": kwargs} +def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'): + payload = {'fun': fun, 'args': args, 'kwargs': kwargs} routing_key = priority_to_routing_key[priority] with producers[connection].acquire(block=True) as producer: maybe_declare(task_exchange, producer.channel) - producer.publish(payload, serializer="pickle", - compression="bzip2", + producer.publish(payload, serializer='pickle', + compression='bzip2', routing_key=routing_key) -if __name__ == "__main__": - from kombu import BrokerConnection +if __name__ == '__main__': + from kombu import Connection from tasks import hello_task - connection = BrokerConnection("amqp://guest:guest@localhost:5672//") - send_as_task(connection, fun=hello_task, args=("Kombu", ), kwargs={}, - priority="high") + connection = Connection('amqp://guest:guest@localhost:5672//') + send_as_task(connection, fun=hello_task, args=('Kombu', ), kwargs={}, + priority='high') diff --git a/examples/simple_task_queue/queues.py b/examples/simple_task_queue/queues.py index 680e7575..602c2b0e 100644 --- a/examples/simple_task_queue/queues.py +++ b/examples/simple_task_queue/queues.py @@ -1,6 +1,6 @@ from kombu import Exchange, Queue -task_exchange = Exchange("tasks", type="direct") -task_queues = [Queue("hipri", task_exchange, routing_key="hipri"), - Queue("midpri", task_exchange, routing_key="midpri"), - Queue("lopri", task_exchange, routing_key="lopri")] +task_exchange = Exchange('tasks', type='direct') +task_queues = [Queue('hipri', task_exchange, routing_key='hipri'), + Queue('midpri', task_exchange, routing_key='midpri'), + Queue('lopri', task_exchange, routing_key='lopri')] diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py index 063a6b4d..3d933c4b 100644 --- a/examples/simple_task_queue/worker.py +++ b/examples/simple_task_queue/worker.py @@ -16,23 +16,23 @@ class Worker(ConsumerMixin): callbacks=[self.process_task])] def process_task(self, body, message): - fun = body["fun"] - args = body["args"] - kwargs = body["kwargs"] - self.info("Got task: %s", reprcall(fun.__name__, args, kwargs)) + fun = body['fun'] + args = body['args'] + kwargs = body['kwargs'] + self.info('Got task: %s', reprcall(fun.__name__, args, kwargs)) try: fun(*args, **kwdict(kwargs)) except Exception, exc: - self.error("task raised exception: %r", exc) + self.error('task raised exception: %r', exc) message.ack() -if __name__ == "__main__": - from kombu import BrokerConnection +if __name__ == '__main__': + from kombu import Connection from kombu.utils.debug import setup_logging - setup_logging(loglevel="INFO") + setup_logging(loglevel='INFO') - with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: + with Connection('amqp://guest:guest@localhost:5672//') as conn: try: Worker(conn).run() except KeyboardInterrupt: - print("bye bye") + print('bye bye') |
