summaryrefslogtreecommitdiff
path: root/util/ap_ro_hash.py
blob: 6c14c111bd1cc01cbde6d7aa1cc717f8ea6cda76 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
#!/usr/bin/env python3
# -*- coding: utf-8 -*-"
# Copyright 2020 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""ap_ro_hash.py - script for generating hashes of AP RO sections.

This script is supposed to run as part of the factory image of a Chrome OS
device, with limited Chrome OS specific Python modules availability.

The command line parameters are a set of one or more AP RO firmware address
ranges. A range could be expressed as either two colon separated hex numbers
(the offset of the range base address into the flash space and the size of the
range) or a text string (the name of a flash map section).

The script does the following:

 - using the flashrom utility read the fmap section from the AP firmware chip;

 - using dump_fmap retrieve addresses of all sections of the image, of
   interest is the WP_RO section which encompasses the entire RO space of the
   flash chip;

 - verify input parameters: make sure that input ranges and fmap sections do
   not overlap, and that all off the ranges and sections fall into WP_RO;

 - create a layout file to direct flashrom to read only the ranges of interest
   and read the flash again, this time creating a file with the required
   sections present;

 - calculate the sha256 sum of the required sections;

 - prepare and send to the Cr50 a vendor command including the ranges and the
   calculated checksum;

 - examine the Cr50 return code and report errors, if any.
"""

import argparse
import hashlib
import os
import struct
import subprocess
import sys
import syslog
import tempfile


class ApRoHashCmdLineError(Exception):
    'Exceptions due to command line arguments errors'
    pass


class ApRoHashExecutionError(Exception):
    'Exceptions due to command system command execution errors'
    pass


class ApRoTpmResponseError(Exception):
    'Exceptions due to TPM command execution errors'
    pass

VENDOR_CC_SEED_AP_RO_CHECK = 54

# Code returned by Cr50 if a vendor command is not supported.
VENDOR_RC_NO_SUCH_COMMAND_ERROR = 0x57f

# The tag and format are the same for command and response.
TPM_TAG = 0x8001
HEADER_FMT = '>H2LH'
MAX_BLOCK_SIZE = (4 * 1024 * 1024)

class Logger(object):
    """A class to support printing into a log and on the console when required.

    Attributes:
        _print_to_console: bool, if true - print on the console in addition to
            the syslog.
    """
    def __init__(self):
        self._print_to_console = False

    def _write(self, prefix, end, *args):
        """Common function for message handling.

        All messages are sent to syslog. Error and fatal messages are also
        always sent to the console to stderr.

        If _print_to_console is True, the regular messages are also printed to
        stdout.

        If prefix is set to FATAL, the program terminates.

        Args:
            prefix: string, one of '', 'ERROR', or 'FATAL'
            end: string, a character to add in the end (facilitates printing
                the newline the same as print())
            args: strings to print, concatenated with a space.
        """
        text = ' '.join(args) + end
        if prefix:
            text = '%s: %s' % (prefix, text)

        syslog.syslog(text)
        if self._print_to_console or prefix in ('ERROR', 'FATAL'):
            if prefix:
                outf = sys.stderr
            else:
                outf = sys.stdout
            print(text, file=outf, end='')
        if prefix == 'FATAL':
            sys.exit(1)

    def enable_verbose(self):
        'Enable verbose logging (printing on the console).'
        self._print_to_console = True

    def log(self, *args, end='\n'):
        'Process regular log message.'
        self._write('', end, *args)

    def error(self, *args, end='\n'):
        'Process non fatal error message.'
        self._write('ERROR', end, *args)

    def fatal(self, *args, end='\n'):
        'Process fatal level error message.'
        self._write('FATAL', end, *args)


LOG = Logger()

class Cr50TpmPacket(object):
    """Class to represent a TPM vendor command packet.

    Cr50 TPM vendor command response packets have the following format, all
    header fields are in big endian order:

    <tag><total length><vendor command><sub command><payload, variable length>
      |        |             |              +---- 2 bytes
      |        |             +------------------- 4 bytes
      |        +--------------------------------- 4 bytes
      +------------------------------------------ 2 bytes

    This class allows to accumulate data of the vendor command payload, and
    generate the full vendor command packet when requested.

    Attributes:
        _subcmd: int, the vendor subcommand
        _data: bytes, the vendor command payload
    """
    TPM_COMMAND = 0x20000000

    def __init__(self, subcmd):
        self._subcmd = subcmd
        self._data = bytes()

    def add_data(self, data):
        'Add data to the vendor command payload.'
        self._data += data

    def packet(self):
        """Generate full vendor command packet using subcommand and payload.

        Returns:
            A byte array which is a fully formatted vendor command packet.
        """
        header = struct.pack(HEADER_FMT,
                             TPM_TAG,
                             struct.calcsize(HEADER_FMT) + len(self._data),
                             self.TPM_COMMAND,
                             self._subcmd)
        return bytearray(header) + self._data


class Cr50TpmResponse(object):
    """Class to represent a TPM response packet.

    Cr50 TPM vendor command response packets have the following format, all
    header fields are in big endian order:

    <tag><total length><return code><sub command>[<data (optional)>]
      |        |             |           +---- 2 bytes
      |        |             +---------------- 4 bytes
      |        +------------------------------ 4 bytes
      +--------------------------------------- 2 bytes

    This class takes a byte buffer, runs basic verification (expected 'tag',
    'total length' matching the buffer length, 'sub command' matching the
    expected value when provided. If verification succeeds, the 'total
    length', 'return code' and 'data' fields are save for future reference.

    If verification fails, appropriate exceptions are raised.

    Attributes:
        _rc: int, 'return code' from packet header.
        _length: int, number of bytes in the packet
        _payload: bytes, contents of the data field, could be empty
    """
    def __init__(self, response, exp_subcmd=None):
        header_size = struct.calcsize(HEADER_FMT)
        if len(response) < header_size:
            raise ApRoTpmResponseError(
                'response too short (%d bytes)' % len(response))
        tag, self._length, self._rc, subcmd = struct.unpack_from(HEADER_FMT,
                                                                 response)
        if tag != TPM_TAG:
            raise ApRoTpmResponseError('unexpected TPM tag %04x' % tag)
        if self._length != len(response):
            raise ApRoTpmResponseError('length mismatch (%d != %d)' %
                                       (self._length, len(response)))
        if self._length not in (header_size, header_size + 1):
            raise ApRoTpmResponseError('unexpected response length %d' %
                                       (self._length))

        if exp_subcmd != None and exp_subcmd != subcmd:
            raise ApRoTpmResponseError('subcommand mismatch (%04x != %04x)' %
                                       (subcmd, exp_subcmd))
        self._payload = response[header_size:]

    @property
    def rc(self):
        'Get the response return code.'
        return self._rc

    @property
    def payload(self):
        'Get the response payload.'
        return self._payload


class TpmChannel(object):
    """Class to represent a channel to communicate with the TPM

    Communications could happen over /dev/tpm0 directly, if it is available,
    or through trunksd using the trunks_send utility.

    Attributes:
        _os_dev: int, file device number of the file opened to communicate
                 with the TPM. Set to None if opening attempt returned an
                 error.
        _response: bytes, data received from trunks_send. Not applicable in
                 case communications use /dev/tpm0.
    """
    def __init__(self):
        try:
            self._os_dev = os.open('/dev/tpm0', os.O_RDWR)
            LOG.log('will use /dev/tpm0')
        except OSError:
            self._os_dev = None
            LOG.log('will use trunks_send')
        self._response = bytes()

    def write(self, data):
        """Send command to the TPM.

        Args:
            data: byte array, the fully TPM command (i.e. Cr50 vendor
                command).
        """
        if self._os_dev:
            LOG.log('will call write to send %d bytes' % len(data), end='')
            rv = os.write(self._os_dev, data)
            LOG.log(', sent %d' % rv)
            return
        command = '/usr/sbin/trunks_send --raw ' + ' '.join(
            '%02x' % x for x in data)
        rv = run(command, ignore_error=False)[0]
        rv_hex = [rv[2*x:2*x+2] for x in range(int(len(rv)/2))]
        self._response = bytes([int(x, 16) for x in rv_hex])

    def read(self):
        """Read TPM response.

        Read the response directly from /dev/tpm0 or use as a response the
        string returned by the previous trunks_send invocation.

        Returns:
            A byte array, the contents of the TPM response packet.
        """
        if self._os_dev:
            # We don't expect much, but let's allow for a long response packet.
            return os.read(self._os_dev, 1000)
        rv = self._response
        self._response = bytes()
        return rv


def run(command, ignore_error=True):
    """Run system command.

    The command could be passed as a string (in which case every word in the
    string is considered a separate command line element) or as a list of
    strings.

    Args:
        command: string or list of strings. If string is given, it is
            converted into a list of strings before proceeding.
        ignore_error: Bool, if set to False and command execution fails, raise
            the exception.

    Returns:
        A tuple of two possibly multiline strings, the stdio and stderr
        generated while the command was being executed.

    Raises:
        ApRoHashExecutionError in case the executed command reported a non
            zero return status, and ignore_error is False
    """
    if isinstance(command, str):
        command = command.split()
    LOG.log('will run "%s"' % ' '.join(command))

    rv = subprocess.run(command,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if ignore_error and rv.returncode != 0:
        raise ApRoHashExecutionError(' '.join(command))

    return(rv.stdout.decode('ascii'), rv.stderr.decode('ascii'))


def send_to_cr50(ranges, digest):
    """Program RO hash information in Cr50.

    This function creates a vendor command packet which will allow Cr50 to
    save ranges and the sha256 hash of the AP flash RO space which needs to be
    verified.

    This vendor command payload has the following format:

    <32 bytes of sha256 digest><range>[<range>..]

    The Cr50 response contains the return code (rc) in the header set to zero
    on success, or a generic error on failure. In case of failure the actual
    error code is delivered as the last byte of the response packet.

    Args:
        ranges: a list of two tuples of ints, base addresses and sizes of
            hashed flash areas
        digest: combined 32 bit sha256 digest of all ranges

    Returns:
        An integer value, zero on success, of the actual error code on failure.
    """
    subcmd = VENDOR_CC_SEED_AP_RO_CHECK
    p = Cr50TpmPacket(subcmd)
    p.add_data(digest)
    for r in ranges:
        for n in r:
            p.add_data(bytearray(struct.pack('<L', n)))
    channel = TpmChannel()
    channel.write(p.packet())
    tpm_response = Cr50TpmResponse(channel.read(), subcmd)
    # Return payload value if present, if not - the header RC value.
    if tpm_response.payload:
        return int(tpm_response.payload[0])
    return tpm_response.rc


def read_fmap(tmpd):
    """Read AP firmware fmap section and convert it into a dictionary.

    The flashrom utility is used to read the only section of the AP firmware,
    the fmap, into a file. Then the dump_fmap utility is used to print the
    fmap contents, which consists of a set of section descriptions of the
    following structure:

       area:            22
       area_offset:     0x00c00000
       area_size:       0x00400000 (4194304)
       area_name:       WP_RO

    This function parses the section descriptions to generate a dictionary
    where the keys are area_names, and values are two int tuples of
    (area_offset, area_size).

    Args:
        tmpd: string, directory to use to store temp files.

    Returns:
        The generated dictionary.
    """
    fmap_file = os.path.join(tmpd, 'fmap.bin')
    run('flashrom -i FMAP -r %s' % fmap_file, ignore_error=False)
    fmap_text = run('dump_fmap -F ' + fmap_file, ignore_error=False)[0]

    fmap = {}
    offset = 0
    size = 0
    for line in fmap_text.splitlines():
        # line format
        # start_offset:end_offset secton_name
        r, name = line.split()
        start, end = r.split(':')
        offset = int(start, 16)
        end_offset = int(end, 16)
        size = end_offset - offset + 1
        fmap[name] = (offset, size)
        LOG.log('%20s 0x%08x:0x%08x %s' % (name, offset, size, r))
    return fmap


def verify_ranges(ranges, limit):
    """Verify that all ranges fall into the limit.

    Args:
        ranges: a sorted list of two tuples of ints, base addresses and sizes
            of hashed flash areas
        limit: a range all ranges' elements must fit into

    Returns:
        A string describing non compliant ranges, an empty string if no problems
        have been found.
    """
    base = limit[0]
    top = base + limit[1]
    errors = []
    for i, r in enumerate(ranges):
        rerrors = []
        rbase = r[0]
        rtop = rbase + r[1]
        if rbase < base or rtop > top:
            rerrors.append('is outside the RO area',)
        if i > 0:
            prev_r = ranges[i - 1]
            if prev_r[0] + prev_r[1] > rbase:
                rerrors.append('overlaps with %x:%x' % (prev_r[0], prev_r[1],))
        if rerrors:
            errors.append(' '.join(['Range %x:%x:' % (r[0], r[1]),] + rerrors))

    return '\n'.join(errors)


def read_ro_ranges(check_file_name, tmpd, ranges):
    """Read firmware ranges into a file.

    Based on the passed in ranges create a layout file to store the list of
    sections to be read by flashrom, and read these sections of the AP
    firmware into a file.

    Args:
        check_file_name: string, name of the file to read into
        tmpd: string, name of the directory used for the output and layout files
        ranges: a list of two int elements, base address and size of each area
    """
    layout_file = os.path.join(tmpd, 'layout')
    read_cmd = ['flashrom', '-l', layout_file, '-r', check_file_name]
    with open(layout_file, 'w') as lf:
        for i, r in enumerate(ranges):
            name = 'section%d' % i
            lf.write('%x:%x %s\n' % (r[0], r[0] + r[1] - 1, name))
            read_cmd += ['-i', name]
    run(read_cmd, ignore_error=False)


def calculate_hash(ro_file, ranges):
    """Calculate sha256 hashes of ranges in the file.

    Args:
        ro_file: string, name of the file containing the areas to be hashed
        ranges: a list of two int elements, base address and size of each area

    Returns:
        A byte array, the calculated sha256 hash of all areas concatenated.
    """
    sha256 = hashlib.sha256()
    with open(ro_file, 'rb') as rof:
        for offset, size in ranges:
            rof.seek(offset)
            while size:
                buf = rof.read(size)
                if not buf:
                    raise ApRoHashCmdLineError(
                        'image is too small for range starting at 0x%x'
                        % offset)
                sha256.update(buf)
                size -= len(buf)

    return sha256.digest()


usage_str = """
%s: [-v] <range>|<fmap_area> [<range>|<fmap_area>...]
         <range>: two colon separated hex values, AP flash area offset and size
         <fmap_area>: symbolic name of the area as reported by dump_fmap
         All ranges and fmap areas must fit into the WP_RO FMAP area
"""  % sys.argv[0].split('/')[-1]

def get_args(args):
    """Prepare argument parser and retrieve command line arguments.

    Returns the parser object with a namespace with all present optional
    arguments set.
    """
    parser = argparse.ArgumentParser(usage=usage_str)
    parser.add_argument(
        '--verbose', '-v',
        type=bool,
        help=('enable verbose logging on the console'),
        default=False)

    return parser.parse_known_args(args)


def main(args):
    'Main function, receives a list of strings, command line arguments'

    # Map of possible error codes returned by Cr50 (both vendor command and
    # subcommand level errors) into strings.
    error_codes = {
        1 : 'Vendor command too short',
        2 : 'Vendor command size mismatch',
        3 : 'Bad offset value',
        4 : 'Bad range size',
        5 : 'Already programmed',
        6 : 'Flash write failed',
        7 : 'BID programmed',
        8 : 'Flash erase failed',
        VENDOR_RC_NO_SUCH_COMMAND_ERROR : 'Insufficient C50 version',
    }

    nsp, rest = get_args(args)

    if nsp.verbose:
        LOG.enable_verbose()

    ranges = []     # Ranges from the command line, including FMAP sections.

    # Let's keep all temp files in an ephemeral temp directory.
    with tempfile.TemporaryDirectory() as tmpd:
        bad_ranges = []	       # Invalid ranges, if any.
        bad_section_names = [] # Invalid section names, if any.

        fmap = read_fmap(tmpd)


        for arg in rest:
            if arg in fmap:
                offset, size = fmap[arg]
                LOG.log('Using %r range from fmap %08x:%08x' %
                        (arg, offset, size))
            elif ':' in arg:
                try:
                    offset_str, size_str = arg.split(':', 1)
                    offset = int(offset_str, 16)
                    size = int(size_str, 16)
                except ValueError:
                    bad_ranges.append(arg)
                    continue
            else:
                bad_section_names.append(arg)
                continue

            while size:
                chunk = min(size, MAX_BLOCK_SIZE)
                LOG.log('Add range - %08x:%08x' % (offset, chunk))
                ranges.append((offset, chunk))
                size -= chunk
                offset += chunk

        error_msg = ''
        if bad_ranges:
            error_msg += 'Ranges %s not valid\n' % ' '.join(bad_ranges)
        if bad_section_names:
            error_msg += ('Section(s) "%s" not in FMAP\n' %
                          '" "'.join(bad_section_names))

        if ranges:
            # Make sure the list is sorted by the first element.
            ranges.sort(key=lambda x: x[0])
            LOG.log('Ranges: %r' % ranges)

            # Make sure ranges do not overlap and fall into the WP_RO section.
            error_msg += verify_ranges(ranges, fmap['WP_RO'])

        if error_msg:
            raise ApRoHashCmdLineError(error_msg)

        if not ranges:
            raise ApRoHashCmdLineError('no hashing ranges specified')

        ro_check_file = os.path.join(tmpd, 'ro_check.bin')
        read_ro_ranges(ro_check_file, tmpd, ranges)
        digest = calculate_hash(ro_check_file, ranges)

    LOG.log('Saving digest:', digest.hex())
    rv = send_to_cr50(ranges, digest)
    if rv != 0:
        err_str = error_codes.get(rv, 'Unknown')
        LOG.error('Cr50 returned %s%x (%s)' % (
            # Add 0x prefix if value exceeds 9.
            '0x' if rv > 9 else '',
            rv, err_str))
        return rv
    print('SUCCEEDED')
    return 0

if __name__ == '__main__':
    try:
        main(sys.argv[1:])
    except ApRoHashCmdLineError as e:
        LOG.fatal('%s' % e)
    except ApRoHashExecutionError as e:
        LOG.fatal('command \"%s\" failed' % e)
    except ApRoTpmResponseError as e:
        LOG.fatal('TPM response problem: \"%s\"' % e)