diff options
author | Victor Stinner <vstinner@redhat.com> | 2015-09-08 22:55:49 +0200 |
---|---|---|
committer | Victor Stinner <vstinner@redhat.com> | 2015-09-08 22:55:49 +0200 |
commit | 0727ad2b6d49cd94ea0fb86ef08c8050208b839a (patch) | |
tree | 07179220ffd8f0851769b2d271731f2c365621ad /trollius | |
parent | f25cb291d8439d9f3d44f52811607d3fdb305d1f (diff) | |
parent | 8d79c57726e30fd19d5fadf46375853df7895516 (diff) | |
download | trollius-git-0727ad2b6d49cd94ea0fb86ef08c8050208b839a.tar.gz |
Merge asyncio into trollius
Diffstat (limited to 'trollius')
-rw-r--r-- | trollius/base_events.py | 4 | ||||
-rw-r--r-- | trollius/base_subprocess.py | 13 | ||||
-rw-r--r-- | trollius/locks.py | 1 | ||||
-rw-r--r-- | trollius/proactor_events.py | 4 | ||||
-rw-r--r-- | trollius/queues.py | 49 | ||||
-rw-r--r-- | trollius/selector_events.py | 3 | ||||
-rw-r--r-- | trollius/sslproto.py | 11 | ||||
-rw-r--r-- | trollius/streams.py | 20 | ||||
-rw-r--r-- | trollius/tasks.py | 7 | ||||
-rw-r--r-- | trollius/test_utils.py | 2 | ||||
-rw-r--r-- | trollius/transports.py | 2 | ||||
-rw-r--r-- | trollius/unix_events.py | 4 |
12 files changed, 89 insertions, 31 deletions
diff --git a/trollius/base_events.py b/trollius/base_events.py index d4dc448..c5e6eff 100644 --- a/trollius/base_events.py +++ b/trollius/base_events.py @@ -387,7 +387,7 @@ class BaseEventLoop(events.AbstractEventLoop): # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if not self.is_closed(): warnings.warn("unclosed event loop %r" % self, ResourceWarning) @@ -1225,7 +1225,7 @@ class BaseEventLoop(events.AbstractEventLoop): return enabled = bool(enabled) - if self._coroutine_wrapper_set is enabled: + if self._coroutine_wrapper_set == enabled: return wrapper = coroutines.debug_wrapper diff --git a/trollius/base_subprocess.py b/trollius/base_subprocess.py index 13cff7a..ffd6e76 100644 --- a/trollius/base_subprocess.py +++ b/trollius/base_subprocess.py @@ -1,8 +1,8 @@ import collections import subprocess -import sys import warnings +from . import compat from . import futures from . import protocols from . import transports @@ -36,8 +36,13 @@ class BaseSubprocessTransport(transports.SubprocessTransport): self._pipes[2] = None # Create the child process: set the _proc attribute - self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, - stderr=stderr, bufsize=bufsize, **kwargs) + try: + self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, + stderr=stderr, bufsize=bufsize, **kwargs) + except: + self.close() + raise + self._pid = self._proc.pid self._extra['subprocess'] = self._proc @@ -112,7 +117,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport): # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if not self._closed: warnings.warn("unclosed transport %r" % self, ResourceWarning) diff --git a/trollius/locks.py b/trollius/locks.py index 0d6de65..03b4daa 100644 --- a/trollius/locks.py +++ b/trollius/locks.py @@ -3,7 +3,6 @@ __all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore'] import collections -import sys from . import compat from . import events diff --git a/trollius/proactor_events.py b/trollius/proactor_events.py index 49d8bc3..66b4caf 100644 --- a/trollius/proactor_events.py +++ b/trollius/proactor_events.py @@ -7,10 +7,10 @@ proactor is only implemented on Windows with IOCP. __all__ = ['BaseProactorEventLoop'] import socket -import sys import warnings from . import base_events +from . import compat from . import constants from . import futures from . import sslproto @@ -82,7 +82,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin, # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if self._sock is not None: warnings.warn("unclosed transport %r" % self, ResourceWarning) diff --git a/trollius/queues.py b/trollius/queues.py index 9c77c60..18167ab 100644 --- a/trollius/queues.py +++ b/trollius/queues.py @@ -47,7 +47,7 @@ class Queue(object): # Futures. self._getters = collections.deque() - # Pairs of (item, Future). + # Futures self._putters = collections.deque() self._unfinished_tasks = 0 self._finished = locks.Event(loop=self._loop) @@ -98,7 +98,7 @@ class Queue(object): def _consume_done_putters(self): # Delete waiters at the head of the put() queue who've timed out. - while self._putters and self._putters[0][1].done(): + while self._putters and self._putters[0].done(): self._putters.popleft() def qsize(self): @@ -148,8 +148,9 @@ class Queue(object): elif self._maxsize > 0 and self._maxsize <= self.qsize(): waiter = futures.Future(loop=self._loop) - self._putters.append((item, waiter)) + self._putters.append(waiter) yield From(waiter) + self._put(item) else: self.__put_internal(item) @@ -186,8 +187,7 @@ class Queue(object): self._consume_done_putters() if self._putters: assert self.full(), 'queue not full, why are putters waiting?' - item, putter = self._putters.popleft() - self.__put_internal(item) + putter = self._putters.popleft() # When a getter runs and frees up a slot so this putter can # run, we need to defer the put for a tick to ensure that @@ -201,10 +201,40 @@ class Queue(object): raise Return(self._get()) else: waiter = futures.Future(loop=self._loop) - self._getters.append(waiter) - result = yield From(waiter) - raise Return(result) + try: + value = (yield From(waiter)) + raise Return(value) + except futures.CancelledError: + # if we get CancelledError, it means someone cancelled this + # get() coroutine. But there is a chance that the waiter + # already is ready and contains an item that has just been + # removed from the queue. In this case, we need to put the item + # back into the front of the queue. This get() must either + # succeed without fault or, if it gets cancelled, it must be as + # if it never happened. + if waiter.done(): + self._put_it_back(waiter.result()) + raise + + def _put_it_back(self, item): + """ + This is called when we have a waiter to get() an item and this waiter + gets cancelled. In this case, we put the item back: wake up another + waiter or put it in the _queue. + """ + self._consume_done_getters() + if self._getters: + assert not self._queue, ( + 'queue non-empty, why are getters waiting?') + + getter = self._getters.popleft() + self.__put_internal(item) + + # getter cannot be cancelled, we just removed done getters + getter.set_result(item) + else: + self._queue.appendleft(item) def get_nowait(self): """Remove and return an item from the queue. @@ -214,8 +244,7 @@ class Queue(object): self._consume_done_putters() if self._putters: assert self.full(), 'queue not full, why are putters waiting?' - item, putter = self._putters.popleft() - self.__put_internal(item) + putter = self._putters.popleft() # Wake putter on next tick. # getter cannot be cancelled, we just removed done putters diff --git a/trollius/selector_events.py b/trollius/selector_events.py index dc27ed1..67ef26e 100644 --- a/trollius/selector_events.py +++ b/trollius/selector_events.py @@ -19,6 +19,7 @@ except ImportError: # pragma: no cover ssl = None from . import base_events +from . import compat from . import constants from . import events from . import futures @@ -584,7 +585,7 @@ class _SelectorTransport(transports._FlowControlMixin, # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if self._sock is not None: warnings.warn("unclosed transport %r" % self, ResourceWarning) diff --git a/trollius/sslproto.py b/trollius/sslproto.py index 707cc6d..1404fd7 100644 --- a/trollius/sslproto.py +++ b/trollius/sslproto.py @@ -1,5 +1,4 @@ import collections -import sys import warnings try: import ssl @@ -7,6 +6,7 @@ try: except ImportError: # pragma: no cover ssl = None +from . import compat from . import protocols from . import transports from .log import logger @@ -326,7 +326,7 @@ class _SSLProtocolTransport(transports._FlowControlMixin, # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if not self._closed: warnings.warn("unclosed transport %r" % self, ResourceWarning) @@ -623,7 +623,8 @@ class SSLProtocol(protocols.Protocol): if data: ssldata, offset = self._sslpipe.feed_appdata(data, offset) elif offset: - ssldata = self._sslpipe.do_handshake(self._on_handshake_complete) + ssldata = self._sslpipe.do_handshake( + self._on_handshake_complete) offset = 1 else: ssldata = self._sslpipe.shutdown(self._finalize) @@ -647,9 +648,13 @@ class SSLProtocol(protocols.Protocol): self._write_buffer_size -= len(data) except BaseException as exc: if self._in_handshake: + # BaseExceptions will be re-raised in _on_handshake_complete. self._on_handshake_complete(exc) else: self._fatal_error(exc, 'Fatal error on SSL transport') + if not isinstance(exc, Exception): + # BaseException + raise def _fatal_error(self, exc, message='Fatal error on transport'): # Should be called from exception handler only. diff --git a/trollius/streams.py b/trollius/streams.py index b7ba4c5..cde58fb 100644 --- a/trollius/streams.py +++ b/trollius/streams.py @@ -6,7 +6,6 @@ __all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol', ] import socket -import sys if hasattr(socket, 'AF_UNIX'): __all__.extend(['open_unix_connection', 'start_unix_server']) @@ -243,6 +242,7 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): def eof_received(self): self._stream_reader.feed_eof() + return True class StreamWriter(object): @@ -324,6 +324,24 @@ class StreamReader(object): self._transport = None self._paused = False + def __repr__(self): + info = ['StreamReader'] + if self._buffer: + info.append('%d bytes' % len(info)) + if self._eof: + info.append('eof') + if self._limit != _DEFAULT_LIMIT: + info.append('l=%d' % self._limit) + if self._waiter: + info.append('w=%r' % self._waiter) + if self._exception: + info.append('e=%r' % self._exception) + if self._transport: + info.append('t=%r' % self._transport) + if self._paused: + info.append('paused') + return '<%s>' % ' '.join(info) + def exception(self): return self._exception diff --git a/trollius/tasks.py b/trollius/tasks.py index 8de3e62..3e0e1b1 100644 --- a/trollius/tasks.py +++ b/trollius/tasks.py @@ -9,7 +9,6 @@ __all__ = ['Task', import functools import linecache -import sys import traceback import warnings try: @@ -141,7 +140,11 @@ class Task(futures.Future): returned for a suspended coroutine. """ frames = [] - f = self._coro.gi_frame + try: + # 'async def' coroutines + f = self._coro.cr_frame + except AttributeError: + f = self._coro.gi_frame if f is not None: while f is not None: if limit is not None: diff --git a/trollius/test_utils.py b/trollius/test_utils.py index 12cdd45..ebebb25 100644 --- a/trollius/test_utils.py +++ b/trollius/test_utils.py @@ -524,7 +524,7 @@ class TestCase(unittest.TestCase): if six.PY2: sys.exc_clear() else: - self.assertEqual(sys.exc_info(), (None, None, None)) + pass #self.assertEqual(sys.exc_info(), (None, None, None)) def check_soure_traceback(self, source_traceback, lineno_delta): frame = sys._getframe(1) diff --git a/trollius/transports.py b/trollius/transports.py index 10bad51..1f086c1 100644 --- a/trollius/transports.py +++ b/trollius/transports.py @@ -1,7 +1,5 @@ """Abstract Transport class.""" -import sys - from trollius import compat __all__ = ['BaseTransport', 'ReadTransport', 'WriteTransport', diff --git a/trollius/unix_events.py b/trollius/unix_events.py index fcccaaa..cdefaca 100644 --- a/trollius/unix_events.py +++ b/trollius/unix_events.py @@ -399,7 +399,7 @@ class _UnixReadPipeTransport(transports.ReadTransport): # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if self._pipe is not None: warnings.warn("unclosed transport %r" % self, ResourceWarning) @@ -582,7 +582,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if self._pipe is not None: warnings.warn("unclosed transport %r" % self, ResourceWarning) |