summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTerry Cain <terry@terrys-home.co.uk>2021-01-05 19:13:08 +0000
committerAsif Saif Uddin <auvipy@gmail.com>2021-01-06 21:57:40 +0600
commit90c6ac706e30b4082885b3ce22c56fa24996b645 (patch)
treec9e59b0b03a33150b693625e989bc9ed5e045414
parent43190b293cd3de4c24ab5be4d74c25a20eb2f235 (diff)
downloadkombu-90c6ac706e30b4082885b3ce22c56fa24996b645.tar.gz
Fix issues with parsing URL with an extra / in it.
-rw-r--r--kombu/connection.py7
-rw-r--r--kombu/transport/azureservicebus.py89
-rw-r--r--kombu/transport/base.py6
-rw-r--r--t/unit/transport/test_azureservicebus.py2
4 files changed, 86 insertions, 18 deletions
diff --git a/kombu/connection.py b/kombu/connection.py
index 7b4bd097..94c43070 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -661,11 +661,16 @@ class Connection:
def as_uri(self, include_password=False, mask='**',
getfields=itemgetter('port', 'userid', 'password',
- 'virtual_host', 'transport')):
+ 'virtual_host', 'transport')) -> str:
"""Convert connection parameters to URL form."""
hostname = self.hostname or 'localhost'
if self.transport.can_parse_url:
connection_as_uri = self.hostname
+ try:
+ return self.transport.as_uri(connection_as_uri, include_password, mask)
+ except NotImplementedError:
+ pass
+
if self.uri_prefix:
connection_as_uri = f'{self.uri_prefix}+{hostname}'
if not include_password:
diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py
index b117ef8b..74a5cd17 100644
--- a/kombu/transport/azureservicebus.py
+++ b/kombu/transport/azureservicebus.py
@@ -4,6 +4,15 @@ Note that the Shared Access Policy used to connect to Azure Service Bus
requires Manage, Send and Listen claims since the broker will create new
queues and delete old queues as required.
+
+Notes when using with Celery if you are experiencing issues with programs not
+terminating properly. The Azure Service Bus SDK uses the Azure uAMQP library which in
+turn creates some threads. If the AzureServiceBus Channel is closed, said threads will
+be closed properly, but it seems there are times when Celery does not do this so these
+threads will be left running. As the uAMQP threads are not marked as Daemon threads, they
+will not be killed when the main thread exits. Setting the ``uamqp_keep_alive_interval``
+transport option to 0 will prevent the keep_alive thread from starting
+
More information about Azure Service Bus:
https://azure.microsoft.com/en-us/services/service-bus/
@@ -28,15 +37,20 @@ Connection string has the following format:
Transport Options
=================
-* ``queue_name_prefix`` - String prefix to prepend to queue names in a service bus namespace
+* ``queue_name_prefix`` - String prefix to prepend to queue names in a service bus namespace.
* ``wait_time_seconds`` - Number of seconds to wait to receive messages. Default ``5``
* ``peek_lock_seconds`` - Number of seconds the message is visible for before it is requeued
and sent to another consumer. Default ``60``
+* ``uamqp_keep_alive_interval`` - Interval in seconds the Azure uAMQP library should send keepalive messages.
+Default ``30``
+* ``retry_total`` - Azure SDK retry total. Default ``3``
+* ``retry_backoff_factor`` - Azure SDK exponential backoff factor. Default ``0.8``
+* ``retry_backoff_max`` - Azure SDK retry total time. Default ``120``
"""
import string
from queue import Empty
-from typing import Dict, Any, Optional, Union, Set
+from typing import Dict, Any, Optional, Union, Set, Tuple
from kombu.utils.encoding import bytes_to_str, safe_str
from kombu.utils.json import loads, dumps
@@ -76,6 +90,10 @@ class Channel(virtual.Channel):
default_wait_time_seconds = 5 # in seconds
default_peek_lock_seconds = 60 # in seconds (default 60, max 300)
+ default_uamqp_keep_alive_interval = 30 # in seconds (is the default from service bus repo)
+ default_retry_total = 3 # number of retries (is the default from service bus repo)
+ default_retry_backoff_factor = 0.8 # exponential backoff factor (is the default from service bus repo)
+ default_retry_backoff_max = 120 # Max time to backoff (is the default from service bus repo)
domain_format = 'kombu%(vhost)s'
_queue_service = None # type: ServiceBusClient
_queue_mgmt_service = None # type: ServiceBusAdministrationClient
@@ -95,16 +113,7 @@ class Channel(virtual.Channel):
self.qos.restore_at_shutdown = False
def _try_parse_connection_string(self) -> None:
- # URL like azureservicebus://{SAS policy name}:{SAS key}@{ServiceBus Namespace}
- # urllib parse does not work as the sas key could contain a slash
- # e.g. azureservicebus://rootpolicy:some/key@somenamespace
- uri = self.conninfo.hostname.replace('azureservicebus://', '') # > 'rootpolicy:some/key@somenamespace'
- policykeypair, self._namespace = uri.rsplit('@', 1) # > 'rootpolicy:some/key', 'somenamespace'
- self._policy, self._sas_key = policykeypair.split(':', 1) # > 'rootpolicy', 'some/key'
-
- # Validate ASB connection string
- if not all([self._namespace, self._policy, self._sas_key]):
- raise ValueError('Need an URI like azureservicebus://{SAS policy name}:{SAS key}@{ServiceBus Namespace}')
+ self._namespace, self._policy, self._sas_key = Transport.parse_uri(self.conninfo.hostname)
# Convert
endpoint = 'sb://' + self._namespace
@@ -147,7 +156,7 @@ class Channel(virtual.Channel):
def _get_asb_sender(self, queue: str) -> SendReceive:
queue_obj = self._queue_cache.get(queue, None)
if queue_obj is None or queue_obj.sender is None:
- sender = self.queue_service.get_queue_sender(queue)
+ sender = self.queue_service.get_queue_sender(queue, keep_alive=self.uamqp_keep_alive_interval)
queue_obj = self._add_queue_to_cache(queue, sender=sender)
return queue_obj
@@ -157,7 +166,8 @@ class Channel(virtual.Channel):
cache_key = queue_cache_key or queue
queue_obj = self._queue_cache.get(cache_key, None)
if queue_obj is None or queue_obj.receiver is None:
- receiver = self.queue_service.get_queue_receiver(queue_name=queue, receive_mode=recv_mode)
+ receiver = self.queue_service.get_queue_receiver(queue_name=queue, receive_mode=recv_mode,
+ keep_alive=self.uamqp_keep_alive_interval)
queue_obj = self._add_queue_to_cache(cache_key, receiver=receiver)
return queue_obj
@@ -228,6 +238,7 @@ class Channel(virtual.Channel):
msg = loads(bytes_to_str(body))
msg['properties']['delivery_info']['azure_message'] = message
+ msg['properties']['delivery_info']['azure_queue_name'] = queue
return msg
@@ -237,7 +248,7 @@ class Channel(virtual.Channel):
if delivery_info['exchange'] in self._noack_queues:
return super().basic_ack(delivery_tag)
- queue = self.entity_name(self.queue_name_prefix + delivery_info['exchange'])
+ queue = delivery_info['azure_queue_name']
queue_obj = self._get_asb_receiver(queue) # recv_mode is PEEK_LOCK when ack'ing messages
try:
@@ -292,7 +303,12 @@ class Channel(virtual.Channel):
@property
def queue_service(self) -> ServiceBusClient:
if self._queue_service is None:
- self._queue_service = ServiceBusClient.from_connection_string(self._connection_string)
+ self._queue_service = ServiceBusClient.from_connection_string(
+ self._connection_string,
+ retry_total=self.retry_total,
+ retry_backoff_factor=self.retry_backoff_factor,
+ retry_backoff_max=self.retry_backoff_max
+ )
return self._queue_service
@property
@@ -324,6 +340,22 @@ class Channel(virtual.Channel):
self.default_peek_lock_seconds),
300) # Limit upper bounds to 300
+ @cached_property
+ def uamqp_keep_alive_interval(self) -> int:
+ return self.transport_options.get('uamqp_keep_alive_interval', self.default_uamqp_keep_alive_interval)
+
+ @cached_property
+ def retry_total(self) -> int:
+ return self.transport_options.get('retry_total', self.default_retry_total)
+
+ @cached_property
+ def retry_backoff_factor(self) -> float:
+ return self.transport_options.get('retry_backoff_factor', self.default_retry_backoff_factor)
+
+ @cached_property
+ def retry_backoff_max(self) -> int:
+ return self.transport_options.get('retry_backoff_max', self.default_retry_backoff_max)
+
class Transport(virtual.Transport):
"""Azure Service Bus transport."""
@@ -333,3 +365,28 @@ class Transport(virtual.Transport):
polling_interval = 1
default_port = None
can_parse_url = True
+
+ @staticmethod
+ def parse_uri(uri: str) -> Tuple[str, str, str]:
+ # URL like azureservicebus://{SAS policy name}:{SAS key}@{ServiceBus Namespace}
+ # urllib parse does not work as the sas key could contain a slash
+ # e.g. azureservicebus://rootpolicy:some/key@somenamespace
+ uri = uri.replace('azureservicebus://', '') # > 'rootpolicy:some/key@somenamespace'
+ policykeypair, namespace = uri.rsplit('@', 1) # > 'rootpolicy:some/key', 'somenamespace'
+ policy, sas_key = policykeypair.split(':', 1) # > 'rootpolicy', 'some/key'
+
+ # Validate ASB connection string
+ if not all([namespace, policy, sas_key]):
+ raise ValueError('Need a URI like azureservicebus://{SAS policy name}:{SAS key}@{ServiceBus Namespace} '
+ 'or the azure Endpoint connection string')
+
+ return namespace, policy, sas_key
+
+ @classmethod
+ def as_uri(cls, uri: str, include_password=False, mask='**') -> str:
+ namespace, policy, sas_key = cls.parse_uri(uri)
+ return 'azureservicebus://{0}:{1}@{2}'.format(
+ policy,
+ sas_key if include_password else mask,
+ namespace
+ )
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index 1b7fa5b0..86a2b2eb 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -234,6 +234,12 @@ class Transport:
reader = self.__reader = self._make_reader(connection)
reader(loop)
+ def as_uri(self, uri: str, include_password=False, mask='**') -> str:
+ """
+ Customise the display format of the URI
+ """
+ raise NotImplementedError()
+
@property
def default_connection_params(self):
return {}
diff --git a/t/unit/transport/test_azureservicebus.py b/t/unit/transport/test_azureservicebus.py
index 08069f8b..909592ec 100644
--- a/t/unit/transport/test_azureservicebus.py
+++ b/t/unit/transport/test_azureservicebus.py
@@ -68,7 +68,7 @@ class ASBMock:
def get_queue_receiver(self, queue_name, **kwargs):
return self.queues[queue_name].get_receiver(kwargs)
- def get_queue_sender(self, queue_name):
+ def get_queue_sender(self, queue_name, **kwargs):
return self.queues[queue_name].get_sender()