diff options
-rw-r--r-- | kombu/connection.py | 7 | ||||
-rw-r--r-- | kombu/transport/azureservicebus.py | 89 | ||||
-rw-r--r-- | kombu/transport/base.py | 6 | ||||
-rw-r--r-- | t/unit/transport/test_azureservicebus.py | 2 |
4 files changed, 86 insertions, 18 deletions
diff --git a/kombu/connection.py b/kombu/connection.py index 7b4bd097..94c43070 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -661,11 +661,16 @@ class Connection: def as_uri(self, include_password=False, mask='**', getfields=itemgetter('port', 'userid', 'password', - 'virtual_host', 'transport')): + 'virtual_host', 'transport')) -> str: """Convert connection parameters to URL form.""" hostname = self.hostname or 'localhost' if self.transport.can_parse_url: connection_as_uri = self.hostname + try: + return self.transport.as_uri(connection_as_uri, include_password, mask) + except NotImplementedError: + pass + if self.uri_prefix: connection_as_uri = f'{self.uri_prefix}+{hostname}' if not include_password: diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py index b117ef8b..74a5cd17 100644 --- a/kombu/transport/azureservicebus.py +++ b/kombu/transport/azureservicebus.py @@ -4,6 +4,15 @@ Note that the Shared Access Policy used to connect to Azure Service Bus requires Manage, Send and Listen claims since the broker will create new queues and delete old queues as required. + +Notes when using with Celery if you are experiencing issues with programs not +terminating properly. The Azure Service Bus SDK uses the Azure uAMQP library which in +turn creates some threads. If the AzureServiceBus Channel is closed, said threads will +be closed properly, but it seems there are times when Celery does not do this so these +threads will be left running. As the uAMQP threads are not marked as Daemon threads, they +will not be killed when the main thread exits. Setting the ``uamqp_keep_alive_interval`` +transport option to 0 will prevent the keep_alive thread from starting + More information about Azure Service Bus: https://azure.microsoft.com/en-us/services/service-bus/ @@ -28,15 +37,20 @@ Connection string has the following format: Transport Options ================= -* ``queue_name_prefix`` - String prefix to prepend to queue names in a service bus namespace +* ``queue_name_prefix`` - String prefix to prepend to queue names in a service bus namespace. * ``wait_time_seconds`` - Number of seconds to wait to receive messages. Default ``5`` * ``peek_lock_seconds`` - Number of seconds the message is visible for before it is requeued and sent to another consumer. Default ``60`` +* ``uamqp_keep_alive_interval`` - Interval in seconds the Azure uAMQP library should send keepalive messages. +Default ``30`` +* ``retry_total`` - Azure SDK retry total. Default ``3`` +* ``retry_backoff_factor`` - Azure SDK exponential backoff factor. Default ``0.8`` +* ``retry_backoff_max`` - Azure SDK retry total time. Default ``120`` """ import string from queue import Empty -from typing import Dict, Any, Optional, Union, Set +from typing import Dict, Any, Optional, Union, Set, Tuple from kombu.utils.encoding import bytes_to_str, safe_str from kombu.utils.json import loads, dumps @@ -76,6 +90,10 @@ class Channel(virtual.Channel): default_wait_time_seconds = 5 # in seconds default_peek_lock_seconds = 60 # in seconds (default 60, max 300) + default_uamqp_keep_alive_interval = 30 # in seconds (is the default from service bus repo) + default_retry_total = 3 # number of retries (is the default from service bus repo) + default_retry_backoff_factor = 0.8 # exponential backoff factor (is the default from service bus repo) + default_retry_backoff_max = 120 # Max time to backoff (is the default from service bus repo) domain_format = 'kombu%(vhost)s' _queue_service = None # type: ServiceBusClient _queue_mgmt_service = None # type: ServiceBusAdministrationClient @@ -95,16 +113,7 @@ class Channel(virtual.Channel): self.qos.restore_at_shutdown = False def _try_parse_connection_string(self) -> None: - # URL like azureservicebus://{SAS policy name}:{SAS key}@{ServiceBus Namespace} - # urllib parse does not work as the sas key could contain a slash - # e.g. azureservicebus://rootpolicy:some/key@somenamespace - uri = self.conninfo.hostname.replace('azureservicebus://', '') # > 'rootpolicy:some/key@somenamespace' - policykeypair, self._namespace = uri.rsplit('@', 1) # > 'rootpolicy:some/key', 'somenamespace' - self._policy, self._sas_key = policykeypair.split(':', 1) # > 'rootpolicy', 'some/key' - - # Validate ASB connection string - if not all([self._namespace, self._policy, self._sas_key]): - raise ValueError('Need an URI like azureservicebus://{SAS policy name}:{SAS key}@{ServiceBus Namespace}') + self._namespace, self._policy, self._sas_key = Transport.parse_uri(self.conninfo.hostname) # Convert endpoint = 'sb://' + self._namespace @@ -147,7 +156,7 @@ class Channel(virtual.Channel): def _get_asb_sender(self, queue: str) -> SendReceive: queue_obj = self._queue_cache.get(queue, None) if queue_obj is None or queue_obj.sender is None: - sender = self.queue_service.get_queue_sender(queue) + sender = self.queue_service.get_queue_sender(queue, keep_alive=self.uamqp_keep_alive_interval) queue_obj = self._add_queue_to_cache(queue, sender=sender) return queue_obj @@ -157,7 +166,8 @@ class Channel(virtual.Channel): cache_key = queue_cache_key or queue queue_obj = self._queue_cache.get(cache_key, None) if queue_obj is None or queue_obj.receiver is None: - receiver = self.queue_service.get_queue_receiver(queue_name=queue, receive_mode=recv_mode) + receiver = self.queue_service.get_queue_receiver(queue_name=queue, receive_mode=recv_mode, + keep_alive=self.uamqp_keep_alive_interval) queue_obj = self._add_queue_to_cache(cache_key, receiver=receiver) return queue_obj @@ -228,6 +238,7 @@ class Channel(virtual.Channel): msg = loads(bytes_to_str(body)) msg['properties']['delivery_info']['azure_message'] = message + msg['properties']['delivery_info']['azure_queue_name'] = queue return msg @@ -237,7 +248,7 @@ class Channel(virtual.Channel): if delivery_info['exchange'] in self._noack_queues: return super().basic_ack(delivery_tag) - queue = self.entity_name(self.queue_name_prefix + delivery_info['exchange']) + queue = delivery_info['azure_queue_name'] queue_obj = self._get_asb_receiver(queue) # recv_mode is PEEK_LOCK when ack'ing messages try: @@ -292,7 +303,12 @@ class Channel(virtual.Channel): @property def queue_service(self) -> ServiceBusClient: if self._queue_service is None: - self._queue_service = ServiceBusClient.from_connection_string(self._connection_string) + self._queue_service = ServiceBusClient.from_connection_string( + self._connection_string, + retry_total=self.retry_total, + retry_backoff_factor=self.retry_backoff_factor, + retry_backoff_max=self.retry_backoff_max + ) return self._queue_service @property @@ -324,6 +340,22 @@ class Channel(virtual.Channel): self.default_peek_lock_seconds), 300) # Limit upper bounds to 300 + @cached_property + def uamqp_keep_alive_interval(self) -> int: + return self.transport_options.get('uamqp_keep_alive_interval', self.default_uamqp_keep_alive_interval) + + @cached_property + def retry_total(self) -> int: + return self.transport_options.get('retry_total', self.default_retry_total) + + @cached_property + def retry_backoff_factor(self) -> float: + return self.transport_options.get('retry_backoff_factor', self.default_retry_backoff_factor) + + @cached_property + def retry_backoff_max(self) -> int: + return self.transport_options.get('retry_backoff_max', self.default_retry_backoff_max) + class Transport(virtual.Transport): """Azure Service Bus transport.""" @@ -333,3 +365,28 @@ class Transport(virtual.Transport): polling_interval = 1 default_port = None can_parse_url = True + + @staticmethod + def parse_uri(uri: str) -> Tuple[str, str, str]: + # URL like azureservicebus://{SAS policy name}:{SAS key}@{ServiceBus Namespace} + # urllib parse does not work as the sas key could contain a slash + # e.g. azureservicebus://rootpolicy:some/key@somenamespace + uri = uri.replace('azureservicebus://', '') # > 'rootpolicy:some/key@somenamespace' + policykeypair, namespace = uri.rsplit('@', 1) # > 'rootpolicy:some/key', 'somenamespace' + policy, sas_key = policykeypair.split(':', 1) # > 'rootpolicy', 'some/key' + + # Validate ASB connection string + if not all([namespace, policy, sas_key]): + raise ValueError('Need a URI like azureservicebus://{SAS policy name}:{SAS key}@{ServiceBus Namespace} ' + 'or the azure Endpoint connection string') + + return namespace, policy, sas_key + + @classmethod + def as_uri(cls, uri: str, include_password=False, mask='**') -> str: + namespace, policy, sas_key = cls.parse_uri(uri) + return 'azureservicebus://{0}:{1}@{2}'.format( + policy, + sas_key if include_password else mask, + namespace + ) diff --git a/kombu/transport/base.py b/kombu/transport/base.py index 1b7fa5b0..86a2b2eb 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -234,6 +234,12 @@ class Transport: reader = self.__reader = self._make_reader(connection) reader(loop) + def as_uri(self, uri: str, include_password=False, mask='**') -> str: + """ + Customise the display format of the URI + """ + raise NotImplementedError() + @property def default_connection_params(self): return {} diff --git a/t/unit/transport/test_azureservicebus.py b/t/unit/transport/test_azureservicebus.py index 08069f8b..909592ec 100644 --- a/t/unit/transport/test_azureservicebus.py +++ b/t/unit/transport/test_azureservicebus.py @@ -68,7 +68,7 @@ class ASBMock: def get_queue_receiver(self, queue_name, **kwargs): return self.queues[queue_name].get_receiver(kwargs) - def get_queue_sender(self, queue_name): + def get_queue_sender(self, queue_name, **kwargs): return self.queues[queue_name].get_sender() |