diff options
author | Luis Saavedra <571849+lsaavedr@users.noreply.github.com> | 2021-05-22 15:28:05 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-22 21:28:05 +0200 |
commit | e6d76d13fc342dec473828a7780c0c2f77b4b738 (patch) | |
tree | efd3084f43b2acfa2c4a2e387f31da5980dca439 | |
parent | a920bb036515645460799f0019e294963a28e1ee (diff) | |
download | kombu-e6d76d13fc342dec473828a7780c0c2f77b4b738.tar.gz |
add accept parameter to SimpleQueue class (#1140)
* add accept parameter to SimpleQueue class
* Fixed missing accept for get_nowait() and added unittests
* Remove unused **kwargs from SimpleQueue.__init__
* Use self.consumer.accept instead of new attribute in SimpleQueue
* Add tests for simple interface when accept=[]
Co-authored-by: Matus Valo <matusvalo@gmail.com>
-rw-r--r-- | kombu/simple.py | 8 | ||||
-rw-r--r-- | t/unit/test_simple.py | 36 |
2 files changed, 37 insertions, 7 deletions
diff --git a/kombu/simple.py b/kombu/simple.py index cdcf86f3..bab49881 100644 --- a/kombu/simple.py +++ b/kombu/simple.py @@ -67,7 +67,7 @@ class SimpleBase: remaining = timeout - elapsed def get_nowait(self): - m = self.queue.get(no_ack=self.no_ack) + m = self.queue.get(no_ack=self.no_ack, accept=self.consumer.accept) if not m: raise self.Empty() return m @@ -118,7 +118,7 @@ class SimpleQueue(SimpleBase): def __init__(self, channel, name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, serializer=None, - compression=None, **kwargs): + compression=None, accept=None): queue = name queue_opts = dict(self.queue_opts, **queue_opts or {}) queue_args = dict(self.queue_args, **queue_args or {}) @@ -134,13 +134,13 @@ class SimpleQueue(SimpleBase): else: exchange = queue.exchange routing_key = queue.routing_key - consumer = messaging.Consumer(channel, queue) + consumer = messaging.Consumer(channel, queue, accept=accept) producer = messaging.Producer(channel, exchange, serializer=serializer, routing_key=routing_key, compression=compression) super().__init__(channel, producer, - consumer, no_ack, **kwargs) + consumer, no_ack) class SimpleBuffer(SimpleQueue): diff --git a/t/unit/test_simple.py b/t/unit/test_simple.py index 1f9cc8e2..c4badb03 100644 --- a/t/unit/test_simple.py +++ b/t/unit/test_simple.py @@ -3,6 +3,7 @@ import pytest from unittest.mock import Mock from kombu import Connection, Exchange, Queue +from kombu.exceptions import ContentDisallowed class SimpleBase: @@ -21,13 +22,10 @@ class SimpleBase: def setup(self): self.connection = Connection(transport='memory') self.connection.default_channel.exchange_declare('amq.direct') - self.q = self.Queue(None, no_ack=True) def teardown(self): - self.q.close() self.connection.close() self.connection = None - self.q = None def test_produce__consume(self): q = self.Queue('test_produce__consume', no_ack=True) @@ -50,6 +48,38 @@ class SimpleBase: with pytest.raises(q.Empty): q.get(block=False) + def test_get_nowait_accept(self): + q = self.Queue('test_accept', serializer='pickle', accept=['json']) + q.put({'hello': 'SimpleSync'}) + with pytest.raises(ContentDisallowed): + q.get_nowait().payload + + q = self.Queue('test_accept1', serializer='json', accept=[]) + q.put({'hello': 'SimpleSync'}) + with pytest.raises(ContentDisallowed): + q.get_nowait().payload + + q = self.Queue( + 'test_accept2', serializer='pickle', accept=['json', 'pickle']) + q.put({'hello': 'SimpleSync'}) + assert q.get_nowait().payload == {'hello': 'SimpleSync'} + + def test_get_accept(self): + q = self.Queue('test_accept', serializer='pickle', accept=['json']) + q.put({'hello': 'SimpleSync'}) + with pytest.raises(ContentDisallowed): + q.get().payload + + q = self.Queue('test_accept1', serializer='pickle', accept=[]) + q.put({'hello': 'SimpleSync'}) + with pytest.raises(ContentDisallowed): + q.get().payload + + q = self.Queue( + 'test_accept2', serializer='pickle', accept=['json', 'pickle']) + q.put({'hello': 'SimpleSync'}) + assert q.get().payload == {'hello': 'SimpleSync'} + def test_clear(self): q = self.Queue('test_clear', no_ack=True) |