summaryrefslogtreecommitdiff
path: root/kombu
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@users.noreply.github.com>2021-04-13 23:20:31 +0200
committerGitHub <noreply@github.com>2021-04-13 23:20:31 +0200
commitc35fd1d9b4dca7ddf764a832cb677d8cfebf171f (patch)
tree92850ca36f2189fe030edb2b6965db0657c11fbd /kombu
parent4615415bf000bb0cca2cc1116b307a5533bd6dc3 (diff)
downloadkombu-c35fd1d9b4dca7ddf764a832cb677d8cfebf171f.tar.gz
Make flake8 and pydocstyle happy (#1333)
* Make flake8 happy * Make pydocstyle happy
Diffstat (limited to 'kombu')
-rw-r--r--kombu/asynchronous/aws/sqs/connection.py9
-rw-r--r--kombu/connection.py7
-rw-r--r--kombu/transport/SQS.py81
-rw-r--r--kombu/transport/azureservicebus.py169
-rw-r--r--kombu/transport/base.py4
-rw-r--r--kombu/utils/objects.py2
6 files changed, 178 insertions, 94 deletions
diff --git a/kombu/asynchronous/aws/sqs/connection.py b/kombu/asynchronous/aws/sqs/connection.py
index ecaac41f..2a878601 100644
--- a/kombu/asynchronous/aws/sqs/connection.py
+++ b/kombu/asynchronous/aws/sqs/connection.py
@@ -56,10 +56,11 @@ class AsyncSQSConnection(AsyncAWSQueryConnection):
queue.id, callback=callback,
)
- def receive_message(self, queue, queue_url,
- number_messages=1, visibility_timeout=None,
- attributes=('ApproximateReceiveCount',), wait_time_seconds=None,
- callback=None):
+ def receive_message(
+ self, queue, queue_url, number_messages=1, visibility_timeout=None,
+ attributes=('ApproximateReceiveCount',), wait_time_seconds=None,
+ callback=None
+ ):
params = {'MaxNumberOfMessages': number_messages}
if visibility_timeout:
params['VisibilityTimeout'] = visibility_timeout
diff --git a/kombu/connection.py b/kombu/connection.py
index 47bbef66..e5b601fa 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -628,7 +628,9 @@ class Connection:
D = self.transport.default_connection_params
if not self.hostname:
- logger.warning(f"No hostname was supplied. Reverting to default '{D.get('hostname')}'")
+ logger.warning(
+ "No hostname was supplied. "
+ f"Reverting to default '{D.get('hostname')}'")
hostname = D.get('hostname')
else:
hostname = self.hostname
@@ -672,7 +674,8 @@ class Connection:
if self.transport.can_parse_url:
connection_as_uri = self.hostname
try:
- return self.transport.as_uri(connection_as_uri, include_password, mask)
+ return self.transport.as_uri(
+ connection_as_uri, include_password, mask)
except NotImplementedError:
pass
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index 80de050c..f2061a68 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -70,12 +70,17 @@ exist in AWS) you can tell this transport about them as follows:
'sts_role_arn': 'arn:aws:iam::<xxx>:role/STSTest', # optional
'sts_token_timeout': 900 # optional
}
-
-backoff_policy & backoff_tasks are optional arguments. These arguments automatically change the message visibility timeout, in order to have different times between specific task
-retries. This would apply after task failure.
-AWS STS authentication is supported, by using sts_role_arn, and sts_token_timeout. sts_role_arn is the assumed IAM role ARN we are trying to access with.
-sts_token_timeout is the token timeout, defaults (and minimum) to 900 seconds. After the mentioned period, a new token will be created.
+backoff_policy & backoff_tasks are optional arguments. These arguments
+automatically change the message visibility timeout, in order to have
+different times between specific task retries. This would apply after
+task failure.
+
+AWS STS authentication is supported, by using sts_role_arn, and
+sts_token_timeout. sts_role_arn is the assumed IAM role ARN we are trying
+to access with. sts_token_timeout is the token timeout, defaults (and minimum)
+to 900 seconds. After the mentioned period, a new token will be created.
+
If you authenticate using Okta_ (e.g. calling |gac|_), you can also specify
@@ -163,11 +168,16 @@ class UndefinedQueueException(Exception):
class QoS(virtual.QoS):
+ """Quality of Service guarantees implementation for SQS."""
+
def reject(self, delivery_tag, requeue=False):
super().reject(delivery_tag, requeue=requeue)
- routing_key, message, backoff_tasks, backoff_policy = self._extract_backoff_policy_configuration_and_message(delivery_tag)
+ routing_key, message, backoff_tasks, backoff_policy = \
+ self._extract_backoff_policy_configuration_and_message(
+ delivery_tag)
if routing_key and message and backoff_tasks and backoff_policy:
- self.apply_backoff_policy(routing_key, delivery_tag, backoff_policy, backoff_tasks)
+ self.apply_backoff_policy(
+ routing_key, delivery_tag, backoff_policy, backoff_tasks)
def _extract_backoff_policy_configuration_and_message(self, delivery_tag):
try:
@@ -182,9 +192,11 @@ class QoS(virtual.QoS):
backoff_policy = queue_config.get('backoff_policy')
return routing_key, message, backoff_tasks, backoff_policy
- def apply_backoff_policy(self, routing_key, delivery_tag, backoff_policy, backoff_tasks):
+ def apply_backoff_policy(self, routing_key, delivery_tag,
+ backoff_policy, backoff_tasks):
queue_url = self.channel._queue_cache[routing_key]
- task_name, number_of_retries = self.extract_task_name_and_number_of_retries(delivery_tag)
+ task_name, number_of_retries = \
+ self.extract_task_name_and_number_of_retries(delivery_tag)
if not task_name or not number_of_retries:
return None
policy_value = backoff_policy.get(number_of_retries)
@@ -200,7 +212,9 @@ class QoS(virtual.QoS):
def extract_task_name_and_number_of_retries(message):
message_headers = message.headers
task_name = message_headers['task']
- number_of_retries = int(message.properties['delivery_info']['sqs_message']['Attributes']['ApproximateReceiveCount'])
+ number_of_retries = int(
+ message.properties['delivery_info']['sqs_message']
+ ['Attributes']['ApproximateReceiveCount'])
return task_name, number_of_retries
@@ -380,7 +394,9 @@ class Channel(virtual.Channel):
@staticmethod
def __b64_encoded(byte_string):
try:
- return base64.b64encode(base64.b64decode(byte_string)) == byte_string
+ return base64.b64encode(
+ base64.b64decode(byte_string)
+ ) == byte_string
except Exception: # pylint: disable=broad-except
return False
@@ -613,7 +629,8 @@ class Channel(virtual.Channel):
# if "can't set attribute" not in str(exc):
# raise
- def new_sqs_client(self, region, access_key_id, secret_access_key, session_token=None):
+ def new_sqs_client(self, region, access_key_id,
+ secret_access_key, session_token=None):
session = boto3.session.Session(
region_name=region,
aws_access_key_id=access_key_id,
@@ -634,8 +651,9 @@ class Channel(virtual.Channel):
if queue is not None and self.predefined_queues:
if queue not in self.predefined_queues:
- raise UndefinedQueueException(f"Queue with name '{queue}' must be defined"
- " in 'predefined_queues'.")
+ raise UndefinedQueueException(
+ f"Queue with name '{queue}' must be defined"
+ " in 'predefined_queues'.")
q = self.predefined_queues[queue]
if self.transport_options.get('sts_role_arn'):
return self._handle_sts_session(queue, q)
@@ -643,10 +661,14 @@ class Channel(virtual.Channel):
if queue in self._predefined_queue_clients:
return self._predefined_queue_clients[queue]
else:
- c = self._predefined_queue_clients[queue] = self.new_sqs_client(
- region=q.get('region', self.region),
- access_key_id=q.get('access_key_id', self.conninfo.userid),
- secret_access_key=q.get('secret_access_key', self.conninfo.password))
+ c = self._predefined_queue_clients[queue] = \
+ self.new_sqs_client(
+ region=q.get('region', self.region),
+ access_key_id=q.get(
+ 'access_key_id', self.conninfo.userid),
+ secret_access_key=q.get(
+ 'secret_access_key', self.conninfo.password)
+ )
return c
if self._sqs is not None:
@@ -661,8 +683,9 @@ class Channel(virtual.Channel):
def _handle_sts_session(self, queue, q):
if not hasattr(self, 'sts_expiration'): # STS token - token init
- sts_creds = self.generate_sts_session_token(self.transport_options.get('sts_role_arn'),
- self.transport_options.get('sts_token_timeout', 900))
+ sts_creds = self.generate_sts_session_token(
+ self.transport_options.get('sts_role_arn'),
+ self.transport_options.get('sts_token_timeout', 900))
self.sts_expiration = sts_creds['Expiration']
c = self._predefined_queue_clients[queue] = self.new_sqs_client(
region=q.get('region', self.region),
@@ -671,9 +694,11 @@ class Channel(virtual.Channel):
session_token=sts_creds['SessionToken'],
)
return c
- elif self.sts_expiration.replace(tzinfo=None) < datetime.utcnow(): # STS token - refresh if expired
- sts_creds = self.generate_sts_session_token(self.transport_options.get('sts_role_arn'),
- self.transport_options.get('sts_token_timeout', 900))
+ # STS token - refresh if expired
+ elif self.sts_expiration.replace(tzinfo=None) < datetime.utcnow():
+ sts_creds = self.generate_sts_session_token(
+ self.transport_options.get('sts_role_arn'),
+ self.transport_options.get('sts_token_timeout', 900))
self.sts_expiration = sts_creds['Expiration']
c = self._predefined_queue_clients[queue] = self.new_sqs_client(
region=q.get('region', self.region),
@@ -696,7 +721,8 @@ class Channel(virtual.Channel):
def asynsqs(self, queue=None):
if queue is not None and self.predefined_queues:
- if queue in self._predefined_queue_async_clients and not hasattr(self, 'sts_expiration'):
+ if queue in self._predefined_queue_async_clients and \
+ not hasattr(self, 'sts_expiration'):
return self._predefined_queue_async_clients[queue]
if queue not in self.predefined_queues:
raise UndefinedQueueException((
@@ -704,9 +730,10 @@ class Channel(virtual.Channel):
"'predefined_queues'."
).format(queue))
q = self.predefined_queues[queue]
- c = self._predefined_queue_async_clients[queue] = AsyncSQSConnection( # noqa: E501
- sqs_connection=self.sqs(queue=queue),
- region=q.get('region', self.region)
+ c = self._predefined_queue_async_clients[queue] = \
+ AsyncSQSConnection(
+ sqs_connection=self.sqs(queue=queue),
+ region=q.get('region', self.region)
)
return c
diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py
index 331d0038..994485c2 100644
--- a/kombu/transport/azureservicebus.py
+++ b/kombu/transport/azureservicebus.py
@@ -6,12 +6,14 @@ queues and delete old queues as required.
Notes when using with Celery if you are experiencing issues with programs not
-terminating properly. The Azure Service Bus SDK uses the Azure uAMQP library which in
-turn creates some threads. If the AzureServiceBus Channel is closed, said threads will
-be closed properly, but it seems there are times when Celery does not do this so these
-threads will be left running. As the uAMQP threads are not marked as Daemon threads, they
-will not be killed when the main thread exits. Setting the ``uamqp_keep_alive_interval``
-transport option to 0 will prevent the keep_alive thread from starting
+terminating properly. The Azure Service Bus SDK uses the Azure uAMQP library
+which in turn creates some threads. If the AzureServiceBus Channel is closed,
+said threads will be closed properly, but it seems there are times when Celery
+does not do this so these threads will be left running. As the uAMQP threads
+are not marked as Daemon threads, they will not be killed when the main thread
+exits. Setting the ``uamqp_keep_alive_interval`` transport option to 0 will
+prevent the keep_alive thread from starting
+
More information about Azure Service Bus:
https://azure.microsoft.com/en-us/services/service-bus/
@@ -37,14 +39,19 @@ Connection string has the following format:
Transport Options
=================
-* ``queue_name_prefix`` - String prefix to prepend to queue names in a service bus namespace.
-* ``wait_time_seconds`` - Number of seconds to wait to receive messages. Default ``5``
-* ``peek_lock_seconds`` - Number of seconds the message is visible for before it is requeued
+* ``queue_name_prefix`` - String prefix to prepend to queue names in
+ a service bus namespace.
+* ``wait_time_seconds`` - Number of seconds to wait to receive messages.
+ Default ``5``
+* ``peek_lock_seconds`` - Number of seconds the message is visible for before
+ it is requeued
and sent to another consumer. Default ``60``
-* ``uamqp_keep_alive_interval`` - Interval in seconds the Azure uAMQP library should send keepalive messages.
-Default ``30``
+* ``uamqp_keep_alive_interval`` - Interval in seconds the Azure uAMQP library
+ should send keepalive messages.
+ Default ``30``
* ``retry_total`` - Azure SDK retry total. Default ``3``
-* ``retry_backoff_factor`` - Azure SDK exponential backoff factor. Default ``0.8``
+* ``retry_backoff_factor`` - Azure SDK exponential backoff factor.
+ Default ``0.8``
* ``retry_backoff_max`` - Azure SDK retry total time. Default ``120``
"""
@@ -59,8 +66,9 @@ from kombu.utils.objects import cached_property
import azure.core.exceptions
import azure.servicebus.exceptions
import isodate
-from azure.servicebus import ServiceBusClient, ServiceBusMessage, ServiceBusReceiver, ServiceBusSender, \
- ServiceBusReceiveMode
+from azure.servicebus import (
+ ServiceBusClient, ServiceBusMessage, ServiceBusReceiver, ServiceBusSender,
+ ServiceBusReceiveMode)
from azure.servicebus.management import ServiceBusAdministrationClient
from . import virtual
@@ -74,7 +82,11 @@ CHARS_REPLACE_TABLE = {
class SendReceive:
- def __init__(self, receiver: Optional[ServiceBusReceiver] = None, sender: Optional[ServiceBusSender] = None):
+ """Container for Sender and Receiver."""
+
+ def __init__(self,
+ receiver: Optional[ServiceBusReceiver] = None,
+ sender: Optional[ServiceBusSender] = None):
self.receiver = receiver # type: ServiceBusReceiver
self.sender = sender # type: ServiceBusSender
@@ -92,10 +104,14 @@ class Channel(virtual.Channel):
default_wait_time_seconds = 5 # in seconds
default_peek_lock_seconds = 60 # in seconds (default 60, max 300)
- default_uamqp_keep_alive_interval = 30 # in seconds (is the default from service bus repo)
- default_retry_total = 3 # number of retries (is the default from service bus repo)
- default_retry_backoff_factor = 0.8 # exponential backoff factor (is the default from service bus repo)
- default_retry_backoff_max = 120 # Max time to backoff (is the default from service bus repo)
+ # in seconds (is the default from service bus repo)
+ default_uamqp_keep_alive_interval = 30
+ # number of retries (is the default from service bus repo)
+ default_retry_total = 3
+ # exponential backoff factor (is the default from service bus repo)
+ default_retry_backoff_factor = 0.8
+ # Max time to backoff (is the default from service bus repo)
+ default_retry_backoff_max = 120
domain_format = 'kombu%(vhost)s'
_queue_service = None # type: ServiceBusClient
_queue_mgmt_service = None # type: ServiceBusAdministrationClient
@@ -115,7 +131,8 @@ class Channel(virtual.Channel):
self.qos.restore_at_shutdown = False
def _try_parse_connection_string(self) -> None:
- self._namespace, self._policy, self._sas_key = Transport.parse_uri(self.conninfo.hostname)
+ self._namespace, self._policy, self._sas_key = Transport.parse_uri(
+ self.conninfo.hostname)
# Convert
endpoint = 'sb://' + self._namespace
@@ -127,7 +144,8 @@ class Channel(virtual.Channel):
'SharedAccessKeyName': self._policy,
'SharedAccessKey': self._sas_key,
}
- self._connection_string = ';'.join([key + '=' + value for key, value in conn_dict.items()])
+ self._connection_string = ';'.join(
+ [key + '=' + value for key, value in conn_dict.items()])
def basic_consume(self, queue, no_ack, *args, **kwargs):
if no_ack:
@@ -142,10 +160,11 @@ class Channel(virtual.Channel):
self._noack_queues.discard(queue)
return super().basic_cancel(consumer_tag)
- def _add_queue_to_cache(self,
- name: str,
- receiver: Optional[ServiceBusReceiver] = None,
- sender: Optional[ServiceBusSender] = None) -> SendReceive:
+ def _add_queue_to_cache(
+ self, name: str,
+ receiver: Optional[ServiceBusReceiver] = None,
+ sender: Optional[ServiceBusSender] = None
+ ) -> SendReceive:
if name in self._queue_cache:
obj = self._queue_cache[name]
obj.sender = obj.sender or sender
@@ -158,22 +177,26 @@ class Channel(virtual.Channel):
def _get_asb_sender(self, queue: str) -> SendReceive:
queue_obj = self._queue_cache.get(queue, None)
if queue_obj is None or queue_obj.sender is None:
- sender = self.queue_service.get_queue_sender(queue, keep_alive=self.uamqp_keep_alive_interval)
+ sender = self.queue_service.get_queue_sender(
+ queue, keep_alive=self.uamqp_keep_alive_interval)
queue_obj = self._add_queue_to_cache(queue, sender=sender)
return queue_obj
- def _get_asb_receiver(self, queue: str,
- recv_mode: ServiceBusReceiveMode = ServiceBusReceiveMode.PEEK_LOCK,
- queue_cache_key: Optional[str] = None) -> SendReceive:
+ def _get_asb_receiver(
+ self, queue: str,
+ recv_mode: ServiceBusReceiveMode = ServiceBusReceiveMode.PEEK_LOCK,
+ queue_cache_key: Optional[str] = None) -> SendReceive:
cache_key = queue_cache_key or queue
queue_obj = self._queue_cache.get(cache_key, None)
if queue_obj is None or queue_obj.receiver is None:
- receiver = self.queue_service.get_queue_receiver(queue_name=queue, receive_mode=recv_mode,
- keep_alive=self.uamqp_keep_alive_interval)
+ receiver = self.queue_service.get_queue_receiver(
+ queue_name=queue, receive_mode=recv_mode,
+ keep_alive=self.uamqp_keep_alive_interval)
queue_obj = self._add_queue_to_cache(cache_key, receiver=receiver)
return queue_obj
- def entity_name(self, name: str, table: Optional[Dict[int, int]] = None) -> str:
+ def entity_name(
+ self, name: str, table: Optional[Dict[int, int]] = None) -> str:
"""Format AMQP queue name into a valid ServiceBus queue name."""
return str(safe_str(name)).translate(table or CHARS_REPLACE_TABLE)
@@ -191,10 +214,13 @@ class Channel(virtual.Channel):
try:
return self._queue_cache[queue]
except KeyError:
- # Converts seconds into ISO8601 duration format ie 66seconds = P1M6S
- lock_duration = isodate.duration_isoformat(isodate.Duration(seconds=self.peek_lock_seconds))
+ # Converts seconds into ISO8601 duration format
+ # ie 66seconds = P1M6S
+ lock_duration = isodate.duration_isoformat(
+ isodate.Duration(seconds=self.peek_lock_seconds))
try:
- self.queue_mgmt_service.create_queue(queue_name=queue, lock_duration=lock_duration)
+ self.queue_mgmt_service.create_queue(
+ queue_name=queue, lock_duration=lock_duration)
except azure.core.exceptions.ResourceExistsError:
pass
return self._add_queue_to_cache(queue)
@@ -216,17 +242,21 @@ class Channel(virtual.Channel):
queue_obj = self._get_asb_sender(queue)
queue_obj.sender.send_messages(msg)
- def _get(self, queue: str, timeout: Optional[Union[float, int]] = None) -> Dict[str, Any]:
+ def _get(
+ self, queue: str,
+ timeout: Optional[Union[float, int]] = None
+ ) -> Dict[str, Any]:
"""Try to retrieve a single message off ``queue``."""
# If we're not ack'ing for this queue, just change receive_mode
- recv_mode = ServiceBusReceiveMode.RECEIVE_AND_DELETE if queue in self._noack_queues else \
- ServiceBusReceiveMode.PEEK_LOCK
+ recv_mode = ServiceBusReceiveMode.RECEIVE_AND_DELETE \
+ if queue in self._noack_queues else ServiceBusReceiveMode.PEEK_LOCK
queue = self.entity_name(self.queue_name_prefix + queue)
queue_obj = self._get_asb_receiver(queue, recv_mode)
- messages = queue_obj.receiver.receive_messages(max_message_count=1,
- max_wait_time=timeout or self.wait_time_seconds)
+ messages = queue_obj.receiver.receive_messages(
+ max_message_count=1,
+ max_wait_time=timeout or self.wait_time_seconds)
if not messages:
raise Empty()
@@ -251,7 +281,8 @@ class Channel(virtual.Channel):
return super().basic_ack(delivery_tag)
queue = delivery_info['azure_queue_name']
- queue_obj = self._get_asb_receiver(queue) # recv_mode is PEEK_LOCK when ack'ing messages
+ # recv_mode is PEEK_LOCK when ack'ing messages
+ queue_obj = self._get_asb_receiver(queue)
try:
queue_obj.receiver.complete_message(delivery_info['azure_message'])
@@ -278,12 +309,18 @@ class Channel(virtual.Channel):
# By default all the receivers will be in PEEK_LOCK receive mode
queue_obj = self._queue_cache.get(queue, None)
- if queue not in self._noack_queues or queue_obj is None or queue_obj.receiver is None:
- queue_obj = self._get_asb_receiver(queue, ServiceBusReceiveMode.RECEIVE_AND_DELETE, 'purge_' + queue)
+ if queue not in self._noack_queues or \
+ queue_obj is None or queue_obj.receiver is None:
+ queue_obj = self._get_asb_receiver(
+ queue,
+ ServiceBusReceiveMode.RECEIVE_AND_DELETE, 'purge_' + queue
+ )
while True:
- messages = queue_obj.receiver.receive_messages(max_message_count=max_purge_count,
- max_wait_time=0.2)
+ messages = queue_obj.receiver.receive_messages(
+ max_message_count=max_purge_count,
+ max_wait_time=0.2
+ )
n += len(messages)
if len(messages) < max_purge_count:
@@ -310,13 +347,15 @@ class Channel(virtual.Channel):
retry_total=self.retry_total,
retry_backoff_factor=self.retry_backoff_factor,
retry_backoff_max=self.retry_backoff_max
- )
+ )
return self._queue_service
@property
def queue_mgmt_service(self) -> ServiceBusAdministrationClient:
if self._queue_mgmt_service is None:
- self._queue_mgmt_service = ServiceBusAdministrationClient.from_connection_string(self._connection_string)
+ self._queue_mgmt_service = \
+ ServiceBusAdministrationClient.from_connection_string(
+ self._connection_string)
return self._queue_mgmt_service
@property
@@ -344,19 +383,25 @@ class Channel(virtual.Channel):
@cached_property
def uamqp_keep_alive_interval(self) -> int:
- return self.transport_options.get('uamqp_keep_alive_interval', self.default_uamqp_keep_alive_interval)
+ return self.transport_options.get(
+ 'uamqp_keep_alive_interval',
+ self.default_uamqp_keep_alive_interval
+ )
@cached_property
def retry_total(self) -> int:
- return self.transport_options.get('retry_total', self.default_retry_total)
+ return self.transport_options.get(
+ 'retry_total', self.default_retry_total)
@cached_property
def retry_backoff_factor(self) -> float:
- return self.transport_options.get('retry_backoff_factor', self.default_retry_backoff_factor)
+ return self.transport_options.get(
+ 'retry_backoff_factor', self.default_retry_backoff_factor)
@cached_property
def retry_backoff_max(self) -> int:
- return self.transport_options.get('retry_backoff_max', self.default_retry_backoff_max)
+ return self.transport_options.get(
+ 'retry_backoff_max', self.default_retry_backoff_max)
class Transport(virtual.Transport):
@@ -370,17 +415,25 @@ class Transport(virtual.Transport):
@staticmethod
def parse_uri(uri: str) -> Tuple[str, str, str]:
- # URL like azureservicebus://{SAS policy name}:{SAS key}@{ServiceBus Namespace}
+ # URL like:
+ # azureservicebus://{SAS policy name}:{SAS key}@{ServiceBus Namespace}
# urllib parse does not work as the sas key could contain a slash
- # e.g. azureservicebus://rootpolicy:some/key@somenamespace
- uri = uri.replace('azureservicebus://', '') # > 'rootpolicy:some/key@somenamespace'
- policykeypair, namespace = uri.rsplit('@', 1) # > 'rootpolicy:some/key', 'somenamespace'
- policy, sas_key = policykeypair.split(':', 1) # > 'rootpolicy', 'some/key'
+ # e.g.: azureservicebus://rootpolicy:some/key@somenamespace
+
+ # > 'rootpolicy:some/key@somenamespace'
+ uri = uri.replace('azureservicebus://', '')
+ # > 'rootpolicy:some/key', 'somenamespace'
+ policykeypair, namespace = uri.rsplit('@', 1)
+ # > 'rootpolicy', 'some/key'
+ policy, sas_key = policykeypair.split(':', 1)
# Validate ASB connection string
if not all([namespace, policy, sas_key]):
- raise ValueError('Need a URI like azureservicebus://{SAS policy name}:{SAS key}@{ServiceBus Namespace} '
- 'or the azure Endpoint connection string')
+ raise ValueError(
+ 'Need a URI like '
+ 'azureservicebus://{SAS policy name}:{SAS key}@{ServiceBus Namespace} ' # noqa
+ 'or the azure Endpoint connection string'
+ )
return namespace, policy, sas_key
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index 86a2b2eb..3508949e 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -235,9 +235,7 @@ class Transport:
reader(loop)
def as_uri(self, uri: str, include_password=False, mask='**') -> str:
- """
- Customise the display format of the URI
- """
+ """Customise the display format of the URI."""
raise NotImplementedError()
@property
diff --git a/kombu/utils/objects.py b/kombu/utils/objects.py
index faddb932..fba8ed0b 100644
--- a/kombu/utils/objects.py
+++ b/kombu/utils/objects.py
@@ -12,6 +12,8 @@ except ImportError:
class cached_property(_cached_property):
+ """Implementation of Cached property."""
+
def __init__(self, fget=None, fset=None, fdel=None):
super().__init__(fget)
self.__set = fset