diff options
author | Andrew Kuchling <amk@amk.ca> | 2015-04-14 10:35:43 -0400 |
---|---|---|
committer | Andrew Kuchling <amk@amk.ca> | 2015-04-14 10:35:43 -0400 |
commit | 3b3863c2d496f01845eda76ccf6401458714397e (patch) | |
tree | 14af5692bc33ffc3a589478b03e33e597ce17166 /Lib/multiprocessing/queues.py | |
parent | b6b4ba499008572740b20b22481074263fb4e5d7 (diff) | |
parent | da4c26c69f751f7c0207b009b2372bdc12c08bba (diff) | |
download | cpython-3b3863c2d496f01845eda76ccf6401458714397e.tar.gz |
Merge from 3.4
Diffstat (limited to 'Lib/multiprocessing/queues.py')
-rw-r--r-- | Lib/multiprocessing/queues.py | 27 |
1 files changed, 5 insertions, 22 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 293ad7673c..786a303b33 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -82,14 +82,11 @@ class Queue(object): if not self._sem.acquire(block, timeout): raise Full - self._notempty.acquire() - try: + with self._notempty: if self._thread is None: self._start_thread() self._buffer.append(obj) self._notempty.notify() - finally: - self._notempty.release() def get(self, block=True, timeout=None): if block and timeout is None: @@ -206,12 +203,9 @@ class Queue(object): @staticmethod def _finalize_close(buffer, notempty): debug('telling queue thread to quit') - notempty.acquire() - try: + with notempty: buffer.append(_sentinel) notempty.notify() - finally: - notempty.release() @staticmethod def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): @@ -300,35 +294,24 @@ class JoinableQueue(Queue): if not self._sem.acquire(block, timeout): raise Full - self._notempty.acquire() - self._cond.acquire() - try: + with self._notempty, self._cond: if self._thread is None: self._start_thread() self._buffer.append(obj) self._unfinished_tasks.release() self._notempty.notify() - finally: - self._cond.release() - self._notempty.release() def task_done(self): - self._cond.acquire() - try: + with self._cond: if not self._unfinished_tasks.acquire(False): raise ValueError('task_done() called too many times') if self._unfinished_tasks._semlock._is_zero(): self._cond.notify_all() - finally: - self._cond.release() def join(self): - self._cond.acquire() - try: + with self._cond: if not self._unfinished_tasks._semlock._is_zero(): self._cond.wait() - finally: - self._cond.release() # # Simplified Queue type -- really just a locked pipe |