summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonas Wielicki <j.wielicki@sotecware.net>2014-08-04 16:34:18 +0200
committerJonas Wielicki <j.wielicki@sotecware.net>2014-08-04 16:34:18 +0200
commit25fcd8d36d496c0ab5b36737c9c621da50b21e44 (patch)
treef17aaa282ee280c55f4e506134ead64d3319e77b
parent18b31686c3293fb9e8888836420b7bf273d5f34b (diff)
parent50c98dd5ea4d3feade969489461911f5d00ebd09 (diff)
downloadurwid-25fcd8d36d496c0ab5b36737c9c621da50b21e44.tar.gz
Merge pull request #67 from eevee/loop-cleanup
Make loop wrapping simpler, and make Screen easier to plug into async loops
-rw-r--r--examples/asyncio_socket_server.py186
-rwxr-xr-xexamples/calc.py2
-rwxr-xr-xexamples/dialog.py2
-rw-r--r--examples/twisted_serve_ssh.py23
-rw-r--r--urwid/__init__.py2
-rwxr-xr-xurwid/container.py20
-rwxr-xr-xurwid/curses_display.py26
-rwxr-xr-xurwid/display_common.py38
-rw-r--r--urwid/escape.py8
-rwxr-xr-xurwid/font.py2
-rwxr-xr-xurwid/html_fragment.py10
-rw-r--r--urwid/lcd_display.py9
-rwxr-xr-xurwid/main_loop.py274
-rwxr-xr-xurwid/monitored_list.py4
-rwxr-xr-xurwid/old_str_util.py7
-rw-r--r--urwid/raw_display.py253
-rw-r--r--urwid/signals.py4
-rw-r--r--urwid/tests/test_event_loops.py18
-rw-r--r--urwid/util.py32
-rw-r--r--urwid/vterm.py4
-rwxr-xr-xurwid/web_display.py17
21 files changed, 678 insertions, 263 deletions
diff --git a/examples/asyncio_socket_server.py b/examples/asyncio_socket_server.py
new file mode 100644
index 0000000..87592d3
--- /dev/null
+++ b/examples/asyncio_socket_server.py
@@ -0,0 +1,186 @@
+"""Demo of using urwid with Python 3.4's asyncio.
+
+This code works on older Python 3.x if you install `asyncio` from PyPI, and
+even Python 2 if you install `trollius`!
+"""
+from __future__ import print_function
+
+import asyncio
+from datetime import datetime
+import sys
+import weakref
+
+import urwid
+from urwid.raw_display import Screen
+
+
+loop = asyncio.get_event_loop()
+
+
+# -----------------------------------------------------------------------------
+# General-purpose setup code
+
+def build_widgets():
+ input1 = urwid.Edit('What is your name? ')
+ input2 = urwid.Edit('What is your quest? ')
+ input3 = urwid.Edit('What is the capital of Assyria? ')
+ inputs = [input1, input2, input3]
+
+ def update_clock(widget_ref):
+ widget = widget_ref()
+ if not widget:
+ # widget is dead; the main loop must've been destroyed
+ return
+
+ widget.set_text(datetime.now().isoformat())
+
+ # Schedule us to update the clock again in one second
+ loop.call_later(1, update_clock, widget_ref)
+
+ clock = urwid.Text('')
+ update_clock(weakref.ref(clock))
+
+ return urwid.Filler(urwid.Pile([clock] + inputs), 'top')
+
+
+def unhandled(key):
+ if key == 'ctrl c':
+ raise urwid.ExitMainLoop
+
+
+# -----------------------------------------------------------------------------
+# Demo 1
+
+def demo1():
+ """Plain old urwid app. Just happens to be run atop asyncio as the event
+ loop.
+
+ Note that the clock is updated using the asyncio loop directly, not via any
+ of urwid's facilities.
+ """
+ main_widget = build_widgets()
+
+ urwid_loop = urwid.MainLoop(
+ main_widget,
+ event_loop=urwid.AsyncioEventLoop(loop=loop),
+ unhandled_input=unhandled,
+ )
+ urwid_loop.run()
+
+
+# -----------------------------------------------------------------------------
+# Demo 2
+
+class AsyncScreen(Screen):
+ """An urwid screen that speaks to an asyncio stream, rather than polling
+ file descriptors.
+ """
+ def __init__(self, reader, writer):
+ self.reader = reader
+ self.writer = writer
+
+ Screen.__init__(self, None, None)
+
+ _pending_task = None
+
+ def write(self, data):
+ self.writer.write(data)
+
+ def flush(self):
+ pass
+
+ def hook_event_loop(self, event_loop, callback):
+ # Wait on the reader's read coro, and when there's data to read, call
+ # the callback and then wait again
+ def pump_reader(fut=None):
+ if fut is None:
+ # First call, do nothing
+ pass
+ elif fut.cancelled():
+ # This is in response to an earlier .read() call, so don't
+ # schedule another one!
+ return
+ elif fut.exception():
+ pass
+ else:
+ try:
+ self.parse_input(
+ event_loop, callback, bytearray(fut.result()))
+ except urwid.ExitMainLoop:
+ # This will immediately close the transport and thus the
+ # connection, which in turn calls connection_lost, which
+ # stops the screen and the loop
+ self.writer.abort()
+
+ # asyncio.async() schedules a coroutine without using `yield from`,
+ # which would make this code not work on Python 2
+ self._pending_task = asyncio.async(
+ self.reader.read(1024), loop=event_loop._loop)
+ self._pending_task.add_done_callback(pump_reader)
+
+ pump_reader()
+
+ def unhook_event_loop(self, event_loop):
+ if self._pending_task:
+ self._pending_task.cancel()
+ del self._pending_task
+
+
+class UrwidProtocol(asyncio.Protocol):
+ def connection_made(self, transport):
+ print("Got a client!")
+ self.transport = transport
+
+ # StreamReader is super convenient here; it has a regular method on our
+ # end (feed_data), and a coroutine on the other end that will
+ # faux-block until there's data to be read. We could also just call a
+ # method directly on the screen, but this keeps the screen somewhat
+ # separate from the protocol.
+ self.reader = asyncio.StreamReader(loop=loop)
+ screen = AsyncScreen(self.reader, transport)
+
+ main_widget = build_widgets()
+ self.urwid_loop = urwid.MainLoop(
+ main_widget,
+ event_loop=urwid.AsyncioEventLoop(loop=loop),
+ screen=screen,
+ unhandled_input=unhandled,
+ )
+
+ self.urwid_loop.start()
+
+ def data_received(self, data):
+ self.reader.feed_data(data)
+
+ def connection_lost(self, exc):
+ print("Lost a client...")
+ self.reader.feed_eof()
+ self.urwid_loop.stop()
+
+
+def demo2():
+ """Urwid app served over the network to multiple clients at once, using an
+ asyncio Protocol.
+ """
+ coro = loop.create_server(UrwidProtocol, port=12345)
+ loop.run_until_complete(coro)
+ print("OK, good to go! Try this in another terminal (or two):")
+ print()
+ print(" socat TCP:127.0.0.1:12345 STDIN,raw")
+ print()
+ loop.run_forever()
+
+
+if __name__ == '__main__':
+ if len(sys.argv) == 2:
+ which = sys.argv[1]
+ else:
+ which = None
+
+ if which == '1':
+ demo1()
+ elif which == '2':
+ demo2()
+ else:
+ print("Please run me with an argument of either 1 or 2.")
+ sys.exit(1)
diff --git a/examples/calc.py b/examples/calc.py
index ec1f039..2ac324a 100755
--- a/examples/calc.py
+++ b/examples/calc.py
@@ -311,7 +311,7 @@ class CellColumn( urwid.WidgetWrap ):
if sub != 0:
# f is not an edit widget
return key
- if OPERATORS.has_key(key):
+ if key in OPERATORS:
# move trailing text to new cell below
edit = self.walker.get_cell(i).edit
cursor_pos = edit.edit_pos
diff --git a/examples/dialog.py b/examples/dialog.py
index dbdeb28..7f3a4d5 100755
--- a/examples/dialog.py
+++ b/examples/dialog.py
@@ -321,7 +321,7 @@ status may be either on or off.
def main():
- if len(sys.argv) < 2 or not MODES.has_key(sys.argv[1]):
+ if len(sys.argv) < 2 or sys.argv[1] not in MODES:
show_usage()
return
diff --git a/examples/twisted_serve_ssh.py b/examples/twisted_serve_ssh.py
index 36396dd..aad63f9 100644
--- a/examples/twisted_serve_ssh.py
+++ b/examples/twisted_serve_ssh.py
@@ -34,6 +34,7 @@ Licence: LGPL <http://opensource.org/licenses/lgpl-2.1.php>
import os
import urwid
+from urwid.raw_display import Screen
from zope.interface import Interface, Attribute, implements
from twisted.application.service import Application
@@ -159,7 +160,7 @@ class UrwidMind(Adapter):
-class TwistedScreen(urwid.BaseScreen):
+class TwistedScreen(Screen):
"""A Urwid screen which knows about the Twisted terminal protocol that is
driving it.
@@ -180,7 +181,7 @@ class TwistedScreen(urwid.BaseScreen):
# We will need these later
self.terminalProtocol = terminalProtocol
self.terminal = terminalProtocol.terminal
- urwid.BaseScreen.__init__(self)
+ Screen.__init__(self)
self.colors = 16
self._pal_escape = {}
self.bright_is_bold = True
@@ -235,13 +236,22 @@ class TwistedScreen(urwid.BaseScreen):
# twisted handles polling, so we don't need the loop to do it, we just
# push what we get to the loop from dataReceived.
- def get_input_descriptors(self):
- return []
+ def hook_event_loop(self, event_loop, callback):
+ self._urwid_callback = callback
+ self._evl = event_loop
+
+ def unhook_event_loop(self, event_loop):
+ pass
# Do nothing here either. Not entirely sure when it gets called.
def get_input(self, raw_keys=False):
return
+ def get_available_raw_input(self):
+ data = self._data
+ self._data = []
+ return data
+
# Twisted driven
def push(self, data):
"""Receive data from Twisted and push it into the urwid main loop.
@@ -254,9 +264,8 @@ class TwistedScreen(urwid.BaseScreen):
3. Pass the calculated keys as a list to the Urwid main loop.
4. Redraw the screen
"""
- keys = self.loop.input_filter(data, [])
- keys, remainder = urwid.escape.process_keyqueue(map(ord, keys), True)
- self.loop.process_input(keys)
+ self._data = list(map(ord, data))
+ self.parse_input(self._evl, self._urwid_callback)
self.loop.draw_screen()
# Convenience
diff --git a/urwid/__init__.py b/urwid/__init__.py
index a6cacf8..bc5170e 100644
--- a/urwid/__init__.py
+++ b/urwid/__init__.py
@@ -53,7 +53,7 @@ from urwid.command_map import (CommandMap, command_map,
CURSOR_PAGE_UP, CURSOR_PAGE_DOWN, CURSOR_MAX_LEFT, CURSOR_MAX_RIGHT,
ACTIVATE)
from urwid.main_loop import (ExitMainLoop, MainLoop, SelectEventLoop,
- GLibEventLoop, TornadoEventLoop)
+ GLibEventLoop, TornadoEventLoop, AsyncioEventLoop)
try:
from urwid.main_loop import TwistedEventLoop
except ImportError:
diff --git a/urwid/container.py b/urwid/container.py
index 24dddf9..8b61383 100755
--- a/urwid/container.py
+++ b/urwid/container.py
@@ -284,7 +284,7 @@ class GridFlow(WidgetWrap, WidgetContainerMixin, WidgetContainerListContentsMixi
empty.
"""
if not self.contents:
- raise IndexError, "No focus_position, GridFlow is empty"
+ raise IndexError("No focus_position, GridFlow is empty")
return self.contents.focus
def _set_focus_position(self, position):
"""
@@ -296,7 +296,7 @@ class GridFlow(WidgetWrap, WidgetContainerMixin, WidgetContainerListContentsMixi
if position < 0 or position >= len(self.contents):
raise IndexError
except (TypeError, IndexError):
- raise IndexError, "No GridFlow child widget at position %s" % (position,)
+ raise IndexError("No GridFlow child widget at position %s" % (position,))
self.contents.focus = position
focus_position = property(_get_focus_position, _set_focus_position, doc="""
index of child widget in focus. Raises :exc:`IndexError` if read when
@@ -607,7 +607,7 @@ class Overlay(Widget, WidgetContainerMixin, WidgetContainerListContentsMixin):
position -- index of child widget to be made focus
"""
if position != 1:
- raise IndexError, ("Overlay widget focus_position currently "
+ raise IndexError("Overlay widget focus_position currently "
"must always be set to 1, not %s" % (position,))
focus_position = property(_get_focus_position, _set_focus_position,
doc="index of child widget in focus, currently always 1")
@@ -871,10 +871,10 @@ class Frame(Widget, WidgetContainerMixin):
:type part: str
"""
if part not in ('header', 'footer', 'body'):
- raise IndexError, 'Invalid position for Frame: %s' % (part,)
+ raise IndexError('Invalid position for Frame: %s' % (part,))
if (part == 'header' and self._header is None) or (
part == 'footer' and self._footer is None):
- raise IndexError, 'This Frame has no %s' % (part,)
+ raise IndexError('This Frame has no %s' % (part,))
self.focus_part = part
self._invalidate()
@@ -1407,7 +1407,7 @@ class Pile(Widget, WidgetContainerMixin, WidgetContainerListContentsMixin):
empty.
"""
if not self.contents:
- raise IndexError, "No focus_position, Pile is empty"
+ raise IndexError("No focus_position, Pile is empty")
return self.contents.focus
def _set_focus_position(self, position):
"""
@@ -1419,7 +1419,7 @@ class Pile(Widget, WidgetContainerMixin, WidgetContainerListContentsMixin):
if position < 0 or position >= len(self.contents):
raise IndexError
except (TypeError, IndexError):
- raise IndexError, "No Pile child widget at position %s" % (position,)
+ raise IndexError("No Pile child widget at position %s" % (position,))
self.contents.focus = position
focus_position = property(_get_focus_position, _set_focus_position, doc="""
index of child widget in focus. Raises :exc:`IndexError` if read when
@@ -1488,7 +1488,7 @@ class Pile(Widget, WidgetContainerMixin, WidgetContainerListContentsMixin):
l.append(0) # zero-weighted items treated as ('given', 0)
if wtotal == 0:
- raise PileError, "No weighted widgets found for Pile treated as a box widget"
+ raise PileError("No weighted widgets found for Pile treated as a box widget")
if remaining < 0:
remaining = 0
@@ -1957,7 +1957,7 @@ class Columns(Widget, WidgetContainerMixin, WidgetContainerListContentsMixin):
empty.
"""
if not self.widget_list:
- raise IndexError, "No focus_position, Columns is empty"
+ raise IndexError("No focus_position, Columns is empty")
return self.contents.focus
def _set_focus_position(self, position):
"""
@@ -1969,7 +1969,7 @@ class Columns(Widget, WidgetContainerMixin, WidgetContainerListContentsMixin):
if position < 0 or position >= len(self.contents):
raise IndexError
except (TypeError, IndexError):
- raise IndexError, "No Columns child widget at position %s" % (position,)
+ raise IndexError("No Columns child widget at position %s" % (position,))
self.contents.focus = position
focus_position = property(_get_focus_position, _set_focus_position, doc="""
index of child widget in focus. Raises :exc:`IndexError` if read when
diff --git a/urwid/curses_display.py b/urwid/curses_display.py
index 758d621..441042e 100755
--- a/urwid/curses_display.py
+++ b/urwid/curses_display.py
@@ -102,12 +102,10 @@ class Screen(BaseScreen, RealTerminal):
self._mouse_tracking_enabled = enable
- def start(self):
+ def _start(self):
"""
Initialize the screen and input mode.
"""
- assert self._started == False
-
self.s = curses.initscr()
self.has_color = curses.has_colors()
if self.has_color:
@@ -130,15 +128,12 @@ class Screen(BaseScreen, RealTerminal):
if not self._signal_keys_set:
self._old_signal_keys = self.tty_signal_keys()
- super(Screen, self).start()
-
+ super(Screen, self)._start()
- def stop(self):
+ def _stop(self):
"""
Restore the screen.
"""
- if self._started == False:
- return
curses.echo()
self._curs_set(1)
try:
@@ -149,21 +144,8 @@ class Screen(BaseScreen, RealTerminal):
if self._old_signal_keys:
self.tty_signal_keys(*self._old_signal_keys)
- super(Screen, self).stop()
-
+ super(Screen, self)._stop()
- def run_wrapper(self,fn):
- """Call fn in fullscreen mode. Return to normal on exit.
-
- This function should be called to wrap your main program loop.
- Exception tracebacks will be displayed in normal mode.
- """
-
- try:
- self.start()
- return fn()
- finally:
- self.stop()
def _setup_colour_pairs(self):
"""
diff --git a/urwid/display_common.py b/urwid/display_common.py
index ca71e0b..c3b9c5d 100755
--- a/urwid/display_common.py
+++ b/urwid/display_common.py
@@ -26,7 +26,7 @@ try:
except ImportError:
pass # windows
-from urwid.util import int_scale
+from urwid.util import StoppingContext, int_scale
from urwid import signals
from urwid.compat import B, bytes3
@@ -718,12 +718,44 @@ class BaseScreen(object):
started = property(lambda self: self._started)
- def start(self):
+ def start(self, *args, **kwargs):
+ """Set up the screen. If the screen has already been started, does
+ nothing.
+
+ May be used as a context manager, in which case :meth:`stop` will
+ automatically be called at the end of the block:
+
+ with screen.start():
+ ...
+
+ You shouldn't override this method in a subclass; instead, override
+ :meth:`_start`.
+ """
+ if not self._started:
+ self._start(*args, **kwargs)
self._started = True
+ return StoppingContext(self)
+
+ def _start(self):
+ pass
def stop(self):
+ if self._started:
+ self._stop()
self._started = False
+ def _stop(self):
+ pass
+
+ def run_wrapper(self, fn, *args, **kwargs):
+ """Start the screen, call a function, then stop the screen. Extra
+ arguments are passed to `start`.
+
+ Deprecated in favor of calling `start` as a context manager.
+ """
+ with self.start(*args, **kwargs):
+ return fn()
+
def register_palette(self, palette):
"""Register a set of palette entries.
@@ -750,7 +782,7 @@ class BaseScreen(object):
raise ScreenError("Invalid register_palette entry: %s" %
repr(item))
name, like_name = item
- if not self._palette.has_key(like_name):
+ if like_name not in self._palette:
raise ScreenError("palette entry '%s' doesn't exist"%like_name)
self._palette[name] = self._palette[like_name]
diff --git a/urwid/escape.py b/urwid/escape.py
index 077e38e..12501b8 100644
--- a/urwid/escape.py
+++ b/urwid/escape.py
@@ -102,7 +102,7 @@ input_sequences = [
] + [
# modified cursor keys + home, end, 5 -- [#X and [1;#X forms
(prefix+digit+letter, escape_modifier(digit) + key)
- for prefix in "[","[1;"
+ for prefix in ("[", "[1;")
for digit in "12345678"
for letter,key in zip("ABCDEFGH",
('up','down','right','left','5','end','5','home'))
@@ -138,7 +138,7 @@ class KeyqueueTrie(object):
assert type(root) == dict, "trie conflict detected"
assert len(s) > 0, "trie conflict detected"
- if root.has_key(ord(s[0])):
+ if ord(s[0]) in root:
return self.add(root[ord(s[0])], s[1:], result)
if len(s)>1:
d = {}
@@ -163,7 +163,7 @@ class KeyqueueTrie(object):
if more_available:
raise MoreInputRequired()
return None
- if not root.has_key(keys[0]):
+ if keys[0] not in root:
return None
return self.get_recurse(root[keys[0]], keys[1:], more_available)
@@ -318,7 +318,7 @@ def process_keyqueue(codes, more_available):
if code >= 32 and code <= 126:
key = chr(code)
return [key], codes[1:]
- if _keyconv.has_key(code):
+ if code in _keyconv:
return [_keyconv[code]], codes[1:]
if code >0 and code <27:
return ["ctrl %s" % chr(ord('a')+code-1)], codes[1:]
diff --git a/urwid/font.py b/urwid/font.py
index 0a3261b..bf0c2b1 100755
--- a/urwid/font.py
+++ b/urwid/font.py
@@ -111,7 +111,7 @@ class Font(object):
return "".join(l)
def char_width(self, c):
- if self.char.has_key(c):
+ if c in self.char:
return self.char[c][0]
return 0
diff --git a/urwid/html_fragment.py b/urwid/html_fragment.py
index 159bc81..380d1d3 100755
--- a/urwid/html_fragment.py
+++ b/urwid/html_fragment.py
@@ -70,22 +70,12 @@ class HtmlGenerator(BaseScreen):
"""Not yet implemented"""
pass
- def start(self):
- pass
-
- def stop(self):
- pass
-
def set_input_timeouts(self, *args):
pass
def reset_default_terminal_palette(self, *args):
pass
- def run_wrapper(self,fn):
- """Call fn."""
- return fn()
-
def draw_screen(self, (cols, rows), r ):
"""Create an html fragment from the render object.
Append it to HtmlGenerator.fragments list.
diff --git a/urwid/lcd_display.py b/urwid/lcd_display.py
index ca6336f..4f62173 100644
--- a/urwid/lcd_display.py
+++ b/urwid/lcd_display.py
@@ -33,21 +33,12 @@ class LCDScreen(BaseScreen):
def set_mouse_tracking(self, enable=True):
pass
- def start(self):
- pass
-
- def stop(self):
- pass
-
def set_input_timeouts(self, *args):
pass
def reset_default_terminal_palette(self, *args):
pass
- def run_wrapper(self,fn):
- return fn()
-
def draw_screen(self, (cols, rows), r ):
pass
diff --git a/urwid/main_loop.py b/urwid/main_loop.py
index 0a93b5e..77022bf 100755
--- a/urwid/main_loop.py
+++ b/urwid/main_loop.py
@@ -34,7 +34,7 @@ try:
except ImportError:
pass # windows
-from urwid.util import is_mouse_event
+from urwid.util import StoppingContext, is_mouse_event
from urwid.compat import PYTHON3
from urwid.command_map import command_map, REDRAW_SCREEN
from urwid.wimp import PopUpTarget
@@ -50,6 +50,9 @@ class ExitMainLoop(Exception):
"""
pass
+class CantUseExternalLoop(Exception):
+ pass
+
class MainLoop(object):
"""
This is the standard main loop implementation for a single interactive
@@ -116,7 +119,7 @@ class MainLoop(object):
self._unhandled_input = unhandled_input
self._input_filter = input_filter
- if not hasattr(screen, 'get_input_descriptors'
+ if not hasattr(screen, 'hook_event_loop'
) and event_loop is not None:
raise NotImplementedError("screen object passed "
"%r does not support external event loops" % (screen,))
@@ -124,7 +127,6 @@ class MainLoop(object):
event_loop = SelectEventLoop()
self.event_loop = event_loop
- self._input_timeout = None
self._watch_pipes = {}
def _set_widget(self, widget):
@@ -268,14 +270,12 @@ class MainLoop(object):
Start the main loop handling input events and updating the screen. The
loop will continue until an :exc:`ExitMainLoop` exception is raised.
- This method will use :attr:`screen`'s run_wrapper() method if
- :attr:`screen`'s start() method has not already been called.
+ If you would prefer to manage the event loop yourself, don't use this
+ method. Instead, call :meth:`start` before starting the event loop,
+ and :meth:`stop` once it's finished.
"""
try:
- if self.screen.started:
- self._run()
- else:
- self.screen.run_wrapper(self._run)
+ self._run()
except ExitMainLoop:
pass
@@ -293,101 +293,106 @@ class MainLoop(object):
>>> evl.watch_file_rval = 2
>>> ml = MainLoop(w, [], scr, event_loop=evl)
>>> ml.run() # doctest:+ELLIPSIS
+ screen.start()
screen.set_mouse_tracking()
+ screen.unhook_event_loop(...)
+ screen.hook_event_loop(...)
+ event_loop.enter_idle(<bound method MainLoop.entering_idle...>)
+ event_loop.run()
+ event_loop.remove_enter_idle(1)
+ screen.unhook_event_loop(...)
+ screen.stop()
+ >>> ml.draw_screen() # doctest:+ELLIPSIS
screen.get_cols_rows()
widget.render((20, 10), focus=True)
screen.draw_screen((20, 10), 'fake canvas')
- screen.get_input_descriptors()
- event_loop.watch_file(42, <bound method ...>)
- event_loop.enter_idle(<bound method ...>)
- event_loop.run()
- event_loop.remove_enter_idle(1)
- event_loop.remove_watch_file(2)
- >>> scr.started = False
- >>> ml.run() # doctest:+ELLIPSIS
- screen.run_wrapper(<bound method ...>)
"""
- def _run(self):
- if self.handle_mouse:
- self.screen.set_mouse_tracking()
+ def start(self):
+ """
+ Sets up the main loop, hooking into the event loop where necessary.
+ Starts the :attr:`screen` if it hasn't already been started.
- if not hasattr(self.screen, 'get_input_descriptors'):
- return self._run_screen_event_loop()
+ If you want to control starting and stopping the event loop yourself,
+ you should call this method before starting, and call `stop` once the
+ loop has finished. You may also use this method as a context manager,
+ which will stop the loop automatically at the end of the block:
- self.draw_screen()
+ with main_loop.start():
+ ...
- fd_handles = []
- def reset_input_descriptors(only_remove=False):
- for handle in fd_handles:
- self.event_loop.remove_watch_file(handle)
- if only_remove:
- del fd_handles[:]
- else:
- fd_handles[:] = [
- self.event_loop.watch_file(fd, self._update)
- for fd in self.screen.get_input_descriptors()]
- if not fd_handles and self._input_timeout is not None:
- self.event_loop.remove_alarm(self._input_timeout)
+ Note that some event loop implementations don't handle exceptions
+ specially if you manage the event loop yourself. In particular, the
+ Twisted and asyncio loops won't stop automatically when
+ :exc:`ExitMainLoop` (or anything else) is raised.
+ """
+ self.screen.start()
+
+ if self.handle_mouse:
+ self.screen.set_mouse_tracking()
+
+ if not hasattr(self.screen, 'hook_event_loop'):
+ raise CantUseExternalLoop(
+ "Screen {0!r} doesn't support external event loops")
try:
signals.connect_signal(self.screen, INPUT_DESCRIPTORS_CHANGED,
- reset_input_descriptors)
+ self._reset_input_descriptors)
except NameError:
pass
# watch our input descriptors
- reset_input_descriptors()
- idle_handle = self.event_loop.enter_idle(self.entering_idle)
+ self._reset_input_descriptors()
+ self.idle_handle = self.event_loop.enter_idle(self.entering_idle)
- # Go..
- self.event_loop.run()
+ return StoppingContext(self)
- # tidy up
- self.event_loop.remove_enter_idle(idle_handle)
- reset_input_descriptors(True)
+ def stop(self):
+ """
+ Cleans up any hooks added to the event loop. Only call this if you're
+ managing the event loop yourself, after the loop stops.
+ """
+ self.event_loop.remove_enter_idle(self.idle_handle)
+ del self.idle_handle
signals.disconnect_signal(self.screen, INPUT_DESCRIPTORS_CHANGED,
- reset_input_descriptors)
+ self._reset_input_descriptors)
+ self.screen.unhook_event_loop(self.event_loop)
+
+ self.screen.stop()
+
+ def _reset_input_descriptors(self):
+ self.screen.unhook_event_loop(self.event_loop)
+ self.screen.hook_event_loop(self.event_loop, self._update)
+
+ def _run(self):
+ try:
+ self.start()
+ except CantUseExternalLoop:
+ try:
+ return self._run_screen_event_loop()
+ finally:
+ self.screen.stop()
- def _update(self, timeout=False):
+ self.event_loop.run()
+ self.stop()
+
+ def _update(self, keys, raw):
"""
>>> w = _refl("widget")
>>> w.selectable_rval = True
>>> w.mouse_event_rval = True
>>> scr = _refl("screen")
>>> scr.get_cols_rows_rval = (15, 5)
- >>> scr.get_input_nonblocking_rval = 1, ['y'], [121]
>>> evl = _refl("event_loop")
>>> ml = MainLoop(w, [], scr, event_loop=evl)
>>> ml._input_timeout = "old timeout"
- >>> ml._update() # doctest:+ELLIPSIS
- event_loop.remove_alarm('old timeout')
- screen.get_input_nonblocking()
- event_loop.alarm(1, <function ...>)
+ >>> ml._update(['y'], [121]) # doctest:+ELLIPSIS
screen.get_cols_rows()
widget.selectable()
widget.keypress((15, 5), 'y')
- >>> scr.get_input_nonblocking_rval = None, [("mouse press", 1, 5, 4)
- ... ], []
- >>> ml._update()
- screen.get_input_nonblocking()
+ >>> ml._update([("mouse press", 1, 5, 4)], [])
widget.mouse_event((15, 5), 'mouse press', 1, 5, 4, focus=True)
- >>> scr.get_input_nonblocking_rval = None, [], []
- >>> ml._update()
- screen.get_input_nonblocking()
+ >>> ml._update([], [])
"""
- if self._input_timeout is not None and not timeout:
- # cancel the timeout, something else triggered the update
- self.event_loop.remove_alarm(self._input_timeout)
- self._input_timeout = None
-
- max_wait, keys, raw = self.screen.get_input_nonblocking()
-
- if max_wait is not None:
- # if get_input_nonblocking wants to be called back
- # make sure it happens with an alarm
- self._input_timeout = self.event_loop.alarm(max_wait,
- lambda: self._update(timeout=True))
-
keys = self.input_filter(keys, raw)
if keys:
@@ -583,7 +588,7 @@ class SelectEventLoop(object):
def alarm(self, seconds, callback):
"""
- Call callback() given time from from now. No parameters are
+ Call callback() a given time from now. No parameters are
passed to callback.
Returns a handle that may be passed to remove_alarm()
@@ -671,7 +676,7 @@ class SelectEventLoop(object):
while True:
try:
self._loop()
- except select.error, e:
+ except select.error as e:
if e.args[0] != 4:
# not just something we need to retry
raise
@@ -730,7 +735,7 @@ class GLibEventLoop(object):
def alarm(self, seconds, callback):
"""
- Call callback() given time from from now. No parameters are
+ Call callback() a given time from now. No parameters are
passed to callback.
Returns a handle that may be passed to remove_alarm()
@@ -1044,6 +1049,10 @@ class TwistedEventLoop(object):
``manage_reactor=False`` and take care of running/stopping the reactor
at the beginning/ending of your program yourself.
+ You can also forego using :class:`MainLoop`'s run() entirely, and
+ instead call start() and stop() before and after starting the
+ reactor.
+
.. _Twisted: http://twistedmatrix.com/trac/
"""
if reactor is None:
@@ -1061,7 +1070,7 @@ class TwistedEventLoop(object):
def alarm(self, seconds, callback):
"""
- Call callback() given time from from now. No parameters are
+ Call callback() a given time from now. No parameters are
passed to callback.
Returns a handle that may be passed to remove_alarm()
@@ -1200,6 +1209,121 @@ class TwistedEventLoop(object):
return wrapper
+class AsyncioEventLoop(object):
+ """
+ Event loop based on the standard library ``asyncio`` module.
+
+ ``asyncio`` is new in Python 3.4, but also exists as a backport on PyPI for
+ Python 3.3. The ``trollius`` package is available for older Pythons with
+ slightly different syntax, but also works with this loop.
+ """
+ _we_started_event_loop = False
+
+ _idle_emulation_delay = 1.0/256 # a short time (in seconds)
+
+ def __init__(self, **kwargs):
+ if 'loop' in kwargs:
+ self._loop = kwargs.pop('loop')
+ else:
+ import asyncio
+ self._loop = asyncio.get_event_loop()
+
+ def alarm(self, seconds, callback):
+ """
+ Call callback() a given time from now. No parameters are
+ passed to callback.
+
+ Returns a handle that may be passed to remove_alarm()
+
+ seconds -- time in seconds to wait before calling callback
+ callback -- function to call from event loop
+ """
+ return self._loop.call_later(seconds, callback)
+
+ def remove_alarm(self, handle):
+ """
+ Remove an alarm.
+
+ Returns True if the alarm exists, False otherwise
+ """
+ existed = not handle._cancelled
+ handle.cancel()
+ return existed
+
+ def watch_file(self, fd, callback):
+ """
+ Call callback() when fd has some data to read. No parameters
+ are passed to callback.
+
+ Returns a handle that may be passed to remove_watch_file()
+
+ fd -- file descriptor to watch for input
+ callback -- function to call when input is available
+ """
+ self._loop.add_reader(fd, callback)
+ return fd
+
+ def remove_watch_file(self, handle):
+ """
+ Remove an input file.
+
+ Returns True if the input file exists, False otherwise
+ """
+ return self._loop.remove_reader(handle)
+
+ def enter_idle(self, callback):
+ """
+ Add a callback for entering idle.
+
+ Returns a handle that may be passed to remove_idle()
+ """
+ # XXX there's no such thing as "idle" in most event loops; this fakes
+ # it the same way as Twisted, by scheduling the callback to be called
+ # repeatedly
+ mutable_handle = [None]
+ def faux_idle_callback():
+ callback()
+ mutable_handle[0] = self._loop.call_later(
+ self._idle_emulation_delay, faux_idle_callback)
+
+ mutable_handle[0] = self._loop.call_later(
+ self._idle_emulation_delay, faux_idle_callback)
+
+ return mutable_handle
+
+ def remove_enter_idle(self, handle):
+ """
+ Remove an idle callback.
+
+ Returns True if the handle was removed.
+ """
+ # `handle` is just a list containing the current actual handle
+ return self.remove_alarm(handle[0])
+
+ _exc_info = None
+
+ def _exception_handler(self, loop, context):
+ exc = context.get('exception')
+ if exc:
+ loop.stop()
+ if not isinstance(exc, ExitMainLoop):
+ # Store the exc_info so we can re-raise after the loop stops
+ import sys
+ self._exc_info = sys.exc_info()
+ else:
+ loop.default_exception_handler(context)
+
+ def run(self):
+ """
+ Start the event loop. Exit the loop when any callback raises
+ an exception. If ExitMainLoop is raised, exit cleanly.
+ """
+ self._loop.set_exception_handler(self._exception_handler)
+ self._loop.run_forever()
+ if self._exc_info:
+ raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
+ self._exc_info = None
+
def _refl(name, rval=None, exit=False):
"""
diff --git a/urwid/monitored_list.py b/urwid/monitored_list.py
index 551fb91..a1a6326 100755
--- a/urwid/monitored_list.py
+++ b/urwid/monitored_list.py
@@ -158,9 +158,9 @@ class MonitoredFocusList(MonitoredList):
self._focus = 0
return
if index < 0 or index >= len(self):
- raise IndexError, 'focus index is out of range: %s' % (index,)
+ raise IndexError('focus index is out of range: %s' % (index,))
if index != int(index):
- raise IndexError, 'invalid focus index: %s' % (index,)
+ raise IndexError('invalid focus index: %s' % (index,))
index = int(index)
if index != self._focus:
self._focus_changed(index)
diff --git a/urwid/old_str_util.py b/urwid/old_str_util.py
index 04594ec..83190f5 100755
--- a/urwid/old_str_util.py
+++ b/urwid/old_str_util.py
@@ -19,6 +19,7 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Urwid web site: http://excess.org/urwid/
+from __future__ import print_function
import re
@@ -357,10 +358,10 @@ def process_east_asian_width():
out.append( (num, l) )
last = l
- print "widths = ["
+ print("widths = [")
for o in out[1:]: # treat control characters same as ascii
- print "\t%r," % (o,)
- print "]"
+ print("\t%r," % (o,))
+ print("]")
if __name__ == "__main__":
process_east_asian_width()
diff --git a/urwid/raw_display.py b/urwid/raw_display.py
index 185f1ab..a3d14a0 100644
--- a/urwid/raw_display.py
+++ b/urwid/raw_display.py
@@ -48,7 +48,7 @@ from subprocess import Popen, PIPE
class Screen(BaseScreen, RealTerminal):
- def __init__(self):
+ def __init__(self, input=sys.stdin, output=sys.stdout):
"""Initialize a screen that directly prints escape codes to an output
terminal.
"""
@@ -78,8 +78,11 @@ class Screen(BaseScreen, RealTerminal):
self.bright_is_bold = not term.startswith("xterm")
self.back_color_erase = not term.startswith("screen")
self._next_timeout = None
- self._term_output_file = sys.stdout
- self._term_input_file = sys.stdin
+
+ # Our connections to the world
+ self._term_output_file = output
+ self._term_input_file = input
+
# pipe for signalling external event loops about resize events
self._resize_pipe_rd, self._resize_pipe_wr = os.pipe()
fcntl.fcntl(self._resize_pipe_rd, fcntl.F_SETFL, os.O_NONBLOCK)
@@ -164,10 +167,10 @@ class Screen(BaseScreen, RealTerminal):
def _mouse_tracking(self, enable):
if enable:
- self._term_output_file.write(escape.MOUSE_TRACKING_ON)
+ self.write(escape.MOUSE_TRACKING_ON)
self._start_gpm_tracking()
else:
- self._term_output_file.write(escape.MOUSE_TRACKING_OFF)
+ self.write(escape.MOUSE_TRACKING_OFF)
self._stop_gpm_tracking()
def _start_gpm_tracking(self):
@@ -189,15 +192,14 @@ class Screen(BaseScreen, RealTerminal):
os.waitpid(self.gpm_mev.pid, 0)
self.gpm_mev = None
- def start(self, alternate_buffer=True):
+ def _start(self, alternate_buffer=True):
"""
Initialize the screen and input mode.
alternate_buffer -- use alternate screen buffer
"""
- assert not self._started
if alternate_buffer:
- self._term_output_file.write(escape.SWITCH_TO_ALTERNATE_BUFFER)
+ self.write(escape.SWITCH_TO_ALTERNATE_BUFFER)
self._rows_used = None
else:
self._rows_used = 0
@@ -209,25 +211,22 @@ class Screen(BaseScreen, RealTerminal):
self.signal_init()
self._alternate_buffer = alternate_buffer
- self._input_iter = self._run_input_iter()
self._next_timeout = self.max_wait
if not self._signal_keys_set:
self._old_signal_keys = self.tty_signal_keys(fileno=fd)
- super(Screen, self).start()
-
signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED)
# restore mouse tracking to previous state
self._mouse_tracking(self._mouse_tracking_enabled)
- def stop(self):
+ return super(Screen, self)._start()
+
+ def _stop(self):
"""
Restore the screen.
"""
self.clear()
- if not self._started:
- return
signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED)
@@ -246,32 +245,33 @@ class Screen(BaseScreen, RealTerminal):
elif self.maxrow is not None:
move_cursor = escape.set_cursor_position(
0, self.maxrow)
- self._term_output_file.write(
+ self.write(
self._attrspec_to_escape(AttrSpec('',''))
+ escape.SI
+ move_cursor
+ escape.SHOW_CURSOR)
- self._input_iter = self._fake_input_iter()
if self._old_signal_keys:
self.tty_signal_keys(*(self._old_signal_keys + (fd,)))
- super(Screen, self).stop()
+ super(Screen, self)._stop()
- def run_wrapper(self, fn, alternate_buffer=True):
+ def write(self, data):
+ """Write some data to the terminal.
+
+ You may wish to override this if you're using something other than
+ regular files for input and output.
"""
- Call start to initialize screen, then call fn.
- When fn exits call stop to restore the screen to normal.
+ self._term_output_file.write(data)
+
+ def flush(self):
+ """Flush the output buffer.
- alternate_buffer -- use alternate screen buffer and restore
- normal screen buffer on exit
+ You may wish to override this if you're using something other than
+ regular files for input and output.
"""
- try:
- self.start(alternate_buffer)
- return fn()
- finally:
- self.stop()
+ self._term_output_file.flush()
def get_input(self, raw_keys=False):
"""Return pending input as a list.
@@ -320,14 +320,13 @@ class Screen(BaseScreen, RealTerminal):
assert self._started
self._wait_for_input_ready(self._next_timeout)
- self._next_timeout, keys, raw = self._input_iter.next()
+ keys, raw = self.parse_input(None, None, self.get_available_raw_input())
# Avoid pegging CPU at 100% when slowly resizing
if keys==['window resize'] and self.prev_input_resize:
while True:
self._wait_for_input_ready(self.resize_wait)
- self._next_timeout, keys, raw2 = \
- self._input_iter.next()
+ keys, raw2 = self.parse_input(None, None, self.get_available_raw_input())
raw += raw2
#if not keys:
# keys, raw2 = self._get_input(
@@ -364,71 +363,135 @@ class Screen(BaseScreen, RealTerminal):
fd_list.append(self.gpm_mev.stdout.fileno())
return fd_list
- def get_input_nonblocking(self):
+ _current_event_loop_handles = ()
+
+ def unhook_event_loop(self, event_loop):
"""
- Return a (next_input_timeout, keys_pressed, raw_keycodes)
- tuple.
+ Remove any hooks added by hook_event_loop.
+ """
+ for handle in self._current_event_loop_handles:
+ event_loop.remove_watch_file(handle)
- Use this method if you are implementing your own event loop.
+ if self._input_timeout:
+ event_loop.remove_alarm(self._input_timeout)
+ self._input_timeout = None
- When there is input waiting on one of the descriptors returned
- by get_input_descriptors() this method should be called to
- read and process the input.
+ def hook_event_loop(self, event_loop, callback):
+ """
+ Register the given callback with the event loop, to be called with new
+ input whenever it's available. The callback should be passed a list of
+ processed keys and a list of unprocessed keycodes.
- This method expects to be called in next_input_timeout seconds
- (a floating point number) if there is no input waiting.
+ Subclasses may wish to use parse_input to wrap the callback.
+ """
+ if hasattr(self, 'get_input_nonblocking'):
+ wrapper = self._make_legacy_input_wrapper(event_loop, callback)
+ else:
+ wrapper = lambda: self.parse_input(
+ event_loop, callback, self.get_available_raw_input())
+ fds = self.get_input_descriptors()
+ handles = []
+ for fd in fds:
+ event_loop.watch_file(fd, wrapper)
+ self._current_event_loop_handles = handles
+
+ _input_timeout = None
+ _partial_codes = None
+
+ def _make_legacy_input_wrapper(self, event_loop, callback):
+ """
+ Support old Screen classes that still have a get_input_nonblocking and
+ expect it to work.
"""
- return self._input_iter.next()
+ def wrapper():
+ if self._input_timeout:
+ event_loop.remove_alarm(self._input_timeout)
+ self._input_timeout = None
+ timeout, keys, raw = self.get_input_nonblocking()
+ if timeout is not None:
+ self._input_timeout = event_loop.alarm(timeout, wrapper)
- def _run_input_iter(self):
- def empty_resize_pipe():
- # clean out the pipe used to signal external event loops
- # that a resize has occurred
- try:
- while True: os.read(self._resize_pipe_rd, 1)
- except OSError:
- pass
+ callback(keys, raw)
- while True:
- processed = []
- codes = self._get_gpm_codes() + \
- self._get_keyboard_codes()
+ return wrapper
- original_codes = codes
- try:
- while codes:
- run, codes = escape.process_keyqueue(
- codes, True)
- processed.extend(run)
- except escape.MoreInputRequired:
- k = len(original_codes) - len(codes)
- yield (self.complete_wait, processed,
- original_codes[:k])
- empty_resize_pipe()
- original_codes = codes
- processed = []
-
- codes += self._get_keyboard_codes() + \
- self._get_gpm_codes()
- while codes:
- run, codes = escape.process_keyqueue(
- codes, False)
- processed.extend(run)
-
- if self._resized:
- processed.append('window resize')
- self._resized = False
-
- yield (self.max_wait, processed, original_codes)
- empty_resize_pipe()
-
- def _fake_input_iter(self):
- """
- This generator is a placeholder for when the screen is stopped
- to always return that no input is available.
+ def get_available_raw_input(self):
"""
- while True:
- yield (self.max_wait, [], [])
+ Return any currently-available input. Does not block.
+
+ This method is only used by the default `hook_event_loop`
+ implementation; you can safely ignore it if you implement your own.
+ """
+ codes = self._get_gpm_codes() + self._get_keyboard_codes()
+
+ if self._partial_codes:
+ codes = self._partial_codes + codes
+ self._partial_codes = None
+
+ # clean out the pipe used to signal external event loops
+ # that a resize has occurred
+ try:
+ while True: os.read(self._resize_pipe_rd, 1)
+ except OSError:
+ pass
+
+ return codes
+
+ def parse_input(self, event_loop, callback, codes, wait_for_more=True):
+ """
+ Read any available input from get_available_raw_input, parses it into
+ keys, and calls the given callback.
+
+ The current implementation tries to avoid any assumptions about what
+ the screen or event loop look like; it only deals with parsing keycodes
+ and setting a timeout when an incomplete one is detected.
+
+ `codes` should be a sequence of keycodes, i.e. bytes. A bytearray is
+ appropriate, but beware of using bytes, which only iterates as integers
+ on Python 3.
+ """
+ # Note: event_loop may be None for 100% synchronous support, only used
+ # by get_input. Not documented because you shouldn't be doing it.
+ if self._input_timeout and event_loop:
+ event_loop.remove_alarm(self._input_timeout)
+ self._input_timeout = None
+
+ original_codes = codes
+ processed = []
+ try:
+ while codes:
+ run, codes = escape.process_keyqueue(
+ codes, wait_for_more)
+ processed.extend(run)
+ except escape.MoreInputRequired:
+ # Set a timer to wait for the rest of the input; if it goes off
+ # without any new input having come in, use the partial input
+ k = len(original_codes) - len(codes)
+ processed_codes = original_codes[:k]
+ self._partial_codes = codes
+
+ def _parse_incomplete_input():
+ self._input_timeout = None
+ self._partial_codes = None
+ self.parse_input(
+ event_loop, callback, codes, wait_for_more=False)
+ if event_loop:
+ self._input_timeout = event_loop.alarm(
+ self.complete_wait, _parse_incomplete_input)
+
+ else:
+ processed_codes = original_codes
+ self._partial_codes = None
+
+ if self._resized:
+ processed.append('window resize')
+ self._resized = False
+
+ if callback:
+ callback(processed, processed_codes)
+ else:
+ # For get_input
+ return processed, processed_codes
def _get_keyboard_codes(self):
codes = []
@@ -444,7 +507,7 @@ class Screen(BaseScreen, RealTerminal):
try:
while self.gpm_mev is not None and self.gpm_event_pending:
codes.extend(self._encode_gpm_event())
- except IOError, e:
+ except IOError as e:
if e.args[0] != 11:
raise
return codes
@@ -463,7 +526,7 @@ class Screen(BaseScreen, RealTerminal):
ready,w,err = select.select(
fd_list,[],fd_list, timeout)
break
- except select.error, e:
+ except select.error as e:
if e.args[0] != 4:
raise
if self._resized:
@@ -592,8 +655,8 @@ class Screen(BaseScreen, RealTerminal):
while True:
try:
- self._term_output_file.write(escape.DESIGNATE_G1_SPECIAL)
- self._term_output_file.flush()
+ self.write(escape.DESIGNATE_G1_SPECIAL)
+ self.flush()
break
except IOError:
pass
@@ -770,9 +833,9 @@ class Screen(BaseScreen, RealTerminal):
for l in o:
if isinstance(l, bytes) and PYTHON3:
l = l.decode('utf-8')
- self._term_output_file.write(l)
- self._term_output_file.flush()
- except IOError, e:
+ self.write(l)
+ self.flush()
+ except IOError as e:
# ignore interrupted syscall
if e.args[0] != 4:
raise
@@ -945,8 +1008,8 @@ class Screen(BaseScreen, RealTerminal):
modify = ["%d;rgb:%02x/%02x/%02x" % (index, red, green, blue)
for index, red, green, blue in entries]
- self._term_output_file.write("\x1b]4;"+";".join(modify)+"\x1b\\")
- self._term_output_file.flush()
+ self.write("\x1b]4;"+";".join(modify)+"\x1b\\")
+ self.flush()
# shortcut for creating an AttrSpec with this screen object's
diff --git a/urwid/signals.py b/urwid/signals.py
index 80b6646..b716939 100644
--- a/urwid/signals.py
+++ b/urwid/signals.py
@@ -150,8 +150,8 @@ class Signals(object):
sig_cls = obj.__class__
if not name in self._supported.get(sig_cls, []):
- raise NameError, "No such signal %r for object %r" % \
- (name, obj)
+ raise NameError("No such signal %r for object %r" %
+ (name, obj))
# Just generate an arbitrary (but unique) key
key = Key()
diff --git a/urwid/tests/test_event_loops.py b/urwid/tests/test_event_loops.py
index 0793602..c85bbed 100644
--- a/urwid/tests/test_event_loops.py
+++ b/urwid/tests/test_event_loops.py
@@ -34,6 +34,8 @@ class EventLoopTestMixin(object):
self.assertTrue(evl.remove_watch_file(handle))
self.assertFalse(evl.remove_watch_file(handle))
+ _expected_idle_handle = 1
+
def test_run(self):
evl = self.evl
out = []
@@ -50,7 +52,9 @@ class EventLoopTestMixin(object):
1/0
handle = evl.alarm(0.01, exit_clean)
handle = evl.alarm(0.005, say_hello)
- self.assertEqual(evl.enter_idle(say_waiting), 1)
+ idle_handle = evl.enter_idle(say_waiting)
+ if self._expected_idle_handle is not None:
+ self.assertEqual(idle_handle, 1)
evl.run()
self.assertTrue("hello" in out, out)
self.assertTrue("clean exit"in out, out)
@@ -129,3 +133,15 @@ else:
self.assertTrue("ta" in out, out)
self.assertTrue("hello" in out, out)
self.assertTrue("clean exit" in out, out)
+
+
+try:
+ import asyncio
+except ImportError:
+ pass
+else:
+ class AsyncioEventLoopTest(unittest.TestCase, EventLoopTestMixin):
+ def setUp(self):
+ self.evl = urwid.AsyncioEventLoop()
+
+ _expected_idle_handle = None
diff --git a/urwid/util.py b/urwid/util.py
index ced247e..3569f8c 100644
--- a/urwid/util.py
+++ b/urwid/util.py
@@ -45,7 +45,7 @@ def detect_encoding():
except locale.Error:
pass
return locale.getlocale()[1] or ""
- except ValueError, e:
+ except ValueError as e:
# with invalid LANG value python will throw ValueError
if e.args and e.args[0].startswith("unknown locale"):
return ""
@@ -281,13 +281,14 @@ def rle_len( rle ):
run += r
return run
-def rle_append_beginning_modify( rle, (a, r) ):
+def rle_append_beginning_modify(rle, a_r):
"""
- Append (a, r) to BEGINNING of rle.
+ Append (a, r) (unpacked from *a_r*) to BEGINNING of rle.
Merge with first run when possible
MODIFIES rle parameter contents. Returns None.
"""
+ a, r = a_r
if not rle:
rle[:] = [(a, r)]
else:
@@ -298,13 +299,14 @@ def rle_append_beginning_modify( rle, (a, r) ):
rle[0:0] = [(al, r)]
-def rle_append_modify( rle, (a, r) ):
+def rle_append_modify(rle, a_r):
"""
- Append (a,r) to the rle list rle.
+ Append (a, r) (unpacked from *a_r*) to the rle list rle.
Merge with last run when possible.
MODIFIES rle parameter contents. Returns None.
"""
+ a, r = a_r
if not rle or rle[-1][0] != a:
rle.append( (a,r) )
return
@@ -405,13 +407,13 @@ def _tagmarkup_recurse( tm, attr ):
if type(tm) == tuple:
# tuples mark a new attribute boundary
if len(tm) != 2:
- raise TagMarkupException, "Tuples must be in the form (attribute, tagmarkup): %r" % (tm,)
+ raise TagMarkupException("Tuples must be in the form (attribute, tagmarkup): %r" % (tm,))
attr, element = tm
return _tagmarkup_recurse( element, attr )
if not isinstance(tm,(basestring, bytes)):
- raise TagMarkupException, "Invalid markup element: %r" % tm
+ raise TagMarkupException("Invalid markup element: %r" % tm)
# text
return [tm], [(attr, len(tm))]
@@ -431,7 +433,7 @@ class MetaSuper(type):
def __init__(cls, name, bases, d):
super(MetaSuper, cls).__init__(name, bases, d)
if hasattr(cls, "_%s__super" % name):
- raise AttributeError, "Class has same name as one of its super classes"
+ raise AttributeError("Class has same name as one of its super classes")
setattr(cls, "_%s__super" % name, super(cls))
@@ -456,3 +458,17 @@ def int_scale(val, val_range, out_range):
# if num % dem == 0 then we are exactly half-way and have rounded up.
return num // dem
+
+class StoppingContext(object):
+ """Context manager that calls ``stop`` on a given object on exit. Used to
+ make the ``start`` method on `MainLoop` and `BaseScreen` optionally act as
+ context managers.
+ """
+ def __init__(self, wrapped):
+ self._wrapped = wrapped
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *exc_info):
+ self._wrapped.stop()
diff --git a/urwid/vterm.py b/urwid/vterm.py
index 212094c..cc4eb7f 100644
--- a/urwid/vterm.py
+++ b/urwid/vterm.py
@@ -1522,7 +1522,7 @@ class Terminal(Widget):
try:
select.select([self.master], [], [], timeout)
break
- except select.error, e:
+ except select.error as e:
if e.args[0] != 4:
raise
self.feed()
@@ -1532,7 +1532,7 @@ class Terminal(Widget):
try:
data = os.read(self.master, 4096)
- except OSError, e:
+ except OSError as e:
if e.errno == 5: # End Of File
data = ''
elif e.errno == errno.EWOULDBLOCK: # empty buffer
diff --git a/urwid/web_display.py b/urwid/web_display.py
index e18555e..44a505c 100755
--- a/urwid/web_display.py
+++ b/urwid/web_display.py
@@ -593,7 +593,7 @@ class Screen:
continue
assert len(item) == 2, "Invalid register_palette usage"
name, like_name = item
- if not self.palette.has_key(like_name):
+ if like_name not in self.palette:
raise Exception("palette entry '%s' doesn't exist"%like_name)
self.palette[name] = self.palette[like_name]
@@ -632,7 +632,8 @@ class Screen:
"""
global _prefs
- assert not self._started
+ if self._started:
+ return util.StoppingContext(self)
client_init = sys.stdin.read(50)
assert client_init.startswith("window resize "),client_init
@@ -679,11 +680,15 @@ class Screen:
signal.alarm( ALARM_DELAY )
self._started = True
+ return util.StoppingContext(self)
+
def stop(self):
"""
Restore settings and clean up.
"""
- assert self._started
+ if not self._started:
+ return
+
# XXX which exceptions does this actually raise? EnvironmentError?
try:
self._close_connection()
@@ -880,7 +885,7 @@ class Screen:
try:
iready,oready,eready = select.select(
[self.input_fd],[],[],0.5)
- except select.error, e:
+ except select.error as e:
# return on interruptions
if e.args[0] == 4:
if raw_keys:
@@ -946,7 +951,7 @@ def is_web_request():
"""
Return True if this is a CGI web request.
"""
- return os.environ.has_key('REQUEST_METHOD')
+ return 'REQUEST_METHOD' in os.environ
def handle_short_request():
"""
@@ -973,7 +978,7 @@ def handle_short_request():
# Don't know what to do with head requests etc.
return False
- if not os.environ.has_key('HTTP_X_URWID_ID'):
+ if 'HTTP_X_URWID_ID' not in os.environ:
# If no urwid id, then the application should be started.
return False