diff options
author | Victor Stinner <vstinner@python.org> | 2022-04-19 16:27:00 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-19 16:27:00 +0200 |
commit | 061a8bf77c80036bed3ef4973fe0c99705c83fc6 (patch) | |
tree | 19c2418fd5877d4b3cc211e7f40eff5ad5397a08 | |
parent | 74070085da5322ac83c954f101f2caa150655be2 (diff) | |
download | cpython-git-061a8bf77c80036bed3ef4973fe0c99705c83fc6.tar.gz |
gh-91231: Add shutdown_timeout to multiprocessing BaseManager (#32112)
Add an optional keyword 'shutdown_timeout' parameter to the
multiprocessing.BaseManager constructor. Kill the process if
terminate() takes longer than the timeout.
Multiprocessing tests pass test.support.SHORT_TIMEOUT
to BaseManager.shutdown_timeout.
-rw-r--r-- | Doc/library/multiprocessing.rst | 16 | ||||
-rw-r--r-- | Lib/multiprocessing/managers.py | 16 | ||||
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 34 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2022-04-19-15-30-06.gh-issue-91231.AWy4Cs.rst | 3 |
4 files changed, 49 insertions, 20 deletions
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index ee40688781..83aa5cb87f 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -1676,7 +1676,7 @@ Manager processes will be shutdown as soon as they are garbage collected or their parent process exits. The manager classes are defined in the :mod:`multiprocessing.managers` module: -.. class:: BaseManager([address[, authkey]]) +.. class:: BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0) Create a BaseManager object. @@ -1691,6 +1691,20 @@ their parent process exits. The manager classes are defined in the *authkey* is ``None`` then ``current_process().authkey`` is used. Otherwise *authkey* is used and it must be a byte string. + *serializer* must be ``'pickle'`` (use :mod:`pickle` serialization) or + ``'xmlrpclib'`` (use :mod:`xmlrpc.client` serialization). + + *ctx* is a context object, or ``None`` (use the current context). See the + :func:`get_context` function. + + *shutdown_timeout* is a timeout in seconds used to wait until the process + used by the manager completes in the :meth:`shutdown` method. If the + shutdown times out, the process is terminated. If terminating the process + also times out, the process is killed. + + .. versionchanged: 3.11 + Added the *shutdown_timeout* parameter. + .. method:: start([initializer[, initargs]]) Start a subprocess to start the manager. If *initializer* is not ``None`` diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index d97381926d..3f6479b7e3 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -497,7 +497,7 @@ class BaseManager(object): _Server = Server def __init__(self, address=None, authkey=None, serializer='pickle', - ctx=None): + ctx=None, *, shutdown_timeout=1.0): if authkey is None: authkey = process.current_process().authkey self._address = address # XXX not final address if eg ('', 0) @@ -507,6 +507,7 @@ class BaseManager(object): self._serializer = serializer self._Listener, self._Client = listener_client[serializer] self._ctx = ctx or get_context() + self._shutdown_timeout = shutdown_timeout def get_server(self): ''' @@ -570,8 +571,8 @@ class BaseManager(object): self._state.value = State.STARTED self.shutdown = util.Finalize( self, type(self)._finalize_manager, - args=(self._process, self._address, self._authkey, - self._state, self._Client), + args=(self._process, self._address, self._authkey, self._state, + self._Client, self._shutdown_timeout), exitpriority=0 ) @@ -656,7 +657,8 @@ class BaseManager(object): self.shutdown() @staticmethod - def _finalize_manager(process, address, authkey, state, _Client): + def _finalize_manager(process, address, authkey, state, _Client, + shutdown_timeout): ''' Shutdown the manager process; will be registered as a finalizer ''' @@ -671,15 +673,17 @@ class BaseManager(object): except Exception: pass - process.join(timeout=1.0) + process.join(timeout=shutdown_timeout) if process.is_alive(): util.info('manager still alive') if hasattr(process, 'terminate'): util.info('trying to `terminate()` manager process') process.terminate() - process.join(timeout=0.1) + process.join(timeout=shutdown_timeout) if process.is_alive(): util.info('manager still alive after terminate') + process.kill() + process.join() state.value = State.SHUTDOWN try: diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index bb73d9e7cc..427fc0c47a 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -119,6 +119,9 @@ if CHECK_TIMINGS: else: TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 +# BaseManager.shutdown_timeout +SHUTDOWN_TIMEOUT = support.SHORT_TIMEOUT + HAVE_GETVALUE = not getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False) @@ -2897,7 +2900,7 @@ class _TestMyManager(BaseTestCase): ALLOWED_TYPES = ('manager',) def test_mymanager(self): - manager = MyManager() + manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT) manager.start() self.common(manager) manager.shutdown() @@ -2908,7 +2911,8 @@ class _TestMyManager(BaseTestCase): self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) def test_mymanager_context(self): - with MyManager() as manager: + manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT) + with manager: self.common(manager) # bpo-30356: BaseManager._finalize_manager() sends SIGTERM # to the manager process if it takes longer than 1 second to stop, @@ -2916,7 +2920,7 @@ class _TestMyManager(BaseTestCase): self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) def test_mymanager_context_prestarted(self): - manager = MyManager() + manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT) manager.start() with manager: self.common(manager) @@ -2978,8 +2982,8 @@ class _TestRemoteManager(BaseTestCase): @classmethod def _putter(cls, address, authkey): manager = QueueManager2( - address=address, authkey=authkey, serializer=SERIALIZER - ) + address=address, authkey=authkey, serializer=SERIALIZER, + shutdown_timeout=SHUTDOWN_TIMEOUT) manager.connect() queue = manager.get_queue() # Note that xmlrpclib will deserialize object as a list not a tuple @@ -2989,8 +2993,8 @@ class _TestRemoteManager(BaseTestCase): authkey = os.urandom(32) manager = QueueManager( - address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER - ) + address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER, + shutdown_timeout=SHUTDOWN_TIMEOUT) manager.start() self.addCleanup(manager.shutdown) @@ -2999,8 +3003,8 @@ class _TestRemoteManager(BaseTestCase): p.start() manager2 = QueueManager2( - address=manager.address, authkey=authkey, serializer=SERIALIZER - ) + address=manager.address, authkey=authkey, serializer=SERIALIZER, + shutdown_timeout=SHUTDOWN_TIMEOUT) manager2.connect() queue = manager2.get_queue() @@ -3020,7 +3024,8 @@ class _TestManagerRestart(BaseTestCase): @classmethod def _putter(cls, address, authkey): manager = QueueManager( - address=address, authkey=authkey, serializer=SERIALIZER) + address=address, authkey=authkey, serializer=SERIALIZER, + shutdown_timeout=SHUTDOWN_TIMEOUT) manager.connect() queue = manager.get_queue() queue.put('hello world') @@ -3028,7 +3033,8 @@ class _TestManagerRestart(BaseTestCase): def test_rapid_restart(self): authkey = os.urandom(32) manager = QueueManager( - address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER) + address=(socket_helper.HOST, 0), authkey=authkey, + serializer=SERIALIZER, shutdown_timeout=SHUTDOWN_TIMEOUT) try: srvr = manager.get_server() addr = srvr.address @@ -3048,7 +3054,8 @@ class _TestManagerRestart(BaseTestCase): manager.shutdown() manager = QueueManager( - address=addr, authkey=authkey, serializer=SERIALIZER) + address=addr, authkey=authkey, serializer=SERIALIZER, + shutdown_timeout=SHUTDOWN_TIMEOUT) try: manager.start() self.addCleanup(manager.shutdown) @@ -3059,7 +3066,8 @@ class _TestManagerRestart(BaseTestCase): # (sporadic failure on buildbots) time.sleep(1.0) manager = QueueManager( - address=addr, authkey=authkey, serializer=SERIALIZER) + address=addr, authkey=authkey, serializer=SERIALIZER, + shutdown_timeout=SHUTDOWN_TIMEOUT) if hasattr(manager, "shutdown"): self.addCleanup(manager.shutdown) diff --git a/Misc/NEWS.d/next/Library/2022-04-19-15-30-06.gh-issue-91231.AWy4Cs.rst b/Misc/NEWS.d/next/Library/2022-04-19-15-30-06.gh-issue-91231.AWy4Cs.rst new file mode 100644 index 0000000000..a61fd8b9e8 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-04-19-15-30-06.gh-issue-91231.AWy4Cs.rst @@ -0,0 +1,3 @@ +Add an optional keyword *shutdown_timeout* parameter to the +:class:`multiprocessing.BaseManager` constructor. Kill the process if +terminate() takes longer than the timeout. Patch by Victor Stinner. |