diff options
Diffstat (limited to 't/unit/transport/test_SQS.py')
| -rw-r--r-- | t/unit/transport/test_SQS.py | 87 |
1 files changed, 83 insertions, 4 deletions
diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py index 944728f1..2b1219fc 100644 --- a/t/unit/transport/test_SQS.py +++ b/t/unit/transport/test_SQS.py @@ -4,6 +4,8 @@ NOTE: The SQSQueueMock and SQSConnectionMock classes originally come from http://github.com/pcsforeducation/sqs-mock-python. They have been patched slightly. """ +from __future__ import annotations + import base64 import os import random @@ -38,6 +40,11 @@ example_predefined_queues = { 'access_key_id': 'c', 'secret_access_key': 'd', }, + 'queue-3.fifo': { + 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue-3.fifo', + 'access_key_id': 'e', + 'secret_access_key': 'f', + } } @@ -151,6 +158,7 @@ class test_Channel: predefined_queues_sqs_conn_mocks = { 'queue-1': SQSClientMock(QueueName='queue-1'), 'queue-2': SQSClientMock(QueueName='queue-2'), + 'queue-3.fifo': SQSClientMock(QueueName='queue-3.fifo') } def mock_sqs(): @@ -330,13 +338,13 @@ class test_Channel: with pytest.raises(Empty): self.channel._get_bulk(self.queue_name) - def test_is_base64_encoded(self): + def test_optional_b64_decode(self): raw = b'{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77","task": "celery.task.PingTask",' \ b'"args": [],"kwargs": {},"retries": 0,"eta": "2009-11-17T12:30:56.527191"}' # noqa b64_enc = base64.b64encode(raw) - assert self.channel._Channel__b64_encoded(b64_enc) - assert not self.channel._Channel__b64_encoded(raw) - assert not self.channel._Channel__b64_encoded(b"test123") + assert self.channel._optional_b64_decode(b64_enc) == raw + assert self.channel._optional_b64_decode(raw) == raw + assert self.channel._optional_b64_decode(b"test123") == b"test123" def test_messages_to_python(self): from kombu.asynchronous.aws.sqs.message import Message @@ -738,6 +746,77 @@ class test_Channel: QueueUrl='https://sqs.us-east-1.amazonaws.com/xxx/queue-1', ReceiptHandle='test_message_id', VisibilityTimeout=20) + def test_predefined_queues_put_to_fifo_queue(self): + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + queue_name = 'queue-3.fifo' + + exchange = Exchange('test_SQS', type='direct') + p = messaging.Producer(channel, exchange, routing_key=queue_name) + + queue = Queue(queue_name, exchange, queue_name) + queue(channel).declare() + + channel.sqs = Mock() + sqs_queue_mock = Mock() + channel.sqs.return_value = sqs_queue_mock + p.publish('message') + + sqs_queue_mock.send_message.assert_called_once() + assert 'MessageGroupId' in sqs_queue_mock.send_message.call_args[1] + assert 'MessageDeduplicationId' in \ + sqs_queue_mock.send_message.call_args[1] + + def test_predefined_queues_put_to_queue(self): + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + queue_name = 'queue-2' + + exchange = Exchange('test_SQS', type='direct') + p = messaging.Producer(channel, exchange, routing_key=queue_name) + + queue = Queue(queue_name, exchange, queue_name) + queue(channel).declare() + + channel.sqs = Mock() + sqs_queue_mock = Mock() + channel.sqs.return_value = sqs_queue_mock + p.publish('message', DelaySeconds=10) + + sqs_queue_mock.send_message.assert_called_once() + + assert 'DelaySeconds' in sqs_queue_mock.send_message.call_args[1] + assert sqs_queue_mock.send_message.call_args[1]['DelaySeconds'] == 10 + + @pytest.mark.parametrize('predefined_queues', ( + { + 'invalid-fifo-queue-name': { + 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue.fifo', + 'access_key_id': 'a', + 'secret_access_key': 'b' + } + }, + { + 'standard-queue.fifo': { + 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue', + 'access_key_id': 'a', + 'secret_access_key': 'b' + } + } + )) + def test_predefined_queues_invalid_configuration(self, predefined_queues): + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': predefined_queues, + }) + with pytest.raises(SQS.InvalidQueueException): + connection.channel() + def test_sts_new_session(self): # Arrange connection = Connection(transport=SQS.Transport, transport_options={ |
