diff options
author | Francis Charette-Migneault <francis.charette.migneault@gmail.com> | 2023-04-18 04:54:17 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-18 10:54:17 +0600 |
commit | f86f1fc6e1caab6bcd2351e3b95424cece6015e4 (patch) | |
tree | c9e1e1e4a50961c941ce6a22f117184309a0f4be | |
parent | 0dd813a81997a42a1a40477641a5349116662b70 (diff) | |
download | kombu-f86f1fc6e1caab6bcd2351e3b95424cece6015e4.tar.gz |
fix mongodb transport obsolete calls (#1694)
* fix mongodb transport obsolete calls + add test mock specs to check valid methods
* fix linting
-rw-r--r-- | kombu/transport/mongodb.py | 9 | ||||
-rw-r--r-- | t/unit/transport/test_mongodb.py | 74 |
2 files changed, 64 insertions, 19 deletions
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index b923f5f4..9eef6b57 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -65,6 +65,7 @@ class BroadcastCursor: def __init__(self, cursor): self._cursor = cursor + self._offset = 0 self.purge(rewind=False) def get_size(self): @@ -77,7 +78,7 @@ class BroadcastCursor: if rewind: self._cursor.rewind() - # Fast forward the cursor past old events + # Fast-forward the cursor past old events self._offset = self._cursor.collection.count_documents({}) self._cursor = self._cursor.skip(self._offset) @@ -221,7 +222,7 @@ class Channel(virtual.Channel): if queue in self._fanout_queues: self._get_broadcast_cursor(queue).purge() else: - self.messages.remove({'queue': queue}) + self.messages.delete_many({'queue': queue}) return size @@ -257,10 +258,10 @@ class Channel(virtual.Channel): self.routing.update_one(lookup, {'$set': data}, upsert=True) def queue_delete(self, queue, **kwargs): - self.routing.remove({'queue': queue}) + self.routing.delete_many({'queue': queue}) if self.ttl: - self.queues.remove({'_id': queue}) + self.queues.delete_one({'_id': queue}) super().queue_delete(queue, **kwargs) diff --git a/t/unit/transport/test_mongodb.py b/t/unit/transport/test_mongodb.py index 6bb5f1f9..bd1aee64 100644 --- a/t/unit/transport/test_mongodb.py +++ b/t/unit/transport/test_mongodb.py @@ -10,6 +10,15 @@ from kombu import Connection pymongo = pytest.importorskip('pymongo') +# must import following after above validation to avoid error +# and skip tests if missing +# these are used to define real spec of the corresponding mocks, +# to ensure called methods exist in real objects +# pylint: disable=C0413 +from pymongo.collection import Collection # isort:skip # noqa: E402 +from pymongo.database import Database # isort:skip # noqa: E402 +from kombu.transport.mongodb import BroadcastCursor # isort:skip # noqa: E402 + def _create_mock_connection(url='', **kwargs): from kombu.transport import mongodb @@ -22,7 +31,9 @@ def _create_mock_connection(url='', **kwargs): now = datetime.datetime.utcnow() def _create_client(self): - mock = MagicMock(name='client') + # not really a 'MongoClient', + # but an actual pre-established Database connection + mock = MagicMock(name='client', spec=Database) # we need new mock object for every collection def get_collection(name): @@ -30,7 +41,9 @@ def _create_mock_connection(url='', **kwargs): return self.collections[name] except KeyError: mock = self.collections[name] = MagicMock( - name='collection:%s' % name) + name='collection:%s' % name, + spec=Collection, + ) return mock @@ -108,7 +121,7 @@ class BaseMongoDBChannelCase: else: method.side_effect = values - def declare_droadcast_queue(self, queue): + def declare_broadcast_queue(self, queue): self.channel.exchange_declare('fanout_exchange', type='fanout') self.channel._queue_bind('fanout_exchange', 'foo', '*', queue) @@ -119,9 +132,9 @@ class BaseMongoDBChannelCase: return self.channel._broadcast_cursors[queue] def set_broadcast_return_value(self, queue, *values): - self.declare_droadcast_queue(queue) + self.declare_broadcast_queue(queue) - cursor = MagicMock(name='cursor') + cursor = MagicMock(name='cursor', spec=BroadcastCursor) cursor.__iter__.return_value = iter(values) self.channel._broadcast_cursors[queue]._cursor = iter(cursor) @@ -200,7 +213,7 @@ class test_mongodb_channel(BaseMongoDBChannelCase): }) def test_put_fanout(self): - self.declare_droadcast_queue('foobar') + self.declare_broadcast_queue('foobar') self.channel._put_fanout('foobar', {'some': 'data'}, 'foo') @@ -221,9 +234,9 @@ class test_mongodb_channel(BaseMongoDBChannelCase): assert result == 77 def test_size_fanout(self): - self.declare_droadcast_queue('foobar') + self.declare_broadcast_queue('foobar') - cursor = MagicMock(name='cursor') + cursor = MagicMock(name='cursor', spec=BroadcastCursor) cursor.get_size.return_value = 77 self.channel._broadcast_cursors['foobar'] = cursor @@ -237,15 +250,15 @@ class test_mongodb_channel(BaseMongoDBChannelCase): result = self.channel._purge('foobar') self.assert_collection_accessed('messages') self.assert_operation_called_with( - 'messages', 'remove', {'queue': 'foobar'}, + 'messages', 'delete_many', {'queue': 'foobar'}, ) assert result == 77 def test_purge_fanout(self): - self.declare_droadcast_queue('foobar') + self.declare_broadcast_queue('foobar') - cursor = MagicMock(name='cursor') + cursor = MagicMock(name='cursor', spec=BroadcastCursor) cursor.get_size.return_value = 77 self.channel._broadcast_cursors['foobar'] = cursor @@ -293,13 +306,13 @@ class test_mongodb_channel(BaseMongoDBChannelCase): self.channel.queue_delete('foobar') self.assert_collection_accessed('messages.routing') self.assert_operation_called_with( - 'routing', 'remove', {'queue': 'foobar'}, + 'routing', 'delete_many', {'queue': 'foobar'}, ) def test_queue_delete_fanout(self): - self.declare_droadcast_queue('foobar') + self.declare_broadcast_queue('foobar') - cursor = MagicMock(name='cursor') + cursor = MagicMock(name='cursor', spec=BroadcastCursor) self.channel._broadcast_cursors['foobar'] = cursor self.channel.queue_delete('foobar') @@ -318,6 +331,28 @@ class test_mongodb_channel(BaseMongoDBChannelCase): 'messages.broadcast', capped=True, size=100000, ) + def test_create_broadcast_exists(self): + # simulate already created collection + self.channel.client.list_collection_names.return_value = [ + 'messages.broadcast' + ] + + broadcast = self.channel._create_broadcast(self.channel.client) + self.channel.client.create_collection.assert_not_called() + assert broadcast is None # not returned since not created + + def test_get_broadcast_cursor_created(self): + self.channel._fanout_queues['foobar'] = 'fanout_exchange' + created_cursor = self.channel._get_broadcast_cursor('foobar') + cached_cursor = self.channel._broadcast_cursors['foobar'] + assert cached_cursor is created_cursor + + def test_get_broadcast_cursor_exists(self): + self.declare_broadcast_queue('foobar') + cached_cursor = self.channel._broadcast_cursors['foobar'] + getter_cursor = self.channel._get_broadcast_cursor('foobar') + assert cached_cursor is getter_cursor + def test_ensure_indexes(self): self.channel._ensure_indexes(self.channel.client) @@ -455,7 +490,7 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase): self.channel.queue_delete('foobar') self.assert_collection_accessed('messages.queues') self.assert_operation_called_with( - 'queues', 'remove', {'_id': 'foobar'}) + 'queues', 'delete_one', {'_id': 'foobar'}) def test_ensure_indexes(self): self.channel._ensure_indexes(self.channel.client) @@ -531,3 +566,12 @@ class test_mongodb_channel_calc_queue_size(BaseMongoDBChannelCase): self.assert_operation_has_calls('messages', 'find', []) assert result == 0 + + +class test_mongodb_transport(BaseMongoDBChannelCase): + def setup(self): + self.connection = _create_mock_connection() + + def test_driver_version(self): + version = self.connection.transport.driver_version() + assert version == pymongo.__version__ |