diff options
Diffstat (limited to 'trollius/coroutines.py')
-rw-r--r-- | trollius/coroutines.py | 342 |
1 files changed, 342 insertions, 0 deletions
diff --git a/trollius/coroutines.py b/trollius/coroutines.py new file mode 100644 index 0000000..650bc13 --- /dev/null +++ b/trollius/coroutines.py @@ -0,0 +1,342 @@ +__all__ = ['coroutine', + 'iscoroutinefunction', 'iscoroutine'] + +import functools +import inspect +import opcode +import os +import sys +import traceback +import types + +from . import compat +from . import events +from . import futures +from .log import logger + + +# Opcode of "yield from" instruction +_YIELD_FROM = opcode.opmap.get('YIELD_FROM', None) + +# If you set _DEBUG to true, @coroutine will wrap the resulting +# generator objects in a CoroWrapper instance (defined below). That +# instance will log a message when the generator is never iterated +# over, which may happen when you forget to use "yield" with a +# coroutine call. Note that the value of the _DEBUG flag is taken +# when the decorator is used, so to be of any use it must be set +# before you define your coroutines. A downside of using this feature +# is that tracebacks show entries for the CoroWrapper.__next__ method +# when _DEBUG is true. +_DEBUG = bool(os.environ.get('TROLLIUSDEBUG')) + + +if _YIELD_FROM is not None: + # Check for CPython issue #21209 + exec('''if 1: + def has_yield_from_bug(): + class MyGen: + def __init__(self): + self.send_args = None + def __iter__(self): + return self + def __next__(self): + return 42 + def send(self, *what): + self.send_args = what + return None + def yield_from_gen(gen): + yield from gen + value = (1, 2, 3) + gen = MyGen() + coro = yield_from_gen(gen) + next(coro) + coro.send(value) + return gen.send_args != (value,) +''') + _YIELD_FROM_BUG = has_yield_from_bug() + del has_yield_from_bug +else: + _YIELD_FROM_BUG = False + + +if compat.PY33: + # Don't use the Return class on Python 3.3 and later to support asyncio + # coroutines (to avoid the warning emited in Return destructor). + # + # The problem is that Return inherits from StopIteration. "yield from + # trollius_coroutine". Task._step() does not receive the Return exception, + # because "yield from" handles it internally. So it's not possible to set + # the raised attribute to True to avoid the warning in Return destructor. + def Return(*args): + if not args: + value = None + elif len(args) == 1: + value = args[0] + else: + value = args + return StopIteration(value) +else: + class Return(StopIteration): + def __init__(self, *args): + StopIteration.__init__(self) + if not args: + self.value = None + elif len(args) == 1: + self.value = args[0] + else: + self.value = args + self.raised = False + if _DEBUG: + frame = sys._getframe(1) + self._source_traceback = traceback.extract_stack(frame) + # explicitly clear the reference to avoid reference cycles + frame = None + else: + self._source_traceback = None + + def __del__(self): + if self.raised: + return + + fmt = 'Return(%r) used without raise' + if self._source_traceback: + fmt += '\nReturn created at (most recent call last):\n' + tb = ''.join(traceback.format_list(self._source_traceback)) + fmt += tb.rstrip() + logger.error(fmt, self.value) + + +def _coroutine_at_yield_from(coro): + """Test if the last instruction of a coroutine is "yield from". + + Return False if the coroutine completed. + """ + frame = coro.gi_frame + if frame is None: + return False + code = coro.gi_code + assert frame.f_lasti >= 0 + offset = frame.f_lasti + 1 + instr = code.co_code[offset] + return (instr == _YIELD_FROM) + + +class CoroWrapper(object): + # Wrapper for coroutine object in _DEBUG mode. + + def __init__(self, gen, func): + assert inspect.isgenerator(gen), gen + self.gen = gen + self.func = func + self._source_traceback = traceback.extract_stack(sys._getframe(1)) + # __name__, __qualname__, __doc__ attributes are set by the coroutine() + # decorator + + def __repr__(self): + coro_repr = _format_coroutine(self) + if self._source_traceback: + frame = self._source_traceback[-1] + coro_repr += ', created at %s:%s' % (frame[0], frame[1]) + return '<%s %s>' % (self.__class__.__name__, coro_repr) + + def __iter__(self): + return self + + def __next__(self): + return next(self.gen) + next = __next__ + + if _YIELD_FROM_BUG: + # For for CPython issue #21209: using "yield from" and a custom + # generator, generator.send(tuple) unpacks the tuple instead of passing + # the tuple unchanged. Check if the caller is a generator using "yield + # from" to decide if the parameter should be unpacked or not. + def send(self, *value): + frame = sys._getframe() + caller = frame.f_back + assert caller.f_lasti >= 0 + if caller.f_code.co_code[caller.f_lasti] != _YIELD_FROM: + value = value[0] + return self.gen.send(value) + else: + def send(self, value): + return self.gen.send(value) + + def throw(self, exc): + return self.gen.throw(exc) + + def close(self): + return self.gen.close() + + @property + def gi_frame(self): + return self.gen.gi_frame + + @property + def gi_running(self): + return self.gen.gi_running + + @property + def gi_code(self): + return self.gen.gi_code + + def __del__(self): + # Be careful accessing self.gen.frame -- self.gen might not exist. + gen = getattr(self, 'gen', None) + frame = getattr(gen, 'gi_frame', None) + if frame is not None and frame.f_lasti == -1: + msg = '%r was never yielded from' % self + tb = getattr(self, '_source_traceback', ()) + if tb: + tb = ''.join(traceback.format_list(tb)) + msg += ('\nCoroutine object created at ' + '(most recent call last):\n') + msg += tb.rstrip() + logger.error(msg) + +if not compat.PY34: + # Backport functools.update_wrapper() from Python 3.4: + # - Python 2.7 fails if assigned attributes don't exist + # - Python 2.7 and 3.1 don't set the __wrapped__ attribute + # - Python 3.2 and 3.3 set __wrapped__ before updating __dict__ + def _update_wrapper(wrapper, + wrapped, + assigned = functools.WRAPPER_ASSIGNMENTS, + updated = functools.WRAPPER_UPDATES): + """Update a wrapper function to look like the wrapped function + + wrapper is the function to be updated + wrapped is the original function + assigned is a tuple naming the attributes assigned directly + from the wrapped function to the wrapper function (defaults to + functools.WRAPPER_ASSIGNMENTS) + updated is a tuple naming the attributes of the wrapper that + are updated with the corresponding attribute from the wrapped + function (defaults to functools.WRAPPER_UPDATES) + """ + for attr in assigned: + try: + value = getattr(wrapped, attr) + except AttributeError: + pass + else: + setattr(wrapper, attr, value) + for attr in updated: + getattr(wrapper, attr).update(getattr(wrapped, attr, {})) + # Issue #17482: set __wrapped__ last so we don't inadvertently copy it + # from the wrapped function when updating __dict__ + wrapper.__wrapped__ = wrapped + # Return the wrapper so this can be used as a decorator via partial() + return wrapper + + def _wraps(wrapped, + assigned = functools.WRAPPER_ASSIGNMENTS, + updated = functools.WRAPPER_UPDATES): + """Decorator factory to apply update_wrapper() to a wrapper function + + Returns a decorator that invokes update_wrapper() with the decorated + function as the wrapper argument and the arguments to wraps() as the + remaining arguments. Default arguments are as for update_wrapper(). + This is a convenience function to simplify applying partial() to + update_wrapper(). + """ + return functools.partial(_update_wrapper, wrapped=wrapped, + assigned=assigned, updated=updated) +else: + _wraps = functools.wraps + +def coroutine(func): + """Decorator to mark coroutines. + + If the coroutine is not yielded from before it is destroyed, + an error message is logged. + """ + if inspect.isgeneratorfunction(func): + coro = func + else: + @_wraps(func) + def coro(*args, **kw): + res = func(*args, **kw) + if (isinstance(res, futures._FUTURE_CLASSES) + or inspect.isgenerator(res)): + res = yield From(res) + raise Return(res) + + if not _DEBUG: + wrapper = coro + else: + @_wraps(func) + def wrapper(*args, **kwds): + coro_wrapper = CoroWrapper(coro(*args, **kwds), func) + if coro_wrapper._source_traceback: + del coro_wrapper._source_traceback[-1] + for attr in ('__name__', '__qualname__', '__doc__'): + try: + value = getattr(func, attr) + except AttributeError: + pass + else: + setattr(coro_wrapper, attr, value) + return coro_wrapper + if not compat.PY3: + wrapper.__wrapped__ = func + + wrapper._is_coroutine = True # For iscoroutinefunction(). + return wrapper + + +def iscoroutinefunction(func): + """Return True if func is a decorated coroutine function.""" + return getattr(func, '_is_coroutine', False) + + +_COROUTINE_TYPES = (types.GeneratorType, CoroWrapper) +if events.asyncio is not None: + # Accept also asyncio CoroWrapper for interoperability + if hasattr(events.asyncio, 'coroutines'): + _COROUTINE_TYPES += (events.asyncio.coroutines.CoroWrapper,) + else: + # old Tulip/Python versions + _COROUTINE_TYPES += (events.asyncio.tasks.CoroWrapper,) + +def iscoroutine(obj): + """Return True if obj is a coroutine object.""" + return isinstance(obj, _COROUTINE_TYPES) + + +def _format_coroutine(coro): + assert iscoroutine(coro) + coro_name = getattr(coro, '__qualname__', coro.__name__) + + filename = coro.gi_code.co_filename + if (isinstance(coro, CoroWrapper) + and not inspect.isgeneratorfunction(coro.func)): + filename, lineno = events._get_function_source(coro.func) + if coro.gi_frame is None: + coro_repr = '%s() done, defined at %s:%s' % (coro_name, filename, lineno) + else: + coro_repr = '%s() running, defined at %s:%s' % (coro_name, filename, lineno) + elif coro.gi_frame is not None: + lineno = coro.gi_frame.f_lineno + coro_repr = '%s() running at %s:%s' % (coro_name, filename, lineno) + else: + lineno = coro.gi_code.co_firstlineno + coro_repr = '%s() done, defined at %s:%s' % (coro_name, filename, lineno) + + return coro_repr + + +class FromWrapper(object): + __slots__ = ('obj',) + + def __init__(self, obj): + if isinstance(obj, FromWrapper): + obj = obj.obj + assert not isinstance(obj, FromWrapper) + self.obj = obj + +def From(obj): + if not _DEBUG: + return obj + else: + return FromWrapper(obj) |