diff options
Diffstat (limited to 'trollius/tasks.py')
-rw-r--r-- | trollius/tasks.py | 740 |
1 files changed, 740 insertions, 0 deletions
diff --git a/trollius/tasks.py b/trollius/tasks.py new file mode 100644 index 0000000..3e0e1b1 --- /dev/null +++ b/trollius/tasks.py @@ -0,0 +1,740 @@ +"""Support for tasks, coroutines and the scheduler.""" +from __future__ import print_function + +__all__ = ['Task', + 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', + 'wait', 'wait_for', 'as_completed', 'sleep', 'async', + 'gather', 'shield', 'ensure_future', + ] + +import functools +import linecache +import traceback +import warnings +try: + from weakref import WeakSet +except ImportError: + # Python 2.6 + from .py27_weakrefset import WeakSet + +from . import compat +from . import coroutines +from . import events +from . import executor +from . import futures +from .locks import Lock, Condition, Semaphore, _ContextManager +from .coroutines import coroutine, From, Return + + + +@coroutine +def _lock_coroutine(lock): + yield From(lock.acquire()) + raise Return(_ContextManager(lock)) + + +class Task(futures.Future): + """A coroutine wrapped in a Future.""" + + # An important invariant maintained while a Task not done: + # + # - Either _fut_waiter is None, and _step() is scheduled; + # - or _fut_waiter is some Future, and _step() is *not* scheduled. + # + # The only transition from the latter to the former is through + # _wakeup(). When _fut_waiter is not None, one of its callbacks + # must be _wakeup(). + + # Weak set containing all tasks alive. + _all_tasks = WeakSet() + + # Dictionary containing tasks that are currently active in + # all running event loops. {EventLoop: Task} + _current_tasks = {} + + # If False, don't log a message if the task is destroyed whereas its + # status is still pending + _log_destroy_pending = True + + @classmethod + def current_task(cls, loop=None): + """Return the currently running task in an event loop or None. + + By default the current task for the current event loop is returned. + + None is returned when called not in the context of a Task. + """ + if loop is None: + loop = events.get_event_loop() + return cls._current_tasks.get(loop) + + @classmethod + def all_tasks(cls, loop=None): + """Return a set of all tasks for an event loop. + + By default all tasks for the current event loop are returned. + """ + if loop is None: + loop = events.get_event_loop() + return set(t for t in cls._all_tasks if t._loop is loop) + + def __init__(self, coro, loop=None): + assert coroutines.iscoroutine(coro), repr(coro) + super(Task, self).__init__(loop=loop) + if self._source_traceback: + del self._source_traceback[-1] + self._coro = coro + self._fut_waiter = None + self._must_cancel = False + self._loop.call_soon(self._step) + self.__class__._all_tasks.add(self) + + # On Python 3.3 or older, objects with a destructor that are part of a + # reference cycle are never destroyed. That's not the case any more on + # Python 3.4 thanks to the PEP 442. + if compat.PY34: + def __del__(self): + if self._state == futures._PENDING and self._log_destroy_pending: + context = { + 'task': self, + 'message': 'Task was destroyed but it is pending!', + } + if self._source_traceback: + context['source_traceback'] = self._source_traceback + self._loop.call_exception_handler(context) + futures.Future.__del__(self) + + def _repr_info(self): + info = super(Task, self)._repr_info() + + if self._must_cancel: + # replace status + info[0] = 'cancelling' + + coro = coroutines._format_coroutine(self._coro) + info.insert(1, 'coro=<%s>' % coro) + + if self._fut_waiter is not None: + info.insert(2, 'wait_for=%r' % self._fut_waiter) + return info + + def get_stack(self, limit=None): + """Return the list of stack frames for this task's coroutine. + + If the coroutine is not done, this returns the stack where it is + suspended. If the coroutine has completed successfully or was + cancelled, this returns an empty list. If the coroutine was + terminated by an exception, this returns the list of traceback + frames. + + The frames are always ordered from oldest to newest. + + The optional limit gives the maximum number of frames to + return; by default all available frames are returned. Its + meaning differs depending on whether a stack or a traceback is + returned: the newest frames of a stack are returned, but the + oldest frames of a traceback are returned. (This matches the + behavior of the traceback module.) + + For reasons beyond our control, only one stack frame is + returned for a suspended coroutine. + """ + frames = [] + try: + # 'async def' coroutines + f = self._coro.cr_frame + except AttributeError: + f = self._coro.gi_frame + if f is not None: + while f is not None: + if limit is not None: + if limit <= 0: + break + limit -= 1 + frames.append(f) + f = f.f_back + frames.reverse() + elif self._exception is not None: + tb = self._exception.__traceback__ + while tb is not None: + if limit is not None: + if limit <= 0: + break + limit -= 1 + frames.append(tb.tb_frame) + tb = tb.tb_next + return frames + + def print_stack(self, limit=None, file=None): + """Print the stack or traceback for this task's coroutine. + + This produces output similar to that of the traceback module, + for the frames retrieved by get_stack(). The limit argument + is passed to get_stack(). The file argument is an I/O stream + to which the output is written; by default output is written + to sys.stderr. + """ + extracted_list = [] + checked = set() + for f in self.get_stack(limit=limit): + lineno = f.f_lineno + co = f.f_code + filename = co.co_filename + name = co.co_name + if filename not in checked: + checked.add(filename) + linecache.checkcache(filename) + line = linecache.getline(filename, lineno, f.f_globals) + extracted_list.append((filename, lineno, name, line)) + exc = self._exception + if not extracted_list: + print('No stack for %r' % self, file=file) + elif exc is not None: + print('Traceback for %r (most recent call last):' % self, + file=file) + else: + print('Stack for %r (most recent call last):' % self, + file=file) + traceback.print_list(extracted_list, file=file) + if exc is not None: + for line in traceback.format_exception_only(exc.__class__, exc): + print(line, file=file, end='') + + def cancel(self): + """Request that this task cancel itself. + + This arranges for a CancelledError to be thrown into the + wrapped coroutine on the next cycle through the event loop. + The coroutine then has a chance to clean up or even deny + the request using try/except/finally. + + Unlike Future.cancel, this does not guarantee that the + task will be cancelled: the exception might be caught and + acted upon, delaying cancellation of the task or preventing + cancellation completely. The task may also return a value or + raise a different exception. + + Immediately after this method is called, Task.cancelled() will + not return True (unless the task was already cancelled). A + task will be marked as cancelled when the wrapped coroutine + terminates with a CancelledError exception (even if cancel() + was not called). + """ + if self.done(): + return False + if self._fut_waiter is not None: + if self._fut_waiter.cancel(): + # Leave self._fut_waiter; it may be a Task that + # catches and ignores the cancellation so we may have + # to cancel it again later. + return True + # It must be the case that self._step is already scheduled. + self._must_cancel = True + return True + + def _step(self, value=None, exc=None, exc_tb=None): + assert not self.done(), \ + '_step(): already done: {0!r}, {1!r}, {2!r}'.format(self, value, exc) + if self._must_cancel: + if not isinstance(exc, futures.CancelledError): + exc = futures.CancelledError() + self._must_cancel = False + coro = self._coro + self._fut_waiter = None + + if exc_tb is not None: + init_exc = exc + else: + init_exc = None + self.__class__._current_tasks[self._loop] = self + # Call either coro.throw(exc) or coro.send(value). + try: + if exc is not None: + result = coro.throw(exc) + else: + result = coro.send(value) + except StopIteration as exc: + if compat.PY33: + # asyncio Task object? get the result of the coroutine + result = exc.value + else: + if isinstance(exc, Return): + exc.raised = True + result = exc.value + else: + result = None + self.set_result(result) + except futures.CancelledError as exc: + super(Task, self).cancel() # I.e., Future.cancel(self). + except BaseException as exc: + if exc is init_exc: + self._set_exception_with_tb(exc, exc_tb) + exc_tb = None + else: + self.set_exception(exc) + + if not isinstance(exc, Exception): + # reraise BaseException + raise + else: + if coroutines._DEBUG: + if not coroutines._coroutine_at_yield_from(self._coro): + # trollius coroutine must "yield From(...)" + if not isinstance(result, coroutines.FromWrapper): + self._loop.call_soon( + self._step, None, + RuntimeError("yield used without From")) + return + result = result.obj + else: + # asyncio coroutine using "yield from ..." + if isinstance(result, coroutines.FromWrapper): + result = result.obj + elif isinstance(result, coroutines.FromWrapper): + result = result.obj + + if coroutines.iscoroutine(result): + # "yield coroutine" creates a task, the current task + # will wait until the new task is done + result = self._loop.create_task(result) + # FIXME: faster check. common base class? hasattr? + elif isinstance(result, (Lock, Condition, Semaphore)): + coro = _lock_coroutine(result) + result = self._loop.create_task(coro) + + if isinstance(result, futures._FUTURE_CLASSES): + # Yielded Future must come from Future.__iter__(). + result.add_done_callback(self._wakeup) + self._fut_waiter = result + if self._must_cancel: + if self._fut_waiter.cancel(): + self._must_cancel = False + elif result is None: + # Bare yield relinquishes control for one event loop iteration. + self._loop.call_soon(self._step) + else: + # Yielding something else is an error. + self._loop.call_soon( + self._step, None, + RuntimeError( + 'Task got bad yield: {0!r}'.format(result))) + finally: + self.__class__._current_tasks.pop(self._loop) + self = None # Needed to break cycles when an exception occurs. + + def _wakeup(self, future): + if (future._state == futures._FINISHED + and future._exception is not None): + # Get the traceback before calling exception(), because calling + # the exception() method clears the traceback + exc_tb = future._get_exception_tb() + exc = future.exception() + self._step(None, exc, exc_tb) + exc_tb = None + else: + try: + value = future.result() + except Exception as exc: + # This may also be a cancellation. + self._step(None, exc) + else: + self._step(value, None) + self = None # Needed to break cycles when an exception occurs. + + +# wait() and as_completed() similar to those in PEP 3148. + +# Export symbols in trollius.tasks for compatibility with asyncio +FIRST_COMPLETED = executor.FIRST_COMPLETED +FIRST_EXCEPTION = executor.FIRST_EXCEPTION +ALL_COMPLETED = executor.ALL_COMPLETED + + +@coroutine +def wait(fs, loop=None, timeout=None, return_when=ALL_COMPLETED): + """Wait for the Futures and coroutines given by fs to complete. + + The sequence futures must not be empty. + + Coroutines will be wrapped in Tasks. + + Returns two sets of Future: (done, pending). + + Usage: + + done, pending = yield From(asyncio.wait(fs)) + + Note: This does not raise TimeoutError! Futures that aren't done + when the timeout occurs are returned in the second set. + """ + if isinstance(fs, futures._FUTURE_CLASSES) or coroutines.iscoroutine(fs): + raise TypeError("expect a list of futures, not %s" % type(fs).__name__) + if not fs: + raise ValueError('Set of coroutines/Futures is empty.') + if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): + raise ValueError('Invalid return_when value: {0}'.format(return_when)) + + if loop is None: + loop = events.get_event_loop() + + fs = set(ensure_future(f, loop=loop) for f in set(fs)) + + result = yield From(_wait(fs, timeout, return_when, loop)) + raise Return(result) + + +def _release_waiter(waiter, *args): + if not waiter.done(): + waiter.set_result(None) + + +@coroutine +def wait_for(fut, timeout, loop=None): + """Wait for the single Future or coroutine to complete, with timeout. + + Coroutine will be wrapped in Task. + + Returns result of the Future or coroutine. When a timeout occurs, + it cancels the task and raises TimeoutError. To avoid the task + cancellation, wrap it in shield(). + + If the wait is cancelled, the task is also cancelled. + + This function is a coroutine. + """ + if loop is None: + loop = events.get_event_loop() + + if timeout is None: + result = yield From(fut) + raise Return(result) + + waiter = futures.Future(loop=loop) + timeout_handle = loop.call_later(timeout, _release_waiter, waiter) + cb = functools.partial(_release_waiter, waiter) + + fut = ensure_future(fut, loop=loop) + fut.add_done_callback(cb) + + try: + # wait until the future completes or the timeout + try: + yield From(waiter) + except futures.CancelledError: + fut.remove_done_callback(cb) + fut.cancel() + raise + + if fut.done(): + raise Return(fut.result()) + else: + fut.remove_done_callback(cb) + fut.cancel() + raise futures.TimeoutError() + finally: + timeout_handle.cancel() + + +@coroutine +def _wait(fs, timeout, return_when, loop): + """Internal helper for wait() and _wait_for(). + + The fs argument must be a collection of Futures. + """ + assert fs, 'Set of Futures is empty.' + waiter = futures.Future(loop=loop) + timeout_handle = None + if timeout is not None: + timeout_handle = loop.call_later(timeout, _release_waiter, waiter) + non_local = {'counter': len(fs)} + + def _on_completion(f): + non_local['counter'] -= 1 + if (non_local['counter'] <= 0 or + return_when == FIRST_COMPLETED or + return_when == FIRST_EXCEPTION and (not f.cancelled() and + f.exception() is not None)): + if timeout_handle is not None: + timeout_handle.cancel() + if not waiter.done(): + waiter.set_result(None) + + for f in fs: + f.add_done_callback(_on_completion) + + try: + yield From(waiter) + finally: + if timeout_handle is not None: + timeout_handle.cancel() + + done, pending = set(), set() + for f in fs: + f.remove_done_callback(_on_completion) + if f.done(): + done.add(f) + else: + pending.add(f) + raise Return(done, pending) + + +# This is *not* a @coroutine! It is just an iterator (yielding Futures). +def as_completed(fs, loop=None, timeout=None): + """Return an iterator whose values are coroutines. + + When waiting for the yielded coroutines you'll get the results (or + exceptions!) of the original Futures (or coroutines), in the order + in which and as soon as they complete. + + This differs from PEP 3148; the proper way to use this is: + + for f in as_completed(fs): + result = yield From(f) # The 'yield' may raise. + # Use result. + + If a timeout is specified, the 'yield' will raise + TimeoutError when the timeout occurs before all Futures are done. + + Note: The futures 'f' are not necessarily members of fs. + """ + if isinstance(fs, futures._FUTURE_CLASSES) or coroutines.iscoroutine(fs): + raise TypeError("expect a list of futures, not %s" % type(fs).__name__) + loop = loop if loop is not None else events.get_event_loop() + todo = set(ensure_future(f, loop=loop) for f in set(fs)) + from .queues import Queue # Import here to avoid circular import problem. + done = Queue(loop=loop) + timeout_handle = None + + def _on_timeout(): + for f in todo: + f.remove_done_callback(_on_completion) + done.put_nowait(None) # Queue a dummy value for _wait_for_one(). + todo.clear() # Can't do todo.remove(f) in the loop. + + def _on_completion(f): + if not todo: + return # _on_timeout() was here first. + todo.remove(f) + done.put_nowait(f) + if not todo and timeout_handle is not None: + timeout_handle.cancel() + + @coroutine + def _wait_for_one(): + f = yield From(done.get()) + if f is None: + # Dummy value from _on_timeout(). + raise futures.TimeoutError + raise Return(f.result()) # May raise f.exception(). + + for f in todo: + f.add_done_callback(_on_completion) + if todo and timeout is not None: + timeout_handle = loop.call_later(timeout, _on_timeout) + for _ in range(len(todo)): + yield _wait_for_one() + + +@coroutine +def sleep(delay, result=None, loop=None): + """Coroutine that completes after a given time (in seconds).""" + future = futures.Future(loop=loop) + h = future._loop.call_later(delay, + future._set_result_unless_cancelled, result) + try: + result = yield From(future) + raise Return(result) + finally: + h.cancel() + + +def async(coro_or_future, loop=None): + """Wrap a coroutine in a future. + + If the argument is a Future, it is returned directly. + + This function is deprecated in 3.5. Use asyncio.ensure_future() instead. + """ + + warnings.warn("asyncio.async() function is deprecated, use ensure_future()", + DeprecationWarning) + + return ensure_future(coro_or_future, loop=loop) + + +def ensure_future(coro_or_future, loop=None): + """Wrap a coroutine in a future. + + If the argument is a Future, it is returned directly. + """ + # FIXME: only check if coroutines._DEBUG is True? + if isinstance(coro_or_future, coroutines.FromWrapper): + coro_or_future = coro_or_future.obj + if isinstance(coro_or_future, futures._FUTURE_CLASSES): + if loop is not None and loop is not coro_or_future._loop: + raise ValueError('loop argument must agree with Future') + return coro_or_future + elif coroutines.iscoroutine(coro_or_future): + if loop is None: + loop = events.get_event_loop() + task = loop.create_task(coro_or_future) + if task._source_traceback: + del task._source_traceback[-1] + return task + else: + raise TypeError('A Future or coroutine is required') + + +class _GatheringFuture(futures.Future): + """Helper for gather(). + + This overrides cancel() to cancel all the children and act more + like Task.cancel(), which doesn't immediately mark itself as + cancelled. + """ + + def __init__(self, children, loop=None): + super(_GatheringFuture, self).__init__(loop=loop) + self._children = children + + def cancel(self): + if self.done(): + return False + for child in self._children: + child.cancel() + return True + + +def gather(*coros_or_futures, **kw): + """Return a future aggregating results from the given coroutines + or futures. + + All futures must share the same event loop. If all the tasks are + done successfully, the returned future's result is the list of + results (in the order of the original sequence, not necessarily + the order of results arrival). If *return_exceptions* is True, + exceptions in the tasks are treated the same as successful + results, and gathered in the result list; otherwise, the first + raised exception will be immediately propagated to the returned + future. + + Cancellation: if the outer Future is cancelled, all children (that + have not completed yet) are also cancelled. If any child is + cancelled, this is treated as if it raised CancelledError -- + the outer Future is *not* cancelled in this case. (This is to + prevent the cancellation of one child to cause other children to + be cancelled.) + """ + loop = kw.pop('loop', None) + return_exceptions = kw.pop('return_exceptions', False) + if kw: + raise TypeError("unexpected keyword") + + if not coros_or_futures: + outer = futures.Future(loop=loop) + outer.set_result([]) + return outer + + arg_to_fut = {} + for arg in set(coros_or_futures): + if not isinstance(arg, futures._FUTURE_CLASSES): + fut = ensure_future(arg, loop=loop) + if loop is None: + loop = fut._loop + # The caller cannot control this future, the "destroy pending task" + # warning should not be emitted. + fut._log_destroy_pending = False + else: + fut = arg + if loop is None: + loop = fut._loop + elif fut._loop is not loop: + raise ValueError("futures are tied to different event loops") + arg_to_fut[arg] = fut + + children = [arg_to_fut[arg] for arg in coros_or_futures] + nchildren = len(children) + outer = _GatheringFuture(children, loop=loop) + non_local = {'nfinished': 0} + results = [None] * nchildren + + def _done_callback(i, fut): + if outer.done(): + if not fut.cancelled(): + # Mark exception retrieved. + fut.exception() + return + + if fut.cancelled(): + res = futures.CancelledError() + if not return_exceptions: + outer.set_exception(res) + return + elif fut._exception is not None: + res = fut.exception() # Mark exception retrieved. + if not return_exceptions: + outer.set_exception(res) + return + else: + res = fut._result + results[i] = res + non_local['nfinished'] += 1 + if non_local['nfinished'] == nchildren: + outer.set_result(results) + + for i, fut in enumerate(children): + fut.add_done_callback(functools.partial(_done_callback, i)) + return outer + + +def shield(arg, loop=None): + """Wait for a future, shielding it from cancellation. + + The statement + + res = yield From(shield(something())) + + is exactly equivalent to the statement + + res = yield From(something()) + + *except* that if the coroutine containing it is cancelled, the + task running in something() is not cancelled. From the POV of + something(), the cancellation did not happen. But its caller is + still cancelled, so the yield-from expression still raises + CancelledError. Note: If something() is cancelled by other means + this will still cancel shield(). + + If you want to completely ignore cancellation (not recommended) + you can combine shield() with a try/except clause, as follows: + + try: + res = yield From(shield(something())) + except CancelledError: + res = None + """ + inner = ensure_future(arg, loop=loop) + if inner.done(): + # Shortcut. + return inner + loop = inner._loop + outer = futures.Future(loop=loop) + + def _done_callback(inner): + if outer.cancelled(): + if not inner.cancelled(): + # Mark inner's result as retrieved. + inner.exception() + return + + if inner.cancelled(): + outer.cancel() + else: + exc = inner.exception() + if exc is not None: + outer.set_exception(exc) + else: + outer.set_result(inner.result()) + + inner.add_done_callback(_done_callback) + return outer |