summaryrefslogtreecommitdiff
path: root/cts/cts.py
blob: 8eb57f6100ce1b77f3ecb078dfee6cc59eae05d8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
#!/usr/bin/python2
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# This file is a utility to quickly flash boards

import argparse
import collections
import fcntl
import os
import select
import subprocess as sp
import time

# Sentinel stored for a test when the two boards report different
# non-success codes. For most tests, error codes should never conflict.
CTS_CONFLICTING_CODE = -1
# Code that is treated as "success" when the boards' results are merged.
CTS_SUCCESS_CODE = 0


class Cts(object):
  """Class that represents a CTS testing setup and provides
  interface to boards (building, flashing, etc.)

  Attributes:
    ocd_script_dir: String containing locations of openocd's config files
    th_board: String containing name of the Test Harness (th) board
    results_dir: String containing test output directory path
    dut_board: Name of Device Under Test (DUT) board
    module: Name of module to build/run tests for
    ec_directory: String containing path to EC top level directory
    th_hla: String containing hla_serial for the th
    dut_hla: String containing hla_serial for the dut, only used for
    boards which have an st-link v2.1 debugger
    th_ser_path: String which contains full path to th serial file
    test_names: List of strings of test names contained in given module
    test_results: Dictionary of results of each test from module
    return_codes: List of strings of return codes, with a code's integer
    value being the index for the corresponding string representation
  """

  def __init__(self, ec_dir, dut_board='nucleo-f072rb', module='gpio'):
    """Initializes cts class object with given arguments.

    Args:
      dut_board: Name of Device Under Test (DUT) board
      module: Name of module to build/run tests for
    """
    self.ocd_script_dir = '/usr/local/share/openocd/scripts'
    self.th_board = 'stm32l476g-eval'
    self.results_dir = '/tmp/cts_results'
    self.dut_board = dut_board
    self.module = module
    self.ec_directory = ec_dir
    self.th_hla = ''
    self.dut_hla = ''
    self.th_ser_path = os.path.join(
      self.ec_directory,
      'build',
      self.th_board,
      'th_hla_serial')
    testlist_path = os.path.join(
      self.ec_directory,
      'cts',
      self.module,
      'cts.testlist')
    self.test_names = self.getMacroArgs(testlist_path, 'CTS_TEST')
    return_codes_path = os.path.join(self.ec_directory,
                    'cts',
                    'common',
                    'cts.rc')
    self.return_codes = self.getMacroArgs(
      return_codes_path, 'CTS_RC_')
    self.test_results = collections.OrderedDict()

  def set_dut_board(self, brd):
    """Sets the dut_board instance variable

    Args:
      brd: String of board name
    """
    self.dut_board = brd

  def set_module(self, mod):
    """Sets the module instance variable

    Args:
      brd: String of board name
    """
    self.module = mod

  def make(self):
    """Builds test suite module for given th/dut boards"""
    print 'Building module \'' + self.module + '\' for th ' + self.th_board
    sp.call(['make',
         '--directory=' + str(self.ec_directory),
         'BOARD=' + self.th_board,
         'CTS_MODULE=' + self.module,
         '-j'])

    print 'Building module \'' + self.module + '\' for dut ' + self.dut_board
    sp.call(['make',
         '--directory=' + str(self.ec_directory),
         'BOARD=' + self.dut_board,
         'CTS_MODULE=' + self.module,
         '-j'])

  def openocdCmd(self, command_list, board):
    """Sends the specified commands to openocd for a board

    Args:
      board: String that contains board name
    """

    board_cfg = self.getBoardConfigName(board)

    args = ['openocd', '-s', self.ocd_script_dir,
        '-f', board_cfg]
    for cmd in command_list:
      args.append('-c')
      args.append(cmd)
    args.append('-c')
    args.append('shutdown')
    sp.call(args)

  def getStLinkSerialNumbers(self):
    """Gets serial numbers of all st-link v2.1 board attached to host

    Returns:
      List of serials
    """
    usb_args = ['lsusb', '-v', '-d', '0x0483:0x374b']
    usb_process = sp.Popen(usb_args, stdout=sp.PIPE, shell=False)
    st_link_info = usb_process.communicate()[0]
    st_serials = []
    for line in st_link_info.split('\n'):
      if 'iSerial' in line:
        st_serials.append(line.split()[2])
    return st_serials

  # params: th_hla_serial is your personal th board's serial
  def saveDutSerial(self):
    """If dut uses same debugger as th, save its serial"""
    stlink_serials = self.getStLinkSerialNumbers()
    if len(stlink_serials) == 1:  # dut doesn't use same debugger
      return ''
    elif len(stlink_serials) == 2:
      dut = [s for s in stlink_serials if self.th_hla not in s]
      if len(dut) != 1:
        raise RuntimeError('Incorrect TH hla_serial')
      else:
        return dut[0]  # Found your other st-link device serial!
    else:
      msg = ('Please connect TH and your DUT\n'
           'and remove all other st-link devices')
      raise RuntimeError(msg)

  def saveThSerial(self):
    """Saves the th serial number to a file located at th_ser_path

    Return: the serial number saved
    """
    serial = self.getStLinkSerialNumbers()
    if len(serial) != 1:
      msg = ('TH could not be identified.\n'
           '\nConnect your TH and remove other st-link devices')
      raise RuntimeError(msg)
    else:
      ser = serial[0]
      if not os.path.exists(os.path.dirname(self.th_ser_path)):
        os.makedirs(os.path.dirname(self.th_ser_path))
      with open(self.th_ser_path, mode='w') as ser_f:
        ser_f.write(ser)
      return ser

  def getBoardConfigName(self, board):
    """Gets the path for the config file relative to the
    openocd scripts directory

    Args:
      board: String containing name of board to get the config file for

    Returns: String containing relative path to board config file
    """
    board_config_locs = {
      'stm32l476g-eval': 'board/stm32l4discovery.cfg',
      'nucleo-f072rb': 'board/st_nucleo_f0.cfg'
    }

    try:
      cfg = board_config_locs[board]
      return cfg
    except KeyError:
      raise ValueError(
        'The config file for board ' +
        board +
        ' was not found')

  def flashBoards(self):
    """Flashes th and dut boards with their most recently build ec.bin"""
    self.updateSerials()
    th_flash_cmds = [
      'hla_serial ' +
      self.th_hla,
      'reset_config connect_assert_srst',
      'init',
      'reset init',
      'flash write_image erase build/' +
      self.th_board +
      '/ec.bin 0x08000000',
      'reset halt']

    dut_flash_cmds = [
      'hla_serial ' +
      self.dut_hla,
      'reset_config connect_assert_srst',
      'init',
      'reset init',
      'flash write_image erase build/' +
      self.dut_board +
      '/ec.bin 0x08000000',
      'reset halt']

    self.openocdCmd(th_flash_cmds, self.th_board)
    self.openocdCmd(dut_flash_cmds, self.dut_board)
    self.openocdCmd(['hla_serial ' + self.th_hla,
             'init',
             'reset init',
             'resume'],
              self.th_board)
    self.openocdCmd(['hla_serial ' + self.dut_hla,
             'init',
             'reset init',
             'resume'],
             self.dut_board)

  def updateSerials(self):
    """Updates serial #s for th and dut"""
    try:
      with open(self.th_ser_path) as th_f:
        self.th_hla = th_f.read()
    except IOError:
      msg = ('Your th hla_serial may not have been saved.\n'
             'Connect only your th and run ./cts --setup, then try again.')
      raise RuntimeError(msg)
    self.saveDutSerial()

  def resetBoards(self):
    """Resets the boards and allows them to run tests"""
    self.updateSerials()
    self.openocdCmd(['hla_serial ' + self.dut_hla,
             'init', 'reset init'], self.dut_board)
    self.openocdCmd(['hla_serial ' + self.th_hla,
             'init', 'reset init'], self.th_board)
    self.openocdCmd(['hla_serial ' + self.th_hla,
             'init', 'resume'], self.th_board)
    self.openocdCmd(['hla_serial ' + self.dut_hla,
             'init', 'resume'], self.dut_board)

  def readAvailableBytes(self, fd):
    """Read info from a serial port described by a file descriptor

    Args:
      fd: file descriptor for device ttyACM file
    """
    buf = []
    while True:
      if select.select([fd], [], [], 1)[0]:
        buf.append(os.read(fd, 1))
      else:
        break
    result = ''.join(buf)
    return result

  def getDevFileDescriptor(self, path):
    """Read available bytes from device dev path

    Args:
      path: The serial device file path to read from

    Return: the file descriptor for the open serial device file
    """
    fd = os.open(path, os.O_RDONLY)
    flag = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
    return fd

  def getDevFilenames(self):
    """Read available bytes from device dev path

    Args:
      path: The serial device file path to read from

    Return: the file descriptor for the open serial device file
    """
    com_files = [f for f in os.listdir('/dev/') if f.startswith('ttyACM')]
    if len(com_files) < 2:
      raise RuntimeError('The device dev paths could not be found')
    elif len(com_files) > 2:
      raise RuntimeError('Too many serial devices connected to host')
    else:
      return ('/dev/' + com_files[0], '/dev/' + com_files[1])

  def getMacroArgs(self, filepath, macro):
    """Get list of args of a certain macro in a file when macro is used
    by itself on a line

    Args:
      filepath: String containing absolute path to the file
      macro: String containing text of macro to get args of
    """
    args = []
    with open(filepath, 'r') as fl:
      for ln in [ln for ln in fl.readlines(
      ) if ln.strip().startswith(macro)]:
        ln = ln.strip()[len(macro):]
        args.append(ln.strip('()').replace(',',''))
    return args

  def parseOutput(self, r1, r2):
    """Parse the outputs of the DUT and TH together

    Args;
      r1: String output of one of the DUT or the TH (order does not matter)
      r2: String output of one of the DUT or the TH (order does not matter)
    """
    self.test_results.clear()  # empty out any old results

    for output_str in [r1, r2]:
      for ln in [ln.strip() for ln in output_str.split('\n')]:
        tokens = ln.split()
        if len(tokens) != 2:
          continue
        elif tokens[0].strip() not in self.test_names:
          continue
        elif tokens[0] in self.test_results.keys():
          if self.test_results[tokens[0]] != int(tokens[1]):
            if self.test_results[tokens[0]] == CTS_SUCCESS_CODE:
              self.test_results[tokens[0]] = int(tokens[1])
            elif int(tokens[1]) == CTS_SUCCESS_CODE:
              continue
            else:
              self.test_results[tokens[0]] = CTS_CONFLICTING_CODE
          else:
            continue
        else:
          self.test_results[tokens[0]] = int(tokens[1])

    # Convert codes to strings
    for test, code in self.test_results.items():
      if code == CTS_CONFLICTING_CODE:
        self.test_results[test] = 'RESULTS CONFLICT'
      self.test_results[test] = self.return_codes[code]

    for tn in self.test_names:
      if tn not in self.test_results.keys():
        self.test_results[tn] = 'NO RESULT RETURNED' # Exceptional case

  def resultsAsString(self):
    """Takes saved results and returns a string representation of them

    Return: Saved string that contains results
    """
    t_long = max(len(s) for s in self.test_results.keys())
    e_max_len = max(len(s) for s in self.test_results.values())

    pretty_results = 'CTS Test Results for ' + self.module + ' module:\n'

    for test, code in self.test_results.items():
      align_str = '\n{0:<' + str(t_long) + \
        '} {1:>' + str(e_max_len) + '}'
      pretty_results += align_str.format(test, code)

    return pretty_results

  def resetAndRecord(self):
    """Resets boards, records test results in results dir"""

    self.resetBoards()
    # Doesn't matter which is dut or th because we combine their results
    d1, d2 = self.getDevFilenames()

    try:
      fd1 = self.getDevFileDescriptor(d1)
      fd2 = self.getDevFileDescriptor(d2)
    except:  # If board was just connected, must be reset to be read from
      for i in range(3):
        self.resetBoards()
        time.sleep(10)
        try:
          fd1 = self.getDevFileDescriptor(d1)
          fd2 = self.getDevFileDescriptor(d2)
          break
        except:
          continue

    self.readAvailableBytes(fd1)  # clear any junk from buffer
    self.readAvailableBytes(fd2)
    self.resetBoards()
    time.sleep(3)
    res1 = self.readAvailableBytes(fd1)
    res2 = self.readAvailableBytes(fd2)
    if len(res1) == 0 or len(res2) == 0:
      raise ValueError('Output missing from boards.\n'
                       'If you are running cat on a ttyACMx file,\n'
                       'please kill that process and try again')
    self.parseOutput(res1, res2)
    pretty_results = self.resultsAsString()

    dest = os.path.join(
      self.results_dir,
      self.dut_board,
      self.module + '.txt')
    if not os.path.exists(os.path.dirname(dest)):
      os.makedirs(os.path.dirname(dest))

    with open(dest, 'w') as fl:
      fl.write(pretty_results)

    print pretty_results

def main():
  """Main entry point for cts script from command line

  Parses the command line, then runs exactly one action: --setup,
  --reset, --build or --flash (checked in that order). With no action
  flag the test suite is built and then flashed.
  """
  ec_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')
  os.chdir(ec_dir)

  parser = argparse.ArgumentParser(description='Used to build/flash boards')
  parser.add_argument('-d',
                      '--dut',
                      help='Specify DUT you want to build/flash')
  parser.add_argument('-m',
                      '--module',
                      help='Specify module you want to build/flash')
  parser.add_argument('-s',
                      '--setup',
                      action='store_true',
                      help='Connect only the th to save its serial')
  parser.add_argument('-b',
                      '--build',
                      action='store_true',
                      help='Build test suite (no flashing)')
  parser.add_argument('-f',
                      '--flash',
                      action='store_true',
                      help='Flash boards with last image built for them')
  parser.add_argument('-r',
                      '--reset',
                      action='store_true',
                      help='Reset boards and save test results')

  args = parser.parse_args()

  dut_board = args.dut or 'nucleo-f072rb'  # nucleo by default
  module = args.module or 'gpio'  # gpio by default

  # Construct the suite only after parsing so that the test list is
  # loaded for the requested module rather than for the default one.
  cts_suite = Cts(ec_dir, dut_board=dut_board, module=module)

  if args.setup:
    serial = cts_suite.saveThSerial()
    if serial is not None:
      print('Your th hla_serial # has been saved as: ' + serial)
    else:
      print('Unable to save serial')
    return

  if args.reset:
    cts_suite.resetAndRecord()

  elif args.build:
    cts_suite.make()

  elif args.flash:
    cts_suite.flashBoards()

  else:
    cts_suite.make()
    cts_suite.flashBoards()

# Run the command line interface only when executed as a script.
if __name__ == "__main__":
  main()