summaryrefslogtreecommitdiff
path: root/urwid/raw_display.py
diff options
context:
space:
mode:
Diffstat (limited to 'urwid/raw_display.py')
-rw-r--r--urwid/raw_display.py253
1 files changed, 158 insertions, 95 deletions
diff --git a/urwid/raw_display.py b/urwid/raw_display.py
index 185f1ab..a3d14a0 100644
--- a/urwid/raw_display.py
+++ b/urwid/raw_display.py
@@ -48,7 +48,7 @@ from subprocess import Popen, PIPE
class Screen(BaseScreen, RealTerminal):
- def __init__(self):
+ def __init__(self, input=sys.stdin, output=sys.stdout):
"""Initialize a screen that directly prints escape codes to an output
terminal.
"""
@@ -78,8 +78,11 @@ class Screen(BaseScreen, RealTerminal):
self.bright_is_bold = not term.startswith("xterm")
self.back_color_erase = not term.startswith("screen")
self._next_timeout = None
- self._term_output_file = sys.stdout
- self._term_input_file = sys.stdin
+
+ # Our connections to the world
+ self._term_output_file = output
+ self._term_input_file = input
+
# pipe for signalling external event loops about resize events
self._resize_pipe_rd, self._resize_pipe_wr = os.pipe()
fcntl.fcntl(self._resize_pipe_rd, fcntl.F_SETFL, os.O_NONBLOCK)
@@ -164,10 +167,10 @@ class Screen(BaseScreen, RealTerminal):
def _mouse_tracking(self, enable):
if enable:
- self._term_output_file.write(escape.MOUSE_TRACKING_ON)
+ self.write(escape.MOUSE_TRACKING_ON)
self._start_gpm_tracking()
else:
- self._term_output_file.write(escape.MOUSE_TRACKING_OFF)
+ self.write(escape.MOUSE_TRACKING_OFF)
self._stop_gpm_tracking()
def _start_gpm_tracking(self):
@@ -189,15 +192,14 @@ class Screen(BaseScreen, RealTerminal):
os.waitpid(self.gpm_mev.pid, 0)
self.gpm_mev = None
- def start(self, alternate_buffer=True):
+ def _start(self, alternate_buffer=True):
"""
Initialize the screen and input mode.
alternate_buffer -- use alternate screen buffer
"""
- assert not self._started
if alternate_buffer:
- self._term_output_file.write(escape.SWITCH_TO_ALTERNATE_BUFFER)
+ self.write(escape.SWITCH_TO_ALTERNATE_BUFFER)
self._rows_used = None
else:
self._rows_used = 0
@@ -209,25 +211,22 @@ class Screen(BaseScreen, RealTerminal):
self.signal_init()
self._alternate_buffer = alternate_buffer
- self._input_iter = self._run_input_iter()
self._next_timeout = self.max_wait
if not self._signal_keys_set:
self._old_signal_keys = self.tty_signal_keys(fileno=fd)
- super(Screen, self).start()
-
signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED)
# restore mouse tracking to previous state
self._mouse_tracking(self._mouse_tracking_enabled)
- def stop(self):
+ return super(Screen, self)._start()
+
+ def _stop(self):
"""
Restore the screen.
"""
self.clear()
- if not self._started:
- return
signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED)
@@ -246,32 +245,33 @@ class Screen(BaseScreen, RealTerminal):
elif self.maxrow is not None:
move_cursor = escape.set_cursor_position(
0, self.maxrow)
- self._term_output_file.write(
+ self.write(
self._attrspec_to_escape(AttrSpec('',''))
+ escape.SI
+ move_cursor
+ escape.SHOW_CURSOR)
- self._input_iter = self._fake_input_iter()
if self._old_signal_keys:
self.tty_signal_keys(*(self._old_signal_keys + (fd,)))
- super(Screen, self).stop()
+ super(Screen, self)._stop()
- def run_wrapper(self, fn, alternate_buffer=True):
+ def write(self, data):
+ """Write some data to the terminal.
+
+ You may wish to override this if you're using something other than
+ regular files for input and output.
"""
- Call start to initialize screen, then call fn.
- When fn exits call stop to restore the screen to normal.
+ self._term_output_file.write(data)
+
+ def flush(self):
+ """Flush the output buffer.
- alternate_buffer -- use alternate screen buffer and restore
- normal screen buffer on exit
+ You may wish to override this if you're using something other than
+ regular files for input and output.
"""
- try:
- self.start(alternate_buffer)
- return fn()
- finally:
- self.stop()
+ self._term_output_file.flush()
def get_input(self, raw_keys=False):
"""Return pending input as a list.
@@ -320,14 +320,13 @@ class Screen(BaseScreen, RealTerminal):
assert self._started
self._wait_for_input_ready(self._next_timeout)
- self._next_timeout, keys, raw = self._input_iter.next()
+ keys, raw = self.parse_input(None, None, self.get_available_raw_input())
# Avoid pegging CPU at 100% when slowly resizing
if keys==['window resize'] and self.prev_input_resize:
while True:
self._wait_for_input_ready(self.resize_wait)
- self._next_timeout, keys, raw2 = \
- self._input_iter.next()
+ keys, raw2 = self.parse_input(None, None, self.get_available_raw_input())
raw += raw2
#if not keys:
# keys, raw2 = self._get_input(
@@ -364,71 +363,135 @@ class Screen(BaseScreen, RealTerminal):
fd_list.append(self.gpm_mev.stdout.fileno())
return fd_list
- def get_input_nonblocking(self):
+ _current_event_loop_handles = ()
+
+ def unhook_event_loop(self, event_loop):
"""
- Return a (next_input_timeout, keys_pressed, raw_keycodes)
- tuple.
+ Remove any hooks added by hook_event_loop.
+ """
+ for handle in self._current_event_loop_handles:
+ event_loop.remove_watch_file(handle)
- Use this method if you are implementing your own event loop.
+ if self._input_timeout:
+ event_loop.remove_alarm(self._input_timeout)
+ self._input_timeout = None
- When there is input waiting on one of the descriptors returned
- by get_input_descriptors() this method should be called to
- read and process the input.
+ def hook_event_loop(self, event_loop, callback):
+ """
+ Register the given callback with the event loop, to be called with new
+ input whenever it's available. The callback should be passed a list of
+ processed keys and a list of unprocessed keycodes.
- This method expects to be called in next_input_timeout seconds
- (a floating point number) if there is no input waiting.
+ Subclasses may wish to use parse_input to wrap the callback.
+ """
+ if hasattr(self, 'get_input_nonblocking'):
+ wrapper = self._make_legacy_input_wrapper(event_loop, callback)
+ else:
+ wrapper = lambda: self.parse_input(
+ event_loop, callback, self.get_available_raw_input())
+ fds = self.get_input_descriptors()
+ handles = []
+ for fd in fds:
+ event_loop.watch_file(fd, wrapper)
+ self._current_event_loop_handles = handles
+
+ _input_timeout = None
+ _partial_codes = None
+
+ def _make_legacy_input_wrapper(self, event_loop, callback):
+ """
+ Support old Screen classes that still have a get_input_nonblocking and
+ expect it to work.
"""
- return self._input_iter.next()
+ def wrapper():
+ if self._input_timeout:
+ event_loop.remove_alarm(self._input_timeout)
+ self._input_timeout = None
+ timeout, keys, raw = self.get_input_nonblocking()
+ if timeout is not None:
+ self._input_timeout = event_loop.alarm(timeout, wrapper)
- def _run_input_iter(self):
- def empty_resize_pipe():
- # clean out the pipe used to signal external event loops
- # that a resize has occurred
- try:
- while True: os.read(self._resize_pipe_rd, 1)
- except OSError:
- pass
+ callback(keys, raw)
- while True:
- processed = []
- codes = self._get_gpm_codes() + \
- self._get_keyboard_codes()
+ return wrapper
- original_codes = codes
- try:
- while codes:
- run, codes = escape.process_keyqueue(
- codes, True)
- processed.extend(run)
- except escape.MoreInputRequired:
- k = len(original_codes) - len(codes)
- yield (self.complete_wait, processed,
- original_codes[:k])
- empty_resize_pipe()
- original_codes = codes
- processed = []
-
- codes += self._get_keyboard_codes() + \
- self._get_gpm_codes()
- while codes:
- run, codes = escape.process_keyqueue(
- codes, False)
- processed.extend(run)
-
- if self._resized:
- processed.append('window resize')
- self._resized = False
-
- yield (self.max_wait, processed, original_codes)
- empty_resize_pipe()
-
- def _fake_input_iter(self):
- """
- This generator is a placeholder for when the screen is stopped
- to always return that no input is available.
+ def get_available_raw_input(self):
"""
- while True:
- yield (self.max_wait, [], [])
+ Return any currently-available input. Does not block.
+
+ This method is only used by the default `hook_event_loop`
+ implementation; you can safely ignore it if you implement your own.
+ """
+ codes = self._get_gpm_codes() + self._get_keyboard_codes()
+
+ if self._partial_codes:
+ codes = self._partial_codes + codes
+ self._partial_codes = None
+
+ # clean out the pipe used to signal external event loops
+ # that a resize has occurred
+ try:
+ while True: os.read(self._resize_pipe_rd, 1)
+ except OSError:
+ pass
+
+ return codes
+
+ def parse_input(self, event_loop, callback, codes, wait_for_more=True):
+ """
+ Read any available input from get_available_raw_input, parses it into
+ keys, and calls the given callback.
+
+ The current implementation tries to avoid any assumptions about what
+ the screen or event loop look like; it only deals with parsing keycodes
+ and setting a timeout when an incomplete one is detected.
+
+ `codes` should be a sequence of keycodes, i.e. bytes. A bytearray is
+ appropriate, but beware of using bytes, which only iterates as integers
+ on Python 3.
+ """
+ # Note: event_loop may be None for 100% synchronous support, only used
+ # by get_input. Not documented because you shouldn't be doing it.
+ if self._input_timeout and event_loop:
+ event_loop.remove_alarm(self._input_timeout)
+ self._input_timeout = None
+
+ original_codes = codes
+ processed = []
+ try:
+ while codes:
+ run, codes = escape.process_keyqueue(
+ codes, wait_for_more)
+ processed.extend(run)
+ except escape.MoreInputRequired:
+ # Set a timer to wait for the rest of the input; if it goes off
+ # without any new input having come in, use the partial input
+ k = len(original_codes) - len(codes)
+ processed_codes = original_codes[:k]
+ self._partial_codes = codes
+
+ def _parse_incomplete_input():
+ self._input_timeout = None
+ self._partial_codes = None
+ self.parse_input(
+ event_loop, callback, codes, wait_for_more=False)
+ if event_loop:
+ self._input_timeout = event_loop.alarm(
+ self.complete_wait, _parse_incomplete_input)
+
+ else:
+ processed_codes = original_codes
+ self._partial_codes = None
+
+ if self._resized:
+ processed.append('window resize')
+ self._resized = False
+
+ if callback:
+ callback(processed, processed_codes)
+ else:
+ # For get_input
+ return processed, processed_codes
def _get_keyboard_codes(self):
codes = []
@@ -444,7 +507,7 @@ class Screen(BaseScreen, RealTerminal):
try:
while self.gpm_mev is not None and self.gpm_event_pending:
codes.extend(self._encode_gpm_event())
- except IOError, e:
+ except IOError as e:
if e.args[0] != 11:
raise
return codes
@@ -463,7 +526,7 @@ class Screen(BaseScreen, RealTerminal):
ready,w,err = select.select(
fd_list,[],fd_list, timeout)
break
- except select.error, e:
+ except select.error as e:
if e.args[0] != 4:
raise
if self._resized:
@@ -592,8 +655,8 @@ class Screen(BaseScreen, RealTerminal):
while True:
try:
- self._term_output_file.write(escape.DESIGNATE_G1_SPECIAL)
- self._term_output_file.flush()
+ self.write(escape.DESIGNATE_G1_SPECIAL)
+ self.flush()
break
except IOError:
pass
@@ -770,9 +833,9 @@ class Screen(BaseScreen, RealTerminal):
for l in o:
if isinstance(l, bytes) and PYTHON3:
l = l.decode('utf-8')
- self._term_output_file.write(l)
- self._term_output_file.flush()
- except IOError, e:
+ self.write(l)
+ self.flush()
+ except IOError as e:
# ignore interrupted syscall
if e.args[0] != 4:
raise
@@ -945,8 +1008,8 @@ class Screen(BaseScreen, RealTerminal):
modify = ["%d;rgb:%02x/%02x/%02x" % (index, red, green, blue)
for index, red, green, blue in entries]
- self._term_output_file.write("\x1b]4;"+";".join(modify)+"\x1b\\")
- self._term_output_file.flush()
+ self.write("\x1b]4;"+";".join(modify)+"\x1b\\")
+ self.flush()
# shortcut for creating an AttrSpec with this screen object's