diff options
Diffstat (limited to 'kombu/connection.py')
-rw-r--r-- | kombu/connection.py | 78 |
1 files changed, 51 insertions, 27 deletions
diff --git a/kombu/connection.py b/kombu/connection.py index 87efa1f1..3db99461 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -109,6 +109,7 @@ class Connection(object): :keyword virtual_host: Default virtual host if not provided in the URL. :keyword port: Default port if not provided in the URL. """ + port = None virtual_host = '/' connect_timeout = 5 @@ -212,16 +213,18 @@ class Connection(object): self.declared_entities = set() def switch(self, url): - """Switch connection parameters to use a new URL (does not - reconnect)""" + """Switch connection parameters to use a new URL. + + Note: + Does not reconnect! + """ self.close() self.declared_entities.clear() self._closed = False self._init_params(**dict(self._initial_params, **parse_url(url))) def maybe_switch_next(self): - """Switch to next URL given by the current failover strategy (if - any).""" + """Switch to next URL given by the current failover strategy.""" if self.cycle: self.switch(next(self.cycle)) @@ -268,7 +271,9 @@ class Connection(object): return chan def heartbeat_check(self, rate=2): - """Allow the transport to perform any periodic tasks + """Check heartbeats. + + Allow the transport to perform any periodic tasks required to make heartbeats work. This should be called approximately every second. @@ -437,8 +442,9 @@ class Connection(object): def ensure(self, obj, fun, errback=None, max_retries=None, interval_start=1, interval_step=1, interval_max=1, on_revive=None): - """Ensure operation completes, regardless of any channel/connection - errors occurring. + """Ensure operation completes. + + Regardless of any channel/connection errors occurring. Retries by establishing the connection, and reapplying the function. @@ -578,8 +584,7 @@ class Connection(object): return transport_cls def clone(self, **kwargs): - """Create a copy of the connection with the same connection - settings.""" + """Create a copy of the connection with same settings.""" return self.__class__(**dict(self._info(resolve=False), **kwargs)) def get_heartbeat_interval(self): @@ -696,20 +701,20 @@ class Connection(object): return ChannelPool(self, limit, **kwargs) def Producer(self, channel=None, *args, **kwargs): - """Create new :class:`kombu.Producer` instance using this - connection.""" + """Create new :class:`kombu.Producer` instance.""" from .messaging import Producer return Producer(channel or self, *args, **kwargs) def Consumer(self, queues=None, channel=None, *args, **kwargs): - """Create new :class:`kombu.Consumer` instance using this - connection.""" + """Create new :class:`kombu.Consumer` instance.""" from .messaging import Consumer return Consumer(channel or self, queues, *args, **kwargs) def SimpleQueue(self, name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs): - """Create new :class:`~kombu.simple.SimpleQueue`, using a channel + """Simple persistent queue API. + + Create new :class:`~kombu.simple.SimpleQueue`, using a channel from this connection. If ``name`` is a string, a queue and exchange will be automatically @@ -733,7 +738,9 @@ class Connection(object): def SimpleBuffer(self, name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs): - """Create new :class:`~kombu.simple.SimpleQueue` using a channel + """Simple ephemeral queue API. + + Create new :class:`~kombu.simple.SimpleQueue` using a channel from this connection. See Also: @@ -756,11 +763,9 @@ class Connection(object): return exchange_type in self.transport.implements.exchange_type def __repr__(self): - """``x.__repr__() <==> repr(x)``""" return '<Connection: {0} at {1:#x}>'.format(self.as_uri(), id(self)) def __copy__(self): - """``x.__copy__() <==> copy(x)``""" return self.clone() def __reduce__(self): @@ -801,8 +806,9 @@ class Connection(object): @property def default_channel(self): - """Default channel, created upon access and closed when the connection - is closed. + """Default channel. + + Created upon access and closed when the connection is closed. Note: Can be used for automatic channel handling when you only need one @@ -829,8 +835,13 @@ class Connection(object): @cached_property def manager(self): - """Experimental manager that can be used to manage/monitor the broker - instance. Not available for all transports.""" + """AMQP Management API. + + Experimental manager that can be used to manage/monitor the broker + instance. + + Not available for all transports. + """ return self.transport.manager def get_manager(self, *args, **kwargs): @@ -838,8 +849,11 @@ class Connection(object): @cached_property def recoverable_connection_errors(self): - """List of connection related exceptions that can be recovered from, - but where the connection must be closed and re-established first.""" + """Recoverable connection errors. + + List of connection related exceptions that can be recovered from, + but where the connection must be closed and re-established first. + """ try: return self.transport.recoverable_connection_errors except AttributeError: @@ -851,8 +865,11 @@ class Connection(object): @cached_property def recoverable_channel_errors(self): - """List of channel related exceptions that can be automatically - recovered from without re-establishing the connection.""" + """Recoverable channel errors. + + List of channel related exceptions that can be automatically + recovered from without re-establishing the connection. + """ try: return self.transport.recoverable_channel_errors except AttributeError: @@ -879,6 +896,8 @@ BrokerConnection = Connection class ConnectionPool(Resource): + """Pool of connections.""" + LimitExceeded = exceptions.ConnectionLimitExceeded close_after_fork = True @@ -921,6 +940,8 @@ class ConnectionPool(Resource): class ChannelPool(Resource): + """Pool of channels.""" + LimitExceeded = exceptions.ChannelLimitExceeded def __init__(self, connection, limit=None, **kwargs): @@ -944,8 +965,11 @@ class ChannelPool(Resource): def maybe_channel(channel): - """Return the default channel if argument is a connection instance, - otherwise just return the channel given.""" + """Get channel from object. + + Return the default channel if argument is a connection instance, + otherwise just return the channel given. + """ if is_connection(channel): return channel.default_channel return channel |