diff options
author | karajan1001 <mishanyo1001@gmail.com> | 2022-09-12 19:08:44 +0800 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2022-09-23 17:29:34 +0600 |
commit | 6ae9fac150305bb6ee83192a4cccf54613efcb2f (patch) | |
tree | 6492c01bda7f144d1feb1d8828dc5ba18ed9bba7 /t/unit | |
parent | 2d88c43feff2cab9011568eb1abe44e13b7c7207 (diff) | |
download | kombu-6ae9fac150305bb6ee83192a4cccf54613efcb2f.tar.gz |
Solve Kombu filesystem transport not thread safe
partly fix: #398
1. add exclusive lock during the whole exchange file update.
2. add some unit test for file lock
Diffstat (limited to 't/unit')
-rw-r--r-- | t/unit/transport/test_filesystem.py | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/t/unit/transport/test_filesystem.py b/t/unit/transport/test_filesystem.py index b22e3b8d..20c7f47a 100644 --- a/t/unit/transport/test_filesystem.py +++ b/t/unit/transport/test_filesystem.py @@ -1,7 +1,9 @@ from __future__ import annotations import tempfile +from fcntl import LOCK_EX, LOCK_SH from queue import Empty +from unittest.mock import call, patch import pytest @@ -234,3 +236,69 @@ class test_FilesystemFanout: assert self.q2(self.consume_channel).get() self.q2(self.consume_channel).purge() assert self.q2(self.consume_channel).get() is None + + +@t.skip.if_win32 +class test_FilesystemLock: + def setup(self): + try: + data_folder_in = tempfile.mkdtemp() + data_folder_out = tempfile.mkdtemp() + control_folder = tempfile.mkdtemp() + except Exception: + pytest.skip("filesystem transport: cannot create tempfiles") + + self.consumer_connection = Connection( + transport="filesystem", + transport_options={ + "data_folder_in": data_folder_in, + "data_folder_out": data_folder_out, + "control_folder": control_folder, + }, + ) + self.consume_channel = self.consumer_connection.channel() + self.produce_connection = Connection( + transport="filesystem", + transport_options={ + "data_folder_in": data_folder_out, + "data_folder_out": data_folder_in, + "control_folder": control_folder, + }, + ) + self.producer_channel = self.produce_connection.channel() + self.exchange = Exchange("filesystem_exchange_lock", type="fanout") + self.q = Queue("queue1", exchange=self.exchange) + + def teardown(self): + # make sure we don't attempt to restore messages at shutdown. + for channel in [self.producer_channel, self.consumer_connection]: + try: + channel._qos._dirty.clear() + except AttributeError: + pass + try: + channel._qos._delivered.clear() + except AttributeError: + pass + + def test_lock_during_process(self): + producer = Producer(self.producer_channel, self.exchange) + + with patch("kombu.transport.filesystem.lock") as lock_m, patch( + "kombu.transport.filesystem.unlock" + ) as unlock_m: + Consumer(self.consume_channel, self.q) + assert unlock_m.call_count == 1 + lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_EX) + + self.q(self.consume_channel).declare() + with patch("kombu.transport.filesystem.lock") as lock_m, patch( + "kombu.transport.filesystem.unlock" + ) as unlock_m: + producer.publish({"foo": 1}) + assert unlock_m.call_count == 2 + assert lock_m.call_count == 2 + exchange_file_obj = unlock_m.call_args_list[0][0][0] + msg_file_obj = unlock_m.call_args_list[1][0][0] + assert lock_m.call_args_list == [call(exchange_file_obj, LOCK_SH), + call(msg_file_obj, LOCK_EX)] |