summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKrzysztof Jagiello <me@kjagiello.com>2020-06-25 12:03:48 +0200
committerAsif Saif Uddin <auvipy@gmail.com>2020-07-01 21:31:40 +0600
commit016f4accb634ebc3eb8fc66741f23552e5966ea7 (patch)
tree402d17e6c3583c7d472f681e449f3848860d6fba
parent90a12a1ad6d00687d90e57d4888c203522e6e0cb (diff)
downloadkombu-016f4accb634ebc3eb8fc66741f23552e5966ea7.tar.gz
Add an SQS transport option for custom botocore config
-rw-r--r--kombu/transport/SQS.py18
-rw-r--r--t/unit/transport/test_SQS.py13
2 files changed, 30 insertions, 1 deletions
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index 9af8aa17..768b93d0 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -58,6 +58,19 @@ Other Features supported by this transport:
},
}
}
+
+ Client config:
+ In some cases you may need to override the botocore config. You can do it
+ as follows:
+
+ transport_option = {
+ 'client-config': {
+ 'connect_timeout': 5,
+ },
+ }
+
+ For a complete list of settings you can adjust using this option see
+ https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
""" # noqa: E501
from __future__ import absolute_import, unicode_literals
@@ -67,6 +80,7 @@ import socket
import string
import uuid
+from botocore.client import Config
from botocore.exceptions import ClientError
from vine import transform, ensure_promise, promise
@@ -518,7 +532,9 @@ class Channel(virtual.Channel):
}
if self.endpoint_url is not None:
client_kwargs['endpoint_url'] = self.endpoint_url
- return session.client('sqs', **client_kwargs)
+ client_config = self.transport_options.get('client-config') or {}
+ config = Config(**client_config)
+ return session.client('sqs', config=config, **client_kwargs)
def sqs(self, queue=None):
if queue is not None and self.predefined_queues:
diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py
index 307dadbc..09e2dc5d 100644
--- a/t/unit/transport/test_SQS.py
+++ b/t/unit/transport/test_SQS.py
@@ -275,6 +275,19 @@ class test_Channel:
# For cleanup purposes, delete the queue and the queue file
self.channel._delete(queue_name)
+ def test_botocore_config_override(self):
+ expected_connect_timeout = 5
+ client_config = {'connect_timeout': expected_connect_timeout}
+ self.connection = Connection(
+ transport=SQS.Transport,
+ transport_options={'client-config': client_config},
+ )
+ self.channel = self.connection.channel()
+ self.channel._sqs = None
+ boto3_sqs = SQS_Channel_sqs.__get__(self.channel, SQS.Channel)
+ botocore_config = boto3_sqs()._client_config
+ assert botocore_config.connect_timeout == expected_connect_timeout
+
def test_dont_create_duplicate_new_queue(self):
# All queue names start with "q", except "unittest_queue".
# which is definitely out of cache when get_all_queues returns the