summaryrefslogtreecommitdiff
path: root/asyncio/coroutines.py
diff options
context:
space:
mode:
Diffstat (limited to 'asyncio/coroutines.py')
-rw-r--r--asyncio/coroutines.py195
1 files changed, 0 insertions, 195 deletions
diff --git a/asyncio/coroutines.py b/asyncio/coroutines.py
deleted file mode 100644
index c28de95..0000000
--- a/asyncio/coroutines.py
+++ /dev/null
@@ -1,195 +0,0 @@
-__all__ = ['coroutine',
- 'iscoroutinefunction', 'iscoroutine']
-
-import functools
-import inspect
-import opcode
-import os
-import sys
-import traceback
-import types
-
-from . import events
-from . import futures
-from .log import logger
-
-
-# Opcode of "yield from" instruction
-_YIELD_FROM = opcode.opmap['YIELD_FROM']
-
-# If you set _DEBUG to true, @coroutine will wrap the resulting
-# generator objects in a CoroWrapper instance (defined below). That
-# instance will log a message when the generator is never iterated
-# over, which may happen when you forget to use "yield from" with a
-# coroutine call. Note that the value of the _DEBUG flag is taken
-# when the decorator is used, so to be of any use it must be set
-# before you define your coroutines. A downside of using this feature
-# is that tracebacks show entries for the CoroWrapper.__next__ method
-# when _DEBUG is true.
-_DEBUG = (not sys.flags.ignore_environment
- and bool(os.environ.get('PYTHONASYNCIODEBUG')))
-
-
-# Check for CPython issue #21209
-def has_yield_from_bug():
- class MyGen:
- def __init__(self):
- self.send_args = None
- def __iter__(self):
- return self
- def __next__(self):
- return 42
- def send(self, *what):
- self.send_args = what
- return None
- def yield_from_gen(gen):
- yield from gen
- value = (1, 2, 3)
- gen = MyGen()
- coro = yield_from_gen(gen)
- next(coro)
- coro.send(value)
- return gen.send_args != (value,)
-_YIELD_FROM_BUG = has_yield_from_bug()
-del has_yield_from_bug
-
-
-class CoroWrapper:
- # Wrapper for coroutine object in _DEBUG mode.
-
- def __init__(self, gen, func):
- assert inspect.isgenerator(gen), gen
- self.gen = gen
- self.func = func
- self._source_traceback = traceback.extract_stack(sys._getframe(1))
- # __name__, __qualname__, __doc__ attributes are set by the coroutine()
- # decorator
-
- def __repr__(self):
- coro_repr = _format_coroutine(self)
- if self._source_traceback:
- frame = self._source_traceback[-1]
- coro_repr += ', created at %s:%s' % (frame[0], frame[1])
- return '<%s %s>' % (self.__class__.__name__, coro_repr)
-
- def __iter__(self):
- return self
-
- def __next__(self):
- return next(self.gen)
-
- if _YIELD_FROM_BUG:
- # For for CPython issue #21209: using "yield from" and a custom
- # generator, generator.send(tuple) unpacks the tuple instead of passing
- # the tuple unchanged. Check if the caller is a generator using "yield
- # from" to decide if the parameter should be unpacked or not.
- def send(self, *value):
- frame = sys._getframe()
- caller = frame.f_back
- assert caller.f_lasti >= 0
- if caller.f_code.co_code[caller.f_lasti] != _YIELD_FROM:
- value = value[0]
- return self.gen.send(value)
- else:
- def send(self, value):
- return self.gen.send(value)
-
- def throw(self, exc):
- return self.gen.throw(exc)
-
- def close(self):
- return self.gen.close()
-
- @property
- def gi_frame(self):
- return self.gen.gi_frame
-
- @property
- def gi_running(self):
- return self.gen.gi_running
-
- @property
- def gi_code(self):
- return self.gen.gi_code
-
- def __del__(self):
- # Be careful accessing self.gen.frame -- self.gen might not exist.
- gen = getattr(self, 'gen', None)
- frame = getattr(gen, 'gi_frame', None)
- if frame is not None and frame.f_lasti == -1:
- msg = '%r was never yielded from' % self
- tb = getattr(self, '_source_traceback', ())
- if tb:
- tb = ''.join(traceback.format_list(tb))
- msg += ('\nCoroutine object created at '
- '(most recent call last):\n')
- msg += tb.rstrip()
- logger.error(msg)
-
-
-def coroutine(func):
- """Decorator to mark coroutines.
-
- If the coroutine is not yielded from before it is destroyed,
- an error message is logged.
- """
- if inspect.isgeneratorfunction(func):
- coro = func
- else:
- @functools.wraps(func)
- def coro(*args, **kw):
- res = func(*args, **kw)
- if isinstance(res, futures.Future) or inspect.isgenerator(res):
- res = yield from res
- return res
-
- if not _DEBUG:
- wrapper = coro
- else:
- @functools.wraps(func)
- def wrapper(*args, **kwds):
- w = CoroWrapper(coro(*args, **kwds), func)
- if w._source_traceback:
- del w._source_traceback[-1]
- w.__name__ = func.__name__
- if hasattr(func, '__qualname__'):
- w.__qualname__ = func.__qualname__
- w.__doc__ = func.__doc__
- return w
-
- wrapper._is_coroutine = True # For iscoroutinefunction().
- return wrapper
-
-
-def iscoroutinefunction(func):
- """Return True if func is a decorated coroutine function."""
- return getattr(func, '_is_coroutine', False)
-
-
-_COROUTINE_TYPES = (types.GeneratorType, CoroWrapper)
-
-def iscoroutine(obj):
- """Return True if obj is a coroutine object."""
- return isinstance(obj, _COROUTINE_TYPES)
-
-
-def _format_coroutine(coro):
- assert iscoroutine(coro)
- coro_name = getattr(coro, '__qualname__', coro.__name__)
-
- filename = coro.gi_code.co_filename
- if (isinstance(coro, CoroWrapper)
- and not inspect.isgeneratorfunction(coro.func)):
- filename, lineno = events._get_function_source(coro.func)
- if coro.gi_frame is None:
- coro_repr = '%s() done, defined at %s:%s' % (coro_name, filename, lineno)
- else:
- coro_repr = '%s() running, defined at %s:%s' % (coro_name, filename, lineno)
- elif coro.gi_frame is not None:
- lineno = coro.gi_frame.f_lineno
- coro_repr = '%s() running at %s:%s' % (coro_name, filename, lineno)
- else:
- lineno = coro.gi_code.co_firstlineno
- coro_repr = '%s() done, defined at %s:%s' % (coro_name, filename, lineno)
-
- return coro_repr