summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuis Saavedra <571849+lsaavedr@users.noreply.github.com>2021-05-22 15:28:05 -0400
committerGitHub <noreply@github.com>2021-05-22 21:28:05 +0200
commite6d76d13fc342dec473828a7780c0c2f77b4b738 (patch)
treeefd3084f43b2acfa2c4a2e387f31da5980dca439
parenta920bb036515645460799f0019e294963a28e1ee (diff)
downloadkombu-e6d76d13fc342dec473828a7780c0c2f77b4b738.tar.gz
add accept parameter to SimpleQueue class (#1140)
* add accept parameter to SimpleQueue class * Fixed missing accept for get_nowait() and added unittests * Remove unused **kwargs from SimpleQueue.__init__ * Use self.consumer.accept instead of new attribute in SimpleQueue * Add tests for simple interface when accept=[] Co-authored-by: Matus Valo <matusvalo@gmail.com>
-rw-r--r--kombu/simple.py8
-rw-r--r--t/unit/test_simple.py36
2 files changed, 37 insertions, 7 deletions
diff --git a/kombu/simple.py b/kombu/simple.py
index cdcf86f3..bab49881 100644
--- a/kombu/simple.py
+++ b/kombu/simple.py
@@ -67,7 +67,7 @@ class SimpleBase:
remaining = timeout - elapsed
def get_nowait(self):
- m = self.queue.get(no_ack=self.no_ack)
+ m = self.queue.get(no_ack=self.no_ack, accept=self.consumer.accept)
if not m:
raise self.Empty()
return m
@@ -118,7 +118,7 @@ class SimpleQueue(SimpleBase):
def __init__(self, channel, name, no_ack=None, queue_opts=None,
queue_args=None, exchange_opts=None, serializer=None,
- compression=None, **kwargs):
+ compression=None, accept=None):
queue = name
queue_opts = dict(self.queue_opts, **queue_opts or {})
queue_args = dict(self.queue_args, **queue_args or {})
@@ -134,13 +134,13 @@ class SimpleQueue(SimpleBase):
else:
exchange = queue.exchange
routing_key = queue.routing_key
- consumer = messaging.Consumer(channel, queue)
+ consumer = messaging.Consumer(channel, queue, accept=accept)
producer = messaging.Producer(channel, exchange,
serializer=serializer,
routing_key=routing_key,
compression=compression)
super().__init__(channel, producer,
- consumer, no_ack, **kwargs)
+ consumer, no_ack)
class SimpleBuffer(SimpleQueue):
diff --git a/t/unit/test_simple.py b/t/unit/test_simple.py
index 1f9cc8e2..c4badb03 100644
--- a/t/unit/test_simple.py
+++ b/t/unit/test_simple.py
@@ -3,6 +3,7 @@ import pytest
from unittest.mock import Mock
from kombu import Connection, Exchange, Queue
+from kombu.exceptions import ContentDisallowed
class SimpleBase:
@@ -21,13 +22,10 @@ class SimpleBase:
def setup(self):
self.connection = Connection(transport='memory')
self.connection.default_channel.exchange_declare('amq.direct')
- self.q = self.Queue(None, no_ack=True)
def teardown(self):
- self.q.close()
self.connection.close()
self.connection = None
- self.q = None
def test_produce__consume(self):
q = self.Queue('test_produce__consume', no_ack=True)
@@ -50,6 +48,38 @@ class SimpleBase:
with pytest.raises(q.Empty):
q.get(block=False)
+ def test_get_nowait_accept(self):
+ q = self.Queue('test_accept', serializer='pickle', accept=['json'])
+ q.put({'hello': 'SimpleSync'})
+ with pytest.raises(ContentDisallowed):
+ q.get_nowait().payload
+
+ q = self.Queue('test_accept1', serializer='json', accept=[])
+ q.put({'hello': 'SimpleSync'})
+ with pytest.raises(ContentDisallowed):
+ q.get_nowait().payload
+
+ q = self.Queue(
+ 'test_accept2', serializer='pickle', accept=['json', 'pickle'])
+ q.put({'hello': 'SimpleSync'})
+ assert q.get_nowait().payload == {'hello': 'SimpleSync'}
+
+ def test_get_accept(self):
+ q = self.Queue('test_accept', serializer='pickle', accept=['json'])
+ q.put({'hello': 'SimpleSync'})
+ with pytest.raises(ContentDisallowed):
+ q.get().payload
+
+ q = self.Queue('test_accept1', serializer='pickle', accept=[])
+ q.put({'hello': 'SimpleSync'})
+ with pytest.raises(ContentDisallowed):
+ q.get().payload
+
+ q = self.Queue(
+ 'test_accept2', serializer='pickle', accept=['json', 'pickle'])
+ q.put({'hello': 'SimpleSync'})
+ assert q.get().payload == {'hello': 'SimpleSync'}
+
def test_clear(self):
q = self.Queue('test_clear', no_ack=True)