""" lit - LLVM Integrated Tester. See lit.pod for more information. """ import itertools import os import platform import sys import time import lit.cl_arguments import lit.discovery import lit.display import lit.LitConfig import lit.reports import lit.run import lit.Test import lit.util from lit.formats.googletest import GoogleTest from lit.TestTimes import record_test_times def main(builtin_params={}): opts = lit.cl_arguments.parse_args() params = create_params(builtin_params, opts.user_params) is_windows = platform.system() == "Windows" lit_config = lit.LitConfig.LitConfig( progname=os.path.basename(sys.argv[0]), path=opts.path, quiet=opts.quiet, useValgrind=opts.useValgrind, valgrindLeakCheck=opts.valgrindLeakCheck, valgrindArgs=opts.valgrindArgs, noExecute=opts.noExecute, debug=opts.debug, isWindows=is_windows, order=opts.order, params=params, config_prefix=opts.configPrefix, echo_all_commands=opts.echoAllCommands, ) discovered_tests = lit.discovery.find_tests_for_inputs( lit_config, opts.test_paths, opts.indirectlyRunCheck ) if not discovered_tests: sys.stderr.write("error: did not discover any tests for provided path(s)\n") sys.exit(2) if opts.show_suites or opts.show_tests: print_discovered(discovered_tests, opts.show_suites, opts.show_tests) sys.exit(0) if opts.show_used_features: features = set( itertools.chain.from_iterable( t.getUsedFeatures() for t in discovered_tests if t.gtest_json_file is None ) ) print(" ".join(sorted(features))) sys.exit(0) # Command line overrides configuration for maxIndividualTestTime. if opts.maxIndividualTestTime is not None: # `not None` is important (default: 0) if opts.maxIndividualTestTime != lit_config.maxIndividualTestTime: lit_config.note( ( "The test suite configuration requested an individual" " test timeout of {0} seconds but a timeout of {1} seconds was" " requested on the command line. Forcing timeout to be {1}" " seconds" ).format(lit_config.maxIndividualTestTime, opts.maxIndividualTestTime) ) lit_config.maxIndividualTestTime = opts.maxIndividualTestTime determine_order(discovered_tests, opts.order) selected_tests = [ t for t in discovered_tests if opts.filter.search(t.getFullName()) and not opts.filter_out.search(t.getFullName()) ] if not selected_tests: sys.stderr.write( "error: filter did not match any tests " "(of %d discovered). " % len(discovered_tests) ) if opts.allow_empty_runs: sys.stderr.write( "Suppressing error because '--allow-empty-runs' " "was specified.\n" ) sys.exit(0) else: sys.stderr.write("Use '--allow-empty-runs' to suppress this " "error.\n") sys.exit(2) # When running multiple shards, don't include skipped tests in the xunit # output since merging the files will result in duplicates. if opts.shard: (run, shards) = opts.shard selected_tests = filter_by_shard(selected_tests, run, shards, lit_config) if not selected_tests: sys.stderr.write( "warning: shard does not contain any tests. " "Consider decreasing the number of shards.\n" ) sys.exit(0) selected_tests = selected_tests[: opts.max_tests] mark_xfail(discovered_tests, opts) mark_excluded(discovered_tests, selected_tests) start = time.time() run_tests(selected_tests, lit_config, opts, len(discovered_tests)) elapsed = time.time() - start record_test_times(selected_tests, lit_config) selected_tests, discovered_tests = GoogleTest.post_process_shard_results( selected_tests, discovered_tests ) if opts.time_tests: print_histogram(discovered_tests) print_results(discovered_tests, elapsed, opts) tests_for_report = selected_tests if opts.shard else discovered_tests for report in opts.reports: report.write_results(tests_for_report, elapsed) if lit_config.numErrors: sys.stderr.write("\n%d error(s) in tests\n" % lit_config.numErrors) sys.exit(2) if lit_config.numWarnings: sys.stderr.write("\n%d warning(s) in tests\n" % lit_config.numWarnings) has_failure = any(t.isFailure() for t in discovered_tests) if has_failure: if opts.ignoreFail: sys.stderr.write( "\nExiting with status 0 instead of 1 because " "'--ignore-fail' was specified.\n" ) else: sys.exit(1) def create_params(builtin_params, user_params): def parse(p): return p.split("=", 1) if "=" in p else (p, "") params = dict(builtin_params) params.update([parse(p) for p in user_params]) return params def print_discovered(tests, show_suites, show_tests): tests.sort(key=lit.reports.by_suite_and_test_path) if show_suites: tests_by_suite = itertools.groupby(tests, lambda t: t.suite) print("-- Test Suites --") for suite, test_iter in tests_by_suite: test_count = sum(1 for _ in test_iter) print(" %s - %d tests" % (suite.name, test_count)) print(" Source Root: %s" % suite.source_root) print(" Exec Root : %s" % suite.exec_root) features = " ".join(sorted(suite.config.available_features)) print(" Available Features: %s" % features) substitutions = sorted(suite.config.substitutions) substitutions = ("%s => %s" % (x, y) for (x, y) in substitutions) substitutions = "\n".ljust(30).join(substitutions) print(" Available Substitutions: %s" % substitutions) if show_tests: print("-- Available Tests --") for t in tests: print(" %s" % t.getFullName()) def determine_order(tests, order): from lit.cl_arguments import TestOrder enum_order = TestOrder(order) if enum_order == TestOrder.RANDOM: import random random.shuffle(tests) elif enum_order == TestOrder.LEXICAL: tests.sort(key=lambda t: t.getFullName()) else: assert enum_order == TestOrder.SMART, "Unknown TestOrder value" tests.sort( key=lambda t: (not t.previous_failure, -t.previous_elapsed, t.getFullName()) ) def filter_by_shard(tests, run, shards, lit_config): test_ixs = range(run - 1, len(tests), shards) selected_tests = [tests[i] for i in test_ixs] # For clarity, generate a preview of the first few test indices in the shard # to accompany the arithmetic expression. preview_len = 3 preview = ", ".join([str(i + 1) for i in test_ixs[:preview_len]]) if len(test_ixs) > preview_len: preview += ", ..." msg = ( f"Selecting shard {run}/{shards} = " f"size {len(selected_tests)}/{len(tests)} = " f"tests #({shards}*k)+{run} = [{preview}]" ) lit_config.note(msg) return selected_tests def mark_xfail(selected_tests, opts): for t in selected_tests: test_file = os.sep.join(t.path_in_suite) test_full_name = t.getFullName() if test_file in opts.xfail or test_full_name in opts.xfail: t.xfails += "*" if test_file in opts.xfail_not or test_full_name in opts.xfail_not: t.xfail_not = True def mark_excluded(discovered_tests, selected_tests): excluded_tests = set(discovered_tests) - set(selected_tests) result = lit.Test.Result(lit.Test.EXCLUDED) for t in excluded_tests: t.setResult(result) def run_tests(tests, lit_config, opts, discovered_tests): workers = min(len(tests), opts.workers) display = lit.display.create_display(opts, tests, discovered_tests, workers) run = lit.run.Run( tests, lit_config, workers, display.update, opts.max_failures, opts.timeout ) display.print_header() interrupted = False error = None try: execute_in_tmp_dir(run, lit_config) except KeyboardInterrupt: interrupted = True error = " interrupted by user" except lit.run.MaxFailuresError: error = "warning: reached maximum number of test failures" except lit.run.TimeoutError: error = "warning: reached timeout" display.clear(interrupted) if error: sys.stderr.write("%s, skipping remaining tests\n" % error) def execute_in_tmp_dir(run, lit_config): # Create a temp directory inside the normal temp directory so that we can # try to avoid temporary test file leaks. The user can avoid this behavior # by setting LIT_PRESERVES_TMP in the environment, so they can easily use # their own temp directory to monitor temporary file leaks or handle them at # the buildbot level. tmp_dir = None if "LIT_PRESERVES_TMP" not in os.environ: import tempfile # z/OS linker does not support '_' in paths, so use '-'. tmp_dir = tempfile.mkdtemp(prefix="lit-tmp-") tmp_dir_envs = {k: tmp_dir for k in ["TMP", "TMPDIR", "TEMP", "TEMPDIR"]} os.environ.update(tmp_dir_envs) for cfg in {t.config for t in run.tests}: cfg.environment.update(tmp_dir_envs) try: run.execute() finally: if tmp_dir: try: import shutil shutil.rmtree(tmp_dir) except Exception as e: lit_config.warning( "Failed to delete temp directory '%s', try upgrading your version of Python to fix this" % tmp_dir ) def print_histogram(tests): test_times = [ (t.getFullName(), t.result.elapsed) for t in tests if t.result.elapsed ] if test_times: lit.util.printHistogram(test_times, title="Tests") def print_results(tests, elapsed, opts): tests_by_code = {code: [] for code in lit.Test.ResultCode.all_codes()} for test in tests: tests_by_code[test.result.code].append(test) for code in lit.Test.ResultCode.all_codes(): print_group( sorted(tests_by_code[code], key=lambda t: t.getFullName()), code, opts.shown_codes, ) print_summary(tests_by_code, opts.quiet, elapsed) def print_group(tests, code, shown_codes): if not tests: return if not code.isFailure and code not in shown_codes: return print("*" * 20) print("{} Tests ({}):".format(code.label, len(tests))) for test in tests: print(" %s" % test.getFullName()) sys.stdout.write("\n") def print_summary(tests_by_code, quiet, elapsed): if not quiet: print("\nTesting Time: %.2fs" % elapsed) codes = [c for c in lit.Test.ResultCode.all_codes() if not quiet or c.isFailure] groups = [(c.label, len(tests_by_code[c])) for c in codes] groups = [(label, count) for label, count in groups if count] if not groups: return max_label_len = max(len(label) for label, _ in groups) max_count_len = max(len(str(count)) for _, count in groups) for (label, count) in groups: label = label.ljust(max_label_len) count = str(count).rjust(max_count_len) print(" %s: %s" % (label, count))