summaryrefslogtreecommitdiff
path: root/asyncio/streams.py
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-07-22 12:02:58 +0200
committerVictor Stinner <victor.stinner@gmail.com>2014-07-22 12:02:58 +0200
commite78780d8814e2f195de03a45f2edebbe1e5c71e8 (patch)
tree735e562e075150e9bca3a705b19018ed9a0ee8c3 /asyncio/streams.py
parent387d807d731bd0844b9492f9fe1a143f30ef8643 (diff)
downloadtrollius-e78780d8814e2f195de03a45f2edebbe1e5c71e8.tar.gz
Tulip issue #193: Convert StreamWriter.drain() to a classic coroutine
Replace also _make_drain_waiter() function with a classic _drain_helper() coroutine.
Diffstat (limited to 'asyncio/streams.py')
-rw-r--r--asyncio/streams.py37
1 files changed, 18 insertions, 19 deletions
diff --git a/asyncio/streams.py b/asyncio/streams.py
index d18db77..c77eb60 100644
--- a/asyncio/streams.py
+++ b/asyncio/streams.py
@@ -141,15 +141,14 @@ class FlowControlMixin(protocols.Protocol):
resume_reading() and connection_lost(). If the subclass overrides
these it must call the super methods.
- StreamWriter.drain() must check for error conditions and then call
- _make_drain_waiter(), which will return either () or a Future
- depending on the paused state.
+ StreamWriter.drain() must wait for _drain_helper() coroutine.
"""
def __init__(self, loop=None):
self._loop = loop # May be None; we may never need it.
self._paused = False
self._drain_waiter = None
+ self._connection_lost = False
def pause_writing(self):
assert not self._paused
@@ -170,6 +169,7 @@ class FlowControlMixin(protocols.Protocol):
waiter.set_result(None)
def connection_lost(self, exc):
+ self._connection_lost = True
# Wake up the writer if currently paused.
if not self._paused:
return
@@ -184,14 +184,17 @@ class FlowControlMixin(protocols.Protocol):
else:
waiter.set_exception(exc)
- def _make_drain_waiter(self):
+ @coroutine
+ def _drain_helper(self):
+ if self._connection_lost:
+ raise ConnectionResetError('Connection lost')
if not self._paused:
- return ()
+ return
waiter = self._drain_waiter
assert waiter is None or waiter.cancelled()
waiter = futures.Future(loop=self._loop)
self._drain_waiter = waiter
- return waiter
+ yield from waiter
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
@@ -247,6 +250,8 @@ class StreamWriter:
def __init__(self, transport, protocol, reader, loop):
self._transport = transport
self._protocol = protocol
+ # drain() expects that the reader has a exception() method
+ assert reader is None or isinstance(reader, StreamReader)
self._reader = reader
self._loop = loop
@@ -278,26 +283,20 @@ class StreamWriter:
def get_extra_info(self, name, default=None):
return self._transport.get_extra_info(name, default)
+ @coroutine
def drain(self):
- """This method has an unusual return value.
+ """Flush the write buffer.
The intended use is to write
w.write(data)
yield from w.drain()
-
- When there's nothing to wait for, drain() returns (), and the
- yield-from continues immediately. When the transport buffer
- is full (the protocol is paused), drain() creates and returns
- a Future and the yield-from will block until that Future is
- completed, which will happen when the buffer is (partially)
- drained and the protocol is resumed.
"""
- if self._reader is not None and self._reader._exception is not None:
- raise self._reader._exception
- if self._transport._conn_lost: # Uses private variable.
- raise ConnectionResetError('Connection lost')
- return self._protocol._make_drain_waiter()
+ if self._reader is not None:
+ exc = self._reader.exception()
+ if exc is not None:
+ raise exc
+ yield from self._protocol._drain_helper()
class StreamReader: