From 8f95cfe898cf596ea930fba403e0f787475eda5f Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Wed, 14 Jan 2015 00:52:39 +0100 Subject: Python issue #23198: Reactor StreamReader - Add a new _wakeup_waiter() method - Replace _create_waiter() method with a _wait_for_data() coroutine function - Use the value None instead of True or False to wake up the waiter --- asyncio/streams.py | 47 ++++++++++++++++++++++------------------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/asyncio/streams.py b/asyncio/streams.py index 5a96b24..7ff16a4 100644 --- a/asyncio/streams.py +++ b/asyncio/streams.py @@ -313,8 +313,8 @@ class StreamReader: else: self._loop = loop self._buffer = bytearray() - self._eof = False # Whether we're done. - self._waiter = None # A future. + self._eof = False # Whether we're done. + self._waiter = None # A future used by _wait_for_data() self._exception = None self._transport = None self._paused = False @@ -331,6 +331,14 @@ class StreamReader: if not waiter.cancelled(): waiter.set_exception(exc) + def _wakeup_waiter(self): + """Wakeup read() or readline() function waiting for data or EOF.""" + waiter = self._waiter + if waiter is not None: + self._waiter = None + if not waiter.cancelled(): + waiter.set_result(None) + def set_transport(self, transport): assert self._transport is None, 'Transport already set' self._transport = transport @@ -342,11 +350,7 @@ class StreamReader: def feed_eof(self): self._eof = True - waiter = self._waiter - if waiter is not None: - self._waiter = None - if not waiter.cancelled(): - waiter.set_result(True) + self._wakeup_waiter() def at_eof(self): """Return True if the buffer is empty and 'feed_eof' was called.""" @@ -359,12 +363,7 @@ class StreamReader: return self._buffer.extend(data) - - waiter = self._waiter - if waiter is not None: - self._waiter = None - if not waiter.cancelled(): - waiter.set_result(False) + self._wakeup_waiter() if (self._transport is not None and not self._paused and @@ -379,7 +378,8 @@ class StreamReader: else: self._paused = True - def _create_waiter(self, func_name): + def _wait_for_data(self, func_name): + """Wait until feed_data() or feed_eof() is called.""" # StreamReader uses a future to link the protocol feed_data() method # to a read coroutine. Running two read coroutines at the same time # would have an unexpected behaviour. It would not possible to know @@ -387,7 +387,12 @@ class StreamReader: if self._waiter is not None: raise RuntimeError('%s() called while another coroutine is ' 'already waiting for incoming data' % func_name) - return futures.Future(loop=self._loop) + + self._waiter = futures.Future(loop=self._loop) + try: + yield from self._waiter + finally: + self._waiter = None @coroutine def readline(self): @@ -417,11 +422,7 @@ class StreamReader: break if not_enough: - self._waiter = self._create_waiter('readline') - try: - yield from self._waiter - finally: - self._waiter = None + yield from self._wait_for_data('readline') self._maybe_resume_transport() return bytes(line) @@ -448,11 +449,7 @@ class StreamReader: return b''.join(blocks) else: if not self._buffer and not self._eof: - self._waiter = self._create_waiter('read') - try: - yield from self._waiter - finally: - self._waiter = None + yield from self._wait_for_data('read') if n < 0 or len(self._buffer) <= n: data = bytes(self._buffer) -- cgit v1.2.1