summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVadim Bendebury <vbendeb@chromium.org>2020-04-17 17:01:29 -0700
committerCommit Bot <commit-bot@chromium.org>2020-05-04 23:06:44 +0000
commitdc6ce9503a4c3f8e8f3a80e6cb79f5269272ebf0 (patch)
treea7f18315faf59ddfe2f57e8a575fd7e1453a326c
parent7e1672d90661a1cb26cb1a429547e8934c68f34a (diff)
downloadchrome-ec-dc6ce9503a4c3f8e8f3a80e6cb79f5269272ebf0.tar.gz
util: script for generating RO hashes
This python script will be run in a Chrome OS factory image, with limited availability of chromite libraries. The command line parameters of the script are a set of AP firmware address ranges and FMAP section names. The script does the following: - use flashrom to read the FMAP area from the AP flash and dump_fmap to generate the flash map description. - verify that section names passed in as parameters (if any) are indeed are present in the flash map. - verify that all passed in ranges and sections fit into the WP_RO area of the flash (as defined if the flash map). - prepare a layout file to instruct flashrom to read only the sections of interest (as defined by ranges and section names passed in the command line). - use flashrom again to read the required sections of the AP flash into a file. - read the file and and pass the required sections through the sha256 hash calculation. - prepare the Cr50 vendor command to pass information about the flash ranges and the sha256 sum to Cr50 and send the command. A unit test is also being added. BUG=b:153764696 TEST=./util/test_ap_ro_hash.py succeeds. with the rest of the patches added end to end AP RO verification procedure also succeeds. Signed-off-by: Vadim Bendebury <vbendeb@chromium.org> Change-Id: Ic0fa3759b3a32db8cf521be28c3c7dfe0cd35278 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/2161576 Reviewed-by: Mary Ruthven <mruthven@chromium.org>
-rwxr-xr-xutil/ap_ro_hash.py564
-rwxr-xr-xutil/test_ap_ro_hash.py303
2 files changed, 867 insertions, 0 deletions
diff --git a/util/ap_ro_hash.py b/util/ap_ro_hash.py
new file mode 100755
index 0000000000..a6a8fc2837
--- /dev/null
+++ b/util/ap_ro_hash.py
@@ -0,0 +1,564 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-"
+# Copyright 2020 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""ap_ro_hash.py - script for generating hashes of AP RO sections.
+
+This script is supposed to run as part of the factory image of a Chrome OS
+device, with limited Chrome OS specific Python modules availability.
+
+The command line parameters are a set of one or more AP RO firmware address
+ranges. A range could be expressed as either two colon separated hex numbers
+(the offset of the range base address into the flash space and the size of the
+range) or a text string (the name of a flash map section).
+
+The script does the following:
+
+ - using the flashrom utility read the fmap section from the AP firmware chip;
+
+ - using dump_fmap retrieve addresses of all sections of the image, of
+ interest is the WP_RO section which encompasses the entire RO space of the
+ flash chip;
+
+ - verify input parameters: make sure that input ranges and fmap sections do
+ not overlap, and that all off the ranges and sections fall into WP_RO;
+
+ - create a layout file to direct flashrom to read only the ranges of interest
+ and read the flash again, this time creating a file with the required
+ sections present;
+
+ - calculate the sha256 sum of the required sections;
+
+ - prepare and send to the Cr50 a vendor command including the ranges and the
+ calculated checksum;
+
+ - examine the Cr50 return code and report errors, if any.
+"""
+
+import argparse
+import hashlib
+import os
+import struct
+import subprocess
+import sys
+import syslog
+import tempfile
+
+
+class ApRoHashCmdLineError(Exception):
+ 'Exceptions due to command line arguments errors'
+ pass
+
+
+class ApRoHashExecutionError(Exception):
+ 'Exceptions due to command system command execution errors'
+ pass
+
+
+class ApRoTpmResponseError(Exception):
+ 'Exceptions due to TPM command execution errors'
+ pass
+
+VENDOR_CC_SEED_AP_RO_CHECK = 54
+
+# The tag and format are the same for command and response.
+TPM_TAG = 0x8001
+HEADER_FMT = '>H2LH'
+
+class Logger(object):
+ """A class to support printing into a log and on the console when required.
+
+ Attributes:
+ _print_to_console: bool, if true - print on the console in addition to
+ the syslog.
+ """
+ def __init__(self):
+ self._print_to_console = False
+
+ def _write(self, prefix, end, *args):
+ """Common function for message handling.
+
+ All messages are sent to syslog. Error and fatal messages are also
+ always sent to the console to stderr.
+
+ If _print_to_console is True, the regular messages are also printed to
+ stdout.
+
+ If prefix is set to FATAL, the program terminates.
+
+ Args:
+ prefix: string, one of '', 'ERROR', or 'FATAL'
+ end: string, a character to add in the end (facilitates printing
+ the newline the same as print())
+ args: strings to print, concatenated with a space.
+ """
+ text = ' '.join(args) + end
+ if prefix:
+ text = '%s: %s' % (prefix, text)
+
+ syslog.syslog(text)
+ if self._print_to_console or prefix in ('ERROR', 'FATAL'):
+ if prefix:
+ outf = sys.stderr
+ else:
+ outf = sys.stdout
+ print(text, file=outf, end='')
+ if prefix == 'FATAL':
+ sys.exit(1)
+
+ def enable_verbose(self):
+ 'Enable verbose logging (printing on the console).'
+ self._print_to_console = True
+
+ def log(self, *args, end='\n'):
+ 'Process regular log message.'
+ self._write('', end, *args)
+
+ def error(self, *args, end='\n'):
+ 'Process non fatal error message.'
+ self._write('ERROR', end, *args)
+
+ def fatal(self, *args, end='\n'):
+ 'Process fatal level error message.'
+ self._write('FATAL', end, *args)
+
+
+LOG = Logger()
+
+class Cr50TpmPacket(object):
+ """Class to represent a TPM vendor command packet.
+
+ Cr50 TPM vendor command response packets have the following format, all
+ header fields are in big endian order:
+
+ <tag><total length><vendor command><sub command><payload, variable length>
+ | | | +---- 2 bytes
+ | | +------------------- 4 bytes
+ | +--------------------------------- 4 bytes
+ +------------------------------------------ 2 bytes
+
+ This class allows to accumulate data of the vendor command payload, and
+ generate the full vendor command packet when requested.
+
+ Attributes:
+ _subcmd: int, the vendor subcommand
+ _data: bytes, the vendor command payload
+ """
+ TPM_COMMAND = 0x20000000
+
+ def __init__(self, subcmd):
+ self._subcmd = subcmd
+ self._data = bytes()
+
+ def add_data(self, data):
+ 'Add data to the vendor command payload.'
+ self._data += data
+
+ def packet(self):
+ """Generate full vendor command packet using subcommand and payload.
+
+ Returns:
+ A byte array which is a fully formatted vendor command packet.
+ """
+ header = struct.pack(HEADER_FMT,
+ TPM_TAG,
+ struct.calcsize(HEADER_FMT) + len(self._data),
+ self.TPM_COMMAND,
+ self._subcmd)
+ return bytearray(header) + self._data
+
+
+class Cr50TpmResponse(object):
+ """Class to represent a TPM response packet.
+
+ Cr50 TPM vendor command response packets have the following format, all
+ header fields are in big endian order:
+
+ <tag><total length><return code><sub command>[<data (optional)>]
+ | | | +---- 2 bytes
+ | | +---------------- 4 bytes
+ | +------------------------------ 4 bytes
+ +--------------------------------------- 2 bytes
+
+ This class takes a byte buffer, runs basic verification (expected 'tag',
+ 'total length' matching the buffer length, 'sub command' matching the
+ expected value when provided. If verification succeeds, the 'total
+ length', 'return code' and 'data' fields are save for future reference.
+
+ If verification fails, appropriate exceptions are raised.
+
+ Attributes:
+ _rc: int, 'return code' from packet header.
+ _length: int, number of bytes in the packet
+ _payload: bytes, contents of the data field, could be empty
+ """
+ def __init__(self, response, exp_subcmd=None):
+ header_size = struct.calcsize(HEADER_FMT)
+ if len(response) < header_size:
+ raise ApRoTpmResponseError(
+ 'response too short (%d bytes)' % len(response))
+ tag, self._length, self._rc, subcmd = struct.unpack_from(HEADER_FMT,
+ response)
+ if tag != TPM_TAG:
+ raise ApRoTpmResponseError('unexpected TPM tag %04x' % tag)
+ if self._length != len(response):
+ raise ApRoTpmResponseError('length mismatch (%d != %d)' %
+ (self._length, len(response)))
+ if self._length not in (header_size, header_size + 1):
+ raise ApRoTpmResponseError('unexpected response length %d' %
+ (self._length))
+
+ if exp_subcmd != None and exp_subcmd != subcmd:
+ raise ApRoTpmResponseError('subcommand mismatch (%04x != %04x)' %
+ (subcmd, exp_subcmd))
+ self._payload = response[header_size:]
+
+ @property
+ def rc(self):
+ 'Get the response return code.'
+ return self._rc
+
+ @property
+ def payload(self):
+ 'Get the response payload.'
+ return self._payload
+
+
+class TpmChannel(object):
+ """Class to represent a channel to communicate with the TPM
+
+ Communications could happen over /dev/tpm0 directly, if it is available,
+ or through trunksd using the trunks_send utility.
+
+ Attributes:
+ _os_dev: int, file device number of the file opened to communicate
+ with the TPM. Set to None if opening attempt returned an
+ error.
+ _response: bytes, data received from trunks_send. Not applicable in
+ case communications use /dev/tpm0.
+ """
+ def __init__(self):
+ try:
+ self._os_dev = os.open('/dev/tpm0', os.O_RDWR)
+ LOG.log('will use /dev/tpm0')
+ except OSError:
+ self._os_dev = None
+ LOG.log('will use trunks_send')
+ self._response = bytes()
+
+ def write(self, data):
+ """Send command to the TPM.
+
+ Args:
+ data: byte array, the fully TPM command (i.e. Cr50 vendor
+ command).
+ """
+ if self._os_dev:
+ LOG.log('will call write to send %d bytes' % len(data), end='')
+ rv = os.write(self._os_dev, data)
+ LOG.log(', sent %d' % rv)
+ return
+ command = '/usr/sbin/trunks_send --raw ' + ' '.join(
+ '%02x' % x for x in data)
+ rv = run(command, ignore_error=False)[0]
+ rv_hex = [rv[2*x:2*x+2] for x in range(int(len(rv)/2))]
+ self._response = bytes([int(x, 16) for x in rv_hex])
+
+ def read(self):
+ """Read TPM response.
+
+ Read the response directly from /dev/tpm0 or use as a response the
+ string returned by the previous trunks_send invocation.
+
+ Returns:
+ A byte array, the contents of the TPM response packet.
+ """
+ if self._os_dev:
+ # We don't expect much, but let's allow for a long response packet.
+ return os.read(self._os_dev, 1000)
+ rv = self._response
+ self._response = bytes()
+ return rv
+
+
+def run(command, ignore_error=True):
+ """Run system command.
+
+ The command could be passed as a string (in which case every word in the
+ string is considered a separate command line element) or as a list of
+ strings.
+
+ Args:
+ command: string or list of strings. If string is given, it is
+ converted into a list of strings before proceeding.
+ ignore_error: Bool, if set to False and command execution fails, raise
+ the exception.
+
+ Returns:
+ A tuple of two possibly multiline strings, the stdio and stderr
+ generated while the command was being executed.
+
+ Raises:
+ ApRoHashExecutionError in case the executed command reported a non
+ zero return status, and ignore_error is False
+ """
+ if isinstance(command, str):
+ command = command.split()
+ LOG.log('will run "%s"' % ' '.join(command))
+
+ rv = subprocess.run(command,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ if ignore_error and rv.returncode != 0:
+ raise ApRoHashExecutionError(' '.join(command))
+
+ return(rv.stdout.decode('ascii'), rv.stderr.decode('ascii'))
+
+
+def send_to_cr50(ranges, digest):
+ """Program RO hash information in Cr50.
+
+ This function creates a vendor command packet which will allow Cr50 to
+ save ranges and the sha256 hash of the AP flash RO space which needs to be
+ verified.
+
+ This vendor command payload has the following format:
+
+ <32 bytes of sha256 digest><range>[<range>..]
+
+ The Cr50 response contains the return code (rc) in the header set to zero
+ on success, or a generic error on failure. In case of failure the actual
+ error code is delivered as the last byte of the response packet.
+
+ Args:
+ ranges: a list of two tuples of ints, base addresses and sizes of
+ hashed flash areas
+ digest: combined 32 bit sha256 digest of all ranges
+
+ Returns:
+ An integer value, zero on success, of the actual error code on failure.
+ """
+ subcmd = VENDOR_CC_SEED_AP_RO_CHECK
+ p = Cr50TpmPacket(subcmd)
+ p.add_data(digest)
+ for r in ranges:
+ for n in r:
+ p.add_data(bytearray(struct.pack('<L', n)))
+ channel = TpmChannel()
+ channel.write(p.packet())
+ tpm_response = Cr50TpmResponse(channel.read(), subcmd)
+ if tpm_response.rc:
+ return int(tpm_response.payload[0])
+ return 0
+
+
+def read_fmap(tmpd):
+ """Read AP firmware fmap section and convert it into a dictionary.
+
+ The flashrom utility is used to read the only section of the AP firmware,
+ the fmap, into a file. Then the dump_fmap utility is used to print the
+ fmap contents, which consists of a set of section descriptions of the
+ following structure:
+
+ area: 22
+ area_offset: 0x00c00000
+ area_size: 0x00400000 (4194304)
+ area_name: WP_RO
+
+ This function parses the section descriptions to generate a dictionary
+ where the keys are area_names, and values are two int tuples of
+ (area_offset, area_size).
+
+ Args:
+ tmpd: string, directory to use to store temp files.
+
+ Returns:
+ The generated dictionary.
+ """
+ fmap_file = os.path.join(tmpd, 'fmap.bin')
+ run('flashrom -i FMAP -r %s' % fmap_file, ignore_error=False)
+ fmap_text = run('dump_fmap ' + fmap_file, ignore_error=False)[0]
+
+ fmap = {}
+ offset = 0
+ size = 0
+ for line in fmap_text.splitlines():
+ tokens = line.split()
+ if tokens[0] == 'area_offset:':
+ offset = int(tokens[1], 16)
+ continue
+ if tokens[0] == 'area_size:':
+ size = int(tokens[1], 16)
+ continue
+ if tokens[0] == 'area_name:':
+ fmap[tokens[1]] = (offset, size)
+ LOG.log('%20s: %08x:%08x' % (tokens[1], offset, offset + size - 1))
+ continue
+ return fmap
+
+
+def verify_ranges(ranges, limit):
+ """Verify that all ranges fall into the limit.
+
+ Args:
+ ranges: a sorted list of two tuples of ints, base addresses and sizes
+ of hashed flash areas
+ limit: a range all ranges' elements must fit into
+
+ Returns:
+ A string describing non compliant ranges, an empty string if no problems
+ have been found.
+ """
+ base = limit[0]
+ top = base + limit[1]
+ errors = []
+ for i, r in enumerate(ranges):
+ rerrors = []
+ rbase = r[0]
+ rtop = rbase + r[1]
+ if rbase < base or rtop > top:
+ rerrors.append('is outside the RO area',)
+ if i > 0:
+ prev_r = ranges[i - 1]
+ if prev_r[0] + prev_r[1] > rbase:
+ rerrors.append('overlaps with %x:%x' % (prev_r[0], prev_r[1],))
+ if rerrors:
+ errors.append(' '.join(['Range %x:%x:' % (r[0], r[1]),] + rerrors))
+
+ return '\n'.join(errors)
+
+
+def read_ro_ranges(check_file_name, tmpd, ranges):
+ """Read firmware ranges into a file.
+
+ Based on the passed in ranges create a layout file to store the list of
+ sections to be read by flashrom, and read these sections of the AP
+ firmware into a file.
+
+ Args:
+ check_file_name: string, name of the file to read into
+ tmpd: string, name of the directory used for the output and layout files
+ ranges: a list of two int elements, base address and size of each area
+ """
+ layout_file = os.path.join(tmpd, 'layout')
+ read_cmd = ['flashrom', '-l', layout_file, '-r', check_file_name]
+ with open(layout_file, 'w') as lf:
+ for i, r in enumerate(ranges):
+ name = 'section%d' % i
+ lf.write('%x:%x %s\n' % (r[0], r[0] + r[1] - 1, name))
+ read_cmd += ['-i', name]
+ run(read_cmd, ignore_error=False)
+
+
+def calculate_hash(ro_file, ranges):
+ """Calculate sha256 hashes of ranges in the file.
+
+ Args:
+ ro_file: string, name of the file containing the areas to be hashed
+ ranges: a list of two int elements, base address and size of each area
+
+ Returns:
+ A byte array, the calculated sha256 hash of all areas concatenated.
+ """
+ sha256 = hashlib.sha256()
+ with open(ro_file, 'rb') as rof:
+ for offset, size in ranges:
+ rof.seek(offset)
+ while size:
+ buf = rof.read(size)
+ if not buf:
+ raise ApRoHashCmdLineError(
+ 'image is too small for range starting at 0x%x'
+ % offset)
+ sha256.update(buf)
+ size -= len(buf)
+
+ return sha256.digest()
+
+
+def get_args(args):
+ """Prepare argument parser and retrieve command line arguments.
+
+ Returns the parser object with a namespace with all present optional
+ arguments set.
+ """
+ parser = argparse.ArgumentParser(description='AP RO Hashing utility')
+
+ parser.add_argument(
+ '--verbose', '-v',
+ type=bool,
+ help=('enable verbose logging on the console'),
+ default=False)
+
+ return parser.parse_known_args(args)
+
+
+def main(args):
+ 'Main function, receives a list of strings, command line arguments'
+
+ nsp, rest = get_args(args)
+
+ if nsp.verbose:
+ LOG.enable_verbose()
+
+ ranges = [] # Ranges from the command line, including FMAP sections.
+
+ # Let's keep all temp files in an ephemeral temp directory.
+ with tempfile.TemporaryDirectory() as tmpd:
+ bad_ranges = [] # Invalid ranges, if any.
+ bad_section_names = [] # Invalid section names, if any.
+
+ fmap = read_fmap(tmpd)
+
+
+ for arg in rest:
+ if ':' in arg:
+ try:
+ ranges.append(([int('%s' % x, 16) for
+ x in arg.split(':', 1)]),)
+ except ValueError:
+ bad_ranges.append(arg)
+ continue
+ if arg not in fmap:
+ bad_section_names.append(arg)
+ continue
+ ranges.append(fmap[arg])
+
+ if not ranges:
+ raise ApRoHashCmdLineError('no hashing ranges specified')
+
+ error_msg = ''
+ if bad_ranges:
+ error_msg += 'Ranges %s not valid\n' % ' '.join(bad_ranges)
+ if bad_section_names:
+ error_msg += ('Section(s) "%s" not in FMAP\n' %
+ '" "'.join(bad_section_names))
+
+ # Make sure the list is sorted by the first element.
+ ranges.sort(key=lambda x: x[0])
+
+ # Make sure ranges do not overlap and fall into the WP_RO section.
+ error_msg += verify_ranges(ranges, fmap['WP_RO'])
+ if error_msg:
+ raise ApRoHashCmdLineError(error_msg)
+
+ ro_check_file = os.path.join(tmpd, 'ro_check.bin')
+ read_ro_ranges(ro_check_file, tmpd, ranges)
+ digest = calculate_hash(ro_check_file, ranges)
+
+ rv = send_to_cr50(ranges, digest)
+ if rv != 0:
+ LOG.error('Cr50 returned error %d' % rv)
+ return rv
+ print('SUCCEEDED')
+ return 0
+
+if __name__ == '__main__':
+ try:
+ main(sys.argv[1:])
+ except ApRoHashCmdLineError as e:
+ LOG.fatal('%s' % e)
+ except ApRoHashExecutionError as e:
+ LOG.fatal('command \"%s\" failed' % e)
+ except ApRoTpmResponseError as e:
+ LOG.fatal('TPM response problem: \"%s\"' % e)
diff --git a/util/test_ap_ro_hash.py b/util/test_ap_ro_hash.py
new file mode 100755
index 0000000000..ad84d41927
--- /dev/null
+++ b/util/test_ap_ro_hash.py
@@ -0,0 +1,303 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-"
+# Copyright 2020 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+'Unit tests for ap_ro_hash.py'
+
+import contextlib
+import io
+import tempfile
+import unittest
+
+from unittest.mock import patch
+
+import ap_ro_hash
+
+TEST_FILE_SIZE = 64 * 1024
+TPM_OK_RESPONSE = (128, 1, 0, 0, 0, 12, 0, 0, 0, 0, 0, 54)
+
+# pylint: disable=unused-argument
+def mocked_run(command, ignore_error=True):
+ """Test mock of ap_ro_hash.run().
+
+ Adding this mock allows to simulate various aspects of expected
+ Chrome OS device behavior.
+ """
+ # Simulated dump_fmap output.
+ fmap_sample = """
+area: 1
+area_offset: 0x00000000
+area_size: 0x00006000 (27576)
+area_name: RW_A
+area: 2
+area_offset: 0x00006000
+area_size: 0x00006000 (27576)
+area_name: RW_B
+area: 3
+area_offset: 0x0000c000
+area_size: 0x00004000 (16384)
+area_name: WP_RO
+area: 4
+area_offset: 0x0000c000
+area_size: 0x00000040 (64)
+area_name: RO_VPD
+area: 5
+area_offset: 0x0000c040
+area_size: 0x000017b0 (6064)
+area_name: RO_UNUSED
+area: 6
+area_offset: 0x0000d7f0
+area_size: 0x00002810 (10256)
+area_name: RO_SECTION
+"""
+ if isinstance(command, list):
+ command = ' '.join(command)
+ if command.startswith('dump_fmap'):
+ return fmap_sample.strip(), ''
+ if command.startswith('/usr/sbin/trunks_send'):
+ # Simulated trunks_send always reports success.
+ return ''.join('%02x' % x for x in TPM_OK_RESPONSE), ''
+ if command.startswith('flashrom'):
+ if '-l' in command and '-r' in command:
+ # This is a request to read flash sections for hashing. Let's
+ # create a file with fixed contents large enough to cover all
+ # sections. File will be saved in the temp directory created by
+ # ap_ro_hash.main().
+ fname = command.split('-r')[1].strip().split()[0]
+ with open(fname, 'ab') as ff:
+ togo = TEST_FILE_SIZE
+ # Fill the file with repeating pattern of 0..255.
+ filler = bytearray([x for x in range(256)])
+ while togo:
+ size = min(togo, len(filler))
+ ff.write(filler[:size])
+ togo -= size
+ return '', ''
+# pylint: disable=
+
+ap_ro_hash.run = mocked_run
+
+class TestApRoHash(unittest.TestCase):
+ 'Test class for ap_ro_hash.py.'
+
+ @patch('ap_ro_hash.sys.exit')
+ @patch('ap_ro_hash.syslog')
+ def test_logger(self, mock_syslog, mock_sysexit):
+ 'Verify the Logger class behavior.'
+ # Intercept writes into stdout and stderr
+ with io.StringIO() as stdo, io.StringIO() as stde:
+ with contextlib.redirect_stdout(
+ stdo), contextlib.redirect_stderr(stde):
+
+ logger = ap_ro_hash.Logger()
+
+ def reset_all():
+ 'Restore mocks and io before the next test.'
+ for m in mock_syslog, mock_sysexit:
+ m.reset_mock()
+ for s in stdo, stde:
+ s.truncate(0)
+ s.seek(0)
+
+ # Regular message goes to syslog only.
+ logger.log('log message')
+ self.assertTrue(mock_syslog.syslog.called)
+ self.assertEqual(mock_syslog.syslog.call_args[0][0],
+ 'log message\n')
+ self.assertFalse(mock_sysexit.called)
+ self.assertEqual(stdo.getvalue(), '')
+ self.assertEqual(stde.getvalue(), '')
+ reset_all()
+
+ # Error message goes to syslog and stderr.
+ logger.error('error message')
+ expected = 'ERROR: error message\n'
+ self.assertTrue(mock_syslog.syslog.called)
+ self.assertEqual(mock_syslog.syslog.call_args[0][0], expected)
+ self.assertFalse(mock_sysexit.called)
+ self.assertEqual(stdo.getvalue(), '')
+ self.assertEqual(stde.getvalue(), expected)
+ reset_all()
+
+ # Error message goes to syslog and stderr, and triggers a call
+ # to sys.exit().
+ logger.fatal('fatal error')
+ expected = 'FATAL: fatal error\n'
+ self.assertTrue(mock_syslog.syslog.called)
+ self.assertEqual(mock_syslog.syslog.call_args[0][0], expected)
+ self.assertTrue(mock_sysexit.called)
+ self.assertEqual(stdo.getvalue(), '')
+ self.assertEqual(stde.getvalue(), expected)
+ reset_all()
+
+ # In verbose mode regular messages go into syslog and stdout.
+ logger.enable_verbose()
+ logger.log('log message')
+ expected = 'log message\n'
+ self.assertTrue(mock_syslog.syslog.called)
+ self.assertEqual(mock_syslog.syslog.call_args[0][0], expected)
+ self.assertFalse(mock_sysexit.called)
+ self.assertEqual(stdo.getvalue(), expected)
+ self.assertEqual(stde.getvalue(), '')
+ reset_all()
+
+ # Newline is not added if suppressed by the caller.
+ logger.log('log message', end='')
+ expected = 'log message'
+ self.assertTrue(mock_syslog.syslog.called)
+ self.assertEqual(mock_syslog.syslog.call_args[0][0], expected)
+ self.assertFalse(mock_sysexit.called)
+ self.assertEqual(stdo.getvalue(), expected)
+ self.assertEqual(stde.getvalue(), '')
+
+ def test_cr50_packet(self):
+ 'Verify proper creation of TPM the Vendor command.'
+ p = ap_ro_hash.Cr50TpmPacket(10)
+ empty_packet = bytearray([128, 1, 0, 0, 0, 12, 32, 0, 0, 0, 0, 10])
+ self.assertEqual(p.packet(), empty_packet)
+ data_packet = bytearray([128, 1, 0, 0, 0, 16, 32, 0, 0,
+ 0, 0, 10, 1, 2, 3, 4])
+ p.add_data(bytearray([1, 2, 3, 4]))
+ self.assertEqual(p.packet(), data_packet)
+
+ def test_cr50_response(self):
+ 'Verify Cr50TpmResponse implementation.'
+ response_data = bytearray([128, 1, 0, 0, 0, 12, 0, 0, 0, 0, 0, 10])
+
+ def verify_assert(message, corrupt_offset=None):
+ """Introduce verify that incorrect data triggers exception.
+
+ Use response_data as the response body. If corrupt_offset is not
+ None - modify the value at the offset before instantiating the
+ packet, and then restore it in the end.
+ """
+ if corrupt_offset != None:
+ response_data[corrupt_offset] += 1
+ with self.assertRaises(ap_ro_hash.ApRoTpmResponseError) as cm:
+ ap_ro_hash.Cr50TpmResponse(response_data, 10)
+ self.assertTrue(str(cm.exception).startswith(message))
+ if corrupt_offset != None:
+ response_data[corrupt_offset] -= 1
+
+ r = ap_ro_hash.Cr50TpmResponse(response_data)
+ self.assertEqual(r.rc, 0)
+ self.assertEqual(r.payload, bytearray([]))
+
+ # Corrupt the tag field.
+ verify_assert('unexpected TPM tag', 0)
+
+ # Modify the length field.
+ verify_assert('length mismatch', 5)
+
+ # and modify the subcommand value
+ verify_assert('subcommand mismatch', len(response_data) - 1)
+
+ # Set the rc value.
+ response_data[9] = 1
+ r = ap_ro_hash.Cr50TpmResponse(response_data)
+ self.assertEqual(r.rc, 1)
+ self.assertEqual(r.payload, bytearray([]))
+
+ # Add payload (and increase the length), and verify that the payload
+ # is retrieved as expected.
+ response_data[5] += 1
+ response_data.append(5)
+ r = ap_ro_hash.Cr50TpmResponse(response_data)
+ self.assertEqual(r.rc, 1)
+ self.assertEqual(r.payload, bytearray([5]))
+
+ # Create a valid structured but too long response.
+ response_data.append(5)
+ response_data[5] += 1
+ verify_assert('unexpected response length')
+
+ @patch('ap_ro_hash.Logger._write')
+ @patch('ap_ro_hash.os.open')
+ def test_tpm_channel(self, mock_open, mock_log_write):
+ """Verify TpmChannel implementation.
+
+ Use mocks to simulate the OSError exception to trigger the switch
+ between /dev/tpm0 and trunks_send.
+ """
+ ap_ro_hash.Log = ap_ro_hash.Logger()
+ c = ap_ro_hash.TpmChannel()
+ self.assertTrue(mock_open.called)
+ self.assertEqual(mock_log_write.call_args[0][2], 'will use /dev/tpm0')
+ mock_open.reset_mock()
+ mock_log_write.reset_mock()
+ mock_open.side_effect = OSError
+ c = ap_ro_hash.TpmChannel()
+ self.assertTrue(mock_open.called)
+ self.assertEqual(mock_log_write.call_args[0][2], 'will use trunks_send')
+ c.write(bytearray([1, 2, 3])) # Does not really matter what we write.
+ ap_ro_hash.Cr50TpmResponse(c.read())
+ self.assertEqual(c.read(), bytes())
+
+ def test_get_fmap(self):
+ 'Verify proper processing of dump_fmap output.'
+ # Equivalent representation of fmap_sample defined above.
+ expected_fmap = dict({
+ 'RW_A': (0, 24576),
+ 'RW_B': (24576, 24576),
+ 'WP_RO': (49152, 16384),
+ 'RO_VPD': (49152, 64),
+ 'RO_UNUSED': (49216, 6064),
+ 'RO_SECTION': (55280, 10256),
+ })
+ with tempfile.TemporaryDirectory() as tmpd:
+ fmap = ap_ro_hash.read_fmap(tmpd)
+ self.assertEqual(len(fmap), len(expected_fmap))
+ for k, v in fmap.items():
+ self.assertTrue(k in expected_fmap)
+ self.assertEqual(expected_fmap[k], v)
+
+ def test_ranges_errors(self):
+ 'Verify proper detection of address range errors.'
+ # Command line parameters and associated substrings to be found in the
+ # corresponding error messages.
+ bad_ranges_sets = (
+ (['0:100'], 'is outside'),
+ (['RW-A', 'RW_B'], 'not in FMAP', 'is outside'),
+ (['RO_VPD', 'RO_UNUSED', '1000:100'], 'is outside'),
+ (['RO_VPD', 'RO_UNUSED', 'c030:100'], 'overlaps'),
+ )
+ for rset in bad_ranges_sets:
+ ranges = rset[0]
+ with self.assertRaises(ap_ro_hash.ApRoHashCmdLineError) as cm:
+ ap_ro_hash.main(ranges)
+ for err_msg in rset[1:]:
+ self.assertTrue(err_msg in str(cm.exception))
+
+ @patch('ap_ro_hash.TpmChannel.__init__')
+ @patch('ap_ro_hash.TpmChannel.write')
+ @patch('ap_ro_hash.TpmChannel.read')
+ def test_end_to_end(self, mock_cread, mock_cwrite, mock_cinit):
+ """Test end to end processing.
+
+ Includes validating command line arguments, generating the fmap
+ dictionary from the mock dump_fmap output, calculating hash on the
+ mock binary file and creating the vendor command including the sha256
+ hash and the ranges.
+
+ Mocking TpmChannel allows to verify the vendor command contents.
+ """
+ exp_vendor_command = bytes([
+ 0x80, 0x01, 0x00, 0x00, 0x00, 0x44, 0x20, 0x00, 0x00, 0x00, 0x00,
+ 0x36, 0xad, 0x75, 0x79, 0x56, 0x27, 0xdf, 0x99, 0x22, 0xa3, 0x01,
+ 0xd1, 0x66, 0xc6, 0xb4, 0xb4, 0xc8, 0x94, 0xf7, 0x94, 0x48, 0x6d,
+ 0x91, 0x7f, 0xbb, 0x2d, 0x85, 0x4c, 0x69, 0x72, 0xe2, 0xce, 0x5d,
+ 0x00, 0xc0, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x40, 0xc0, 0x00,
+ 0x00, 0x10, 0x00, 0x00, 0x00, 0xf0, 0xd7, 0x00, 0x00, 0x10, 0x28,
+ 0x00, 0x00])
+ mock_cinit.return_value = None
+ mock_cread.return_value = bytes(TPM_OK_RESPONSE)
+ with io.StringIO() as stdo:
+ with contextlib.redirect_stdout(stdo):
+ ap_ro_hash.main(['RO_VPD', 'RO_SECTION', 'c040:10'])
+ self.assertEqual(stdo.getvalue(), 'SUCCEEDED\n')
+ self.assertEqual(mock_cwrite.call_args[0][0], exp_vendor_command)
+
+
+if __name__ == '__main__':
+ unittest.main()