summaryrefslogtreecommitdiff
path: root/trollius/queues.py
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@redhat.com>2015-09-08 22:55:49 +0200
committerVictor Stinner <vstinner@redhat.com>2015-09-08 22:55:49 +0200
commit0727ad2b6d49cd94ea0fb86ef08c8050208b839a (patch)
tree07179220ffd8f0851769b2d271731f2c365621ad /trollius/queues.py
parentf25cb291d8439d9f3d44f52811607d3fdb305d1f (diff)
parent8d79c57726e30fd19d5fadf46375853df7895516 (diff)
downloadtrollius-git-0727ad2b6d49cd94ea0fb86ef08c8050208b839a.tar.gz
Merge asyncio into trollius
Diffstat (limited to 'trollius/queues.py')
-rw-r--r--trollius/queues.py49
1 files changed, 39 insertions, 10 deletions
diff --git a/trollius/queues.py b/trollius/queues.py
index 9c77c60..18167ab 100644
--- a/trollius/queues.py
+++ b/trollius/queues.py
@@ -47,7 +47,7 @@ class Queue(object):
# Futures.
self._getters = collections.deque()
- # Pairs of (item, Future).
+ # Futures
self._putters = collections.deque()
self._unfinished_tasks = 0
self._finished = locks.Event(loop=self._loop)
@@ -98,7 +98,7 @@ class Queue(object):
def _consume_done_putters(self):
# Delete waiters at the head of the put() queue who've timed out.
- while self._putters and self._putters[0][1].done():
+ while self._putters and self._putters[0].done():
self._putters.popleft()
def qsize(self):
@@ -148,8 +148,9 @@ class Queue(object):
elif self._maxsize > 0 and self._maxsize <= self.qsize():
waiter = futures.Future(loop=self._loop)
- self._putters.append((item, waiter))
+ self._putters.append(waiter)
yield From(waiter)
+ self._put(item)
else:
self.__put_internal(item)
@@ -186,8 +187,7 @@ class Queue(object):
self._consume_done_putters()
if self._putters:
assert self.full(), 'queue not full, why are putters waiting?'
- item, putter = self._putters.popleft()
- self.__put_internal(item)
+ putter = self._putters.popleft()
# When a getter runs and frees up a slot so this putter can
# run, we need to defer the put for a tick to ensure that
@@ -201,10 +201,40 @@ class Queue(object):
raise Return(self._get())
else:
waiter = futures.Future(loop=self._loop)
-
self._getters.append(waiter)
- result = yield From(waiter)
- raise Return(result)
+ try:
+ value = (yield From(waiter))
+ raise Return(value)
+ except futures.CancelledError:
+ # if we get CancelledError, it means someone cancelled this
+ # get() coroutine. But there is a chance that the waiter
+ # already is ready and contains an item that has just been
+ # removed from the queue. In this case, we need to put the item
+ # back into the front of the queue. This get() must either
+ # succeed without fault or, if it gets cancelled, it must be as
+ # if it never happened.
+ if waiter.done():
+ self._put_it_back(waiter.result())
+ raise
+
+ def _put_it_back(self, item):
+ """
+ This is called when we have a waiter to get() an item and this waiter
+ gets cancelled. In this case, we put the item back: wake up another
+ waiter or put it in the _queue.
+ """
+ self._consume_done_getters()
+ if self._getters:
+ assert not self._queue, (
+ 'queue non-empty, why are getters waiting?')
+
+ getter = self._getters.popleft()
+ self.__put_internal(item)
+
+ # getter cannot be cancelled, we just removed done getters
+ getter.set_result(item)
+ else:
+ self._queue.appendleft(item)
def get_nowait(self):
"""Remove and return an item from the queue.
@@ -214,8 +244,7 @@ class Queue(object):
self._consume_done_putters()
if self._putters:
assert self.full(), 'queue not full, why are putters waiting?'
- item, putter = self._putters.popleft()
- self.__put_internal(item)
+ putter = self._putters.popleft()
# Wake putter on next tick.
# getter cannot be cancelled, we just removed done putters