summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgeekmug <devnull@localhost>2010-12-10 17:47:45 +0000
committergeekmug <devnull@localhost>2010-12-10 17:47:45 +0000
commit635b1f0d5bb08ed01a71fcfd146816db87da495e (patch)
treeffcb111470b9feb5963fda6de2e908ccade4aa3b
parentb7ba8f8c71e621f752c71fac88ba2757ce7b2e1b (diff)
downloadfutures-635b1f0d5bb08ed01a71fcfd146816db87da495e.tar.gz
Merging changes from py3k's r86491 (issue10432)
-rw-r--r--python2/futures/_base.py41
-rw-r--r--python3/futures/_base.py41
2 files changed, 70 insertions, 12 deletions
diff --git a/python2/futures/_base.py b/python2/futures/_base.py
index ed7a094..58a2c5e 100644
--- a/python2/futures/_base.py
+++ b/python2/futures/_base.py
@@ -12,6 +12,7 @@ import time
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
+_AS_COMPLETED = '_AS_COMPLETED'
# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
@@ -70,8 +71,30 @@ class _Waiter(object):
def add_cancelled(self, future):
self.finished_futures.append(future)
+class _AsCompletedWaiter(_Waiter):
+ """Used by as_completed()."""
+
+ def __init__(self):
+ super(_AsCompletedWaiter, self).__init__()
+ self.lock = threading.Lock()
+
+ def add_result(self, future):
+ with self.lock:
+ super(_AsCompletedWaiter, self).add_result(future)
+ self.event.set()
+
+ def add_exception(self, future):
+ with self.lock:
+ super(_AsCompletedWaiter, self).add_exception(future)
+ self.event.set()
+
+ def add_cancelled(self, future):
+ with self.lock:
+ super(_AsCompletedWaiter, self).add_cancelled(future)
+ self.event.set()
+
class _FirstCompletedWaiter(_Waiter):
- """Used by wait(return_when=FIRST_COMPLETED) and as_completed()."""
+ """Used by wait(return_when=FIRST_COMPLETED)."""
def add_result(self, future):
super(_FirstCompletedWaiter, self).add_result(future)
@@ -128,7 +151,9 @@ class _AcquireFutures(object):
future._condition.release()
def _create_and_install_waiters(fs, return_when):
- if return_when == FIRST_COMPLETED:
+ if return_when == _AS_COMPLETED:
+ waiter = _AsCompletedWaiter()
+ elif return_when == FIRST_COMPLETED:
waiter = _FirstCompletedWaiter()
else:
pending_count = sum(
@@ -171,7 +196,7 @@ def as_completed(fs, timeout=None):
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = set(fs) - finished
- waiter = _create_and_install_waiters(fs, FIRST_COMPLETED)
+ waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
try:
for future in finished:
@@ -187,11 +212,15 @@ def as_completed(fs, timeout=None):
'%d (of %d) futures unfinished' % (
len(pending), len(fs)))
- waiter.event.wait(timeout)
+ waiter.event.wait(wait_timeout)
+
+ with waiter.lock:
+ finished = waiter.finished_futures
+ waiter.finished_futures = []
+ waiter.event.clear()
- for future in waiter.finished_futures[:]:
+ for future in finished:
yield future
- waiter.finished_futures.remove(future)
pending.remove(future)
finally:
diff --git a/python3/futures/_base.py b/python3/futures/_base.py
index 561f4d2..143f330 100644
--- a/python3/futures/_base.py
+++ b/python3/futures/_base.py
@@ -12,6 +12,7 @@ import time
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
+_AS_COMPLETED = '_AS_COMPLETED'
# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
@@ -70,8 +71,30 @@ class _Waiter(object):
def add_cancelled(self, future):
self.finished_futures.append(future)
+class _AsCompletedWaiter(_Waiter):
+ """Used by as_completed()."""
+
+ def __init__(self):
+ super(_AsCompletedWaiter, self).__init__()
+ self.lock = threading.Lock()
+
+ def add_result(self, future):
+ with self.lock:
+ super(_AsCompletedWaiter, self).add_result(future)
+ self.event.set()
+
+ def add_exception(self, future):
+ with self.lock:
+ super(_AsCompletedWaiter, self).add_exception(future)
+ self.event.set()
+
+ def add_cancelled(self, future):
+ with self.lock:
+ super(_AsCompletedWaiter, self).add_cancelled(future)
+ self.event.set()
+
class _FirstCompletedWaiter(_Waiter):
- """Used by wait(return_when=FIRST_COMPLETED) and as_completed()."""
+ """Used by wait(return_when=FIRST_COMPLETED)."""
def add_result(self, future):
super().add_result(future)
@@ -128,7 +151,9 @@ class _AcquireFutures(object):
future._condition.release()
def _create_and_install_waiters(fs, return_when):
- if return_when == FIRST_COMPLETED:
+ if return_when == _AS_COMPLETED:
+ waiter = _AsCompletedWaiter()
+ elif return_when == FIRST_COMPLETED:
waiter = _FirstCompletedWaiter()
else:
pending_count = sum(
@@ -171,7 +196,7 @@ def as_completed(fs, timeout=None):
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = set(fs) - finished
- waiter = _create_and_install_waiters(fs, FIRST_COMPLETED)
+ waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
try:
for future in finished:
@@ -187,11 +212,15 @@ def as_completed(fs, timeout=None):
'%d (of %d) futures unfinished' % (
len(pending), len(fs)))
- waiter.event.wait(timeout)
+ waiter.event.wait(wait_timeout)
+
+ with waiter.lock:
+ finished = waiter.finished_futures
+ waiter.finished_futures = []
+ waiter.event.clear()
- for future in waiter.finished_futures[:]:
+ for future in finished:
yield future
- waiter.finished_futures.remove(future)
pending.remove(future)
finally: