summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexey Stepanov <penguinolog@users.noreply.github.com>2023-04-18 17:35:19 +0200
committerGitHub <noreply@github.com>2023-04-18 17:35:19 +0200
commitd1710f0983e86dc5ec06efec6e94f3b6d204bbb3 (patch)
treed4c6681b6d47ea96d9b71155b9b0976e99730584
parentdb10343d8aa937770907a3dcc4456cbeefe16549 (diff)
downloadurwid-d1710f0983e86dc5ec06efec6e94f3b6d204bbb3.tar.gz
[BREAKING CHANGE] Refactoring: Split event loop in several modules (#537)
* [BREAKING CHANGE] Refactoring: Split event loop in several modules * `urwid.main_loop` is split into multiple modules which is easier to maintain * `urwid.compat` is not used anymore and removed * `TornadoEventLoop`, `GLibEventLoop`, `TwistedEventLoop` and `TrioEventLoop` accessible ONLY if required dependencies installed (like: Tornado installed -> `TornadoEventLoop` is accessible for import) * `TornadoEventLoop` use the same idle logic as `AsyncioLoop`: tornado.ioloop.IOLoop is asyncio based. * Trio < 0.15 is not supported. Version 0.15 was released almost 3 years ago. * Tornado < 5.0 is not supported. Tornado 5.0 was released 5 years ago. * Remove useless shebang * `EventLoop` should be real abstract * add new module docstrings * Fix docstrings * remove unneeded import --------- Co-authored-by: Aleksei Stepanov <alekseis@nvidia.com>
-rw-r--r--docs/manual/mainloop.rst4
-rw-r--r--test_requirements.txt2
-rw-r--r--urwid/__init__.py32
-rw-r--r--urwid/compat.py36
-rw-r--r--urwid/event_loop/__init__.py33
-rw-r--r--urwid/event_loop/abstract_loop.py143
-rw-r--r--urwid/event_loop/asyncio_loop.py148
-rw-r--r--urwid/event_loop/glib_loop.py262
-rwxr-xr-xurwid/event_loop/main_loop.py686
-rw-r--r--urwid/event_loop/select_loop.py190
-rw-r--r--urwid/event_loop/tornado_loop.py153
-rw-r--r--urwid/event_loop/trio_loop.py (renamed from urwid/_async_kw_event_loop.py)112
-rw-r--r--urwid/event_loop/twisted_loop.py236
-rwxr-xr-xurwid/html_fragment.py2
-rwxr-xr-xurwid/main_loop.py1589
-rw-r--r--urwid/tests/test_doctests.py4
-rw-r--r--urwid/tests/test_event_loops.py2
-rw-r--r--urwid/util.py2
18 files changed, 1935 insertions, 1701 deletions
diff --git a/docs/manual/mainloop.rst b/docs/manual/mainloop.rst
index ed19e2b..1d7b4e4 100644
--- a/docs/manual/mainloop.rst
+++ b/docs/manual/mainloop.rst
@@ -143,9 +143,7 @@ This event loop integrates with Tornado.
``AsyncioEventLoop``
--------------------
-This event loop integrates with the asyncio module in Python 3.4,
-the asyncio package available for Python 3.3 or the trollius
-package available for Python 2.
+This event loop integrates with the asyncio module in Python.
::
diff --git a/test_requirements.txt b/test_requirements.txt
index d6c52e6..5acdfc8 100644
--- a/test_requirements.txt
+++ b/test_requirements.txt
@@ -1,4 +1,4 @@
-tornado<5.0.0
+tornado>=5
coverage[toml]
twisted
trio \ No newline at end of file
diff --git a/urwid/__init__.py b/urwid/__init__.py
index e28b9a8..0960e58 100644
--- a/urwid/__init__.py
+++ b/urwid/__init__.py
@@ -98,7 +98,7 @@ from urwid.graphics import (
scale_bar_values,
)
from urwid.listbox import ListBox, ListBoxError, ListWalker, ListWalkerError, SimpleFocusListWalker, SimpleListWalker
-from urwid.main_loop import AsyncioEventLoop, ExitMainLoop, GLibEventLoop, MainLoop, SelectEventLoop, TornadoEventLoop
+from urwid.event_loop import EventLoop, AsyncioEventLoop, ExitMainLoop, MainLoop, SelectEventLoop
from urwid.monitored_list import MonitoredFocusList, MonitoredList
from urwid.signals import MetaSignals, Signals, connect_signal, disconnect_signal, emit_signal, register_signal
from urwid.version import VERSION, __version__
@@ -141,14 +141,6 @@ from urwid.widget import (
)
from urwid.wimp import Button, CheckBox, CheckBoxError, PopUpLauncher, PopUpTarget, RadioButton, SelectableIcon
-try:
- from urwid.main_loop import TwistedEventLoop
-except ImportError:
- pass
-try:
- from urwid.main_loop import TrioEventLoop
-except ImportError:
- pass
from urwid import raw_display
from urwid.display_common import (
BLACK,
@@ -197,3 +189,25 @@ from urwid.util import (
within_double_byte,
)
from urwid.vterm import TermCanvas, TermCharset, Terminal, TermModes, TermScroller
+
+# Optional event loops with external dependencies
+
+try:
+ from urwid.event_loop import TornadoEventLoop
+except ImportError:
+ pass
+
+try:
+ from urwid.event_loop import GLibEventLoop
+except ImportError:
+ pass
+
+try:
+ from urwid.event_loop import TwistedEventLoop
+except ImportError:
+ pass
+
+try:
+ from urwid.event_loop import TrioEventLoop
+except ImportError:
+ pass
diff --git a/urwid/compat.py b/urwid/compat.py
deleted file mode 100644
index 8bcbf05..0000000
--- a/urwid/compat.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#!/usr/bin/python
-#
-# Urwid python compatibility definitions
-# Copyright (C) 2011 Ian Ward
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-# Urwid web site: https://urwid.org/
-
-
-def reraise(tp, value, tb=None):
- """
- Reraise an exception.
- Taken from "six" library (https://pythonhosted.org/six/).
- """
- try:
- if value is None:
- value = tp()
- if value.__traceback__ is not tb:
- raise value.with_traceback(tb)
- raise value
- finally:
- value = None
- tb = None
diff --git a/urwid/event_loop/__init__.py b/urwid/event_loop/__init__.py
new file mode 100644
index 0000000..b7bccfc
--- /dev/null
+++ b/urwid/event_loop/__init__.py
@@ -0,0 +1,33 @@
+"""Package with EventLoop implementations for urwid."""
+
+from __future__ import annotations
+
+from .abstract_loop import EventLoop, ExitMainLoop
+from .asyncio_loop import AsyncioEventLoop
+from .main_loop import MainLoop
+from .select_loop import SelectEventLoop
+
+try:
+ from .twisted_loop import TwistedEventLoop
+except ImportError:
+ pass
+
+try:
+ from .tornado_loop import TornadoEventLoop
+except ImportError:
+ pass
+
+try:
+ from .glib_loop import GLibEventLoop
+except ImportError:
+ pass
+
+try:
+ from .twisted_loop import TwistedEventLoop
+except ImportError:
+ pass
+
+try:
+ from .trio_loop import TrioEventLoop
+except ImportError:
+ pass
diff --git a/urwid/event_loop/abstract_loop.py b/urwid/event_loop/abstract_loop.py
new file mode 100644
index 0000000..7e5c0fb
--- /dev/null
+++ b/urwid/event_loop/abstract_loop.py
@@ -0,0 +1,143 @@
+# Urwid main loop code
+# Copyright (C) 2004-2012 Ian Ward
+# Copyright (C) 2008 Walter Mundt
+# Copyright (C) 2009 Andrew Psaltis
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Urwid web site: https://urwid.org/
+
+"""Abstract shared code for urwid EventLoop implementation."""
+
+from __future__ import annotations
+
+import abc
+import signal
+import typing
+from collections.abc import Callable
+
+if typing.TYPE_CHECKING:
+ from types import FrameType
+
+__all__ = ("ExitMainLoop", "EventLoop")
+
+
+class ExitMainLoop(Exception):
+ """
+ When this exception is raised within a main loop the main loop
+ will exit cleanly.
+ """
+ pass
+
+
+class EventLoop(abc.ABC):
+ """
+ Abstract class representing an event loop to be used by :class:`MainLoop`.
+ """
+
+ @abc.abstractmethod
+ def alarm(self, seconds: float | int, callback: Callable[[], typing.Any]) -> typing.Any:
+ """
+ Call callback() a given time from now. No parameters are
+ passed to callback.
+
+ This method has no default implementation.
+
+ Returns a handle that may be passed to remove_alarm()
+
+ seconds -- floating point time to wait before calling callback
+ callback -- function to call from event loop
+ """
+
+ @abc.abstractmethod
+ def enter_idle(self, callback):
+ """
+ Add a callback for entering idle.
+
+ This method has no default implementation.
+
+ Returns a handle that may be passed to remove_idle()
+ """
+
+ @abc.abstractmethod
+ def remove_alarm(self, handle) -> bool:
+ """
+ Remove an alarm.
+
+ This method has no default implementation.
+
+ Returns True if the alarm exists, False otherwise
+ """
+
+ @abc.abstractmethod
+ def remove_enter_idle(self, handle) -> bool:
+ """
+ Remove an idle callback.
+
+ This method has no default implementation.
+
+ Returns True if the handle was removed.
+ """
+
+ @abc.abstractmethod
+ def remove_watch_file(self, handle) -> bool:
+ """
+ Remove an input file.
+
+ This method has no default implementation.
+
+ Returns True if the input file exists, False otherwise
+ """
+
+ @abc.abstractmethod
+ def run(self) -> None:
+ """
+ Start the event loop. Exit the loop when any callback raises
+ an exception. If ExitMainLoop is raised, exit cleanly.
+
+ This method has no default implementation.
+ """
+
+ @abc.abstractmethod
+ def watch_file(self, fd: int, callback: Callable[[], typing.Any]):
+ """
+ Call callback() when fd has some data to read. No parameters
+ are passed to callback.
+
+ This method has no default implementation.
+
+ Returns a handle that may be passed to remove_watch_file()
+
+ fd -- file descriptor to watch for input
+ callback -- function to call when input is available
+ """
+
+ def set_signal_handler(
+ self,
+ signum: int,
+ handler: Callable[[int, FrameType | None], typing.Any] | int | signal.Handlers,
+ ) -> Callable[[int, FrameType | None], typing.Any] | int | signal.Handlers | None:
+ """
+ Sets the signal handler for signal signum.
+
+ The default implementation of :meth:`set_signal_handler`
+ is simply a proxy function that calls :func:`signal.signal()`
+ and returns the resulting value.
+
+ signum -- signal number
+ handler -- function (taking signum as its single argument),
+ or `signal.SIG_IGN`, or `signal.SIG_DFL`
+ """
+ return signal.signal(signum, handler)
diff --git a/urwid/event_loop/asyncio_loop.py b/urwid/event_loop/asyncio_loop.py
new file mode 100644
index 0000000..e8f1522
--- /dev/null
+++ b/urwid/event_loop/asyncio_loop.py
@@ -0,0 +1,148 @@
+# Urwid main loop code
+# Copyright (C) 2004-2012 Ian Ward
+# Copyright (C) 2008 Walter Mundt
+# Copyright (C) 2009 Andrew Psaltis
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Urwid web site: https://urwid.org/
+
+"""Asyncio based urwid EventLoop implementation."""
+
+from __future__ import annotations
+
+import asyncio
+import typing
+from collections.abc import Callable
+
+from .abstract_loop import EventLoop, ExitMainLoop
+
+__all__ = ("AsyncioEventLoop",)
+
+
+class AsyncioEventLoop(EventLoop):
+ """
+ Event loop based on the standard library ``asyncio`` module.
+
+ """
+ _we_started_event_loop = False
+
+ _idle_emulation_delay = 1.0/30 # a short time (in seconds)
+
+ def __init__(self, *, loop: asyncio.AbstractEventLoop | None = None, **kwargs) -> None:
+ if loop:
+ self._loop: asyncio.AbstractEventLoop = loop
+ else:
+ self._loop = asyncio.get_event_loop()
+
+ self._exc: BaseException | None = None
+
+ def alarm(self, seconds: float | int, callback: Callable[[], typing.Any]) -> asyncio.TimerHandle:
+ """
+ Call callback() a given time from now. No parameters are
+ passed to callback.
+
+ Returns a handle that may be passed to remove_alarm()
+
+ seconds -- time in seconds to wait before calling callback
+ callback -- function to call from event loop
+ """
+ return self._loop.call_later(seconds, callback)
+
+ def remove_alarm(self, handle) -> bool:
+ """
+ Remove an alarm.
+
+ Returns True if the alarm exists, False otherwise
+ """
+ cancelled = (
+ handle.cancelled()
+ if getattr(handle, 'cancelled', None)
+ else handle._cancelled
+ )
+ existed = not cancelled
+ handle.cancel()
+ return existed
+
+ def watch_file(self, fd: int, callback: Callable[[], typing.Any]) -> int:
+ """
+ Call callback() when fd has some data to read. No parameters
+ are passed to callback.
+
+ Returns a handle that may be passed to remove_watch_file()
+
+ fd -- file descriptor to watch for input
+ callback -- function to call when input is available
+ """
+ self._loop.add_reader(fd, callback)
+ return fd
+
+ def remove_watch_file(self, handle: int) -> bool:
+ """
+ Remove an input file.
+
+ Returns True if the input file exists, False otherwise
+ """
+ return self._loop.remove_reader(handle)
+
+ def enter_idle(self, callback: Callable[[], typing.Any]) -> list[asyncio.TimerHandle]:
+ """
+ Add a callback for entering idle.
+
+ Returns a handle that may be passed to remove_idle()
+ """
+ # XXX there's no such thing as "idle" in most event loops; this fakes
+ # it the same way as Twisted, by scheduling the callback to be called
+ # repeatedly
+ mutable_handle: list[asyncio.TimerHandle] = []
+
+ def faux_idle_callback():
+ callback()
+ mutable_handle[0] = self._loop.call_later(self._idle_emulation_delay, faux_idle_callback)
+
+ mutable_handle.append(self._loop.call_later(self._idle_emulation_delay, faux_idle_callback))
+
+ return mutable_handle
+
+ def remove_enter_idle(self, handle) -> bool:
+ """
+ Remove an idle callback.
+
+ Returns True if the handle was removed.
+ """
+ # `handle` is just a list containing the current actual handle
+ return self.remove_alarm(handle[0])
+
+ def _exception_handler(self, loop: asyncio.AbstractEventLoop, context):
+ exc = context.get('exception')
+ if exc:
+ loop.stop()
+ if not isinstance(exc, ExitMainLoop):
+ # Store the exc_info so we can re-raise after the loop stops
+ self._exc = exc
+ else:
+ loop.default_exception_handler(context)
+
+ def run(self) -> None:
+ """
+ Start the event loop. Exit the loop when any callback raises
+ an exception. If ExitMainLoop is raised, exit cleanly.
+ """
+ self._loop.set_exception_handler(self._exception_handler)
+ self._loop.run_forever()
+ if self._exc:
+ exc = self._exc
+ self._exc = None
+ raise exc.with_traceback(exc.__traceback__)
diff --git a/urwid/event_loop/glib_loop.py b/urwid/event_loop/glib_loop.py
new file mode 100644
index 0000000..de9de91
--- /dev/null
+++ b/urwid/event_loop/glib_loop.py
@@ -0,0 +1,262 @@
+# Urwid main loop code
+# Copyright (C) 2004-2012 Ian Ward
+# Copyright (C) 2008 Walter Mundt
+# Copyright (C) 2009 Andrew Psaltis
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Urwid web site: https://urwid.org/
+
+"""GLib based urwid EventLoop implementation.
+
+PyGObject library is required.
+"""
+
+from __future__ import annotations
+
+import functools
+import signal
+import typing
+from collections.abc import Callable
+
+from gi.repository import GLib
+
+from .abstract_loop import EventLoop, ExitMainLoop
+
+if typing.TYPE_CHECKING:
+ from types import FrameType
+
+ from typing_extensions import Literal, ParamSpec
+ _Spec = ParamSpec("_Spec")
+ _T = typing.TypeVar("_T")
+
+__all__ = ("GLibEventLoop",)
+
+
+def _ignore_handler(_sig: int, _frame: FrameType | None = None) -> None:
+ return None
+
+
+class GLibEventLoop(EventLoop):
+ """
+ Event loop based on GLib.MainLoop
+ """
+
+ def __init__(self) -> None:
+ self._alarms: list[int] = []
+ self._watch_files: dict[int, int] = {}
+ self._idle_handle: int = 0
+ self._glib_idle_enabled = False # have we called glib.idle_add?
+ self._idle_callbacks: dict[int, Callable[[], typing.Any]] = {}
+ self._loop = GLib.MainLoop()
+ self._exc: BaseException | None = None
+ self._enable_glib_idle()
+ self._signal_handlers: dict[int, int] = {}
+
+ def alarm(
+ self,
+ seconds: float | int,
+ callback: Callable[[], typing.Any],
+ ) -> tuple[int, Callable[[], typing.Any]]:
+ """
+ Call callback() a given time from now. No parameters are
+ passed to callback.
+
+ Returns a handle that may be passed to remove_alarm()
+
+ seconds -- floating point time to wait before calling callback
+ callback -- function to call from event loop
+ """
+ @self.handle_exit
+ def ret_false() -> Literal[False]:
+ callback()
+ self._enable_glib_idle()
+ return False
+
+ fd = GLib.timeout_add(int(seconds*1000), ret_false)
+ self._alarms.append(fd)
+ return (fd, callback)
+
+ def set_signal_handler(
+ self,
+ signum: int,
+ handler: Callable[[int, FrameType | None], typing.Any] | int | signal.Handlers,
+ ) -> None:
+ """
+ Sets the signal handler for signal signum.
+
+ .. WARNING::
+ Because this method uses the `GLib`-specific `unix_signal_add`
+ function, its behaviour is different than `signal.signal().`
+
+ If `signum` is not `SIGHUP`, `SIGINT`, `SIGTERM`, `SIGUSR1`,
+ `SIGUSR2` or `SIGWINCH`, this method performs no actions and
+ immediately returns None.
+
+ Returns None in all cases (unlike :func:`signal.signal()`).
+ ..
+
+ signum -- signal number
+ handler -- function (taking signum as its single argument),
+ or `signal.SIG_IGN`, or `signal.SIG_DFL`
+ """
+ glib_signals = [
+ signal.SIGHUP,
+ signal.SIGINT,
+ signal.SIGTERM,
+ signal.SIGUSR1,
+ signal.SIGUSR2,
+ ]
+
+ # GLib supports SIGWINCH as of version 2.54.
+ if not GLib.check_version(2, 54, 0):
+ glib_signals.append(signal.SIGWINCH)
+
+ if signum not in glib_signals:
+ # The GLib event loop supports only the signals listed above
+ return
+
+ if signum in self._signal_handlers:
+ GLib.source_remove(self._signal_handlers.pop(signum))
+
+ if handler == signal.Handlers.SIG_IGN:
+ handler = _ignore_handler
+ elif handler == signal.Handlers.SIG_DFL:
+ return
+
+ def final_handler(signal_number: int):
+ # MyPy False-negative: signal.Handlers casted
+ handler(signal_number, None) # type: ignore[operator]
+ return GLib.SOURCE_CONTINUE
+
+ source = GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signum, final_handler, signum)
+ self._signal_handlers[signum] = source
+
+ def remove_alarm(self, handle) -> bool:
+ """
+ Remove an alarm.
+
+ Returns True if the alarm exists, False otherwise
+ """
+ try:
+ self._alarms.remove(handle[0])
+ GLib.source_remove(handle[0])
+ return True
+ except ValueError:
+ return False
+
+ def watch_file(self, fd: int, callback: Callable[[], typing.Any]) -> int:
+ """
+ Call callback() when fd has some data to read. No parameters
+ are passed to callback.
+
+ Returns a handle that may be passed to remove_watch_file()
+
+ fd -- file descriptor to watch for input
+ callback -- function to call when input is available
+ """
+ @self.handle_exit
+ def io_callback(source, cb_condition) -> Literal[True]:
+ callback()
+ self._enable_glib_idle()
+ return True
+
+ self._watch_files[fd] = GLib.io_add_watch(fd, GLib.IO_IN, io_callback)
+ return fd
+
+ def remove_watch_file(self, handle: int) -> bool:
+ """
+ Remove an input file.
+
+ Returns True if the input file exists, False otherwise
+ """
+ if handle in self._watch_files:
+ GLib.source_remove(self._watch_files[handle])
+ del self._watch_files[handle]
+ return True
+ return False
+
+ def enter_idle(self, callback: Callable[[], typing.Any]) -> int:
+ """
+ Add a callback for entering idle.
+
+ Returns a handle that may be passed to remove_enter_idle()
+ """
+ self._idle_handle += 1
+ self._idle_callbacks[self._idle_handle] = callback
+ return self._idle_handle
+
+ def _enable_glib_idle(self) -> None:
+ if self._glib_idle_enabled:
+ return
+ GLib.idle_add(self._glib_idle_callback)
+ self._glib_idle_enabled = True
+
+ def _glib_idle_callback(self):
+ for callback in self._idle_callbacks.values():
+ callback()
+ self._glib_idle_enabled = False
+ return False # ask glib not to call again (or we would be called
+
+ def remove_enter_idle(self, handle) -> bool:
+ """
+ Remove an idle callback.
+
+ Returns True if the handle was removed.
+ """
+ try:
+ del self._idle_callbacks[handle]
+ except KeyError:
+ return False
+ return True
+
+ def run(self) -> None:
+ """
+ Start the event loop. Exit the loop when any callback raises
+ an exception. If ExitMainLoop is raised, exit cleanly.
+ """
+ try:
+ self._loop.run()
+ finally:
+ if self._loop.is_running():
+ self._loop.quit()
+ if self._exc:
+ # An exception caused us to exit, raise it now
+ exc = self._exc
+ self._exc = None
+ raise exc.with_traceback(exc.__traceback__)
+
+ def handle_exit(self, f: Callable[_Spec, _T]) -> Callable[_Spec, _T | Literal[False]]:
+ """
+ Decorator that cleanly exits the :class:`GLibEventLoop` if
+ :exc:`ExitMainLoop` is thrown inside of the wrapped function. Store the
+ exception info if some other exception occurs, it will be reraised after
+ the loop quits.
+
+ *f* -- function to be wrapped
+ """
+
+ @functools.wraps(f)
+ def wrapper(*args: _Spec.args, **kwargs: _Spec.kwargs) -> _T | Literal[False]:
+ try:
+ return f(*args, **kwargs)
+ except ExitMainLoop:
+ self._loop.quit()
+ except BaseException as exc:
+ self._exc = exc
+ if self._loop.is_running():
+ self._loop.quit()
+ return False
+ return wrapper
diff --git a/urwid/event_loop/main_loop.py b/urwid/event_loop/main_loop.py
new file mode 100755
index 0000000..cd6a270
--- /dev/null
+++ b/urwid/event_loop/main_loop.py
@@ -0,0 +1,686 @@
+#!/usr/bin/python
+#
+# Urwid main loop code
+# Copyright (C) 2004-2012 Ian Ward
+# Copyright (C) 2008 Walter Mundt
+# Copyright (C) 2009 Andrew Psaltis
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Urwid web site: https://urwid.org/
+
+
+from __future__ import annotations
+
+import heapq
+import os
+import time
+import typing
+import warnings
+from collections.abc import Callable, Iterable
+
+from urwid import raw_display, signals
+from urwid.command_map import REDRAW_SCREEN, command_map
+from urwid.display_common import INPUT_DESCRIPTORS_CHANGED
+from urwid.util import StoppingContext, is_mouse_event
+from urwid.wimp import PopUpTarget
+
+from .abstract_loop import ExitMainLoop
+from .select_loop import SelectEventLoop
+
+if typing.TYPE_CHECKING:
+ from urwid.display_common import BaseScreen
+ from urwid.widget import Widget
+
+
+try:
+ import fcntl
+except ImportError:
+ pass # windows
+
+PIPE_BUFFER_READ_SIZE = 4096 # can expect this much on Linux, so try for that
+
+__all__ = ("CantUseExternalLoop", "MainLoop")
+
+
+class CantUseExternalLoop(Exception):
+ pass
+
+
+class MainLoop:
+ """
+ This is the standard main loop implementation for a single interactive
+ session.
+
+ :param widget: the topmost widget used for painting the screen, stored as
+ :attr:`widget` and may be modified. Must be a box widget.
+ :type widget: widget instance
+
+ :param palette: initial palette for screen
+ :type palette: iterable of palette entries
+
+ :param screen: screen to use, default is a new :class:`raw_display.Screen`
+ instance; stored as :attr:`screen`
+ :type screen: display module screen instance
+
+ :param handle_mouse: ``True`` to ask :attr:`.screen` to process mouse events
+ :type handle_mouse: bool
+
+ :param input_filter: a function to filter input before sending it to
+ :attr:`.widget`, called from :meth:`.input_filter`
+ :type input_filter: callable
+
+ :param unhandled_input: a function called when input is not handled by
+ :attr:`.widget`, called from :meth:`.unhandled_input`
+ :type unhandled_input: callable
+
+ :param event_loop: if :attr:`.screen` supports external an event loop it may be
+ given here, default is a new :class:`SelectEventLoop` instance;
+ stored as :attr:`.event_loop`
+ :type event_loop: event loop instance
+
+ :param pop_ups: `True` to wrap :attr:`.widget` with a :class:`PopUpTarget`
+ instance to allow any widget to open a pop-up anywhere on the screen
+ :type pop_ups: boolean
+
+
+ .. attribute:: screen
+
+ The screen object this main loop uses for screen updates and reading input
+
+ .. attribute:: event_loop
+
+ The event loop object this main loop uses for waiting on alarms and IO
+ """
+
+ def __init__(
+ self,
+ widget: Widget,
+ palette=(),
+ screen: BaseScreen | None = None,
+ handle_mouse: bool = True,
+ input_filter: Callable[[list[str], list[int]], list[str]] | None = None,
+ unhandled_input: Callable[[str | tuple[str, int, int, int]], bool] | None = None,
+ event_loop=None,
+ pop_ups: bool = False,
+ ):
+ self._widget = widget
+ self.handle_mouse = handle_mouse
+ self.pop_ups = pop_ups # triggers property setting side-effect
+
+ if not screen:
+ screen = raw_display.Screen()
+
+ if palette:
+ screen.register_palette(palette)
+
+ self.screen = screen
+ self.screen_size = None
+
+ self._unhandled_input = unhandled_input
+ self._input_filter = input_filter
+
+ if not hasattr(screen, 'hook_event_loop') and event_loop is not None:
+ raise NotImplementedError(f"screen object passed {screen!r} does not support external event loops")
+ if event_loop is None:
+ event_loop = SelectEventLoop()
+ self.event_loop = event_loop
+
+ if hasattr(self.screen, 'signal_handler_setter'):
+ # Tell the screen what function it must use to set
+ # signal handlers
+ self.screen.signal_handler_setter = self.event_loop.set_signal_handler
+
+ self._watch_pipes: dict[int, tuple[Callable[[], typing.Any], int]] = {}
+
+ @property
+ def widget(self) -> Widget:
+ """
+ Property for the topmost widget used to draw the screen.
+ This must be a box widget.
+ """
+ return self._widget
+
+ @widget.setter
+ def widget(self, widget: Widget) -> None:
+ self._widget = widget
+ if self.pop_ups:
+ self._topmost_widget.original_widget = self._widget
+ else:
+ self._topmost_widget = self._widget
+
+ def _set_widget(self, widget: Widget) -> None:
+ warnings.warn(
+ f"method `{self.__class__.__name__}._set_widget` is deprecated, "
+ f"please use `{self.__class__.__name__}.widget` property",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ self.widget = widget
+
+ @property
+ def pop_ups(self):
+ return self._pop_ups
+
+ @pop_ups.setter
+ def pop_ups(self, pop_ups) -> None:
+ self._pop_ups = pop_ups
+ if pop_ups:
+ self._topmost_widget = PopUpTarget(self._widget)
+ else:
+ self._topmost_widget = self._widget
+
+ def _set_pop_ups(self, pop_ups) -> None:
+ warnings.warn(
+ f"method `{self.__class__.__name__}._set_pop_ups` is deprecated, "
+ f"please use `{self.__class__.__name__}.pop_ups` property",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ self.pop_ups = pop_ups
+
+ def set_alarm_in(self, sec, callback, user_data=None):
+ """
+ Schedule an alarm in *sec* seconds that will call *callback* from the
+ within the :meth:`run` method.
+
+ :param sec: seconds until alarm
+ :type sec: float
+ :param callback: function to call with two parameters: this main loop
+ object and *user_data*
+ :type callback: callable
+ """
+ def cb():
+ callback(self, user_data)
+ return self.event_loop.alarm(sec, cb)
+
+ def set_alarm_at(self, tm, callback, user_data=None):
+ """
+ Schedule an alarm at *tm* time that will call *callback* from the
+ within the :meth:`run` function. Returns a handle that may be passed to
+ :meth:`remove_alarm`.
+
+ :param tm: time to call callback e.g. ``time.time() + 5``
+ :type tm: float
+ :param callback: function to call with two parameters: this main loop
+ object and *user_data*
+ :type callback: callable
+ """
+ def cb():
+ callback(self, user_data)
+ return self.event_loop.alarm(tm - time.time(), cb)
+
+ def remove_alarm(self, handle):
+ """
+ Remove an alarm. Return ``True`` if *handle* was found, ``False``
+ otherwise.
+ """
+ return self.event_loop.remove_alarm(handle)
+
+ def watch_pipe(self, callback):
+ """
+ Create a pipe for use by a subprocess or thread to trigger a callback
+ in the process/thread running the main loop.
+
+ :param callback: function taking one parameter to call from within
+ the process/thread running the main loop
+ :type callback: callable
+
+ This method returns a file descriptor attached to the write end of a
+ pipe. The read end of the pipe is added to the list of files
+ :attr:`event_loop` is watching. When data is written to the pipe the
+ callback function will be called and passed a single value containing
+ data read from the pipe.
+
+ This method may be used any time you want to update widgets from
+ another thread or subprocess.
+
+ Data may be written to the returned file descriptor with
+ ``os.write(fd, data)``. Ensure that data is less than 512 bytes (or 4K
+ on Linux) so that the callback will be triggered just once with the
+ complete value of data passed in.
+
+ If the callback returns ``False`` then the watch will be removed from
+ :attr:`event_loop` and the read end of the pipe will be closed. You
+ are responsible for closing the write end of the pipe with
+ ``os.close(fd)``.
+ """
+ pipe_rd, pipe_wr = os.pipe()
+ fcntl.fcntl(pipe_rd, fcntl.F_SETFL, os.O_NONBLOCK)
+ watch_handle = None
+
+ def cb() -> None:
+ data = os.read(pipe_rd, PIPE_BUFFER_READ_SIZE)
+ rval = callback(data)
+ if rval is False:
+ self.event_loop.remove_watch_file(watch_handle)
+ os.close(pipe_rd)
+
+ watch_handle = self.event_loop.watch_file(pipe_rd, cb)
+ self._watch_pipes[pipe_wr] = (watch_handle, pipe_rd)
+ return pipe_wr
+
+ def remove_watch_pipe(self, write_fd):
+ """
+ Close the read end of the pipe and remove the watch created by
+ :meth:`watch_pipe`. You are responsible for closing the write end of
+ the pipe.
+
+ Returns ``True`` if the watch pipe exists, ``False`` otherwise
+ """
+ try:
+ watch_handle, pipe_rd = self._watch_pipes.pop(write_fd)
+ except KeyError:
+ return False
+
+ if not self.event_loop.remove_watch_file(watch_handle):
+ return False
+ os.close(pipe_rd)
+ return True
+
+ def watch_file(self, fd, callback):
+ """
+ Call *callback* when *fd* has some data to read. No parameters are
+ passed to callback.
+
+ Returns a handle that may be passed to :meth:`remove_watch_file`.
+ """
+ return self.event_loop.watch_file(fd, callback)
+
+ def remove_watch_file(self, handle):
+ """
+ Remove a watch file. Returns ``True`` if the watch file
+ exists, ``False`` otherwise.
+ """
+ return self.event_loop.remove_watch_file(handle)
+
+
+ def run(self):
+ """
+ Start the main loop handling input events and updating the screen. The
+ loop will continue until an :exc:`ExitMainLoop` exception is raised.
+
+ If you would prefer to manage the event loop yourself, don't use this
+ method. Instead, call :meth:`start` before starting the event loop,
+ and :meth:`stop` once it's finished.
+ """
+ try:
+ self._run()
+ except ExitMainLoop:
+ pass
+
+ def _test_run(self):
+ """
+ >>> w = _refl("widget") # _refl prints out function calls
+ >>> w.render_rval = "fake canvas" # *_rval is used for return values
+ >>> scr = _refl("screen")
+ >>> scr.get_input_descriptors_rval = [42]
+ >>> scr.get_cols_rows_rval = (20, 10)
+ >>> scr.started = True
+ >>> scr._urwid_signals = {}
+ >>> evl = _refl("event_loop")
+ >>> evl.enter_idle_rval = 1
+ >>> evl.watch_file_rval = 2
+ >>> ml = MainLoop(w, [], scr, event_loop=evl)
+ >>> ml.run() # doctest:+ELLIPSIS
+ screen.start()
+ screen.set_mouse_tracking()
+ screen.unhook_event_loop(...)
+ screen.hook_event_loop(...)
+ event_loop.enter_idle(<bound method MainLoop.entering_idle...>)
+ event_loop.run()
+ event_loop.remove_enter_idle(1)
+ screen.unhook_event_loop(...)
+ screen.stop()
+ >>> ml.draw_screen() # doctest:+ELLIPSIS
+ screen.get_cols_rows()
+ widget.render((20, 10), focus=True)
+ screen.draw_screen((20, 10), 'fake canvas')
+ """
+
+ def start(self):
+ """
+ Sets up the main loop, hooking into the event loop where necessary.
+ Starts the :attr:`screen` if it hasn't already been started.
+
+ If you want to control starting and stopping the event loop yourself,
+ you should call this method before starting, and call `stop` once the
+ loop has finished. You may also use this method as a context manager,
+ which will stop the loop automatically at the end of the block:
+
+ with main_loop.start():
+ ...
+
+ Note that some event loop implementations don't handle exceptions
+ specially if you manage the event loop yourself. In particular, the
+ Twisted and asyncio loops won't stop automatically when
+ :exc:`ExitMainLoop` (or anything else) is raised.
+ """
+
+ self.screen.start()
+
+ if self.handle_mouse:
+ self.screen.set_mouse_tracking()
+
+ if not hasattr(self.screen, 'hook_event_loop'):
+ raise CantUseExternalLoop("Screen {0!r} doesn't support external event loops")
+
+ try:
+ signals.connect_signal(self.screen, INPUT_DESCRIPTORS_CHANGED, self._reset_input_descriptors)
+ except NameError:
+ pass
+ # watch our input descriptors
+ self._reset_input_descriptors()
+ self.idle_handle = self.event_loop.enter_idle(self.entering_idle)
+
+ # the screen is redrawn automatically after input and alarms,
+ # however, there can be none of those at the start,
+ # so draw the initial screen here unconditionally
+ self.event_loop.alarm(0, self.entering_idle)
+
+ return StoppingContext(self)
+
+ def stop(self):
+ """
+ Cleans up any hooks added to the event loop. Only call this if you're
+ managing the event loop yourself, after the loop stops.
+ """
+
+ self.event_loop.remove_enter_idle(self.idle_handle)
+ del self.idle_handle
+ signals.disconnect_signal(self.screen, INPUT_DESCRIPTORS_CHANGED,
+ self._reset_input_descriptors)
+ self.screen.unhook_event_loop(self.event_loop)
+
+ self.screen.stop()
+
+ def _reset_input_descriptors(self):
+ self.screen.unhook_event_loop(self.event_loop)
+ self.screen.hook_event_loop(self.event_loop, self._update)
+
+ def _run(self):
+ try:
+ self.start()
+ except CantUseExternalLoop:
+ try:
+ return self._run_screen_event_loop()
+ finally:
+ self.screen.stop()
+
+ try:
+ self.event_loop.run()
+ except:
+ self.screen.stop() # clean up screen control
+ raise
+ self.stop()
+
+ def _update(self, keys: list[str], raw: list[int]):
+ """
+ >>> w = _refl("widget")
+ >>> w.selectable_rval = True
+ >>> w.mouse_event_rval = True
+ >>> scr = _refl("screen")
+ >>> scr.get_cols_rows_rval = (15, 5)
+ >>> evl = _refl("event_loop")
+ >>> ml = MainLoop(w, [], scr, event_loop=evl)
+ >>> ml._input_timeout = "old timeout"
+ >>> ml._update(['y'], [121]) # doctest:+ELLIPSIS
+ screen.get_cols_rows()
+ widget.selectable()
+ widget.keypress((15, 5), 'y')
+ >>> ml._update([("mouse press", 1, 5, 4)], [])
+ widget.mouse_event((15, 5), 'mouse press', 1, 5, 4, focus=True)
+ >>> ml._update([], [])
+ """
+ keys = self.input_filter(keys, raw)
+
+ if keys:
+ self.process_input(keys)
+ if 'window resize' in keys:
+ self.screen_size = None
+
+ def _run_screen_event_loop(self) -> None:
+ """
+ This method is used when the screen does not support using
+ external event loops.
+
+ The alarms stored in the SelectEventLoop in :attr:`event_loop`
+ are modified by this method.
+ """
+ next_alarm = None
+
+ while True:
+ self.draw_screen()
+
+ if not next_alarm and self.event_loop._alarms:
+ next_alarm = heapq.heappop(self.event_loop._alarms)
+
+ keys: list[str] = []
+ raw: list[int] = []
+ while not keys:
+ if next_alarm:
+ sec = max(0, next_alarm[0] - time.time())
+ self.screen.set_input_timeouts(sec)
+ else:
+ self.screen.set_input_timeouts(None)
+ keys, raw = self.screen.get_input(True)
+ if not keys and next_alarm:
+ sec = next_alarm[0] - time.time()
+ if sec <= 0:
+ break
+
+ keys = self.input_filter(keys, raw)
+
+ if keys:
+ self.process_input(keys)
+
+ while next_alarm:
+ sec = next_alarm[0] - time.time()
+ if sec > 0:
+ break
+ tm, tie_break, callback = next_alarm
+ callback()
+
+ if self.event_loop._alarms:
+ next_alarm = heapq.heappop(self.event_loop._alarms)
+ else:
+ next_alarm = None
+
+ if 'window resize' in keys:
+ self.screen_size = None
+
+ def _test_run_screen_event_loop(self):
+ """
+ >>> w = _refl("widget")
+ >>> scr = _refl("screen")
+ >>> scr.get_cols_rows_rval = (10, 5)
+ >>> scr.get_input_rval = [], []
+ >>> ml = MainLoop(w, screen=scr)
+ >>> def stop_now(loop, data):
+ ... raise ExitMainLoop()
+ >>> handle = ml.set_alarm_in(0, stop_now)
+ >>> try:
+ ... ml._run_screen_event_loop()
+ ... except ExitMainLoop:
+ ... pass
+ screen.get_cols_rows()
+ widget.render((10, 5), focus=True)
+ screen.draw_screen((10, 5), None)
+ screen.set_input_timeouts(0)
+ screen.get_input(True)
+ """
+
+ def process_input(self, keys: Iterable[str | tuple[str, int, int, int]]) -> bool:
+ """
+ This method will pass keyboard input and mouse events to :attr:`widget`.
+ This method is called automatically from the :meth:`run` method when
+ there is input, but may also be called to simulate input from the user.
+
+ *keys* is a list of input returned from :attr:`screen`'s get_input()
+ or get_input_nonblocking() methods.
+
+ Returns ``True`` if any key was handled by a widget or the
+ :meth:`unhandled_input` method.
+ """
+ if not self.screen_size:
+ self.screen_size = self.screen.get_cols_rows()
+
+ something_handled = False
+
+ for k in keys:
+ if k == 'window resize':
+ continue
+
+ if isinstance(k, str):
+ if self._topmost_widget.selectable():
+ k = self._topmost_widget.keypress(self.screen_size, k)
+
+ elif isinstance(k, tuple):
+ if is_mouse_event(k):
+ event, button, col, row = k
+ if hasattr(self._topmost_widget, "mouse_event"):
+ if self._topmost_widget.mouse_event(self.screen_size, event, button, col, row, focus=True):
+ k = None
+
+ else:
+ raise TypeError(f"{k!r} is not str | tuple[str, int, int, int]")
+
+ if k:
+ if command_map[k] == REDRAW_SCREEN:
+ self.screen.clear()
+ something_handled = True
+ else:
+ something_handled |= bool(self.unhandled_input(k))
+ else:
+ something_handled = True
+
+ return something_handled
+
+ def _test_process_input(self):
+ """
+ >>> w = _refl("widget")
+ >>> w.selectable_rval = True
+ >>> scr = _refl("screen")
+ >>> scr.get_cols_rows_rval = (10, 5)
+ >>> ml = MainLoop(w, [], scr)
+ >>> ml.process_input(['enter', ('mouse drag', 1, 14, 20)])
+ screen.get_cols_rows()
+ widget.selectable()
+ widget.keypress((10, 5), 'enter')
+ widget.mouse_event((10, 5), 'mouse drag', 1, 14, 20, focus=True)
+ True
+ """
+
+ def input_filter(self, keys: list[str], raw: list[int]) -> list[str]:
+ """
+ This function is passed each all the input events and raw keystroke
+ values. These values are passed to the *input_filter* function
+ passed to the constructor. That function must return a list of keys to
+ be passed to the widgets to handle. If no *input_filter* was
+ defined this implementation will return all the input events.
+ """
+ if self._input_filter:
+ return self._input_filter(keys, raw)
+ return keys
+
+ def unhandled_input(self, input: str | tuple[str, int, int, int]) -> bool:
+ """
+ This function is called with any input that was not handled by the
+ widgets, and calls the *unhandled_input* function passed to the
+ constructor. If no *unhandled_input* was defined then the input
+ will be ignored.
+
+ *input* is the keyboard or mouse input.
+
+ The *unhandled_input* function should return ``True`` if it handled
+ the input.
+ """
+ if self._unhandled_input:
+ return self._unhandled_input(input)
+ return False
+
+ def entering_idle(self):
+ """
+ This method is called whenever the event loop is about to enter the
+ idle state. :meth:`draw_screen` is called here to update the
+ screen when anything has changed.
+ """
+ if self.screen.started:
+ self.draw_screen()
+
+ def draw_screen(self):
+ """
+ Render the widgets and paint the screen. This method is called
+ automatically from :meth:`entering_idle`.
+
+ If you modify the widgets displayed outside of handling input or
+ responding to an alarm you will need to call this method yourself
+ to repaint the screen.
+ """
+ if not self.screen_size:
+ self.screen_size = self.screen.get_cols_rows()
+
+ canvas = self._topmost_widget.render(self.screen_size, focus=True)
+ self.screen.draw_screen(self.screen_size, canvas)
+
+
+def _refl(name: str, rval=None, exit=False):
+ """
+ This function is used to test the main loop classes.
+
+ >>> scr = _refl("screen")
+ >>> scr.function("argument")
+ screen.function('argument')
+ >>> scr.callme(when="now")
+ screen.callme(when='now')
+ >>> scr.want_something_rval = 42
+ >>> x = scr.want_something()
+ screen.want_something()
+ >>> x
+ 42
+
+ """
+ class Reflect:
+ def __init__(self, name: str, rval=None):
+ self._name = name
+ self._rval = rval
+
+ def __call__(self, *argl, **argd):
+ args = ", ".join([repr(a) for a in argl])
+ if args and argd:
+ args = f"{args}, "
+ args += ", ".join([f"{k}={repr(v)}" for k, v in argd.items()])
+ print(f"{self._name}({args})")
+ if exit:
+ raise ExitMainLoop()
+ return self._rval
+
+ def __getattr__(self, attr):
+ if attr.endswith("_rval"):
+ raise AttributeError()
+ #print(self._name+"."+attr)
+ if hasattr(self, f"{attr}_rval"):
+ return Reflect(f"{self._name}.{attr}", getattr(self, f"{attr}_rval"))
+ return Reflect(f"{self._name}.{attr}")
+ return Reflect(name)
+
+
+def _test():
+ import doctest
+ doctest.testmod()
+
+
+if __name__ == '__main__':
+ _test()
diff --git a/urwid/event_loop/select_loop.py b/urwid/event_loop/select_loop.py
new file mode 100644
index 0000000..6f2edd6
--- /dev/null
+++ b/urwid/event_loop/select_loop.py
@@ -0,0 +1,190 @@
+# Urwid main loop code
+# Copyright (C) 2004-2012 Ian Ward
+# Copyright (C) 2008 Walter Mundt
+# Copyright (C) 2009 Andrew Psaltis
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Urwid web site: https://urwid.org/
+
+"""Select based urwid EventLoop implementation."""
+
+from __future__ import annotations
+
+import heapq
+import select
+import time
+import typing
+from collections.abc import Callable, Iterator
+from itertools import count
+
+from .abstract_loop import EventLoop, ExitMainLoop
+
+if typing.TYPE_CHECKING:
+ from typing_extensions import Literal
+
+__all__ = ("SelectEventLoop",)
+
+
+class SelectEventLoop(EventLoop):
+ """
+ Event loop based on :func:`select.select`
+ """
+
+ def __init__(self) -> None:
+ self._alarms: list[tuple[float, int, Callable[[], typing.Any]]] = []
+ self._watch_files: dict[int, Callable[[], typing.Any]] = {}
+ self._idle_handle: int = 0
+ self._idle_callbacks: dict[int, Callable[[], typing.Any]] = {}
+ self._tie_break: Iterator[int] = count()
+ self._did_something: bool = False
+
+ def alarm(
+ self,
+ seconds: float | int,
+ callback: Callable[[], typing.Any],
+ ) -> tuple[float, int, Callable[[], typing.Any]]:
+ """
+ Call callback() a given time from now. No parameters are
+ passed to callback.
+
+ Returns a handle that may be passed to remove_alarm()
+
+ seconds -- floating point time to wait before calling callback
+ callback -- function to call from event loop
+ """
+ tm = time.time() + seconds
+ handle = (tm, next(self._tie_break), callback)
+ heapq.heappush(self._alarms, handle)
+ return handle
+
+ def remove_alarm(self, handle: tuple[float, int, Callable[[], typing.Any]]) -> bool:
+ """
+ Remove an alarm.
+
+ Returns True if the alarm exists, False otherwise
+ """
+ try:
+ self._alarms.remove(handle)
+ heapq.heapify(self._alarms)
+ return True
+ except ValueError:
+ return False
+
+ def watch_file(self, fd: int, callback: Callable[[], typing.Any]) -> int:
+ """
+ Call callback() when fd has some data to read. No parameters
+ are passed to callback.
+
+ Returns a handle that may be passed to remove_watch_file()
+
+ fd -- file descriptor to watch for input
+ callback -- function to call when input is available
+ """
+ self._watch_files[fd] = callback
+ return fd
+
+ def remove_watch_file(self, handle: int) -> bool:
+ """
+ Remove an input file.
+
+ Returns True if the input file exists, False otherwise
+ """
+ if handle in self._watch_files:
+ del self._watch_files[handle]
+ return True
+ return False
+
+ def enter_idle(self, callback: Callable[[], typing.Any]) -> int:
+ """
+ Add a callback for entering idle.
+
+ Returns a handle that may be passed to remove_idle()
+ """
+ self._idle_handle += 1
+ self._idle_callbacks[self._idle_handle] = callback
+ return self._idle_handle
+
+ def remove_enter_idle(self, handle: int) -> bool:
+ """
+ Remove an idle callback.
+
+ Returns True if the handle was removed.
+ """
+ try:
+ del self._idle_callbacks[handle]
+ except KeyError:
+ return False
+ return True
+
+ def _entering_idle(self) -> None:
+ """
+ Call all the registered idle callbacks.
+ """
+ for callback in self._idle_callbacks.values():
+ callback()
+
+ def run(self) -> None:
+ """
+ Start the event loop. Exit the loop when any callback raises
+ an exception. If ExitMainLoop is raised, exit cleanly.
+ """
+ try:
+ self._did_something = True
+ while True:
+ try:
+ self._loop()
+ except InterruptedError:
+ pass
+ except ExitMainLoop:
+ pass
+
+ def _loop(self) -> None:
+ """
+ A single iteration of the event loop
+ """
+ fds = list(self._watch_files)
+ if self._alarms or self._did_something:
+ timeout = 0.
+ tm: float | Literal['idle'] | None = None
+
+ if self._alarms:
+ timeout_ = self._alarms[0][0]
+ tm = timeout_
+ timeout = max(timeout, timeout_ - time.time())
+
+ if self._did_something and (not self._alarms or (self._alarms and timeout > 0)):
+ timeout = 0.
+ tm = 'idle'
+
+ ready, w, err = select.select(fds, [], fds, timeout)
+
+ else:
+ tm = None
+ ready, w, err = select.select(fds, [], fds)
+
+ if not ready:
+ if tm == 'idle':
+ self._entering_idle()
+ self._did_something = False
+ elif tm is not None:
+ # must have been a timeout
+ tm, tie_break, alarm_callback = heapq.heappop(self._alarms)
+ alarm_callback()
+ self._did_something = True
+
+ for fd in ready:
+ self._watch_files[fd]()
+ self._did_something = True
diff --git a/urwid/event_loop/tornado_loop.py b/urwid/event_loop/tornado_loop.py
new file mode 100644
index 0000000..72f4635
--- /dev/null
+++ b/urwid/event_loop/tornado_loop.py
@@ -0,0 +1,153 @@
+# Urwid main loop code
+# Copyright (C) 2004-2012 Ian Ward
+# Copyright (C) 2008 Walter Mundt
+# Copyright (C) 2009 Andrew Psaltis
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Urwid web site: https://urwid.org/
+
+"""Tornado IOLoop based urwid EventLoop implementation.
+
+Tornado library is required.
+"""
+
+from __future__ import annotations
+
+import functools
+import typing
+from collections.abc import Callable
+
+from tornado import ioloop
+
+from .abstract_loop import EventLoop, ExitMainLoop
+
+if typing.TYPE_CHECKING:
+
+ from typing_extensions import Literal, ParamSpec
+ _Spec = ParamSpec("_Spec")
+ _T = typing.TypeVar("_T")
+
+__all__ = ("TornadoEventLoop",)
+
+
+class TornadoEventLoop(EventLoop):
+ """ This is an Urwid-specific event loop to plug into its MainLoop.
+ It acts as an adaptor for Tornado's IOLoop which does all
+ heavy lifting except idle-callbacks.
+
+ Notice, since Tornado has no concept of idle callbacks we
+ monkey patch ioloop._impl.poll() function to be able to detect
+ potential idle periods.
+ """
+ _idle_emulation_delay = 1.0 / 30 # a short time (in seconds)
+
+ def __init__(self, loop: ioloop.IOLoop | None = None) -> None:
+ if loop:
+ self._loop: ioloop.IOLoop = loop
+ else:
+ self._loop = ioloop.IOLoop.instance()
+
+ self._pending_alarms: dict[object, int] = {}
+ self._watch_handles: dict[int, int] = {} # {<watch_handle> : <file_descriptor>}
+ self._max_watch_handle: int = 0
+ self._exc: BaseException | None = None
+
+ def alarm(self, seconds: float | int, callback: Callable[[], typing.Any]):
+ def wrapped() -> None:
+ try:
+ del self._pending_alarms[handle]
+ except KeyError:
+ pass
+ self.handle_exit(callback)()
+
+ handle = self._loop.add_timeout(self._loop.time() + seconds, wrapped)
+ self._pending_alarms[handle] = 1
+ return handle
+
+ def remove_alarm(self, handle) -> bool:
+ self._loop.remove_timeout(handle)
+ try:
+ del self._pending_alarms[handle]
+ except KeyError:
+ return False
+ else:
+ return True
+
+ def watch_file(self, fd: int, callback: Callable[[], _T]) -> int:
+ def handler(_fd: int, _events: int) -> None:
+ self.handle_exit(callback)()
+
+ self._loop.add_handler(fd, handler, ioloop.IOLoop.READ)
+ self._max_watch_handle += 1
+ handle = self._max_watch_handle
+ self._watch_handles[handle] = fd
+ return handle
+
+ def remove_watch_file(self, handle: int) -> bool:
+ fd = self._watch_handles.pop(handle, None)
+ if fd is None:
+ return False
+ else:
+ self._loop.remove_handler(fd)
+ return True
+
+ def enter_idle(self, callback: Callable[[], typing.Any]) -> list[object]:
+ """
+ Add a callback for entering idle.
+
+ Returns a handle that may be passed to remove_idle()
+ """
+ # XXX there's no such thing as "idle" in most event loops; this fakes
+ # it the same way as Twisted, by scheduling the callback to be called
+ # repeatedly
+ mutable_handle: list[object] = []
+
+ def faux_idle_callback():
+ callback()
+ mutable_handle[0] = self._loop.call_later(self._idle_emulation_delay, faux_idle_callback)
+
+ mutable_handle.append(self._loop.call_later(self._idle_emulation_delay, faux_idle_callback))
+
+ # asyncio used as backend, real type comes from asyncio_loop.call_later
+ return mutable_handle
+
+ def remove_enter_idle(self, handle) -> bool:
+ """
+ Remove an idle callback.
+
+ Returns True if the handle was removed.
+ """
+ # `handle` is just a list containing the current actual handle
+ return self.remove_alarm(handle[0])
+
+ def handle_exit(self, f: Callable[_Spec, _T]) -> Callable[_Spec, _T | Literal[False]]:
+ @functools.wraps(f)
+ def wrapper(*args: _Spec.args, **kwargs: _Spec.kwargs) -> _T | Literal[False]:
+ try:
+ return f(*args, **kwargs)
+ except ExitMainLoop:
+ self._loop.stop()
+ except Exception as exc:
+ self._exc = exc
+ self._loop.stop()
+ return False
+ return wrapper
+
+ def run(self) -> None:
+ self._loop.start()
+ if self._exc:
+ exc, self._exc = self._exc, None
+ raise exc.with_traceback(exc.__traceback__)
diff --git a/urwid/_async_kw_event_loop.py b/urwid/event_loop/trio_loop.py
index bc74b0b..6e7670b 100644
--- a/urwid/_async_kw_event_loop.py
+++ b/urwid/event_loop/trio_loop.py
@@ -1,5 +1,3 @@
-#!/usr/bin/python
-#
# Urwid main loop code using Python-3.5 features (Trio, Curio, etc)
# Copyright (C) 2018 Toshio Kuratomi
# Copyright (C) 2019 Tamas Nepusz
@@ -20,9 +18,35 @@
#
# Urwid web site: https://urwid.org/
+"""Trio Runner based urwid EventLoop implementation.
+
+Trio library is required.
+"""
+
from __future__ import annotations
-from .main_loop import EventLoop, ExitMainLoop
+import typing
+from collections.abc import Callable
+
+import trio
+
+from .abstract_loop import EventLoop, ExitMainLoop
+
+__all__ = ("TrioEventLoop",)
+
+
+class _TrioIdleCallbackInstrument(trio.abc.Instrument):
+ """IDLE callbacks emulation helper."""
+
+ __slots__ = ("idle_callbacks",)
+
+ def __init__(self, idle_callbacks):
+ self.idle_callbacks = idle_callbacks
+
+ def before_io_wait(self, timeout):
+ if timeout > 0:
+ for idle_callback in self.idle_callbacks.values():
+ idle_callback()
class TrioEventLoop(EventLoop):
@@ -34,23 +58,17 @@ class TrioEventLoop(EventLoop):
def __init__(self):
"""Constructor."""
- import trio
self._idle_handle = 0
self._idle_callbacks = {}
self._pending_tasks = []
- self._trio = trio
self._nursery = None
self._sleep = trio.sleep
- try:
- self._wait_readable = trio.lowlevel.wait_readable
- except AttributeError:
- # Trio 0.14 or older
- self._wait_readable = trio.hazmat.wait_readable
+ self._wait_readable = trio.lowlevel.wait_readable
- def alarm(self, seconds, callback):
+ def alarm(self, seconds: float | int, callback: Callable[[], typing.Any]):
"""Calls `callback()` a given time from now. No parameters are passed
to the callback.
@@ -63,7 +81,7 @@ class TrioEventLoop(EventLoop):
"""
return self._start_task(self._alarm_task, seconds, callback)
- def enter_idle(self, callback):
+ def enter_idle(self, callback: Callable[[], typing.Any]) -> int:
"""Calls `callback()` when the event loop enters the idle state.
There is no such thing as being idle in a Trio event loop so we
@@ -81,7 +99,7 @@ class TrioEventLoop(EventLoop):
"""
return self._cancel_scope(handle)
- def remove_enter_idle(self, handle):
+ def remove_enter_idle(self, handle) -> bool:
"""Removes an idle callback.
Parameters:
@@ -93,7 +111,7 @@ class TrioEventLoop(EventLoop):
return False
return True
- def remove_watch_file(self, handle):
+ def remove_watch_file(self, handle: trio.CancelScope) -> bool:
"""Removes a file descriptor being watched for input.
Parameters:
@@ -104,7 +122,7 @@ class TrioEventLoop(EventLoop):
"""
return self._cancel_scope(handle)
- def _cancel_scope(self, scope):
+ def _cancel_scope(self, scope: trio.CancelScope) -> bool:
"""Cancels the given Trio cancellation scope.
Returns:
@@ -115,26 +133,16 @@ class TrioEventLoop(EventLoop):
scope.cancel()
return existed
- def run(self):
+ def run(self) -> None:
"""Starts the event loop. Exits the loop when any callback raises an
exception. If ExitMainLoop is raised, exits cleanly.
"""
- idle_callbacks = self._idle_callbacks
-
- # This class is duplicated in run_async(). It would be nice to move
- # this somewhere outside, but we cannot do it yet becase we need to
- # derive it from self._trio.abc.Instrument
- class TrioIdleCallbackInstrument(self._trio.abc.Instrument):
- def before_io_wait(self, timeout):
- if timeout > 0:
- for idle_callback in idle_callbacks.values():
- idle_callback()
+ emulate_idle_callbacks = _TrioIdleCallbackInstrument(self._idle_callbacks)
- emulate_idle_callbacks = TrioIdleCallbackInstrument()
-
- with self._trio.MultiError.catch(self._handle_main_loop_exception):
- self._trio.run(self._main_task, instruments=[emulate_idle_callbacks])
+ # TODO(Aleksei): trio.MultiError is deprecated in favor of exceptiongroup package usage and `Except *`
+ with trio.MultiError.catch(self._handle_main_loop_exception):
+ trio.run(self._main_task, instruments=[emulate_idle_callbacks])
async def run_async(self):
"""Starts the main loop and blocks asynchronously until the main loop
@@ -153,35 +161,16 @@ class TrioEventLoop(EventLoop):
nursery.cancel_scope.cancel()
"""
- idle_callbacks = self._idle_callbacks
-
- # This class is duplicated in run_async(). It would be nice to move
- # this somewhere outside, but we cannot do it yet becase we need to
- # derive it from self._trio.abc.Instrument
- class TrioIdleCallbackInstrument(self._trio.abc.Instrument):
- def before_io_wait(self, timeout):
- if timeout > 0:
- for idle_callback in idle_callbacks.values():
- idle_callback()
+ emulate_idle_callbacks = _TrioIdleCallbackInstrument(self._idle_callbacks)
- emulate_idle_callbacks = TrioIdleCallbackInstrument()
-
- try:
- add_instrument = self._trio.lowlevel.add_instrument
- remove_instrument = self._trio.lowlevel.remove_instrument
- except AttributeError:
- # Trio 0.14 or older
- add_instrument = self._trio.hazmat.add_instrument
- remove_instrument = self._trio.hazmat.remove_instrument
-
- with self._trio.MultiError.catch(self._handle_main_loop_exception):
- add_instrument(emulate_idle_callbacks)
+ with trio.MultiError.catch(self._handle_main_loop_exception):
+ trio.lowlevel.add_instrument(emulate_idle_callbacks)
try:
await self._main_task()
finally:
- remove_instrument(emulate_idle_callbacks)
+ trio.lowlevel.remove_instrument(emulate_idle_callbacks)
- def watch_file(self, fd, callback):
+ def watch_file(self, fd: int, callback: Callable[[], typing.Any]) -> trio.CancelScope:
"""Calls `callback()` when the given file descriptor has some data
to read. No parameters are passed to the callback.
@@ -194,7 +183,12 @@ class TrioEventLoop(EventLoop):
"""
return self._start_task(self._watch_task, fd, callback)
- async def _alarm_task(self, scope, seconds, callback):
+ async def _alarm_task(
+ self,
+ scope: trio.CancelScope,
+ seconds: float | int,
+ callback: Callable[[], typing.Any],
+ ) -> None:
"""Asynchronous task that sleeps for a given number of seconds and then
calls the given callback.
@@ -207,7 +201,7 @@ class TrioEventLoop(EventLoop):
await self._sleep(seconds)
callback()
- def _handle_main_loop_exception(self, exc):
+ def _handle_main_loop_exception(self, exc: BaseException) -> BaseException | None:
"""Handles exceptions raised from the main loop, catching ExitMainLoop
instead of letting it propagate through.
@@ -226,9 +220,9 @@ class TrioEventLoop(EventLoop):
exits the app by raising ExitMainLoop.
"""
try:
- async with self._trio.open_nursery() as self._nursery:
+ async with trio.open_nursery() as self._nursery:
self._schedule_pending_tasks()
- await self._trio.sleep_forever()
+ await trio.sleep_forever()
finally:
self._nursery = None
@@ -252,7 +246,7 @@ class TrioEventLoop(EventLoop):
Returns:
a cancellation scope for the Trio task
"""
- scope = self._trio.CancelScope()
+ scope = trio.CancelScope()
if self._nursery:
self._nursery.start_soon(task, scope, *args)
else:
diff --git a/urwid/event_loop/twisted_loop.py b/urwid/event_loop/twisted_loop.py
new file mode 100644
index 0000000..4637c1b
--- /dev/null
+++ b/urwid/event_loop/twisted_loop.py
@@ -0,0 +1,236 @@
+# Urwid main loop code
+# Copyright (C) 2004-2012 Ian Ward
+# Copyright (C) 2008 Walter Mundt
+# Copyright (C) 2009 Andrew Psaltis
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Urwid web site: https://urwid.org/
+
+"""Twisted Reactor based urwid EventLoop implementation.
+
+Twisted library is required.
+"""
+
+from __future__ import annotations
+
+import sys
+import typing
+from collections.abc import Callable
+
+from twisted.internet.abstract import FileDescriptor
+from twisted.internet.error import AlreadyCalled, AlreadyCancelled
+
+from .abstract_loop import EventLoop, ExitMainLoop
+
+if typing.TYPE_CHECKING:
+ from twisted.internet.interfaces import IReactorFDSet
+ from typing_extensions import ParamSpec
+
+ _Spec = ParamSpec("_Spec")
+ _T = typing.TypeVar("_T")
+
+__all__ = ("TwistedEventLoop",)
+
+
+class _TwistedInputDescriptor(FileDescriptor):
+ def __init__(self, reactor: IReactorFDSet, fd: int, cb: Callable[[], typing.Any]) -> None:
+ self._fileno = fd
+ self.cb = cb
+ super().__init__(reactor)
+
+ def fileno(self) -> int:
+ return self._fileno
+
+ def doRead(self):
+ return self.cb()
+
+
+class TwistedEventLoop(EventLoop):
+ """
+ Event loop based on Twisted_
+ """
+ _idle_emulation_delay = 1.0/256 # a short time (in seconds)
+
+ def __init__(self, reactor=None, manage_reactor: bool = True) -> None:
+ """
+ :param reactor: reactor to use
+ :type reactor: :class:`twisted.internet.reactor`.
+ :param: manage_reactor: `True` if you want this event loop to run
+ and stop the reactor.
+ :type manage_reactor: boolean
+
+ .. WARNING::
+ Twisted's reactor doesn't like to be stopped and run again. If you
+ need to stop and run your :class:`MainLoop`, consider setting
+ ``manage_reactor=False`` and take care of running/stopping the reactor
+ at the beginning/ending of your program yourself.
+
+ You can also forego using :class:`MainLoop`'s run() entirely, and
+ instead call start() and stop() before and after starting the
+ reactor.
+
+ .. _Twisted: https://twisted.org/
+ """
+ if reactor is None:
+ import twisted.internet.reactor
+ reactor = twisted.internet.reactor
+ self.reactor = reactor
+ self._watch_files: dict[int, _TwistedInputDescriptor] = {}
+ self._idle_handle: int = 0
+ self._twisted_idle_enabled = False
+ self._idle_callbacks: dict[int, Callable[[], typing.Any]] = {}
+ self._exc: BaseException | None = None
+ self.manage_reactor = manage_reactor
+ self._enable_twisted_idle()
+
+ def alarm(self, seconds: float | int, callback: Callable[[], typing.Any]):
+ """
+ Call callback() a given time from now. No parameters are
+ passed to callback.
+
+ Returns a handle that may be passed to remove_alarm()
+
+ seconds -- floating point time to wait before calling callback
+ callback -- function to call from event loop
+ """
+ handle = self.reactor.callLater(seconds, self.handle_exit(callback))
+ return handle
+
+ def remove_alarm(self, handle) -> bool:
+ """
+ Remove an alarm.
+
+ Returns True if the alarm exists, False otherwise
+ """
+ try:
+ handle.cancel()
+ return True
+ except AlreadyCancelled:
+ return False
+ except AlreadyCalled:
+ return False
+
+ def watch_file(self, fd: int, callback: Callable[[], typing.Any]) -> int:
+ """
+ Call callback() when fd has some data to read. No parameters
+ are passed to callback.
+
+ Returns a handle that may be passed to remove_watch_file()
+
+ fd -- file descriptor to watch for input
+ callback -- function to call when input is available
+ """
+ ind = _TwistedInputDescriptor(self.reactor, fd, self.handle_exit(callback))
+ self._watch_files[fd] = ind
+ self.reactor.addReader(ind)
+ return fd
+
+ def remove_watch_file(self, handle: int) -> bool:
+ """
+ Remove an input file.
+
+ Returns True if the input file exists, False otherwise
+ """
+ if handle in self._watch_files:
+ self.reactor.removeReader(self._watch_files[handle])
+ del self._watch_files[handle]
+ return True
+ return False
+
+ def enter_idle(self, callback: Callable[[], typing.Any]) -> int:
+ """
+ Add a callback for entering idle.
+
+ Returns a handle that may be passed to remove_enter_idle()
+ """
+ self._idle_handle += 1
+ self._idle_callbacks[self._idle_handle] = callback
+ return self._idle_handle
+
+ def _enable_twisted_idle(self) -> None:
+ """
+ Twisted's reactors don't have an idle or enter-idle callback
+ so the best we can do for now is to set a timer event in a very
+ short time to approximate an enter-idle callback.
+
+ .. WARNING::
+ This will perform worse than the other event loops until we can find a
+ fix or workaround
+ """
+ if self._twisted_idle_enabled:
+ return
+ self.reactor.callLater(
+ self._idle_emulation_delay,
+ self.handle_exit(self._twisted_idle_callback, enable_idle=False),
+ )
+ self._twisted_idle_enabled = True
+
+ def _twisted_idle_callback(self) -> None:
+ for callback in self._idle_callbacks.values():
+ callback()
+ self._twisted_idle_enabled = False
+
+ def remove_enter_idle(self, handle) -> bool:
+ """
+ Remove an idle callback.
+
+ Returns True if the handle was removed.
+ """
+ try:
+ del self._idle_callbacks[handle]
+ except KeyError:
+ return False
+ return True
+
+ def run(self) -> None:
+ """
+ Start the event loop. Exit the loop when any callback raises
+ an exception. If ExitMainLoop is raised, exit cleanly.
+ """
+ if not self.manage_reactor:
+ return
+ self.reactor.run()
+ if self._exc:
+ # An exception caused us to exit, raise it now
+ exc = self._exc
+ self._exc = None
+ raise exc.with_traceback(exc.__traceback__)
+
+ def handle_exit(self, f: Callable[_Spec, _T], enable_idle: bool = True) -> Callable[_Spec, _T | None]:
+ """
+ Decorator that cleanly exits the :class:`TwistedEventLoop` if
+ :class:`ExitMainLoop` is thrown inside of the wrapped function. Store the
+ exception info if some other exception occurs, it will be reraised after
+ the loop quits.
+
+ *f* -- function to be wrapped
+ """
+ def wrapper(*args: _Spec.args, **kwargs: _Spec.kwargs) -> _T | None:
+ rval = None
+ try:
+ rval = f(*args, **kwargs)
+ except ExitMainLoop:
+ if self.manage_reactor:
+ self.reactor.stop()
+ except BaseException as exc:
+ print(sys.exc_info())
+ self._exc = exc
+ if self.manage_reactor:
+ self.reactor.crash()
+ if enable_idle:
+ self._enable_twisted_idle()
+ return rval
+ return wrapper
diff --git a/urwid/html_fragment.py b/urwid/html_fragment.py
index 1f56509..006a06b 100755
--- a/urwid/html_fragment.py
+++ b/urwid/html_fragment.py
@@ -30,7 +30,7 @@ import typing
from urwid import util
from urwid.display_common import AttrSpec, BaseScreen
-from urwid.main_loop import ExitMainLoop
+from urwid.event_loop import ExitMainLoop
if typing.TYPE_CHECKING:
from typing_extensions import Literal
diff --git a/urwid/main_loop.py b/urwid/main_loop.py
deleted file mode 100755
index 5cc7514..0000000
--- a/urwid/main_loop.py
+++ /dev/null
@@ -1,1589 +0,0 @@
-#!/usr/bin/python
-#
-# Urwid main loop code
-# Copyright (C) 2004-2012 Ian Ward
-# Copyright (C) 2008 Walter Mundt
-# Copyright (C) 2009 Andrew Psaltis
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-# Urwid web site: https://urwid.org/
-
-
-from __future__ import annotations
-
-import heapq
-import os
-import select
-import signal
-import sys
-import time
-import typing
-import warnings
-from collections.abc import Callable, Iterable
-from functools import wraps
-from itertools import count
-from weakref import WeakKeyDictionary
-
-try:
- import fcntl
-except ImportError:
- pass # windows
-
-from urwid import signals
-from urwid.command_map import REDRAW_SCREEN, command_map
-from urwid.compat import reraise
-from urwid.display_common import INPUT_DESCRIPTORS_CHANGED
-from urwid.util import StoppingContext, is_mouse_event
-from urwid.wimp import PopUpTarget
-
-if typing.TYPE_CHECKING:
- from .display_common import BaseScreen
- from .widget import Widget
-
-PIPE_BUFFER_READ_SIZE = 4096 # can expect this much on Linux, so try for that
-
-
-class ExitMainLoop(Exception):
- """
- When this exception is raised within a main loop the main loop
- will exit cleanly.
- """
- pass
-
-
-class CantUseExternalLoop(Exception):
- pass
-
-
-class MainLoop:
- """
- This is the standard main loop implementation for a single interactive
- session.
-
- :param widget: the topmost widget used for painting the screen, stored as
- :attr:`widget` and may be modified. Must be a box widget.
- :type widget: widget instance
-
- :param palette: initial palette for screen
- :type palette: iterable of palette entries
-
- :param screen: screen to use, default is a new :class:`raw_display.Screen`
- instance; stored as :attr:`screen`
- :type screen: display module screen instance
-
- :param handle_mouse: ``True`` to ask :attr:`.screen` to process mouse events
- :type handle_mouse: bool
-
- :param input_filter: a function to filter input before sending it to
- :attr:`.widget`, called from :meth:`.input_filter`
- :type input_filter: callable
-
- :param unhandled_input: a function called when input is not handled by
- :attr:`.widget`, called from :meth:`.unhandled_input`
- :type unhandled_input: callable
-
- :param event_loop: if :attr:`.screen` supports external an event loop it may be
- given here, default is a new :class:`SelectEventLoop` instance;
- stored as :attr:`.event_loop`
- :type event_loop: event loop instance
-
- :param pop_ups: `True` to wrap :attr:`.widget` with a :class:`PopUpTarget`
- instance to allow any widget to open a pop-up anywhere on the screen
- :type pop_ups: boolean
-
-
- .. attribute:: screen
-
- The screen object this main loop uses for screen updates and reading input
-
- .. attribute:: event_loop
-
- The event loop object this main loop uses for waiting on alarms and IO
- """
-
- def __init__(
- self,
- widget: Widget,
- palette=(),
- screen: BaseScreen | None = None,
- handle_mouse: bool = True,
- input_filter: Callable[[list[str], list[int]], list[str]] | None = None,
- unhandled_input: Callable[[str], bool] | None = None,
- event_loop=None,
- pop_ups: bool = False,
- ):
- self._widget = widget
- self.handle_mouse = handle_mouse
- self.pop_ups = pop_ups # triggers property setting side-effect
-
- if not screen:
- from urwid import raw_display
- screen = raw_display.Screen()
-
- if palette:
- screen.register_palette(palette)
-
- self.screen = screen
- self.screen_size = None
-
- self._unhandled_input = unhandled_input
- self._input_filter = input_filter
-
- if not hasattr(screen, 'hook_event_loop') and event_loop is not None:
- raise NotImplementedError(f"screen object passed {screen!r} does not support external event loops")
- if event_loop is None:
- event_loop = SelectEventLoop()
- self.event_loop = event_loop
-
- if hasattr(self.screen, 'signal_handler_setter'):
- # Tell the screen what function it must use to set
- # signal handlers
- self.screen.signal_handler_setter = self.event_loop.set_signal_handler
-
- self._watch_pipes = {}
-
- @property
- def widget(self) -> Widget:
- """
- Property for the topmost widget used to draw the screen.
- This must be a box widget.
- """
- return self._widget
-
- @widget.setter
- def widget(self, widget: Widget) -> None:
- self._widget = widget
- if self.pop_ups:
- self._topmost_widget.original_widget = self._widget
- else:
- self._topmost_widget = self._widget
-
- def _set_widget(self, widget: Widget) -> None:
- warnings.warn(
- f"method `{self.__class__.__name__}._set_widget` is deprecated, "
- f"please use `{self.__class__.__name__}.widget` property",
- DeprecationWarning,
- stacklevel=2,
- )
- self.widget = widget
-
- @property
- def pop_ups(self):
- return self._pop_ups
-
- @pop_ups.setter
- def pop_ups(self, pop_ups) -> None:
- self._pop_ups = pop_ups
- if pop_ups:
- self._topmost_widget = PopUpTarget(self._widget)
- else:
- self._topmost_widget = self._widget
-
- def _set_pop_ups(self, pop_ups) -> None:
- warnings.warn(
- f"method `{self.__class__.__name__}._set_pop_ups` is deprecated, "
- f"please use `{self.__class__.__name__}.pop_ups` property",
- DeprecationWarning,
- stacklevel=2,
- )
- self.pop_ups = pop_ups
-
- def set_alarm_in(self, sec, callback, user_data=None):
- """
- Schedule an alarm in *sec* seconds that will call *callback* from the
- within the :meth:`run` method.
-
- :param sec: seconds until alarm
- :type sec: float
- :param callback: function to call with two parameters: this main loop
- object and *user_data*
- :type callback: callable
- """
- def cb():
- callback(self, user_data)
- return self.event_loop.alarm(sec, cb)
-
- def set_alarm_at(self, tm, callback, user_data=None):
- """
- Schedule an alarm at *tm* time that will call *callback* from the
- within the :meth:`run` function. Returns a handle that may be passed to
- :meth:`remove_alarm`.
-
- :param tm: time to call callback e.g. ``time.time() + 5``
- :type tm: float
- :param callback: function to call with two parameters: this main loop
- object and *user_data*
- :type callback: callable
- """
- def cb():
- callback(self, user_data)
- return self.event_loop.alarm(tm - time.time(), cb)
-
- def remove_alarm(self, handle):
- """
- Remove an alarm. Return ``True`` if *handle* was found, ``False``
- otherwise.
- """
- return self.event_loop.remove_alarm(handle)
-
- def watch_pipe(self, callback):
- """
- Create a pipe for use by a subprocess or thread to trigger a callback
- in the process/thread running the main loop.
-
- :param callback: function taking one parameter to call from within
- the process/thread running the main loop
- :type callback: callable
-
- This method returns a file descriptor attached to the write end of a
- pipe. The read end of the pipe is added to the list of files
- :attr:`event_loop` is watching. When data is written to the pipe the
- callback function will be called and passed a single value containing
- data read from the pipe.
-
- This method may be used any time you want to update widgets from
- another thread or subprocess.
-
- Data may be written to the returned file descriptor with
- ``os.write(fd, data)``. Ensure that data is less than 512 bytes (or 4K
- on Linux) so that the callback will be triggered just once with the
- complete value of data passed in.
-
- If the callback returns ``False`` then the watch will be removed from
- :attr:`event_loop` and the read end of the pipe will be closed. You
- are responsible for closing the write end of the pipe with
- ``os.close(fd)``.
- """
- pipe_rd, pipe_wr = os.pipe()
- fcntl.fcntl(pipe_rd, fcntl.F_SETFL, os.O_NONBLOCK)
- watch_handle = None
-
- def cb():
- data = os.read(pipe_rd, PIPE_BUFFER_READ_SIZE)
- rval = callback(data)
- if rval is False:
- self.event_loop.remove_watch_file(watch_handle)
- os.close(pipe_rd)
-
- watch_handle = self.event_loop.watch_file(pipe_rd, cb)
- self._watch_pipes[pipe_wr] = (watch_handle, pipe_rd)
- return pipe_wr
-
- def remove_watch_pipe(self, write_fd):
- """
- Close the read end of the pipe and remove the watch created by
- :meth:`watch_pipe`. You are responsible for closing the write end of
- the pipe.
-
- Returns ``True`` if the watch pipe exists, ``False`` otherwise
- """
- try:
- watch_handle, pipe_rd = self._watch_pipes.pop(write_fd)
- except KeyError:
- return False
-
- if not self.event_loop.remove_watch_file(watch_handle):
- return False
- os.close(pipe_rd)
- return True
-
- def watch_file(self, fd, callback):
- """
- Call *callback* when *fd* has some data to read. No parameters are
- passed to callback.
-
- Returns a handle that may be passed to :meth:`remove_watch_file`.
- """
- return self.event_loop.watch_file(fd, callback)
-
- def remove_watch_file(self, handle):
- """
- Remove a watch file. Returns ``True`` if the watch file
- exists, ``False`` otherwise.
- """
- return self.event_loop.remove_watch_file(handle)
-
-
- def run(self):
- """
- Start the main loop handling input events and updating the screen. The
- loop will continue until an :exc:`ExitMainLoop` exception is raised.
-
- If you would prefer to manage the event loop yourself, don't use this
- method. Instead, call :meth:`start` before starting the event loop,
- and :meth:`stop` once it's finished.
- """
- try:
- self._run()
- except ExitMainLoop:
- pass
-
- def _test_run(self):
- """
- >>> w = _refl("widget") # _refl prints out function calls
- >>> w.render_rval = "fake canvas" # *_rval is used for return values
- >>> scr = _refl("screen")
- >>> scr.get_input_descriptors_rval = [42]
- >>> scr.get_cols_rows_rval = (20, 10)
- >>> scr.started = True
- >>> scr._urwid_signals = {}
- >>> evl = _refl("event_loop")
- >>> evl.enter_idle_rval = 1
- >>> evl.watch_file_rval = 2
- >>> ml = MainLoop(w, [], scr, event_loop=evl)
- >>> ml.run() # doctest:+ELLIPSIS
- screen.start()
- screen.set_mouse_tracking()
- screen.unhook_event_loop(...)
- screen.hook_event_loop(...)
- event_loop.enter_idle(<bound method MainLoop.entering_idle...>)
- event_loop.run()
- event_loop.remove_enter_idle(1)
- screen.unhook_event_loop(...)
- screen.stop()
- >>> ml.draw_screen() # doctest:+ELLIPSIS
- screen.get_cols_rows()
- widget.render((20, 10), focus=True)
- screen.draw_screen((20, 10), 'fake canvas')
- """
-
- def start(self):
- """
- Sets up the main loop, hooking into the event loop where necessary.
- Starts the :attr:`screen` if it hasn't already been started.
-
- If you want to control starting and stopping the event loop yourself,
- you should call this method before starting, and call `stop` once the
- loop has finished. You may also use this method as a context manager,
- which will stop the loop automatically at the end of the block:
-
- with main_loop.start():
- ...
-
- Note that some event loop implementations don't handle exceptions
- specially if you manage the event loop yourself. In particular, the
- Twisted and asyncio loops won't stop automatically when
- :exc:`ExitMainLoop` (or anything else) is raised.
- """
- self.screen.start()
-
- if self.handle_mouse:
- self.screen.set_mouse_tracking()
-
- if not hasattr(self.screen, 'hook_event_loop'):
- raise CantUseExternalLoop(
- "Screen {0!r} doesn't support external event loops")
-
- try:
- signals.connect_signal(self.screen, INPUT_DESCRIPTORS_CHANGED,
- self._reset_input_descriptors)
- except NameError:
- pass
- # watch our input descriptors
- self._reset_input_descriptors()
- self.idle_handle = self.event_loop.enter_idle(self.entering_idle)
-
- return StoppingContext(self)
-
- def stop(self):
- """
- Cleans up any hooks added to the event loop. Only call this if you're
- managing the event loop yourself, after the loop stops.
- """
- self.event_loop.remove_enter_idle(self.idle_handle)
- del self.idle_handle
- signals.disconnect_signal(self.screen, INPUT_DESCRIPTORS_CHANGED,
- self._reset_input_descriptors)
- self.screen.unhook_event_loop(self.event_loop)
-
- self.screen.stop()
-
- def _reset_input_descriptors(self):
- self.screen.unhook_event_loop(self.event_loop)
- self.screen.hook_event_loop(self.event_loop, self._update)
-
- def _run(self):
- try:
- self.start()
- except CantUseExternalLoop:
- try:
- return self._run_screen_event_loop()
- finally:
- self.screen.stop()
-
- try:
- self.event_loop.run()
- except:
- self.screen.stop() # clean up screen control
- raise
- self.stop()
-
- def _update(self, keys: list[str], raw: list[int]):
- """
- >>> w = _refl("widget")
- >>> w.selectable_rval = True
- >>> w.mouse_event_rval = True
- >>> scr = _refl("screen")
- >>> scr.get_cols_rows_rval = (15, 5)
- >>> evl = _refl("event_loop")
- >>> ml = MainLoop(w, [], scr, event_loop=evl)
- >>> ml._input_timeout = "old timeout"
- >>> ml._update(['y'], [121]) # doctest:+ELLIPSIS
- screen.get_cols_rows()
- widget.selectable()
- widget.keypress((15, 5), 'y')
- >>> ml._update([("mouse press", 1, 5, 4)], [])
- widget.mouse_event((15, 5), 'mouse press', 1, 5, 4, focus=True)
- >>> ml._update([], [])
- """
- keys = self.input_filter(keys, raw)
-
- if keys:
- self.process_input(keys)
- if 'window resize' in keys:
- self.screen_size = None
-
- def _run_screen_event_loop(self) -> None:
- """
- This method is used when the screen does not support using
- external event loops.
-
- The alarms stored in the SelectEventLoop in :attr:`event_loop`
- are modified by this method.
- """
- next_alarm = None
-
- while True:
- self.draw_screen()
-
- if not next_alarm and self.event_loop._alarms:
- next_alarm = heapq.heappop(self.event_loop._alarms)
-
- keys: list[str] = []
- raw: list[int] = []
- while not keys:
- if next_alarm:
- sec = max(0, next_alarm[0] - time.time())
- self.screen.set_input_timeouts(sec)
- else:
- self.screen.set_input_timeouts(None)
- keys, raw = self.screen.get_input(True)
- if not keys and next_alarm:
- sec = next_alarm[0] - time.time()
- if sec <= 0:
- break
-
- keys = self.input_filter(keys, raw)
-
- if keys:
- self.process_input(keys)
-
- while next_alarm:
- sec = next_alarm[0] - time.time()
- if sec > 0:
- break
- tm, tie_break, callback = next_alarm
- callback()
-
- if self.event_loop._alarms:
- next_alarm = heapq.heappop(self.event_loop._alarms)
- else:
- next_alarm = None
-
- if 'window resize' in keys:
- self.screen_size = None
-
- def _test_run_screen_event_loop(self):
- """
- >>> w = _refl("widget")
- >>> scr = _refl("screen")
- >>> scr.get_cols_rows_rval = (10, 5)
- >>> scr.get_input_rval = [], []
- >>> ml = MainLoop(w, screen=scr)
- >>> def stop_now(loop, data):
- ... raise ExitMainLoop()
- >>> handle = ml.set_alarm_in(0, stop_now)
- >>> try:
- ... ml._run_screen_event_loop()
- ... except ExitMainLoop:
- ... pass
- screen.get_cols_rows()
- widget.render((10, 5), focus=True)
- screen.draw_screen((10, 5), None)
- screen.set_input_timeouts(0)
- screen.get_input(True)
- """
-
- def process_input(self, keys: Iterable[str]) -> bool:
- """
- This method will pass keyboard input and mouse events to :attr:`widget`.
- This method is called automatically from the :meth:`run` method when
- there is input, but may also be called to simulate input from the user.
-
- *keys* is a list of input returned from :attr:`screen`'s get_input()
- or get_input_nonblocking() methods.
-
- Returns ``True`` if any key was handled by a widget or the
- :meth:`unhandled_input` method.
- """
- if not self.screen_size:
- self.screen_size = self.screen.get_cols_rows()
-
- something_handled = False
-
- for k in keys:
- if k == 'window resize':
- continue
- if is_mouse_event(k):
- event, button, col, row = k
- if hasattr(self._topmost_widget, "mouse_event"):
- if self._topmost_widget.mouse_event(self.screen_size, event, button, col, row, focus=True):
- k = None
- elif self._topmost_widget.selectable():
- k = self._topmost_widget.keypress(self.screen_size, k)
- if k:
- if command_map[k] == REDRAW_SCREEN:
- self.screen.clear()
- something_handled = True
- else:
- something_handled |= bool(self.unhandled_input(k))
- else:
- something_handled = True
-
- return something_handled
-
- def _test_process_input(self):
- """
- >>> w = _refl("widget")
- >>> w.selectable_rval = True
- >>> scr = _refl("screen")
- >>> scr.get_cols_rows_rval = (10, 5)
- >>> ml = MainLoop(w, [], scr)
- >>> ml.process_input(['enter', ('mouse drag', 1, 14, 20)])
- screen.get_cols_rows()
- widget.selectable()
- widget.keypress((10, 5), 'enter')
- widget.mouse_event((10, 5), 'mouse drag', 1, 14, 20, focus=True)
- True
- """
-
- def input_filter(self, keys: list[str], raw: list[int]) -> list[str]:
- """
- This function is passed each all the input events and raw keystroke
- values. These values are passed to the *input_filter* function
- passed to the constructor. That function must return a list of keys to
- be passed to the widgets to handle. If no *input_filter* was
- defined this implementation will return all the input events.
- """
- if self._input_filter:
- return self._input_filter(keys, raw)
- return keys
-
- def unhandled_input(self, input: str) -> bool:
- """
- This function is called with any input that was not handled by the
- widgets, and calls the *unhandled_input* function passed to the
- constructor. If no *unhandled_input* was defined then the input
- will be ignored.
-
- *input* is the keyboard or mouse input.
-
- The *unhandled_input* function should return ``True`` if it handled
- the input.
- """
- if self._unhandled_input:
- return self._unhandled_input(input)
-
- def entering_idle(self):
- """
- This method is called whenever the event loop is about to enter the
- idle state. :meth:`draw_screen` is called here to update the
- screen when anything has changed.
- """
- if self.screen.started:
- self.draw_screen()
-
- def draw_screen(self):
- """
- Render the widgets and paint the screen. This method is called
- automatically from :meth:`entering_idle`.
-
- If you modify the widgets displayed outside of handling input or
- responding to an alarm you will need to call this method yourself
- to repaint the screen.
- """
- if not self.screen_size:
- self.screen_size = self.screen.get_cols_rows()
-
- canvas = self._topmost_widget.render(self.screen_size, focus=True)
- self.screen.draw_screen(self.screen_size, canvas)
-
-
-class EventLoop:
- """
- Abstract class representing an event loop to be used by :class:`MainLoop`.
- """
-
- def alarm(self, seconds: float | int, callback):
- """
- Call callback() a given time from now. No parameters are
- passed to callback.
-
- This method has no default implementation.
-
- Returns a handle that may be passed to remove_alarm()
-
- seconds -- floating point time to wait before calling callback
- callback -- function to call from event loop
- """
- raise NotImplementedError()
-
- def enter_idle(self, callback):
- """
- Add a callback for entering idle.
-
- This method has no default implementation.
-
- Returns a handle that may be passed to remove_idle()
- """
- raise NotImplementedError()
-
- def remove_alarm(self, handle):
- """
- Remove an alarm.
-
- This method has no default implementation.
-
- Returns True if the alarm exists, False otherwise
- """
- raise NotImplementedError()
-
- def remove_enter_idle(self, handle):
- """
- Remove an idle callback.
-
- This method has no default implementation.
-
- Returns True if the handle was removed.
- """
- raise NotImplementedError()
-
- def remove_watch_file(self, handle):
- """
- Remove an input file.
-
- This method has no default implementation.
-
- Returns True if the input file exists, False otherwise
- """
- raise NotImplementedError()
-
- def run(self):
- """
- Start the event loop. Exit the loop when any callback raises
- an exception. If ExitMainLoop is raised, exit cleanly.
-
- This method has no default implementation.
- """
- raise NotImplementedError()
-
- def watch_file(self, fd, callback):
- """
- Call callback() when fd has some data to read. No parameters
- are passed to callback.
-
- This method has no default implementation.
-
- Returns a handle that may be passed to remove_watch_file()
-
- fd -- file descriptor to watch for input
- callback -- function to call when input is available
- """
- raise NotImplementedError()
-
- def set_signal_handler(self, signum, handler):
- """
- Sets the signal handler for signal signum.
-
- The default implementation of :meth:`set_signal_handler`
- is simply a proxy function that calls :func:`signal.signal()`
- and returns the resulting value.
-
- signum -- signal number
- handler -- function (taking signum as its single argument),
- or `signal.SIG_IGN`, or `signal.SIG_DFL`
- """
- return signal.signal(signum, handler)
-
-
-class SelectEventLoop(EventLoop):
- """
- Event loop based on :func:`select.select`
- """
-
- def __init__(self):
- self._alarms = []
- self._watch_files = {}
- self._idle_handle = 0
- self._idle_callbacks = {}
- self._tie_break = count()
-
- def alarm(self, seconds: float | int, callback):
- """
- Call callback() a given time from now. No parameters are
- passed to callback.
-
- Returns a handle that may be passed to remove_alarm()
-
- seconds -- floating point time to wait before calling callback
- callback -- function to call from event loop
- """
- tm = time.time() + seconds
- handle = (tm, next(self._tie_break), callback)
- heapq.heappush(self._alarms, handle)
- return handle
-
- def remove_alarm(self, handle):
- """
- Remove an alarm.
-
- Returns True if the alarm exists, False otherwise
- """
- try:
- self._alarms.remove(handle)
- heapq.heapify(self._alarms)
- return True
- except ValueError:
- return False
-
- def watch_file(self, fd, callback):
- """
- Call callback() when fd has some data to read. No parameters
- are passed to callback.
-
- Returns a handle that may be passed to remove_watch_file()
-
- fd -- file descriptor to watch for input
- callback -- function to call when input is available
- """
- self._watch_files[fd] = callback
- return fd
-
- def remove_watch_file(self, handle):
- """
- Remove an input file.
-
- Returns True if the input file exists, False otherwise
- """
- if handle in self._watch_files:
- del self._watch_files[handle]
- return True
- return False
-
- def enter_idle(self, callback):
- """
- Add a callback for entering idle.
-
- Returns a handle that may be passed to remove_idle()
- """
- self._idle_handle += 1
- self._idle_callbacks[self._idle_handle] = callback
- return self._idle_handle
-
- def remove_enter_idle(self, handle):
- """
- Remove an idle callback.
-
- Returns True if the handle was removed.
- """
- try:
- del self._idle_callbacks[handle]
- except KeyError:
- return False
- return True
-
- def _entering_idle(self):
- """
- Call all the registered idle callbacks.
- """
- for callback in self._idle_callbacks.values():
- callback()
-
- def run(self):
- """
- Start the event loop. Exit the loop when any callback raises
- an exception. If ExitMainLoop is raised, exit cleanly.
- """
- try:
- self._did_something = True
- while True:
- try:
- self._loop()
- except OSError as e:
- if e.args[0] != 4:
- # not just something we need to retry
- raise
- except ExitMainLoop:
- pass
-
- def _loop(self):
- """
- A single iteration of the event loop
- """
- fds = list(self._watch_files.keys())
- if self._alarms or self._did_something:
- if self._alarms:
- tm = self._alarms[0][0]
- timeout = max(0, tm - time.time())
- if self._did_something and (not self._alarms or (self._alarms and timeout > 0)):
- timeout = 0
- tm = 'idle'
- ready, w, err = select.select(fds, [], fds, timeout)
- else:
- tm = None
- ready, w, err = select.select(fds, [], fds)
-
- if not ready:
- if tm == 'idle':
- self._entering_idle()
- self._did_something = False
- elif tm is not None:
- # must have been a timeout
- tm, tie_break, alarm_callback = heapq.heappop(self._alarms)
- alarm_callback()
- self._did_something = True
-
- for fd in ready:
- self._watch_files[fd]()
- self._did_something = True
-
-
-class GLibEventLoop(EventLoop):
- """
- Event loop based on GLib.MainLoop
- """
-
- def __init__(self):
- from gi.repository import GLib
- self.GLib = GLib
- self._alarms = []
- self._watch_files = {}
- self._idle_handle = 0
- self._glib_idle_enabled = False # have we called glib.idle_add?
- self._idle_callbacks = {}
- self._loop = GLib.MainLoop()
- self._exc_info = None
- self._enable_glib_idle()
- self._signal_handlers = {}
-
- def alarm(self, seconds: float | int, callback):
- """
- Call callback() a given time from now. No parameters are
- passed to callback.
-
- Returns a handle that may be passed to remove_alarm()
-
- seconds -- floating point time to wait before calling callback
- callback -- function to call from event loop
- """
- @self.handle_exit
- def ret_false():
- callback()
- self._enable_glib_idle()
- return False
- fd = self.GLib.timeout_add(int(seconds*1000), ret_false)
- self._alarms.append(fd)
- return (fd, callback)
-
- def set_signal_handler(self, signum, handler):
- """
- Sets the signal handler for signal signum.
-
- .. WARNING::
- Because this method uses the `GLib`-specific `unix_signal_add`
- function, its behaviour is different than `signal.signal().`
-
- If `signum` is not `SIGHUP`, `SIGINT`, `SIGTERM`, `SIGUSR1`,
- `SIGUSR2` or `SIGWINCH`, this method performs no actions and
- immediately returns None.
-
- Returns None in all cases (unlike :func:`signal.signal()`).
- ..
-
- signum -- signal number
- handler -- function (taking signum as its single argument),
- or `signal.SIG_IGN`, or `signal.SIG_DFL`
- """
- glib_signals = [
- signal.SIGHUP,
- signal.SIGINT,
- signal.SIGTERM,
- signal.SIGUSR1,
- signal.SIGUSR2,
- ]
-
- # GLib supports SIGWINCH as of version 2.54.
- if not self.GLib.check_version(2, 54, 0):
- glib_signals.append(signal.SIGWINCH)
-
- if signum not in glib_signals:
- # The GLib event loop supports only the signals listed above
- return
-
- if signum in self._signal_handlers:
- self.GLib.source_remove(self._signal_handlers.pop(signum))
-
- if handler == signal.SIG_IGN:
- handler = lambda x: None
- elif handler == signal.SIG_DFL:
- return
-
- def final_handler(signal_number):
- handler(signal_number)
- return self.GLib.SOURCE_CONTINUE
-
- source = self.GLib.unix_signal_add(self.GLib.PRIORITY_DEFAULT, signum, final_handler, signum)
- self._signal_handlers[signum] = source
-
- def remove_alarm(self, handle):
- """
- Remove an alarm.
-
- Returns True if the alarm exists, False otherwise
- """
- try:
- self._alarms.remove(handle[0])
- self.GLib.source_remove(handle[0])
- return True
- except ValueError:
- return False
-
- def watch_file(self, fd, callback):
- """
- Call callback() when fd has some data to read. No parameters
- are passed to callback.
-
- Returns a handle that may be passed to remove_watch_file()
-
- fd -- file descriptor to watch for input
- callback -- function to call when input is available
- """
- @self.handle_exit
- def io_callback(source, cb_condition):
- callback()
- self._enable_glib_idle()
- return True
- self._watch_files[fd] = self.GLib.io_add_watch(fd, self.GLib.IO_IN, io_callback)
- return fd
-
- def remove_watch_file(self, handle):
- """
- Remove an input file.
-
- Returns True if the input file exists, False otherwise
- """
- if handle in self._watch_files:
- self.GLib.source_remove(self._watch_files[handle])
- del self._watch_files[handle]
- return True
- return False
-
- def enter_idle(self, callback):
- """
- Add a callback for entering idle.
-
- Returns a handle that may be passed to remove_enter_idle()
- """
- self._idle_handle += 1
- self._idle_callbacks[self._idle_handle] = callback
- return self._idle_handle
-
- def _enable_glib_idle(self):
- if self._glib_idle_enabled:
- return
- self.GLib.idle_add(self._glib_idle_callback)
- self._glib_idle_enabled = True
-
- def _glib_idle_callback(self):
- for callback in self._idle_callbacks.values():
- callback()
- self._glib_idle_enabled = False
- return False # ask glib not to call again (or we would be called
-
- def remove_enter_idle(self, handle):
- """
- Remove an idle callback.
-
- Returns True if the handle was removed.
- """
- try:
- del self._idle_callbacks[handle]
- except KeyError:
- return False
- return True
-
- def run(self):
- """
- Start the event loop. Exit the loop when any callback raises
- an exception. If ExitMainLoop is raised, exit cleanly.
- """
- try:
- self._loop.run()
- finally:
- if self._loop.is_running():
- self._loop.quit()
- if self._exc_info:
- # An exception caused us to exit, raise it now
- exc_info = self._exc_info
- self._exc_info = None
- reraise(*exc_info)
-
- def handle_exit(self, f):
- """
- Decorator that cleanly exits the :class:`GLibEventLoop` if
- :exc:`ExitMainLoop` is thrown inside of the wrapped function. Store the
- exception info if some other exception occurs, it will be reraised after
- the loop quits.
-
- *f* -- function to be wrapped
- """
- def wrapper(*args, **kargs):
- try:
- return f(*args, **kargs)
- except ExitMainLoop:
- self._loop.quit()
- except:
- self._exc_info = sys.exc_info()
- if self._loop.is_running():
- self._loop.quit()
- return False
- return wrapper
-
-
-class TornadoEventLoop(EventLoop):
- """ This is an Urwid-specific event loop to plug into its MainLoop.
- It acts as an adaptor for Tornado's IOLoop which does all
- heavy lifting except idle-callbacks.
-
- Notice, since Tornado has no concept of idle callbacks we
- monkey patch ioloop._impl.poll() function to be able to detect
- potential idle periods.
- """
- _ioloop_registry = WeakKeyDictionary() # {<ioloop> : {<handle> : <idle_func>}}
- _max_idle_handle = 0
-
- class PollProxy:
- """ A simple proxy for a Python's poll object that wraps the .poll() method
- in order to detect idle periods and call Urwid callbacks
- """
- def __init__(self, poll_obj, idle_map):
- self.__poll_obj = poll_obj
- self.__idle_map = idle_map
- self._idle_done = False
- self._prev_timeout = 0
-
- def __getattr__(self, name: str):
- return getattr(self.__poll_obj, name)
-
- def poll(self, timeout):
- if timeout > self._prev_timeout:
- # if timeout increased we assume a timer event was handled
- self._idle_done = False
- self._prev_timeout = timeout
- start = time.time()
-
- # any IO pending wins
- events = self.__poll_obj.poll(0)
- if events:
- self._idle_done = False
- return events
-
- # our chance to enter idle
- if not self._idle_done:
- for callback in self.__idle_map.values():
- callback()
- self._idle_done = True
-
- # then complete the actual request (adjusting timeout)
- timeout = max(0, min(timeout, timeout + start - time.time()))
- events = self.__poll_obj.poll(timeout)
- if events:
- self._idle_done = False
- return events
-
- @classmethod
- def _patch_poll_impl(cls, ioloop):
- """ Wraps original poll object in the IOLoop's poll object
- """
- if ioloop in cls._ioloop_registry:
- return # we already patched this instance
-
- cls._ioloop_registry[ioloop] = idle_map = {}
- # TODO: Add support for Tornado>=5.0.0
- ioloop._impl = cls.PollProxy(ioloop._impl, idle_map)
-
- def __init__(self, ioloop=None):
- if not ioloop:
- from tornado.ioloop import IOLoop
- ioloop = IOLoop.instance()
- self._ioloop = ioloop
- self._patch_poll_impl(ioloop)
- self._pending_alarms = {}
- self._watch_handles = {} # {<watch_handle> : <file_descriptor>}
- self._max_watch_handle = 0
- self._exception = None
-
- def alarm(self, secs, callback):
- ioloop = self._ioloop
- def wrapped():
- try:
- del self._pending_alarms[handle]
- except KeyError:
- pass
- self.handle_exit(callback)()
- handle = ioloop.add_timeout(ioloop.time() + secs, wrapped)
- self._pending_alarms[handle] = 1
- return handle
-
- def remove_alarm(self, handle):
- self._ioloop.remove_timeout(handle)
- try:
- del self._pending_alarms[handle]
- except KeyError:
- return False
- else:
- return True
-
- def watch_file(self, fd, callback):
- from tornado.ioloop import IOLoop
- handler = lambda fd,events: self.handle_exit(callback)()
- self._ioloop.add_handler(fd, handler, IOLoop.READ)
- self._max_watch_handle += 1
- handle = self._max_watch_handle
- self._watch_handles[handle] = fd
- return handle
-
- def remove_watch_file(self, handle):
- fd = self._watch_handles.pop(handle, None)
- if fd is None:
- return False
- else:
- self._ioloop.remove_handler(fd)
- return True
-
- def enter_idle(self, callback):
- self._max_idle_handle += 1
- handle = self._max_idle_handle
- idle_map = self._ioloop_registry[self._ioloop]
- idle_map[handle] = callback
- return handle
-
- def remove_enter_idle(self, handle):
- idle_map = self._ioloop_registry[self._ioloop]
- cb = idle_map.pop(handle, None)
- return cb is not None
-
- def handle_exit(self, func):
- @wraps(func)
- def wrapper(*args, **kw):
- try:
- return func(*args, **kw)
- except ExitMainLoop:
- self._ioloop.stop()
- except Exception as exc:
- self._exception = exc
- self._ioloop.stop()
- return False
- return wrapper
-
- def run(self):
- self._ioloop.start()
- if self._exception:
- exc, self._exception = self._exception, None
- raise exc
-
-
-try:
- from twisted.internet.abstract import FileDescriptor
-except ImportError:
- FileDescriptor = object
-
-
-class TwistedInputDescriptor(FileDescriptor):
- def __init__(self, reactor, fd, cb):
- self._fileno = fd
- self.cb = cb
- super().__init__(reactor)
-
- def fileno(self):
- return self._fileno
-
- def doRead(self):
- return self.cb()
-
-
-class TwistedEventLoop(EventLoop):
- """
- Event loop based on Twisted_
- """
- _idle_emulation_delay = 1.0/256 # a short time (in seconds)
-
- def __init__(self, reactor=None, manage_reactor: bool = True):
- """
- :param reactor: reactor to use
- :type reactor: :class:`twisted.internet.reactor`.
- :param: manage_reactor: `True` if you want this event loop to run
- and stop the reactor.
- :type manage_reactor: boolean
-
- .. WARNING::
- Twisted's reactor doesn't like to be stopped and run again. If you
- need to stop and run your :class:`MainLoop`, consider setting
- ``manage_reactor=False`` and take care of running/stopping the reactor
- at the beginning/ending of your program yourself.
-
- You can also forego using :class:`MainLoop`'s run() entirely, and
- instead call start() and stop() before and after starting the
- reactor.
-
- .. _Twisted: https://twisted.org/
- """
- if reactor is None:
- import twisted.internet.reactor
- reactor = twisted.internet.reactor
- self.reactor = reactor
- self._alarms = []
- self._watch_files = {}
- self._idle_handle = 0
- self._twisted_idle_enabled = False
- self._idle_callbacks = {}
- self._exc_info = None
- self.manage_reactor = manage_reactor
- self._enable_twisted_idle()
-
- def alarm(self, seconds, callback):
- """
- Call callback() a given time from now. No parameters are
- passed to callback.
-
- Returns a handle that may be passed to remove_alarm()
-
- seconds -- floating point time to wait before calling callback
- callback -- function to call from event loop
- """
- handle = self.reactor.callLater(seconds, self.handle_exit(callback))
- return handle
-
- def remove_alarm(self, handle):
- """
- Remove an alarm.
-
- Returns True if the alarm exists, False otherwise
- """
- from twisted.internet.error import AlreadyCalled, AlreadyCancelled
- try:
- handle.cancel()
- return True
- except AlreadyCancelled:
- return False
- except AlreadyCalled:
- return False
-
- def watch_file(self, fd, callback):
- """
- Call callback() when fd has some data to read. No parameters
- are passed to callback.
-
- Returns a handle that may be passed to remove_watch_file()
-
- fd -- file descriptor to watch for input
- callback -- function to call when input is available
- """
- ind = TwistedInputDescriptor(self.reactor, fd, self.handle_exit(callback))
- self._watch_files[fd] = ind
- self.reactor.addReader(ind)
- return fd
-
- def remove_watch_file(self, handle):
- """
- Remove an input file.
-
- Returns True if the input file exists, False otherwise
- """
- if handle in self._watch_files:
- self.reactor.removeReader(self._watch_files[handle])
- del self._watch_files[handle]
- return True
- return False
-
- def enter_idle(self, callback):
- """
- Add a callback for entering idle.
-
- Returns a handle that may be passed to remove_enter_idle()
- """
- self._idle_handle += 1
- self._idle_callbacks[self._idle_handle] = callback
- return self._idle_handle
-
- def _enable_twisted_idle(self):
- """
- Twisted's reactors don't have an idle or enter-idle callback
- so the best we can do for now is to set a timer event in a very
- short time to approximate an enter-idle callback.
-
- .. WARNING::
- This will perform worse than the other event loops until we can find a
- fix or workaround
- """
- if self._twisted_idle_enabled:
- return
- self.reactor.callLater(self._idle_emulation_delay,
- self.handle_exit(self._twisted_idle_callback, enable_idle=False))
- self._twisted_idle_enabled = True
-
- def _twisted_idle_callback(self):
- for callback in self._idle_callbacks.values():
- callback()
- self._twisted_idle_enabled = False
-
- def remove_enter_idle(self, handle):
- """
- Remove an idle callback.
-
- Returns True if the handle was removed.
- """
- try:
- del self._idle_callbacks[handle]
- except KeyError:
- return False
- return True
-
- def run(self):
- """
- Start the event loop. Exit the loop when any callback raises
- an exception. If ExitMainLoop is raised, exit cleanly.
- """
- if not self.manage_reactor:
- return
- self.reactor.run()
- if self._exc_info:
- # An exception caused us to exit, raise it now
- exc_info = self._exc_info
- self._exc_info = None
- reraise(*exc_info)
-
- def handle_exit(self, f, enable_idle: bool = True):
- """
- Decorator that cleanly exits the :class:`TwistedEventLoop` if
- :class:`ExitMainLoop` is thrown inside of the wrapped function. Store the
- exception info if some other exception occurs, it will be reraised after
- the loop quits.
-
- *f* -- function to be wrapped
- """
- def wrapper(*args,**kargs):
- rval = None
- try:
- rval = f(*args,**kargs)
- except ExitMainLoop:
- if self.manage_reactor:
- self.reactor.stop()
- except:
- print(sys.exc_info())
- self._exc_info = sys.exc_info()
- if self.manage_reactor:
- self.reactor.crash()
- if enable_idle:
- self._enable_twisted_idle()
- return rval
- return wrapper
-
-
-class AsyncioEventLoop(EventLoop):
- """
- Event loop based on the standard library ``asyncio`` module.
-
- ``asyncio`` is new in Python 3.4, but also exists as a backport on PyPI for
- Python 3.3. The ``trollius`` package is available for older Pythons with
- slightly different syntax, but also works with this loop.
- """
- _we_started_event_loop = False
-
- _idle_emulation_delay = 1.0/30 # a short time (in seconds)
-
- def __init__(self, **kwargs):
- if 'loop' in kwargs:
- self._loop = kwargs.pop('loop')
- else:
- import asyncio
- self._loop = asyncio.get_event_loop()
-
- def alarm(self, seconds, callback):
- """
- Call callback() a given time from now. No parameters are
- passed to callback.
-
- Returns a handle that may be passed to remove_alarm()
-
- seconds -- time in seconds to wait before calling callback
- callback -- function to call from event loop
- """
- return self._loop.call_later(seconds, callback)
-
- def remove_alarm(self, handle):
- """
- Remove an alarm.
-
- Returns True if the alarm exists, False otherwise
- """
- cancelled = (
- handle.cancelled()
- if getattr(handle, 'cancelled', None)
- else handle._cancelled
- )
- existed = not cancelled
- handle.cancel()
- return existed
-
- def watch_file(self, fd, callback):
- """
- Call callback() when fd has some data to read. No parameters
- are passed to callback.
-
- Returns a handle that may be passed to remove_watch_file()
-
- fd -- file descriptor to watch for input
- callback -- function to call when input is available
- """
- self._loop.add_reader(fd, callback)
- return fd
-
- def remove_watch_file(self, handle):
- """
- Remove an input file.
-
- Returns True if the input file exists, False otherwise
- """
- return self._loop.remove_reader(handle)
-
- def enter_idle(self, callback):
- """
- Add a callback for entering idle.
-
- Returns a handle that may be passed to remove_idle()
- """
- # XXX there's no such thing as "idle" in most event loops; this fakes
- # it the same way as Twisted, by scheduling the callback to be called
- # repeatedly
- mutable_handle = [None]
- def faux_idle_callback():
- callback()
- mutable_handle[0] = self._loop.call_later(
- self._idle_emulation_delay, faux_idle_callback)
-
- mutable_handle[0] = self._loop.call_later(
- self._idle_emulation_delay, faux_idle_callback)
-
- return mutable_handle
-
- def remove_enter_idle(self, handle):
- """
- Remove an idle callback.
-
- Returns True if the handle was removed.
- """
- # `handle` is just a list containing the current actual handle
- return self.remove_alarm(handle[0])
-
- _exc_info = None
-
- def _exception_handler(self, loop, context):
- exc = context.get('exception')
- if exc:
- loop.stop()
- if not isinstance(exc, ExitMainLoop):
- # Store the exc_info so we can re-raise after the loop stops
- self._exc_info = (type(exc), exc, exc.__traceback__)
- else:
- loop.default_exception_handler(context)
-
- def run(self):
- """
- Start the event loop. Exit the loop when any callback raises
- an exception. If ExitMainLoop is raised, exit cleanly.
- """
- self._loop.set_exception_handler(self._exception_handler)
- self._loop.run_forever()
- if self._exc_info:
- exc_info = self._exc_info
- self._exc_info = None
- reraise(*exc_info)
-
-
-# Import Trio's event loop only if we are on Python 3.5 or above (async def is
-# not supported in earlier versions).
-from ._async_kw_event_loop import TrioEventLoop
-
-
-def _refl(name: str, rval=None, exit=False):
- """
- This function is used to test the main loop classes.
-
- >>> scr = _refl("screen")
- >>> scr.function("argument")
- screen.function('argument')
- >>> scr.callme(when="now")
- screen.callme(when='now')
- >>> scr.want_something_rval = 42
- >>> x = scr.want_something()
- screen.want_something()
- >>> x
- 42
-
- """
- class Reflect:
- def __init__(self, name: int, rval=None):
- self._name = name
- self._rval = rval
-
- def __call__(self, *argl, **argd):
- args = ", ".join([repr(a) for a in argl])
- if args and argd:
- args = f"{args}, "
- args = args + ", ".join([f"{k}={repr(v)}" for k,v in argd.items()])
- print(f"{self._name}({args})")
- if exit:
- raise ExitMainLoop()
- return self._rval
-
- def __getattr__(self, attr):
- if attr.endswith("_rval"):
- raise AttributeError()
- #print(self._name+"."+attr)
- if hasattr(self, f"{attr}_rval"):
- return Reflect(f"{self._name}.{attr}", getattr(self, f"{attr}_rval"))
- return Reflect(f"{self._name}.{attr}")
- return Reflect(name)
-
-
-def _test():
- import doctest
- doctest.testmod()
-
-
-if __name__=='__main__':
- _test()
diff --git a/urwid/tests/test_doctests.py b/urwid/tests/test_doctests.py
index cb0526e..48a0188 100644
--- a/urwid/tests/test_doctests.py
+++ b/urwid/tests/test_doctests.py
@@ -13,11 +13,11 @@ def load_tests(loader, tests, ignore):
urwid.wimp,
urwid.decoration,
urwid.display_common,
- urwid.main_loop,
+ urwid.event_loop.main_loop,
urwid.numedit,
urwid.monitored_list,
urwid.raw_display,
- 'urwid.split_repr', # override function with same name
+ 'urwid.split_repr', # override function with same name
urwid.util,
urwid.signals,
urwid.graphics,
diff --git a/urwid/tests/test_event_loops.py b/urwid/tests/test_event_loops.py
index 77dbae1..fac7f72 100644
--- a/urwid/tests/test_event_loops.py
+++ b/urwid/tests/test_event_loops.py
@@ -114,6 +114,8 @@ else:
from tornado.ioloop import IOLoop
self.evl = urwid.TornadoEventLoop(IOLoop())
+ _expected_idle_handle = None
+
try:
import twisted
diff --git a/urwid/util.py b/urwid/util.py
index 7fe2b6c..fa2f907 100644
--- a/urwid/util.py
+++ b/urwid/util.py
@@ -431,7 +431,7 @@ def _tagmarkup_recurse(tm, attr):
return [tm], [(attr, len(tm))]
-def is_mouse_event(ev: str) -> bool:
+def is_mouse_event(ev: tuple[str, int, int, int] | typing.Any) -> bool:
return isinstance(ev, tuple) and len(ev) == 4 and "mouse" in ev[0]