From a924fc7abc2d8788a4a9fa2cbef2caa5c5992ebd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Fran=C3=A7ois=20Natali?= Date: Sun, 25 May 2014 14:12:12 +0100 Subject: Issue #21565: multiprocessing: use contex-manager protocol for synchronization primitives. --- Lib/multiprocessing/pool.py | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) (limited to 'Lib/multiprocessing/pool.py') diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 8832a5ceb2..77eb817c3d 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -666,8 +666,7 @@ class IMapIterator(object): return self def next(self, timeout=None): - self._cond.acquire() - try: + with self._cond: try: item = self._items.popleft() except IndexError: @@ -680,8 +679,6 @@ class IMapIterator(object): if self._index == self._length: raise StopIteration raise TimeoutError - finally: - self._cond.release() success, value = item if success: @@ -691,8 +688,7 @@ class IMapIterator(object): __next__ = next # XXX def _set(self, i, obj): - self._cond.acquire() - try: + with self._cond: if self._index == i: self._items.append(obj) self._index += 1 @@ -706,18 +702,13 @@ class IMapIterator(object): if self._index == self._length: del self._cache[self._job] - finally: - self._cond.release() def _set_length(self, length): - self._cond.acquire() - try: + with self._cond: self._length = length if self._index == self._length: self._cond.notify() del self._cache[self._job] - finally: - self._cond.release() # # Class whose instances are returned by `Pool.imap_unordered()` @@ -726,15 +717,12 @@ class IMapIterator(object): class IMapUnorderedIterator(IMapIterator): def _set(self, i, obj): - self._cond.acquire() - try: + with self._cond: self._items.append(obj) self._index += 1 self._cond.notify() if self._index == self._length: del self._cache[self._job] - finally: - self._cond.release() # # @@ -760,10 +748,7 @@ class ThreadPool(Pool): @staticmethod def _help_stuff_finish(inqueue, task_handler, size): # put sentinels at head of inqueue to make workers finish - inqueue.not_empty.acquire() - try: + with inqueue.not_empty: inqueue.queue.clear() inqueue.queue.extend([None] * size) inqueue.not_empty.notify_all() - finally: - inqueue.not_empty.release() -- cgit v1.2.1