diff options
author | Jakub Pieńkowski <8525083+Jakski@users.noreply.github.com> | 2022-04-18 14:10:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-18 18:10:38 +0600 |
commit | 661d92222eb485fd9131543c8a0a224824f5e0f0 (patch) | |
tree | 67c0469003d4b7e734dd798de13bda3bb27f1992 /t | |
parent | 0f9f554b7cb9a307b07bec74688095053034fd57 (diff) | |
download | kombu-661d92222eb485fd9131543c8a0a224824f5e0f0.tar.gz |
Support pymongo 4.x (#1536)
* Support pymongo 4.x
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
* Fix problems detected by CI
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Diffstat (limited to 't')
-rw-r--r-- | t/integration/common.py | 10 | ||||
-rw-r--r-- | t/integration/test_mongodb.py | 115 | ||||
-rw-r--r-- | t/unit/transport/test_mongodb.py | 87 |
3 files changed, 170 insertions, 42 deletions
diff --git a/t/integration/common.py b/t/integration/common.py index 0c40b6e9..82dc3f1b 100644 --- a/t/integration/common.py +++ b/t/integration/common.py @@ -138,14 +138,21 @@ class BaseExchangeTypes: message.delivery_info['exchange'] == '' assert message.payload == body - def _consume(self, connection, queue): + def _create_consumer(self, connection, queue): consumer = kombu.Consumer( connection, [queue], accept=['pickle'] ) consumer.register_callback(self._callback) + return consumer + + def _consume_from(self, connection, consumer): with consumer: connection.drain_events(timeout=1) + def _consume(self, connection, queue): + with self._create_consumer(connection, queue): + connection.drain_events(timeout=1) + def _publish(self, channel, exchange, queues=None, routing_key=None): producer = kombu.Producer(channel, exchange=exchange) if routing_key: @@ -215,7 +222,6 @@ class BaseExchangeTypes: channel, ex, [test_queue1, test_queue2, test_queue3], routing_key='t.1' ) - self._consume(conn, test_queue1) self._consume(conn, test_queue2) with pytest.raises(socket.timeout): diff --git a/t/integration/test_mongodb.py b/t/integration/test_mongodb.py new file mode 100644 index 00000000..445f1389 --- /dev/null +++ b/t/integration/test_mongodb.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +import os + +import pytest + +import kombu + +from .common import (BaseExchangeTypes, BaseMessage, BasePriority, + BasicFunctionality) + + +def get_connection(hostname, port, vhost): + return kombu.Connection( + f'mongodb://{hostname}:{port}/{vhost}', + transport_options={'ttl': True}, + ) + + +@pytest.fixture() +def invalid_connection(): + return kombu.Connection('mongodb://localhost:12345?connectTimeoutMS=1') + + +@pytest.fixture() +def connection(request): + return get_connection( + hostname=os.environ.get('MONGODB_HOST', 'localhost'), + port=os.environ.get('MONGODB_27017_TCP', '27017'), + vhost=getattr( + request.config, "slaveinput", {} + ).get("slaveid", 'tests'), + ) + + +@pytest.mark.env('mongodb') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_MongoDBBasicFunctionality(BasicFunctionality): + pass + + +@pytest.mark.env('mongodb') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_MongoDBBaseExchangeTypes(BaseExchangeTypes): + + # MongoDB consumer skips old messages upon initialization. + # Ensure that it's created before test messages are published. + + def test_fanout(self, connection): + ex = kombu.Exchange('test_fanout', type='fanout') + test_queue1 = kombu.Queue('fanout1', exchange=ex) + consumer1 = self._create_consumer(connection, test_queue1) + test_queue2 = kombu.Queue('fanout2', exchange=ex) + consumer2 = self._create_consumer(connection, test_queue2) + + with connection as conn: + with conn.channel() as channel: + self._publish(channel, ex, [test_queue1, test_queue2]) + + self._consume_from(conn, consumer1) + self._consume_from(conn, consumer2) + + +@pytest.mark.env('mongodb') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_MongoDBPriority(BasePriority): + + # drain_events() consumes only one value unlike in py-amqp. + + def test_publish_consume(self, connection): + test_queue = kombu.Queue( + 'priority_test', routing_key='priority_test', max_priority=10 + ) + + received_messages = [] + + def callback(body, message): + received_messages.append(body) + message.ack() + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + for msg, prio in [ + [{'msg': 'first'}, 3], + [{'msg': 'second'}, 6], + [{'msg': 'third'}, 3], + ]: + producer.publish( + msg, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=prio + ) + consumer = kombu.Consumer( + conn, [test_queue], accept=['pickle'] + ) + consumer.register_callback(callback) + with consumer: + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + # Second message must be received first + assert received_messages[0] == {'msg': 'second'} + assert received_messages[1] == {'msg': 'first'} + assert received_messages[2] == {'msg': 'third'} + + +@pytest.mark.env('mongodb') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_MongoDBMessage(BaseMessage): + pass diff --git a/t/unit/transport/test_mongodb.py b/t/unit/transport/test_mongodb.py index fe31a477..6bb5f1f9 100644 --- a/t/unit/transport/test_mongodb.py +++ b/t/unit/transport/test_mongodb.py @@ -153,16 +153,15 @@ class test_mongodb_channel(BaseMongoDBChannelCase): def test_get(self): - self.set_operation_return_value('messages', 'find_and_modify', { + self.set_operation_return_value('messages', 'find_one_and_delete', { '_id': 'docId', 'payload': '{"some": "data"}', }) event = self.channel._get('foobar') self.assert_collection_accessed('messages') self.assert_operation_called_with( - 'messages', 'find_and_modify', - query={'queue': 'foobar'}, - remove=True, + 'messages', 'find_one_and_delete', + {'queue': 'foobar'}, sort=[ ('priority', pymongo.ASCENDING), ], @@ -170,7 +169,11 @@ class test_mongodb_channel(BaseMongoDBChannelCase): assert event == {'some': 'data'} - self.set_operation_return_value('messages', 'find_and_modify', None) + self.set_operation_return_value( + 'messages', + 'find_one_and_delete', + None, + ) with pytest.raises(Empty): self.channel._get('foobar') @@ -190,7 +193,7 @@ class test_mongodb_channel(BaseMongoDBChannelCase): self.channel._put('foobar', {'some': 'data'}) self.assert_collection_accessed('messages') - self.assert_operation_called_with('messages', 'insert', { + self.assert_operation_called_with('messages', 'insert_one', { 'queue': 'foobar', 'priority': 9, 'payload': '{"some": "data"}', @@ -202,17 +205,17 @@ class test_mongodb_channel(BaseMongoDBChannelCase): self.channel._put_fanout('foobar', {'some': 'data'}, 'foo') self.assert_collection_accessed('messages.broadcast') - self.assert_operation_called_with('broadcast', 'insert', { + self.assert_operation_called_with('broadcast', 'insert_one', { 'queue': 'foobar', 'payload': '{"some": "data"}', }) def test_size(self): - self.set_operation_return_value('messages', 'find.count', 77) + self.set_operation_return_value('messages', 'count_documents', 77) result = self.channel._size('foobar') self.assert_collection_accessed('messages') self.assert_operation_called_with( - 'messages', 'find', {'queue': 'foobar'}, + 'messages', 'count_documents', {'queue': 'foobar'}, ) assert result == 77 @@ -229,7 +232,7 @@ class test_mongodb_channel(BaseMongoDBChannelCase): assert result == 77 def test_purge(self): - self.set_operation_return_value('messages', 'find.count', 77) + self.set_operation_return_value('messages', 'count_documents', 77) result = self.channel._purge('foobar') self.assert_collection_accessed('messages') @@ -278,11 +281,11 @@ class test_mongodb_channel(BaseMongoDBChannelCase): self.channel._queue_bind('test_exchange', 'foo', '*', 'foo') self.assert_collection_accessed('messages.routing') self.assert_operation_called_with( - 'routing', 'update', - {'queue': 'foo', 'pattern': '*', - 'routing_key': 'foo', 'exchange': 'test_exchange'}, + 'routing', 'update_one', {'queue': 'foo', 'pattern': '*', 'routing_key': 'foo', 'exchange': 'test_exchange'}, + {'$set': {'queue': 'foo', 'pattern': '*', + 'routing_key': 'foo', 'exchange': 'test_exchange'}}, upsert=True, ) @@ -319,16 +322,16 @@ class test_mongodb_channel(BaseMongoDBChannelCase): self.channel._ensure_indexes(self.channel.client) self.assert_operation_called_with( - 'messages', 'ensure_index', + 'messages', 'create_index', [('queue', 1), ('priority', 1), ('_id', 1)], background=True, ) self.assert_operation_called_with( - 'broadcast', 'ensure_index', + 'broadcast', 'create_index', [('queue', 1)], ) self.assert_operation_called_with( - 'routing', 'ensure_index', [('queue', 1), ('exchange', 1)], + 'routing', 'create_index', [('queue', 1), ('exchange', 1)], ) def test_create_broadcast_cursor(self): @@ -383,9 +386,9 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase): self.channel._new_queue('foobar') self.assert_operation_called_with( - 'queues', 'update', + 'queues', 'update_one', {'_id': 'foobar'}, - {'_id': 'foobar', 'options': {}, 'expire_at': None}, + {'$set': {'_id': 'foobar', 'options': {}, 'expire_at': None}}, upsert=True, ) @@ -395,25 +398,23 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase): '_id': 'docId', 'options': {'arguments': {'x-expires': 777}}, }) - self.set_operation_return_value('messages', 'find_and_modify', { + self.set_operation_return_value('messages', 'find_one_and_delete', { '_id': 'docId', 'payload': '{"some": "data"}', }) self.channel._get('foobar') self.assert_collection_accessed('messages', 'messages.queues') self.assert_operation_called_with( - 'messages', 'find_and_modify', - query={'queue': 'foobar'}, - remove=True, + 'messages', 'find_one_and_delete', + {'queue': 'foobar'}, sort=[ ('priority', pymongo.ASCENDING), ], ) self.assert_operation_called_with( - 'routing', 'update', + 'routing', 'update_many', {'queue': 'foobar'}, {'$set': {'expire_at': self.expire_at}}, - multi=True, ) def test_put(self): @@ -424,7 +425,7 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase): self.channel._put('foobar', {'some': 'data'}) self.assert_collection_accessed('messages') - self.assert_operation_called_with('messages', 'insert', { + self.assert_operation_called_with('messages', 'insert_one', { 'queue': 'foobar', 'priority': 9, 'payload': '{"some": "data"}', @@ -439,12 +440,14 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase): self.channel._queue_bind('test_exchange', 'foo', '*', 'foo') self.assert_collection_accessed('messages.routing') self.assert_operation_called_with( - 'routing', 'update', + 'routing', 'update_one', {'queue': 'foo', 'pattern': '*', 'routing_key': 'foo', 'exchange': 'test_exchange'}, - {'queue': 'foo', 'pattern': '*', - 'routing_key': 'foo', 'exchange': 'test_exchange', - 'expire_at': self.expire_at}, + {'$set': { + 'queue': 'foo', 'pattern': '*', + 'routing_key': 'foo', 'exchange': 'test_exchange', + 'expire_at': self.expire_at + }}, upsert=True, ) @@ -458,18 +461,18 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase): self.channel._ensure_indexes(self.channel.client) self.assert_operation_called_with( - 'messages', 'ensure_index', [('expire_at', 1)], + 'messages', 'create_index', [('expire_at', 1)], expireAfterSeconds=0) self.assert_operation_called_with( - 'routing', 'ensure_index', [('expire_at', 1)], + 'routing', 'create_index', [('expire_at', 1)], expireAfterSeconds=0) self.assert_operation_called_with( - 'queues', 'ensure_index', [('expire_at', 1)], expireAfterSeconds=0) + 'queues', 'create_index', [('expire_at', 1)], expireAfterSeconds=0) - def test_get_expire(self): - result = self.channel._get_expire( + def test_get_queue_expire(self): + result = self.channel._get_queue_expire( {'arguments': {'x-expires': 777}}, 'x-expires') self.channel.client.assert_not_called() @@ -480,9 +483,15 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase): '_id': 'docId', 'options': {'arguments': {'x-expires': 777}}, }) - result = self.channel._get_expire('foobar', 'x-expires') + result = self.channel._get_queue_expire('foobar', 'x-expires') assert result == self.expire_at + def test_get_message_expire(self): + assert self.channel._get_message_expire({ + 'properties': {'expiration': 777}, + }) == self.expire_at + assert self.channel._get_message_expire({}) is None + def test_update_queues_expire(self): self.set_operation_return_value('queues', 'find_one', { '_id': 'docId', 'options': {'arguments': {'x-expires': 777}}, @@ -491,16 +500,14 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase): self.assert_collection_accessed('messages.routing', 'messages.queues') self.assert_operation_called_with( - 'routing', 'update', + 'routing', 'update_many', {'queue': 'foobar'}, {'$set': {'expire_at': self.expire_at}}, - multi=True, ) self.assert_operation_called_with( - 'queues', 'update', + 'queues', 'update_many', {'_id': 'foobar'}, {'$set': {'expire_at': self.expire_at}}, - multi=True, ) @@ -517,7 +524,7 @@ class test_mongodb_channel_calc_queue_size(BaseMongoDBChannelCase): # Tests def test_size(self): - self.set_operation_return_value('messages', 'find.count', 77) + self.set_operation_return_value('messages', 'count_documents', 77) result = self.channel._size('foobar') |