summaryrefslogtreecommitdiff
path: root/trollius
diff options
context:
space:
mode:
Diffstat (limited to 'trollius')
-rw-r--r--trollius/base_events.py4
-rw-r--r--trollius/base_subprocess.py13
-rw-r--r--trollius/locks.py1
-rw-r--r--trollius/proactor_events.py4
-rw-r--r--trollius/queues.py49
-rw-r--r--trollius/selector_events.py3
-rw-r--r--trollius/sslproto.py11
-rw-r--r--trollius/streams.py20
-rw-r--r--trollius/tasks.py7
-rw-r--r--trollius/test_utils.py2
-rw-r--r--trollius/transports.py2
-rw-r--r--trollius/unix_events.py4
12 files changed, 89 insertions, 31 deletions
diff --git a/trollius/base_events.py b/trollius/base_events.py
index d4dc448..c5e6eff 100644
--- a/trollius/base_events.py
+++ b/trollius/base_events.py
@@ -387,7 +387,7 @@ class BaseEventLoop(events.AbstractEventLoop):
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
- if sys.version_info >= (3, 4):
+ if compat.PY34:
def __del__(self):
if not self.is_closed():
warnings.warn("unclosed event loop %r" % self, ResourceWarning)
@@ -1225,7 +1225,7 @@ class BaseEventLoop(events.AbstractEventLoop):
return
enabled = bool(enabled)
- if self._coroutine_wrapper_set is enabled:
+ if self._coroutine_wrapper_set == enabled:
return
wrapper = coroutines.debug_wrapper
diff --git a/trollius/base_subprocess.py b/trollius/base_subprocess.py
index 13cff7a..ffd6e76 100644
--- a/trollius/base_subprocess.py
+++ b/trollius/base_subprocess.py
@@ -1,8 +1,8 @@
import collections
import subprocess
-import sys
import warnings
+from . import compat
from . import futures
from . import protocols
from . import transports
@@ -36,8 +36,13 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self._pipes[2] = None
# Create the child process: set the _proc attribute
- self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
- stderr=stderr, bufsize=bufsize, **kwargs)
+ try:
+ self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
+ stderr=stderr, bufsize=bufsize, **kwargs)
+ except:
+ self.close()
+ raise
+
self._pid = self._proc.pid
self._extra['subprocess'] = self._proc
@@ -112,7 +117,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
- if sys.version_info >= (3, 4):
+ if compat.PY34:
def __del__(self):
if not self._closed:
warnings.warn("unclosed transport %r" % self, ResourceWarning)
diff --git a/trollius/locks.py b/trollius/locks.py
index 0d6de65..03b4daa 100644
--- a/trollius/locks.py
+++ b/trollius/locks.py
@@ -3,7 +3,6 @@
__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
import collections
-import sys
from . import compat
from . import events
diff --git a/trollius/proactor_events.py b/trollius/proactor_events.py
index 49d8bc3..66b4caf 100644
--- a/trollius/proactor_events.py
+++ b/trollius/proactor_events.py
@@ -7,10 +7,10 @@ proactor is only implemented on Windows with IOCP.
__all__ = ['BaseProactorEventLoop']
import socket
-import sys
import warnings
from . import base_events
+from . import compat
from . import constants
from . import futures
from . import sslproto
@@ -82,7 +82,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
- if sys.version_info >= (3, 4):
+ if compat.PY34:
def __del__(self):
if self._sock is not None:
warnings.warn("unclosed transport %r" % self, ResourceWarning)
diff --git a/trollius/queues.py b/trollius/queues.py
index 9c77c60..18167ab 100644
--- a/trollius/queues.py
+++ b/trollius/queues.py
@@ -47,7 +47,7 @@ class Queue(object):
# Futures.
self._getters = collections.deque()
- # Pairs of (item, Future).
+ # Futures
self._putters = collections.deque()
self._unfinished_tasks = 0
self._finished = locks.Event(loop=self._loop)
@@ -98,7 +98,7 @@ class Queue(object):
def _consume_done_putters(self):
# Delete waiters at the head of the put() queue who've timed out.
- while self._putters and self._putters[0][1].done():
+ while self._putters and self._putters[0].done():
self._putters.popleft()
def qsize(self):
@@ -148,8 +148,9 @@ class Queue(object):
elif self._maxsize > 0 and self._maxsize <= self.qsize():
waiter = futures.Future(loop=self._loop)
- self._putters.append((item, waiter))
+ self._putters.append(waiter)
yield From(waiter)
+ self._put(item)
else:
self.__put_internal(item)
@@ -186,8 +187,7 @@ class Queue(object):
self._consume_done_putters()
if self._putters:
assert self.full(), 'queue not full, why are putters waiting?'
- item, putter = self._putters.popleft()
- self.__put_internal(item)
+ putter = self._putters.popleft()
# When a getter runs and frees up a slot so this putter can
# run, we need to defer the put for a tick to ensure that
@@ -201,10 +201,40 @@ class Queue(object):
raise Return(self._get())
else:
waiter = futures.Future(loop=self._loop)
-
self._getters.append(waiter)
- result = yield From(waiter)
- raise Return(result)
+ try:
+ value = (yield From(waiter))
+ raise Return(value)
+ except futures.CancelledError:
+ # if we get CancelledError, it means someone cancelled this
+ # get() coroutine. But there is a chance that the waiter
+ # already is ready and contains an item that has just been
+ # removed from the queue. In this case, we need to put the item
+ # back into the front of the queue. This get() must either
+ # succeed without fault or, if it gets cancelled, it must be as
+ # if it never happened.
+ if waiter.done():
+ self._put_it_back(waiter.result())
+ raise
+
+ def _put_it_back(self, item):
+ """
+ This is called when we have a waiter to get() an item and this waiter
+ gets cancelled. In this case, we put the item back: wake up another
+ waiter or put it in the _queue.
+ """
+ self._consume_done_getters()
+ if self._getters:
+ assert not self._queue, (
+ 'queue non-empty, why are getters waiting?')
+
+ getter = self._getters.popleft()
+ self.__put_internal(item)
+
+ # getter cannot be cancelled, we just removed done getters
+ getter.set_result(item)
+ else:
+ self._queue.appendleft(item)
def get_nowait(self):
"""Remove and return an item from the queue.
@@ -214,8 +244,7 @@ class Queue(object):
self._consume_done_putters()
if self._putters:
assert self.full(), 'queue not full, why are putters waiting?'
- item, putter = self._putters.popleft()
- self.__put_internal(item)
+ putter = self._putters.popleft()
# Wake putter on next tick.
# getter cannot be cancelled, we just removed done putters
diff --git a/trollius/selector_events.py b/trollius/selector_events.py
index dc27ed1..67ef26e 100644
--- a/trollius/selector_events.py
+++ b/trollius/selector_events.py
@@ -19,6 +19,7 @@ except ImportError: # pragma: no cover
ssl = None
from . import base_events
+from . import compat
from . import constants
from . import events
from . import futures
@@ -584,7 +585,7 @@ class _SelectorTransport(transports._FlowControlMixin,
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
- if sys.version_info >= (3, 4):
+ if compat.PY34:
def __del__(self):
if self._sock is not None:
warnings.warn("unclosed transport %r" % self, ResourceWarning)
diff --git a/trollius/sslproto.py b/trollius/sslproto.py
index 707cc6d..1404fd7 100644
--- a/trollius/sslproto.py
+++ b/trollius/sslproto.py
@@ -1,5 +1,4 @@
import collections
-import sys
import warnings
try:
import ssl
@@ -7,6 +6,7 @@ try:
except ImportError: # pragma: no cover
ssl = None
+from . import compat
from . import protocols
from . import transports
from .log import logger
@@ -326,7 +326,7 @@ class _SSLProtocolTransport(transports._FlowControlMixin,
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
- if sys.version_info >= (3, 4):
+ if compat.PY34:
def __del__(self):
if not self._closed:
warnings.warn("unclosed transport %r" % self, ResourceWarning)
@@ -623,7 +623,8 @@ class SSLProtocol(protocols.Protocol):
if data:
ssldata, offset = self._sslpipe.feed_appdata(data, offset)
elif offset:
- ssldata = self._sslpipe.do_handshake(self._on_handshake_complete)
+ ssldata = self._sslpipe.do_handshake(
+ self._on_handshake_complete)
offset = 1
else:
ssldata = self._sslpipe.shutdown(self._finalize)
@@ -647,9 +648,13 @@ class SSLProtocol(protocols.Protocol):
self._write_buffer_size -= len(data)
except BaseException as exc:
if self._in_handshake:
+ # BaseExceptions will be re-raised in _on_handshake_complete.
self._on_handshake_complete(exc)
else:
self._fatal_error(exc, 'Fatal error on SSL transport')
+ if not isinstance(exc, Exception):
+ # BaseException
+ raise
def _fatal_error(self, exc, message='Fatal error on transport'):
# Should be called from exception handler only.
diff --git a/trollius/streams.py b/trollius/streams.py
index b7ba4c5..cde58fb 100644
--- a/trollius/streams.py
+++ b/trollius/streams.py
@@ -6,7 +6,6 @@ __all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
]
import socket
-import sys
if hasattr(socket, 'AF_UNIX'):
__all__.extend(['open_unix_connection', 'start_unix_server'])
@@ -243,6 +242,7 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
def eof_received(self):
self._stream_reader.feed_eof()
+ return True
class StreamWriter(object):
@@ -324,6 +324,24 @@ class StreamReader(object):
self._transport = None
self._paused = False
+ def __repr__(self):
+ info = ['StreamReader']
+ if self._buffer:
+ info.append('%d bytes' % len(info))
+ if self._eof:
+ info.append('eof')
+ if self._limit != _DEFAULT_LIMIT:
+ info.append('l=%d' % self._limit)
+ if self._waiter:
+ info.append('w=%r' % self._waiter)
+ if self._exception:
+ info.append('e=%r' % self._exception)
+ if self._transport:
+ info.append('t=%r' % self._transport)
+ if self._paused:
+ info.append('paused')
+ return '<%s>' % ' '.join(info)
+
def exception(self):
return self._exception
diff --git a/trollius/tasks.py b/trollius/tasks.py
index 8de3e62..3e0e1b1 100644
--- a/trollius/tasks.py
+++ b/trollius/tasks.py
@@ -9,7 +9,6 @@ __all__ = ['Task',
import functools
import linecache
-import sys
import traceback
import warnings
try:
@@ -141,7 +140,11 @@ class Task(futures.Future):
returned for a suspended coroutine.
"""
frames = []
- f = self._coro.gi_frame
+ try:
+ # 'async def' coroutines
+ f = self._coro.cr_frame
+ except AttributeError:
+ f = self._coro.gi_frame
if f is not None:
while f is not None:
if limit is not None:
diff --git a/trollius/test_utils.py b/trollius/test_utils.py
index 12cdd45..ebebb25 100644
--- a/trollius/test_utils.py
+++ b/trollius/test_utils.py
@@ -524,7 +524,7 @@ class TestCase(unittest.TestCase):
if six.PY2:
sys.exc_clear()
else:
- self.assertEqual(sys.exc_info(), (None, None, None))
+ pass #self.assertEqual(sys.exc_info(), (None, None, None))
def check_soure_traceback(self, source_traceback, lineno_delta):
frame = sys._getframe(1)
diff --git a/trollius/transports.py b/trollius/transports.py
index 10bad51..1f086c1 100644
--- a/trollius/transports.py
+++ b/trollius/transports.py
@@ -1,7 +1,5 @@
"""Abstract Transport class."""
-import sys
-
from trollius import compat
__all__ = ['BaseTransport', 'ReadTransport', 'WriteTransport',
diff --git a/trollius/unix_events.py b/trollius/unix_events.py
index fcccaaa..cdefaca 100644
--- a/trollius/unix_events.py
+++ b/trollius/unix_events.py
@@ -399,7 +399,7 @@ class _UnixReadPipeTransport(transports.ReadTransport):
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
- if sys.version_info >= (3, 4):
+ if compat.PY34:
def __del__(self):
if self._pipe is not None:
warnings.warn("unclosed transport %r" % self, ResourceWarning)
@@ -582,7 +582,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
- if sys.version_info >= (3, 4):
+ if compat.PY34:
def __del__(self):
if self._pipe is not None:
warnings.warn("unclosed transport %r" % self, ResourceWarning)