kombu.connection

Broker connection and pools.

copyright:
  1. 2009 - 2011 by Ask Solem.
license:

BSD, see LICENSE for more details.

Connection

class kombu.connection.BrokerConnection(hostname='localhost', userid='guest', password='guest', virtual_host='/', port=None, insist=False, ssl=False, transport=None, connect_timeout=5, backend_cls=None)

A connection to the broker.

Parameters:
  • hostname – Hostname/address of the server to connect to. Default is "localhost".
  • userid – Username. Default is "guest".
  • password – Password. Default is "guest".
  • virtual_host – Virtual host. Default is "/".
  • port – Port of the server. Default is transport specific.
  • insist – Insist on connecting to a server. In a configuration with multiple load-sharing servers, the insist option tells the server that the client is insisting on a connection to the specified server. Default is False.
  • ssl – Use ssl to connect to the server. Default is False.
  • transport – Transport class to use. Can be a class, or a string specifying the path to the class. (e.g. kombu.transport.pyamqplib.Transport), or one of the aliases: amqplib, pika, redis, memory.
  • connect_timeout – Timeout in seconds for connecting to the server. May not be suported by the specified transport.

Usage

Creating a connection:

>>> conn = BrokerConnection("rabbit.example.com")

The connection is established lazily when needed. If you need the connection to be established, then force it to do so using connect():

>>> conn.connect()

Remember to always close the connection:

>>> conn.release()

Attributes

connection_errors

List of exceptions that may be raised by the connection.

channel_errors

List of exceptions that may be raised by the channel.

transport
host

The host as a hostname/port pair separated by colon.

connection

The underlying connection object.

Warning

This instance is transport specific, so do not depend on the interface of this object.

Methods

connect()

Establish connection to server immediately.

channel()

Request a new channel.

drain_events(**kwargs)

Wait for a single event from the server.

Parameters:
  • timeout – Timeout in seconds before we give up. Raises socket.timeout if the timeout is execeded.

Usually used from an event loop.

release()

Close the connection (if open).

ensure_connection(errback=None, max_retries=None, interval_start=2, interval_step=2, interval_max=30)

Ensure we have a connection to the server.

If not retry establishing the connection with the settings specified.

Parameters:
  • errback – Optional callback called each time the connection can’t be established. Arguments provided are the exception raised and the interval that will be slept (exc, interval).
  • max_retries – Maximum number of times to retry. If this limit is exceeded the connection error will be re-raised.
  • interval_start – The number of seconds we start sleeping for.
  • interval_step – How many seconds added to the interval for each retry.
  • interval_max – Maximum number of seconds to sleep between each retry.
ensure(obj, fun, errback=None, max_retries=None, interval_start=1, interval_step=1, interval_max=1)

Ensure operation completes, regardless of any channel/connection errors occuring.

Will retry by establishing the connection, and reapplying the function.

Parameters:
  • fun – Method to apply.
  • errback – Optional callback called each time the connection can’t be established. Arguments provided are the exception raised and the interval that will be slept (exc, interval).
  • max_retries – Maximum number of times to retry. If this limit is exceeded the connection error will be re-raised.
  • interval_start – The number of seconds we start sleeping for.
  • interval_step – How many seconds added to the interval for each retry.
  • interval_max – Maximum number of seconds to sleep between each retry.

Example

This is an example ensuring a publish operation:

>>> def errback(exc, interval):
...     print("Couldn't publish message: %r. Retry in %ds" % (
...             exc, interval))
>>> publish = conn.ensure(producer, producer.publish,
...                       errback=errback, max_retries=3)
>>> publish(message, routing_key)
create_transport()
get_transport_cls()

Get the currently used transport class.

clone(**kwargs)

Create a copy of the connection with the same connection settings.

info()

Get connection info.

Pool(limit=None, preload=None)

Pool of connections.

See ConnectionPool.

Parameters:
  • limit – Maximum number of active connections. Default is no limit.
  • preload – Number of connections to preload when the pool is created. Default is 0.

Example usage:

>>> pool = connection.Pool(2)
>>> c1 = pool.acquire()
>>> c2 = pool.acquire()
>>> c3 = pool.acquire()
>>> c1.release()
>>> c3 = pool.acquire()
ChannelPool(limit=None, preload=None)

Pool of channels.

See ChannelPool.

Parameters:
  • limit – Maximum number of active channels. Default is no limit.
  • preload – Number of channels to preload when the pool is created. Default is 0.

Example usage:

>>> pool = connection.ChannelPool(2)
>>> c1 = pool.acquire()
>>> c2 = pool.acquire()
>>> c3 = pool.acquire()
>>> c1.release()
>>> c3 = pool.acquire()
SimpleQueue(name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs)

Create new SimpleQueue, using a channel from this connection.

If name is a string, a queue and exchange will be automatically created using that name as the name of the queue and exchange, also it will be used as the default routing key.

Parameters:
  • name – Name of the queue/or a Queue.
  • no_ack – Disable acknowledgements. Default is false.
  • queue_opts – Additional keyword arguments passed to the constructor of the automatically created Queue.
  • exchange_opts – Additional keyword arguments passed to the constructor of the automatically created Exchange.
  • channel – Channel to use. If not specified a new channel from the current connection will be used. Remember to call close() when done with the object.
SimpleBuffer(name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs)

Create new SimpleQueue using a channel from this connection.

Same as SimpleQueue(), but configured with buffering semantics. The resulting queue and exchange will not be durable, also auto delete is enabled. Messages will be transient (not persistent), and acknowledgements are disabled (no_ack).

Pools

See also

The shortcut methods BrokerConnection.Pool() and BrokerConnection.ChannelPool() is the recommended way to instantiate these classes.

class kombu.connection.ConnectionPool(connection, limit=None, preload=None)
LimitExceeded

Maximum number of simultaneous connections exceeded.

acquire(block=False, timeout=None)

Acquire resource.

Parameters:
  • block – If the limit is exceeded, block until there is an available item.
  • timeout – Timeout to wait if block is true. Default is None (forever).
Raises LimitExceeded:
 

if block is false and the limit has been exceeded.

release(resource)

Release resource so it can be used by another thread.

The caller is responsible for discarding the object, and to never use the resource again. A new resource must be acquired if so needed.

class kombu.connection.ChannelPool(connection, limit=None, preload=None)
LimitExceeded

Maximum number of simultaenous channels exceeded.

acquire(block=False, timeout=None)

Acquire resource.

Parameters:
  • block – If the limit is exceeded, block until there is an available item.
  • timeout – Timeout to wait if block is true. Default is None (forever).
Raises LimitExceeded:
 

if block is false and the limit has been exceeded.

release(resource)

Release resource so it can be used by another thread.

The caller is responsible for discarding the object, and to never use the resource again. A new resource must be acquired if so needed.

Table Of Contents

Previous topic

API Reference

Next topic

kombu.simple

This Page