summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpgjones <philip.graham.jones@googlemail.com>2023-02-26 11:17:50 +0000
committerPhil Jones <philip.graham.jones@googlemail.com>2023-04-02 14:58:03 +0100
commitfae6164cda6c5e9af89895a22f700402fa4b5ec5 (patch)
treea82ad0c87c84c6c43955c6908f58bfd8c232f453
parent5ed9c956aaf68a8e2defa9722109aa2c7bcf7ae1 (diff)
downloadblinker-fae6164cda6c5e9af89895a22f700402fa4b5ec5.tar.gz
Add an initial set of type hints
These focus on the public API with the private aspect omitted. This is because these are hard to type, and may not be required with modern python. However, this should still be useful if adopted for code that uses blinker.
-rw-r--r--MANIFEST.in1
-rw-r--r--pyproject.toml22
-rw-r--r--requirements/typing.in1
-rw-r--r--requirements/typing.txt13
-rw-r--r--src/blinker/_saferef.py2
-rw-r--r--src/blinker/_utilities.py27
-rw-r--r--src/blinker/base.py111
-rw-r--r--src/blinker/py.typed0
-rw-r--r--tests/test_signals.py1
-rw-r--r--tox.ini5
10 files changed, 135 insertions, 48 deletions
diff --git a/MANIFEST.in b/MANIFEST.in
index cab5663..41180b4 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,5 +1,6 @@
include CHANGES.rst
include tox.ini
+include src/blinker/py.typed
graft docs
prune docs/_build
graft tests
diff --git a/pyproject.toml b/pyproject.toml
index 65e5e5d..8dd5b87 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -24,6 +24,7 @@ classifiers = [
]
requires-python = ">= 3.7"
dynamic = ["version"]
+dependencies = ["typing-extensions"]
[project.urls]
Homepage = "https://blinker.readthedocs.io"
@@ -36,6 +37,27 @@ Chat = "https://discord.gg/pallets"
file = "README.rst"
content-type = "text/x-rst"
+[tool.mypy]
+python_version = "3.7"
+files = ["src/blinker"]
+show_error_codes = true
+pretty = true
+#strict = true
+allow_redefinition = true
+disallow_subclassing_any = true
+#disallow_untyped_calls = true
+#disallow_untyped_defs = true
+disallow_incomplete_defs = true
+no_implicit_optional = true
+local_partial_types = true
+no_implicit_reexport = true
+strict_equality = true
+warn_redundant_casts = true
+warn_unused_configs = true
+warn_unused_ignores = true
+warn_return_any = true
+#warn_unreachable = True
+
[tool.setuptools]
license-files = ["LICENSE.rst"]
include-package-data = false
diff --git a/requirements/typing.in b/requirements/typing.in
new file mode 100644
index 0000000..f0aa93a
--- /dev/null
+++ b/requirements/typing.in
@@ -0,0 +1 @@
+mypy
diff --git a/requirements/typing.txt b/requirements/typing.txt
new file mode 100644
index 0000000..074101b
--- /dev/null
+++ b/requirements/typing.txt
@@ -0,0 +1,13 @@
+# SHA1:7983aaa01d64547827c20395d77e248c41b2572f
+#
+# This file is autogenerated by pip-compile-multi
+# To update, run:
+#
+# pip-compile-multi
+#
+mypy==1.0.1
+ # via -r requirements/typing.in
+mypy-extensions==1.0.0
+ # via mypy
+typing-extensions==4.5.0
+ # via mypy
diff --git a/src/blinker/_saferef.py b/src/blinker/_saferef.py
index 29479d4..39ac90c 100644
--- a/src/blinker/_saferef.py
+++ b/src/blinker/_saferef.py
@@ -108,7 +108,7 @@ class BoundMethodWeakref:
produce the same BoundMethodWeakref instance.
"""
- _all_instances = weakref.WeakValueDictionary()
+ _all_instances = weakref.WeakValueDictionary() # type: ignore
def __new__(cls, target, on_delete=None, *arguments, **named):
"""Create new instance or return current instance.
diff --git a/src/blinker/_utilities.py b/src/blinker/_utilities.py
index 22beb81..9201046 100644
--- a/src/blinker/_utilities.py
+++ b/src/blinker/_utilities.py
@@ -1,12 +1,14 @@
import asyncio
import inspect
import sys
+import typing as t
from functools import partial
-from typing import Any
from weakref import ref
from blinker._saferef import BoundMethodWeakref
+IdentityType = t.Union[t.Tuple[int, int], str, int]
+
class _symbol:
def __init__(self, name):
@@ -38,7 +40,7 @@ class symbol:
"""
- symbols = {}
+ symbols = {} # type: ignore
def __new__(cls, name):
try:
@@ -47,11 +49,11 @@ class symbol:
return cls.symbols.setdefault(name, _symbol(name))
-def hashable_identity(obj):
+def hashable_identity(obj: object) -> IdentityType:
if hasattr(obj, "__func__"):
- return (id(obj.__func__), id(obj.__self__))
+ return (id(obj.__func__), id(obj.__self__)) # type: ignore
elif hasattr(obj, "im_func"):
- return (id(obj.im_func), id(obj.im_self))
+ return (id(obj.im_func), id(obj.im_self)) # type: ignore
elif isinstance(obj, (int, str)):
return obj
else:
@@ -64,8 +66,13 @@ WeakTypes = (ref, BoundMethodWeakref)
class annotatable_weakref(ref):
"""A weakref.ref that supports custom instance attributes."""
+ receiver_id: t.Optional[IdentityType]
+ sender_id: t.Optional[IdentityType]
+
-def reference(object, callback=None, **annotations):
+def reference( # type: ignore
+ object, callback=None, **annotations
+) -> annotatable_weakref:
"""Return an annotated weak ref."""
if callable(object):
weak = callable_reference(object, callback)
@@ -73,7 +80,7 @@ def reference(object, callback=None, **annotations):
weak = annotatable_weakref(object, callback)
for key, value in annotations.items():
setattr(weak, key, value)
- return weak
+ return weak # type: ignore
def callable_reference(object, callback=None):
@@ -100,7 +107,7 @@ class lazy_property:
return value
-def is_coroutine_function(func: Any) -> bool:
+def is_coroutine_function(func: t.Any) -> bool:
# Python < 3.8 does not correctly determine partially wrapped
# coroutine functions are coroutine functions, hence the need for
# this to exist. Code taken from CPython.
@@ -111,7 +118,7 @@ def is_coroutine_function(func: Any) -> bool:
# such that it isn't determined as a coroutine function
# without an explicit check.
try:
- from unittest.mock import AsyncMock
+ from unittest.mock import AsyncMock # type: ignore
if isinstance(func, AsyncMock):
return True
@@ -128,5 +135,5 @@ def is_coroutine_function(func: Any) -> bool:
result = bool(func.__code__.co_flags & inspect.CO_COROUTINE)
return (
result
- or getattr(func, "_is_coroutine", None) is asyncio.coroutines._is_coroutine
+ or getattr(func, "_is_coroutine", None) is asyncio.coroutines._is_coroutine # type: ignore # noqa: B950
)
diff --git a/src/blinker/base.py b/src/blinker/base.py
index 8d41721..997fef5 100644
--- a/src/blinker/base.py
+++ b/src/blinker/base.py
@@ -7,12 +7,17 @@ each manages its own receivers and message emission.
The :func:`signal` function provides singleton behavior for named signals.
"""
+import typing as t
from collections import defaultdict
from contextlib import contextmanager
from warnings import warn
from weakref import WeakValueDictionary
+import typing_extensions as te
+
+from blinker._utilities import annotatable_weakref
from blinker._utilities import hashable_identity
+from blinker._utilities import IdentityType
from blinker._utilities import is_coroutine_function
from blinker._utilities import lazy_property
from blinker._utilities import reference
@@ -24,6 +29,14 @@ ANY = symbol("ANY")
ANY.__doc__ = 'Token for "any sender".'
ANY_ID = 0
+T_callable = t.TypeVar("T_callable", bound=t.Callable)
+
+T = t.TypeVar("T")
+P = te.ParamSpec("P")
+
+AsyncWrapperType = t.Callable[[t.Callable[P, T]], t.Callable[P, t.Awaitable[T]]]
+SyncWrapperType = t.Callable[[t.Callable[P, t.Awaitable[T]]], t.Callable[P, T]]
+
class Signal:
"""A notification emitter."""
@@ -33,7 +46,7 @@ class Signal:
ANY = ANY
@lazy_property
- def receiver_connected(self):
+ def receiver_connected(self) -> "Signal":
"""Emitted after each :meth:`connect`.
The signal sender is the signal instance, and the :meth:`connect`
@@ -45,7 +58,7 @@ class Signal:
return Signal(doc="Emitted after a receiver connects.")
@lazy_property
- def receiver_disconnected(self):
+ def receiver_disconnected(self) -> "Signal":
"""Emitted after :meth:`disconnect`.
The sender is the signal instance, and the :meth:`disconnect` arguments
@@ -68,7 +81,7 @@ class Signal:
"""
return Signal(doc="Emitted after a receiver disconnects.")
- def __init__(self, doc=None):
+ def __init__(self, doc: t.Optional[str] = None) -> None:
"""
:param doc: optional. If provided, will be assigned to the signal's
__doc__ attribute.
@@ -82,13 +95,17 @@ class Signal:
#: internal :class:`Signal` implementation, however the boolean value
#: of the mapping is useful as an extremely efficient check to see if
#: any receivers are connected to the signal.
- self.receivers = {}
+ self.receivers: t.Dict[
+ IdentityType, t.Union[t.Callable, annotatable_weakref]
+ ] = {}
self.is_muted = False
- self._by_receiver = defaultdict(set)
- self._by_sender = defaultdict(set)
- self._weak_senders = {}
+ self._by_receiver: t.Dict[IdentityType, t.Set[IdentityType]] = defaultdict(set)
+ self._by_sender: t.Dict[IdentityType, t.Set[IdentityType]] = defaultdict(set)
+ self._weak_senders: t.Dict[IdentityType, annotatable_weakref] = {}
- def connect(self, receiver, sender=ANY, weak=True):
+ def connect(
+ self, receiver: T_callable, sender: t.Any = ANY, weak: bool = True
+ ) -> T_callable:
"""Connect *receiver* to signal events sent by *sender*.
:param receiver: A callable. Will be invoked by :meth:`send` with
@@ -108,11 +125,13 @@ class Signal:
"""
receiver_id = hashable_identity(receiver)
+ receiver_ref: t.Union[annotatable_weakref, T_callable]
if weak:
receiver_ref = reference(receiver, self._cleanup_receiver)
receiver_ref.receiver_id = receiver_id
else:
receiver_ref = receiver
+ sender_id: IdentityType
if sender is ANY:
sender_id = ANY_ID
else:
@@ -153,7 +172,9 @@ class Signal:
raise e
return receiver
- def connect_via(self, sender, weak=False):
+ def connect_via(
+ self, sender: t.Any, weak: bool = False
+ ) -> t.Callable[[T_callable], T_callable]:
"""Connect the decorated function as a receiver for *sender*.
:param sender: Any object or :obj:`ANY`. The decorated function
@@ -175,14 +196,16 @@ class Signal:
"""
- def decorator(fn):
+ def decorator(fn: T_callable) -> T_callable:
self.connect(fn, sender, weak)
return fn
return decorator
@contextmanager
- def connected_to(self, receiver, sender=ANY):
+ def connected_to(
+ self, receiver: t.Callable, sender: t.Any = ANY
+ ) -> t.Generator[None, None, None]:
"""Execute a block with the signal temporarily connected to *receiver*.
:param receiver: a receiver callable
@@ -212,7 +235,7 @@ class Signal:
self.disconnect(receiver)
@contextmanager
- def muted(self):
+ def muted(self) -> t.Generator[None, None, None]:
"""Context manager for temporarily disabling signal.
Useful for test purposes.
"""
@@ -224,7 +247,9 @@ class Signal:
finally:
self.is_muted = False
- def temporarily_connected_to(self, receiver, sender=ANY):
+ def temporarily_connected_to(
+ self, receiver: t.Callable, sender: t.Any = ANY
+ ) -> t.ContextManager[None]:
"""An alias for :meth:`connected_to`.
:param receiver: a receiver callable
@@ -243,7 +268,12 @@ class Signal:
)
return self.connected_to(receiver, sender)
- def send(self, *sender, _async_wrapper=None, **kwargs):
+ def send(
+ self,
+ *sender: t.Any,
+ _async_wrapper: t.Optional[AsyncWrapperType] = None,
+ **kwargs: t.Any,
+ ) -> t.List[t.Tuple[t.Callable, t.Any]]:
"""Emit this signal on behalf of *sender*, passing on ``kwargs``.
Returns a list of 2-tuples, pairing receivers with their return
@@ -266,10 +296,15 @@ class Signal:
if _async_wrapper is None:
raise RuntimeError("Cannot send to a coroutine function")
receiver = _async_wrapper(receiver)
- results.append((receiver, receiver(sender, **kwargs)))
+ results.append((receiver, receiver(sender, **kwargs))) # type: ignore
return results
- async def send_async(self, *sender, _sync_wrapper=None, **kwargs):
+ async def send_async(
+ self,
+ *sender: t.Any,
+ _sync_wrapper: t.Optional[SyncWrapperType] = None,
+ **kwargs: t.Any,
+ ) -> t.List[t.Tuple[t.Callable, t.Any]]:
"""Emit this signal on behalf of *sender*, passing on ``kwargs``.
Returns a list of 2-tuples, pairing receivers with their return
@@ -291,8 +326,8 @@ class Signal:
if not is_coroutine_function(receiver):
if _sync_wrapper is None:
raise RuntimeError("Cannot send to a non-coroutine function")
- receiver = _sync_wrapper(receiver)
- results.append((receiver, await receiver(sender, **kwargs)))
+ receiver = _sync_wrapper(receiver) # type: ignore
+ results.append((receiver, await receiver(sender, **kwargs))) # type: ignore
return results
def _extract_sender(self, sender):
@@ -318,7 +353,7 @@ class Signal:
sender = sender[0]
return sender
- def has_receivers_for(self, sender):
+ def has_receivers_for(self, sender: t.Any) -> bool:
"""True if there is probably a receiver for *sender*.
Performs an optimistic check only. Does not guarantee that all
@@ -334,7 +369,9 @@ class Signal:
return False
return hashable_identity(sender) in self._by_sender
- def receivers_for(self, sender):
+ def receivers_for(
+ self, sender: t.Any
+ ) -> t.Generator[t.Union[t.Callable, annotatable_weakref], None, None]:
"""Iterate all live receivers listening for *sender*."""
# TODO: test receivers_for(ANY)
if self.receivers:
@@ -353,9 +390,10 @@ class Signal:
self._disconnect(receiver_id, ANY_ID)
continue
receiver = strong
+ receiver = t.cast(t.Union[t.Callable, annotatable_weakref], receiver)
yield receiver
- def disconnect(self, receiver, sender=ANY):
+ def disconnect(self, receiver: t.Callable, sender: t.Any = ANY) -> None:
"""Disconnect *receiver* from this signal's events.
:param receiver: a previously :meth:`connected<connect>` callable
@@ -364,6 +402,7 @@ class Signal:
to disconnect from all senders. Defaults to ``ANY``.
"""
+ sender_id: IdentityType
if sender is ANY:
sender_id = ANY_ID
else:
@@ -377,7 +416,7 @@ class Signal:
):
self.receiver_disconnected.send(self, receiver=receiver, sender=sender)
- def _disconnect(self, receiver_id, sender_id):
+ def _disconnect(self, receiver_id: IdentityType, sender_id: IdentityType) -> None:
if sender_id == ANY_ID:
if self._by_receiver.pop(receiver_id, False):
for bucket in self._by_sender.values():
@@ -387,19 +426,19 @@ class Signal:
self._by_sender[sender_id].discard(receiver_id)
self._by_receiver[receiver_id].discard(sender_id)
- def _cleanup_receiver(self, receiver_ref):
+ def _cleanup_receiver(self, receiver_ref: annotatable_weakref) -> None:
"""Disconnect a receiver from all senders."""
- self._disconnect(receiver_ref.receiver_id, ANY_ID)
+ self._disconnect(t.cast(IdentityType, receiver_ref.receiver_id), ANY_ID)
- def _cleanup_sender(self, sender_ref):
+ def _cleanup_sender(self, sender_ref: annotatable_weakref) -> None:
"""Disconnect all receivers from a sender."""
- sender_id = sender_ref.sender_id
+ sender_id = t.cast(IdentityType, sender_ref.sender_id)
assert sender_id != ANY_ID
self._weak_senders.pop(sender_id, None)
for receiver_id in self._by_sender.pop(sender_id, ()):
self._by_receiver[receiver_id].discard(sender_id)
- def _cleanup_bookkeeping(self):
+ def _cleanup_bookkeeping(self) -> None:
"""Prune unused sender/receiver bookkeeping. Not threadsafe.
Connecting & disconnecting leave behind a small amount of bookkeeping
@@ -425,7 +464,7 @@ class Signal:
if not bucket:
mapping.pop(_id, None)
- def _clear_state(self):
+ def _clear_state(self) -> None:
"""Throw away all signal state. Useful for unit tests."""
self._weak_senders.clear()
self.receivers.clear()
@@ -456,13 +495,13 @@ call signature. This global signal is planned to be removed in 1.6.
class NamedSignal(Signal):
"""A named generic notification emitter."""
- def __init__(self, name, doc=None):
+ def __init__(self, name: str, doc: t.Optional[str] = None) -> None:
Signal.__init__(self, doc)
#: The name of this signal.
self.name = name
- def __repr__(self):
+ def __repr__(self) -> str:
base = Signal.__repr__(self)
return f"{base[:-1]}; {self.name!r}>"
@@ -470,16 +509,16 @@ class NamedSignal(Signal):
class Namespace(dict):
"""A mapping of signal names to signals."""
- def signal(self, name, doc=None):
+ def signal(self, name: str, doc: t.Optional[str] = None) -> NamedSignal:
"""Return the :class:`NamedSignal` *name*, creating it if required.
Repeated calls to this function will return the same signal object.
"""
try:
- return self[name]
+ return self[name] # type: ignore
except KeyError:
- return self.setdefault(name, NamedSignal(name, doc))
+ return self.setdefault(name, NamedSignal(name, doc)) # type: ignore
class WeakNamespace(WeakValueDictionary):
@@ -493,16 +532,16 @@ class WeakNamespace(WeakValueDictionary):
"""
- def signal(self, name, doc=None):
+ def signal(self, name: str, doc: t.Optional[str] = None) -> NamedSignal:
"""Return the :class:`NamedSignal` *name*, creating it if required.
Repeated calls to this function will return the same signal object.
"""
try:
- return self[name]
+ return self[name] # type: ignore
except KeyError:
- return self.setdefault(name, NamedSignal(name, doc))
+ return self.setdefault(name, NamedSignal(name, doc)) # type: ignore
signal = Namespace().signal
diff --git a/src/blinker/py.typed b/src/blinker/py.typed
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/blinker/py.typed
diff --git a/tests/test_signals.py b/tests/test_signals.py
index 1fca3e5..f06ebc0 100644
--- a/tests/test_signals.py
+++ b/tests/test_signals.py
@@ -342,7 +342,6 @@ async def test_async_receiver():
sentinel.append(sender)
def wrapper(func):
-
async def inner(*args, **kwargs):
func(*args, **kwargs)
diff --git a/tox.ini b/tox.ini
index 5149f37..75f9933 100644
--- a/tox.ini
+++ b/tox.ini
@@ -4,6 +4,7 @@ envlist =
pypy3{9,8,7}
docs
lint
+ typing
skip_missing_interpreters = true
[testenv]
@@ -20,3 +21,7 @@ deps =
pre-commit>=2.20
commands =
pre-commit run --all-files --show-diff-on-failure {tty:--color=always} {posargs}
+
+[testenv:typing]
+deps = -r requirements/typing.txt
+commands = mypy