summaryrefslogtreecommitdiff
path: root/Lib/test/support/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/support/__init__.py')
-rw-r--r--Lib/test/support/__init__.py206
1 files changed, 1 insertions, 205 deletions
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 8dee5b9dcc..e894545f87 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -19,8 +19,6 @@ import struct
import subprocess
import sys
import sysconfig
-import _thread
-import threading
import time
import types
import unittest
@@ -62,8 +60,6 @@ __all__ = [
"open_urlresource",
# processes
'temp_umask', "reap_children",
- # threads
- "threading_setup", "threading_cleanup", "reap_threads", "start_threads",
# miscellaneous
"check_warnings", "check_no_resource_warning", "check_no_warnings",
"EnvironmentVarGuard",
@@ -1991,120 +1987,14 @@ def modules_cleanup(oldmodules):
# Implicitly imported *real* modules should be left alone (see issue 10556).
sys.modules.update(oldmodules)
-#=======================================================================
-# Threading support to prevent reporting refleaks when running regrtest.py -R
-
# Flag used by saved_test_environment of test.libregrtest.save_env,
# to check if a test modified the environment. The flag should be set to False
# before running a new test.
#
-# For example, threading_cleanup() sets the flag is the function fails
+# For example, threading_helper.threading_cleanup() sets the flag is the function fails
# to cleanup threads.
environment_altered = False
-# NOTE: we use thread._count() rather than threading.enumerate() (or the
-# moral equivalent thereof) because a threading.Thread object is still alive
-# until its __bootstrap() method has returned, even after it has been
-# unregistered from the threading module.
-# thread._count(), on the other hand, only gets decremented *after* the
-# __bootstrap() method has returned, which gives us reliable reference counts
-# at the end of a test run.
-
-def threading_setup():
- return _thread._count(), threading._dangling.copy()
-
-def threading_cleanup(*original_values):
- global environment_altered
-
- _MAX_COUNT = 100
-
- for count in range(_MAX_COUNT):
- values = _thread._count(), threading._dangling
- if values == original_values:
- break
-
- if not count:
- # Display a warning at the first iteration
- environment_altered = True
- dangling_threads = values[1]
- print_warning(f"threading_cleanup() failed to cleanup "
- f"{values[0] - original_values[0]} threads "
- f"(count: {values[0]}, "
- f"dangling: {len(dangling_threads)})")
- for thread in dangling_threads:
- print_warning(f"Dangling thread: {thread!r}")
-
- # Don't hold references to threads
- dangling_threads = None
- values = None
-
- time.sleep(0.01)
- gc_collect()
-
-
-def reap_threads(func):
- """Use this function when threads are being used. This will
- ensure that the threads are cleaned up even when the test fails.
- """
- @functools.wraps(func)
- def decorator(*args):
- key = threading_setup()
- try:
- return func(*args)
- finally:
- threading_cleanup(*key)
- return decorator
-
-
-@contextlib.contextmanager
-def wait_threads_exit(timeout=None):
- """
- bpo-31234: Context manager to wait until all threads created in the with
- statement exit.
-
- Use _thread.count() to check if threads exited. Indirectly, wait until
- threads exit the internal t_bootstrap() C function of the _thread module.
-
- threading_setup() and threading_cleanup() are designed to emit a warning
- if a test leaves running threads in the background. This context manager
- is designed to cleanup threads started by the _thread.start_new_thread()
- which doesn't allow to wait for thread exit, whereas thread.Thread has a
- join() method.
- """
- if timeout is None:
- timeout = SHORT_TIMEOUT
- old_count = _thread._count()
- try:
- yield
- finally:
- start_time = time.monotonic()
- deadline = start_time + timeout
- while True:
- count = _thread._count()
- if count <= old_count:
- break
- if time.monotonic() > deadline:
- dt = time.monotonic() - start_time
- msg = (f"wait_threads() failed to cleanup {count - old_count} "
- f"threads after {dt:.1f} seconds "
- f"(count: {count}, old count: {old_count})")
- raise AssertionError(msg)
- time.sleep(0.010)
- gc_collect()
-
-
-def join_thread(thread, timeout=None):
- """Join a thread. Raise an AssertionError if the thread is still alive
- after timeout seconds.
- """
- if timeout is None:
- timeout = SHORT_TIMEOUT
- thread.join(timeout)
- if thread.is_alive():
- msg = f"failed to join the thread in {timeout:.1f} seconds"
- raise AssertionError(msg)
-
-
def reap_children():
"""Use this function at the end of test_main() whenever sub-processes
are started. This will help ensure that no extra children (zombies)
@@ -2134,43 +2024,6 @@ def reap_children():
@contextlib.contextmanager
-def start_threads(threads, unlock=None):
- import faulthandler
- threads = list(threads)
- started = []
- try:
- try:
- for t in threads:
- t.start()
- started.append(t)
- except:
- if verbose:
- print("Can't start %d threads, only %d threads started" %
- (len(threads), len(started)))
- raise
- yield
- finally:
- try:
- if unlock:
- unlock()
- endtime = starttime = time.monotonic()
- for timeout in range(1, 16):
- endtime += 60
- for t in started:
- t.join(max(endtime - time.monotonic(), 0.01))
- started = [t for t in started if t.is_alive()]
- if not started:
- break
- if verbose:
- print('Unable to join %d threads during a period of '
- '%d minutes' % (len(started), timeout))
- finally:
- started = [t for t in started if t.is_alive()]
- if started:
- faulthandler.dump_traceback(sys.stdout)
- raise AssertionError('Unable to join %d threads' % len(started))
-
-@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
"""Temporary swap out an attribute with a new object.
@@ -3023,63 +2876,6 @@ class catch_unraisable_exception:
del self.unraisable
-class catch_threading_exception:
- """
- Context manager catching threading.Thread exception using
- threading.excepthook.
-
- Attributes set when an exception is catched:
-
- * exc_type
- * exc_value
- * exc_traceback
- * thread
-
- See threading.excepthook() documentation for these attributes.
-
- These attributes are deleted at the context manager exit.
-
- Usage:
-
- with support.catch_threading_exception() as cm:
- # code spawning a thread which raises an exception
- ...
-
- # check the thread exception, use cm attributes:
- # exc_type, exc_value, exc_traceback, thread
- ...
-
- # exc_type, exc_value, exc_traceback, thread attributes of cm no longer
- # exists at this point
- # (to avoid reference cycles)
- """
-
- def __init__(self):
- self.exc_type = None
- self.exc_value = None
- self.exc_traceback = None
- self.thread = None
- self._old_hook = None
-
- def _hook(self, args):
- self.exc_type = args.exc_type
- self.exc_value = args.exc_value
- self.exc_traceback = args.exc_traceback
- self.thread = args.thread
-
- def __enter__(self):
- self._old_hook = threading.excepthook
- threading.excepthook = self._hook
- return self
-
- def __exit__(self, *exc_info):
- threading.excepthook = self._old_hook
- del self.exc_type
- del self.exc_value
- del self.exc_traceback
- del self.thread
-
-
def wait_process(pid, *, exitcode, timeout=None):
"""
Wait until process pid completes and check that the process exit code is