summaryrefslogtreecommitdiff
path: root/kombu/transport/SQS.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/transport/SQS.py')
-rw-r--r--kombu/transport/SQS.py13
1 files changed, 10 insertions, 3 deletions
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index c3e19203..b4d5093b 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -209,7 +209,14 @@ class Channel(virtual.Channel):
message['properties']['MessageDeduplicationId']
else:
kwargs['MessageDeduplicationId'] = str(uuid.uuid4())
- self.sqs.send_message(**kwargs)
+ if message.get('redelivered'):
+ self.sqs.change_message_visibility(
+ QueueUrl=q_url,
+ ReceiptHandle=message['properties']['delivery_tag'],
+ VisibilityTimeout=0
+ )
+ else:
+ self.sqs.send_message(**kwargs)
def _message_to_python(self, message, queue_name, queue):
body = base64.b64decode(message['Body'].encode())
@@ -377,8 +384,8 @@ class Channel(virtual.Channel):
except KeyError:
pass
else:
- self.asynsqs.delete_message(message['sqs_queue'],
- sqs_message['ReceiptHandle'])
+ self.sqs.delete_message(QueueUrl=message['sqs_queue'],
+ ReceiptHandle=sqs_message['ReceiptHandle'])
super(Channel, self).basic_ack(delivery_tag)
def _size(self, queue):