diff options
author | Aseda Aboagye <aaboagye@google.com> | 2015-10-23 15:46:46 -0700 |
---|---|---|
committer | chrome-bot <chrome-bot@chromium.org> | 2015-11-11 11:00:55 -0800 |
commit | 77e76fc7fb8ed606386eae7f8528ead205d740ea (patch) | |
tree | 85a700fe4a6a1b5a97d95c7bd9e17f11f809cd2e /util | |
parent | 8501badde7178f6197f534093ff7aae33af2ca71 (diff) | |
download | chrome-ec-77e76fc7fb8ed606386eae7f8528ead205d740ea.tar.gz |
util: Add EC-3PO, the EC console interpreter.
EC-3PO is the console interpreter that will one day replace the EC console
that we have today. EC-3PO aims to migrate our rich debug console from
the EC itself to the host. This allows us to maintain our rich debug
console without impacting our EC image sizes while also allowing us to
add new features.
This commit is the 1st of three phases, the insertion phase. The main
goal of this insertion phase is to get EC-3PO in place between servo and
the EC UART, while not modifying the behaviour of the console too
much. At this point, EC-3PO is capable of the following things:
- Replicate command editing.
- Save command history.
- Performs error checking with console commands.
The command editing should be at parity with the current EC console.
With EC-3PO, one can have a much longer command history which also
persists across EC reboots. And lastly, with a cooperating EC image,
EC-3PO can perform error checking with console commands. Automatically
retrying console commands if the command was incorrectly received at the
EC end.
Currently, commands are sent to the EC in a "packed" plaintext form.
The next phase will introduce the host command packet communication.
console.py is the module that provides the console interface between the
user and the interpreter. It handles the presentation of the console
including command editing.
It also has an accompanying set of unit tests in console_unittest.py.
It currently has 1 test suite to test the various console editing
methods.
interpreter.py is the module that provides the interpretation layer
between the EC and the user. It also is responsible for the automatic
command retrying. It requires pipe connections to be made to it for
command and debug data communication.
BUG=chrome-os-partner:46054
BRANCH=None
TEST=util/ec3po/console_unittest.py
TEST=Flash GLaDOS with a modified EC build. Run console.py passing the
EC UART, verify that I can edit commands, send commands, view command
history, and receive output from the EC.
TEST=cros lint --log-level debug ./util/ec3po/console.py
TEST=cros lint --log-level debug ./util/ec3po/interpreter.py
TEST=cros lint --log-level debug ./util/ec3po/console_unittest.py
Change-Id: I38ae425836efd69044334e1ed0daf3f88a95917c
Signed-off-by: Aseda Aboagye <aaboagye@google.com>
Reviewed-on: https://chromium-review.googlesource.com/308615
Commit-Ready: Aseda Aboagye <aaboagye@chromium.org>
Tested-by: Aseda Aboagye <aaboagye@chromium.org>
Reviewed-by: Randall Spangler <rspangler@chromium.org>
Reviewed-by: Wai-Hong Tam <waihong@chromium.org>
Diffstat (limited to 'util')
-rwxr-xr-x | util/ec3po/__init__.py | 0 | ||||
-rwxr-xr-x | util/ec3po/console.py | 694 | ||||
-rwxr-xr-x | util/ec3po/console_unittest.py | 1112 | ||||
-rwxr-xr-x | util/ec3po/interpreter.py | 259 |
4 files changed, 2065 insertions, 0 deletions
diff --git a/util/ec3po/__init__.py b/util/ec3po/__init__.py new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/util/ec3po/__init__.py diff --git a/util/ec3po/console.py b/util/ec3po/console.py new file mode 100755 index 0000000000..de657b7ae0 --- /dev/null +++ b/util/ec3po/console.py @@ -0,0 +1,694 @@ +#!/usr/bin/python2 +# Copyright 2015 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""EC-3PO Interactive Console Interface + +console provides the console interface between the user and the interpreter. It +handles the presentation of the EC console including editing methods as well as +session-persistent command history. +""" +from __future__ import print_function +import argparse +from chromite.lib import cros_logging as logging +import multiprocessing +import os +import pty +import select +import sys + +import interpreter + + +PROMPT = '> ' +CONSOLE_INPUT_LINE_SIZE = 80 # Taken from the CONFIG_* with the same name. +CONSOLE_MAX_READ = 100 # Max bytes to read at a time from the user. + + +class EscState(object): + """Class which contains an enumeration for states of ESC sequences.""" + ESC_START = 1 + ESC_BRACKET = 2 + ESC_BRACKET_1 = 3 + ESC_BRACKET_3 = 4 + ESC_BRACKET_8 = 5 + + +class ControlKey(object): + """Class which contains codes for various control keys.""" + BACKSPACE = 0x08 + CTRL_A = 0x01 + CTRL_B = 0x02 + CTRL_D = 0x04 + CTRL_E = 0x05 + CTRL_F = 0x06 + CTRL_K = 0x0b + CTRL_N = 0xe + CTRL_P = 0x10 + CARRIAGE_RETURN = 0x0d + ESC = 0x1b + + +class MoveCursorError(Exception): + """Exception class for errors when moving the cursor.""" + pass + + +class Console(object): + """Class which provides the console interface between the EC and the user. + + This class essentially represents the console interface between the user and + the EC. It handles all of the console editing behaviour + + Attributes: + master_pty: File descriptor to the master side of the PTY. Used for driving + output to the user and receiving user input. + user_pty: A string representing the PTY name of the served console. + cmd_pipe: A multiprocessing.Connection object which represents the console + side of the command pipe. This must be a bidirectional pipe. Console + commands and responses utilize this pipe. + dbg_pipe: A multiprocessing.Connection object which represents the console's + read-only side of the debug pipe. This must be a unidirectional pipe + attached to the intepreter. EC debug messages use this pipe. + input_buffer: A string representing the current input command. + input_buffer_pos: An integer representing the current position in the buffer + to insert a char. + partial_cmd: A string representing the command entered on a line before + pressing the up arrow keys. + esc_state: An integer represeting the current state within an escape + sequence. + line_limit: An integer representing the maximum number of characters on a + line. + history: A list of strings containing the past entered console commands. + history_pos: An integer representing the current history buffer position. + This index is used to show previous commands. + prompt: A string representing the console prompt displayed to the user. + """ + + def __init__(self, master_pty, user_pty, cmd_pipe, dbg_pipe): + """Initalises a Console object with the provided arguments. + + Args: + master_pty: File descriptor to the master side of the PTY. Used for driving + output to the user and receiving user input. + user_pty: A string representing the PTY name of the served console. + cmd_pipe: A multiprocessing.Connection object which represents the console + side of the command pipe. This must be a bidirectional pipe. Console + commands and responses utilize this pipe. + dbg_pipe: A multiprocessing.Connection object which represents the console's + read-only side of the debug pipe. This must be a unidirectional pipe + attached to the intepreter. EC debug messages use this pipe. + """ + self.master_pty = master_pty + self.user_pty = user_pty + self.cmd_pipe = cmd_pipe + self.dbg_pipe = dbg_pipe + self.input_buffer = '' + self.input_buffer_pos = 0 + self.partial_cmd = '' + self.esc_state = 0 + self.line_limit = CONSOLE_INPUT_LINE_SIZE + self.history = [] + self.history_pos = 0 + self.prompt = PROMPT + + def __str__(self): + """Show internal state of Console object as a string.""" + string = [] + string.append('master_pty: %s' % self.master_pty) + string.append('user_pty: %s' % self.user_pty) + string.append('cmd_pipe: %s' % self.cmd_pipe) + string.append('dbg_pipe: %s' % self.dbg_pipe) + string.append('input_buffer: %s' % self.input_buffer) + string.append('input_buffer_pos: %d' % self.input_buffer_pos) + string.append('esc_state: %d' % self.esc_state) + string.append('line_limit: %d' % self.line_limit) + string.append('history: [\'' + '\', \''.join(self.history) + '\']') + string.append('history_pos: %d' % self.history_pos) + string.append('prompt: \'%s\'' % self.prompt) + string.append('partial_cmd: \'%s\''% self.partial_cmd) + return '\n'.join(string) + + def PrintHistory(self): + """Print the history of entered commands.""" + fd = self.master_pty + # Make it pretty by figuring out how wide to pad the numbers. + wide = (len(self.history) / 10) + 1 + for i in range(len(self.history)): + line = ' %*d %s\r\n' % (wide, i, self.history[i]) + os.write(fd, line) + + def ShowPreviousCommand(self): + """Shows the previous command from the history list.""" + # There's nothing to do if there's no history at all. + if not self.history: + logging.debug('No history to print.') + return + + # Don't do anything if there's no more history to show. + if self.history_pos == 0: + logging.debug('No more history to show.') + return + + logging.debug('current history position: %d.', self.history_pos) + + # Decrement the history buffer position. + self.history_pos -= 1 + logging.debug('new history position.: %d', self.history_pos) + + # Save the text entered on the console if any. + if self.history_pos == len(self.history)-1: + logging.debug('saving partial_cmd: \'%s\'', self.input_buffer) + self.partial_cmd = self.input_buffer + + # Backspace the line. + for _ in range(self.input_buffer_pos): + self.SendBackspace() + + # Print the last entry in the history buffer. + logging.debug('printing previous entry %d - %s', self.history_pos, + self.history[self.history_pos]) + fd = self.master_pty + prev_cmd = self.history[self.history_pos] + os.write(fd, prev_cmd) + # Update the input buffer. + self.input_buffer = prev_cmd + self.input_buffer_pos = len(prev_cmd) + + def ShowNextCommand(self): + """Shows the next command from the history list.""" + # Don't do anything if there's no history at all. + if not self.history: + logging.debug('History buffer is empty.') + return + + fd = self.master_pty + + logging.debug('current history position: %d', self.history_pos) + # Increment the history position. + self.history_pos += 1 + + # Restore the partial cmd. + if self.history_pos == len(self.history): + logging.debug('Restoring partial command of \'%s\'', self.partial_cmd) + # Backspace the line. + for _ in range(self.input_buffer_pos): + self.SendBackspace() + # Print the partially entered command if any. + os.write(fd, self.partial_cmd) + self.input_buffer = self.partial_cmd + self.input_buffer_pos = len(self.input_buffer) + # Now that we've printed it, clear the partial cmd storage. + self.partial_cmd = '' + # Reset history position. + self.history_pos = len(self.history) + return + + logging.debug('new history position: %d', self.history_pos) + if self.history_pos > len(self.history)-1: + logging.debug('No more history to show.') + self.history_pos -= 1 + logging.debug('Reset history position to %d', self.history_pos) + return + + # Backspace the line. + for _ in range(self.input_buffer_pos): + self.SendBackspace() + + # Print the newer entry from the history buffer. + logging.debug('printing next entry %d - %s', self.history_pos, + self.history[self.history_pos]) + next_cmd = self.history[self.history_pos] + os.write(fd, next_cmd) + # Update the input buffer. + self.input_buffer = next_cmd + self.input_buffer_pos = len(next_cmd) + logging.debug('new history position: %d.', self.history_pos) + + def SliceOutChar(self): + """Remove a char from the line and shift everything over 1 column.""" + fd = self.master_pty + # Remove the character at the input_buffer_pos by slicing it out. + self.input_buffer = self.input_buffer[0:self.input_buffer_pos] + \ + self.input_buffer[self.input_buffer_pos+1:] + # Write the rest of the line + moved_col = os.write(fd, self.input_buffer[self.input_buffer_pos:]) + # Write a space to clear out the last char + moved_col += os.write(fd, ' ') + # Update the input buffer position. + self.input_buffer_pos += moved_col + # Reset the cursor + self.MoveCursor('left', moved_col) + + def HandleEsc(self, byte): + """HandleEsc processes escape sequences. + + Args: + byte: An integer representing the current byte in the sequence. + """ + # We shouldn't be handling an escape sequence if we haven't seen one. + assert self.esc_state != 0 + + if self.esc_state is EscState.ESC_START: + logging.debug('ESC_START') + if byte == ord('['): + self.esc_state = EscState.ESC_BRACKET + return + + else: + logging.error('Unexpected sequence. %c' % byte) + self.esc_state = 0 + + elif self.esc_state is EscState.ESC_BRACKET: + logging.debug('ESC_BRACKET') + # Left Arrow key was pressed. + if byte == ord('D'): + logging.debug('Left arrow key pressed.') + self.MoveCursor('left', 1) + self.esc_state = 0 # Reset the state. + return + + # Right Arrow key. + elif byte == ord('C'): + logging.debug('Right arrow key pressed.') + self.MoveCursor('right', 1) + self.esc_state = 0 # Reset the state. + return + + # Up Arrow key. + elif byte == ord('A'): + logging.debug('Up arrow key pressed.') + self.ShowPreviousCommand() + # Reset the state. + self.esc_state = 0 # Reset the state. + return + + # Down Arrow key. + elif byte == ord('B'): + logging.debug('Down arrow key pressed.') + self.ShowNextCommand() + # Reset the state. + self.esc_state = 0 # Reset the state. + return + + # For some reason, minicom sends a 1 instead of 7. /shrug + # TODO(aaboagye): Figure out why this happens. + elif byte == ord('1') or byte == ord('7'): + self.esc_state = EscState.ESC_BRACKET_1 + + elif byte == ord('3'): + self.esc_state = EscState.ESC_BRACKET_3 + + elif byte == ord('8'): + self.esc_state = EscState.ESC_BRACKET_8 + + else: + logging.error(r'Bad or unhandled escape sequence. got ^[%c\(%d)' + % (chr(byte), byte)) + self.esc_state = 0 + return + + elif self.esc_state is EscState.ESC_BRACKET_1: + logging.debug('ESC_BRACKET_1') + # HOME key. + if byte == ord('~'): + logging.debug('Home key pressed.') + self.MoveCursor('left', self.input_buffer_pos) + self.esc_state = 0 # Reset the state. + logging.debug('ESC sequence complete.') + return + + elif self.esc_state is EscState.ESC_BRACKET_3: + logging.debug('ESC_BRACKET_3') + # DEL key. + if byte == ord('~'): + logging.debug('Delete key pressed.') + if self.input_buffer_pos != len(self.input_buffer): + self.SliceOutChar() + self.esc_state = 0 # Reset the state. + + elif self.esc_state is EscState.ESC_BRACKET_8: + logging.debug('ESC_BRACKET_8') + # END key. + if byte == ord('~'): + logging.debug('End key pressed.') + self.MoveCursor('right', + len(self.input_buffer) - self.input_buffer_pos) + self.esc_state = 0 # Reset the state. + logging.debug('ESC sequence complete.') + return + + else: + logging.error('Unexpected sequence. %c' % byte) + self.esc_state = 0 + + else: + logging.error('Unexpected sequence. %c' % byte) + self.esc_state = 0 + + def ProcessInput(self): + """Captures the input determines what actions to take.""" + # There's nothing to do if the input buffer is empty. + if len(self.input_buffer) == 0: + return + + # Don't store 2 consecutive identical commands in the history. + if (self.history and self.history[-1] != self.input_buffer + or not self.history): + self.history.append(self.input_buffer) + + # Split the command up by spaces. + line = self.input_buffer.split(' ') + logging.debug('cmd: %s' % (self.input_buffer)) + cmd = line[0].lower() + + # The 'history' command is a special case that we handle locally. + if cmd == 'history': + self.PrintHistory() + return + + # Send the command to the interpreter. + logging.debug('Sending command to interpreter.') + self.cmd_pipe.send(self.input_buffer) + + def HandleChar(self, byte): + """HandleChar does a certain action when it receives a character. + + Args: + byte: An integer representing the character received from the user. + """ + # Keep handling the ESC sequence if we're in the middle of it. + if self.esc_state != 0: + self.HandleEsc(byte) + return + + # When we're at the end of the line, we should only allow going backwards, + # backspace, carriage return, up, or down. The arrow keys are escape + # sequences, so we let the escape...escape. + if (self.input_buffer_pos >= self.line_limit and + byte not in [ControlKey.CTRL_B, ControlKey.ESC, ControlKey.BACKSPACE, + ControlKey.CTRL_A, ControlKey.CARRIAGE_RETURN, + ControlKey.CTRL_P, ControlKey.CTRL_N]): + return + + # If the input buffer is full we can't accept new chars. + buffer_full = len(self.input_buffer) >= self.line_limit + + fd = self.master_pty + + # Carriage_Return/Enter + if byte == ControlKey.CARRIAGE_RETURN: + logging.debug('Enter key pressed.') + # Put a carriage return/newline and the print the prompt. + os.write(fd, '\r\n') + + # TODO(aaboagye): When we control the printing of all output, print the + # prompt AFTER printing all the output. We can't do it yet because we + # don't know how much is coming from the EC. + + # Print the prompt. + os.write(fd, self.prompt) + # Process the input. + self.ProcessInput() + # Now, clear the buffer. + self.input_buffer = '' + self.input_buffer_pos = 0 + # Reset history buffer pos. + self.history_pos = len(self.history) + # Clear partial command. + self.partial_cmd = '' + + # Backspace + elif byte == ControlKey.BACKSPACE: + logging.debug('Backspace pressed.') + if self.input_buffer_pos > 0: + # Move left 1 column. + self.MoveCursor('left', 1) + # Remove the character at the input_buffer_pos by slicing it out. + self.SliceOutChar() + + logging.debug('input_buffer_pos: %d' % (self.input_buffer_pos)) + + # Ctrl+A. Move cursor to beginning of the line + elif byte == ControlKey.CTRL_A: + logging.debug('Control+A pressed.') + self.MoveCursor('left', self.input_buffer_pos) + + # Ctrl+B. Move cursor left 1 column. + elif byte == ControlKey.CTRL_B: + logging.debug('Control+B pressed.') + self.MoveCursor('left', 1) + + # Ctrl+D. Delete a character. + elif byte == ControlKey.CTRL_D: + logging.debug('Control+D pressed.') + if self.input_buffer_pos != len(self.input_buffer): + # Remove the character by slicing it out. + self.SliceOutChar() + + # Ctrl+E. Move cursor to end of the line. + elif byte == ControlKey.CTRL_E: + logging.debug('Control+E pressed.') + self.MoveCursor('right', + len(self.input_buffer) - self.input_buffer_pos) + + # Ctrl+F. Move cursor right 1 column. + elif byte == ControlKey.CTRL_F: + logging.debug('Control+F pressed.') + self.MoveCursor('right', 1) + + # Ctrl+K. Kill line. + elif byte == ControlKey.CTRL_K: + logging.debug('Control+K pressed.') + self.KillLine() + + # Ctrl+N. Next line. + elif byte == ControlKey.CTRL_N: + logging.debug('Control+N pressed.') + self.ShowNextCommand() + + # Ctrl+P. Previous line. + elif byte == ControlKey.CTRL_P: + logging.debug('Control+P pressed.') + self.ShowPreviousCommand() + + # ESC sequence + elif byte == ControlKey.ESC: + # Starting an ESC sequence + self.esc_state = EscState.ESC_START + + # Only print printable chars. + elif IsPrintable(byte): + # Drop the character if we're full. + if buffer_full: + logging.debug('Dropped char: %c(%d)', byte, byte) + return + # Print the character. + os.write(fd, chr(byte)) + # Print the rest of the line (if any). + extra_bytes_written = os.write(fd, + self.input_buffer[self.input_buffer_pos:]) + + # Recreate the input buffer. + self.input_buffer = (self.input_buffer[0:self.input_buffer_pos] + + ('%c' % byte) + + self.input_buffer[self.input_buffer_pos:]) + # Update the input buffer position. + self.input_buffer_pos += 1 + extra_bytes_written + + # Reset the cursor if we wrote any extra bytes. + if extra_bytes_written: + self.MoveCursor('left', extra_bytes_written) + + logging.debug('input_buffer_pos: %d' % (self.input_buffer_pos)) + + def MoveCursor(self, direction, count): + """MoveCursor moves the cursor left or right by count columns. + + Args: + direction: A string that should be either 'left' or 'right' representing + the direction to move the cursor on the console. + count: An integer representing how many columns the cursor should be + moved. + + Raises: + ValueError: If the direction is not equal to 'left' or 'right'. + """ + # If there's nothing to move, we're done. + if not count: + return + fd = self.master_pty + seq = '\033[' + str(count) + if direction == 'left': + # Bind the movement. + if count > self.input_buffer_pos: + count = self.input_buffer_pos + seq += 'D' + logging.debug('move cursor left %d', count) + self.input_buffer_pos -= count + + elif direction == 'right': + # Bind the movement. + if (count + self.input_buffer_pos) > len(self.input_buffer): + count = 0 + seq += 'C' + logging.debug('move cursor right %d', count) + self.input_buffer_pos += count + + else: + raise MoveCursorError(('The only valid directions are \'left\' and ' + '\'right\'')) + + logging.debug('input_buffer_pos: %d' % self.input_buffer_pos) + # Move the cursor. + if count != 0: + os.write(fd, seq) + + def KillLine(self): + """Kill the rest of the line based on the input buffer position.""" + # Killing the line is killing all the text to the right. + diff = len(self.input_buffer) - self.input_buffer_pos + logging.debug('diff: %d' % diff) + # Diff shouldn't be negative, but if it is for some reason, let's try to + # correct the cursor. + if diff < 0: + logging.warning('Resetting input buffer position to %d...', + len(self.input_buffer)) + self.MoveCursor('left', -diff) + return + if diff: + self.MoveCursor('right', diff) + for _ in range(diff): + self.SendBackspace() + self.input_buffer_pos -= diff + self.input_buffer = self.input_buffer[0:self.input_buffer_pos] + + def SendBackspace(self): + """Backspace a character on the console.""" + os.write(self.master_pty, '\033[1D \033[1D') + + +def IsPrintable(byte): + """Determines if a byte is printable. + + Args: + byte: An integer potentially representing a printable character. + + Returns: + A boolean indicating whether the byte is a printable character. + """ + return byte >= ord(' ') and byte <= ord('~') + + +def StartLoop(console): + """Starts the infinite loop of console processing. + + Args: + console: A Console object that has been properly initialzed. + """ + logging.info('EC Console is being served on %s.', console.user_pty) + logging.debug(console) + while True: + # Check to see if pipes or the console are ready for reading. + read_list = [console.master_pty, console.cmd_pipe, console.dbg_pipe] + ready_for_reading = select.select(read_list, [], [])[0] + + for obj in ready_for_reading: + if obj is console.master_pty: + logging.debug('Input from user') + # Convert to bytes so we can look for non-printable chars such as + # Ctrl+A, Ctrl+E, etc. + line = bytearray(os.read(console.master_pty, CONSOLE_MAX_READ)) + for i in line: + # Handle each character as it arrives. + console.HandleChar(i) + + elif obj is console.cmd_pipe: + data = console.cmd_pipe.recv() + # Write it to the user console. + logging.debug('|CMD|->\'%s\'', data) + os.write(console.master_pty, data) + + elif obj is console.dbg_pipe: + data = console.dbg_pipe.recv() + # Write it to the user console. + logging.debug('|DBG|->\'%s\'', data) + os.write(console.master_pty, data) + + +def main(): + """Kicks off the EC-3PO interactive console interface and interpreter. + + We create some pipes to communicate with an interpreter, instantiate an + interpreter, create a PTY pair, and begin serving the console interface. + """ + # Set up argument parser. + parser = argparse.ArgumentParser(description=('Start interactive EC console ' + 'and interpreter.')) + # TODO(aaboagye): Eventually get this from servod. + parser.add_argument('ec_uart_pty', + help=('The full PTY name that the EC UART' + ' is present on. eg: /dev/pts/12')) + parser.add_argument('--log-level', + default='info', + help=('info, debug, warning, error, or critical')) + + # Parse arguments. + args = parser.parse_args() + + # Can't do much without an EC to talk to. + if not args.ec_uart_pty: + parser.print_help() + sys.exit(1) + + # Set logging level. + args.log_level = args.log_level.lower() + if args.log_level == 'info': + log_level = logging.INFO + elif args.log_level == 'debug': + log_level = logging.DEBUG + elif args.log_level == 'warning': + log_level = logging.WARNING + elif args.log_level == 'error': + log_level = logging.ERROR + elif args.log_level == 'critical': + log_level = logging.CRITICAL + else: + print('Error: Invalid log level.') + parser.print_help() + sys.exit(1) + + # Start logging with a timestamp, module, and log level shown in each log + # entry. + logging.basicConfig(level=log_level, format=('%(asctime)s - %(module)s -' + ' %(levelname)s - %(message)s')) + + # Create some pipes to communicate between the interpreter and the console. + # The command pipe is bidirectional. + cmd_pipe_interactive, cmd_pipe_interp = multiprocessing.Pipe() + # The debug pipe is unidirectional from interpreter to console only. + dbg_pipe_interactive, dbg_pipe_interp = multiprocessing.Pipe(duplex=False) + + # Create an interpreter instance. + itpr = interpreter.Interpreter(args.ec_uart_pty, cmd_pipe_interp, + dbg_pipe_interp, log_level) + + # Spawn an interpreter process. + itpr_process = multiprocessing.Process(target=interpreter.StartLoop, + args=(itpr,)) + # Make sure to kill the interpreter when we terminate. + itpr_process.daemon = True + # Start the interpreter. + itpr_process.start() + + # Open a new pseudo-terminal pair + (master_pty, user_pty) = pty.openpty() + # Create a console. + console = Console(master_pty, os.ttyname(user_pty), cmd_pipe_interactive, + dbg_pipe_interactive) + # Start serving the console. + StartLoop(console) + + +if __name__ == '__main__': + main() diff --git a/util/ec3po/console_unittest.py b/util/ec3po/console_unittest.py new file mode 100755 index 0000000000..7e9e4ab9f1 --- /dev/null +++ b/util/ec3po/console_unittest.py @@ -0,0 +1,1112 @@ +#!/usr/bin/python2 +# Copyright 2015 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""Unit tests for the EC-3PO Console interface.""" +from __future__ import print_function +import binascii +import mock +import multiprocessing +import tempfile +import unittest + +import console + +ESC_STRING = chr(console.ControlKey.ESC) + +class Keys(object): + """A class that contains the escape sequences for special keys.""" + LEFT_ARROW = [console.ControlKey.ESC, ord('['), ord('D')] + RIGHT_ARROW = [console.ControlKey.ESC, ord('['), ord('C')] + UP_ARROW = [console.ControlKey.ESC, ord('['), ord('A')] + DOWN_ARROW = [console.ControlKey.ESC, ord('['), ord('B')] + HOME = [console.ControlKey.ESC, ord('['), ord('1'), ord('~')] + END = [console.ControlKey.ESC, ord('['), ord('8'), ord('~')] + DEL = [console.ControlKey.ESC, ord('['), ord('3'), ord('~')] + +class OutputStream(object): + """A class that has methods which return common console output.""" + + @staticmethod + def MoveCursorLeft(count): + """Produces what would be printed to the console if the cursor moved left. + + Args: + count: An integer representing how many columns to move left. + + Returns: + string: A string which contains what would be printed to the console if + the cursor moved left. + """ + string = ESC_STRING + string += '[' + str(count) + 'D' + return string + + @staticmethod + def MoveCursorRight(count): + """Produces what would be printed to the console if the cursor moved right. + + Args: + count: An integer representing how many columns to move right. + + Returns: + string: A string which contains what would be printed to the console if + the cursor moved right. + """ + string = ESC_STRING + string += '[' + str(count) + 'C' + return string + +BACKSPACE_STRING = '' +# Move cursor left 1 column. +BACKSPACE_STRING += OutputStream.MoveCursorLeft(1) +# Write a space. +BACKSPACE_STRING += ' ' +# Move cursor left 1 column. +BACKSPACE_STRING += OutputStream.MoveCursorLeft(1) + +class TestConsoleEditingMethods(unittest.TestCase): + """Test case to verify all console editing methods.""" + + def setUp(self): + """Setup the test harness.""" + # Create a temp file and set both the master and slave PTYs to the file to + # create a loopback. + self.tempfile = tempfile.TemporaryFile() + + # Create some dummy pipes. These won't be used since we'll mock out sends + # to the interpreter. + dummy_pipe_end_0, dummy_pipe_end_1 = multiprocessing.Pipe() + self.console = console.Console(self.tempfile.fileno(), self.tempfile, + dummy_pipe_end_0, dummy_pipe_end_1) + + # Mock out sends to the interpreter. + multiprocessing.Pipe.send = mock.MagicMock() + + def StringToByteList(self, string): + """Converts a string to list of bytes. + + Args: + string: A literal string to turn into a list of bytes. + + Returns: + A list of integers representing the byte value of each character in the + string. + """ + return [ord(c) for c in string] + + def BadConsoleOutput(self, expected, got): + """Format the console output into readable text. + + Args: + expected: A list of bytes representing the expected output console + stream. + got: A list of byte representing the actual output console stream. + + Returns: + string: A formatted string which shows the expected console output stream + and the actual console output stream. + """ + esc_state = 0 + string = 'Incorrect console output stream.\n' + string += 'exp: |' + count = 0 + for char in expected: + if esc_state != 0: + if esc_state == console.EscState.ESC_START: + if char == '[': + esc_state = console.EscState.ESC_BRACKET + elif esc_state == console.EscState.ESC_BRACKET: + if char == 'D': + string += '[cursor left ' + str(count) + ' cols]' + esc_state = 0 + elif char == 'C': + string += '[cursor right ' + str(count) + ' cols]' + esc_state = 0 + else: + count = int(char) + # Print if it's printable. + elif console.IsPrintable(ord(char)): + string += char + else: + # It might be a sequence of some type. + if ord(char) == console.ControlKey.ESC: + # Need to look at the following sequence. + esc_state = console.EscState.ESC_START + else: + string += '{' + binascii.hexlify(char) + '}' + + string += '|\n\ngot: |' + for char in got: + if esc_state != 0: + if esc_state == console.EscState.ESC_START: + if char == '[': + esc_state = console.EscState.ESC_BRACKET + elif esc_state == console.EscState.ESC_BRACKET: + if char == 'D': + string += '[cursor left ' + str(count) + ' cols]' + esc_state = 0 + elif char == 'C': + string += '[cursor right ' + str(count) + ' cols]' + esc_state = 0 + else: + count = int(char) + # Print if it's printable. + elif console.IsPrintable(ord(char)): + string += char + else: + # It might be a sequence of some type. + if ord(char) == console.ControlKey.ESC: + # Need to look at the following sequence. + esc_state = console.EscState.ESC_START + else: + string += '{' + binascii.hexlify(char) + '}' + string += '|\n\n' + + # TODO(aaboagye): It would be nice to replace all those move left 1, ' ', + # move left 1, with backspace. + + return string + + def CheckConsoleOutput(self, exp_console_out): + """Verify what was sent out the console matches what we expect. + + Args: + exp_console_out: A string representing the console output stream. + """ + # Read what was sent out the console. + self.tempfile.seek(0) + console_out = self.tempfile.read() + + self.assertEqual(exp_console_out, + console_out, + (self.BadConsoleOutput(exp_console_out, console_out) + + str(self.console))) + + def CheckInputBuffer(self, exp_input_buffer): + """Verify that the input buffer contains what we expect. + + Args: + exp_input_buffer: A string containing the contents of the current input + buffer. + """ + self.assertEqual(exp_input_buffer, self.console.input_buffer, + ('input buffer does not match expected.\n' + 'expected: |' + exp_input_buffer + '|\n' + 'got: |' + self.console.input_buffer + '|\n' + + str(self.console))) + + def CheckInputBufferPosition(self, exp_pos): + """Verify the input buffer position. + + Args: + exp_pos: An integer representing the expected input buffer position. + """ + self.assertEqual(exp_pos, self.console.input_buffer_pos, + 'input buffer position is incorrect.\ngot: ' + + str(self.console.input_buffer_pos) + '\nexp: ' + + str(exp_pos) + '\n' + str(self.console)) + + def CheckHistoryBuffer(self, exp_history): + """Verify that the items in the history buffer are what we expect. + + Args: + exp_history: A list of strings representing the expected contents of the + history buffer. + """ + # First, check to see if the length is what we expect. + self.assertEqual(len(exp_history), len(self.console.history), + ('The number of items in the history is unexpected.\n' + 'exp: ' + str(len(exp_history)) + '\n' + 'got: ' + str(len(self.console.history)) + '\n' + 'internal state:\n' + str(self.console))) + + # Next, check the actual contents of the history buffer. + for i in range(len(exp_history)): + self.assertEqual(exp_history[i], self.console.history[i], + ('history buffer contents are incorrect.\n' + 'exp: ' + exp_history[i] + '\n' + 'got: ' + self.console.history[i] + '\n' + 'internal state:\n' + str(self.console))) + + def test_EnteringChars(self): + """Verify that characters are echoed onto the console.""" + test_str = 'abc' + input_stream = self.StringToByteList(test_str) + + # Send the characters in. + for byte in input_stream: + self.console.HandleChar(byte) + + # Check the input position. + exp_pos = len(test_str) + self.CheckInputBufferPosition(exp_pos) + + # Verify that the input buffer is correct. + expected_buffer = test_str + self.CheckInputBuffer(expected_buffer) + + # Check console output + exp_console_out = test_str + self.CheckConsoleOutput(exp_console_out) + + def test_EnteringDeletingMoreCharsThanEntered(self): + """Verify that we can press backspace more than we have entered chars.""" + test_str = 'spamspam' + input_stream = self.StringToByteList(test_str) + + # Send the characters in. + for byte in input_stream: + self.console.HandleChar(byte) + + # Now backspace 1 more than what we sent. + input_stream = [] + for _ in range(len(test_str) + 1): + input_stream.append(console.ControlKey.BACKSPACE) + + # Send that sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, verify that input buffer position is 0. + self.CheckInputBufferPosition(0) + + # Next, examine the output stream for the correct sequence. + exp_console_out = test_str + for _ in range(len(test_str)): + exp_console_out += BACKSPACE_STRING + + # Now, verify that we got what we expected. + self.CheckConsoleOutput(exp_console_out) + + def test_EnteringMoreThanCharLimit(self): + """Verify that we drop characters when the line is too long.""" + test_str = self.console.line_limit * 'o' # All allowed. + test_str += 5 * 'x' # All should be dropped. + input_stream = self.StringToByteList(test_str) + + # Send the characters in. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, we expect that input buffer position should be equal to the line + # limit. + exp_pos = self.console.line_limit + self.CheckInputBufferPosition(exp_pos) + + # The input buffer should only hold until the line limit. + exp_buffer = test_str[0:self.console.line_limit] + self.CheckInputBuffer(exp_buffer) + + # Lastly, check that the extra characters are not printed. + exp_console_out = exp_buffer + self.CheckConsoleOutput(exp_console_out) + + def test_ValidKeysOnLongLine(self): + """Verify that we can still press valid keys if the line is too long.""" + # Fill the line. + test_str = self.console.line_limit * 'o' + exp_console_out = test_str + # Try to fill it even more; these should all be dropped. + test_str += 5 * 'x' + input_stream = self.StringToByteList(test_str) + + # We should be able to press the following keys: + # - Backspace + # - Arrow Keys/CTRL+B/CTRL+F/CTRL+P/CTRL+N + # - Delete + # - Home/CTRL+A + # - End/CTRL+E + # - Carriage Return + + # Backspace 1 character + input_stream.append(console.ControlKey.BACKSPACE) + exp_console_out += BACKSPACE_STRING + # Refill the line. + input_stream.extend(self.StringToByteList('o')) + exp_console_out += 'o' + + # Left arrow key. + input_stream.extend(Keys.LEFT_ARROW) + exp_console_out += OutputStream.MoveCursorLeft(1) + + # Right arrow key. + input_stream.extend(Keys.RIGHT_ARROW) + exp_console_out += OutputStream.MoveCursorRight(1) + + # CTRL+B + input_stream.append(console.ControlKey.CTRL_B) + exp_console_out += OutputStream.MoveCursorLeft(1) + + # CTRL+F + input_stream.append(console.ControlKey.CTRL_F) + exp_console_out += OutputStream.MoveCursorRight(1) + + # Let's press enter now so we can test up and down. + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + exp_console_out += '\r\n' + self.console.prompt + + # Up arrow key. + input_stream.extend(Keys.UP_ARROW) + exp_console_out += test_str[:self.console.line_limit] + + # Down arrow key. + input_stream.extend(Keys.DOWN_ARROW) + # Since the line was blank, we have to backspace the entire line. + exp_console_out += self.console.line_limit * BACKSPACE_STRING + + # CTRL+P + input_stream.append(console.ControlKey.CTRL_P) + exp_console_out += test_str[:self.console.line_limit] + + # CTRL+N + input_stream.append(console.ControlKey.CTRL_N) + # Since the line was blank, we have to backspace the entire line. + exp_console_out += self.console.line_limit * BACKSPACE_STRING + + # Press the Up arrow key to reprint the long line. + input_stream.extend(Keys.UP_ARROW) + exp_console_out += test_str[:self.console.line_limit] + + # Press the Home key to jump to the beginning of the line. + input_stream.extend(Keys.HOME) + exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit) + + # Press the End key to jump to the end of the line. + input_stream.extend(Keys.END) + exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit) + + # Press CTRL+A to jump to the beginning of the line. + input_stream.append(console.ControlKey.CTRL_A) + exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit) + + # Press CTRL+E to jump to the end of the line. + input_stream.extend(Keys.END) + exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit) + + # Move left one column so we can delete a character. + input_stream.extend(Keys.LEFT_ARROW) + exp_console_out += OutputStream.MoveCursorLeft(1) + + # Press the delete key. + input_stream.extend(Keys.DEL) + # This should look like a space, and then move cursor left 1 column since + # we're at the end of line. + exp_console_out += ' ' + OutputStream.MoveCursorLeft(1) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify everything happened correctly. + self.CheckConsoleOutput(exp_console_out) + + def test_BackspaceOnEmptyLine(self): + """Verify that we can backspace on an empty line with no bad effects.""" + # Send a single backspace. + test_str = [console.ControlKey.BACKSPACE] + + # Send the characters in. + for byte in test_str: + self.console.HandleChar(byte) + + # Check the input position. + exp_pos = 0 + self.CheckInputBufferPosition(exp_pos) + + # Check that buffer is empty. + exp_input_buffer = '' + self.CheckInputBuffer(exp_input_buffer) + + # Check that the console output is empty. + exp_console_out = '' + self.CheckConsoleOutput(exp_console_out) + + def test_BackspaceWithinLine(self): + """Verify that we shift the chars over when backspacing within a line.""" + # Misspell 'help' + test_str = 'heelp' + input_stream = self.StringToByteList(test_str) + # Use the arrow key to go back to fix it. + # Move cursor left 1 column. + input_stream.extend(2*Keys.LEFT_ARROW) + # Backspace once to remove the extra 'e'. + input_stream.append(console.ControlKey.BACKSPACE) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify the input buffer + exp_input_buffer = 'help' + self.CheckInputBuffer(exp_input_buffer) + + # Verify the input buffer position. It should be at 2 (cursor over the 'l') + self.CheckInputBufferPosition(2) + + # We expect the console output to be the test string, with two moves to the + # left, another move left, and then the rest of the line followed by a + # space. + exp_console_out = test_str + exp_console_out += 2 * OutputStream.MoveCursorLeft(1) + + # Move cursor left 1 column. + exp_console_out += OutputStream.MoveCursorLeft(1) + # Rest of the line and a space. (test_str in this case) + exp_console_out += 'lp ' + # Reset the cursor 2 + 1 to the left. + exp_console_out += OutputStream.MoveCursorLeft(3) + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + def test_JumpToBeginningOfLineViaCtrlA(self): + """Verify that we can jump to the beginning of a line with Ctrl+A.""" + # Enter some chars and press CTRL+A + test_str = 'abc' + input_stream = self.StringToByteList(test_str) + [console.ControlKey.CTRL_A] + + # Send the characters in. + for byte in input_stream: + self.console.HandleChar(byte) + + # We expect to see our test string followed by a move cursor left. + exp_console_out = test_str + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + + # Check to see what whas printed on the console. + self.CheckConsoleOutput(exp_console_out) + + # Check that the input buffer position is now 0. + self.CheckInputBufferPosition(0) + + # Check input buffer still contains our test string. + self.CheckInputBuffer(test_str) + + def test_JumpToBeginningOfLineViaHomeKey(self): + """Jump to beginning of line via HOME key.""" + test_str = 'version' + input_stream = self.StringToByteList(test_str) + input_stream.extend(Keys.HOME) + + # Send out the stream. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, verify that input buffer position is now 0. + self.CheckInputBufferPosition(0) + + # Next, verify that the input buffer did not change. + self.CheckInputBuffer(test_str) + + # Lastly, check that the cursor moved correctly. + exp_console_out = test_str + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + self.CheckConsoleOutput(exp_console_out) + + def test_JumpToEndOfLineViaEndKey(self): + """Jump to the end of the line using the END key.""" + test_str = 'version' + input_stream = self.StringToByteList(test_str) + input_stream += [console.ControlKey.CTRL_A] + # Now, jump to the end of the line. + input_stream.extend(Keys.END) + + # Send out the stream. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is correct. This should be at the + # end of the test string. + self.CheckInputBufferPosition(len(test_str)) + + # The expected output should be the test string, followed by a jump to the + # beginning of the line, and lastly a jump to the end of the line. + exp_console_out = test_str + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + # Now the jump back to the end of the line. + exp_console_out += OutputStream.MoveCursorRight(len(test_str)) + + # Verify console output stream. + self.CheckConsoleOutput(exp_console_out) + + def test_JumpToEndOfLineViaCtrlE(self): + """Enter some chars and then try to jump to the end. (Should be a no-op)""" + test_str = 'sysinfo' + input_stream = self.StringToByteList(test_str) + input_stream.append(console.ControlKey.CTRL_E) + + # Send out the stream + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position isn't any further than we expect. + # At this point, the position should be at the end of the test string. + self.CheckInputBufferPosition(len(test_str)) + + # Now, let's try to jump to the beginning and then jump back to the end. + input_stream = [console.ControlKey.CTRL_A, console.ControlKey.CTRL_E] + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Perform the same verification. + self.CheckInputBufferPosition(len(test_str)) + + # Lastly try to jump again, beyond the end. + input_stream = [console.ControlKey.CTRL_E] + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Perform the same verification. + self.CheckInputBufferPosition(len(test_str)) + + # We expect to see the test string, a jump to the begining of the line, and + # one jump to the end of the line. + exp_console_out = test_str + # Jump to beginning. + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + # Jump back to end. + exp_console_out += OutputStream.MoveCursorRight(len(test_str)) + + # Verify the console output. + self.CheckConsoleOutput(exp_console_out) + + def test_MoveLeftWithArrowKey(self): + """Move cursor left one column with arrow key.""" + test_str = 'tastyspam' + input_stream = self.StringToByteList(test_str) + input_stream.extend(Keys.LEFT_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is 1 less than the length. + self.CheckInputBufferPosition(len(test_str) - 1) + + # Also, verify that the input buffer is not modified. + self.CheckInputBuffer(test_str) + + # We expect the test string, followed by a one column move left. + exp_console_out = test_str + OutputStream.MoveCursorLeft(1) + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + def test_MoveLeftWithCtrlB(self): + """Move cursor back one column with Ctrl+B.""" + test_str = 'tastyspam' + input_stream = self.StringToByteList(test_str) + input_stream.append(console.ControlKey.CTRL_B) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is 1 less than the length. + self.CheckInputBufferPosition(len(test_str) - 1) + + # Also, verify that the input buffer is not modified. + self.CheckInputBuffer(test_str) + + # We expect the test string, followed by a one column move left. + exp_console_out = test_str + OutputStream.MoveCursorLeft(1) + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + def test_MoveRightWithArrowKey(self): + """Move cursor one column to the right with the arrow key.""" + test_str = 'version' + input_stream = self.StringToByteList(test_str) + # Jump to beginning of line. + input_stream.append(console.ControlKey.CTRL_A) + # Press right arrow key. + input_stream.extend(Keys.RIGHT_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is 1. + self.CheckInputBufferPosition(1) + + # Also, verify that the input buffer is not modified. + self.CheckInputBuffer(test_str) + + # We expect the test string, followed by a jump to the beginning of the + # line, and finally a move right 1. + exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str))) + + # A move right 1 column. + exp_console_out += OutputStream.MoveCursorRight(1) + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + def test_MoveRightWithCtrlF(self): + """Move cursor forward one column with Ctrl+F.""" + test_str = 'panicinfo' + input_stream = self.StringToByteList(test_str) + input_stream.append(console.ControlKey.CTRL_A) + # Now, move right one column. + input_stream.append(console.ControlKey.CTRL_F) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is 1. + self.CheckInputBufferPosition(1) + + # Also, verify that the input buffer is not modified. + self.CheckInputBuffer(test_str) + + # We expect the test string, followed by a jump to the beginning of the + # line, and finally a move right 1. + exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str))) + + # A move right 1 column. + exp_console_out += OutputStream.MoveCursorRight(1) + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + def test_ImpossibleMoveLeftWithArrowKey(self): + """Verify that we can't move left at the beginning of the line.""" + # We shouldn't be able to move left if we're at the beginning of the line. + input_stream = Keys.LEFT_ARROW + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Nothing should have been output. + exp_console_output = '' + self.CheckConsoleOutput(exp_console_output) + + # The input buffer position should still be 0. + self.CheckInputBufferPosition(0) + + # The input buffer itself should be empty. + self.CheckInputBuffer('') + + def test_ImpossibleMoveRightWithArrowKey(self): + """Verify that we can't move right at the end of the line.""" + # We shouldn't be able to move right if we're at the end of the line. + input_stream = Keys.RIGHT_ARROW + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Nothing should have been output. + exp_console_output = '' + self.CheckConsoleOutput(exp_console_output) + + # The input buffer position should still be 0. + self.CheckInputBufferPosition(0) + + # The input buffer itself should be empty. + self.CheckInputBuffer('') + + def test_KillEntireLine(self): + """Verify that we can kill an entire line with Ctrl+K.""" + test_str = 'accelinfo on' + input_stream = self.StringToByteList(test_str) + # Jump to beginning of line and then kill it with Ctrl+K. + input_stream.extend([console.ControlKey.CTRL_A, console.ControlKey.CTRL_K]) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, we expect that the input buffer is empty. + self.CheckInputBuffer('') + + # The buffer position should be 0. + self.CheckInputBufferPosition(0) + + # What we expect to see on the console stream should be the following. The + # test string, a jump to the beginning of the line, then jump back to the + # end of the line and replace the line with spaces. + exp_console_out = test_str + # Jump to beginning of line. + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + # Jump to end of line. + exp_console_out += OutputStream.MoveCursorRight(len(test_str)) + # Replace line with spaces, which looks like backspaces. + for _ in range(len(test_str)): + exp_console_out += BACKSPACE_STRING + + # Verify the console output. + self.CheckConsoleOutput(exp_console_out) + + def test_KillPartialLine(self): + """Verify that we can kill a portion of a line.""" + test_str = 'accelread 0 1' + input_stream = self.StringToByteList(test_str) + len_to_kill = 5 + for _ in range(len_to_kill): + # Move cursor left + input_stream.extend(Keys.LEFT_ARROW) + # Now kill + input_stream.append(console.ControlKey.CTRL_K) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, check that the input buffer was truncated. + exp_input_buffer = test_str[:-len_to_kill] + self.CheckInputBuffer(exp_input_buffer) + + # Verify the input buffer position. + self.CheckInputBufferPosition(len(test_str) - len_to_kill) + + # The console output stream that we expect is the test string followed by a + # move left of len_to_kill, then a jump to the end of the line and backspace + # of len_to_kill. + exp_console_out = test_str + for _ in range(len_to_kill): + # Move left 1 column. + exp_console_out += OutputStream.MoveCursorLeft(1) + # Then jump to the end of the line + exp_console_out += OutputStream.MoveCursorRight(len_to_kill) + # Backspace of len_to_kill + for _ in range(len_to_kill): + exp_console_out += BACKSPACE_STRING + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + def test_InsertingCharacters(self): + """Verify that we can insert charcters within the line.""" + test_str = 'accel 0 1' # Here we forgot the 'read' part in 'accelread' + input_stream = self.StringToByteList(test_str) + # We need to move over to the 'l' and add read. + insertion_point = test_str.find('l') + 1 + for i in range(len(test_str) - insertion_point): + # Move cursor left. + input_stream.extend(Keys.LEFT_ARROW) + # Now, add in 'read' + added_str = 'read' + input_stream.extend(self.StringToByteList(added_str)) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, verify that the input buffer is correct. + exp_input_buffer = test_str[:insertion_point] + added_str + exp_input_buffer += test_str[insertion_point:] + self.CheckInputBuffer(exp_input_buffer) + + # Verify that the input buffer position is correct. + exp_input_buffer_pos = insertion_point + len(added_str) + self.CheckInputBufferPosition(exp_input_buffer_pos) + + # The console output stream that we expect is the test string, followed by + # move cursor left until the 'l' was found, the added test string while + # shifting characters around. + exp_console_out = test_str + for i in range(len(test_str) - insertion_point): + # Move cursor left. + exp_console_out += OutputStream.MoveCursorLeft(1) + + # Now for each character, write the rest of the line will be shifted to the + # right one column. + for i in range(len(added_str)): + # Printed character. + exp_console_out += added_str[i] + # The rest of the line + exp_console_out += test_str[insertion_point:] + # Reset the cursor back left + reset_dist = len(test_str[insertion_point:]) + exp_console_out += OutputStream.MoveCursorLeft(reset_dist) + + # Verify the console output. + self.CheckConsoleOutput(exp_console_out) + + def test_StoreCommandHistory(self): + """Verify that entered commands are stored in the history.""" + test_commands = [] + test_commands.append('help') + test_commands.append('version') + test_commands.append('accelread 0 1') + input_stream = [] + for c in test_commands: + input_stream.extend(self.StringToByteList(c)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # We expect to have the test commands in the history buffer. + exp_history_buf = test_commands + self.CheckHistoryBuffer(exp_history_buf) + + def test_CycleUpThruCommandHistory(self): + """Verify that the UP arrow key will print itmes in the history buffer.""" + # Enter some commands. + test_commands = ['version', 'accelrange 0', 'battery', 'gettime'] + input_stream = [] + for command in test_commands: + input_stream.extend(self.StringToByteList(command)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Now, hit the UP arrow key to print the previous entries. + for i in range(len(test_commands)): + input_stream.extend(Keys.UP_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The expected output should be test commands with prompts printed in + # between, followed by line kills with the previous test commands printed. + exp_console_out = '' + for i in range(len(test_commands)): + exp_console_out += test_commands[i] + '\r\n' + self.console.prompt + + # When we press up, the line should be cleared and print the previous buffer + # entry. + for i in range(len(test_commands)-1, 0, -1): + exp_console_out += test_commands[i] + # Backspace to the beginning. + for i in range(len(test_commands[i])): + exp_console_out += BACKSPACE_STRING + + # The last command should just be printed out with no backspacing. + exp_console_out += test_commands[0] + + # Now, verify. + self.CheckConsoleOutput(exp_console_out) + + def test_UpArrowOnEmptyHistory(self): + """Ensure nothing happens if the history is empty.""" + # Press the up arrow key twice. + input_stream = 2 * Keys.UP_ARROW + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # We expect nothing to have happened. + exp_console_out = '' + exp_input_buffer = '' + exp_input_buffer_pos = 0 + exp_history_buf = [] + + # Verify. + self.CheckConsoleOutput(exp_console_out) + self.CheckInputBufferPosition(exp_input_buffer_pos) + self.CheckInputBuffer(exp_input_buffer) + self.CheckHistoryBuffer(exp_history_buf) + + def test_UpArrowDoesNotGoOutOfBounds(self): + """Verify that pressing the up arrow many times won't go out of bounds.""" + # Enter one command. + test_str = 'help version' + input_stream = self.StringToByteList(test_str) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + # Then press the up arrow key twice. + input_stream.extend(2 * Keys.UP_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the history buffer is correct. + exp_history_buf = [test_str] + self.CheckHistoryBuffer(exp_history_buf) + + # We expect that the console output should only contain our entered command, + # a new prompt, and then our command aggain. + exp_console_out = test_str + '\r\n' + self.console.prompt + # Pressing up should reprint the command we entered. + exp_console_out += test_str + + # Verify. + self.CheckConsoleOutput(exp_console_out) + + def test_CycleDownThruCommandHistory(self): + """Verify that we can select entries by hitting the down arrow.""" + # Enter at least 4 commands. + test_commands = ['version', 'accelrange 0', 'battery', 'gettime'] + input_stream = [] + for command in test_commands: + input_stream.extend(self.StringToByteList(command)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Now, hit the UP arrow key twice to print the previous two entries. + for i in range(2): + input_stream.extend(Keys.UP_ARROW) + + # Now, hit the DOWN arrow key twice to print the newer entries. + input_stream.extend(2*Keys.DOWN_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The expected output should be commands that we entered, followed by + # prompts, then followed by our last two commands in reverse. Then, we + # should see the last entry in the list, followed by the saved partial cmd + # of a blank line. + exp_console_out = '' + for i in range(len(test_commands)): + exp_console_out += test_commands[i] + '\r\n' + self.console.prompt + + # When we press up, the line should be cleared and print the previous buffer + # entry. + for i in range(len(test_commands)-1, 1, -1): + exp_console_out += test_commands[i] + # Backspace to the beginning. + for i in range(len(test_commands[i])): + exp_console_out += BACKSPACE_STRING + + # When we press down, it should have cleared the last command (which we + # covered with the previous for loop), and then prints the next command. + exp_console_out += test_commands[3] + for i in range(len(test_commands[3])): + exp_console_out += BACKSPACE_STRING + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + # Verify input buffer. + exp_input_buffer = '' # Empty because our partial command was empty. + exp_input_buffer_pos = len(exp_input_buffer) + self.CheckInputBuffer(exp_input_buffer) + self.CheckInputBufferPosition(exp_input_buffer_pos) + + def test_SavingPartialCommandWhenNavigatingHistory(self): + """Verify that partial commands are saved when navigating history.""" + # Enter a command. + test_str = 'accelinfo' + input_stream = self.StringToByteList(test_str) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Enter a partial command. + partial_cmd = 'ver' + input_stream.extend(self.StringToByteList(partial_cmd)) + + # Hit the UP arrow key. + input_stream.extend(Keys.UP_ARROW) + # Then, the DOWN arrow key. + input_stream.extend(Keys.DOWN_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The expected output should be the command we entered, a prompt, the + # partial command, clearing of the partial command, the command entered, + # clearing of the command entered, and then the partial command. + exp_console_out = test_str + '\r\n' + self.console.prompt + exp_console_out += partial_cmd + for _ in range(len(partial_cmd)): + exp_console_out += BACKSPACE_STRING + exp_console_out += test_str + for _ in range(len(test_str)): + exp_console_out += BACKSPACE_STRING + exp_console_out += partial_cmd + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + # Verify input buffer. + exp_input_buffer = partial_cmd + exp_input_buffer_pos = len(exp_input_buffer) + self.CheckInputBuffer(exp_input_buffer) + self.CheckInputBufferPosition(exp_input_buffer_pos) + + def test_DownArrowOnEmptyHistory(self): + """Ensure nothing happens if the history is empty.""" + # Then press the up down arrow twice. + input_stream = 2 * Keys.DOWN_ARROW + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # We expect nothing to have happened. + exp_console_out = '' + exp_input_buffer = '' + exp_input_buffer_pos = 0 + exp_history_buf = [] + + # Verify. + self.CheckConsoleOutput(exp_console_out) + self.CheckInputBufferPosition(exp_input_buffer_pos) + self.CheckInputBuffer(exp_input_buffer) + self.CheckHistoryBuffer(exp_history_buf) + + def test_DeleteCharsUsingDELKey(self): + """Verify that we can delete characters using the DEL key.""" + test_str = 'version' + input_stream = self.StringToByteList(test_str) + + # Hit the left arrow key 2 times. + input_stream.extend(2 * Keys.LEFT_ARROW) + + # Press the DEL key. + input_stream.extend(Keys.DEL) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The expected output should be the command we entered, 2 individual cursor + # moves to the left, and then removing a char and shifting everything to the + # left one column. + exp_console_out = test_str + exp_console_out += 2 * OutputStream.MoveCursorLeft(1) + + # Remove the char by shifting everything to the left one, slicing out the + # remove char. + exp_console_out += test_str[-1:] + ' ' + + # Reset the cursor by moving back 2 columns because of the 'n' and space. + exp_console_out += OutputStream.MoveCursorLeft(2) + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + # Verify input buffer. The input buffer should have the char sliced out and + # be positioned where the char was removed. + exp_input_buffer = test_str[:-2] + test_str[-1:] + exp_input_buffer_pos = len(exp_input_buffer) - 1 + self.CheckInputBuffer(exp_input_buffer) + self.CheckInputBufferPosition(exp_input_buffer_pos) + + def test_RepeatedCommandInHistory(self): + """Verify that we don't store 2 consecutive identical commands in history""" + # Enter a few commands. + test_commands = ['version', 'accelrange 0', 'battery', 'gettime'] + # Repeat the last command. + test_commands.append(test_commands[len(test_commands)-1]) + + input_stream = [] + for command in test_commands: + input_stream.extend(self.StringToByteList(command)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the history buffer is correct. The last command, since + # it was repeated, should not have been added to the history. + exp_history_buf = test_commands[0:len(test_commands)-1] + self.CheckHistoryBuffer(exp_history_buf) + + +if __name__ == '__main__': + unittest.main() diff --git a/util/ec3po/interpreter.py b/util/ec3po/interpreter.py new file mode 100755 index 0000000000..4c31037584 --- /dev/null +++ b/util/ec3po/interpreter.py @@ -0,0 +1,259 @@ +#!/usr/bin/python2 +# Copyright 2015 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""EC-3PO EC Interpreter + +interpreter provides the interpretation layer between the EC UART and the user. +It recives commands through its command pipe, formats the commands for the EC, +and sends the command to the EC. It also presents data from the EC to either be +displayed via the interactive console interface, or some other consumer. It +additionally supports automatic command retrying if the EC drops a character in +a command. +""" +from __future__ import print_function +from chromite.lib import cros_logging as logging +import os +import Queue +import select + + +COMMAND_RETRIES = 3 # Number of attempts to retry a command. +EC_MAX_READ = 1024 # Max bytes to read at a time from the EC. + + +class Interpreter(object): + """Class which provides the interpretation layer between the EC and user. + + This class essentially performs all of the intepretation for the EC and the + user. It handles all of the automatic command retrying as well as the + formation of commands. + + Attributes: + ec_uart_pty: A string representing the EC UART to connect to. + cmd_pipe: A multiprocessing.Connection object which represents the + Interpreter side of the command pipe. This must be a bidirectional pipe. + Commands and responses will utilize this pipe. + dbg_pipe: A multiprocessing.Connection object which represents the + Interpreter side of the debug pipe. This must be a unidirectional pipe + with write capabilities. EC debug output will utilize this pipe. + cmd_retries: An integer representing the number of attempts the console + should retry commands if it receives an error. + log_level: An integer representing the numeric value of the log level. + inputs: A list of objects that the intpreter selects for reading. + Initially, these are the EC UART and the command pipe. + outputs: A list of objects that the interpreter selects for writing. + ec_cmd_queue: A FIFO queue used for sending commands down to the EC UART. + cmd_in_progress: A string that represents the current command sent to the + EC that is pending reception verification. + """ + def __init__(self, ec_uart_pty, cmd_pipe, dbg_pipe, log_level=logging.INFO): + """Intializes an Interpreter object with the provided args. + + Args: + ec_uart_pty: A string representing the EC UART to connect to. + cmd_pipe: A multiprocessing.Connection object which represents the + Interpreter side of the command pipe. This must be a bidirectional + pipe. Commands and responses will utilize this pipe. + dbg_pipe: A multiprocessing.Connection object which represents the + Interpreter side of the debug pipe. This must be a unidirectional pipe + with write capabilities. EC debug output will utilize this pipe. + cmd_retries: An integer representing the number of attempts the console + should retry commands if it receives an error. + log_level: An optional integer representing the numeric value of the log + level. By default, the log level will be logging.INFO (20). + """ + self.ec_uart_pty = open(ec_uart_pty, 'a+') + self.cmd_pipe = cmd_pipe + self.dbg_pipe = dbg_pipe + self.cmd_retries = COMMAND_RETRIES + self.log_level = log_level + self.inputs = [self.ec_uart_pty, self.cmd_pipe] + self.outputs = [] + self.ec_cmd_queue = Queue.Queue() + self.cmd_in_progress = '' + + def EnqueueCmd(self, packed_cmd): + """Enqueue a packed console command to be sent to the EC UART. + + Args: + packed_cmd: A string which contains the packed command to be sent. + """ + # Enqueue a packed command to be sent to the EC. + self.ec_cmd_queue.put(packed_cmd) + logging.debug('Commands now in queue: %d', self.ec_cmd_queue.qsize()) + # Add the EC UART as an output to be serviced. + self.outputs.append(self.ec_uart_pty) + + def PackCommand(self, raw_cmd): + r"""Packs a command for use with error checking. + + For error checking, we pack console commands in a particular format. The + format is as follows: + + &&[x][x][x][x]&{cmd}\n\n + ^ ^ ^^ ^^ ^ ^-- 2 newlines. + | | || || |-- the raw console command. + | | || ||-- 1 ampersand. + | | ||____|--- 2 hex digits representing the CRC8 of cmd. + | |____|-- 2 hex digits reprsenting the length of cmd. + |-- 2 ampersands + + Args: + raw_cmd: A pre-packed string which contains the raw command. + + Returns: + A string which contains the packed command. + """ + # The command format is as follows. + # &&[x][x][x][x]&{cmd}\n\n + packed_cmd = [] + packed_cmd.append('&&') + # The first pair of hex digits are the length of the command. + packed_cmd.append('%02x' % len(raw_cmd)) + # Then the CRC8 of cmd. + packed_cmd.append('%02x' % Crc8(raw_cmd)) + packed_cmd.append('&') + # Now, the raw command followed by 2 newlines. + packed_cmd.append(raw_cmd) + packed_cmd.append('\n\n') + return ''.join(packed_cmd) + + def ProcessCommand(self, command): + """Captures the input determines what actions to take. + + Args: + command: A string representing the command sent by the user. + """ + command = command.strip() + # There's nothing to do if the command is empty. + if len(command) == 0: + return + + # All other commands need to be packed first before they go to the EC. + packed_cmd = self.PackCommand(command) + logging.debug('packed cmd: ' + packed_cmd) + self.EnqueueCmd(packed_cmd) + # TODO(aaboagye): Make a dict of commands and keys and eventually, handle + # partial matching based on unique prefixes. + + def CheckECResponse(self): + """Checks the response from the EC for any errors.""" + # An invalid response is at most 4 bytes. + data = os.read(self.ec_uart_pty.fileno(), 4) + if '&E' not in data: + # No error received. Clear the command in progress. + self.cmd_in_progress = '' + # Reset the retry count. + self.cmd_retries = COMMAND_RETRIES + # Forward the data to the user. + self.dbg_pipe.send(data) + elif self.cmd_retries > 0: + # The EC encountered an error. We'll have to retry again. + logging.warning('EC replied with error. Retrying.') + self.cmd_retries -= 1 + logging.warning('Retries remaining: %d', self.cmd_retries) + # Add the EC UART to the writers again. + self.outputs.append(self.ec_uart_pty) + else: + # We're out of retries, so just give up. + logging.error('Command failed. No retries left.') + # Clear the command in progress. + self.cmd_in_progress = '' + # Reset the retry count. + self.cmd_retries = COMMAND_RETRIES + + def SendCmdToEC(self): + """Sends a command to the EC.""" + # If we're retrying a command, just try to send it again. + if self.cmd_retries < COMMAND_RETRIES: + cmd = self.cmd_in_progress + else: + # If we're not retrying, we should not be writing to the EC if we have no + # items in our command queue. + assert not self.ec_cmd_queue.empty() + # Get the command to send. + cmd = self.ec_cmd_queue.get() + + # Send the command. + logging.debug('Sending command to EC.') + self.ec_uart_pty.write(cmd) + self.ec_uart_pty.flush() + + # Now, that we've sent the command we will need to make sure the EC + # received it without an error. Store the current command as in + # progress. We will clear this if the EC responds with a non-error. + self.cmd_in_progress = cmd + # Remove the EC UART from the writers while we wait for a response. + self.outputs.remove(self.ec_uart_pty) + + +def Crc8(data): + """Calculates the CRC8 of data. + + The generator polynomial used is: x^8 + x^2 + x + 1. + This is the same implementation that is used in the EC. + + Args: + data: A string of data that we wish to calculate the CRC8 on. + + Returns: + crc >> 8: An integer representing the CRC8 value. + """ + crc = 0 + for byte in data: + crc ^= (ord(byte) << 8) + for _ in range(8): + if crc & 0x8000: + crc ^= (0x1070 << 3) + crc <<= 1 + return crc >> 8 + + +def StartLoop(interp): + """Starts an infinite loop of servicing the user and the EC. + + StartLoop checks to see if there are any commands to process, processing them + if any, and forwards EC output to the user. + + When sending a command to the EC, we send the command once and check the + response to see if the EC encountered an error when receiving the command. An + error condition is reported to the interpreter by a string with at least one + '&' and 'E'. The full string is actually '&&EE', however it's possible that + the leading ampersand or trailing 'E' could be dropped. If an error is + encountered, the interpreter will retry up to the amount configured. + + Args: + interp: An Interpreter object that has been properly initialised. + """ + while True: + readable, writeable, _ = select.select(interp.inputs, interp.outputs, []) + + for obj in readable: + # Handle any debug prints from the EC. + if obj is interp.ec_uart_pty: + logging.debug('EC has data') + if interp.cmd_in_progress: + # A command was just sent to the EC. We need to check to see if the + # EC is telling us that it received a corrupted command. + logging.debug('Command in progress so checking response...') + interp.CheckECResponse() + + # Read what the EC sent us. + data = os.read(obj.fileno(), EC_MAX_READ) + logging.debug('got: \'%s\'', data) + # For now, just forward everything the EC sends us. + logging.debug('Forwarding to user...') + interp.dbg_pipe.send(data) + + # Handle any commands from the user. + elif obj is interp.cmd_pipe: + logging.debug('Command data available. Begin processing.') + data = interp.cmd_pipe.recv() + # Process the command. + interp.ProcessCommand(data) + + for obj in writeable: + # Send a command to the EC. + if obj is interp.ec_uart_pty: + interp.SendCmdToEC() |