summaryrefslogtreecommitdiff
path: root/Lib/test/support.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/support.py')
-rw-r--r--Lib/test/support.py178
1 files changed, 132 insertions, 46 deletions
diff --git a/Lib/test/support.py b/Lib/test/support.py
index 8e6ca2a541..d00a51324a 100644
--- a/Lib/test/support.py
+++ b/Lib/test/support.py
@@ -15,7 +15,7 @@ import shutil
import warnings
import unittest
import importlib
-import collections
+import collections.abc
import re
import subprocess
import imp
@@ -35,25 +35,28 @@ except ImportError:
multiprocessing = None
+try:
+ import zlib
+except ImportError:
+ zlib = None
+
__all__ = [
"Error", "TestFailed", "ResourceDenied", "import_module",
"verbose", "use_resources", "max_memuse", "record_original_stdout",
"get_original_stdout", "unload", "unlink", "rmtree", "forget",
- "is_resource_enabled", "requires", "requires_mac_ver",
- "find_unused_port", "bind_port",
- "fcmp", "is_jython", "TESTFN", "HOST", "FUZZ", "SAVEDCWD", "temp_cwd",
- "findfile", "sortdict", "check_syntax_error", "open_urlresource",
- "check_warnings", "CleanImport", "EnvironmentVarGuard",
- "TransientResource", "captured_output", "captured_stdout",
- "captured_stdin", "captured_stderr",
- "time_out", "socket_peer_reset", "ioerror_peer_reset",
- "run_with_locale", 'temp_umask', "transient_internet",
- "set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner",
- "run_unittest", "run_doctest", "threading_setup", "threading_cleanup",
- "reap_children", "cpython_only", "check_impl_detail", "get_attribute",
- "swap_item", "swap_attr", "requires_IEEE_754",
+ "is_resource_enabled", "requires", "requires_linux_version",
+ "requires_mac_ver", "find_unused_port", "bind_port",
+ "IPV6_ENABLED", "is_jython", "TESTFN", "HOST", "SAVEDCWD", "temp_cwd",
+ "findfile", "create_empty_file", "sortdict", "check_syntax_error", "open_urlresource",
+ "check_warnings", "CleanImport", "EnvironmentVarGuard", "TransientResource",
+ "captured_stdout", "captured_stdin", "captured_stderr", "time_out",
+ "socket_peer_reset", "ioerror_peer_reset", "run_with_locale", 'temp_umask',
+ "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest",
+ "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
+ "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail",
+ "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754",
"TestHandler", "Matcher", "can_symlink", "skip_unless_symlink",
- "import_fresh_module", "failfast",
+ "import_fresh_module", "requires_zlib", "PIPE_MAX_SIZE", "failfast",
]
class Error(Exception):
@@ -168,7 +171,7 @@ def get_attribute(obj, name):
attribute = getattr(obj, name)
except AttributeError:
raise unittest.SkipTest("module %s has no attribute %s" % (
- obj.__name__, name))
+ repr(obj), name))
else:
return attribute
@@ -295,9 +298,36 @@ def requires(resource, msg=None):
return
if not is_resource_enabled(resource):
if msg is None:
- msg = "Use of the `%s' resource not enabled" % resource
+ msg = "Use of the %r resource not enabled" % resource
raise ResourceDenied(msg)
+def requires_linux_version(*min_version):
+ """Decorator raising SkipTest if the OS is Linux and the kernel version is
+ less than min_version.
+
+ For example, @requires_linux_version(2, 6, 35) raises SkipTest if the Linux
+ kernel version is less than 2.6.35.
+ """
+ def decorator(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kw):
+ if sys.platform == 'linux':
+ version_txt = platform.release().split('-', 1)[0]
+ try:
+ version = tuple(map(int, version_txt.split('.')))
+ except ValueError:
+ pass
+ else:
+ if version < min_version:
+ min_version_txt = '.'.join(map(str, min_version))
+ raise unittest.SkipTest(
+ "Linux kernel %s or higher required, not %s"
+ % (min_version_txt, version_txt))
+ return func(*args, **kw)
+ wrapper.min_version = min_version
+ return wrapper
+ return decorator
+
def requires_mac_ver(*min_version):
"""Decorator raising SkipTest if the OS is Mac OS X and the OS X
version if less than min_version.
@@ -325,6 +355,7 @@ def requires_mac_ver(*min_version):
return wrapper
return decorator
+
HOST = 'localhost'
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
@@ -420,29 +451,35 @@ def bind_port(sock, host=HOST):
port = sock.getsockname()[1]
return port
-FUZZ = 1e-6
-
-def fcmp(x, y): # fuzzy comparison function
- if isinstance(x, float) or isinstance(y, float):
+def _is_ipv6_enabled():
+ """Check whether IPv6 is enabled on this host."""
+ if socket.has_ipv6:
try:
- fuzz = (abs(x) + abs(y)) * FUZZ
- if abs(x-y) <= fuzz:
- return 0
- except:
+ sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ sock.bind(('::1', 0))
+ except (socket.error, socket.gaierror):
pass
- elif type(x) == type(y) and isinstance(x, (tuple, list)):
- for i in range(min(len(x), len(y))):
- outcome = fcmp(x[i], y[i])
- if outcome != 0:
- return outcome
- return (len(x) > len(y)) - (len(x) < len(y))
- return (x > y) - (x < y)
+ else:
+ sock.close()
+ return True
+ return False
+
+IPV6_ENABLED = _is_ipv6_enabled()
+
+
+# A constant likely larger than the underlying OS pipe buffer size.
+# Windows limit seems to be around 512B, and most Unix kernels have a 64K pipe
+# buffer size: take 1M to be sure.
+PIPE_MAX_SIZE = 1024 * 1024
+
# decorator for skipping tests on non-IEEE 754 platforms
requires_IEEE_754 = unittest.skipUnless(
float.__getformat__("double").startswith("IEEE"),
"test requires IEEE 754 doubles")
+requires_zlib = unittest.skipUnless(zlib, 'requires zlib')
+
is_jython = sys.platform.startswith('java')
# Filename used for testing
@@ -543,14 +580,15 @@ def temp_cwd(name='tempcwd', quiet=False, path=None):
rmtree(name)
-@contextlib.contextmanager
-def temp_umask(umask):
- """Context manager that temporarily sets the process umask."""
- oldmask = os.umask(umask)
- try:
- yield
- finally:
- os.umask(oldmask)
+if hasattr(os, "umask"):
+ @contextlib.contextmanager
+ def temp_umask(umask):
+ """Context manager that temporarily sets the process umask."""
+ oldmask = os.umask(umask)
+ try:
+ yield
+ finally:
+ os.umask(oldmask)
def findfile(file, here=__file__, subdir=None):
@@ -568,6 +606,11 @@ def findfile(file, here=__file__, subdir=None):
if os.path.exists(fn): return fn
return file
+def create_empty_file(filename):
+ """Create an empty file. If the file already exists, truncate it."""
+ fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
+ os.close(fd)
+
def sortdict(dict):
"Like repr(dict), but in sorted order."
items = sorted(dict.items())
@@ -632,7 +675,7 @@ def open_urlresource(url, *args, **kw):
f = check_valid_file(fn)
if f is not None:
return f
- raise TestFailed('invalid resource "%s"' % fn)
+ raise TestFailed('invalid resource %r' % fn)
class WarningsRecorder(object):
@@ -753,7 +796,7 @@ class CleanImport(object):
sys.modules.update(self.original_modules)
-class EnvironmentVarGuard(collections.MutableMapping):
+class EnvironmentVarGuard(collections.abc.MutableMapping):
"""Class to help protect the environment variable properly. Can be used as
a context manager."""
@@ -883,7 +926,7 @@ def transient_internet(resource_name, *, timeout=30.0, errnos=()):
('WSANO_DATA', 11004),
]
- denied = ResourceDenied("Resource '%s' is not available" % resource_name)
+ denied = ResourceDenied("Resource %r is not available" % resource_name)
captured_errnos = errnos
gai_errnos = []
if not captured_errnos:
@@ -972,6 +1015,16 @@ def gc_collect():
gc.collect()
gc.collect()
+@contextlib.contextmanager
+def disable_gc():
+ have_gc = gc.isenabled()
+ gc.disable()
+ try:
+ yield
+ finally:
+ if have_gc:
+ gc.enable()
+
def python_is_optimized():
"""Find if Python was built with optimizations."""
@@ -980,7 +1033,7 @@ def python_is_optimized():
for opt in cflags.split():
if opt.startswith('-O'):
final_opt = opt
- return final_opt and final_opt != '-O0'
+ return final_opt != '' and final_opt != '-O0'
#=======================================================================
@@ -1090,6 +1143,11 @@ def bigmemtest(minsize, memuse):
return decorator
def precisionbigmemtest(size, memuse, dry_run=True):
+ """Decorator for bigmem tests that need exact sizes.
+
+ Like bigmemtest, but without the size scaling upward to fill available
+ memory.
+ """
def decorator(f):
def wrapper(self):
size = wrapper.size
@@ -1186,6 +1244,33 @@ def check_impl_detail(**guards):
return guards.get(platform.python_implementation().lower(), default)
+def no_tracing(func):
+ """Decorator to temporarily turn off tracing for the duration of a test."""
+ if not hasattr(sys, 'gettrace'):
+ return func
+ else:
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ original_trace = sys.gettrace()
+ try:
+ sys.settrace(None)
+ return func(*args, **kwargs)
+ finally:
+ sys.settrace(original_trace)
+ return wrapper
+
+
+def refcount_test(test):
+ """Decorator for tests which involve reference counting.
+
+ To start, the decorator does not run the test if is not run by CPython.
+ After that, any trace function is unset during the test to prevent
+ unexpected refcounts caused by the trace function.
+
+ """
+ return no_tracing(cpython_only(test))
+
+
def _filter_suite(suite, pred):
"""Recursively filter test cases in a suite based on a predicate."""
newtests = []
@@ -1198,7 +1283,6 @@ def _filter_suite(suite, pred):
newtests.append(test)
suite._tests = newtests
-
def _run_suite(suite):
"""Run tests from a unittest.TestSuite-derived class."""
if verbose:
@@ -1425,7 +1509,7 @@ def strip_python_stderr(stderr):
def args_from_interpreter_flags():
"""Return a list of command-line arguments reproducing the current
- settings in sys.flags."""
+ settings in sys.flags and sys.warnoptions."""
flag_opt_map = {
'bytes_warning': 'b',
'dont_write_bytecode': 'B',
@@ -1440,6 +1524,8 @@ def args_from_interpreter_flags():
v = getattr(sys.flags, flag)
if v > 0:
args.append('-' + opt * v)
+ for opt in sys.warnoptions:
+ args.append('-W' + opt)
return args
#============================================================