summaryrefslogtreecommitdiff
path: root/kombu/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/connection.py')
-rw-r--r--kombu/connection.py78
1 files changed, 51 insertions, 27 deletions
diff --git a/kombu/connection.py b/kombu/connection.py
index 87efa1f1..3db99461 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -109,6 +109,7 @@ class Connection(object):
:keyword virtual_host: Default virtual host if not provided in the URL.
:keyword port: Default port if not provided in the URL.
"""
+
port = None
virtual_host = '/'
connect_timeout = 5
@@ -212,16 +213,18 @@ class Connection(object):
self.declared_entities = set()
def switch(self, url):
- """Switch connection parameters to use a new URL (does not
- reconnect)"""
+ """Switch connection parameters to use a new URL.
+
+ Note:
+ Does not reconnect!
+ """
self.close()
self.declared_entities.clear()
self._closed = False
self._init_params(**dict(self._initial_params, **parse_url(url)))
def maybe_switch_next(self):
- """Switch to next URL given by the current failover strategy (if
- any)."""
+ """Switch to next URL given by the current failover strategy."""
if self.cycle:
self.switch(next(self.cycle))
@@ -268,7 +271,9 @@ class Connection(object):
return chan
def heartbeat_check(self, rate=2):
- """Allow the transport to perform any periodic tasks
+ """Check heartbeats.
+
+ Allow the transport to perform any periodic tasks
required to make heartbeats work. This should be called
approximately every second.
@@ -437,8 +442,9 @@ class Connection(object):
def ensure(self, obj, fun, errback=None, max_retries=None,
interval_start=1, interval_step=1, interval_max=1,
on_revive=None):
- """Ensure operation completes, regardless of any channel/connection
- errors occurring.
+ """Ensure operation completes.
+
+ Regardless of any channel/connection errors occurring.
Retries by establishing the connection, and reapplying
the function.
@@ -578,8 +584,7 @@ class Connection(object):
return transport_cls
def clone(self, **kwargs):
- """Create a copy of the connection with the same connection
- settings."""
+ """Create a copy of the connection with same settings."""
return self.__class__(**dict(self._info(resolve=False), **kwargs))
def get_heartbeat_interval(self):
@@ -696,20 +701,20 @@ class Connection(object):
return ChannelPool(self, limit, **kwargs)
def Producer(self, channel=None, *args, **kwargs):
- """Create new :class:`kombu.Producer` instance using this
- connection."""
+ """Create new :class:`kombu.Producer` instance."""
from .messaging import Producer
return Producer(channel or self, *args, **kwargs)
def Consumer(self, queues=None, channel=None, *args, **kwargs):
- """Create new :class:`kombu.Consumer` instance using this
- connection."""
+ """Create new :class:`kombu.Consumer` instance."""
from .messaging import Consumer
return Consumer(channel or self, queues, *args, **kwargs)
def SimpleQueue(self, name, no_ack=None, queue_opts=None,
exchange_opts=None, channel=None, **kwargs):
- """Create new :class:`~kombu.simple.SimpleQueue`, using a channel
+ """Simple persistent queue API.
+
+ Create new :class:`~kombu.simple.SimpleQueue`, using a channel
from this connection.
If ``name`` is a string, a queue and exchange will be automatically
@@ -733,7 +738,9 @@ class Connection(object):
def SimpleBuffer(self, name, no_ack=None, queue_opts=None,
exchange_opts=None, channel=None, **kwargs):
- """Create new :class:`~kombu.simple.SimpleQueue` using a channel
+ """Simple ephemeral queue API.
+
+ Create new :class:`~kombu.simple.SimpleQueue` using a channel
from this connection.
See Also:
@@ -756,11 +763,9 @@ class Connection(object):
return exchange_type in self.transport.implements.exchange_type
def __repr__(self):
- """``x.__repr__() <==> repr(x)``"""
return '<Connection: {0} at {1:#x}>'.format(self.as_uri(), id(self))
def __copy__(self):
- """``x.__copy__() <==> copy(x)``"""
return self.clone()
def __reduce__(self):
@@ -801,8 +806,9 @@ class Connection(object):
@property
def default_channel(self):
- """Default channel, created upon access and closed when the connection
- is closed.
+ """Default channel.
+
+ Created upon access and closed when the connection is closed.
Note:
Can be used for automatic channel handling when you only need one
@@ -829,8 +835,13 @@ class Connection(object):
@cached_property
def manager(self):
- """Experimental manager that can be used to manage/monitor the broker
- instance. Not available for all transports."""
+ """AMQP Management API.
+
+ Experimental manager that can be used to manage/monitor the broker
+ instance.
+
+ Not available for all transports.
+ """
return self.transport.manager
def get_manager(self, *args, **kwargs):
@@ -838,8 +849,11 @@ class Connection(object):
@cached_property
def recoverable_connection_errors(self):
- """List of connection related exceptions that can be recovered from,
- but where the connection must be closed and re-established first."""
+ """Recoverable connection errors.
+
+ List of connection related exceptions that can be recovered from,
+ but where the connection must be closed and re-established first.
+ """
try:
return self.transport.recoverable_connection_errors
except AttributeError:
@@ -851,8 +865,11 @@ class Connection(object):
@cached_property
def recoverable_channel_errors(self):
- """List of channel related exceptions that can be automatically
- recovered from without re-establishing the connection."""
+ """Recoverable channel errors.
+
+ List of channel related exceptions that can be automatically
+ recovered from without re-establishing the connection.
+ """
try:
return self.transport.recoverable_channel_errors
except AttributeError:
@@ -879,6 +896,8 @@ BrokerConnection = Connection
class ConnectionPool(Resource):
+ """Pool of connections."""
+
LimitExceeded = exceptions.ConnectionLimitExceeded
close_after_fork = True
@@ -921,6 +940,8 @@ class ConnectionPool(Resource):
class ChannelPool(Resource):
+ """Pool of channels."""
+
LimitExceeded = exceptions.ChannelLimitExceeded
def __init__(self, connection, limit=None, **kwargs):
@@ -944,8 +965,11 @@ class ChannelPool(Resource):
def maybe_channel(channel):
- """Return the default channel if argument is a connection instance,
- otherwise just return the channel given."""
+ """Get channel from object.
+
+ Return the default channel if argument is a connection instance,
+ otherwise just return the channel given.
+ """
if is_connection(channel):
return channel.default_channel
return channel