summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoreloranger <59779111+erikaloranger@users.noreply.github.com>2022-07-12 12:31:21 -0400
committerGitHub <noreply@github.com>2022-07-12 22:31:21 +0600
commit674a645356227576170ebb378c903a9445a384e4 (patch)
tree3ec6bbb5bdb49e4143ff6330eaa294f5412be665
parentb4698b502369c9aeec76160b459d13c507e7611f (diff)
downloadkombu-674a645356227576170ebb378c903a9445a384e4.tar.gz
Add support to SQS DelaySeconds (#1567)
* Add DelaySeconds to kwargs * Add test * add default value for DelaySeconds * Fix tests and add check for properties * Fix flake8 style issue Co-authored-by: Edmund Lam <2623895+edmundlam@users.noreply.github.com>
-rw-r--r--kombu/transport/SQS.py27
-rw-r--r--t/unit/transport/test_SQS.py24
2 files changed, 40 insertions, 11 deletions
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index fa953e8c..ac199aa1 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -402,18 +402,23 @@ class Channel(virtual.Channel):
else:
body = dumps(message)
kwargs = {'QueueUrl': q_url, 'MessageBody': body}
- if queue.endswith('.fifo'):
- if 'MessageGroupId' in message['properties']:
- kwargs['MessageGroupId'] = \
- message['properties']['MessageGroupId']
- else:
- kwargs['MessageGroupId'] = 'default'
- if 'MessageDeduplicationId' in message['properties']:
- kwargs['MessageDeduplicationId'] = \
- message['properties']['MessageDeduplicationId']
- else:
- kwargs['MessageDeduplicationId'] = str(uuid.uuid4())
+ if 'properties' in message:
+ if queue.endswith('.fifo'):
+ if 'MessageGroupId' in message['properties']:
+ kwargs['MessageGroupId'] = \
+ message['properties']['MessageGroupId']
+ else:
+ kwargs['MessageGroupId'] = 'default'
+ if 'MessageDeduplicationId' in message['properties']:
+ kwargs['MessageDeduplicationId'] = \
+ message['properties']['MessageDeduplicationId']
+ else:
+ kwargs['MessageDeduplicationId'] = str(uuid.uuid4())
+ else:
+ if "DelaySeconds" in message['properties']:
+ kwargs['DelaySeconds'] = \
+ message['properties']['DelaySeconds']
c = self.sqs(queue=self.canonical_queue_name(queue))
if message.get('redelivered'):
c.change_message_visibility(
diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py
index ecf32e10..2b1219fc 100644
--- a/t/unit/transport/test_SQS.py
+++ b/t/unit/transport/test_SQS.py
@@ -770,6 +770,30 @@ class test_Channel:
assert 'MessageDeduplicationId' in \
sqs_queue_mock.send_message.call_args[1]
+ def test_predefined_queues_put_to_queue(self):
+ connection = Connection(transport=SQS.Transport, transport_options={
+ 'predefined_queues': example_predefined_queues,
+ })
+ channel = connection.channel()
+
+ queue_name = 'queue-2'
+
+ exchange = Exchange('test_SQS', type='direct')
+ p = messaging.Producer(channel, exchange, routing_key=queue_name)
+
+ queue = Queue(queue_name, exchange, queue_name)
+ queue(channel).declare()
+
+ channel.sqs = Mock()
+ sqs_queue_mock = Mock()
+ channel.sqs.return_value = sqs_queue_mock
+ p.publish('message', DelaySeconds=10)
+
+ sqs_queue_mock.send_message.assert_called_once()
+
+ assert 'DelaySeconds' in sqs_queue_mock.send_message.call_args[1]
+ assert sqs_queue_mock.send_message.call_args[1]['DelaySeconds'] == 10
+
@pytest.mark.parametrize('predefined_queues', (
{
'invalid-fifo-queue-name': {