From d841e82241a209573a1da638aa64921c7e65b811 Mon Sep 17 00:00:00 2001 From: jason kirtland Date: Thu, 23 Jul 2015 17:18:39 +0200 Subject: Adds Signal.send_async for asyncio --- CHANGES | 2 ++ blinker/__init__.py | 5 +++++ blinker/_async.py | 28 ++++++++++++++++++++++++++++ blinker/base.py | 11 +++++++++++ tests/_test_async.py | 43 +++++++++++++++++++++++++++++++++++++++++++ tests/test_signals.py | 5 +++++ 6 files changed, 94 insertions(+) create mode 100644 blinker/_async.py create mode 100644 tests/_test_async.py diff --git a/CHANGES b/CHANGES index 9346a3c..880c314 100644 --- a/CHANGES +++ b/CHANGES @@ -5,6 +5,8 @@ Blinker Changelog Version 1.5dev -------------- +- Added Signal.send_async, dispatching to an arbitrary mix of connected + coroutines and receiver functions. Version 1.4 ----------- diff --git a/blinker/__init__.py b/blinker/__init__.py index 57e1be8..a3251bd 100644 --- a/blinker/__init__.py +++ b/blinker/__init__.py @@ -20,3 +20,8 @@ __all__ = [ __version__ = '1.5dev' + +try: + import blinker._async +except (ImportError, SyntaxError): + pass diff --git a/blinker/_async.py b/blinker/_async.py new file mode 100644 index 0000000..5a88f13 --- /dev/null +++ b/blinker/_async.py @@ -0,0 +1,28 @@ +import asyncio + +from blinker.base import Signal + + +try: + schedule = asyncio.ensure_future +except AttributeError: + schedule = asyncio.async + + +@asyncio.coroutine +def _wrap_plain_value(value): + """Pass through a coroutine *value* or wrap a plain value.""" + if asyncio.iscoroutine(value): + value = yield from value + return value + + +def send_async(self, *sender, **kwargs): + return [(receiver, schedule(_wrap_plain_value(value))) + for receiver, value + in self.send(*sender, **kwargs)] + + +send_async.__doc__ = Signal.send_async.__doc__ +Signal.send_async = send_async + diff --git a/blinker/base.py b/blinker/base.py index cc5880e..67aa41d 100644 --- a/blinker/base.py +++ b/blinker/base.py @@ -266,6 +266,17 @@ class Signal(object): return [(receiver, receiver(sender, **kwargs)) for receiver in self.receivers_for(sender)] + def send_async(self, *sender, **kwargs): + """Send and collect results from connected functions and coroutines. + + As `Signal.send`, but also schedules any coroutines connected to the + signal, and uniformly presents all receiver return values as futures, + even if one or more receivers are regular functions. + + Available only if asyncio and `yield from` are present. + """ + raise NotImplementedError("asyncio support unavailable") + def has_receivers_for(self, sender): """True if there is probably a receiver for *sender*. diff --git a/tests/_test_async.py b/tests/_test_async.py new file mode 100644 index 0000000..35f3ced --- /dev/null +++ b/tests/_test_async.py @@ -0,0 +1,43 @@ +import asyncio + +import blinker + + +def test_send_async(): + calls = [] + + @asyncio.coroutine + def receiver_a(sender): + calls.append(receiver_a) + return 'value a' + + @asyncio.coroutine + def receiver_b(sender): + calls.append(receiver_b) + return 'value b' + + def receiver_c(sender): + calls.append(receiver_c) + return 'value c' + + sig = blinker.Signal() + sig.connect(receiver_a) + sig.connect(receiver_b) + sig.connect(receiver_c) + + @asyncio.coroutine + def collect(): + return sig.send_async() + + loop = asyncio.get_event_loop() + results = loop.run_until_complete(collect()) + + expected = { + receiver_a: 'value a', + receiver_b: 'value b', + receiver_c: 'value c', + } + + assert set(calls) == set(expected.keys()) + collected_results = {v.result() for r, v in results} + assert collected_results == set(expected.values()) diff --git a/tests/test_signals.py b/tests/test_signals.py index a1172ed..e651465 100644 --- a/tests/test_signals.py +++ b/tests/test_signals.py @@ -10,6 +10,11 @@ from nose.tools import assert_raises jython = sys.platform.startswith('java') pypy = hasattr(sys, 'pypy_version_info') +try: + from _test_async import test_send_async +except (SyntaxError, ImportError): + pass + def collect_acyclic_refs(): # cpython releases these immediately without a collection -- cgit v1.2.1 From 5936e96bb059dc766d0e87fc569f61dfb5eca963 Mon Sep 17 00:00:00 2001 From: Youri Ackx Date: Fri, 2 Oct 2020 16:59:19 +0200 Subject: Fix incorrect import statement - Attempt to import asyncio.create_task - Fallback on asyncio.ensure_future - Remove obsolete asyncio.async call --- blinker/_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/blinker/_async.py b/blinker/_async.py index 5a88f13..2b530e4 100644 --- a/blinker/_async.py +++ b/blinker/_async.py @@ -4,9 +4,9 @@ from blinker.base import Signal try: - schedule = asyncio.ensure_future + schedule = asyncio.create_task except AttributeError: - schedule = asyncio.async + schedule = asyncio.ensure_future @asyncio.coroutine -- cgit v1.2.1 From c7b83a18a54efa9e08a9446e2d16956ee5fe353b Mon Sep 17 00:00:00 2001 From: Youri Ackx Date: Fri, 2 Oct 2020 17:17:42 +0200 Subject: Document support for send_async --- README.md | 3 +++ docs/source/index.rst | 30 ++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/README.md b/README.md index 907a3ec..e9963a6 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,9 @@ interested parties to subscribe to events, or "signals". Signal receivers can subscribe to specific senders or receive signals sent by any sender. +It supports dispatching to an arbitrary mix of connected +coroutines and receiver functions. + ```python >>> from blinker import signal >>> started = signal('round-started') diff --git a/docs/source/index.rst b/docs/source/index.rst index bdb40ef..521e7c5 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -14,6 +14,7 @@ The core of Blinker is quite small but provides powerful features: - sending arbitrary data payloads - collecting return values from signal receivers - thread safety + - coroutines as signal receivers Blinker was written by Jason Kirtand and is provided under the MIT License. The library supports Python 2.7 and Python 3.5 or later; @@ -94,6 +95,35 @@ notifications being sent, and these no-op sends are optimized to be as inexpensive as possible. +Async support +------------- + +Send a signal asynchronously to coroutine receivers. + + >>> async def receiver_a(sender): + ... return 'value a' + ... + >>> async def receiver_b(sender): + ... return 'value b' + ... + >>> ready = signal('ready') + >>> ready.connect(receiver_a) + >>> ready.connect(receiver_b) + ... + >>> async def collect(): + ... return ready.send_async('sender') + ... + >>> loop = asyncio.get_event_loop() + >>> results = loop.run_until_complete(collect()) + >>> len(results) + 2 + >>> [v.result() for r, v in results][0] + value a + +Dispatching to an arbitrary mix of connected +coroutines and receiver functions is supported. + + Subscribing to Specific Senders ------------------------------- -- cgit v1.2.1