From 635b1f0d5bb08ed01a71fcfd146816db87da495e Mon Sep 17 00:00:00 2001 From: geekmug Date: Fri, 10 Dec 2010 17:47:45 +0000 Subject: Merging changes from py3k's r86491 (issue10432) --- python2/futures/_base.py | 41 +++++++++++++++++++++++++++++++++++------ python3/futures/_base.py | 41 +++++++++++++++++++++++++++++++++++------ 2 files changed, 70 insertions(+), 12 deletions(-) diff --git a/python2/futures/_base.py b/python2/futures/_base.py index ed7a094..58a2c5e 100644 --- a/python2/futures/_base.py +++ b/python2/futures/_base.py @@ -12,6 +12,7 @@ import time FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' ALL_COMPLETED = 'ALL_COMPLETED' +_AS_COMPLETED = '_AS_COMPLETED' # Possible future states (for internal use by the futures package). PENDING = 'PENDING' @@ -70,8 +71,30 @@ class _Waiter(object): def add_cancelled(self, future): self.finished_futures.append(future) +class _AsCompletedWaiter(_Waiter): + """Used by as_completed().""" + + def __init__(self): + super(_AsCompletedWaiter, self).__init__() + self.lock = threading.Lock() + + def add_result(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_result(future) + self.event.set() + + def add_exception(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_exception(future) + self.event.set() + + def add_cancelled(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_cancelled(future) + self.event.set() + class _FirstCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_COMPLETED) and as_completed().""" + """Used by wait(return_when=FIRST_COMPLETED).""" def add_result(self, future): super(_FirstCompletedWaiter, self).add_result(future) @@ -128,7 +151,9 @@ class _AcquireFutures(object): future._condition.release() def _create_and_install_waiters(fs, return_when): - if return_when == FIRST_COMPLETED: + if return_when == _AS_COMPLETED: + waiter = _AsCompletedWaiter() + elif return_when == FIRST_COMPLETED: waiter = _FirstCompletedWaiter() else: pending_count = sum( @@ -171,7 +196,7 @@ def as_completed(fs, timeout=None): f for f in fs if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) pending = set(fs) - finished - waiter = _create_and_install_waiters(fs, FIRST_COMPLETED) + waiter = _create_and_install_waiters(fs, _AS_COMPLETED) try: for future in finished: @@ -187,11 +212,15 @@ def as_completed(fs, timeout=None): '%d (of %d) futures unfinished' % ( len(pending), len(fs))) - waiter.event.wait(timeout) + waiter.event.wait(wait_timeout) + + with waiter.lock: + finished = waiter.finished_futures + waiter.finished_futures = [] + waiter.event.clear() - for future in waiter.finished_futures[:]: + for future in finished: yield future - waiter.finished_futures.remove(future) pending.remove(future) finally: diff --git a/python3/futures/_base.py b/python3/futures/_base.py index 561f4d2..143f330 100644 --- a/python3/futures/_base.py +++ b/python3/futures/_base.py @@ -12,6 +12,7 @@ import time FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' ALL_COMPLETED = 'ALL_COMPLETED' +_AS_COMPLETED = '_AS_COMPLETED' # Possible future states (for internal use by the futures package). PENDING = 'PENDING' @@ -70,8 +71,30 @@ class _Waiter(object): def add_cancelled(self, future): self.finished_futures.append(future) +class _AsCompletedWaiter(_Waiter): + """Used by as_completed().""" + + def __init__(self): + super(_AsCompletedWaiter, self).__init__() + self.lock = threading.Lock() + + def add_result(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_result(future) + self.event.set() + + def add_exception(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_exception(future) + self.event.set() + + def add_cancelled(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_cancelled(future) + self.event.set() + class _FirstCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_COMPLETED) and as_completed().""" + """Used by wait(return_when=FIRST_COMPLETED).""" def add_result(self, future): super().add_result(future) @@ -128,7 +151,9 @@ class _AcquireFutures(object): future._condition.release() def _create_and_install_waiters(fs, return_when): - if return_when == FIRST_COMPLETED: + if return_when == _AS_COMPLETED: + waiter = _AsCompletedWaiter() + elif return_when == FIRST_COMPLETED: waiter = _FirstCompletedWaiter() else: pending_count = sum( @@ -171,7 +196,7 @@ def as_completed(fs, timeout=None): f for f in fs if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) pending = set(fs) - finished - waiter = _create_and_install_waiters(fs, FIRST_COMPLETED) + waiter = _create_and_install_waiters(fs, _AS_COMPLETED) try: for future in finished: @@ -187,11 +212,15 @@ def as_completed(fs, timeout=None): '%d (of %d) futures unfinished' % ( len(pending), len(fs))) - waiter.event.wait(timeout) + waiter.event.wait(wait_timeout) + + with waiter.lock: + finished = waiter.finished_futures + waiter.finished_futures = [] + waiter.event.clear() - for future in waiter.finished_futures[:]: + for future in finished: yield future - waiter.finished_futures.remove(future) pending.remove(future) finally: -- cgit v1.2.1