diff options
| author | Ihar Nauros <53901248+frehty@users.noreply.github.com> | 2020-05-27 13:18:31 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-05-27 16:18:31 +0600 |
| commit | 5cc4a7688201f4d2e53f468734cc2e7cf177fc40 (patch) | |
| tree | bb50b4a060525f321b5d98291aef4096162a1c7d | |
| parent | 246cb9e996b39be80aa9b50194e2ffa8e024efde (diff) | |
| download | kombu-5cc4a7688201f4d2e53f468734cc2e7cf177fc40.tar.gz | |
Fix for issue #1198: Celery crashes in cases where there aren’t enough available workers to start acting on “in-flight” messages in the SQS queue (#1199)
* Fix for the issue #1172
* Unit test for the fix relating to the issue #1172
* Fix for issue #1198: Celery crashes in cases where there aren’t enough available workers to start acting on “in-flight” messages in the SQS queue
* Fix for issue #1198: fixed lint issues
* Fix for issue #1198: added unit tests
Co-authored-by: inauros <inauros@copyright.com>
| -rw-r--r-- | kombu/transport/SQS.py | 17 | ||||
| -rw-r--r-- | t/unit/transport/test_SQS.py | 79 |
2 files changed, 90 insertions, 6 deletions
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 83a436de..9af8aa17 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -67,6 +67,7 @@ import socket import string import uuid +from botocore.exceptions import ClientError from vine import transform, ensure_promise, promise from kombu.asynchronous import get_event_loop @@ -458,17 +459,21 @@ class Channel(virtual.Channel): message = self.qos.get(delivery_tag).delivery_info sqs_message = message['sqs_message'] except KeyError: - pass + super(Channel, self).basic_ack(delivery_tag) else: queue = None if 'routing_key' in message: queue = self.canonical_queue_name(message['routing_key']) - self.sqs(queue=queue).delete_message( - QueueUrl=message['sqs_queue'], - ReceiptHandle=sqs_message['ReceiptHandle'], - ) - super(Channel, self).basic_ack(delivery_tag) + try: + self.sqs(queue=queue).delete_message( + QueueUrl=message['sqs_queue'], + ReceiptHandle=sqs_message['ReceiptHandle'] + ) + except ClientError: + super(Channel, self).basic_reject(delivery_tag) + else: + super(Channel, self).basic_ack(delivery_tag) def _size(self, queue): """Return the number of messages in a queue.""" diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py index 23d45cd6..307dadbc 100644 --- a/t/unit/transport/test_SQS.py +++ b/t/unit/transport/test_SQS.py @@ -12,7 +12,9 @@ import pytest import random import string +from botocore.exceptions import ClientError from case import Mock, skip +from case.mock import patch from kombu import messaging from kombu import Connection, Exchange, Queue @@ -506,6 +508,83 @@ class test_Channel: QueueUrl=message['sqs_queue'], ReceiptHandle=message['sqs_message']['ReceiptHandle'] ) + assert {1} == self.channel.qos._dirty + + @patch('kombu.transport.virtual.base.Channel.basic_ack') + @patch('kombu.transport.virtual.base.Channel.basic_reject') + def test_basic_ack_with_mocked_channel_methods(self, basic_reject_mock, + basic_ack_mock): + """Test that basic_ack calls the delete_message properly""" + message = { + 'sqs_message': { + 'ReceiptHandle': '1' + }, + 'sqs_queue': 'testing_queue' + } + mock_messages = Mock() + mock_messages.delivery_info = message + self.channel.qos.append(mock_messages, 1) + self.channel.sqs().delete_message = Mock() + self.channel.basic_ack(1) + self.sqs_conn_mock.delete_message.assert_called_with( + QueueUrl=message['sqs_queue'], + ReceiptHandle=message['sqs_message']['ReceiptHandle'] + ) + basic_ack_mock.assert_called_with(1) + assert not basic_reject_mock.called + + @patch('kombu.transport.virtual.base.Channel.basic_ack') + @patch('kombu.transport.virtual.base.Channel.basic_reject') + def test_basic_ack_without_sqs_message(self, basic_reject_mock, + basic_ack_mock): + """Test that basic_ack calls the delete_message properly""" + message = { + 'sqs_queue': 'testing_queue' + } + mock_messages = Mock() + mock_messages.delivery_info = message + self.channel.qos.append(mock_messages, 1) + self.channel.sqs().delete_message = Mock() + self.channel.basic_ack(1) + assert not self.sqs_conn_mock.delete_message.called + basic_ack_mock.assert_called_with(1) + assert not basic_reject_mock.called + + @patch('kombu.transport.virtual.base.Channel.basic_ack') + @patch('kombu.transport.virtual.base.Channel.basic_reject') + def test_basic_ack_invalid_receipt_handle(self, basic_reject_mock, + basic_ack_mock): + """Test that basic_ack calls the delete_message properly""" + message = { + 'sqs_message': { + 'ReceiptHandle': '2' + }, + 'sqs_queue': 'testing_queue' + } + error_response = { + 'Error': { + 'Code': 'InvalidParameterValue', + 'Message': 'Value 2 for parameter ReceiptHandle is invalid.' + ' Reason: The receipt handle has expired.' + } + } + operation_name = 'DeleteMessage' + + mock_messages = Mock() + mock_messages.delivery_info = message + self.channel.qos.append(mock_messages, 2) + self.channel.sqs().delete_message = Mock() + self.channel.sqs().delete_message.side_effect = ClientError( + error_response=error_response, + operation_name=operation_name + ) + self.channel.basic_ack(2) + self.sqs_conn_mock.delete_message.assert_called_with( + QueueUrl=message['sqs_queue'], + ReceiptHandle=message['sqs_message']['ReceiptHandle'] + ) + basic_reject_mock.assert_called_with(2) + assert not basic_ack_mock.called def test_predefined_queues_primes_queue_cache(self): connection = Connection(transport=SQS.Transport, transport_options={ |
