summaryrefslogtreecommitdiff
path: root/Lib/test/support.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/support.py')
-rw-r--r--Lib/test/support.py769
1 files changed, 769 insertions, 0 deletions
diff --git a/Lib/test/support.py b/Lib/test/support.py
new file mode 100644
index 0000000000..b8bc406011
--- /dev/null
+++ b/Lib/test/support.py
@@ -0,0 +1,769 @@
+"""Supporting definitions for the Python regression tests."""
+
+if __name__ != 'test.support':
+ raise ImportError('support must be imported from the test package')
+
+import contextlib
+import errno
+import socket
+import sys
+import os
+import os.path
+import shutil
+import warnings
+import unittest
+
+class Error(Exception):
+ """Base class for regression test exceptions."""
+
+class TestFailed(Error):
+ """Test failed."""
+
+class TestSkipped(Error):
+ """Test skipped.
+
+ This can be raised to indicate that a test was deliberatly
+ skipped, but not because a feature wasn't available. For
+ example, if some resource can't be used, such as the network
+ appears to be unavailable, this should be raised instead of
+ TestFailed.
+ """
+
+class ResourceDenied(TestSkipped):
+ """Test skipped because it requested a disallowed resource.
+
+ This is raised when a test calls requires() for a resource that
+ has not be enabled. It is used to distinguish between expected
+ and unexpected skips.
+ """
+
+def import_module(name, deprecated=False):
+ """Import the module to be tested, raising TestSkipped if it is not
+ available."""
+ with catch_warning(record=False):
+ if deprecated:
+ warnings.filterwarnings("ignore", ".+ (module|package)",
+ DeprecationWarning)
+ try:
+ module = __import__(name, level=0)
+ except ImportError:
+ raise TestSkipped("No module named " + name)
+ else:
+ return module
+
+verbose = 1 # Flag set to 0 by regrtest.py
+use_resources = None # Flag set to [] by regrtest.py
+max_memuse = 0 # Disable bigmem tests (they will still be run with
+ # small sizes, to make sure they work.)
+
+# _original_stdout is meant to hold stdout at the time regrtest began.
+# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
+# The point is to have some flavor of stdout the user can actually see.
+_original_stdout = None
+def record_original_stdout(stdout):
+ global _original_stdout
+ _original_stdout = stdout
+
+def get_original_stdout():
+ return _original_stdout or sys.stdout
+
+def unload(name):
+ try:
+ del sys.modules[name]
+ except KeyError:
+ pass
+
+def unlink(filename):
+ try:
+ os.unlink(filename)
+ except OSError:
+ pass
+
+def rmtree(path):
+ try:
+ shutil.rmtree(path)
+ except OSError as e:
+ # Unix returns ENOENT, Windows returns ESRCH.
+ if e.errno not in (errno.ENOENT, errno.ESRCH):
+ raise
+
+def forget(modname):
+ '''"Forget" a module was ever imported by removing it from sys.modules and
+ deleting any .pyc and .pyo files.'''
+ unload(modname)
+ for dirname in sys.path:
+ unlink(os.path.join(dirname, modname + '.pyc'))
+ # Deleting the .pyo file cannot be within the 'try' for the .pyc since
+ # the chance exists that there is no .pyc (and thus the 'try' statement
+ # is exited) but there is a .pyo file.
+ unlink(os.path.join(dirname, modname + '.pyo'))
+
+def is_resource_enabled(resource):
+ """Test whether a resource is enabled. Known resources are set by
+ regrtest.py."""
+ return use_resources is not None and resource in use_resources
+
+def requires(resource, msg=None):
+ """Raise ResourceDenied if the specified resource is not available.
+
+ If the caller's module is __main__ then automatically return True. The
+ possibility of False being returned occurs when regrtest.py is executing."""
+ # see if the caller's module is __main__ - if so, treat as if
+ # the resource was set
+ if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
+ return
+ if not is_resource_enabled(resource):
+ if msg is None:
+ msg = "Use of the `%s' resource not enabled" % resource
+ raise ResourceDenied(msg)
+
+HOST = 'localhost'
+
+def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
+ """Returns an unused port that should be suitable for binding. This is
+ achieved by creating a temporary socket with the same family and type as
+ the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
+ the specified host address (defaults to 0.0.0.0) with the port set to 0,
+ eliciting an unused ephemeral port from the OS. The temporary socket is
+ then closed and deleted, and the ephemeral port is returned.
+
+ Either this method or bind_port() should be used for any tests where a
+ server socket needs to be bound to a particular port for the duration of
+ the test. Which one to use depends on whether the calling code is creating
+ a python socket, or if an unused port needs to be provided in a constructor
+ or passed to an external program (i.e. the -accept argument to openssl's
+ s_server mode). Always prefer bind_port() over find_unused_port() where
+ possible. Hard coded ports should *NEVER* be used. As soon as a server
+ socket is bound to a hard coded port, the ability to run multiple instances
+ of the test simultaneously on the same host is compromised, which makes the
+ test a ticking time bomb in a buildbot environment. On Unix buildbots, this
+ may simply manifest as a failed test, which can be recovered from without
+ intervention in most cases, but on Windows, the entire python process can
+ completely and utterly wedge, requiring someone to log in to the buildbot
+ and manually kill the affected process.
+
+ (This is easy to reproduce on Windows, unfortunately, and can be traced to
+ the SO_REUSEADDR socket option having different semantics on Windows versus
+ Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
+ listen and then accept connections on identical host/ports. An EADDRINUSE
+ socket.error will be raised at some point (depending on the platform and
+ the order bind and listen were called on each socket).
+
+ However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
+ will ever be raised when attempting to bind two identical host/ports. When
+ accept() is called on each socket, the second caller's process will steal
+ the port from the first caller, leaving them both in an awkwardly wedged
+ state where they'll no longer respond to any signals or graceful kills, and
+ must be forcibly killed via OpenProcess()/TerminateProcess().
+
+ The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
+ instead of SO_REUSEADDR, which effectively affords the same semantics as
+ SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
+ Source world compared to Windows ones, this is a common mistake. A quick
+ look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
+ openssl.exe is called with the 's_server' option, for example. See
+ http://bugs.python.org/issue2550 for more info. The following site also
+ has a very thorough description about the implications of both REUSEADDR
+ and EXCLUSIVEADDRUSE on Windows:
+ http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
+
+ XXX: although this approach is a vast improvement on previous attempts to
+ elicit unused ports, it rests heavily on the assumption that the ephemeral
+ port returned to us by the OS won't immediately be dished back out to some
+ other process when we close and delete our temporary socket but before our
+ calling code has a chance to bind the returned port. We can deal with this
+ issue if/when we come across it.
+ """
+
+ tempsock = socket.socket(family, socktype)
+ port = bind_port(tempsock)
+ tempsock.close()
+ del tempsock
+ return port
+
+def bind_port(sock, host=HOST):
+ """Bind the socket to a free port and return the port number. Relies on
+ ephemeral ports in order to ensure we are using an unbound port. This is
+ important as many tests may be running simultaneously, especially in a
+ buildbot environment. This method raises an exception if the sock.family
+ is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+ or SO_REUSEPORT set on it. Tests should *never* set these socket options
+ for TCP/IP sockets. The only case for setting these options is testing
+ multicasting via multiple UDP sockets.
+
+ Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+ on Windows), it will be set on the socket. This will prevent anyone else
+ from bind()'ing to our host/port for the duration of the test.
+ """
+
+ if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+ if hasattr(socket, 'SO_REUSEADDR'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+ raise TestFailed("tests should never set the SO_REUSEADDR " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_REUSEPORT'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+ raise TestFailed("tests should never set the SO_REUSEPORT " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+
+ sock.bind((host, 0))
+ port = sock.getsockname()[1]
+ return port
+
+FUZZ = 1e-6
+
+def fcmp(x, y): # fuzzy comparison function
+ if isinstance(x, float) or isinstance(y, float):
+ try:
+ fuzz = (abs(x) + abs(y)) * FUZZ
+ if abs(x-y) <= fuzz:
+ return 0
+ except:
+ pass
+ elif type(x) == type(y) and isinstance(x, (tuple, list)):
+ for i in range(min(len(x), len(y))):
+ outcome = fcmp(x[i], y[i])
+ if outcome != 0:
+ return outcome
+ return (len(x) > len(y)) - (len(x) < len(y))
+ return (x > y) - (x < y)
+
+try:
+ str
+ have_unicode = True
+except NameError:
+ have_unicode = False
+
+is_jython = sys.platform.startswith('java')
+
+# Filename used for testing
+if os.name == 'java':
+ # Jython disallows @ in module names
+ TESTFN = '$test'
+else:
+ TESTFN = '@test'
+
+ # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
+ # TESTFN_UNICODE is a filename that can be encoded using the
+ # file system encoding, but *not* with the default (ascii) encoding
+ TESTFN_UNICODE = "@test-\xe0\xf2"
+ TESTFN_ENCODING = sys.getfilesystemencoding()
+ # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
+ # able to be encoded by *either* the default or filesystem encoding.
+ # This test really only makes sense on Windows NT platforms
+ # which have special Unicode support in posixmodule.
+ if (not hasattr(sys, "getwindowsversion") or
+ sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME
+ TESTFN_UNICODE_UNENCODEABLE = None
+ else:
+ # Japanese characters (I think - from bug 846133)
+ TESTFN_UNICODE_UNENCODEABLE = "@test-\u5171\u6709\u3055\u308c\u308b"
+ try:
+ # XXX - Note - should be using TESTFN_ENCODING here - but for
+ # Windows, "mbcs" currently always operates as if in
+ # errors=ignore' mode - hence we get '?' characters rather than
+ # the exception. 'Latin1' operates as we expect - ie, fails.
+ # See [ 850997 ] mbcs encoding ignores errors
+ TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
+ except UnicodeEncodeError:
+ pass
+ else:
+ print('WARNING: The filename %r CAN be encoded by the filesystem. '
+ 'Unicode filename tests may not be effective'
+ % TESTFN_UNICODE_UNENCODEABLE)
+
+# Make sure we can write to TESTFN, try in /tmp if we can't
+fp = None
+try:
+ fp = open(TESTFN, 'w+')
+except IOError:
+ TMP_TESTFN = os.path.join('/tmp', TESTFN)
+ try:
+ fp = open(TMP_TESTFN, 'w+')
+ TESTFN = TMP_TESTFN
+ del TMP_TESTFN
+ except IOError:
+ print(('WARNING: tests will fail, unable to write to: %s or %s' %
+ (TESTFN, TMP_TESTFN)))
+if fp is not None:
+ fp.close()
+ unlink(TESTFN)
+del fp
+
+def findfile(file, here=__file__):
+ """Try to find a file on sys.path and the working directory. If it is not
+ found the argument passed to the function is returned (this does not
+ necessarily signal failure; could still be the legitimate path)."""
+ if os.path.isabs(file):
+ return file
+ path = sys.path
+ path = [os.path.dirname(here)] + path
+ for dn in path:
+ fn = os.path.join(dn, file)
+ if os.path.exists(fn): return fn
+ return file
+
+def verify(condition, reason='test failed'):
+ """Verify that condition is true. If not, raise TestFailed.
+
+ The optional argument reason can be given to provide
+ a better error text.
+ """
+
+ if not condition:
+ raise TestFailed(reason)
+
+def vereq(a, b):
+ """Raise TestFailed if a == b is false.
+
+ This is better than verify(a == b) because, in case of failure, the
+ error message incorporates repr(a) and repr(b) so you can see the
+ inputs.
+
+ Note that "not (a == b)" isn't necessarily the same as "a != b"; the
+ former is tested.
+ """
+
+ if not (a == b):
+ raise TestFailed("%r == %r" % (a, b))
+
+def sortdict(dict):
+ "Like repr(dict), but in sorted order."
+ items = sorted(dict.items())
+ reprpairs = ["%r: %r" % pair for pair in items]
+ withcommas = ", ".join(reprpairs)
+ return "{%s}" % withcommas
+
+def check_syntax_error(testcase, statement):
+ try:
+ compile(statement, '<test string>', 'exec')
+ except SyntaxError:
+ pass
+ else:
+ testcase.fail('Missing SyntaxError: "%s"' % statement)
+
+def open_urlresource(url, *args, **kw):
+ import urllib, urlparse
+
+ requires('urlfetch')
+ filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
+
+ for path in [os.path.curdir, os.path.pardir]:
+ fn = os.path.join(path, filename)
+ if os.path.exists(fn):
+ return open(fn, *args, **kw)
+
+ print('\tfetching %s ...' % url, file=get_original_stdout())
+ fn, _ = urllib.urlretrieve(url, filename)
+ return open(fn, *args, **kw)
+
+
+class WarningMessage(object):
+ "Holds the result of the latest showwarning() call"
+ def __init__(self):
+ self.message = None
+ self.category = None
+ self.filename = None
+ self.lineno = None
+
+ def _showwarning(self, message, category, filename, lineno, file=None,
+ line=None):
+ self.message = message
+ self.category = category
+ self.filename = filename
+ self.lineno = lineno
+ self.line = line
+
+ def reset(self):
+ self._showwarning(*((None,)*6))
+
+ def __str__(self):
+ return ("{message : %r, category : %r, filename : %r, lineno : %s, "
+ "line : %r}" % (self.message,
+ self.category.__name__ if self.category else None,
+ self.filename, self.lineno, self.line))
+
+
+@contextlib.contextmanager
+def catch_warning(module=warnings, record=True):
+ """
+ Guard the warnings filter from being permanently changed and record the
+ data of the last warning that has been issued.
+
+ Use like this:
+
+ with catch_warning() as w:
+ warnings.warn("foo")
+ assert str(w.message) == "foo"
+ """
+ original_filters = module.filters[:]
+ original_showwarning = module.showwarning
+ if record:
+ warning_obj = WarningMessage()
+ module.showwarning = warning_obj._showwarning
+ try:
+ yield warning_obj if record else None
+ finally:
+ module.showwarning = original_showwarning
+ module.filters = original_filters
+
+
+class CleanImport(object):
+ """Context manager to force import to return a new module reference.
+
+ This is useful for testing module-level behaviours, such as
+ the emission of a DepreciationWarning on import.
+
+ Use like this:
+
+ with CleanImport("foo"):
+ __import__("foo") # new reference
+ """
+
+ def __init__(self, *module_names):
+ self.original_modules = sys.modules.copy()
+ for module_name in module_names:
+ if module_name in sys.modules:
+ module = sys.modules[module_name]
+ # It is possible that module_name is just an alias for
+ # another module (e.g. stub for modules renamed in 3.x).
+ # In that case, we also need delete the real module to clear
+ # the import cache.
+ if module.__name__ != module_name:
+ del sys.modules[module.__name__]
+ del sys.modules[module_name]
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *ignore_exc):
+ sys.modules.update(self.original_modules)
+
+
+class EnvironmentVarGuard(object):
+
+ """Class to help protect the environment variable properly. Can be used as
+ a context manager."""
+
+ def __init__(self):
+ self._environ = os.environ
+ self._unset = set()
+ self._reset = dict()
+
+ def set(self, envvar, value):
+ if envvar not in self._environ:
+ self._unset.add(envvar)
+ else:
+ self._reset[envvar] = self._environ[envvar]
+ self._environ[envvar] = value
+
+ def unset(self, envvar):
+ if envvar in self._environ:
+ self._reset[envvar] = self._environ[envvar]
+ del self._environ[envvar]
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *ignore_exc):
+ for envvar, value in self._reset.items():
+ self._environ[envvar] = value
+ for unset in self._unset:
+ del self._environ[unset]
+
+class TransientResource(object):
+
+ """Raise ResourceDenied if an exception is raised while the context manager
+ is in effect that matches the specified exception and attributes."""
+
+ def __init__(self, exc, **kwargs):
+ self.exc = exc
+ self.attrs = kwargs
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type_=None, value=None, traceback=None):
+ """If type_ is a subclass of self.exc and value has attributes matching
+ self.attrs, raise ResourceDenied. Otherwise let the exception
+ propagate (if any)."""
+ if type_ is not None and issubclass(self.exc, type_):
+ for attr, attr_value in self.attrs.items():
+ if not hasattr(value, attr):
+ break
+ if getattr(value, attr) != attr_value:
+ break
+ else:
+ raise ResourceDenied("an optional resource is not available")
+
+
+def transient_internet():
+ """Return a context manager that raises ResourceDenied when various issues
+ with the Internet connection manifest themselves as exceptions."""
+ time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
+ socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
+ ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
+ return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
+
+
+@contextlib.contextmanager
+def captured_output(stream_name):
+ """Run the 'with' statement body using a StringIO object in place of a
+ specific attribute on the sys module.
+ Example use (with 'stream_name=stdout')::
+
+ with captured_stdout() as s:
+ print("hello")
+ assert s.getvalue() == "hello"
+ """
+ import io
+ orig_stdout = getattr(sys, stream_name)
+ setattr(sys, stream_name, io.StringIO())
+ try:
+ yield getattr(sys, stream_name)
+ finally:
+ setattr(sys, stream_name, orig_stdout)
+
+def captured_stdout():
+ return captured_output("stdout")
+
+
+#=======================================================================
+# Decorator for running a function in a different locale, correctly resetting
+# it afterwards.
+
+def run_with_locale(catstr, *locales):
+ def decorator(func):
+ def inner(*args, **kwds):
+ try:
+ import locale
+ category = getattr(locale, catstr)
+ orig_locale = locale.setlocale(category)
+ except AttributeError:
+ # if the test author gives us an invalid category string
+ raise
+ except:
+ # cannot retrieve original locale, so do nothing
+ locale = orig_locale = None
+ else:
+ for loc in locales:
+ try:
+ locale.setlocale(category, loc)
+ break
+ except:
+ pass
+
+ # now run the function, resetting the locale on exceptions
+ try:
+ return func(*args, **kwds)
+ finally:
+ if locale and orig_locale:
+ locale.setlocale(category, orig_locale)
+ inner.__name__ = func.__name__
+ inner.__doc__ = func.__doc__
+ return inner
+ return decorator
+
+#=======================================================================
+# Big-memory-test support. Separate from 'resources' because memory use
+# should be configurable.
+
+# Some handy shorthands. Note that these are used for byte-limits as well
+# as size-limits, in the various bigmem tests
+_1M = 1024*1024
+_1G = 1024 * _1M
+_2G = 2 * _1G
+
+MAX_Py_ssize_t = sys.maxsize
+
+def set_memlimit(limit):
+ import re
+ global max_memuse
+ sizes = {
+ 'k': 1024,
+ 'm': _1M,
+ 'g': _1G,
+ 't': 1024*_1G,
+ }
+ m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
+ re.IGNORECASE | re.VERBOSE)
+ if m is None:
+ raise ValueError('Invalid memory limit %r' % (limit,))
+ memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
+ if memlimit > MAX_Py_ssize_t:
+ memlimit = MAX_Py_ssize_t
+ if memlimit < _2G - 1:
+ raise ValueError('Memory limit %r too low to be useful' % (limit,))
+ max_memuse = memlimit
+
+def bigmemtest(minsize, memuse, overhead=5*_1M):
+ """Decorator for bigmem tests.
+
+ 'minsize' is the minimum useful size for the test (in arbitrary,
+ test-interpreted units.) 'memuse' is the number of 'bytes per size' for
+ the test, or a good estimate of it. 'overhead' specifies fixed overhead,
+ independent of the testsize, and defaults to 5Mb.
+
+ The decorator tries to guess a good value for 'size' and passes it to
+ the decorated test function. If minsize * memuse is more than the
+ allowed memory use (as defined by max_memuse), the test is skipped.
+ Otherwise, minsize is adjusted upward to use up to max_memuse.
+ """
+ def decorator(f):
+ def wrapper(self):
+ if not max_memuse:
+ # If max_memuse is 0 (the default),
+ # we still want to run the tests with size set to a few kb,
+ # to make sure they work. We still want to avoid using
+ # too much memory, though, but we do that noisily.
+ maxsize = 5147
+ self.failIf(maxsize * memuse + overhead > 20 * _1M)
+ else:
+ maxsize = int((max_memuse - overhead) / memuse)
+ if maxsize < minsize:
+ # Really ought to print 'test skipped' or something
+ if verbose:
+ sys.stderr.write("Skipping %s because of memory "
+ "constraint\n" % (f.__name__,))
+ return
+ # Try to keep some breathing room in memory use
+ maxsize = max(maxsize - 50 * _1M, minsize)
+ return f(self, maxsize)
+ wrapper.minsize = minsize
+ wrapper.memuse = memuse
+ wrapper.overhead = overhead
+ return wrapper
+ return decorator
+
+def bigaddrspacetest(f):
+ """Decorator for tests that fill the address space."""
+ def wrapper(self):
+ if max_memuse < MAX_Py_ssize_t:
+ if verbose:
+ sys.stderr.write("Skipping %s because of memory "
+ "constraint\n" % (f.__name__,))
+ else:
+ return f(self)
+ return wrapper
+
+#=======================================================================
+# unittest integration.
+
+class BasicTestRunner:
+ def run(self, test):
+ result = unittest.TestResult()
+ test(result)
+ return result
+
+
+def _run_suite(suite):
+ """Run tests from a unittest.TestSuite-derived class."""
+ if verbose:
+ runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
+ else:
+ runner = BasicTestRunner()
+
+ result = runner.run(suite)
+ if not result.wasSuccessful():
+ if len(result.errors) == 1 and not result.failures:
+ err = result.errors[0][1]
+ elif len(result.failures) == 1 and not result.errors:
+ err = result.failures[0][1]
+ else:
+ err = "errors occurred; run in verbose mode for details"
+ raise TestFailed(err)
+
+
+def run_unittest(*classes):
+ """Run tests from unittest.TestCase-derived classes."""
+ valid_types = (unittest.TestSuite, unittest.TestCase)
+ suite = unittest.TestSuite()
+ for cls in classes:
+ if isinstance(cls, str):
+ if cls in sys.modules:
+ suite.addTest(unittest.findTestCases(sys.modules[cls]))
+ else:
+ raise ValueError("str arguments must be keys in sys.modules")
+ elif isinstance(cls, valid_types):
+ suite.addTest(cls)
+ else:
+ suite.addTest(unittest.makeSuite(cls))
+ _run_suite(suite)
+
+
+#=======================================================================
+# doctest driver.
+
+def run_doctest(module, verbosity=None):
+ """Run doctest on the given module. Return (#failures, #tests).
+
+ If optional argument verbosity is not specified (or is None), pass
+ support's belief about verbosity on to doctest. Else doctest's
+ usual behavior is used (it searches sys.argv for -v).
+ """
+
+ import doctest
+
+ if verbosity is None:
+ verbosity = verbose
+ else:
+ verbosity = None
+
+ # Direct doctest output (normally just errors) to real stdout; doctest
+ # output shouldn't be compared by regrtest.
+ save_stdout = sys.stdout
+ sys.stdout = get_original_stdout()
+ try:
+ f, t = doctest.testmod(module, verbose=verbosity)
+ if f:
+ raise TestFailed("%d of %d doctests failed" % (f, t))
+ finally:
+ sys.stdout = save_stdout
+ if verbose:
+ print('doctest (%s) ... %d tests with zero failures' %
+ (module.__name__, t))
+ return f, t
+
+#=======================================================================
+# Threading support to prevent reporting refleaks when running regrtest.py -R
+
+def threading_setup():
+ import threading
+ return len(threading._active), len(threading._limbo)
+
+def threading_cleanup(num_active, num_limbo):
+ import threading
+ import time
+
+ _MAX_COUNT = 10
+ count = 0
+ while len(threading._active) != num_active and count < _MAX_COUNT:
+ count += 1
+ time.sleep(0.1)
+
+ count = 0
+ while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
+ count += 1
+ time.sleep(0.1)
+
+def reap_children():
+ """Use this function at the end of test_main() whenever sub-processes
+ are started. This will help ensure that no extra children (zombies)
+ stick around to hog resources and create problems when looking
+ for refleaks.
+ """
+
+ # Reap all our dead child processes so we don't leave zombies around.
+ # These hog resources and might be causing some of the buildbots to die.
+ if hasattr(os, 'waitpid'):
+ any_process = -1
+ while True:
+ try:
+ # This will raise an exception on Windows. That's ok.
+ pid, status = os.waitpid(any_process, os.WNOHANG)
+ if pid == 0:
+ break
+ except:
+ break