summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorpgjones <philip.graham.jones@googlemail.com>2022-07-23 11:56:12 +0100
committerPhil Jones <philip.graham.jones@googlemail.com>2023-01-24 20:19:57 +0000
commit5ed9c956aaf68a8e2defa9722109aa2c7bcf7ae1 (patch)
treef103622c530a5a4fa121319e4b35d040ebcf80dc /src
parent0d4ca6e72c155e30aedd4315e8678ee9cada32b4 (diff)
downloadblinker-5ed9c956aaf68a8e2defa9722109aa2c7bcf7ae1.tar.gz
Add a send_async method to the Signal
This allows for signals to send to coroutine receivers by awaiting them. The _async_wrapper and _sync_wrapper allows for conversion of sync and async receivers as required if defined. If not defined a runtime error is raised. The wrappers are used to avoid any direct tie into asyncio, trio, greenbacks, asgiref, or other specific async implementation.
Diffstat (limited to 'src')
-rw-r--r--src/blinker/_utilities.py37
-rw-r--r--src/blinker/base.py54
2 files changed, 82 insertions, 9 deletions
diff --git a/src/blinker/_utilities.py b/src/blinker/_utilities.py
index 68e8422..22beb81 100644
--- a/src/blinker/_utilities.py
+++ b/src/blinker/_utilities.py
@@ -1,3 +1,8 @@
+import asyncio
+import inspect
+import sys
+from functools import partial
+from typing import Any
from weakref import ref
from blinker._saferef import BoundMethodWeakref
@@ -93,3 +98,35 @@ class lazy_property:
value = self._deferred(obj)
setattr(obj, self._deferred.__name__, value)
return value
+
+
+def is_coroutine_function(func: Any) -> bool:
+ # Python < 3.8 does not correctly determine partially wrapped
+ # coroutine functions are coroutine functions, hence the need for
+ # this to exist. Code taken from CPython.
+ if sys.version_info >= (3, 8):
+ return asyncio.iscoroutinefunction(func)
+ else:
+ # Note that there is something special about the AsyncMock
+ # such that it isn't determined as a coroutine function
+ # without an explicit check.
+ try:
+ from unittest.mock import AsyncMock
+
+ if isinstance(func, AsyncMock):
+ return True
+ except ImportError:
+ # Not testing, no asynctest to import
+ pass
+
+ while inspect.ismethod(func):
+ func = func.__func__
+ while isinstance(func, partial):
+ func = func.func
+ if not inspect.isfunction(func):
+ return False
+ result = bool(func.__code__.co_flags & inspect.CO_COROUTINE)
+ return (
+ result
+ or getattr(func, "_is_coroutine", None) is asyncio.coroutines._is_coroutine
+ )
diff --git a/src/blinker/base.py b/src/blinker/base.py
index f80750c..8d41721 100644
--- a/src/blinker/base.py
+++ b/src/blinker/base.py
@@ -13,6 +13,7 @@ from warnings import warn
from weakref import WeakValueDictionary
from blinker._utilities import hashable_identity
+from blinker._utilities import is_coroutine_function
from blinker._utilities import lazy_property
from blinker._utilities import reference
from blinker._utilities import symbol
@@ -242,7 +243,7 @@ class Signal:
)
return self.connected_to(receiver, sender)
- def send(self, *sender, **kwargs):
+ def send(self, *sender, _async_wrapper=None, **kwargs):
"""Emit this signal on behalf of *sender*, passing on ``kwargs``.
Returns a list of 2-tuples, pairing receivers with their return
@@ -250,9 +251,51 @@ class Signal:
:param sender: Any object or ``None``. If omitted, synonymous
with ``None``. Only accepts one positional argument.
+ :param _async_wrapper: A callable that should wrap a coroutine
+ receiver and run it when called synchronously.
:param kwargs: Data to be sent to receivers.
"""
+ if self.is_muted:
+ return []
+
+ sender = self._extract_sender(sender)
+ results = []
+ for receiver in self.receivers_for(sender):
+ if is_coroutine_function(receiver):
+ if _async_wrapper is None:
+ raise RuntimeError("Cannot send to a coroutine function")
+ receiver = _async_wrapper(receiver)
+ results.append((receiver, receiver(sender, **kwargs)))
+ return results
+
+ async def send_async(self, *sender, _sync_wrapper=None, **kwargs):
+ """Emit this signal on behalf of *sender*, passing on ``kwargs``.
+
+ Returns a list of 2-tuples, pairing receivers with their return
+ value. The ordering of receiver notification is undefined.
+
+ :param sender: Any object or ``None``. If omitted, synonymous
+ with ``None``. Only accepts one positional argument.
+ :param _sync_wrapper: A callable that should wrap a synchronous
+ receiver and run it when awaited.
+
+ :param kwargs: Data to be sent to receivers.
+ """
+ if self.is_muted:
+ return []
+
+ sender = self._extract_sender(sender)
+ results = []
+ for receiver in self.receivers_for(sender):
+ if not is_coroutine_function(receiver):
+ if _sync_wrapper is None:
+ raise RuntimeError("Cannot send to a non-coroutine function")
+ receiver = _sync_wrapper(receiver)
+ results.append((receiver, await receiver(sender, **kwargs)))
+ return results
+
+ def _extract_sender(self, sender):
if not self.receivers:
# Ensure correct signature even on no-op sends, disable with -O
# for lowest possible cost.
@@ -273,14 +316,7 @@ class Signal:
)
else:
sender = sender[0]
-
- if self.is_muted:
- return []
- else:
- return [
- (receiver, receiver(sender, **kwargs))
- for receiver in self.receivers_for(sender)
- ]
+ return sender
def has_receivers_for(self, sender):
"""True if there is probably a receiver for *sender*.