diff options
author | Alexey Stepanov <penguinolog@users.noreply.github.com> | 2023-04-18 17:35:19 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-18 17:35:19 +0200 |
commit | d1710f0983e86dc5ec06efec6e94f3b6d204bbb3 (patch) | |
tree | d4c6681b6d47ea96d9b71155b9b0976e99730584 | |
parent | db10343d8aa937770907a3dcc4456cbeefe16549 (diff) | |
download | urwid-d1710f0983e86dc5ec06efec6e94f3b6d204bbb3.tar.gz |
[BREAKING CHANGE] Refactoring: Split event loop in several modules (#537)
* [BREAKING CHANGE] Refactoring: Split event loop in several modules
* `urwid.main_loop` is split into multiple modules which is easier to maintain
* `urwid.compat` is not used anymore and removed
* `TornadoEventLoop`, `GLibEventLoop`, `TwistedEventLoop` and `TrioEventLoop`
accessible ONLY if required dependencies installed
(like: Tornado installed -> `TornadoEventLoop` is accessible for import)
* `TornadoEventLoop` use the same idle logic as `AsyncioLoop`:
tornado.ioloop.IOLoop is asyncio based.
* Trio < 0.15 is not supported. Version 0.15 was released almost 3 years ago.
* Tornado < 5.0 is not supported. Tornado 5.0 was released 5 years ago.
* Remove useless shebang
* `EventLoop` should be real abstract
* add new module docstrings
* Fix docstrings
* remove unneeded import
---------
Co-authored-by: Aleksei Stepanov <alekseis@nvidia.com>
-rw-r--r-- | docs/manual/mainloop.rst | 4 | ||||
-rw-r--r-- | test_requirements.txt | 2 | ||||
-rw-r--r-- | urwid/__init__.py | 32 | ||||
-rw-r--r-- | urwid/compat.py | 36 | ||||
-rw-r--r-- | urwid/event_loop/__init__.py | 33 | ||||
-rw-r--r-- | urwid/event_loop/abstract_loop.py | 143 | ||||
-rw-r--r-- | urwid/event_loop/asyncio_loop.py | 148 | ||||
-rw-r--r-- | urwid/event_loop/glib_loop.py | 262 | ||||
-rwxr-xr-x | urwid/event_loop/main_loop.py | 686 | ||||
-rw-r--r-- | urwid/event_loop/select_loop.py | 190 | ||||
-rw-r--r-- | urwid/event_loop/tornado_loop.py | 153 | ||||
-rw-r--r-- | urwid/event_loop/trio_loop.py (renamed from urwid/_async_kw_event_loop.py) | 112 | ||||
-rw-r--r-- | urwid/event_loop/twisted_loop.py | 236 | ||||
-rwxr-xr-x | urwid/html_fragment.py | 2 | ||||
-rwxr-xr-x | urwid/main_loop.py | 1589 | ||||
-rw-r--r-- | urwid/tests/test_doctests.py | 4 | ||||
-rw-r--r-- | urwid/tests/test_event_loops.py | 2 | ||||
-rw-r--r-- | urwid/util.py | 2 |
18 files changed, 1935 insertions, 1701 deletions
diff --git a/docs/manual/mainloop.rst b/docs/manual/mainloop.rst index ed19e2b..1d7b4e4 100644 --- a/docs/manual/mainloop.rst +++ b/docs/manual/mainloop.rst @@ -143,9 +143,7 @@ This event loop integrates with Tornado. ``AsyncioEventLoop`` -------------------- -This event loop integrates with the asyncio module in Python 3.4, -the asyncio package available for Python 3.3 or the trollius -package available for Python 2. +This event loop integrates with the asyncio module in Python. :: diff --git a/test_requirements.txt b/test_requirements.txt index d6c52e6..5acdfc8 100644 --- a/test_requirements.txt +++ b/test_requirements.txt @@ -1,4 +1,4 @@ -tornado<5.0.0 +tornado>=5 coverage[toml] twisted trio
\ No newline at end of file diff --git a/urwid/__init__.py b/urwid/__init__.py index e28b9a8..0960e58 100644 --- a/urwid/__init__.py +++ b/urwid/__init__.py @@ -98,7 +98,7 @@ from urwid.graphics import ( scale_bar_values, ) from urwid.listbox import ListBox, ListBoxError, ListWalker, ListWalkerError, SimpleFocusListWalker, SimpleListWalker -from urwid.main_loop import AsyncioEventLoop, ExitMainLoop, GLibEventLoop, MainLoop, SelectEventLoop, TornadoEventLoop +from urwid.event_loop import EventLoop, AsyncioEventLoop, ExitMainLoop, MainLoop, SelectEventLoop from urwid.monitored_list import MonitoredFocusList, MonitoredList from urwid.signals import MetaSignals, Signals, connect_signal, disconnect_signal, emit_signal, register_signal from urwid.version import VERSION, __version__ @@ -141,14 +141,6 @@ from urwid.widget import ( ) from urwid.wimp import Button, CheckBox, CheckBoxError, PopUpLauncher, PopUpTarget, RadioButton, SelectableIcon -try: - from urwid.main_loop import TwistedEventLoop -except ImportError: - pass -try: - from urwid.main_loop import TrioEventLoop -except ImportError: - pass from urwid import raw_display from urwid.display_common import ( BLACK, @@ -197,3 +189,25 @@ from urwid.util import ( within_double_byte, ) from urwid.vterm import TermCanvas, TermCharset, Terminal, TermModes, TermScroller + +# Optional event loops with external dependencies + +try: + from urwid.event_loop import TornadoEventLoop +except ImportError: + pass + +try: + from urwid.event_loop import GLibEventLoop +except ImportError: + pass + +try: + from urwid.event_loop import TwistedEventLoop +except ImportError: + pass + +try: + from urwid.event_loop import TrioEventLoop +except ImportError: + pass diff --git a/urwid/compat.py b/urwid/compat.py deleted file mode 100644 index 8bcbf05..0000000 --- a/urwid/compat.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/python -# -# Urwid python compatibility definitions -# Copyright (C) 2011 Ian Ward -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# Urwid web site: https://urwid.org/ - - -def reraise(tp, value, tb=None): - """ - Reraise an exception. - Taken from "six" library (https://pythonhosted.org/six/). - """ - try: - if value is None: - value = tp() - if value.__traceback__ is not tb: - raise value.with_traceback(tb) - raise value - finally: - value = None - tb = None diff --git a/urwid/event_loop/__init__.py b/urwid/event_loop/__init__.py new file mode 100644 index 0000000..b7bccfc --- /dev/null +++ b/urwid/event_loop/__init__.py @@ -0,0 +1,33 @@ +"""Package with EventLoop implementations for urwid.""" + +from __future__ import annotations + +from .abstract_loop import EventLoop, ExitMainLoop +from .asyncio_loop import AsyncioEventLoop +from .main_loop import MainLoop +from .select_loop import SelectEventLoop + +try: + from .twisted_loop import TwistedEventLoop +except ImportError: + pass + +try: + from .tornado_loop import TornadoEventLoop +except ImportError: + pass + +try: + from .glib_loop import GLibEventLoop +except ImportError: + pass + +try: + from .twisted_loop import TwistedEventLoop +except ImportError: + pass + +try: + from .trio_loop import TrioEventLoop +except ImportError: + pass diff --git a/urwid/event_loop/abstract_loop.py b/urwid/event_loop/abstract_loop.py new file mode 100644 index 0000000..7e5c0fb --- /dev/null +++ b/urwid/event_loop/abstract_loop.py @@ -0,0 +1,143 @@ +# Urwid main loop code +# Copyright (C) 2004-2012 Ian Ward +# Copyright (C) 2008 Walter Mundt +# Copyright (C) 2009 Andrew Psaltis +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Urwid web site: https://urwid.org/ + +"""Abstract shared code for urwid EventLoop implementation.""" + +from __future__ import annotations + +import abc +import signal +import typing +from collections.abc import Callable + +if typing.TYPE_CHECKING: + from types import FrameType + +__all__ = ("ExitMainLoop", "EventLoop") + + +class ExitMainLoop(Exception): + """ + When this exception is raised within a main loop the main loop + will exit cleanly. + """ + pass + + +class EventLoop(abc.ABC): + """ + Abstract class representing an event loop to be used by :class:`MainLoop`. + """ + + @abc.abstractmethod + def alarm(self, seconds: float | int, callback: Callable[[], typing.Any]) -> typing.Any: + """ + Call callback() a given time from now. No parameters are + passed to callback. + + This method has no default implementation. + + Returns a handle that may be passed to remove_alarm() + + seconds -- floating point time to wait before calling callback + callback -- function to call from event loop + """ + + @abc.abstractmethod + def enter_idle(self, callback): + """ + Add a callback for entering idle. + + This method has no default implementation. + + Returns a handle that may be passed to remove_idle() + """ + + @abc.abstractmethod + def remove_alarm(self, handle) -> bool: + """ + Remove an alarm. + + This method has no default implementation. + + Returns True if the alarm exists, False otherwise + """ + + @abc.abstractmethod + def remove_enter_idle(self, handle) -> bool: + """ + Remove an idle callback. + + This method has no default implementation. + + Returns True if the handle was removed. + """ + + @abc.abstractmethod + def remove_watch_file(self, handle) -> bool: + """ + Remove an input file. + + This method has no default implementation. + + Returns True if the input file exists, False otherwise + """ + + @abc.abstractmethod + def run(self) -> None: + """ + Start the event loop. Exit the loop when any callback raises + an exception. If ExitMainLoop is raised, exit cleanly. + + This method has no default implementation. + """ + + @abc.abstractmethod + def watch_file(self, fd: int, callback: Callable[[], typing.Any]): + """ + Call callback() when fd has some data to read. No parameters + are passed to callback. + + This method has no default implementation. + + Returns a handle that may be passed to remove_watch_file() + + fd -- file descriptor to watch for input + callback -- function to call when input is available + """ + + def set_signal_handler( + self, + signum: int, + handler: Callable[[int, FrameType | None], typing.Any] | int | signal.Handlers, + ) -> Callable[[int, FrameType | None], typing.Any] | int | signal.Handlers | None: + """ + Sets the signal handler for signal signum. + + The default implementation of :meth:`set_signal_handler` + is simply a proxy function that calls :func:`signal.signal()` + and returns the resulting value. + + signum -- signal number + handler -- function (taking signum as its single argument), + or `signal.SIG_IGN`, or `signal.SIG_DFL` + """ + return signal.signal(signum, handler) diff --git a/urwid/event_loop/asyncio_loop.py b/urwid/event_loop/asyncio_loop.py new file mode 100644 index 0000000..e8f1522 --- /dev/null +++ b/urwid/event_loop/asyncio_loop.py @@ -0,0 +1,148 @@ +# Urwid main loop code +# Copyright (C) 2004-2012 Ian Ward +# Copyright (C) 2008 Walter Mundt +# Copyright (C) 2009 Andrew Psaltis +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Urwid web site: https://urwid.org/ + +"""Asyncio based urwid EventLoop implementation.""" + +from __future__ import annotations + +import asyncio +import typing +from collections.abc import Callable + +from .abstract_loop import EventLoop, ExitMainLoop + +__all__ = ("AsyncioEventLoop",) + + +class AsyncioEventLoop(EventLoop): + """ + Event loop based on the standard library ``asyncio`` module. + + """ + _we_started_event_loop = False + + _idle_emulation_delay = 1.0/30 # a short time (in seconds) + + def __init__(self, *, loop: asyncio.AbstractEventLoop | None = None, **kwargs) -> None: + if loop: + self._loop: asyncio.AbstractEventLoop = loop + else: + self._loop = asyncio.get_event_loop() + + self._exc: BaseException | None = None + + def alarm(self, seconds: float | int, callback: Callable[[], typing.Any]) -> asyncio.TimerHandle: + """ + Call callback() a given time from now. No parameters are + passed to callback. + + Returns a handle that may be passed to remove_alarm() + + seconds -- time in seconds to wait before calling callback + callback -- function to call from event loop + """ + return self._loop.call_later(seconds, callback) + + def remove_alarm(self, handle) -> bool: + """ + Remove an alarm. + + Returns True if the alarm exists, False otherwise + """ + cancelled = ( + handle.cancelled() + if getattr(handle, 'cancelled', None) + else handle._cancelled + ) + existed = not cancelled + handle.cancel() + return existed + + def watch_file(self, fd: int, callback: Callable[[], typing.Any]) -> int: + """ + Call callback() when fd has some data to read. No parameters + are passed to callback. + + Returns a handle that may be passed to remove_watch_file() + + fd -- file descriptor to watch for input + callback -- function to call when input is available + """ + self._loop.add_reader(fd, callback) + return fd + + def remove_watch_file(self, handle: int) -> bool: + """ + Remove an input file. + + Returns True if the input file exists, False otherwise + """ + return self._loop.remove_reader(handle) + + def enter_idle(self, callback: Callable[[], typing.Any]) -> list[asyncio.TimerHandle]: + """ + Add a callback for entering idle. + + Returns a handle that may be passed to remove_idle() + """ + # XXX there's no such thing as "idle" in most event loops; this fakes + # it the same way as Twisted, by scheduling the callback to be called + # repeatedly + mutable_handle: list[asyncio.TimerHandle] = [] + + def faux_idle_callback(): + callback() + mutable_handle[0] = self._loop.call_later(self._idle_emulation_delay, faux_idle_callback) + + mutable_handle.append(self._loop.call_later(self._idle_emulation_delay, faux_idle_callback)) + + return mutable_handle + + def remove_enter_idle(self, handle) -> bool: + """ + Remove an idle callback. + + Returns True if the handle was removed. + """ + # `handle` is just a list containing the current actual handle + return self.remove_alarm(handle[0]) + + def _exception_handler(self, loop: asyncio.AbstractEventLoop, context): + exc = context.get('exception') + if exc: + loop.stop() + if not isinstance(exc, ExitMainLoop): + # Store the exc_info so we can re-raise after the loop stops + self._exc = exc + else: + loop.default_exception_handler(context) + + def run(self) -> None: + """ + Start the event loop. Exit the loop when any callback raises + an exception. If ExitMainLoop is raised, exit cleanly. + """ + self._loop.set_exception_handler(self._exception_handler) + self._loop.run_forever() + if self._exc: + exc = self._exc + self._exc = None + raise exc.with_traceback(exc.__traceback__) diff --git a/urwid/event_loop/glib_loop.py b/urwid/event_loop/glib_loop.py new file mode 100644 index 0000000..de9de91 --- /dev/null +++ b/urwid/event_loop/glib_loop.py @@ -0,0 +1,262 @@ +# Urwid main loop code +# Copyright (C) 2004-2012 Ian Ward +# Copyright (C) 2008 Walter Mundt +# Copyright (C) 2009 Andrew Psaltis +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Urwid web site: https://urwid.org/ + +"""GLib based urwid EventLoop implementation. + +PyGObject library is required. +""" + +from __future__ import annotations + +import functools +import signal +import typing +from collections.abc import Callable + +from gi.repository import GLib + +from .abstract_loop import EventLoop, ExitMainLoop + +if typing.TYPE_CHECKING: + from types import FrameType + + from typing_extensions import Literal, ParamSpec + _Spec = ParamSpec("_Spec") + _T = typing.TypeVar("_T") + +__all__ = ("GLibEventLoop",) + + +def _ignore_handler(_sig: int, _frame: FrameType | None = None) -> None: + return None + + +class GLibEventLoop(EventLoop): + """ + Event loop based on GLib.MainLoop + """ + + def __init__(self) -> None: + self._alarms: list[int] = [] + self._watch_files: dict[int, int] = {} + self._idle_handle: int = 0 + self._glib_idle_enabled = False # have we called glib.idle_add? + self._idle_callbacks: dict[int, Callable[[], typing.Any]] = {} + self._loop = GLib.MainLoop() + self._exc: BaseException | None = None + self._enable_glib_idle() + self._signal_handlers: dict[int, int] = {} + + def alarm( + self, + seconds: float | int, + callback: Callable[[], typing.Any], + ) -> tuple[int, Callable[[], typing.Any]]: + """ + Call callback() a given time from now. No parameters are + passed to callback. + + Returns a handle that may be passed to remove_alarm() + + seconds -- floating point time to wait before calling callback + callback -- function to call from event loop + """ + @self.handle_exit + def ret_false() -> Literal[False]: + callback() + self._enable_glib_idle() + return False + + fd = GLib.timeout_add(int(seconds*1000), ret_false) + self._alarms.append(fd) + return (fd, callback) + + def set_signal_handler( + self, + signum: int, + handler: Callable[[int, FrameType | None], typing.Any] | int | signal.Handlers, + ) -> None: + """ + Sets the signal handler for signal signum. + + .. WARNING:: + Because this method uses the `GLib`-specific `unix_signal_add` + function, its behaviour is different than `signal.signal().` + + If `signum` is not `SIGHUP`, `SIGINT`, `SIGTERM`, `SIGUSR1`, + `SIGUSR2` or `SIGWINCH`, this method performs no actions and + immediately returns None. + + Returns None in all cases (unlike :func:`signal.signal()`). + .. + + signum -- signal number + handler -- function (taking signum as its single argument), + or `signal.SIG_IGN`, or `signal.SIG_DFL` + """ + glib_signals = [ + signal.SIGHUP, + signal.SIGINT, + signal.SIGTERM, + signal.SIGUSR1, + signal.SIGUSR2, + ] + + # GLib supports SIGWINCH as of version 2.54. + if not GLib.check_version(2, 54, 0): + glib_signals.append(signal.SIGWINCH) + + if signum not in glib_signals: + # The GLib event loop supports only the signals listed above + return + + if signum in self._signal_handlers: + GLib.source_remove(self._signal_handlers.pop(signum)) + + if handler == signal.Handlers.SIG_IGN: + handler = _ignore_handler + elif handler == signal.Handlers.SIG_DFL: + return + + def final_handler(signal_number: int): + # MyPy False-negative: signal.Handlers casted + handler(signal_number, None) # type: ignore[operator] + return GLib.SOURCE_CONTINUE + + source = GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signum, final_handler, signum) + self._signal_handlers[signum] = source + + def remove_alarm(self, handle) -> bool: + """ + Remove an alarm. + + Returns True if the alarm exists, False otherwise + """ + try: + self._alarms.remove(handle[0]) + GLib.source_remove(handle[0]) + return True + except ValueError: + return False + + def watch_file(self, fd: int, callback: Callable[[], typing.Any]) -> int: + """ + Call callback() when fd has some data to read. No parameters + are passed to callback. + + Returns a handle that may be passed to remove_watch_file() + + fd -- file descriptor to watch for input + callback -- function to call when input is available + """ + @self.handle_exit + def io_callback(source, cb_condition) -> Literal[True]: + callback() + self._enable_glib_idle() + return True + + self._watch_files[fd] = GLib.io_add_watch(fd, GLib.IO_IN, io_callback) + return fd + + def remove_watch_file(self, handle: int) -> bool: + """ + Remove an input file. + + Returns True if the input file exists, False otherwise + """ + if handle in self._watch_files: + GLib.source_remove(self._watch_files[handle]) + del self._watch_files[handle] + return True + return False + + def enter_idle(self, callback: Callable[[], typing.Any]) -> int: + """ + Add a callback for entering idle. + + Returns a handle that may be passed to remove_enter_idle() + """ + self._idle_handle += 1 + self._idle_callbacks[self._idle_handle] = callback + return self._idle_handle + + def _enable_glib_idle(self) -> None: + if self._glib_idle_enabled: + return + GLib.idle_add(self._glib_idle_callback) + self._glib_idle_enabled = True + + def _glib_idle_callback(self): + for callback in self._idle_callbacks.values(): + callback() + self._glib_idle_enabled = False + return False # ask glib not to call again (or we would be called + + def remove_enter_idle(self, handle) -> bool: + """ + Remove an idle callback. + + Returns True if the handle was removed. + """ + try: + del self._idle_callbacks[handle] + except KeyError: + return False + return True + + def run(self) -> None: + """ + Start the event loop. Exit the loop when any callback raises + an exception. If ExitMainLoop is raised, exit cleanly. + """ + try: + self._loop.run() + finally: + if self._loop.is_running(): + self._loop.quit() + if self._exc: + # An exception caused us to exit, raise it now + exc = self._exc + self._exc = None + raise exc.with_traceback(exc.__traceback__) + + def handle_exit(self, f: Callable[_Spec, _T]) -> Callable[_Spec, _T | Literal[False]]: + """ + Decorator that cleanly exits the :class:`GLibEventLoop` if + :exc:`ExitMainLoop` is thrown inside of the wrapped function. Store the + exception info if some other exception occurs, it will be reraised after + the loop quits. + + *f* -- function to be wrapped + """ + + @functools.wraps(f) + def wrapper(*args: _Spec.args, **kwargs: _Spec.kwargs) -> _T | Literal[False]: + try: + return f(*args, **kwargs) + except ExitMainLoop: + self._loop.quit() + except BaseException as exc: + self._exc = exc + if self._loop.is_running(): + self._loop.quit() + return False + return wrapper diff --git a/urwid/event_loop/main_loop.py b/urwid/event_loop/main_loop.py new file mode 100755 index 0000000..cd6a270 --- /dev/null +++ b/urwid/event_loop/main_loop.py @@ -0,0 +1,686 @@ +#!/usr/bin/python +# +# Urwid main loop code +# Copyright (C) 2004-2012 Ian Ward +# Copyright (C) 2008 Walter Mundt +# Copyright (C) 2009 Andrew Psaltis +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Urwid web site: https://urwid.org/ + + +from __future__ import annotations + +import heapq +import os +import time +import typing +import warnings +from collections.abc import Callable, Iterable + +from urwid import raw_display, signals +from urwid.command_map import REDRAW_SCREEN, command_map +from urwid.display_common import INPUT_DESCRIPTORS_CHANGED +from urwid.util import StoppingContext, is_mouse_event +from urwid.wimp import PopUpTarget + +from .abstract_loop import ExitMainLoop +from .select_loop import SelectEventLoop + +if typing.TYPE_CHECKING: + from urwid.display_common import BaseScreen + from urwid.widget import Widget + + +try: + import fcntl +except ImportError: + pass # windows + +PIPE_BUFFER_READ_SIZE = 4096 # can expect this much on Linux, so try for that + +__all__ = ("CantUseExternalLoop", "MainLoop") + + +class CantUseExternalLoop(Exception): + pass + + +class MainLoop: + """ + This is the standard main loop implementation for a single interactive + session. + + :param widget: the topmost widget used for painting the screen, stored as + :attr:`widget` and may be modified. Must be a box widget. + :type widget: widget instance + + :param palette: initial palette for screen + :type palette: iterable of palette entries + + :param screen: screen to use, default is a new :class:`raw_display.Screen` + instance; stored as :attr:`screen` + :type screen: display module screen instance + + :param handle_mouse: ``True`` to ask :attr:`.screen` to process mouse events + :type handle_mouse: bool + + :param input_filter: a function to filter input before sending it to + :attr:`.widget`, called from :meth:`.input_filter` + :type input_filter: callable + + :param unhandled_input: a function called when input is not handled by + :attr:`.widget`, called from :meth:`.unhandled_input` + :type unhandled_input: callable + + :param event_loop: if :attr:`.screen` supports external an event loop it may be + given here, default is a new :class:`SelectEventLoop` instance; + stored as :attr:`.event_loop` + :type event_loop: event loop instance + + :param pop_ups: `True` to wrap :attr:`.widget` with a :class:`PopUpTarget` + instance to allow any widget to open a pop-up anywhere on the screen + :type pop_ups: boolean + + + .. attribute:: screen + + The screen object this main loop uses for screen updates and reading input + + .. attribute:: event_loop + + The event loop object this main loop uses for waiting on alarms and IO + """ + + def __init__( + self, + widget: Widget, + palette=(), + screen: BaseScreen | None = None, + handle_mouse: bool = True, + input_filter: Callable[[list[str], list[int]], list[str]] | None = None, + unhandled_input: Callable[[str | tuple[str, int, int, int]], bool] | None = None, + event_loop=None, + pop_ups: bool = False, + ): + self._widget = widget + self.handle_mouse = handle_mouse + self.pop_ups = pop_ups # triggers property setting side-effect + + if not screen: + screen = raw_display.Screen() + + if palette: + screen.register_palette(palette) + + self.screen = screen + self.screen_size = None + + self._unhandled_input = unhandled_input + self._input_filter = input_filter + + if not hasattr(screen, 'hook_event_loop') and event_loop is not None: + raise NotImplementedError(f"screen object passed {screen!r} does not support external event loops") + if event_loop is None: + event_loop = SelectEventLoop() + self.event_loop = event_loop + + if hasattr(self.screen, 'signal_handler_setter'): + # Tell the screen what function it must use to set + # signal handlers + self.screen.signal_handler_setter = self.event_loop.set_signal_handler + + self._watch_pipes: dict[int, tuple[Callable[[], typing.Any], int]] = {} + + @property + def widget(self) -> Widget: + """ + Property for the topmost widget used to draw the screen. + This must be a box widget. + """ + return self._widget + + @widget.setter + def widget(self, widget: Widget) -> None: + self._widget = widget + if self.pop_ups: + self._topmost_widget.original_widget = self._widget + else: + self._topmost_widget = self._widget + + def _set_widget(self, widget: Widget) -> None: + warnings.warn( + f"method `{self.__class__.__name__}._set_widget` is deprecated, " + f"please use `{self.__class__.__name__}.widget` property", + DeprecationWarning, + stacklevel=2, + ) + self.widget = widget + + @property + def pop_ups(self): + return self._pop_ups + + @pop_ups.setter + def pop_ups(self, pop_ups) -> None: + self._pop_ups = pop_ups + if pop_ups: + self._topmost_widget = PopUpTarget(self._widget) + else: + self._topmost_widget = self._widget + + def _set_pop_ups(self, pop_ups) -> None: + warnings.warn( + f"method `{self.__class__.__name__}._set_pop_ups` is deprecated, " + f"please use `{self.__class__.__name__}.pop_ups` property", + DeprecationWarning, + stacklevel=2, + ) + self.pop_ups = pop_ups + + def set_alarm_in(self, sec, callback, user_data=None): + """ + Schedule an alarm in *sec* seconds that will call *callback* from the + within the :meth:`run` method. + + :param sec: seconds until alarm + :type sec: float + :param callback: function to call with two parameters: this main loop + object and *user_data* + :type callback: callable + """ + def cb(): + callback(self, user_data) + return self.event_loop.alarm(sec, cb) + + def set_alarm_at(self, tm, callback, user_data=None): + """ + Schedule an alarm at *tm* time that will call *callback* from the + within the :meth:`run` function. Returns a handle that may be passed to + :meth:`remove_alarm`. + + :param tm: time to call callback e.g. ``time.time() + 5`` + :type tm: float + :param callback: function to call with two parameters: this main loop + object and *user_data* + :type callback: callable + """ + def cb(): + callback(self, user_data) + return self.event_loop.alarm(tm - time.time(), cb) + + def remove_alarm(self, handle): + """ + Remove an alarm. Return ``True`` if *handle* was found, ``False`` + otherwise. + """ + return self.event_loop.remove_alarm(handle) + + def watch_pipe(self, callback): + """ + Create a pipe for use by a subprocess or thread to trigger a callback + in the process/thread running the main loop. + + :param callback: function taking one parameter to call from within + the process/thread running the main loop + :type callback: callable + + This method returns a file descriptor attached to the write end of a + pipe. The read end of the pipe is added to the list of files + :attr:`event_loop` is watching. When data is written to the pipe the + callback function will be called and passed a single value containing + data read from the pipe. + + This method may be used any time you want to update widgets from + another thread or subprocess. + + Data may be written to the returned file descriptor with + ``os.write(fd, data)``. Ensure that data is less than 512 bytes (or 4K + on Linux) so that the callback will be triggered just once with the + complete value of data passed in. + + If the callback returns ``False`` then the watch will be removed from + :attr:`event_loop` and the read end of the pipe will be closed. You + are responsible for closing the write end of the pipe with + ``os.close(fd)``. + """ + pipe_rd, pipe_wr = os.pipe() + fcntl.fcntl(pipe_rd, fcntl.F_SETFL, os.O_NONBLOCK) + watch_handle = None + + def cb() -> None: + data = os.read(pipe_rd, PIPE_BUFFER_READ_SIZE) + rval = callback(data) + if rval is False: + self.event_loop.remove_watch_file(watch_handle) + os.close(pipe_rd) + + watch_handle = self.event_loop.watch_file(pipe_rd, cb) + self._watch_pipes[pipe_wr] = (watch_handle, pipe_rd) + return pipe_wr + + def remove_watch_pipe(self, write_fd): + """ + Close the read end of the pipe and remove the watch created by + :meth:`watch_pipe`. You are responsible for closing the write end of + the pipe. + + Returns ``True`` if the watch pipe exists, ``False`` otherwise + """ + try: + watch_handle, pipe_rd = self._watch_pipes.pop(write_fd) + except KeyError: + return False + + if not self.event_loop.remove_watch_file(watch_handle): + return False + os.close(pipe_rd) + return True + + def watch_file(self, fd, callback): + """ + Call *callback* when *fd* has some data to read. No parameters are + passed to callback. + + Returns a handle that may be passed to :meth:`remove_watch_file`. + """ + return self.event_loop.watch_file(fd, callback) + + def remove_watch_file(self, handle): + """ + Remove a watch file. Returns ``True`` if the watch file + exists, ``False`` otherwise. + """ + return self.event_loop.remove_watch_file(handle) + + + def run(self): + """ + Start the main loop handling input events and updating the screen. The + loop will continue until an :exc:`ExitMainLoop` exception is raised. + + If you would prefer to manage the event loop yourself, don't use this + method. Instead, call :meth:`start` before starting the event loop, + and :meth:`stop` once it's finished. + """ + try: + self._run() + except ExitMainLoop: + pass + + def _test_run(self): + """ + >>> w = _refl("widget") # _refl prints out function calls + >>> w.render_rval = "fake canvas" # *_rval is used for return values + >>> scr = _refl("screen") + >>> scr.get_input_descriptors_rval = [42] + >>> scr.get_cols_rows_rval = (20, 10) + >>> scr.started = True + >>> scr._urwid_signals = {} + >>> evl = _refl("event_loop") + >>> evl.enter_idle_rval = 1 + >>> evl.watch_file_rval = 2 + >>> ml = MainLoop(w, [], scr, event_loop=evl) + >>> ml.run() # doctest:+ELLIPSIS + screen.start() + screen.set_mouse_tracking() + screen.unhook_event_loop(...) + screen.hook_event_loop(...) + event_loop.enter_idle(<bound method MainLoop.entering_idle...>) + event_loop.run() + event_loop.remove_enter_idle(1) + screen.unhook_event_loop(...) + screen.stop() + >>> ml.draw_screen() # doctest:+ELLIPSIS + screen.get_cols_rows() + widget.render((20, 10), focus=True) + screen.draw_screen((20, 10), 'fake canvas') + """ + + def start(self): + """ + Sets up the main loop, hooking into the event loop where necessary. + Starts the :attr:`screen` if it hasn't already been started. + + If you want to control starting and stopping the event loop yourself, + you should call this method before starting, and call `stop` once the + loop has finished. You may also use this method as a context manager, + which will stop the loop automatically at the end of the block: + + with main_loop.start(): + ... + + Note that some event loop implementations don't handle exceptions + specially if you manage the event loop yourself. In particular, the + Twisted and asyncio loops won't stop automatically when + :exc:`ExitMainLoop` (or anything else) is raised. + """ + + self.screen.start() + + if self.handle_mouse: + self.screen.set_mouse_tracking() + + if not hasattr(self.screen, 'hook_event_loop'): + raise CantUseExternalLoop("Screen {0!r} doesn't support external event loops") + + try: + signals.connect_signal(self.screen, INPUT_DESCRIPTORS_CHANGED, self._reset_input_descriptors) + except NameError: + pass + # watch our input descriptors + self._reset_input_descriptors() + self.idle_handle = self.event_loop.enter_idle(self.entering_idle) + + # the screen is redrawn automatically after input and alarms, + # however, there can be none of those at the start, + # so draw the initial screen here unconditionally + self.event_loop.alarm(0, self.entering_idle) + + return StoppingContext(self) + + def stop(self): + """ + Cleans up any hooks added to the event loop. Only call this if you're + managing the event loop yourself, after the loop stops. + """ + + self.event_loop.remove_enter_idle(self.idle_handle) + del self.idle_handle + signals.disconnect_signal(self.screen, INPUT_DESCRIPTORS_CHANGED, + self._reset_input_descriptors) + self.screen.unhook_event_loop(self.event_loop) + + self.screen.stop() + + def _reset_input_descriptors(self): + self.screen.unhook_event_loop(self.event_loop) + self.screen.hook_event_loop(self.event_loop, self._update) + + def _run(self): + try: + self.start() + except CantUseExternalLoop: + try: + return self._run_screen_event_loop() + finally: + self.screen.stop() + + try: + self.event_loop.run() + except: + self.screen.stop() # clean up screen control + raise + self.stop() + + def _update(self, keys: list[str], raw: list[int]): + """ + >>> w = _refl("widget") + >>> w.selectable_rval = True + >>> w.mouse_event_rval = True + >>> scr = _refl("screen") + >>> scr.get_cols_rows_rval = (15, 5) + >>> evl = _refl("event_loop") + >>> ml = MainLoop(w, [], scr, event_loop=evl) + >>> ml._input_timeout = "old timeout" + >>> ml._update(['y'], [121]) # doctest:+ELLIPSIS + screen.get_cols_rows() + widget.selectable() + widget.keypress((15, 5), 'y') + >>> ml._update([("mouse press", 1, 5, 4)], []) + widget.mouse_event((15, 5), 'mouse press', 1, 5, 4, focus=True) + >>> ml._update([], []) + """ + keys = self.input_filter(keys, raw) + + if keys: + self.process_input(keys) + if 'window resize' in keys: + self.screen_size = None + + def _run_screen_event_loop(self) -> None: + """ + This method is used when the screen does not support using + external event loops. + + The alarms stored in the SelectEventLoop in :attr:`event_loop` + are modified by this method. + """ + next_alarm = None + + while True: + self.draw_screen() + + if not next_alarm and self.event_loop._alarms: + next_alarm = heapq.heappop(self.event_loop._alarms) + + keys: list[str] = [] + raw: list[int] = [] + while not keys: + if next_alarm: + sec = max(0, next_alarm[0] - time.time()) + self.screen.set_input_timeouts(sec) + else: + self.screen.set_input_timeouts(None) + keys, raw = self.screen.get_input(True) + if not keys and next_alarm: + sec = next_alarm[0] - time.time() + if sec <= 0: + break + + keys = self.input_filter(keys, raw) + + if keys: + self.process_input(keys) + + while next_alarm: + sec = next_alarm[0] - time.time() + if sec > 0: + break + tm, tie_break, callback = next_alarm + callback() + + if self.event_loop._alarms: + next_alarm = heapq.heappop(self.event_loop._alarms) + else: + next_alarm = None + + if 'window resize' in keys: + self.screen_size = None + + def _test_run_screen_event_loop(self): + """ + >>> w = _refl("widget") + >>> scr = _refl("screen") + >>> scr.get_cols_rows_rval = (10, 5) + >>> scr.get_input_rval = [], [] + >>> ml = MainLoop(w, screen=scr) + >>> def stop_now(loop, data): + ... raise ExitMainLoop() + >>> handle = ml.set_alarm_in(0, stop_now) + >>> try: + ... ml._run_screen_event_loop() + ... except ExitMainLoop: + ... pass + screen.get_cols_rows() + widget.render((10, 5), focus=True) + screen.draw_screen((10, 5), None) + screen.set_input_timeouts(0) + screen.get_input(True) + """ + + def process_input(self, keys: Iterable[str | tuple[str, int, int, int]]) -> bool: + """ + This method will pass keyboard input and mouse events to :attr:`widget`. + This method is called automatically from the :meth:`run` method when + there is input, but may also be called to simulate input from the user. + + *keys* is a list of input returned from :attr:`screen`'s get_input() + or get_input_nonblocking() methods. + + Returns ``True`` if any key was handled by a widget or the + :meth:`unhandled_input` method. + """ + if not self.screen_size: + self.screen_size = self.screen.get_cols_rows() + + something_handled = False + + for k in keys: + if k == 'window resize': + continue + + if isinstance(k, str): + if self._topmost_widget.selectable(): + k = self._topmost_widget.keypress(self.screen_size, k) + + elif isinstance(k, tuple): + if is_mouse_event(k): + event, button, col, row = k + if hasattr(self._topmost_widget, "mouse_event"): + if self._topmost_widget.mouse_event(self.screen_size, event, button, col, row, focus=True): + k = None + + else: + raise TypeError(f"{k!r} is not str | tuple[str, int, int, int]") + + if k: + if command_map[k] == REDRAW_SCREEN: + self.screen.clear() + something_handled = True + else: + something_handled |= bool(self.unhandled_input(k)) + else: + something_handled = True + + return something_handled + + def _test_process_input(self): + """ + >>> w = _refl("widget") + >>> w.selectable_rval = True + >>> scr = _refl("screen") + >>> scr.get_cols_rows_rval = (10, 5) + >>> ml = MainLoop(w, [], scr) + >>> ml.process_input(['enter', ('mouse drag', 1, 14, 20)]) + screen.get_cols_rows() + widget.selectable() + widget.keypress((10, 5), 'enter') + widget.mouse_event((10, 5), 'mouse drag', 1, 14, 20, focus=True) + True + """ + + def input_filter(self, keys: list[str], raw: list[int]) -> list[str]: + """ + This function is passed each all the input events and raw keystroke + values. These values are passed to the *input_filter* function + passed to the constructor. That function must return a list of keys to + be passed to the widgets to handle. If no *input_filter* was + defined this implementation will return all the input events. + """ + if self._input_filter: + return self._input_filter(keys, raw) + return keys + + def unhandled_input(self, input: str | tuple[str, int, int, int]) -> bool: + """ + This function is called with any input that was not handled by the + widgets, and calls the *unhandled_input* function passed to the + constructor. If no *unhandled_input* was defined then the input + will be ignored. + + *input* is the keyboard or mouse input. + + The *unhandled_input* function should return ``True`` if it handled + the input. + """ + if self._unhandled_input: + return self._unhandled_input(input) + return False + + def entering_idle(self): + """ + This method is called whenever the event loop is about to enter the + idle state. :meth:`draw_screen` is called here to update the + screen when anything has changed. + """ + if self.screen.started: + self.draw_screen() + + def draw_screen(self): + """ + Render the widgets and paint the screen. This method is called + automatically from :meth:`entering_idle`. + + If you modify the widgets displayed outside of handling input or + responding to an alarm you will need to call this method yourself + to repaint the screen. + """ + if not self.screen_size: + self.screen_size = self.screen.get_cols_rows() + + canvas = self._topmost_widget.render(self.screen_size, focus=True) + self.screen.draw_screen(self.screen_size, canvas) + + +def _refl(name: str, rval=None, exit=False): + """ + This function is used to test the main loop classes. + + >>> scr = _refl("screen") + >>> scr.function("argument") + screen.function('argument') + >>> scr.callme(when="now") + screen.callme(when='now') + >>> scr.want_something_rval = 42 + >>> x = scr.want_something() + screen.want_something() + >>> x + 42 + + """ + class Reflect: + def __init__(self, name: str, rval=None): + self._name = name + self._rval = rval + + def __call__(self, *argl, **argd): + args = ", ".join([repr(a) for a in argl]) + if args and argd: + args = f"{args}, " + args += ", ".join([f"{k}={repr(v)}" for k, v in argd.items()]) + print(f"{self._name}({args})") + if exit: + raise ExitMainLoop() + return self._rval + + def __getattr__(self, attr): + if attr.endswith("_rval"): + raise AttributeError() + #print(self._name+"."+attr) + if hasattr(self, f"{attr}_rval"): + return Reflect(f"{self._name}.{attr}", getattr(self, f"{attr}_rval")) + return Reflect(f"{self._name}.{attr}") + return Reflect(name) + + +def _test(): + import doctest + doctest.testmod() + + +if __name__ == '__main__': + _test() diff --git a/urwid/event_loop/select_loop.py b/urwid/event_loop/select_loop.py new file mode 100644 index 0000000..6f2edd6 --- /dev/null +++ b/urwid/event_loop/select_loop.py @@ -0,0 +1,190 @@ +# Urwid main loop code +# Copyright (C) 2004-2012 Ian Ward +# Copyright (C) 2008 Walter Mundt +# Copyright (C) 2009 Andrew Psaltis +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Urwid web site: https://urwid.org/ + +"""Select based urwid EventLoop implementation.""" + +from __future__ import annotations + +import heapq +import select +import time +import typing +from collections.abc import Callable, Iterator +from itertools import count + +from .abstract_loop import EventLoop, ExitMainLoop + +if typing.TYPE_CHECKING: + from typing_extensions import Literal + +__all__ = ("SelectEventLoop",) + + +class SelectEventLoop(EventLoop): + """ + Event loop based on :func:`select.select` + """ + + def __init__(self) -> None: + self._alarms: list[tuple[float, int, Callable[[], typing.Any]]] = [] + self._watch_files: dict[int, Callable[[], typing.Any]] = {} + self._idle_handle: int = 0 + self._idle_callbacks: dict[int, Callable[[], typing.Any]] = {} + self._tie_break: Iterator[int] = count() + self._did_something: bool = False + + def alarm( + self, + seconds: float | int, + callback: Callable[[], typing.Any], + ) -> tuple[float, int, Callable[[], typing.Any]]: + """ + Call callback() a given time from now. No parameters are + passed to callback. + + Returns a handle that may be passed to remove_alarm() + + seconds -- floating point time to wait before calling callback + callback -- function to call from event loop + """ + tm = time.time() + seconds + handle = (tm, next(self._tie_break), callback) + heapq.heappush(self._alarms, handle) + return handle + + def remove_alarm(self, handle: tuple[float, int, Callable[[], typing.Any]]) -> bool: + """ + Remove an alarm. + + Returns True if the alarm exists, False otherwise + """ + try: + self._alarms.remove(handle) + heapq.heapify(self._alarms) + return True + except ValueError: + return False + + def watch_file(self, fd: int, callback: Callable[[], typing.Any]) -> int: + """ + Call callback() when fd has some data to read. No parameters + are passed to callback. + + Returns a handle that may be passed to remove_watch_file() + + fd -- file descriptor to watch for input + callback -- function to call when input is available + """ + self._watch_files[fd] = callback + return fd + + def remove_watch_file(self, handle: int) -> bool: + """ + Remove an input file. + + Returns True if the input file exists, False otherwise + """ + if handle in self._watch_files: + del self._watch_files[handle] + return True + return False + + def enter_idle(self, callback: Callable[[], typing.Any]) -> int: + """ + Add a callback for entering idle. + + Returns a handle that may be passed to remove_idle() + """ + self._idle_handle += 1 + self._idle_callbacks[self._idle_handle] = callback + return self._idle_handle + + def remove_enter_idle(self, handle: int) -> bool: + """ + Remove an idle callback. + + Returns True if the handle was removed. + """ + try: + del self._idle_callbacks[handle] + except KeyError: + return False + return True + + def _entering_idle(self) -> None: + """ + Call all the registered idle callbacks. + """ + for callback in self._idle_callbacks.values(): + callback() + + def run(self) -> None: + """ + Start the event loop. Exit the loop when any callback raises + an exception. If ExitMainLoop is raised, exit cleanly. + """ + try: + self._did_something = True + while True: + try: + self._loop() + except InterruptedError: + pass + except ExitMainLoop: + pass + + def _loop(self) -> None: + """ + A single iteration of the event loop + """ + fds = list(self._watch_files) + if self._alarms or self._did_something: + timeout = 0. + tm: float | Literal['idle'] | None = None + + if self._alarms: + timeout_ = self._alarms[0][0] + tm = timeout_ + timeout = max(timeout, timeout_ - time.time()) + + if self._did_something and (not self._alarms or (self._alarms and timeout > 0)): + timeout = 0. + tm = 'idle' + + ready, w, err = select.select(fds, [], fds, timeout) + + else: + tm = None + ready, w, err = select.select(fds, [], fds) + + if not ready: + if tm == 'idle': + self._entering_idle() + self._did_something = False + elif tm is not None: + # must have been a timeout + tm, tie_break, alarm_callback = heapq.heappop(self._alarms) + alarm_callback() + self._did_something = True + + for fd in ready: + self._watch_files[fd]() + self._did_something = True diff --git a/urwid/event_loop/tornado_loop.py b/urwid/event_loop/tornado_loop.py new file mode 100644 index 0000000..72f4635 --- /dev/null +++ b/urwid/event_loop/tornado_loop.py @@ -0,0 +1,153 @@ +# Urwid main loop code +# Copyright (C) 2004-2012 Ian Ward +# Copyright (C) 2008 Walter Mundt +# Copyright (C) 2009 Andrew Psaltis +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Urwid web site: https://urwid.org/ + +"""Tornado IOLoop based urwid EventLoop implementation. + +Tornado library is required. +""" + +from __future__ import annotations + +import functools +import typing +from collections.abc import Callable + +from tornado import ioloop + +from .abstract_loop import EventLoop, ExitMainLoop + +if typing.TYPE_CHECKING: + + from typing_extensions import Literal, ParamSpec + _Spec = ParamSpec("_Spec") + _T = typing.TypeVar("_T") + +__all__ = ("TornadoEventLoop",) + + +class TornadoEventLoop(EventLoop): + """ This is an Urwid-specific event loop to plug into its MainLoop. + It acts as an adaptor for Tornado's IOLoop which does all + heavy lifting except idle-callbacks. + + Notice, since Tornado has no concept of idle callbacks we + monkey patch ioloop._impl.poll() function to be able to detect + potential idle periods. + """ + _idle_emulation_delay = 1.0 / 30 # a short time (in seconds) + + def __init__(self, loop: ioloop.IOLoop | None = None) -> None: + if loop: + self._loop: ioloop.IOLoop = loop + else: + self._loop = ioloop.IOLoop.instance() + + self._pending_alarms: dict[object, int] = {} + self._watch_handles: dict[int, int] = {} # {<watch_handle> : <file_descriptor>} + self._max_watch_handle: int = 0 + self._exc: BaseException | None = None + + def alarm(self, seconds: float | int, callback: Callable[[], typing.Any]): + def wrapped() -> None: + try: + del self._pending_alarms[handle] + except KeyError: + pass + self.handle_exit(callback)() + + handle = self._loop.add_timeout(self._loop.time() + seconds, wrapped) + self._pending_alarms[handle] = 1 + return handle + + def remove_alarm(self, handle) -> bool: + self._loop.remove_timeout(handle) + try: + del self._pending_alarms[handle] + except KeyError: + return False + else: + return True + + def watch_file(self, fd: int, callback: Callable[[], _T]) -> int: + def handler(_fd: int, _events: int) -> None: + self.handle_exit(callback)() + + self._loop.add_handler(fd, handler, ioloop.IOLoop.READ) + self._max_watch_handle += 1 + handle = self._max_watch_handle + self._watch_handles[handle] = fd + return handle + + def remove_watch_file(self, handle: int) -> bool: + fd = self._watch_handles.pop(handle, None) + if fd is None: + return False + else: + self._loop.remove_handler(fd) + return True + + def enter_idle(self, callback: Callable[[], typing.Any]) -> list[object]: + """ + Add a callback for entering idle. + + Returns a handle that may be passed to remove_idle() + """ + # XXX there's no such thing as "idle" in most event loops; this fakes + # it the same way as Twisted, by scheduling the callback to be called + # repeatedly + mutable_handle: list[object] = [] + + def faux_idle_callback(): + callback() + mutable_handle[0] = self._loop.call_later(self._idle_emulation_delay, faux_idle_callback) + + mutable_handle.append(self._loop.call_later(self._idle_emulation_delay, faux_idle_callback)) + + # asyncio used as backend, real type comes from asyncio_loop.call_later + return mutable_handle + + def remove_enter_idle(self, handle) -> bool: + """ + Remove an idle callback. + + Returns True if the handle was removed. + """ + # `handle` is just a list containing the current actual handle + return self.remove_alarm(handle[0]) + + def handle_exit(self, f: Callable[_Spec, _T]) -> Callable[_Spec, _T | Literal[False]]: + @functools.wraps(f) + def wrapper(*args: _Spec.args, **kwargs: _Spec.kwargs) -> _T | Literal[False]: + try: + return f(*args, **kwargs) + except ExitMainLoop: + self._loop.stop() + except Exception as exc: + self._exc = exc + self._loop.stop() + return False + return wrapper + + def run(self) -> None: + self._loop.start() + if self._exc: + exc, self._exc = self._exc, None + raise exc.with_traceback(exc.__traceback__) diff --git a/urwid/_async_kw_event_loop.py b/urwid/event_loop/trio_loop.py index bc74b0b..6e7670b 100644 --- a/urwid/_async_kw_event_loop.py +++ b/urwid/event_loop/trio_loop.py @@ -1,5 +1,3 @@ -#!/usr/bin/python -# # Urwid main loop code using Python-3.5 features (Trio, Curio, etc) # Copyright (C) 2018 Toshio Kuratomi # Copyright (C) 2019 Tamas Nepusz @@ -20,9 +18,35 @@ # # Urwid web site: https://urwid.org/ +"""Trio Runner based urwid EventLoop implementation. + +Trio library is required. +""" + from __future__ import annotations -from .main_loop import EventLoop, ExitMainLoop +import typing +from collections.abc import Callable + +import trio + +from .abstract_loop import EventLoop, ExitMainLoop + +__all__ = ("TrioEventLoop",) + + +class _TrioIdleCallbackInstrument(trio.abc.Instrument): + """IDLE callbacks emulation helper.""" + + __slots__ = ("idle_callbacks",) + + def __init__(self, idle_callbacks): + self.idle_callbacks = idle_callbacks + + def before_io_wait(self, timeout): + if timeout > 0: + for idle_callback in self.idle_callbacks.values(): + idle_callback() class TrioEventLoop(EventLoop): @@ -34,23 +58,17 @@ class TrioEventLoop(EventLoop): def __init__(self): """Constructor.""" - import trio self._idle_handle = 0 self._idle_callbacks = {} self._pending_tasks = [] - self._trio = trio self._nursery = None self._sleep = trio.sleep - try: - self._wait_readable = trio.lowlevel.wait_readable - except AttributeError: - # Trio 0.14 or older - self._wait_readable = trio.hazmat.wait_readable + self._wait_readable = trio.lowlevel.wait_readable - def alarm(self, seconds, callback): + def alarm(self, seconds: float | int, callback: Callable[[], typing.Any]): """Calls `callback()` a given time from now. No parameters are passed to the callback. @@ -63,7 +81,7 @@ class TrioEventLoop(EventLoop): """ return self._start_task(self._alarm_task, seconds, callback) - def enter_idle(self, callback): + def enter_idle(self, callback: Callable[[], typing.Any]) -> int: """Calls `callback()` when the event loop enters the idle state. There is no such thing as being idle in a Trio event loop so we @@ -81,7 +99,7 @@ class TrioEventLoop(EventLoop): """ return self._cancel_scope(handle) - def remove_enter_idle(self, handle): + def remove_enter_idle(self, handle) -> bool: """Removes an idle callback. Parameters: @@ -93,7 +111,7 @@ class TrioEventLoop(EventLoop): return False return True - def remove_watch_file(self, handle): + def remove_watch_file(self, handle: trio.CancelScope) -> bool: """Removes a file descriptor being watched for input. Parameters: @@ -104,7 +122,7 @@ class TrioEventLoop(EventLoop): """ return self._cancel_scope(handle) - def _cancel_scope(self, scope): + def _cancel_scope(self, scope: trio.CancelScope) -> bool: """Cancels the given Trio cancellation scope. Returns: @@ -115,26 +133,16 @@ class TrioEventLoop(EventLoop): scope.cancel() return existed - def run(self): + def run(self) -> None: """Starts the event loop. Exits the loop when any callback raises an exception. If ExitMainLoop is raised, exits cleanly. """ - idle_callbacks = self._idle_callbacks - - # This class is duplicated in run_async(). It would be nice to move - # this somewhere outside, but we cannot do it yet becase we need to - # derive it from self._trio.abc.Instrument - class TrioIdleCallbackInstrument(self._trio.abc.Instrument): - def before_io_wait(self, timeout): - if timeout > 0: - for idle_callback in idle_callbacks.values(): - idle_callback() + emulate_idle_callbacks = _TrioIdleCallbackInstrument(self._idle_callbacks) - emulate_idle_callbacks = TrioIdleCallbackInstrument() - - with self._trio.MultiError.catch(self._handle_main_loop_exception): - self._trio.run(self._main_task, instruments=[emulate_idle_callbacks]) + # TODO(Aleksei): trio.MultiError is deprecated in favor of exceptiongroup package usage and `Except *` + with trio.MultiError.catch(self._handle_main_loop_exception): + trio.run(self._main_task, instruments=[emulate_idle_callbacks]) async def run_async(self): """Starts the main loop and blocks asynchronously until the main loop @@ -153,35 +161,16 @@ class TrioEventLoop(EventLoop): nursery.cancel_scope.cancel() """ - idle_callbacks = self._idle_callbacks - - # This class is duplicated in run_async(). It would be nice to move - # this somewhere outside, but we cannot do it yet becase we need to - # derive it from self._trio.abc.Instrument - class TrioIdleCallbackInstrument(self._trio.abc.Instrument): - def before_io_wait(self, timeout): - if timeout > 0: - for idle_callback in idle_callbacks.values(): - idle_callback() + emulate_idle_callbacks = _TrioIdleCallbackInstrument(self._idle_callbacks) - emulate_idle_callbacks = TrioIdleCallbackInstrument() - - try: - add_instrument = self._trio.lowlevel.add_instrument - remove_instrument = self._trio.lowlevel.remove_instrument - except AttributeError: - # Trio 0.14 or older - add_instrument = self._trio.hazmat.add_instrument - remove_instrument = self._trio.hazmat.remove_instrument - - with self._trio.MultiError.catch(self._handle_main_loop_exception): - add_instrument(emulate_idle_callbacks) + with trio.MultiError.catch(self._handle_main_loop_exception): + trio.lowlevel.add_instrument(emulate_idle_callbacks) try: await self._main_task() finally: - remove_instrument(emulate_idle_callbacks) + trio.lowlevel.remove_instrument(emulate_idle_callbacks) - def watch_file(self, fd, callback): + def watch_file(self, fd: int, callback: Callable[[], typing.Any]) -> trio.CancelScope: """Calls `callback()` when the given file descriptor has some data to read. No parameters are passed to the callback. @@ -194,7 +183,12 @@ class TrioEventLoop(EventLoop): """ return self._start_task(self._watch_task, fd, callback) - async def _alarm_task(self, scope, seconds, callback): + async def _alarm_task( + self, + scope: trio.CancelScope, + seconds: float | int, + callback: Callable[[], typing.Any], + ) -> None: """Asynchronous task that sleeps for a given number of seconds and then calls the given callback. @@ -207,7 +201,7 @@ class TrioEventLoop(EventLoop): await self._sleep(seconds) callback() - def _handle_main_loop_exception(self, exc): + def _handle_main_loop_exception(self, exc: BaseException) -> BaseException | None: """Handles exceptions raised from the main loop, catching ExitMainLoop instead of letting it propagate through. @@ -226,9 +220,9 @@ class TrioEventLoop(EventLoop): exits the app by raising ExitMainLoop. """ try: - async with self._trio.open_nursery() as self._nursery: + async with trio.open_nursery() as self._nursery: self._schedule_pending_tasks() - await self._trio.sleep_forever() + await trio.sleep_forever() finally: self._nursery = None @@ -252,7 +246,7 @@ class TrioEventLoop(EventLoop): Returns: a cancellation scope for the Trio task """ - scope = self._trio.CancelScope() + scope = trio.CancelScope() if self._nursery: self._nursery.start_soon(task, scope, *args) else: diff --git a/urwid/event_loop/twisted_loop.py b/urwid/event_loop/twisted_loop.py new file mode 100644 index 0000000..4637c1b --- /dev/null +++ b/urwid/event_loop/twisted_loop.py @@ -0,0 +1,236 @@ +# Urwid main loop code +# Copyright (C) 2004-2012 Ian Ward +# Copyright (C) 2008 Walter Mundt +# Copyright (C) 2009 Andrew Psaltis +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Urwid web site: https://urwid.org/ + +"""Twisted Reactor based urwid EventLoop implementation. + +Twisted library is required. +""" + +from __future__ import annotations + +import sys +import typing +from collections.abc import Callable + +from twisted.internet.abstract import FileDescriptor +from twisted.internet.error import AlreadyCalled, AlreadyCancelled + +from .abstract_loop import EventLoop, ExitMainLoop + +if typing.TYPE_CHECKING: + from twisted.internet.interfaces import IReactorFDSet + from typing_extensions import ParamSpec + + _Spec = ParamSpec("_Spec") + _T = typing.TypeVar("_T") + +__all__ = ("TwistedEventLoop",) + + +class _TwistedInputDescriptor(FileDescriptor): + def __init__(self, reactor: IReactorFDSet, fd: int, cb: Callable[[], typing.Any]) -> None: + self._fileno = fd + self.cb = cb + super().__init__(reactor) + + def fileno(self) -> int: + return self._fileno + + def doRead(self): + return self.cb() + + +class TwistedEventLoop(EventLoop): + """ + Event loop based on Twisted_ + """ + _idle_emulation_delay = 1.0/256 # a short time (in seconds) + + def __init__(self, reactor=None, manage_reactor: bool = True) -> None: + """ + :param reactor: reactor to use + :type reactor: :class:`twisted.internet.reactor`. + :param: manage_reactor: `True` if you want this event loop to run + and stop the reactor. + :type manage_reactor: boolean + + .. WARNING:: + Twisted's reactor doesn't like to be stopped and run again. If you + need to stop and run your :class:`MainLoop`, consider setting + ``manage_reactor=False`` and take care of running/stopping the reactor + at the beginning/ending of your program yourself. + + You can also forego using :class:`MainLoop`'s run() entirely, and + instead call start() and stop() before and after starting the + reactor. + + .. _Twisted: https://twisted.org/ + """ + if reactor is None: + import twisted.internet.reactor + reactor = twisted.internet.reactor + self.reactor = reactor + self._watch_files: dict[int, _TwistedInputDescriptor] = {} + self._idle_handle: int = 0 + self._twisted_idle_enabled = False + self._idle_callbacks: dict[int, Callable[[], typing.Any]] = {} + self._exc: BaseException | None = None + self.manage_reactor = manage_reactor + self._enable_twisted_idle() + + def alarm(self, seconds: float | int, callback: Callable[[], typing.Any]): + """ + Call callback() a given time from now. No parameters are + passed to callback. + + Returns a handle that may be passed to remove_alarm() + + seconds -- floating point time to wait before calling callback + callback -- function to call from event loop + """ + handle = self.reactor.callLater(seconds, self.handle_exit(callback)) + return handle + + def remove_alarm(self, handle) -> bool: + """ + Remove an alarm. + + Returns True if the alarm exists, False otherwise + """ + try: + handle.cancel() + return True + except AlreadyCancelled: + return False + except AlreadyCalled: + return False + + def watch_file(self, fd: int, callback: Callable[[], typing.Any]) -> int: + """ + Call callback() when fd has some data to read. No parameters + are passed to callback. + + Returns a handle that may be passed to remove_watch_file() + + fd -- file descriptor to watch for input + callback -- function to call when input is available + """ + ind = _TwistedInputDescriptor(self.reactor, fd, self.handle_exit(callback)) + self._watch_files[fd] = ind + self.reactor.addReader(ind) + return fd + + def remove_watch_file(self, handle: int) -> bool: + """ + Remove an input file. + + Returns True if the input file exists, False otherwise + """ + if handle in self._watch_files: + self.reactor.removeReader(self._watch_files[handle]) + del self._watch_files[handle] + return True + return False + + def enter_idle(self, callback: Callable[[], typing.Any]) -> int: + """ + Add a callback for entering idle. + + Returns a handle that may be passed to remove_enter_idle() + """ + self._idle_handle += 1 + self._idle_callbacks[self._idle_handle] = callback + return self._idle_handle + + def _enable_twisted_idle(self) -> None: + """ + Twisted's reactors don't have an idle or enter-idle callback + so the best we can do for now is to set a timer event in a very + short time to approximate an enter-idle callback. + + .. WARNING:: + This will perform worse than the other event loops until we can find a + fix or workaround + """ + if self._twisted_idle_enabled: + return + self.reactor.callLater( + self._idle_emulation_delay, + self.handle_exit(self._twisted_idle_callback, enable_idle=False), + ) + self._twisted_idle_enabled = True + + def _twisted_idle_callback(self) -> None: + for callback in self._idle_callbacks.values(): + callback() + self._twisted_idle_enabled = False + + def remove_enter_idle(self, handle) -> bool: + """ + Remove an idle callback. + + Returns True if the handle was removed. + """ + try: + del self._idle_callbacks[handle] + except KeyError: + return False + return True + + def run(self) -> None: + """ + Start the event loop. Exit the loop when any callback raises + an exception. If ExitMainLoop is raised, exit cleanly. + """ + if not self.manage_reactor: + return + self.reactor.run() + if self._exc: + # An exception caused us to exit, raise it now + exc = self._exc + self._exc = None + raise exc.with_traceback(exc.__traceback__) + + def handle_exit(self, f: Callable[_Spec, _T], enable_idle: bool = True) -> Callable[_Spec, _T | None]: + """ + Decorator that cleanly exits the :class:`TwistedEventLoop` if + :class:`ExitMainLoop` is thrown inside of the wrapped function. Store the + exception info if some other exception occurs, it will be reraised after + the loop quits. + + *f* -- function to be wrapped + """ + def wrapper(*args: _Spec.args, **kwargs: _Spec.kwargs) -> _T | None: + rval = None + try: + rval = f(*args, **kwargs) + except ExitMainLoop: + if self.manage_reactor: + self.reactor.stop() + except BaseException as exc: + print(sys.exc_info()) + self._exc = exc + if self.manage_reactor: + self.reactor.crash() + if enable_idle: + self._enable_twisted_idle() + return rval + return wrapper diff --git a/urwid/html_fragment.py b/urwid/html_fragment.py index 1f56509..006a06b 100755 --- a/urwid/html_fragment.py +++ b/urwid/html_fragment.py @@ -30,7 +30,7 @@ import typing from urwid import util from urwid.display_common import AttrSpec, BaseScreen -from urwid.main_loop import ExitMainLoop +from urwid.event_loop import ExitMainLoop if typing.TYPE_CHECKING: from typing_extensions import Literal diff --git a/urwid/main_loop.py b/urwid/main_loop.py deleted file mode 100755 index 5cc7514..0000000 --- a/urwid/main_loop.py +++ /dev/null @@ -1,1589 +0,0 @@ -#!/usr/bin/python -# -# Urwid main loop code -# Copyright (C) 2004-2012 Ian Ward -# Copyright (C) 2008 Walter Mundt -# Copyright (C) 2009 Andrew Psaltis -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# Urwid web site: https://urwid.org/ - - -from __future__ import annotations - -import heapq -import os -import select -import signal -import sys -import time -import typing -import warnings -from collections.abc import Callable, Iterable -from functools import wraps -from itertools import count -from weakref import WeakKeyDictionary - -try: - import fcntl -except ImportError: - pass # windows - -from urwid import signals -from urwid.command_map import REDRAW_SCREEN, command_map -from urwid.compat import reraise -from urwid.display_common import INPUT_DESCRIPTORS_CHANGED -from urwid.util import StoppingContext, is_mouse_event -from urwid.wimp import PopUpTarget - -if typing.TYPE_CHECKING: - from .display_common import BaseScreen - from .widget import Widget - -PIPE_BUFFER_READ_SIZE = 4096 # can expect this much on Linux, so try for that - - -class ExitMainLoop(Exception): - """ - When this exception is raised within a main loop the main loop - will exit cleanly. - """ - pass - - -class CantUseExternalLoop(Exception): - pass - - -class MainLoop: - """ - This is the standard main loop implementation for a single interactive - session. - - :param widget: the topmost widget used for painting the screen, stored as - :attr:`widget` and may be modified. Must be a box widget. - :type widget: widget instance - - :param palette: initial palette for screen - :type palette: iterable of palette entries - - :param screen: screen to use, default is a new :class:`raw_display.Screen` - instance; stored as :attr:`screen` - :type screen: display module screen instance - - :param handle_mouse: ``True`` to ask :attr:`.screen` to process mouse events - :type handle_mouse: bool - - :param input_filter: a function to filter input before sending it to - :attr:`.widget`, called from :meth:`.input_filter` - :type input_filter: callable - - :param unhandled_input: a function called when input is not handled by - :attr:`.widget`, called from :meth:`.unhandled_input` - :type unhandled_input: callable - - :param event_loop: if :attr:`.screen` supports external an event loop it may be - given here, default is a new :class:`SelectEventLoop` instance; - stored as :attr:`.event_loop` - :type event_loop: event loop instance - - :param pop_ups: `True` to wrap :attr:`.widget` with a :class:`PopUpTarget` - instance to allow any widget to open a pop-up anywhere on the screen - :type pop_ups: boolean - - - .. attribute:: screen - - The screen object this main loop uses for screen updates and reading input - - .. attribute:: event_loop - - The event loop object this main loop uses for waiting on alarms and IO - """ - - def __init__( - self, - widget: Widget, - palette=(), - screen: BaseScreen | None = None, - handle_mouse: bool = True, - input_filter: Callable[[list[str], list[int]], list[str]] | None = None, - unhandled_input: Callable[[str], bool] | None = None, - event_loop=None, - pop_ups: bool = False, - ): - self._widget = widget - self.handle_mouse = handle_mouse - self.pop_ups = pop_ups # triggers property setting side-effect - - if not screen: - from urwid import raw_display - screen = raw_display.Screen() - - if palette: - screen.register_palette(palette) - - self.screen = screen - self.screen_size = None - - self._unhandled_input = unhandled_input - self._input_filter = input_filter - - if not hasattr(screen, 'hook_event_loop') and event_loop is not None: - raise NotImplementedError(f"screen object passed {screen!r} does not support external event loops") - if event_loop is None: - event_loop = SelectEventLoop() - self.event_loop = event_loop - - if hasattr(self.screen, 'signal_handler_setter'): - # Tell the screen what function it must use to set - # signal handlers - self.screen.signal_handler_setter = self.event_loop.set_signal_handler - - self._watch_pipes = {} - - @property - def widget(self) -> Widget: - """ - Property for the topmost widget used to draw the screen. - This must be a box widget. - """ - return self._widget - - @widget.setter - def widget(self, widget: Widget) -> None: - self._widget = widget - if self.pop_ups: - self._topmost_widget.original_widget = self._widget - else: - self._topmost_widget = self._widget - - def _set_widget(self, widget: Widget) -> None: - warnings.warn( - f"method `{self.__class__.__name__}._set_widget` is deprecated, " - f"please use `{self.__class__.__name__}.widget` property", - DeprecationWarning, - stacklevel=2, - ) - self.widget = widget - - @property - def pop_ups(self): - return self._pop_ups - - @pop_ups.setter - def pop_ups(self, pop_ups) -> None: - self._pop_ups = pop_ups - if pop_ups: - self._topmost_widget = PopUpTarget(self._widget) - else: - self._topmost_widget = self._widget - - def _set_pop_ups(self, pop_ups) -> None: - warnings.warn( - f"method `{self.__class__.__name__}._set_pop_ups` is deprecated, " - f"please use `{self.__class__.__name__}.pop_ups` property", - DeprecationWarning, - stacklevel=2, - ) - self.pop_ups = pop_ups - - def set_alarm_in(self, sec, callback, user_data=None): - """ - Schedule an alarm in *sec* seconds that will call *callback* from the - within the :meth:`run` method. - - :param sec: seconds until alarm - :type sec: float - :param callback: function to call with two parameters: this main loop - object and *user_data* - :type callback: callable - """ - def cb(): - callback(self, user_data) - return self.event_loop.alarm(sec, cb) - - def set_alarm_at(self, tm, callback, user_data=None): - """ - Schedule an alarm at *tm* time that will call *callback* from the - within the :meth:`run` function. Returns a handle that may be passed to - :meth:`remove_alarm`. - - :param tm: time to call callback e.g. ``time.time() + 5`` - :type tm: float - :param callback: function to call with two parameters: this main loop - object and *user_data* - :type callback: callable - """ - def cb(): - callback(self, user_data) - return self.event_loop.alarm(tm - time.time(), cb) - - def remove_alarm(self, handle): - """ - Remove an alarm. Return ``True`` if *handle* was found, ``False`` - otherwise. - """ - return self.event_loop.remove_alarm(handle) - - def watch_pipe(self, callback): - """ - Create a pipe for use by a subprocess or thread to trigger a callback - in the process/thread running the main loop. - - :param callback: function taking one parameter to call from within - the process/thread running the main loop - :type callback: callable - - This method returns a file descriptor attached to the write end of a - pipe. The read end of the pipe is added to the list of files - :attr:`event_loop` is watching. When data is written to the pipe the - callback function will be called and passed a single value containing - data read from the pipe. - - This method may be used any time you want to update widgets from - another thread or subprocess. - - Data may be written to the returned file descriptor with - ``os.write(fd, data)``. Ensure that data is less than 512 bytes (or 4K - on Linux) so that the callback will be triggered just once with the - complete value of data passed in. - - If the callback returns ``False`` then the watch will be removed from - :attr:`event_loop` and the read end of the pipe will be closed. You - are responsible for closing the write end of the pipe with - ``os.close(fd)``. - """ - pipe_rd, pipe_wr = os.pipe() - fcntl.fcntl(pipe_rd, fcntl.F_SETFL, os.O_NONBLOCK) - watch_handle = None - - def cb(): - data = os.read(pipe_rd, PIPE_BUFFER_READ_SIZE) - rval = callback(data) - if rval is False: - self.event_loop.remove_watch_file(watch_handle) - os.close(pipe_rd) - - watch_handle = self.event_loop.watch_file(pipe_rd, cb) - self._watch_pipes[pipe_wr] = (watch_handle, pipe_rd) - return pipe_wr - - def remove_watch_pipe(self, write_fd): - """ - Close the read end of the pipe and remove the watch created by - :meth:`watch_pipe`. You are responsible for closing the write end of - the pipe. - - Returns ``True`` if the watch pipe exists, ``False`` otherwise - """ - try: - watch_handle, pipe_rd = self._watch_pipes.pop(write_fd) - except KeyError: - return False - - if not self.event_loop.remove_watch_file(watch_handle): - return False - os.close(pipe_rd) - return True - - def watch_file(self, fd, callback): - """ - Call *callback* when *fd* has some data to read. No parameters are - passed to callback. - - Returns a handle that may be passed to :meth:`remove_watch_file`. - """ - return self.event_loop.watch_file(fd, callback) - - def remove_watch_file(self, handle): - """ - Remove a watch file. Returns ``True`` if the watch file - exists, ``False`` otherwise. - """ - return self.event_loop.remove_watch_file(handle) - - - def run(self): - """ - Start the main loop handling input events and updating the screen. The - loop will continue until an :exc:`ExitMainLoop` exception is raised. - - If you would prefer to manage the event loop yourself, don't use this - method. Instead, call :meth:`start` before starting the event loop, - and :meth:`stop` once it's finished. - """ - try: - self._run() - except ExitMainLoop: - pass - - def _test_run(self): - """ - >>> w = _refl("widget") # _refl prints out function calls - >>> w.render_rval = "fake canvas" # *_rval is used for return values - >>> scr = _refl("screen") - >>> scr.get_input_descriptors_rval = [42] - >>> scr.get_cols_rows_rval = (20, 10) - >>> scr.started = True - >>> scr._urwid_signals = {} - >>> evl = _refl("event_loop") - >>> evl.enter_idle_rval = 1 - >>> evl.watch_file_rval = 2 - >>> ml = MainLoop(w, [], scr, event_loop=evl) - >>> ml.run() # doctest:+ELLIPSIS - screen.start() - screen.set_mouse_tracking() - screen.unhook_event_loop(...) - screen.hook_event_loop(...) - event_loop.enter_idle(<bound method MainLoop.entering_idle...>) - event_loop.run() - event_loop.remove_enter_idle(1) - screen.unhook_event_loop(...) - screen.stop() - >>> ml.draw_screen() # doctest:+ELLIPSIS - screen.get_cols_rows() - widget.render((20, 10), focus=True) - screen.draw_screen((20, 10), 'fake canvas') - """ - - def start(self): - """ - Sets up the main loop, hooking into the event loop where necessary. - Starts the :attr:`screen` if it hasn't already been started. - - If you want to control starting and stopping the event loop yourself, - you should call this method before starting, and call `stop` once the - loop has finished. You may also use this method as a context manager, - which will stop the loop automatically at the end of the block: - - with main_loop.start(): - ... - - Note that some event loop implementations don't handle exceptions - specially if you manage the event loop yourself. In particular, the - Twisted and asyncio loops won't stop automatically when - :exc:`ExitMainLoop` (or anything else) is raised. - """ - self.screen.start() - - if self.handle_mouse: - self.screen.set_mouse_tracking() - - if not hasattr(self.screen, 'hook_event_loop'): - raise CantUseExternalLoop( - "Screen {0!r} doesn't support external event loops") - - try: - signals.connect_signal(self.screen, INPUT_DESCRIPTORS_CHANGED, - self._reset_input_descriptors) - except NameError: - pass - # watch our input descriptors - self._reset_input_descriptors() - self.idle_handle = self.event_loop.enter_idle(self.entering_idle) - - return StoppingContext(self) - - def stop(self): - """ - Cleans up any hooks added to the event loop. Only call this if you're - managing the event loop yourself, after the loop stops. - """ - self.event_loop.remove_enter_idle(self.idle_handle) - del self.idle_handle - signals.disconnect_signal(self.screen, INPUT_DESCRIPTORS_CHANGED, - self._reset_input_descriptors) - self.screen.unhook_event_loop(self.event_loop) - - self.screen.stop() - - def _reset_input_descriptors(self): - self.screen.unhook_event_loop(self.event_loop) - self.screen.hook_event_loop(self.event_loop, self._update) - - def _run(self): - try: - self.start() - except CantUseExternalLoop: - try: - return self._run_screen_event_loop() - finally: - self.screen.stop() - - try: - self.event_loop.run() - except: - self.screen.stop() # clean up screen control - raise - self.stop() - - def _update(self, keys: list[str], raw: list[int]): - """ - >>> w = _refl("widget") - >>> w.selectable_rval = True - >>> w.mouse_event_rval = True - >>> scr = _refl("screen") - >>> scr.get_cols_rows_rval = (15, 5) - >>> evl = _refl("event_loop") - >>> ml = MainLoop(w, [], scr, event_loop=evl) - >>> ml._input_timeout = "old timeout" - >>> ml._update(['y'], [121]) # doctest:+ELLIPSIS - screen.get_cols_rows() - widget.selectable() - widget.keypress((15, 5), 'y') - >>> ml._update([("mouse press", 1, 5, 4)], []) - widget.mouse_event((15, 5), 'mouse press', 1, 5, 4, focus=True) - >>> ml._update([], []) - """ - keys = self.input_filter(keys, raw) - - if keys: - self.process_input(keys) - if 'window resize' in keys: - self.screen_size = None - - def _run_screen_event_loop(self) -> None: - """ - This method is used when the screen does not support using - external event loops. - - The alarms stored in the SelectEventLoop in :attr:`event_loop` - are modified by this method. - """ - next_alarm = None - - while True: - self.draw_screen() - - if not next_alarm and self.event_loop._alarms: - next_alarm = heapq.heappop(self.event_loop._alarms) - - keys: list[str] = [] - raw: list[int] = [] - while not keys: - if next_alarm: - sec = max(0, next_alarm[0] - time.time()) - self.screen.set_input_timeouts(sec) - else: - self.screen.set_input_timeouts(None) - keys, raw = self.screen.get_input(True) - if not keys and next_alarm: - sec = next_alarm[0] - time.time() - if sec <= 0: - break - - keys = self.input_filter(keys, raw) - - if keys: - self.process_input(keys) - - while next_alarm: - sec = next_alarm[0] - time.time() - if sec > 0: - break - tm, tie_break, callback = next_alarm - callback() - - if self.event_loop._alarms: - next_alarm = heapq.heappop(self.event_loop._alarms) - else: - next_alarm = None - - if 'window resize' in keys: - self.screen_size = None - - def _test_run_screen_event_loop(self): - """ - >>> w = _refl("widget") - >>> scr = _refl("screen") - >>> scr.get_cols_rows_rval = (10, 5) - >>> scr.get_input_rval = [], [] - >>> ml = MainLoop(w, screen=scr) - >>> def stop_now(loop, data): - ... raise ExitMainLoop() - >>> handle = ml.set_alarm_in(0, stop_now) - >>> try: - ... ml._run_screen_event_loop() - ... except ExitMainLoop: - ... pass - screen.get_cols_rows() - widget.render((10, 5), focus=True) - screen.draw_screen((10, 5), None) - screen.set_input_timeouts(0) - screen.get_input(True) - """ - - def process_input(self, keys: Iterable[str]) -> bool: - """ - This method will pass keyboard input and mouse events to :attr:`widget`. - This method is called automatically from the :meth:`run` method when - there is input, but may also be called to simulate input from the user. - - *keys* is a list of input returned from :attr:`screen`'s get_input() - or get_input_nonblocking() methods. - - Returns ``True`` if any key was handled by a widget or the - :meth:`unhandled_input` method. - """ - if not self.screen_size: - self.screen_size = self.screen.get_cols_rows() - - something_handled = False - - for k in keys: - if k == 'window resize': - continue - if is_mouse_event(k): - event, button, col, row = k - if hasattr(self._topmost_widget, "mouse_event"): - if self._topmost_widget.mouse_event(self.screen_size, event, button, col, row, focus=True): - k = None - elif self._topmost_widget.selectable(): - k = self._topmost_widget.keypress(self.screen_size, k) - if k: - if command_map[k] == REDRAW_SCREEN: - self.screen.clear() - something_handled = True - else: - something_handled |= bool(self.unhandled_input(k)) - else: - something_handled = True - - return something_handled - - def _test_process_input(self): - """ - >>> w = _refl("widget") - >>> w.selectable_rval = True - >>> scr = _refl("screen") - >>> scr.get_cols_rows_rval = (10, 5) - >>> ml = MainLoop(w, [], scr) - >>> ml.process_input(['enter', ('mouse drag', 1, 14, 20)]) - screen.get_cols_rows() - widget.selectable() - widget.keypress((10, 5), 'enter') - widget.mouse_event((10, 5), 'mouse drag', 1, 14, 20, focus=True) - True - """ - - def input_filter(self, keys: list[str], raw: list[int]) -> list[str]: - """ - This function is passed each all the input events and raw keystroke - values. These values are passed to the *input_filter* function - passed to the constructor. That function must return a list of keys to - be passed to the widgets to handle. If no *input_filter* was - defined this implementation will return all the input events. - """ - if self._input_filter: - return self._input_filter(keys, raw) - return keys - - def unhandled_input(self, input: str) -> bool: - """ - This function is called with any input that was not handled by the - widgets, and calls the *unhandled_input* function passed to the - constructor. If no *unhandled_input* was defined then the input - will be ignored. - - *input* is the keyboard or mouse input. - - The *unhandled_input* function should return ``True`` if it handled - the input. - """ - if self._unhandled_input: - return self._unhandled_input(input) - - def entering_idle(self): - """ - This method is called whenever the event loop is about to enter the - idle state. :meth:`draw_screen` is called here to update the - screen when anything has changed. - """ - if self.screen.started: - self.draw_screen() - - def draw_screen(self): - """ - Render the widgets and paint the screen. This method is called - automatically from :meth:`entering_idle`. - - If you modify the widgets displayed outside of handling input or - responding to an alarm you will need to call this method yourself - to repaint the screen. - """ - if not self.screen_size: - self.screen_size = self.screen.get_cols_rows() - - canvas = self._topmost_widget.render(self.screen_size, focus=True) - self.screen.draw_screen(self.screen_size, canvas) - - -class EventLoop: - """ - Abstract class representing an event loop to be used by :class:`MainLoop`. - """ - - def alarm(self, seconds: float | int, callback): - """ - Call callback() a given time from now. No parameters are - passed to callback. - - This method has no default implementation. - - Returns a handle that may be passed to remove_alarm() - - seconds -- floating point time to wait before calling callback - callback -- function to call from event loop - """ - raise NotImplementedError() - - def enter_idle(self, callback): - """ - Add a callback for entering idle. - - This method has no default implementation. - - Returns a handle that may be passed to remove_idle() - """ - raise NotImplementedError() - - def remove_alarm(self, handle): - """ - Remove an alarm. - - This method has no default implementation. - - Returns True if the alarm exists, False otherwise - """ - raise NotImplementedError() - - def remove_enter_idle(self, handle): - """ - Remove an idle callback. - - This method has no default implementation. - - Returns True if the handle was removed. - """ - raise NotImplementedError() - - def remove_watch_file(self, handle): - """ - Remove an input file. - - This method has no default implementation. - - Returns True if the input file exists, False otherwise - """ - raise NotImplementedError() - - def run(self): - """ - Start the event loop. Exit the loop when any callback raises - an exception. If ExitMainLoop is raised, exit cleanly. - - This method has no default implementation. - """ - raise NotImplementedError() - - def watch_file(self, fd, callback): - """ - Call callback() when fd has some data to read. No parameters - are passed to callback. - - This method has no default implementation. - - Returns a handle that may be passed to remove_watch_file() - - fd -- file descriptor to watch for input - callback -- function to call when input is available - """ - raise NotImplementedError() - - def set_signal_handler(self, signum, handler): - """ - Sets the signal handler for signal signum. - - The default implementation of :meth:`set_signal_handler` - is simply a proxy function that calls :func:`signal.signal()` - and returns the resulting value. - - signum -- signal number - handler -- function (taking signum as its single argument), - or `signal.SIG_IGN`, or `signal.SIG_DFL` - """ - return signal.signal(signum, handler) - - -class SelectEventLoop(EventLoop): - """ - Event loop based on :func:`select.select` - """ - - def __init__(self): - self._alarms = [] - self._watch_files = {} - self._idle_handle = 0 - self._idle_callbacks = {} - self._tie_break = count() - - def alarm(self, seconds: float | int, callback): - """ - Call callback() a given time from now. No parameters are - passed to callback. - - Returns a handle that may be passed to remove_alarm() - - seconds -- floating point time to wait before calling callback - callback -- function to call from event loop - """ - tm = time.time() + seconds - handle = (tm, next(self._tie_break), callback) - heapq.heappush(self._alarms, handle) - return handle - - def remove_alarm(self, handle): - """ - Remove an alarm. - - Returns True if the alarm exists, False otherwise - """ - try: - self._alarms.remove(handle) - heapq.heapify(self._alarms) - return True - except ValueError: - return False - - def watch_file(self, fd, callback): - """ - Call callback() when fd has some data to read. No parameters - are passed to callback. - - Returns a handle that may be passed to remove_watch_file() - - fd -- file descriptor to watch for input - callback -- function to call when input is available - """ - self._watch_files[fd] = callback - return fd - - def remove_watch_file(self, handle): - """ - Remove an input file. - - Returns True if the input file exists, False otherwise - """ - if handle in self._watch_files: - del self._watch_files[handle] - return True - return False - - def enter_idle(self, callback): - """ - Add a callback for entering idle. - - Returns a handle that may be passed to remove_idle() - """ - self._idle_handle += 1 - self._idle_callbacks[self._idle_handle] = callback - return self._idle_handle - - def remove_enter_idle(self, handle): - """ - Remove an idle callback. - - Returns True if the handle was removed. - """ - try: - del self._idle_callbacks[handle] - except KeyError: - return False - return True - - def _entering_idle(self): - """ - Call all the registered idle callbacks. - """ - for callback in self._idle_callbacks.values(): - callback() - - def run(self): - """ - Start the event loop. Exit the loop when any callback raises - an exception. If ExitMainLoop is raised, exit cleanly. - """ - try: - self._did_something = True - while True: - try: - self._loop() - except OSError as e: - if e.args[0] != 4: - # not just something we need to retry - raise - except ExitMainLoop: - pass - - def _loop(self): - """ - A single iteration of the event loop - """ - fds = list(self._watch_files.keys()) - if self._alarms or self._did_something: - if self._alarms: - tm = self._alarms[0][0] - timeout = max(0, tm - time.time()) - if self._did_something and (not self._alarms or (self._alarms and timeout > 0)): - timeout = 0 - tm = 'idle' - ready, w, err = select.select(fds, [], fds, timeout) - else: - tm = None - ready, w, err = select.select(fds, [], fds) - - if not ready: - if tm == 'idle': - self._entering_idle() - self._did_something = False - elif tm is not None: - # must have been a timeout - tm, tie_break, alarm_callback = heapq.heappop(self._alarms) - alarm_callback() - self._did_something = True - - for fd in ready: - self._watch_files[fd]() - self._did_something = True - - -class GLibEventLoop(EventLoop): - """ - Event loop based on GLib.MainLoop - """ - - def __init__(self): - from gi.repository import GLib - self.GLib = GLib - self._alarms = [] - self._watch_files = {} - self._idle_handle = 0 - self._glib_idle_enabled = False # have we called glib.idle_add? - self._idle_callbacks = {} - self._loop = GLib.MainLoop() - self._exc_info = None - self._enable_glib_idle() - self._signal_handlers = {} - - def alarm(self, seconds: float | int, callback): - """ - Call callback() a given time from now. No parameters are - passed to callback. - - Returns a handle that may be passed to remove_alarm() - - seconds -- floating point time to wait before calling callback - callback -- function to call from event loop - """ - @self.handle_exit - def ret_false(): - callback() - self._enable_glib_idle() - return False - fd = self.GLib.timeout_add(int(seconds*1000), ret_false) - self._alarms.append(fd) - return (fd, callback) - - def set_signal_handler(self, signum, handler): - """ - Sets the signal handler for signal signum. - - .. WARNING:: - Because this method uses the `GLib`-specific `unix_signal_add` - function, its behaviour is different than `signal.signal().` - - If `signum` is not `SIGHUP`, `SIGINT`, `SIGTERM`, `SIGUSR1`, - `SIGUSR2` or `SIGWINCH`, this method performs no actions and - immediately returns None. - - Returns None in all cases (unlike :func:`signal.signal()`). - .. - - signum -- signal number - handler -- function (taking signum as its single argument), - or `signal.SIG_IGN`, or `signal.SIG_DFL` - """ - glib_signals = [ - signal.SIGHUP, - signal.SIGINT, - signal.SIGTERM, - signal.SIGUSR1, - signal.SIGUSR2, - ] - - # GLib supports SIGWINCH as of version 2.54. - if not self.GLib.check_version(2, 54, 0): - glib_signals.append(signal.SIGWINCH) - - if signum not in glib_signals: - # The GLib event loop supports only the signals listed above - return - - if signum in self._signal_handlers: - self.GLib.source_remove(self._signal_handlers.pop(signum)) - - if handler == signal.SIG_IGN: - handler = lambda x: None - elif handler == signal.SIG_DFL: - return - - def final_handler(signal_number): - handler(signal_number) - return self.GLib.SOURCE_CONTINUE - - source = self.GLib.unix_signal_add(self.GLib.PRIORITY_DEFAULT, signum, final_handler, signum) - self._signal_handlers[signum] = source - - def remove_alarm(self, handle): - """ - Remove an alarm. - - Returns True if the alarm exists, False otherwise - """ - try: - self._alarms.remove(handle[0]) - self.GLib.source_remove(handle[0]) - return True - except ValueError: - return False - - def watch_file(self, fd, callback): - """ - Call callback() when fd has some data to read. No parameters - are passed to callback. - - Returns a handle that may be passed to remove_watch_file() - - fd -- file descriptor to watch for input - callback -- function to call when input is available - """ - @self.handle_exit - def io_callback(source, cb_condition): - callback() - self._enable_glib_idle() - return True - self._watch_files[fd] = self.GLib.io_add_watch(fd, self.GLib.IO_IN, io_callback) - return fd - - def remove_watch_file(self, handle): - """ - Remove an input file. - - Returns True if the input file exists, False otherwise - """ - if handle in self._watch_files: - self.GLib.source_remove(self._watch_files[handle]) - del self._watch_files[handle] - return True - return False - - def enter_idle(self, callback): - """ - Add a callback for entering idle. - - Returns a handle that may be passed to remove_enter_idle() - """ - self._idle_handle += 1 - self._idle_callbacks[self._idle_handle] = callback - return self._idle_handle - - def _enable_glib_idle(self): - if self._glib_idle_enabled: - return - self.GLib.idle_add(self._glib_idle_callback) - self._glib_idle_enabled = True - - def _glib_idle_callback(self): - for callback in self._idle_callbacks.values(): - callback() - self._glib_idle_enabled = False - return False # ask glib not to call again (or we would be called - - def remove_enter_idle(self, handle): - """ - Remove an idle callback. - - Returns True if the handle was removed. - """ - try: - del self._idle_callbacks[handle] - except KeyError: - return False - return True - - def run(self): - """ - Start the event loop. Exit the loop when any callback raises - an exception. If ExitMainLoop is raised, exit cleanly. - """ - try: - self._loop.run() - finally: - if self._loop.is_running(): - self._loop.quit() - if self._exc_info: - # An exception caused us to exit, raise it now - exc_info = self._exc_info - self._exc_info = None - reraise(*exc_info) - - def handle_exit(self, f): - """ - Decorator that cleanly exits the :class:`GLibEventLoop` if - :exc:`ExitMainLoop` is thrown inside of the wrapped function. Store the - exception info if some other exception occurs, it will be reraised after - the loop quits. - - *f* -- function to be wrapped - """ - def wrapper(*args, **kargs): - try: - return f(*args, **kargs) - except ExitMainLoop: - self._loop.quit() - except: - self._exc_info = sys.exc_info() - if self._loop.is_running(): - self._loop.quit() - return False - return wrapper - - -class TornadoEventLoop(EventLoop): - """ This is an Urwid-specific event loop to plug into its MainLoop. - It acts as an adaptor for Tornado's IOLoop which does all - heavy lifting except idle-callbacks. - - Notice, since Tornado has no concept of idle callbacks we - monkey patch ioloop._impl.poll() function to be able to detect - potential idle periods. - """ - _ioloop_registry = WeakKeyDictionary() # {<ioloop> : {<handle> : <idle_func>}} - _max_idle_handle = 0 - - class PollProxy: - """ A simple proxy for a Python's poll object that wraps the .poll() method - in order to detect idle periods and call Urwid callbacks - """ - def __init__(self, poll_obj, idle_map): - self.__poll_obj = poll_obj - self.__idle_map = idle_map - self._idle_done = False - self._prev_timeout = 0 - - def __getattr__(self, name: str): - return getattr(self.__poll_obj, name) - - def poll(self, timeout): - if timeout > self._prev_timeout: - # if timeout increased we assume a timer event was handled - self._idle_done = False - self._prev_timeout = timeout - start = time.time() - - # any IO pending wins - events = self.__poll_obj.poll(0) - if events: - self._idle_done = False - return events - - # our chance to enter idle - if not self._idle_done: - for callback in self.__idle_map.values(): - callback() - self._idle_done = True - - # then complete the actual request (adjusting timeout) - timeout = max(0, min(timeout, timeout + start - time.time())) - events = self.__poll_obj.poll(timeout) - if events: - self._idle_done = False - return events - - @classmethod - def _patch_poll_impl(cls, ioloop): - """ Wraps original poll object in the IOLoop's poll object - """ - if ioloop in cls._ioloop_registry: - return # we already patched this instance - - cls._ioloop_registry[ioloop] = idle_map = {} - # TODO: Add support for Tornado>=5.0.0 - ioloop._impl = cls.PollProxy(ioloop._impl, idle_map) - - def __init__(self, ioloop=None): - if not ioloop: - from tornado.ioloop import IOLoop - ioloop = IOLoop.instance() - self._ioloop = ioloop - self._patch_poll_impl(ioloop) - self._pending_alarms = {} - self._watch_handles = {} # {<watch_handle> : <file_descriptor>} - self._max_watch_handle = 0 - self._exception = None - - def alarm(self, secs, callback): - ioloop = self._ioloop - def wrapped(): - try: - del self._pending_alarms[handle] - except KeyError: - pass - self.handle_exit(callback)() - handle = ioloop.add_timeout(ioloop.time() + secs, wrapped) - self._pending_alarms[handle] = 1 - return handle - - def remove_alarm(self, handle): - self._ioloop.remove_timeout(handle) - try: - del self._pending_alarms[handle] - except KeyError: - return False - else: - return True - - def watch_file(self, fd, callback): - from tornado.ioloop import IOLoop - handler = lambda fd,events: self.handle_exit(callback)() - self._ioloop.add_handler(fd, handler, IOLoop.READ) - self._max_watch_handle += 1 - handle = self._max_watch_handle - self._watch_handles[handle] = fd - return handle - - def remove_watch_file(self, handle): - fd = self._watch_handles.pop(handle, None) - if fd is None: - return False - else: - self._ioloop.remove_handler(fd) - return True - - def enter_idle(self, callback): - self._max_idle_handle += 1 - handle = self._max_idle_handle - idle_map = self._ioloop_registry[self._ioloop] - idle_map[handle] = callback - return handle - - def remove_enter_idle(self, handle): - idle_map = self._ioloop_registry[self._ioloop] - cb = idle_map.pop(handle, None) - return cb is not None - - def handle_exit(self, func): - @wraps(func) - def wrapper(*args, **kw): - try: - return func(*args, **kw) - except ExitMainLoop: - self._ioloop.stop() - except Exception as exc: - self._exception = exc - self._ioloop.stop() - return False - return wrapper - - def run(self): - self._ioloop.start() - if self._exception: - exc, self._exception = self._exception, None - raise exc - - -try: - from twisted.internet.abstract import FileDescriptor -except ImportError: - FileDescriptor = object - - -class TwistedInputDescriptor(FileDescriptor): - def __init__(self, reactor, fd, cb): - self._fileno = fd - self.cb = cb - super().__init__(reactor) - - def fileno(self): - return self._fileno - - def doRead(self): - return self.cb() - - -class TwistedEventLoop(EventLoop): - """ - Event loop based on Twisted_ - """ - _idle_emulation_delay = 1.0/256 # a short time (in seconds) - - def __init__(self, reactor=None, manage_reactor: bool = True): - """ - :param reactor: reactor to use - :type reactor: :class:`twisted.internet.reactor`. - :param: manage_reactor: `True` if you want this event loop to run - and stop the reactor. - :type manage_reactor: boolean - - .. WARNING:: - Twisted's reactor doesn't like to be stopped and run again. If you - need to stop and run your :class:`MainLoop`, consider setting - ``manage_reactor=False`` and take care of running/stopping the reactor - at the beginning/ending of your program yourself. - - You can also forego using :class:`MainLoop`'s run() entirely, and - instead call start() and stop() before and after starting the - reactor. - - .. _Twisted: https://twisted.org/ - """ - if reactor is None: - import twisted.internet.reactor - reactor = twisted.internet.reactor - self.reactor = reactor - self._alarms = [] - self._watch_files = {} - self._idle_handle = 0 - self._twisted_idle_enabled = False - self._idle_callbacks = {} - self._exc_info = None - self.manage_reactor = manage_reactor - self._enable_twisted_idle() - - def alarm(self, seconds, callback): - """ - Call callback() a given time from now. No parameters are - passed to callback. - - Returns a handle that may be passed to remove_alarm() - - seconds -- floating point time to wait before calling callback - callback -- function to call from event loop - """ - handle = self.reactor.callLater(seconds, self.handle_exit(callback)) - return handle - - def remove_alarm(self, handle): - """ - Remove an alarm. - - Returns True if the alarm exists, False otherwise - """ - from twisted.internet.error import AlreadyCalled, AlreadyCancelled - try: - handle.cancel() - return True - except AlreadyCancelled: - return False - except AlreadyCalled: - return False - - def watch_file(self, fd, callback): - """ - Call callback() when fd has some data to read. No parameters - are passed to callback. - - Returns a handle that may be passed to remove_watch_file() - - fd -- file descriptor to watch for input - callback -- function to call when input is available - """ - ind = TwistedInputDescriptor(self.reactor, fd, self.handle_exit(callback)) - self._watch_files[fd] = ind - self.reactor.addReader(ind) - return fd - - def remove_watch_file(self, handle): - """ - Remove an input file. - - Returns True if the input file exists, False otherwise - """ - if handle in self._watch_files: - self.reactor.removeReader(self._watch_files[handle]) - del self._watch_files[handle] - return True - return False - - def enter_idle(self, callback): - """ - Add a callback for entering idle. - - Returns a handle that may be passed to remove_enter_idle() - """ - self._idle_handle += 1 - self._idle_callbacks[self._idle_handle] = callback - return self._idle_handle - - def _enable_twisted_idle(self): - """ - Twisted's reactors don't have an idle or enter-idle callback - so the best we can do for now is to set a timer event in a very - short time to approximate an enter-idle callback. - - .. WARNING:: - This will perform worse than the other event loops until we can find a - fix or workaround - """ - if self._twisted_idle_enabled: - return - self.reactor.callLater(self._idle_emulation_delay, - self.handle_exit(self._twisted_idle_callback, enable_idle=False)) - self._twisted_idle_enabled = True - - def _twisted_idle_callback(self): - for callback in self._idle_callbacks.values(): - callback() - self._twisted_idle_enabled = False - - def remove_enter_idle(self, handle): - """ - Remove an idle callback. - - Returns True if the handle was removed. - """ - try: - del self._idle_callbacks[handle] - except KeyError: - return False - return True - - def run(self): - """ - Start the event loop. Exit the loop when any callback raises - an exception. If ExitMainLoop is raised, exit cleanly. - """ - if not self.manage_reactor: - return - self.reactor.run() - if self._exc_info: - # An exception caused us to exit, raise it now - exc_info = self._exc_info - self._exc_info = None - reraise(*exc_info) - - def handle_exit(self, f, enable_idle: bool = True): - """ - Decorator that cleanly exits the :class:`TwistedEventLoop` if - :class:`ExitMainLoop` is thrown inside of the wrapped function. Store the - exception info if some other exception occurs, it will be reraised after - the loop quits. - - *f* -- function to be wrapped - """ - def wrapper(*args,**kargs): - rval = None - try: - rval = f(*args,**kargs) - except ExitMainLoop: - if self.manage_reactor: - self.reactor.stop() - except: - print(sys.exc_info()) - self._exc_info = sys.exc_info() - if self.manage_reactor: - self.reactor.crash() - if enable_idle: - self._enable_twisted_idle() - return rval - return wrapper - - -class AsyncioEventLoop(EventLoop): - """ - Event loop based on the standard library ``asyncio`` module. - - ``asyncio`` is new in Python 3.4, but also exists as a backport on PyPI for - Python 3.3. The ``trollius`` package is available for older Pythons with - slightly different syntax, but also works with this loop. - """ - _we_started_event_loop = False - - _idle_emulation_delay = 1.0/30 # a short time (in seconds) - - def __init__(self, **kwargs): - if 'loop' in kwargs: - self._loop = kwargs.pop('loop') - else: - import asyncio - self._loop = asyncio.get_event_loop() - - def alarm(self, seconds, callback): - """ - Call callback() a given time from now. No parameters are - passed to callback. - - Returns a handle that may be passed to remove_alarm() - - seconds -- time in seconds to wait before calling callback - callback -- function to call from event loop - """ - return self._loop.call_later(seconds, callback) - - def remove_alarm(self, handle): - """ - Remove an alarm. - - Returns True if the alarm exists, False otherwise - """ - cancelled = ( - handle.cancelled() - if getattr(handle, 'cancelled', None) - else handle._cancelled - ) - existed = not cancelled - handle.cancel() - return existed - - def watch_file(self, fd, callback): - """ - Call callback() when fd has some data to read. No parameters - are passed to callback. - - Returns a handle that may be passed to remove_watch_file() - - fd -- file descriptor to watch for input - callback -- function to call when input is available - """ - self._loop.add_reader(fd, callback) - return fd - - def remove_watch_file(self, handle): - """ - Remove an input file. - - Returns True if the input file exists, False otherwise - """ - return self._loop.remove_reader(handle) - - def enter_idle(self, callback): - """ - Add a callback for entering idle. - - Returns a handle that may be passed to remove_idle() - """ - # XXX there's no such thing as "idle" in most event loops; this fakes - # it the same way as Twisted, by scheduling the callback to be called - # repeatedly - mutable_handle = [None] - def faux_idle_callback(): - callback() - mutable_handle[0] = self._loop.call_later( - self._idle_emulation_delay, faux_idle_callback) - - mutable_handle[0] = self._loop.call_later( - self._idle_emulation_delay, faux_idle_callback) - - return mutable_handle - - def remove_enter_idle(self, handle): - """ - Remove an idle callback. - - Returns True if the handle was removed. - """ - # `handle` is just a list containing the current actual handle - return self.remove_alarm(handle[0]) - - _exc_info = None - - def _exception_handler(self, loop, context): - exc = context.get('exception') - if exc: - loop.stop() - if not isinstance(exc, ExitMainLoop): - # Store the exc_info so we can re-raise after the loop stops - self._exc_info = (type(exc), exc, exc.__traceback__) - else: - loop.default_exception_handler(context) - - def run(self): - """ - Start the event loop. Exit the loop when any callback raises - an exception. If ExitMainLoop is raised, exit cleanly. - """ - self._loop.set_exception_handler(self._exception_handler) - self._loop.run_forever() - if self._exc_info: - exc_info = self._exc_info - self._exc_info = None - reraise(*exc_info) - - -# Import Trio's event loop only if we are on Python 3.5 or above (async def is -# not supported in earlier versions). -from ._async_kw_event_loop import TrioEventLoop - - -def _refl(name: str, rval=None, exit=False): - """ - This function is used to test the main loop classes. - - >>> scr = _refl("screen") - >>> scr.function("argument") - screen.function('argument') - >>> scr.callme(when="now") - screen.callme(when='now') - >>> scr.want_something_rval = 42 - >>> x = scr.want_something() - screen.want_something() - >>> x - 42 - - """ - class Reflect: - def __init__(self, name: int, rval=None): - self._name = name - self._rval = rval - - def __call__(self, *argl, **argd): - args = ", ".join([repr(a) for a in argl]) - if args and argd: - args = f"{args}, " - args = args + ", ".join([f"{k}={repr(v)}" for k,v in argd.items()]) - print(f"{self._name}({args})") - if exit: - raise ExitMainLoop() - return self._rval - - def __getattr__(self, attr): - if attr.endswith("_rval"): - raise AttributeError() - #print(self._name+"."+attr) - if hasattr(self, f"{attr}_rval"): - return Reflect(f"{self._name}.{attr}", getattr(self, f"{attr}_rval")) - return Reflect(f"{self._name}.{attr}") - return Reflect(name) - - -def _test(): - import doctest - doctest.testmod() - - -if __name__=='__main__': - _test() diff --git a/urwid/tests/test_doctests.py b/urwid/tests/test_doctests.py index cb0526e..48a0188 100644 --- a/urwid/tests/test_doctests.py +++ b/urwid/tests/test_doctests.py @@ -13,11 +13,11 @@ def load_tests(loader, tests, ignore): urwid.wimp, urwid.decoration, urwid.display_common, - urwid.main_loop, + urwid.event_loop.main_loop, urwid.numedit, urwid.monitored_list, urwid.raw_display, - 'urwid.split_repr', # override function with same name + 'urwid.split_repr', # override function with same name urwid.util, urwid.signals, urwid.graphics, diff --git a/urwid/tests/test_event_loops.py b/urwid/tests/test_event_loops.py index 77dbae1..fac7f72 100644 --- a/urwid/tests/test_event_loops.py +++ b/urwid/tests/test_event_loops.py @@ -114,6 +114,8 @@ else: from tornado.ioloop import IOLoop self.evl = urwid.TornadoEventLoop(IOLoop()) + _expected_idle_handle = None + try: import twisted diff --git a/urwid/util.py b/urwid/util.py index 7fe2b6c..fa2f907 100644 --- a/urwid/util.py +++ b/urwid/util.py @@ -431,7 +431,7 @@ def _tagmarkup_recurse(tm, attr): return [tm], [(attr, len(tm))] -def is_mouse_event(ev: str) -> bool: +def is_mouse_event(ev: tuple[str, int, int, int] | typing.Any) -> bool: return isinstance(ev, tuple) and len(ev) == 4 and "mouse" in ev[0] |