summaryrefslogtreecommitdiff
path: root/kombu/transport/filesystem.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/transport/filesystem.py')
-rw-r--r--kombu/transport/filesystem.py87
1 files changed, 28 insertions, 59 deletions
diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py
index eb07bba6..92d8c4e4 100644
--- a/kombu/transport/filesystem.py
+++ b/kombu/transport/filesystem.py
@@ -93,7 +93,6 @@ from __future__ import annotations
import os
import shutil
-import signal
import tempfile
import uuid
from collections import namedtuple
@@ -112,26 +111,6 @@ from . import virtual
VERSION = (1, 0, 0)
__version__ = '.'.join(map(str, VERSION))
-
-@contextmanager
-def timeout_manager(seconds: int):
- def timeout_handler(signum, frame):
- # Now that flock retries automatically when interrupted, we need
- # an exception to stop it
- # This exception will propagate on the main thread,
- # make sure you're calling flock there
- raise InterruptedError
-
- original_handler = signal.signal(signal.SIGALRM, timeout_handler)
-
- try:
- signal.alarm(seconds)
- yield
- finally:
- signal.alarm(0)
- signal.signal(signal.SIGALRM, original_handler)
-
-
# needs win32all to work on Windows
if os.name == 'nt':
@@ -159,7 +138,7 @@ if os.name == 'nt':
elif os.name == 'posix':
import fcntl
- from fcntl import LOCK_EX, LOCK_SH
+ from fcntl import LOCK_EX, LOCK_NB, LOCK_SH # noqa
def lock(file, flags):
"""Create file lock."""
@@ -175,21 +154,6 @@ else:
'Filesystem plugin only defined for NT and POSIX platforms')
-@contextmanager
-def lock_with_timeout(file, flags, timeout: int = 1):
- with timeout_manager(timeout):
- try:
- lock(file, flags)
- yield
- except InterruptedError:
- # Catch the exception raised by the handler
- # If we weren't raising an exception,
- # flock would automatically retry on signals
- raise BlockingIOError("Lock timed out")
- finally:
- unlock(file)
-
-
exchange_queue_t = namedtuple("exchange_queue_t",
["routing_key", "pattern", "queue"])
@@ -204,14 +168,18 @@ class Channel(virtual.Channel):
file = self.control_folder / f"{exchange}.exchange"
if "w" in mode:
self.control_folder.mkdir(exist_ok=True)
- lock_mode = LOCK_EX if "w" in mode else LOCK_SH
+ f_obj = file.open(mode)
- with file.open(mode) as f_obj:
- try:
- with lock_with_timeout(f_obj, lock_mode):
- yield f_obj
- except OSError as err:
- raise ChannelError(f"Cannot open {file}") from err
+ try:
+ if "w" in mode:
+ lock(f_obj, LOCK_EX)
+ yield f_obj
+ except OSError:
+ raise ChannelError(f"Cannot open {file}")
+ finally:
+ if "w" in mode:
+ unlock(f_obj)
+ f_obj.close()
def get_table(self, exchange):
try:
@@ -241,12 +209,15 @@ class Channel(virtual.Channel):
filename = os.path.join(self.data_folder_out, filename)
try:
- with open(filename, 'wb') as f:
- with lock_with_timeout(f, LOCK_EX):
- f.write(str_to_bytes(dumps(payload)))
- except OSError as err:
+ f = open(filename, 'wb')
+ lock(f, LOCK_EX)
+ f.write(str_to_bytes(dumps(payload)))
+ except OSError:
raise ChannelError(
- f'Cannot add file {filename!r} to directory') from err
+ f'Cannot add file {filename!r} to directory')
+ finally:
+ unlock(f)
+ f.close()
def _get(self, queue):
"""Get next message from `queue`."""
@@ -274,14 +245,14 @@ class Channel(virtual.Channel):
filename = os.path.join(processed_folder, filename)
try:
- with open(filename, 'rb') as f:
- with lock_with_timeout(f, LOCK_SH):
- payload = f.read()
- if not self.store_processed:
- os.remove(filename)
- except OSError as err:
+ f = open(filename, 'rb')
+ payload = f.read()
+ f.close()
+ if not self.store_processed:
+ os.remove(filename)
+ except OSError:
raise ChannelError(
- f'Cannot read file {filename!r} from queue.') from err
+ f'Cannot read file {filename!r} from queue.')
return loads(bytes_to_str(payload))
@@ -301,9 +272,7 @@ class Channel(virtual.Channel):
continue
filename = os.path.join(self.data_folder_in, filename)
- with open(filename, 'wb') as f:
- with lock_with_timeout(f, LOCK_EX):
- os.remove(filename)
+ os.remove(filename)
count += 1