diff options
Diffstat (limited to 'kombu/transport/mongodb.py')
-rw-r--r-- | kombu/transport/mongodb.py | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index b923f5f4..9eef6b57 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -65,6 +65,7 @@ class BroadcastCursor: def __init__(self, cursor): self._cursor = cursor + self._offset = 0 self.purge(rewind=False) def get_size(self): @@ -77,7 +78,7 @@ class BroadcastCursor: if rewind: self._cursor.rewind() - # Fast forward the cursor past old events + # Fast-forward the cursor past old events self._offset = self._cursor.collection.count_documents({}) self._cursor = self._cursor.skip(self._offset) @@ -221,7 +222,7 @@ class Channel(virtual.Channel): if queue in self._fanout_queues: self._get_broadcast_cursor(queue).purge() else: - self.messages.remove({'queue': queue}) + self.messages.delete_many({'queue': queue}) return size @@ -257,10 +258,10 @@ class Channel(virtual.Channel): self.routing.update_one(lookup, {'$set': data}, upsert=True) def queue_delete(self, queue, **kwargs): - self.routing.remove({'queue': queue}) + self.routing.delete_many({'queue': queue}) if self.ttl: - self.queues.remove({'_id': queue}) + self.queues.delete_one({'_id': queue}) super().queue_delete(queue, **kwargs) |