summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2015-04-19 20:15:12 -0700
committerGuido van Rossum <guido@python.org>2015-04-19 20:15:12 -0700
commitbcb7ec45f24375432dd80736ca6edd0df4356a0f (patch)
treeaff9b58baaf034d356dd414c84deb53b59e499e3
parent7718675eb9d3acf4b45b8f99f7e64468caa7a2a3 (diff)
parente496c7c8e05ef1fe7f49cce51d40e0b669142c02 (diff)
downloadtrollius-git-bcb7ec45f24375432dd80736ca6edd0df4356a0f.tar.gz
Merge pull request #236 from ajdavis/queue-join-fix
Fix LifoQueue's and PriorityQueue's put() and task_done().
-rw-r--r--asyncio/queues.py19
-rw-r--r--tests/test_queues.py24
2 files changed, 32 insertions, 11 deletions
diff --git a/asyncio/queues.py b/asyncio/queues.py
index 84cdabc..ed11662 100644
--- a/asyncio/queues.py
+++ b/asyncio/queues.py
@@ -54,6 +54,8 @@ class Queue:
self._finished.set()
self._init(maxsize)
+ # These three are overridable in subclasses.
+
def _init(self, maxsize):
self._queue = collections.deque()
@@ -62,6 +64,11 @@ class Queue:
def _put(self, item):
self._queue.append(item)
+
+ # End of the overridable methods.
+
+ def __put_internal(self, item):
+ self._put(item)
self._unfinished_tasks += 1
self._finished.clear()
@@ -133,7 +140,7 @@ class Queue:
'queue non-empty, why are getters waiting?')
getter = self._getters.popleft()
- self._put(item)
+ self.__put_internal(item)
# getter cannot be cancelled, we just removed done getters
getter.set_result(self._get())
@@ -145,7 +152,7 @@ class Queue:
yield from waiter
else:
- self._put(item)
+ self.__put_internal(item)
def put_nowait(self, item):
"""Put an item into the queue without blocking.
@@ -158,7 +165,7 @@ class Queue:
'queue non-empty, why are getters waiting?')
getter = self._getters.popleft()
- self._put(item)
+ self.__put_internal(item)
# getter cannot be cancelled, we just removed done getters
getter.set_result(self._get())
@@ -166,7 +173,7 @@ class Queue:
elif self._maxsize > 0 and self._maxsize <= self.qsize():
raise QueueFull
else:
- self._put(item)
+ self.__put_internal(item)
@coroutine
def get(self):
@@ -180,7 +187,7 @@ class Queue:
if self._putters:
assert self.full(), 'queue not full, why are putters waiting?'
item, putter = self._putters.popleft()
- self._put(item)
+ self.__put_internal(item)
# When a getter runs and frees up a slot so this putter can
# run, we need to defer the put for a tick to ensure that
@@ -207,7 +214,7 @@ class Queue:
if self._putters:
assert self.full(), 'queue not full, why are putters waiting?'
item, putter = self._putters.popleft()
- self._put(item)
+ self.__put_internal(item)
# Wake putter on next tick.
# getter cannot be cancelled, we just removed done putters
diff --git a/tests/test_queues.py b/tests/test_queues.py
index a73539d..6edb9f2 100644
--- a/tests/test_queues.py
+++ b/tests/test_queues.py
@@ -408,14 +408,16 @@ class PriorityQueueTests(_QueueTestBase):
self.assertEqual([1, 2, 3], items)
-class QueueJoinTests(_QueueTestBase):
+class _QueueJoinTestBase(_QueueTestBase):
+
+ q_class = None
def test_task_done_underflow(self):
- q = asyncio.Queue(loop=self.loop)
+ q = self.q_class(loop=self.loop)
self.assertRaises(ValueError, q.task_done)
def test_task_done(self):
- q = asyncio.Queue(loop=self.loop)
+ q = self.q_class(loop=self.loop)
for i in range(100):
q.put_nowait(i)
@@ -452,7 +454,7 @@ class QueueJoinTests(_QueueTestBase):
self.loop.run_until_complete(asyncio.wait(tasks, loop=self.loop))
def test_join_empty_queue(self):
- q = asyncio.Queue(loop=self.loop)
+ q = self.q_class(loop=self.loop)
# Test that a queue join()s successfully, and before anything else
# (done twice for insurance).
@@ -465,12 +467,24 @@ class QueueJoinTests(_QueueTestBase):
self.loop.run_until_complete(join())
def test_format(self):
- q = asyncio.Queue(loop=self.loop)
+ q = self.q_class(loop=self.loop)
self.assertEqual(q._format(), 'maxsize=0')
q._unfinished_tasks = 2
self.assertEqual(q._format(), 'maxsize=0 tasks=2')
+class QueueJoinTests(_QueueJoinTestBase):
+ q_class = asyncio.Queue
+
+
+class LifoQueueJoinTests(_QueueJoinTestBase):
+ q_class = asyncio.LifoQueue
+
+
+class PriorityQueueJoinTests(_QueueJoinTestBase):
+ q_class = asyncio.PriorityQueue
+
+
if __name__ == '__main__':
unittest.main()