diff options
author | Gao <mishanyo1001@gmail.com> | 2022-03-15 22:21:07 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-15 20:21:07 +0600 |
commit | 0282e1419fad98da5ae956ff38c7e87e539889ac (patch) | |
tree | 55d61b6e10f64bface5e11e704b6ad279f63fd02 /t/unit | |
parent | d57dde5631c5c7dd73300a79613975531112aae6 (diff) | |
download | kombu-0282e1419fad98da5ae956ff38c7e87e539889ac.tar.gz |
Add fanout to filesystem (#1499)
* Create a folder for each queue when using filesystem transport and add fanout support
* clean up unused variables
* Add fanout support to filesystem transport
filesystem transport lacks of fanout support.
1. Add fanout support to filesystem transport.
2. Add a unit test for it.
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
* Remove the refactoring work and make the test passed
1. Remove all of refactoring work
2. make the test pass
* Use pathlib for some Path operation
* Some reviewed changes
Co-authored-by: Yuriy Halytskyy <y.halytskyy@auckland.ac.nz>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Diffstat (limited to 't/unit')
-rw-r--r-- | t/unit/transport/test_filesystem.py | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/t/unit/transport/test_filesystem.py b/t/unit/transport/test_filesystem.py index a8d1708b..fdf233ae 100644 --- a/t/unit/transport/test_filesystem.py +++ b/t/unit/transport/test_filesystem.py @@ -1,4 +1,5 @@ import tempfile +from queue import Empty import pytest @@ -138,3 +139,96 @@ class test_FilesystemTransport: assert self.q2(consumer_channel).get() self.q2(consumer_channel).purge() assert self.q2(consumer_channel).get() is None + + +@t.skip.if_win32 +class test_FilesystemFanout: + def setup(self): + try: + data_folder_in = tempfile.mkdtemp() + data_folder_out = tempfile.mkdtemp() + control_folder = tempfile.mkdtemp() + except Exception: + pytest.skip("filesystem transport: cannot create tempfiles") + + self.consumer_connection = Connection( + transport="filesystem", + transport_options={ + "data_folder_in": data_folder_in, + "data_folder_out": data_folder_out, + "control_folder": control_folder, + }, + ) + self.consume_channel = self.consumer_connection.channel() + self.produce_connection = Connection( + transport="filesystem", + transport_options={ + "data_folder_in": data_folder_out, + "data_folder_out": data_folder_in, + "control_folder": control_folder, + }, + ) + self.producer_channel = self.produce_connection.channel() + self.exchange = Exchange("filesystem_exchange_fanout", type="fanout") + self.q1 = Queue("queue1", exchange=self.exchange) + self.q2 = Queue("queue2", exchange=self.exchange) + + def teardown(self): + # make sure we don't attempt to restore messages at shutdown. + for channel in [self.producer_channel, self.consumer_connection]: + try: + channel._qos._dirty.clear() + except AttributeError: + pass + try: + channel._qos._delivered.clear() + except AttributeError: + pass + + def test_produce_consume(self): + + producer = Producer(self.producer_channel, self.exchange) + consumer1 = Consumer(self.consume_channel, self.q1) + consumer2 = Consumer(self.consume_channel, self.q2) + self.q2(self.consume_channel).declare() + + for i in range(10): + producer.publish({"foo": i}) + + _received1 = [] + _received2 = [] + + def callback1(message_data, message): + _received1.append(message) + message.ack() + + def callback2(message_data, message): + _received2.append(message) + message.ack() + + consumer1.register_callback(callback1) + consumer2.register_callback(callback2) + + consumer1.consume() + consumer2.consume() + + while 1: + try: + self.consume_channel.drain_events() + except Empty: + break + + assert len(_received1) + len(_received2) == 20 + + # queue.delete + for i in range(10): + producer.publish({"foo": i}) + assert self.q1(self.consume_channel).get() + self.q1(self.consume_channel).delete() + self.q1(self.consume_channel).declare() + assert self.q1(self.consume_channel).get() is None + + # queue.purge + assert self.q2(self.consume_channel).get() + self.q2(self.consume_channel).purge() + assert self.q2(self.consume_channel).get() is None |