summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-06-24 16:32:17 +0100
committerAsk Solem <ask@celeryproject.org>2012-06-24 16:32:17 +0100
commitfa816f6dc920aeb7b68b75f98fa1656da57d05c8 (patch)
treec2f2d076cc62ea62405fb6f267accab61d1f67c9 /examples
parent4922c4aaea77be7a32a7d904104b055159e0da3e (diff)
downloadkombu-fa816f6dc920aeb7b68b75f98fa1656da57d05c8.tar.gz
BrokerConnection is now Connection in docs
Diffstat (limited to 'examples')
-rw-r--r--examples/complete_receive.py25
-rw-r--r--examples/complete_receive_manual.py45
-rw-r--r--examples/complete_send.py18
-rw-r--r--examples/complete_send_manual.py35
-rw-r--r--examples/simple_eventlet_receive.py41
-rw-r--r--examples/simple_eventlet_send.py32
-rw-r--r--examples/simple_receive.py6
-rw-r--r--examples/simple_send.py8
-rw-r--r--examples/simple_task_queue/client.py24
-rw-r--r--examples/simple_task_queue/queues.py8
-rw-r--r--examples/simple_task_queue/worker.py20
11 files changed, 93 insertions, 169 deletions
diff --git a/examples/complete_receive.py b/examples/complete_receive.py
index 416fa2d6..aa3987bf 100644
--- a/examples/complete_receive.py
+++ b/examples/complete_receive.py
@@ -2,15 +2,16 @@
Example of simple consumer that waits for a single message, acknowledges it
and exits.
"""
+
from __future__ import with_statement
-from kombu import Connection, Exchange, Queue
+from kombu import Connection, Exchange, Queue, Consumer, eventloop
from pprint import pformat
#: By default messages sent to exchanges are persistent (delivery_mode=2),
#: and queues and exchanges are durable.
-exchange = Exchange("kombu_demo", type="direct")
-queue = Queue("kombu_demo", exchange, routing_key="kombu_demo")
+exchange = Exchange('kombu_demo', type='direct')
+queue = Queue('kombu_demo', exchange, routing_key='kombu_demo')
def pretty(obj):
@@ -19,17 +20,23 @@ def pretty(obj):
#: This is the callback applied when a message is received.
def handle_message(body, message):
- print("Received message: %r" % (body, ))
- print(" properties:\n%s" % (pretty(message.properties), ))
- print(" delivery_info:\n%s" % (pretty(message.delivery_info), ))
+ print('Received message: %r' % (body, ))
+ print(' properties:\n%s' % (pretty(message.properties), ))
+ print(' delivery_info:\n%s' % (pretty(message.delivery_info), ))
message.ack()
-with Connection("amqp://guest:guest@localhost:5672//") as connection:
+#: Create a connection and a channel.
+#: If hostname, userid, password and virtual_host is not specified
+#: the values below are the default, but listed here so it can
+#: be easily changed.
+with Connection('amqp://guest:guest@localhost:5672//') as connection:
+
#: Create consumer using our callback and queue.
#: Second argument can also be a list to consume from
#: any number of queues.
- with connection.Consumer(queue, callbacks=[handle_message]):
+ with Consumer(connection, queue, callbacks=[handle_message]):
+
#: This waits for a single event. Note that this event may not
#: be a message, or a message that is to be delivered to the consumers
#: channel, but any event received on the connection.
- connection.drain_events(timeout=10)
+ eventloop(connection, limit=1, timeout=10.0)
diff --git a/examples/complete_receive_manual.py b/examples/complete_receive_manual.py
deleted file mode 100644
index bf618e46..00000000
--- a/examples/complete_receive_manual.py
+++ /dev/null
@@ -1,45 +0,0 @@
-"""
-Example of simple consumer that waits for a single message, acknowledges it
-and exits.
-"""
-
-from kombu import BrokerConnection, Exchange, Queue, Consumer
-from pprint import pformat
-
-#: By default messages sent to exchanges are persistent (delivery_mode=2),
-#: and queues and exchanges are durable.
-exchange = Exchange("kombu_demo", type="direct")
-queue = Queue("kombu_demo", exchange, routing_key="kombu_demo")
-
-
-def pretty(obj):
- return pformat(obj, indent=4)
-
-
-#: This is the callback applied when a message is received.
-def handle_message(body, message):
- print("Received message: %r" % (body, ))
- print(" properties:\n%s" % (pretty(message.properties), ))
- print(" delivery_info:\n%s" % (pretty(message.delivery_info), ))
- message.ack()
-
-#: Create a connection and a channel.
-#: If hostname, userid, password and virtual_host is not specified
-#: the values below are the default, but listed here so it can
-#: be easily changed.
-connection = BrokerConnection(hostname="localhost",
- userid="guest",
- password="guest",
- virtual_host="/")
-channel = connection.channel()
-
-#: Create consumer using our callback and queue.
-#: Second argument can also be a list to consume from
-#: any number of queues.
-consumer = Consumer(channel, queue, callbacks=[handle_message])
-consumer.consume()
-
-#: This waits for a single event. Note that this event may not
-#: be a message, or a message that is to be delivered to the consumers
-#: channel, but any event received on the connection.
-connection.drain_events()
diff --git a/examples/complete_send.py b/examples/complete_send.py
index faa6a1a1..c81f8620 100644
--- a/examples/complete_send.py
+++ b/examples/complete_send.py
@@ -7,26 +7,26 @@ You can use `complete_receive.py` to receive the message sent.
"""
from __future__ import with_statement
-from kombu import Connection, Exchange, Queue
+from kombu import Connection, Producer, Exchange, Queue
#: By default messages sent to exchanges are persistent (delivery_mode=2),
#: and queues and exchanges are durable.
-exchange = Exchange("kombu_demo", type="direct")
-queue = Queue("kombu_demo", exchange, routing_key="kombu_demo")
+exchange = Exchange('kombu_demo', type='direct')
+queue = Queue('kombu_demo', exchange, routing_key='kombu_demo')
-with Connection("amqp://guest:guest@localhost:5672//") as connection:
+with Connection('amqp://guest:guest@localhost:5672//') as connection:
#: Producers are used to publish messages.
#: a default exchange and routing key can also be specifed
#: as arguments the Producer, but we rather specify this explicitly
#: at the publish call.
- producer = connection.Producer()
+ producer = Producer(connection)
#: Publish the message using the json serializer (which is the default),
#: and zlib compression. The kombu consumer will automatically detect
- #: encoding, serializiation and compression used and decode accordingly.
- producer.publish({"hello": "world"},
+ #: encoding, serialization and compression used and decode accordingly.
+ producer.publish({'hello': 'world'},
exchange=exchange,
- routing_key="kombu_demo",
- serializer="json", compression="zlib")
+ routing_key='kombu_demo',
+ serializer='json', compression='zlib')
diff --git a/examples/complete_send_manual.py b/examples/complete_send_manual.py
deleted file mode 100644
index 9e3b3252..00000000
--- a/examples/complete_send_manual.py
+++ /dev/null
@@ -1,35 +0,0 @@
-"""
-
-Example producer that sends a single message and exits.
-
-You can use `complete_receive.py` to receive the message sent.
-
-"""
-
-from kombu import BrokerConnection, Exchange, Queue, Producer
-
-#: By default messages sent to exchanges are persistent (delivery_mode=2),
-#: and queues and exchanges are durable.
-exchange = Exchange("kombu_demo", type="direct")
-queue = Queue("kombu_demo", exchange, routing_key="kombu_demo")
-
-
-#: Create connection and channel.
-#: If hostname, userid, password and virtual_host is not specified
-#: the values below are the default, but listed here so it can
-#: be easily changed.
-connection = BrokerConnection(hostname="localhost",
- userid="guest",
- password="guest",
- virtual_host="/")
-channel = connection.channel()
-
-#: Producers are used to publish messages.
-#: Routing keys can also be specifed as an argument to `publish`.
-producer = Producer(channel, exchange, routing_key="kombu_demo")
-
-#: Publish the message using the json serializer (which is the default),
-#: and zlib compression. The kombu consumer will automatically detect
-#: encoding, serializiation and compression used and decode accordingly.
-producer.publish({"hello": "world"}, serializer="json",
- compression="zlib")
diff --git a/examples/simple_eventlet_receive.py b/examples/simple_eventlet_receive.py
index cce518cc..a8208650 100644
--- a/examples/simple_eventlet_receive.py
+++ b/examples/simple_eventlet_receive.py
@@ -6,12 +6,13 @@ You can use `simple_receive.py` (or `complete_receive.py`) to receive the
message sent.
"""
+from __future__ import with_statement
+
import eventlet
-from eventlet import spawn
from Queue import Empty
-from kombu import BrokerConnection
+from kombu import Connection
eventlet.monkey_patch()
@@ -22,25 +23,21 @@ def wait_many(timeout=1):
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
- connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
-
- #: SimpleQueue mimics the interface of the Python Queue module.
- #: First argument can either be a queue name or a kombu.Queue object.
- #: If a name, then the queue will be declared with the name as the queue
- #: name, exchange name and routing key.
- queue = connection.SimpleQueue("kombu_demo")
-
- while True:
- try:
- message = queue.get(block=False, timeout=timeout)
- except Empty:
- break
- else:
- spawn(message.ack)
- print(message.payload)
-
- queue.close()
- connection.close()
-
+ with Connection('amqp://guest:guest@localhost:5672//') as connection:
+
+ #: SimpleQueue mimics the interface of the Python Queue module.
+ #: First argument can either be a queue name or a kombu.Queue object.
+ #: If a name, then the queue will be declared with the name as the
+ #: queue name, exchange name and routing key.
+ with connection.SimpleQueue('kombu_demo') as queue:
+
+ while True:
+ try:
+ message = queue.get(block=False, timeout=timeout)
+ except Empty:
+ break
+ else:
+ message.ack()
+ print(message.payload)
spawn(wait_many).wait()
diff --git a/examples/simple_eventlet_send.py b/examples/simple_eventlet_send.py
index 3d0641e1..f6259bf3 100644
--- a/examples/simple_eventlet_send.py
+++ b/examples/simple_eventlet_send.py
@@ -6,9 +6,11 @@ You can use `simple_receive.py` (or `complete_receive.py`) to receive the
message sent.
"""
+from __future__ import with_statement
+
import eventlet
-from kombu import BrokerConnection
+from kombu import Connection
eventlet.monkey_patch()
@@ -19,24 +21,22 @@ def send_many(n):
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
- connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
+ with Connection('amqp://guest:guest@localhost:5672//') as connection:
- #: SimpleQueue mimics the interface of the Python Queue module.
- #: First argument can either be a queue name or a kombu.Queue object.
- #: If a name, then the queue will be declared with the name as the queue
- #: name, exchange name and routing key.
- queue = connection.SimpleQueue("kombu_demo")
+ #: SimpleQueue mimics the interface of the Python Queue module.
+ #: First argument can either be a queue name or a kombu.Queue object.
+ #: If a name, then the queue will be declared with the name as the
+ #: queue name, exchange name and routing key.
+ with connection.SimpleQueue('kombu_demo') as queue:
- def send_message(i):
- queue.put({"hello": "world%s" % (i, )})
+ def send_message(i):
+ queue.put({'hello': 'world%s' % (i, )})
- pool = eventlet.GreenPool(10)
- for i in xrange(n):
- pool.spawn(send_message, i)
- pool.waitall()
+ pool = eventlet.GreenPool(10)
+ for i in xrange(n):
+ pool.spawn(send_message, i)
+ pool.waitall()
- queue.close()
- connection.close()
-if __name__ == "__main__":
+if __name__ == '__main__':
send_many(10)
diff --git a/examples/simple_receive.py b/examples/simple_receive.py
index 9d53aa35..e9b933b1 100644
--- a/examples/simple_receive.py
+++ b/examples/simple_receive.py
@@ -4,19 +4,19 @@ Example receiving a message using the SimpleQueue interface.
from __future__ import with_statement
-from kombu import BrokerConnection
+from kombu import Connection
#: Create connection
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
-with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
+with Connection('amqp://guest:guest@localhost:5672//') as conn:
#: SimpleQueue mimics the interface of the Python Queue module.
#: First argument can either be a queue name or a kombu.Queue object.
#: If a name, then the queue will be declared with the name as the queue
#: name, exchange name and routing key.
- with conn.SimpleQueue("kombu_demo") as queue:
+ with conn.SimpleQueue('kombu_demo') as queue:
message = queue.get(block=True, timeout=10)
message.ack()
print(message.payload)
diff --git a/examples/simple_send.py b/examples/simple_send.py
index 76cdb9b5..e76fbcf0 100644
--- a/examples/simple_send.py
+++ b/examples/simple_send.py
@@ -8,20 +8,20 @@ message sent.
"""
from __future__ import with_statement
-from kombu import BrokerConnection
+from kombu import Connection
#: Create connection
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
-with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
+with Connection('amqp://guest:guest@localhost:5672//') as conn:
#: SimpleQueue mimics the interface of the Python Queue module.
#: First argument can either be a queue name or a kombu.Queue object.
#: If a name, then the queue will be declared with the name as the queue
#: name, exchange name and routing key.
- with conn.SimpleQueue("kombu_demo") as queue:
- queue.put({"hello": "world"}, serializer="json", compression="zlib")
+ with conn.SimpleQueue('kombu_demo') as queue:
+ queue.put({'hello': 'world'}, serializer='json', compression='zlib')
#####
diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py
index 1ab6175a..e07b8c45 100644
--- a/examples/simple_task_queue/client.py
+++ b/examples/simple_task_queue/client.py
@@ -5,25 +5,25 @@ from kombu.pools import producers
from queues import task_exchange
-priority_to_routing_key = {"high": "hipri",
- "mid": "midpri",
- "low": "lopri"}
+priority_to_routing_key = {'high': 'hipri',
+ 'mid': 'midpri',
+ 'low': 'lopri'}
-def send_as_task(connection, fun, args=(), kwargs={}, priority="mid"):
- payload = {"fun": fun, "args": args, "kwargs": kwargs}
+def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'):
+ payload = {'fun': fun, 'args': args, 'kwargs': kwargs}
routing_key = priority_to_routing_key[priority]
with producers[connection].acquire(block=True) as producer:
maybe_declare(task_exchange, producer.channel)
- producer.publish(payload, serializer="pickle",
- compression="bzip2",
+ producer.publish(payload, serializer='pickle',
+ compression='bzip2',
routing_key=routing_key)
-if __name__ == "__main__":
- from kombu import BrokerConnection
+if __name__ == '__main__':
+ from kombu import Connection
from tasks import hello_task
- connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
- send_as_task(connection, fun=hello_task, args=("Kombu", ), kwargs={},
- priority="high")
+ connection = Connection('amqp://guest:guest@localhost:5672//')
+ send_as_task(connection, fun=hello_task, args=('Kombu', ), kwargs={},
+ priority='high')
diff --git a/examples/simple_task_queue/queues.py b/examples/simple_task_queue/queues.py
index 680e7575..602c2b0e 100644
--- a/examples/simple_task_queue/queues.py
+++ b/examples/simple_task_queue/queues.py
@@ -1,6 +1,6 @@
from kombu import Exchange, Queue
-task_exchange = Exchange("tasks", type="direct")
-task_queues = [Queue("hipri", task_exchange, routing_key="hipri"),
- Queue("midpri", task_exchange, routing_key="midpri"),
- Queue("lopri", task_exchange, routing_key="lopri")]
+task_exchange = Exchange('tasks', type='direct')
+task_queues = [Queue('hipri', task_exchange, routing_key='hipri'),
+ Queue('midpri', task_exchange, routing_key='midpri'),
+ Queue('lopri', task_exchange, routing_key='lopri')]
diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py
index 063a6b4d..3d933c4b 100644
--- a/examples/simple_task_queue/worker.py
+++ b/examples/simple_task_queue/worker.py
@@ -16,23 +16,23 @@ class Worker(ConsumerMixin):
callbacks=[self.process_task])]
def process_task(self, body, message):
- fun = body["fun"]
- args = body["args"]
- kwargs = body["kwargs"]
- self.info("Got task: %s", reprcall(fun.__name__, args, kwargs))
+ fun = body['fun']
+ args = body['args']
+ kwargs = body['kwargs']
+ self.info('Got task: %s', reprcall(fun.__name__, args, kwargs))
try:
fun(*args, **kwdict(kwargs))
except Exception, exc:
- self.error("task raised exception: %r", exc)
+ self.error('task raised exception: %r', exc)
message.ack()
-if __name__ == "__main__":
- from kombu import BrokerConnection
+if __name__ == '__main__':
+ from kombu import Connection
from kombu.utils.debug import setup_logging
- setup_logging(loglevel="INFO")
+ setup_logging(loglevel='INFO')
- with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
+ with Connection('amqp://guest:guest@localhost:5672//') as conn:
try:
Worker(conn).run()
except KeyboardInterrupt:
- print("bye bye")
+ print('bye bye')