summaryrefslogtreecommitdiff
path: root/Lib/multiprocessing/managers.py
diff options
context:
space:
mode:
authorDavin Potts <applio@users.noreply.github.com>2019-02-23 22:08:16 -0600
committerGitHub <noreply@github.com>2019-02-23 22:08:16 -0600
commite895de3e7f3cc2f7213b87621cfe9812ea4343f0 (patch)
tree5f282ce0e28bc6af3b78ab18f6ef18c665baf3b8 /Lib/multiprocessing/managers.py
parentd610116a2e48b55788b62e11f2e6956af06b3de0 (diff)
downloadcpython-git-e895de3e7f3cc2f7213b87621cfe9812ea4343f0.tar.gz
bpo-35813: Tests and docs for shared_memory (#11816)
* Added tests for shared_memory submodule. * Added tests for ShareableList. * Fix bug in allocationn size during creation of empty ShareableList illuminated by existing test run on Linux. * Initial set of docs for shared_memory module. * Added docs for ShareableList, added doctree entry for shared_memory submodule, name refactoring for greater clarity. * Added examples to SharedMemoryManager docs, for ease of documentation switched away from exclusively registered functions to some explicit methods on SharedMemoryManager. * Wording tweaks to docs. * Fix test failures on Windows. * Added tests around SharedMemoryManager. * Documentation tweaks. * Fix inappropriate test on Windows. * Further documentation tweaks. * Fix bare exception. * Removed __copyright__. * Fixed typo in doc, removed comment. * Updated SharedMemoryManager preliminary tests to reflect change of not supporting all registered functions on SyncManager. * Added Sphinx doctest run controls. * CloseHandle should be in a finally block in case MapViewOfFile fails. * Missed opportunity to use with statement. * Switch to self.addCleanup to spare long try/finally blocks and save one indentation, change to use decorator to skip test instead. * Simplify the posixshmem extension module. Provide shm_open() and shm_unlink() functions. Move other functionality into the shared_memory.py module. * Added to doc around size parameter of SharedMemory. * Changed PosixSharedMemory.size to use os.fstat. * Change SharedMemory.buf to a read-only property as well as NamedSharedMemory.size. * Marked as provisional per PEP411 in docstring. * Changed SharedMemoryTracker to be private. * Removed registered Proxy Objects from SharedMemoryManager. * Removed shareable_wrap(). * Removed shareable_wrap() and dangling references to it. * For consistency added __reduce__ to key classes. * Fix for potential race condition on Windows for O_CREX. * Remove unused imports. * Update access to kernel32 on Windows per feedback from eryksun. * Moved kernel32 calls to _winapi. * Removed ShareableList.copy as redundant. * Changes to _winapi use from eryksun feedback. * Adopt simpler SharedMemory API, collapsing PosixSharedMemory and WindowsNamedSharedMemory into one. * Fix missing docstring on class, add test for ignoring size when attaching. * Moved SharedMemoryManager to managers module, tweak to fragile test. * Tweak to exception in OpenFileMapping suggested by eryksun. * Mark a few dangling bits as private as suggested by Giampaolo.
Diffstat (limited to 'Lib/multiprocessing/managers.py')
-rw-r--r--Lib/multiprocessing/managers.py151
1 files changed, 149 insertions, 2 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 4ae8ddc770..7973012b98 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -1,5 +1,5 @@
#
-# Module providing the `SyncManager` class for dealing
+# Module providing manager classes for dealing
# with shared objects
#
# multiprocessing/managers.py
@@ -8,7 +8,8 @@
# Licensed to PSF under a Contributor Agreement.
#
-__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
+__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token',
+ 'SharedMemoryManager' ]
#
# Imports
@@ -19,6 +20,7 @@ import threading
import array
import queue
import time
+from os import getpid
from traceback import format_exc
@@ -28,6 +30,11 @@ from . import pool
from . import process
from . import util
from . import get_context
+try:
+ from . import shared_memory
+ HAS_SHMEM = True
+except ImportError:
+ HAS_SHMEM = False
#
# Register some things for pickling
@@ -1200,3 +1207,143 @@ SyncManager.register('Namespace', Namespace, NamespaceProxy)
# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
+
+#
+# Definition of SharedMemoryManager and SharedMemoryServer
+#
+
+if HAS_SHMEM:
+ class _SharedMemoryTracker:
+ "Manages one or more shared memory segments."
+
+ def __init__(self, name, segment_names=[]):
+ self.shared_memory_context_name = name
+ self.segment_names = segment_names
+
+ def register_segment(self, segment_name):
+ "Adds the supplied shared memory block name to tracker."
+ util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
+ self.segment_names.append(segment_name)
+
+ def destroy_segment(self, segment_name):
+ """Calls unlink() on the shared memory block with the supplied name
+ and removes it from the list of blocks being tracked."""
+ util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
+ self.segment_names.remove(segment_name)
+ segment = shared_memory.SharedMemory(segment_name)
+ segment.close()
+ segment.unlink()
+
+ def unlink(self):
+ "Calls destroy_segment() on all tracked shared memory blocks."
+ for segment_name in self.segment_names[:]:
+ self.destroy_segment(segment_name)
+
+ def __del__(self):
+ util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
+ self.unlink()
+
+ def __getstate__(self):
+ return (self.shared_memory_context_name, self.segment_names)
+
+ def __setstate__(self, state):
+ self.__init__(*state)
+
+
+ class SharedMemoryServer(Server):
+
+ public = Server.public + \
+ ['track_segment', 'release_segment', 'list_segments']
+
+ def __init__(self, *args, **kwargs):
+ Server.__init__(self, *args, **kwargs)
+ self.shared_memory_context = \
+ _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
+ util.debug(f"SharedMemoryServer started by pid {getpid()}")
+
+ def create(self, c, typeid, *args, **kwargs):
+ """Create a new distributed-shared object (not backed by a shared
+ memory block) and return its id to be used in a Proxy Object."""
+ # Unless set up as a shared proxy, don't make shared_memory_context
+ # a standard part of kwargs. This makes things easier for supplying
+ # simple functions.
+ if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
+ kwargs['shared_memory_context'] = self.shared_memory_context
+ return Server.create(self, c, typeid, *args, **kwargs)
+
+ def shutdown(self, c):
+ "Call unlink() on all tracked shared memory, terminate the Server."
+ self.shared_memory_context.unlink()
+ return Server.shutdown(self, c)
+
+ def track_segment(self, c, segment_name):
+ "Adds the supplied shared memory block name to Server's tracker."
+ self.shared_memory_context.register_segment(segment_name)
+
+ def release_segment(self, c, segment_name):
+ """Calls unlink() on the shared memory block with the supplied name
+ and removes it from the tracker instance inside the Server."""
+ self.shared_memory_context.destroy_segment(segment_name)
+
+ def list_segments(self, c):
+ """Returns a list of names of shared memory blocks that the Server
+ is currently tracking."""
+ return self.shared_memory_context.segment_names
+
+
+ class SharedMemoryManager(BaseManager):
+ """Like SyncManager but uses SharedMemoryServer instead of Server.
+
+ It provides methods for creating and returning SharedMemory instances
+ and for creating a list-like object (ShareableList) backed by shared
+ memory. It also provides methods that create and return Proxy Objects
+ that support synchronization across processes (i.e. multi-process-safe
+ locks and semaphores).
+ """
+
+ _Server = SharedMemoryServer
+
+ def __init__(self, *args, **kwargs):
+ BaseManager.__init__(self, *args, **kwargs)
+ util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
+
+ def __del__(self):
+ util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
+ pass
+
+ def get_server(self):
+ 'Better than monkeypatching for now; merge into Server ultimately'
+ if self._state.value != State.INITIAL:
+ if self._state.value == State.STARTED:
+ raise ProcessError("Already started SharedMemoryServer")
+ elif self._state.value == State.SHUTDOWN:
+ raise ProcessError("SharedMemoryManager has shut down")
+ else:
+ raise ProcessError(
+ "Unknown state {!r}".format(self._state.value))
+ return self._Server(self._registry, self._address,
+ self._authkey, self._serializer)
+
+ def SharedMemory(self, size):
+ """Returns a new SharedMemory instance with the specified size in
+ bytes, to be tracked by the manager."""
+ with self._Client(self._address, authkey=self._authkey) as conn:
+ sms = shared_memory.SharedMemory(None, create=True, size=size)
+ try:
+ dispatch(conn, None, 'track_segment', (sms.name,))
+ except BaseException as e:
+ sms.unlink()
+ raise e
+ return sms
+
+ def ShareableList(self, sequence):
+ """Returns a new ShareableList instance populated with the values
+ from the input sequence, to be tracked by the manager."""
+ with self._Client(self._address, authkey=self._authkey) as conn:
+ sl = shared_memory.ShareableList(sequence)
+ try:
+ dispatch(conn, None, 'track_segment', (sl.shm.name,))
+ except BaseException as e:
+ sl.shm.unlink()
+ raise e
+ return sl