| field | value | date |
|---|---|---|
| author | Davin Potts <python@discontinuity.net> | 2016-09-07 18:48:01 -0500 |
| committer | Davin Potts <python@discontinuity.net> | 2016-09-07 18:48:01 -0500 |
| commit | 86a76684269f940a20366cb42668f1acb0982dca | |
| tree | cb1e60312e2a1626fff00bda42e4163a549ba77f | |
| parent | 1aa642f6bd8e7f6315721201165efa873e77259b | |
Fixes issue #6766: Updated multiprocessing Proxy Objects to support nesting
| mode | file | changed lines |
|---|---|---|
| -rw-r--r-- | Doc/library/multiprocessing.rst | 83 |
| -rw-r--r-- | Lib/multiprocessing/managers.py | 95 |
| -rw-r--r-- | Lib/test/_test_multiprocessing.py | 70 |
3 files changed, 192 insertions, 56 deletions
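
For orientation, a minimal usage sketch of the nesting behaviour this commit enables (not part of the change itself; the names `outer` and `inner` are illustrative, and Python 3.6 with this patch is assumed):

    from multiprocessing import Manager

    if __name__ == '__main__':
        with Manager() as manager:
            outer = manager.list()        # ListProxy
            inner = manager.dict(a=1)     # DictProxy
            outer.append(inner)           # a proxy now lives inside the referent of outer
            outer[0]['b'] = 2             # the write travels through the manager process
            print(outer[0])               # {'a': 1, 'b': 2}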
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
index f886ecb4dd..1813eebcfa 100644
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -1682,7 +1682,9 @@ their parent process exits.  The manager classes are defined in the
    of processes.  Objects of this type are returned by
    :func:`multiprocessing.Manager`.
 
-   It also supports creation of shared lists and dictionaries.
+   Its methods create and return :ref:`multiprocessing-proxy_objects` for a
+   number of commonly used data types to be synchronized across processes.
+   This notably includes shared lists and dictionaries.
 
    .. method:: Barrier(parties[, action[, timeout]])
 
@@ -1745,31 +1747,17 @@ their parent process exits.  The manager classes are defined in the
                dict(mapping)
                dict(sequence)
 
-      Create a shared ``dict`` object and return a proxy for it.
+      Create a shared :class:`dict` object and return a proxy for it.
 
    .. method:: list()
               list(sequence)
 
-      Create a shared ``list`` object and return a proxy for it.
-
-   .. note::
-
-      Modifications to mutable values or items in dict and list proxies will not
-      be propagated through the manager, because the proxy has no way of knowing
-      when its values or items are modified.  To modify such an item, you can
-      re-assign the modified object to the container proxy::
-
-         # create a list proxy and append a mutable object (a dictionary)
-         lproxy = manager.list()
-         lproxy.append({})
-         # now mutate the dictionary
-         d = lproxy[0]
-         d['a'] = 1
-         d['b'] = 2
-         # at this point, the changes to d are not yet synced, but by
-         # reassigning the dictionary, the proxy is notified of the change
-         lproxy[0] = d
+      Create a shared :class:`list` object and return a proxy for it.
+
+   .. versionchanged:: 3.6
+      Shared objects are capable of being nested.  For example, a shared
+      container object such as a shared list can contain other shared objects
+      which will all be managed and synchronized by the :class:`SyncManager`.
 
 .. class:: Namespace
 
@@ -1881,6 +1869,8 @@ client to access it remotely::
 
     >>> s = m.get_server()
     >>> s.serve_forever()
 
+.. _multiprocessing-proxy_objects:
+
 Proxy Objects
 ~~~~~~~~~~~~~
 
@@ -1890,8 +1880,7 @@ proxy.  Multiple proxy objects may have the same referent.
 
 A proxy object has methods which invoke corresponding methods of its referent
 (although not every method of the referent will necessarily be available through
-the proxy).  A proxy can usually be used in most of the same ways that its
-referent can:
+the proxy).  In this way, a proxy can be used just like its referent can:
 
 .. doctest::
 
@@ -1912,9 +1901,9 @@ the referent, whereas applying :func:`repr` will return the representation of
 the proxy.
 
 An important feature of proxy objects is that they are picklable so they can be
-passed between processes.  Note, however, that if a proxy is sent to the
-corresponding manager's process then unpickling it will produce the referent
-itself.  This means, for example, that one shared object can contain a second:
+passed between processes.  As such, a referent can contain
+:ref:`multiprocessing-proxy_objects`.  This permits nesting of these managed
+lists, dicts, and other :ref:`multiprocessing-proxy_objects`:
 
 .. doctest::
 
@@ -1922,10 +1911,46 @@ itself.  This means, for example, that one shared object can contain a second:
    >>> b = manager.list()
    >>> a.append(b)         # referent of a now contains referent of b
    >>> print(a, b)
-   [[]] []
+   [<ListProxy object, typeid 'list' at ...>] []
    >>> b.append('hello')
-   >>> print(a, b)
-   [['hello']] ['hello']
+   >>> print(a[0], b)
+   ['hello'] ['hello']
+
+Similarly, dict and list proxies may be nested inside one another::
+
+   >>> l_outer = manager.list([ manager.dict() for i in range(2) ])
+   >>> d_first_inner = l_outer[0]
+   >>> d_first_inner['a'] = 1
+   >>> d_first_inner['b'] = 2
+   >>> l_outer[1]['c'] = 3
+   >>> l_outer[1]['z'] = 26
+   >>> print(l_outer[0])
+   {'a': 1, 'b': 2}
+   >>> print(l_outer[1])
+   {'c': 3, 'z': 26}
+
+If standard (non-proxy) :class:`list` or :class:`dict` objects are contained
+in a referent, modifications to those mutable values will not be propagated
+through the manager because the proxy has no way of knowing when the values
+contained within are modified.  However, storing a value in a container proxy
+(which triggers a ``__setitem__`` on the proxy object) does propagate through
+the manager and so to effectively modify such an item, one could re-assign the
+modified value to the container proxy::
+
+   # create a list proxy and append a mutable object (a dictionary)
+   lproxy = manager.list()
+   lproxy.append({})
+   # now mutate the dictionary
+   d = lproxy[0]
+   d['a'] = 1
+   d['b'] = 2
+   # at this point, the changes to d are not yet synced, but by
+   # updating the dictionary, the proxy is notified of the change
+   lproxy[0] = d
+
+This approach is perhaps less convenient than employing nested
+:ref:`multiprocessing-proxy_objects` for most use cases but also
+demonstrates a level of control over the synchronization.
 
 .. note::
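
The re-assignment workaround kept in the documentation above can be tried as a small standalone script; a sketch (stdlib only; `lproxy` follows the doc snippet's naming, and the comments show the expected output):

    from multiprocessing import Manager

    if __name__ == '__main__':
        with Manager() as manager:
            lproxy = manager.list()
            lproxy.append({})        # a plain dict: the manager cannot see mutations
            d = lproxy[0]            # a local copy of that plain dict
            d['a'] = 1
            print(lproxy[0])         # {} -- the change never reached the manager
            lproxy[0] = d            # __setitem__ on the proxy does propagate
            print(lproxy[0])         # {'a': 1}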
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index c559b55a3f..6e63a60f85 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -142,7 +142,8 @@ class Server(object):
 
         self.id_to_obj = {'0': (None, ())}
         self.id_to_refcount = {}
-        self.mutex = threading.RLock()
+        self.id_to_local_proxy_obj = {}
+        self.mutex = threading.Lock()
 
     def serve_forever(self):
         '''
@@ -227,7 +228,14 @@ class Server(object):
                 methodname = obj = None
                 request = recv()
                 ident, methodname, args, kwds = request
-                obj, exposed, gettypeid = id_to_obj[ident]
+                try:
+                    obj, exposed, gettypeid = id_to_obj[ident]
+                except KeyError as ke:
+                    try:
+                        obj, exposed, gettypeid = \
+                            self.id_to_local_proxy_obj[ident]
+                    except KeyError as second_ke:
+                        raise ke
 
                 if methodname not in exposed:
                     raise AttributeError(
@@ -308,7 +316,7 @@ class Server(object):
         '''
         with self.mutex:
             result = []
-            keys = list(self.id_to_obj.keys())
+            keys = list(self.id_to_refcount.keys())
             keys.sort()
             for ident in keys:
                 if ident != '0':
@@ -321,7 +329,8 @@ class Server(object):
         '''
         Number of shared objects
         '''
-        return len(self.id_to_obj) - 1      # don't count ident='0'
+        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
+        return len(self.id_to_refcount)
 
     def shutdown(self, c):
         '''
@@ -363,13 +372,9 @@ class Server(object):
             self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
             if ident not in self.id_to_refcount:
                 self.id_to_refcount[ident] = 0
-            # increment the reference count immediately, to avoid
-            # this object being garbage collected before a Proxy
-            # object for it can be created.  The caller of create()
-            # is responsible for doing a decref once the Proxy object
-            # has been created.
-            self.incref(c, ident)
-            return ident, tuple(exposed)
+
+        self.incref(c, ident)
+        return ident, tuple(exposed)
 
     def get_methods(self, c, token):
         '''
@@ -387,15 +392,45 @@ class Server(object):
 
     def incref(self, c, ident):
         with self.mutex:
-            self.id_to_refcount[ident] += 1
+            try:
+                self.id_to_refcount[ident] += 1
+            except KeyError as ke:
+                # If no external references exist but an internal (to the
+                # manager) still does and a new external reference is created
+                # from it, restore the manager's tracking of it from the
+                # previously stashed internal ref.
+                if ident in self.id_to_local_proxy_obj:
+                    self.id_to_refcount[ident] = 1
+                    self.id_to_obj[ident] = \
+                        self.id_to_local_proxy_obj[ident]
+                    obj, exposed, gettypeid = self.id_to_obj[ident]
+                    util.debug('Server re-enabled tracking & INCREF %r', ident)
+                else:
+                    raise ke
 
     def decref(self, c, ident):
+        if ident not in self.id_to_refcount and \
+            ident in self.id_to_local_proxy_obj:
+            util.debug('Server DECREF skipping %r', ident)
+            return
+
         with self.mutex:
             assert self.id_to_refcount[ident] >= 1
             self.id_to_refcount[ident] -= 1
             if self.id_to_refcount[ident] == 0:
-                del self.id_to_obj[ident], self.id_to_refcount[ident]
-                util.debug('disposing of obj with id %r', ident)
+                del self.id_to_refcount[ident]
+
+        if ident not in self.id_to_refcount:
+            # Two-step process in case the object turns out to contain other
+            # proxy objects (e.g. a managed list of managed lists).
+            # Otherwise, deleting self.id_to_obj[ident] would trigger the
+            # deleting of the stored value (another managed object) which would
+            # in turn attempt to acquire the mutex that is already held here.
+            self.id_to_obj[ident] = (None, (), None)  # thread-safe
+            util.debug('disposing of obj with id %r', ident)
+            with self.mutex:
+                del self.id_to_obj[ident]
+
 
 #
 # Class to represent state of a manager
@@ -658,7 +693,7 @@ class BaseProxy(object):
     _mutex = util.ForkAwareThreadLock()
 
     def __init__(self, token, serializer, manager=None,
-                 authkey=None, exposed=None, incref=True):
+                 authkey=None, exposed=None, incref=True, manager_owned=False):
         with BaseProxy._mutex:
             tls_idset = BaseProxy._address_to_local.get(token.address, None)
             if tls_idset is None:
@@ -680,6 +715,12 @@ class BaseProxy(object):
         self._serializer = serializer
         self._Client = listener_client[serializer][1]
 
+        # Should be set to True only when a proxy object is being created
+        # on the manager server; primary use case: nested proxy objects.
+        # RebuildProxy detects when a proxy is being created on the manager
+        # and sets this value appropriately.
+        self._owned_by_manager = manager_owned
+
         if authkey is not None:
             self._authkey = process.AuthenticationString(authkey)
         elif self._manager is not None:
@@ -738,6 +779,10 @@ class BaseProxy(object):
         return self._callmethod('#GETVALUE')
 
     def _incref(self):
+        if self._owned_by_manager:
+            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
+            return
+
         conn = self._Client(self._token.address, authkey=self._authkey)
         dispatch(conn, None, 'incref', (self._id,))
         util.debug('INCREF %r', self._token.id)
@@ -822,19 +867,19 @@ class BaseProxy(object):
 def RebuildProxy(func, token, serializer, kwds):
     '''
     Function used for unpickling proxy objects.
-
-    If possible the shared object is returned, or otherwise a proxy for it.
     '''
     server = getattr(process.current_process(), '_manager_server', None)
-
     if server and server.address == token.address:
-        return server.id_to_obj[token.id][0]
-    else:
-        incref = (
-            kwds.pop('incref', True) and
-            not getattr(process.current_process(), '_inheriting', False)
-            )
-        return func(token, serializer, incref=incref, **kwds)
+        util.debug('Rebuild a proxy owned by manager, token=%r', token)
+        kwds['manager_owned'] = True
+        if token.id not in server.id_to_local_proxy_obj:
+            server.id_to_local_proxy_obj[token.id] = \
+                server.id_to_obj[token.id]
+    incref = (
+        kwds.pop('incref', True) and
+        not getattr(process.current_process(), '_inheriting', False)
+        )
+    return func(token, serializer, incref=incref, **kwds)
 
 #
 # Functions to create proxies and proxy types
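
From the client's point of view, the `Server.incref`/`Server.decref` and `id_to_local_proxy_obj` changes above cover the case where every external proxy to a nested object is dropped while the managed container still refers to it. A sketch of that scenario (illustrative names; Python 3.6 with this patch assumed):

    from multiprocessing import Manager

    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict(pets=manager.dict(hamsters=4))
            pets = d['pets']              # an external DictProxy to the nested dict
            del pets                      # external refcount drops to zero, but the
                                          # server keeps its stashed manager-owned proxy
            print(d['pets']['hamsters'])  # 4 -- a fresh proxy is handed out and re-tracked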
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index cfd801e55c..d88cd07618 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -1628,13 +1628,33 @@ class _TestContainers(BaseTestCase):
         d = [a, b]
         e = self.list(d)
         self.assertEqual(
-            e[:],
+            [element[:] for element in e],
             [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
             )
 
         f = self.list([a])
         a.append('hello')
-        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
+        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])
+
+    def test_list_proxy_in_list(self):
+        a = self.list([self.list(range(3)) for _i in range(3)])
+        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)
+
+        a[0][-1] = 55
+        self.assertEqual(a[0][:], [0, 1, 55])
+        for i in range(1, 3):
+            self.assertEqual(a[i][:], [0, 1, 2])
+
+        self.assertEqual(a[1].pop(), 2)
+        self.assertEqual(len(a[1]), 2)
+        for i in range(0, 3, 2):
+            self.assertEqual(len(a[i]), 3)
+
+        del a
+
+        b = self.list()
+        b.append(b)
+        del b
 
     def test_dict(self):
         d = self.dict()
@@ -1646,6 +1666,52 @@ class _TestContainers(BaseTestCase):
         self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
         self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
 
+    def test_dict_proxy_nested(self):
+        pets = self.dict(ferrets=2, hamsters=4)
+        supplies = self.dict(water=10, feed=3)
+        d = self.dict(pets=pets, supplies=supplies)
+
+        self.assertEqual(supplies['water'], 10)
+        self.assertEqual(d['supplies']['water'], 10)
+
+        d['supplies']['blankets'] = 5
+        self.assertEqual(supplies['blankets'], 5)
+        self.assertEqual(d['supplies']['blankets'], 5)
+
+        d['supplies']['water'] = 7
+        self.assertEqual(supplies['water'], 7)
+        self.assertEqual(d['supplies']['water'], 7)
+
+        del pets
+        del supplies
+        self.assertEqual(d['pets']['ferrets'], 2)
+        d['supplies']['blankets'] = 11
+        self.assertEqual(d['supplies']['blankets'], 11)
+
+        pets = d['pets']
+        supplies = d['supplies']
+        supplies['water'] = 7
+        self.assertEqual(supplies['water'], 7)
+        self.assertEqual(d['supplies']['water'], 7)
+
+        d.clear()
+        self.assertEqual(len(d), 0)
+        self.assertEqual(supplies['water'], 7)
+        self.assertEqual(pets['hamsters'], 4)
+
+        l = self.list([pets, supplies])
+        l[0]['marmots'] = 1
+        self.assertEqual(pets['marmots'], 1)
+        self.assertEqual(l[0]['marmots'], 1)
+
+        del pets
+        del supplies
+        self.assertEqual(l[0]['marmots'], 1)
+
+        outer = self.list([[88, 99], l])
+        self.assertIsInstance(outer[0], list)  # Not a ListProxy
+        self.assertEqual(outer[-1][-1]['feed'], 3)
+
     def test_namespace(self):
         n = self.Namespace()
         n.name = 'Bob'
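
The new tests use the `self.dict`/`self.list` helpers that the test harness binds to a running manager. A rough standalone equivalent of the key assertions (a sketch, not part of the commit; names are illustrative):

    from multiprocessing import Manager

    if __name__ == '__main__':
        with Manager() as m:
            pets = m.dict(ferrets=2, hamsters=4)
            d = m.dict(pets=pets)
            d['pets']['marmots'] = 1          # write through the nested proxy
            assert pets['marmots'] == 1       # visible via the original proxy too
            l = m.list([pets, [88, 99]])
            assert l[0]['ferrets'] == 2       # nested DictProxy round-trips
            assert isinstance(l[1], list)     # a plain list stays plain (no ListProxy)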
