summaryrefslogtreecommitdiff
path: root/t
diff options
context:
space:
mode:
authorkarajan1001 <mishanyo1001@gmail.com>2022-09-12 19:08:44 +0800
committerAsif Saif Uddin <auvipy@gmail.com>2022-09-23 17:29:34 +0600
commit6ae9fac150305bb6ee83192a4cccf54613efcb2f (patch)
tree6492c01bda7f144d1feb1d8828dc5ba18ed9bba7 /t
parent2d88c43feff2cab9011568eb1abe44e13b7c7207 (diff)
downloadkombu-6ae9fac150305bb6ee83192a4cccf54613efcb2f.tar.gz
Solve Kombu filesystem transport not thread safe
partly fix: #398 1. add exclusive lock during the whole exchange file update. 2. add some unit test for file lock
Diffstat (limited to 't')
-rw-r--r--t/unit/transport/test_filesystem.py68
1 files changed, 68 insertions, 0 deletions
diff --git a/t/unit/transport/test_filesystem.py b/t/unit/transport/test_filesystem.py
index b22e3b8d..20c7f47a 100644
--- a/t/unit/transport/test_filesystem.py
+++ b/t/unit/transport/test_filesystem.py
@@ -1,7 +1,9 @@
from __future__ import annotations
import tempfile
+from fcntl import LOCK_EX, LOCK_SH
from queue import Empty
+from unittest.mock import call, patch
import pytest
@@ -234,3 +236,69 @@ class test_FilesystemFanout:
assert self.q2(self.consume_channel).get()
self.q2(self.consume_channel).purge()
assert self.q2(self.consume_channel).get() is None
+
+
+@t.skip.if_win32
+class test_FilesystemLock:
+ def setup(self):
+ try:
+ data_folder_in = tempfile.mkdtemp()
+ data_folder_out = tempfile.mkdtemp()
+ control_folder = tempfile.mkdtemp()
+ except Exception:
+ pytest.skip("filesystem transport: cannot create tempfiles")
+
+ self.consumer_connection = Connection(
+ transport="filesystem",
+ transport_options={
+ "data_folder_in": data_folder_in,
+ "data_folder_out": data_folder_out,
+ "control_folder": control_folder,
+ },
+ )
+ self.consume_channel = self.consumer_connection.channel()
+ self.produce_connection = Connection(
+ transport="filesystem",
+ transport_options={
+ "data_folder_in": data_folder_out,
+ "data_folder_out": data_folder_in,
+ "control_folder": control_folder,
+ },
+ )
+ self.producer_channel = self.produce_connection.channel()
+ self.exchange = Exchange("filesystem_exchange_lock", type="fanout")
+ self.q = Queue("queue1", exchange=self.exchange)
+
+ def teardown(self):
+ # make sure we don't attempt to restore messages at shutdown.
+ for channel in [self.producer_channel, self.consumer_connection]:
+ try:
+ channel._qos._dirty.clear()
+ except AttributeError:
+ pass
+ try:
+ channel._qos._delivered.clear()
+ except AttributeError:
+ pass
+
+ def test_lock_during_process(self):
+ producer = Producer(self.producer_channel, self.exchange)
+
+ with patch("kombu.transport.filesystem.lock") as lock_m, patch(
+ "kombu.transport.filesystem.unlock"
+ ) as unlock_m:
+ Consumer(self.consume_channel, self.q)
+ assert unlock_m.call_count == 1
+ lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_EX)
+
+ self.q(self.consume_channel).declare()
+ with patch("kombu.transport.filesystem.lock") as lock_m, patch(
+ "kombu.transport.filesystem.unlock"
+ ) as unlock_m:
+ producer.publish({"foo": 1})
+ assert unlock_m.call_count == 2
+ assert lock_m.call_count == 2
+ exchange_file_obj = unlock_m.call_args_list[0][0][0]
+ msg_file_obj = unlock_m.call_args_list[1][0][0]
+ assert lock_m.call_args_list == [call(exchange_file_obj, LOCK_SH),
+ call(msg_file_obj, LOCK_EX)]