From dc6ce9503a4c3f8e8f3a80e6cb79f5269272ebf0 Mon Sep 17 00:00:00 2001 From: Vadim Bendebury Date: Fri, 17 Apr 2020 17:01:29 -0700 Subject: util: script for generating RO hashes This python script will be run in a Chrome OS factory image, with limited availability of chromite libraries. The command line parameters of the script are a set of AP firmware address ranges and FMAP section names. The script does the following: - use flashrom to read the FMAP area from the AP flash and dump_fmap to generate the flash map description. - verify that section names passed in as parameters (if any) are indeed are present in the flash map. - verify that all passed in ranges and sections fit into the WP_RO area of the flash (as defined if the flash map). - prepare a layout file to instruct flashrom to read only the sections of interest (as defined by ranges and section names passed in the command line). - use flashrom again to read the required sections of the AP flash into a file. - read the file and and pass the required sections through the sha256 hash calculation. - prepare the Cr50 vendor command to pass information about the flash ranges and the sha256 sum to Cr50 and send the command. A unit test is also being added. BUG=b:153764696 TEST=./util/test_ap_ro_hash.py succeeds. with the rest of the patches added end to end AP RO verification procedure also succeeds. Signed-off-by: Vadim Bendebury Change-Id: Ic0fa3759b3a32db8cf521be28c3c7dfe0cd35278 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/2161576 Reviewed-by: Mary Ruthven --- util/ap_ro_hash.py | 564 ++++++++++++++++++++++++++++++++++++++++++++++++ util/test_ap_ro_hash.py | 303 ++++++++++++++++++++++++++ 2 files changed, 867 insertions(+) create mode 100755 util/ap_ro_hash.py create mode 100755 util/test_ap_ro_hash.py diff --git a/util/ap_ro_hash.py b/util/ap_ro_hash.py new file mode 100755 index 0000000000..a6a8fc2837 --- /dev/null +++ b/util/ap_ro_hash.py @@ -0,0 +1,564 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*-" +# Copyright 2020 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""ap_ro_hash.py - script for generating hashes of AP RO sections. + +This script is supposed to run as part of the factory image of a Chrome OS +device, with limited Chrome OS specific Python modules availability. + +The command line parameters are a set of one or more AP RO firmware address +ranges. A range could be expressed as either two colon separated hex numbers +(the offset of the range base address into the flash space and the size of the +range) or a text string (the name of a flash map section). + +The script does the following: + + - using the flashrom utility read the fmap section from the AP firmware chip; + + - using dump_fmap retrieve addresses of all sections of the image, of + interest is the WP_RO section which encompasses the entire RO space of the + flash chip; + + - verify input parameters: make sure that input ranges and fmap sections do + not overlap, and that all off the ranges and sections fall into WP_RO; + + - create a layout file to direct flashrom to read only the ranges of interest + and read the flash again, this time creating a file with the required + sections present; + + - calculate the sha256 sum of the required sections; + + - prepare and send to the Cr50 a vendor command including the ranges and the + calculated checksum; + + - examine the Cr50 return code and report errors, if any. +""" + +import argparse +import hashlib +import os +import struct +import subprocess +import sys +import syslog +import tempfile + + +class ApRoHashCmdLineError(Exception): + 'Exceptions due to command line arguments errors' + pass + + +class ApRoHashExecutionError(Exception): + 'Exceptions due to command system command execution errors' + pass + + +class ApRoTpmResponseError(Exception): + 'Exceptions due to TPM command execution errors' + pass + +VENDOR_CC_SEED_AP_RO_CHECK = 54 + +# The tag and format are the same for command and response. +TPM_TAG = 0x8001 +HEADER_FMT = '>H2LH' + +class Logger(object): + """A class to support printing into a log and on the console when required. + + Attributes: + _print_to_console: bool, if true - print on the console in addition to + the syslog. + """ + def __init__(self): + self._print_to_console = False + + def _write(self, prefix, end, *args): + """Common function for message handling. + + All messages are sent to syslog. Error and fatal messages are also + always sent to the console to stderr. + + If _print_to_console is True, the regular messages are also printed to + stdout. + + If prefix is set to FATAL, the program terminates. + + Args: + prefix: string, one of '', 'ERROR', or 'FATAL' + end: string, a character to add in the end (facilitates printing + the newline the same as print()) + args: strings to print, concatenated with a space. + """ + text = ' '.join(args) + end + if prefix: + text = '%s: %s' % (prefix, text) + + syslog.syslog(text) + if self._print_to_console or prefix in ('ERROR', 'FATAL'): + if prefix: + outf = sys.stderr + else: + outf = sys.stdout + print(text, file=outf, end='') + if prefix == 'FATAL': + sys.exit(1) + + def enable_verbose(self): + 'Enable verbose logging (printing on the console).' + self._print_to_console = True + + def log(self, *args, end='\n'): + 'Process regular log message.' + self._write('', end, *args) + + def error(self, *args, end='\n'): + 'Process non fatal error message.' + self._write('ERROR', end, *args) + + def fatal(self, *args, end='\n'): + 'Process fatal level error message.' + self._write('FATAL', end, *args) + + +LOG = Logger() + +class Cr50TpmPacket(object): + """Class to represent a TPM vendor command packet. + + Cr50 TPM vendor command response packets have the following format, all + header fields are in big endian order: + + + | | | +---- 2 bytes + | | +------------------- 4 bytes + | +--------------------------------- 4 bytes + +------------------------------------------ 2 bytes + + This class allows to accumulate data of the vendor command payload, and + generate the full vendor command packet when requested. + + Attributes: + _subcmd: int, the vendor subcommand + _data: bytes, the vendor command payload + """ + TPM_COMMAND = 0x20000000 + + def __init__(self, subcmd): + self._subcmd = subcmd + self._data = bytes() + + def add_data(self, data): + 'Add data to the vendor command payload.' + self._data += data + + def packet(self): + """Generate full vendor command packet using subcommand and payload. + + Returns: + A byte array which is a fully formatted vendor command packet. + """ + header = struct.pack(HEADER_FMT, + TPM_TAG, + struct.calcsize(HEADER_FMT) + len(self._data), + self.TPM_COMMAND, + self._subcmd) + return bytearray(header) + self._data + + +class Cr50TpmResponse(object): + """Class to represent a TPM response packet. + + Cr50 TPM vendor command response packets have the following format, all + header fields are in big endian order: + + [] + | | | +---- 2 bytes + | | +---------------- 4 bytes + | +------------------------------ 4 bytes + +--------------------------------------- 2 bytes + + This class takes a byte buffer, runs basic verification (expected 'tag', + 'total length' matching the buffer length, 'sub command' matching the + expected value when provided. If verification succeeds, the 'total + length', 'return code' and 'data' fields are save for future reference. + + If verification fails, appropriate exceptions are raised. + + Attributes: + _rc: int, 'return code' from packet header. + _length: int, number of bytes in the packet + _payload: bytes, contents of the data field, could be empty + """ + def __init__(self, response, exp_subcmd=None): + header_size = struct.calcsize(HEADER_FMT) + if len(response) < header_size: + raise ApRoTpmResponseError( + 'response too short (%d bytes)' % len(response)) + tag, self._length, self._rc, subcmd = struct.unpack_from(HEADER_FMT, + response) + if tag != TPM_TAG: + raise ApRoTpmResponseError('unexpected TPM tag %04x' % tag) + if self._length != len(response): + raise ApRoTpmResponseError('length mismatch (%d != %d)' % + (self._length, len(response))) + if self._length not in (header_size, header_size + 1): + raise ApRoTpmResponseError('unexpected response length %d' % + (self._length)) + + if exp_subcmd != None and exp_subcmd != subcmd: + raise ApRoTpmResponseError('subcommand mismatch (%04x != %04x)' % + (subcmd, exp_subcmd)) + self._payload = response[header_size:] + + @property + def rc(self): + 'Get the response return code.' + return self._rc + + @property + def payload(self): + 'Get the response payload.' + return self._payload + + +class TpmChannel(object): + """Class to represent a channel to communicate with the TPM + + Communications could happen over /dev/tpm0 directly, if it is available, + or through trunksd using the trunks_send utility. + + Attributes: + _os_dev: int, file device number of the file opened to communicate + with the TPM. Set to None if opening attempt returned an + error. + _response: bytes, data received from trunks_send. Not applicable in + case communications use /dev/tpm0. + """ + def __init__(self): + try: + self._os_dev = os.open('/dev/tpm0', os.O_RDWR) + LOG.log('will use /dev/tpm0') + except OSError: + self._os_dev = None + LOG.log('will use trunks_send') + self._response = bytes() + + def write(self, data): + """Send command to the TPM. + + Args: + data: byte array, the fully TPM command (i.e. Cr50 vendor + command). + """ + if self._os_dev: + LOG.log('will call write to send %d bytes' % len(data), end='') + rv = os.write(self._os_dev, data) + LOG.log(', sent %d' % rv) + return + command = '/usr/sbin/trunks_send --raw ' + ' '.join( + '%02x' % x for x in data) + rv = run(command, ignore_error=False)[0] + rv_hex = [rv[2*x:2*x+2] for x in range(int(len(rv)/2))] + self._response = bytes([int(x, 16) for x in rv_hex]) + + def read(self): + """Read TPM response. + + Read the response directly from /dev/tpm0 or use as a response the + string returned by the previous trunks_send invocation. + + Returns: + A byte array, the contents of the TPM response packet. + """ + if self._os_dev: + # We don't expect much, but let's allow for a long response packet. + return os.read(self._os_dev, 1000) + rv = self._response + self._response = bytes() + return rv + + +def run(command, ignore_error=True): + """Run system command. + + The command could be passed as a string (in which case every word in the + string is considered a separate command line element) or as a list of + strings. + + Args: + command: string or list of strings. If string is given, it is + converted into a list of strings before proceeding. + ignore_error: Bool, if set to False and command execution fails, raise + the exception. + + Returns: + A tuple of two possibly multiline strings, the stdio and stderr + generated while the command was being executed. + + Raises: + ApRoHashExecutionError in case the executed command reported a non + zero return status, and ignore_error is False + """ + if isinstance(command, str): + command = command.split() + LOG.log('will run "%s"' % ' '.join(command)) + + rv = subprocess.run(command, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if ignore_error and rv.returncode != 0: + raise ApRoHashExecutionError(' '.join(command)) + + return(rv.stdout.decode('ascii'), rv.stderr.decode('ascii')) + + +def send_to_cr50(ranges, digest): + """Program RO hash information in Cr50. + + This function creates a vendor command packet which will allow Cr50 to + save ranges and the sha256 hash of the AP flash RO space which needs to be + verified. + + This vendor command payload has the following format: + + <32 bytes of sha256 digest>[..] + + The Cr50 response contains the return code (rc) in the header set to zero + on success, or a generic error on failure. In case of failure the actual + error code is delivered as the last byte of the response packet. + + Args: + ranges: a list of two tuples of ints, base addresses and sizes of + hashed flash areas + digest: combined 32 bit sha256 digest of all ranges + + Returns: + An integer value, zero on success, of the actual error code on failure. + """ + subcmd = VENDOR_CC_SEED_AP_RO_CHECK + p = Cr50TpmPacket(subcmd) + p.add_data(digest) + for r in ranges: + for n in r: + p.add_data(bytearray(struct.pack(' top: + rerrors.append('is outside the RO area',) + if i > 0: + prev_r = ranges[i - 1] + if prev_r[0] + prev_r[1] > rbase: + rerrors.append('overlaps with %x:%x' % (prev_r[0], prev_r[1],)) + if rerrors: + errors.append(' '.join(['Range %x:%x:' % (r[0], r[1]),] + rerrors)) + + return '\n'.join(errors) + + +def read_ro_ranges(check_file_name, tmpd, ranges): + """Read firmware ranges into a file. + + Based on the passed in ranges create a layout file to store the list of + sections to be read by flashrom, and read these sections of the AP + firmware into a file. + + Args: + check_file_name: string, name of the file to read into + tmpd: string, name of the directory used for the output and layout files + ranges: a list of two int elements, base address and size of each area + """ + layout_file = os.path.join(tmpd, 'layout') + read_cmd = ['flashrom', '-l', layout_file, '-r', check_file_name] + with open(layout_file, 'w') as lf: + for i, r in enumerate(ranges): + name = 'section%d' % i + lf.write('%x:%x %s\n' % (r[0], r[0] + r[1] - 1, name)) + read_cmd += ['-i', name] + run(read_cmd, ignore_error=False) + + +def calculate_hash(ro_file, ranges): + """Calculate sha256 hashes of ranges in the file. + + Args: + ro_file: string, name of the file containing the areas to be hashed + ranges: a list of two int elements, base address and size of each area + + Returns: + A byte array, the calculated sha256 hash of all areas concatenated. + """ + sha256 = hashlib.sha256() + with open(ro_file, 'rb') as rof: + for offset, size in ranges: + rof.seek(offset) + while size: + buf = rof.read(size) + if not buf: + raise ApRoHashCmdLineError( + 'image is too small for range starting at 0x%x' + % offset) + sha256.update(buf) + size -= len(buf) + + return sha256.digest() + + +def get_args(args): + """Prepare argument parser and retrieve command line arguments. + + Returns the parser object with a namespace with all present optional + arguments set. + """ + parser = argparse.ArgumentParser(description='AP RO Hashing utility') + + parser.add_argument( + '--verbose', '-v', + type=bool, + help=('enable verbose logging on the console'), + default=False) + + return parser.parse_known_args(args) + + +def main(args): + 'Main function, receives a list of strings, command line arguments' + + nsp, rest = get_args(args) + + if nsp.verbose: + LOG.enable_verbose() + + ranges = [] # Ranges from the command line, including FMAP sections. + + # Let's keep all temp files in an ephemeral temp directory. + with tempfile.TemporaryDirectory() as tmpd: + bad_ranges = [] # Invalid ranges, if any. + bad_section_names = [] # Invalid section names, if any. + + fmap = read_fmap(tmpd) + + + for arg in rest: + if ':' in arg: + try: + ranges.append(([int('%s' % x, 16) for + x in arg.split(':', 1)]),) + except ValueError: + bad_ranges.append(arg) + continue + if arg not in fmap: + bad_section_names.append(arg) + continue + ranges.append(fmap[arg]) + + if not ranges: + raise ApRoHashCmdLineError('no hashing ranges specified') + + error_msg = '' + if bad_ranges: + error_msg += 'Ranges %s not valid\n' % ' '.join(bad_ranges) + if bad_section_names: + error_msg += ('Section(s) "%s" not in FMAP\n' % + '" "'.join(bad_section_names)) + + # Make sure the list is sorted by the first element. + ranges.sort(key=lambda x: x[0]) + + # Make sure ranges do not overlap and fall into the WP_RO section. + error_msg += verify_ranges(ranges, fmap['WP_RO']) + if error_msg: + raise ApRoHashCmdLineError(error_msg) + + ro_check_file = os.path.join(tmpd, 'ro_check.bin') + read_ro_ranges(ro_check_file, tmpd, ranges) + digest = calculate_hash(ro_check_file, ranges) + + rv = send_to_cr50(ranges, digest) + if rv != 0: + LOG.error('Cr50 returned error %d' % rv) + return rv + print('SUCCEEDED') + return 0 + +if __name__ == '__main__': + try: + main(sys.argv[1:]) + except ApRoHashCmdLineError as e: + LOG.fatal('%s' % e) + except ApRoHashExecutionError as e: + LOG.fatal('command \"%s\" failed' % e) + except ApRoTpmResponseError as e: + LOG.fatal('TPM response problem: \"%s\"' % e) diff --git a/util/test_ap_ro_hash.py b/util/test_ap_ro_hash.py new file mode 100755 index 0000000000..ad84d41927 --- /dev/null +++ b/util/test_ap_ro_hash.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*-" +# Copyright 2020 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +'Unit tests for ap_ro_hash.py' + +import contextlib +import io +import tempfile +import unittest + +from unittest.mock import patch + +import ap_ro_hash + +TEST_FILE_SIZE = 64 * 1024 +TPM_OK_RESPONSE = (128, 1, 0, 0, 0, 12, 0, 0, 0, 0, 0, 54) + +# pylint: disable=unused-argument +def mocked_run(command, ignore_error=True): + """Test mock of ap_ro_hash.run(). + + Adding this mock allows to simulate various aspects of expected + Chrome OS device behavior. + """ + # Simulated dump_fmap output. + fmap_sample = """ +area: 1 +area_offset: 0x00000000 +area_size: 0x00006000 (27576) +area_name: RW_A +area: 2 +area_offset: 0x00006000 +area_size: 0x00006000 (27576) +area_name: RW_B +area: 3 +area_offset: 0x0000c000 +area_size: 0x00004000 (16384) +area_name: WP_RO +area: 4 +area_offset: 0x0000c000 +area_size: 0x00000040 (64) +area_name: RO_VPD +area: 5 +area_offset: 0x0000c040 +area_size: 0x000017b0 (6064) +area_name: RO_UNUSED +area: 6 +area_offset: 0x0000d7f0 +area_size: 0x00002810 (10256) +area_name: RO_SECTION +""" + if isinstance(command, list): + command = ' '.join(command) + if command.startswith('dump_fmap'): + return fmap_sample.strip(), '' + if command.startswith('/usr/sbin/trunks_send'): + # Simulated trunks_send always reports success. + return ''.join('%02x' % x for x in TPM_OK_RESPONSE), '' + if command.startswith('flashrom'): + if '-l' in command and '-r' in command: + # This is a request to read flash sections for hashing. Let's + # create a file with fixed contents large enough to cover all + # sections. File will be saved in the temp directory created by + # ap_ro_hash.main(). + fname = command.split('-r')[1].strip().split()[0] + with open(fname, 'ab') as ff: + togo = TEST_FILE_SIZE + # Fill the file with repeating pattern of 0..255. + filler = bytearray([x for x in range(256)]) + while togo: + size = min(togo, len(filler)) + ff.write(filler[:size]) + togo -= size + return '', '' +# pylint: disable= + +ap_ro_hash.run = mocked_run + +class TestApRoHash(unittest.TestCase): + 'Test class for ap_ro_hash.py.' + + @patch('ap_ro_hash.sys.exit') + @patch('ap_ro_hash.syslog') + def test_logger(self, mock_syslog, mock_sysexit): + 'Verify the Logger class behavior.' + # Intercept writes into stdout and stderr + with io.StringIO() as stdo, io.StringIO() as stde: + with contextlib.redirect_stdout( + stdo), contextlib.redirect_stderr(stde): + + logger = ap_ro_hash.Logger() + + def reset_all(): + 'Restore mocks and io before the next test.' + for m in mock_syslog, mock_sysexit: + m.reset_mock() + for s in stdo, stde: + s.truncate(0) + s.seek(0) + + # Regular message goes to syslog only. + logger.log('log message') + self.assertTrue(mock_syslog.syslog.called) + self.assertEqual(mock_syslog.syslog.call_args[0][0], + 'log message\n') + self.assertFalse(mock_sysexit.called) + self.assertEqual(stdo.getvalue(), '') + self.assertEqual(stde.getvalue(), '') + reset_all() + + # Error message goes to syslog and stderr. + logger.error('error message') + expected = 'ERROR: error message\n' + self.assertTrue(mock_syslog.syslog.called) + self.assertEqual(mock_syslog.syslog.call_args[0][0], expected) + self.assertFalse(mock_sysexit.called) + self.assertEqual(stdo.getvalue(), '') + self.assertEqual(stde.getvalue(), expected) + reset_all() + + # Error message goes to syslog and stderr, and triggers a call + # to sys.exit(). + logger.fatal('fatal error') + expected = 'FATAL: fatal error\n' + self.assertTrue(mock_syslog.syslog.called) + self.assertEqual(mock_syslog.syslog.call_args[0][0], expected) + self.assertTrue(mock_sysexit.called) + self.assertEqual(stdo.getvalue(), '') + self.assertEqual(stde.getvalue(), expected) + reset_all() + + # In verbose mode regular messages go into syslog and stdout. + logger.enable_verbose() + logger.log('log message') + expected = 'log message\n' + self.assertTrue(mock_syslog.syslog.called) + self.assertEqual(mock_syslog.syslog.call_args[0][0], expected) + self.assertFalse(mock_sysexit.called) + self.assertEqual(stdo.getvalue(), expected) + self.assertEqual(stde.getvalue(), '') + reset_all() + + # Newline is not added if suppressed by the caller. + logger.log('log message', end='') + expected = 'log message' + self.assertTrue(mock_syslog.syslog.called) + self.assertEqual(mock_syslog.syslog.call_args[0][0], expected) + self.assertFalse(mock_sysexit.called) + self.assertEqual(stdo.getvalue(), expected) + self.assertEqual(stde.getvalue(), '') + + def test_cr50_packet(self): + 'Verify proper creation of TPM the Vendor command.' + p = ap_ro_hash.Cr50TpmPacket(10) + empty_packet = bytearray([128, 1, 0, 0, 0, 12, 32, 0, 0, 0, 0, 10]) + self.assertEqual(p.packet(), empty_packet) + data_packet = bytearray([128, 1, 0, 0, 0, 16, 32, 0, 0, + 0, 0, 10, 1, 2, 3, 4]) + p.add_data(bytearray([1, 2, 3, 4])) + self.assertEqual(p.packet(), data_packet) + + def test_cr50_response(self): + 'Verify Cr50TpmResponse implementation.' + response_data = bytearray([128, 1, 0, 0, 0, 12, 0, 0, 0, 0, 0, 10]) + + def verify_assert(message, corrupt_offset=None): + """Introduce verify that incorrect data triggers exception. + + Use response_data as the response body. If corrupt_offset is not + None - modify the value at the offset before instantiating the + packet, and then restore it in the end. + """ + if corrupt_offset != None: + response_data[corrupt_offset] += 1 + with self.assertRaises(ap_ro_hash.ApRoTpmResponseError) as cm: + ap_ro_hash.Cr50TpmResponse(response_data, 10) + self.assertTrue(str(cm.exception).startswith(message)) + if corrupt_offset != None: + response_data[corrupt_offset] -= 1 + + r = ap_ro_hash.Cr50TpmResponse(response_data) + self.assertEqual(r.rc, 0) + self.assertEqual(r.payload, bytearray([])) + + # Corrupt the tag field. + verify_assert('unexpected TPM tag', 0) + + # Modify the length field. + verify_assert('length mismatch', 5) + + # and modify the subcommand value + verify_assert('subcommand mismatch', len(response_data) - 1) + + # Set the rc value. + response_data[9] = 1 + r = ap_ro_hash.Cr50TpmResponse(response_data) + self.assertEqual(r.rc, 1) + self.assertEqual(r.payload, bytearray([])) + + # Add payload (and increase the length), and verify that the payload + # is retrieved as expected. + response_data[5] += 1 + response_data.append(5) + r = ap_ro_hash.Cr50TpmResponse(response_data) + self.assertEqual(r.rc, 1) + self.assertEqual(r.payload, bytearray([5])) + + # Create a valid structured but too long response. + response_data.append(5) + response_data[5] += 1 + verify_assert('unexpected response length') + + @patch('ap_ro_hash.Logger._write') + @patch('ap_ro_hash.os.open') + def test_tpm_channel(self, mock_open, mock_log_write): + """Verify TpmChannel implementation. + + Use mocks to simulate the OSError exception to trigger the switch + between /dev/tpm0 and trunks_send. + """ + ap_ro_hash.Log = ap_ro_hash.Logger() + c = ap_ro_hash.TpmChannel() + self.assertTrue(mock_open.called) + self.assertEqual(mock_log_write.call_args[0][2], 'will use /dev/tpm0') + mock_open.reset_mock() + mock_log_write.reset_mock() + mock_open.side_effect = OSError + c = ap_ro_hash.TpmChannel() + self.assertTrue(mock_open.called) + self.assertEqual(mock_log_write.call_args[0][2], 'will use trunks_send') + c.write(bytearray([1, 2, 3])) # Does not really matter what we write. + ap_ro_hash.Cr50TpmResponse(c.read()) + self.assertEqual(c.read(), bytes()) + + def test_get_fmap(self): + 'Verify proper processing of dump_fmap output.' + # Equivalent representation of fmap_sample defined above. + expected_fmap = dict({ + 'RW_A': (0, 24576), + 'RW_B': (24576, 24576), + 'WP_RO': (49152, 16384), + 'RO_VPD': (49152, 64), + 'RO_UNUSED': (49216, 6064), + 'RO_SECTION': (55280, 10256), + }) + with tempfile.TemporaryDirectory() as tmpd: + fmap = ap_ro_hash.read_fmap(tmpd) + self.assertEqual(len(fmap), len(expected_fmap)) + for k, v in fmap.items(): + self.assertTrue(k in expected_fmap) + self.assertEqual(expected_fmap[k], v) + + def test_ranges_errors(self): + 'Verify proper detection of address range errors.' + # Command line parameters and associated substrings to be found in the + # corresponding error messages. + bad_ranges_sets = ( + (['0:100'], 'is outside'), + (['RW-A', 'RW_B'], 'not in FMAP', 'is outside'), + (['RO_VPD', 'RO_UNUSED', '1000:100'], 'is outside'), + (['RO_VPD', 'RO_UNUSED', 'c030:100'], 'overlaps'), + ) + for rset in bad_ranges_sets: + ranges = rset[0] + with self.assertRaises(ap_ro_hash.ApRoHashCmdLineError) as cm: + ap_ro_hash.main(ranges) + for err_msg in rset[1:]: + self.assertTrue(err_msg in str(cm.exception)) + + @patch('ap_ro_hash.TpmChannel.__init__') + @patch('ap_ro_hash.TpmChannel.write') + @patch('ap_ro_hash.TpmChannel.read') + def test_end_to_end(self, mock_cread, mock_cwrite, mock_cinit): + """Test end to end processing. + + Includes validating command line arguments, generating the fmap + dictionary from the mock dump_fmap output, calculating hash on the + mock binary file and creating the vendor command including the sha256 + hash and the ranges. + + Mocking TpmChannel allows to verify the vendor command contents. + """ + exp_vendor_command = bytes([ + 0x80, 0x01, 0x00, 0x00, 0x00, 0x44, 0x20, 0x00, 0x00, 0x00, 0x00, + 0x36, 0xad, 0x75, 0x79, 0x56, 0x27, 0xdf, 0x99, 0x22, 0xa3, 0x01, + 0xd1, 0x66, 0xc6, 0xb4, 0xb4, 0xc8, 0x94, 0xf7, 0x94, 0x48, 0x6d, + 0x91, 0x7f, 0xbb, 0x2d, 0x85, 0x4c, 0x69, 0x72, 0xe2, 0xce, 0x5d, + 0x00, 0xc0, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x40, 0xc0, 0x00, + 0x00, 0x10, 0x00, 0x00, 0x00, 0xf0, 0xd7, 0x00, 0x00, 0x10, 0x28, + 0x00, 0x00]) + mock_cinit.return_value = None + mock_cread.return_value = bytes(TPM_OK_RESPONSE) + with io.StringIO() as stdo: + with contextlib.redirect_stdout(stdo): + ap_ro_hash.main(['RO_VPD', 'RO_SECTION', 'c040:10']) + self.assertEqual(stdo.getvalue(), 'SUCCEEDED\n') + self.assertEqual(mock_cwrite.call_args[0][0], exp_vendor_command) + + +if __name__ == '__main__': + unittest.main() -- cgit v1.2.1