summaryrefslogtreecommitdiff
path: root/t
diff options
context:
space:
mode:
authorGao <mishanyo1001@gmail.com>2022-09-10 00:52:53 +0800
committerGitHub <noreply@github.com>2022-09-09 22:52:53 +0600
commitafcde0a0bd6fc7554146dc8f177702a723043516 (patch)
tree38279c9ff6946cd9112bd19c2a61f6fe28a66626 /t
parent8699920e050727d385a6d5a19c939e55a86688d6 (diff)
downloadkombu-afcde0a0bd6fc7554146dc8f177702a723043516.tar.gz
Revert "Solve Kombu filesystem transport not thread safe (#1593)" (#1595)
This reverts commit 8699920e050727d385a6d5a19c939e55a86688d6.
Diffstat (limited to 't')
-rw-r--r--t/unit/transport/test_filesystem.py112
1 files changed, 1 insertions, 111 deletions
diff --git a/t/unit/transport/test_filesystem.py b/t/unit/transport/test_filesystem.py
index c452e7ca..b22e3b8d 100644
--- a/t/unit/transport/test_filesystem.py
+++ b/t/unit/transport/test_filesystem.py
@@ -1,19 +1,17 @@
from __future__ import annotations
import tempfile
-from fcntl import LOCK_EX, LOCK_NB, LOCK_SH
from queue import Empty
-from unittest.mock import call, patch
import pytest
import t.skip
from kombu import Connection, Consumer, Exchange, Producer, Queue
-from kombu.transport.filesystem import lock, unlock
@t.skip.if_win32
class test_FilesystemTransport:
+
def setup(self):
self.channels = set()
try:
@@ -147,7 +145,6 @@ class test_FilesystemTransport:
@t.skip.if_win32
class test_FilesystemFanout:
-
def setup(self):
try:
data_folder_in = tempfile.mkdtemp()
@@ -237,110 +234,3 @@ class test_FilesystemFanout:
assert self.q2(self.consume_channel).get()
self.q2(self.consume_channel).purge()
assert self.q2(self.consume_channel).get() is None
-
-
-@t.skip.if_win32
-class test_FilesystemLock:
- def test_lock(self):
- file_obj1 = tempfile.NamedTemporaryFile()
- with open(file_obj1.name) as file_obj2:
- lock(file_obj1, LOCK_SH)
- with pytest.raises(BlockingIOError):
- lock(file_obj2, LOCK_EX | LOCK_NB)
-
- lock(file_obj2, LOCK_SH)
- unlock(file_obj2)
-
- unlock(file_obj1)
- lock(file_obj2, LOCK_EX)
- unlock(file_obj2)
- file_obj1.close()
-
-
-@t.skip.if_win32
-class test_FilesystemLockDuringProcess:
- def setup(self):
- try:
- data_folder_in = tempfile.mkdtemp()
- data_folder_out = tempfile.mkdtemp()
- control_folder = tempfile.mkdtemp()
- except Exception:
- pytest.skip("filesystem transport: cannot create tempfiles")
-
- self.consumer_connection = Connection(
- transport="filesystem",
- transport_options={
- "data_folder_in": data_folder_in,
- "data_folder_out": data_folder_out,
- "control_folder": control_folder,
- },
- )
- self.consume_channel = self.consumer_connection.channel()
- self.produce_connection = Connection(
- transport="filesystem",
- transport_options={
- "data_folder_in": data_folder_out,
- "data_folder_out": data_folder_in,
- "control_folder": control_folder,
- },
- )
- self.producer_channel = self.produce_connection.channel()
- self.exchange = Exchange("filesystem_exchange_lock", type="fanout")
- self.q = Queue("queue1", exchange=self.exchange)
-
- def teardown(self):
- # make sure we don't attempt to restore messages at shutdown.
- for channel in [self.producer_channel, self.consumer_connection]:
- try:
- channel._qos._dirty.clear()
- except AttributeError:
- pass
- try:
- channel._qos._delivered.clear()
- except AttributeError:
- pass
-
- def test_lock_during_process(self):
- producer = Producer(self.producer_channel, self.exchange)
-
- with patch("kombu.transport.filesystem.lock") as lock_m, patch(
- "kombu.transport.filesystem.unlock"
- ) as unlock_m:
- consumer = Consumer(self.consume_channel, self.q)
- assert unlock_m.call_count == 1
- lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_EX)
-
- self.q(self.consume_channel).declare()
- with patch("kombu.transport.filesystem.lock") as lock_m, patch(
- "kombu.transport.filesystem.unlock"
- ) as unlock_m:
- producer.publish({"foo": 1})
- assert unlock_m.call_count == 2
- assert lock_m.call_count == 2
- exchange_file_obj = unlock_m.call_args_list[0][0][0]
- msg_file_obj = unlock_m.call_args_list[1][0][0]
- assert lock_m.call_args_list == [call(exchange_file_obj, LOCK_SH),
- call(msg_file_obj, LOCK_EX)]
-
- def callback(_, message):
- message.ack()
-
- consumer.register_callback(callback)
- consumer.consume()
-
- with patch("kombu.transport.filesystem.lock") as lock_m, patch(
- "kombu.transport.filesystem.unlock"
- ) as unlock_m:
- self.consume_channel.drain_events()
- assert lock_m.call_count == 1
- assert unlock_m.call_count == 1
- lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_SH)
-
- producer.publish({"foo": 0})
- with patch("kombu.transport.filesystem.lock") as lock_m, patch(
- "kombu.transport.filesystem.unlock"
- ) as unlock_m:
- self.q(self.consume_channel).purge()
- assert lock_m.call_count == 1
- assert unlock_m.call_count == 1
- lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_EX)