summaryrefslogtreecommitdiff
path: root/t/unit
diff options
context:
space:
mode:
authorGao <mishanyo1001@gmail.com>2022-03-15 22:21:07 +0800
committerGitHub <noreply@github.com>2022-03-15 20:21:07 +0600
commit0282e1419fad98da5ae956ff38c7e87e539889ac (patch)
tree55d61b6e10f64bface5e11e704b6ad279f63fd02 /t/unit
parentd57dde5631c5c7dd73300a79613975531112aae6 (diff)
downloadkombu-0282e1419fad98da5ae956ff38c7e87e539889ac.tar.gz
Add fanout to filesystem (#1499)
* Create a folder for each queue when using filesystem transport and add fanout support * clean up unused variables * Add fanout support to filesystem transport filesystem transport lacks of fanout support. 1. Add fanout support to filesystem transport. 2. Add a unit test for it. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Remove the refactoring work and make the test passed 1. Remove all of refactoring work 2. make the test pass * Use pathlib for some Path operation * Some reviewed changes Co-authored-by: Yuriy Halytskyy <y.halytskyy@auckland.ac.nz> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Diffstat (limited to 't/unit')
-rw-r--r--t/unit/transport/test_filesystem.py94
1 files changed, 94 insertions, 0 deletions
diff --git a/t/unit/transport/test_filesystem.py b/t/unit/transport/test_filesystem.py
index a8d1708b..fdf233ae 100644
--- a/t/unit/transport/test_filesystem.py
+++ b/t/unit/transport/test_filesystem.py
@@ -1,4 +1,5 @@
import tempfile
+from queue import Empty
import pytest
@@ -138,3 +139,96 @@ class test_FilesystemTransport:
assert self.q2(consumer_channel).get()
self.q2(consumer_channel).purge()
assert self.q2(consumer_channel).get() is None
+
+
+@t.skip.if_win32
+class test_FilesystemFanout:
+ def setup(self):
+ try:
+ data_folder_in = tempfile.mkdtemp()
+ data_folder_out = tempfile.mkdtemp()
+ control_folder = tempfile.mkdtemp()
+ except Exception:
+ pytest.skip("filesystem transport: cannot create tempfiles")
+
+ self.consumer_connection = Connection(
+ transport="filesystem",
+ transport_options={
+ "data_folder_in": data_folder_in,
+ "data_folder_out": data_folder_out,
+ "control_folder": control_folder,
+ },
+ )
+ self.consume_channel = self.consumer_connection.channel()
+ self.produce_connection = Connection(
+ transport="filesystem",
+ transport_options={
+ "data_folder_in": data_folder_out,
+ "data_folder_out": data_folder_in,
+ "control_folder": control_folder,
+ },
+ )
+ self.producer_channel = self.produce_connection.channel()
+ self.exchange = Exchange("filesystem_exchange_fanout", type="fanout")
+ self.q1 = Queue("queue1", exchange=self.exchange)
+ self.q2 = Queue("queue2", exchange=self.exchange)
+
+ def teardown(self):
+ # make sure we don't attempt to restore messages at shutdown.
+ for channel in [self.producer_channel, self.consumer_connection]:
+ try:
+ channel._qos._dirty.clear()
+ except AttributeError:
+ pass
+ try:
+ channel._qos._delivered.clear()
+ except AttributeError:
+ pass
+
+ def test_produce_consume(self):
+
+ producer = Producer(self.producer_channel, self.exchange)
+ consumer1 = Consumer(self.consume_channel, self.q1)
+ consumer2 = Consumer(self.consume_channel, self.q2)
+ self.q2(self.consume_channel).declare()
+
+ for i in range(10):
+ producer.publish({"foo": i})
+
+ _received1 = []
+ _received2 = []
+
+ def callback1(message_data, message):
+ _received1.append(message)
+ message.ack()
+
+ def callback2(message_data, message):
+ _received2.append(message)
+ message.ack()
+
+ consumer1.register_callback(callback1)
+ consumer2.register_callback(callback2)
+
+ consumer1.consume()
+ consumer2.consume()
+
+ while 1:
+ try:
+ self.consume_channel.drain_events()
+ except Empty:
+ break
+
+ assert len(_received1) + len(_received2) == 20
+
+ # queue.delete
+ for i in range(10):
+ producer.publish({"foo": i})
+ assert self.q1(self.consume_channel).get()
+ self.q1(self.consume_channel).delete()
+ self.q1(self.consume_channel).declare()
+ assert self.q1(self.consume_channel).get() is None
+
+ # queue.purge
+ assert self.q2(self.consume_channel).get()
+ self.q2(self.consume_channel).purge()
+ assert self.q2(self.consume_channel).get() is None