diff options
Diffstat (limited to 't/unit/transport/test_filesystem.py')
-rw-r--r-- | t/unit/transport/test_filesystem.py | 112 |
1 files changed, 1 insertions, 111 deletions
diff --git a/t/unit/transport/test_filesystem.py b/t/unit/transport/test_filesystem.py index c452e7ca..b22e3b8d 100644 --- a/t/unit/transport/test_filesystem.py +++ b/t/unit/transport/test_filesystem.py @@ -1,19 +1,17 @@ from __future__ import annotations import tempfile -from fcntl import LOCK_EX, LOCK_NB, LOCK_SH from queue import Empty -from unittest.mock import call, patch import pytest import t.skip from kombu import Connection, Consumer, Exchange, Producer, Queue -from kombu.transport.filesystem import lock, unlock @t.skip.if_win32 class test_FilesystemTransport: + def setup(self): self.channels = set() try: @@ -147,7 +145,6 @@ class test_FilesystemTransport: @t.skip.if_win32 class test_FilesystemFanout: - def setup(self): try: data_folder_in = tempfile.mkdtemp() @@ -237,110 +234,3 @@ class test_FilesystemFanout: assert self.q2(self.consume_channel).get() self.q2(self.consume_channel).purge() assert self.q2(self.consume_channel).get() is None - - -@t.skip.if_win32 -class test_FilesystemLock: - def test_lock(self): - file_obj1 = tempfile.NamedTemporaryFile() - with open(file_obj1.name) as file_obj2: - lock(file_obj1, LOCK_SH) - with pytest.raises(BlockingIOError): - lock(file_obj2, LOCK_EX | LOCK_NB) - - lock(file_obj2, LOCK_SH) - unlock(file_obj2) - - unlock(file_obj1) - lock(file_obj2, LOCK_EX) - unlock(file_obj2) - file_obj1.close() - - -@t.skip.if_win32 -class test_FilesystemLockDuringProcess: - def setup(self): - try: - data_folder_in = tempfile.mkdtemp() - data_folder_out = tempfile.mkdtemp() - control_folder = tempfile.mkdtemp() - except Exception: - pytest.skip("filesystem transport: cannot create tempfiles") - - self.consumer_connection = Connection( - transport="filesystem", - transport_options={ - "data_folder_in": data_folder_in, - "data_folder_out": data_folder_out, - "control_folder": control_folder, - }, - ) - self.consume_channel = self.consumer_connection.channel() - self.produce_connection = Connection( - transport="filesystem", - transport_options={ - "data_folder_in": data_folder_out, - "data_folder_out": data_folder_in, - "control_folder": control_folder, - }, - ) - self.producer_channel = self.produce_connection.channel() - self.exchange = Exchange("filesystem_exchange_lock", type="fanout") - self.q = Queue("queue1", exchange=self.exchange) - - def teardown(self): - # make sure we don't attempt to restore messages at shutdown. - for channel in [self.producer_channel, self.consumer_connection]: - try: - channel._qos._dirty.clear() - except AttributeError: - pass - try: - channel._qos._delivered.clear() - except AttributeError: - pass - - def test_lock_during_process(self): - producer = Producer(self.producer_channel, self.exchange) - - with patch("kombu.transport.filesystem.lock") as lock_m, patch( - "kombu.transport.filesystem.unlock" - ) as unlock_m: - consumer = Consumer(self.consume_channel, self.q) - assert unlock_m.call_count == 1 - lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_EX) - - self.q(self.consume_channel).declare() - with patch("kombu.transport.filesystem.lock") as lock_m, patch( - "kombu.transport.filesystem.unlock" - ) as unlock_m: - producer.publish({"foo": 1}) - assert unlock_m.call_count == 2 - assert lock_m.call_count == 2 - exchange_file_obj = unlock_m.call_args_list[0][0][0] - msg_file_obj = unlock_m.call_args_list[1][0][0] - assert lock_m.call_args_list == [call(exchange_file_obj, LOCK_SH), - call(msg_file_obj, LOCK_EX)] - - def callback(_, message): - message.ack() - - consumer.register_callback(callback) - consumer.consume() - - with patch("kombu.transport.filesystem.lock") as lock_m, patch( - "kombu.transport.filesystem.unlock" - ) as unlock_m: - self.consume_channel.drain_events() - assert lock_m.call_count == 1 - assert unlock_m.call_count == 1 - lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_SH) - - producer.publish({"foo": 0}) - with patch("kombu.transport.filesystem.lock") as lock_m, patch( - "kombu.transport.filesystem.unlock" - ) as unlock_m: - self.q(self.consume_channel).purge() - assert lock_m.call_count == 1 - assert unlock_m.call_count == 1 - lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_EX) |