summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Barnett <jason.w.barnett@gmail.com>2022-12-17 17:48:40 -0700
committerAsif Saif Uddin <auvipy@gmail.com>2023-01-03 13:54:25 +0600
commit54cd277bc34780a5fdb2f4e1883ffeff1dce1ce4 (patch)
tree334245ac2c471f1e8a6de3c0757e89663fd02b6e
parentbdd0672a5830dfdd368d04880657015a71f685f1 (diff)
downloadkombu-54cd277bc34780a5fdb2f4e1883ffeff1dce1ce4.tar.gz
add type annotations to kombu/transport/azurestoragequeues
-rw-r--r--kombu/transport/azurestoragequeues.py32
1 files changed, 17 insertions, 15 deletions
diff --git a/kombu/transport/azurestoragequeues.py b/kombu/transport/azurestoragequeues.py
index 99a06692..16d22f0b 100644
--- a/kombu/transport/azurestoragequeues.py
+++ b/kombu/transport/azurestoragequeues.py
@@ -54,6 +54,7 @@ from __future__ import annotations
import string
from queue import Empty
+from typing import Any, Optional
from azure.core.exceptions import ResourceExistsError
@@ -85,11 +86,11 @@ CHARS_REPLACE_TABLE = {
class Channel(virtual.Channel):
"""Azure Storage Queues channel."""
- domain_format = 'kombu%(vhost)s'
- _queue_service = None
- _queue_name_cache = {}
- no_ack = True
- _noack_queues = set()
+ domain_format: str = 'kombu%(vhost)s'
+ _queue_service: Optional[QueueServiceClient] = None
+ _queue_name_cache: dict[Any, Any] = {}
+ no_ack: bool = True
+ _noack_queues: set[Any] = set()
def __init__(self, *args, **kwargs):
if QueueServiceClient is None:
@@ -112,7 +113,7 @@ class Channel(virtual.Channel):
return super().basic_consume(queue, no_ack,
*args, **kwargs)
- def entity_name(self, name, table=CHARS_REPLACE_TABLE):
+ def entity_name(self, name, table=CHARS_REPLACE_TABLE) -> str:
"""Format AMQP queue name into a valid Azure Storage Queue name."""
return str(safe_str(name)).translate(table)
@@ -173,7 +174,7 @@ class Channel(virtual.Channel):
return n
@property
- def queue_service(self):
+ def queue_service(self) -> QueueServiceClient:
if self._queue_service is None:
self._queue_service = QueueServiceClient(
account_url=self._url, credential=self._credential
@@ -190,7 +191,7 @@ class Channel(virtual.Channel):
return self.connection.client.transport_options
@cached_property
- def queue_name_prefix(self):
+ def queue_name_prefix(self) -> str:
return self.transport_options.get('queue_name_prefix', '')
@@ -199,9 +200,9 @@ class Transport(virtual.Transport):
Channel = Channel
- polling_interval = 1
- default_port = None
- can_parse_url = True
+ polling_interval: int = 1
+ default_port: Optional[int] = None
+ can_parse_url: bool = True
@staticmethod
def parse_uri(uri: str) -> tuple[str | dict, str]:
@@ -253,9 +254,10 @@ class Transport(virtual.Transport):
return credential, url
@classmethod
- def as_uri(cls, uri: str, include_password=False, mask='**') -> str:
+ def as_uri(
+ cls, uri: str, include_password: bool = False, mask: str = "**"
+ ) -> str:
credential, url = cls.parse_uri(uri)
- return 'azurestoragequeues://{}@{}'.format(
- credential if include_password else mask,
- url
+ return "azurestoragequeues://{}@{}".format(
+ credential if include_password else mask, url
)