summaryrefslogtreecommitdiff
path: root/t
diff options
context:
space:
mode:
authorJakub Pieńkowski <8525083+Jakski@users.noreply.github.com>2022-04-18 14:10:38 +0200
committerGitHub <noreply@github.com>2022-04-18 18:10:38 +0600
commit661d92222eb485fd9131543c8a0a224824f5e0f0 (patch)
tree67c0469003d4b7e734dd798de13bda3bb27f1992 /t
parent0f9f554b7cb9a307b07bec74688095053034fd57 (diff)
downloadkombu-661d92222eb485fd9131543c8a0a224824f5e0f0.tar.gz
Support pymongo 4.x (#1536)
* Support pymongo 4.x * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix problems detected by CI Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Diffstat (limited to 't')
-rw-r--r--t/integration/common.py10
-rw-r--r--t/integration/test_mongodb.py115
-rw-r--r--t/unit/transport/test_mongodb.py87
3 files changed, 170 insertions, 42 deletions
diff --git a/t/integration/common.py b/t/integration/common.py
index 0c40b6e9..82dc3f1b 100644
--- a/t/integration/common.py
+++ b/t/integration/common.py
@@ -138,14 +138,21 @@ class BaseExchangeTypes:
message.delivery_info['exchange'] == ''
assert message.payload == body
- def _consume(self, connection, queue):
+ def _create_consumer(self, connection, queue):
consumer = kombu.Consumer(
connection, [queue], accept=['pickle']
)
consumer.register_callback(self._callback)
+ return consumer
+
+ def _consume_from(self, connection, consumer):
with consumer:
connection.drain_events(timeout=1)
+ def _consume(self, connection, queue):
+ with self._create_consumer(connection, queue):
+ connection.drain_events(timeout=1)
+
def _publish(self, channel, exchange, queues=None, routing_key=None):
producer = kombu.Producer(channel, exchange=exchange)
if routing_key:
@@ -215,7 +222,6 @@ class BaseExchangeTypes:
channel, ex, [test_queue1, test_queue2, test_queue3],
routing_key='t.1'
)
-
self._consume(conn, test_queue1)
self._consume(conn, test_queue2)
with pytest.raises(socket.timeout):
diff --git a/t/integration/test_mongodb.py b/t/integration/test_mongodb.py
new file mode 100644
index 00000000..445f1389
--- /dev/null
+++ b/t/integration/test_mongodb.py
@@ -0,0 +1,115 @@
+from __future__ import annotations
+
+import os
+
+import pytest
+
+import kombu
+
+from .common import (BaseExchangeTypes, BaseMessage, BasePriority,
+ BasicFunctionality)
+
+
+def get_connection(hostname, port, vhost):
+ return kombu.Connection(
+ f'mongodb://{hostname}:{port}/{vhost}',
+ transport_options={'ttl': True},
+ )
+
+
+@pytest.fixture()
+def invalid_connection():
+ return kombu.Connection('mongodb://localhost:12345?connectTimeoutMS=1')
+
+
+@pytest.fixture()
+def connection(request):
+ return get_connection(
+ hostname=os.environ.get('MONGODB_HOST', 'localhost'),
+ port=os.environ.get('MONGODB_27017_TCP', '27017'),
+ vhost=getattr(
+ request.config, "slaveinput", {}
+ ).get("slaveid", 'tests'),
+ )
+
+
+@pytest.mark.env('mongodb')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_MongoDBBasicFunctionality(BasicFunctionality):
+ pass
+
+
+@pytest.mark.env('mongodb')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_MongoDBBaseExchangeTypes(BaseExchangeTypes):
+
+ # MongoDB consumer skips old messages upon initialization.
+ # Ensure that it's created before test messages are published.
+
+ def test_fanout(self, connection):
+ ex = kombu.Exchange('test_fanout', type='fanout')
+ test_queue1 = kombu.Queue('fanout1', exchange=ex)
+ consumer1 = self._create_consumer(connection, test_queue1)
+ test_queue2 = kombu.Queue('fanout2', exchange=ex)
+ consumer2 = self._create_consumer(connection, test_queue2)
+
+ with connection as conn:
+ with conn.channel() as channel:
+ self._publish(channel, ex, [test_queue1, test_queue2])
+
+ self._consume_from(conn, consumer1)
+ self._consume_from(conn, consumer2)
+
+
+@pytest.mark.env('mongodb')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_MongoDBPriority(BasePriority):
+
+ # drain_events() consumes only one value unlike in py-amqp.
+
+ def test_publish_consume(self, connection):
+ test_queue = kombu.Queue(
+ 'priority_test', routing_key='priority_test', max_priority=10
+ )
+
+ received_messages = []
+
+ def callback(body, message):
+ received_messages.append(body)
+ message.ack()
+
+ with connection as conn:
+ with conn.channel() as channel:
+ producer = kombu.Producer(channel)
+ for msg, prio in [
+ [{'msg': 'first'}, 3],
+ [{'msg': 'second'}, 6],
+ [{'msg': 'third'}, 3],
+ ]:
+ producer.publish(
+ msg,
+ retry=True,
+ exchange=test_queue.exchange,
+ routing_key=test_queue.routing_key,
+ declare=[test_queue],
+ serializer='pickle',
+ priority=prio
+ )
+ consumer = kombu.Consumer(
+ conn, [test_queue], accept=['pickle']
+ )
+ consumer.register_callback(callback)
+ with consumer:
+ conn.drain_events(timeout=1)
+ conn.drain_events(timeout=1)
+ conn.drain_events(timeout=1)
+ # Second message must be received first
+ assert received_messages[0] == {'msg': 'second'}
+ assert received_messages[1] == {'msg': 'first'}
+ assert received_messages[2] == {'msg': 'third'}
+
+
+@pytest.mark.env('mongodb')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_MongoDBMessage(BaseMessage):
+ pass
diff --git a/t/unit/transport/test_mongodb.py b/t/unit/transport/test_mongodb.py
index fe31a477..6bb5f1f9 100644
--- a/t/unit/transport/test_mongodb.py
+++ b/t/unit/transport/test_mongodb.py
@@ -153,16 +153,15 @@ class test_mongodb_channel(BaseMongoDBChannelCase):
def test_get(self):
- self.set_operation_return_value('messages', 'find_and_modify', {
+ self.set_operation_return_value('messages', 'find_one_and_delete', {
'_id': 'docId', 'payload': '{"some": "data"}',
})
event = self.channel._get('foobar')
self.assert_collection_accessed('messages')
self.assert_operation_called_with(
- 'messages', 'find_and_modify',
- query={'queue': 'foobar'},
- remove=True,
+ 'messages', 'find_one_and_delete',
+ {'queue': 'foobar'},
sort=[
('priority', pymongo.ASCENDING),
],
@@ -170,7 +169,11 @@ class test_mongodb_channel(BaseMongoDBChannelCase):
assert event == {'some': 'data'}
- self.set_operation_return_value('messages', 'find_and_modify', None)
+ self.set_operation_return_value(
+ 'messages',
+ 'find_one_and_delete',
+ None,
+ )
with pytest.raises(Empty):
self.channel._get('foobar')
@@ -190,7 +193,7 @@ class test_mongodb_channel(BaseMongoDBChannelCase):
self.channel._put('foobar', {'some': 'data'})
self.assert_collection_accessed('messages')
- self.assert_operation_called_with('messages', 'insert', {
+ self.assert_operation_called_with('messages', 'insert_one', {
'queue': 'foobar',
'priority': 9,
'payload': '{"some": "data"}',
@@ -202,17 +205,17 @@ class test_mongodb_channel(BaseMongoDBChannelCase):
self.channel._put_fanout('foobar', {'some': 'data'}, 'foo')
self.assert_collection_accessed('messages.broadcast')
- self.assert_operation_called_with('broadcast', 'insert', {
+ self.assert_operation_called_with('broadcast', 'insert_one', {
'queue': 'foobar', 'payload': '{"some": "data"}',
})
def test_size(self):
- self.set_operation_return_value('messages', 'find.count', 77)
+ self.set_operation_return_value('messages', 'count_documents', 77)
result = self.channel._size('foobar')
self.assert_collection_accessed('messages')
self.assert_operation_called_with(
- 'messages', 'find', {'queue': 'foobar'},
+ 'messages', 'count_documents', {'queue': 'foobar'},
)
assert result == 77
@@ -229,7 +232,7 @@ class test_mongodb_channel(BaseMongoDBChannelCase):
assert result == 77
def test_purge(self):
- self.set_operation_return_value('messages', 'find.count', 77)
+ self.set_operation_return_value('messages', 'count_documents', 77)
result = self.channel._purge('foobar')
self.assert_collection_accessed('messages')
@@ -278,11 +281,11 @@ class test_mongodb_channel(BaseMongoDBChannelCase):
self.channel._queue_bind('test_exchange', 'foo', '*', 'foo')
self.assert_collection_accessed('messages.routing')
self.assert_operation_called_with(
- 'routing', 'update',
- {'queue': 'foo', 'pattern': '*',
- 'routing_key': 'foo', 'exchange': 'test_exchange'},
+ 'routing', 'update_one',
{'queue': 'foo', 'pattern': '*',
'routing_key': 'foo', 'exchange': 'test_exchange'},
+ {'$set': {'queue': 'foo', 'pattern': '*',
+ 'routing_key': 'foo', 'exchange': 'test_exchange'}},
upsert=True,
)
@@ -319,16 +322,16 @@ class test_mongodb_channel(BaseMongoDBChannelCase):
self.channel._ensure_indexes(self.channel.client)
self.assert_operation_called_with(
- 'messages', 'ensure_index',
+ 'messages', 'create_index',
[('queue', 1), ('priority', 1), ('_id', 1)],
background=True,
)
self.assert_operation_called_with(
- 'broadcast', 'ensure_index',
+ 'broadcast', 'create_index',
[('queue', 1)],
)
self.assert_operation_called_with(
- 'routing', 'ensure_index', [('queue', 1), ('exchange', 1)],
+ 'routing', 'create_index', [('queue', 1), ('exchange', 1)],
)
def test_create_broadcast_cursor(self):
@@ -383,9 +386,9 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase):
self.channel._new_queue('foobar')
self.assert_operation_called_with(
- 'queues', 'update',
+ 'queues', 'update_one',
{'_id': 'foobar'},
- {'_id': 'foobar', 'options': {}, 'expire_at': None},
+ {'$set': {'_id': 'foobar', 'options': {}, 'expire_at': None}},
upsert=True,
)
@@ -395,25 +398,23 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase):
'_id': 'docId', 'options': {'arguments': {'x-expires': 777}},
})
- self.set_operation_return_value('messages', 'find_and_modify', {
+ self.set_operation_return_value('messages', 'find_one_and_delete', {
'_id': 'docId', 'payload': '{"some": "data"}',
})
self.channel._get('foobar')
self.assert_collection_accessed('messages', 'messages.queues')
self.assert_operation_called_with(
- 'messages', 'find_and_modify',
- query={'queue': 'foobar'},
- remove=True,
+ 'messages', 'find_one_and_delete',
+ {'queue': 'foobar'},
sort=[
('priority', pymongo.ASCENDING),
],
)
self.assert_operation_called_with(
- 'routing', 'update',
+ 'routing', 'update_many',
{'queue': 'foobar'},
{'$set': {'expire_at': self.expire_at}},
- multi=True,
)
def test_put(self):
@@ -424,7 +425,7 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase):
self.channel._put('foobar', {'some': 'data'})
self.assert_collection_accessed('messages')
- self.assert_operation_called_with('messages', 'insert', {
+ self.assert_operation_called_with('messages', 'insert_one', {
'queue': 'foobar',
'priority': 9,
'payload': '{"some": "data"}',
@@ -439,12 +440,14 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase):
self.channel._queue_bind('test_exchange', 'foo', '*', 'foo')
self.assert_collection_accessed('messages.routing')
self.assert_operation_called_with(
- 'routing', 'update',
+ 'routing', 'update_one',
{'queue': 'foo', 'pattern': '*',
'routing_key': 'foo', 'exchange': 'test_exchange'},
- {'queue': 'foo', 'pattern': '*',
- 'routing_key': 'foo', 'exchange': 'test_exchange',
- 'expire_at': self.expire_at},
+ {'$set': {
+ 'queue': 'foo', 'pattern': '*',
+ 'routing_key': 'foo', 'exchange': 'test_exchange',
+ 'expire_at': self.expire_at
+ }},
upsert=True,
)
@@ -458,18 +461,18 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase):
self.channel._ensure_indexes(self.channel.client)
self.assert_operation_called_with(
- 'messages', 'ensure_index', [('expire_at', 1)],
+ 'messages', 'create_index', [('expire_at', 1)],
expireAfterSeconds=0)
self.assert_operation_called_with(
- 'routing', 'ensure_index', [('expire_at', 1)],
+ 'routing', 'create_index', [('expire_at', 1)],
expireAfterSeconds=0)
self.assert_operation_called_with(
- 'queues', 'ensure_index', [('expire_at', 1)], expireAfterSeconds=0)
+ 'queues', 'create_index', [('expire_at', 1)], expireAfterSeconds=0)
- def test_get_expire(self):
- result = self.channel._get_expire(
+ def test_get_queue_expire(self):
+ result = self.channel._get_queue_expire(
{'arguments': {'x-expires': 777}}, 'x-expires')
self.channel.client.assert_not_called()
@@ -480,9 +483,15 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase):
'_id': 'docId', 'options': {'arguments': {'x-expires': 777}},
})
- result = self.channel._get_expire('foobar', 'x-expires')
+ result = self.channel._get_queue_expire('foobar', 'x-expires')
assert result == self.expire_at
+ def test_get_message_expire(self):
+ assert self.channel._get_message_expire({
+ 'properties': {'expiration': 777},
+ }) == self.expire_at
+ assert self.channel._get_message_expire({}) is None
+
def test_update_queues_expire(self):
self.set_operation_return_value('queues', 'find_one', {
'_id': 'docId', 'options': {'arguments': {'x-expires': 777}},
@@ -491,16 +500,14 @@ class test_mongodb_channel_ttl(BaseMongoDBChannelCase):
self.assert_collection_accessed('messages.routing', 'messages.queues')
self.assert_operation_called_with(
- 'routing', 'update',
+ 'routing', 'update_many',
{'queue': 'foobar'},
{'$set': {'expire_at': self.expire_at}},
- multi=True,
)
self.assert_operation_called_with(
- 'queues', 'update',
+ 'queues', 'update_many',
{'_id': 'foobar'},
{'$set': {'expire_at': self.expire_at}},
- multi=True,
)
@@ -517,7 +524,7 @@ class test_mongodb_channel_calc_queue_size(BaseMongoDBChannelCase):
# Tests
def test_size(self):
- self.set_operation_return_value('messages', 'find.count', 77)
+ self.set_operation_return_value('messages', 'count_documents', 77)
result = self.channel._size('foobar')