From fdf962b63e4174d1547ade38c77778fa79f66a53 Mon Sep 17 00:00:00 2001 From: James Saryerwinnie Date: Thu, 10 Jan 2013 13:51:21 -0800 Subject: Add long-polling support to SQS transport The polling interval was set to 0 and a new transport option (wait_time_seconds) was added. This parameter specifies how long to wait for a message from SQS, and defaults to 20 seconds, which is the maximum value currently allowed by SQS. Fixes #176. Closes #198. --- kombu/transport/SQS.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 57278f3e..cbe0f912 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -122,6 +122,8 @@ class Channel(virtual.Channel): default_region = 'us-east-1' default_visibility_timeout = 1800 # 30 minutes. + # 20 seconds is the max value currently supported by SQS. + default_wait_time_seconds = 20 domain_format = 'kombu%(vhost)s' _sdb = None _sqs = None @@ -226,7 +228,7 @@ class Channel(virtual.Channel): def _get(self, queue): """Try to retrieve a single message off ``queue``.""" q = self._new_queue(queue) - rs = q.get_messages(1) + rs = q.get_messages(1, wait_time_seconds=self.wait_time_seconds) if rs: m = rs[0] payload = loads(rs[0].get_body()) @@ -338,11 +340,17 @@ class Channel(virtual.Channel): def region(self): return self.transport_options.get('region') or self.default_region + @cached_property + def wait_time_seconds(self): + return (self.transport_options.get('wait_time_seconds') or + self.default_wait_time_seconds) + class Transport(virtual.Transport): Channel = Channel - polling_interval = 1 + polling_interval = 0 + wait_time_seconds = 20 default_port = None connection_errors = (StdConnectionError, exception.SQSError, socket.error) channel_errors = (exception.SQSDecodeError, StdChannelError) -- cgit v1.2.1