summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIhar Nauros <53901248+frehty@users.noreply.github.com>2020-05-27 13:18:31 +0300
committerGitHub <noreply@github.com>2020-05-27 16:18:31 +0600
commit5cc4a7688201f4d2e53f468734cc2e7cf177fc40 (patch)
treebb50b4a060525f321b5d98291aef4096162a1c7d
parent246cb9e996b39be80aa9b50194e2ffa8e024efde (diff)
downloadkombu-5cc4a7688201f4d2e53f468734cc2e7cf177fc40.tar.gz
Fix for issue #1198: Celery crashes in cases where there aren’t enough available workers to start acting on “in-flight” messages in the SQS queue (#1199)
* Fix for the issue #1172 * Unit test for the fix relating to the issue #1172 * Fix for issue #1198: Celery crashes in cases where there aren’t enough available workers to start acting on “in-flight” messages in the SQS queue * Fix for issue #1198: fixed lint issues * Fix for issue #1198: added unit tests Co-authored-by: inauros <inauros@copyright.com>
-rw-r--r--kombu/transport/SQS.py17
-rw-r--r--t/unit/transport/test_SQS.py79
2 files changed, 90 insertions, 6 deletions
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index 83a436de..9af8aa17 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -67,6 +67,7 @@ import socket
import string
import uuid
+from botocore.exceptions import ClientError
from vine import transform, ensure_promise, promise
from kombu.asynchronous import get_event_loop
@@ -458,17 +459,21 @@ class Channel(virtual.Channel):
message = self.qos.get(delivery_tag).delivery_info
sqs_message = message['sqs_message']
except KeyError:
- pass
+ super(Channel, self).basic_ack(delivery_tag)
else:
queue = None
if 'routing_key' in message:
queue = self.canonical_queue_name(message['routing_key'])
- self.sqs(queue=queue).delete_message(
- QueueUrl=message['sqs_queue'],
- ReceiptHandle=sqs_message['ReceiptHandle'],
- )
- super(Channel, self).basic_ack(delivery_tag)
+ try:
+ self.sqs(queue=queue).delete_message(
+ QueueUrl=message['sqs_queue'],
+ ReceiptHandle=sqs_message['ReceiptHandle']
+ )
+ except ClientError:
+ super(Channel, self).basic_reject(delivery_tag)
+ else:
+ super(Channel, self).basic_ack(delivery_tag)
def _size(self, queue):
"""Return the number of messages in a queue."""
diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py
index 23d45cd6..307dadbc 100644
--- a/t/unit/transport/test_SQS.py
+++ b/t/unit/transport/test_SQS.py
@@ -12,7 +12,9 @@ import pytest
import random
import string
+from botocore.exceptions import ClientError
from case import Mock, skip
+from case.mock import patch
from kombu import messaging
from kombu import Connection, Exchange, Queue
@@ -506,6 +508,83 @@ class test_Channel:
QueueUrl=message['sqs_queue'],
ReceiptHandle=message['sqs_message']['ReceiptHandle']
)
+ assert {1} == self.channel.qos._dirty
+
+ @patch('kombu.transport.virtual.base.Channel.basic_ack')
+ @patch('kombu.transport.virtual.base.Channel.basic_reject')
+ def test_basic_ack_with_mocked_channel_methods(self, basic_reject_mock,
+ basic_ack_mock):
+ """Test that basic_ack calls the delete_message properly"""
+ message = {
+ 'sqs_message': {
+ 'ReceiptHandle': '1'
+ },
+ 'sqs_queue': 'testing_queue'
+ }
+ mock_messages = Mock()
+ mock_messages.delivery_info = message
+ self.channel.qos.append(mock_messages, 1)
+ self.channel.sqs().delete_message = Mock()
+ self.channel.basic_ack(1)
+ self.sqs_conn_mock.delete_message.assert_called_with(
+ QueueUrl=message['sqs_queue'],
+ ReceiptHandle=message['sqs_message']['ReceiptHandle']
+ )
+ basic_ack_mock.assert_called_with(1)
+ assert not basic_reject_mock.called
+
+ @patch('kombu.transport.virtual.base.Channel.basic_ack')
+ @patch('kombu.transport.virtual.base.Channel.basic_reject')
+ def test_basic_ack_without_sqs_message(self, basic_reject_mock,
+ basic_ack_mock):
+ """Test that basic_ack calls the delete_message properly"""
+ message = {
+ 'sqs_queue': 'testing_queue'
+ }
+ mock_messages = Mock()
+ mock_messages.delivery_info = message
+ self.channel.qos.append(mock_messages, 1)
+ self.channel.sqs().delete_message = Mock()
+ self.channel.basic_ack(1)
+ assert not self.sqs_conn_mock.delete_message.called
+ basic_ack_mock.assert_called_with(1)
+ assert not basic_reject_mock.called
+
+ @patch('kombu.transport.virtual.base.Channel.basic_ack')
+ @patch('kombu.transport.virtual.base.Channel.basic_reject')
+ def test_basic_ack_invalid_receipt_handle(self, basic_reject_mock,
+ basic_ack_mock):
+ """Test that basic_ack calls the delete_message properly"""
+ message = {
+ 'sqs_message': {
+ 'ReceiptHandle': '2'
+ },
+ 'sqs_queue': 'testing_queue'
+ }
+ error_response = {
+ 'Error': {
+ 'Code': 'InvalidParameterValue',
+ 'Message': 'Value 2 for parameter ReceiptHandle is invalid.'
+ ' Reason: The receipt handle has expired.'
+ }
+ }
+ operation_name = 'DeleteMessage'
+
+ mock_messages = Mock()
+ mock_messages.delivery_info = message
+ self.channel.qos.append(mock_messages, 2)
+ self.channel.sqs().delete_message = Mock()
+ self.channel.sqs().delete_message.side_effect = ClientError(
+ error_response=error_response,
+ operation_name=operation_name
+ )
+ self.channel.basic_ack(2)
+ self.sqs_conn_mock.delete_message.assert_called_with(
+ QueueUrl=message['sqs_queue'],
+ ReceiptHandle=message['sqs_message']['ReceiptHandle']
+ )
+ basic_reject_mock.assert_called_with(2)
+ assert not basic_ack_mock.called
def test_predefined_queues_primes_queue_cache(self):
connection = Connection(transport=SQS.Transport, transport_options={