summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Koshelev <daevaorn@gmail.com>2014-05-08 09:06:49 +0400
committerAlex Koshelev <daevaorn@gmail.com>2014-05-08 09:06:49 +0400
commit3dd3d71eb6ecb422ba73176ce3ed6ae62337d4f0 (patch)
treecf5d6f03650a48344d10e318be49708b5dd8d5e2
parentddb320e9f34deacf283e582895f4bee133a7a22b (diff)
downloadkombu-3dd3d71eb6ecb422ba73176ce3ed6ae62337d4f0.tar.gz
Priority support for MongoDB transport. Transport table update. Priority
conversion unification.
-rw-r--r--README.rst51
-rw-r--r--docs/userguide/connections.rst50
-rw-r--r--kombu/tests/transport/virtual/test_base.py16
-rw-r--r--kombu/transport/beanstalk.py2
-rw-r--r--kombu/transport/mongodb.py9
-rw-r--r--kombu/transport/redis.py7
-rw-r--r--kombu/transport/virtual/__init__.py20
-rw-r--r--kombu/transport/zookeeper.py8
8 files changed, 98 insertions, 65 deletions
diff --git a/README.rst b/README.rst
index 8b4adf80..1012c0de 100644
--- a/README.rst
+++ b/README.rst
@@ -81,31 +81,31 @@ and the `Wikipedia article about AMQP`_.
Transport Comparison
====================
-+---------------+----------+------------+------------+---------------+
-| **Client** | **Type** | **Direct** | **Topic** | **Fanout** |
-+---------------+----------+------------+------------+---------------+
-| *amqp* | Native | Yes | Yes | Yes |
-+---------------+----------+------------+------------+---------------+
-| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) |
-+---------------+----------+------------+------------+---------------+
-| *mongodb* | Virtual | Yes | Yes | Yes |
-+---------------+----------+------------+------------+---------------+
-| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
-+---------------+----------+------------+------------+---------------+
-| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *in-memory* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *django* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
++---------------+----------+------------+------------+---------------+--------------+
+| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | **Priority** |
++---------------+----------+------------+------------+---------------+--------------+
+| *amqp* | Native | Yes | Yes | Yes | Yes [#f3]_ |
++---------------+----------+------------+------------+---------------+--------------+
+| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *mongodb* | Virtual | Yes | Yes | Yes | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *django* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
.. [#f1] Declarations only kept in memory, so exchanges/queues
@@ -115,6 +115,7 @@ Transport Comparison
Disabled by default, but can be enabled by using the
``supports_fanout`` transport option.
+.. [#f3] AMQP Message priority support depends on broker implementation.
Documentation
-------------
diff --git a/docs/userguide/connections.rst b/docs/userguide/connections.rst
index f97b4b79..38037a29 100644
--- a/docs/userguide/connections.rst
+++ b/docs/userguide/connections.rst
@@ -145,29 +145,31 @@ transport URL, or use ``amqp`` to have the fallback.
Transport Comparison
====================
-+---------------+----------+------------+------------+---------------+
-| **Client** | **Type** | **Direct** | **Topic** | **Fanout** |
-+---------------+----------+------------+------------+---------------+
-| *amqp* | Native | Yes | Yes | Yes |
-+---------------+----------+------------+------------+---------------+
-| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) |
-+---------------+----------+------------+------------+---------------+
-| *mongodb* | Virtual | Yes | Yes | Yes |
-+---------------+----------+------------+------------+---------------+
-| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
-+---------------+----------+------------+------------+---------------+
-| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *in-memory* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *django* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
++---------------+----------+------------+------------+---------------+--------------+
+| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | **Priority** |
++---------------+----------+------------+------------+---------------+--------------+
+| *amqp* | Native | Yes | Yes | Yes | Yes [#f3]_ |
++---------------+----------+------------+------------+---------------+--------------+
+| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *mongodb* | Virtual | Yes | Yes | Yes | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *django* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
.. [#f1] Declarations only kept in memory, so exchanges/queues
@@ -176,3 +178,5 @@ Transport Comparison
.. [#f2] Fanout supported via storing routing tables in SimpleDB.
Disabled by default, but can be enabled by using the
``supports_fanout`` transport option.
+
+.. [#f3] AMQP Message priority support depends on broker implementation.
diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py
index d249c4e7..bdc7382b 100644
--- a/kombu/tests/transport/virtual/test_base.py
+++ b/kombu/tests/transport/virtual/test_base.py
@@ -515,6 +515,22 @@ class test_Channel(Case):
with self.assertRaises(ChannelError):
self.channel.queue_declare(queue='21wisdjwqe', passive=True)
+ def test_get_message_priority(self):
+ def _message(priority):
+ return self.channel.prepare_message('the message with priority',
+ priority=priority)
+
+ self.assertEqual(self.channel._get_message_priority(_message(5)),
+ 5)
+ self.assertEqual(self.channel._get_message_priority(_message(self.channel.min_priority - 10)),
+ self.channel.min_priority)
+ self.assertEqual(self.channel._get_message_priority(_message(self.channel.max_priority + 10)),
+ self.channel.max_priority)
+ self.assertEqual(self.channel._get_message_priority(_message('foobar')),
+ self.channel.default_priority)
+ self.assertEqual(self.channel._get_message_priority(_message(2), reverse=True),
+ self.channel.max_priority - 2)
+
class test_Transport(Case):
diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py
index 9dff8b49..544fd438 100644
--- a/kombu/transport/beanstalk.py
+++ b/kombu/transport/beanstalk.py
@@ -44,7 +44,7 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
extra = {}
- priority = message['properties']['delivery_info']['priority']
+ priority = self._get_message_priority(message)
ttr = message['properties'].get('ttr')
if ttr is not None:
extra['ttr'] = ttr
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 78af0f9f..5f726d09 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -99,7 +99,8 @@ class Channel(virtual.Channel):
else:
msg = self.get_messages().find_and_modify(
query={'queue': queue},
- sort={'_id': pymongo.ASCENDING},
+ sort=[('priority', pymongo.ASCENDING),
+ ('_id', pymongo.ASCENDING)],
remove=True,
)
@@ -116,7 +117,9 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
self.get_messages().insert({'payload': dumps(message),
- 'queue': queue})
+ 'queue': queue,
+ 'priority': self._get_message_priority(message,
+ reverse=True)})
def _purge(self, queue):
size = self._size(queue)
@@ -202,7 +205,7 @@ class Channel(virtual.Channel):
def _ensure_indexes(self):
'''Ensure indexes on collections.'''
self.get_messages().ensure_index(
- [('queue', 1), ('_id', 1)], background=True,
+ [('queue', 1), ('priority', 1), ('_id', 1)], background=True,
)
self.get_broadcast().ensure_index([('queue', 1)])
self.get_routing().ensure_index([('queue', 1), ('exchange', 1)])
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 3ee049cb..164c8c2c 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -662,11 +662,8 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
"""Deliver message."""
- try:
- pri = max(min(int(
- message['properties']['delivery_info']['priority']), 9), 0)
- except (TypeError, ValueError, KeyError):
- pri = 0
+ pri = self._get_message_priority(message)
+
with self.conn_or_acquire() as client:
client.lpush(self._q_for_pri(queue, pri), dumps(message))
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index cb844de9..2e34cab7 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -367,6 +367,11 @@ class Channel(AbstractChannel, base.StdChannel):
# List of options to transfer from :attr:`transport_options`.
from_transport_options = ('body_encoding', 'deadletter_queue')
+ # Priority defaults
+ default_priority = 0
+ min_priority = 0
+ max_priority = 9
+
def __init__(self, connection, **kwargs):
self.connection = connection
self._consumers = set()
@@ -653,7 +658,7 @@ class Channel(AbstractChannel, base.StdChannel):
"""Prepare message data."""
properties = properties or {}
info = properties.setdefault('delivery_info', {})
- info['priority'] = priority or 0
+ info['priority'] = priority or self.default_priority
return {'body': body,
'content-encoding': content_encoding,
@@ -723,6 +728,19 @@ class Channel(AbstractChannel, base.StdChannel):
self._reset_cycle()
return self._cycle
+ def _get_message_priority(self, message, reverse=False):
+ """Gets priority from message and converts it to the bounds: [0, 9].
+ Higher value has more priority.
+ """
+ try:
+ priority = max(min(int(message['properties']['delivery_info']['priority']),
+ self.max_priority),
+ self.min_priority)
+ except (TypeError, ValueError, KeyError):
+ priority = self.default_priority
+
+ return (self.max_priority - priority) if reverse else priority
+
class Management(base.Management):
diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py
index 2d1c8abc..6645507a 100644
--- a/kombu/transport/zookeeper.py
+++ b/kombu/transport/zookeeper.py
@@ -37,7 +37,6 @@ from kombu.utils.encoding import bytes_to_str
from . import virtual
-MAX_PRIORITY = 9
try:
import kazoo
@@ -103,13 +102,8 @@ class Channel(virtual.Channel):
return queue
def _put(self, queue, message, **kwargs):
- try:
- priority = message['properties']['delivery_info']['priority']
- except KeyError:
- priority = 0
-
queue = self._get_queue(queue)
- queue.put(dumps(message), priority=(MAX_PRIORITY - priority))
+ queue.put(dumps(message), priority=self._get_message_priority(message, reverse=True))
def _get(self, queue):
queue = self._get_queue(queue)