diff options
author | Matus Valo <matusvalo@gmail.com> | 2020-05-06 08:39:08 +0200 |
---|---|---|
committer | Matus Valo <matusvalo@gmail.com> | 2020-05-06 14:36:21 +0200 |
commit | 5113df342eb4eb7a2033d56c2dc958874ab24059 (patch) | |
tree | b71dbdeb8f23624ac6f1ad85d39a9e7fb98f34b0 /t/integration | |
parent | 5d55eba3ecfc4eb673c5273d8f33017164bb7801 (diff) | |
download | kombu-5113df342eb4eb7a2033d56c2dc958874ab24059.tar.gz |
Added TTL integration tests
Diffstat (limited to 't/integration')
-rw-r--r-- | t/integration/common.py | 49 | ||||
-rw-r--r-- | t/integration/test_py_amqp.py | 8 |
2 files changed, 56 insertions, 1 deletions
diff --git a/t/integration/common.py b/t/integration/common.py index acffb7fc..a1f96e63 100644 --- a/t/integration/common.py +++ b/t/integration/common.py @@ -2,6 +2,7 @@ from __future__ import absolute_import, unicode_literals import socket from contextlib import closing +from time import sleep import pytest import kombu @@ -152,3 +153,51 @@ class BaseExchangeTypes(object): with pytest.raises(socket.timeout): # topic3 queue should not have data self._consume(conn, test_queue3) + + +class BaseTimeToLive(object): + def test_publish_consume(self, connection): + test_queue = kombu.Queue('ttl_test', routing_key='ttl_test') + + def callback(body, message): + message.ack() + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + producer.publish( + {'hello': 'world'}, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + expiration=2 + ) + + consumer = kombu.Consumer( + conn, [test_queue], accept=['pickle'] + ) + consumer.register_callback(callback) + sleep(3) + with consumer: + with pytest.raises(socket.timeout): + conn.drain_events(timeout=1) + + def test_simple_queue_publish_consume(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('ttl_simple_queue_test')) as queue: + queue.put( + {'Hello': 'World'}, headers={'k1': 'v1'}, expiration=2 + ) + sleep(3) + with pytest.raises(queue.Empty): + queue.get(timeout=1) + + def test_simple_buffer_publish_consume(self, connection): + with connection as conn: + with closing(conn.SimpleBuffer('ttl_simple_buffer_test')) as buf: + buf.put({'Hello': 'World'}, headers={'k1': 'v1'}, expiration=2) + sleep(3) + with pytest.raises(buf.Empty): + buf.get(timeout=1) diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py index 98f6836d..dfc17ad0 100644 --- a/t/integration/test_py_amqp.py +++ b/t/integration/test_py_amqp.py @@ -5,7 +5,7 @@ import os import pytest import kombu -from .common import BasicFunctionality, BaseExchangeTypes +from .common import BasicFunctionality, BaseExchangeTypes, BaseTimeToLive def get_connection( @@ -35,3 +35,9 @@ class test_PyAMQPBasicFunctionality(BasicFunctionality): @pytest.mark.flaky(reruns=5, reruns_delay=2) class test_PyAMQPBaseExchangeTypes(BaseExchangeTypes): pass + + +@pytest.mark.env('py-amqp') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_PyAMQPTimeToLive(BaseTimeToLive): + pass |