diff options
| author | pgjones <philip.graham.jones@googlemail.com> | 2022-07-23 11:56:12 +0100 |
|---|---|---|
| committer | Phil Jones <philip.graham.jones@googlemail.com> | 2023-01-24 20:19:57 +0000 |
| commit | 5ed9c956aaf68a8e2defa9722109aa2c7bcf7ae1 (patch) | |
| tree | f103622c530a5a4fa121319e4b35d040ebcf80dc /src | |
| parent | 0d4ca6e72c155e30aedd4315e8678ee9cada32b4 (diff) | |
| download | blinker-5ed9c956aaf68a8e2defa9722109aa2c7bcf7ae1.tar.gz | |
Add a send_async method to the Signal
This allows for signals to send to coroutine receivers by awaiting
them. The _async_wrapper and _sync_wrapper allows for conversion of
sync and async receivers as required if defined. If not defined a
runtime error is raised.
The wrappers are used to avoid any direct tie into asyncio, trio,
greenbacks, asgiref, or other specific async implementation.
Diffstat (limited to 'src')
| -rw-r--r-- | src/blinker/_utilities.py | 37 | ||||
| -rw-r--r-- | src/blinker/base.py | 54 |
2 files changed, 82 insertions, 9 deletions
diff --git a/src/blinker/_utilities.py b/src/blinker/_utilities.py index 68e8422..22beb81 100644 --- a/src/blinker/_utilities.py +++ b/src/blinker/_utilities.py @@ -1,3 +1,8 @@ +import asyncio +import inspect +import sys +from functools import partial +from typing import Any from weakref import ref from blinker._saferef import BoundMethodWeakref @@ -93,3 +98,35 @@ class lazy_property: value = self._deferred(obj) setattr(obj, self._deferred.__name__, value) return value + + +def is_coroutine_function(func: Any) -> bool: + # Python < 3.8 does not correctly determine partially wrapped + # coroutine functions are coroutine functions, hence the need for + # this to exist. Code taken from CPython. + if sys.version_info >= (3, 8): + return asyncio.iscoroutinefunction(func) + else: + # Note that there is something special about the AsyncMock + # such that it isn't determined as a coroutine function + # without an explicit check. + try: + from unittest.mock import AsyncMock + + if isinstance(func, AsyncMock): + return True + except ImportError: + # Not testing, no asynctest to import + pass + + while inspect.ismethod(func): + func = func.__func__ + while isinstance(func, partial): + func = func.func + if not inspect.isfunction(func): + return False + result = bool(func.__code__.co_flags & inspect.CO_COROUTINE) + return ( + result + or getattr(func, "_is_coroutine", None) is asyncio.coroutines._is_coroutine + ) diff --git a/src/blinker/base.py b/src/blinker/base.py index f80750c..8d41721 100644 --- a/src/blinker/base.py +++ b/src/blinker/base.py @@ -13,6 +13,7 @@ from warnings import warn from weakref import WeakValueDictionary from blinker._utilities import hashable_identity +from blinker._utilities import is_coroutine_function from blinker._utilities import lazy_property from blinker._utilities import reference from blinker._utilities import symbol @@ -242,7 +243,7 @@ class Signal: ) return self.connected_to(receiver, sender) - def send(self, *sender, **kwargs): + def send(self, *sender, _async_wrapper=None, **kwargs): """Emit this signal on behalf of *sender*, passing on ``kwargs``. Returns a list of 2-tuples, pairing receivers with their return @@ -250,9 +251,51 @@ class Signal: :param sender: Any object or ``None``. If omitted, synonymous with ``None``. Only accepts one positional argument. + :param _async_wrapper: A callable that should wrap a coroutine + receiver and run it when called synchronously. :param kwargs: Data to be sent to receivers. """ + if self.is_muted: + return [] + + sender = self._extract_sender(sender) + results = [] + for receiver in self.receivers_for(sender): + if is_coroutine_function(receiver): + if _async_wrapper is None: + raise RuntimeError("Cannot send to a coroutine function") + receiver = _async_wrapper(receiver) + results.append((receiver, receiver(sender, **kwargs))) + return results + + async def send_async(self, *sender, _sync_wrapper=None, **kwargs): + """Emit this signal on behalf of *sender*, passing on ``kwargs``. + + Returns a list of 2-tuples, pairing receivers with their return + value. The ordering of receiver notification is undefined. + + :param sender: Any object or ``None``. If omitted, synonymous + with ``None``. Only accepts one positional argument. + :param _sync_wrapper: A callable that should wrap a synchronous + receiver and run it when awaited. + + :param kwargs: Data to be sent to receivers. + """ + if self.is_muted: + return [] + + sender = self._extract_sender(sender) + results = [] + for receiver in self.receivers_for(sender): + if not is_coroutine_function(receiver): + if _sync_wrapper is None: + raise RuntimeError("Cannot send to a non-coroutine function") + receiver = _sync_wrapper(receiver) + results.append((receiver, await receiver(sender, **kwargs))) + return results + + def _extract_sender(self, sender): if not self.receivers: # Ensure correct signature even on no-op sends, disable with -O # for lowest possible cost. @@ -273,14 +316,7 @@ class Signal: ) else: sender = sender[0] - - if self.is_muted: - return [] - else: - return [ - (receiver, receiver(sender, **kwargs)) - for receiver in self.receivers_for(sender) - ] + return sender def has_receivers_for(self, sender): """True if there is probably a receiver for *sender*. |
