summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--[-rwxr-xr-x]util/ec3po/__init__.py0
-rwxr-xr-xutil/ec3po/console.py186
-rwxr-xr-xutil/ec3po/console_unittest.py6
-rw-r--r--[-rwxr-xr-x]util/ec3po/interpreter.py39
-rwxr-xr-xutil/ec3po/interpreter_unittest.py6
5 files changed, 124 insertions, 113 deletions
diff --git a/util/ec3po/__init__.py b/util/ec3po/__init__.py
index e69de29bb2..e69de29bb2 100755..100644
--- a/util/ec3po/__init__.py
+++ b/util/ec3po/__init__.py
diff --git a/util/ec3po/console.py b/util/ec3po/console.py
index b2e179671e..962d042ff3 100755
--- a/util/ec3po/console.py
+++ b/util/ec3po/console.py
@@ -2,16 +2,20 @@
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+
"""EC-3PO Interactive Console Interface
console provides the console interface between the user and the interpreter. It
handles the presentation of the EC console including editing methods as well as
session-persistent command history.
"""
+
from __future__ import print_function
+
import argparse
import binascii
-from chromite.lib import cros_logging as logging
+# pylint: disable=cros-logging-import
+import logging
import multiprocessing
import os
import pty
@@ -50,11 +54,6 @@ class ControlKey(object):
ESC = 0x1b
-class MoveCursorError(Exception):
- """Exception class for errors when moving the cursor."""
- pass
-
-
class Console(object):
"""Class which provides the console interface between the EC and the user.
@@ -62,6 +61,7 @@ class Console(object):
the EC. It handles all of the console editing behaviour
Attributes:
+ logger: A logger for this module.
master_pty: File descriptor to the master side of the PTY. Used for driving
output to the user and receiving user input.
user_pty: A string representing the PTY name of the served console.
@@ -104,6 +104,7 @@ class Console(object):
read-only side of the debug pipe. This must be a unidirectional pipe
attached to the intepreter. EC debug messages use this pipe.
"""
+ self.logger = logging.getLogger('EC3PO.Console')
self.master_pty = master_pty
self.user_pty = user_pty
self.cmd_pipe = cmd_pipe
@@ -148,23 +149,23 @@ class Console(object):
"""Shows the previous command from the history list."""
# There's nothing to do if there's no history at all.
if not self.history:
- logging.debug('No history to print.')
+ self.logger.debug('No history to print.')
return
# Don't do anything if there's no more history to show.
if self.history_pos == 0:
- logging.debug('No more history to show.')
+ self.logger.debug('No more history to show.')
return
- logging.debug('current history position: %d.', self.history_pos)
+ self.logger.debug('current history position: %d.', self.history_pos)
# Decrement the history buffer position.
self.history_pos -= 1
- logging.debug('new history position.: %d', self.history_pos)
+ self.logger.debug('new history position.: %d', self.history_pos)
# Save the text entered on the console if any.
if self.history_pos == len(self.history)-1:
- logging.debug('saving partial_cmd: \'%s\'', self.input_buffer)
+ self.logger.debug('saving partial_cmd: \'%s\'', self.input_buffer)
self.partial_cmd = self.input_buffer
# Backspace the line.
@@ -172,8 +173,8 @@ class Console(object):
self.SendBackspace()
# Print the last entry in the history buffer.
- logging.debug('printing previous entry %d - %s', self.history_pos,
- self.history[self.history_pos])
+ self.logger.debug('printing previous entry %d - %s', self.history_pos,
+ self.history[self.history_pos])
fd = self.master_pty
prev_cmd = self.history[self.history_pos]
os.write(fd, prev_cmd)
@@ -185,18 +186,18 @@ class Console(object):
"""Shows the next command from the history list."""
# Don't do anything if there's no history at all.
if not self.history:
- logging.debug('History buffer is empty.')
+ self.logger.debug('History buffer is empty.')
return
fd = self.master_pty
- logging.debug('current history position: %d', self.history_pos)
+ self.logger.debug('current history position: %d', self.history_pos)
# Increment the history position.
self.history_pos += 1
# Restore the partial cmd.
if self.history_pos == len(self.history):
- logging.debug('Restoring partial command of \'%s\'', self.partial_cmd)
+ self.logger.debug('Restoring partial command of \'%s\'', self.partial_cmd)
# Backspace the line.
for _ in range(self.input_buffer_pos):
self.SendBackspace()
@@ -210,11 +211,11 @@ class Console(object):
self.history_pos = len(self.history)
return
- logging.debug('new history position: %d', self.history_pos)
+ self.logger.debug('new history position: %d', self.history_pos)
if self.history_pos > len(self.history)-1:
- logging.debug('No more history to show.')
+ self.logger.debug('No more history to show.')
self.history_pos -= 1
- logging.debug('Reset history position to %d', self.history_pos)
+ self.logger.debug('Reset history position to %d', self.history_pos)
return
# Backspace the line.
@@ -222,14 +223,14 @@ class Console(object):
self.SendBackspace()
# Print the newer entry from the history buffer.
- logging.debug('printing next entry %d - %s', self.history_pos,
- self.history[self.history_pos])
+ self.logger.debug('printing next entry %d - %s', self.history_pos,
+ self.history[self.history_pos])
next_cmd = self.history[self.history_pos]
os.write(fd, next_cmd)
# Update the input buffer.
self.input_buffer = next_cmd
self.input_buffer_pos = len(next_cmd)
- logging.debug('new history position: %d.', self.history_pos)
+ self.logger.debug('new history position: %d.', self.history_pos)
def SliceOutChar(self):
"""Remove a char from the line and shift everything over 1 column."""
@@ -256,34 +257,34 @@ class Console(object):
assert self.esc_state != 0
if self.esc_state is EscState.ESC_START:
- logging.debug('ESC_START')
+ self.logger.debug('ESC_START')
if byte == ord('['):
self.esc_state = EscState.ESC_BRACKET
return
else:
- logging.error('Unexpected sequence. %c' % byte)
+ self.logger.error('Unexpected sequence. %c', byte)
self.esc_state = 0
elif self.esc_state is EscState.ESC_BRACKET:
- logging.debug('ESC_BRACKET')
+ self.logger.debug('ESC_BRACKET')
# Left Arrow key was pressed.
if byte == ord('D'):
- logging.debug('Left arrow key pressed.')
+ self.logger.debug('Left arrow key pressed.')
self.MoveCursor('left', 1)
self.esc_state = 0 # Reset the state.
return
# Right Arrow key.
elif byte == ord('C'):
- logging.debug('Right arrow key pressed.')
+ self.logger.debug('Right arrow key pressed.')
self.MoveCursor('right', 1)
self.esc_state = 0 # Reset the state.
return
# Up Arrow key.
elif byte == ord('A'):
- logging.debug('Up arrow key pressed.')
+ self.logger.debug('Up arrow key pressed.')
self.ShowPreviousCommand()
# Reset the state.
self.esc_state = 0 # Reset the state.
@@ -291,7 +292,7 @@ class Console(object):
# Down Arrow key.
elif byte == ord('B'):
- logging.debug('Down arrow key pressed.')
+ self.logger.debug('Down arrow key pressed.')
self.ShowNextCommand()
# Reset the state.
self.esc_state = 0 # Reset the state.
@@ -309,47 +310,47 @@ class Console(object):
self.esc_state = EscState.ESC_BRACKET_8
else:
- logging.error(r'Bad or unhandled escape sequence. got ^[%c\(%d)'
- % (chr(byte), byte))
+ self.logger.error(r'Bad or unhandled escape sequence. got ^[%c\(%d)',
+ chr(byte), byte)
self.esc_state = 0
return
elif self.esc_state is EscState.ESC_BRACKET_1:
- logging.debug('ESC_BRACKET_1')
+ self.logger.debug('ESC_BRACKET_1')
# HOME key.
if byte == ord('~'):
- logging.debug('Home key pressed.')
+ self.logger.debug('Home key pressed.')
self.MoveCursor('left', self.input_buffer_pos)
self.esc_state = 0 # Reset the state.
- logging.debug('ESC sequence complete.')
+ self.logger.debug('ESC sequence complete.')
return
elif self.esc_state is EscState.ESC_BRACKET_3:
- logging.debug('ESC_BRACKET_3')
+ self.logger.debug('ESC_BRACKET_3')
# DEL key.
if byte == ord('~'):
- logging.debug('Delete key pressed.')
+ self.logger.debug('Delete key pressed.')
if self.input_buffer_pos != len(self.input_buffer):
self.SliceOutChar()
self.esc_state = 0 # Reset the state.
elif self.esc_state is EscState.ESC_BRACKET_8:
- logging.debug('ESC_BRACKET_8')
+ self.logger.debug('ESC_BRACKET_8')
# END key.
if byte == ord('~'):
- logging.debug('End key pressed.')
+ self.logger.debug('End key pressed.')
self.MoveCursor('right',
len(self.input_buffer) - self.input_buffer_pos)
self.esc_state = 0 # Reset the state.
- logging.debug('ESC sequence complete.')
+ self.logger.debug('ESC sequence complete.')
return
else:
- logging.error('Unexpected sequence. %c' % byte)
+ self.logger.error('Unexpected sequence. %c', byte)
self.esc_state = 0
else:
- logging.error('Unexpected sequence. %c' % byte)
+ self.logger.error('Unexpected sequence. %c', byte)
self.esc_state = 0
def ProcessInput(self):
@@ -365,7 +366,7 @@ class Console(object):
# Split the command up by spaces.
line = self.input_buffer.split(' ')
- logging.debug('cmd: %s' % (self.input_buffer))
+ self.logger.debug('cmd: %s', self.input_buffer)
cmd = line[0].lower()
# The 'history' command is a special case that we handle locally.
@@ -374,7 +375,7 @@ class Console(object):
return
# Send the command to the interpreter.
- logging.debug('Sending command to interpreter.')
+ self.logger.debug('Sending command to interpreter.')
self.cmd_pipe.send(self.input_buffer)
def CheckForEnhancedECImage(self):
@@ -388,15 +389,15 @@ class Console(object):
correctly.
"""
# Send interrogation byte and wait for the response.
- logging.debug('Performing interrogation.')
+ self.logger.debug('Performing interrogation.')
self.cmd_pipe.send(interpreter.EC_SYN)
response = ''
if self.dbg_pipe.poll(interpreter.EC_INTERROGATION_TIMEOUT):
response = self.dbg_pipe.recv()
- logging.debug('response: \'%s\'', binascii.hexlify(response))
+ self.logger.debug('response: \'%s\'', binascii.hexlify(response))
else:
- logging.debug('Timed out waiting for EC_ACK')
+ self.logger.debug('Timed out waiting for EC_ACK')
# Verify the acknowledgment.
return response == interpreter.EC_ACK
@@ -412,7 +413,7 @@ class Console(object):
# simply behave as a pass-thru.
if byte == ControlKey.CARRIAGE_RETURN:
self.enhanced_ec = self.CheckForEnhancedECImage()
- logging.debug('Enhanced EC image? %r', self.enhanced_ec)
+ self.logger.debug('Enhanced EC image? %r', self.enhanced_ec)
if not self.enhanced_ec:
# Send everything straight to the EC to handle.
@@ -420,7 +421,7 @@ class Console(object):
# Reset the input buffer.
self.input_buffer = ''
self.input_buffer_pos = 0
- logging.debug('Reset input buffer.')
+ self.logger.debug('Reset input buffer.')
return
# Keep handling the ESC sequence if we're in the middle of it.
@@ -444,7 +445,7 @@ class Console(object):
# Carriage_Return/Enter
if byte == ControlKey.CARRIAGE_RETURN:
- logging.debug('Enter key pressed.')
+ self.logger.debug('Enter key pressed.')
# Put a carriage return/newline and the print the prompt.
os.write(fd, '\r\n')
@@ -466,56 +467,56 @@ class Console(object):
# Backspace
elif byte == ControlKey.BACKSPACE:
- logging.debug('Backspace pressed.')
+ self.logger.debug('Backspace pressed.')
if self.input_buffer_pos > 0:
# Move left 1 column.
self.MoveCursor('left', 1)
# Remove the character at the input_buffer_pos by slicing it out.
self.SliceOutChar()
- logging.debug('input_buffer_pos: %d' % (self.input_buffer_pos))
+ self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos)
# Ctrl+A. Move cursor to beginning of the line
elif byte == ControlKey.CTRL_A:
- logging.debug('Control+A pressed.')
+ self.logger.debug('Control+A pressed.')
self.MoveCursor('left', self.input_buffer_pos)
# Ctrl+B. Move cursor left 1 column.
elif byte == ControlKey.CTRL_B:
- logging.debug('Control+B pressed.')
+ self.logger.debug('Control+B pressed.')
self.MoveCursor('left', 1)
# Ctrl+D. Delete a character.
elif byte == ControlKey.CTRL_D:
- logging.debug('Control+D pressed.')
+ self.logger.debug('Control+D pressed.')
if self.input_buffer_pos != len(self.input_buffer):
# Remove the character by slicing it out.
self.SliceOutChar()
# Ctrl+E. Move cursor to end of the line.
elif byte == ControlKey.CTRL_E:
- logging.debug('Control+E pressed.')
+ self.logger.debug('Control+E pressed.')
self.MoveCursor('right',
len(self.input_buffer) - self.input_buffer_pos)
# Ctrl+F. Move cursor right 1 column.
elif byte == ControlKey.CTRL_F:
- logging.debug('Control+F pressed.')
+ self.logger.debug('Control+F pressed.')
self.MoveCursor('right', 1)
# Ctrl+K. Kill line.
elif byte == ControlKey.CTRL_K:
- logging.debug('Control+K pressed.')
+ self.logger.debug('Control+K pressed.')
self.KillLine()
# Ctrl+N. Next line.
elif byte == ControlKey.CTRL_N:
- logging.debug('Control+N pressed.')
+ self.logger.debug('Control+N pressed.')
self.ShowNextCommand()
# Ctrl+P. Previous line.
elif byte == ControlKey.CTRL_P:
- logging.debug('Control+P pressed.')
+ self.logger.debug('Control+P pressed.')
self.ShowPreviousCommand()
# ESC sequence
@@ -527,7 +528,7 @@ class Console(object):
elif IsPrintable(byte):
# Drop the character if we're full.
if buffer_full:
- logging.debug('Dropped char: %c(%d)', byte, byte)
+ self.logger.debug('Dropped char: %c(%d)', byte, byte)
return
# Print the character.
os.write(fd, chr(byte))
@@ -546,7 +547,7 @@ class Console(object):
if extra_bytes_written:
self.MoveCursor('left', extra_bytes_written)
- logging.debug('input_buffer_pos: %d' % (self.input_buffer_pos))
+ self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos)
def MoveCursor(self, direction, count):
"""MoveCursor moves the cursor left or right by count columns.
@@ -558,7 +559,7 @@ class Console(object):
moved.
Raises:
- ValueError: If the direction is not equal to 'left' or 'right'.
+ AssertionError: If the direction is not equal to 'left' or 'right'.
"""
# If there's nothing to move, we're done.
if not count:
@@ -570,7 +571,7 @@ class Console(object):
if count > self.input_buffer_pos:
count = self.input_buffer_pos
seq += 'D'
- logging.debug('move cursor left %d', count)
+ self.logger.debug('move cursor left %d', count)
self.input_buffer_pos -= count
elif direction == 'right':
@@ -578,14 +579,14 @@ class Console(object):
if (count + self.input_buffer_pos) > len(self.input_buffer):
count = 0
seq += 'C'
- logging.debug('move cursor right %d', count)
+ self.logger.debug('move cursor right %d', count)
self.input_buffer_pos += count
else:
- raise MoveCursorError(('The only valid directions are \'left\' and '
- '\'right\''))
+ raise AssertionError(('The only valid directions are \'left\' and '
+ '\'right\''))
- logging.debug('input_buffer_pos: %d' % self.input_buffer_pos)
+ self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos)
# Move the cursor.
if count != 0:
os.write(fd, seq)
@@ -594,12 +595,12 @@ class Console(object):
"""Kill the rest of the line based on the input buffer position."""
# Killing the line is killing all the text to the right.
diff = len(self.input_buffer) - self.input_buffer_pos
- logging.debug('diff: %d' % diff)
+ self.logger.debug('diff: %d', diff)
# Diff shouldn't be negative, but if it is for some reason, let's try to
# correct the cursor.
if diff < 0:
- logging.warning('Resetting input buffer position to %d...',
- len(self.input_buffer))
+ self.logger.warning('Resetting input buffer position to %d...',
+ len(self.input_buffer))
self.MoveCursor('left', -diff)
return
if diff:
@@ -632,8 +633,8 @@ def StartLoop(console):
Args:
console: A Console object that has been properly initialzed.
"""
- logging.info('EC Console is being served on %s.', console.user_pty)
- logging.debug(console)
+ console.logger.info('EC Console is being served on %s.', console.user_pty)
+ console.logger.debug(console)
while True:
# Check to see if pipes or the console are ready for reading.
read_list = [console.master_pty, console.cmd_pipe, console.dbg_pipe]
@@ -641,7 +642,7 @@ def StartLoop(console):
for obj in ready_for_reading:
if obj is console.master_pty:
- logging.debug('Input from user')
+ console.logger.debug('Input from user')
# Convert to bytes so we can look for non-printable chars such as
# Ctrl+A, Ctrl+E, etc.
line = bytearray(os.read(console.master_pty, CONSOLE_MAX_READ))
@@ -652,21 +653,25 @@ def StartLoop(console):
elif obj is console.cmd_pipe:
data = console.cmd_pipe.recv()
# Write it to the user console.
- logging.debug('|CMD|->\'%s\'', data)
+ console.logger.debug('|CMD|->\'%s\'', data)
os.write(console.master_pty, data)
elif obj is console.dbg_pipe:
data = console.dbg_pipe.recv()
# Write it to the user console.
- logging.debug('|DBG|->\'%s\'', data)
+ console.logger.debug('|DBG|->\'%s\'', data)
os.write(console.master_pty, data)
-def main():
+def main(argv):
"""Kicks off the EC-3PO interactive console interface and interpreter.
We create some pipes to communicate with an interpreter, instantiate an
interpreter, create a PTY pair, and begin serving the console interface.
+
+ Args:
+ argv: A list of strings containing the arguments this module was called
+ with.
"""
# Set up argument parser.
parser = argparse.ArgumentParser(description=('Start interactive EC console '
@@ -677,32 +682,25 @@ def main():
' is present on. eg: /dev/pts/12'))
parser.add_argument('--log-level',
default='info',
- help=('info, debug, warning, error, or critical'))
+ help='info, debug, warning, error, or critical')
# Parse arguments.
- args = parser.parse_args()
-
- # Can't do much without an EC to talk to.
- if not args.ec_uart_pty:
- parser.print_help()
- sys.exit(1)
+ opts = parser.parse_args(argv)
# Set logging level.
- args.log_level = args.log_level.lower()
- if args.log_level == 'info':
+ opts.log_level = opts.log_level.lower()
+ if opts.log_level == 'info':
log_level = logging.INFO
- elif args.log_level == 'debug':
+ elif opts.log_level == 'debug':
log_level = logging.DEBUG
- elif args.log_level == 'warning':
+ elif opts.log_level == 'warning':
log_level = logging.WARNING
- elif args.log_level == 'error':
+ elif opts.log_level == 'error':
log_level = logging.ERROR
- elif args.log_level == 'critical':
+ elif opts.log_level == 'critical':
log_level = logging.CRITICAL
else:
- print('Error: Invalid log level.')
- parser.print_help()
- sys.exit(1)
+ parser.error('Invalid log level. (info, debug, warning, error, critical)')
# Start logging with a timestamp, module, and log level shown in each log
# entry.
@@ -716,7 +714,7 @@ def main():
dbg_pipe_interactive, dbg_pipe_interp = multiprocessing.Pipe(duplex=False)
# Create an interpreter instance.
- itpr = interpreter.Interpreter(args.ec_uart_pty, cmd_pipe_interp,
+ itpr = interpreter.Interpreter(opts.ec_uart_pty, cmd_pipe_interp,
dbg_pipe_interp, log_level)
# Spawn an interpreter process.
@@ -737,4 +735,4 @@ def main():
if __name__ == '__main__':
- main()
+ main(sys.argv[1:])
diff --git a/util/ec3po/console_unittest.py b/util/ec3po/console_unittest.py
index 67c686c85b..5963e9c84f 100755
--- a/util/ec3po/console_unittest.py
+++ b/util/ec3po/console_unittest.py
@@ -2,10 +2,14 @@
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+
"""Unit tests for the EC-3PO Console interface."""
+
from __future__ import print_function
+
import binascii
-from chromite.lib import cros_logging as logging
+# pylint: disable=cros-logging-import
+import logging
import mock
import multiprocessing
import tempfile
diff --git a/util/ec3po/interpreter.py b/util/ec3po/interpreter.py
index 999c7b96e0..31079fb908 100755..100644
--- a/util/ec3po/interpreter.py
+++ b/util/ec3po/interpreter.py
@@ -1,19 +1,22 @@
-#!/usr/bin/python2
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+
"""EC-3PO EC Interpreter
interpreter provides the interpretation layer between the EC UART and the user.
-It recives commands through its command pipe, formats the commands for the EC,
+It receives commands through its command pipe, formats the commands for the EC,
and sends the command to the EC. It also presents data from the EC to either be
displayed via the interactive console interface, or some other consumer. It
additionally supports automatic command retrying if the EC drops a character in
a command.
"""
+
from __future__ import print_function
+
import binascii
-from chromite.lib import cros_logging as logging
+# pylint: disable=cros-logging-import
+import logging
import os
import Queue
import select
@@ -35,6 +38,7 @@ class Interpreter(object):
formation of commands for EC images which support that.
Attributes:
+ logger: A logger for this module.
ec_uart_pty: A string representing the EC UART to connect to.
cmd_pipe: A multiprocessing.Connection object which represents the
Interpreter side of the command pipe. This must be a bidirectional pipe.
@@ -75,6 +79,7 @@ class Interpreter(object):
log_level: An optional integer representing the numeric value of the log
level. By default, the log level will be logging.INFO (20).
"""
+ self.logger = logging.getLogger('EC3PO.Interpreter')
self.ec_uart_pty = open(ec_uart_pty, 'a+')
self.cmd_pipe = cmd_pipe
self.dbg_pipe = dbg_pipe
@@ -115,7 +120,7 @@ class Interpreter(object):
command: A string which contains the command to be sent.
"""
self.ec_cmd_queue.put(command)
- logging.debug('Commands now in queue: %d', self.ec_cmd_queue.qsize())
+ self.logger.debug('Commands now in queue: %d', self.ec_cmd_queue.qsize())
# Add the EC UART as an output to be serviced.
self.outputs.append(self.ec_uart_pty)
@@ -176,7 +181,7 @@ class Interpreter(object):
# Check for interrogation command.
if command == EC_SYN:
# User is requesting interrogation. Send SYN as is.
- logging.debug('User requesting interrogation.')
+ self.logger.debug('User requesting interrogation.')
self.interrogating = True
# Assume the EC isn't enhanced until we get a response.
self.enhanced_ec = False
@@ -186,22 +191,22 @@ class Interpreter(object):
# TODO(aaboagye): Make a dict of commands and keys and eventually,
# handle partial matching based on unique prefixes.
- logging.debug('command: \'%s\'', command)
+ self.logger.debug('command: \'%s\'', command)
self.EnqueueCmd(command)
def HandleCmdRetries(self):
"""Attempts to retry commands if possible."""
if self.cmd_retries > 0:
# The EC encountered an error. We'll have to retry again.
- logging.warning('Retrying command...')
+ self.logger.warning('Retrying command...')
self.cmd_retries -= 1
- logging.warning('Retries remaining: %d', self.cmd_retries)
+ self.logger.warning('Retries remaining: %d', self.cmd_retries)
# Retry the command and add the EC UART to the writers again.
self.EnqueueCmd(self.last_cmd)
self.outputs.append(self.ec_uart_pty)
else:
# We're out of retries, so just give up.
- logging.error('Command failed. No retries left.')
+ self.logger.error('Command failed. No retries left.')
# Clear the command in progress.
self.last_cmd = ''
# Reset the retry count.
@@ -222,7 +227,7 @@ class Interpreter(object):
# Send the command.
self.ec_uart_pty.write(cmd)
self.ec_uart_pty.flush()
- logging.debug('Sent command to EC.')
+ self.logger.debug('Sent command to EC.')
if self.enhanced_ec and cmd != EC_SYN:
# Now, that we've sent the command, store the current command as the last
@@ -237,13 +242,13 @@ class Interpreter(object):
def HandleECData(self):
"""Handle any debug prints from the EC."""
- logging.debug('EC has data')
+ self.logger.debug('EC has data')
# Read what the EC sent us.
data = os.read(self.ec_uart_pty.fileno(), EC_MAX_READ)
- logging.debug('got: \'%s\'', binascii.hexlify(data))
+ self.logger.debug('got: \'%s\'', binascii.hexlify(data))
if '&E' in data and self.enhanced_ec:
# We received an error, so we should retry it if possible.
- logging.warning('Error string found in data.')
+ self.logger.warning('Error string found in data.')
self.HandleCmdRetries()
return
@@ -252,18 +257,18 @@ class Interpreter(object):
if self.interrogating:
self.enhanced_ec = data == EC_ACK
if self.enhanced_ec:
- logging.debug('The current EC image seems enhanced.')
+ self.logger.debug('The current EC image seems enhanced.')
else:
- logging.debug('The current EC image does NOT seem enhanced.')
+ self.logger.debug('The current EC image does NOT seem enhanced.')
# Done interrogating.
self.interrogating = False
# For now, just forward everything the EC sends us.
- logging.debug('Forwarding to user...')
+ self.logger.debug('Forwarding to user...')
self.dbg_pipe.send(data)
def HandleUserData(self):
"""Handle any incoming commands from the user."""
- logging.debug('Command data available. Begin processing.')
+ self.logger.debug('Command data available. Begin processing.')
data = self.cmd_pipe.recv()
# Process the command.
self.ProcessCommand(data)
diff --git a/util/ec3po/interpreter_unittest.py b/util/ec3po/interpreter_unittest.py
index 46bbcf8e93..f74bfe051c 100755
--- a/util/ec3po/interpreter_unittest.py
+++ b/util/ec3po/interpreter_unittest.py
@@ -2,9 +2,13 @@
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+
"""Unit tests for the EC-3PO interpreter."""
+
from __future__ import print_function
-from chromite.lib import cros_logging as logging
+
+# pylint: disable=cros-logging-import
+import logging
import mock
import multiprocessing
import tempfile