summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Barnett <jason.w.barnett@gmail.com>2023-01-02 14:09:15 -0500
committerAsif Saif Uddin <auvipy@gmail.com>2023-01-05 00:34:46 +0600
commitd3ca0d562e7c3ee9f8b0cfffb3bb798b98c28a84 (patch)
treec0af9ba8e6a5dc16ae05a979c68382579af1f26b
parent54cd277bc34780a5fdb2f4e1883ffeff1dce1ce4 (diff)
downloadkombu-d3ca0d562e7c3ee9f8b0cfffb3bb798b98c28a84.tar.gz
azure service bus: add type annotations and use cached property
-rw-r--r--kombu/transport/azureservicebus.py53
-rw-r--r--t/unit/transport/test_azureservicebus.py25
2 files changed, 46 insertions, 32 deletions
diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py
index a2fbd662..e7e2c0cc 100644
--- a/kombu/transport/azureservicebus.py
+++ b/kombu/transport/azureservicebus.py
@@ -57,7 +57,7 @@ from __future__ import annotations
import string
from queue import Empty
-from typing import Any
+from typing import Any, Dict, Set
import azure.core.exceptions
import azure.servicebus.exceptions
@@ -87,8 +87,8 @@ class SendReceive:
def __init__(self,
receiver: ServiceBusReceiver | None = None,
sender: ServiceBusSender | None = None):
- self.receiver = receiver # type: ServiceBusReceiver
- self.sender = sender # type: ServiceBusSender
+ self.receiver: ServiceBusReceiver = receiver
+ self.sender: ServiceBusSender = sender
def close(self) -> None:
if self.receiver:
@@ -102,21 +102,19 @@ class SendReceive:
class Channel(virtual.Channel):
"""Azure Service Bus channel."""
- default_wait_time_seconds = 5 # in seconds
- default_peek_lock_seconds = 60 # in seconds (default 60, max 300)
+ default_wait_time_seconds: int = 5 # in seconds
+ default_peek_lock_seconds: int = 60 # in seconds (default 60, max 300)
# in seconds (is the default from service bus repo)
- default_uamqp_keep_alive_interval = 30
+ default_uamqp_keep_alive_interval: int = 30
# number of retries (is the default from service bus repo)
- default_retry_total = 3
+ default_retry_total: int = 3
# exponential backoff factor (is the default from service bus repo)
- default_retry_backoff_factor = 0.8
+ default_retry_backoff_factor: float = 0.8
# Max time to backoff (is the default from service bus repo)
- default_retry_backoff_max = 120
- domain_format = 'kombu%(vhost)s'
- _queue_service = None # type: ServiceBusClient
- _queue_mgmt_service = None # type: ServiceBusAdministrationClient
- _queue_cache = {} # type: Dict[str, SendReceive]
- _noack_queues = set() # type: Set[str]
+ default_retry_backoff_max: int = 120
+ domain_format: str = 'kombu%(vhost)s'
+ _queue_cache: Dict[str, SendReceive] = {}
+ _noack_queues: Set[str] = set()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -229,7 +227,7 @@ class Channel(virtual.Channel):
"""Delete queue by name."""
queue = self.entity_name(self.queue_name_prefix + queue)
- self._queue_mgmt_service.delete_queue(queue)
+ self.queue_mgmt_service.delete_queue(queue)
send_receive_obj = self._queue_cache.pop(queue, None)
if send_receive_obj:
send_receive_obj.close()
@@ -300,7 +298,7 @@ class Channel(virtual.Channel):
return props.total_message_count
- def _purge(self, queue):
+ def _purge(self, queue) -> int:
"""Delete all current messages in a queue."""
# Azure doesn't provide a purge api yet
n = 0
@@ -339,24 +337,19 @@ class Channel(virtual.Channel):
if self.connection is not None:
self.connection.close_channel(self)
- @property
+ @cached_property
def queue_service(self) -> ServiceBusClient:
- if self._queue_service is None:
- self._queue_service = ServiceBusClient.from_connection_string(
- self._connection_string,
- retry_total=self.retry_total,
- retry_backoff_factor=self.retry_backoff_factor,
- retry_backoff_max=self.retry_backoff_max
- )
- return self._queue_service
+ return ServiceBusClient.from_connection_string(
+ self._connection_string,
+ retry_total=self.retry_total,
+ retry_backoff_factor=self.retry_backoff_factor,
+ retry_backoff_max=self.retry_backoff_max
+ )
- @property
+ @cached_property
def queue_mgmt_service(self) -> ServiceBusAdministrationClient:
- if self._queue_mgmt_service is None:
- self._queue_mgmt_service = \
- ServiceBusAdministrationClient.from_connection_string(
+ return ServiceBusAdministrationClient.from_connection_string(
self._connection_string)
- return self._queue_mgmt_service
@property
def conninfo(self):
diff --git a/t/unit/transport/test_azureservicebus.py b/t/unit/transport/test_azureservicebus.py
index f2280fe4..5de93c2f 100644
--- a/t/unit/transport/test_azureservicebus.py
+++ b/t/unit/transport/test_azureservicebus.py
@@ -203,14 +203,35 @@ MockQueue = namedtuple(
)
+@pytest.fixture(autouse=True)
+def sbac_class_patch():
+ with patch('kombu.transport.azureservicebus.ServiceBusAdministrationClient') as sbac: # noqa
+ yield sbac
+
+
+@pytest.fixture(autouse=True)
+def sbc_class_patch():
+ with patch('kombu.transport.azureservicebus.ServiceBusClient') as sbc: # noqa
+ yield sbc
+
+
+@pytest.fixture(autouse=True)
+def mock_clients(
+ sbc_class_patch,
+ sbac_class_patch,
+ mock_asb,
+ mock_asb_management
+):
+ sbc_class_patch.from_connection_string.return_value = mock_asb
+ sbac_class_patch.from_connection_string.return_value = mock_asb_management
+
+
@pytest.fixture
def mock_queue(mock_asb, mock_asb_management, random_queue) -> MockQueue:
exchange = Exchange('test_servicebus', type='direct')
queue = Queue(random_queue, exchange, random_queue)
conn = Connection(URL_CREDS, transport=azureservicebus.Transport)
channel = conn.channel()
- channel._queue_service = mock_asb
- channel._queue_mgmt_service = mock_asb_management
queue(channel).declare()
producer = messaging.Producer(channel, exchange, routing_key=random_queue)