diff options
| author | Gregory P. Smith <greg@mad-scientist.com> | 2011-01-04 00:51:50 +0000 | 
|---|---|---|
| committer | Gregory P. Smith <greg@mad-scientist.com> | 2011-01-04 00:51:50 +0000 | 
| commit | 4b129d23f66f88fb91aa95d163ef55a47095ae0d (patch) | |
| tree | be89afd4f61b4dd0ca4237db17549891699ead7b | |
| parent | c3a4787ccbc0f91827120c0a519747305379e4d4 (diff) | |
| download | cpython-git-4b129d23f66f88fb91aa95d163ef55a47095ae0d.tar.gz | |
Merged revisions 87710 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/branches/py3k
........
  r87710 | gregory.p.smith | 2011-01-03 13:06:12 -0800 (Mon, 03 Jan 2011) | 4 lines
  issue6643 - Two locks held within the threading module on each thread instance
  needed to be reinitialized after fork().  Adds tests to confirm that they are
  and that a potential deadlock and crasher bug are fixed (platform dependant).
........
| -rw-r--r-- | Lib/test/test_threading.py | 152 | ||||
| -rw-r--r-- | Lib/threading.py | 4 | ||||
| -rw-r--r-- | Misc/NEWS | 3 | 
3 files changed, 155 insertions, 4 deletions
| diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 587004adc6..2bf84374a7 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -2,6 +2,7 @@  import test.support  from test.support import verbose +import os  import random  import re  import sys @@ -10,6 +11,7 @@ import _thread  import time  import unittest  import weakref +import subprocess  from test import lock_tests @@ -247,7 +249,6 @@ class ThreadTests(unittest.TestCase):                  print("test_finalize_with_runnning_thread can't import ctypes")              return  # can't do anything -        import subprocess          rc = subprocess.call([sys.executable, "-c", """if 1:              import ctypes, sys, time, _thread @@ -278,7 +279,6 @@ class ThreadTests(unittest.TestCase):      def test_finalize_with_trace(self):          # Issue1733757          # Avoid a deadlock when sys.settrace steps into threading._shutdown -        import subprocess          p = subprocess.Popen([sys.executable, "-c", """if 1:              import sys, threading @@ -311,7 +311,6 @@ class ThreadTests(unittest.TestCase):      def test_join_nondaemon_on_shutdown(self):          # Issue 1722344          # Raising SystemExit skipped threading._shutdown -        import subprocess          p = subprocess.Popen([sys.executable, "-c", """if 1:                  import threading                  from time import sleep @@ -412,7 +411,6 @@ class ThreadJoinOnShutdown(unittest.TestCase):                  sys.stdout.flush()          \n""" + script -        import subprocess          p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)          rc = p.wait()          data = p.stdout.read().decode().replace('\r', '') @@ -484,6 +482,152 @@ class ThreadJoinOnShutdown(unittest.TestCase):              """          self._run_and_join(script) +    def assertScriptHasOutput(self, script, expected_output): +        p = subprocess.Popen([sys.executable, "-c", script], +                             stdout=subprocess.PIPE) +        rc = p.wait() +        data = p.stdout.read().decode().replace('\r', '') +        self.assertEqual(rc, 0, "Unexpected error") +        self.assertEqual(data, expected_output) + +    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") +    def test_4_joining_across_fork_in_worker_thread(self): +        # There used to be a possible deadlock when forking from a child +        # thread.  See http://bugs.python.org/issue6643. + +        # Skip platforms with known problems forking from a worker thread. +        # See http://bugs.python.org/issue3863. +        if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): +            raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) + +        # The script takes the following steps: +        # - The main thread in the parent process starts a new thread and then +        #   tries to join it. +        # - The join operation acquires the Lock inside the thread's _block +        #   Condition.  (See threading.py:Thread.join().) +        # - We stub out the acquire method on the condition to force it to wait +        #   until the child thread forks.  (See LOCK ACQUIRED HERE) +        # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS +        #   HERE) +        # - The main thread of the parent process enters Condition.wait(), +        #   which releases the lock on the child thread. +        # - The child process returns.  Without the necessary fix, when the +        #   main thread of the child process (which used to be the child thread +        #   in the parent process) attempts to exit, it will try to acquire the +        #   lock in the Thread._block Condition object and hang, because the +        #   lock was held across the fork. + +        script = """if 1: +            import os, time, threading + +            finish_join = False +            start_fork = False + +            def worker(): +                # Wait until this thread's lock is acquired before forking to +                # create the deadlock. +                global finish_join +                while not start_fork: +                    time.sleep(0.01) +                # LOCK HELD: Main thread holds lock across this call. +                childpid = os.fork() +                finish_join = True +                if childpid != 0: +                    # Parent process just waits for child. +                    os.waitpid(childpid, 0) +                # Child process should just return. + +            w = threading.Thread(target=worker) + +            # Stub out the private condition variable's lock acquire method. +            # This acquires the lock and then waits until the child has forked +            # before returning, which will release the lock soon after.  If +            # someone else tries to fix this test case by acquiring this lock +            # before forking instead of reseting it, the test case will +            # deadlock when it shouldn't. +            condition = w._block +            orig_acquire = condition.acquire +            call_count_lock = threading.Lock() +            call_count = 0 +            def my_acquire(): +                global call_count +                global start_fork +                orig_acquire()  # LOCK ACQUIRED HERE +                start_fork = True +                if call_count == 0: +                    while not finish_join: +                        time.sleep(0.01)  # WORKER THREAD FORKS HERE +                with call_count_lock: +                    call_count += 1 +            condition.acquire = my_acquire + +            w.start() +            w.join() +            print('end of main') +            """ +        self.assertScriptHasOutput(script, "end of main\n") + +    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") +    def test_5_clear_waiter_locks_to_avoid_crash(self): +        # Check that a spawned thread that forks doesn't segfault on certain +        # platforms, namely OS X.  This used to happen if there was a waiter +        # lock in the thread's condition variable's waiters list.  Even though +        # we know the lock will be held across the fork, it is not safe to +        # release locks held across forks on all platforms, so releasing the +        # waiter lock caused a segfault on OS X.  Furthermore, since locks on +        # OS X are (as of this writing) implemented with a mutex + condition +        # variable instead of a semaphore, while we know that the Python-level +        # lock will be acquired, we can't know if the internal mutex will be +        # acquired at the time of the fork. + +        # Skip platforms with known problems forking from a worker thread. +        # See http://bugs.python.org/issue3863. +        if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): +            raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) +        script = """if True: +            import os, time, threading + +            start_fork = False + +            def worker(): +                # Wait until the main thread has attempted to join this thread +                # before continuing. +                while not start_fork: +                    time.sleep(0.01) +                childpid = os.fork() +                if childpid != 0: +                    # Parent process just waits for child. +                    (cpid, rc) = os.waitpid(childpid, 0) +                    assert cpid == childpid +                    assert rc == 0 +                    print('end of worker thread') +                else: +                    # Child process should just return. +                    pass + +            w = threading.Thread(target=worker) + +            # Stub out the private condition variable's _release_save method. +            # This releases the condition's lock and flips the global that +            # causes the worker to fork.  At this point, the problematic waiter +            # lock has been acquired once by the waiter and has been put onto +            # the waiters list. +            condition = w._block +            orig_release_save = condition._release_save +            def my_release_save(): +                global start_fork +                orig_release_save() +                # Waiter lock held here, condition lock released. +                start_fork = True +            condition._release_save = my_release_save + +            w.start() +            w.join() +            print('end of main thread') +            """ +        output = "end of worker thread\nend of main thread\n" +        self.assertScriptHasOutput(script, output) +  class ThreadingExceptionTests(unittest.TestCase):      # A RuntimeError should be raised if Thread.start() is called diff --git a/Lib/threading.py b/Lib/threading.py index 24808d43b8..76c729beec 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -856,6 +856,10 @@ def _after_fork():                  # its new value since it can have changed.                  ident = _get_ident()                  thread._ident = ident +                # Any condition variables hanging off of the active thread may +                # be in an invalid state, so we reinitialize them. +                thread._block.__init__() +                thread._started._cond.__init__()                  new_active[ident] = thread              else:                  # All the others are already stopped. @@ -27,6 +27,9 @@ Core and Builtins  Library  ------- +- Issue #6643: Reinitialize locks held within the threading module after fork +  to avoid a potential rare deadlock or crash on some platforms. +  - Issue #10806, issue #9905: Fix subprocess pipes when some of the standard    file descriptors (0, 1, 2) are closed in the parent process.  Initial    patch by Ross Lagerwall. | 
