From c03335e167a320ffbb62c953e12dca8b659d6805 Mon Sep 17 00:00:00 2001 From: Stevie Gayet <87695919+stegayet@users.noreply.github.com> Date: Mon, 15 May 2023 04:11:34 +0200 Subject: chore(ci): fix lint job (#1718) * chore(ci): align python version between matrix and tox config * chore(pydocstyle): upgrade pydocstyle to 6.3.0 * chore(pydocstyle): exclude rules * fix(mypy): fix `[truthy-function]` error * fix(pydocstyle): fix D205 rule * fix(pydocstyle): fix D212 rule * fix(pydocstyle): fix D407 rule * fix(pydocstyle): fix D412 rule * fix(pydocstyle): fix D406 rule * fix(pydocstyle): fix D411 rule --------- Co-authored-by: Stevie Gayet --- kombu/abstract.py | 2 +- kombu/asynchronous/http/base.py | 12 ++++++++--- kombu/asynchronous/hub.py | 1 + kombu/asynchronous/semaphore.py | 3 +++ kombu/asynchronous/timer.py | 1 + kombu/clocks.py | 4 +++- kombu/common.py | 13 ++++++++++-- kombu/compression.py | 3 +++ kombu/connection.py | 37 ++++++++++++++++++++++++++++------ kombu/entity.py | 20 +++++++++++++++--- kombu/message.py | 13 ++++++++---- kombu/messaging.py | 16 ++++++++++++++- kombu/mixins.py | 6 ++++-- kombu/resource.py | 5 ++++- kombu/serialization.py | 31 ++++++++++++++++++++-------- kombu/transport/SQS.py | 22 +++++++++++++------- kombu/transport/__init__.py | 1 + kombu/transport/base.py | 6 ++++-- kombu/transport/consul.py | 13 +++++++++--- kombu/transport/etcd.py | 11 +++++++++- kombu/transport/mongodb.py | 1 + kombu/transport/redis.py | 2 +- kombu/transport/sqlalchemy/__init__.py | 3 ++- kombu/transport/virtual/base.py | 22 ++++++++++++++++---- kombu/transport/virtual/exchange.py | 10 ++++++--- kombu/utils/functional.py | 4 ++++ kombu/utils/imports.py | 3 ++- kombu/utils/limits.py | 10 ++++++--- kombu/utils/scheduling.py | 1 + kombu/utils/text.py | 3 ++- kombu/utils/uuid.py | 3 ++- requirements/pkgutils.txt | 2 +- setup.cfg | 2 +- tox.ini | 6 +++--- 34 files changed, 227 insertions(+), 65 deletions(-) diff --git a/kombu/abstract.py b/kombu/abstract.py index 48a917c9..6a689fe9 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -53,7 +53,7 @@ class Object: setattr(self, name, None) def as_dict(self, recurse: bool = False) -> dict[str, Any]: - def f(obj: Any, type: Callable[[Any], Any]) -> Any: + def f(obj: Any, type: Callable[[Any], Any] | None = None) -> Any: if recurse and isinstance(obj, Object): return obj.as_dict(recurse=True) return type(obj) if type and obj is not None else obj diff --git a/kombu/asynchronous/http/base.py b/kombu/asynchronous/http/base.py index 89be531f..345e07f6 100644 --- a/kombu/asynchronous/http/base.py +++ b/kombu/asynchronous/http/base.py @@ -44,10 +44,12 @@ class Request: """A HTTP Request. Arguments: + --------- url (str): The URL to request. method (str): The HTTP method to use (defaults to ``GET``). Keyword Arguments: + ----------------- headers (Dict, ~kombu.asynchronous.http.Headers): Optional headers for this request body (str): Optional body for this request. @@ -138,7 +140,8 @@ class Request: class Response: """HTTP Response. - Arguments: + Arguments + --------- request (~kombu.asynchronous.http.Request): See :attr:`request`. code (int): See :attr:`code`. headers (~kombu.asynchronous.http.Headers): See :attr:`headers`. @@ -146,7 +149,8 @@ class Response: effective_url (str): See :attr:`effective_url`. status (str): See :attr:`status`. - Attributes: + Attributes + ---------- request (~kombu.asynchronous.http.Request): object used to get this response. code (int): HTTP response code (e.g. 200, 404, or 500). @@ -182,7 +186,8 @@ class Response: def raise_for_error(self): """Raise if the request resulted in an HTTP error code. - Raises: + Raises + ------ :class:`~kombu.exceptions.HttpError` """ if self.error: @@ -193,6 +198,7 @@ class Response: """The full contents of the response body. Note: + ---- Accessing this property will evaluate the buffer and subsequent accesses will be cached. """ diff --git a/kombu/asynchronous/hub.py b/kombu/asynchronous/hub.py index e5b1163c..c2d301d7 100644 --- a/kombu/asynchronous/hub.py +++ b/kombu/asynchronous/hub.py @@ -57,6 +57,7 @@ class Hub: """Event loop object. Arguments: + --------- timer (kombu.asynchronous.Timer): Specify custom timer instance. """ diff --git a/kombu/asynchronous/semaphore.py b/kombu/asynchronous/semaphore.py index 07fb8a09..940e6d86 100644 --- a/kombu/asynchronous/semaphore.py +++ b/kombu/asynchronous/semaphore.py @@ -26,6 +26,7 @@ class LaxBoundedSemaphore: range even if released more times than it was acquired. Example: + ------- >>> x = LaxBoundedSemaphore(2) >>> x.acquire(print, 'HELLO 1') @@ -61,6 +62,7 @@ class LaxBoundedSemaphore: until the semaphore is released. Arguments: + --------- callback (Callable): The callback to apply. *partial_args (Any): partial arguments to callback. """ @@ -77,6 +79,7 @@ class LaxBoundedSemaphore: """Release semaphore. Note: + ---- If there are any waiters this will apply the first waiter that is waiting for the resource (FIFO order). """ diff --git a/kombu/asynchronous/timer.py b/kombu/asynchronous/timer.py index ab0291da..a3b7fb77 100644 --- a/kombu/asynchronous/timer.py +++ b/kombu/asynchronous/timer.py @@ -149,6 +149,7 @@ class Timer: """Enter function into the scheduler. Arguments: + --------- entry (~kombu.asynchronous.timer.Entry): Item to enter. eta (datetime.datetime): Scheduled time. priority (int): Unused. diff --git a/kombu/clocks.py b/kombu/clocks.py index d02e8b32..b5ab0866 100644 --- a/kombu/clocks.py +++ b/kombu/clocks.py @@ -18,6 +18,7 @@ class timetuple(tuple): Can be used as part of a heap to keep events ordered. Arguments: + --------- clock (Optional[int]): Event clock value. timestamp (float): Event UNIX timestamp value. id (str): Event host id (e.g. ``hostname:pid``). @@ -85,7 +86,8 @@ class LamportClock: process receives a message, it resynchronizes its logical clock with the sender. - See Also: + See Also + -------- * `Lamport timestamps`_ * `Lamports distributed mutex`_ diff --git a/kombu/common.py b/kombu/common.py index c7b2d50a..c00ae0fe 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -66,6 +66,7 @@ class Broadcast(Queue): and both the queue and exchange is configured with auto deletion. Arguments: + --------- name (str): This is used as the name of the exchange. queue (str): By default a unique id is used for the queue name for every consumer. You can specify a custom @@ -203,7 +204,8 @@ def eventloop(conn, limit=None, timeout=None, ignore_timeouts=False): ``eventloop`` is a generator. - Examples: + Examples + -------- >>> from kombu.common import eventloop >>> def run(conn): @@ -219,7 +221,8 @@ def eventloop(conn, limit=None, timeout=None, ignore_timeouts=False): for _ in eventloop(connection, limit=1, timeout=1): pass - See Also: + See Also + -------- :func:`itermessages`, which is an event loop bound to one or more consumers, that yields any messages received. """ @@ -236,6 +239,7 @@ def send_reply(exchange, req, msg, """Send reply for request. Arguments: + --------- exchange (kombu.Exchange, str): Reply exchange req (~kombu.Message): Original request, a message with a ``reply_to`` property. @@ -309,6 +313,7 @@ def ignore_errors(conn, fun=None, *args, **kwargs): Note: + ---- Connection and channel errors should be properly handled, and not ignored. Using this function is only acceptable in a cleanup phase, like when a connection is lost or at shutdown. @@ -348,12 +353,14 @@ class QoS: """Thread safe increment/decrement of a channels prefetch_count. Arguments: + --------- callback (Callable): Function used to set new prefetch count, e.g. ``consumer.qos`` or ``channel.basic_qos``. Will be called with a single ``prefetch_count`` keyword argument. initial_value (int): Initial prefetch count value.. Example: + ------- >>> from kombu import Consumer, Connection >>> connection = Connection('amqp://') >>> consumer = Consumer(connection) @@ -396,6 +403,7 @@ class QoS: """Increment the value, but do not update the channels QoS. Note: + ---- The MainThread will be responsible for calling :meth:`update` when necessary. """ @@ -408,6 +416,7 @@ class QoS: """Decrement the value, but do not update the channels QoS. Note: + ---- The MainThread will be responsible for calling :meth:`update` when necessary. """ diff --git a/kombu/compression.py b/kombu/compression.py index f98c971b..2c1c8591 100644 --- a/kombu/compression.py +++ b/kombu/compression.py @@ -18,6 +18,7 @@ def register(encoder, decoder, content_type, aliases=None): """Register new compression method. Arguments: + --------- encoder (Callable): Function used to compress text. decoder (Callable): Function used to decompress previously compressed text. @@ -52,6 +53,7 @@ def compress(body, content_type): """Compress text. Arguments: + --------- body (AnyStr): The text to compress. content_type (str): mime-type of compression method to use. """ @@ -63,6 +65,7 @@ def decompress(body, content_type): """Decompress compressed text. Arguments: + --------- body (AnyStr): Previously compressed text to uncompress. content_type (str): mime-type of compression method used. """ diff --git a/kombu/connection.py b/kombu/connection.py index 0c9779b5..12a08fe0 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -64,6 +64,7 @@ class Connection: """A connection to the broker. Example: + ------- >>> Connection('amqp://guest:guest@localhost:5672//') >>> Connection('amqp://foo;amqp://bar', ... failover_strategy='round-robin') @@ -80,13 +81,16 @@ class Connection: ... }) Note: + ---- SSL currently only works with the py-amqp, and qpid transports. For other transports you can use stunnel. Arguments: + --------- URL (str, Sequence): Broker URL, or a list of URLs. Keyword Arguments: + ----------------- ssl (bool/dict): Use SSL to connect to the server. Default is ``False``. May not be supported by the specified transport. @@ -102,6 +106,7 @@ class Connection: around once per second. Note: + ---- The connection is established lazily when needed. If you need the connection to be established, then force it by calling :meth:`connect`:: @@ -233,9 +238,11 @@ class Connection: """Switch connection parameters to use a new URL or hostname. Note: + ---- Does not reconnect! Arguments: + --------- conn_str (str): either a hostname or URL. """ self.close() @@ -311,6 +318,7 @@ class Connection: this is a noop operation. Arguments: + --------- rate (int): Rate is how often the tick is called compared to the actual heartbeat value. E.g. if the heartbeat is set to 3 seconds, and the tick @@ -323,9 +331,11 @@ class Connection: """Wait for a single event from the server. Arguments: + --------- timeout (float): Timeout in seconds before we give up. - Raises: + Raises + ------ socket.timeout: if the timeout is exceeded. """ return self.transport.drain_events(self.connection, **kwargs) @@ -408,6 +418,7 @@ class Connection: specified. Arguments: + --------- errback (Callable): Optional callback called each time the connection can't be established. Arguments provided are the exception raised and the interval that will be @@ -491,6 +502,7 @@ class Connection: the function. Arguments: + --------- obj: The object to ensure an action on. fun (Callable): Method to apply. @@ -515,7 +527,8 @@ class Connection: regardless of the connection state. Must provide max_retries if this is specified. - Examples: + Examples + -------- >>> from kombu import Connection, Producer >>> conn = Connection('amqp://') >>> producer = Producer(conn) @@ -601,10 +614,12 @@ class Connection: If a ``channel`` is not provided, then one will be automatically acquired (remember to close it afterwards). - See Also: + See Also + -------- :meth:`ensure` for the full list of supported keyword arguments. Example: + ------- >>> channel = connection.channel() >>> try: ... ret, channel = connection.autoretry( @@ -730,14 +745,17 @@ class Connection: def Pool(self, limit=None, **kwargs): """Pool of connections. - See Also: + See Also + -------- :class:`ConnectionPool`. Arguments: + --------- limit (int): Maximum number of active connections. Default is no limit. Example: + ------- >>> connection = Connection('amqp://') >>> pool = connection.Pool(2) >>> c1 = pool.acquire() @@ -756,14 +774,17 @@ class Connection: def ChannelPool(self, limit=None, **kwargs): """Pool of channels. - See Also: + See Also + -------- :class:`ChannelPool`. Arguments: + --------- limit (int): Maximum number of active channels. Default is no limit. Example: + ------- >>> connection = Connection('amqp://') >>> pool = connection.ChannelPool(2) >>> c1 = pool.acquire() @@ -802,6 +823,7 @@ class Connection: also it will be used as the default routing key. Arguments: + --------- name (str, kombu.Queue): Name of the queue/or a queue. no_ack (bool): Disable acknowledgments. Default is false. queue_opts (Dict): Additional keyword arguments passed to the @@ -828,7 +850,8 @@ class Connection: Create new :class:`~kombu.simple.SimpleQueue` using a channel from this connection. - See Also: + See Also + -------- Same as :meth:`SimpleQueue`, but configured with buffering semantics. The resulting queue and exchange will not be durable, also auto delete is enabled. Messages will be transient (not @@ -901,6 +924,7 @@ class Connection: """The underlying connection object. Warning: + ------- This instance is transport specific, so do not depend on the interface of this object. """ @@ -925,6 +949,7 @@ class Connection: Created upon access and closed when the connection is closed. Note: + ---- Can be used for automatic channel handling when you only need one channel, and also it is the channel implicitly used if a connection is passed instead of a channel, to functions that diff --git a/kombu/entity.py b/kombu/entity.py index 2329e748..5e6419ae 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -42,6 +42,7 @@ class Exchange(MaybeChannelBound): """An Exchange declaration. Arguments: + --------- name (str): See :attr:`name`. type (str): See :attr:`type`. channel (kombu.Connection, ChannelT): See :attr:`channel`. @@ -51,7 +52,8 @@ class Exchange(MaybeChannelBound): arguments (Dict): See :attr:`arguments`. no_declare (bool): See :attr:`no_declare` - Attributes: + Attributes + ---------- name (str): Name of the exchange. Default is no name (the default exchange). @@ -190,6 +192,7 @@ class Exchange(MaybeChannelBound): """Bind the exchange to another exchange. Arguments: + --------- nowait (bool): If set the server will not respond, and the call will not block waiting for a response. Default is :const:`False`. @@ -221,6 +224,7 @@ class Exchange(MaybeChannelBound): """Create message instance to be sent with :meth:`publish`. Arguments: + --------- body (Any): Message body. delivery_mode (bool): Set custom delivery mode. @@ -259,6 +263,7 @@ class Exchange(MaybeChannelBound): """Publish message. Arguments: + --------- message (Union[kombu.Message, str, bytes]): Message to publish. routing_key (str): Message routing key. @@ -280,6 +285,7 @@ class Exchange(MaybeChannelBound): """Delete the exchange declaration on server. Arguments: + --------- if_unused (bool): Delete only if the exchange has no bindings. Default is :const:`False`. nowait (bool): If set the server will not respond, and a @@ -322,6 +328,7 @@ class binding(Object): """Represents a queue or exchange binding. Arguments: + --------- exchange (Exchange): Exchange to bind to. routing_key (str): Routing key used as binding key. arguments (Dict): Arguments for bind operation. @@ -376,6 +383,7 @@ class Queue(MaybeChannelBound): """A Queue declaration. Arguments: + --------- name (str): See :attr:`name`. exchange (Exchange, str): See :attr:`exchange`. routing_key (str): See :attr:`routing_key`. @@ -394,7 +402,8 @@ class Queue(MaybeChannelBound): max_length_bytes (int): See :attr:`max_length_bytes`. max_priority (int): See :attr:`max_priority`. - Attributes: + Attributes + ---------- name (str): Name of the queue. Default is no name (default queue destination). @@ -628,6 +637,7 @@ class Queue(MaybeChannelBound): """Declare queue on the server. Arguments: + --------- nowait (bool): Do not wait for a reply. passive (bool): If set, the server will not create the queue. The client can use this to check whether a queue exists @@ -684,11 +694,13 @@ class Queue(MaybeChannelBound): specific types of applications where synchronous functionality is more important than performance. - Returns: + Returns + ------- ~kombu.Message: if a message was available, or :const:`None` otherwise. Arguments: + --------- no_ack (bool): If enabled the broker will automatically ack messages. accept (Set[str]): Custom list of accepted content types. @@ -717,6 +729,7 @@ class Queue(MaybeChannelBound): until the client cancels them. Arguments: + --------- consumer_tag (str): Unique identifier for the consumer. The consumer tag is local to a connection, so two clients can use the same consumer tags. If this field is empty @@ -747,6 +760,7 @@ class Queue(MaybeChannelBound): """Delete the queue. Arguments: + --------- if_unused (bool): If set, the server will only delete the queue if it has no consumers. A channel error will be raised if the queue has consumers. diff --git a/kombu/message.py b/kombu/message.py index f2af1686..3c9d07ae 100644 --- a/kombu/message.py +++ b/kombu/message.py @@ -19,7 +19,7 @@ class Message: """Base class for received messages. Keyword Arguments: - + ----------------- channel (ChannelT): If message was received, this should be the channel that the message was received on. @@ -103,7 +103,8 @@ class Message: This will remove the message from the queue. - Raises: + Raises + ------ MessageStateError: If the message has already been acknowledged/requeued/rejected. """ @@ -148,7 +149,8 @@ class Message: The message will be discarded by the server. - Raises: + Raises + ------ MessageStateError: If the message has already been acknowledged/requeued/rejected. """ @@ -166,10 +168,12 @@ class Message: """Reject this message and put it back on the queue. Warning: + ------- You must not use this method as a means of selecting messages to process. - Raises: + Raises + ------ MessageStateError: If the message has already been acknowledged/requeued/rejected. """ @@ -189,6 +193,7 @@ class Message: Returning the original python structure sent by the publisher. Note: + ---- The return value is memoized, use `_decode` to force re-evaluation. """ diff --git a/kombu/messaging.py b/kombu/messaging.py index 2b600224..fed7b3f0 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -23,6 +23,7 @@ class Producer: """Message Producer. Arguments: + --------- channel (kombu.Connection, ChannelT): Connection or channel. exchange (kombu.entity.Exchange, str): Optional default exchange. routing_key (str): Optional default routing key. @@ -93,6 +94,7 @@ class Producer: """Declare the exchange. Note: + ---- This happens automatically at instantiation when the :attr:`auto_declare` flag is enabled. """ @@ -126,6 +128,7 @@ class Producer: """Publish message to the specified exchange. Arguments: + --------- body (Any): Message body. routing_key (str): Message routing key. delivery_mode (enum): See :attr:`delivery_mode`. @@ -293,6 +296,7 @@ class Consumer: """Message consumer. Arguments: + --------- channel (kombu.Connection, ChannelT): see :attr:`channel`. queues (Sequence[kombu.Queue]): see :attr:`queues`. no_ack (bool): see :attr:`no_ack`. @@ -426,6 +430,7 @@ class Consumer: """Declare queues, exchanges and bindings. Note: + ---- This is done automatically at instantiation when :attr:`auto_declare` is set. """ @@ -436,6 +441,7 @@ class Consumer: """Register a new callback to be called when a message is received. Note: + ---- The signature of the callback needs to accept two arguments: `(body, message)`, which is the decoded message body and the :class:`~kombu.Message` instance. @@ -464,6 +470,7 @@ class Consumer: """Add a queue to the list of queues to consume from. Note: + ---- This will not start consuming from the queue, for that you will have to call :meth:`consume` after. """ @@ -482,6 +489,7 @@ class Consumer: use :meth:`cancel_by_queue`). Arguments: + --------- no_ack (bool): See :attr:`no_ack`. """ queues = list(self._queues.values()) @@ -497,6 +505,7 @@ class Consumer: """End all active queue consumers. Note: + ---- This does not affect already delivered messages, but it does mean the server will not send any more messages for this consumer. """ @@ -530,6 +539,7 @@ class Consumer: """Purge messages from all queues. Warning: + ------- This will *delete all ready messages*, there is no undo operation. """ return sum(queue.purge() for queue in self._queues.values()) @@ -559,6 +569,7 @@ class Consumer: The prefetch window is Ignored if the :attr:`no_ack` option is set. Arguments: + --------- prefetch_size (int): Specify the prefetch window in octets. The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and @@ -582,6 +593,7 @@ class Consumer: on the specified channel. Arguments: + --------- requeue (bool): By default the messages will be redelivered to the original recipient. With `requeue` set to true, the server will attempt to requeue the message, potentially then @@ -595,10 +607,12 @@ class Consumer: This dispatches to the registered :attr:`callbacks`. Arguments: + --------- body (Any): The decoded message body. message (~kombu.Message): The message instance. - Raises: + Raises + ------ NotImplementedError: If no consumer callbacks have been registered. """ diff --git a/kombu/mixins.py b/kombu/mixins.py index f1b3c1c9..14c1c1b9 100644 --- a/kombu/mixins.py +++ b/kombu/mixins.py @@ -49,6 +49,7 @@ class ConsumerMixin: channels can be used for different QoS requirements. Example: + ------- .. code-block:: python class Worker(ConsumerMixin): @@ -65,8 +66,8 @@ class ConsumerMixin: print('Got task: {0!r}'.format(body)) message.ack() - Methods: - + Methods + ------- * :meth:`extra_context` Optional extra context manager that will be entered @@ -257,6 +258,7 @@ class ConsumerProducerMixin(ConsumerMixin): publishing messages. Example: + ------- .. code-block:: python class Worker(ConsumerProducerMixin): diff --git a/kombu/resource.py b/kombu/resource.py index 53ba1145..9f75eb06 100644 --- a/kombu/resource.py +++ b/kombu/resource.py @@ -67,12 +67,14 @@ class Resource: """Acquire resource. Arguments: + --------- block (bool): If the limit is exceeded, then block until there is an available item. timeout (float): Timeout to wait if ``block`` is true. Default is :const:`None` (forever). - Raises: + Raises + ------ LimitExceeded: if block is false and the limit has been exceeded. """ if self._closed: @@ -103,6 +105,7 @@ class Resource: """Release resource so it can be used by another thread. Warnings: + -------- The caller is responsible for discarding the object, and to never use the resource again. A new resource must be acquired if so needed. diff --git a/kombu/serialization.py b/kombu/serialization.py index 5cddeb0b..f455dfaa 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -72,6 +72,7 @@ class SerializerRegistry: """Register a new encoder/decoder. Arguments: + --------- name (str): A convenience name for the serialization method. encoder (callable): A method that will be passed a python data @@ -114,9 +115,11 @@ class SerializerRegistry: """Unregister registered encoder/decoder. Arguments: + --------- name (str): Registered serialization method name. - Raises: + Raises + ------ SerializerNotInstalled: If a serializer by that name cannot be found. """ @@ -134,11 +137,13 @@ class SerializerRegistry: """Set the default serialization method used by this library. Arguments: + --------- name (str): The name of the registered serialization method. For example, `json` (default), `pickle`, `yaml`, `msgpack`, or any custom methods registered using :meth:`register`. - Raises: + Raises + ------ SerializerNotInstalled: If the serialization method requested is not available. """ @@ -156,6 +161,7 @@ class SerializerRegistry: as an AMQP message body. Arguments: + --------- data (List, Dict, str): The message data to send. serializer (str): An optional string representing @@ -171,12 +177,14 @@ class SerializerRegistry: serialization method will be used even if a :class:`str` or :class:`unicode` object is passed in. - Returns: + Returns + ------- Tuple[str, str, str]: A three-item tuple containing the content type (e.g., `application/json`), content encoding, (e.g., `utf-8`) and a string containing the serialized data. - Raises: + Raises + ------ SerializerNotInstalled: If the serialization method requested is not available. """ @@ -220,6 +228,7 @@ class SerializerRegistry: based on `content_type`. Arguments: + --------- data (bytes, buffer, str): The message data to deserialize. content_type (str): The content-type of the data. @@ -230,10 +239,12 @@ class SerializerRegistry: accept (Set): List of content-types to accept. - Raises: + Raises + ------ ContentDisallowed: If the content-type is not accepted. - Returns: + Returns + ------- Any: The unserialized data. """ content_type = (bytes_to_str(content_type) if content_type @@ -343,7 +354,8 @@ def register_pickle(): def register_msgpack(): """Register msgpack serializer. - See Also: + See Also + -------- https://msgpack.org/. """ pack = unpack = None @@ -391,6 +403,7 @@ def enable_insecure_serializers(choices=NOTSET): """Enable serializers that are considered to be unsafe. Note: + ---- Will enable ``pickle``, ``yaml`` and ``msgpack`` by default, but you can also specify a list of serializers (by name or content type) to enable. @@ -411,6 +424,7 @@ def disable_insecure_serializers(allowed=NOTSET): or you can specify a list of deserializers to allow. Note: + ---- Producers will still be able to serialize data in these formats, but consumers will not accept incoming data using the untrusted content types. @@ -434,7 +448,8 @@ for ep, args in entrypoints('kombu.serializers'): # pragma: no cover def prepare_accept_content(content_types, name_to_type=None): """Replace aliases of content_types with full names from registry. - Raises: + Raises + ------ SerializerNotInstalled: If the serialization method requested is not available. """ diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 8a935cdd..019ac554 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -305,7 +305,8 @@ class Channel(virtual.Channel): def drain_events(self, timeout=None, callback=None, **kwargs): """Return a single payload message from one of our queues. - Raises: + Raises + ------ Queue.Empty: if no messages available. """ # If we're not allowed to consume or have no consumers, raise Empty @@ -318,7 +319,8 @@ class Channel(virtual.Channel): def _reset_cycle(self): """Reset the consume cycle. - Returns: + Returns + ------- FairCycle: object that points to our _get_bulk() method rather than the standard _get() method. This allows for multiple messages to be returned at once from SQS ( @@ -341,11 +343,12 @@ class Channel(virtual.Channel): return self.entity_name(self.queue_name_prefix + queue_name) def _new_queue(self, queue, **kwargs): - """ - Ensure a queue with given name exists in SQS. + """Ensure a queue with given name exists in SQS. + Arguments: + --------- queue (str): the AMQP queue name - Returns: + Returns str: the SQS queue URL """ # Translate to SQS name for consistency with initial @@ -481,10 +484,12 @@ class Channel(virtual.Channel): the 'ack' settings for that queue. Arguments: + --------- messages (SQSMessage): A list of SQS Message objects. queue (str): Name representing the queue they came from. - Returns: + Returns + ------- List: A list of Payload objects """ q_url = self._new_queue(queue) @@ -501,15 +506,18 @@ class Channel(virtual.Channel): prefetch_count). Note: + ---- Ignores QoS limits so caller is responsible for checking that we are allowed to consume at least one message from the queue. get_bulk will then ask QoS for an estimate of the number of extra messages that we can consume. Arguments: + --------- queue (str): The queue name to pull from. - Returns: + Returns + ------- List[Message] """ # drain_events calls `can_consume` first, consuming diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 8a217691..e00e3386 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -52,6 +52,7 @@ def resolve_transport(transport: str | None = None) -> str | None: """Get transport by name. Arguments: + --------- transport (Union[str, type]): This can be either an actual transport class, or the fully qualified path to a transport class, or the alias of a transport. diff --git a/kombu/transport/base.py b/kombu/transport/base.py index ec4c0aca..e52bcc60 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -54,7 +54,8 @@ def to_rabbitmq_queue_arguments(arguments, **options): max_priority (int): Max priority steps for queue. This will be converted to ``x-max-priority`` int. - Returns: + Returns + ------- Dict: RabbitMQ compatible queue arguments. """ prepared = dictfilter(dict( @@ -95,7 +96,8 @@ class StdChannel: def after_reply_message_received(self, queue): """Callback called after RPC reply received. - Notes: + Notes + ----- Reply queue semantics: can be used to delete the queue after transient reply message received. """ diff --git a/kombu/transport/consul.py b/kombu/transport/consul.py index 7ace52f6..370a4302 100644 --- a/kombu/transport/consul.py +++ b/kombu/transport/consul.py @@ -97,9 +97,11 @@ class Channel(virtual.Channel): read-consistency between the nodes. Arguments: + --------- queue (str): The name of the Queue. - Returns: + Returns + ------- str: The ID of the session. """ try: @@ -134,13 +136,16 @@ class Channel(virtual.Channel): means that they have to wait before the lock is released. Arguments: + --------- queue (str): The name of the Queue. raising (Exception): Set custom lock error class. - Raises: + Raises + ------ LockError: if the lock cannot be acquired. - Returns: + Returns + ------- bool: success? """ self._acquire_lock(queue, raising=raising) @@ -170,6 +175,7 @@ class Channel(virtual.Channel): It does so by simply removing the lock key in Consul. Arguments: + --------- queue (str): The name of the queue we want to release the lock from. """ @@ -182,6 +188,7 @@ class Channel(virtual.Channel): Will release all locks it still might hold. Arguments: + --------- queue (str): The name of the Queue. """ logger.debug('Destroying session %s', self.queues[queue]['session_id']) diff --git a/kombu/transport/etcd.py b/kombu/transport/etcd.py index 2ab85841..aa4771b7 100644 --- a/kombu/transport/etcd.py +++ b/kombu/transport/etcd.py @@ -78,6 +78,7 @@ class Channel(virtual.Channel): """Create and return the `queue` with the proper prefix. Arguments: + --------- queue (str): The name of the queue. """ return f'{self.prefix}/{queue}' @@ -93,6 +94,7 @@ class Channel(virtual.Channel): means that they have to wait before the lock is released. Arguments: + --------- queue (str): The name of the queue. """ lock = etcd.Lock(self.client, queue) @@ -109,6 +111,7 @@ class Channel(virtual.Channel): """Create a new `queue` if the `queue` doesn't already exist. Arguments: + --------- queue (str): The name of the queue. """ self.queues[queue] = queue @@ -123,7 +126,8 @@ class Channel(virtual.Channel): def _has_queue(self, queue, **kwargs): """Verify that queue exists. - Returns: + Returns + ------- bool: Should return :const:`True` if the queue exists or :const:`False` otherwise. """ @@ -137,6 +141,7 @@ class Channel(virtual.Channel): """Delete a `queue`. Arguments: + --------- queue (str): The name of the queue. """ self.queues.pop(queue, None) @@ -148,6 +153,7 @@ class Channel(virtual.Channel): This simply writes a key to the Etcd store Arguments: + --------- queue (str): The name of the queue. payload (dict): Message data which will be dumped to etcd. """ @@ -166,6 +172,7 @@ class Channel(virtual.Channel): only one node reads at the same time. This is for read consistency Arguments: + --------- queue (str): The name of the queue. timeout (int): Optional seconds to wait for a response. """ @@ -196,6 +203,7 @@ class Channel(virtual.Channel): """Remove all `message`s from a `queue`. Arguments: + --------- queue (str): The name of the queue. """ with self._queue_lock(queue): @@ -207,6 +215,7 @@ class Channel(virtual.Channel): """Return the size of the `queue`. Arguments: + --------- queue (str): The name of the queue. """ with self._queue_lock(queue): diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 9eef6b57..5c6877ca 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -448,6 +448,7 @@ class Channel(virtual.Channel): """Get expiration header named `argument` of queue definition. Note: + ---- `queue` must be either queue name or options itself. """ if isinstance(queue, str): diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 6cbfbdcf..fac15014 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -1362,7 +1362,7 @@ class SentinelChannel(Channel): * `master_name` - name of the redis group to poll Example: - + ------- .. code-block:: python >>> import kombu diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py index a61c8ea8..ee2b0e65 100644 --- a/kombu/transport/sqlalchemy/__init__.py +++ b/kombu/transport/sqlalchemy/__init__.py @@ -20,8 +20,9 @@ Connection String sqlalchemy+SQL_ALCHEMY_CONNECTION_STRING For details about ``SQL_ALCHEMY_CONNECTION_STRING`` see SQLAlchemy Engine Configuration documentation. -Examples: +Examples +-------- .. code-block:: # PostgreSQL with default driver diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py index ec47ecb8..4b138aa9 100644 --- a/kombu/transport/virtual/base.py +++ b/kombu/transport/virtual/base.py @@ -159,6 +159,7 @@ class QoS: Only supports `prefetch_count` at this point. Arguments: + --------- channel (ChannelT): Connection channel. prefetch_count (int): Initial prefetch count (defaults to 0). """ @@ -211,7 +212,8 @@ class QoS: bulk 'get message' calls are preferred to many individual 'get message' calls - like SQS. - Returns: + Returns + ------- int: greater than zero. """ pcount = self.prefetch_count @@ -273,6 +275,7 @@ class QoS: """Restore all unacknowledged messages at shutdown/gc collect. Note: + ---- Can only be called once for each instance, subsequent calls will be ignored. """ @@ -306,6 +309,7 @@ class QoS: To be filled in for visibility_timeout style implementations. Note: + ---- This is implementation optional, and currently only used by the Redis transport. """ @@ -355,6 +359,7 @@ class AbstractChannel: you'd usually want to implement in a virtual channel. Note: + ---- Do not subclass directly, but rather inherit from :class:`Channel`. """ @@ -379,6 +384,7 @@ class AbstractChannel: """Delete `queue`. Note: + ---- This just purges the queue, if you need to do more you can override this method. """ @@ -388,6 +394,7 @@ class AbstractChannel: """Create new queue. Note: + ---- Your transport can override this method if it needs to do something whenever a new queue is declared. """ @@ -395,7 +402,8 @@ class AbstractChannel: def _has_queue(self, queue, **kwargs): """Verify that queue exists. - Returns: + Returns + ------- bool: Should return :const:`True` if the queue exists or :const:`False` otherwise. """ @@ -414,6 +422,7 @@ class Channel(AbstractChannel, base.StdChannel): """Virtual channel. Arguments: + --------- connection (ConnectionT): The transport instance this channel is part of. """ @@ -675,6 +684,7 @@ class Channel(AbstractChannel, base.StdChannel): """Change QoS settings for this channel. Note: + ---- Only `prefetch_count` is supported. """ self.qos.prefetch_count = prefetch_count @@ -697,7 +707,8 @@ class Channel(AbstractChannel, base.StdChannel): def _lookup(self, exchange, routing_key, default=None): """Find all queues matching `routing_key` for the given `exchange`. - Returns: + Returns + ------- list[str]: queue names -- must return `[default]` if default is set and no queues matched. """ @@ -765,7 +776,8 @@ class Channel(AbstractChannel, base.StdChannel): def flow(self, active=True): """Enable/disable message flow. - Raises: + Raises + ------ NotImplementedError: as flow is not implemented by the base virtual implementation. """ @@ -838,6 +850,7 @@ class Channel(AbstractChannel, base.StdChannel): The value is limited to within a boundary of 0 to 9. Note: + ---- Higher value has more priority. """ try: @@ -887,6 +900,7 @@ class Transport(base.Transport): """Virtual transport. Arguments: + --------- client (kombu.Connection): The client this is a transport for. """ diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py index b70544cd..e7cfd373 100644 --- a/kombu/transport/virtual/exchange.py +++ b/kombu/transport/virtual/exchange.py @@ -17,6 +17,7 @@ class ExchangeType: Implements the specifics for an exchange type. Arguments: + --------- channel (ChannelT): AMQ Channel. """ @@ -28,7 +29,8 @@ class ExchangeType: def lookup(self, table, exchange, routing_key, default): """Lookup all queues matching `routing_key` in `exchange`. - Returns: + Returns + ------- str: queue name, or 'default' if no queues matched. """ raise NotImplementedError('subclass responsibility') @@ -36,7 +38,8 @@ class ExchangeType: def prepare_bind(self, queue, exchange, routing_key, arguments): """Prepare queue-binding. - Returns: + Returns + ------- Tuple[str, Pattern, str]: of `(routing_key, regex, queue)` to be stored for bindings to this exchange. """ @@ -137,7 +140,8 @@ class FanoutExchange(ExchangeType): attribute is set to true, and the `Channel._queue_bind` and `Channel.get_table` methods are implemented. - See Also: + See Also + -------- the redis backend for an example implementation of these methods. """ diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py index 6beb17d7..29c3d96d 100644 --- a/kombu/utils/functional.py +++ b/kombu/utils/functional.py @@ -45,6 +45,7 @@ class LRUCache(UserDict): """LRU Cache implementation using a doubly linked list to track access. Arguments: + --------- limit (int): The maximum number of keys to keep in the cache. When a new key is inserted and the limit has been exceeded, the *Least Recently Used* key will be discarded from the @@ -221,6 +222,7 @@ def is_list(obj, scalars=(Mapping, str), iters=(Iterable,)): """Return true if the object is iterable. Note: + ---- Returns false if object is a mapping or string. """ return isinstance(obj, iters) and not isinstance(obj, scalars or ()) @@ -279,11 +281,13 @@ def retry_over_time(fun, catch, args=None, kwargs=None, errback=None, is increased for every retry until the max seconds is reached. Arguments: + --------- fun (Callable): The function to try catch (Tuple[BaseException]): Exceptions to catch, can be either tuple or a single exception class. Keyword Arguments: + ----------------- args (Tuple): Positional arguments passed on to the function. kwargs (Dict): Keyword arguments passed on to the function. errback (Callable): Callback for when an exception in ``catch`` diff --git a/kombu/utils/imports.py b/kombu/utils/imports.py index 8752fa1a..c9aff884 100644 --- a/kombu/utils/imports.py +++ b/kombu/utils/imports.py @@ -28,7 +28,8 @@ def symbol_by_name(name, aliases=None, imp=None, package=None, If `aliases` is provided, a dict containing short name/long name mappings, the name is looked up in the aliases first. - Examples: + Examples + -------- >>> symbol_by_name('celery.concurrency.processes.TaskPool') diff --git a/kombu/utils/limits.py b/kombu/utils/limits.py index 36d11f1f..413c6c62 100644 --- a/kombu/utils/limits.py +++ b/kombu/utils/limits.py @@ -11,13 +11,15 @@ __all__ = ('TokenBucket',) class TokenBucket: """Token Bucket Algorithm. - See Also: + See Also + -------- https://en.wikipedia.org/wiki/Token_Bucket Most of this code was stolen from an entry in the ASPN Python Cookbook: https://code.activestate.com/recipes/511490/ Warning: + ------- Thread Safety: This implementation is not thread safe. Access to a `TokenBucket` instance should occur within the critical section of any multithreaded code. @@ -51,7 +53,8 @@ class TokenBucket: def can_consume(self, tokens=1): """Check if one or more tokens can be consumed. - Returns: + Returns + ------- bool: true if the number of tokens can be consumed from the bucket. If they can be consumed, a call will also consume the requested number of tokens from the bucket. @@ -67,7 +70,8 @@ class TokenBucket: def expected_time(self, tokens=1): """Return estimated time of token availability. - Returns: + Returns + ------- float: the time in seconds. """ _tokens = self._get_tokens() diff --git a/kombu/utils/scheduling.py b/kombu/utils/scheduling.py index 94286be8..e4bff7b4 100644 --- a/kombu/utils/scheduling.py +++ b/kombu/utils/scheduling.py @@ -24,6 +24,7 @@ class FairCycle: an equal chance to be consumed from. Arguments: + --------- fun (Callable): Callback to call. resources (Sequence[Any]): List of resources. predicate (type): Exception predicate. diff --git a/kombu/utils/text.py b/kombu/utils/text.py index fea53347..a6558cfc 100644 --- a/kombu/utils/text.py +++ b/kombu/utils/text.py @@ -22,7 +22,8 @@ def escape_regex(p, white=''): def fmatch_iter(needle: str, haystack: Iterable[str], min_ratio: float = 0.6) -> Iterator[tuple[float, str]]: """Fuzzy match: iteratively. - Yields: + Yields + ------ Tuple: of ratio and key. """ for key in haystack: diff --git a/kombu/utils/uuid.py b/kombu/utils/uuid.py index 9f77dad9..917eccf3 100644 --- a/kombu/utils/uuid.py +++ b/kombu/utils/uuid.py @@ -8,7 +8,8 @@ from uuid import UUID, uuid4 def uuid(_uuid: Callable[[], UUID] = uuid4) -> str: """Generate unique id in UUID4 format. - See Also: + See Also + -------- For now this is provided by :func:`uuid.uuid4`. """ return str(_uuid()) diff --git a/requirements/pkgutils.txt b/requirements/pkgutils.txt index 3ab5ae7a..63ecff12 100644 --- a/requirements/pkgutils.txt +++ b/requirements/pkgutils.txt @@ -4,5 +4,5 @@ flake8==5.0.4 tox>=4.4.8 sphinx2rst>=1.0 bumpversion -pydocstyle==1.1.1 +pydocstyle==6.3.0 mypy==1.2.0 diff --git a/setup.cfg b/setup.cfg index d5c857d5..c087af81 100644 --- a/setup.cfg +++ b/setup.cfg @@ -48,7 +48,7 @@ files = [pep257] -ignore = D102,D104,D203,D105,D213 +ignore = D102,D107,D104,D203,D105,D213,D401,D413,D417 [bdist_rpm] requires = amqp >= 5. diff --git a/tox.ini b/tox.ini index 38345e22..85288018 100644 --- a/tox.ini +++ b/tox.ini @@ -45,9 +45,9 @@ basepython = pypy3.9: pypy3.9 pypy3.8: pypy3.8 3.7: python3.7 - 3.8,mypy: python3.8 - 3.9,apicheck,pydocstyle,flake8,linkcheck,cov: python3.9 - 3.10: python3.10 + 3.8: python3.8 + 3.9: python3.9 + 3.10,apicheck,pydocstyle,flake8,linkcheck,cov,mypy: python3.10 3.11: python3.11 install_command = python -m pip --disable-pip-version-check install {opts} {packages} -- cgit v1.2.1