summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile7
-rw-r--r--kombu/__init__.py3
-rw-r--r--kombu/abstract.py10
-rw-r--r--kombu/async/aws/__init__.py1
-rw-r--r--kombu/async/aws/connection.py11
-rw-r--r--kombu/async/aws/ext.py2
-rw-r--r--kombu/async/aws/sqs/__init__.py2
-rw-r--r--kombu/async/aws/sqs/connection.py2
-rw-r--r--kombu/async/aws/sqs/ext.py1
-rw-r--r--kombu/async/aws/sqs/message.py12
-rw-r--r--kombu/async/aws/sqs/queue.py3
-rw-r--r--kombu/async/debug.py6
-rw-r--r--kombu/async/http/__init__.py2
-rw-r--r--kombu/async/http/base.py10
-rw-r--r--kombu/async/http/curl.py3
-rw-r--r--kombu/async/hub.py4
-rw-r--r--kombu/async/semaphore.py7
-rw-r--r--kombu/async/timer.py16
-rw-r--r--kombu/clocks.py7
-rw-r--r--kombu/common.py18
-rw-r--r--kombu/compat.py6
-rw-r--r--kombu/connection.py78
-rw-r--r--kombu/entity.py10
-rw-r--r--kombu/exceptions.py25
-rw-r--r--kombu/log.py5
-rw-r--r--kombu/message.py9
-rw-r--r--kombu/messaging.py7
-rw-r--r--kombu/mixins.py13
-rw-r--r--kombu/pidbox.py3
-rw-r--r--kombu/pools.py9
-rw-r--r--kombu/resource.py9
-rw-r--r--kombu/serialization.py27
-rw-r--r--kombu/simple.py6
-rw-r--r--kombu/transport/SLMQ.py8
-rw-r--r--kombu/transport/SQS.py5
-rw-r--r--kombu/transport/__init__.py8
-rw-r--r--kombu/transport/base.py13
-rw-r--r--kombu/transport/consul.py17
-rw-r--r--kombu/transport/etcd.py40
-rw-r--r--kombu/transport/filesystem.py11
-rw-r--r--kombu/transport/librabbitmq.py7
-rw-r--r--kombu/transport/memory.py4
-rw-r--r--kombu/transport/mongodb.py16
-rw-r--r--kombu/transport/pyamqp.py10
-rw-r--r--kombu/transport/pyro.py3
-rw-r--r--kombu/transport/qpid.py34
-rw-r--r--kombu/transport/redis.py19
-rw-r--r--kombu/transport/virtual/base.py34
-rw-r--r--kombu/transport/virtual/exchange.py31
-rw-r--r--kombu/transport/zookeeper.py3
-rw-r--r--kombu/utils/__init__.py2
-rw-r--r--kombu/utils/amq_manager.py2
-rw-r--r--kombu/utils/collections.py10
-rw-r--r--kombu/utils/compat.py18
-rw-r--r--kombu/utils/debug.py3
-rw-r--r--kombu/utils/div.py2
-rw-r--r--kombu/utils/encoding.py15
-rw-r--r--kombu/utils/eventio.py9
-rw-r--r--kombu/utils/functional.py14
-rw-r--r--kombu/utils/json.py6
-rw-r--r--kombu/utils/limits.py3
-rw-r--r--kombu/utils/objects.py6
-rw-r--r--kombu/utils/scheduling.py25
-rw-r--r--kombu/utils/text.py15
-rw-r--r--kombu/utils/url.py84
-rw-r--r--kombu/utils/uuid.py4
-rw-r--r--setup.cfg5
-rw-r--r--tox.ini20
68 files changed, 595 insertions, 245 deletions
diff --git a/Makefile b/Makefile
index 60d6a10d..4082e006 100644
--- a/Makefile
+++ b/Makefile
@@ -5,6 +5,7 @@ PYTEST=py.test
GIT=git
TOX=tox
ICONV=iconv
+PYDOCSTYLE=pydocstyle
FLAKE8=flake8
FLAKEPLUS=flakeplus
SPHINX2RST=sphinx2rst
@@ -35,6 +36,7 @@ help:
@echo " flakes -------- - Check code for syntax and style errors."
@echo " flakecheck - Run flake8 on the source code."
@echo " flakepluscheck - Run flakeplus on the source code."
+ @echo " pep257check - Run pep257 on the source code."
@echo "readme - Regenerate README.rst file."
@echo "contrib - Regenerate CONTRIBUTING.rst file"
@echo "clean-dist --------- - Clean all distribution build artifacts."
@@ -95,7 +97,10 @@ flakepluscheck:
flakeplusdiag:
-$(MAKE) flakepluscheck
-flakes: flakediag flakeplusdiag
+pep257check:
+ $(PYDOCSTYLE) --ignore=D102,D104,D203,D105 "$(PROJ)"
+
+flakes: flakediag flakeplusdiag pep257check
clean-readme:
-rm -f $(README)
diff --git a/kombu/__init__.py b/kombu/__init__.py
index ca6d2487..ab2789ba 100644
--- a/kombu/__init__.py
+++ b/kombu/__init__.py
@@ -1,4 +1,4 @@
-"""Messaging library for Python"""
+"""Messaging library for Python."""
from __future__ import absolute_import, unicode_literals
import os
@@ -74,6 +74,7 @@ for module, items in all_by_module.items():
class module(ModuleType):
+ """Customized Python module."""
def __getattr__(self, name):
if name in object_origins:
diff --git a/kombu/abstract.py b/kombu/abstract.py
index e120fbd6..b8c80c0b 100644
--- a/kombu/abstract.py
+++ b/kombu/abstract.py
@@ -20,8 +20,11 @@ def _any(v):
class Object(object):
- """Common base class supporting automatic kwargs->attributes handling,
- and cloning."""
+ """Common base class.
+
+ Supports automatic kwargs->attributes handling, and cloning.
+ """
+
attrs = ()
def __init__(self, *args, **kwargs):
@@ -54,6 +57,7 @@ class Object(object):
@python_2_unicode_compatible
class MaybeChannelBound(Object):
"""Mixin for classes that can be bound to an AMQP channel."""
+
_channel = None
_is_bound = False
@@ -61,7 +65,7 @@ class MaybeChannelBound(Object):
can_cache_declaration = False
def __call__(self, channel):
- """`self(channel) -> self.bind(channel)`"""
+ """`self(channel) -> self.bind(channel)`."""
return self.bind(channel)
def bind(self, channel):
diff --git a/kombu/async/aws/__init__.py b/kombu/async/aws/__init__.py
index 7872cdb9..0bdc37af 100644
--- a/kombu/async/aws/__init__.py
+++ b/kombu/async/aws/__init__.py
@@ -3,6 +3,7 @@ from __future__ import absolute_import, unicode_literals
def connect_sqs(aws_access_key_id=None, aws_secret_access_key=None, **kwargs):
+ """Return async connection to Amazon SQS."""
from .sqs.connection import AsyncSQSConnection
return AsyncSQSConnection(
aws_access_key_id, aws_secret_access_key, **kwargs
diff --git a/kombu/async/aws/connection.py b/kombu/async/aws/connection.py
index 861f993c..cc907d7b 100644
--- a/kombu/async/aws/connection.py
+++ b/kombu/async/aws/connection.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
+"""Amazon AWS Connection."""
from __future__ import absolute_import, unicode_literals
from io import BytesIO
@@ -16,7 +17,7 @@ try:
from urllib.parse import urlunsplit
except ImportError:
from urlparse import urlunsplit # noqa
-from xml.sax import parseString as sax_parse
+from xml.sax import parseString as sax_parse # noqa
try: # pragma: no cover
from email import message_from_file
@@ -36,6 +37,7 @@ __all__ = [
@python_2_unicode_compatible
class AsyncHTTPResponse(object):
+ """Async HTTP Response."""
def __init__(self, response):
self.response = response
@@ -77,6 +79,8 @@ class AsyncHTTPResponse(object):
@python_2_unicode_compatible
class AsyncHTTPConnection(object):
+ """Async HTTP Connection."""
+
Request = Request
Response = AsyncHTTPResponse
@@ -154,10 +158,13 @@ class AsyncHTTPConnection(object):
class AsyncHTTPSConnection(AsyncHTTPConnection):
+ """Async HTTPS Connection."""
+
scheme = 'https'
class AsyncConnection(object):
+ """Async AWS Connection."""
def __init__(self, http_client=None, **kwargs):
if boto is None:
@@ -193,6 +200,7 @@ class AsyncConnection(object):
class AsyncAWSAuthConnection(AsyncConnection, AWSAuthConnection):
+ """Async AWS Authn Connection."""
def __init__(self, host,
http_client=None, http_client_params={}, **kwargs):
@@ -208,6 +216,7 @@ class AsyncAWSAuthConnection(AsyncConnection, AWSAuthConnection):
class AsyncAWSQueryConnection(AsyncConnection, AWSQueryConnection):
+ """Async AWS Query Connection."""
def __init__(self, host,
http_client=None, http_client_params={}, **kwargs):
diff --git a/kombu/async/aws/ext.py b/kombu/async/aws/ext.py
index df8fe862..b0e497cb 100644
--- a/kombu/async/aws/ext.py
+++ b/kombu/async/aws/ext.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-
+"""Amazon boto interface."""
from __future__ import absolute_import, unicode_literals
try:
diff --git a/kombu/async/aws/sqs/__init__.py b/kombu/async/aws/sqs/__init__.py
index 7491cb4c..fe529584 100644
--- a/kombu/async/aws/sqs/__init__.py
+++ b/kombu/async/aws/sqs/__init__.py
@@ -9,12 +9,14 @@ __all__ = ['regions', 'connect_to_region']
def regions():
+ """Return list of known AWS regions."""
if boto is None:
raise ImportError('boto is not installed')
return get_regions('sqs', connection_cls=AsyncSQSConnection)
def connect_to_region(region_name, **kwargs):
+ """Connect to specific AWS region."""
for region in regions():
if region.name == region_name:
return region.connect(**kwargs)
diff --git a/kombu/async/aws/sqs/connection.py b/kombu/async/aws/sqs/connection.py
index 4b95698c..32b9926b 100644
--- a/kombu/async/aws/sqs/connection.py
+++ b/kombu/async/aws/sqs/connection.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
+"""Amazon SQS Connection."""
from __future__ import absolute_import, unicode_literals
from vine import transform
@@ -15,6 +16,7 @@ __all__ = ['AsyncSQSConnection']
class AsyncSQSConnection(AsyncAWSQueryConnection, SQSConnection):
+ """Async SQS Connection."""
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
is_secure=True, port=None, proxy=None, proxy_port=None,
diff --git a/kombu/async/aws/sqs/ext.py b/kombu/async/aws/sqs/ext.py
index 93ce7963..eb48f3e9 100644
--- a/kombu/async/aws/sqs/ext.py
+++ b/kombu/async/aws/sqs/ext.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
+"""Amazon SQS boto interface."""
from __future__ import absolute_import, unicode_literals
diff --git a/kombu/async/aws/sqs/message.py b/kombu/async/aws/sqs/message.py
index f9b4d6d3..36359841 100644
--- a/kombu/async/aws/sqs/message.py
+++ b/kombu/async/aws/sqs/message.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
+"""Amazon SQS message implementation."""
from __future__ import absolute_import, unicode_literals
from .ext import (
@@ -12,6 +13,7 @@ __all__ = [
class BaseAsyncMessage(object):
+ """Base class for messages received on async client."""
def delete(self, callback=None):
if self.queue:
@@ -25,20 +27,20 @@ class BaseAsyncMessage(object):
class AsyncRawMessage(BaseAsyncMessage, RawMessage):
- pass
+ """Raw Message."""
class AsyncMessage(BaseAsyncMessage, Message):
- pass
+ """Serialized message."""
class AsyncMHMessage(BaseAsyncMessage, MHMessage):
- pass
+ """MHM Message (uhm, look that up later)."""
class AsyncEncodedMHMessage(BaseAsyncMessage, EncodedMHMessage):
- pass
+ """Encoded MH Message."""
class AsyncJSONMessage(BaseAsyncMessage, JSONMessage):
- pass
+ """Json serialized message."""
diff --git a/kombu/async/aws/sqs/queue.py b/kombu/async/aws/sqs/queue.py
index b5237c1d..140ff31a 100644
--- a/kombu/async/aws/sqs/queue.py
+++ b/kombu/async/aws/sqs/queue.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
+"""Amazon SQS queue implementation."""
from __future__ import absolute_import, unicode_literals
from vine import transform
@@ -10,10 +11,12 @@ _all__ = ['AsyncQueue']
def list_first(rs):
+ """Get the first item in a list, or None if list empty."""
return rs[0] if len(rs) == 1 else None
class AsyncQueue(_Queue):
+ """Async SQS Queue."""
def __init__(self, connection=None, url=None, message_class=AsyncMessage):
self.connection = connection
diff --git a/kombu/async/debug.py b/kombu/async/debug.py
index 6385d233..0e5ffbe0 100644
--- a/kombu/async/debug.py
+++ b/kombu/async/debug.py
@@ -7,6 +7,7 @@ from kombu.utils.functional import reprcall
def repr_flag(flag):
+ """Return description of event loop flag."""
return '{0}{1}{2}'.format('R' if flag & READ else '',
'W' if flag & WRITE else '',
'!' if flag & ERR else '')
@@ -24,10 +25,12 @@ def _rcb(obj):
def repr_active(h):
+ """Return description of active readers and writers."""
return ', '.join(repr_readers(h) + repr_writers(h))
def repr_events(h, events):
+ """Return description of events returned by poll."""
return ', '.join(
'{0}({1})->{2}'.format(
_rcb(callback_for(h, fd, fl, '(GONE)')), fd,
@@ -38,16 +41,19 @@ def repr_events(h, events):
def repr_readers(h):
+ """Return description of pending readers."""
return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(READ | ERR))
for fd, cb in items(h.readers)]
def repr_writers(h):
+ """Return description of pending writers."""
return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(WRITE))
for fd, cb in items(h.writers)]
def callback_for(h, fd, flag, *default):
+ """Return the callback used for hub+fd+flag."""
try:
if flag & READ:
return h.readers[fd]
diff --git a/kombu/async/http/__init__.py b/kombu/async/http/__init__.py
index 1401395e..b64c7bd8 100644
--- a/kombu/async/http/__init__.py
+++ b/kombu/async/http/__init__.py
@@ -8,11 +8,13 @@ __all__ = ['Client', 'Headers', 'Response', 'Request']
def Client(hub=None, **kwargs):
+ """Create new HTTP client."""
from .curl import CurlClient
return CurlClient(hub, **kwargs)
def get_client(hub=None, **kwargs):
+ """Get or create HTTP client bound to the current event loop."""
hub = hub or get_event_loop()
try:
return hub._current_http_client
diff --git a/kombu/async/http/base.py b/kombu/async/http/base.py
index 92c6bdf4..f8a8bc0a 100644
--- a/kombu/async/http/base.py
+++ b/kombu/async/http/base.py
@@ -1,3 +1,4 @@
+"""Base async HTTP client implementation."""
from __future__ import absolute_import, unicode_literals
import sys
@@ -26,6 +27,8 @@ def normalize_header(key):
class Headers(dict):
+ """Represents a mapping of HTTP headers."""
+
# TODO: This is just a regular dict and will not perform normalization
# when looking up keys etc.
@@ -176,8 +179,11 @@ class Response(object):
self.error = HttpError(self.code, self.status, self)
def raise_for_error(self):
- """Raise :class:`~kombu.exceptions.HttpError` if the request resulted
- in a HTTP error code."""
+ """Raise if the request resulted in an HTTP error code.
+
+ Raises:
+ :class:`~kombu.exceptions.HttpError`
+ """
if self.error:
raise self.error
diff --git a/kombu/async/http/curl.py b/kombu/async/http/curl.py
index c9d8f215..5200493d 100644
--- a/kombu/async/http/curl.py
+++ b/kombu/async/http/curl.py
@@ -1,3 +1,4 @@
+"""HTTP Client using pyCurl."""
from __future__ import absolute_import, unicode_literals
from collections import deque
@@ -33,6 +34,8 @@ EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH'])
class CurlClient(BaseClient):
+ """Curl HTTP Client."""
+
Curl = Curl
def __init__(self, hub=None, max_clients=10):
diff --git a/kombu/async/hub.py b/kombu/async/hub.py
index 5e3da78d..348e1f4a 100644
--- a/kombu/async/hub.py
+++ b/kombu/async/hub.py
@@ -6,7 +6,7 @@ import errno
from contextlib import contextmanager
from time import sleep
-from types import GeneratorType as generator
+from types import GeneratorType as generator # noqa
from vine import Thenable, promise
@@ -42,10 +42,12 @@ def _dummy_context(*args, **kwargs):
def get_event_loop():
+ """Get current event loop object."""
return _current_loop
def set_event_loop(loop):
+ """Set the current event loop object."""
global _current_loop
_current_loop = loop
return loop
diff --git a/kombu/async/semaphore.py b/kombu/async/semaphore.py
index 66015dc3..af40cc03 100644
--- a/kombu/async/semaphore.py
+++ b/kombu/async/semaphore.py
@@ -43,8 +43,11 @@ class LaxBoundedSemaphore(object):
self._pop_waiter = self._waiting.popleft
def acquire(self, callback, *partial_args, **partial_kwargs):
- """Acquire semaphore, applying ``callback`` if
- the resource is available.
+ """Acquire semaphore.
+
+ This will immediately apply ``callback`` if
+ the resource is available, otherwise the callback is suspended
+ until the semaphore is released.
Arguments:
callback (Callable): The callback to apply.
diff --git a/kombu/async/timer.py b/kombu/async/timer.py
index 2c7d91c7..55f7bd6e 100644
--- a/kombu/async/timer.py
+++ b/kombu/async/timer.py
@@ -33,6 +33,10 @@ scheduled = namedtuple('scheduled', ('eta', 'priority', 'entry'))
def to_timestamp(d, default_timezone=utc, time=monotonic):
+ """Convert datetime to timestamp.
+
+ If d' is already a timestamp, then that will be used.
+ """
if isinstance(d, datetime):
if d.tzinfo is None:
d = d.replace(tzinfo=default_timezone)
@@ -44,6 +48,8 @@ def to_timestamp(d, default_timezone=utc, time=monotonic):
@total_ordering
@python_2_unicode_compatible
class Entry(object):
+ """Schedule Entry."""
+
if not IS_PYPY: # pragma: no cover
__slots__ = (
'fun', 'args', 'kwargs', 'tref', 'canceled',
@@ -85,7 +91,8 @@ class Entry(object):
class Timer(object):
- """ETA scheduler."""
+ """Async timer implementation."""
+
Entry = Entry
on_error = None
@@ -171,9 +178,12 @@ class Timer(object):
def __iter__(self, min=min, nowfun=monotonic,
pop=heapq.heappop, push=heapq.heappush):
- """This iterator yields a tuple of ``(entry, wait_seconds)``,
+ """Iterate over schedule.
+
+ This iterator yields a tuple of ``(entry, wait_seconds)``,
where if entry is :const:`None` the caller should wait
- for ``wait_seconds`` until it polls the schedule again."""
+ for ``wait_seconds`` until it polls the schedule again.
+ """
max_interval = self.max_interval
queue = self._queue
diff --git a/kombu/clocks.py b/kombu/clocks.py
index a4610ac6..3de05c0f 100644
--- a/kombu/clocks.py
+++ b/kombu/clocks.py
@@ -24,6 +24,7 @@ class timetuple(tuple):
id (str): Event host id (e.g. ``hostname:pid``).
obj (Any): Optional obj to associate with this event.
"""
+
__slots__ = ()
def __new__(cls, clock, timestamp, id, obj=None):
@@ -99,6 +100,7 @@ class LamportClock(object):
the time stamp of the incoming message.
"""
+
#: The clocks current value.
value = 0
@@ -117,7 +119,9 @@ class LamportClock(object):
return self.value
def sort_heap(self, h):
- """List of tuples containing at least two elements, representing
+ """Sort heap of events.
+
+ List of tuples containing at least two elements, representing
an event, where the first element is the event's scalar clock value,
and the second element is the id of the process (usually
``"hostname:pid"``): ``sh([(clock, processid, ...?), (...)])``
@@ -129,7 +133,6 @@ class LamportClock(object):
present.
Will return the latest event.
-
"""
if h[0][0] == h[1][0]:
same = []
diff --git a/kombu/common.py b/kombu/common.py
index fa75cd9f..1005b405 100644
--- a/kombu/common.py
+++ b/kombu/common.py
@@ -58,7 +58,9 @@ def oid_from(instance):
class Broadcast(Queue):
- """Convenience class used to define broadcast queues.
+ """Broadcast queue.
+
+ Convenience class used to define broadcast queues.
Every queue instance will have a unique name,
and both the queue and exchange is configured with auto deletion.
@@ -71,6 +73,7 @@ class Broadcast(Queue):
**kwargs (Any): See :class:`~kombu.Queue` for a list
of additional keyword arguments supported.
"""
+
attrs = Queue.attrs + (('queue', None),)
def __init__(self, name=None, queue=None, auto_delete=True,
@@ -92,6 +95,7 @@ def declaration_cached(entity, channel):
def maybe_declare(entity, channel=None, retry=False, **retry_policy):
+ """Declare entity (cached)."""
is_bound = entity.is_bound
orig = entity
@@ -136,6 +140,7 @@ def _imaybe_declare(entity, declared, ident, channel,
def drain_consumer(consumer, limit=1, timeout=None, callbacks=None):
+ """Drain messages from consumer instance."""
acc = deque()
def on_message(body, message):
@@ -154,6 +159,7 @@ def drain_consumer(consumer, limit=1, timeout=None, callbacks=None):
def itermessages(conn, channel, queue, limit=1, timeout=None,
callbacks=None, **kwargs):
+ """Iterator over messages."""
return drain_consumer(
conn.Consumer(queues=[queue], channel=channel, **kwargs),
limit=limit, timeout=timeout, callbacks=callbacks,
@@ -222,7 +228,7 @@ def send_reply(exchange, req, msg,
def collect_replies(conn, channel, queue, *args, **kwargs):
- """Generator collecting replies from ``queue``"""
+ """Generator collecting replies from ``queue``."""
no_ack = kwargs.setdefault('no_ack', True)
received = False
try:
@@ -291,8 +297,11 @@ def revive_connection(connection, channel, on_revive=None):
def insured(pool, fun, args, kwargs, errback=None, on_revive=None, **opts):
- """Ensures function performing broker commands completes
- despite intermittent connection failures."""
+ """Function wrapper to handle connection errors.
+
+ Ensures function performing broker commands completes
+ despite intermittent connection failures.
+ """
errback = errback or _ensure_errback
with pool.acquire(block=True) as conn:
@@ -347,6 +356,7 @@ class QoS(object):
... print('prefetch count now: %r' % (prefetch_count,))
>>> QoS(set_qos, 10)
"""
+
prev = None
def __init__(self, callback, initial_value):
diff --git a/kombu/compat.py b/kombu/compat.py
index 322b27ff..0ffdf6e1 100644
--- a/kombu/compat.py
+++ b/kombu/compat.py
@@ -1,4 +1,4 @@
-"""Carrot compatible interface
+"""Carrot compatibility interface.
See http://packages.python.org/pypi/carrot for documentation.
"""
@@ -25,6 +25,8 @@ def _iterconsume(connection, consumer, no_ack=False, limit=None):
class Publisher(messaging.Producer):
+ """Carrot compatible producer."""
+
exchange = ''
exchange_type = 'direct'
routing_key = ''
@@ -74,6 +76,8 @@ class Publisher(messaging.Producer):
class Consumer(messaging.Consumer):
+ """Carrot compatible consumer."""
+
queue = ''
exchange = ''
routing_key = ''
diff --git a/kombu/connection.py b/kombu/connection.py
index 87efa1f1..3db99461 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -109,6 +109,7 @@ class Connection(object):
:keyword virtual_host: Default virtual host if not provided in the URL.
:keyword port: Default port if not provided in the URL.
"""
+
port = None
virtual_host = '/'
connect_timeout = 5
@@ -212,16 +213,18 @@ class Connection(object):
self.declared_entities = set()
def switch(self, url):
- """Switch connection parameters to use a new URL (does not
- reconnect)"""
+ """Switch connection parameters to use a new URL.
+
+ Note:
+ Does not reconnect!
+ """
self.close()
self.declared_entities.clear()
self._closed = False
self._init_params(**dict(self._initial_params, **parse_url(url)))
def maybe_switch_next(self):
- """Switch to next URL given by the current failover strategy (if
- any)."""
+ """Switch to next URL given by the current failover strategy."""
if self.cycle:
self.switch(next(self.cycle))
@@ -268,7 +271,9 @@ class Connection(object):
return chan
def heartbeat_check(self, rate=2):
- """Allow the transport to perform any periodic tasks
+ """Check heartbeats.
+
+ Allow the transport to perform any periodic tasks
required to make heartbeats work. This should be called
approximately every second.
@@ -437,8 +442,9 @@ class Connection(object):
def ensure(self, obj, fun, errback=None, max_retries=None,
interval_start=1, interval_step=1, interval_max=1,
on_revive=None):
- """Ensure operation completes, regardless of any channel/connection
- errors occurring.
+ """Ensure operation completes.
+
+ Regardless of any channel/connection errors occurring.
Retries by establishing the connection, and reapplying
the function.
@@ -578,8 +584,7 @@ class Connection(object):
return transport_cls
def clone(self, **kwargs):
- """Create a copy of the connection with the same connection
- settings."""
+ """Create a copy of the connection with same settings."""
return self.__class__(**dict(self._info(resolve=False), **kwargs))
def get_heartbeat_interval(self):
@@ -696,20 +701,20 @@ class Connection(object):
return ChannelPool(self, limit, **kwargs)
def Producer(self, channel=None, *args, **kwargs):
- """Create new :class:`kombu.Producer` instance using this
- connection."""
+ """Create new :class:`kombu.Producer` instance."""
from .messaging import Producer
return Producer(channel or self, *args, **kwargs)
def Consumer(self, queues=None, channel=None, *args, **kwargs):
- """Create new :class:`kombu.Consumer` instance using this
- connection."""
+ """Create new :class:`kombu.Consumer` instance."""
from .messaging import Consumer
return Consumer(channel or self, queues, *args, **kwargs)
def SimpleQueue(self, name, no_ack=None, queue_opts=None,
exchange_opts=None, channel=None, **kwargs):
- """Create new :class:`~kombu.simple.SimpleQueue`, using a channel
+ """Simple persistent queue API.
+
+ Create new :class:`~kombu.simple.SimpleQueue`, using a channel
from this connection.
If ``name`` is a string, a queue and exchange will be automatically
@@ -733,7 +738,9 @@ class Connection(object):
def SimpleBuffer(self, name, no_ack=None, queue_opts=None,
exchange_opts=None, channel=None, **kwargs):
- """Create new :class:`~kombu.simple.SimpleQueue` using a channel
+ """Simple ephemeral queue API.
+
+ Create new :class:`~kombu.simple.SimpleQueue` using a channel
from this connection.
See Also:
@@ -756,11 +763,9 @@ class Connection(object):
return exchange_type in self.transport.implements.exchange_type
def __repr__(self):
- """``x.__repr__() <==> repr(x)``"""
return '<Connection: {0} at {1:#x}>'.format(self.as_uri(), id(self))
def __copy__(self):
- """``x.__copy__() <==> copy(x)``"""
return self.clone()
def __reduce__(self):
@@ -801,8 +806,9 @@ class Connection(object):
@property
def default_channel(self):
- """Default channel, created upon access and closed when the connection
- is closed.
+ """Default channel.
+
+ Created upon access and closed when the connection is closed.
Note:
Can be used for automatic channel handling when you only need one
@@ -829,8 +835,13 @@ class Connection(object):
@cached_property
def manager(self):
- """Experimental manager that can be used to manage/monitor the broker
- instance. Not available for all transports."""
+ """AMQP Management API.
+
+ Experimental manager that can be used to manage/monitor the broker
+ instance.
+
+ Not available for all transports.
+ """
return self.transport.manager
def get_manager(self, *args, **kwargs):
@@ -838,8 +849,11 @@ class Connection(object):
@cached_property
def recoverable_connection_errors(self):
- """List of connection related exceptions that can be recovered from,
- but where the connection must be closed and re-established first."""
+ """Recoverable connection errors.
+
+ List of connection related exceptions that can be recovered from,
+ but where the connection must be closed and re-established first.
+ """
try:
return self.transport.recoverable_connection_errors
except AttributeError:
@@ -851,8 +865,11 @@ class Connection(object):
@cached_property
def recoverable_channel_errors(self):
- """List of channel related exceptions that can be automatically
- recovered from without re-establishing the connection."""
+ """Recoverable channel errors.
+
+ List of channel related exceptions that can be automatically
+ recovered from without re-establishing the connection.
+ """
try:
return self.transport.recoverable_channel_errors
except AttributeError:
@@ -879,6 +896,8 @@ BrokerConnection = Connection
class ConnectionPool(Resource):
+ """Pool of connections."""
+
LimitExceeded = exceptions.ConnectionLimitExceeded
close_after_fork = True
@@ -921,6 +940,8 @@ class ConnectionPool(Resource):
class ChannelPool(Resource):
+ """Pool of channels."""
+
LimitExceeded = exceptions.ChannelLimitExceeded
def __init__(self, connection, limit=None, **kwargs):
@@ -944,8 +965,11 @@ class ChannelPool(Resource):
def maybe_channel(channel):
- """Return the default channel if argument is a connection instance,
- otherwise just return the channel given."""
+ """Get channel from object.
+
+ Return the default channel if argument is a connection instance,
+ otherwise just return the channel given.
+ """
if is_connection(channel):
return channel.default_channel
return channel
diff --git a/kombu/entity.py b/kombu/entity.py
index 9c09f8b5..f84c2fe9 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -31,6 +31,7 @@ def pretty_bindings(bindings):
def maybe_delivery_mode(
v, modes=DELIVERY_MODES, default=PERSISTENT_DELIVERY_MODE):
+ """Get delivery mode by name (or none if undefined)."""
if v:
return v if isinstance(v, numbers.Integral) else modes[v]
return default
@@ -129,6 +130,7 @@ class Exchange(MaybeChannelBound):
no_declare (bool): Never declare this exchange
(:meth:`declare` does nothing).
"""
+
TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE
PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE
@@ -185,7 +187,7 @@ class Exchange(MaybeChannelBound):
def bind_to(self, exchange='', routing_key='',
arguments=None, nowait=False, **kwargs):
- """Binds the exchange to another exchange.
+ """Bind the exchange to another exchange.
Arguments:
nowait (bool): If set the server will not respond, and the call
@@ -460,6 +462,7 @@ class Queue(MaybeChannelBound):
no_declare (bool): Never declare this queue, nor related
entities (:meth:`declare` does nothing).
"""
+
ContentDisallowed = ContentDisallowed
name = ''
@@ -522,8 +525,7 @@ class Queue(MaybeChannelBound):
self.exchange = self.exchange(self.channel)
def declare(self, nowait=False):
- """Declares the queue, the exchange and binds the queue to
- the exchange."""
+ """Declare queue and exchange then binds queue to exchange."""
if not self.no_declare:
# - declare main binding.
self._create_exchange(nowait=nowait)
@@ -718,7 +720,7 @@ class Queue(MaybeChannelBound):
return not self.auto_delete
@classmethod
- def from_dict(self, queue, **options):
+ def from_dict(cls, queue, **options):
binding_key = options.get('binding_key') or options.get('routing_key')
e_durable = options.get('exchange_durable')
diff --git a/kombu/exceptions.py b/kombu/exceptions.py
index 05b94700..139f1cfa 100644
--- a/kombu/exceptions.py
+++ b/kombu/exceptions.py
@@ -1,7 +1,7 @@
"""Exceptions."""
from __future__ import absolute_import, unicode_literals
-from socket import timeout as TimeoutError
+from socket import timeout as TimeoutError # noqa
from amqp import ChannelError, ConnectionError, ResourceError
@@ -14,17 +14,16 @@ __all__ = [
'ChannelLimitExceeded', 'ConnectionError', 'ChannelError',
'VersionMismatch', 'SerializerNotInstalled', 'ResourceError',
'SerializationError', 'EncodeError', 'DecodeError', 'HttpError',
+ 'InconsistencyError',
]
class KombuError(Exception):
"""Common subclass for all Kombu exceptions."""
- pass
class OperationalError(KombuError):
"""Recoverable message transport connection error."""
- pass
class SerializationError(KombuError):
@@ -33,7 +32,6 @@ class SerializationError(KombuError):
class EncodeError(SerializationError):
"""Cannot encode object."""
- pass
class DecodeError(SerializationError):
@@ -42,51 +40,46 @@ class DecodeError(SerializationError):
class NotBoundError(KombuError):
"""Trying to call channel dependent method on unbound entity."""
- pass
class MessageStateError(KombuError):
"""The message has already been acknowledged."""
- pass
class LimitExceeded(KombuError):
"""Limit exceeded."""
- pass
class ConnectionLimitExceeded(LimitExceeded):
"""Maximum number of simultaneous connections exceeded."""
- pass
class ChannelLimitExceeded(LimitExceeded):
"""Maximum number of simultaneous channels exceeded."""
- pass
class VersionMismatch(KombuError):
- pass
+ """Library dependency version mismatch."""
class SerializerNotInstalled(KombuError):
- """Support for the requested serialization type is not installed"""
- pass
+ """Support for the requested serialization type is not installed."""
class ContentDisallowed(SerializerNotInstalled):
"""Consumer does not allow this content-type."""
- pass
class InconsistencyError(ConnectionError):
- """Data or environment has been found to be inconsistent,
- depending on the cause it may be possible to retry the operation."""
- pass
+ """Data or environment has been found to be inconsistent.
+
+ Depending on the cause it may be possible to retry the operation.
+ """
@python_2_unicode_compatible
class HttpError(Exception):
+ """HTTP Client Error."""
def __init__(self, code, message=None, response=None):
self.code = code
diff --git a/kombu/log.py b/kombu/log.py
index 7428e5e1..569cb09b 100644
--- a/kombu/log.py
+++ b/kombu/log.py
@@ -1,3 +1,4 @@
+"""Logging Utilities."""
from __future__ import absolute_import, unicode_literals
import logging
@@ -25,6 +26,7 @@ DISABLE_TRACEBACKS = os.environ.get('DISABLE_TRACEBACKS')
def get_logger(logger):
+ """Get logger by name."""
if isinstance(logger, string_t):
logger = logging.getLogger(logger)
if not logger.handlers:
@@ -33,6 +35,7 @@ def get_logger(logger):
def get_loglevel(level):
+ """Get loglevel by name."""
if isinstance(level, string_t):
return LOG_LEVELS[level]
return level
@@ -53,6 +56,7 @@ def safeify_format(fmt, args,
class LogMixin(object):
+ """Mixin that adds severity methods to any class."""
def debug(self, *args, **kwargs):
return self.log(logging.DEBUG, *args, **kwargs)
@@ -128,6 +132,7 @@ class Log(LogMixin):
def setup_logging(loglevel=None, logfile=None):
+ """Setup logging."""
logger = logging.getLogger()
loglevel = get_loglevel(loglevel or 'ERROR')
logfile = logfile if logfile else sys.__stderr__
diff --git a/kombu/message.py b/kombu/message.py
index 5a8afc19..a7c6ca9e 100644
--- a/kombu/message.py
+++ b/kombu/message.py
@@ -9,6 +9,8 @@ from .five import python_2_unicode_compatible, reraise, text_t
from .serialization import loads
from .utils.functional import dictfilter
+__all__ = ['Message']
+
ACK_STATES = {'ACK', 'REJECTED', 'REQUEUED'}
IS_PYPY = hasattr(sys, 'pypy_version_info')
@@ -68,7 +70,7 @@ class Message(object):
callback(self, exc)
def ack(self, multiple=False):
- """Acknowledge this message as being processed.,
+ """Acknowledge this message as being processed.
This will remove the message from the queue.
@@ -140,8 +142,9 @@ class Message(object):
self._state = 'REQUEUED'
def decode(self):
- """Deserialize the message body, returning the original
- python structure sent by the publisher.
+ """Deserialize the message body.
+
+ Returning the original python structure sent by the publisher.
Note:
The return value is memoized, use `_decode` to force
diff --git a/kombu/messaging.py b/kombu/messaging.py
index ee2ab7e0..f85c4fe3 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -98,8 +98,7 @@ class Producer(object):
self.exchange.declare()
def maybe_declare(self, entity, retry=False, **retry_policy):
- """Declare the exchange if it hasn't already been declared
- during this session."""
+ """Declare exchange if not already declared during this session."""
if entity:
return maybe_declare(entity, self.channel, retry, **retry_policy)
@@ -289,6 +288,7 @@ class Consumer(object):
on_decode_error (Callable): see :attr:`on_decode_error`.
prefetch_count (int): see :attr:`prefetch_count`.
"""
+
ContentDisallowed = ContentDisallowed
#: The connection/channel to use for this consumer.
@@ -496,8 +496,7 @@ class Consumer(object):
self._queues.pop(qname, None)
def consuming_from(self, queue):
- """Return :const:`True` if the consumer is currently
- consuming from queue'."""
+ """Return :const:`True` if currently consuming from queue'."""
name = queue
if isinstance(queue, Queue):
name = queue.name
diff --git a/kombu/mixins.py b/kombu/mixins.py
index 112ce9f4..310d0c63 100644
--- a/kombu/mixins.py
+++ b/kombu/mixins.py
@@ -1,11 +1,5 @@
# -*- coding: utf-8 -*-
-"""
-kombu.mixins
-============
-
-Useful mixin classes.
-
-"""
+"""Mixins."""
from __future__ import absolute_import, unicode_literals
import socket
@@ -254,7 +248,9 @@ class ConsumerMixin(object):
class ConsumerProducerMixin(ConsumerMixin):
- """Version of ConsumerMixin having separate connection for also
+ """Consumer and Producer mixin.
+
+ Version of ConsumerMixin having separate connection for also
publishing messages.
Example:
@@ -280,6 +276,7 @@ class ConsumerProducerMixin(ConsumerMixin):
retry=True,
)
"""
+
_producer_connection = None
def on_consume_end(self, connection, channel):
diff --git a/kombu/pidbox.py b/kombu/pidbox.py
index 99485ca8..433db15a 100644
--- a/kombu/pidbox.py
+++ b/kombu/pidbox.py
@@ -37,6 +37,7 @@ debug, error = logger.debug, logger.error
class Node(object):
+ """Mailbox node."""
#: hostname of the node.
hostname = None
@@ -137,6 +138,8 @@ class Node(object):
class Mailbox(object):
+ """Process Mailbox."""
+
node_cls = Node
exchange_fmt = '%s.pidbox'
reply_exchange_fmt = 'reply.%s.pidbox'
diff --git a/kombu/pools.py b/kombu/pools.py
index ed5fc521..b007b47d 100644
--- a/kombu/pools.py
+++ b/kombu/pools.py
@@ -25,6 +25,8 @@ def _after_fork_cleanup_group(group):
class ProducerPool(Resource):
+ """Pool of :class:`kombu.Producer` instances."""
+
Producer = Producer
close_after_fork = True
@@ -75,6 +77,7 @@ class ProducerPool(Resource):
class PoolGroup(EqualityDict):
+ """Collection of resource pools."""
def __init__(self, limit=None, close_after_fork=True):
self.limit = limit
@@ -94,11 +97,13 @@ class PoolGroup(EqualityDict):
def register_group(group):
+ """Register group (can be used as decorator)."""
_groups.append(group)
return group
class Connections(PoolGroup):
+ """Collection of connection pools."""
def create(self, connection, limit):
return connection.Pool(limit=limit)
@@ -106,6 +111,7 @@ connections = register_group(Connections(limit=use_global_limit))
class Producers(PoolGroup):
+ """Collection of producer pools."""
def create(self, connection, limit):
return ProducerPool(connections[connection], limit=limit)
@@ -117,10 +123,12 @@ def _all_pools():
def get_limit():
+ """Get current connection pool limit."""
return _limit[0]
def set_limit(limit, force=False, reset_after=False, ignore_errors=False):
+ """Set new connection pool limit."""
limit = limit or 0
glimit = _limit[0] or 0
if limit != glimit:
@@ -131,6 +139,7 @@ def set_limit(limit, force=False, reset_after=False, ignore_errors=False):
def reset(*args, **kwargs):
+ """Reset all pools by closing open resources."""
for pool in _all_pools():
try:
pool.force_close_all()
diff --git a/kombu/resource.py b/kombu/resource.py
index 11e44366..866ff933 100644
--- a/kombu/resource.py
+++ b/kombu/resource.py
@@ -19,12 +19,15 @@ def _after_fork_cleanup_resource(resource):
class LifoQueue(_LifoQueue):
+ """Last in first out version of Queue."""
def _init(self, maxsize):
self.queue = deque()
class Resource(object):
+ """Pool of resources."""
+
LimitExceeded = exceptions.LimitExceeded
close_after_fork = False
@@ -114,8 +117,10 @@ class Resource(object):
pass
def replace(self, resource):
- """Replace resource with a new instance. This can be used in case
- of defective resources."""
+ """Replace existing resource with a new instance.
+
+ This can be used in case of defective resources.
+ """
if self.limit:
self._dirty.discard(resource)
self.close_resource(resource)
diff --git a/kombu/serialization.py b/kombu/serialization.py
index 78233ad6..a9c73d1a 100644
--- a/kombu/serialization.py
+++ b/kombu/serialization.py
@@ -159,7 +159,9 @@ class SerializerRegistry(object):
'No encoder installed for {0}'.format(name))
def dumps(self, data, serializer=None):
- """Serialize a data structure into a string suitable for sending
+ """Encode data.
+
+ Serialize a data structure into a string suitable for sending
as an AMQP message body.
Arguments:
@@ -222,7 +224,9 @@ class SerializerRegistry(object):
def loads(self, data, content_type, content_encoding,
accept=None, force=False, _trusted_content=TRUSTED_CONTENT):
- """Deserialize a data stream as serialized using `dumps`
+ """Decode serialized data.
+
+ Deserialize a data stream as serialized using `dumps`
based on `content_type`.
Arguments:
@@ -321,8 +325,11 @@ def register_yaml():
except ImportError:
def not_available(*args, **kwargs):
- """In case a client receives a yaml message, but yaml
- isn't installed."""
+ """Raise SerializerNotInstalled.
+
+ Used in case a client receives a yaml message, but yaml
+ isn't installed.
+ """
raise SerializerNotInstalled(
'No decoder installed for YAML. Install the PyYAML library')
registry.register('yaml', None, not_available, 'application/x-yaml')
@@ -338,9 +345,11 @@ else:
def register_pickle():
- """The fastest serialization method, but restricts
- you to python clients."""
+ """Register pickle serializer.
+ The fastest serialization method, but restricts
+ you to python clients.
+ """
def pickle_dumps(obj, dumper=pickle.dumps):
return dumper(obj, protocol=pickle_protocol)
@@ -350,7 +359,11 @@ def register_pickle():
def register_msgpack():
- """See http://msgpack.sourceforge.net/"""
+ """Register msgpack serializer.
+
+ See Also:
+ http://msgpack.sourceforge.net/.
+ """
pack = unpack = None
try:
import msgpack
diff --git a/kombu/simple.py b/kombu/simple.py
index 3f17a778..9da31c9b 100644
--- a/kombu/simple.py
+++ b/kombu/simple.py
@@ -84,7 +84,7 @@ class SimpleBase(object):
self._consuming = True
def __len__(self):
- """`len(self) -> self.qsize()`"""
+ """`len(self) -> self.qsize()`."""
return self.qsize()
def __bool__(self):
@@ -93,6 +93,8 @@ class SimpleBase(object):
class SimpleQueue(SimpleBase):
+ """Simple API for persistent queues."""
+
no_ack = False
queue_opts = {}
exchange_opts = {'type': 'direct'}
@@ -123,6 +125,8 @@ class SimpleQueue(SimpleBase):
class SimpleBuffer(SimpleQueue):
+ """Simple API for ephemeral queues."""
+
no_ack = True
queue_opts = dict(durable=False,
auto_delete=True)
diff --git a/kombu/transport/SLMQ.py b/kombu/transport/SLMQ.py
index 689046f7..3b0689b3 100644
--- a/kombu/transport/SLMQ.py
+++ b/kombu/transport/SLMQ.py
@@ -26,6 +26,8 @@ CHARS_REPLACE_TABLE = {
class Channel(virtual.Channel):
+ """SLMQ Channel."""
+
default_visibility_timeout = 1800 # 30 minutes.
domain_format = 'kombu%(vhost)s'
_slmq = None
@@ -59,7 +61,7 @@ class Channel(virtual.Channel):
return text_t(safe_str(name)).translate(table)
def _new_queue(self, queue, **kwargs):
- """Ensures a queue exists in SLQS."""
+ """Ensure a queue exists in SLQS."""
queue = self.entity_name(self.queue_name_prefix + queue)
try:
return self._queue_cache[queue]
@@ -73,7 +75,7 @@ class Channel(virtual.Channel):
return q
def _delete(self, queue, *args, **kwargs):
- """delete queue by name."""
+ """Delete queue by name."""
queue_name = self.entity_name(queue)
self._queue_cache.pop(queue_name, None)
self.slmq.queue(queue_name).delete(force=True)
@@ -169,6 +171,8 @@ class Channel(virtual.Channel):
class Transport(virtual.Transport):
+ """SLMQ Transport."""
+
Channel = Channel
polling_interval = 1
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index 0f1c65fb..5b8e5726 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -71,6 +71,7 @@ SQS_MAX_MESSAGES = 10
def maybe_int(x):
+ """Try to convert x' to int, or return x' if that fails."""
try:
return int(x)
except ValueError:
@@ -78,6 +79,8 @@ def maybe_int(x):
class Channel(virtual.Channel):
+ """SQS Channel."""
+
default_region = 'us-east-1'
default_visibility_timeout = 1800 # 30 minutes.
default_wait_time_seconds = 10 # disabled see #198
@@ -454,6 +457,8 @@ class Channel(virtual.Channel):
class Transport(virtual.Transport):
+ """SQS Transport."""
+
Channel = Channel
polling_interval = 1
diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py
index 2bfc9f5d..7fb689ed 100644
--- a/kombu/transport/__init__.py
+++ b/kombu/transport/__init__.py
@@ -7,6 +7,7 @@ from kombu.utils.imports import symbol_by_name
def supports_librabbitmq():
+ """Return true if :pypi:`librabbitmq` can be used."""
if _detect_environment() == 'default':
try:
import librabbitmq # noqa
@@ -40,6 +41,13 @@ _transport_cache = {}
def resolve_transport(transport=None):
+ """Get transport by name.
+
+ Arguments:
+ transport (Union[str, type]): This can be either
+ an actual transport class, or the fully qualified
+ path to a transport class, or the alias of a transport.
+ """
if isinstance(transport, string_t):
try:
transport = TRANSPORT_ALIASES[transport]
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index 4e2d6a9d..6d381b83 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -20,6 +20,8 @@ def _LeftBlank(obj, method):
class StdChannel(object):
+ """Standard channel base class."""
+
no_ack_consumers = None
def Consumer(self, *args, **kwargs):
@@ -34,8 +36,12 @@ class StdChannel(object):
raise _LeftBlank(self, 'get_bindings')
def after_reply_message_received(self, queue):
- """reply queue semantics: can be used to delete the queue
- after transient reply message received."""
+ """Callback called after RPC reply received.
+
+ Notes:
+ Reply queue semantics: can be used to delete the queue
+ after transient reply message received.
+ """
pass
def __enter__(self):
@@ -46,6 +52,7 @@ class StdChannel(object):
class Management(object):
+ """AMQP Management API (incomplete)."""
def __init__(self, transport):
self.transport = transport
@@ -55,6 +62,7 @@ class Management(object):
class Implements(dict):
+ """Helper class used to define transport features."""
def __getattr__(self, key):
try:
@@ -78,6 +86,7 @@ default_transport_capabilities = Implements(
class Transport(object):
"""Base class for transports."""
+
Management = Management
#: The :class:`~kombu.Connection` owning this instance.
diff --git a/kombu/transport/consul.py b/kombu/transport/consul.py
index 2598e473..26dca7ac 100644
--- a/kombu/transport/consul.py
+++ b/kombu/transport/consul.py
@@ -32,7 +32,7 @@ DEFAULT_HOST = 'localhost'
class LockError(Exception):
- pass
+ """An error occurred while trying to acquire the lock."""
class Channel(virtual.Channel):
@@ -65,7 +65,9 @@ class Channel(virtual.Channel):
return '{0}/{1}'.format(self.prefix, queue)
def _get_or_create_session(self, queue):
- """Try to renew the session if it exists, otherwise create a new
+ """Get or create consul session.
+
+ Try to renew the session if it exists, otherwise create a new
session in Consul.
This session is used to acquire a lock inside Consul so that we achieve
@@ -77,7 +79,6 @@ class Channel(virtual.Channel):
Returns:
str: The ID of the session.
"""
-
try:
session_id = self.queues[queue]['session_id']
except KeyError:
@@ -141,8 +142,9 @@ class Channel(virtual.Channel):
raise raising()
def _release_lock(self, queue):
- """Try to release a lock. It does so by simply removing the lock key
- in Consul.
+ """Try to release a lock.
+
+ It does so by simply removing the lock key in Consul.
Arguments:
queue (str): The name of the queue we want to release
@@ -152,8 +154,9 @@ class Channel(virtual.Channel):
self.client.kv.delete(key=self._lock_key(queue))
def _destroy_session(self, queue):
- """Destroy a previously created Consul session and
- release all locks it still might hold.
+ """Destroy a previously created Consul session.
+
+ Will release all locks it still might hold.
Arguments:
queue (str): The name of the Queue.
diff --git a/kombu/transport/etcd.py b/kombu/transport/etcd.py
index afdd7763..cc7cfc21 100644
--- a/kombu/transport/etcd.py
+++ b/kombu/transport/etcd.py
@@ -33,9 +33,7 @@ DEFAULT_HOST = 'localhost'
class Channel(virtual.Channel):
- """
- Etcd Channel class which talks to the Etcd.
- """
+ """Etcd Channel class which talks to the Etcd."""
prefix = 'kombu'
index = None
@@ -44,9 +42,6 @@ class Channel(virtual.Channel):
lock_ttl = 10
def __init__(self, *args, **kwargs):
- """
- Creates a new instance of the etcd.Channel.
- """
if etcd is None:
raise ImportError('Missing python-etcd library')
@@ -62,8 +57,7 @@ class Channel(virtual.Channel):
self.client = etcd.Client(host=host, port=int(port))
def _key_prefix(self, queue):
- """
- Creates and returns the `queue` with the proper prefix.
+ """Create and return the `queue` with the proper prefix.
Arguments:
queue (str): The name of the queue.
@@ -94,8 +88,7 @@ class Channel(virtual.Channel):
lock.release()
def _new_queue(self, queue, **_):
- """
- Creates a new `queue` if the `queue` doesn't already exist.
+ """Create a new `queue` if the `queue` doesn't already exist.
Arguments:
queue (str): The name of the queue.
@@ -110,8 +103,7 @@ class Channel(virtual.Channel):
return self.client.read(key=self._key_prefix(queue))
def _has_queue(self, queue, **kwargs):
- """
- Verify that queue exists.
+ """Verify that queue exists.
Returns:
bool: Should return :const:`True` if the queue exists
@@ -124,8 +116,7 @@ class Channel(virtual.Channel):
return False
def _delete(self, queue, *args, **_):
- """
- Deletes a `queue`.
+ """Delete a `queue`.
Arguments:
queue (str): The name of the queue.
@@ -186,8 +177,7 @@ class Channel(virtual.Channel):
raise Empty()
def _purge(self, queue):
- """
- Removes all `message`s from a `queue`.
+ """Remove all `message`s from a `queue`.
Arguments:
queue (str): The name of the queue.
@@ -198,8 +188,7 @@ class Channel(virtual.Channel):
return self.client.delete(key=key, recursive=True)
def _size(self, queue):
- """
- Returns the size of the `queue`.
+ """Return the size of the `queue`.
Arguments:
queue (str): The name of the queue.
@@ -227,9 +216,7 @@ class Channel(virtual.Channel):
class Transport(virtual.Transport):
- """
- Etcd storage Transport for Kombu.
- """
+ """Etcd storage Transport for Kombu."""
Channel = Channel
@@ -242,9 +229,7 @@ class Transport(virtual.Transport):
exchange_type=frozenset(['direct']))
def __init__(self, *args, **kwargs):
- """
- Creates a new instance of etcd.Transport.
- """
+ """Create a new instance of etcd.Transport."""
if etcd is None:
raise ImportError('Missing python-etcd library')
@@ -259,9 +244,7 @@ class Transport(virtual.Transport):
)
def verify_connection(self, connection):
- """
- Verifies the connection works.
- """
+ """Verify the connection works."""
port = connection.client.port or self.default_port
host = connection.client.hostname or DEFAULT_HOST
@@ -276,8 +259,7 @@ class Transport(virtual.Transport):
return False
def driver_version(self):
- """
- Returns the version of the etcd library.
+ """Return the version of the etcd library.
.. note::
diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py
index 1bace4cf..b27dfdb8 100644
--- a/kombu/transport/filesystem.py
+++ b/kombu/transport/filesystem.py
@@ -1,4 +1,4 @@
-"""File-system Transport
+"""File-system Transport.
Transport using the file-system as the message store.
"""
@@ -34,10 +34,12 @@ if os.name == 'nt':
__overlapped = pywintypes.OVERLAPPED()
def lock(file, flags):
+ """Create file lock."""
hfile = win32file._get_osfhandle(file.fileno())
win32file.LockFileEx(hfile, flags, 0, 0xffff0000, __overlapped)
def unlock(file):
+ """Remove file lock."""
hfile = win32file._get_osfhandle(file.fileno())
win32file.UnlockFileEx(hfile, 0, 0xffff0000, __overlapped)
@@ -47,9 +49,11 @@ elif os.name == 'posix':
from fcntl import LOCK_EX, LOCK_SH, LOCK_NB # noqa
def lock(file, flags): # noqa
+ """Create file lock."""
fcntl.flock(file.fileno(), flags)
def unlock(file): # noqa
+ """Remove file lock."""
fcntl.flock(file.fileno(), fcntl.LOCK_UN)
else:
raise RuntimeError(
@@ -57,10 +61,10 @@ else:
class Channel(virtual.Channel):
+ """Filesystem Channel."""
def _put(self, queue, payload, **kwargs):
"""Put `message` onto `queue`."""
-
filename = '%s_%s.%s.msg' % (int(round(monotonic() * 1000)),
uuid.uuid4(), queue)
filename = os.path.join(self.data_folder_out, filename)
@@ -78,7 +82,6 @@ class Channel(virtual.Channel):
def _get(self, queue):
"""Get next message from `queue`."""
-
queue_find = '.' + queue + '.msg'
folder = os.listdir(self.data_folder_in)
folder = sorted(folder)
@@ -180,6 +183,8 @@ class Channel(virtual.Channel):
class Transport(virtual.Transport):
+ """Filesystem Transport."""
+
Channel = Channel
default_port = 0
diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py
index 00e37340..93de2ed0 100644
--- a/kombu/transport/librabbitmq.py
+++ b/kombu/transport/librabbitmq.py
@@ -30,6 +30,7 @@ ssl not supported by librabbitmq, please use pyamqp:// or stunnel\
class Message(base.Message):
+ """AMQP Message (librabbitmq)."""
def __init__(self, channel, props, info, body):
super(Message, self).__init__(
@@ -44,6 +45,8 @@ class Message(base.Message):
class Channel(amqp.Channel, base.StdChannel):
+ """AMQP Channel (librabbitmq)."""
+
Message = Message
def prepare_message(self, body, priority=None,
@@ -59,11 +62,15 @@ class Channel(amqp.Channel, base.StdChannel):
class Connection(amqp.Connection):
+ """AMQP Connection (librabbitmq)."""
+
Channel = Channel
Message = Message
class Transport(base.Transport):
+ """AMQP Transport (librabbitmq)."""
+
Connection = Connection
default_port = DEFAULT_PORT
diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py
index 419212f3..0674e288 100644
--- a/kombu/transport/memory.py
+++ b/kombu/transport/memory.py
@@ -8,6 +8,8 @@ from . import virtual
class Channel(virtual.Channel):
+ """In-memory Channel."""
+
queues = {}
do_restore = False
supports_fanout = True
@@ -60,6 +62,8 @@ class Channel(virtual.Channel):
class Transport(virtual.Transport):
+ """In-memory Transport."""
+
Channel = Channel
#: memory backend state is global.
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 7620a2e6..692cb189 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -78,6 +78,8 @@ class BroadcastCursor(object):
class Channel(virtual.Channel):
+ """MongoDB Channel."""
+
supports_fanout = True
# Mutable container. Shared by all class instances
@@ -318,8 +320,7 @@ class Channel(virtual.Channel):
capped=True)
def _ensure_indexes(self, database):
- """Ensure indexes on collections.
- """
+ """Ensure indexes on collections."""
messages = database[self.messages_collection]
messages.ensure_index(
[('queue', 1), ('priority', 1), ('_id', 1)], background=True,
@@ -395,8 +396,11 @@ class Channel(virtual.Channel):
return ret
def _get_expire(self, queue, argument):
- """Gets expiration header named `argument` of queue definition.
- `queue` must be either queue name or options itself."""
+ """Get expiration header named `argument` of queue definition.
+
+ Note:
+ `queue` must be either queue name or options itself.
+ """
if isinstance(queue, basestring):
doc = self.queues.find_one({'_id': queue})
@@ -415,7 +419,7 @@ class Channel(virtual.Channel):
return self.get_now() + datetime.timedelta(milliseconds=value)
def _update_queues_expire(self, queue):
- """Updates expiration field on queues documents."""
+ """Update expiration field on queues documents."""
expire_at = self._get_expire(queue, 'x-expires')
if not expire_at:
@@ -434,6 +438,8 @@ class Channel(virtual.Channel):
class Transport(virtual.Transport):
+ """MongoDB Transport."""
+
Channel = Channel
can_parse_url = True
diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py
index ec510de2..bc424653 100644
--- a/kombu/transport/pyamqp.py
+++ b/kombu/transport/pyamqp.py
@@ -14,6 +14,7 @@ DEFAULT_SSL_PORT = 5671
class Message(base.Message):
+ """AMQP Message."""
def __init__(self, channel, msg, **kwargs):
props = msg.properties
@@ -30,12 +31,14 @@ class Message(base.Message):
class Channel(amqp.Channel, base.StdChannel):
+ """AMQP Channel."""
+
Message = Message
def prepare_message(self, body, priority=None,
content_type=None, content_encoding=None,
headers=None, properties=None, _Message=amqp.Message):
- """Prepares message so that it can be sent using this transport."""
+ """Prepare message so that it can be sent using this transport."""
return _Message(
body,
priority=priority,
@@ -51,10 +54,14 @@ class Channel(amqp.Channel, base.StdChannel):
class Connection(amqp.Connection):
+ """AMQP Connection."""
+
Channel = Channel
class Transport(base.Transport):
+ """AMQP Transport."""
+
Connection = Connection
default_port = DEFAULT_PORT
@@ -159,6 +166,7 @@ class Transport(base.Transport):
class SSLTransport(Transport):
+ """AMQP SSL Transport."""
def __init__(self, *args, **kwargs):
super(SSLTransport, self).__init__(*args, **kwargs)
diff --git a/kombu/transport/pyro.py b/kombu/transport/pyro.py
index 8fb15ad6..67656aa4 100644
--- a/kombu/transport/pyro.py
+++ b/kombu/transport/pyro.py
@@ -24,6 +24,7 @@ Unable to locate pyro nameserver {0.virtual_host} on host {0.hostname}\
class Channel(virtual.Channel):
+ """Pyro Channel."""
def queues(self):
return self.shared_queues.get_queue_names()
@@ -64,6 +65,8 @@ class Channel(virtual.Channel):
class Transport(virtual.Transport):
+ """Pyro Transport."""
+
Channel = Channel
#: memory backend state is global.
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index 9afa8474..a79c7e73 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -1,6 +1,4 @@
-"""
-kombu.transport.qpid
-=======================
+"""Qpid Transport.
`Qpid`_ transport using `qpid-python`_ as the client and `qpid-tools`_ for
broker management.
@@ -77,7 +75,6 @@ The :attr:`~kombu.Connection.transport_options` argument to the
options override and replace any other default or specified values. If using
Celery, this can be accomplished by setting the
*BROKER_TRANSPORT_OPTIONS* Celery option.
-
"""
from __future__ import absolute_import, unicode_literals
@@ -138,8 +135,10 @@ PY3 = sys.version_info[0] == 3
def dependency_is_none(dependency):
- """Return True if the dependency is None, otherwise False. This is done
- using a function so that tests can mock this behavior easily.
+ """Return True if the dependency is None, otherwise False.
+
+ This is done using a function so that tests can mock this
+ behavior easily.
:param dependency: The module to check if it is None
:return: True if dependency is None otherwise False.
@@ -149,7 +148,7 @@ def dependency_is_none(dependency):
class AuthenticationFailure(Exception):
- pass
+ """Cannot authenticate with Qpid."""
class QoS(object):
@@ -186,8 +185,7 @@ class QoS(object):
self._not_yet_acked = OrderedDict()
def can_consume(self):
- """Return True if the :class:`~kombu.transport.qpid.Channel` can
- consume more messages, else False.
+ """Return True if the :class:`Channel` can consume more messages.
Used to ensure the client adheres to currently active prefetch
limits.
@@ -204,8 +202,7 @@ class QoS(object):
)
def can_consume_max_estimate(self):
- """Return the remaining message capacity for the associated
- :class:`kombu.transport.qpid.Channel`.
+ """Return the remaining message capacity.
Returns an estimated number of outstanding messages that a
:class:`kombu.transport.qpid.Channel` can accept without
@@ -238,8 +235,9 @@ class QoS(object):
self._not_yet_acked[delivery_tag] = message
def get(self, delivery_tag):
- """Get an un-ACKed message by delivery_tag. If called with an invalid
- delivery_tag a :exc:`KeyError` is raised.
+ """Get an un-ACKed message by delivery_tag.
+
+ If called with an invalid delivery_tag a :exc:`KeyError` is raised.
:param delivery_tag: The delivery tag associated with the message
to be returned.
@@ -431,7 +429,7 @@ class Channel(base.StdChannel):
return message
def _put(self, routing_key, message, exchange=None, **kwargs):
- """Synchronous send of a single message onto a queue or exchange.
+ """Synchronously send a single message onto a queue or exchange.
An internal method which synchronously sends a single message onto
a given queue or exchange. If exchange is not specified,
@@ -732,7 +730,7 @@ class Channel(base.StdChannel):
raise exc
def exchange_delete(self, exchange_name, **kwargs):
- """Delete an exchange specified by name
+ """Delete an exchange specified by name.
:param exchange_name: The name of the exchange to be deleted.
:type exchange_name: str
@@ -1209,7 +1207,9 @@ class Channel(base.StdChannel):
class Connection(object):
- """Encapsulate a connection object for the
+ """Qpid Connection.
+
+ Encapsulate a connection object for the
:class:`~kombu.transport.qpid.Transport`.
:param host: The host that connections should connect to.
@@ -1331,7 +1331,7 @@ class Connection(object):
return self._qpid_conn
def close(self):
- """Close the connection
+ """Close the connection.
Closing the connection will close all associated session, senders, or
receivers used by the Connection.
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 7e157704..4a2d62ab 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -69,6 +69,7 @@ Probably the key ({1!r}) has been removed from the Redis database.
def get_redis_error_classes():
+ """Return tuple of redis error classes."""
from redis import exceptions
# This exception suddenly changed name between redis-py versions
if hasattr(exceptions, 'InvalidData'):
@@ -92,16 +93,18 @@ def get_redis_error_classes():
def get_redis_ConnectionError():
+ """Return the redis ConnectionError exception class."""
from redis import exceptions
return exceptions.ConnectionError
class MutexHeld(Exception):
- pass
+ """Raised when another party holds the lock."""
@contextmanager
def Mutex(client, name, expire):
+ """The Redis lock implementation (probably shaky)."""
lock_id = uuid()
i_won = client.setnx(name, lock_id)
try:
@@ -131,6 +134,8 @@ def _after_fork_cleanup_channel(channel):
class QoS(virtual.QoS):
+ """Redis Ack Emulation."""
+
restore_at_shutdown = True
def __init__(self, *args, **kwargs):
@@ -223,6 +228,8 @@ class QoS(virtual.QoS):
class MultiChannelPoller(object):
+ """Async I/O poller for Redis transport."""
+
eventflags = READ | ERR
#: Set by :meth:`get` while reading from the socket.
@@ -285,7 +292,7 @@ class MultiChannelPoller(object):
(channel, client, cmd) in self._chan_to_sock)
def _register_BRPOP(self, channel):
- """enable BRPOP mode for channel."""
+ """Enable BRPOP mode for channel."""
ident = channel, channel.client, 'BRPOP'
if not self._client_registered(channel, channel.client, 'BRPOP'):
channel._in_poll = False
@@ -294,7 +301,7 @@ class MultiChannelPoller(object):
channel._brpop_start()
def _register_LISTEN(self, channel):
- """enable LISTEN mode for channel."""
+ """Enable LISTEN mode for channel."""
if not self._client_registered(channel, channel.subclient, 'LISTEN'):
channel._in_listen = False
self._register(channel, channel.subclient, 'LISTEN')
@@ -372,6 +379,8 @@ class MultiChannelPoller(object):
class Channel(virtual.Channel):
+ """Redis Channel."""
+
QoS = QoS
_client = None
@@ -979,6 +988,8 @@ class Channel(virtual.Channel):
class Transport(virtual.Transport):
+ """Redis Transport."""
+
Channel = Channel
polling_interval = None # disable sleep between unsuccessful polls.
@@ -1083,5 +1094,7 @@ class SentinelChannel(Channel):
class SentinelTransport(Transport):
+ """Redis Sentinel Transport."""
+
default_port = 26379
Channel = SentinelChannel
diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py
index 372a32e9..916775c8 100644
--- a/kombu/transport/virtual/base.py
+++ b/kombu/transport/virtual/base.py
@@ -62,6 +62,7 @@ queue_binding_t = namedtuple('queue_binding_t', (
class Base64(object):
+ """Base64 codec."""
def encode(self, s):
return bytes_to_str(base64.b64encode(str_to_bytes(s)))
@@ -72,15 +73,14 @@ class Base64(object):
class NotEquivalentError(Exception):
"""Entity declaration is not equivalent to the previous declaration."""
- pass
class UndeliverableWarning(UserWarning):
"""The message could not be delivered to a queue."""
- pass
class BrokerState(object):
+ """Broker state holds exchanges, queues and bindings."""
#: Mapping of exchange name to
#: :class:`kombu.transport.virtual.exchange.ExchangeType`
@@ -198,7 +198,7 @@ class QoS(object):
return not pcount or len(self._delivered) - len(self._dirty) < pcount
def can_consume_max_estimate(self):
- """Returns the maximum number of messages allowed to be returned.
+ """Return the maximum number of messages allowed to be returned.
Returns an estimated number of messages that a consumer may be allowed
to consume at once from the broker. This is used for services where
@@ -264,7 +264,7 @@ class QoS(object):
return errors
def restore_unacked_once(self, stderr=None):
- """Restores all unacknowledged messages at shutdown/gc collect.
+ """Restore all unacknowledged messages at shutdown/gc collect.
Note:
Can only be called once for each instance, subsequent
@@ -295,8 +295,9 @@ class QoS(object):
state.restored = True
def restore_visible(self, *args, **kwargs):
- """Restore any pending unackwnowledged messages for visibility_timeout
- style implementations.
+ """Restore any pending unackwnowledged messages.
+
+ To be filled in for visibility_timeout style implementations.
Note:
This is implementation optional, and currently only
@@ -306,6 +307,7 @@ class QoS(object):
class Message(base.Message):
+ """Message object."""
def __init__(self, channel, payload, **kwargs):
self._raw = payload
@@ -342,7 +344,9 @@ class Message(base.Message):
class AbstractChannel(object):
- """This is an abstract class defining the channel methods
+ """Abstract channel interface.
+
+ This is an abstract class defining the channel methods
you'd usually want to implement in a virtual channel.
Note:
@@ -409,6 +413,7 @@ class Channel(AbstractChannel, base.StdChannel):
connection (ConnectionT): The transport instance this
channel is part of.
"""
+
#: message class used.
Message = Message
@@ -617,7 +622,7 @@ class Channel(AbstractChannel, base.StdChannel):
)
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
- """Consume from `queue`"""
+ """Consume from `queue`."""
self._tag_to_queue[consumer_tag] = queue
self._active_queues.append(queue)
@@ -769,8 +774,10 @@ class Channel(AbstractChannel, base.StdChannel):
raise NotImplementedError('virtual channels do not support flow.')
def close(self):
- """Close channel, cancel all consumers, and requeue unacked
- messages."""
+ """Close channel.
+
+ Cancel all consumers, and requeue unacked messages.
+ """
if not self.closed:
self.closed = True
for consumer in list(self._consumers):
@@ -823,8 +830,9 @@ class Channel(AbstractChannel, base.StdChannel):
return self._cycle
def _get_message_priority(self, message, reverse=False):
- """Get priority from message and limit the value within a
- boundary of 0 to 9.
+ """Get priority from message.
+
+ The value is limited to within a boundary of 0 to 9.
Note:
Higher value has more priority.
@@ -842,6 +850,7 @@ class Channel(AbstractChannel, base.StdChannel):
class Management(base.Management):
+ """Base class for the AMQP management API."""
def __init__(self, transport):
super(Management, self).__init__(transport)
@@ -861,6 +870,7 @@ class Transport(base.Transport):
Arguments:
client (kombu.Connection): The client this is a transport for.
"""
+
Channel = Channel
Cycle = FairCycle
Management = Management
diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py
index c61d7147..b690d186 100644
--- a/kombu/transport/virtual/exchange.py
+++ b/kombu/transport/virtual/exchange.py
@@ -11,11 +11,14 @@ from kombu.utils.text import escape_regex
class ExchangeType(object):
- """Implements the specifics for an exchange type.
+ """Base class for exchanges.
+
+ Implements the specifics for an exchange type.
Arguments:
channel (ChannelT): AMQ Channel.
"""
+
type = None
def __init__(self, channel):
@@ -48,7 +51,11 @@ class ExchangeType(object):
class DirectExchange(ExchangeType):
- """The `direct` exchange routes based on exact routing keys."""
+ """Direct exchange.
+
+ The `direct` exchange routes based on exact routing keys.
+ """
+
type = 'direct'
def lookup(self, table, exchange, routing_key, default):
@@ -65,9 +72,13 @@ class DirectExchange(ExchangeType):
class TopicExchange(ExchangeType):
- """The `topic` exchange routes messages based on words separated by
+ """Topic exchange.
+
+ The `topic` exchange routes messages based on words separated by
dots, using wildcard characters ``*`` (any single word), and ``#``
- (one or more words)."""
+ (one or more words).
+ """
+
type = 'topic'
#: map of wildcard to regex conversions
@@ -102,8 +113,11 @@ class TopicExchange(ExchangeType):
))
def _match(self, pattern, string):
- """Same as :func:`re.match`, except the regex is compiled and cached,
- then reused on subsequent matches with the same pattern."""
+ """Match regular expression (cached).
+
+ Same as :func:`re.match`, except the regex is compiled and cached,
+ then reused on subsequent matches with the same pattern.
+ """
try:
compiled = self._compiled[pattern]
except KeyError:
@@ -112,7 +126,9 @@ class TopicExchange(ExchangeType):
class FanoutExchange(ExchangeType):
- """The `fanout` exchange implements broadcast messaging by delivering
+ """Fanout exchange.
+
+ The `fanout` exchange implements broadcast messaging by delivering
copies of all messages to all queues bound to the exchange.
To support fanout the virtual channel needs to store the table
@@ -123,6 +139,7 @@ class FanoutExchange(ExchangeType):
See Also:
the redis backend for an example implementation of these methods.
"""
+
type = 'fanout'
def lookup(self, table, exchange, routing_key, default):
diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py
index d6844f67..70ff1743 100644
--- a/kombu/transport/zookeeper.py
+++ b/kombu/transport/zookeeper.py
@@ -76,6 +76,7 @@ __author__ = 'Mahendra M <mahendra.m@gmail.com>'
class Channel(virtual.Channel):
+ """Zookeeper Channel."""
_client = None
_queues = {}
@@ -173,6 +174,8 @@ class Channel(virtual.Channel):
class Transport(virtual.Transport):
+ """Zookeeper Transport."""
+
Channel = Channel
polling_interval = 1
default_port = DEFAULT_PORT
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index 6cf0a286..f06b5504 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -1,4 +1,4 @@
-"""DEPRECATED - Import from modules below"""
+"""DEPRECATED - Import from modules below."""
from __future__ import absolute_import, print_function, unicode_literals
from .collections import EqualityDict
diff --git a/kombu/utils/amq_manager.py b/kombu/utils/amq_manager.py
index f0c8418f..1b893635 100644
--- a/kombu/utils/amq_manager.py
+++ b/kombu/utils/amq_manager.py
@@ -1,8 +1,10 @@
+"""AMQP Management API utilities."""
from __future__ import absolute_import, unicode_literals
def get_manager(client, hostname=None, port=None, userid=None,
password=None):
+ """Get pyrabbit manager."""
import pyrabbit
opt = client.transport_options.get
diff --git a/kombu/utils/collections.py b/kombu/utils/collections.py
index 7d8fd1ee..105cc47d 100644
--- a/kombu/utils/collections.py
+++ b/kombu/utils/collections.py
@@ -3,8 +3,12 @@ from __future__ import absolute_import, unicode_literals
class HashedSeq(list):
- """type used for hash() to make sure the hash is not generated
- multiple times."""
+ """Hashed Sequence.
+
+ Type used for hash() to make sure the hash is not generated
+ multiple times.
+ """
+
__slots__ = 'hashvalue'
def __init__(self, *seq):
@@ -16,6 +20,7 @@ class HashedSeq(list):
def eqhash(o):
+ """Call ``obj.__eqhash__``."""
try:
return o.__eqhash__()
except AttributeError:
@@ -23,6 +28,7 @@ def eqhash(o):
class EqualityDict(dict):
+ """Dict using the eq operator for keying."""
def __getitem__(self, key):
h = eqhash(key)
diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py
index acd438b8..80903b6d 100644
--- a/kombu/utils/compat.py
+++ b/kombu/utils/compat.py
@@ -1,3 +1,4 @@
+"""Python Compatibility Utilities."""
from __future__ import absolute_import, unicode_literals
import numbers
@@ -24,10 +25,19 @@ except ImportError: # pragma: no cover
except ImportError:
register_after_fork = None # noqa
+try:
+ from typing import NamedTuple
+except ImportError:
+ import collections
+
+ def NamedTuple(name, fields):
+ """Typed version of collections.namedtuple."""
+ return collections.namedtuple(name, [k for k, _ in fields])
+
_environment = None
def coro(gen):
-
+ """Decorator to mark generator as co-routine."""
@wraps(gen)
def wind_up(*args, **kwargs):
it = gen(*args, **kwargs)
@@ -63,12 +73,14 @@ def _detect_environment():
def detect_environment():
+ """Detect the current environment: default, eventlet, or gevent."""
global _environment
if _environment is None:
_environment = _detect_environment()
return _environment
def entrypoints(namespace):
+ """Return setuptools entrypoints for namespace."""
try:
from pkg_resources import iter_entry_points
except ImportError:
@@ -77,6 +89,7 @@ def entrypoints(namespace):
def fileno(f):
+ """Get fileno from file-like object."""
if isinstance(f, numbers.Integral):
return f
return f.fileno()
@@ -92,9 +105,8 @@ def maybe_fileno(f):
@contextmanager
def nested(*managers): # pragma: no cover
+ """Nest context managers."""
# flake8: noqa
- """Combine multiple context managers into a single nested
- context manager."""
exits = []
vars = []
exc = (None, None, None)
diff --git a/kombu/utils/debug.py b/kombu/utils/debug.py
index d5118a6f..f52f8f39 100644
--- a/kombu/utils/debug.py
+++ b/kombu/utils/debug.py
@@ -13,6 +13,7 @@ __all__ = ['setup_logging', 'Logwrapped']
def setup_logging(loglevel=logging.DEBUG, loggers=['kombu.connection',
'kombu.channel']):
+ """Setup logging to stdout."""
for logger in loggers:
l = get_logger(logger)
l.addHandler(logging.StreamHandler())
@@ -21,6 +22,8 @@ def setup_logging(loglevel=logging.DEBUG, loggers=['kombu.connection',
@python_2_unicode_compatible
class Logwrapped(object):
+ """Wrap all object methods, to log on call."""
+
__ignore = ('__enter__', '__exit__')
def __init__(self, instance, logger=None, ident=None):
diff --git a/kombu/utils/div.py b/kombu/utils/div.py
index d73afe07..db52048c 100644
--- a/kombu/utils/div.py
+++ b/kombu/utils/div.py
@@ -1,3 +1,4 @@
+"""Div. Utilities."""
from __future__ import absolute_import, unicode_literals, print_function
from .encoding import default_encode
@@ -6,6 +7,7 @@ import sys
def emergency_dump_state(state, open_file=open, dump=None, stderr=None):
+ """Dump message state to stdout or file."""
from pprint import pformat
from tempfile import mktemp
stderr = sys.stderr if stderr is None else stderr
diff --git a/kombu/utils/encoding.py b/kombu/utils/encoding.py
index 7e1023fd..21b6d903 100644
--- a/kombu/utils/encoding.py
+++ b/kombu/utils/encoding.py
@@ -21,45 +21,54 @@ default_encoding_file = None
def set_default_encoding_file(file):
+ """Set file used to get codec information."""
global default_encoding_file
default_encoding_file = file
def get_default_encoding_file():
+ """Get file used to get codec information."""
return default_encoding_file
if sys.platform.startswith('java'): # pragma: no cover
def default_encoding(file=None):
+ """Get default encoding."""
return 'utf-8'
else:
def default_encoding(file=None): # noqa
+ """Get default encoding."""
file = file or get_default_encoding_file()
return getattr(file, 'encoding', None) or sys.getfilesystemencoding()
if is_py3k: # pragma: no cover
def str_to_bytes(s):
+ """Convert str to bytes."""
if isinstance(s, str):
return s.encode()
return s
def bytes_to_str(s):
+ """Convert bytes to str."""
if isinstance(s, bytes):
return s.decode()
return s
def from_utf8(s, *args, **kwargs):
+ """Get str from utf-8 encoding."""
return s
def ensure_bytes(s):
+ """Ensure s is bytes, not str."""
if not isinstance(s, bytes):
return str_to_bytes(s)
return s
def default_encode(obj):
+ """Encode using default encoding."""
return obj
str_t = str
@@ -67,17 +76,21 @@ if is_py3k: # pragma: no cover
else:
def str_to_bytes(s): # noqa
+ """Convert str to bytes."""
if isinstance(s, unicode):
return s.encode()
return s
def bytes_to_str(s): # noqa
+ """Convert bytes to str."""
return s
def from_utf8(s, *args, **kwargs): # noqa
+ """Convert utf-8 to ASCII."""
return s.encode('utf-8', *args, **kwargs)
def default_encode(obj, file=None): # noqa
+ """Get default encoding."""
return unicode(obj, default_encoding(file))
str_t = unicode
@@ -91,6 +104,7 @@ except NameError: # pragma: no cover
def safe_str(s, errors='replace'):
+ """Safe form of str(), void of unicode errors."""
s = bytes_to_str(s)
if not isinstance(s, (text_t, bytes)):
return safe_repr(s, errors)
@@ -120,6 +134,7 @@ else:
def safe_repr(o, errors='replace'):
+ """Safe form of repr, void of Unicode errors."""
try:
return repr(o)
except Exception:
diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py
index f5454c5c..523d8d67 100644
--- a/kombu/utils/eventio.py
+++ b/kombu/utils/eventio.py
@@ -1,10 +1,4 @@
-"""
-kombu.utils.eventio
-===================
-
-Evented IO support for multiple platforms.
-
-"""
+"""Selector Utilities."""
from __future__ import absolute_import, unicode_literals
import errno
@@ -332,4 +326,5 @@ def _get_poller():
def poll(*args, **kwargs):
+ """Create new poller instance."""
return _get_poller()(*args, **kwargs)
diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py
index 114ecb95..55f39512 100644
--- a/kombu/utils/functional.py
+++ b/kombu/utils/functional.py
@@ -1,3 +1,4 @@
+"""Functional Utilities."""
from __future__ import absolute_import, unicode_literals
import random
@@ -146,7 +147,7 @@ class LRUCache(UserDict):
def memoize(maxsize=None, keyfun=None, Cache=LRUCache):
-
+ """Decorator to cache function return value."""
def _memoize(fun):
mutex = threading.Lock()
cache = Cache(limit=maxsize)
@@ -233,15 +234,18 @@ class lazy(object):
def maybe_evaluate(value):
- """Evaluates if the value is a :class:`lazy` instance."""
+ """Evaluate value only if value is a :class:`lazy` instance."""
if isinstance(value, lazy):
return value.evaluate()
return value
def is_list(l, scalars=(Mapping, string_t), iters=(Iterable,)):
- """Return true if the object is iterable (but not
- if object is a mapping or string)."""
+ """Return true if the object is iterable.
+
+ Note:
+ Returns false if object is a mapping or string.
+ """
return isinstance(l, iters) and not isinstance(l, scalars or ())
@@ -251,7 +255,7 @@ def maybe_list(l, scalars=(Mapping, string_t)):
def dictfilter(d=None, **kw):
- """Remove all keys from dict ``d`` whose value is :const:`None`"""
+ """Remove all keys from dict ``d`` whose value is :const:`None`."""
d = kw if d is None else (dict(d, **kw) if kw else d)
return {k: v for k, v in items(d) if v is not None}
diff --git a/kombu/utils/json.py b/kombu/utils/json.py
index 40518af6..c9801705 100644
--- a/kombu/utils/json.py
+++ b/kombu/utils/json.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
+"""JSON Serialization Utilities."""
from __future__ import absolute_import, unicode_literals
import datetime
@@ -13,7 +14,7 @@ try:
from django.utils.functional import Promise as DjangoPromise
except ImportError: # pragma: no cover
class DjangoPromise(object): # noqa
- pass
+ """Dummy object."""
try:
import simplejson as json
@@ -33,6 +34,7 @@ _encoder_cls = type(json._default_encoder)
class JSONEncoder(_encoder_cls):
+ """Kombu custom json encoder."""
def default(self, o,
dates=(datetime.datetime, datetime.date),
@@ -61,10 +63,12 @@ class JSONEncoder(_encoder_cls):
def dumps(s, _dumps=json.dumps, cls=JSONEncoder,
default_kwargs=_json_extra_kwargs, **kwargs):
+ """Serialize object to json string."""
return _dumps(s, cls=cls, **dict(default_kwargs, **kwargs))
def loads(s, _loads=json.loads, decode_bytes=IS_PY3):
+ """Deserialize json from string."""
# None of the json implementations supports decoding from
# a buffer/memoryview, or even reading from a stream
# (load is just loads(fp.read()))
diff --git a/kombu/utils/limits.py b/kombu/utils/limits.py
index 13f52a12..65c574ba 100644
--- a/kombu/utils/limits.py
+++ b/kombu/utils/limits.py
@@ -65,8 +65,7 @@ class TokenBucket(object):
return False
def expected_time(self, tokens=1):
- """Get the current exepected time for when a new token is to be
- available.
+ """Return estimated time of token availability.
Returns:
float: the time in seconds.
diff --git a/kombu/utils/objects.py b/kombu/utils/objects.py
index b1f09e96..dced5c5e 100644
--- a/kombu/utils/objects.py
+++ b/kombu/utils/objects.py
@@ -1,9 +1,11 @@
+"""Object Utilities."""
from __future__ import absolute_import, unicode_literals
class cached_property(object):
- """Property descriptor that caches the return value
- of the get function.
+ """Cached property descriptor.
+
+ Caches the return value of the get method on first call.
Examples:
.. code-block:: python
diff --git a/kombu/utils/scheduling.py b/kombu/utils/scheduling.py
index e5833fa1..d75710d1 100644
--- a/kombu/utils/scheduling.py
+++ b/kombu/utils/scheduling.py
@@ -1,4 +1,4 @@
-"""Consumer scheduling utilities."""
+"""Scheduling Utilities."""
from __future__ import absolute_import, unicode_literals
from itertools import count
@@ -20,8 +20,16 @@ CYCLE_ALIASES = {
@python_2_unicode_compatible
class FairCycle(object):
- """Consume from a set of resources, where each resource gets
- an equal chance to be consumed from."""
+ """Cycle between resources.
+
+ Consume from a set of resources, where each resource gets
+ an equal chance to be consumed from.
+
+ Arguments:
+ fun (Callable): Callback to call.
+ resources (Sequence[Any]): List of resources.
+ predicate (type): Exception predicate.
+ """
def __init__(self, fun, resources, predicate=Exception):
self.fun = fun
@@ -41,6 +49,7 @@ class FairCycle(object):
raise self.predicate()
def get(self, callback, **kwargs):
+ """Get from next resource."""
succeeded = 0
for tried in count(0): # for infinity
resource = self._next()
@@ -55,22 +64,27 @@ class FairCycle(object):
succeeded += 1
def close(self):
+ """Close cycle."""
pass
def __repr__(self):
+ """``repr(cycle)``."""
return '<FairCycle: {self.pos}/{size} {self.resources}>'.format(
self=self, size=len(self.resources))
class round_robin_cycle(object):
+ """Iterator that cycles between items in round-robin."""
def __init__(self, it=None):
self.items = it if it is not None else []
def update(self, it):
+ """Update items from iterable."""
self.items[:] = it
def consume(self, n):
+ """Consume n items."""
return self.items[:n]
def rotate(self, last_used):
@@ -84,16 +98,21 @@ class round_robin_cycle(object):
class priority_cycle(round_robin_cycle):
+ """Cycle that repeats items in order."""
def rotate(self, last_used):
+ """Unused in this implementation."""
pass
class sorted_cycle(priority_cycle):
+ """Cycle in sorted order."""
def consume(self, n):
+ """Consume n items."""
return sorted(self.items[:n])
def cycle_by_name(name):
+ """Get cycle class by name."""
return symbol_by_name(name, CYCLE_ALIASES)
diff --git a/kombu/utils/text.py b/kombu/utils/text.py
index 717b7ecc..cd9feedc 100644
--- a/kombu/utils/text.py
+++ b/kombu/utils/text.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
+"""Text Utilities."""
from __future__ import absolute_import, unicode_literals
from difflib import SequenceMatcher
@@ -8,6 +9,8 @@ from kombu.five import string_t
def escape_regex(p, white=''):
+ # type: (str, str) -> str
+ """Escape string for use within a regular expression."""
# what's up with re.escape? that code must be neglected or someting
return ''.join(c if c.isalnum() or c in white
else ('\\000' if c == '\000' else '\\' + c)
@@ -15,6 +18,12 @@ def escape_regex(p, white=''):
def fmatch_iter(needle, haystack, min_ratio=0.6):
+ # type: (str, Sequence[str], float) -> Iterator[Tuple[float, str]]
+ """Fuzzy match: iteratively.
+
+ Yields:
+ Tuple: of ratio and key.
+ """
for key in haystack:
ratio = SequenceMatcher(None, needle, key).ratio()
if ratio >= min_ratio:
@@ -22,6 +31,8 @@ def fmatch_iter(needle, haystack, min_ratio=0.6):
def fmatch_best(needle, haystack, min_ratio=0.6):
+ # type: (str, Sequence[str], float) -> str
+ """Fuzzy match - Find best match (scalar)."""
try:
return sorted(
fmatch_iter(needle, haystack, min_ratio), reverse=True,
@@ -31,6 +42,8 @@ def fmatch_best(needle, haystack, min_ratio=0.6):
def version_string_as_tuple(s):
+ # type (str) -> version_info_t
+ """Convert version string to version info tuple."""
v = _unpack_version(*s.split('.'))
# X.Y.3a1 -> (X, Y, 3, 'a1')
if isinstance(v.micro, string_t):
@@ -42,10 +55,12 @@ def version_string_as_tuple(s):
def _unpack_version(major, minor=0, micro=0, releaselevel='', serial=''):
+ # type: (int, int, int, str, str) -> version_info_t
return version_info_t(int(major), int(minor), micro, releaselevel, serial)
def _splitmicro(micro, releaselevel='', serial=''):
+ # type: (int, str, str) -> Tuple[int, str, str]
for index, char in enumerate(micro):
if not char.isdigit():
break
diff --git a/kombu/utils/url.py b/kombu/utils/url.py
index 48cad7e4..7842bc28 100644
--- a/kombu/utils/url.py
+++ b/kombu/utils/url.py
@@ -1,5 +1,7 @@
+"""URL Utilities."""
from __future__ import absolute_import, unicode_literals
+from collections import Mapping
from functools import partial
try:
@@ -10,54 +12,82 @@ except ImportError:
from kombu.five import string_t
+from .compat import NamedTuple
+
safequote = partial(quote, safe='')
-def _parse_url(url):
- scheme = urlparse(url).scheme
- schemeless = url[len(scheme) + 3:]
- # parse with HTTP URL semantics
- parts = urlparse('http://' + schemeless)
- path = parts.path or ''
- path = path[1:] if path and path[0] == '/' else path
- return (scheme, unquote(parts.hostname or '') or None, parts.port,
- unquote(parts.username or '') or None,
- unquote(parts.password or '') or None,
- unquote(path or '') or None,
- dict(parse_qsl(parts.query)))
+urlparts = NamedTuple('urlparts', [
+ ('scheme', str),
+ ('hostname', str),
+ ('port', int),
+ ('username', str),
+ ('password', str),
+ ('path', str),
+ ('query', Mapping),
+])
def parse_url(url):
+ # type: (str) -> Dict
+ """Parse URL into mapping of components."""
scheme, host, port, user, password, path, query = _parse_url(url)
return dict(transport=scheme, hostname=host,
port=port, userid=user,
password=password, virtual_host=path, **query)
+def url_to_parts(url):
+ # type: (str) -> urlparts
+ """Parse URL into :class:`urlparts` tuple of components."""
+ scheme = urlparse(url).scheme
+ schemeless = url[len(scheme) + 3:]
+ # parse with HTTP URL semantics
+ parts = urlparse('http://' + schemeless)
+ path = parts.path or ''
+ path = path[1:] if path and path[0] == '/' else path
+ return urlparts(
+ scheme,
+ unquote(parts.hostname or '') or None,
+ parts.port,
+ unquote(parts.username or '') or None,
+ unquote(parts.password or '') or None,
+ unquote(path or '') or None,
+ dict(parse_qsl(parts.query)),
+ )
+_parse_url = url_to_parts # noqa
+
+
def as_url(scheme, host=None, port=None, user=None, password=None,
path=None, query=None, sanitize=False, mask='**'):
- parts = ['{0}://'.format(scheme)]
- if user or password:
- if user:
- parts.append(safequote(user))
- if password:
- if sanitize:
- parts.extend([':', mask] if mask else [':'])
- else:
- parts.extend([':', safequote(password)])
- parts.append('@')
- parts.append(safequote(host) if host else '')
- if port:
- parts.extend([':', port])
- parts.extend(['/', path])
- return ''.join(str(part) for part in parts if part)
+ # type: (str, str, int, str, str, str, str, bool, str) -> str
+ """Generate URL from component parts."""
+ parts = ['{0}://'.format(scheme)]
+ if user or password:
+ if user:
+ parts.append(safequote(user))
+ if password:
+ if sanitize:
+ parts.extend([':', mask] if mask else [':'])
+ else:
+ parts.extend([':', safequote(password)])
+ parts.append('@')
+ parts.append(safequote(host) if host else '')
+ if port:
+ parts.extend([':', port])
+ parts.extend(['/', path])
+ return ''.join(str(part) for part in parts if part)
def sanitize_url(url, mask='**'):
+ # type: (str, str) -> str
+ """Return copy of URL with password removed."""
return as_url(*_parse_url(url), sanitize=True, mask=mask)
def maybe_sanitize_url(url, mask='**'):
+ # type: (Any, str) -> Any
+ """Sanitize url, or do nothing if url undefined."""
if isinstance(url, string_t) and '://' in url:
return sanitize_url(url, mask)
return url
diff --git a/kombu/utils/uuid.py b/kombu/utils/uuid.py
index 286f07fb..341475bf 100644
--- a/kombu/utils/uuid.py
+++ b/kombu/utils/uuid.py
@@ -1,11 +1,11 @@
+"""UUID utilities."""
from __future__ import absolute_import, unicode_literals
from uuid import uuid4
def uuid(_uuid=uuid4):
- """Generate a unique id, having - hopefully - a very small chance of
- collision.
+ """Generate unique id in UUID4 format.
See Also:
For now this is provided by :func:`uuid.uuid4`.
diff --git a/setup.cfg b/setup.cfg
index 02321d2f..b4362123 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -7,6 +7,11 @@ source-dir = docs/
build-dir = docs/_build
all_files = 1
+[flake8]
+# classes can be lowercase, arguments and variables can be uppercase
+# whenever it makes the code more readable.
+ignore = N806, N802, N801, N803
+
[bdist_rpm]
requires = amqp >= 1.4.5
diff --git a/tox.ini b/tox.ini
index 4beb3809..f57a4ee2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,15 @@
[tox]
-envlist = 2.7,pypy,3.4,3.5,pypy3,flake8,flakeplus,apicheck,cov
+envlist =
+ 2.7
+ pypy
+ 3.4
+ 3.5
+ pypy3
+ flake8
+ flakeplus
+ apicheck
+ pydocstyle
+ cov
[testenv]
sitepackages = False
@@ -12,13 +22,13 @@ deps=
2.7,pypy,jython,cov: -r{toxinidir}/requirements/test-ci-py2.txt
apicheck,linkcheck: -r{toxinidir}/requirements/docs.txt
- flake8,flakeplus: -r{toxinidir}/requirements/pkgutils.txt
+ flake8,flakeplus,pydocstyle: -r{toxinidir}/requirements/pkgutils.txt
commands = pip install -U -r{toxinidir}/requirements/dev.txt
py.test -xv
basepython =
- 2.7,flakeplus,flake8,apicheck,linkcheck,cov: python2.7
+ 2.7,flakeplus,flake8,apicheck,linkcheck,cov,pydocstyle: python2.7
3.3: python3.3
3.4: python3.4
3.5: python3.5
@@ -44,3 +54,7 @@ commands =
[testenv:flakeplus]
commands =
flakeplus --2.7 {toxinidir}/kombu {toxinidir}/t
+
+[testenv:pydocstyle]
+commands =
+ pydocstyle --ignore=D102,D104,D203,D105 kombu