#!/usr/bin/env python3 # Copyright 2017 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Statically analyze stack usage of EC firmware. Example: extra/stack_analyzer/stack_analyzer.py \ --export_taskinfo ./build/elm/util/export_taskinfo.so \ --section RW \ ./build/elm/RW/ec.RW.elf """ from __future__ import print_function import argparse import collections import ctypes import os import re import subprocess import yaml SECTION_RO = 'RO' SECTION_RW = 'RW' # Default size of extra stack frame needed by exception context switch. # This value is for cortex-m with FPU enabled. DEFAULT_EXCEPTION_FRAME_SIZE = 224 class StackAnalyzerError(Exception): """Exception class for stack analyzer utility.""" class TaskInfo(ctypes.Structure): """Taskinfo ctypes structure. The structure definition is corresponding to the "struct taskinfo" in "util/export_taskinfo.so.c". """ _fields_ = [('name', ctypes.c_char_p), ('routine', ctypes.c_char_p), ('stack_size', ctypes.c_uint32)] class Task(object): """Task information. Attributes: name: Task name. routine_name: Routine function name. stack_max_size: Max stack size. routine_address: Resolved routine address. None if it hasn't been resolved. """ def __init__(self, name, routine_name, stack_max_size, routine_address=None): """Constructor. Args: name: Task name. routine_name: Routine function name. stack_max_size: Max stack size. routine_address: Resolved routine address. """ self.name = name self.routine_name = routine_name self.stack_max_size = stack_max_size self.routine_address = routine_address def __eq__(self, other): """Task equality. Args: other: The compared object. Returns: True if equal, False if not. """ if not isinstance(other, Task): return False return (self.name == other.name and self.routine_name == other.routine_name and self.stack_max_size == other.stack_max_size and self.routine_address == other.routine_address) class Symbol(object): """Symbol information. Attributes: address: Symbol address. symtype: Symbol type, 'O' (data, object) or 'F' (function). size: Symbol size. name: Symbol name. """ def __init__(self, address, symtype, size, name): """Constructor. Args: address: Symbol address. symtype: Symbol type. size: Symbol size. name: Symbol name. """ assert symtype in ['O', 'F'] self.address = address self.symtype = symtype self.size = size self.name = name def __eq__(self, other): """Symbol equality. Args: other: The compared object. Returns: True if equal, False if not. """ if not isinstance(other, Symbol): return False return (self.address == other.address and self.symtype == other.symtype and self.size == other.size and self.name == other.name) class Callsite(object): """Function callsite. Attributes: address: Address of callsite location. None if it is unknown. target: Callee address. None if it is unknown. is_tail: A bool indicates that it is a tailing call. callee: Resolved callee function. None if it hasn't been resolved. """ def __init__(self, address, target, is_tail, callee=None): """Constructor. Args: address: Address of callsite location. None if it is unknown. target: Callee address. None if it is unknown. is_tail: A bool indicates that it is a tailing call. (function jump to another function without restoring the stack frame) callee: Resolved callee function. """ # It makes no sense that both address and target are unknown. assert not (address is None and target is None) self.address = address self.target = target self.is_tail = is_tail self.callee = callee def __eq__(self, other): """Callsite equality. Args: other: The compared object. Returns: True if equal, False if not. """ if not isinstance(other, Callsite): return False if not (self.address == other.address and self.target == other.target and self.is_tail == other.is_tail): return False if self.callee is None: return other.callee is None elif other.callee is None: return False # Assume the addresses of functions are unique. return self.callee.address == other.callee.address class Function(object): """Function. Attributes: address: Address of function. name: Name of function from its symbol. stack_frame: Size of stack frame. callsites: Callsite list. stack_max_usage: Max stack usage. None if it hasn't been analyzed. stack_max_path: Max stack usage path. None if it hasn't been analyzed. """ def __init__(self, address, name, stack_frame, callsites): """Constructor. Args: address: Address of function. name: Name of function from its symbol. stack_frame: Size of stack frame. callsites: Callsite list. """ self.address = address self.name = name self.stack_frame = stack_frame self.callsites = callsites self.stack_max_usage = None self.stack_max_path = None def __eq__(self, other): """Function equality. Args: other: The compared object. Returns: True if equal, False if not. """ if not isinstance(other, Function): return False if not (self.address == other.address and self.name == other.name and self.stack_frame == other.stack_frame and self.callsites == other.callsites and self.stack_max_usage == other.stack_max_usage): return False if self.stack_max_path is None: return other.stack_max_path is None elif other.stack_max_path is None: return False if len(self.stack_max_path) != len(other.stack_max_path): return False for self_func, other_func in zip(self.stack_max_path, other.stack_max_path): # Assume the addresses of functions are unique. if self_func.address != other_func.address: return False return True def __hash__(self): return id(self) class AndesAnalyzer(object): """Disassembly analyzer for Andes architecture. Public Methods: AnalyzeFunction: Analyze stack frame and callsites of the function. """ GENERAL_PURPOSE_REGISTER_SIZE = 4 # Possible condition code suffixes. CONDITION_CODES = [ 'eq', 'eqz', 'gez', 'gtz', 'lez', 'ltz', 'ne', 'nez', 'eqc', 'nec', 'nezs', 'nes', 'eqs'] CONDITION_CODES_RE = '({})'.format('|'.join(CONDITION_CODES)) IMM_ADDRESS_RE = r'([0-9A-Fa-f]+)\s+<([^>]+)>' # Branch instructions. JUMP_OPCODE_RE = re.compile(r'^(b{0}|j|jr|jr.|jrnez)(\d?|\d\d)$' \ .format(CONDITION_CODES_RE)) # Call instructions. CALL_OPCODE_RE = re.compile \ (r'^(jal|jral|jral.|jralnez|beqzal|bltzal|bgezal)(\d)?$') CALL_OPERAND_RE = re.compile(r'^{}$'.format(IMM_ADDRESS_RE)) # Ignore lp register because it's for return. INDIRECT_CALL_OPERAND_RE = re.compile \ (r'^\$r\d{1,}$|\$fp$|\$gp$|\$ta$|\$sp$|\$pc$') # TODO: Handle other kinds of store instructions. PUSH_OPCODE_RE = re.compile(r'^push(\d{1,})$') PUSH_OPERAND_RE = re.compile(r'^\$r\d{1,}, \#\d{1,} \! \{([^\]]+)\}') SMW_OPCODE_RE = re.compile(r'^smw(\.\w\w|\.\w\w\w)$') SMW_OPERAND_RE = re.compile(r'^(\$r\d{1,}|\$\wp), \[\$\wp\], ' r'(\$r\d{1,}|\$\wp), \#\d\w\d \! \{([^\]]+)\}') OPERANDGROUP_RE = re.compile(r'^\$r\d{1,}\~\$r\d{1,}') LWI_OPCODE_RE = re.compile(r'^lwi(\.\w\w)$') LWI_PC_OPERAND_RE = re.compile(r'^\$pc, \[([^\]]+)\]') # Example: "34280: 3f c8 0f ec addi.gp $fp, #0xfec" # Assume there is always a "\t" after the hex data. DISASM_REGEX_RE = re.compile(r'^(?P
[0-9A-Fa-f]+):\s+' r'(?P[0-9A-Fa-f ]+)' r'\t\s*(?P\S+)(\s+(?P[^;]*))?') def ParseInstruction(self, line, function_end): """Parse the line of instruction. Args: line: Text of disassembly. function_end: End address of the current function. None if unknown. Returns: (address, words, opcode, operand_text): The instruction address, words, opcode, and the text of operands. None if it isn't an instruction line. """ result = self.DISASM_REGEX_RE.match(line) if result is None: return None address = int(result.group('address'), 16) # Check if it's out of bound. if function_end is not None and address >= function_end: return None opcode = result.group('opcode').strip() operand_text = result.group('operand') words = result.group('words') if operand_text is None: operand_text = '' else: operand_text = operand_text.strip() return (address, words, opcode, operand_text) def AnalyzeFunction(self, function_symbol, instructions): stack_frame = 0 callsites = [] for address, words, opcode, operand_text in instructions: is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None if is_jump_opcode or is_call_opcode: is_tail = is_jump_opcode result = self.CALL_OPERAND_RE.match(operand_text) if result is None: if (self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None): # Found an indirect call. callsites.append(Callsite(address, None, is_tail)) else: target_address = int(result.group(1), 16) # Filter out the in-function target (branches and in-function calls, # which are actually branches). if not (function_symbol.size > 0 and function_symbol.address < target_address < (function_symbol.address + function_symbol.size)): # Maybe it is a callsite. callsites.append(Callsite(address, target_address, is_tail)) elif self.LWI_OPCODE_RE.match(opcode) is not None: result = self.LWI_PC_OPERAND_RE.match(operand_text) if result is not None: # Ignore "lwi $pc, [$sp], xx" because it's usually a return. if result.group(1) != '$sp': # Found an indirect call. callsites.append(Callsite(address, None, True)) elif self.PUSH_OPCODE_RE.match(opcode) is not None: # Example: fc 20 push25 $r8, #0 ! {$r6~$r8, $fp, $gp, $lp} if self.PUSH_OPERAND_RE.match(operand_text) is not None: # capture fc 20 imm5u = int(words.split(' ')[1], 16) # sp = sp - (imm5u << 3) imm8u = (imm5u<<3) & 0xff stack_frame += imm8u result = self.PUSH_OPERAND_RE.match(operand_text) operandgroup_text = result.group(1) # capture $rx~$ry if self.OPERANDGROUP_RE.match(operandgroup_text) is not None: # capture number & transfer string to integer oprandgrouphead = operandgroup_text.split(',')[0] rx=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[0]))) ry=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[1]))) stack_frame += ((len(operandgroup_text.split(','))+ry-rx) * self.GENERAL_PURPOSE_REGISTER_SIZE) else: stack_frame += (len(operandgroup_text.split(',')) * self.GENERAL_PURPOSE_REGISTER_SIZE) elif self.SMW_OPCODE_RE.match(opcode) is not None: # Example: smw.adm $r6, [$sp], $r10, #0x2 ! {$r6~$r10, $lp} if self.SMW_OPERAND_RE.match(operand_text) is not None: result = self.SMW_OPERAND_RE.match(operand_text) operandgroup_text = result.group(3) # capture $rx~$ry if self.OPERANDGROUP_RE.match(operandgroup_text) is not None: # capture number & transfer string to integer oprandgrouphead = operandgroup_text.split(',')[0] rx=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[0]))) ry=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[1]))) stack_frame += ((len(operandgroup_text.split(','))+ry-rx) * self.GENERAL_PURPOSE_REGISTER_SIZE) else: stack_frame += (len(operandgroup_text.split(',')) * self.GENERAL_PURPOSE_REGISTER_SIZE) return (stack_frame, callsites) class ArmAnalyzer(object): """Disassembly analyzer for ARM architecture. Public Methods: AnalyzeFunction: Analyze stack frame and callsites of the function. """ GENERAL_PURPOSE_REGISTER_SIZE = 4 # Possible condition code suffixes. CONDITION_CODES = ['', 'eq', 'ne', 'cs', 'hs', 'cc', 'lo', 'mi', 'pl', 'vs', 'vc', 'hi', 'ls', 'ge', 'lt', 'gt', 'le'] CONDITION_CODES_RE = '({})'.format('|'.join(CONDITION_CODES)) # Assume there is no function name containing ">". IMM_ADDRESS_RE = r'([0-9A-Fa-f]+)\s+<([^>]+)>' # Fuzzy regular expressions for instruction and operand parsing. # Branch instructions. JUMP_OPCODE_RE = re.compile( r'^(b{0}|bx{0})(\.\w)?$'.format(CONDITION_CODES_RE)) # Call instructions. CALL_OPCODE_RE = re.compile( r'^(bl{0}|blx{0})(\.\w)?$'.format(CONDITION_CODES_RE)) CALL_OPERAND_RE = re.compile(r'^{}$'.format(IMM_ADDRESS_RE)) CBZ_CBNZ_OPCODE_RE = re.compile(r'^(cbz|cbnz)(\.\w)?$') # Example: "r0, 1009bcbe " CBZ_CBNZ_OPERAND_RE = re.compile(r'^[^,]+,\s+{}$'.format(IMM_ADDRESS_RE)) # Ignore lr register because it's for return. INDIRECT_CALL_OPERAND_RE = re.compile(r'^r\d+|sb|sl|fp|ip|sp|pc$') # TODO(cheyuw): Handle conditional versions of following # instructions. # TODO(cheyuw): Handle other kinds of pc modifying instructions (e.g. mov pc). LDR_OPCODE_RE = re.compile(r'^ldr(\.\w)?$') # Example: "pc, [sp], #4" LDR_PC_OPERAND_RE = re.compile(r'^pc, \[([^\]]+)\]') # TODO(cheyuw): Handle other kinds of stm instructions. PUSH_OPCODE_RE = re.compile(r'^push$') STM_OPCODE_RE = re.compile(r'^stmdb$') # Stack subtraction instructions. SUB_OPCODE_RE = re.compile(r'^sub(s|w)?(\.\w)?$') SUB_OPERAND_RE = re.compile(r'^sp[^#]+#(\d+)') # Example: "44d94: f893 0068 ldrb.w r0, [r3, #104] ; 0x68" # Assume there is always a "\t" after the hex data. DISASM_REGEX_RE = re.compile(r'^(?P
[0-9A-Fa-f]+):\s+[0-9A-Fa-f ]+' r'\t\s*(?P\S+)(\s+(?P[^;]*))?') def ParseInstruction(self, line, function_end): """Parse the line of instruction. Args: line: Text of disassembly. function_end: End address of the current function. None if unknown. Returns: (address, opcode, operand_text): The instruction address, opcode, and the text of operands. None if it isn't an instruction line. """ result = self.DISASM_REGEX_RE.match(line) if result is None: return None address = int(result.group('address'), 16) # Check if it's out of bound. if function_end is not None and address >= function_end: return None opcode = result.group('opcode').strip() operand_text = result.group('operand') if operand_text is None: operand_text = '' else: operand_text = operand_text.strip() return (address, opcode, operand_text) def AnalyzeFunction(self, function_symbol, instructions): """Analyze function, resolve the size of stack frame and callsites. Args: function_symbol: Function symbol. instructions: Instruction list. Returns: (stack_frame, callsites): Size of stack frame, callsite list. """ stack_frame = 0 callsites = [] for address, opcode, operand_text in instructions: is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None is_cbz_cbnz_opcode = self.CBZ_CBNZ_OPCODE_RE.match(opcode) is not None if is_jump_opcode or is_call_opcode or is_cbz_cbnz_opcode: is_tail = is_jump_opcode or is_cbz_cbnz_opcode if is_cbz_cbnz_opcode: result = self.CBZ_CBNZ_OPERAND_RE.match(operand_text) else: result = self.CALL_OPERAND_RE.match(operand_text) if result is None: # Failed to match immediate address, maybe it is an indirect call. # CBZ and CBNZ can't be indirect calls. if (not is_cbz_cbnz_opcode and self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None): # Found an indirect call. callsites.append(Callsite(address, None, is_tail)) else: target_address = int(result.group(1), 16) # Filter out the in-function target (branches and in-function calls, # which are actually branches). if not (function_symbol.size > 0 and function_symbol.address < target_address < (function_symbol.address + function_symbol.size)): # Maybe it is a callsite. callsites.append(Callsite(address, target_address, is_tail)) elif self.LDR_OPCODE_RE.match(opcode) is not None: result = self.LDR_PC_OPERAND_RE.match(operand_text) if result is not None: # Ignore "ldr pc, [sp], xx" because it's usually a return. if result.group(1) != 'sp': # Found an indirect call. callsites.append(Callsite(address, None, True)) elif self.PUSH_OPCODE_RE.match(opcode) is not None: # Example: "{r4, r5, r6, r7, lr}" stack_frame += (len(operand_text.split(',')) * self.GENERAL_PURPOSE_REGISTER_SIZE) elif self.SUB_OPCODE_RE.match(opcode) is not None: result = self.SUB_OPERAND_RE.match(operand_text) if result is not None: stack_frame += int(result.group(1)) else: # Unhandled stack register subtraction. assert not operand_text.startswith('sp') elif self.STM_OPCODE_RE.match(opcode) is not None: if operand_text.startswith('sp!'): # Subtract and writeback to stack register. # Example: "sp!, {r4, r5, r6, r7, r8, r9, lr}" # Get the text of pushed register list. unused_sp, unused_sep, parameter_text = operand_text.partition(',') stack_frame += (len(parameter_text.split(',')) * self.GENERAL_PURPOSE_REGISTER_SIZE) return (stack_frame, callsites) class StackAnalyzer(object): """Class to analyze stack usage. Public Methods: Analyze: Run the stack analysis. """ C_FUNCTION_NAME = r'_A-Za-z0-9' # Assume there is no ":" in the path. # Example: "driver/accel_kionix.c:321 (discriminator 3)" ADDRTOLINE_RE = re.compile( r'^(?P[^:]+):(?P\d+)(\s+\(discriminator\s+\d+\))?$') # To eliminate the suffix appended by compilers, try to extract the # C function name from the prefix of symbol name. # Example: "SHA256_transform.constprop.28" FUNCTION_PREFIX_NAME_RE = re.compile( r'^(?P[{0}]+)([^{0}].*)?$'.format(C_FUNCTION_NAME)) # Errors of annotation resolving. ANNOTATION_ERROR_INVALID = 'invalid signature' ANNOTATION_ERROR_NOTFOUND = 'function is not found' ANNOTATION_ERROR_AMBIGUOUS = 'signature is ambiguous' def __init__(self, options, symbols, rodata, tasklist, annotation): """Constructor. Args: options: Namespace from argparse.parse_args(). symbols: Symbol list. rodata: Content of .rodata section (offset, data) tasklist: Task list. annotation: Annotation config. """ self.options = options self.symbols = symbols self.rodata_offset = rodata[0] self.rodata = rodata[1] self.tasklist = tasklist self.annotation = annotation self.address_to_line_cache = {} def AddressToLine(self, address, resolve_inline=False): """Convert address to line. Args: address: Target address. resolve_inline: Output the stack of inlining. Returns: lines: List of the corresponding lines. Raises: StackAnalyzerError: If addr2line is failed. """ cache_key = (address, resolve_inline) if cache_key in self.address_to_line_cache: return self.address_to_line_cache[cache_key] try: args = [self.options.addr2line, '-f', '-e', self.options.elf_path, '{:x}'.format(address)] if resolve_inline: args.append('-i') line_text = subprocess.check_output(args, encoding='utf-8') except subprocess.CalledProcessError: raise StackAnalyzerError('addr2line failed to resolve lines.') except OSError: raise StackAnalyzerError('Failed to run addr2line.') lines = [line.strip() for line in line_text.splitlines()] # Assume the output has at least one pair like "function\nlocation\n", and # they always show up in pairs. # Example: "handle_request\n # common/usb_pd_protocol.c:1191\n" assert len(lines) >= 2 and len(lines) % 2 == 0 line_infos = [] for index in range(0, len(lines), 2): (function_name, line_text) = lines[index:index + 2] if line_text in ['??:0', ':?']: line_infos.append(None) else: result = self.ADDRTOLINE_RE.match(line_text) # Assume the output is always well-formed. assert result is not None line_infos.append((function_name.strip(), os.path.realpath(result.group('path').strip()), int(result.group('linenum')))) self.address_to_line_cache[cache_key] = line_infos return line_infos def AnalyzeDisassembly(self, disasm_text): """Parse the disassembly text, analyze, and build a map of all functions. Args: disasm_text: Disassembly text. Returns: function_map: Dict of functions. """ disasm_lines = [line.strip() for line in disasm_text.splitlines()] if 'nds' in disasm_lines[1]: analyzer = AndesAnalyzer() elif 'arm' in disasm_lines[1]: analyzer = ArmAnalyzer() else: raise StackAnalyzerError('Unsupported architecture.') # Example: "08028c8c :" function_signature_regex = re.compile( r'^(?P
[0-9A-Fa-f]+)\s+<(?P[^>]+)>:$') def DetectFunctionHead(line): """Check if the line is a function head. Args: line: Text of disassembly. Returns: symbol: Function symbol. None if it isn't a function head. """ result = function_signature_regex.match(line) if result is None: return None address = int(result.group('address'), 16) symbol = symbol_map.get(address) # Check if the function exists and matches. if symbol is None or symbol.symtype != 'F': return None return symbol # Build symbol map, indexed by symbol address. symbol_map = {} for symbol in self.symbols: # If there are multiple symbols with same address, keeping any of them is # good enough. symbol_map[symbol.address] = symbol # Parse the disassembly text. We update the variable "line" to next line # when needed. There are two steps of parser: # # Step 1: Searching for the function head. Once reach the function head, # move to the next line, which is the first line of function body. # # Step 2: Parsing each instruction line of function body. Once reach a # non-instruction line, stop parsing and analyze the parsed instructions. # # Finally turn back to the step 1 without updating the line, because the # current non-instruction line can be another function head. function_map = {} # The following three variables are the states of the parsing processing. # They will be initialized properly during the state changes. function_symbol = None function_end = None instructions = [] # Remove heading and tailing spaces for each line. line_index = 0 while line_index < len(disasm_lines): # Get the current line. line = disasm_lines[line_index] if function_symbol is None: # Step 1: Search for the function head. function_symbol = DetectFunctionHead(line) if function_symbol is not None: # Assume there is no empty function. If the function head is followed # by EOF, it is an empty function. assert line_index + 1 < len(disasm_lines) # Found the function head, initialize and turn to the step 2. instructions = [] # If symbol size exists, use it as a hint of function size. if function_symbol.size > 0: function_end = function_symbol.address + function_symbol.size else: function_end = None else: # Step 2: Parse the function body. instruction = analyzer.ParseInstruction(line, function_end) if instruction is not None: instructions.append(instruction) if instruction is None or line_index + 1 == len(disasm_lines): # Either the invalid instruction or EOF indicates the end of the # function, finalize the function analysis. # Assume there is no empty function. assert len(instructions) > 0 (stack_frame, callsites) = analyzer.AnalyzeFunction(function_symbol, instructions) # Assume the function addresses are unique in the disassembly. assert function_symbol.address not in function_map function_map[function_symbol.address] = Function( function_symbol.address, function_symbol.name, stack_frame, callsites) # Initialize and turn back to the step 1. function_symbol = None # If the current line isn't an instruction, it can be another function # head, skip moving to the next line. if instruction is None: continue # Move to the next line. line_index += 1 # Resolve callees of functions. for function in function_map.values(): for callsite in function.callsites: if callsite.target is not None: # Remain the callee as None if we can't resolve it. callsite.callee = function_map.get(callsite.target) return function_map def MapAnnotation(self, function_map, signature_set): """Map annotation signatures to functions. Args: function_map: Function map. signature_set: Set of annotation signatures. Returns: Map of signatures to functions, map of signatures which can't be resolved. """ # Build the symbol map indexed by symbol name. If there are multiple symbols # with the same name, add them into a set. (e.g. symbols of static function # with the same name) symbol_map = collections.defaultdict(set) for symbol in self.symbols: if symbol.symtype == 'F': # Function symbol. result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name) if result is not None: function = function_map.get(symbol.address) # Ignore the symbol not in disassembly. if function is not None: # If there are multiple symbol with the same name and point to the # same function, the set will deduplicate them. symbol_map[result.group('name').strip()].add(function) # Build the signature map indexed by annotation signature. signature_map = {} sig_error_map = {} symbol_path_map = {} for sig in signature_set: (name, path, _) = sig functions = symbol_map.get(name) if functions is None: sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND continue if name not in symbol_path_map: # Lazy symbol path resolving. Since the addr2line isn't fast, only # resolve needed symbol paths. group_map = collections.defaultdict(list) for function in functions: line_info = self.AddressToLine(function.address)[0] if line_info is None: continue (_, symbol_path, _) = line_info # Group the functions with the same symbol signature (symbol name + # symbol path). Assume they are the same copies and do the same # annotation operations of them because we don't know which copy is # indicated by the users. group_map[symbol_path].append(function) symbol_path_map[name] = group_map # Symbol matching. function_group = None group_map = symbol_path_map[name] if len(group_map) > 0: if path is None: if len(group_map) > 1: # There is ambiguity but the path isn't specified. sig_error_map[sig] = self.ANNOTATION_ERROR_AMBIGUOUS continue # No path signature but all symbol signatures of functions are same. # Assume they are the same functions, so there is no ambiguity. (function_group,) = group_map.values() else: function_group = group_map.get(path) if function_group is None: sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND continue # The function_group is a list of all the same functions (according to # our assumption) which should be annotated together. signature_map[sig] = function_group return (signature_map, sig_error_map) def LoadAnnotation(self): """Load annotation rules. Returns: Map of add rules, set of remove rules, set of text signatures which can't be parsed. """ # Assume there is no ":" in the path. # Example: "get_range.lto.2501[driver/accel_kionix.c:327]" annotation_signature_regex = re.compile( r'^(?P[^\[]+)(\[(?P[^:]+)(:(?P\d+))?\])?$') def NormalizeSignature(signature_text): """Parse and normalize the annotation signature. Args: signature_text: Text of the annotation signature. Returns: (function name, path, line number) of the signature. The path and line number can be None if not exist. None if failed to parse. """ result = annotation_signature_regex.match(signature_text.strip()) if result is None: return None name_result = self.FUNCTION_PREFIX_NAME_RE.match( result.group('name').strip()) if name_result is None: return None path = result.group('path') if path is not None: path = os.path.realpath(path.strip()) linenum = result.group('linenum') if linenum is not None: linenum = int(linenum.strip()) return (name_result.group('name').strip(), path, linenum) def ExpandArray(dic): """Parse and expand a symbol array Args: dic: Dictionary for the array annotation Returns: array of (symbol name, None, None). """ # TODO(drinkcat): This function is quite inefficient, as it goes through # the symbol table multiple times. begin_name = dic['name'] end_name = dic['name'] + "_end" offset = dic['offset'] if 'offset' in dic else 0 stride = dic['stride'] begin_address = None end_address = None for symbol in self.symbols: if (symbol.name == begin_name): begin_address = symbol.address if (symbol.name == end_name): end_address = symbol.address if (not begin_address or not end_address): return None output = [] # TODO(drinkcat): This is inefficient as we go from address to symbol # object then to symbol name, and later on we'll go back from symbol name # to symbol object. for addr in range(begin_address+offset, end_address, stride): # TODO(drinkcat): Not all architectures need to drop the first bit. val = self.rodata[(addr-self.rodata_offset) // 4] & 0xfffffffe name = None for symbol in self.symbols: if (symbol.address == val): result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name) name = result.group('name') break if not name: raise StackAnalyzerError('Cannot find function for address %s.', hex(val)) output.append((name, None, None)) return output add_rules = collections.defaultdict(set) remove_rules = list() invalid_sigtxts = set() if 'add' in self.annotation and self.annotation['add'] is not None: for src_sigtxt, dst_sigtxts in self.annotation['add'].items(): src_sig = NormalizeSignature(src_sigtxt) if src_sig is None: invalid_sigtxts.add(src_sigtxt) continue for dst_sigtxt in dst_sigtxts: if isinstance(dst_sigtxt, dict): dst_sig = ExpandArray(dst_sigtxt) if dst_sig is None: invalid_sigtxts.add(str(dst_sigtxt)) else: add_rules[src_sig].update(dst_sig) else: dst_sig = NormalizeSignature(dst_sigtxt) if dst_sig is None: invalid_sigtxts.add(dst_sigtxt) else: add_rules[src_sig].add(dst_sig) if 'remove' in self.annotation and self.annotation['remove'] is not None: for sigtxt_path in self.annotation['remove']: if isinstance(sigtxt_path, str): # The path has only one vertex. sigtxt_path = [sigtxt_path] if len(sigtxt_path) == 0: continue # Generate multiple remove paths from all the combinations of the # signatures of each vertex. sig_paths = [[]] broken_flag = False for sigtxt_node in sigtxt_path: if isinstance(sigtxt_node, str): # The vertex has only one signature. sigtxt_set = {sigtxt_node} elif isinstance(sigtxt_node, list): # The vertex has multiple signatures. sigtxt_set = set(sigtxt_node) else: # Assume the format of annotation is verified. There should be no # invalid case. assert False sig_set = set() for sigtxt in sigtxt_set: sig = NormalizeSignature(sigtxt) if sig is None: invalid_sigtxts.add(sigtxt) broken_flag = True elif not broken_flag: sig_set.add(sig) if broken_flag: continue # Append each signature of the current node to the all previous # remove paths. sig_paths = [path + [sig] for path in sig_paths for sig in sig_set] if not broken_flag: # All signatures are normalized. The remove path has no error. remove_rules.extend(sig_paths) return (add_rules, remove_rules, invalid_sigtxts) def ResolveAnnotation(self, function_map): """Resolve annotation. Args: function_map: Function map. Returns: Set of added call edges, list of remove paths, set of eliminated callsite addresses, set of annotation signatures which can't be resolved. """ def StringifySignature(signature): """Stringify the tupled signature. Args: signature: Tupled signature. Returns: Signature string. """ (name, path, linenum) = signature bracket_text = '' if path is not None: path = os.path.relpath(path) if linenum is None: bracket_text = '[{}]'.format(path) else: bracket_text = '[{}:{}]'.format(path, linenum) return name + bracket_text (add_rules, remove_rules, invalid_sigtxts) = self.LoadAnnotation() signature_set = set() for src_sig, dst_sigs in add_rules.items(): signature_set.add(src_sig) signature_set.update(dst_sigs) for remove_sigs in remove_rules: signature_set.update(remove_sigs) # Map signatures to functions. (signature_map, sig_error_map) = self.MapAnnotation(function_map, signature_set) # Build the indirect callsite map indexed by callsite signature. indirect_map = collections.defaultdict(set) for function in function_map.values(): for callsite in function.callsites: if callsite.target is not None: continue # Found an indirect callsite. line_info = self.AddressToLine(callsite.address)[0] if line_info is None: continue (name, path, linenum) = line_info result = self.FUNCTION_PREFIX_NAME_RE.match(name) if result is None: continue indirect_map[(result.group('name').strip(), path, linenum)].add( (function, callsite.address)) # Generate the annotation sets. add_set = set() remove_list = list() eliminated_addrs = set() for src_sig, dst_sigs in add_rules.items(): src_funcs = set(signature_map.get(src_sig, [])) # Try to match the source signature to the indirect callsites. Even if it # can't be found in disassembly. indirect_calls = indirect_map.get(src_sig) if indirect_calls is not None: for function, callsite_address in indirect_calls: # Add the caller of the indirect callsite to the source functions. src_funcs.add(function) # Assume each callsite can be represented by a unique address. eliminated_addrs.add(callsite_address) if src_sig in sig_error_map: # Assume the error is always the not found error. Since the signature # found in indirect callsite map must be a full signature, it can't # happen the ambiguous error. assert sig_error_map[src_sig] == self.ANNOTATION_ERROR_NOTFOUND # Found in inline stack, remove the not found error. del sig_error_map[src_sig] for dst_sig in dst_sigs: dst_funcs = signature_map.get(dst_sig) if dst_funcs is None: continue # Duplicate the call edge for all the same source and destination # functions. for src_func in src_funcs: for dst_func in dst_funcs: add_set.add((src_func, dst_func)) for remove_sigs in remove_rules: # Since each signature can be mapped to multiple functions, generate # multiple remove paths from all the combinations of these functions. remove_paths = [[]] skip_flag = False for remove_sig in remove_sigs: # Transform each signature to the corresponding functions. remove_funcs = signature_map.get(remove_sig) if remove_funcs is None: # There is an unresolved signature in the remove path. Ignore the # whole broken remove path. skip_flag = True break else: # Append each function of the current signature to the all previous # remove paths. remove_paths = [p + [f] for p in remove_paths for f in remove_funcs] if skip_flag: # Ignore the broken remove path. continue for remove_path in remove_paths: # Deduplicate the remove paths. if remove_path not in remove_list: remove_list.append(remove_path) # Format the error messages. failed_sigtxts = set() for sigtxt in invalid_sigtxts: failed_sigtxts.add((sigtxt, self.ANNOTATION_ERROR_INVALID)) for sig, error in sig_error_map.items(): failed_sigtxts.add((StringifySignature(sig), error)) return (add_set, remove_list, eliminated_addrs, failed_sigtxts) def PreprocessAnnotation(self, function_map, add_set, remove_list, eliminated_addrs): """Preprocess the annotation and callgraph. Add the missing call edges, and delete simple remove paths (the paths have one or two vertices) from the function_map. Eliminate the annotated indirect callsites. Return the remaining remove list. Args: function_map: Function map. add_set: Set of missing call edges. remove_list: List of remove paths. eliminated_addrs: Set of eliminated callsite addresses. Returns: List of remaining remove paths. """ def CheckEdge(path): """Check if all edges of the path are on the callgraph. Args: path: Path. Returns: True or False. """ for index in range(len(path) - 1): if (path[index], path[index + 1]) not in edge_set: return False return True for src_func, dst_func in add_set: # TODO(cheyuw): Support tailing call annotation. src_func.callsites.append( Callsite(None, dst_func.address, False, dst_func)) # Delete simple remove paths. remove_simple = set(tuple(p) for p in remove_list if len(p) <= 2) edge_set = set() for function in function_map.values(): cleaned_callsites = [] for callsite in function.callsites: if ((callsite.callee,) in remove_simple or (function, callsite.callee) in remove_simple): continue if callsite.target is None and callsite.address in eliminated_addrs: continue cleaned_callsites.append(callsite) if callsite.callee is not None: edge_set.add((function, callsite.callee)) function.callsites = cleaned_callsites return [p for p in remove_list if len(p) >= 3 and CheckEdge(p)] def AnalyzeCallGraph(self, function_map, remove_list): """Analyze callgraph. It will update the max stack size and path for each function. Args: function_map: Function map. remove_list: List of remove paths. Returns: List of function cycles. """ def Traverse(curr_state): """Traverse the callgraph and calculate the max stack usages of functions. Args: curr_state: Current state. Returns: SCC lowest link. """ scc_index = scc_index_counter[0] scc_index_counter[0] += 1 scc_index_map[curr_state] = scc_index scc_lowlink = scc_index scc_stack.append(curr_state) # Push the current state in the stack. We can use a set to maintain this # because the stacked states are unique; otherwise we will find a cycle # first. stacked_states.add(curr_state) (curr_address, curr_positions) = curr_state curr_func = function_map[curr_address] invalid_flag = False new_positions = list(curr_positions) for index, position in enumerate(curr_positions): remove_path = remove_list[index] # The position of each remove path in the state is the length of the # longest matching path between the prefix of the remove path and the # suffix of the current traversing path. We maintain this length when # appending the next callee to the traversing path. And it can be used # to check if the remove path appears in the traversing path. # TODO(cheyuw): Implement KMP algorithm to match remove paths # efficiently. if remove_path[position] is curr_func: # Matches the current function, extend the length. new_positions[index] = position + 1 if new_positions[index] == len(remove_path): # The length of the longest matching path is equal to the length of # the remove path, which means the suffix of the current traversing # path matches the remove path. invalid_flag = True break else: # We can't get the new longest matching path by extending the previous # one directly. Fallback to search the new longest matching path. # If we can't find any matching path in the following search, reset # the matching length to 0. new_positions[index] = 0 # We want to find the new longest matching prefix of remove path with # the suffix of the current traversing path. Because the new longest # matching path won't be longer than the prevous one now, and part of # the suffix matches the prefix of remove path, we can get the needed # suffix from the previous matching prefix of the invalid path. suffix = remove_path[:position] + [curr_func] for offset in range(1, len(suffix)): length = position - offset if remove_path[:length] == suffix[offset:]: new_positions[index] = length break new_positions = tuple(new_positions) # If the current suffix is invalid, set the max stack usage to 0. max_stack_usage = 0 max_callee_state = None self_loop = False if not invalid_flag: # Max stack usage is at least equal to the stack frame. max_stack_usage = curr_func.stack_frame for callsite in curr_func.callsites: callee = callsite.callee if callee is None: continue callee_state = (callee.address, new_positions) if callee_state not in scc_index_map: # Unvisited state. scc_lowlink = min(scc_lowlink, Traverse(callee_state)) elif callee_state in stacked_states: # The state is shown in the stack. There is a cycle. sub_stack_usage = 0 scc_lowlink = min(scc_lowlink, scc_index_map[callee_state]) if callee_state == curr_state: self_loop = True done_result = done_states.get(callee_state) if done_result is not None: # Already done this state and use its result. If the state reaches a # cycle, reusing the result will cause inaccuracy (the stack usage # of cycle depends on where the entrance is). But it's fine since we # can't get accurate stack usage under this situation, and we rely # on user-provided annotations to break the cycle, after which the # result will be accurate again. (sub_stack_usage, _) = done_result if callsite.is_tail: # For tailing call, since the callee reuses the stack frame of the # caller, choose the larger one directly. stack_usage = max(curr_func.stack_frame, sub_stack_usage) else: stack_usage = curr_func.stack_frame + sub_stack_usage if stack_usage > max_stack_usage: max_stack_usage = stack_usage max_callee_state = callee_state if scc_lowlink == scc_index: group = [] while scc_stack[-1] != curr_state: scc_state = scc_stack.pop() stacked_states.remove(scc_state) group.append(scc_state) scc_stack.pop() stacked_states.remove(curr_state) # If the cycle is not empty, record it. if len(group) > 0 or self_loop: group.append(curr_state) cycle_groups.append(group) # Store the done result. done_states[curr_state] = (max_stack_usage, max_callee_state) if curr_positions == initial_positions: # If the current state is initial state, we traversed the callgraph by # using the current function as start point. Update the stack usage of # the function. # If the function matches a single vertex remove path, this will set its # max stack usage to 0, which is not expected (we still calculate its # max stack usage, but prevent any function from calling it). However, # all the single vertex remove paths have been preprocessed and removed. curr_func.stack_max_usage = max_stack_usage # Reconstruct the max stack path by traversing the state transitions. max_stack_path = [curr_func] callee_state = max_callee_state while callee_state is not None: # The first element of state tuple is function address. max_stack_path.append(function_map[callee_state[0]]) done_result = done_states.get(callee_state) # All of the descendants should be done. assert done_result is not None (_, callee_state) = done_result curr_func.stack_max_path = max_stack_path return scc_lowlink # The state is the concatenation of the current function address and the # state of matching position. initial_positions = (0,) * len(remove_list) done_states = {} stacked_states = set() scc_index_counter = [0] scc_index_map = {} scc_stack = [] cycle_groups = [] for function in function_map.values(): if function.stack_max_usage is None: Traverse((function.address, initial_positions)) cycle_functions = [] for group in cycle_groups: cycle = set(function_map[state[0]] for state in group) if cycle not in cycle_functions: cycle_functions.append(cycle) return cycle_functions def Analyze(self): """Run the stack analysis. Raises: StackAnalyzerError: If disassembly fails. """ def OutputInlineStack(address, prefix=''): """Output beautiful inline stack. Args: address: Address. prefix: Prefix of each line. Returns: Key for sorting, output text """ line_infos = self.AddressToLine(address, True) if line_infos[0] is None: order_key = (None, None) else: (_, path, linenum) = line_infos[0] order_key = (linenum, path) line_texts = [] for line_info in reversed(line_infos): if line_info is None: (function_name, path, linenum) = ('??', '??', 0) else: (function_name, path, linenum) = line_info line_texts.append('{}[{}:{}]'.format(function_name, os.path.relpath(path), linenum)) output = '{}-> {} {:x}\n'.format(prefix, line_texts[0], address) for depth, line_text in enumerate(line_texts[1:]): output += '{} {}- {}\n'.format(prefix, ' ' * depth, line_text) # Remove the last newline character. return (order_key, output.rstrip('\n')) # Analyze disassembly. try: disasm_text = subprocess.check_output([self.options.objdump, '-d', self.options.elf_path], encoding='utf-8') except subprocess.CalledProcessError: raise StackAnalyzerError('objdump failed to disassemble.') except OSError: raise StackAnalyzerError('Failed to run objdump.') function_map = self.AnalyzeDisassembly(disasm_text) result = self.ResolveAnnotation(function_map) (add_set, remove_list, eliminated_addrs, failed_sigtxts) = result remove_list = self.PreprocessAnnotation(function_map, add_set, remove_list, eliminated_addrs) cycle_functions = self.AnalyzeCallGraph(function_map, remove_list) # Print the results of task-aware stack analysis. extra_stack_frame = self.annotation.get('exception_frame_size', DEFAULT_EXCEPTION_FRAME_SIZE) for task in self.tasklist: routine_func = function_map[task.routine_address] print('Task: {}, Max size: {} ({} + {}), Allocated size: {}'.format( task.name, routine_func.stack_max_usage + extra_stack_frame, routine_func.stack_max_usage, extra_stack_frame, task.stack_max_size)) print('Call Trace:') max_stack_path = routine_func.stack_max_path # Assume the routine function is resolved. assert max_stack_path is not None for depth, curr_func in enumerate(max_stack_path): line_info = self.AddressToLine(curr_func.address)[0] if line_info is None: (path, linenum) = ('??', 0) else: (_, path, linenum) = line_info print(' {} ({}) [{}:{}] {:x}'.format(curr_func.name, curr_func.stack_frame, os.path.relpath(path), linenum, curr_func.address)) if depth + 1 < len(max_stack_path): succ_func = max_stack_path[depth + 1] text_list = [] for callsite in curr_func.callsites: if callsite.callee is succ_func: indent_prefix = ' ' if callsite.address is None: order_text = (None, '{}-> [annotation]'.format(indent_prefix)) else: order_text = OutputInlineStack(callsite.address, indent_prefix) text_list.append(order_text) for _, text in sorted(text_list, key=lambda item: item[0]): print(text) print('Unresolved indirect callsites:') for function in function_map.values(): indirect_callsites = [] for callsite in function.callsites: if callsite.target is None: indirect_callsites.append(callsite.address) if len(indirect_callsites) > 0: print(' In function {}:'.format(function.name)) text_list = [] for address in indirect_callsites: text_list.append(OutputInlineStack(address, ' ')) for _, text in sorted(text_list, key=lambda item: item[0]): print(text) print('Unresolved annotation signatures:') for sigtxt, error in failed_sigtxts: print(' {}: {}'.format(sigtxt, error)) if len(cycle_functions) > 0: print('There are cycles in the following function sets:') for functions in cycle_functions: print('[{}]'.format(', '.join(function.name for function in functions))) def ParseArgs(): """Parse commandline arguments. Returns: options: Namespace from argparse.parse_args(). """ parser = argparse.ArgumentParser(description="EC firmware stack analyzer.") parser.add_argument('elf_path', help="the path of EC firmware ELF") parser.add_argument('--export_taskinfo', required=True, help="the path of export_taskinfo.so utility") parser.add_argument('--section', required=True, help='the section.', choices=[SECTION_RO, SECTION_RW]) parser.add_argument('--objdump', default='objdump', help='the path of objdump') parser.add_argument('--addr2line', default='addr2line', help='the path of addr2line') parser.add_argument('--annotation', default=None, help='the path of annotation file') # TODO(cheyuw): Add an option for dumping stack usage of all functions. return parser.parse_args() def ParseSymbolText(symbol_text): """Parse the content of the symbol text. Args: symbol_text: Text of the symbols. Returns: symbols: Symbol list. """ # Example: "10093064 g F .text 0000015c .hidden hook_task" symbol_regex = re.compile(r'^(?P
[0-9A-Fa-f]+)\s+[lwg]\s+' r'((?P[OF])\s+)?\S+\s+' r'(?P[0-9A-Fa-f]+)\s+' r'(\S+\s+)?(?P\S+)$') symbols = [] for line in symbol_text.splitlines(): line = line.strip() result = symbol_regex.match(line) if result is not None: address = int(result.group('address'), 16) symtype = result.group('type') if symtype is None: symtype = 'O' size = int(result.group('size'), 16) name = result.group('name') symbols.append(Symbol(address, symtype, size, name)) return symbols def ParseRoDataText(rodata_text): """Parse the content of rodata Args: symbol_text: Text of the rodata dump. Returns: symbols: Symbol list. """ # Examples: 8018ab0 00040048 00010000 10020000 4b8e0108 ...H........K... # 100a7294 00000000 00000000 01000000 ............ base_offset = None offset = None rodata = [] for line in rodata_text.splitlines(): line = line.strip() space = line.find(' ') if space < 0: continue try: address = int(line[0:space], 16) except ValueError: continue if not base_offset: base_offset = address offset = address elif address != offset: raise StackAnalyzerError('objdump of rodata not contiguous.') for i in range(0, 4): num = line[(space + 1 + i*9):(space + 9 + i*9)] if len(num.strip()) > 0: val = int(num, 16) else: val = 0 # TODO(drinkcat): Not all platforms are necessarily big-endian rodata.append((val & 0x000000ff) << 24 | (val & 0x0000ff00) << 8 | (val & 0x00ff0000) >> 8 | (val & 0xff000000) >> 24) offset = offset + 4*4 return (base_offset, rodata) def LoadTasklist(section, export_taskinfo, symbols): """Load the task information. Args: section: Section (RO | RW). export_taskinfo: Handle of export_taskinfo.so. symbols: Symbol list. Returns: tasklist: Task list. """ TaskInfoPointer = ctypes.POINTER(TaskInfo) taskinfos = TaskInfoPointer() if section == SECTION_RO: get_taskinfos_func = export_taskinfo.get_ro_taskinfos else: get_taskinfos_func = export_taskinfo.get_rw_taskinfos taskinfo_num = get_taskinfos_func(ctypes.pointer(taskinfos)) tasklist = [] for index in range(taskinfo_num): taskinfo = taskinfos[index] tasklist.append(Task(taskinfo.name.decode('utf-8'), taskinfo.routine.decode('utf-8'), taskinfo.stack_size)) # Resolve routine address for each task. It's more efficient to resolve all # routine addresses of tasks together. routine_map = dict((task.routine_name, None) for task in tasklist) for symbol in symbols: # Resolve task routine address. if symbol.name in routine_map: # Assume the symbol of routine is unique. assert routine_map[symbol.name] is None routine_map[symbol.name] = symbol.address for task in tasklist: address = routine_map[task.routine_name] # Assume we have resolved all routine addresses. assert address is not None task.routine_address = address return tasklist def main(): """Main function.""" try: options = ParseArgs() # Load annotation config. if options.annotation is None: annotation = {} elif not os.path.exists(options.annotation): print('Warning: Annotation file {} does not exist.' .format(options.annotation)) annotation = {} else: try: with open(options.annotation, 'r') as annotation_file: annotation = yaml.safe_load(annotation_file) except yaml.YAMLError: raise StackAnalyzerError('Failed to parse annotation file {}.' .format(options.annotation)) except IOError: raise StackAnalyzerError('Failed to open annotation file {}.' .format(options.annotation)) # TODO(cheyuw): Do complete annotation format verification. if not isinstance(annotation, dict): raise StackAnalyzerError('Invalid annotation file {}.' .format(options.annotation)) # Generate and parse the symbols. try: symbol_text = subprocess.check_output([options.objdump, '-t', options.elf_path], encoding='utf-8') rodata_text = subprocess.check_output([options.objdump, '-s', '-j', '.rodata', options.elf_path], encoding='utf-8') except subprocess.CalledProcessError: raise StackAnalyzerError('objdump failed to dump symbol table or rodata.') except OSError: raise StackAnalyzerError('Failed to run objdump.') symbols = ParseSymbolText(symbol_text) rodata = ParseRoDataText(rodata_text) # Load the tasklist. try: export_taskinfo = ctypes.CDLL(options.export_taskinfo) except OSError: raise StackAnalyzerError('Failed to load export_taskinfo.') tasklist = LoadTasklist(options.section, export_taskinfo, symbols) analyzer = StackAnalyzer(options, symbols, rodata, tasklist, annotation) analyzer.Analyze() except StackAnalyzerError as e: print('Error: {}'.format(e)) if __name__ == '__main__': main()