#!/usr/bin/env python # Copyright 2015 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Unit tests for the EC-3PO interpreter.""" # Note: This is a py2/3 compatible file. from __future__ import print_function import logging import tempfile import unittest from ec3po import interpreter from ec3po import threadproc_shim import mock # pylint:disable=import-error import six def GetBuiltins(func): if six.PY2: return "__builtin__." + func return "builtins." + func class TestEnhancedECBehaviour(unittest.TestCase): """Test case to verify all enhanced EC interpretation tasks.""" def setUp(self): """Setup the test harness.""" # Setup logging with a timestamp, the module, and the log level. logging.basicConfig( level=logging.DEBUG, format=("%(asctime)s - %(module)s - %(levelname)s - %(message)s"), ) # Create a tempfile that would represent the EC UART PTY. self.tempfile = tempfile.NamedTemporaryFile() # Create the pipes that the interpreter will use. self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe() self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe( duplex=False ) # Mock the open() function so we can inspect reads/writes to the EC. self.ec_uart_pty = mock.mock_open() with mock.patch(GetBuiltins("open"), self.ec_uart_pty): # Create an interpreter. self.itpr = interpreter.Interpreter( self.tempfile.name, self.cmd_pipe_itpr, self.dbg_pipe_itpr, log_level=logging.DEBUG, name="EC", ) @mock.patch("ec3po.interpreter.os") def test_HandlingCommandsThatProduceNoOutput(self, mock_os): """Verify that the Interpreter correctly handles non-output commands. Args: mock_os: MagicMock object replacing the 'os' module for this test case. """ # The interpreter init should open the EC UART PTY. expected_ec_calls = [mock.call(self.tempfile.name, "r+b", buffering=0)] # Have a command come in the command pipe. The first command will be an # interrogation to determine if the EC is enhanced or not. self.cmd_pipe_user.send(interpreter.EC_SYN) self.itpr.HandleUserData() # At this point, the command should be queued up waiting to be sent, so # let's actually send it to the EC. self.itpr.SendCmdToEC() expected_ec_calls.extend( [mock.call().write(interpreter.EC_SYN), mock.call().flush()] ) # Now, assume that the EC sends only 1 response back of EC_ACK. mock_os.read.side_effect = [interpreter.EC_ACK] # When reading the EC, the interpreter will call file.fileno() to pass to # os.read(). expected_ec_calls.append(mock.call().fileno()) # Simulate the response. self.itpr.HandleECData() # Now that the interrogation was complete, it's time to send down the real # command. test_cmd = b"chan save" # Send the test command down the pipe. self.cmd_pipe_user.send(test_cmd) self.itpr.HandleUserData() self.itpr.SendCmdToEC() # Since the EC image is enhanced, we should have sent a packed command. expected_ec_calls.append( mock.call().write(self.itpr.PackCommand(test_cmd)) ) expected_ec_calls.append(mock.call().flush()) # Now that the first command was sent, we should send another command which # produces no output. The console would send another interrogation. self.cmd_pipe_user.send(interpreter.EC_SYN) self.itpr.HandleUserData() self.itpr.SendCmdToEC() expected_ec_calls.extend( [mock.call().write(interpreter.EC_SYN), mock.call().flush()] ) # Again, assume that the EC sends only 1 response back of EC_ACK. mock_os.read.side_effect = [interpreter.EC_ACK] # When reading the EC, the interpreter will call file.fileno() to pass to # os.read(). expected_ec_calls.append(mock.call().fileno()) # Simulate the response. self.itpr.HandleECData() # Now send the second test command. test_cmd = b"chan 0" self.cmd_pipe_user.send(test_cmd) self.itpr.HandleUserData() self.itpr.SendCmdToEC() # Since the EC image is enhanced, we should have sent a packed command. expected_ec_calls.append( mock.call().write(self.itpr.PackCommand(test_cmd)) ) expected_ec_calls.append(mock.call().flush()) # Finally, verify that the appropriate writes were actually sent to the EC. self.ec_uart_pty.assert_has_calls(expected_ec_calls) @mock.patch("ec3po.interpreter.os") def test_CommandRetryingOnError(self, mock_os): """Verify that commands are retried if an error is encountered. Args: mock_os: MagicMock object replacing the 'os' module for this test case. """ # The interpreter init should open the EC UART PTY. expected_ec_calls = [mock.call(self.tempfile.name, "r+b", buffering=0)] # Have a command come in the command pipe. The first command will be an # interrogation to determine if the EC is enhanced or not. self.cmd_pipe_user.send(interpreter.EC_SYN) self.itpr.HandleUserData() # At this point, the command should be queued up waiting to be sent, so # let's actually send it to the EC. self.itpr.SendCmdToEC() expected_ec_calls.extend( [mock.call().write(interpreter.EC_SYN), mock.call().flush()] ) # Now, assume that the EC sends only 1 response back of EC_ACK. mock_os.read.side_effect = [interpreter.EC_ACK] # When reading the EC, the interpreter will call file.fileno() to pass to # os.read(). expected_ec_calls.append(mock.call().fileno()) # Simulate the response. self.itpr.HandleECData() # Let's send a command that is received on the EC-side with an error. test_cmd = b"accelinfo" self.cmd_pipe_user.send(test_cmd) self.itpr.HandleUserData() self.itpr.SendCmdToEC() packed_cmd = self.itpr.PackCommand(test_cmd) expected_ec_calls.extend( [mock.call().write(packed_cmd), mock.call().flush()] ) # Have the EC return the error string twice. mock_os.read.side_effect = [b"&&EE", b"&&EE"] for i in range(2): # When reading the EC, the interpreter will call file.fileno() to pass to # os.read(). expected_ec_calls.append(mock.call().fileno()) # Simulate the response. self.itpr.HandleECData() # Since an error was received, the EC should attempt to retry the command. expected_ec_calls.extend( [mock.call().write(packed_cmd), mock.call().flush()] ) # Verify that the retry count was decremented. self.assertEqual( interpreter.COMMAND_RETRIES - i - 1, self.itpr.cmd_retries, "Unexpected cmd_remaining count.", ) # Actually retry the command. self.itpr.SendCmdToEC() # Now assume that the last one goes through with no trouble. expected_ec_calls.extend( [mock.call().write(packed_cmd), mock.call().flush()] ) self.itpr.SendCmdToEC() # Verify all the calls. self.ec_uart_pty.assert_has_calls(expected_ec_calls) def test_PackCommandsForEnhancedEC(self): """Verify that the interpreter packs commands for enhanced EC images.""" # Assume current EC image is enhanced. self.itpr.enhanced_ec = True # Receive a command from the user. test_cmd = b"gettime" self.cmd_pipe_user.send(test_cmd) # Mock out PackCommand to see if it was called. self.itpr.PackCommand = mock.MagicMock() # Have the interpreter handle the command. self.itpr.HandleUserData() # Verify that PackCommand() was called. self.itpr.PackCommand.assert_called_once_with(test_cmd) def test_DontPackCommandsForNonEnhancedEC(self): """Verify the interpreter doesn't pack commands for non-enhanced images.""" # Assume current EC image is not enhanced. self.itpr.enhanced_ec = False # Receive a command from the user. test_cmd = b"gettime" self.cmd_pipe_user.send(test_cmd) # Mock out PackCommand to see if it was called. self.itpr.PackCommand = mock.MagicMock() # Have the interpreter handle the command. self.itpr.HandleUserData() # Verify that PackCommand() was called. self.itpr.PackCommand.assert_not_called() @mock.patch("ec3po.interpreter.os") def test_KeepingTrackOfInterrogation(self, mock_os): """Verify that the interpreter can track the state of the interrogation. Args: mock_os: MagicMock object replacing the 'os' module. for this test case. """ # Upon init, the interpreter should assume that the current EC image is not # enhanced. self.assertFalse( self.itpr.enhanced_ec, msg=("State of enhanced_ec upon init is not False."), ) # Assume an interrogation request comes in from the user. self.cmd_pipe_user.send(interpreter.EC_SYN) self.itpr.HandleUserData() # Verify the state is now within an interrogation. self.assertTrue(self.itpr.interrogating, "interrogating should be True") # The state of enhanced_ec should not be changed yet because we haven't # received a valid response yet. self.assertFalse( self.itpr.enhanced_ec, msg=("State of enhanced_ec is not False.") ) # Assume that the EC responds with an EC_ACK. mock_os.read.side_effect = [interpreter.EC_ACK] self.itpr.HandleECData() # Now, the interrogation should be complete and we should know that the # current EC image is enhanced. self.assertFalse( self.itpr.interrogating, msg=("interrogating should be False") ) self.assertTrue(self.itpr.enhanced_ec, msg="enhanced_ec sholud be True") # Now let's perform another interrogation, but pretend that the EC ignores # it. self.cmd_pipe_user.send(interpreter.EC_SYN) self.itpr.HandleUserData() # Verify interrogating state. self.assertTrue(self.itpr.interrogating, "interrogating sholud be True") # We should assume that the image is not enhanced until we get the valid # response. self.assertFalse( self.itpr.enhanced_ec, "enhanced_ec should be False now." ) # Let's pretend that we get a random debug print. This should clear the # interrogating flag. mock_os.read.side_effect = [b"[1660.593076 HC 0x103]"] self.itpr.HandleECData() # Verify that interrogating flag is cleared and enhanced_ec is still False. self.assertFalse( self.itpr.interrogating, "interrogating should be False." ) self.assertFalse( self.itpr.enhanced_ec, "enhanced_ec should still be False." ) class TestUARTDisconnection(unittest.TestCase): """Test case to verify interpreter disconnection/reconnection.""" def setUp(self): """Setup the test harness.""" # Setup logging with a timestamp, the module, and the log level. logging.basicConfig( level=logging.DEBUG, format=("%(asctime)s - %(module)s - %(levelname)s - %(message)s"), ) # Create a tempfile that would represent the EC UART PTY. self.tempfile = tempfile.NamedTemporaryFile() # Create the pipes that the interpreter will use. self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe() self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe( duplex=False ) # Mock the open() function so we can inspect reads/writes to the EC. self.ec_uart_pty = mock.mock_open() with mock.patch(GetBuiltins("open"), self.ec_uart_pty): # Create an interpreter. self.itpr = interpreter.Interpreter( self.tempfile.name, self.cmd_pipe_itpr, self.dbg_pipe_itpr, log_level=logging.DEBUG, name="EC", ) # First, check that interpreter is initialized to connected. self.assertTrue( self.itpr.connected, ("The interpreter should be initialized in a connected state"), ) def test_DisconnectStopsECTraffic(self): """Verify that when in disconnected state, no debug prints are sent.""" # Let's send a disconnect command through the command pipe. self.cmd_pipe_user.send(b"disconnect") self.itpr.HandleUserData() # Verify interpreter is disconnected from EC. self.assertFalse( self.itpr.connected, ("The interpreter should be disconnected.") ) # Verify that the EC UART is no longer a member of the inputs. The # interpreter will never pull data from the EC if it's not a member of the # inputs list. self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs) def test_CommandsDroppedWhenDisconnected(self): """Verify that when in disconnected state, commands are dropped.""" # Send a command, followed by 'disconnect'. self.cmd_pipe_user.send(b"taskinfo") self.itpr.HandleUserData() self.cmd_pipe_user.send(b"disconnect") self.itpr.HandleUserData() # Verify interpreter is disconnected from EC. self.assertFalse( self.itpr.connected, ("The interpreter should be disconnected.") ) # Verify that the EC UART is no longer a member of the inputs nor outputs. self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs) self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) # Have the user send a few more commands in the disconnected state. command = "help\n" for char in command: self.cmd_pipe_user.send(char.encode("utf-8")) self.itpr.HandleUserData() # The command queue should be empty. self.assertEqual(0, self.itpr.ec_cmd_queue.qsize()) # Now send the reconnect command. self.cmd_pipe_user.send(b"reconnect") with mock.patch(GetBuiltins("open"), mock.mock_open()): self.itpr.HandleUserData() # Verify interpreter is connected. self.assertTrue(self.itpr.connected) # Verify that EC UART is a member of the inputs. self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs) # Since no command was sent after reconnection, verify that the EC UART is # not a member of the outputs. self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) def test_ReconnectAllowsECTraffic(self): """Verify that when connected, EC UART traffic is allowed.""" # Let's send a disconnect command through the command pipe. self.cmd_pipe_user.send(b"disconnect") self.itpr.HandleUserData() # Verify interpreter is disconnected. self.assertFalse( self.itpr.connected, ("The interpreter should be disconnected.") ) # Verify that the EC UART is no longer a member of the inputs nor outputs. self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs) self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) # Issue reconnect command through the command pipe. self.cmd_pipe_user.send(b"reconnect") with mock.patch(GetBuiltins("open"), mock.mock_open()): self.itpr.HandleUserData() # Verify interpreter is connected. self.assertTrue( self.itpr.connected, ("The interpreter should be connected.") ) # Verify that the EC UART is now a member of the inputs. self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs) # Since we have issued no commands during the disconnected state, no # commands are pending and therefore the PTY should not be added to the # outputs. self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) if __name__ == "__main__": unittest.main()