diff options
Diffstat (limited to 'kombu/transport/filesystem.py')
-rw-r--r-- | kombu/transport/filesystem.py | 87 |
1 files changed, 28 insertions, 59 deletions
diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index eb07bba6..92d8c4e4 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -93,7 +93,6 @@ from __future__ import annotations import os import shutil -import signal import tempfile import uuid from collections import namedtuple @@ -112,26 +111,6 @@ from . import virtual VERSION = (1, 0, 0) __version__ = '.'.join(map(str, VERSION)) - -@contextmanager -def timeout_manager(seconds: int): - def timeout_handler(signum, frame): - # Now that flock retries automatically when interrupted, we need - # an exception to stop it - # This exception will propagate on the main thread, - # make sure you're calling flock there - raise InterruptedError - - original_handler = signal.signal(signal.SIGALRM, timeout_handler) - - try: - signal.alarm(seconds) - yield - finally: - signal.alarm(0) - signal.signal(signal.SIGALRM, original_handler) - - # needs win32all to work on Windows if os.name == 'nt': @@ -159,7 +138,7 @@ if os.name == 'nt': elif os.name == 'posix': import fcntl - from fcntl import LOCK_EX, LOCK_SH + from fcntl import LOCK_EX, LOCK_NB, LOCK_SH # noqa def lock(file, flags): """Create file lock.""" @@ -175,21 +154,6 @@ else: 'Filesystem plugin only defined for NT and POSIX platforms') -@contextmanager -def lock_with_timeout(file, flags, timeout: int = 1): - with timeout_manager(timeout): - try: - lock(file, flags) - yield - except InterruptedError: - # Catch the exception raised by the handler - # If we weren't raising an exception, - # flock would automatically retry on signals - raise BlockingIOError("Lock timed out") - finally: - unlock(file) - - exchange_queue_t = namedtuple("exchange_queue_t", ["routing_key", "pattern", "queue"]) @@ -204,14 +168,18 @@ class Channel(virtual.Channel): file = self.control_folder / f"{exchange}.exchange" if "w" in mode: self.control_folder.mkdir(exist_ok=True) - lock_mode = LOCK_EX if "w" in mode else LOCK_SH + f_obj = file.open(mode) - with file.open(mode) as f_obj: - try: - with lock_with_timeout(f_obj, lock_mode): - yield f_obj - except OSError as err: - raise ChannelError(f"Cannot open {file}") from err + try: + if "w" in mode: + lock(f_obj, LOCK_EX) + yield f_obj + except OSError: + raise ChannelError(f"Cannot open {file}") + finally: + if "w" in mode: + unlock(f_obj) + f_obj.close() def get_table(self, exchange): try: @@ -241,12 +209,15 @@ class Channel(virtual.Channel): filename = os.path.join(self.data_folder_out, filename) try: - with open(filename, 'wb') as f: - with lock_with_timeout(f, LOCK_EX): - f.write(str_to_bytes(dumps(payload))) - except OSError as err: + f = open(filename, 'wb') + lock(f, LOCK_EX) + f.write(str_to_bytes(dumps(payload))) + except OSError: raise ChannelError( - f'Cannot add file {filename!r} to directory') from err + f'Cannot add file {filename!r} to directory') + finally: + unlock(f) + f.close() def _get(self, queue): """Get next message from `queue`.""" @@ -274,14 +245,14 @@ class Channel(virtual.Channel): filename = os.path.join(processed_folder, filename) try: - with open(filename, 'rb') as f: - with lock_with_timeout(f, LOCK_SH): - payload = f.read() - if not self.store_processed: - os.remove(filename) - except OSError as err: + f = open(filename, 'rb') + payload = f.read() + f.close() + if not self.store_processed: + os.remove(filename) + except OSError: raise ChannelError( - f'Cannot read file {filename!r} from queue.') from err + f'Cannot read file {filename!r} from queue.') return loads(bytes_to_str(payload)) @@ -301,9 +272,7 @@ class Channel(virtual.Channel): continue filename = os.path.join(self.data_folder_in, filename) - with open(filename, 'wb') as f: - with lock_with_timeout(f, LOCK_EX): - os.remove(filename) + os.remove(filename) count += 1 |