summaryrefslogtreecommitdiff
path: root/kombu/transport
diff options
context:
space:
mode:
authorn0061q <95093640+n0061q@users.noreply.github.com>2021-12-03 10:46:41 +0200
committerGitHub <noreply@github.com>2021-12-03 09:46:41 +0100
commit081e9e6cfff944acd14f9bd1f810d227a069a01c (patch)
tree0d6debabe1a31fffe4321bcf554e27f591522ac0 /kombu/transport
parenta2d7cfd8aeab64c87eb0301b0f570eff9bffddc4 (diff)
downloadkombu-081e9e6cfff944acd14f9bd1f810d227a069a01c.tar.gz
SQS transport: detect FIFO queue properly by checking queue URL (#1450)
* SQS: detect FIFO queue properly * SQS: validate predefined queues Co-authored-by: Yaroslav Litus <Yaroslav.Litus@F-Secure.com>
Diffstat (limited to 'kombu/transport')
-rw-r--r--kombu/transport/SQS.py32
1 files changed, 30 insertions, 2 deletions
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index fb6d3780..6e9b8dfa 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -59,8 +59,8 @@ exist in AWS) you can tell this transport about them as follows:
'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, # optional
'backoff_tasks': ['svc.tasks.tasks.task1'] # optional
},
- 'queue-2': {
- 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb',
+ 'queue-2.fifo': {
+ 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb.fifo',
'access_key_id': 'c',
'secret_access_key': 'd',
'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, # optional
@@ -71,6 +71,9 @@ exist in AWS) you can tell this transport about them as follows:
'sts_token_timeout': 900 # optional
}
+Note that FIFO and standard queues must be named accordingly (the name of
+a FIFO queue must end with the .fifo suffix).
+
backoff_policy & backoff_tasks are optional arguments. These arguments
automatically change the message visibility timeout, in order to have
different times between specific task retries. This would apply after
@@ -167,6 +170,10 @@ class UndefinedQueueException(Exception):
"""Predefined queues are being used and an undefined queue was used."""
+class InvalidQueueException(Exception):
+ """Predefined queues are being used and configuration is not valid."""
+
+
class QoS(virtual.QoS):
"""Quality of Service guarantees implementation for SQS."""
@@ -237,6 +244,7 @@ class Channel(virtual.Channel):
if boto3 is None:
raise ImportError('boto3 is not installed')
super().__init__(*args, **kwargs)
+ self._validate_predifined_queues()
# SQS blows up if you try to create a new queue when one already
# exists but with a different visibility_timeout. This prepopulates
@@ -246,6 +254,26 @@ class Channel(virtual.Channel):
self.hub = kwargs.get('hub') or get_event_loop()
+ def _validate_predifined_queues(self):
+ """Check that standard and FIFO queues are named properly.
+
+ AWS requires FIFO queues to have a name
+ that ends with the .fifo suffix.
+ """
+ for queue_name, q in self.predefined_queues.items():
+ fifo_url = q['url'].endswith('.fifo')
+ fifo_name = queue_name.endswith('.fifo')
+ if fifo_url and not fifo_name:
+ raise InvalidQueueException(
+ "Queue with url '{}' must have a name "
+ "ending with .fifo".format(q['url'])
+ )
+ elif not fifo_url and fifo_name:
+ raise InvalidQueueException(
+ "Queue with name '{}' is not a FIFO queue: "
+ "'{}'".format(queue_name, q['url'])
+ )
+
def _update_queue_cache(self, queue_name_prefix):
if self.predefined_queues:
for queue_name, q in self.predefined_queues.items():