summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancis Charette-Migneault <francis.charette.migneault@gmail.com>2023-04-18 04:54:17 +0000
committerGitHub <noreply@github.com>2023-04-18 10:54:17 +0600
commitf86f1fc6e1caab6bcd2351e3b95424cece6015e4 (patch)
treec9e1e1e4a50961c941ce6a22f117184309a0f4be
parent0dd813a81997a42a1a40477641a5349116662b70 (diff)
downloadkombu-f86f1fc6e1caab6bcd2351e3b95424cece6015e4.tar.gz
fix mongodb transport obsolete calls (#1694)
* fix mongodb transport obsolete calls + add test mock specs to check valid methods * fix linting
-rw-r--r--kombu/transport/mongodb.py9
-rw-r--r--t/unit/transport/test_mongodb.py74
2 files changed, 64 insertions, 19 deletions
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index b923f5f4..9eef6b57 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -65,6 +65,7 @@ class BroadcastCursor:
def __init__(self, cursor):
self._cursor = cursor
+ self._offset = 0
self.purge(rewind=False)
def get_size(self):
@@ -77,7 +78,7 @@ class BroadcastCursor:
if rewind:
self._cursor.rewind()
- # Fast forward the cursor past old events
+ # Fast-forward the cursor past old events
self._offset = self._cursor.collection.count_documents({})
self._cursor = self._cursor.skip(self._offset)
@@ -221,7 +222,7 @@ class Channel(virtual.Channel):
if queue in self._fanout_queues:
self._get_broadcast_cursor(queue).purge()
else:
- self.messages.remove({'queue': queue})
+ self.messages.delete_many({'queue': queue})
return size
@@ -257,10 +258,10 @@ class Channel(virtual.Channel):
self.routing.update_one(lookup, {'$set': data}, upsert=True)
def queue_delete(self, queue, **kwargs):
- self.routing.remove({'queue': queue})
+ self.routing.delete_many({'queue': queue})
if self.ttl:
- self.queues.remove({'_id': queue})
+ self.queues.delete_one({'_id': queue})
super().queue_delete(queue, **kwargs)
diff --git a/t/unit/transport/test_mongodb.py b/t/unit/transport/test_mongodb.py
index 6bb5f1f9..bd1aee64 100644
--- a/t/unit/transport/test_mongodb.py
+++ b/t/unit/transport/test_mongodb.py
@@ -10,6 +10,15 @@ from kombu import Connection
pymongo = pytest.importorskip('pymongo')
+# must import following after above validation to avoid error
+# and skip tests if missing
+# these are used to define real spec of the corresponding mocks,
+# to ensure called methods exist in real objects
+# pylint: disable=C0413
+from pymongo.collection import Collection # isort:skip # noqa: E402
+from pymongo.database import Database # isort:skip # noqa: E402
+from kombu.transport.mongodb import BroadcastCursor # isort:skip # noqa: E402
+
def _create_mock_connection(url='', **kwargs):
from kombu.transport import mongodb
@@ -22,7 +31,9 @@ def _create_mock_connection(url='', **kwargs):
now = datetime.datetime.utcnow()
def _create_client(self):
- mock = MagicMock(name='client')
+ # not really a 'MongoClient',
+ # but an actual pre-established Database connection
+ mock = MagicMock(name='client', spec=Database)
# we need new mock object for every collection
def get_collection(name):
@@ -30,7 +41,9 @@ def _create_mock_connection(url='', **kwargs):
return self.collections[name]
except KeyError:
mock = self.collections[name] = MagicMock(
- name='collection:%s' % name)
+ name='collection:%s' % name,
+ spec=Collection,
+ )
return mock
@@ -108,7 +121,7 @@ class BaseMongoDBChannelCase:
else:
method.side_effect = values
- def declare_droadcast_queue(self, queue):
+ def declare_broadcast_queue(self, queue):
self.channel.exchange_declare('fanout_exchange', type='fanout')
self.channel._queue_bind('fanout_exchange', 'foo', '*', queue)
@@ -119,9 +132,9 @@ class BaseMongoDBChannelCase:
return self.channel._broadcast_cursors[queue]
def set_broadcast_return_value(self, queue, *values):
- self.declare_droadcast_queue(queue)
+ self.declare_broadcast_queue(queue)
- cursor = MagicMock(name='cursor')
+ cursor = MagicMock(name='cursor', spec=BroadcastCursor)
cursor.__iter__.return_value = iter(values)
self.channel._broadcast_cursors[queue]._cursor = iter(cursor)
@@ -200,7 +213,7 @@ class test_mongodb_channel(BaseMongoDBChannelCase):
})
def test_put_fanout(self):
- self.declare_droadcast_queue('foobar')
+ self.declare_broadcast_queue('foobar')
self.channel._put_fanout('foobar', {'some': 'data'}, 'foo')
@@ -221,9 +234,9 @@ class test_mongodb_channel(BaseMongoDBChannelCase):
assert result == 77
def test_size_fanout(self):
- self.declare_droadcast_queue('foobar')
+ self.declare_broadcast_queue('foobar')
- cursor = MagicMock(name='cursor')
+ cursor = MagicMock(name='cursor', spec=BroadcastCursor)
cursor.get_size.return_value = 77
self.channel._broadcast_cursors['foobar'] = cursor
@@ -237,15 +250,15 @@ class test_mongodb_channel(BaseMongoDBChannelCase):
result = self.channel._purge('foobar')
self.assert_collection_accessed('messages')
self.assert_operation_called_with(
- 'messages', 'remove', {'queue': 'foobar'},
+ 'messages', 'delete_many', {'queue': 'foobar'},
)
assert result == 77
def test_purge_fanout(self):
- self.declare_droadcast_queue('foobar')
+ self.declare_broadcast_queue('foobar')
- cursor = MagicMock(name='cursor')
+ cursor = MagicMock(name='cursor', spec=BroadcastCursor)
cursor.get_size.return_value = 77
self.channel._broadcast_cursors['foobar'] = cursor
@@ -293,13 +306,13 @@ class test_mongodb_channel(BaseMongoDBChannelCase):
self.channel.queue_delete('foobar')
self.assert_collection_accessed('messages.routing')
self.assert_operation_called_with(
- 'routing', 'remove', {'queue': 'foobar'},
+ 'routing', 'delete_many', {'queue': 'foobar'},
)
def test_queue_delete_fanout(self):
- self.declare_droadcast_queue('foobar')
+ self.declare_broadcast_queue('foobar')
- cursor = MagicMock(name='cursor')
+ cursor = MagicMock(name='cursor', spec=BroadcastCursor)
self.channel._broadcast_cursors['foobar'] = cursor
self.channel.queue_delete('foobar')
@@ -318,6 +331,28 @@ class test_mongodb_channel(BaseMongoDBChannelCase):
'messages.broadcast', capped=True, size=100000,
)
+ def test_create_broadcast_exists(self):
+ # simulate already created collection
+ self.channel.client.list_collection_names.return_value = [
+ 'messages.broadcast'
+ ]
+
+ broadcast = self.channel._create_broadcast(self.channel.client)
+ self.channel.client.create_collection.assert_not_called()
+ assert broadcast is None # not returned since not created
+
+ def test_get_broadcast_cursor_created(self):
+ self.channel._fanout_queues['foobar'] = 'fanout_exchange'
+ created_cursor = self.channel._get_broadcast_cursor('foobar')
+ cached_cursor = self.channel._broadcast_cursors['foobar']
+ assert cached_cursor is created_cursor
+
+ def test_get_broadcast_cursor_exists(self):
+ self.declare_broadcast_queue('foobar')
+ cached_cursor = self.channel._broadcast_cursors['foobar']
+ getter_cursor = self.channel._get_broadcast_cursor('foobar')
+ assert cached_cursor is getter_cursor
+
def test_ensure_indexes(self):
self.channel._ensure_indexes(self.channel.client)
@@ -455,7 +490,7 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase):
self.channel.queue_delete('foobar')
self.assert_collection_accessed('messages.queues')
self.assert_operation_called_with(
- 'queues', 'remove', {'_id': 'foobar'})
+ 'queues', 'delete_one', {'_id': 'foobar'})
def test_ensure_indexes(self):
self.channel._ensure_indexes(self.channel.client)
@@ -531,3 +566,12 @@ class test_mongodb_channel_calc_queue_size(BaseMongoDBChannelCase):
self.assert_operation_has_calls('messages', 'find', [])
assert result == 0
+
+
+class test_mongodb_transport(BaseMongoDBChannelCase):
+ def setup(self):
+ self.connection = _create_mock_connection()
+
+ def test_driver_version(self):
+ version = self.connection.transport.driver_version()
+ assert version == pymongo.__version__