summaryrefslogtreecommitdiff
path: root/Lib/multiprocessing/queues.py
diff options
context:
space:
mode:
authorAndrew Kuchling <amk@amk.ca>2015-04-14 10:35:43 -0400
committerAndrew Kuchling <amk@amk.ca>2015-04-14 10:35:43 -0400
commit3b3863c2d496f01845eda76ccf6401458714397e (patch)
tree14af5692bc33ffc3a589478b03e33e597ce17166 /Lib/multiprocessing/queues.py
parentb6b4ba499008572740b20b22481074263fb4e5d7 (diff)
parentda4c26c69f751f7c0207b009b2372bdc12c08bba (diff)
downloadcpython-3b3863c2d496f01845eda76ccf6401458714397e.tar.gz
Merge from 3.4
Diffstat (limited to 'Lib/multiprocessing/queues.py')
-rw-r--r--Lib/multiprocessing/queues.py27
1 files changed, 5 insertions, 22 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 293ad7673c..786a303b33 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -82,14 +82,11 @@ class Queue(object):
if not self._sem.acquire(block, timeout):
raise Full
- self._notempty.acquire()
- try:
+ with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()
- finally:
- self._notempty.release()
def get(self, block=True, timeout=None):
if block and timeout is None:
@@ -206,12 +203,9 @@ class Queue(object):
@staticmethod
def _finalize_close(buffer, notempty):
debug('telling queue thread to quit')
- notempty.acquire()
- try:
+ with notempty:
buffer.append(_sentinel)
notempty.notify()
- finally:
- notempty.release()
@staticmethod
def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
@@ -300,35 +294,24 @@ class JoinableQueue(Queue):
if not self._sem.acquire(block, timeout):
raise Full
- self._notempty.acquire()
- self._cond.acquire()
- try:
+ with self._notempty, self._cond:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._unfinished_tasks.release()
self._notempty.notify()
- finally:
- self._cond.release()
- self._notempty.release()
def task_done(self):
- self._cond.acquire()
- try:
+ with self._cond:
if not self._unfinished_tasks.acquire(False):
raise ValueError('task_done() called too many times')
if self._unfinished_tasks._semlock._is_zero():
self._cond.notify_all()
- finally:
- self._cond.release()
def join(self):
- self._cond.acquire()
- try:
+ with self._cond:
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()
- finally:
- self._cond.release()
#
# Simplified Queue type -- really just a locked pipe