diff options
Diffstat (limited to 'kombu/transport/mongodb.py')
-rw-r--r-- | kombu/transport/mongodb.py | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 7620a2e6..692cb189 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -78,6 +78,8 @@ class BroadcastCursor(object): class Channel(virtual.Channel): + """MongoDB Channel.""" + supports_fanout = True # Mutable container. Shared by all class instances @@ -318,8 +320,7 @@ class Channel(virtual.Channel): capped=True) def _ensure_indexes(self, database): - """Ensure indexes on collections. - """ + """Ensure indexes on collections.""" messages = database[self.messages_collection] messages.ensure_index( [('queue', 1), ('priority', 1), ('_id', 1)], background=True, @@ -395,8 +396,11 @@ class Channel(virtual.Channel): return ret def _get_expire(self, queue, argument): - """Gets expiration header named `argument` of queue definition. - `queue` must be either queue name or options itself.""" + """Get expiration header named `argument` of queue definition. + + Note: + `queue` must be either queue name or options itself. + """ if isinstance(queue, basestring): doc = self.queues.find_one({'_id': queue}) @@ -415,7 +419,7 @@ class Channel(virtual.Channel): return self.get_now() + datetime.timedelta(milliseconds=value) def _update_queues_expire(self, queue): - """Updates expiration field on queues documents.""" + """Update expiration field on queues documents.""" expire_at = self._get_expire(queue, 'x-expires') if not expire_at: @@ -434,6 +438,8 @@ class Channel(virtual.Channel): class Transport(virtual.Transport): + """MongoDB Transport.""" + Channel = Channel can_parse_url = True |