diff options
-rw-r--r-- | cts/common/board.py | 85 | ||||
-rwxr-xr-x | cts/cts.py | 272 |
2 files changed, 115 insertions, 242 deletions
diff --git a/cts/common/board.py b/cts/common/board.py index 7b618c8300..d478b1fca8 100644 --- a/cts/common/board.py +++ b/cts/common/board.py @@ -22,6 +22,7 @@ FLASH_OFFSETS = { 'nucleo-f072rb': '0x08000000', 'nucleo-f411re': '0x08000000', } +REBOOT_MARKER = 'UART initialized after reboot' class Board(object): @@ -119,13 +120,11 @@ class Board(object): with open(self.openocd_log) as log: print log.read() - def build(self, module, ec_dir, debug=False): + def build(self, module, ec_dir): """Builds test suite module for board Args: ec_dir: String of the ec directory path - debug: True means compile in debug messages when building (may - affect test results) """ cmds = ['make', '--directory=' + ec_dir, @@ -133,9 +132,6 @@ class Board(object): 'CTS_MODULE=' + self.module, '-j'] - if debug: - cmds.append('CTS_DEBUG=TRUE') - rv = 1 with open(self.build_log, 'a') as output: rv = sp.call(cmds, stdout=output, stderr=sp.STDOUT) @@ -167,11 +163,11 @@ class Board(object): return s def reset(self): - """Reset board (used when can't connect to TTY)""" - return self.send_open_ocd_commands(['init', 'reset init', 'resume']) + """Reset then halt board """ + return self.send_open_ocd_commands(['init', 'reset halt']) def setup_tty(self): - """Call this before trying to call readOutput for the first time. + """Call this before calling read_tty for the first time. This is not in the initialization because caller only should call this function after serial numbers are setup """ @@ -179,68 +175,67 @@ class Board(object): self.reset() self.identify_tty_port() - # In testing 3 retries is enough to reset board (2 can fail) - num_file_setup_retries = 3 - # In testing, 10 seconds is sufficient to allow board to reconnect - reset_wait_time_seconds = 10 tty = None try: tty = self.open_tty() - # If board was just connected, must be reset to be read from except (IOError, OSError): - for i in range(num_file_setup_retries): - self.reset() - time.sleep(reset_wait_time_seconds) - try: - tty = self.open_tty() - break - except (IOError, OSError): - continue - if not tty: raise ValueError('Unable to read ' + self.board + '. If you are running ' 'cat on a ttyACMx file, please kill that process and ' 'try again') self.tty = tty - def read_tty(self): + def read_tty(self, max_boot_count=1): """Read info from a serial port described by a file descriptor + Args: + max_boot_count: Stop reading if boot count exceeds this number + Return: - Bytes that UART has output + result: characters read from tty + boot: boot counts """ buf = [] + line = [] + boot = 0 while True: if select.select([self.tty], [], [], 1)[0]: - buf.append(os.read(self.tty, 1)) + c = os.read(self.tty, 1) else: break + line.append(c) + if c == '\n': + l = ''.join(line) + buf.append(l) + if REBOOT_MARKER in l: + boot += 1 + line = [] + if boot > max_boot_count: + break + + l = ''.join(line) + buf.append(l) result = ''.join(buf) - return result + + return result, boot def identify_tty_port(self): """Saves this board's serial port""" dev_dir = '/dev' id_prefix = 'ID_SERIAL_SHORT=' - num_reset_tries = 3 - reset_wait_time_s = 10 com_devices = [f for f in os.listdir(dev_dir) if f.startswith('ttyACM')] - for i in range(num_reset_tries): - for device in com_devices: - self.tty_port = os.path.join(dev_dir, device) - properties = sp.check_output(['udevadm', - 'info', - '-a', - '-n', - self.tty_port, - '--query=property']) - for line in [l.strip() for l in properties.split('\n')]: - if line.startswith(id_prefix): - if self.hla_serial == line[len(id_prefix):]: - return - if i != num_reset_tries - 1: # No need to reset the board the last time - self.reset() # May need to reset to connect - time.sleep(reset_wait_time_s) + for device in com_devices: + self.tty_port = os.path.join(dev_dir, device) + properties = sp.check_output(['udevadm', + 'info', + '-a', + '-n', + self.tty_port, + '--query=property']) + for line in [l.strip() for l in properties.split('\n')]: + if line.startswith(id_prefix): + if self.hla_serial == line[len(id_prefix):]: + return # If we get here without returning, something is wrong raise RuntimeError('The device dev path could not be found') diff --git a/cts/cts.py b/cts/cts.py index 1ae4fb13c2..0c6a23cd7c 100755 --- a/cts/cts.py +++ b/cts/cts.py @@ -12,19 +12,16 @@ # # To try it out, hook two boards (DEFAULT_TH and DEFAULT_DUT) with USB cables # to the host and execute the script: -# $ ./cts.py --debug +# $ ./cts.py # It'll run mock tests. The result will be stored in CTS_TEST_RESULT_DIR. import argparse -import collections -import os -import time +from collections import defaultdict import common.board as board -from copy import deepcopy -import xml.etree.ElementTree as et -from twisted.python.syslog import DEFAULT_FACILITY +import os import shutil +import time CTS_CORRUPTED_CODE = -2 # The test didn't execute correctly @@ -35,8 +32,6 @@ CTS_COLOR_GREEN = '#7dfb9f' DEFAULT_TH = 'stm32l476g-eval' DEFAULT_DUT = 'nucleo-f072rb' MAX_SUITE_TIME_SEC = 5 -CTS_DEBUG_START = '[DEBUG]' -CTS_DEBUG_END = '[DEBUG_END]' CTS_TEST_RESULT_DIR = '/tmp/ects' @@ -49,26 +44,18 @@ class Cts(object): th: TestHarness object representing th module: Name of module to build/run tests for test_names: List of strings of test names contained in given module - test_results: Dictionary of results of each test from module, with - keys being test name strings and values being test result integers return_codes: Dict of strings of return codes, with a code's integer value being the index for the corresponding string representation - debug: Boolean that indicates whether or not on-board debug message - printing should be enabled when building. - debug_output: Dictionary mapping test name to an array contain debug - messages sent while it was running """ - def __init__(self, ec_dir, th, dut, module, debug=False): + def __init__(self, ec_dir, th, dut, module): """Initializes cts class object with given arguments. Args: - dut: Name of Device Under Test (DUT) board - ec_dir: String path to ec directory - dut: Name of board to use for DUT - module: Name of module to build/run tests for - debug: Boolean that indicates whether or not on-board debug message - printing should be enabled. + ec_dir: Path to ec directory + th: Name of the test harness board + dut: Name of the device under test board + module: Name of module to build/run tests for (e.g. gpio, interrupt) """ self.results_dir = os.path.join(CTS_TEST_RESULT_DIR, dut, module) if os.path.isdir(self.results_dir): @@ -77,7 +64,6 @@ class Cts(object): os.makedirs(self.results_dir) self.ec_dir = ec_dir self.module = module - self.debug = debug serial_path = os.path.join(CTS_TEST_RESULT_DIR, 'th_serial') self.th = board.TestHarness(th, module, self.results_dir, serial_path) self.dut = board.DeviceUnderTest(dut, self.th, module, self.results_dir) @@ -85,25 +71,20 @@ class Cts(object): testlist_path = os.path.join(cts_dir, self.module, 'cts.testlist') self.test_names = Cts.get_macro_args(testlist_path, 'CTS_TEST') - self.debug_output = {} - for test in self.test_names: - self.debug_output[test] = [] - return_codes_path = os.path.join(cts_dir, 'common', 'cts.rc') self.return_codes = dict(enumerate(Cts.get_macro_args( return_codes_path, 'CTS_RC_'))) self.return_codes[CTS_CONFLICTING_CODE] = 'RESULTS CONFLICT' self.return_codes[CTS_CORRUPTED_CODE] = 'CORRUPTED' - self.test_results = collections.OrderedDict() def build(self): """Build images for DUT and TH""" print 'Building DUT image...' - if not self.dut.build(self.module, self.ec_dir, self.debug): + if not self.dut.build(self.module, self.ec_dir): raise RuntimeError('Building module %s for DUT failed' % (self.module)) print 'Building TH image...' - if not self.th.build(self.module, self.ec_dir, self.debug): + if not self.th.build(self.module, self.ec_dir): raise RuntimeError('Building module %s for TH failed' % (self.module)) def flash_boards(self): @@ -146,176 +127,69 @@ class Cts(object): args.append(l.strip('()').replace(',', '')) return args - def extract_debug_output(self, output): - """Append the debug messages from output to self.debug_output + def parse_output(self, output): + results = defaultdict(lambda: CTS_CORRUPTED_CODE) - Args: - output: String containing output from which to extract debug - messages - """ - lines = [ln.strip() for ln in output.split('\n')] - test_num = 0 - i = 0 - message_buf = [] - while i < len(lines): - if test_num >= len(self.test_names): - break - if lines[i].strip() == CTS_DEBUG_START: - i += 1 - msg = '' - while i < len(lines): - if lines[i] == CTS_DEBUG_END: - break - else: - msg += lines[i] + '\n' - i += 1 - message_buf.append(msg) - else: - current_test = self.test_names[test_num] - if lines[i].strip().startswith(current_test): - self.debug_output[current_test] += message_buf - message_buf = [] - test_num += 1 - i += 1 - - def parse_output(self, r1, r2): - """Parse the outputs of the DUT and TH together + for ln in [ln.strip() for ln in output.split('\n')]: + tokens = ln.split() + if len(tokens) != 2: + continue + test_name = tokens[0].strip() + if test_name not in self.test_names: + continue + try: + return_code = int(tokens[1]) + except ValueError: # Second token is not an int + continue + results[test_name] = return_code - Args; - r1: String output of one of the DUT or the TH (order does not matter) - r2: String output of one of the DUT or the TH (order does not matter) - """ - self.test_results.clear() # empty out any old results + return results - first_corrupted_test = len(self.test_names) + def get_return_code_name(self, code): + return self.return_codes.get(code, '%d' % code) - self.extract_debug_output(r1) - self.extract_debug_output(r2) + def evaluate_run(self, dut_output, th_output): + """Parse outputs to derive test results - for output_str in [r1, r2]: - test_num = 0 - for ln in [ln.strip() for ln in output_str.split('\n')]: - tokens = ln.split() - if len(tokens) != 2: - continue - test = tokens[0].strip() - if test not in self.test_names: - continue - try: - return_code = int(tokens[1]) - except ValueError: # Second token is not an int - continue - if test != self.test_names[test_num]: - first_corrupted_test = test_num - break # Results after this test are corrupted - elif self.test_results.get( - test, - CTS_SUCCESS_CODE) == CTS_SUCCESS_CODE: - self.test_results[test] = return_code - elif return_code == CTS_SUCCESS_CODE: - pass - elif return_code != self.test_results[test]: - self.test_results[test] = CTS_CONFLICTING_CODE - test_num += 1 - - if test_num != len(self.test_names): # If a suite didn't finish - first_corrupted_test = min(first_corrupted_test, test_num) - - if first_corrupted_test < len(self.test_names): - for test in self.test_names[first_corrupted_test:]: - self.test_results[test] = CTS_CORRUPTED_CODE - - def _results_as_string(self): - """Takes saved results and returns a duplicate of their dictionary - with the return codes replaces with their string representation - - Returns: - dictionary with test name strings as keys and test result strings - as values + Args; + dut_output: String output of DUT + th_output: String output of TH """ - result = deepcopy(self.test_results) - # Convert codes to strings - for test, code in result.items(): - result[test] = self.return_codes.get(code, 'UNKNOWN %d' % code) - return result + dut_results = self.parse_output(dut_output) + th_results = self.parse_output(th_output) - def prettify_results(self): - """Takes saved results and returns a string representation of them + len_test_name = max(len(s) for s in self.test_names) + len_code_name = max(len(s) for s in self.return_codes.values()) - Return: Dictionary similar to self.test_results, but with strings - instead of error codes - """ - res = self._results_as_string() - t_long = max(len(s) for s in res.keys()) - e_max_len = max(len(s) for s in res.values()) - - pretty_results = 'CTS Test Results for ' + self.module + ' module:\n' - - for test, code in res.items(): - align_str = '\n{0:<' + str(t_long) + \ - '} {1:>' + str(e_max_len) + '}' - pretty_results += align_str.format(test, code) - - return pretty_results - - def results_as_html(self): - res = self._results_as_string() - root = et.Element('html') - head = et.SubElement(root, 'head') - style = et.SubElement(head, 'style') - style.text = ('table, td, th {border: 1px solid black;}' - 'body {font-family: \"Lucida Console\", Monaco, monospace') - body = et.SubElement(root, 'body') - table = et.SubElement(body, 'table') - table.set('style','width:100%') - title_row = et.SubElement(table, 'tr') - test_name_title = et.SubElement(title_row, 'th') - test_name_title.text = 'Test Name' - test_name_title.set('style', 'white-space : nowrap') - test_results_title = et.SubElement(title_row, 'th') - test_results_title.text = 'Test Result' - test_results_title.set('style', 'white-space : nowrap') - test_debug_title = et.SubElement(title_row, 'th') - test_debug_title.text = 'Debug Output' - test_debug_title.set('style', 'width:99%') - - for name, result in res.items(): - row = et.SubElement(table, 'tr') - name_e = et.SubElement(row, 'td') - name_e.text = name - name_e.set('style', 'white-space : nowrap') - result_e = et.SubElement(row, 'td') - result_e.text = result - result_e.set('style', 'white-space : nowrap') - debug_e = et.SubElement(row, 'td') - debug_e.set('style', 'width:99%') - debug_e.set('style', 'white-space : pre-wrap') - if len(self.debug_output[name]) == 0: - debug_e.text = 'None' - else: - combined_message = '' - for msg in self.debug_output[name]: - combined_message += msg - combined_message = combined_message - debug_e.text = combined_message - if result == self.return_codes[CTS_SUCCESS_CODE]: - result_e.set('bgcolor', CTS_COLOR_GREEN) - else: - result_e.set('bgcolor', CTS_COLOR_RED) - - return et.tostring(root, method='html') + head = '{:^' + str(len_test_name) + '} ' + head += '{:^' + str(len_code_name) + '} ' + head += '{:^' + str(len_code_name) + '}\n' + fmt = '{:' + str(len_test_name) + '} ' + fmt += '{:>' + str(len_code_name) + '} ' + fmt += '{:>' + str(len_code_name) + '}\n' + + self.formatted_results = head.format('test name', 'TH result', 'DUT result') + for test_name in self.test_names: + th_cn = self.get_return_code_name(th_results[test_name]) + dut_cn = self.get_return_code_name(dut_results[test_name]) + self.formatted_results += fmt.format(test_name, th_cn, dut_cn) def run(self): """Resets boards, records test results in results dir""" + print 'Reading serials...' self.identify_boards() + print 'Opening DUT tty...' self.dut.setup_tty() + print 'Opening TH tty...' self.th.setup_tty() # Boards might be still writing to tty. Wait a few seconds before flashing. time.sleep(3) # clear buffers + print 'Clearing DUT tty...' self.dut.read_tty() + print 'Clearing TH tty...' self.th.read_tty() # Resets the boards and allows them to run tests @@ -337,32 +211,40 @@ class Cts(object): time.sleep(MAX_SUITE_TIME_SEC) - dut_results = self.dut.read_tty() - th_results = self.th.read_tty() + print 'Reading DUT tty...' + dut_output, dut_boot = self.dut.read_tty() + print 'Reading TH tty...' + th_output, th_boot = self.th.read_tty() - if not dut_results or not th_results: + print 'Halting TH...' + if not self.th.send_open_ocd_commands(['init', 'reset halt']): + raise RuntimeError('Failed to halt TH') + print 'Halting DUT...' + if not self.dut.send_open_ocd_commands(['init', 'reset halt']): + raise RuntimeError('Failed to halt DUT') + + if not dut_output or not th_output: raise ValueError('Output missing from boards. If you have a process ' 'reading ttyACMx, please kill that process and try ' 'again.') - self.parse_output(dut_results, th_results) - pretty_results = self.prettify_results() - html_results = self.results_as_html() + print 'Pursing results...' + self.evaluate_run(dut_output, th_output) - # Write results in html - dest = os.path.join(self.results_dir, 'results.html') + # Write results + dest = os.path.join(self.results_dir, 'results.log') with open(dest, 'w') as fl: - fl.write(html_results) + fl.write(self.formatted_results) # Write UART outputs dest = os.path.join(self.results_dir, 'uart_th.log') with open(dest, 'w') as fl: - fl.write(th_results) + fl.write(th_output) dest = os.path.join(self.results_dir, 'uart_dut.log') with open(dest, 'w') as fl: - fl.write(dut_results) + fl.write(dut_output) - print pretty_results + print self.formatted_results def main(): @@ -381,10 +263,6 @@ def main(): parser.add_argument('-m', '--module', help='Specify module you want to build/flash') - parser.add_argument('--debug', - action='store_true', - help=('If building, build with debug printing enabled. ' - 'This may change test results')) parser.add_argument('-s', '--setup', action='store_true', @@ -410,7 +288,7 @@ def main(): if args.dut: dut = args.dut - cts = Cts(ec_dir, DEFAULT_TH, dut=dut, module=module, debug=args.debug) + cts = Cts(ec_dir, DEFAULT_TH, dut=dut, module=module) if args.setup: cts.setup() |