# Run the _testcapi module tests (tests for the Python/C API):  by defn,
# these are all functions _testcapi exports whose name begins with 'test_'.

import os
import pickle
import random
import re
import subprocess
import sys
import sysconfig
import textwrap
import time
import unittest
from test import support
from test.support import MISSING_C_DOCSTRINGS
from test.support.script_helper import assert_python_failure, assert_python_ok
try:
    import _posixsubprocess
except ImportError:
    _posixsubprocess = None
try:
    import threading
except ImportError:
    threading = None
# Skip this test if the _testcapi module isn't available.
_testcapi = support.import_module('_testcapi')

# Were we compiled --with-pydebug or with #define Py_DEBUG?
Py_DEBUG = hasattr(sys, 'gettotalrefcount')


def testfunction(self):
    """some doc"""
    return self


class InstanceMethod:
    """Holder for callables wrapped with _testcapi.instancemethod()."""
    # Deliberately rebinds the builtin id() inside the class namespace.
    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)


class CAPITest(unittest.TestCase):
    """Miscellaneous tests driven through helpers exported by _testcapi."""

    def test_instancemethod(self):
        """Wrapped callables bind to the instance and proxy their attributes."""
        inst = InstanceMethod()
        self.assertEqual(id(inst), inst.id())
        self.assertIs(inst.testfunction(), inst)
        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
        self.assertEqual(InstanceMethod.testfunction.__doc__,
                         testfunction.__doc__)

        # Setting an attribute through the class-level wrapper is visible on
        # the wrapped function; setting it on the bound method must fail.
        InstanceMethod.testfunction.attribute = "test"
        self.assertEqual(testfunction.attribute, "test")
        self.assertRaises(AttributeError, setattr, inst.testfunction,
                          "attribute", "test")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_no_FatalError_infinite_loop(self):
        """A fatal error without a current thread must terminate the process."""
        with support.SuppressCrashReport():
            p = subprocess.Popen([sys.executable, "-c",
                                  'import _testcapi;'
                                  '_testcapi.crash_no_current_thread()'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        self.assertEqual(out, b'')
        # This used to cause an infinite loop.
        self.assertTrue(err.rstrip().startswith(
                         b'Fatal Python error:'
                         b' PyThreadState_Get: no current thread'))

    def test_memoryview_from_NULL_pointer(self):
        """Building a memoryview from a NULL pointer raises ValueError."""
        self.assertRaises(ValueError,
                          _testcapi.make_memoryview_from_NULL_pointer)

    def test_exc_info(self):
        """_testcapi.set_exc_info() swaps sys.exc_info() and returns the old one."""
        raised_exception = ValueError("5")
        new_exc = TypeError("TEST")
        try:
            raise raised_exception
        except ValueError as e:
            tb = e.__traceback__
            orig_sys_exc_info = sys.exc_info()
            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__,
                                                   new_exc, None)
            new_sys_exc_info = sys.exc_info()
            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
            reset_sys_exc_info = sys.exc_info()

            self.assertEqual(orig_exc_info[1], e)

            self.assertSequenceEqual(orig_exc_info,
                                     (raised_exception.__class__,
                                      raised_exception, tb))
            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(new_exc_info,
                                     (new_exc.__class__, new_exc, None))
            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
        else:
            self.fail("raising raised_exception did not reach the handler")

    @unittest.skipUnless(_posixsubprocess,
                         '_posixsubprocess required for this test.')
    def test_seq_bytes_to_charp_array(self):
        """Malformed sequences passed to fork_exec() raise instead of crashing."""
        # Issue #15732: crash in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return 1
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17)

        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return sys.maxsize
            def __getitem__(self, i):
                return b'x'
        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17)

    @unittest.skipUnless(_posixsubprocess,
                         '_posixsubprocess required for this test.')
    def test_subprocess_fork_exec(self):
        """A bogus first argument to fork_exec() raises TypeError."""
        class Z(object):
            def __len__(self):
                return 1

        # Issue #15738: crash in subprocess_fork_exec()
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17)

    @unittest.skipIf(MISSING_C_DOCSTRINGS,
                     "Signature information for builtins requires docstrings")
    def test_docstring_signature_parsing(self):
        """__doc__ / __text_signature__ are split correctly for C functions."""
        self.assertEqual(_testcapi.no_docstring.__doc__, None)
        self.assertEqual(_testcapi.no_docstring.__text_signature__, None)

        self.assertEqual(_testcapi.docstring_empty.__doc__, None)
        self.assertEqual(_testcapi.docstring_empty.__text_signature__, None)

        self.assertEqual(_testcapi.docstring_no_signature.__doc__,
            "This docstring has no signature.")
        self.assertEqual(
            _testcapi.docstring_no_signature.__text_signature__, None)

        self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
            "docstring_with_invalid_signature($module, /, boo)\n"
            "\n"
            "This docstring has an invalid signature."
            )
        self.assertEqual(
            _testcapi.docstring_with_invalid_signature.__text_signature__,
            None)

        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
            "docstring_with_invalid_signature2($module, /, boo)\n"
            "\n"
            "--\n"
            "\n"
            "This docstring also has an invalid signature."
            )
        self.assertEqual(
            _testcapi.docstring_with_invalid_signature2.__text_signature__,
            None)

        self.assertEqual(_testcapi.docstring_with_signature.__doc__,
            "This docstring has a valid signature.")
        self.assertEqual(
            _testcapi.docstring_with_signature.__text_signature__,
            "($module, /, sig)")

        self.assertEqual(
            _testcapi.docstring_with_signature_but_no_doc.__doc__, None)
        self.assertEqual(
            _testcapi.docstring_with_signature_but_no_doc.__text_signature__,
            "($module, /, sig)")

        self.assertEqual(
            _testcapi.docstring_with_signature_and_extra_newlines.__doc__,
            "\nThis docstring has a valid signature and some extra newlines.")
        self.assertEqual(
            _testcapi.docstring_with_signature_and_extra_newlines
                     .__text_signature__,
            "($module, /, parameter)")

    def test_c_type_with_matrix_multiplication(self):
        """The @ and @= operators dispatch to a C type's matmul slots."""
        M = _testcapi.matmulType
        m1 = M()
        m2 = M()
        self.assertEqual(m1 @ m2, ("matmul", m1, m2))
        self.assertEqual(m1 @ 42, ("matmul", m1, 42))
        self.assertEqual(42 @ m1, ("matmul", 42, m1))
        o = m1
        o @= m2
        self.assertEqual(o, ("imatmul", m1, m2))
        o = m1
        o @= 42
        self.assertEqual(o, ("imatmul", m1, 42))
        # With an int on the left there is no in-place slot to use, so the
        # result reported is the plain "matmul" one.
        o = 42
        o @= m1
        self.assertEqual(o, ("matmul", 42, m1))

    def test_return_null_without_error(self):
        """Returning NULL with no exception set is reported as an error."""
        # Issue #23571: A function must not return NULL without setting an
        # error
        if Py_DEBUG:
            # The blank line inside the script matters: the regex below
            # expects the failing call to be on line 6.
            code = textwrap.dedent("""
                import _testcapi
                from test import support

                with support.SuppressCrashReport():
                    _testcapi.return_null_without_error()
            """)
            rc, out, err = assert_python_failure('-c', code)
            # The SystemError text names the offending builtin, the same
            # wording the non-debug branch below checks for.
            self.assertRegex(err.replace(b'\r', b''),
                             br'Fatal Python error: a function returned NULL '
                                br'without setting an error\n'
                             br'SystemError: <built-in function '
                                 br'return_null_without_error> returned NULL '
                                 br'without setting an error\n'
                             br'\n'
                             br'Current thread.*:\n'
                             br'  File .*", line 6 in <module>')
        else:
            with self.assertRaises(SystemError) as cm:
                _testcapi.return_null_without_error()
            self.assertRegex(str(cm.exception),
                             'return_null_without_error.* '
                             'returned NULL without setting an error')

    def test_return_result_with_error(self):
        """Returning a result while an exception is set is reported as an error."""
        # Issue #23571: A function must not return a result with an error set
        if Py_DEBUG:
            # The blank line inside the script matters: the regex below
            # expects the failing call to be on line 6.
            code = textwrap.dedent("""
                import _testcapi
                from test import support

                with support.SuppressCrashReport():
                    _testcapi.return_result_with_error()
            """)
            rc, out, err = assert_python_failure('-c', code)
            # The SystemError text names the offending builtin, the same
            # wording the non-debug branch below checks for.
            self.assertRegex(err.replace(b'\r', b''),
                             br'Fatal Python error: a function returned a '
                                br'result with an error set\n'
                             br'ValueError\n'
                             br'\n'
                             br'The above exception was the direct cause '
                                br'of the following exception:\n'
                             br'\n'
                             br'SystemError: <built-in '
                                br'function return_result_with_error> '
                                br'returned a result with an error set\n'
                             br'\n'
                             br'Current thread.*:\n'
                             br'  File .*, line 6 in <module>')
        else:
            with self.assertRaises(SystemError) as cm:
                _testcapi.return_result_with_error()
            self.assertRegex(str(cm.exception),
                             'return_result_with_error.* '
                             'returned a result with an error set')

    def test_buildvalue_N(self):
        """Run the C-level test_buildvalue_N() check."""
        _testcapi.test_buildvalue_N()

    def test_set_nomemory(self):
        """set_nomemory()/remove_mem_hooks() trigger MemoryError where expected."""
        code = """if 1:
            import _testcapi

            class C(): pass

            # The first loop tests both functions and that remove_mem_hooks()
            # can be called twice in a row. The second loop checks a call to
            # set_nomemory() after a call to remove_mem_hooks(). The third
            # loop checks the start and stop arguments of set_nomemory().
            for outer_cnt in range(1, 4):
                start = 10 * outer_cnt
                for j in range(100):
                    if j == 0:
                        if outer_cnt != 3:
                            _testcapi.set_nomemory(start)
                        else:
                            _testcapi.set_nomemory(start, start + 1)
                    try:
                        C()
                    except MemoryError as e:
                        if outer_cnt != 3:
                            _testcapi.remove_mem_hooks()
                        print('MemoryError', outer_cnt, j)
                        _testcapi.remove_mem_hooks()
                        break
        """
        rc, out, err = assert_python_ok('-c', code)
        self.assertIn(b'MemoryError 1 10', out)
        self.assertIn(b'MemoryError 2 20', out)
        self.assertIn(b'MemoryError 3 30', out)


@unittest.skipUnless(threading, 'Threading required for this test.')
class TestPendingCalls(unittest.TestCase):
    """Tests for callbacks queued through _testcapi._pending_threadfunc()."""

    def pendingcalls_submit(self, l, n):
        """Queue n pending calls, each of which appends None to the list l."""
        def callback():
            #this function can be interrupted by thread switching so let's
            #use an atomic operation
            l.append(None)

        for i in range(n):
            time.sleep(random.random()*0.02) #0.01 secs on average
            #try submitting callback until successful.
            #rely on regular interrupt to flush queue if we are
            #unsuccessful.
            while True:
                if _testcapi._pending_threadfunc(callback):
                    break

    def pendingcalls_wait(self, l, n, context = None):
        """Busy-wait until l holds n entries; fail after too many iterations.

        When a context is given, iterations are not counted towards the
        timeout until context.event is set.
        """
        #now, stick around until l has grown to n entries
        count = 0
        while len(l) != n:
            #this busy loop is where we expect to be interrupted to
            #run our callbacks.  Note that callbacks are only run on the
            #main thread
            if False and support.verbose:
                print("(%i)"%(len(l),),)
            for i in range(1000):
                a = i*i
            if context and not context.event.is_set():
                continue
            count += 1
            self.assertLess(count, 10000,
                "timeout waiting for %i callbacks, got %i"%(n, len(l)))
        if False and support.verbose:
            print("(%i)"%(len(l),))

    def test_pendingcalls_threaded(self):
        """Submit the callbacks from several worker threads."""
        #do every callback on a separate thread
        n = 32 #total callbacks

        class foo(object):pass
        context = foo()
        context.l = []
        context.n = 2 #submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        threads = [threading.Thread(target=self.pendingcalls_thread,
                                    args=(context,))
                   for i in range(context.nThreads)]
        with support.start_threads(threads):
            self.pendingcalls_wait(context.l, n, context)

    def pendingcalls_thread(self, context):
        """Worker body: submit context.n callbacks, then record completion."""
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            with context.lock:
                context.nFinished += 1
                nFinished = context.nFinished
                if False and support.verbose:
                    print("finished threads: ", nFinished)
            # The last worker to finish releases the waiter's timeout counter.
            if nFinished == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        """Submit and wait for the callbacks using only the main thread."""
        #again, just using the main thread, likely they will all be dispatched at
        #once.  It is ok to ask for too many, because we loop until we find a slot.
        #the loop can be interrupted to dispatch.
        #there are only 32 dispatch slots, so we go for twice that!
        l = []
        n = 64
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)


class SubinterpreterTest(unittest.TestCase):
    """Tests that run code in a sub-interpreter."""

    def test_subinterps(self):
        """A sub-interpreter has its own sys.modules and builtins objects."""
        import builtins
        r, w = os.pipe()
        # The script writes to the pipe's write end by file descriptor; the
        # parent reads the pickled ids back from the read end.
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(w)
        with open(r, "rb") as f:
            ret = support.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertNotEqual(pickle.load(f), id(sys.modules))
            self.assertNotEqual(pickle.load(f), id(builtins))


class EmbeddingTests(unittest.TestCase):
    """Tests that drive the external _testembed executable."""

    def setUp(self):
        """Locate _testembed (skipping if absent) and chdir to the base path."""
        here = os.path.abspath(__file__)
        basepath = os.path.dirname(os.path.dirname(os.path.dirname(here)))
        exename = "_testembed"
        if sys.platform.startswith("win"):
            ext = ("_d" if "_d" in sys.executable else "") + ".exe"
            exename += ext
            exepath = os.path.dirname(sys.executable)
        else:
            exepath = os.path.join(basepath, "Programs")
        self.test_exe = exe = os.path.join(exepath, exename)
        if not os.path.exists(exe):
            self.skipTest("%r doesn't exist" % exe)
        # This is needed otherwise we get a fatal error:
        # "Py_Initialize: Unable to get the locale encoding
        # LookupError: no codec search functions registered: can't find encoding"
        self.oldcwd = os.getcwd()
        os.chdir(basepath)

    def tearDown(self):
        """Restore the working directory changed by setUp()."""
        os.chdir(self.oldcwd)

    def run_embedded_interpreter(self, *args, env=None):
        """Runs a test in the embedded interpreter"""
        cmd = [self.test_exe]
        cmd.extend(args)
        if env is not None and sys.platform == 'win32':
            # Windows requires at least the SYSTEMROOT environment variable to
            # start Python.
            env = env.copy()
            env['SYSTEMROOT'] = os.environ['SYSTEMROOT']

        p = subprocess.Popen(cmd,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             universal_newlines=True,
                             env=env)
        (out, err) = p.communicate()
        self.assertEqual(p.returncode, 0,
                         "bad returncode %d, stderr is %r" %
                         (p.returncode, err))
        return out, err

    def test_repeated_init_and_subinterpreters(self):
        """Run the 'repeated_init_and_subinterpreters' embedding scenario."""
        # This is just a "don't crash" test
        out, err = self.run_embedded_interpreter(
            'repeated_init_and_subinterpreters')
        if support.verbose:
            print()
            print(out)
            print(err)

    def test_forced_io_encoding(self):
        """Check the stream encodings reported by 'forced_io_encoding'."""
        # Checks forced configuration of embedded interpreter IO streams
        env = dict(os.environ, PYTHONIOENCODING="utf-8:surrogateescape")
        out, err = self.run_embedded_interpreter("forced_io_encoding", env=env)
        if support.verbose:
            print()
            print(out)
            print(err)
        expected_stream_encoding = "utf-8"
        expected_errors = "surrogateescape"
        expected_output = '\n'.join([
        "--- Use defaults ---",
        "Expected encoding: default",
        "Expected errors: default",
        "stdin: {in_encoding}:{errors}",
        "stdout: {out_encoding}:{errors}",
        "stderr: {out_encoding}:backslashreplace",
        "--- Set errors only ---",
        "Expected encoding: default",
        "Expected errors: ignore",
        "stdin: {in_encoding}:ignore",
        "stdout: {out_encoding}:ignore",
        "stderr: {out_encoding}:backslashreplace",
        "--- Set encoding only ---",
        "Expected encoding: latin-1",
        "Expected errors: default",
        "stdin: latin-1:{errors}",
        "stdout: latin-1:{errors}",
        "stderr: latin-1:backslashreplace",
        "--- Set encoding and errors ---",
        "Expected encoding: latin-1",
        "Expected errors: replace",
        "stdin: latin-1:replace",
        "stdout: latin-1:replace",
        "stderr: latin-1:backslashreplace"])
        expected_output = expected_output.format(
                                in_encoding=expected_stream_encoding,
                                out_encoding=expected_stream_encoding,
                                errors=expected_errors)
        # This is useful if we ever trip over odd platform behaviour
        self.maxDiff = None
        self.assertEqual(out.strip(), expected_output)

    def test_pre_initialization_api(self):
        """
        Checks the few parts of the C-API that work before the runtime
        is initialized (via Py_Initialize()).
        """
        env = dict(os.environ, PYTHONPATH=os.pathsep.join(sys.path))
        out, err = self.run_embedded_interpreter("pre_initialization_api",
                                                 env=env)
        self.assertEqual(out, '')
        self.assertEqual(err, '')


@unittest.skipUnless(threading, 'Threading required for this test.')
class TestThreadState(unittest.TestCase):
    """Thread-state tests driven via _testcapi._test_thread_state()."""

    @support.reap_threads
    def test_thread_state(self):
        """Run the check on the main thread and again on a worker thread."""
        # some extra thread-state tests driven via _testcapi
        def target():
            idents = []

            def callback():
                idents.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            a = b = callback
            time.sleep(1)
            # Check our main thread is in the list exactly 3 times.
            self.assertEqual(idents.count(threading.get_ident()), 3,
                             "Couldn't find main thread correctly in the list")

        target()
        t = threading.Thread(target=target)
        t.start()
        t.join()


class Test_testcapi(unittest.TestCase):
    """Exposes every _testcapi 'test_*' function as a test method."""
    # Names ending in '_code' are excluded from the generated methods.
    locals().update((name, getattr(_testcapi, name))
                    for name in dir(_testcapi)
                    if name.startswith('test_') and not name.endswith('_code'))


class PyMemDebugTests(unittest.TestCase):
    """Checks the fatal errors reported under PYTHONMALLOC debug settings."""
    PYTHONMALLOC = 'debug'
    # '0x04c06e0' or '04C06E0'
    PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+'

    def check(self, code):
        """Run code in a child expected to fail; return its decoded stderr."""
        with support.SuppressCrashReport():
            out = assert_python_failure('-c', code,
                                        PYTHONMALLOC=self.PYTHONMALLOC)
        stderr = out.err
        return stderr.decode('ascii', 'replace')

    def test_buffer_overflow(self):
        """Writing past the end of a block is reported as a bad pad byte."""
        out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()')
        # {{2}} is a literal regex quantifier escaped for str.format().
        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
                 r"    16 bytes originally requested\n"
                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                 r"    The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n"
                 r"        at tail\+0: 0x78 \*\*\* OUCH\n"
                 r"        at tail\+1: 0xfb\n"
                 r"        at tail\+2: 0xfb\n"
                 r"        .*\n"
                 r"    The block was made by call #[0-9]+ to debug malloc/realloc.\n"
                 r"    Data at p: cb cb cb .*\n"
                 r"\n"
                 r"Enable tracemalloc to get the memory block allocation traceback\n"
                 r"\n"
                 r"Fatal Python error: bad trailing pad byte")
        regex = regex.format(ptr=self.PTR_REGEX)
        # DOTALL lets the '.*' lines span the remaining tail-byte rows.
        regex = re.compile(regex, flags=re.DOTALL)
        self.assertRegex(out, regex)

    def test_api_misuse(self):
        """Verifying a block with a different API than allocated it is fatal."""
        out = self.check('import _testcapi; _testcapi.pymem_api_misuse()')
        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
                 r"    16 bytes originally requested\n"
                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                 r"    The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n"
                 r"    The block was made by call #[0-9]+ to debug malloc/realloc.\n"
                 r"    Data at p: cb cb cb .*\n"
                 r"\n"
                 r"Enable tracemalloc to get the memory block allocation traceback\n"
                 r"\n"
                 r"Fatal Python error: bad ID: Allocated using API 'm', verified using API 'r'\n")
        regex = regex.format(ptr=self.PTR_REGEX)
        self.assertRegex(out, regex)

    @unittest.skipUnless(threading, 'Test requires a GIL (multithreading)')
    def check_malloc_without_gil(self, code):
        """Assert that running code dies with the 'without holding the GIL' error."""
        out = self.check(code)
        expected = ('Fatal Python error: Python memory allocator called '
                    'without holding the GIL')
        self.assertIn(expected, out)

    def test_pymem_malloc_without_gil(self):
        # Debug hooks must raise an error if PyMem_Malloc() is called
        # without holding the GIL
        code = 'import _testcapi; _testcapi.pymem_malloc_without_gil()'
        self.check_malloc_without_gil(code)

    def test_pyobject_malloc_without_gil(self):
        # Debug hooks must raise an error if PyObject_Malloc() is called
        # without holding the GIL
        code = 'import _testcapi; _testcapi.pyobject_malloc_without_gil()'
        self.check_malloc_without_gil(code)


class PyMemMallocDebugTests(PyMemDebugTests):
    """Same checks with PYTHONMALLOC=malloc_debug."""
    PYTHONMALLOC = 'malloc_debug'


@unittest.skipUnless(sysconfig.get_config_var('WITH_PYMALLOC') == 1,
                     'need pymalloc')
class PyMemPymallocDebugTests(PyMemDebugTests):
    """Same checks with PYTHONMALLOC=pymalloc_debug."""
    PYTHONMALLOC = 'pymalloc_debug'


@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG')
class PyMemDefaultTests(PyMemDebugTests):
    # test default allocator of Python compiled in debug mode
    PYTHONMALLOC = ''


if __name__ == "__main__":
    unittest.main()