#!/usr/bin/env python # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Flash PD PSU RW firmware over the USBPD comm channel using console. Example: util/flash_pd.py ./build/zinger/ec.RW.bin """ import array import errno import logging import optparse import os import re import socket import sys import time import serial # TODO(tbroch): Discuss adding hdctools as an EC package RDEPENDS from servo import client from servo import multiservo VERSION = '0.0.2' # RW area is half of the 32-kB MAX_FW_SIZE = 16 * 1024 # 20 first bytes of SHA-256 of RW when erased (set to all F's) ERASED_RW_HASH = 'd86670be 559860c7 2b2149e8 d2ae1104 9550e093' class FlashPDError(Exception): """Exception class for flash_pd utility.""" class FlashPD(client.ServoClient): """class to flash PD MCU. Note, Some designs(samus) have multiple embedded MCUs. In that case the convention is to name the pty associated with usbpd as 'usbpd_uart_pty'. In the case where there is only one MCU we prefer 'usbpd_uart_pty' but will also associate 'ecu_uart_pty' with having capability to flash the USB-PD capable PSU(zinger). Attributes: _options : Values instance from optparse. Public Methods: expect : Examine console output for an expected response. flash_command : Write a PD flash command and interrogate its result. get_version : Retrieve current version of PD FW. """ def __init__(self, options): """Constructor. Args: options : Values instance from optparse. Raises: FlashPDError: If unable to determine the console pty """ super(FlashPD, self).__init__(host=options.server, port=options.port) self._options = options self._serial = None try: pty = self.get('usbpd_uart_pty') except socket.error as e: raise FlashPDError('Can\'t connect to servod :: %s' % e) except client.ServoClientError: pty = self.get('ec_uart_pty') if not pty: raise FlashPDError('Unable to determine EC uart from servod') logging.debug('Opening serial connection to %s', pty) try: self._serial = serial.Serial(pty, timeout=1) except OSError as e: if e.errno == errno.EAGAIN: # try twice if already open EAGAIN failure causes disconnect. self._serial = serial.Serial(pty, timeout=1) else: raise FlashPDError('%s' % e) # quiet other channels that might pollute console. self._serial.write('chan 1\n') self._serial.flushOutput() self._serial.flushInput() def __del__(self): """Deconstructor.""" if self._serial: for l in self._serial: logging.debug('flash: %s', l) self._serial.write('chan 0xffffffff\n') self._serial.write('chan restore\n') self._serial.close() def expect(self, val, timeout=5): """Scan serial output for particular string. Args: val : string to look for timeout : integer seconds to look before timing out. Returns: tuple : boolean if 'val' found in console output. string of line that had 'val' in it. """ done = False deadline = time.time() + timeout while not done and (time.time() < deadline): l = None for l in self._serial: done = val in l logging.debug('Is %s in: %s', val, l) if done or time.time() > deadline: break if not done: logging.debug("Expect '%s' missing", val) return (done, l) def pd_command(self, cmd, expect='DONE 0', retries=2, ignore_fail=False): """Send PD command and interrogate output. Args: cmd : string of 'pd ' command to execute expect : string of expected response after 'cmd' retries : integer number of times to repeat command if it fails. ignore_fail : boolean to ignore failure Returns: tuple : found : boolean, whether response matches expected. line : string of line returned by expect method. Raises: FlashPDError: if command failed to match expected return string after retries. """ tries = retries + 1 for i in xrange(tries): self._serial.write('pd %d %s\n' % (self._options.multiport, cmd)) (found, line) = self.expect(expect) if i: time.sleep(1) logging.debug("pd cmd Retry%d for '%s'", i, cmd) if found: break if (i + 1) == tries and not found and not ignore_fail: raise FlashPDError("Failed pd cmd: '%s' after %d retries\n" % (cmd, retries)) return (found, line) def flash_command(self, cmd, expect='DONE 0', retries=2, ignore_fail=False): """Helper method.""" flash_cmd = 'flash %s' % cmd return self.pd_command(flash_cmd, expect, retries, ignore_fail) def get_version(self): """Retreive PSU firmware version. Looks like: 'version: zinger_v1.1.1917-bfd' Returns: version : string of version Raises: FlashPDError : if can't determine version """ (found, line) = self.flash_command('version', expect='version:') logging.debug('is version in: %s', line) m = False if found: m = re.match(r'.*version:\s+(\w+_v\d+\.\d+\.\d+-\S+).*', line) if not m: raise FlashPDError('Unable to determine PD FW version') return m.group(1) def reboot(self): """Reboot PSU. Use 'version' to poll for success after DONE encountered. Raises: FlashPDError : If unable to reboot """ self.flash_command('reboot', expect=r'DONE', retries=0) self.flash_command('version', retries=10) def flash_pd(options): """Flash power delivery firmware.""" ec = FlashPD(options) with open(options.firmware) as fd: fw = fd.read() fw_size = len(fw) # The RW firmware should be already padded and signed if fw_size != MAX_FW_SIZE: raise FlashPDError('Bad RW firmware size %d/%d' % (fw_size, MAX_FW_SIZE)) words = array.array('I', fw) logging.info('Current PD FW version is %s', ec.get_version()) if options.versiononly: return logging.info('Flashing %d bytes', fw_size) # reset flashed signature to reboot in RO ec.flash_command('signature') # reboot in RO ec.reboot() # erase all RW partition ec.flash_command('erase') # try info command and guarantee we're in RO (done, line) = ec.flash_command('info', expect=r'INFO') m = re.match(r'INFO.*(18d1\S{4})', line) if done and m: done = ec.expect('DONE 0') in_rw = int(m.group(1), 16) & 0x1 if in_rw: raise FlashPDError('Not in RO after erase') # Google UFP devices share their hash to DFP after info command so check it (done, _) = ec.pd_command('hash', expect=ERASED_RW_HASH) if not done: raise FlashPDError('Erase failed') logging.info('Successfully erased flash.') if options.eraseonly: ec.reboot() logging.info('After erase, FW version is %s', ec.get_version()) return # write firmware content for i in xrange(len(words) / 6): chunk = words[i * 6: (i + 1) * 6] cmd = ' '.join(['%08x' % (w) for w in chunk]) ec.flash_command(cmd) if not i % 0x10: logging.info('Chunk %d of %d done.', i, len(words) / 6) # write the remaining words chunk = words[len(words) / 6 * 6:] cmd = ' '.join(['%08x' % (w) for w in chunk]) ec.flash_command(cmd) # reboot in RW ec.reboot() logging.info('Flashing DONE.') logging.info('New PD FW version is %s', ec.get_version()) def parse_args(): """Parse commandline arguments. Note, reads sys.argv directly Returns: options : dict of from optparse.parse_args(). Raises: FlashPDError : If problems with arguments """ description = ( '%prog [] ' '\n' '%prog is a utility for flashing the USB-PD charger RW firmware over ' 'the USB-PD communication channel using PD MCU console commands.' ) examples = ( '\nExamples:\n' ' %prog build/zinger/ec.RW.flat\n' ) parser = optparse.OptionParser(version='%prog ' + VERSION) parser.description = description parser.add_option('-d', '--debug', action='store_true', default=False, help='enable debug messages.') parser.add_option('-s', '--server', help='host where servod is running', default=client.DEFAULT_HOST) parser.add_option('-p', '--port', default=client.DEFAULT_PORT, type=int, help='port servod is listening on.') parser.add_option('-m', '--multiport', default=0, type=int, help='If design has multiple type-C ports, this identifies ' 'which one has USB PD PSU.') parser.add_option('', '--timeout', default=5, type=int, help='Timeout seconds to wait for console output.') parser.add_option('', '--eraseonly', action='store_true', default=False, help='Only erase RW portion and exit.') parser.add_option('-V', '--versiononly', action='store_true', default=False, help='Only read version and exit.') multiservo.add_multiservo_parser_options(parser) parser.set_usage(parser.get_usage() + examples) (options, args) = parser.parse_args() # TODO(tbroch) Add this once we refactor module to ease use in scripts. if options.name: raise NotImplementedError('Multiservo support TBD') # Add after to enumerate options.firmware but outside 'help' generation parser.add_option('-f', '', action='store', type='string', dest='firmware') if len(args) != 1: raise FlashPDError('Must supply power delivery firmware to write.') options.firmware = args[0] if not os.path.exists(options.firmware): raise FlashPDError('Unable to find file %s' % options.firmware) fw_size = os.path.getsize(options.firmware) if fw_size > MAX_FW_SIZE: raise FlashPDError('Firmware too large %d/%d' % (fw_size, MAX_FW_SIZE)) return options def main_function(): options = parse_args() loglevel = logging.INFO log_format = '%(asctime)s - %(name)s - %(levelname)s' if options.debug: loglevel = logging.DEBUG log_format += ' - %(filename)s:%(lineno)d:%(funcName)s' log_format += ' - %(message)s' logging.basicConfig(level=loglevel, format=log_format) flash_pd(options) def main(): """Main function wrapper to catch exceptions properly.""" try: main_function() except KeyboardInterrupt: sys.exit(0) except FlashPDError as e: print 'Error: ', e.message sys.exit(1) if __name__ == '__main__': main()