diff options
author | Davin Potts <applio@users.noreply.github.com> | 2019-02-23 22:08:16 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-02-23 22:08:16 -0600 |
commit | e895de3e7f3cc2f7213b87621cfe9812ea4343f0 (patch) | |
tree | 5f282ce0e28bc6af3b78ab18f6ef18c665baf3b8 /Lib/multiprocessing/managers.py | |
parent | d610116a2e48b55788b62e11f2e6956af06b3de0 (diff) | |
download | cpython-git-e895de3e7f3cc2f7213b87621cfe9812ea4343f0.tar.gz |
bpo-35813: Tests and docs for shared_memory (#11816)
* Added tests for shared_memory submodule.
* Added tests for ShareableList.
* Fix bug in allocationn size during creation of empty ShareableList illuminated by existing test run on Linux.
* Initial set of docs for shared_memory module.
* Added docs for ShareableList, added doctree entry for shared_memory submodule, name refactoring for greater clarity.
* Added examples to SharedMemoryManager docs, for ease of documentation switched away from exclusively registered functions to some explicit methods on SharedMemoryManager.
* Wording tweaks to docs.
* Fix test failures on Windows.
* Added tests around SharedMemoryManager.
* Documentation tweaks.
* Fix inappropriate test on Windows.
* Further documentation tweaks.
* Fix bare exception.
* Removed __copyright__.
* Fixed typo in doc, removed comment.
* Updated SharedMemoryManager preliminary tests to reflect change of not supporting all registered functions on SyncManager.
* Added Sphinx doctest run controls.
* CloseHandle should be in a finally block in case MapViewOfFile fails.
* Missed opportunity to use with statement.
* Switch to self.addCleanup to spare long try/finally blocks and save one indentation, change to use decorator to skip test instead.
* Simplify the posixshmem extension module.
Provide shm_open() and shm_unlink() functions. Move other
functionality into the shared_memory.py module.
* Added to doc around size parameter of SharedMemory.
* Changed PosixSharedMemory.size to use os.fstat.
* Change SharedMemory.buf to a read-only property as well as NamedSharedMemory.size.
* Marked as provisional per PEP411 in docstring.
* Changed SharedMemoryTracker to be private.
* Removed registered Proxy Objects from SharedMemoryManager.
* Removed shareable_wrap().
* Removed shareable_wrap() and dangling references to it.
* For consistency added __reduce__ to key classes.
* Fix for potential race condition on Windows for O_CREX.
* Remove unused imports.
* Update access to kernel32 on Windows per feedback from eryksun.
* Moved kernel32 calls to _winapi.
* Removed ShareableList.copy as redundant.
* Changes to _winapi use from eryksun feedback.
* Adopt simpler SharedMemory API, collapsing PosixSharedMemory and WindowsNamedSharedMemory into one.
* Fix missing docstring on class, add test for ignoring size when attaching.
* Moved SharedMemoryManager to managers module, tweak to fragile test.
* Tweak to exception in OpenFileMapping suggested by eryksun.
* Mark a few dangling bits as private as suggested by Giampaolo.
Diffstat (limited to 'Lib/multiprocessing/managers.py')
-rw-r--r-- | Lib/multiprocessing/managers.py | 151 |
1 files changed, 149 insertions, 2 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 4ae8ddc770..7973012b98 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -1,5 +1,5 @@ # -# Module providing the `SyncManager` class for dealing +# Module providing manager classes for dealing # with shared objects # # multiprocessing/managers.py @@ -8,7 +8,8 @@ # Licensed to PSF under a Contributor Agreement. # -__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] +__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token', + 'SharedMemoryManager' ] # # Imports @@ -19,6 +20,7 @@ import threading import array import queue import time +from os import getpid from traceback import format_exc @@ -28,6 +30,11 @@ from . import pool from . import process from . import util from . import get_context +try: + from . import shared_memory + HAS_SHMEM = True +except ImportError: + HAS_SHMEM = False # # Register some things for pickling @@ -1200,3 +1207,143 @@ SyncManager.register('Namespace', Namespace, NamespaceProxy) # types returned by methods of PoolProxy SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) SyncManager.register('AsyncResult', create_method=False) + +# +# Definition of SharedMemoryManager and SharedMemoryServer +# + +if HAS_SHMEM: + class _SharedMemoryTracker: + "Manages one or more shared memory segments." + + def __init__(self, name, segment_names=[]): + self.shared_memory_context_name = name + self.segment_names = segment_names + + def register_segment(self, segment_name): + "Adds the supplied shared memory block name to tracker." + util.debug(f"Register segment {segment_name!r} in pid {getpid()}") + self.segment_names.append(segment_name) + + def destroy_segment(self, segment_name): + """Calls unlink() on the shared memory block with the supplied name + and removes it from the list of blocks being tracked.""" + util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}") + self.segment_names.remove(segment_name) + segment = shared_memory.SharedMemory(segment_name) + segment.close() + segment.unlink() + + def unlink(self): + "Calls destroy_segment() on all tracked shared memory blocks." + for segment_name in self.segment_names[:]: + self.destroy_segment(segment_name) + + def __del__(self): + util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}") + self.unlink() + + def __getstate__(self): + return (self.shared_memory_context_name, self.segment_names) + + def __setstate__(self, state): + self.__init__(*state) + + + class SharedMemoryServer(Server): + + public = Server.public + \ + ['track_segment', 'release_segment', 'list_segments'] + + def __init__(self, *args, **kwargs): + Server.__init__(self, *args, **kwargs) + self.shared_memory_context = \ + _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}") + util.debug(f"SharedMemoryServer started by pid {getpid()}") + + def create(self, c, typeid, *args, **kwargs): + """Create a new distributed-shared object (not backed by a shared + memory block) and return its id to be used in a Proxy Object.""" + # Unless set up as a shared proxy, don't make shared_memory_context + # a standard part of kwargs. This makes things easier for supplying + # simple functions. + if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"): + kwargs['shared_memory_context'] = self.shared_memory_context + return Server.create(self, c, typeid, *args, **kwargs) + + def shutdown(self, c): + "Call unlink() on all tracked shared memory, terminate the Server." + self.shared_memory_context.unlink() + return Server.shutdown(self, c) + + def track_segment(self, c, segment_name): + "Adds the supplied shared memory block name to Server's tracker." + self.shared_memory_context.register_segment(segment_name) + + def release_segment(self, c, segment_name): + """Calls unlink() on the shared memory block with the supplied name + and removes it from the tracker instance inside the Server.""" + self.shared_memory_context.destroy_segment(segment_name) + + def list_segments(self, c): + """Returns a list of names of shared memory blocks that the Server + is currently tracking.""" + return self.shared_memory_context.segment_names + + + class SharedMemoryManager(BaseManager): + """Like SyncManager but uses SharedMemoryServer instead of Server. + + It provides methods for creating and returning SharedMemory instances + and for creating a list-like object (ShareableList) backed by shared + memory. It also provides methods that create and return Proxy Objects + that support synchronization across processes (i.e. multi-process-safe + locks and semaphores). + """ + + _Server = SharedMemoryServer + + def __init__(self, *args, **kwargs): + BaseManager.__init__(self, *args, **kwargs) + util.debug(f"{self.__class__.__name__} created by pid {getpid()}") + + def __del__(self): + util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}") + pass + + def get_server(self): + 'Better than monkeypatching for now; merge into Server ultimately' + if self._state.value != State.INITIAL: + if self._state.value == State.STARTED: + raise ProcessError("Already started SharedMemoryServer") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("SharedMemoryManager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) + return self._Server(self._registry, self._address, + self._authkey, self._serializer) + + def SharedMemory(self, size): + """Returns a new SharedMemory instance with the specified size in + bytes, to be tracked by the manager.""" + with self._Client(self._address, authkey=self._authkey) as conn: + sms = shared_memory.SharedMemory(None, create=True, size=size) + try: + dispatch(conn, None, 'track_segment', (sms.name,)) + except BaseException as e: + sms.unlink() + raise e + return sms + + def ShareableList(self, sequence): + """Returns a new ShareableList instance populated with the values + from the input sequence, to be tracked by the manager.""" + with self._Client(self._address, authkey=self._authkey) as conn: + sl = shared_memory.ShareableList(sequence) + try: + dispatch(conn, None, 'track_segment', (sl.shm.name,)) + except BaseException as e: + sl.shm.unlink() + raise e + return sl |