diff options
author | Jason Barnett <jason.w.barnett@gmail.com> | 2023-01-02 14:09:15 -0500 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2023-01-05 00:34:46 +0600 |
commit | d3ca0d562e7c3ee9f8b0cfffb3bb798b98c28a84 (patch) | |
tree | c0af9ba8e6a5dc16ae05a979c68382579af1f26b | |
parent | 54cd277bc34780a5fdb2f4e1883ffeff1dce1ce4 (diff) | |
download | kombu-d3ca0d562e7c3ee9f8b0cfffb3bb798b98c28a84.tar.gz |
azure service bus: add type annotations and use cached property
-rw-r--r-- | kombu/transport/azureservicebus.py | 53 | ||||
-rw-r--r-- | t/unit/transport/test_azureservicebus.py | 25 |
2 files changed, 46 insertions, 32 deletions
diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py index a2fbd662..e7e2c0cc 100644 --- a/kombu/transport/azureservicebus.py +++ b/kombu/transport/azureservicebus.py @@ -57,7 +57,7 @@ from __future__ import annotations import string from queue import Empty -from typing import Any +from typing import Any, Dict, Set import azure.core.exceptions import azure.servicebus.exceptions @@ -87,8 +87,8 @@ class SendReceive: def __init__(self, receiver: ServiceBusReceiver | None = None, sender: ServiceBusSender | None = None): - self.receiver = receiver # type: ServiceBusReceiver - self.sender = sender # type: ServiceBusSender + self.receiver: ServiceBusReceiver = receiver + self.sender: ServiceBusSender = sender def close(self) -> None: if self.receiver: @@ -102,21 +102,19 @@ class SendReceive: class Channel(virtual.Channel): """Azure Service Bus channel.""" - default_wait_time_seconds = 5 # in seconds - default_peek_lock_seconds = 60 # in seconds (default 60, max 300) + default_wait_time_seconds: int = 5 # in seconds + default_peek_lock_seconds: int = 60 # in seconds (default 60, max 300) # in seconds (is the default from service bus repo) - default_uamqp_keep_alive_interval = 30 + default_uamqp_keep_alive_interval: int = 30 # number of retries (is the default from service bus repo) - default_retry_total = 3 + default_retry_total: int = 3 # exponential backoff factor (is the default from service bus repo) - default_retry_backoff_factor = 0.8 + default_retry_backoff_factor: float = 0.8 # Max time to backoff (is the default from service bus repo) - default_retry_backoff_max = 120 - domain_format = 'kombu%(vhost)s' - _queue_service = None # type: ServiceBusClient - _queue_mgmt_service = None # type: ServiceBusAdministrationClient - _queue_cache = {} # type: Dict[str, SendReceive] - _noack_queues = set() # type: Set[str] + default_retry_backoff_max: int = 120 + domain_format: str = 'kombu%(vhost)s' + _queue_cache: Dict[str, SendReceive] = {} + _noack_queues: Set[str] = set() def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -229,7 +227,7 @@ class Channel(virtual.Channel): """Delete queue by name.""" queue = self.entity_name(self.queue_name_prefix + queue) - self._queue_mgmt_service.delete_queue(queue) + self.queue_mgmt_service.delete_queue(queue) send_receive_obj = self._queue_cache.pop(queue, None) if send_receive_obj: send_receive_obj.close() @@ -300,7 +298,7 @@ class Channel(virtual.Channel): return props.total_message_count - def _purge(self, queue): + def _purge(self, queue) -> int: """Delete all current messages in a queue.""" # Azure doesn't provide a purge api yet n = 0 @@ -339,24 +337,19 @@ class Channel(virtual.Channel): if self.connection is not None: self.connection.close_channel(self) - @property + @cached_property def queue_service(self) -> ServiceBusClient: - if self._queue_service is None: - self._queue_service = ServiceBusClient.from_connection_string( - self._connection_string, - retry_total=self.retry_total, - retry_backoff_factor=self.retry_backoff_factor, - retry_backoff_max=self.retry_backoff_max - ) - return self._queue_service + return ServiceBusClient.from_connection_string( + self._connection_string, + retry_total=self.retry_total, + retry_backoff_factor=self.retry_backoff_factor, + retry_backoff_max=self.retry_backoff_max + ) - @property + @cached_property def queue_mgmt_service(self) -> ServiceBusAdministrationClient: - if self._queue_mgmt_service is None: - self._queue_mgmt_service = \ - ServiceBusAdministrationClient.from_connection_string( + return ServiceBusAdministrationClient.from_connection_string( self._connection_string) - return self._queue_mgmt_service @property def conninfo(self): diff --git a/t/unit/transport/test_azureservicebus.py b/t/unit/transport/test_azureservicebus.py index f2280fe4..5de93c2f 100644 --- a/t/unit/transport/test_azureservicebus.py +++ b/t/unit/transport/test_azureservicebus.py @@ -203,14 +203,35 @@ MockQueue = namedtuple( ) +@pytest.fixture(autouse=True) +def sbac_class_patch(): + with patch('kombu.transport.azureservicebus.ServiceBusAdministrationClient') as sbac: # noqa + yield sbac + + +@pytest.fixture(autouse=True) +def sbc_class_patch(): + with patch('kombu.transport.azureservicebus.ServiceBusClient') as sbc: # noqa + yield sbc + + +@pytest.fixture(autouse=True) +def mock_clients( + sbc_class_patch, + sbac_class_patch, + mock_asb, + mock_asb_management +): + sbc_class_patch.from_connection_string.return_value = mock_asb + sbac_class_patch.from_connection_string.return_value = mock_asb_management + + @pytest.fixture def mock_queue(mock_asb, mock_asb_management, random_queue) -> MockQueue: exchange = Exchange('test_servicebus', type='direct') queue = Queue(random_queue, exchange, random_queue) conn = Connection(URL_CREDS, transport=azureservicebus.Transport) channel = conn.channel() - channel._queue_service = mock_asb - channel._queue_mgmt_service = mock_asb_management queue(channel).declare() producer = messaging.Producer(channel, exchange, routing_key=random_queue) |