diff options
| author | Ihar Nauros <53901248+frehty@users.noreply.github.com> | 2020-05-27 13:18:31 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-05-27 16:18:31 +0600 |
| commit | 5cc4a7688201f4d2e53f468734cc2e7cf177fc40 (patch) | |
| tree | bb50b4a060525f321b5d98291aef4096162a1c7d /t | |
| parent | 246cb9e996b39be80aa9b50194e2ffa8e024efde (diff) | |
| download | kombu-5cc4a7688201f4d2e53f468734cc2e7cf177fc40.tar.gz | |
Fix for issue #1198: Celery crashes in cases where there aren’t enough available workers to start acting on “in-flight” messages in the SQS queue (#1199)
* Fix for the issue #1172
* Unit test for the fix relating to the issue #1172
* Fix for issue #1198: Celery crashes in cases where there aren’t enough available workers to start acting on “in-flight” messages in the SQS queue
* Fix for issue #1198: fixed lint issues
* Fix for issue #1198: added unit tests
Co-authored-by: inauros <inauros@copyright.com>
Diffstat (limited to 't')
| -rw-r--r-- | t/unit/transport/test_SQS.py | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py index 23d45cd6..307dadbc 100644 --- a/t/unit/transport/test_SQS.py +++ b/t/unit/transport/test_SQS.py @@ -12,7 +12,9 @@ import pytest import random import string +from botocore.exceptions import ClientError from case import Mock, skip +from case.mock import patch from kombu import messaging from kombu import Connection, Exchange, Queue @@ -506,6 +508,83 @@ class test_Channel: QueueUrl=message['sqs_queue'], ReceiptHandle=message['sqs_message']['ReceiptHandle'] ) + assert {1} == self.channel.qos._dirty + + @patch('kombu.transport.virtual.base.Channel.basic_ack') + @patch('kombu.transport.virtual.base.Channel.basic_reject') + def test_basic_ack_with_mocked_channel_methods(self, basic_reject_mock, + basic_ack_mock): + """Test that basic_ack calls the delete_message properly""" + message = { + 'sqs_message': { + 'ReceiptHandle': '1' + }, + 'sqs_queue': 'testing_queue' + } + mock_messages = Mock() + mock_messages.delivery_info = message + self.channel.qos.append(mock_messages, 1) + self.channel.sqs().delete_message = Mock() + self.channel.basic_ack(1) + self.sqs_conn_mock.delete_message.assert_called_with( + QueueUrl=message['sqs_queue'], + ReceiptHandle=message['sqs_message']['ReceiptHandle'] + ) + basic_ack_mock.assert_called_with(1) + assert not basic_reject_mock.called + + @patch('kombu.transport.virtual.base.Channel.basic_ack') + @patch('kombu.transport.virtual.base.Channel.basic_reject') + def test_basic_ack_without_sqs_message(self, basic_reject_mock, + basic_ack_mock): + """Test that basic_ack calls the delete_message properly""" + message = { + 'sqs_queue': 'testing_queue' + } + mock_messages = Mock() + mock_messages.delivery_info = message + self.channel.qos.append(mock_messages, 1) + self.channel.sqs().delete_message = Mock() + self.channel.basic_ack(1) + assert not self.sqs_conn_mock.delete_message.called + basic_ack_mock.assert_called_with(1) + assert not basic_reject_mock.called + + @patch('kombu.transport.virtual.base.Channel.basic_ack') + @patch('kombu.transport.virtual.base.Channel.basic_reject') + def test_basic_ack_invalid_receipt_handle(self, basic_reject_mock, + basic_ack_mock): + """Test that basic_ack calls the delete_message properly""" + message = { + 'sqs_message': { + 'ReceiptHandle': '2' + }, + 'sqs_queue': 'testing_queue' + } + error_response = { + 'Error': { + 'Code': 'InvalidParameterValue', + 'Message': 'Value 2 for parameter ReceiptHandle is invalid.' + ' Reason: The receipt handle has expired.' + } + } + operation_name = 'DeleteMessage' + + mock_messages = Mock() + mock_messages.delivery_info = message + self.channel.qos.append(mock_messages, 2) + self.channel.sqs().delete_message = Mock() + self.channel.sqs().delete_message.side_effect = ClientError( + error_response=error_response, + operation_name=operation_name + ) + self.channel.basic_ack(2) + self.sqs_conn_mock.delete_message.assert_called_with( + QueueUrl=message['sqs_queue'], + ReceiptHandle=message['sqs_message']['ReceiptHandle'] + ) + basic_reject_mock.assert_called_with(2) + assert not basic_ack_mock.called def test_predefined_queues_primes_queue_cache(self): connection = Connection(transport=SQS.Transport, transport_options={ |
