summaryrefslogtreecommitdiff
path: root/asyncio
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2014-01-31 13:19:24 -0800
committerGuido van Rossum <guido@python.org>2014-01-31 13:19:24 -0800
commit6dd8d720a3399f8ab9079571a55d5b706ab073f3 (patch)
tree23445ec84e8e81778b0e7ed358e79a750a42c7b8 /asyncio
parent49556edac764ec6fd3619abe7ab8991ec9855cc5 (diff)
downloadtrollius-6dd8d720a3399f8ab9079571a55d5b706ab073f3.tar.gz
Copy a bunch of fixes by Victor for the Proactor event loop from the CPython repo.
Diffstat (limited to 'asyncio')
-rw-r--r--asyncio/proactor_events.py39
-rw-r--r--asyncio/selectors.py2
-rw-r--r--asyncio/windows_events.py14
3 files changed, 38 insertions, 17 deletions
diff --git a/asyncio/proactor_events.py b/asyncio/proactor_events.py
index d2553eb..b6b3be2 100644
--- a/asyncio/proactor_events.py
+++ b/asyncio/proactor_events.py
@@ -205,7 +205,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
self.close()
-class _ProactorWritePipeTransport(_ProactorBasePipeTransport,
+class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
transports.WriteTransport):
"""Transport for write pipes."""
@@ -286,8 +286,27 @@ class _ProactorWritePipeTransport(_ProactorBasePipeTransport,
self._force_close(None)
+class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
+ def __init__(self, *args, **kw):
+ super().__init__(*args, **kw)
+ self._read_fut = self._loop._proactor.recv(self._sock, 16)
+ self._read_fut.add_done_callback(self._pipe_closed)
+
+ def _pipe_closed(self, fut):
+ if fut.cancelled():
+ # the transport has been closed
+ return
+ assert fut is self._read_fut, (fut, self._read_fut)
+ self._read_fut = None
+ assert fut.result() == b''
+ if self._write_fut is not None:
+ self._force_close(exc)
+ else:
+ self.close()
+
+
class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
- _ProactorWritePipeTransport,
+ _ProactorBaseWritePipeTransport,
transports.Transport):
"""Transport for duplex pipes."""
@@ -299,7 +318,7 @@ class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
class _ProactorSocketTransport(_ProactorReadPipeTransport,
- _ProactorWritePipeTransport,
+ _ProactorBaseWritePipeTransport,
transports.Transport):
"""Transport for connected sockets."""
@@ -335,6 +354,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
self._selector = proactor # convenient alias
self._self_reading_future = None
self._accept_futures = {} # socket file descriptor => Future
+ self._granularity = max(proactor.resolution, self._granularity)
proactor.set_loop(self)
self._make_self_pipe()
@@ -353,15 +373,10 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)
def _make_write_pipe_transport(self, sock, protocol, waiter=None,
- extra=None, check_for_hangup=True):
- if check_for_hangup:
- # We want connection_lost() to be called when other end closes
- return _ProactorDuplexPipeTransport(self,
- sock, protocol, waiter, extra)
- else:
- # If other end closes we may not notice for a long time
- return _ProactorWritePipeTransport(self, sock, protocol, waiter,
- extra)
+ extra=None):
+ # We want connection_lost() to be called when other end closes
+ return _ProactorWritePipeTransport(self,
+ sock, protocol, waiter, extra)
def close(self):
if self._proactor is not None:
diff --git a/asyncio/selectors.py b/asyncio/selectors.py
index 52ee8db..056e45c 100644
--- a/asyncio/selectors.py
+++ b/asyncio/selectors.py
@@ -372,7 +372,7 @@ if hasattr(select, 'poll'):
else:
# poll() has a resolution of 1 millisecond, round away from
# zero to wait *at least* timeout seconds.
- timeout = int(math.ceil(timeout * 1e3))
+ timeout = math.ceil(timeout * 1e3)
ready = []
try:
fd_event_list = self._poll.poll(timeout)
diff --git a/asyncio/windows_events.py b/asyncio/windows_events.py
index d01de2f..b8574fa 100644
--- a/asyncio/windows_events.py
+++ b/asyncio/windows_events.py
@@ -1,11 +1,12 @@
"""Selector and proactor eventloops for Windows."""
+import _winapi
import errno
+import math
import socket
+import struct
import subprocess
import weakref
-import struct
-import _winapi
from . import events
from . import base_subprocess
@@ -190,6 +191,7 @@ class IocpProactor:
self._cache = {}
self._registered = weakref.WeakSet()
self._stopped_serving = weakref.WeakSet()
+ self.resolution = 1e-3
def set_loop(self, loop):
self._loop = loop
@@ -325,7 +327,9 @@ class IocpProactor:
if timeout is None:
ms = _winapi.INFINITE
else:
- ms = int(timeout * 1000 + 0.5)
+ # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
+ # round away from zero to wait *at least* timeout seconds.
+ ms = math.ceil(timeout * 1e3)
# We only create ov so we can use ov.address as a key for the cache.
ov = _overlapped.Overlapped(NULL)
@@ -396,7 +400,9 @@ class IocpProactor:
elif timeout < 0:
raise ValueError("negative timeout")
else:
- ms = int(timeout * 1000 + 0.5)
+ # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
+ # round away from zero to wait *at least* timeout seconds.
+ ms = math.ceil(timeout * 1e3)
if ms >= INFINITE:
raise ValueError("timeout too big")
while True: