summaryrefslogtreecommitdiff
path: root/t/integration
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@gmail.com>2020-05-06 08:39:08 +0200
committerMatus Valo <matusvalo@gmail.com>2020-05-06 14:36:21 +0200
commit5113df342eb4eb7a2033d56c2dc958874ab24059 (patch)
treeb71dbdeb8f23624ac6f1ad85d39a9e7fb98f34b0 /t/integration
parent5d55eba3ecfc4eb673c5273d8f33017164bb7801 (diff)
downloadkombu-5113df342eb4eb7a2033d56c2dc958874ab24059.tar.gz
Added TTL integration tests
Diffstat (limited to 't/integration')
-rw-r--r--t/integration/common.py49
-rw-r--r--t/integration/test_py_amqp.py8
2 files changed, 56 insertions, 1 deletions
diff --git a/t/integration/common.py b/t/integration/common.py
index acffb7fc..a1f96e63 100644
--- a/t/integration/common.py
+++ b/t/integration/common.py
@@ -2,6 +2,7 @@ from __future__ import absolute_import, unicode_literals
import socket
from contextlib import closing
+from time import sleep
import pytest
import kombu
@@ -152,3 +153,51 @@ class BaseExchangeTypes(object):
with pytest.raises(socket.timeout):
# topic3 queue should not have data
self._consume(conn, test_queue3)
+
+
+class BaseTimeToLive(object):
+ def test_publish_consume(self, connection):
+ test_queue = kombu.Queue('ttl_test', routing_key='ttl_test')
+
+ def callback(body, message):
+ message.ack()
+
+ with connection as conn:
+ with conn.channel() as channel:
+ producer = kombu.Producer(channel)
+ producer.publish(
+ {'hello': 'world'},
+ retry=True,
+ exchange=test_queue.exchange,
+ routing_key=test_queue.routing_key,
+ declare=[test_queue],
+ serializer='pickle',
+ expiration=2
+ )
+
+ consumer = kombu.Consumer(
+ conn, [test_queue], accept=['pickle']
+ )
+ consumer.register_callback(callback)
+ sleep(3)
+ with consumer:
+ with pytest.raises(socket.timeout):
+ conn.drain_events(timeout=1)
+
+ def test_simple_queue_publish_consume(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('ttl_simple_queue_test')) as queue:
+ queue.put(
+ {'Hello': 'World'}, headers={'k1': 'v1'}, expiration=2
+ )
+ sleep(3)
+ with pytest.raises(queue.Empty):
+ queue.get(timeout=1)
+
+ def test_simple_buffer_publish_consume(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleBuffer('ttl_simple_buffer_test')) as buf:
+ buf.put({'Hello': 'World'}, headers={'k1': 'v1'}, expiration=2)
+ sleep(3)
+ with pytest.raises(buf.Empty):
+ buf.get(timeout=1)
diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py
index 98f6836d..dfc17ad0 100644
--- a/t/integration/test_py_amqp.py
+++ b/t/integration/test_py_amqp.py
@@ -5,7 +5,7 @@ import os
import pytest
import kombu
-from .common import BasicFunctionality, BaseExchangeTypes
+from .common import BasicFunctionality, BaseExchangeTypes, BaseTimeToLive
def get_connection(
@@ -35,3 +35,9 @@ class test_PyAMQPBasicFunctionality(BasicFunctionality):
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_PyAMQPBaseExchangeTypes(BaseExchangeTypes):
pass
+
+
+@pytest.mark.env('py-amqp')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_PyAMQPTimeToLive(BaseTimeToLive):
+ pass