diff options
author | Guido van Rossum <guido@python.org> | 2014-01-31 13:19:24 -0800 |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2014-01-31 13:19:24 -0800 |
commit | 6dd8d720a3399f8ab9079571a55d5b706ab073f3 (patch) | |
tree | 23445ec84e8e81778b0e7ed358e79a750a42c7b8 /asyncio | |
parent | 49556edac764ec6fd3619abe7ab8991ec9855cc5 (diff) | |
download | trollius-6dd8d720a3399f8ab9079571a55d5b706ab073f3.tar.gz |
Copy a bunch of fixes by Victor for the Proactor event loop from the CPython repo.
Diffstat (limited to 'asyncio')
-rw-r--r-- | asyncio/proactor_events.py | 39 | ||||
-rw-r--r-- | asyncio/selectors.py | 2 | ||||
-rw-r--r-- | asyncio/windows_events.py | 14 |
3 files changed, 38 insertions, 17 deletions
diff --git a/asyncio/proactor_events.py b/asyncio/proactor_events.py index d2553eb..b6b3be2 100644 --- a/asyncio/proactor_events.py +++ b/asyncio/proactor_events.py @@ -205,7 +205,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, self.close() -class _ProactorWritePipeTransport(_ProactorBasePipeTransport, +class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, transports.WriteTransport): """Transport for write pipes.""" @@ -286,8 +286,27 @@ class _ProactorWritePipeTransport(_ProactorBasePipeTransport, self._force_close(None) +class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport): + def __init__(self, *args, **kw): + super().__init__(*args, **kw) + self._read_fut = self._loop._proactor.recv(self._sock, 16) + self._read_fut.add_done_callback(self._pipe_closed) + + def _pipe_closed(self, fut): + if fut.cancelled(): + # the transport has been closed + return + assert fut is self._read_fut, (fut, self._read_fut) + self._read_fut = None + assert fut.result() == b'' + if self._write_fut is not None: + self._force_close(exc) + else: + self.close() + + class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport, - _ProactorWritePipeTransport, + _ProactorBaseWritePipeTransport, transports.Transport): """Transport for duplex pipes.""" @@ -299,7 +318,7 @@ class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport, class _ProactorSocketTransport(_ProactorReadPipeTransport, - _ProactorWritePipeTransport, + _ProactorBaseWritePipeTransport, transports.Transport): """Transport for connected sockets.""" @@ -335,6 +354,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): self._selector = proactor # convenient alias self._self_reading_future = None self._accept_futures = {} # socket file descriptor => Future + self._granularity = max(proactor.resolution, self._granularity) proactor.set_loop(self) self._make_self_pipe() @@ -353,15 +373,10 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra) def _make_write_pipe_transport(self, sock, protocol, waiter=None, - extra=None, check_for_hangup=True): - if check_for_hangup: - # We want connection_lost() to be called when other end closes - return _ProactorDuplexPipeTransport(self, - sock, protocol, waiter, extra) - else: - # If other end closes we may not notice for a long time - return _ProactorWritePipeTransport(self, sock, protocol, waiter, - extra) + extra=None): + # We want connection_lost() to be called when other end closes + return _ProactorWritePipeTransport(self, + sock, protocol, waiter, extra) def close(self): if self._proactor is not None: diff --git a/asyncio/selectors.py b/asyncio/selectors.py index 52ee8db..056e45c 100644 --- a/asyncio/selectors.py +++ b/asyncio/selectors.py @@ -372,7 +372,7 @@ if hasattr(select, 'poll'): else: # poll() has a resolution of 1 millisecond, round away from # zero to wait *at least* timeout seconds. - timeout = int(math.ceil(timeout * 1e3)) + timeout = math.ceil(timeout * 1e3) ready = [] try: fd_event_list = self._poll.poll(timeout) diff --git a/asyncio/windows_events.py b/asyncio/windows_events.py index d01de2f..b8574fa 100644 --- a/asyncio/windows_events.py +++ b/asyncio/windows_events.py @@ -1,11 +1,12 @@ """Selector and proactor eventloops for Windows.""" +import _winapi import errno +import math import socket +import struct import subprocess import weakref -import struct -import _winapi from . import events from . import base_subprocess @@ -190,6 +191,7 @@ class IocpProactor: self._cache = {} self._registered = weakref.WeakSet() self._stopped_serving = weakref.WeakSet() + self.resolution = 1e-3 def set_loop(self, loop): self._loop = loop @@ -325,7 +327,9 @@ class IocpProactor: if timeout is None: ms = _winapi.INFINITE else: - ms = int(timeout * 1000 + 0.5) + # RegisterWaitForSingleObject() has a resolution of 1 millisecond, + # round away from zero to wait *at least* timeout seconds. + ms = math.ceil(timeout * 1e3) # We only create ov so we can use ov.address as a key for the cache. ov = _overlapped.Overlapped(NULL) @@ -396,7 +400,9 @@ class IocpProactor: elif timeout < 0: raise ValueError("negative timeout") else: - ms = int(timeout * 1000 + 0.5) + # GetQueuedCompletionStatus() has a resolution of 1 millisecond, + # round away from zero to wait *at least* timeout seconds. + ms = math.ceil(timeout * 1e3) if ms >= INFINITE: raise ValueError("timeout too big") while True: |