summaryrefslogtreecommitdiff
path: root/kombu/transport/mongodb.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/transport/mongodb.py')
-rw-r--r--kombu/transport/mongodb.py16
1 files changed, 11 insertions, 5 deletions
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 7620a2e6..692cb189 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -78,6 +78,8 @@ class BroadcastCursor(object):
class Channel(virtual.Channel):
+ """MongoDB Channel."""
+
supports_fanout = True
# Mutable container. Shared by all class instances
@@ -318,8 +320,7 @@ class Channel(virtual.Channel):
capped=True)
def _ensure_indexes(self, database):
- """Ensure indexes on collections.
- """
+ """Ensure indexes on collections."""
messages = database[self.messages_collection]
messages.ensure_index(
[('queue', 1), ('priority', 1), ('_id', 1)], background=True,
@@ -395,8 +396,11 @@ class Channel(virtual.Channel):
return ret
def _get_expire(self, queue, argument):
- """Gets expiration header named `argument` of queue definition.
- `queue` must be either queue name or options itself."""
+ """Get expiration header named `argument` of queue definition.
+
+ Note:
+ `queue` must be either queue name or options itself.
+ """
if isinstance(queue, basestring):
doc = self.queues.find_one({'_id': queue})
@@ -415,7 +419,7 @@ class Channel(virtual.Channel):
return self.get_now() + datetime.timedelta(milliseconds=value)
def _update_queues_expire(self, queue):
- """Updates expiration field on queues documents."""
+ """Update expiration field on queues documents."""
expire_at = self._get_expire(queue, 'x-expires')
if not expire_at:
@@ -434,6 +438,8 @@ class Channel(virtual.Channel):
class Transport(virtual.Transport):
+ """MongoDB Transport."""
+
Channel = Channel
can_parse_url = True