summaryrefslogtreecommitdiff
path: root/t/integration/test_mongodb.py
blob: 445f138986a7e91b8f775fa75e3a08cb0c9290ba (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from __future__ import annotations

import os

import pytest

import kombu

from .common import (BaseExchangeTypes, BaseMessage, BasePriority,
                     BasicFunctionality)


def get_connection(hostname, port, vhost):
    return kombu.Connection(
        f'mongodb://{hostname}:{port}/{vhost}',
        transport_options={'ttl': True},
    )


@pytest.fixture()
def invalid_connection():
    return kombu.Connection('mongodb://localhost:12345?connectTimeoutMS=1')


@pytest.fixture()
def connection(request):
    return get_connection(
        hostname=os.environ.get('MONGODB_HOST', 'localhost'),
        port=os.environ.get('MONGODB_27017_TCP', '27017'),
        vhost=getattr(
            request.config, "slaveinput", {}
        ).get("slaveid", 'tests'),
    )


@pytest.mark.env('mongodb')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_MongoDBBasicFunctionality(BasicFunctionality):
    pass


@pytest.mark.env('mongodb')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_MongoDBBaseExchangeTypes(BaseExchangeTypes):

    # MongoDB consumer skips old messages upon initialization.
    # Ensure that it's created before test messages are published.

    def test_fanout(self, connection):
        ex = kombu.Exchange('test_fanout', type='fanout')
        test_queue1 = kombu.Queue('fanout1', exchange=ex)
        consumer1 = self._create_consumer(connection, test_queue1)
        test_queue2 = kombu.Queue('fanout2', exchange=ex)
        consumer2 = self._create_consumer(connection, test_queue2)

        with connection as conn:
            with conn.channel() as channel:
                self._publish(channel, ex, [test_queue1, test_queue2])

                self._consume_from(conn, consumer1)
                self._consume_from(conn, consumer2)


@pytest.mark.env('mongodb')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_MongoDBPriority(BasePriority):

    # drain_events() consumes only one value unlike in py-amqp.

    def test_publish_consume(self, connection):
        test_queue = kombu.Queue(
            'priority_test', routing_key='priority_test', max_priority=10
        )

        received_messages = []

        def callback(body, message):
            received_messages.append(body)
            message.ack()

        with connection as conn:
            with conn.channel() as channel:
                producer = kombu.Producer(channel)
                for msg, prio in [
                    [{'msg': 'first'}, 3],
                    [{'msg': 'second'}, 6],
                    [{'msg': 'third'}, 3],
                ]:
                    producer.publish(
                        msg,
                        retry=True,
                        exchange=test_queue.exchange,
                        routing_key=test_queue.routing_key,
                        declare=[test_queue],
                        serializer='pickle',
                        priority=prio
                    )
                consumer = kombu.Consumer(
                    conn, [test_queue], accept=['pickle']
                )
                consumer.register_callback(callback)
                with consumer:
                    conn.drain_events(timeout=1)
                    conn.drain_events(timeout=1)
                    conn.drain_events(timeout=1)
                # Second message must be received first
                assert received_messages[0] == {'msg': 'second'}
                assert received_messages[1] == {'msg': 'first'}
                assert received_messages[2] == {'msg': 'third'}


@pytest.mark.env('mongodb')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_MongoDBMessage(BaseMessage):
    pass