summaryrefslogtreecommitdiff
path: root/kombu/transport/azureservicebus.py
blob: f022ebb4f547e361afbe9a13a63ef580761afb28 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""Azure Service Bus Message Queue transport.

The transport can be enabled by setting the CELERY_BROKER_URL to:

```
azureservicebus://{SAS policy name}:{SAS key}@{Service Bus Namespace}
```

Note that the Shared Access Policy used to connect to Azure Service Bus
requires Manage, Send and Listen claims since the broker will create new
queues and delete old queues as required.

Note that if the SAS key for the Service Bus account contains a slash, it will
have to be regenerated before it can be used in the connection URL.

More information about Azure Service Bus:
https://azure.microsoft.com/en-us/services/service-bus/

"""
from __future__ import absolute_import, unicode_literals

import string

from kombu.five import Empty, text_t
from kombu.utils.encoding import bytes_to_str, safe_str
from kombu.utils.json import loads, dumps
from kombu.utils.objects import cached_property

from . import virtual

try:
    from azure.servicebus import ServiceBusService, Message, Queue
except ImportError:
    ServiceBusService = Message = Queue = None

# dots are replaced by dash, all other punctuation replaced by underscore.
CHARS_REPLACE_TABLE = {
    ord(c): 0x5f for c in string.punctuation if c not in '_'
}


class Channel(virtual.Channel):
    """Azure Service Bus channel."""

    default_visibility_timeout = 1800  # 30 minutes.
    domain_format = 'kombu%(vhost)s'
    _queue_service = None
    _queue_cache = {}

    def __init__(self, *args, **kwargs):
        if ServiceBusService is None:
            raise ImportError('Azure Service Bus transport requires the '
                              'azure-servicebus library')

        super(Channel, self).__init__(*args, **kwargs)

        for queue in self.queue_service.list_queues():
            self._queue_cache[queue] = queue

    def entity_name(self, name, table=CHARS_REPLACE_TABLE):
        """Format AMQP queue name into a valid ServiceBus queue name."""
        return text_t(safe_str(name)).translate(table)

    def _new_queue(self, queue, **kwargs):
        """Ensure a queue exists in ServiceBus."""
        queue = self.entity_name(self.queue_name_prefix + queue)
        try:
            return self._queue_cache[queue]
        except KeyError:
            self.queue_service.create_queue(queue, fail_on_exist=False)
            q = self._queue_cache[queue] = self.queue_service.get_queue(queue)
            return q

    def _delete(self, queue, *args, **kwargs):
        """Delete queue by name."""
        queue_name = self.entity_name(queue)
        self._queue_cache.pop(queue_name, None)
        self.queue_service.delete_queue(queue_name)
        super(Channel, self)._delete(queue_name)

    def _put(self, queue, message, **kwargs):
        """Put message onto queue."""
        msg = Message(dumps(message))
        self.queue_service.send_queue_message(self.entity_name(queue), msg)

    def _get(self, queue, timeout=None):
        """Try to retrieve a single message off ``queue``."""
        message = self.queue_service.receive_queue_message(
            self.entity_name(queue), timeout=timeout, peek_lock=False)

        if message.body is None:
            raise Empty()

        return loads(bytes_to_str(message.body))

    def _size(self, queue):
        """Return the number of messages in a queue."""
        return self._new_queue(queue).message_count

    def _purge(self, queue):
        """Delete all current messages in a queue."""
        n = 0

        while True:
            message = self.queue_service.read_delete_queue_message(
                self.entity_name(queue), timeout=0.1)

            if not message.body:
                break
            else:
                n += 1

        return n

    @property
    def queue_service(self):
        if self._queue_service is None:
            self._queue_service = ServiceBusService(
                service_namespace=self.conninfo.hostname,
                shared_access_key_name=self.conninfo.userid,
                shared_access_key_value=self.conninfo.password)

        return self._queue_service

    @property
    def conninfo(self):
        return self.connection.client

    @property
    def transport_options(self):
        return self.connection.client.transport_options

    @cached_property
    def visibility_timeout(self):
        return (self.transport_options.get('visibility_timeout') or
                self.default_visibility_timeout)

    @cached_property
    def queue_name_prefix(self):
        return self.transport_options.get('queue_name_prefix', '')


class Transport(virtual.Transport):
    """Azure Service Bus transport."""

    Channel = Channel

    polling_interval = 1
    default_port = None