import sys import unittest import io import atexit import os from test import support from test.support import script_helper ### helpers def h1(): print("h1") def h2(): print("h2") def h3(): print("h3") def h4(*args, **kwargs): print("h4", args, kwargs) def raise1(): raise TypeError def raise2(): raise SystemError def exit(): raise SystemExit class GeneralTest(unittest.TestCase): def setUp(self): self.save_stdout = sys.stdout self.save_stderr = sys.stderr self.stream = io.StringIO() sys.stdout = sys.stderr = self.stream atexit._clear() def tearDown(self): sys.stdout = self.save_stdout sys.stderr = self.save_stderr atexit._clear() def test_args(self): # be sure args are handled properly atexit.register(h1) atexit.register(h4) atexit.register(h4, 4, kw="abc") atexit._run_exitfuncs() self.assertEqual(self.stream.getvalue(), "h4 (4,) {'kw': 'abc'}\nh4 () {}\nh1\n") def test_badargs(self): atexit.register(lambda: 1, 0, 0, (x for x in (1,2)), 0, 0) self.assertRaises(TypeError, atexit._run_exitfuncs) def test_order(self): # be sure handlers are executed in reverse order atexit.register(h1) atexit.register(h2) atexit.register(h3) atexit._run_exitfuncs() self.assertEqual(self.stream.getvalue(), "h3\nh2\nh1\n") def test_raise(self): # be sure raises are handled properly atexit.register(raise1) atexit.register(raise2) self.assertRaises(TypeError, atexit._run_exitfuncs) def test_raise_unnormalized(self): # Issue #10756: Make sure that an unnormalized exception is # handled properly atexit.register(lambda: 1 / 0) self.assertRaises(ZeroDivisionError, atexit._run_exitfuncs) self.assertIn("ZeroDivisionError", self.stream.getvalue()) def test_exit(self): # be sure a SystemExit is handled properly atexit.register(exit) self.assertRaises(SystemExit, atexit._run_exitfuncs) self.assertEqual(self.stream.getvalue(), '') def test_print_tracebacks(self): # Issue #18776: the tracebacks should be printed when errors occur. def f(): 1/0 # one def g(): 1/0 # two def h(): 1/0 # three atexit.register(f) atexit.register(g) atexit.register(h) self.assertRaises(ZeroDivisionError, atexit._run_exitfuncs) stderr = self.stream.getvalue() self.assertEqual(stderr.count("ZeroDivisionError"), 3) self.assertIn("# one", stderr) self.assertIn("# two", stderr) self.assertIn("# three", stderr) def test_stress(self): a = [0] def inc(): a[0] += 1 for i in range(128): atexit.register(inc) atexit._run_exitfuncs() self.assertEqual(a[0], 128) def test_clear(self): a = [0] def inc(): a[0] += 1 atexit.register(inc) atexit._clear() atexit._run_exitfuncs() self.assertEqual(a[0], 0) def test_unregister(self): a = [0] def inc(): a[0] += 1 def dec(): a[0] -= 1 for i in range(4): atexit.register(inc) atexit.register(dec) atexit.unregister(inc) atexit._run_exitfuncs() self.assertEqual(a[0], -1) def test_bound_methods(self): l = [] atexit.register(l.append, 5) atexit._run_exitfuncs() self.assertEqual(l, [5]) atexit.unregister(l.append) atexit._run_exitfuncs() self.assertEqual(l, [5]) def test_shutdown(self): # Actually test the shutdown mechanism in a subprocess code = """if 1: import atexit def f(msg): print(msg) atexit.register(f, "one") atexit.register(f, "two") """ res = script_helper.assert_python_ok("-c", code) self.assertEqual(res.out.decode().splitlines(), ["two", "one"]) self.assertFalse(res.err) @support.cpython_only class SubinterpreterTest(unittest.TestCase): def test_callbacks_leak(self): # This test shows a leak in refleak mode if atexit doesn't # take care to free callbacks in its per-subinterpreter module # state. n = atexit._ncallbacks() code = r"""if 1: import atexit def f(): pass atexit.register(f) del atexit """ ret = support.run_in_subinterp(code) self.assertEqual(ret, 0) self.assertEqual(atexit._ncallbacks(), n) def test_callbacks_leak_refcycle(self): # Similar to the above, but with a refcycle through the atexit # module. n = atexit._ncallbacks() code = r"""if 1: import atexit def f(): pass atexit.register(f) atexit.__atexit = atexit """ ret = support.run_in_subinterp(code) self.assertEqual(ret, 0) self.assertEqual(atexit._ncallbacks(), n) def test_callback_on_subinterpreter_teardown(self): # This tests if a callback is called on # subinterpreter teardown. expected = b"The test has passed!" r, w = os.pipe() code = r"""if 1: import os import atexit def callback(): os.write({:d}, b"The test has passed!") atexit.register(callback) """.format(w) ret = support.run_in_subinterp(code) os.close(w) self.assertEqual(os.read(r, len(expected)), expected) os.close(r) if __name__ == "__main__": unittest.main()