summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--asyncio/windows_events.py168
-rw-r--r--overlapped.c25
2 files changed, 159 insertions, 34 deletions
diff --git a/asyncio/windows_events.py b/asyncio/windows_events.py
index 82d0966..5105426 100644
--- a/asyncio/windows_events.py
+++ b/asyncio/windows_events.py
@@ -78,20 +78,23 @@ class _OverlappedFuture(futures.Future):
self._ov = None
-class _WaitHandleFuture(futures.Future):
+class _BaseWaitHandleFuture(futures.Future):
"""Subclass of Future which represents a wait handle."""
- def __init__(self, iocp, ov, handle, wait_handle, *, loop=None):
+ def __init__(self, ov, handle, wait_handle, *, loop=None):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
- # iocp and ov are only used by cancel() to notify IocpProactor
- # that the wait was cancelled
- self._iocp = iocp
+ # Keep a reference to the Overlapped object to keep it alive until the
+ # wait is unregistered
self._ov = ov
self._handle = handle
self._wait_handle = wait_handle
+ # Should we call UnregisterWaitEx() if the wait completes
+ # or is cancelled?
+ self._registered = True
+
def _poll(self):
# non-blocking wait: use a timeout of 0 millisecond
return (_winapi.WaitForSingleObject(self._handle, 0) ==
@@ -99,21 +102,32 @@ class _WaitHandleFuture(futures.Future):
def _repr_info(self):
info = super()._repr_info()
- info.insert(1, 'handle=%#x' % self._handle)
- if self._wait_handle:
+ info.append('handle=%#x' % self._handle)
+ if self._handle is not None:
state = 'signaled' if self._poll() else 'waiting'
- info.insert(1, 'wait_handle=<%s, %#x>'
- % (state, self._wait_handle))
+ info.append(state)
+ if self._wait_handle is not None:
+ info.append('wait_handle=%#x' % self._wait_handle)
return info
+ def _unregister_wait_cb(self, fut):
+ # The wait was unregistered: it's not safe to destroy the Overlapped
+ # object
+ self._ov = None
+
def _unregister_wait(self):
- if self._wait_handle is None:
+ if not self._registered:
return
+ self._registered = False
+
try:
_overlapped.UnregisterWait(self._wait_handle)
except OSError as exc:
- # ERROR_IO_PENDING is not an error, the wait was unregistered
- if exc.winerror != _overlapped.ERROR_IO_PENDING:
+ self._wait_handle = None
+ if exc.winerror == _overlapped.ERROR_IO_PENDING:
+ # ERROR_IO_PENDING is not an error, the wait was unregistered
+ self._unregister_wait_cb(None)
+ elif exc.winerror != _overlapped.ERROR_IO_PENDING:
context = {
'message': 'Failed to unregister the wait handle',
'exception': exc,
@@ -122,26 +136,91 @@ class _WaitHandleFuture(futures.Future):
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
- self._wait_handle = None
- self._iocp = None
- self._ov = None
+ else:
+ self._wait_handle = None
+ self._unregister_wait_cb(None)
def cancel(self):
- result = super().cancel()
- if self._ov is not None:
- # signal the cancellation to the overlapped object
- _overlapped.PostQueuedCompletionStatus(self._iocp, True,
- 0, self._ov.address)
self._unregister_wait()
- return result
+ return super().cancel()
def set_exception(self, exception):
- super().set_exception(exception)
self._unregister_wait()
+ super().set_exception(exception)
def set_result(self, result):
- super().set_result(result)
self._unregister_wait()
+ super().set_result(result)
+
+
+class _WaitCancelFuture(_BaseWaitHandleFuture):
+ """Subclass of Future which represents a wait for the cancellation of a
+ _WaitHandleFuture using an event.
+ """
+
+ def __init__(self, ov, event, wait_handle, *, loop=None):
+ super().__init__(ov, event, wait_handle, loop=loop)
+
+ self._done_callback = None
+
+ def _schedule_callbacks(self):
+ super(_WaitCancelFuture, self)._schedule_callbacks()
+ if self._done_callback is not None:
+ self._done_callback(self)
+
+
+class _WaitHandleFuture(_BaseWaitHandleFuture):
+ def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
+ super().__init__(ov, handle, wait_handle, loop=loop)
+ self._proactor = proactor
+ self._unregister_proactor = True
+ self._event = _overlapped.CreateEvent(None, True, False, None)
+ self._event_fut = None
+
+ def _unregister_wait_cb(self, fut):
+ if self._event is not None:
+ _winapi.CloseHandle(self._event)
+ self._event = None
+ self._event_fut = None
+
+ # If the wait was cancelled, the wait may never be signalled, so
+ # it's required to unregister it. Otherwise, IocpProactor.close() will
+ # wait forever for an event which will never come.
+ #
+ # If the IocpProactor already received the event, it's safe to call
+ # _unregister() because we kept a reference to the Overlapped object
+ # which is used as an unique key.
+ self._proactor._unregister(self._ov)
+ self._proactor = None
+
+ super()._unregister_wait_cb(fut)
+
+ def _unregister_wait(self):
+ if not self._registered:
+ return
+ self._registered = False
+
+ try:
+ _overlapped.UnregisterWaitEx(self._wait_handle, self._event)
+ except OSError as exc:
+ self._wait_handle = None
+ if exc.winerror == _overlapped.ERROR_IO_PENDING:
+ # ERROR_IO_PENDING is not an error, the wait was unregistered
+ self._unregister_wait_cb(None)
+ elif exc.winerror != _overlapped.ERROR_IO_PENDING:
+ context = {
+ 'message': 'Failed to unregister the wait handle',
+ 'exception': exc,
+ 'future': self,
+ }
+ if self._source_traceback:
+ context['source_traceback'] = self._source_traceback
+ self._loop.call_exception_handler(context)
+ else:
+ self._wait_handle = None
+ self._event_fut = self._proactor._wait_cancel(
+ self._event,
+ self._unregister_wait_cb)
class PipeServer(object):
@@ -291,6 +370,7 @@ class IocpProactor:
_overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
self._cache = {}
self._registered = weakref.WeakSet()
+ self._unregistered = []
self._stopped_serving = weakref.WeakSet()
def __repr__(self):
@@ -438,6 +518,16 @@ class IocpProactor:
Return a Future object. The result of the future is True if the wait
completed, or False if the wait did not complete (on timeout).
"""
+ return self._wait_for_handle(handle, timeout, False)
+
+ def _wait_cancel(self, event, done_callback):
+ fut = self._wait_for_handle(event, None, True)
+ # add_done_callback() cannot be used because the wait may only complete
+ # in IocpProactor.close(), while the event loop is not running.
+ fut._done_callback = done_callback
+ return fut
+
+ def _wait_for_handle(self, handle, timeout, _is_cancel):
if timeout is None:
ms = _winapi.INFINITE
else:
@@ -447,9 +537,13 @@ class IocpProactor:
# We only create ov so we can use ov.address as a key for the cache.
ov = _overlapped.Overlapped(NULL)
- wh = _overlapped.RegisterWaitWithQueue(
+ wait_handle = _overlapped.RegisterWaitWithQueue(
handle, self._iocp, ov.address, ms)
- f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop)
+ if _is_cancel:
+ f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
+ else:
+ f = _WaitHandleFuture(ov, handle, wait_handle, self,
+ loop=self._loop)
if f._source_traceback:
del f._source_traceback[-1]
@@ -462,14 +556,6 @@ class IocpProactor:
# False even though we have not timed out.
return f._poll()
- if f._poll():
- try:
- result = f._poll()
- except OSError as exc:
- f.set_exception(exc)
- else:
- f.set_result(result)
-
self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
return f
@@ -521,6 +607,15 @@ class IocpProactor:
self._cache[ov.address] = (f, ov, obj, callback)
return f
+ def _unregister(self, ov):
+ """Unregister an overlapped object.
+
+ Call this method when its future has been cancelled. The event can
+ already be signalled (pending in the proactor event queue). It is also
+ safe if the event is never signalled (because it was cancelled).
+ """
+ self._unregistered.append(ov)
+
def _get_accept_socket(self, family):
s = socket.socket(family)
s.settimeout(0)
@@ -541,7 +636,7 @@ class IocpProactor:
while True:
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
- return
+ break
ms = 0
err, transferred, key, address = status
@@ -576,6 +671,11 @@ class IocpProactor:
f.set_result(value)
self._results.append(f)
+ # Remove unregisted futures
+ for ov in self._unregistered:
+ self._cache.pop(ov.address, None)
+ self._unregistered.clear()
+
def _stop_serving(self, obj):
# obj is a socket or pipe handle. It will be closed in
# BaseProactorEventLoop._stop_serving() which will make any
diff --git a/overlapped.c b/overlapped.c
index 6842efb..d22c626 100644
--- a/overlapped.c
+++ b/overlapped.c
@@ -309,6 +309,29 @@ overlapped_UnregisterWait(PyObject *self, PyObject *args)
Py_RETURN_NONE;
}
+PyDoc_STRVAR(
+ UnregisterWaitEx_doc,
+ "UnregisterWaitEx(WaitHandle, Event) -> None\n\n"
+ "Unregister wait handle.\n");
+
+static PyObject *
+overlapped_UnregisterWaitEx(PyObject *self, PyObject *args)
+{
+ HANDLE WaitHandle, Event;
+ BOOL ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_HANDLE, &WaitHandle, &Event))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = UnregisterWaitEx(WaitHandle, Event);
+ Py_END_ALLOW_THREADS
+
+ if (!ret)
+ return SetFromWindowsErr(0);
+ Py_RETURN_NONE;
+}
+
/*
* Event functions -- currently only used by tests
*/
@@ -1319,6 +1342,8 @@ static PyMethodDef overlapped_functions[] = {
METH_VARARGS, RegisterWaitWithQueue_doc},
{"UnregisterWait", overlapped_UnregisterWait,
METH_VARARGS, UnregisterWait_doc},
+ {"UnregisterWaitEx", overlapped_UnregisterWaitEx,
+ METH_VARARGS, UnregisterWaitEx_doc},
{"CreateEvent", overlapped_CreateEvent,
METH_VARARGS, CreateEvent_doc},
{"SetEvent", overlapped_SetEvent,