summaryrefslogtreecommitdiff
path: root/kombu/asynchronous/hub.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/asynchronous/hub.py')
-rw-r--r--kombu/asynchronous/hub.py32
1 files changed, 21 insertions, 11 deletions
diff --git a/kombu/asynchronous/hub.py b/kombu/asynchronous/hub.py
index b1f7e241..e5b1163c 100644
--- a/kombu/asynchronous/hub.py
+++ b/kombu/asynchronous/hub.py
@@ -1,6 +1,9 @@
"""Event loop implementation."""
+from __future__ import annotations
+
import errno
+import threading
from contextlib import contextmanager
from queue import Empty
from time import sleep
@@ -18,7 +21,7 @@ from .timer import Timer
__all__ = ('Hub', 'get_event_loop', 'set_event_loop')
logger = get_logger(__name__)
-_current_loop = None
+_current_loop: Hub | None = None
W_UNKNOWN_EVENT = """\
Received unknown event %r for fd %r, please contact support!\
@@ -38,12 +41,12 @@ def _dummy_context(*args, **kwargs):
yield
-def get_event_loop():
+def get_event_loop() -> Hub | None:
"""Get current event loop object."""
return _current_loop
-def set_event_loop(loop):
+def set_event_loop(loop: Hub | None) -> Hub | None:
"""Set the current event loop object."""
global _current_loop
_current_loop = loop
@@ -78,6 +81,7 @@ class Hub:
self.on_tick = set()
self.on_close = set()
self._ready = set()
+ self._ready_lock = threading.Lock()
self._running = False
self._loop = None
@@ -198,7 +202,8 @@ class Hub:
def call_soon(self, callback, *args):
if not isinstance(callback, Thenable):
callback = promise(callback, args)
- self._ready.add(callback)
+ with self._ready_lock:
+ self._ready.add(callback)
return callback
def call_later(self, delay, callback, *args):
@@ -242,6 +247,12 @@ class Hub:
except (AttributeError, KeyError, OSError):
pass
+ def _pop_ready(self):
+ with self._ready_lock:
+ ready = self._ready
+ self._ready = set()
+ return ready
+
def close(self, *args):
[self._unregister(fd) for fd in self.readers]
self.readers.clear()
@@ -257,8 +268,7 @@ class Hub:
# To avoid infinite loop where one of the callables adds items
# to self._ready (via call_soon or otherwise).
# we create new list with current self._ready
- todos = list(self._ready)
- self._ready = set()
+ todos = self._pop_ready()
for item in todos:
item()
@@ -288,17 +298,17 @@ class Hub:
propagate = self.propagate_errors
while 1:
- todo = self._ready
- self._ready = set()
-
- for tick_callback in on_tick:
- tick_callback()
+ todo = self._pop_ready()
for item in todo:
if item:
item()
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
+
+ for tick_callback in on_tick:
+ tick_callback()
+
# print('[[[HUB]]]: %s' % (self.repr_active(),))
if readers or writers:
to_consolidate = []