diff options
68 files changed, 595 insertions, 245 deletions
@@ -5,6 +5,7 @@ PYTEST=py.test GIT=git TOX=tox ICONV=iconv +PYDOCSTYLE=pydocstyle FLAKE8=flake8 FLAKEPLUS=flakeplus SPHINX2RST=sphinx2rst @@ -35,6 +36,7 @@ help: @echo " flakes -------- - Check code for syntax and style errors." @echo " flakecheck - Run flake8 on the source code." @echo " flakepluscheck - Run flakeplus on the source code." + @echo " pep257check - Run pep257 on the source code." @echo "readme - Regenerate README.rst file." @echo "contrib - Regenerate CONTRIBUTING.rst file" @echo "clean-dist --------- - Clean all distribution build artifacts." @@ -95,7 +97,10 @@ flakepluscheck: flakeplusdiag: -$(MAKE) flakepluscheck -flakes: flakediag flakeplusdiag +pep257check: + $(PYDOCSTYLE) --ignore=D102,D104,D203,D105 "$(PROJ)" + +flakes: flakediag flakeplusdiag pep257check clean-readme: -rm -f $(README) diff --git a/kombu/__init__.py b/kombu/__init__.py index ca6d2487..ab2789ba 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -1,4 +1,4 @@ -"""Messaging library for Python""" +"""Messaging library for Python.""" from __future__ import absolute_import, unicode_literals import os @@ -74,6 +74,7 @@ for module, items in all_by_module.items(): class module(ModuleType): + """Customized Python module.""" def __getattr__(self, name): if name in object_origins: diff --git a/kombu/abstract.py b/kombu/abstract.py index e120fbd6..b8c80c0b 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -20,8 +20,11 @@ def _any(v): class Object(object): - """Common base class supporting automatic kwargs->attributes handling, - and cloning.""" + """Common base class. + + Supports automatic kwargs->attributes handling, and cloning. + """ + attrs = () def __init__(self, *args, **kwargs): @@ -54,6 +57,7 @@ class Object(object): @python_2_unicode_compatible class MaybeChannelBound(Object): """Mixin for classes that can be bound to an AMQP channel.""" + _channel = None _is_bound = False @@ -61,7 +65,7 @@ class MaybeChannelBound(Object): can_cache_declaration = False def __call__(self, channel): - """`self(channel) -> self.bind(channel)`""" + """`self(channel) -> self.bind(channel)`.""" return self.bind(channel) def bind(self, channel): diff --git a/kombu/async/aws/__init__.py b/kombu/async/aws/__init__.py index 7872cdb9..0bdc37af 100644 --- a/kombu/async/aws/__init__.py +++ b/kombu/async/aws/__init__.py @@ -3,6 +3,7 @@ from __future__ import absolute_import, unicode_literals def connect_sqs(aws_access_key_id=None, aws_secret_access_key=None, **kwargs): + """Return async connection to Amazon SQS.""" from .sqs.connection import AsyncSQSConnection return AsyncSQSConnection( aws_access_key_id, aws_secret_access_key, **kwargs diff --git a/kombu/async/aws/connection.py b/kombu/async/aws/connection.py index 861f993c..cc907d7b 100644 --- a/kombu/async/aws/connection.py +++ b/kombu/async/aws/connection.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +"""Amazon AWS Connection.""" from __future__ import absolute_import, unicode_literals from io import BytesIO @@ -16,7 +17,7 @@ try: from urllib.parse import urlunsplit except ImportError: from urlparse import urlunsplit # noqa -from xml.sax import parseString as sax_parse +from xml.sax import parseString as sax_parse # noqa try: # pragma: no cover from email import message_from_file @@ -36,6 +37,7 @@ __all__ = [ @python_2_unicode_compatible class AsyncHTTPResponse(object): + """Async HTTP Response.""" def __init__(self, response): self.response = response @@ -77,6 +79,8 @@ class AsyncHTTPResponse(object): @python_2_unicode_compatible class AsyncHTTPConnection(object): + """Async HTTP Connection.""" + Request = Request Response = AsyncHTTPResponse @@ -154,10 +158,13 @@ class AsyncHTTPConnection(object): class AsyncHTTPSConnection(AsyncHTTPConnection): + """Async HTTPS Connection.""" + scheme = 'https' class AsyncConnection(object): + """Async AWS Connection.""" def __init__(self, http_client=None, **kwargs): if boto is None: @@ -193,6 +200,7 @@ class AsyncConnection(object): class AsyncAWSAuthConnection(AsyncConnection, AWSAuthConnection): + """Async AWS Authn Connection.""" def __init__(self, host, http_client=None, http_client_params={}, **kwargs): @@ -208,6 +216,7 @@ class AsyncAWSAuthConnection(AsyncConnection, AWSAuthConnection): class AsyncAWSQueryConnection(AsyncConnection, AWSQueryConnection): + """Async AWS Query Connection.""" def __init__(self, host, http_client=None, http_client_params={}, **kwargs): diff --git a/kombu/async/aws/ext.py b/kombu/async/aws/ext.py index df8fe862..b0e497cb 100644 --- a/kombu/async/aws/ext.py +++ b/kombu/async/aws/ext.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- - +"""Amazon boto interface.""" from __future__ import absolute_import, unicode_literals try: diff --git a/kombu/async/aws/sqs/__init__.py b/kombu/async/aws/sqs/__init__.py index 7491cb4c..fe529584 100644 --- a/kombu/async/aws/sqs/__init__.py +++ b/kombu/async/aws/sqs/__init__.py @@ -9,12 +9,14 @@ __all__ = ['regions', 'connect_to_region'] def regions(): + """Return list of known AWS regions.""" if boto is None: raise ImportError('boto is not installed') return get_regions('sqs', connection_cls=AsyncSQSConnection) def connect_to_region(region_name, **kwargs): + """Connect to specific AWS region.""" for region in regions(): if region.name == region_name: return region.connect(**kwargs) diff --git a/kombu/async/aws/sqs/connection.py b/kombu/async/aws/sqs/connection.py index 4b95698c..32b9926b 100644 --- a/kombu/async/aws/sqs/connection.py +++ b/kombu/async/aws/sqs/connection.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +"""Amazon SQS Connection.""" from __future__ import absolute_import, unicode_literals from vine import transform @@ -15,6 +16,7 @@ __all__ = ['AsyncSQSConnection'] class AsyncSQSConnection(AsyncAWSQueryConnection, SQSConnection): + """Async SQS Connection.""" def __init__(self, aws_access_key_id=None, aws_secret_access_key=None, is_secure=True, port=None, proxy=None, proxy_port=None, diff --git a/kombu/async/aws/sqs/ext.py b/kombu/async/aws/sqs/ext.py index 93ce7963..eb48f3e9 100644 --- a/kombu/async/aws/sqs/ext.py +++ b/kombu/async/aws/sqs/ext.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +"""Amazon SQS boto interface.""" from __future__ import absolute_import, unicode_literals diff --git a/kombu/async/aws/sqs/message.py b/kombu/async/aws/sqs/message.py index f9b4d6d3..36359841 100644 --- a/kombu/async/aws/sqs/message.py +++ b/kombu/async/aws/sqs/message.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +"""Amazon SQS message implementation.""" from __future__ import absolute_import, unicode_literals from .ext import ( @@ -12,6 +13,7 @@ __all__ = [ class BaseAsyncMessage(object): + """Base class for messages received on async client.""" def delete(self, callback=None): if self.queue: @@ -25,20 +27,20 @@ class BaseAsyncMessage(object): class AsyncRawMessage(BaseAsyncMessage, RawMessage): - pass + """Raw Message.""" class AsyncMessage(BaseAsyncMessage, Message): - pass + """Serialized message.""" class AsyncMHMessage(BaseAsyncMessage, MHMessage): - pass + """MHM Message (uhm, look that up later).""" class AsyncEncodedMHMessage(BaseAsyncMessage, EncodedMHMessage): - pass + """Encoded MH Message.""" class AsyncJSONMessage(BaseAsyncMessage, JSONMessage): - pass + """Json serialized message.""" diff --git a/kombu/async/aws/sqs/queue.py b/kombu/async/aws/sqs/queue.py index b5237c1d..140ff31a 100644 --- a/kombu/async/aws/sqs/queue.py +++ b/kombu/async/aws/sqs/queue.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +"""Amazon SQS queue implementation.""" from __future__ import absolute_import, unicode_literals from vine import transform @@ -10,10 +11,12 @@ _all__ = ['AsyncQueue'] def list_first(rs): + """Get the first item in a list, or None if list empty.""" return rs[0] if len(rs) == 1 else None class AsyncQueue(_Queue): + """Async SQS Queue.""" def __init__(self, connection=None, url=None, message_class=AsyncMessage): self.connection = connection diff --git a/kombu/async/debug.py b/kombu/async/debug.py index 6385d233..0e5ffbe0 100644 --- a/kombu/async/debug.py +++ b/kombu/async/debug.py @@ -7,6 +7,7 @@ from kombu.utils.functional import reprcall def repr_flag(flag): + """Return description of event loop flag.""" return '{0}{1}{2}'.format('R' if flag & READ else '', 'W' if flag & WRITE else '', '!' if flag & ERR else '') @@ -24,10 +25,12 @@ def _rcb(obj): def repr_active(h): + """Return description of active readers and writers.""" return ', '.join(repr_readers(h) + repr_writers(h)) def repr_events(h, events): + """Return description of events returned by poll.""" return ', '.join( '{0}({1})->{2}'.format( _rcb(callback_for(h, fd, fl, '(GONE)')), fd, @@ -38,16 +41,19 @@ def repr_events(h, events): def repr_readers(h): + """Return description of pending readers.""" return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(READ | ERR)) for fd, cb in items(h.readers)] def repr_writers(h): + """Return description of pending writers.""" return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(WRITE)) for fd, cb in items(h.writers)] def callback_for(h, fd, flag, *default): + """Return the callback used for hub+fd+flag.""" try: if flag & READ: return h.readers[fd] diff --git a/kombu/async/http/__init__.py b/kombu/async/http/__init__.py index 1401395e..b64c7bd8 100644 --- a/kombu/async/http/__init__.py +++ b/kombu/async/http/__init__.py @@ -8,11 +8,13 @@ __all__ = ['Client', 'Headers', 'Response', 'Request'] def Client(hub=None, **kwargs): + """Create new HTTP client.""" from .curl import CurlClient return CurlClient(hub, **kwargs) def get_client(hub=None, **kwargs): + """Get or create HTTP client bound to the current event loop.""" hub = hub or get_event_loop() try: return hub._current_http_client diff --git a/kombu/async/http/base.py b/kombu/async/http/base.py index 92c6bdf4..f8a8bc0a 100644 --- a/kombu/async/http/base.py +++ b/kombu/async/http/base.py @@ -1,3 +1,4 @@ +"""Base async HTTP client implementation.""" from __future__ import absolute_import, unicode_literals import sys @@ -26,6 +27,8 @@ def normalize_header(key): class Headers(dict): + """Represents a mapping of HTTP headers.""" + # TODO: This is just a regular dict and will not perform normalization # when looking up keys etc. @@ -176,8 +179,11 @@ class Response(object): self.error = HttpError(self.code, self.status, self) def raise_for_error(self): - """Raise :class:`~kombu.exceptions.HttpError` if the request resulted - in a HTTP error code.""" + """Raise if the request resulted in an HTTP error code. + + Raises: + :class:`~kombu.exceptions.HttpError` + """ if self.error: raise self.error diff --git a/kombu/async/http/curl.py b/kombu/async/http/curl.py index c9d8f215..5200493d 100644 --- a/kombu/async/http/curl.py +++ b/kombu/async/http/curl.py @@ -1,3 +1,4 @@ +"""HTTP Client using pyCurl.""" from __future__ import absolute_import, unicode_literals from collections import deque @@ -33,6 +34,8 @@ EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH']) class CurlClient(BaseClient): + """Curl HTTP Client.""" + Curl = Curl def __init__(self, hub=None, max_clients=10): diff --git a/kombu/async/hub.py b/kombu/async/hub.py index 5e3da78d..348e1f4a 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -6,7 +6,7 @@ import errno from contextlib import contextmanager from time import sleep -from types import GeneratorType as generator +from types import GeneratorType as generator # noqa from vine import Thenable, promise @@ -42,10 +42,12 @@ def _dummy_context(*args, **kwargs): def get_event_loop(): + """Get current event loop object.""" return _current_loop def set_event_loop(loop): + """Set the current event loop object.""" global _current_loop _current_loop = loop return loop diff --git a/kombu/async/semaphore.py b/kombu/async/semaphore.py index 66015dc3..af40cc03 100644 --- a/kombu/async/semaphore.py +++ b/kombu/async/semaphore.py @@ -43,8 +43,11 @@ class LaxBoundedSemaphore(object): self._pop_waiter = self._waiting.popleft def acquire(self, callback, *partial_args, **partial_kwargs): - """Acquire semaphore, applying ``callback`` if - the resource is available. + """Acquire semaphore. + + This will immediately apply ``callback`` if + the resource is available, otherwise the callback is suspended + until the semaphore is released. Arguments: callback (Callable): The callback to apply. diff --git a/kombu/async/timer.py b/kombu/async/timer.py index 2c7d91c7..55f7bd6e 100644 --- a/kombu/async/timer.py +++ b/kombu/async/timer.py @@ -33,6 +33,10 @@ scheduled = namedtuple('scheduled', ('eta', 'priority', 'entry')) def to_timestamp(d, default_timezone=utc, time=monotonic): + """Convert datetime to timestamp. + + If d' is already a timestamp, then that will be used. + """ if isinstance(d, datetime): if d.tzinfo is None: d = d.replace(tzinfo=default_timezone) @@ -44,6 +48,8 @@ def to_timestamp(d, default_timezone=utc, time=monotonic): @total_ordering @python_2_unicode_compatible class Entry(object): + """Schedule Entry.""" + if not IS_PYPY: # pragma: no cover __slots__ = ( 'fun', 'args', 'kwargs', 'tref', 'canceled', @@ -85,7 +91,8 @@ class Entry(object): class Timer(object): - """ETA scheduler.""" + """Async timer implementation.""" + Entry = Entry on_error = None @@ -171,9 +178,12 @@ class Timer(object): def __iter__(self, min=min, nowfun=monotonic, pop=heapq.heappop, push=heapq.heappush): - """This iterator yields a tuple of ``(entry, wait_seconds)``, + """Iterate over schedule. + + This iterator yields a tuple of ``(entry, wait_seconds)``, where if entry is :const:`None` the caller should wait - for ``wait_seconds`` until it polls the schedule again.""" + for ``wait_seconds`` until it polls the schedule again. + """ max_interval = self.max_interval queue = self._queue diff --git a/kombu/clocks.py b/kombu/clocks.py index a4610ac6..3de05c0f 100644 --- a/kombu/clocks.py +++ b/kombu/clocks.py @@ -24,6 +24,7 @@ class timetuple(tuple): id (str): Event host id (e.g. ``hostname:pid``). obj (Any): Optional obj to associate with this event. """ + __slots__ = () def __new__(cls, clock, timestamp, id, obj=None): @@ -99,6 +100,7 @@ class LamportClock(object): the time stamp of the incoming message. """ + #: The clocks current value. value = 0 @@ -117,7 +119,9 @@ class LamportClock(object): return self.value def sort_heap(self, h): - """List of tuples containing at least two elements, representing + """Sort heap of events. + + List of tuples containing at least two elements, representing an event, where the first element is the event's scalar clock value, and the second element is the id of the process (usually ``"hostname:pid"``): ``sh([(clock, processid, ...?), (...)])`` @@ -129,7 +133,6 @@ class LamportClock(object): present. Will return the latest event. - """ if h[0][0] == h[1][0]: same = [] diff --git a/kombu/common.py b/kombu/common.py index fa75cd9f..1005b405 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -58,7 +58,9 @@ def oid_from(instance): class Broadcast(Queue): - """Convenience class used to define broadcast queues. + """Broadcast queue. + + Convenience class used to define broadcast queues. Every queue instance will have a unique name, and both the queue and exchange is configured with auto deletion. @@ -71,6 +73,7 @@ class Broadcast(Queue): **kwargs (Any): See :class:`~kombu.Queue` for a list of additional keyword arguments supported. """ + attrs = Queue.attrs + (('queue', None),) def __init__(self, name=None, queue=None, auto_delete=True, @@ -92,6 +95,7 @@ def declaration_cached(entity, channel): def maybe_declare(entity, channel=None, retry=False, **retry_policy): + """Declare entity (cached).""" is_bound = entity.is_bound orig = entity @@ -136,6 +140,7 @@ def _imaybe_declare(entity, declared, ident, channel, def drain_consumer(consumer, limit=1, timeout=None, callbacks=None): + """Drain messages from consumer instance.""" acc = deque() def on_message(body, message): @@ -154,6 +159,7 @@ def drain_consumer(consumer, limit=1, timeout=None, callbacks=None): def itermessages(conn, channel, queue, limit=1, timeout=None, callbacks=None, **kwargs): + """Iterator over messages.""" return drain_consumer( conn.Consumer(queues=[queue], channel=channel, **kwargs), limit=limit, timeout=timeout, callbacks=callbacks, @@ -222,7 +228,7 @@ def send_reply(exchange, req, msg, def collect_replies(conn, channel, queue, *args, **kwargs): - """Generator collecting replies from ``queue``""" + """Generator collecting replies from ``queue``.""" no_ack = kwargs.setdefault('no_ack', True) received = False try: @@ -291,8 +297,11 @@ def revive_connection(connection, channel, on_revive=None): def insured(pool, fun, args, kwargs, errback=None, on_revive=None, **opts): - """Ensures function performing broker commands completes - despite intermittent connection failures.""" + """Function wrapper to handle connection errors. + + Ensures function performing broker commands completes + despite intermittent connection failures. + """ errback = errback or _ensure_errback with pool.acquire(block=True) as conn: @@ -347,6 +356,7 @@ class QoS(object): ... print('prefetch count now: %r' % (prefetch_count,)) >>> QoS(set_qos, 10) """ + prev = None def __init__(self, callback, initial_value): diff --git a/kombu/compat.py b/kombu/compat.py index 322b27ff..0ffdf6e1 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -1,4 +1,4 @@ -"""Carrot compatible interface +"""Carrot compatibility interface. See http://packages.python.org/pypi/carrot for documentation. """ @@ -25,6 +25,8 @@ def _iterconsume(connection, consumer, no_ack=False, limit=None): class Publisher(messaging.Producer): + """Carrot compatible producer.""" + exchange = '' exchange_type = 'direct' routing_key = '' @@ -74,6 +76,8 @@ class Publisher(messaging.Producer): class Consumer(messaging.Consumer): + """Carrot compatible consumer.""" + queue = '' exchange = '' routing_key = '' diff --git a/kombu/connection.py b/kombu/connection.py index 87efa1f1..3db99461 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -109,6 +109,7 @@ class Connection(object): :keyword virtual_host: Default virtual host if not provided in the URL. :keyword port: Default port if not provided in the URL. """ + port = None virtual_host = '/' connect_timeout = 5 @@ -212,16 +213,18 @@ class Connection(object): self.declared_entities = set() def switch(self, url): - """Switch connection parameters to use a new URL (does not - reconnect)""" + """Switch connection parameters to use a new URL. + + Note: + Does not reconnect! + """ self.close() self.declared_entities.clear() self._closed = False self._init_params(**dict(self._initial_params, **parse_url(url))) def maybe_switch_next(self): - """Switch to next URL given by the current failover strategy (if - any).""" + """Switch to next URL given by the current failover strategy.""" if self.cycle: self.switch(next(self.cycle)) @@ -268,7 +271,9 @@ class Connection(object): return chan def heartbeat_check(self, rate=2): - """Allow the transport to perform any periodic tasks + """Check heartbeats. + + Allow the transport to perform any periodic tasks required to make heartbeats work. This should be called approximately every second. @@ -437,8 +442,9 @@ class Connection(object): def ensure(self, obj, fun, errback=None, max_retries=None, interval_start=1, interval_step=1, interval_max=1, on_revive=None): - """Ensure operation completes, regardless of any channel/connection - errors occurring. + """Ensure operation completes. + + Regardless of any channel/connection errors occurring. Retries by establishing the connection, and reapplying the function. @@ -578,8 +584,7 @@ class Connection(object): return transport_cls def clone(self, **kwargs): - """Create a copy of the connection with the same connection - settings.""" + """Create a copy of the connection with same settings.""" return self.__class__(**dict(self._info(resolve=False), **kwargs)) def get_heartbeat_interval(self): @@ -696,20 +701,20 @@ class Connection(object): return ChannelPool(self, limit, **kwargs) def Producer(self, channel=None, *args, **kwargs): - """Create new :class:`kombu.Producer` instance using this - connection.""" + """Create new :class:`kombu.Producer` instance.""" from .messaging import Producer return Producer(channel or self, *args, **kwargs) def Consumer(self, queues=None, channel=None, *args, **kwargs): - """Create new :class:`kombu.Consumer` instance using this - connection.""" + """Create new :class:`kombu.Consumer` instance.""" from .messaging import Consumer return Consumer(channel or self, queues, *args, **kwargs) def SimpleQueue(self, name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs): - """Create new :class:`~kombu.simple.SimpleQueue`, using a channel + """Simple persistent queue API. + + Create new :class:`~kombu.simple.SimpleQueue`, using a channel from this connection. If ``name`` is a string, a queue and exchange will be automatically @@ -733,7 +738,9 @@ class Connection(object): def SimpleBuffer(self, name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs): - """Create new :class:`~kombu.simple.SimpleQueue` using a channel + """Simple ephemeral queue API. + + Create new :class:`~kombu.simple.SimpleQueue` using a channel from this connection. See Also: @@ -756,11 +763,9 @@ class Connection(object): return exchange_type in self.transport.implements.exchange_type def __repr__(self): - """``x.__repr__() <==> repr(x)``""" return '<Connection: {0} at {1:#x}>'.format(self.as_uri(), id(self)) def __copy__(self): - """``x.__copy__() <==> copy(x)``""" return self.clone() def __reduce__(self): @@ -801,8 +806,9 @@ class Connection(object): @property def default_channel(self): - """Default channel, created upon access and closed when the connection - is closed. + """Default channel. + + Created upon access and closed when the connection is closed. Note: Can be used for automatic channel handling when you only need one @@ -829,8 +835,13 @@ class Connection(object): @cached_property def manager(self): - """Experimental manager that can be used to manage/monitor the broker - instance. Not available for all transports.""" + """AMQP Management API. + + Experimental manager that can be used to manage/monitor the broker + instance. + + Not available for all transports. + """ return self.transport.manager def get_manager(self, *args, **kwargs): @@ -838,8 +849,11 @@ class Connection(object): @cached_property def recoverable_connection_errors(self): - """List of connection related exceptions that can be recovered from, - but where the connection must be closed and re-established first.""" + """Recoverable connection errors. + + List of connection related exceptions that can be recovered from, + but where the connection must be closed and re-established first. + """ try: return self.transport.recoverable_connection_errors except AttributeError: @@ -851,8 +865,11 @@ class Connection(object): @cached_property def recoverable_channel_errors(self): - """List of channel related exceptions that can be automatically - recovered from without re-establishing the connection.""" + """Recoverable channel errors. + + List of channel related exceptions that can be automatically + recovered from without re-establishing the connection. + """ try: return self.transport.recoverable_channel_errors except AttributeError: @@ -879,6 +896,8 @@ BrokerConnection = Connection class ConnectionPool(Resource): + """Pool of connections.""" + LimitExceeded = exceptions.ConnectionLimitExceeded close_after_fork = True @@ -921,6 +940,8 @@ class ConnectionPool(Resource): class ChannelPool(Resource): + """Pool of channels.""" + LimitExceeded = exceptions.ChannelLimitExceeded def __init__(self, connection, limit=None, **kwargs): @@ -944,8 +965,11 @@ class ChannelPool(Resource): def maybe_channel(channel): - """Return the default channel if argument is a connection instance, - otherwise just return the channel given.""" + """Get channel from object. + + Return the default channel if argument is a connection instance, + otherwise just return the channel given. + """ if is_connection(channel): return channel.default_channel return channel diff --git a/kombu/entity.py b/kombu/entity.py index 9c09f8b5..f84c2fe9 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -31,6 +31,7 @@ def pretty_bindings(bindings): def maybe_delivery_mode( v, modes=DELIVERY_MODES, default=PERSISTENT_DELIVERY_MODE): + """Get delivery mode by name (or none if undefined).""" if v: return v if isinstance(v, numbers.Integral) else modes[v] return default @@ -129,6 +130,7 @@ class Exchange(MaybeChannelBound): no_declare (bool): Never declare this exchange (:meth:`declare` does nothing). """ + TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE @@ -185,7 +187,7 @@ class Exchange(MaybeChannelBound): def bind_to(self, exchange='', routing_key='', arguments=None, nowait=False, **kwargs): - """Binds the exchange to another exchange. + """Bind the exchange to another exchange. Arguments: nowait (bool): If set the server will not respond, and the call @@ -460,6 +462,7 @@ class Queue(MaybeChannelBound): no_declare (bool): Never declare this queue, nor related entities (:meth:`declare` does nothing). """ + ContentDisallowed = ContentDisallowed name = '' @@ -522,8 +525,7 @@ class Queue(MaybeChannelBound): self.exchange = self.exchange(self.channel) def declare(self, nowait=False): - """Declares the queue, the exchange and binds the queue to - the exchange.""" + """Declare queue and exchange then binds queue to exchange.""" if not self.no_declare: # - declare main binding. self._create_exchange(nowait=nowait) @@ -718,7 +720,7 @@ class Queue(MaybeChannelBound): return not self.auto_delete @classmethod - def from_dict(self, queue, **options): + def from_dict(cls, queue, **options): binding_key = options.get('binding_key') or options.get('routing_key') e_durable = options.get('exchange_durable') diff --git a/kombu/exceptions.py b/kombu/exceptions.py index 05b94700..139f1cfa 100644 --- a/kombu/exceptions.py +++ b/kombu/exceptions.py @@ -1,7 +1,7 @@ """Exceptions.""" from __future__ import absolute_import, unicode_literals -from socket import timeout as TimeoutError +from socket import timeout as TimeoutError # noqa from amqp import ChannelError, ConnectionError, ResourceError @@ -14,17 +14,16 @@ __all__ = [ 'ChannelLimitExceeded', 'ConnectionError', 'ChannelError', 'VersionMismatch', 'SerializerNotInstalled', 'ResourceError', 'SerializationError', 'EncodeError', 'DecodeError', 'HttpError', + 'InconsistencyError', ] class KombuError(Exception): """Common subclass for all Kombu exceptions.""" - pass class OperationalError(KombuError): """Recoverable message transport connection error.""" - pass class SerializationError(KombuError): @@ -33,7 +32,6 @@ class SerializationError(KombuError): class EncodeError(SerializationError): """Cannot encode object.""" - pass class DecodeError(SerializationError): @@ -42,51 +40,46 @@ class DecodeError(SerializationError): class NotBoundError(KombuError): """Trying to call channel dependent method on unbound entity.""" - pass class MessageStateError(KombuError): """The message has already been acknowledged.""" - pass class LimitExceeded(KombuError): """Limit exceeded.""" - pass class ConnectionLimitExceeded(LimitExceeded): """Maximum number of simultaneous connections exceeded.""" - pass class ChannelLimitExceeded(LimitExceeded): """Maximum number of simultaneous channels exceeded.""" - pass class VersionMismatch(KombuError): - pass + """Library dependency version mismatch.""" class SerializerNotInstalled(KombuError): - """Support for the requested serialization type is not installed""" - pass + """Support for the requested serialization type is not installed.""" class ContentDisallowed(SerializerNotInstalled): """Consumer does not allow this content-type.""" - pass class InconsistencyError(ConnectionError): - """Data or environment has been found to be inconsistent, - depending on the cause it may be possible to retry the operation.""" - pass + """Data or environment has been found to be inconsistent. + + Depending on the cause it may be possible to retry the operation. + """ @python_2_unicode_compatible class HttpError(Exception): + """HTTP Client Error.""" def __init__(self, code, message=None, response=None): self.code = code diff --git a/kombu/log.py b/kombu/log.py index 7428e5e1..569cb09b 100644 --- a/kombu/log.py +++ b/kombu/log.py @@ -1,3 +1,4 @@ +"""Logging Utilities.""" from __future__ import absolute_import, unicode_literals import logging @@ -25,6 +26,7 @@ DISABLE_TRACEBACKS = os.environ.get('DISABLE_TRACEBACKS') def get_logger(logger): + """Get logger by name.""" if isinstance(logger, string_t): logger = logging.getLogger(logger) if not logger.handlers: @@ -33,6 +35,7 @@ def get_logger(logger): def get_loglevel(level): + """Get loglevel by name.""" if isinstance(level, string_t): return LOG_LEVELS[level] return level @@ -53,6 +56,7 @@ def safeify_format(fmt, args, class LogMixin(object): + """Mixin that adds severity methods to any class.""" def debug(self, *args, **kwargs): return self.log(logging.DEBUG, *args, **kwargs) @@ -128,6 +132,7 @@ class Log(LogMixin): def setup_logging(loglevel=None, logfile=None): + """Setup logging.""" logger = logging.getLogger() loglevel = get_loglevel(loglevel or 'ERROR') logfile = logfile if logfile else sys.__stderr__ diff --git a/kombu/message.py b/kombu/message.py index 5a8afc19..a7c6ca9e 100644 --- a/kombu/message.py +++ b/kombu/message.py @@ -9,6 +9,8 @@ from .five import python_2_unicode_compatible, reraise, text_t from .serialization import loads from .utils.functional import dictfilter +__all__ = ['Message'] + ACK_STATES = {'ACK', 'REJECTED', 'REQUEUED'} IS_PYPY = hasattr(sys, 'pypy_version_info') @@ -68,7 +70,7 @@ class Message(object): callback(self, exc) def ack(self, multiple=False): - """Acknowledge this message as being processed., + """Acknowledge this message as being processed. This will remove the message from the queue. @@ -140,8 +142,9 @@ class Message(object): self._state = 'REQUEUED' def decode(self): - """Deserialize the message body, returning the original - python structure sent by the publisher. + """Deserialize the message body. + + Returning the original python structure sent by the publisher. Note: The return value is memoized, use `_decode` to force diff --git a/kombu/messaging.py b/kombu/messaging.py index ee2ab7e0..f85c4fe3 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -98,8 +98,7 @@ class Producer(object): self.exchange.declare() def maybe_declare(self, entity, retry=False, **retry_policy): - """Declare the exchange if it hasn't already been declared - during this session.""" + """Declare exchange if not already declared during this session.""" if entity: return maybe_declare(entity, self.channel, retry, **retry_policy) @@ -289,6 +288,7 @@ class Consumer(object): on_decode_error (Callable): see :attr:`on_decode_error`. prefetch_count (int): see :attr:`prefetch_count`. """ + ContentDisallowed = ContentDisallowed #: The connection/channel to use for this consumer. @@ -496,8 +496,7 @@ class Consumer(object): self._queues.pop(qname, None) def consuming_from(self, queue): - """Return :const:`True` if the consumer is currently - consuming from queue'.""" + """Return :const:`True` if currently consuming from queue'.""" name = queue if isinstance(queue, Queue): name = queue.name diff --git a/kombu/mixins.py b/kombu/mixins.py index 112ce9f4..310d0c63 100644 --- a/kombu/mixins.py +++ b/kombu/mixins.py @@ -1,11 +1,5 @@ # -*- coding: utf-8 -*- -""" -kombu.mixins -============ - -Useful mixin classes. - -""" +"""Mixins.""" from __future__ import absolute_import, unicode_literals import socket @@ -254,7 +248,9 @@ class ConsumerMixin(object): class ConsumerProducerMixin(ConsumerMixin): - """Version of ConsumerMixin having separate connection for also + """Consumer and Producer mixin. + + Version of ConsumerMixin having separate connection for also publishing messages. Example: @@ -280,6 +276,7 @@ class ConsumerProducerMixin(ConsumerMixin): retry=True, ) """ + _producer_connection = None def on_consume_end(self, connection, channel): diff --git a/kombu/pidbox.py b/kombu/pidbox.py index 99485ca8..433db15a 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -37,6 +37,7 @@ debug, error = logger.debug, logger.error class Node(object): + """Mailbox node.""" #: hostname of the node. hostname = None @@ -137,6 +138,8 @@ class Node(object): class Mailbox(object): + """Process Mailbox.""" + node_cls = Node exchange_fmt = '%s.pidbox' reply_exchange_fmt = 'reply.%s.pidbox' diff --git a/kombu/pools.py b/kombu/pools.py index ed5fc521..b007b47d 100644 --- a/kombu/pools.py +++ b/kombu/pools.py @@ -25,6 +25,8 @@ def _after_fork_cleanup_group(group): class ProducerPool(Resource): + """Pool of :class:`kombu.Producer` instances.""" + Producer = Producer close_after_fork = True @@ -75,6 +77,7 @@ class ProducerPool(Resource): class PoolGroup(EqualityDict): + """Collection of resource pools.""" def __init__(self, limit=None, close_after_fork=True): self.limit = limit @@ -94,11 +97,13 @@ class PoolGroup(EqualityDict): def register_group(group): + """Register group (can be used as decorator).""" _groups.append(group) return group class Connections(PoolGroup): + """Collection of connection pools.""" def create(self, connection, limit): return connection.Pool(limit=limit) @@ -106,6 +111,7 @@ connections = register_group(Connections(limit=use_global_limit)) class Producers(PoolGroup): + """Collection of producer pools.""" def create(self, connection, limit): return ProducerPool(connections[connection], limit=limit) @@ -117,10 +123,12 @@ def _all_pools(): def get_limit(): + """Get current connection pool limit.""" return _limit[0] def set_limit(limit, force=False, reset_after=False, ignore_errors=False): + """Set new connection pool limit.""" limit = limit or 0 glimit = _limit[0] or 0 if limit != glimit: @@ -131,6 +139,7 @@ def set_limit(limit, force=False, reset_after=False, ignore_errors=False): def reset(*args, **kwargs): + """Reset all pools by closing open resources.""" for pool in _all_pools(): try: pool.force_close_all() diff --git a/kombu/resource.py b/kombu/resource.py index 11e44366..866ff933 100644 --- a/kombu/resource.py +++ b/kombu/resource.py @@ -19,12 +19,15 @@ def _after_fork_cleanup_resource(resource): class LifoQueue(_LifoQueue): + """Last in first out version of Queue.""" def _init(self, maxsize): self.queue = deque() class Resource(object): + """Pool of resources.""" + LimitExceeded = exceptions.LimitExceeded close_after_fork = False @@ -114,8 +117,10 @@ class Resource(object): pass def replace(self, resource): - """Replace resource with a new instance. This can be used in case - of defective resources.""" + """Replace existing resource with a new instance. + + This can be used in case of defective resources. + """ if self.limit: self._dirty.discard(resource) self.close_resource(resource) diff --git a/kombu/serialization.py b/kombu/serialization.py index 78233ad6..a9c73d1a 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -159,7 +159,9 @@ class SerializerRegistry(object): 'No encoder installed for {0}'.format(name)) def dumps(self, data, serializer=None): - """Serialize a data structure into a string suitable for sending + """Encode data. + + Serialize a data structure into a string suitable for sending as an AMQP message body. Arguments: @@ -222,7 +224,9 @@ class SerializerRegistry(object): def loads(self, data, content_type, content_encoding, accept=None, force=False, _trusted_content=TRUSTED_CONTENT): - """Deserialize a data stream as serialized using `dumps` + """Decode serialized data. + + Deserialize a data stream as serialized using `dumps` based on `content_type`. Arguments: @@ -321,8 +325,11 @@ def register_yaml(): except ImportError: def not_available(*args, **kwargs): - """In case a client receives a yaml message, but yaml - isn't installed.""" + """Raise SerializerNotInstalled. + + Used in case a client receives a yaml message, but yaml + isn't installed. + """ raise SerializerNotInstalled( 'No decoder installed for YAML. Install the PyYAML library') registry.register('yaml', None, not_available, 'application/x-yaml') @@ -338,9 +345,11 @@ else: def register_pickle(): - """The fastest serialization method, but restricts - you to python clients.""" + """Register pickle serializer. + The fastest serialization method, but restricts + you to python clients. + """ def pickle_dumps(obj, dumper=pickle.dumps): return dumper(obj, protocol=pickle_protocol) @@ -350,7 +359,11 @@ def register_pickle(): def register_msgpack(): - """See http://msgpack.sourceforge.net/""" + """Register msgpack serializer. + + See Also: + http://msgpack.sourceforge.net/. + """ pack = unpack = None try: import msgpack diff --git a/kombu/simple.py b/kombu/simple.py index 3f17a778..9da31c9b 100644 --- a/kombu/simple.py +++ b/kombu/simple.py @@ -84,7 +84,7 @@ class SimpleBase(object): self._consuming = True def __len__(self): - """`len(self) -> self.qsize()`""" + """`len(self) -> self.qsize()`.""" return self.qsize() def __bool__(self): @@ -93,6 +93,8 @@ class SimpleBase(object): class SimpleQueue(SimpleBase): + """Simple API for persistent queues.""" + no_ack = False queue_opts = {} exchange_opts = {'type': 'direct'} @@ -123,6 +125,8 @@ class SimpleQueue(SimpleBase): class SimpleBuffer(SimpleQueue): + """Simple API for ephemeral queues.""" + no_ack = True queue_opts = dict(durable=False, auto_delete=True) diff --git a/kombu/transport/SLMQ.py b/kombu/transport/SLMQ.py index 689046f7..3b0689b3 100644 --- a/kombu/transport/SLMQ.py +++ b/kombu/transport/SLMQ.py @@ -26,6 +26,8 @@ CHARS_REPLACE_TABLE = { class Channel(virtual.Channel): + """SLMQ Channel.""" + default_visibility_timeout = 1800 # 30 minutes. domain_format = 'kombu%(vhost)s' _slmq = None @@ -59,7 +61,7 @@ class Channel(virtual.Channel): return text_t(safe_str(name)).translate(table) def _new_queue(self, queue, **kwargs): - """Ensures a queue exists in SLQS.""" + """Ensure a queue exists in SLQS.""" queue = self.entity_name(self.queue_name_prefix + queue) try: return self._queue_cache[queue] @@ -73,7 +75,7 @@ class Channel(virtual.Channel): return q def _delete(self, queue, *args, **kwargs): - """delete queue by name.""" + """Delete queue by name.""" queue_name = self.entity_name(queue) self._queue_cache.pop(queue_name, None) self.slmq.queue(queue_name).delete(force=True) @@ -169,6 +171,8 @@ class Channel(virtual.Channel): class Transport(virtual.Transport): + """SLMQ Transport.""" + Channel = Channel polling_interval = 1 diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 0f1c65fb..5b8e5726 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -71,6 +71,7 @@ SQS_MAX_MESSAGES = 10 def maybe_int(x): + """Try to convert x' to int, or return x' if that fails.""" try: return int(x) except ValueError: @@ -78,6 +79,8 @@ def maybe_int(x): class Channel(virtual.Channel): + """SQS Channel.""" + default_region = 'us-east-1' default_visibility_timeout = 1800 # 30 minutes. default_wait_time_seconds = 10 # disabled see #198 @@ -454,6 +457,8 @@ class Channel(virtual.Channel): class Transport(virtual.Transport): + """SQS Transport.""" + Channel = Channel polling_interval = 1 diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 2bfc9f5d..7fb689ed 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -7,6 +7,7 @@ from kombu.utils.imports import symbol_by_name def supports_librabbitmq(): + """Return true if :pypi:`librabbitmq` can be used.""" if _detect_environment() == 'default': try: import librabbitmq # noqa @@ -40,6 +41,13 @@ _transport_cache = {} def resolve_transport(transport=None): + """Get transport by name. + + Arguments: + transport (Union[str, type]): This can be either + an actual transport class, or the fully qualified + path to a transport class, or the alias of a transport. + """ if isinstance(transport, string_t): try: transport = TRANSPORT_ALIASES[transport] diff --git a/kombu/transport/base.py b/kombu/transport/base.py index 4e2d6a9d..6d381b83 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -20,6 +20,8 @@ def _LeftBlank(obj, method): class StdChannel(object): + """Standard channel base class.""" + no_ack_consumers = None def Consumer(self, *args, **kwargs): @@ -34,8 +36,12 @@ class StdChannel(object): raise _LeftBlank(self, 'get_bindings') def after_reply_message_received(self, queue): - """reply queue semantics: can be used to delete the queue - after transient reply message received.""" + """Callback called after RPC reply received. + + Notes: + Reply queue semantics: can be used to delete the queue + after transient reply message received. + """ pass def __enter__(self): @@ -46,6 +52,7 @@ class StdChannel(object): class Management(object): + """AMQP Management API (incomplete).""" def __init__(self, transport): self.transport = transport @@ -55,6 +62,7 @@ class Management(object): class Implements(dict): + """Helper class used to define transport features.""" def __getattr__(self, key): try: @@ -78,6 +86,7 @@ default_transport_capabilities = Implements( class Transport(object): """Base class for transports.""" + Management = Management #: The :class:`~kombu.Connection` owning this instance. diff --git a/kombu/transport/consul.py b/kombu/transport/consul.py index 2598e473..26dca7ac 100644 --- a/kombu/transport/consul.py +++ b/kombu/transport/consul.py @@ -32,7 +32,7 @@ DEFAULT_HOST = 'localhost' class LockError(Exception): - pass + """An error occurred while trying to acquire the lock.""" class Channel(virtual.Channel): @@ -65,7 +65,9 @@ class Channel(virtual.Channel): return '{0}/{1}'.format(self.prefix, queue) def _get_or_create_session(self, queue): - """Try to renew the session if it exists, otherwise create a new + """Get or create consul session. + + Try to renew the session if it exists, otherwise create a new session in Consul. This session is used to acquire a lock inside Consul so that we achieve @@ -77,7 +79,6 @@ class Channel(virtual.Channel): Returns: str: The ID of the session. """ - try: session_id = self.queues[queue]['session_id'] except KeyError: @@ -141,8 +142,9 @@ class Channel(virtual.Channel): raise raising() def _release_lock(self, queue): - """Try to release a lock. It does so by simply removing the lock key - in Consul. + """Try to release a lock. + + It does so by simply removing the lock key in Consul. Arguments: queue (str): The name of the queue we want to release @@ -152,8 +154,9 @@ class Channel(virtual.Channel): self.client.kv.delete(key=self._lock_key(queue)) def _destroy_session(self, queue): - """Destroy a previously created Consul session and - release all locks it still might hold. + """Destroy a previously created Consul session. + + Will release all locks it still might hold. Arguments: queue (str): The name of the Queue. diff --git a/kombu/transport/etcd.py b/kombu/transport/etcd.py index afdd7763..cc7cfc21 100644 --- a/kombu/transport/etcd.py +++ b/kombu/transport/etcd.py @@ -33,9 +33,7 @@ DEFAULT_HOST = 'localhost' class Channel(virtual.Channel): - """ - Etcd Channel class which talks to the Etcd. - """ + """Etcd Channel class which talks to the Etcd.""" prefix = 'kombu' index = None @@ -44,9 +42,6 @@ class Channel(virtual.Channel): lock_ttl = 10 def __init__(self, *args, **kwargs): - """ - Creates a new instance of the etcd.Channel. - """ if etcd is None: raise ImportError('Missing python-etcd library') @@ -62,8 +57,7 @@ class Channel(virtual.Channel): self.client = etcd.Client(host=host, port=int(port)) def _key_prefix(self, queue): - """ - Creates and returns the `queue` with the proper prefix. + """Create and return the `queue` with the proper prefix. Arguments: queue (str): The name of the queue. @@ -94,8 +88,7 @@ class Channel(virtual.Channel): lock.release() def _new_queue(self, queue, **_): - """ - Creates a new `queue` if the `queue` doesn't already exist. + """Create a new `queue` if the `queue` doesn't already exist. Arguments: queue (str): The name of the queue. @@ -110,8 +103,7 @@ class Channel(virtual.Channel): return self.client.read(key=self._key_prefix(queue)) def _has_queue(self, queue, **kwargs): - """ - Verify that queue exists. + """Verify that queue exists. Returns: bool: Should return :const:`True` if the queue exists @@ -124,8 +116,7 @@ class Channel(virtual.Channel): return False def _delete(self, queue, *args, **_): - """ - Deletes a `queue`. + """Delete a `queue`. Arguments: queue (str): The name of the queue. @@ -186,8 +177,7 @@ class Channel(virtual.Channel): raise Empty() def _purge(self, queue): - """ - Removes all `message`s from a `queue`. + """Remove all `message`s from a `queue`. Arguments: queue (str): The name of the queue. @@ -198,8 +188,7 @@ class Channel(virtual.Channel): return self.client.delete(key=key, recursive=True) def _size(self, queue): - """ - Returns the size of the `queue`. + """Return the size of the `queue`. Arguments: queue (str): The name of the queue. @@ -227,9 +216,7 @@ class Channel(virtual.Channel): class Transport(virtual.Transport): - """ - Etcd storage Transport for Kombu. - """ + """Etcd storage Transport for Kombu.""" Channel = Channel @@ -242,9 +229,7 @@ class Transport(virtual.Transport): exchange_type=frozenset(['direct'])) def __init__(self, *args, **kwargs): - """ - Creates a new instance of etcd.Transport. - """ + """Create a new instance of etcd.Transport.""" if etcd is None: raise ImportError('Missing python-etcd library') @@ -259,9 +244,7 @@ class Transport(virtual.Transport): ) def verify_connection(self, connection): - """ - Verifies the connection works. - """ + """Verify the connection works.""" port = connection.client.port or self.default_port host = connection.client.hostname or DEFAULT_HOST @@ -276,8 +259,7 @@ class Transport(virtual.Transport): return False def driver_version(self): - """ - Returns the version of the etcd library. + """Return the version of the etcd library. .. note:: diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index 1bace4cf..b27dfdb8 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -1,4 +1,4 @@ -"""File-system Transport +"""File-system Transport. Transport using the file-system as the message store. """ @@ -34,10 +34,12 @@ if os.name == 'nt': __overlapped = pywintypes.OVERLAPPED() def lock(file, flags): + """Create file lock.""" hfile = win32file._get_osfhandle(file.fileno()) win32file.LockFileEx(hfile, flags, 0, 0xffff0000, __overlapped) def unlock(file): + """Remove file lock.""" hfile = win32file._get_osfhandle(file.fileno()) win32file.UnlockFileEx(hfile, 0, 0xffff0000, __overlapped) @@ -47,9 +49,11 @@ elif os.name == 'posix': from fcntl import LOCK_EX, LOCK_SH, LOCK_NB # noqa def lock(file, flags): # noqa + """Create file lock.""" fcntl.flock(file.fileno(), flags) def unlock(file): # noqa + """Remove file lock.""" fcntl.flock(file.fileno(), fcntl.LOCK_UN) else: raise RuntimeError( @@ -57,10 +61,10 @@ else: class Channel(virtual.Channel): + """Filesystem Channel.""" def _put(self, queue, payload, **kwargs): """Put `message` onto `queue`.""" - filename = '%s_%s.%s.msg' % (int(round(monotonic() * 1000)), uuid.uuid4(), queue) filename = os.path.join(self.data_folder_out, filename) @@ -78,7 +82,6 @@ class Channel(virtual.Channel): def _get(self, queue): """Get next message from `queue`.""" - queue_find = '.' + queue + '.msg' folder = os.listdir(self.data_folder_in) folder = sorted(folder) @@ -180,6 +183,8 @@ class Channel(virtual.Channel): class Transport(virtual.Transport): + """Filesystem Transport.""" + Channel = Channel default_port = 0 diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py index 00e37340..93de2ed0 100644 --- a/kombu/transport/librabbitmq.py +++ b/kombu/transport/librabbitmq.py @@ -30,6 +30,7 @@ ssl not supported by librabbitmq, please use pyamqp:// or stunnel\ class Message(base.Message): + """AMQP Message (librabbitmq).""" def __init__(self, channel, props, info, body): super(Message, self).__init__( @@ -44,6 +45,8 @@ class Message(base.Message): class Channel(amqp.Channel, base.StdChannel): + """AMQP Channel (librabbitmq).""" + Message = Message def prepare_message(self, body, priority=None, @@ -59,11 +62,15 @@ class Channel(amqp.Channel, base.StdChannel): class Connection(amqp.Connection): + """AMQP Connection (librabbitmq).""" + Channel = Channel Message = Message class Transport(base.Transport): + """AMQP Transport (librabbitmq).""" + Connection = Connection default_port = DEFAULT_PORT diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py index 419212f3..0674e288 100644 --- a/kombu/transport/memory.py +++ b/kombu/transport/memory.py @@ -8,6 +8,8 @@ from . import virtual class Channel(virtual.Channel): + """In-memory Channel.""" + queues = {} do_restore = False supports_fanout = True @@ -60,6 +62,8 @@ class Channel(virtual.Channel): class Transport(virtual.Transport): + """In-memory Transport.""" + Channel = Channel #: memory backend state is global. diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 7620a2e6..692cb189 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -78,6 +78,8 @@ class BroadcastCursor(object): class Channel(virtual.Channel): + """MongoDB Channel.""" + supports_fanout = True # Mutable container. Shared by all class instances @@ -318,8 +320,7 @@ class Channel(virtual.Channel): capped=True) def _ensure_indexes(self, database): - """Ensure indexes on collections. - """ + """Ensure indexes on collections.""" messages = database[self.messages_collection] messages.ensure_index( [('queue', 1), ('priority', 1), ('_id', 1)], background=True, @@ -395,8 +396,11 @@ class Channel(virtual.Channel): return ret def _get_expire(self, queue, argument): - """Gets expiration header named `argument` of queue definition. - `queue` must be either queue name or options itself.""" + """Get expiration header named `argument` of queue definition. + + Note: + `queue` must be either queue name or options itself. + """ if isinstance(queue, basestring): doc = self.queues.find_one({'_id': queue}) @@ -415,7 +419,7 @@ class Channel(virtual.Channel): return self.get_now() + datetime.timedelta(milliseconds=value) def _update_queues_expire(self, queue): - """Updates expiration field on queues documents.""" + """Update expiration field on queues documents.""" expire_at = self._get_expire(queue, 'x-expires') if not expire_at: @@ -434,6 +438,8 @@ class Channel(virtual.Channel): class Transport(virtual.Transport): + """MongoDB Transport.""" + Channel = Channel can_parse_url = True diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py index ec510de2..bc424653 100644 --- a/kombu/transport/pyamqp.py +++ b/kombu/transport/pyamqp.py @@ -14,6 +14,7 @@ DEFAULT_SSL_PORT = 5671 class Message(base.Message): + """AMQP Message.""" def __init__(self, channel, msg, **kwargs): props = msg.properties @@ -30,12 +31,14 @@ class Message(base.Message): class Channel(amqp.Channel, base.StdChannel): + """AMQP Channel.""" + Message = Message def prepare_message(self, body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None, _Message=amqp.Message): - """Prepares message so that it can be sent using this transport.""" + """Prepare message so that it can be sent using this transport.""" return _Message( body, priority=priority, @@ -51,10 +54,14 @@ class Channel(amqp.Channel, base.StdChannel): class Connection(amqp.Connection): + """AMQP Connection.""" + Channel = Channel class Transport(base.Transport): + """AMQP Transport.""" + Connection = Connection default_port = DEFAULT_PORT @@ -159,6 +166,7 @@ class Transport(base.Transport): class SSLTransport(Transport): + """AMQP SSL Transport.""" def __init__(self, *args, **kwargs): super(SSLTransport, self).__init__(*args, **kwargs) diff --git a/kombu/transport/pyro.py b/kombu/transport/pyro.py index 8fb15ad6..67656aa4 100644 --- a/kombu/transport/pyro.py +++ b/kombu/transport/pyro.py @@ -24,6 +24,7 @@ Unable to locate pyro nameserver {0.virtual_host} on host {0.hostname}\ class Channel(virtual.Channel): + """Pyro Channel.""" def queues(self): return self.shared_queues.get_queue_names() @@ -64,6 +65,8 @@ class Channel(virtual.Channel): class Transport(virtual.Transport): + """Pyro Transport.""" + Channel = Channel #: memory backend state is global. diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py index 9afa8474..a79c7e73 100644 --- a/kombu/transport/qpid.py +++ b/kombu/transport/qpid.py @@ -1,6 +1,4 @@ -""" -kombu.transport.qpid -======================= +"""Qpid Transport. `Qpid`_ transport using `qpid-python`_ as the client and `qpid-tools`_ for broker management. @@ -77,7 +75,6 @@ The :attr:`~kombu.Connection.transport_options` argument to the options override and replace any other default or specified values. If using Celery, this can be accomplished by setting the *BROKER_TRANSPORT_OPTIONS* Celery option. - """ from __future__ import absolute_import, unicode_literals @@ -138,8 +135,10 @@ PY3 = sys.version_info[0] == 3 def dependency_is_none(dependency): - """Return True if the dependency is None, otherwise False. This is done - using a function so that tests can mock this behavior easily. + """Return True if the dependency is None, otherwise False. + + This is done using a function so that tests can mock this + behavior easily. :param dependency: The module to check if it is None :return: True if dependency is None otherwise False. @@ -149,7 +148,7 @@ def dependency_is_none(dependency): class AuthenticationFailure(Exception): - pass + """Cannot authenticate with Qpid.""" class QoS(object): @@ -186,8 +185,7 @@ class QoS(object): self._not_yet_acked = OrderedDict() def can_consume(self): - """Return True if the :class:`~kombu.transport.qpid.Channel` can - consume more messages, else False. + """Return True if the :class:`Channel` can consume more messages. Used to ensure the client adheres to currently active prefetch limits. @@ -204,8 +202,7 @@ class QoS(object): ) def can_consume_max_estimate(self): - """Return the remaining message capacity for the associated - :class:`kombu.transport.qpid.Channel`. + """Return the remaining message capacity. Returns an estimated number of outstanding messages that a :class:`kombu.transport.qpid.Channel` can accept without @@ -238,8 +235,9 @@ class QoS(object): self._not_yet_acked[delivery_tag] = message def get(self, delivery_tag): - """Get an un-ACKed message by delivery_tag. If called with an invalid - delivery_tag a :exc:`KeyError` is raised. + """Get an un-ACKed message by delivery_tag. + + If called with an invalid delivery_tag a :exc:`KeyError` is raised. :param delivery_tag: The delivery tag associated with the message to be returned. @@ -431,7 +429,7 @@ class Channel(base.StdChannel): return message def _put(self, routing_key, message, exchange=None, **kwargs): - """Synchronous send of a single message onto a queue or exchange. + """Synchronously send a single message onto a queue or exchange. An internal method which synchronously sends a single message onto a given queue or exchange. If exchange is not specified, @@ -732,7 +730,7 @@ class Channel(base.StdChannel): raise exc def exchange_delete(self, exchange_name, **kwargs): - """Delete an exchange specified by name + """Delete an exchange specified by name. :param exchange_name: The name of the exchange to be deleted. :type exchange_name: str @@ -1209,7 +1207,9 @@ class Channel(base.StdChannel): class Connection(object): - """Encapsulate a connection object for the + """Qpid Connection. + + Encapsulate a connection object for the :class:`~kombu.transport.qpid.Transport`. :param host: The host that connections should connect to. @@ -1331,7 +1331,7 @@ class Connection(object): return self._qpid_conn def close(self): - """Close the connection + """Close the connection. Closing the connection will close all associated session, senders, or receivers used by the Connection. diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 7e157704..4a2d62ab 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -69,6 +69,7 @@ Probably the key ({1!r}) has been removed from the Redis database. def get_redis_error_classes(): + """Return tuple of redis error classes.""" from redis import exceptions # This exception suddenly changed name between redis-py versions if hasattr(exceptions, 'InvalidData'): @@ -92,16 +93,18 @@ def get_redis_error_classes(): def get_redis_ConnectionError(): + """Return the redis ConnectionError exception class.""" from redis import exceptions return exceptions.ConnectionError class MutexHeld(Exception): - pass + """Raised when another party holds the lock.""" @contextmanager def Mutex(client, name, expire): + """The Redis lock implementation (probably shaky).""" lock_id = uuid() i_won = client.setnx(name, lock_id) try: @@ -131,6 +134,8 @@ def _after_fork_cleanup_channel(channel): class QoS(virtual.QoS): + """Redis Ack Emulation.""" + restore_at_shutdown = True def __init__(self, *args, **kwargs): @@ -223,6 +228,8 @@ class QoS(virtual.QoS): class MultiChannelPoller(object): + """Async I/O poller for Redis transport.""" + eventflags = READ | ERR #: Set by :meth:`get` while reading from the socket. @@ -285,7 +292,7 @@ class MultiChannelPoller(object): (channel, client, cmd) in self._chan_to_sock) def _register_BRPOP(self, channel): - """enable BRPOP mode for channel.""" + """Enable BRPOP mode for channel.""" ident = channel, channel.client, 'BRPOP' if not self._client_registered(channel, channel.client, 'BRPOP'): channel._in_poll = False @@ -294,7 +301,7 @@ class MultiChannelPoller(object): channel._brpop_start() def _register_LISTEN(self, channel): - """enable LISTEN mode for channel.""" + """Enable LISTEN mode for channel.""" if not self._client_registered(channel, channel.subclient, 'LISTEN'): channel._in_listen = False self._register(channel, channel.subclient, 'LISTEN') @@ -372,6 +379,8 @@ class MultiChannelPoller(object): class Channel(virtual.Channel): + """Redis Channel.""" + QoS = QoS _client = None @@ -979,6 +988,8 @@ class Channel(virtual.Channel): class Transport(virtual.Transport): + """Redis Transport.""" + Channel = Channel polling_interval = None # disable sleep between unsuccessful polls. @@ -1083,5 +1094,7 @@ class SentinelChannel(Channel): class SentinelTransport(Transport): + """Redis Sentinel Transport.""" + default_port = 26379 Channel = SentinelChannel diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py index 372a32e9..916775c8 100644 --- a/kombu/transport/virtual/base.py +++ b/kombu/transport/virtual/base.py @@ -62,6 +62,7 @@ queue_binding_t = namedtuple('queue_binding_t', ( class Base64(object): + """Base64 codec.""" def encode(self, s): return bytes_to_str(base64.b64encode(str_to_bytes(s))) @@ -72,15 +73,14 @@ class Base64(object): class NotEquivalentError(Exception): """Entity declaration is not equivalent to the previous declaration.""" - pass class UndeliverableWarning(UserWarning): """The message could not be delivered to a queue.""" - pass class BrokerState(object): + """Broker state holds exchanges, queues and bindings.""" #: Mapping of exchange name to #: :class:`kombu.transport.virtual.exchange.ExchangeType` @@ -198,7 +198,7 @@ class QoS(object): return not pcount or len(self._delivered) - len(self._dirty) < pcount def can_consume_max_estimate(self): - """Returns the maximum number of messages allowed to be returned. + """Return the maximum number of messages allowed to be returned. Returns an estimated number of messages that a consumer may be allowed to consume at once from the broker. This is used for services where @@ -264,7 +264,7 @@ class QoS(object): return errors def restore_unacked_once(self, stderr=None): - """Restores all unacknowledged messages at shutdown/gc collect. + """Restore all unacknowledged messages at shutdown/gc collect. Note: Can only be called once for each instance, subsequent @@ -295,8 +295,9 @@ class QoS(object): state.restored = True def restore_visible(self, *args, **kwargs): - """Restore any pending unackwnowledged messages for visibility_timeout - style implementations. + """Restore any pending unackwnowledged messages. + + To be filled in for visibility_timeout style implementations. Note: This is implementation optional, and currently only @@ -306,6 +307,7 @@ class QoS(object): class Message(base.Message): + """Message object.""" def __init__(self, channel, payload, **kwargs): self._raw = payload @@ -342,7 +344,9 @@ class Message(base.Message): class AbstractChannel(object): - """This is an abstract class defining the channel methods + """Abstract channel interface. + + This is an abstract class defining the channel methods you'd usually want to implement in a virtual channel. Note: @@ -409,6 +413,7 @@ class Channel(AbstractChannel, base.StdChannel): connection (ConnectionT): The transport instance this channel is part of. """ + #: message class used. Message = Message @@ -617,7 +622,7 @@ class Channel(AbstractChannel, base.StdChannel): ) def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs): - """Consume from `queue`""" + """Consume from `queue`.""" self._tag_to_queue[consumer_tag] = queue self._active_queues.append(queue) @@ -769,8 +774,10 @@ class Channel(AbstractChannel, base.StdChannel): raise NotImplementedError('virtual channels do not support flow.') def close(self): - """Close channel, cancel all consumers, and requeue unacked - messages.""" + """Close channel. + + Cancel all consumers, and requeue unacked messages. + """ if not self.closed: self.closed = True for consumer in list(self._consumers): @@ -823,8 +830,9 @@ class Channel(AbstractChannel, base.StdChannel): return self._cycle def _get_message_priority(self, message, reverse=False): - """Get priority from message and limit the value within a - boundary of 0 to 9. + """Get priority from message. + + The value is limited to within a boundary of 0 to 9. Note: Higher value has more priority. @@ -842,6 +850,7 @@ class Channel(AbstractChannel, base.StdChannel): class Management(base.Management): + """Base class for the AMQP management API.""" def __init__(self, transport): super(Management, self).__init__(transport) @@ -861,6 +870,7 @@ class Transport(base.Transport): Arguments: client (kombu.Connection): The client this is a transport for. """ + Channel = Channel Cycle = FairCycle Management = Management diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py index c61d7147..b690d186 100644 --- a/kombu/transport/virtual/exchange.py +++ b/kombu/transport/virtual/exchange.py @@ -11,11 +11,14 @@ from kombu.utils.text import escape_regex class ExchangeType(object): - """Implements the specifics for an exchange type. + """Base class for exchanges. + + Implements the specifics for an exchange type. Arguments: channel (ChannelT): AMQ Channel. """ + type = None def __init__(self, channel): @@ -48,7 +51,11 @@ class ExchangeType(object): class DirectExchange(ExchangeType): - """The `direct` exchange routes based on exact routing keys.""" + """Direct exchange. + + The `direct` exchange routes based on exact routing keys. + """ + type = 'direct' def lookup(self, table, exchange, routing_key, default): @@ -65,9 +72,13 @@ class DirectExchange(ExchangeType): class TopicExchange(ExchangeType): - """The `topic` exchange routes messages based on words separated by + """Topic exchange. + + The `topic` exchange routes messages based on words separated by dots, using wildcard characters ``*`` (any single word), and ``#`` - (one or more words).""" + (one or more words). + """ + type = 'topic' #: map of wildcard to regex conversions @@ -102,8 +113,11 @@ class TopicExchange(ExchangeType): )) def _match(self, pattern, string): - """Same as :func:`re.match`, except the regex is compiled and cached, - then reused on subsequent matches with the same pattern.""" + """Match regular expression (cached). + + Same as :func:`re.match`, except the regex is compiled and cached, + then reused on subsequent matches with the same pattern. + """ try: compiled = self._compiled[pattern] except KeyError: @@ -112,7 +126,9 @@ class TopicExchange(ExchangeType): class FanoutExchange(ExchangeType): - """The `fanout` exchange implements broadcast messaging by delivering + """Fanout exchange. + + The `fanout` exchange implements broadcast messaging by delivering copies of all messages to all queues bound to the exchange. To support fanout the virtual channel needs to store the table @@ -123,6 +139,7 @@ class FanoutExchange(ExchangeType): See Also: the redis backend for an example implementation of these methods. """ + type = 'fanout' def lookup(self, table, exchange, routing_key, default): diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py index d6844f67..70ff1743 100644 --- a/kombu/transport/zookeeper.py +++ b/kombu/transport/zookeeper.py @@ -76,6 +76,7 @@ __author__ = 'Mahendra M <mahendra.m@gmail.com>' class Channel(virtual.Channel): + """Zookeeper Channel.""" _client = None _queues = {} @@ -173,6 +174,8 @@ class Channel(virtual.Channel): class Transport(virtual.Transport): + """Zookeeper Transport.""" + Channel = Channel polling_interval = 1 default_port = DEFAULT_PORT diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 6cf0a286..f06b5504 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -1,4 +1,4 @@ -"""DEPRECATED - Import from modules below""" +"""DEPRECATED - Import from modules below.""" from __future__ import absolute_import, print_function, unicode_literals from .collections import EqualityDict diff --git a/kombu/utils/amq_manager.py b/kombu/utils/amq_manager.py index f0c8418f..1b893635 100644 --- a/kombu/utils/amq_manager.py +++ b/kombu/utils/amq_manager.py @@ -1,8 +1,10 @@ +"""AMQP Management API utilities.""" from __future__ import absolute_import, unicode_literals def get_manager(client, hostname=None, port=None, userid=None, password=None): + """Get pyrabbit manager.""" import pyrabbit opt = client.transport_options.get diff --git a/kombu/utils/collections.py b/kombu/utils/collections.py index 7d8fd1ee..105cc47d 100644 --- a/kombu/utils/collections.py +++ b/kombu/utils/collections.py @@ -3,8 +3,12 @@ from __future__ import absolute_import, unicode_literals class HashedSeq(list): - """type used for hash() to make sure the hash is not generated - multiple times.""" + """Hashed Sequence. + + Type used for hash() to make sure the hash is not generated + multiple times. + """ + __slots__ = 'hashvalue' def __init__(self, *seq): @@ -16,6 +20,7 @@ class HashedSeq(list): def eqhash(o): + """Call ``obj.__eqhash__``.""" try: return o.__eqhash__() except AttributeError: @@ -23,6 +28,7 @@ def eqhash(o): class EqualityDict(dict): + """Dict using the eq operator for keying.""" def __getitem__(self, key): h = eqhash(key) diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py index acd438b8..80903b6d 100644 --- a/kombu/utils/compat.py +++ b/kombu/utils/compat.py @@ -1,3 +1,4 @@ +"""Python Compatibility Utilities.""" from __future__ import absolute_import, unicode_literals import numbers @@ -24,10 +25,19 @@ except ImportError: # pragma: no cover except ImportError: register_after_fork = None # noqa +try: + from typing import NamedTuple +except ImportError: + import collections + + def NamedTuple(name, fields): + """Typed version of collections.namedtuple.""" + return collections.namedtuple(name, [k for k, _ in fields]) + _environment = None def coro(gen): - + """Decorator to mark generator as co-routine.""" @wraps(gen) def wind_up(*args, **kwargs): it = gen(*args, **kwargs) @@ -63,12 +73,14 @@ def _detect_environment(): def detect_environment(): + """Detect the current environment: default, eventlet, or gevent.""" global _environment if _environment is None: _environment = _detect_environment() return _environment def entrypoints(namespace): + """Return setuptools entrypoints for namespace.""" try: from pkg_resources import iter_entry_points except ImportError: @@ -77,6 +89,7 @@ def entrypoints(namespace): def fileno(f): + """Get fileno from file-like object.""" if isinstance(f, numbers.Integral): return f return f.fileno() @@ -92,9 +105,8 @@ def maybe_fileno(f): @contextmanager def nested(*managers): # pragma: no cover + """Nest context managers.""" # flake8: noqa - """Combine multiple context managers into a single nested - context manager.""" exits = [] vars = [] exc = (None, None, None) diff --git a/kombu/utils/debug.py b/kombu/utils/debug.py index d5118a6f..f52f8f39 100644 --- a/kombu/utils/debug.py +++ b/kombu/utils/debug.py @@ -13,6 +13,7 @@ __all__ = ['setup_logging', 'Logwrapped'] def setup_logging(loglevel=logging.DEBUG, loggers=['kombu.connection', 'kombu.channel']): + """Setup logging to stdout.""" for logger in loggers: l = get_logger(logger) l.addHandler(logging.StreamHandler()) @@ -21,6 +22,8 @@ def setup_logging(loglevel=logging.DEBUG, loggers=['kombu.connection', @python_2_unicode_compatible class Logwrapped(object): + """Wrap all object methods, to log on call.""" + __ignore = ('__enter__', '__exit__') def __init__(self, instance, logger=None, ident=None): diff --git a/kombu/utils/div.py b/kombu/utils/div.py index d73afe07..db52048c 100644 --- a/kombu/utils/div.py +++ b/kombu/utils/div.py @@ -1,3 +1,4 @@ +"""Div. Utilities.""" from __future__ import absolute_import, unicode_literals, print_function from .encoding import default_encode @@ -6,6 +7,7 @@ import sys def emergency_dump_state(state, open_file=open, dump=None, stderr=None): + """Dump message state to stdout or file.""" from pprint import pformat from tempfile import mktemp stderr = sys.stderr if stderr is None else stderr diff --git a/kombu/utils/encoding.py b/kombu/utils/encoding.py index 7e1023fd..21b6d903 100644 --- a/kombu/utils/encoding.py +++ b/kombu/utils/encoding.py @@ -21,45 +21,54 @@ default_encoding_file = None def set_default_encoding_file(file): + """Set file used to get codec information.""" global default_encoding_file default_encoding_file = file def get_default_encoding_file(): + """Get file used to get codec information.""" return default_encoding_file if sys.platform.startswith('java'): # pragma: no cover def default_encoding(file=None): + """Get default encoding.""" return 'utf-8' else: def default_encoding(file=None): # noqa + """Get default encoding.""" file = file or get_default_encoding_file() return getattr(file, 'encoding', None) or sys.getfilesystemencoding() if is_py3k: # pragma: no cover def str_to_bytes(s): + """Convert str to bytes.""" if isinstance(s, str): return s.encode() return s def bytes_to_str(s): + """Convert bytes to str.""" if isinstance(s, bytes): return s.decode() return s def from_utf8(s, *args, **kwargs): + """Get str from utf-8 encoding.""" return s def ensure_bytes(s): + """Ensure s is bytes, not str.""" if not isinstance(s, bytes): return str_to_bytes(s) return s def default_encode(obj): + """Encode using default encoding.""" return obj str_t = str @@ -67,17 +76,21 @@ if is_py3k: # pragma: no cover else: def str_to_bytes(s): # noqa + """Convert str to bytes.""" if isinstance(s, unicode): return s.encode() return s def bytes_to_str(s): # noqa + """Convert bytes to str.""" return s def from_utf8(s, *args, **kwargs): # noqa + """Convert utf-8 to ASCII.""" return s.encode('utf-8', *args, **kwargs) def default_encode(obj, file=None): # noqa + """Get default encoding.""" return unicode(obj, default_encoding(file)) str_t = unicode @@ -91,6 +104,7 @@ except NameError: # pragma: no cover def safe_str(s, errors='replace'): + """Safe form of str(), void of unicode errors.""" s = bytes_to_str(s) if not isinstance(s, (text_t, bytes)): return safe_repr(s, errors) @@ -120,6 +134,7 @@ else: def safe_repr(o, errors='replace'): + """Safe form of repr, void of Unicode errors.""" try: return repr(o) except Exception: diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py index f5454c5c..523d8d67 100644 --- a/kombu/utils/eventio.py +++ b/kombu/utils/eventio.py @@ -1,10 +1,4 @@ -""" -kombu.utils.eventio -=================== - -Evented IO support for multiple platforms. - -""" +"""Selector Utilities.""" from __future__ import absolute_import, unicode_literals import errno @@ -332,4 +326,5 @@ def _get_poller(): def poll(*args, **kwargs): + """Create new poller instance.""" return _get_poller()(*args, **kwargs) diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py index 114ecb95..55f39512 100644 --- a/kombu/utils/functional.py +++ b/kombu/utils/functional.py @@ -1,3 +1,4 @@ +"""Functional Utilities.""" from __future__ import absolute_import, unicode_literals import random @@ -146,7 +147,7 @@ class LRUCache(UserDict): def memoize(maxsize=None, keyfun=None, Cache=LRUCache): - + """Decorator to cache function return value.""" def _memoize(fun): mutex = threading.Lock() cache = Cache(limit=maxsize) @@ -233,15 +234,18 @@ class lazy(object): def maybe_evaluate(value): - """Evaluates if the value is a :class:`lazy` instance.""" + """Evaluate value only if value is a :class:`lazy` instance.""" if isinstance(value, lazy): return value.evaluate() return value def is_list(l, scalars=(Mapping, string_t), iters=(Iterable,)): - """Return true if the object is iterable (but not - if object is a mapping or string).""" + """Return true if the object is iterable. + + Note: + Returns false if object is a mapping or string. + """ return isinstance(l, iters) and not isinstance(l, scalars or ()) @@ -251,7 +255,7 @@ def maybe_list(l, scalars=(Mapping, string_t)): def dictfilter(d=None, **kw): - """Remove all keys from dict ``d`` whose value is :const:`None`""" + """Remove all keys from dict ``d`` whose value is :const:`None`.""" d = kw if d is None else (dict(d, **kw) if kw else d) return {k: v for k, v in items(d) if v is not None} diff --git a/kombu/utils/json.py b/kombu/utils/json.py index 40518af6..c9801705 100644 --- a/kombu/utils/json.py +++ b/kombu/utils/json.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +"""JSON Serialization Utilities.""" from __future__ import absolute_import, unicode_literals import datetime @@ -13,7 +14,7 @@ try: from django.utils.functional import Promise as DjangoPromise except ImportError: # pragma: no cover class DjangoPromise(object): # noqa - pass + """Dummy object.""" try: import simplejson as json @@ -33,6 +34,7 @@ _encoder_cls = type(json._default_encoder) class JSONEncoder(_encoder_cls): + """Kombu custom json encoder.""" def default(self, o, dates=(datetime.datetime, datetime.date), @@ -61,10 +63,12 @@ class JSONEncoder(_encoder_cls): def dumps(s, _dumps=json.dumps, cls=JSONEncoder, default_kwargs=_json_extra_kwargs, **kwargs): + """Serialize object to json string.""" return _dumps(s, cls=cls, **dict(default_kwargs, **kwargs)) def loads(s, _loads=json.loads, decode_bytes=IS_PY3): + """Deserialize json from string.""" # None of the json implementations supports decoding from # a buffer/memoryview, or even reading from a stream # (load is just loads(fp.read())) diff --git a/kombu/utils/limits.py b/kombu/utils/limits.py index 13f52a12..65c574ba 100644 --- a/kombu/utils/limits.py +++ b/kombu/utils/limits.py @@ -65,8 +65,7 @@ class TokenBucket(object): return False def expected_time(self, tokens=1): - """Get the current exepected time for when a new token is to be - available. + """Return estimated time of token availability. Returns: float: the time in seconds. diff --git a/kombu/utils/objects.py b/kombu/utils/objects.py index b1f09e96..dced5c5e 100644 --- a/kombu/utils/objects.py +++ b/kombu/utils/objects.py @@ -1,9 +1,11 @@ +"""Object Utilities.""" from __future__ import absolute_import, unicode_literals class cached_property(object): - """Property descriptor that caches the return value - of the get function. + """Cached property descriptor. + + Caches the return value of the get method on first call. Examples: .. code-block:: python diff --git a/kombu/utils/scheduling.py b/kombu/utils/scheduling.py index e5833fa1..d75710d1 100644 --- a/kombu/utils/scheduling.py +++ b/kombu/utils/scheduling.py @@ -1,4 +1,4 @@ -"""Consumer scheduling utilities.""" +"""Scheduling Utilities.""" from __future__ import absolute_import, unicode_literals from itertools import count @@ -20,8 +20,16 @@ CYCLE_ALIASES = { @python_2_unicode_compatible class FairCycle(object): - """Consume from a set of resources, where each resource gets - an equal chance to be consumed from.""" + """Cycle between resources. + + Consume from a set of resources, where each resource gets + an equal chance to be consumed from. + + Arguments: + fun (Callable): Callback to call. + resources (Sequence[Any]): List of resources. + predicate (type): Exception predicate. + """ def __init__(self, fun, resources, predicate=Exception): self.fun = fun @@ -41,6 +49,7 @@ class FairCycle(object): raise self.predicate() def get(self, callback, **kwargs): + """Get from next resource.""" succeeded = 0 for tried in count(0): # for infinity resource = self._next() @@ -55,22 +64,27 @@ class FairCycle(object): succeeded += 1 def close(self): + """Close cycle.""" pass def __repr__(self): + """``repr(cycle)``.""" return '<FairCycle: {self.pos}/{size} {self.resources}>'.format( self=self, size=len(self.resources)) class round_robin_cycle(object): + """Iterator that cycles between items in round-robin.""" def __init__(self, it=None): self.items = it if it is not None else [] def update(self, it): + """Update items from iterable.""" self.items[:] = it def consume(self, n): + """Consume n items.""" return self.items[:n] def rotate(self, last_used): @@ -84,16 +98,21 @@ class round_robin_cycle(object): class priority_cycle(round_robin_cycle): + """Cycle that repeats items in order.""" def rotate(self, last_used): + """Unused in this implementation.""" pass class sorted_cycle(priority_cycle): + """Cycle in sorted order.""" def consume(self, n): + """Consume n items.""" return sorted(self.items[:n]) def cycle_by_name(name): + """Get cycle class by name.""" return symbol_by_name(name, CYCLE_ALIASES) diff --git a/kombu/utils/text.py b/kombu/utils/text.py index 717b7ecc..cd9feedc 100644 --- a/kombu/utils/text.py +++ b/kombu/utils/text.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +"""Text Utilities.""" from __future__ import absolute_import, unicode_literals from difflib import SequenceMatcher @@ -8,6 +9,8 @@ from kombu.five import string_t def escape_regex(p, white=''): + # type: (str, str) -> str + """Escape string for use within a regular expression.""" # what's up with re.escape? that code must be neglected or someting return ''.join(c if c.isalnum() or c in white else ('\\000' if c == '\000' else '\\' + c) @@ -15,6 +18,12 @@ def escape_regex(p, white=''): def fmatch_iter(needle, haystack, min_ratio=0.6): + # type: (str, Sequence[str], float) -> Iterator[Tuple[float, str]] + """Fuzzy match: iteratively. + + Yields: + Tuple: of ratio and key. + """ for key in haystack: ratio = SequenceMatcher(None, needle, key).ratio() if ratio >= min_ratio: @@ -22,6 +31,8 @@ def fmatch_iter(needle, haystack, min_ratio=0.6): def fmatch_best(needle, haystack, min_ratio=0.6): + # type: (str, Sequence[str], float) -> str + """Fuzzy match - Find best match (scalar).""" try: return sorted( fmatch_iter(needle, haystack, min_ratio), reverse=True, @@ -31,6 +42,8 @@ def fmatch_best(needle, haystack, min_ratio=0.6): def version_string_as_tuple(s): + # type (str) -> version_info_t + """Convert version string to version info tuple.""" v = _unpack_version(*s.split('.')) # X.Y.3a1 -> (X, Y, 3, 'a1') if isinstance(v.micro, string_t): @@ -42,10 +55,12 @@ def version_string_as_tuple(s): def _unpack_version(major, minor=0, micro=0, releaselevel='', serial=''): + # type: (int, int, int, str, str) -> version_info_t return version_info_t(int(major), int(minor), micro, releaselevel, serial) def _splitmicro(micro, releaselevel='', serial=''): + # type: (int, str, str) -> Tuple[int, str, str] for index, char in enumerate(micro): if not char.isdigit(): break diff --git a/kombu/utils/url.py b/kombu/utils/url.py index 48cad7e4..7842bc28 100644 --- a/kombu/utils/url.py +++ b/kombu/utils/url.py @@ -1,5 +1,7 @@ +"""URL Utilities.""" from __future__ import absolute_import, unicode_literals +from collections import Mapping from functools import partial try: @@ -10,54 +12,82 @@ except ImportError: from kombu.five import string_t +from .compat import NamedTuple + safequote = partial(quote, safe='') -def _parse_url(url): - scheme = urlparse(url).scheme - schemeless = url[len(scheme) + 3:] - # parse with HTTP URL semantics - parts = urlparse('http://' + schemeless) - path = parts.path or '' - path = path[1:] if path and path[0] == '/' else path - return (scheme, unquote(parts.hostname or '') or None, parts.port, - unquote(parts.username or '') or None, - unquote(parts.password or '') or None, - unquote(path or '') or None, - dict(parse_qsl(parts.query))) +urlparts = NamedTuple('urlparts', [ + ('scheme', str), + ('hostname', str), + ('port', int), + ('username', str), + ('password', str), + ('path', str), + ('query', Mapping), +]) def parse_url(url): + # type: (str) -> Dict + """Parse URL into mapping of components.""" scheme, host, port, user, password, path, query = _parse_url(url) return dict(transport=scheme, hostname=host, port=port, userid=user, password=password, virtual_host=path, **query) +def url_to_parts(url): + # type: (str) -> urlparts + """Parse URL into :class:`urlparts` tuple of components.""" + scheme = urlparse(url).scheme + schemeless = url[len(scheme) + 3:] + # parse with HTTP URL semantics + parts = urlparse('http://' + schemeless) + path = parts.path or '' + path = path[1:] if path and path[0] == '/' else path + return urlparts( + scheme, + unquote(parts.hostname or '') or None, + parts.port, + unquote(parts.username or '') or None, + unquote(parts.password or '') or None, + unquote(path or '') or None, + dict(parse_qsl(parts.query)), + ) +_parse_url = url_to_parts # noqa + + def as_url(scheme, host=None, port=None, user=None, password=None, path=None, query=None, sanitize=False, mask='**'): - parts = ['{0}://'.format(scheme)] - if user or password: - if user: - parts.append(safequote(user)) - if password: - if sanitize: - parts.extend([':', mask] if mask else [':']) - else: - parts.extend([':', safequote(password)]) - parts.append('@') - parts.append(safequote(host) if host else '') - if port: - parts.extend([':', port]) - parts.extend(['/', path]) - return ''.join(str(part) for part in parts if part) + # type: (str, str, int, str, str, str, str, bool, str) -> str + """Generate URL from component parts.""" + parts = ['{0}://'.format(scheme)] + if user or password: + if user: + parts.append(safequote(user)) + if password: + if sanitize: + parts.extend([':', mask] if mask else [':']) + else: + parts.extend([':', safequote(password)]) + parts.append('@') + parts.append(safequote(host) if host else '') + if port: + parts.extend([':', port]) + parts.extend(['/', path]) + return ''.join(str(part) for part in parts if part) def sanitize_url(url, mask='**'): + # type: (str, str) -> str + """Return copy of URL with password removed.""" return as_url(*_parse_url(url), sanitize=True, mask=mask) def maybe_sanitize_url(url, mask='**'): + # type: (Any, str) -> Any + """Sanitize url, or do nothing if url undefined.""" if isinstance(url, string_t) and '://' in url: return sanitize_url(url, mask) return url diff --git a/kombu/utils/uuid.py b/kombu/utils/uuid.py index 286f07fb..341475bf 100644 --- a/kombu/utils/uuid.py +++ b/kombu/utils/uuid.py @@ -1,11 +1,11 @@ +"""UUID utilities.""" from __future__ import absolute_import, unicode_literals from uuid import uuid4 def uuid(_uuid=uuid4): - """Generate a unique id, having - hopefully - a very small chance of - collision. + """Generate unique id in UUID4 format. See Also: For now this is provided by :func:`uuid.uuid4`. @@ -7,6 +7,11 @@ source-dir = docs/ build-dir = docs/_build all_files = 1 +[flake8] +# classes can be lowercase, arguments and variables can be uppercase +# whenever it makes the code more readable. +ignore = N806, N802, N801, N803 + [bdist_rpm] requires = amqp >= 1.4.5 @@ -1,5 +1,15 @@ [tox] -envlist = 2.7,pypy,3.4,3.5,pypy3,flake8,flakeplus,apicheck,cov +envlist = + 2.7 + pypy + 3.4 + 3.5 + pypy3 + flake8 + flakeplus + apicheck + pydocstyle + cov [testenv] sitepackages = False @@ -12,13 +22,13 @@ deps= 2.7,pypy,jython,cov: -r{toxinidir}/requirements/test-ci-py2.txt apicheck,linkcheck: -r{toxinidir}/requirements/docs.txt - flake8,flakeplus: -r{toxinidir}/requirements/pkgutils.txt + flake8,flakeplus,pydocstyle: -r{toxinidir}/requirements/pkgutils.txt commands = pip install -U -r{toxinidir}/requirements/dev.txt py.test -xv basepython = - 2.7,flakeplus,flake8,apicheck,linkcheck,cov: python2.7 + 2.7,flakeplus,flake8,apicheck,linkcheck,cov,pydocstyle: python2.7 3.3: python3.3 3.4: python3.4 3.5: python3.5 @@ -44,3 +54,7 @@ commands = [testenv:flakeplus] commands = flakeplus --2.7 {toxinidir}/kombu {toxinidir}/t + +[testenv:pydocstyle] +commands = + pydocstyle --ignore=D102,D104,D203,D105 kombu |