diff options
Diffstat (limited to 'psutil/tests/runner.py')
-rwxr-xr-x | psutil/tests/runner.py | 298 |
1 files changed, 231 insertions, 67 deletions
diff --git a/psutil/tests/runner.py b/psutil/tests/runner.py index 7ffbc1cf..bfadc069 100755 --- a/psutil/tests/runner.py +++ b/psutil/tests/runner.py @@ -6,15 +6,19 @@ """ Unit test runner, providing new features on top of unittest module: -- colourized output (error, skip) +- colourized output +- parallel run (UNIX only) - print failures/tracebacks on CTRL+C - re-run failed tests only (make test-failed) """ from __future__ import print_function +import atexit import optparse import os import sys +import textwrap +import time import unittest from unittest import TestResult from unittest import TextTestResult @@ -24,11 +28,18 @@ try: except ImportError: ctypes = None +try: + import concurrencytest # pip install concurrencytest +except ImportError: + concurrencytest = None + import psutil from psutil._common import hilite from psutil._common import print_color from psutil._common import term_supports_colors from psutil.tests import APPVEYOR +from psutil.tests import import_module_by_path +from psutil.tests import reap_children from psutil.tests import safe_rmpath from psutil.tests import TOX @@ -36,6 +47,9 @@ from psutil.tests import TOX HERE = os.path.abspath(os.path.dirname(__file__)) VERBOSITY = 1 if TOX else 2 FAILED_TESTS_FNAME = '.failed-tests.txt' +NWORKERS = psutil.cpu_count() or 1 + +loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase # ===================================================================== @@ -46,8 +60,7 @@ FAILED_TESTS_FNAME = '.failed-tests.txt' class ColouredResult(TextTestResult): def _print_color(self, s, color, bold=False): - file = sys.stderr if color == "red" else sys.stdout - print_color(s, color, bold=bold, file=file) + print_color(s, color, bold=bold, file=self.stream) def addSuccess(self, test): TestResult.addSuccess(self, test) @@ -63,15 +76,15 @@ class ColouredResult(TextTestResult): def addSkip(self, test, reason): TestResult.addSkip(self, test, reason) - self._print_color("skipped: %s" % reason, "brown") + self._print_color("skipped: %s" % reason.strip(), "brown") def printErrorList(self, flavour, errors): flavour = hilite(flavour, "red", bold=flavour == 'ERROR') TextTestResult.printErrorList(self, flavour, errors) -class ColouredRunner(TextTestRunner): - resultclass = ColouredResult if term_supports_colors() else TextTestResult +class ColouredTextRunner(TextTestRunner): + resultclass = ColouredResult def _makeResult(self): # Store result instance so that it can be accessed on @@ -85,81 +98,232 @@ class ColouredRunner(TextTestRunner): # ===================================================================== -def setup_tests(): - if 'PSUTIL_TESTING' not in os.environ: - # This won't work on Windows but set_testing() below will do it. - os.environ['PSUTIL_TESTING'] = '1' - psutil._psplatform.cext.set_testing() +class SuiteLoader: + + testdir = HERE + skip_files = ['test_memory_leaks.py'] + if "WHEELHOUSE_UPLOADER_USERNAME" in os.environ: + skip_files.extend(['test_osx.py', 'test_linux.py', 'test_posix.py']) + + def _get_testmods(self): + return [os.path.join(self.testdir, x) + for x in os.listdir(self.testdir) + if x.startswith('test_') and x.endswith('.py') and + x not in self.skip_files] + + def _iter_testmod_classes(self): + """Iterate over all test files in this directory and return + all TestCase classes in them. + """ + for path in self._get_testmods(): + mod = import_module_by_path(path) + for name in dir(mod): + obj = getattr(mod, name) + if isinstance(obj, type) and \ + issubclass(obj, unittest.TestCase): + yield obj + + def all(self): + suite = unittest.TestSuite() + for obj in self._iter_testmod_classes(): + test = loadTestsFromTestCase(obj) + suite.addTest(test) + return suite + def parallel(self): + serial = unittest.TestSuite() + parallel = unittest.TestSuite() + for obj in self._iter_testmod_classes(): + test = loadTestsFromTestCase(obj) + if getattr(obj, '_serialrun', False): + serial.addTest(test) + else: + parallel.addTest(test) + return (serial, parallel) + + def last_failed(self): + # ...from previously failed test run + suite = unittest.TestSuite() + if not os.path.isfile(FAILED_TESTS_FNAME): + return suite + with open(FAILED_TESTS_FNAME, 'rt') as f: + names = f.read().split() + for n in names: + test = unittest.defaultTestLoader.loadTestsFromName(n) + suite.addTest(test) + return suite -def get_suite(name=None): - suite = unittest.TestSuite() - if name is None: - testmods = [os.path.splitext(x)[0] for x in os.listdir(HERE) - if x.endswith('.py') and x.startswith('test_') and not - x.startswith('test_memory_leaks')] - if "WHEELHOUSE_UPLOADER_USERNAME" in os.environ: - testmods = [x for x in testmods if not x.endswith(( - "osx", "posix", "linux"))] - for tm in testmods: - # ...so that the full test paths are printed on screen - tm = "psutil.tests.%s" % tm - suite.addTest(unittest.defaultTestLoader.loadTestsFromName(tm)) - else: + def from_name(self, name): + suite = unittest.TestSuite() name = os.path.splitext(os.path.basename(name))[0] suite.addTest(unittest.defaultTestLoader.loadTestsFromName(name)) - return suite + return suite -def get_suite_from_failed(): - # ...from previously failed test run - suite = unittest.TestSuite() - if not os.path.isfile(FAILED_TESTS_FNAME): - return suite - with open(FAILED_TESTS_FNAME, 'rt') as f: - names = f.read().split() - for n in names: - suite.addTest(unittest.defaultTestLoader.loadTestsFromName(n)) - return suite - - -def save_failed_tests(result): - if result.wasSuccessful(): - return safe_rmpath(FAILED_TESTS_FNAME) - with open(FAILED_TESTS_FNAME, 'wt') as f: - for t in result.errors + result.failures: - tname = str(t[0]) - unittest.defaultTestLoader.loadTestsFromName(tname) - f.write(tname + '\n') - - -def run(name=None, last_failed=False): - setup_tests() - if APPVEYOR: - runner = TextTestRunner(verbosity=VERBOSITY) - else: - runner = ColouredRunner(verbosity=VERBOSITY) - suite = get_suite_from_failed() if last_failed else get_suite(name) - try: - result = runner.run(suite) - except (KeyboardInterrupt, SystemExit) as err: - print("received %s" % err.__class__.__name__, file=sys.stderr) - runner.result.printErrors() - sys.exit(1) - else: - save_failed_tests(result) - success = result.wasSuccessful() - sys.exit(0 if success else 1) +class Runner: + + def __init__(self): + self.loader = SuiteLoader() + self.failed_tnames = set() + if term_supports_colors() and not APPVEYOR: + self.runner = ColouredTextRunner(verbosity=VERBOSITY) + else: + self.runner = TextTestRunner(verbosity=VERBOSITY) + + def _write_last_failed(self): + if self.failed_tnames: + with open(FAILED_TESTS_FNAME, 'wt') as f: + for tname in self.failed_tnames: + f.write(tname + '\n') + + def _save_result(self, result): + if not result.wasSuccessful(): + for t in result.errors + result.failures: + tname = t[0].id() + self.failed_tnames.add(tname) + + def _run(self, suite): + try: + result = self.runner.run(suite) + except (KeyboardInterrupt, SystemExit): + result = self.runner.result + result.printErrors() + raise sys.exit(1) + else: + self._save_result(result) + return result + + def _finalize(self, success): + if success: + safe_rmpath(FAILED_TESTS_FNAME) + else: + self._write_last_failed() + print_color("FAILED", "red") + sys.exit(1) + + def run(self, suite=None): + """Run tests serially (1 process).""" + if suite is None: + suite = self.loader.all() + result = self._run(suite) + self._finalize(result.wasSuccessful()) + + def run_last_failed(self): + """Run tests which failed in the last run.""" + self.run(self.loader.last_failed()) + + def run_from_name(self, name): + """Run test by name, e.g.: + "test_linux.TestSystemCPUStats.test_ctx_switches" + """ + self.run(self.loader.from_name(name)) + + def _parallelize_suite(self, suite): + def fdopen(*args, **kwds): + stream = orig_fdopen(*args, **kwds) + atexit.register(stream.close) + return stream + + # Monkey patch concurrencytest lib bug (fdopen() stream not closed). + # https://github.com/cgoldberg/concurrencytest/issues/11 + orig_fdopen = os.fdopen + concurrencytest.os.fdopen = fdopen + forker = concurrencytest.fork_for_tests(NWORKERS) + return concurrencytest.ConcurrentTestSuite(suite, forker) + + def run_parallel(self): + """Run tests in parallel.""" + ser_suite, par_suite = self.loader.parallel() + par_suite = self._parallelize_suite(par_suite) + + # run parallel + print("starting parallel tests using %s workers" % NWORKERS) + t = time.time() + par = self._run(par_suite) + par_elapsed = time.time() - t + + # cleanup workers and test subprocesses + orphans = psutil.Process().children() + gone, alive = psutil.wait_procs(orphans, timeout=1) + if alive: + print_color("alive processes %s" % alive, "red") + reap_children() + + # run serial + t = time.time() + ser = self._run(ser_suite) + ser_elapsed = time.time() - t + + # print + par_fails, par_errs, par_skips = map(len, (par.failures, + par.errors, + par.skipped)) + ser_fails, ser_errs, ser_skips = map(len, (ser.failures, + ser.errors, + ser.skipped)) + print("-" * 70) + print(textwrap.dedent(""" + +----------+----------+----------+----------+----------+----------+ + | | total | failures | errors | skipped | time | + +----------+----------+----------+----------+----------+----------+ + | parallel | %3s | %3s | %3s | %3s | %.2fs | + +----------+----------+----------+----------+----------+----------+ + | serial | %3s | %3s | %3s | %3s | %.2fs | + +----------+----------+----------+----------+----------+----------+ + """ % (par.testsRun, par_fails, par_errs, par_skips, par_elapsed, + ser.testsRun, ser_fails, ser_errs, ser_skips, ser_elapsed))) + print("Ran %s tests in %.3fs using %s workers" % ( + par.testsRun + ser.testsRun, par_elapsed + ser_elapsed, NWORKERS)) + ok = par.wasSuccessful() and ser.wasSuccessful() + self._finalize(ok) + + +runner = Runner() +run_from_name = runner.run_from_name + + +def _setup(): + if 'PSUTIL_TESTING' not in os.environ: + # This won't work on Windows but set_testing() below will do it. + os.environ['PSUTIL_TESTING'] = '1' + psutil._psplatform.cext.set_testing() def main(): - usage = "python3 -m psutil.tests [opts]" + _setup() + usage = "python3 -m psutil.tests [opts] [test-name]" parser = optparse.OptionParser(usage=usage, description="run unit tests") parser.add_option("--last-failed", action="store_true", default=False, help="only run last failed tests") + parser.add_option("--parallel", + action="store_true", default=False, + help="run tests in parallel") opts, args = parser.parse_args() - run(last_failed=opts.last_failed) + + if not opts.last_failed: + safe_rmpath(FAILED_TESTS_FNAME) + + # test-by-name + if args: + if len(args) > 1: + parser.print_usage() + return sys.exit(1) + return runner.run_from_name(args[0]) + elif opts.last_failed: + runner.run_last_failed() + elif not opts.parallel: + runner.run() + # parallel + elif concurrencytest is None: + print_color("concurrencytest module is not installed; " + "running serial tests instead", "red") + runner.run() + elif NWORKERS == 1: + print_color("only 1 CPU; running serial tests instead", "red") + runner.run() + else: + runner.run_parallel() if __name__ == '__main__': |