summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMintu Kumar Sah <mintu.sah@joshtechnologygroup.com>2018-10-13 15:52:16 +0530
committerAsif Saif Uddin <auvipy@gmail.com>2018-10-13 16:22:16 +0600
commit94227bbc43b182477d37be30b881b065489ce833 (patch)
treee5e5c1f25bec51405af2d75d115f730ec30a4e2a
parent0af7519f98453bae62e0f4ad70e613e04c8fa379 (diff)
downloadkombu-94227bbc43b182477d37be30b881b065489ce833.tar.gz
Fixed Worker shutdown creates duplicate messages in SQS broker (#926)
* On restore changes message visibility instead of send new message * Acknowledge message on hub close * Use sqs instead of async sqs to delete message * changes itertools to range * Empty Hub ready * fixed test_poller_regeneration_on_access * Fixed typo in comment * Simplify loop to process self._ready * Added test case for redelivered _put * Lint fixes * Added test case for delete_message call
-rw-r--r--kombu/asynchronous/hub.py8
-rw-r--r--kombu/transport/SQS.py13
-rw-r--r--t/unit/asynchronous/test_hub.py2
-rw-r--r--t/unit/transport/test_SQS.py28
4 files changed, 48 insertions, 3 deletions
diff --git a/kombu/asynchronous/hub.py b/kombu/asynchronous/hub.py
index a8c3124f..7cc6e35f 100644
--- a/kombu/asynchronous/hub.py
+++ b/kombu/asynchronous/hub.py
@@ -254,6 +254,14 @@ class Hub(object):
for callback in self.on_close:
callback(self)
+ # Complete remaining todo before Hub close
+ # Eg: Acknowledge message
+ # To avoid infinite loop where one of the callables adds items
+ # to self._ready (via call_soon or otherwise).
+ # we create new list with current self._ready
+ for item in list(self._ready):
+ item()
+
def _discard(self, fd):
fd = fileno(fd)
self.readers.pop(fd, None)
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index c3e19203..b4d5093b 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -209,7 +209,14 @@ class Channel(virtual.Channel):
message['properties']['MessageDeduplicationId']
else:
kwargs['MessageDeduplicationId'] = str(uuid.uuid4())
- self.sqs.send_message(**kwargs)
+ if message.get('redelivered'):
+ self.sqs.change_message_visibility(
+ QueueUrl=q_url,
+ ReceiptHandle=message['properties']['delivery_tag'],
+ VisibilityTimeout=0
+ )
+ else:
+ self.sqs.send_message(**kwargs)
def _message_to_python(self, message, queue_name, queue):
body = base64.b64decode(message['Body'].encode())
@@ -377,8 +384,8 @@ class Channel(virtual.Channel):
except KeyError:
pass
else:
- self.asynsqs.delete_message(message['sqs_queue'],
- sqs_message['ReceiptHandle'])
+ self.sqs.delete_message(QueueUrl=message['sqs_queue'],
+ ReceiptHandle=sqs_message['ReceiptHandle'])
super(Channel, self).basic_ack(delivery_tag)
def _size(self, queue):
diff --git a/t/unit/asynchronous/test_hub.py b/t/unit/asynchronous/test_hub.py
index 6659a163..6c6519c5 100644
--- a/t/unit/asynchronous/test_hub.py
+++ b/t/unit/asynchronous/test_hub.py
@@ -236,6 +236,7 @@ class test_Hub:
poller = self.hub.poller
self.hub.stop()
+ self.hub._ready = set()
self.hub.close()
poller.close.assert_called_with()
@@ -243,6 +244,7 @@ class test_Hub:
self.hub = Hub()
assert self.hub.poller
self.hub.stop()
+ self.hub._ready = set()
self.hub.close()
assert self.hub._poller is None
assert self.hub.poller, 'It should be regenerated automatically!'
diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py
index 4ccf3292..f647f47d 100644
--- a/t/unit/transport/test_SQS.py
+++ b/t/unit/transport/test_SQS.py
@@ -290,6 +290,16 @@ class test_Channel:
results = self.queue(self.channel).get().payload
assert message == results
+ def test_redelivered(self):
+ self.channel.sqs.change_message_visibility = \
+ Mock(name='change_message_visibility')
+ message = {
+ 'redelivered': True,
+ 'properties': {'delivery_tag': 'test_message_id'}
+ }
+ self.channel._put(self.producer.routing_key, message)
+ self.sqs_conn_mock.change_message_visibility.assert_called_once()
+
def test_put_and_get_bulk(self):
# With QoS.prefetch_count = 0
message = 'my test message'
@@ -399,3 +409,21 @@ class test_Channel:
# called?
assert (expected_receive_messages_count ==
self.sqs_conn_mock._receive_messages_calls)
+
+ def test_basic_ack(self, ):
+ """Test that basic_ack calls the delete_message properly"""
+ message = {
+ 'sqs_message': {
+ 'ReceiptHandle': '1'
+ },
+ 'sqs_queue': 'testing_queue'
+ }
+ mock_messages = Mock()
+ mock_messages.delivery_info = message
+ self.channel.qos.append(mock_messages, 1)
+ self.channel.sqs.delete_message = Mock()
+ self.channel.basic_ack(1)
+ self.sqs_conn_mock.delete_message.assert_called_with(
+ QueueUrl=message['sqs_queue'],
+ ReceiptHandle=message['sqs_message']['ReceiptHandle']
+ )