summaryrefslogtreecommitdiff
path: root/t/unit/transport/test_azureservicebus.py
diff options
context:
space:
mode:
authorNicolas Mota <nicolas_mota@me.com>2019-10-23 11:25:15 -0300
committerOmer Katz <omer.drow@gmail.com>2019-10-23 17:25:15 +0300
commit977fe3d4f6e0ba8ebf77fcb4f829e7bc6df54fc2 (patch)
tree3a0ddb0a92a15a462b5eeed67a7f9fe2a17f7640 /t/unit/transport/test_azureservicebus.py
parent2d92827746f0f2e5c964d685246836370f14786b (diff)
downloadkombu-977fe3d4f6e0ba8ebf77fcb4f829e7bc6df54fc2.tar.gz
Add wait timeout settings to receive queue message (#1110)
* Add wait timeout settings to receive queue message * add tests * add more tests and remove servicebus from coveragerc
Diffstat (limited to 't/unit/transport/test_azureservicebus.py')
-rw-r--r--t/unit/transport/test_azureservicebus.py232
1 files changed, 232 insertions, 0 deletions
diff --git a/t/unit/transport/test_azureservicebus.py b/t/unit/transport/test_azureservicebus.py
new file mode 100644
index 00000000..c5b5acca
--- /dev/null
+++ b/t/unit/transport/test_azureservicebus.py
@@ -0,0 +1,232 @@
+from __future__ import absolute_import, unicode_literals
+
+import pytest
+
+from case import skip, patch
+from kombu import messaging
+from kombu import Connection, Exchange, Queue
+from kombu.five import Empty
+from kombu.transport import azureservicebus
+
+try:
+ # azure-servicebus version >= 0.50.0
+ from azure.servicebus.control_client import Message, ServiceBusService
+except ImportError:
+ try:
+ # azure-servicebus version <= 0.21.1
+ from azure.servicebus import Message, ServiceBusService
+ except ImportError:
+ ServiceBusService = Message = None
+
+
+class QueueMock(object):
+ """ Hold information about a queue. """
+
+ def __init__(self, name):
+ self.name = name
+ self.messages = []
+ self.message_count = 0
+
+ def __repr__(self):
+ return 'QueueMock: {} messages'.format(len(self.messages))
+
+
+def _create_mock_connection(url='', **kwargs):
+
+ class _Channel(azureservicebus.Channel):
+ # reset _fanout_queues for each instance
+ queues = []
+ _queue_service = None
+
+ def list_queues(self):
+ return self.queues
+
+ @property
+ def queue_service(self):
+ if self._queue_service is None:
+ self._queue_service = AzureServiceBusClientMock()
+ return self._queue_service
+
+ class Transport(azureservicebus.Transport):
+ Channel = _Channel
+
+ return Connection(url, transport=Transport, **kwargs)
+
+
+class AzureServiceBusClientMock(object):
+
+ def __init__(self):
+ """
+ Imitate the ServiceBus Client.
+ """
+ # queues doesn't exist on the real client, here for testing.
+ self.queues = []
+ self._queue_cache = {}
+ self.queues.append(self.create_queue(queue_name='unittest_queue'))
+
+ def create_queue(self, queue_name, queue=None, fail_on_exist=False):
+ queue = QueueMock(name=queue_name)
+ self.queues.append(queue)
+ self._queue_cache[queue_name] = queue
+ return queue
+
+ def get_queue(self, queue_name=None):
+ for queue in self.queues:
+ if queue.name == queue_name:
+ return queue
+
+ def list_queues(self):
+ return self.queues
+
+ def send_queue_message(self, queue_name=None, message=None):
+ queue = self.get_queue(queue_name)
+ queue.messages.append(message)
+
+ def receive_queue_message(self, queue_name, peek_lock=True, timeout=60):
+ queue = self.get_queue(queue_name)
+ if queue:
+ try:
+ return queue.messages.pop(0)
+ except IndexError:
+ return Message()
+
+ def read_delete_queue_message(self, queue_name, timeout='60'):
+ return self.receive_queue_message(queue_name)
+
+ def delete_queue(self, queue_name=None):
+ queue = self.get_queue(queue_name)
+ if queue:
+ del queue
+
+
+@skip.unless_module('azure.servicebus')
+class test_Channel:
+
+ def handleMessageCallback(self, message):
+ self.callback_message = message
+
+ def setup(self):
+ self.url = 'azureservicebus://'
+ self.queue_name = 'unittest_queue'
+
+ self.exchange = Exchange('test_servicebus', type='direct')
+ self.queue = Queue(self.queue_name, self.exchange, self.queue_name)
+ self.connection = _create_mock_connection(self.url)
+ self.channel = self.connection.default_channel
+ self.queue(self.channel).declare()
+
+ self.producer = messaging.Producer(self.channel,
+ self.exchange,
+ routing_key=self.queue_name)
+
+ self.channel.basic_consume(self.queue_name,
+ no_ack=False,
+ callback=self.handleMessageCallback,
+ consumer_tag='unittest')
+
+ def teardown(self):
+ # Removes QoS reserved messages so we don't restore msgs on shutdown.
+ try:
+ qos = self.channel._qos
+ except AttributeError:
+ pass
+ else:
+ if qos:
+ qos._dirty.clear()
+ qos._delivered.clear()
+
+ def test_queue_service(self):
+ # Test gettings queue service without credentials
+ conn = Connection(self.url, transport=azureservicebus.Transport)
+ with pytest.raises(ValueError) as exc:
+ conn.channel()
+ assert exc == 'You need to provide servicebus namespace'
+
+ # Test getting queue service when queue_service is not setted
+ with patch('kombu.transport.azureservicebus.ServiceBusService') as m:
+ channel = conn.channel()
+
+ # Remove queue service to get from service bus again
+ channel._queue_service = None
+ channel.queue_service
+
+ assert m.call_count == 2
+
+ # Calling queue_service again needs to reuse ServiceBus instance
+ channel.queue_service
+ assert m.call_count == 2
+
+ def test_conninfo(self):
+ conninfo = self.channel.conninfo
+ assert conninfo is self.connection
+
+ def test_transport_type(self):
+ transport_options = self.channel.transport_options
+ assert transport_options == {}
+
+ def test_visibility_timeout(self):
+ # Test getting default visibility timeout
+ assert (
+ self.channel.visibility_timeout ==
+ azureservicebus.Channel.default_visibility_timeout
+ )
+
+ # Test getting value setted in transport options
+ del self.channel.visibility_timeout
+ self.channel.transport_options['visibility_timeout'] = 10
+ assert self.channel.visibility_timeout == 10
+
+ def test_wait_timeout_seconds(self):
+ # Test getting default wait timeout seconds
+ assert (
+ self.channel.wait_time_seconds ==
+ azureservicebus.Channel.default_wait_time_seconds
+ )
+
+ # Test getting value setted in transport options
+ del self.channel.wait_time_seconds
+ self.channel.transport_options['wait_time_seconds'] = 10
+ assert self.channel.wait_time_seconds == 10
+
+ def test_get_from_azure(self):
+ # Test getting a single message
+ message = 'my test message'
+ self.producer.publish(message)
+ result = self.channel._get(self.queue_name)
+ assert 'body' in result.keys()
+
+ # Test getting multiple messages
+ for i in range(3):
+ message = 'message: {0}'.format(i)
+ self.producer.publish(message)
+
+ queue_service = self.channel.queue_service
+ assert len(queue_service.get_queue(self.queue_name).messages) == 3
+
+ for i in range(3):
+ result = self.channel._get(self.queue_name)
+
+ assert len(queue_service.get_queue(self.queue_name).messages) == 0
+
+ def test_get_with_empty_list(self):
+ with pytest.raises(Empty):
+ self.channel._get(self.queue_name)
+
+ def test_put_and_get(self):
+ message = 'my test message'
+ self.producer.publish(message)
+ results = self.queue(self.channel).get().payload
+ assert message == results
+
+ def test_delete_queue(self):
+ # Test deleting queue without message
+ queue_name = 'new_unittest_queue'
+ self.channel._new_queue(queue_name)
+ self.channel._delete(queue_name)
+ assert queue_name not in self.channel._queue_cache
+
+ # Test deleting queue with message
+ message = 'my test message'
+ self.producer.publish(message)
+ self.channel._delete(self.queue_name)
+ assert queue_name not in self.channel._queue_cache