diff options
author | James Saryerwinnie <js@jamesls.com> | 2013-01-10 13:51:21 -0800 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2013-01-11 15:07:47 +0000 |
commit | fdf962b63e4174d1547ade38c77778fa79f66a53 (patch) | |
tree | 5881e0a2b78967dbe177bc1f28ac6d977d434c2e | |
parent | e93424dcb16e73f42f803bc842b74f8a8fd52e18 (diff) | |
download | kombu-fdf962b63e4174d1547ade38c77778fa79f66a53.tar.gz |
Add long-polling support to SQS transport
The polling interval was set to 0 and a new
transport option (wait_time_seconds) was added. This
parameter specifies how long to wait for a message from
SQS, and defaults to 20 seconds, which is the maximum
value currently allowed by SQS.
Fixes #176. Closes #198.
-rw-r--r-- | kombu/transport/SQS.py | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 57278f3e..cbe0f912 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -122,6 +122,8 @@ class Channel(virtual.Channel): default_region = 'us-east-1' default_visibility_timeout = 1800 # 30 minutes. + # 20 seconds is the max value currently supported by SQS. + default_wait_time_seconds = 20 domain_format = 'kombu%(vhost)s' _sdb = None _sqs = None @@ -226,7 +228,7 @@ class Channel(virtual.Channel): def _get(self, queue): """Try to retrieve a single message off ``queue``.""" q = self._new_queue(queue) - rs = q.get_messages(1) + rs = q.get_messages(1, wait_time_seconds=self.wait_time_seconds) if rs: m = rs[0] payload = loads(rs[0].get_body()) @@ -338,11 +340,17 @@ class Channel(virtual.Channel): def region(self): return self.transport_options.get('region') or self.default_region + @cached_property + def wait_time_seconds(self): + return (self.transport_options.get('wait_time_seconds') or + self.default_wait_time_seconds) + class Transport(virtual.Transport): Channel = Channel - polling_interval = 1 + polling_interval = 0 + wait_time_seconds = 20 default_port = None connection_errors = (StdConnectionError, exception.SQSError, socket.error) channel_errors = (exception.SQSDecodeError, StdChannelError) |