summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStevie Gayet <87695919+stegayet@users.noreply.github.com>2023-05-15 04:11:34 +0200
committerGitHub <noreply@github.com>2023-05-15 08:11:34 +0600
commitc03335e167a320ffbb62c953e12dca8b659d6805 (patch)
treeb76627f0282d66cb5399ea38b70bac7360066c4d
parent8426032ae25b6e35bbb72fe084281a447d30991f (diff)
downloadkombu-c03335e167a320ffbb62c953e12dca8b659d6805.tar.gz
chore(ci): fix lint job (#1718)
* chore(ci): align python version between matrix and tox config * chore(pydocstyle): upgrade pydocstyle to 6.3.0 * chore(pydocstyle): exclude rules * fix(mypy): fix `[truthy-function]` error * fix(pydocstyle): fix D205 rule * fix(pydocstyle): fix D212 rule * fix(pydocstyle): fix D407 rule * fix(pydocstyle): fix D412 rule * fix(pydocstyle): fix D406 rule * fix(pydocstyle): fix D411 rule --------- Co-authored-by: Stevie Gayet <stegayet@users.noreply.github.com>
-rw-r--r--kombu/abstract.py2
-rw-r--r--kombu/asynchronous/http/base.py12
-rw-r--r--kombu/asynchronous/hub.py1
-rw-r--r--kombu/asynchronous/semaphore.py3
-rw-r--r--kombu/asynchronous/timer.py1
-rw-r--r--kombu/clocks.py4
-rw-r--r--kombu/common.py13
-rw-r--r--kombu/compression.py3
-rw-r--r--kombu/connection.py37
-rw-r--r--kombu/entity.py20
-rw-r--r--kombu/message.py13
-rw-r--r--kombu/messaging.py16
-rw-r--r--kombu/mixins.py6
-rw-r--r--kombu/resource.py5
-rw-r--r--kombu/serialization.py31
-rw-r--r--kombu/transport/SQS.py22
-rw-r--r--kombu/transport/__init__.py1
-rw-r--r--kombu/transport/base.py6
-rw-r--r--kombu/transport/consul.py13
-rw-r--r--kombu/transport/etcd.py11
-rw-r--r--kombu/transport/mongodb.py1
-rw-r--r--kombu/transport/redis.py2
-rw-r--r--kombu/transport/sqlalchemy/__init__.py3
-rw-r--r--kombu/transport/virtual/base.py22
-rw-r--r--kombu/transport/virtual/exchange.py10
-rw-r--r--kombu/utils/functional.py4
-rw-r--r--kombu/utils/imports.py3
-rw-r--r--kombu/utils/limits.py10
-rw-r--r--kombu/utils/scheduling.py1
-rw-r--r--kombu/utils/text.py3
-rw-r--r--kombu/utils/uuid.py3
-rw-r--r--requirements/pkgutils.txt2
-rw-r--r--setup.cfg2
-rw-r--r--tox.ini6
34 files changed, 227 insertions, 65 deletions
diff --git a/kombu/abstract.py b/kombu/abstract.py
index 48a917c9..6a689fe9 100644
--- a/kombu/abstract.py
+++ b/kombu/abstract.py
@@ -53,7 +53,7 @@ class Object:
setattr(self, name, None)
def as_dict(self, recurse: bool = False) -> dict[str, Any]:
- def f(obj: Any, type: Callable[[Any], Any]) -> Any:
+ def f(obj: Any, type: Callable[[Any], Any] | None = None) -> Any:
if recurse and isinstance(obj, Object):
return obj.as_dict(recurse=True)
return type(obj) if type and obj is not None else obj
diff --git a/kombu/asynchronous/http/base.py b/kombu/asynchronous/http/base.py
index 89be531f..345e07f6 100644
--- a/kombu/asynchronous/http/base.py
+++ b/kombu/asynchronous/http/base.py
@@ -44,10 +44,12 @@ class Request:
"""A HTTP Request.
Arguments:
+ ---------
url (str): The URL to request.
method (str): The HTTP method to use (defaults to ``GET``).
Keyword Arguments:
+ -----------------
headers (Dict, ~kombu.asynchronous.http.Headers): Optional headers for
this request
body (str): Optional body for this request.
@@ -138,7 +140,8 @@ class Request:
class Response:
"""HTTP Response.
- Arguments:
+ Arguments
+ ---------
request (~kombu.asynchronous.http.Request): See :attr:`request`.
code (int): See :attr:`code`.
headers (~kombu.asynchronous.http.Headers): See :attr:`headers`.
@@ -146,7 +149,8 @@ class Response:
effective_url (str): See :attr:`effective_url`.
status (str): See :attr:`status`.
- Attributes:
+ Attributes
+ ----------
request (~kombu.asynchronous.http.Request): object used to
get this response.
code (int): HTTP response code (e.g. 200, 404, or 500).
@@ -182,7 +186,8 @@ class Response:
def raise_for_error(self):
"""Raise if the request resulted in an HTTP error code.
- Raises:
+ Raises
+ ------
:class:`~kombu.exceptions.HttpError`
"""
if self.error:
@@ -193,6 +198,7 @@ class Response:
"""The full contents of the response body.
Note:
+ ----
Accessing this property will evaluate the buffer
and subsequent accesses will be cached.
"""
diff --git a/kombu/asynchronous/hub.py b/kombu/asynchronous/hub.py
index e5b1163c..c2d301d7 100644
--- a/kombu/asynchronous/hub.py
+++ b/kombu/asynchronous/hub.py
@@ -57,6 +57,7 @@ class Hub:
"""Event loop object.
Arguments:
+ ---------
timer (kombu.asynchronous.Timer): Specify custom timer instance.
"""
diff --git a/kombu/asynchronous/semaphore.py b/kombu/asynchronous/semaphore.py
index 07fb8a09..940e6d86 100644
--- a/kombu/asynchronous/semaphore.py
+++ b/kombu/asynchronous/semaphore.py
@@ -26,6 +26,7 @@ class LaxBoundedSemaphore:
range even if released more times than it was acquired.
Example:
+ -------
>>> x = LaxBoundedSemaphore(2)
>>> x.acquire(print, 'HELLO 1')
@@ -61,6 +62,7 @@ class LaxBoundedSemaphore:
until the semaphore is released.
Arguments:
+ ---------
callback (Callable): The callback to apply.
*partial_args (Any): partial arguments to callback.
"""
@@ -77,6 +79,7 @@ class LaxBoundedSemaphore:
"""Release semaphore.
Note:
+ ----
If there are any waiters this will apply the first waiter
that is waiting for the resource (FIFO order).
"""
diff --git a/kombu/asynchronous/timer.py b/kombu/asynchronous/timer.py
index ab0291da..a3b7fb77 100644
--- a/kombu/asynchronous/timer.py
+++ b/kombu/asynchronous/timer.py
@@ -149,6 +149,7 @@ class Timer:
"""Enter function into the scheduler.
Arguments:
+ ---------
entry (~kombu.asynchronous.timer.Entry): Item to enter.
eta (datetime.datetime): Scheduled time.
priority (int): Unused.
diff --git a/kombu/clocks.py b/kombu/clocks.py
index d02e8b32..b5ab0866 100644
--- a/kombu/clocks.py
+++ b/kombu/clocks.py
@@ -18,6 +18,7 @@ class timetuple(tuple):
Can be used as part of a heap to keep events ordered.
Arguments:
+ ---------
clock (Optional[int]): Event clock value.
timestamp (float): Event UNIX timestamp value.
id (str): Event host id (e.g. ``hostname:pid``).
@@ -85,7 +86,8 @@ class LamportClock:
process receives a message, it resynchronizes its logical clock with
the sender.
- See Also:
+ See Also
+ --------
* `Lamport timestamps`_
* `Lamports distributed mutex`_
diff --git a/kombu/common.py b/kombu/common.py
index c7b2d50a..c00ae0fe 100644
--- a/kombu/common.py
+++ b/kombu/common.py
@@ -66,6 +66,7 @@ class Broadcast(Queue):
and both the queue and exchange is configured with auto deletion.
Arguments:
+ ---------
name (str): This is used as the name of the exchange.
queue (str): By default a unique id is used for the queue
name for every consumer. You can specify a custom
@@ -203,7 +204,8 @@ def eventloop(conn, limit=None, timeout=None, ignore_timeouts=False):
``eventloop`` is a generator.
- Examples:
+ Examples
+ --------
>>> from kombu.common import eventloop
>>> def run(conn):
@@ -219,7 +221,8 @@ def eventloop(conn, limit=None, timeout=None, ignore_timeouts=False):
for _ in eventloop(connection, limit=1, timeout=1):
pass
- See Also:
+ See Also
+ --------
:func:`itermessages`, which is an event loop bound to one or more
consumers, that yields any messages received.
"""
@@ -236,6 +239,7 @@ def send_reply(exchange, req, msg,
"""Send reply for request.
Arguments:
+ ---------
exchange (kombu.Exchange, str): Reply exchange
req (~kombu.Message): Original request, a message with
a ``reply_to`` property.
@@ -309,6 +313,7 @@ def ignore_errors(conn, fun=None, *args, **kwargs):
Note:
+ ----
Connection and channel errors should be properly handled,
and not ignored. Using this function is only acceptable in a cleanup
phase, like when a connection is lost or at shutdown.
@@ -348,12 +353,14 @@ class QoS:
"""Thread safe increment/decrement of a channels prefetch_count.
Arguments:
+ ---------
callback (Callable): Function used to set new prefetch count,
e.g. ``consumer.qos`` or ``channel.basic_qos``. Will be called
with a single ``prefetch_count`` keyword argument.
initial_value (int): Initial prefetch count value..
Example:
+ -------
>>> from kombu import Consumer, Connection
>>> connection = Connection('amqp://')
>>> consumer = Consumer(connection)
@@ -396,6 +403,7 @@ class QoS:
"""Increment the value, but do not update the channels QoS.
Note:
+ ----
The MainThread will be responsible for calling :meth:`update`
when necessary.
"""
@@ -408,6 +416,7 @@ class QoS:
"""Decrement the value, but do not update the channels QoS.
Note:
+ ----
The MainThread will be responsible for calling :meth:`update`
when necessary.
"""
diff --git a/kombu/compression.py b/kombu/compression.py
index f98c971b..2c1c8591 100644
--- a/kombu/compression.py
+++ b/kombu/compression.py
@@ -18,6 +18,7 @@ def register(encoder, decoder, content_type, aliases=None):
"""Register new compression method.
Arguments:
+ ---------
encoder (Callable): Function used to compress text.
decoder (Callable): Function used to decompress previously
compressed text.
@@ -52,6 +53,7 @@ def compress(body, content_type):
"""Compress text.
Arguments:
+ ---------
body (AnyStr): The text to compress.
content_type (str): mime-type of compression method to use.
"""
@@ -63,6 +65,7 @@ def decompress(body, content_type):
"""Decompress compressed text.
Arguments:
+ ---------
body (AnyStr): Previously compressed text to uncompress.
content_type (str): mime-type of compression method used.
"""
diff --git a/kombu/connection.py b/kombu/connection.py
index 0c9779b5..12a08fe0 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -64,6 +64,7 @@ class Connection:
"""A connection to the broker.
Example:
+ -------
>>> Connection('amqp://guest:guest@localhost:5672//')
>>> Connection('amqp://foo;amqp://bar',
... failover_strategy='round-robin')
@@ -80,13 +81,16 @@ class Connection:
... })
Note:
+ ----
SSL currently only works with the py-amqp, and qpid
transports. For other transports you can use stunnel.
Arguments:
+ ---------
URL (str, Sequence): Broker URL, or a list of URLs.
Keyword Arguments:
+ -----------------
ssl (bool/dict): Use SSL to connect to the server.
Default is ``False``.
May not be supported by the specified transport.
@@ -102,6 +106,7 @@ class Connection:
around once per second.
Note:
+ ----
The connection is established lazily when needed. If you need the
connection to be established, then force it by calling
:meth:`connect`::
@@ -233,9 +238,11 @@ class Connection:
"""Switch connection parameters to use a new URL or hostname.
Note:
+ ----
Does not reconnect!
Arguments:
+ ---------
conn_str (str): either a hostname or URL.
"""
self.close()
@@ -311,6 +318,7 @@ class Connection:
this is a noop operation.
Arguments:
+ ---------
rate (int): Rate is how often the tick is called
compared to the actual heartbeat value. E.g. if
the heartbeat is set to 3 seconds, and the tick
@@ -323,9 +331,11 @@ class Connection:
"""Wait for a single event from the server.
Arguments:
+ ---------
timeout (float): Timeout in seconds before we give up.
- Raises:
+ Raises
+ ------
socket.timeout: if the timeout is exceeded.
"""
return self.transport.drain_events(self.connection, **kwargs)
@@ -408,6 +418,7 @@ class Connection:
specified.
Arguments:
+ ---------
errback (Callable): Optional callback called each time the
connection can't be established. Arguments provided are
the exception raised and the interval that will be
@@ -491,6 +502,7 @@ class Connection:
the function.
Arguments:
+ ---------
obj: The object to ensure an action on.
fun (Callable): Method to apply.
@@ -515,7 +527,8 @@ class Connection:
regardless of the connection state. Must provide max_retries
if this is specified.
- Examples:
+ Examples
+ --------
>>> from kombu import Connection, Producer
>>> conn = Connection('amqp://')
>>> producer = Producer(conn)
@@ -601,10 +614,12 @@ class Connection:
If a ``channel`` is not provided, then one will be automatically
acquired (remember to close it afterwards).
- See Also:
+ See Also
+ --------
:meth:`ensure` for the full list of supported keyword arguments.
Example:
+ -------
>>> channel = connection.channel()
>>> try:
... ret, channel = connection.autoretry(
@@ -730,14 +745,17 @@ class Connection:
def Pool(self, limit=None, **kwargs):
"""Pool of connections.
- See Also:
+ See Also
+ --------
:class:`ConnectionPool`.
Arguments:
+ ---------
limit (int): Maximum number of active connections.
Default is no limit.
Example:
+ -------
>>> connection = Connection('amqp://')
>>> pool = connection.Pool(2)
>>> c1 = pool.acquire()
@@ -756,14 +774,17 @@ class Connection:
def ChannelPool(self, limit=None, **kwargs):
"""Pool of channels.
- See Also:
+ See Also
+ --------
:class:`ChannelPool`.
Arguments:
+ ---------
limit (int): Maximum number of active channels.
Default is no limit.
Example:
+ -------
>>> connection = Connection('amqp://')
>>> pool = connection.ChannelPool(2)
>>> c1 = pool.acquire()
@@ -802,6 +823,7 @@ class Connection:
also it will be used as the default routing key.
Arguments:
+ ---------
name (str, kombu.Queue): Name of the queue/or a queue.
no_ack (bool): Disable acknowledgments. Default is false.
queue_opts (Dict): Additional keyword arguments passed to the
@@ -828,7 +850,8 @@ class Connection:
Create new :class:`~kombu.simple.SimpleQueue` using a channel
from this connection.
- See Also:
+ See Also
+ --------
Same as :meth:`SimpleQueue`, but configured with buffering
semantics. The resulting queue and exchange will not be durable,
also auto delete is enabled. Messages will be transient (not
@@ -901,6 +924,7 @@ class Connection:
"""The underlying connection object.
Warning:
+ -------
This instance is transport specific, so do not
depend on the interface of this object.
"""
@@ -925,6 +949,7 @@ class Connection:
Created upon access and closed when the connection is closed.
Note:
+ ----
Can be used for automatic channel handling when you only need one
channel, and also it is the channel implicitly used if
a connection is passed instead of a channel, to functions that
diff --git a/kombu/entity.py b/kombu/entity.py
index 2329e748..5e6419ae 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -42,6 +42,7 @@ class Exchange(MaybeChannelBound):
"""An Exchange declaration.
Arguments:
+ ---------
name (str): See :attr:`name`.
type (str): See :attr:`type`.
channel (kombu.Connection, ChannelT): See :attr:`channel`.
@@ -51,7 +52,8 @@ class Exchange(MaybeChannelBound):
arguments (Dict): See :attr:`arguments`.
no_declare (bool): See :attr:`no_declare`
- Attributes:
+ Attributes
+ ----------
name (str): Name of the exchange.
Default is no name (the default exchange).
@@ -190,6 +192,7 @@ class Exchange(MaybeChannelBound):
"""Bind the exchange to another exchange.
Arguments:
+ ---------
nowait (bool): If set the server will not respond, and the call
will not block waiting for a response.
Default is :const:`False`.
@@ -221,6 +224,7 @@ class Exchange(MaybeChannelBound):
"""Create message instance to be sent with :meth:`publish`.
Arguments:
+ ---------
body (Any): Message body.
delivery_mode (bool): Set custom delivery mode.
@@ -259,6 +263,7 @@ class Exchange(MaybeChannelBound):
"""Publish message.
Arguments:
+ ---------
message (Union[kombu.Message, str, bytes]):
Message to publish.
routing_key (str): Message routing key.
@@ -280,6 +285,7 @@ class Exchange(MaybeChannelBound):
"""Delete the exchange declaration on server.
Arguments:
+ ---------
if_unused (bool): Delete only if the exchange has no bindings.
Default is :const:`False`.
nowait (bool): If set the server will not respond, and a
@@ -322,6 +328,7 @@ class binding(Object):
"""Represents a queue or exchange binding.
Arguments:
+ ---------
exchange (Exchange): Exchange to bind to.
routing_key (str): Routing key used as binding key.
arguments (Dict): Arguments for bind operation.
@@ -376,6 +383,7 @@ class Queue(MaybeChannelBound):
"""A Queue declaration.
Arguments:
+ ---------
name (str): See :attr:`name`.
exchange (Exchange, str): See :attr:`exchange`.
routing_key (str): See :attr:`routing_key`.
@@ -394,7 +402,8 @@ class Queue(MaybeChannelBound):
max_length_bytes (int): See :attr:`max_length_bytes`.
max_priority (int): See :attr:`max_priority`.
- Attributes:
+ Attributes
+ ----------
name (str): Name of the queue.
Default is no name (default queue destination).
@@ -628,6 +637,7 @@ class Queue(MaybeChannelBound):
"""Declare queue on the server.
Arguments:
+ ---------
nowait (bool): Do not wait for a reply.
passive (bool): If set, the server will not create the queue.
The client can use this to check whether a queue exists
@@ -684,11 +694,13 @@ class Queue(MaybeChannelBound):
specific types of applications where synchronous functionality
is more important than performance.
- Returns:
+ Returns
+ -------
~kombu.Message: if a message was available,
or :const:`None` otherwise.
Arguments:
+ ---------
no_ack (bool): If enabled the broker will
automatically ack messages.
accept (Set[str]): Custom list of accepted content types.
@@ -717,6 +729,7 @@ class Queue(MaybeChannelBound):
until the client cancels them.
Arguments:
+ ---------
consumer_tag (str): Unique identifier for the consumer.
The consumer tag is local to a connection, so two clients
can use the same consumer tags. If this field is empty
@@ -747,6 +760,7 @@ class Queue(MaybeChannelBound):
"""Delete the queue.
Arguments:
+ ---------
if_unused (bool): If set, the server will only delete the queue
if it has no consumers. A channel error will be raised
if the queue has consumers.
diff --git a/kombu/message.py b/kombu/message.py
index f2af1686..3c9d07ae 100644
--- a/kombu/message.py
+++ b/kombu/message.py
@@ -19,7 +19,7 @@ class Message:
"""Base class for received messages.
Keyword Arguments:
-
+ -----------------
channel (ChannelT): If message was received, this should be the
channel that the message was received on.
@@ -103,7 +103,8 @@ class Message:
This will remove the message from the queue.
- Raises:
+ Raises
+ ------
MessageStateError: If the message has already been
acknowledged/requeued/rejected.
"""
@@ -148,7 +149,8 @@ class Message:
The message will be discarded by the server.
- Raises:
+ Raises
+ ------
MessageStateError: If the message has already been
acknowledged/requeued/rejected.
"""
@@ -166,10 +168,12 @@ class Message:
"""Reject this message and put it back on the queue.
Warning:
+ -------
You must not use this method as a means of selecting messages
to process.
- Raises:
+ Raises
+ ------
MessageStateError: If the message has already been
acknowledged/requeued/rejected.
"""
@@ -189,6 +193,7 @@ class Message:
Returning the original python structure sent by the publisher.
Note:
+ ----
The return value is memoized, use `_decode` to force
re-evaluation.
"""
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 2b600224..fed7b3f0 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -23,6 +23,7 @@ class Producer:
"""Message Producer.
Arguments:
+ ---------
channel (kombu.Connection, ChannelT): Connection or channel.
exchange (kombu.entity.Exchange, str): Optional default exchange.
routing_key (str): Optional default routing key.
@@ -93,6 +94,7 @@ class Producer:
"""Declare the exchange.
Note:
+ ----
This happens automatically at instantiation when
the :attr:`auto_declare` flag is enabled.
"""
@@ -126,6 +128,7 @@ class Producer:
"""Publish message to the specified exchange.
Arguments:
+ ---------
body (Any): Message body.
routing_key (str): Message routing key.
delivery_mode (enum): See :attr:`delivery_mode`.
@@ -293,6 +296,7 @@ class Consumer:
"""Message consumer.
Arguments:
+ ---------
channel (kombu.Connection, ChannelT): see :attr:`channel`.
queues (Sequence[kombu.Queue]): see :attr:`queues`.
no_ack (bool): see :attr:`no_ack`.
@@ -426,6 +430,7 @@ class Consumer:
"""Declare queues, exchanges and bindings.
Note:
+ ----
This is done automatically at instantiation
when :attr:`auto_declare` is set.
"""
@@ -436,6 +441,7 @@ class Consumer:
"""Register a new callback to be called when a message is received.
Note:
+ ----
The signature of the callback needs to accept two arguments:
`(body, message)`, which is the decoded message body
and the :class:`~kombu.Message` instance.
@@ -464,6 +470,7 @@ class Consumer:
"""Add a queue to the list of queues to consume from.
Note:
+ ----
This will not start consuming from the queue,
for that you will have to call :meth:`consume` after.
"""
@@ -482,6 +489,7 @@ class Consumer:
use :meth:`cancel_by_queue`).
Arguments:
+ ---------
no_ack (bool): See :attr:`no_ack`.
"""
queues = list(self._queues.values())
@@ -497,6 +505,7 @@ class Consumer:
"""End all active queue consumers.
Note:
+ ----
This does not affect already delivered messages, but it does
mean the server will not send any more messages for this consumer.
"""
@@ -530,6 +539,7 @@ class Consumer:
"""Purge messages from all queues.
Warning:
+ -------
This will *delete all ready messages*, there is no undo operation.
"""
return sum(queue.purge() for queue in self._queues.values())
@@ -559,6 +569,7 @@ class Consumer:
The prefetch window is Ignored if the :attr:`no_ack` option is set.
Arguments:
+ ---------
prefetch_size (int): Specify the prefetch window in octets.
The server will send a message in advance if it is equal to
or smaller in size than the available prefetch size (and
@@ -582,6 +593,7 @@ class Consumer:
on the specified channel.
Arguments:
+ ---------
requeue (bool): By default the messages will be redelivered
to the original recipient. With `requeue` set to true, the
server will attempt to requeue the message, potentially then
@@ -595,10 +607,12 @@ class Consumer:
This dispatches to the registered :attr:`callbacks`.
Arguments:
+ ---------
body (Any): The decoded message body.
message (~kombu.Message): The message instance.
- Raises:
+ Raises
+ ------
NotImplementedError: If no consumer callbacks have been
registered.
"""
diff --git a/kombu/mixins.py b/kombu/mixins.py
index f1b3c1c9..14c1c1b9 100644
--- a/kombu/mixins.py
+++ b/kombu/mixins.py
@@ -49,6 +49,7 @@ class ConsumerMixin:
channels can be used for different QoS requirements.
Example:
+ -------
.. code-block:: python
class Worker(ConsumerMixin):
@@ -65,8 +66,8 @@ class ConsumerMixin:
print('Got task: {0!r}'.format(body))
message.ack()
- Methods:
-
+ Methods
+ -------
* :meth:`extra_context`
Optional extra context manager that will be entered
@@ -257,6 +258,7 @@ class ConsumerProducerMixin(ConsumerMixin):
publishing messages.
Example:
+ -------
.. code-block:: python
class Worker(ConsumerProducerMixin):
diff --git a/kombu/resource.py b/kombu/resource.py
index 53ba1145..9f75eb06 100644
--- a/kombu/resource.py
+++ b/kombu/resource.py
@@ -67,12 +67,14 @@ class Resource:
"""Acquire resource.
Arguments:
+ ---------
block (bool): If the limit is exceeded,
then block until there is an available item.
timeout (float): Timeout to wait
if ``block`` is true. Default is :const:`None` (forever).
- Raises:
+ Raises
+ ------
LimitExceeded: if block is false and the limit has been exceeded.
"""
if self._closed:
@@ -103,6 +105,7 @@ class Resource:
"""Release resource so it can be used by another thread.
Warnings:
+ --------
The caller is responsible for discarding the object,
and to never use the resource again. A new resource must
be acquired if so needed.
diff --git a/kombu/serialization.py b/kombu/serialization.py
index 5cddeb0b..f455dfaa 100644
--- a/kombu/serialization.py
+++ b/kombu/serialization.py
@@ -72,6 +72,7 @@ class SerializerRegistry:
"""Register a new encoder/decoder.
Arguments:
+ ---------
name (str): A convenience name for the serialization method.
encoder (callable): A method that will be passed a python data
@@ -114,9 +115,11 @@ class SerializerRegistry:
"""Unregister registered encoder/decoder.
Arguments:
+ ---------
name (str): Registered serialization method name.
- Raises:
+ Raises
+ ------
SerializerNotInstalled: If a serializer by that name
cannot be found.
"""
@@ -134,11 +137,13 @@ class SerializerRegistry:
"""Set the default serialization method used by this library.
Arguments:
+ ---------
name (str): The name of the registered serialization method.
For example, `json` (default), `pickle`, `yaml`, `msgpack`,
or any custom methods registered using :meth:`register`.
- Raises:
+ Raises
+ ------
SerializerNotInstalled: If the serialization method
requested is not available.
"""
@@ -156,6 +161,7 @@ class SerializerRegistry:
as an AMQP message body.
Arguments:
+ ---------
data (List, Dict, str): The message data to send.
serializer (str): An optional string representing
@@ -171,12 +177,14 @@ class SerializerRegistry:
serialization method will be used even if a :class:`str`
or :class:`unicode` object is passed in.
- Returns:
+ Returns
+ -------
Tuple[str, str, str]: A three-item tuple containing the
content type (e.g., `application/json`), content encoding, (e.g.,
`utf-8`) and a string containing the serialized data.
- Raises:
+ Raises
+ ------
SerializerNotInstalled: If the serialization method
requested is not available.
"""
@@ -220,6 +228,7 @@ class SerializerRegistry:
based on `content_type`.
Arguments:
+ ---------
data (bytes, buffer, str): The message data to deserialize.
content_type (str): The content-type of the data.
@@ -230,10 +239,12 @@ class SerializerRegistry:
accept (Set): List of content-types to accept.
- Raises:
+ Raises
+ ------
ContentDisallowed: If the content-type is not accepted.
- Returns:
+ Returns
+ -------
Any: The unserialized data.
"""
content_type = (bytes_to_str(content_type) if content_type
@@ -343,7 +354,8 @@ def register_pickle():
def register_msgpack():
"""Register msgpack serializer.
- See Also:
+ See Also
+ --------
https://msgpack.org/.
"""
pack = unpack = None
@@ -391,6 +403,7 @@ def enable_insecure_serializers(choices=NOTSET):
"""Enable serializers that are considered to be unsafe.
Note:
+ ----
Will enable ``pickle``, ``yaml`` and ``msgpack`` by default, but you
can also specify a list of serializers (by name or content type)
to enable.
@@ -411,6 +424,7 @@ def disable_insecure_serializers(allowed=NOTSET):
or you can specify a list of deserializers to allow.
Note:
+ ----
Producers will still be able to serialize data
in these formats, but consumers will not accept
incoming data using the untrusted content types.
@@ -434,7 +448,8 @@ for ep, args in entrypoints('kombu.serializers'): # pragma: no cover
def prepare_accept_content(content_types, name_to_type=None):
"""Replace aliases of content_types with full names from registry.
- Raises:
+ Raises
+ ------
SerializerNotInstalled: If the serialization method
requested is not available.
"""
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index 8a935cdd..019ac554 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -305,7 +305,8 @@ class Channel(virtual.Channel):
def drain_events(self, timeout=None, callback=None, **kwargs):
"""Return a single payload message from one of our queues.
- Raises:
+ Raises
+ ------
Queue.Empty: if no messages available.
"""
# If we're not allowed to consume or have no consumers, raise Empty
@@ -318,7 +319,8 @@ class Channel(virtual.Channel):
def _reset_cycle(self):
"""Reset the consume cycle.
- Returns:
+ Returns
+ -------
FairCycle: object that points to our _get_bulk() method
rather than the standard _get() method. This allows for
multiple messages to be returned at once from SQS (
@@ -341,11 +343,12 @@ class Channel(virtual.Channel):
return self.entity_name(self.queue_name_prefix + queue_name)
def _new_queue(self, queue, **kwargs):
- """
- Ensure a queue with given name exists in SQS.
+ """Ensure a queue with given name exists in SQS.
+
Arguments:
+ ---------
queue (str): the AMQP queue name
- Returns:
+ Returns
str: the SQS queue URL
"""
# Translate to SQS name for consistency with initial
@@ -481,10 +484,12 @@ class Channel(virtual.Channel):
the 'ack' settings for that queue.
Arguments:
+ ---------
messages (SQSMessage): A list of SQS Message objects.
queue (str): Name representing the queue they came from.
- Returns:
+ Returns
+ -------
List: A list of Payload objects
"""
q_url = self._new_queue(queue)
@@ -501,15 +506,18 @@ class Channel(virtual.Channel):
prefetch_count).
Note:
+ ----
Ignores QoS limits so caller is responsible for checking
that we are allowed to consume at least one message from the
queue. get_bulk will then ask QoS for an estimate of
the number of extra messages that we can consume.
Arguments:
+ ---------
queue (str): The queue name to pull from.
- Returns:
+ Returns
+ -------
List[Message]
"""
# drain_events calls `can_consume` first, consuming
diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py
index 8a217691..e00e3386 100644
--- a/kombu/transport/__init__.py
+++ b/kombu/transport/__init__.py
@@ -52,6 +52,7 @@ def resolve_transport(transport: str | None = None) -> str | None:
"""Get transport by name.
Arguments:
+ ---------
transport (Union[str, type]): This can be either
an actual transport class, or the fully qualified
path to a transport class, or the alias of a transport.
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index ec4c0aca..e52bcc60 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -54,7 +54,8 @@ def to_rabbitmq_queue_arguments(arguments, **options):
max_priority (int): Max priority steps for queue.
This will be converted to ``x-max-priority`` int.
- Returns:
+ Returns
+ -------
Dict: RabbitMQ compatible queue arguments.
"""
prepared = dictfilter(dict(
@@ -95,7 +96,8 @@ class StdChannel:
def after_reply_message_received(self, queue):
"""Callback called after RPC reply received.
- Notes:
+ Notes
+ -----
Reply queue semantics: can be used to delete the queue
after transient reply message received.
"""
diff --git a/kombu/transport/consul.py b/kombu/transport/consul.py
index 7ace52f6..370a4302 100644
--- a/kombu/transport/consul.py
+++ b/kombu/transport/consul.py
@@ -97,9 +97,11 @@ class Channel(virtual.Channel):
read-consistency between the nodes.
Arguments:
+ ---------
queue (str): The name of the Queue.
- Returns:
+ Returns
+ -------
str: The ID of the session.
"""
try:
@@ -134,13 +136,16 @@ class Channel(virtual.Channel):
means that they have to wait before the lock is released.
Arguments:
+ ---------
queue (str): The name of the Queue.
raising (Exception): Set custom lock error class.
- Raises:
+ Raises
+ ------
LockError: if the lock cannot be acquired.
- Returns:
+ Returns
+ -------
bool: success?
"""
self._acquire_lock(queue, raising=raising)
@@ -170,6 +175,7 @@ class Channel(virtual.Channel):
It does so by simply removing the lock key in Consul.
Arguments:
+ ---------
queue (str): The name of the queue we want to release
the lock from.
"""
@@ -182,6 +188,7 @@ class Channel(virtual.Channel):
Will release all locks it still might hold.
Arguments:
+ ---------
queue (str): The name of the Queue.
"""
logger.debug('Destroying session %s', self.queues[queue]['session_id'])
diff --git a/kombu/transport/etcd.py b/kombu/transport/etcd.py
index 2ab85841..aa4771b7 100644
--- a/kombu/transport/etcd.py
+++ b/kombu/transport/etcd.py
@@ -78,6 +78,7 @@ class Channel(virtual.Channel):
"""Create and return the `queue` with the proper prefix.
Arguments:
+ ---------
queue (str): The name of the queue.
"""
return f'{self.prefix}/{queue}'
@@ -93,6 +94,7 @@ class Channel(virtual.Channel):
means that they have to wait before the lock is released.
Arguments:
+ ---------
queue (str): The name of the queue.
"""
lock = etcd.Lock(self.client, queue)
@@ -109,6 +111,7 @@ class Channel(virtual.Channel):
"""Create a new `queue` if the `queue` doesn't already exist.
Arguments:
+ ---------
queue (str): The name of the queue.
"""
self.queues[queue] = queue
@@ -123,7 +126,8 @@ class Channel(virtual.Channel):
def _has_queue(self, queue, **kwargs):
"""Verify that queue exists.
- Returns:
+ Returns
+ -------
bool: Should return :const:`True` if the queue exists
or :const:`False` otherwise.
"""
@@ -137,6 +141,7 @@ class Channel(virtual.Channel):
"""Delete a `queue`.
Arguments:
+ ---------
queue (str): The name of the queue.
"""
self.queues.pop(queue, None)
@@ -148,6 +153,7 @@ class Channel(virtual.Channel):
This simply writes a key to the Etcd store
Arguments:
+ ---------
queue (str): The name of the queue.
payload (dict): Message data which will be dumped to etcd.
"""
@@ -166,6 +172,7 @@ class Channel(virtual.Channel):
only one node reads at the same time. This is for read consistency
Arguments:
+ ---------
queue (str): The name of the queue.
timeout (int): Optional seconds to wait for a response.
"""
@@ -196,6 +203,7 @@ class Channel(virtual.Channel):
"""Remove all `message`s from a `queue`.
Arguments:
+ ---------
queue (str): The name of the queue.
"""
with self._queue_lock(queue):
@@ -207,6 +215,7 @@ class Channel(virtual.Channel):
"""Return the size of the `queue`.
Arguments:
+ ---------
queue (str): The name of the queue.
"""
with self._queue_lock(queue):
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 9eef6b57..5c6877ca 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -448,6 +448,7 @@ class Channel(virtual.Channel):
"""Get expiration header named `argument` of queue definition.
Note:
+ ----
`queue` must be either queue name or options itself.
"""
if isinstance(queue, str):
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 6cbfbdcf..fac15014 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -1362,7 +1362,7 @@ class SentinelChannel(Channel):
* `master_name` - name of the redis group to poll
Example:
-
+ -------
.. code-block:: python
>>> import kombu
diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py
index a61c8ea8..ee2b0e65 100644
--- a/kombu/transport/sqlalchemy/__init__.py
+++ b/kombu/transport/sqlalchemy/__init__.py
@@ -20,8 +20,9 @@ Connection String
sqlalchemy+SQL_ALCHEMY_CONNECTION_STRING
For details about ``SQL_ALCHEMY_CONNECTION_STRING`` see SQLAlchemy Engine Configuration documentation.
-Examples:
+Examples
+--------
.. code-block::
# PostgreSQL with default driver
diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py
index ec47ecb8..4b138aa9 100644
--- a/kombu/transport/virtual/base.py
+++ b/kombu/transport/virtual/base.py
@@ -159,6 +159,7 @@ class QoS:
Only supports `prefetch_count` at this point.
Arguments:
+ ---------
channel (ChannelT): Connection channel.
prefetch_count (int): Initial prefetch count (defaults to 0).
"""
@@ -211,7 +212,8 @@ class QoS:
bulk 'get message' calls are preferred to many individual 'get message'
calls - like SQS.
- Returns:
+ Returns
+ -------
int: greater than zero.
"""
pcount = self.prefetch_count
@@ -273,6 +275,7 @@ class QoS:
"""Restore all unacknowledged messages at shutdown/gc collect.
Note:
+ ----
Can only be called once for each instance, subsequent
calls will be ignored.
"""
@@ -306,6 +309,7 @@ class QoS:
To be filled in for visibility_timeout style implementations.
Note:
+ ----
This is implementation optional, and currently only
used by the Redis transport.
"""
@@ -355,6 +359,7 @@ class AbstractChannel:
you'd usually want to implement in a virtual channel.
Note:
+ ----
Do not subclass directly, but rather inherit
from :class:`Channel`.
"""
@@ -379,6 +384,7 @@ class AbstractChannel:
"""Delete `queue`.
Note:
+ ----
This just purges the queue, if you need to do more you can
override this method.
"""
@@ -388,6 +394,7 @@ class AbstractChannel:
"""Create new queue.
Note:
+ ----
Your transport can override this method if it needs
to do something whenever a new queue is declared.
"""
@@ -395,7 +402,8 @@ class AbstractChannel:
def _has_queue(self, queue, **kwargs):
"""Verify that queue exists.
- Returns:
+ Returns
+ -------
bool: Should return :const:`True` if the queue exists
or :const:`False` otherwise.
"""
@@ -414,6 +422,7 @@ class Channel(AbstractChannel, base.StdChannel):
"""Virtual channel.
Arguments:
+ ---------
connection (ConnectionT): The transport instance this
channel is part of.
"""
@@ -675,6 +684,7 @@ class Channel(AbstractChannel, base.StdChannel):
"""Change QoS settings for this channel.
Note:
+ ----
Only `prefetch_count` is supported.
"""
self.qos.prefetch_count = prefetch_count
@@ -697,7 +707,8 @@ class Channel(AbstractChannel, base.StdChannel):
def _lookup(self, exchange, routing_key, default=None):
"""Find all queues matching `routing_key` for the given `exchange`.
- Returns:
+ Returns
+ -------
list[str]: queue names -- must return `[default]`
if default is set and no queues matched.
"""
@@ -765,7 +776,8 @@ class Channel(AbstractChannel, base.StdChannel):
def flow(self, active=True):
"""Enable/disable message flow.
- Raises:
+ Raises
+ ------
NotImplementedError: as flow
is not implemented by the base virtual implementation.
"""
@@ -838,6 +850,7 @@ class Channel(AbstractChannel, base.StdChannel):
The value is limited to within a boundary of 0 to 9.
Note:
+ ----
Higher value has more priority.
"""
try:
@@ -887,6 +900,7 @@ class Transport(base.Transport):
"""Virtual transport.
Arguments:
+ ---------
client (kombu.Connection): The client this is a transport for.
"""
diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py
index b70544cd..e7cfd373 100644
--- a/kombu/transport/virtual/exchange.py
+++ b/kombu/transport/virtual/exchange.py
@@ -17,6 +17,7 @@ class ExchangeType:
Implements the specifics for an exchange type.
Arguments:
+ ---------
channel (ChannelT): AMQ Channel.
"""
@@ -28,7 +29,8 @@ class ExchangeType:
def lookup(self, table, exchange, routing_key, default):
"""Lookup all queues matching `routing_key` in `exchange`.
- Returns:
+ Returns
+ -------
str: queue name, or 'default' if no queues matched.
"""
raise NotImplementedError('subclass responsibility')
@@ -36,7 +38,8 @@ class ExchangeType:
def prepare_bind(self, queue, exchange, routing_key, arguments):
"""Prepare queue-binding.
- Returns:
+ Returns
+ -------
Tuple[str, Pattern, str]: of `(routing_key, regex, queue)`
to be stored for bindings to this exchange.
"""
@@ -137,7 +140,8 @@ class FanoutExchange(ExchangeType):
attribute is set to true, and the `Channel._queue_bind` and
`Channel.get_table` methods are implemented.
- See Also:
+ See Also
+ --------
the redis backend for an example implementation of these methods.
"""
diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py
index 6beb17d7..29c3d96d 100644
--- a/kombu/utils/functional.py
+++ b/kombu/utils/functional.py
@@ -45,6 +45,7 @@ class LRUCache(UserDict):
"""LRU Cache implementation using a doubly linked list to track access.
Arguments:
+ ---------
limit (int): The maximum number of keys to keep in the cache.
When a new key is inserted and the limit has been exceeded,
the *Least Recently Used* key will be discarded from the
@@ -221,6 +222,7 @@ def is_list(obj, scalars=(Mapping, str), iters=(Iterable,)):
"""Return true if the object is iterable.
Note:
+ ----
Returns false if object is a mapping or string.
"""
return isinstance(obj, iters) and not isinstance(obj, scalars or ())
@@ -279,11 +281,13 @@ def retry_over_time(fun, catch, args=None, kwargs=None, errback=None,
is increased for every retry until the max seconds is reached.
Arguments:
+ ---------
fun (Callable): The function to try
catch (Tuple[BaseException]): Exceptions to catch, can be either
tuple or a single exception class.
Keyword Arguments:
+ -----------------
args (Tuple): Positional arguments passed on to the function.
kwargs (Dict): Keyword arguments passed on to the function.
errback (Callable): Callback for when an exception in ``catch``
diff --git a/kombu/utils/imports.py b/kombu/utils/imports.py
index 8752fa1a..c9aff884 100644
--- a/kombu/utils/imports.py
+++ b/kombu/utils/imports.py
@@ -28,7 +28,8 @@ def symbol_by_name(name, aliases=None, imp=None, package=None,
If `aliases` is provided, a dict containing short name/long name
mappings, the name is looked up in the aliases first.
- Examples:
+ Examples
+ --------
>>> symbol_by_name('celery.concurrency.processes.TaskPool')
<class 'celery.concurrency.processes.TaskPool'>
diff --git a/kombu/utils/limits.py b/kombu/utils/limits.py
index 36d11f1f..413c6c62 100644
--- a/kombu/utils/limits.py
+++ b/kombu/utils/limits.py
@@ -11,13 +11,15 @@ __all__ = ('TokenBucket',)
class TokenBucket:
"""Token Bucket Algorithm.
- See Also:
+ See Also
+ --------
https://en.wikipedia.org/wiki/Token_Bucket
Most of this code was stolen from an entry in the ASPN Python Cookbook:
https://code.activestate.com/recipes/511490/
Warning:
+ -------
Thread Safety: This implementation is not thread safe.
Access to a `TokenBucket` instance should occur within the critical
section of any multithreaded code.
@@ -51,7 +53,8 @@ class TokenBucket:
def can_consume(self, tokens=1):
"""Check if one or more tokens can be consumed.
- Returns:
+ Returns
+ -------
bool: true if the number of tokens can be consumed
from the bucket. If they can be consumed, a call will also
consume the requested number of tokens from the bucket.
@@ -67,7 +70,8 @@ class TokenBucket:
def expected_time(self, tokens=1):
"""Return estimated time of token availability.
- Returns:
+ Returns
+ -------
float: the time in seconds.
"""
_tokens = self._get_tokens()
diff --git a/kombu/utils/scheduling.py b/kombu/utils/scheduling.py
index 94286be8..e4bff7b4 100644
--- a/kombu/utils/scheduling.py
+++ b/kombu/utils/scheduling.py
@@ -24,6 +24,7 @@ class FairCycle:
an equal chance to be consumed from.
Arguments:
+ ---------
fun (Callable): Callback to call.
resources (Sequence[Any]): List of resources.
predicate (type): Exception predicate.
diff --git a/kombu/utils/text.py b/kombu/utils/text.py
index fea53347..a6558cfc 100644
--- a/kombu/utils/text.py
+++ b/kombu/utils/text.py
@@ -22,7 +22,8 @@ def escape_regex(p, white=''):
def fmatch_iter(needle: str, haystack: Iterable[str], min_ratio: float = 0.6) -> Iterator[tuple[float, str]]:
"""Fuzzy match: iteratively.
- Yields:
+ Yields
+ ------
Tuple: of ratio and key.
"""
for key in haystack:
diff --git a/kombu/utils/uuid.py b/kombu/utils/uuid.py
index 9f77dad9..917eccf3 100644
--- a/kombu/utils/uuid.py
+++ b/kombu/utils/uuid.py
@@ -8,7 +8,8 @@ from uuid import UUID, uuid4
def uuid(_uuid: Callable[[], UUID] = uuid4) -> str:
"""Generate unique id in UUID4 format.
- See Also:
+ See Also
+ --------
For now this is provided by :func:`uuid.uuid4`.
"""
return str(_uuid())
diff --git a/requirements/pkgutils.txt b/requirements/pkgutils.txt
index 3ab5ae7a..63ecff12 100644
--- a/requirements/pkgutils.txt
+++ b/requirements/pkgutils.txt
@@ -4,5 +4,5 @@ flake8==5.0.4
tox>=4.4.8
sphinx2rst>=1.0
bumpversion
-pydocstyle==1.1.1
+pydocstyle==6.3.0
mypy==1.2.0
diff --git a/setup.cfg b/setup.cfg
index d5c857d5..c087af81 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -48,7 +48,7 @@ files =
[pep257]
-ignore = D102,D104,D203,D105,D213
+ignore = D102,D107,D104,D203,D105,D213,D401,D413,D417
[bdist_rpm]
requires = amqp >= 5.
diff --git a/tox.ini b/tox.ini
index 38345e22..85288018 100644
--- a/tox.ini
+++ b/tox.ini
@@ -45,9 +45,9 @@ basepython =
pypy3.9: pypy3.9
pypy3.8: pypy3.8
3.7: python3.7
- 3.8,mypy: python3.8
- 3.9,apicheck,pydocstyle,flake8,linkcheck,cov: python3.9
- 3.10: python3.10
+ 3.8: python3.8
+ 3.9: python3.9
+ 3.10,apicheck,pydocstyle,flake8,linkcheck,cov,mypy: python3.10
3.11: python3.11
install_command = python -m pip --disable-pip-version-check install {opts} {packages}