summaryrefslogtreecommitdiff
path: root/kombu/transport/mongodb.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/transport/mongodb.py')
-rw-r--r--kombu/transport/mongodb.py9
1 files changed, 5 insertions, 4 deletions
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index b923f5f4..9eef6b57 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -65,6 +65,7 @@ class BroadcastCursor:
def __init__(self, cursor):
self._cursor = cursor
+ self._offset = 0
self.purge(rewind=False)
def get_size(self):
@@ -77,7 +78,7 @@ class BroadcastCursor:
if rewind:
self._cursor.rewind()
- # Fast forward the cursor past old events
+ # Fast-forward the cursor past old events
self._offset = self._cursor.collection.count_documents({})
self._cursor = self._cursor.skip(self._offset)
@@ -221,7 +222,7 @@ class Channel(virtual.Channel):
if queue in self._fanout_queues:
self._get_broadcast_cursor(queue).purge()
else:
- self.messages.remove({'queue': queue})
+ self.messages.delete_many({'queue': queue})
return size
@@ -257,10 +258,10 @@ class Channel(virtual.Channel):
self.routing.update_one(lookup, {'$set': data}, upsert=True)
def queue_delete(self, queue, **kwargs):
- self.routing.remove({'queue': queue})
+ self.routing.delete_many({'queue': queue})
if self.ttl:
- self.queues.remove({'_id': queue})
+ self.queues.delete_one({'_id': queue})
super().queue_delete(queue, **kwargs)