diff options
Diffstat (limited to 'urwid/raw_display.py')
-rw-r--r-- | urwid/raw_display.py | 253 |
1 files changed, 158 insertions, 95 deletions
diff --git a/urwid/raw_display.py b/urwid/raw_display.py index 185f1ab..a3d14a0 100644 --- a/urwid/raw_display.py +++ b/urwid/raw_display.py @@ -48,7 +48,7 @@ from subprocess import Popen, PIPE class Screen(BaseScreen, RealTerminal): - def __init__(self): + def __init__(self, input=sys.stdin, output=sys.stdout): """Initialize a screen that directly prints escape codes to an output terminal. """ @@ -78,8 +78,11 @@ class Screen(BaseScreen, RealTerminal): self.bright_is_bold = not term.startswith("xterm") self.back_color_erase = not term.startswith("screen") self._next_timeout = None - self._term_output_file = sys.stdout - self._term_input_file = sys.stdin + + # Our connections to the world + self._term_output_file = output + self._term_input_file = input + # pipe for signalling external event loops about resize events self._resize_pipe_rd, self._resize_pipe_wr = os.pipe() fcntl.fcntl(self._resize_pipe_rd, fcntl.F_SETFL, os.O_NONBLOCK) @@ -164,10 +167,10 @@ class Screen(BaseScreen, RealTerminal): def _mouse_tracking(self, enable): if enable: - self._term_output_file.write(escape.MOUSE_TRACKING_ON) + self.write(escape.MOUSE_TRACKING_ON) self._start_gpm_tracking() else: - self._term_output_file.write(escape.MOUSE_TRACKING_OFF) + self.write(escape.MOUSE_TRACKING_OFF) self._stop_gpm_tracking() def _start_gpm_tracking(self): @@ -189,15 +192,14 @@ class Screen(BaseScreen, RealTerminal): os.waitpid(self.gpm_mev.pid, 0) self.gpm_mev = None - def start(self, alternate_buffer=True): + def _start(self, alternate_buffer=True): """ Initialize the screen and input mode. alternate_buffer -- use alternate screen buffer """ - assert not self._started if alternate_buffer: - self._term_output_file.write(escape.SWITCH_TO_ALTERNATE_BUFFER) + self.write(escape.SWITCH_TO_ALTERNATE_BUFFER) self._rows_used = None else: self._rows_used = 0 @@ -209,25 +211,22 @@ class Screen(BaseScreen, RealTerminal): self.signal_init() self._alternate_buffer = alternate_buffer - self._input_iter = self._run_input_iter() self._next_timeout = self.max_wait if not self._signal_keys_set: self._old_signal_keys = self.tty_signal_keys(fileno=fd) - super(Screen, self).start() - signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED) # restore mouse tracking to previous state self._mouse_tracking(self._mouse_tracking_enabled) - def stop(self): + return super(Screen, self)._start() + + def _stop(self): """ Restore the screen. """ self.clear() - if not self._started: - return signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED) @@ -246,32 +245,33 @@ class Screen(BaseScreen, RealTerminal): elif self.maxrow is not None: move_cursor = escape.set_cursor_position( 0, self.maxrow) - self._term_output_file.write( + self.write( self._attrspec_to_escape(AttrSpec('','')) + escape.SI + move_cursor + escape.SHOW_CURSOR) - self._input_iter = self._fake_input_iter() if self._old_signal_keys: self.tty_signal_keys(*(self._old_signal_keys + (fd,))) - super(Screen, self).stop() + super(Screen, self)._stop() - def run_wrapper(self, fn, alternate_buffer=True): + def write(self, data): + """Write some data to the terminal. + + You may wish to override this if you're using something other than + regular files for input and output. """ - Call start to initialize screen, then call fn. - When fn exits call stop to restore the screen to normal. + self._term_output_file.write(data) + + def flush(self): + """Flush the output buffer. - alternate_buffer -- use alternate screen buffer and restore - normal screen buffer on exit + You may wish to override this if you're using something other than + regular files for input and output. """ - try: - self.start(alternate_buffer) - return fn() - finally: - self.stop() + self._term_output_file.flush() def get_input(self, raw_keys=False): """Return pending input as a list. @@ -320,14 +320,13 @@ class Screen(BaseScreen, RealTerminal): assert self._started self._wait_for_input_ready(self._next_timeout) - self._next_timeout, keys, raw = self._input_iter.next() + keys, raw = self.parse_input(None, None, self.get_available_raw_input()) # Avoid pegging CPU at 100% when slowly resizing if keys==['window resize'] and self.prev_input_resize: while True: self._wait_for_input_ready(self.resize_wait) - self._next_timeout, keys, raw2 = \ - self._input_iter.next() + keys, raw2 = self.parse_input(None, None, self.get_available_raw_input()) raw += raw2 #if not keys: # keys, raw2 = self._get_input( @@ -364,71 +363,135 @@ class Screen(BaseScreen, RealTerminal): fd_list.append(self.gpm_mev.stdout.fileno()) return fd_list - def get_input_nonblocking(self): + _current_event_loop_handles = () + + def unhook_event_loop(self, event_loop): """ - Return a (next_input_timeout, keys_pressed, raw_keycodes) - tuple. + Remove any hooks added by hook_event_loop. + """ + for handle in self._current_event_loop_handles: + event_loop.remove_watch_file(handle) - Use this method if you are implementing your own event loop. + if self._input_timeout: + event_loop.remove_alarm(self._input_timeout) + self._input_timeout = None - When there is input waiting on one of the descriptors returned - by get_input_descriptors() this method should be called to - read and process the input. + def hook_event_loop(self, event_loop, callback): + """ + Register the given callback with the event loop, to be called with new + input whenever it's available. The callback should be passed a list of + processed keys and a list of unprocessed keycodes. - This method expects to be called in next_input_timeout seconds - (a floating point number) if there is no input waiting. + Subclasses may wish to use parse_input to wrap the callback. + """ + if hasattr(self, 'get_input_nonblocking'): + wrapper = self._make_legacy_input_wrapper(event_loop, callback) + else: + wrapper = lambda: self.parse_input( + event_loop, callback, self.get_available_raw_input()) + fds = self.get_input_descriptors() + handles = [] + for fd in fds: + event_loop.watch_file(fd, wrapper) + self._current_event_loop_handles = handles + + _input_timeout = None + _partial_codes = None + + def _make_legacy_input_wrapper(self, event_loop, callback): + """ + Support old Screen classes that still have a get_input_nonblocking and + expect it to work. """ - return self._input_iter.next() + def wrapper(): + if self._input_timeout: + event_loop.remove_alarm(self._input_timeout) + self._input_timeout = None + timeout, keys, raw = self.get_input_nonblocking() + if timeout is not None: + self._input_timeout = event_loop.alarm(timeout, wrapper) - def _run_input_iter(self): - def empty_resize_pipe(): - # clean out the pipe used to signal external event loops - # that a resize has occurred - try: - while True: os.read(self._resize_pipe_rd, 1) - except OSError: - pass + callback(keys, raw) - while True: - processed = [] - codes = self._get_gpm_codes() + \ - self._get_keyboard_codes() + return wrapper - original_codes = codes - try: - while codes: - run, codes = escape.process_keyqueue( - codes, True) - processed.extend(run) - except escape.MoreInputRequired: - k = len(original_codes) - len(codes) - yield (self.complete_wait, processed, - original_codes[:k]) - empty_resize_pipe() - original_codes = codes - processed = [] - - codes += self._get_keyboard_codes() + \ - self._get_gpm_codes() - while codes: - run, codes = escape.process_keyqueue( - codes, False) - processed.extend(run) - - if self._resized: - processed.append('window resize') - self._resized = False - - yield (self.max_wait, processed, original_codes) - empty_resize_pipe() - - def _fake_input_iter(self): - """ - This generator is a placeholder for when the screen is stopped - to always return that no input is available. + def get_available_raw_input(self): """ - while True: - yield (self.max_wait, [], []) + Return any currently-available input. Does not block. + + This method is only used by the default `hook_event_loop` + implementation; you can safely ignore it if you implement your own. + """ + codes = self._get_gpm_codes() + self._get_keyboard_codes() + + if self._partial_codes: + codes = self._partial_codes + codes + self._partial_codes = None + + # clean out the pipe used to signal external event loops + # that a resize has occurred + try: + while True: os.read(self._resize_pipe_rd, 1) + except OSError: + pass + + return codes + + def parse_input(self, event_loop, callback, codes, wait_for_more=True): + """ + Read any available input from get_available_raw_input, parses it into + keys, and calls the given callback. + + The current implementation tries to avoid any assumptions about what + the screen or event loop look like; it only deals with parsing keycodes + and setting a timeout when an incomplete one is detected. + + `codes` should be a sequence of keycodes, i.e. bytes. A bytearray is + appropriate, but beware of using bytes, which only iterates as integers + on Python 3. + """ + # Note: event_loop may be None for 100% synchronous support, only used + # by get_input. Not documented because you shouldn't be doing it. + if self._input_timeout and event_loop: + event_loop.remove_alarm(self._input_timeout) + self._input_timeout = None + + original_codes = codes + processed = [] + try: + while codes: + run, codes = escape.process_keyqueue( + codes, wait_for_more) + processed.extend(run) + except escape.MoreInputRequired: + # Set a timer to wait for the rest of the input; if it goes off + # without any new input having come in, use the partial input + k = len(original_codes) - len(codes) + processed_codes = original_codes[:k] + self._partial_codes = codes + + def _parse_incomplete_input(): + self._input_timeout = None + self._partial_codes = None + self.parse_input( + event_loop, callback, codes, wait_for_more=False) + if event_loop: + self._input_timeout = event_loop.alarm( + self.complete_wait, _parse_incomplete_input) + + else: + processed_codes = original_codes + self._partial_codes = None + + if self._resized: + processed.append('window resize') + self._resized = False + + if callback: + callback(processed, processed_codes) + else: + # For get_input + return processed, processed_codes def _get_keyboard_codes(self): codes = [] @@ -444,7 +507,7 @@ class Screen(BaseScreen, RealTerminal): try: while self.gpm_mev is not None and self.gpm_event_pending: codes.extend(self._encode_gpm_event()) - except IOError, e: + except IOError as e: if e.args[0] != 11: raise return codes @@ -463,7 +526,7 @@ class Screen(BaseScreen, RealTerminal): ready,w,err = select.select( fd_list,[],fd_list, timeout) break - except select.error, e: + except select.error as e: if e.args[0] != 4: raise if self._resized: @@ -592,8 +655,8 @@ class Screen(BaseScreen, RealTerminal): while True: try: - self._term_output_file.write(escape.DESIGNATE_G1_SPECIAL) - self._term_output_file.flush() + self.write(escape.DESIGNATE_G1_SPECIAL) + self.flush() break except IOError: pass @@ -770,9 +833,9 @@ class Screen(BaseScreen, RealTerminal): for l in o: if isinstance(l, bytes) and PYTHON3: l = l.decode('utf-8') - self._term_output_file.write(l) - self._term_output_file.flush() - except IOError, e: + self.write(l) + self.flush() + except IOError as e: # ignore interrupted syscall if e.args[0] != 4: raise @@ -945,8 +1008,8 @@ class Screen(BaseScreen, RealTerminal): modify = ["%d;rgb:%02x/%02x/%02x" % (index, red, green, blue) for index, red, green, blue in entries] - self._term_output_file.write("\x1b]4;"+";".join(modify)+"\x1b\\") - self._term_output_file.flush() + self.write("\x1b]4;"+";".join(modify)+"\x1b\\") + self.flush() # shortcut for creating an AttrSpec with this screen object's |