diff options
author | n0061q <95093640+n0061q@users.noreply.github.com> | 2021-12-03 10:46:41 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-03 09:46:41 +0100 |
commit | 081e9e6cfff944acd14f9bd1f810d227a069a01c (patch) | |
tree | 0d6debabe1a31fffe4321bcf554e27f591522ac0 /kombu/transport | |
parent | a2d7cfd8aeab64c87eb0301b0f570eff9bffddc4 (diff) | |
download | kombu-081e9e6cfff944acd14f9bd1f810d227a069a01c.tar.gz |
SQS transport: detect FIFO queue properly by checking queue URL (#1450)
* SQS: detect FIFO queue properly
* SQS: validate predefined queues
Co-authored-by: Yaroslav Litus <Yaroslav.Litus@F-Secure.com>
Diffstat (limited to 'kombu/transport')
-rw-r--r-- | kombu/transport/SQS.py | 32 |
1 files changed, 30 insertions, 2 deletions
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index fb6d3780..6e9b8dfa 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -59,8 +59,8 @@ exist in AWS) you can tell this transport about them as follows: 'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, # optional 'backoff_tasks': ['svc.tasks.tasks.task1'] # optional }, - 'queue-2': { - 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb', + 'queue-2.fifo': { + 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb.fifo', 'access_key_id': 'c', 'secret_access_key': 'd', 'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, # optional @@ -71,6 +71,9 @@ exist in AWS) you can tell this transport about them as follows: 'sts_token_timeout': 900 # optional } +Note that FIFO and standard queues must be named accordingly (the name of +a FIFO queue must end with the .fifo suffix). + backoff_policy & backoff_tasks are optional arguments. These arguments automatically change the message visibility timeout, in order to have different times between specific task retries. This would apply after @@ -167,6 +170,10 @@ class UndefinedQueueException(Exception): """Predefined queues are being used and an undefined queue was used.""" +class InvalidQueueException(Exception): + """Predefined queues are being used and configuration is not valid.""" + + class QoS(virtual.QoS): """Quality of Service guarantees implementation for SQS.""" @@ -237,6 +244,7 @@ class Channel(virtual.Channel): if boto3 is None: raise ImportError('boto3 is not installed') super().__init__(*args, **kwargs) + self._validate_predifined_queues() # SQS blows up if you try to create a new queue when one already # exists but with a different visibility_timeout. This prepopulates @@ -246,6 +254,26 @@ class Channel(virtual.Channel): self.hub = kwargs.get('hub') or get_event_loop() + def _validate_predifined_queues(self): + """Check that standard and FIFO queues are named properly. + + AWS requires FIFO queues to have a name + that ends with the .fifo suffix. + """ + for queue_name, q in self.predefined_queues.items(): + fifo_url = q['url'].endswith('.fifo') + fifo_name = queue_name.endswith('.fifo') + if fifo_url and not fifo_name: + raise InvalidQueueException( + "Queue with url '{}' must have a name " + "ending with .fifo".format(q['url']) + ) + elif not fifo_url and fifo_name: + raise InvalidQueueException( + "Queue with name '{}' is not a FIFO queue: " + "'{}'".format(queue_name, q['url']) + ) + def _update_queue_cache(self, queue_name_prefix): if self.predefined_queues: for queue_name, q in self.predefined_queues.items(): |