diff options
author | eloranger <59779111+erikaloranger@users.noreply.github.com> | 2022-07-12 12:31:21 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-12 22:31:21 +0600 |
commit | 674a645356227576170ebb378c903a9445a384e4 (patch) | |
tree | 3ec6bbb5bdb49e4143ff6330eaa294f5412be665 | |
parent | b4698b502369c9aeec76160b459d13c507e7611f (diff) | |
download | kombu-674a645356227576170ebb378c903a9445a384e4.tar.gz |
Add support to SQS DelaySeconds (#1567)
* Add DelaySeconds to kwargs
* Add test
* add default value for DelaySeconds
* Fix tests and add check for properties
* Fix flake8 style issue
Co-authored-by: Edmund Lam <2623895+edmundlam@users.noreply.github.com>
-rw-r--r-- | kombu/transport/SQS.py | 27 | ||||
-rw-r--r-- | t/unit/transport/test_SQS.py | 24 |
2 files changed, 40 insertions, 11 deletions
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index fa953e8c..ac199aa1 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -402,18 +402,23 @@ class Channel(virtual.Channel): else: body = dumps(message) kwargs = {'QueueUrl': q_url, 'MessageBody': body} - if queue.endswith('.fifo'): - if 'MessageGroupId' in message['properties']: - kwargs['MessageGroupId'] = \ - message['properties']['MessageGroupId'] - else: - kwargs['MessageGroupId'] = 'default' - if 'MessageDeduplicationId' in message['properties']: - kwargs['MessageDeduplicationId'] = \ - message['properties']['MessageDeduplicationId'] - else: - kwargs['MessageDeduplicationId'] = str(uuid.uuid4()) + if 'properties' in message: + if queue.endswith('.fifo'): + if 'MessageGroupId' in message['properties']: + kwargs['MessageGroupId'] = \ + message['properties']['MessageGroupId'] + else: + kwargs['MessageGroupId'] = 'default' + if 'MessageDeduplicationId' in message['properties']: + kwargs['MessageDeduplicationId'] = \ + message['properties']['MessageDeduplicationId'] + else: + kwargs['MessageDeduplicationId'] = str(uuid.uuid4()) + else: + if "DelaySeconds" in message['properties']: + kwargs['DelaySeconds'] = \ + message['properties']['DelaySeconds'] c = self.sqs(queue=self.canonical_queue_name(queue)) if message.get('redelivered'): c.change_message_visibility( diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py index ecf32e10..2b1219fc 100644 --- a/t/unit/transport/test_SQS.py +++ b/t/unit/transport/test_SQS.py @@ -770,6 +770,30 @@ class test_Channel: assert 'MessageDeduplicationId' in \ sqs_queue_mock.send_message.call_args[1] + def test_predefined_queues_put_to_queue(self): + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + queue_name = 'queue-2' + + exchange = Exchange('test_SQS', type='direct') + p = messaging.Producer(channel, exchange, routing_key=queue_name) + + queue = Queue(queue_name, exchange, queue_name) + queue(channel).declare() + + channel.sqs = Mock() + sqs_queue_mock = Mock() + channel.sqs.return_value = sqs_queue_mock + p.publish('message', DelaySeconds=10) + + sqs_queue_mock.send_message.assert_called_once() + + assert 'DelaySeconds' in sqs_queue_mock.send_message.call_args[1] + assert sqs_queue_mock.send_message.call_args[1]['DelaySeconds'] == 10 + @pytest.mark.parametrize('predefined_queues', ( { 'invalid-fifo-queue-name': { |