summaryrefslogtreecommitdiff
path: root/t/unit/transport/test_SQS.py
diff options
context:
space:
mode:
Diffstat (limited to 't/unit/transport/test_SQS.py')
-rw-r--r--t/unit/transport/test_SQS.py87
1 files changed, 83 insertions, 4 deletions
diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py
index 944728f1..2b1219fc 100644
--- a/t/unit/transport/test_SQS.py
+++ b/t/unit/transport/test_SQS.py
@@ -4,6 +4,8 @@ NOTE: The SQSQueueMock and SQSConnectionMock classes originally come from
http://github.com/pcsforeducation/sqs-mock-python. They have been patched
slightly.
"""
+from __future__ import annotations
+
import base64
import os
import random
@@ -38,6 +40,11 @@ example_predefined_queues = {
'access_key_id': 'c',
'secret_access_key': 'd',
},
+ 'queue-3.fifo': {
+ 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue-3.fifo',
+ 'access_key_id': 'e',
+ 'secret_access_key': 'f',
+ }
}
@@ -151,6 +158,7 @@ class test_Channel:
predefined_queues_sqs_conn_mocks = {
'queue-1': SQSClientMock(QueueName='queue-1'),
'queue-2': SQSClientMock(QueueName='queue-2'),
+ 'queue-3.fifo': SQSClientMock(QueueName='queue-3.fifo')
}
def mock_sqs():
@@ -330,13 +338,13 @@ class test_Channel:
with pytest.raises(Empty):
self.channel._get_bulk(self.queue_name)
- def test_is_base64_encoded(self):
+ def test_optional_b64_decode(self):
raw = b'{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77","task": "celery.task.PingTask",' \
b'"args": [],"kwargs": {},"retries": 0,"eta": "2009-11-17T12:30:56.527191"}' # noqa
b64_enc = base64.b64encode(raw)
- assert self.channel._Channel__b64_encoded(b64_enc)
- assert not self.channel._Channel__b64_encoded(raw)
- assert not self.channel._Channel__b64_encoded(b"test123")
+ assert self.channel._optional_b64_decode(b64_enc) == raw
+ assert self.channel._optional_b64_decode(raw) == raw
+ assert self.channel._optional_b64_decode(b"test123") == b"test123"
def test_messages_to_python(self):
from kombu.asynchronous.aws.sqs.message import Message
@@ -738,6 +746,77 @@ class test_Channel:
QueueUrl='https://sqs.us-east-1.amazonaws.com/xxx/queue-1',
ReceiptHandle='test_message_id', VisibilityTimeout=20)
+ def test_predefined_queues_put_to_fifo_queue(self):
+ connection = Connection(transport=SQS.Transport, transport_options={
+ 'predefined_queues': example_predefined_queues,
+ })
+ channel = connection.channel()
+
+ queue_name = 'queue-3.fifo'
+
+ exchange = Exchange('test_SQS', type='direct')
+ p = messaging.Producer(channel, exchange, routing_key=queue_name)
+
+ queue = Queue(queue_name, exchange, queue_name)
+ queue(channel).declare()
+
+ channel.sqs = Mock()
+ sqs_queue_mock = Mock()
+ channel.sqs.return_value = sqs_queue_mock
+ p.publish('message')
+
+ sqs_queue_mock.send_message.assert_called_once()
+ assert 'MessageGroupId' in sqs_queue_mock.send_message.call_args[1]
+ assert 'MessageDeduplicationId' in \
+ sqs_queue_mock.send_message.call_args[1]
+
+ def test_predefined_queues_put_to_queue(self):
+ connection = Connection(transport=SQS.Transport, transport_options={
+ 'predefined_queues': example_predefined_queues,
+ })
+ channel = connection.channel()
+
+ queue_name = 'queue-2'
+
+ exchange = Exchange('test_SQS', type='direct')
+ p = messaging.Producer(channel, exchange, routing_key=queue_name)
+
+ queue = Queue(queue_name, exchange, queue_name)
+ queue(channel).declare()
+
+ channel.sqs = Mock()
+ sqs_queue_mock = Mock()
+ channel.sqs.return_value = sqs_queue_mock
+ p.publish('message', DelaySeconds=10)
+
+ sqs_queue_mock.send_message.assert_called_once()
+
+ assert 'DelaySeconds' in sqs_queue_mock.send_message.call_args[1]
+ assert sqs_queue_mock.send_message.call_args[1]['DelaySeconds'] == 10
+
+ @pytest.mark.parametrize('predefined_queues', (
+ {
+ 'invalid-fifo-queue-name': {
+ 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue.fifo',
+ 'access_key_id': 'a',
+ 'secret_access_key': 'b'
+ }
+ },
+ {
+ 'standard-queue.fifo': {
+ 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue',
+ 'access_key_id': 'a',
+ 'secret_access_key': 'b'
+ }
+ }
+ ))
+ def test_predefined_queues_invalid_configuration(self, predefined_queues):
+ connection = Connection(transport=SQS.Transport, transport_options={
+ 'predefined_queues': predefined_queues,
+ })
+ with pytest.raises(SQS.InvalidQueueException):
+ connection.channel()
+
def test_sts_new_session(self):
# Arrange
connection = Connection(transport=SQS.Transport, transport_options={