diff options
Diffstat (limited to 'kombu/asynchronous/hub.py')
| -rw-r--r-- | kombu/asynchronous/hub.py | 32 |
1 files changed, 21 insertions, 11 deletions
diff --git a/kombu/asynchronous/hub.py b/kombu/asynchronous/hub.py index b1f7e241..e5b1163c 100644 --- a/kombu/asynchronous/hub.py +++ b/kombu/asynchronous/hub.py @@ -1,6 +1,9 @@ """Event loop implementation.""" +from __future__ import annotations + import errno +import threading from contextlib import contextmanager from queue import Empty from time import sleep @@ -18,7 +21,7 @@ from .timer import Timer __all__ = ('Hub', 'get_event_loop', 'set_event_loop') logger = get_logger(__name__) -_current_loop = None +_current_loop: Hub | None = None W_UNKNOWN_EVENT = """\ Received unknown event %r for fd %r, please contact support!\ @@ -38,12 +41,12 @@ def _dummy_context(*args, **kwargs): yield -def get_event_loop(): +def get_event_loop() -> Hub | None: """Get current event loop object.""" return _current_loop -def set_event_loop(loop): +def set_event_loop(loop: Hub | None) -> Hub | None: """Set the current event loop object.""" global _current_loop _current_loop = loop @@ -78,6 +81,7 @@ class Hub: self.on_tick = set() self.on_close = set() self._ready = set() + self._ready_lock = threading.Lock() self._running = False self._loop = None @@ -198,7 +202,8 @@ class Hub: def call_soon(self, callback, *args): if not isinstance(callback, Thenable): callback = promise(callback, args) - self._ready.add(callback) + with self._ready_lock: + self._ready.add(callback) return callback def call_later(self, delay, callback, *args): @@ -242,6 +247,12 @@ class Hub: except (AttributeError, KeyError, OSError): pass + def _pop_ready(self): + with self._ready_lock: + ready = self._ready + self._ready = set() + return ready + def close(self, *args): [self._unregister(fd) for fd in self.readers] self.readers.clear() @@ -257,8 +268,7 @@ class Hub: # To avoid infinite loop where one of the callables adds items # to self._ready (via call_soon or otherwise). # we create new list with current self._ready - todos = list(self._ready) - self._ready = set() + todos = self._pop_ready() for item in todos: item() @@ -288,17 +298,17 @@ class Hub: propagate = self.propagate_errors while 1: - todo = self._ready - self._ready = set() - - for tick_callback in on_tick: - tick_callback() + todo = self._pop_ready() for item in todo: if item: item() poll_timeout = fire_timers(propagate=propagate) if scheduled else 1 + + for tick_callback in on_tick: + tick_callback() + # print('[[[HUB]]]: %s' % (self.repr_active(),)) if readers or writers: to_consolidate = [] |
