summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Saryerwinnie <js@jamesls.com>2013-01-10 13:51:21 -0800
committerAsk Solem <ask@celeryproject.org>2013-01-11 15:07:47 +0000
commitfdf962b63e4174d1547ade38c77778fa79f66a53 (patch)
tree5881e0a2b78967dbe177bc1f28ac6d977d434c2e
parente93424dcb16e73f42f803bc842b74f8a8fd52e18 (diff)
downloadkombu-fdf962b63e4174d1547ade38c77778fa79f66a53.tar.gz
Add long-polling support to SQS transport
The polling interval was set to 0 and a new transport option (wait_time_seconds) was added. This parameter specifies how long to wait for a message from SQS, and defaults to 20 seconds, which is the maximum value currently allowed by SQS. Fixes #176. Closes #198.
-rw-r--r--kombu/transport/SQS.py12
1 files changed, 10 insertions, 2 deletions
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index 57278f3e..cbe0f912 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -122,6 +122,8 @@ class Channel(virtual.Channel):
default_region = 'us-east-1'
default_visibility_timeout = 1800 # 30 minutes.
+ # 20 seconds is the max value currently supported by SQS.
+ default_wait_time_seconds = 20
domain_format = 'kombu%(vhost)s'
_sdb = None
_sqs = None
@@ -226,7 +228,7 @@ class Channel(virtual.Channel):
def _get(self, queue):
"""Try to retrieve a single message off ``queue``."""
q = self._new_queue(queue)
- rs = q.get_messages(1)
+ rs = q.get_messages(1, wait_time_seconds=self.wait_time_seconds)
if rs:
m = rs[0]
payload = loads(rs[0].get_body())
@@ -338,11 +340,17 @@ class Channel(virtual.Channel):
def region(self):
return self.transport_options.get('region') or self.default_region
+ @cached_property
+ def wait_time_seconds(self):
+ return (self.transport_options.get('wait_time_seconds') or
+ self.default_wait_time_seconds)
+
class Transport(virtual.Transport):
Channel = Channel
- polling_interval = 1
+ polling_interval = 0
+ wait_time_seconds = 20
default_port = None
connection_errors = (StdConnectionError, exception.SQSError, socket.error)
channel_errors = (exception.SQSDecodeError, StdChannelError)